
📜 Phase 3 Cheatsheet

Modern Data Stack Ecosystem - Quick Reference

❄ Snowflake - Essential Commands

-- Create a warehouse
CREATE WAREHOUSE my_wh
  WITH WAREHOUSE_SIZE = 'MEDIUM'
  AUTO_SUSPEND = 300
  AUTO_RESUME = TRUE
  MIN_CLUSTER_COUNT = 1
  MAX_CLUSTER_COUNT = 3;

-- Time Travel (up to 90 days)
SELECT * FROM my_table
  AT(TIMESTAMP => '2026-01-15 10:00:00'::TIMESTAMP);

SELECT * FROM my_table
  BEFORE(STATEMENT => '01b2c3d4-...');

-- Zero-Copy Cloning
CREATE TABLE dev_table CLONE prod_table;
CREATE DATABASE dev_db CLONE prod_db;
CREATE SCHEMA test_schema CLONE prod_schema;

-- Streams (CDC)
CREATE STREAM my_stream ON TABLE source_table;
SELECT * FROM my_stream; -- changes only

-- Tasks (scheduling)
CREATE TASK my_task
  WAREHOUSE = my_wh
  SCHEDULE = 'USING CRON 0 9 * * * UTC'
AS
  INSERT INTO target SELECT * FROM my_stream;

-- Data Sharing
CREATE SHARE my_share;
GRANT USAGE ON DATABASE db TO SHARE my_share;
GRANT SELECT ON TABLE db.schema.t TO SHARE my_share;

-- Snowpipe (auto-ingest)
CREATE PIPE my_pipe AUTO_INGEST = TRUE AS
  COPY INTO my_table FROM @my_stage;
💡 Tip: Use WAREHOUSE_SIZE = 'XSMALL' for dev and scale up in prod. Credit consumption roughly doubles with each size step (XS = 1, S = 2, M = 4 credits/hour).

🏗 Snowflake Architecture

┌─────────────────────────────────┐
│       Cloud Services Layer      │
│  (Auth, Metadata, Optimizer)    │
├─────────────────────────────────┤
│         Compute Layer           │
│   ┌─────┐  ┌─────┐  ┌─────┐     │
│   │ WH1 │  │ WH2 │  │ WH3 │     │
│   │ ETL │  │ BI  │  │ ML  │     │
│   └─────┘  └─────┘  └─────┘     │
├─────────────────────────────────┤
│         Storage Layer           │
│   (Micro-partitions, S3/GCS)    │
│ Columnar, Compressed, Immutable │
└─────────────────────────────────┘

| Concept | Description |
|---|---|
| Micro-partitions | 50-500 MB, columnar, immutable, auto-clustered |
| Virtual Warehouse | Independent compute cluster (XS to 6XL) |
| Time Travel | Historical access: 1 day (Standard) to 90 days (Enterprise) |
| Fail-safe | 7 additional days (not user-accessible) |
| Zero-Copy Clone | Metadata-only clone, no data copied |
| Snowpark | Python/Java/Scala running inside Snowflake |

☁ Google BigQuery

-- Partitioning
CREATE TABLE dataset.my_table (
  id INT64,
  event_date DATE,
  user_id STRING,
  amount NUMERIC
)
PARTITION BY event_date
CLUSTER BY user_id;

-- BigQuery ML - train a model in SQL
CREATE OR REPLACE MODEL dataset.my_model
OPTIONS(model_type='logistic_reg',
        input_label_cols=['is_churned'])
AS SELECT * FROM dataset.training_data;

-- Prediction
SELECT * FROM ML.PREDICT(
  MODEL dataset.my_model,
  TABLE dataset.new_data);

-- Federated Query
SELECT * FROM
  EXTERNAL_QUERY('connection_id',
  'SELECT * FROM postgres_table');

-- Materialized View
CREATE MATERIALIZED VIEW dataset.mv AS
SELECT date, SUM(revenue) as total
FROM dataset.sales
GROUP BY date;

-- Scheduled Query
-- Via the console or CLI: bq mk --transfer_config

| Feature | BigQuery |
|---|---|
| Architecture | Serverless, Dremel engine |
| Storage | Capacitor (columnar) |
| Pricing | On-demand ($5/TB) or slots ($0.04/slot/h) |
| ML | BQML (SQL-based ML) |
| BI Engine | In-memory acceleration |
| BigLake | External tables (S3, GCS) |

⚡ Databricks & Apache Spark

# PySpark - DataFrame Operations
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg

spark = SparkSession.builder \
    .appName("DataPipeline") \
    .getOrCreate()

# Read from Delta Lake
df = spark.read.format("delta").load("/data/bronze/events")

# Transformations
result = (df
    .filter(col("event_date") >= "2026-01-01")
    .groupBy("category")
    .agg(
        sum("amount").alias("total_amount"),
        avg("amount").alias("avg_amount")
    )
    .orderBy(col("total_amount").desc())
)

# Write as Delta
result.write.format("delta") \
    .mode("overwrite") \
    .save("/data/gold/category_summary")

# Delta Lake - MERGE (Upsert)
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/data/silver/users")
target.alias("t").merge(
    source_df.alias("s"),
    "t.user_id = s.user_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

# Unity Catalog
spark.sql("USE CATALOG main")
spark.sql("USE SCHEMA analytics")
spark.sql("GRANT SELECT ON TABLE t TO GROUP analysts")

| Component | Role |
|---|---|
| Driver | Coordinates the job, builds the execution plan |
| Executors | Run the tasks on the partitions |
| DAG | Directed acyclic graph of transformations |
| Delta Lake | ACID transactions on the data lake |
| Unity Catalog | Unified multi-workspace governance |
| Photon | Vectorized C++ engine (3-8x faster) |

📈 Cloud Data Warehouse Comparison

| Criterion | Snowflake | BigQuery | Databricks |
|---|---|---|---|
| Architecture | Shared-disk | Serverless | Lakehouse |
| Compute/Storage | Separated | Decoupled | Separated |
| Pricing | Credits/second | $/TB or slots | DBU/hour |
| Native format | Proprietary | Capacitor | Delta Lake |
| Native ML | Cortex AI | BQML | MLflow |
| Multi-cloud | AWS/GCP/Azure | GCP only | AWS/GCP/Azure |
| Data Sharing | Native | Analytics Hub | Delta Sharing |
| Streaming | Snowpipe | Streaming inserts | Structured Streaming |
| Best for | SQL analytics | GCP ecosystem | ML + Data Eng |
💡 Mentor's rule: Snowflake for SQL-first teams, BigQuery if you are all-in on GCP, Databricks if you are ML-heavy or want multi-cloud open source.

🔨 dbt (data build tool)

# dbt project structure
my_project/
├── dbt_project.yml
├── profiles.yml        # DWH connection
├── models/
│   ├── staging/        # 1:1 with sources
│   │   ├── stg_orders.sql
│   │   └── _stg_models.yml
│   ├── intermediate/   # business logic
│   │   └── int_orders_enriched.sql
│   └── marts/          # final tables
│       ├── fct_orders.sql
│       └── dim_customers.sql
├── tests/
│   └── assert_positive_amounts.sql
├── macros/
│   └── cents_to_dollars.sql
├── seeds/
│   └── country_codes.csv
└── snapshots/
    └── snap_customers.sql
-- Materialization types
-- models/staging/stg_orders.sql
{{ config(materialized='view') }}

SELECT
    id as order_id,
    customer_id,
    order_date,
    status,
    amount_cents / 100.0 as amount
FROM {{ source('raw', 'orders') }}

-- models/marts/fct_orders.sql
{{ config(
    materialized='incremental',
    unique_key='order_id',
    on_schema_change='append_new_columns'
) }}

SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    o.order_date,
    o.amount,
    o.status
FROM {{ ref('stg_orders') }} o
JOIN {{ ref('dim_customers') }} c
  ON o.customer_id = c.customer_id
{% if is_incremental() %}
WHERE o.order_date > (
    SELECT MAX(order_date) FROM {{ this }}
)
{% endif %}

🔎 dbt Tests & Macros

# _stg_models.yml - Generic tests
version: 2
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: status
        tests:
          - accepted_values:
              values: ['pending','shipped','delivered','cancelled']
      - name: customer_id
        tests:
          - relationships:
              to: ref('dim_customers')
              field: customer_id

# dbt_expectations (package)
      - name: amount
        tests:
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 100000
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
    ROUND({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}

-- Usage in a model:
SELECT
    order_id,
    {{ cents_to_dollars('amount_cents') }} as amount
FROM {{ source('raw', 'orders') }}

-- Snapshot (SCD Type 2)
{% snapshot snap_customers %}
{{ config(
    target_schema='snapshots',
    unique_key='customer_id',
    strategy='timestamp',
    updated_at='updated_at'
) }}
SELECT * FROM {{ source('raw', 'customers') }}
{% endsnapshot %}
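Under the hood, a timestamp-strategy snapshot is plain SCD Type 2 bookkeeping: close the currently-valid row, append the new version. A minimal sketch in pure Python on toy data (column names invented; dbt's real implementation differs):

```python
# Toy SCD Type 2 bookkeeping: close the open row, append the new version.
from datetime import date

# Snapshot table: valid_to=None marks the currently-valid version of a key
snap = [{"customer_id": 1, "plan": "free",
         "valid_from": date(2026, 1, 1), "valid_to": None}]

def apply_change(snap, key, new_row, updated_at):
    """Close the open row for `key`, then append the new version."""
    for row in snap:
        if row["customer_id"] == key and row["valid_to"] is None:
            row["valid_to"] = updated_at      # old version ends here
    snap.append({**new_row, "valid_from": updated_at, "valid_to": None})

# Customer 1 upgraded to 'pro' on 2026-02-01
apply_change(snap, 1, {"customer_id": 1, "plan": "pro"}, date(2026, 2, 1))
print([r["plan"] for r in snap])  # ['free', 'pro'] - full history kept
```

Both versions survive, which is exactly what lets the marts query "what plan did this customer have on date X".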
| Command | Description |
|---|---|
| `dbt run` | Run all models |
| `dbt test` | Run all tests |
| `dbt run -s +fct_orders` | Run fct_orders and its upstream dependencies |
| `dbt run -s tag:daily` | Run models tagged 'daily' |
| `dbt docs generate` | Generate the documentation |
| `dbt docs serve` | Serve the docs site |
| `dbt snapshot` | Run the SCD2 snapshots |
| `dbt build` | seed + run + test + snapshot in one command |

🌈 Apache Airflow

# DAG with the TaskFlow API
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule='0 6 * * *',  # 6 AM every day
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['production', 'dbt']
)
def daily_pipeline():

    @task()
    def extract():
        """Extract data from API"""
        import requests
        data = requests.get('https://api.example.com/data').json()
        return data

    @task()
    def transform(raw_data):
        """Clean and transform data"""
        cleaned = [
            {**row, 'amount': float(row['amount'])}
            for row in raw_data
            if row['status'] != 'cancelled'
        ]
        return cleaned

    @task()
    def load(data):
        """Load to data warehouse"""
        from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
        hook = SnowflakeHook('snowflake_conn')
        hook.insert_rows('target_table', data)

    @task()
    def run_dbt():
        """Run dbt transformations"""
        import subprocess
        subprocess.run(['dbt', 'build', '--target', 'prod'], check=True)

    # Define dependencies
    raw = extract()
    clean = transform(raw)
    load(clean) >> run_dbt()

daily_pipeline()
| Component | Role |
|---|---|
| Scheduler | Triggers DAGs according to their schedule |
| Webserver | UI (port 8080) |
| Worker | Executes the tasks |
| Metadata DB | PostgreSQL storing all state |
| DAG | Directed Acyclic Graph (the pipeline) |
| Operator | Task template (Python, Bash, SQL...) |
| Sensor | Waits for an external condition |
| XCom | Cross-task communication |
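A DAG run boils down to executing tasks in a topological order of their dependencies. A sketch of that concept with the standard library (hypothetical task names, not the Airflow API):

```python
# A DAG run = a topological ordering of the task dependency graph.
from graphlib import TopologicalSorter  # stdlib, Python 3.9+

deps = {            # task -> set of upstream tasks it waits on
    "transform": {"extract"},
    "load": {"transform"},
    "run_dbt": {"load"},
}
order = list(TopologicalSorter(deps).static_order())
print(order)  # ['extract', 'transform', 'load', 'run_dbt']
```

The scheduler does exactly this, plus retries, concurrency limits, and state tracking in the metadata DB.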

🔄 Dagster & Prefect

# Dagster - Software-Defined Assets
from dagster import asset, AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

@asset(group_name="ingestion")
def raw_orders(context: AssetExecutionContext):
    """Extract orders from source"""
    import pandas as pd
    df = pd.read_csv("s3://bucket/orders.csv")
    df.to_parquet("/data/raw/orders.parquet")
    context.log.info(f"Loaded {len(df)} rows")
    return df

@asset(deps=[raw_orders], group_name="transform")
def clean_orders():
    """Clean and validate orders"""
    import pandas as pd
    df = pd.read_parquet("/data/raw/orders.parquet")
    return df.dropna(subset=['order_id', 'amount'])
# Prefect - Flows & Tasks
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(cache_key_fn=task_input_hash,
      cache_expiration=timedelta(hours=1),
      retries=3)
def extract_data(url: str) -> dict:
    import httpx
    return httpx.get(url).json()

@task
def transform_data(data: dict) -> list:
    return [r for r in data['results'] if r['active']]

@task
def load_data(records: list):
    # Load to database
    print(f"Loaded {len(records)} records")

@flow(name="ETL Pipeline", log_prints=True)
def etl_pipeline():
    data = extract_data("https://api.example.com/data")
    clean = transform_data(data)
    load_data(clean)

etl_pipeline()
| Criterion | Airflow | Dagster | Prefect |
|---|---|---|---|
| Paradigm | Task-centric | Asset-centric | Flow-centric |
| Testability | Average | Excellent | Good |
| DX | Good | Excellent | Excellent |
| Ecosystem | Huge | Growing | Medium |
| Managed | MWAA, Composer | Dagster+ | Prefect Cloud |
| Native dbt | Via operator | dagster-dbt | prefect-dbt |
| Best for | Industry standard | Data-aware pipelines | Max flexibility |

📂 Data File Formats

| Format | Type | Compression | Schema | Use Case |
|---|---|---|---|---|
| Parquet | Columnar | Snappy/Gzip | Embedded | Analytics, DWH |
| Avro | Row-based | Snappy/Deflate | Evolution | Streaming, Kafka |
| ORC | Columnar | Zlib/Snappy | Embedded | Hive ecosystem |
| JSON | Row-based | No | Flexible | APIs, logs |
| CSV | Row-based | No | No | Legacy, exports |

Rule: ALWAYS use Parquet for analytical storage. CSV and JSON belong at ingestion only.
# Python - Read/Write Parquet
import pandas as pd

# Parquet
df.to_parquet('data.parquet', compression='snappy')
df = pd.read_parquet('data.parquet')

# With PyArrow (faster)
import pyarrow.parquet as pq
table = pq.read_table('data.parquet',
    columns=['col1', 'col2'])  # selective column read

🗃 Open Table Formats

| Feature | Delta Lake | Apache Iceberg | Apache Hudi |
|---|---|---|---|
| Creator | Databricks | Netflix/Apple | Uber |
| ACID | Yes | Yes | Yes |
| Time Travel | Yes | Yes | Yes |
| Schema Evolution | Yes | Excellent | Yes |
| Partition Evolution | No | Yes (hidden) | No |
| Multi-engine | Good | Excellent | Average |
| Native CDC | CDF | Via Flink | Native |
| Adoption | Databricks eco | Multi-cloud | Streaming |
-- Delta Lake (Databricks SQL)
CREATE TABLE events USING DELTA
LOCATION 's3://bucket/events';

-- Iceberg (Spark)
CREATE TABLE catalog.db.events
USING iceberg
PARTITIONED BY (days(event_date));

-- Iceberg - Schema evolution
ALTER TABLE catalog.db.events
ADD COLUMNS (new_col STRING);
-- No rewrite needed!
💡 2026 trend: Apache Iceberg dominates multi-engine setups. Delta Lake stays strong inside the Databricks ecosystem. Hudi for heavy CDC workloads.

📤 Ingestion & CDC Tools

| Tool | Type | Open Source | Price |
|---|---|---|---|
| Fivetran | Managed ELT | No | $$$ (MAR-based) |
| Airbyte | ELT | Yes | Free / Cloud $$ |
| Debezium | CDC | Yes | Free |
| AWS DMS | Migration/CDC | No | $ (instance-based) |
| Stitch | Managed ELT | No | $$ (row-based) |
| Meltano | ELT/Singer | Yes | Free |
# Airbyte - Docker Compose
git clone https://github.com/airbytehq/airbyte.git
cd airbyte
./run-ab-platform.sh
# UI: http://localhost:8000

# Debezium - Kafka Connect config
{
  "name": "postgres-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.dbname": "mydb",
    "topic.prefix": "cdc",
    "plugin.name": "pgoutput"
  }
}

🚀 Modern Pipeline Architecture

     Sources              Ingestion          Storage
  ┌──────────┐        ┌──────────┐      ┌──────────┐
  │ Postgres │───────>│ Airbyte/ │─────>│  Bronze  │
  │ APIs     │───────>│ Fivetran │─────>│ (Raw S3) │
  │ Files    │───────>│ Debezium │─────>│  Parquet │
  └──────────┘        └──────────┘      └────┬─────┘
                                              │
     Transformation        Processing         │
  ┌──────────────┐     ┌──────────┐          │
  │     dbt      │<────│  Silver  │<─────────┘
  │  (staging)   │────>│ (Clean)  │
  │  (marts)     │     └────┬─────┘
  └──────────────┘          │
                             v
     Serving            ┌──────────┐
  ┌──────────┐          │   Gold   │
  │ Tableau  │<─────────│(Business)│
  │ Looker   │<─────────│          │
  │ Metabase │          └──────────┘
  └──────────┘
                    Orchestration: Airflow/Dagster
                    Quality: Great Expectations/Soda
                    Catalog: DataHub/Atlan

⌨ dbt CLI Quick Reference

| Command | Description |
|---|---|
| `dbt init` | Initialize a new dbt project |
| `dbt debug` | Test the DWH connection |
| `dbt deps` | Install packages (packages.yml) |
| `dbt seed` | Load the seed CSV files |
| `dbt compile` | Compile the SQL without running it |
| `dbt run --full-refresh` | Fully rebuild incremental models |
| `dbt test --select source:*` | Test the sources only |
| `dbt ls --resource-type model` | List all models |
| `dbt run -s state:modified+` | Slim CI: modified models + descendants |
| `dbt source freshness` | Check source freshness |

🏅 Medallion Architecture with dbt

-- BRONZE: Raw data (view or table)
-- models/staging/stg_raw_events.sql
{{ config(materialized='view') }}
SELECT
    _airbyte_extracted_at as loaded_at,
    *
FROM {{ source('raw_airbyte', 'events') }}

-- SILVER: Cleaned & typed
-- models/intermediate/int_events_cleaned.sql
{{ config(materialized='incremental',
          unique_key='event_id') }}
SELECT
    event_id,
    CAST(event_date AS DATE) as event_date,
    LOWER(TRIM(user_email)) as user_email,
    COALESCE(amount, 0) as amount,
    CASE
        WHEN status IN ('active','completed') THEN status
        ELSE 'unknown'
    END as status
FROM {{ ref('stg_raw_events') }}
{% if is_incremental() %}
WHERE loaded_at > (SELECT MAX(loaded_at) FROM {{ this }})
{% endif %}

-- GOLD: Business metrics
-- models/marts/fct_daily_revenue.sql
{{ config(materialized='table') }}
SELECT
    event_date,
    COUNT(DISTINCT user_email) as unique_users,
    COUNT(*) as total_events,
    SUM(amount) as total_revenue,
    AVG(amount) as avg_order_value
FROM {{ ref('int_events_cleaned') }}
WHERE status = 'completed'
GROUP BY event_date
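The same bronze → silver → gold flow can be sketched in pandas on toy data to make the layering concrete (illustration only; in the project above each layer is a dbt model, and every column name here is invented):

```python
# Bronze -> silver -> gold on two toy events.
import pandas as pd

bronze = pd.DataFrame([   # BRONZE: raw, untyped, messy
    {"event_id": 1, "event_date": "2026-01-01",
     "user_email": " A@x.COM ", "amount": 100.0, "status": "completed"},
    {"event_id": 2, "event_date": "2026-01-01",
     "user_email": "b@x.com", "amount": None, "status": "weird"},
])

# SILVER: typed, cleaned, defaulted (mirrors int_events_cleaned above)
silver = bronze.assign(
    event_date=pd.to_datetime(bronze["event_date"]).dt.date,
    user_email=bronze["user_email"].str.strip().str.lower(),
    amount=bronze["amount"].fillna(0),
    status=bronze["status"].where(
        bronze["status"].isin(["active", "completed"]), "unknown"),
)

# GOLD: business metrics over completed events only
gold = (silver[silver["status"] == "completed"]
        .groupby("event_date")
        .agg(unique_users=("user_email", "nunique"),
             total_revenue=("amount", "sum"))
        .reset_index())
print(gold)  # one row: 2026-01-01, 1 unique user, 100.0 revenue
```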

🗜 Compression Algorithms

| Algorithm | Ratio | Speed | Recommended for |
|---|---|---|---|
| Snappy | 2-4x | Very fast | Parquet default, good tradeoff |
| Zstd | 3-6x | Fast | Best ratio since 2025, recommended |
| Gzip | 4-8x | Slow | Archival, long-term storage |
| LZ4 | 2-3x | Very fast | Streaming, latency-critical |
| Brotli | 5-10x | Very slow | Text data, web |
# Python - Parquet with Zstd compression
import pandas as pd

df.to_parquet(
    'data/orders/',
    engine='pyarrow',
    compression='zstd',           # Best ratio/speed tradeoff
    partition_cols=['date'],       # Hive-style partitioning
    row_group_size=100_000,
)

# Read with predicate pushdown
import pyarrow.parquet as pq
table = pq.read_table('data/orders/',
    columns=['id', 'amount'],     # Column pruning
    filters=[('date', '>=', '2024-06-01')]  # Partition pruning
)
💡 2025 rule: Zstd (level 3) by default. LZ4 for Kafka. Avoid Gzip for frequently-read data.

🔄 CDC Patterns (Change Data Capture)

  ETL (Legacy)
  Source ──> Transform ──> Warehouse
             (ETL server = bottleneck)

  ELT (Modern Data Stack)
  Source ──> Warehouse (raw) ──> Transform (dbt)
             Leverages the cloud DWH's compute

  CDC (Real-time)
  Source DB ──> WAL/Binlog ──> Kafka ──> Consumer ──> DWH
               (Debezium)    (events)  (Flink/Spark)
| CDC Type | Method | Advantage | Drawback |
|---|---|---|---|
| Log-based | Reads the WAL (Debezium) | Zero impact on the source, captures DELETEs | Requires replication-log access |
| Query-based | `WHERE updated_at > last` | Simple to set up | Misses DELETEs, loads the source |
| Trigger-based | DB triggers | Real-time | Major perf impact, maintenance burden |
⚠ 90% of analytics use cases are fine with 15-minute batches (Airbyte/Fivetran). Only reach for Kafka CDC when you need sub-minute latency.
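The query-based pattern is easy to demo end to end with an in-memory SQLite source (table and column names are made up for the sketch):

```python
# Query-based CDC: pull only rows past the high-water mark.
import sqlite3

src = sqlite3.connect(":memory:")
src.execute("CREATE TABLE orders (id INTEGER, amount REAL, updated_at TEXT)")
src.executemany("INSERT INTO orders VALUES (?, ?, ?)", [
    (1, 10.0, "2026-01-01T09:00:00"),
    (2, 20.0, "2026-01-02T09:00:00"),
    (3, 30.0, "2026-01-03T09:00:00"),
])

last_sync = "2026-01-01T12:00:00"   # high-water mark from the previous run

# Incremental pull: rows deleted at the source are never seen (by design)
rows = src.execute(
    "SELECT id, amount, updated_at FROM orders "
    "WHERE updated_at > ? ORDER BY updated_at", (last_sync,)).fetchall()

last_sync = max(r[2] for r in rows)  # advance the mark for the next run
print([r[0] for r in rows], last_sync)  # [2, 3] 2026-01-03T09:00:00
```

Persist `last_sync` somewhere durable (state table, orchestrator variable) or every run becomes a full reload.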

⚙ dbt CI/CD Pipeline

# .github/workflows/dbt-ci.yml
name: dbt CI/CD
on:
  pull_request:
    paths: ['dbt_project/**']
  push:
    branches: [main]

jobs:
  dbt-ci:
    if: github.event_name == 'pull_request'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install dbt-core dbt-snowflake
      - run: dbt deps
      # Slim CI: only build the modified models
      - run: |
          dbt build --target ci \
            --select state:modified+ \
            --defer --state target-prod/ \
            --fail-fast

  dbt-deploy:
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install dbt-core dbt-snowflake
      - run: dbt deps
      - run: dbt build --target prod
      - run: dbt docs generate
| Selector / flag | Effect |
|---|---|
| `state:modified` | Models changed vs production |
| `state:modified+` | Changed models + all their descendants |
| `--defer` | Resolve unmodified refs against prod tables |
| `--fail-fast` | Stop at the first failure |
💡 Slim CI cut build times from 45 min to 3 min on a 500-model project.
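Conceptually, `state:modified+` resolves to "the modified nodes plus all their downstream descendants" in the model graph. A toy sketch (dbt actually diffs manifests to detect what is modified; model names here are illustrative):

```python
# state:modified+ = modified nodes plus all downstream descendants.
children = {   # model -> direct downstream models
    "stg_orders": ["int_orders_enriched"],
    "int_orders_enriched": ["fct_orders"],
    "dim_customers": ["fct_orders"],
    "fct_orders": [],
}

def modified_plus(modified, children):
    selected, stack = set(modified), list(modified)
    while stack:                      # walk the graph downstream
        for child in children.get(stack.pop(), []):
            if child not in selected:
                selected.add(child)
                stack.append(child)
    return selected

print(sorted(modified_plus({"stg_orders"}, children)))
# ['fct_orders', 'int_orders_enriched', 'stg_orders']
```

Note that `dim_customers` is skipped: it is neither modified nor downstream of a modified model, which is where the CI time savings come from.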

🌐 Airflow Executors & Managed Services

| Executor | Workers | Scalability | Use Case |
|---|---|---|---|
| Sequential | 1 (serial) | None | Dev/debug only |
| Local | Processes | Single machine | SMB, 10-50 DAGs |
| Celery | Redis/RabbitMQ | Horizontal | Enterprise, 100+ DAGs |
| Kubernetes | 1 pod per task | Elastic | Cloud-native, isolation |

| Service | Cloud | Approx. price (small env) | Advantage |
|---|---|---|---|
| MWAA | AWS | ~$0.49/h | Native AWS integration |
| Cloud Composer | GCP | ~$0.35/h | Native GCP integration |
| Astronomer | Multi | Custom | Cosmos + expert support |
| Dagster+ | Multi | Free/Pro | Asset-centric, superior DX |
| Prefect Cloud | Multi | Free/Pro | Zero-infra, Python-native |

🧊 Advanced Apache Iceberg

-- Partition Evolution (unique to Iceberg!)
ALTER TABLE db.orders
ADD PARTITION FIELD months(ordered_at);
-- The old partition layout coexists with the new one
-- No rewrite of the existing data!

-- Hidden Partitioning (transforms)
CREATE TABLE db.events
USING iceberg
PARTITIONED BY (
    days(event_ts),        -- Partition by day
    bucket(16, user_id)    -- Hash partitioning
);
-- Users simply write WHERE event_ts = '...'
-- Iceberg applies partition pruning automatically

-- Compaction (small files)
CALL system.rewrite_data_files(
    table => 'db.orders',
    options => map(
        'target-file-size-bytes', '134217728'  -- 128MB
    )
);

-- Expire old snapshots
CALL system.expire_snapshots(
    table => 'db.orders',
    older_than => TIMESTAMP '2024-06-01'
);

-- Delta UniForm (Databricks)
-- Automatically generates Iceberg metadata
ALTER TABLE delta_table
SET TBLPROPERTIES ('delta.universalFormat.enabledFormats' = 'iceberg');
💡 2025 convergence: Iceberg is the de facto standard; Snowflake, BigQuery, and Databricks all support it. Delta UniForm makes Delta tables readable as Iceberg.