❄ Snowflake - Essential Commands
-- Create a warehouse
CREATE WAREHOUSE my_wh
WITH WAREHOUSE_SIZE = 'MEDIUM'
AUTO_SUSPEND = 300
AUTO_RESUME = TRUE
MIN_CLUSTER_COUNT = 1
MAX_CLUSTER_COUNT = 3;
-- Time Travel (up to 90 days)
SELECT * FROM my_table
AT(TIMESTAMP => '2026-01-15 10:00:00'::TIMESTAMP);
SELECT * FROM my_table
BEFORE(STATEMENT => '01b2c3d4-...');
-- Zero-Copy Cloning
CREATE TABLE dev_table CLONE prod_table;
CREATE DATABASE dev_db CLONE prod_db;
CREATE SCHEMA test_schema CLONE prod_schema;
-- Streams (CDC)
CREATE STREAM my_stream ON TABLE source_table;
SELECT * FROM my_stream; -- changes only
-- Tasks (scheduling)
CREATE TASK my_task
WAREHOUSE = my_wh
SCHEDULE = 'USING CRON 0 9 * * * UTC'
AS
INSERT INTO target SELECT * FROM my_stream;
-- Data Sharing
CREATE SHARE my_share;
GRANT USAGE ON DATABASE db TO SHARE my_share;
GRANT SELECT ON TABLE db.schema.t TO SHARE my_share;
-- Snowpipe (auto-ingest)
CREATE PIPE my_pipe AUTO_INGEST = TRUE AS
COPY INTO my_table FROM @my_stage;
💡 Tip: Use WAREHOUSE_SIZE = 'XSMALL' for dev and scale up in prod. Credit consumption doubles at each size step (XS=1, S=2, M=4 credits/hour).
🏗 Snowflake Architecture
┌─────────────────────────────────┐
│      Cloud Services Layer       │
│   (Auth, Metadata, Optimizer)   │
├─────────────────────────────────┤
│          Compute Layer          │
│     ┌─────┐ ┌─────┐ ┌─────┐     │
│     │ WH1 │ │ WH2 │ │ WH3 │     │
│     │ ETL │ │ BI  │ │ ML  │     │
│     └─────┘ └─────┘ └─────┘     │
├─────────────────────────────────┤
│          Storage Layer          │
│   (Micro-partitions, S3/GCS)    │
│ Columnar, Compressed, Immutable │
└─────────────────────────────────┘
| Concept | Description |
| Micro-partitions | 50-500 MB, columnar, immutable, auto-clustered |
| Virtual Warehouse | Independent compute cluster (XS to 6XL) |
| Time Travel | Historical access: 1 day (Standard) to 90 days (Enterprise) |
| Fail-safe | 7 additional days (not user-accessible) |
| Zero-Copy Clone | Metadata-only clone, no data copied |
| Snowpark | Python/Java/Scala running inside Snowflake |
☁ Google BigQuery
-- Partitioning
CREATE TABLE dataset.my_table (
id INT64,
event_date DATE,
user_id STRING,
amount NUMERIC
)
PARTITION BY event_date
CLUSTER BY user_id;
-- BigQuery ML - Model defined in SQL
CREATE OR REPLACE MODEL dataset.my_model
OPTIONS(model_type='logistic_reg',
input_label_cols=['is_churned'])
AS SELECT * FROM dataset.training_data;
-- Prediction
SELECT * FROM ML.PREDICT(
MODEL dataset.my_model,
TABLE dataset.new_data);
-- Federated Query
SELECT * FROM
EXTERNAL_QUERY('connection_id',
'SELECT * FROM postgres_table');
-- Materialized View
CREATE MATERIALIZED VIEW dataset.mv AS
SELECT date, SUM(revenue) as total
FROM dataset.sales
GROUP BY date;
-- Scheduled Query
-- Via Console or API: bq mk --transfer_config
| Feature | BigQuery |
| Architecture | Serverless, Dremel engine |
| Storage | Capacitor (columnar) |
| Pricing | On-demand (~$6.25/TB scanned) or slots (~$0.04/slot/h) |
| ML | BQML (SQL-based ML) |
| BI Engine | In-memory acceleration |
| BigLake | External tables (S3, GCS) |
⚡ Databricks & Apache Spark
# PySpark - DataFrame Operations
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum as sum_  # alias avoids shadowing builtin sum
spark = SparkSession.builder \
    .appName("DataPipeline") \
    .getOrCreate()
# Read from Delta Lake
df = spark.read.format("delta").load("/data/bronze/events")
# Transformations
result = (df
    .filter(col("event_date") >= "2026-01-01")
    .groupBy("category")
    .agg(
        sum_("amount").alias("total_amount"),
        avg("amount").alias("avg_amount")
    )
    .orderBy(col("total_amount").desc())
)
# Write as Delta
result.write.format("delta") \
    .mode("overwrite") \
    .save("/data/gold/category_summary")
# Delta Lake - MERGE (Upsert)
from delta.tables import DeltaTable
target = DeltaTable.forPath(spark, "/data/silver/users")
target.alias("t").merge(
    source_df.alias("s"),
    "t.user_id = s.user_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()
# Unity Catalog
spark.sql("USE CATALOG main")
spark.sql("USE SCHEMA analytics")
spark.sql("GRANT SELECT ON TABLE t TO GROUP analysts")
| Component | Role |
| Driver | Coordinates the job, builds the execution plan |
| Executors | Run the tasks on data partitions |
| DAG | Directed acyclic graph of transformations |
| Delta Lake | ACID transactions on the data lake |
| Unity Catalog | Unified governance across workspaces |
| Photon | Vectorized C++ engine (3-8x faster) |
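Spark's laziness is worth internalizing: transformations like `filter` and `groupBy` only extend the DAG; nothing executes until an action (`write`, `count`, `collect`). A rough pure-Python analogy using generators (an illustrative sketch only, no Spark required; the data is made up):

```python
# Like Spark transformations, generator pipelines are lazy:
# each stage only records what to do, nothing runs yet.
events = [{"category": "a", "amount": 10.0},
          {"category": "b", "amount": 5.0},
          {"category": "a", "amount": 7.5}]

# "Transformations": build the pipeline lazily (no iteration happens here)
recent = (e for e in events if e["amount"] > 6)
amounts = (e["amount"] for e in recent)

# "Action": only now does data actually flow through the pipeline
total = sum(amounts)
print(total)  # 17.5
```

The analogy breaks down at optimization: Spark's Catalyst rewrites the whole plan before execution, which plain generators cannot do.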
📈 Cloud Data Warehouse Comparison
| Criterion | Snowflake | BigQuery | Databricks |
| Architecture | Multi-cluster, shared data | Serverless | Lakehouse |
| Compute/Storage | Separate | Decoupled | Separate |
| Pricing | Credits/second | $/TB or slots | DBU/hour |
| Native format | Proprietary | Capacitor | Delta Lake |
| Native ML | Cortex AI | BQML | MLflow |
| Multi-cloud | AWS/GCP/Azure | GCP (Omni: AWS/Azure) | AWS/GCP/Azure |
| Data Sharing | Native | Analytics Hub | Delta Sharing |
| Streaming | Snowpipe | Streaming inserts | Structured Streaming |
| Best for | SQL analytics | GCP ecosystem | ML + Data Eng |
💡 Mentor's rule: Snowflake for SQL-first teams, BigQuery if all-in on GCP, Databricks if ML-heavy or multi-cloud open-source.
🔨 dbt (data build tool)
# dbt project structure
my_project/
├── dbt_project.yml
├── profiles.yml              # DWH connection (usually in ~/.dbt/)
├── models/
│   ├── staging/              # 1:1 with sources
│   │   ├── stg_orders.sql
│   │   └── _stg_models.yml
│   ├── intermediate/         # business logic
│   │   └── int_orders_enriched.sql
│   └── marts/                # final tables
│       ├── fct_orders.sql
│       └── dim_customers.sql
├── tests/
│   └── assert_positive_amounts.sql
├── macros/
│   └── cents_to_dollars.sql
├── seeds/
│   └── country_codes.csv
└── snapshots/
    └── snap_customers.sql
-- Materialization types
-- models/staging/stg_orders.sql
{{ config(materialized='view') }}
SELECT
id as order_id,
customer_id,
order_date,
status,
amount_cents / 100.0 as amount
FROM {{ source('raw', 'orders') }}
-- models/marts/fct_orders.sql
{{ config(
materialized='incremental',
unique_key='order_id',
on_schema_change='append_new_columns'
) }}
SELECT
o.order_id,
o.customer_id,
c.customer_name,
o.order_date,
o.amount,
o.status
FROM {{ ref('stg_orders') }} o
JOIN {{ ref('dim_customers') }} c
ON o.customer_id = c.customer_id
{% if is_incremental() %}
WHERE o.order_date > (
SELECT MAX(order_date) FROM {{ this }}
)
{% endif %}
🔎 dbt Tests & Macros
# _stg_models.yml - Generic tests
version: 2
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'shipped', 'delivered', 'cancelled']
      - name: customer_id
        tests:
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      # dbt_expectations (package)
      - name: amount
        tests:
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 100000
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
ROUND({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}
-- Usage in a model:
SELECT
order_id,
{{ cents_to_dollars('amount_cents') }} as amount
FROM {{ source('raw', 'orders') }}
-- Snapshot (SCD Type 2)
{% snapshot snap_customers %}
{{ config(
target_schema='snapshots',
unique_key='customer_id',
strategy='timestamp',
updated_at='updated_at'
) }}
SELECT * FROM {{ source('raw', 'customers') }}
{% endsnapshot %}
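The snapshot above produces Type 2 history: when a tracked value changes, dbt closes the current row and inserts a new version. A minimal pure-Python sketch of that logic, assuming a single tracked `email` column and simplified `valid_from`/`valid_to` fields (dbt's real columns are `dbt_valid_from`/`dbt_valid_to`):

```python
from datetime import datetime

def apply_scd2(snapshot, source, now):
    """SCD Type 2 sketch: close the live row when a tracked value
    changes, append the new version, keep full history."""
    live_by_id = {r["customer_id"]: r for r in snapshot if r["valid_to"] is None}
    result = list(snapshot)
    for row in source:
        live = live_by_id.get(row["customer_id"])
        if live is None:
            # brand-new customer: open a first version
            result.append({**row, "valid_from": now, "valid_to": None})
        elif live["email"] != row["email"]:
            live["valid_to"] = now  # close the old version
            result.append({**row, "valid_from": now, "valid_to": None})
    return result

snap = [{"customer_id": 1, "email": "a@x.com",
         "valid_from": datetime(2026, 1, 1), "valid_to": None}]
src = [{"customer_id": 1, "email": "b@x.com"},   # changed
       {"customer_id": 2, "email": "c@x.com"}]   # new
hist = apply_scd2(snap, src, datetime(2026, 2, 1))
print(len(hist))  # 3: closed v1, new v2, new customer
```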
dbt run
Runs all models
dbt test
Runs all tests
dbt run -s +fct_orders
Runs fct_orders and everything upstream of it
dbt run -s tag:daily
Runs models tagged 'daily'
dbt docs generate
Generates the documentation
dbt docs serve
Serves the docs site locally
dbt snapshot
Runs the SCD2 snapshots
dbt build
seed + run + test + snapshot in one command
🌈 Apache Airflow
# DAG with the TaskFlow API
from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule='0 6 * * *',  # 06:00 every day
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['production', 'dbt']
)
def daily_pipeline():
    @task()
    def extract():
        """Extract data from API"""
        import requests
        data = requests.get('https://api.example.com/data').json()
        return data

    @task()
    def transform(raw_data):
        """Clean and transform data"""
        cleaned = [
            {**row, 'amount': float(row['amount'])}
            for row in raw_data
            if row['status'] != 'cancelled'
        ]
        return cleaned

    @task()
    def load(data):
        """Load to data warehouse"""
        from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
        hook = SnowflakeHook('snowflake_conn')
        # insert_rows expects tuples, not dicts
        hook.insert_rows('target_table',
                         [tuple(r.values()) for r in data],
                         target_fields=list(data[0]))

    @task()
    def run_dbt():
        """Run dbt transformations"""
        import subprocess
        subprocess.run(['dbt', 'build', '--target', 'prod'], check=True)

    # Define dependencies
    raw = extract()
    clean = transform(raw)
    load(clean) >> run_dbt()

daily_pipeline()
| Component | Role |
| Scheduler | Triggers DAGs according to their schedule |
| Webserver | Web UI (port 8080) |
| Worker | Executes the tasks |
| Metadata DB | PostgreSQL storing all state |
| DAG | Directed Acyclic Graph (a pipeline) |
| Operator | Task template (Python, Bash, SQL...) |
| Sensor | Waits for an external condition |
| XCom | Cross-task communication |
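A Sensor boils down to a poll-until-true loop with a timeout; Airflow layers poke/reschedule modes and scheduling on top. A minimal pure-Python sketch of the pattern (`file_landed` and the interval values are illustrative, not Airflow APIs):

```python
import time

def wait_for(condition, poke_interval=0.01, timeout=1.0):
    """Sensor-style loop: re-check an external condition until it
    holds or the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(poke_interval)
    raise TimeoutError("condition never became true")

# Simulate an external file/partition appearing on the third poll
state = {"polls": 0}
def file_landed():
    state["polls"] += 1
    return state["polls"] >= 3

wait_for(file_landed)
print(state["polls"])  # 3
```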
🔄 Dagster & Prefect
# Dagster - Software-Defined Assets
from dagster import asset, AssetExecutionContext
from dagster_dbt import DbtCliResource, dbt_assets

@asset(group_name="ingestion")
def raw_orders(context: AssetExecutionContext):
    """Extract orders from source"""
    import pandas as pd
    df = pd.read_csv("s3://bucket/orders.csv")
    df.to_parquet("/data/raw/orders.parquet")
    context.log.info(f"Loaded {len(df)} rows")
    return df

@asset(deps=[raw_orders], group_name="transform")
def clean_orders():
    """Clean and validate orders"""
    import pandas as pd
    df = pd.read_parquet("/data/raw/orders.parquet")
    return df.dropna(subset=['order_id', 'amount'])

# Prefect - Flows & Tasks
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(cache_key_fn=task_input_hash,
      cache_expiration=timedelta(hours=1),
      retries=3)
def extract_data(url: str) -> dict:
    import httpx
    return httpx.get(url).json()

@task
def transform_data(data: dict) -> list:
    return [r for r in data['results'] if r['active']]

@task
def load_data(records: list):
    # Load to database
    print(f"Loaded {len(records)} records")

@flow(name="ETL Pipeline", log_prints=True)
def etl_pipeline():
    data = extract_data("https://api.example.com/data")
    clean = transform_data(data)
    load_data(clean)

etl_pipeline()
| Criterion | Airflow | Dagster | Prefect |
| Paradigm | Task-centric | Asset-centric | Flow-centric |
| Testability | Average | Excellent | Good |
| DX | Good | Excellent | Excellent |
| Ecosystem | Huge | Growing | Medium |
| Managed | MWAA, Composer | Dagster+ | Prefect Cloud |
| Native dbt | Via operator | dagster-dbt | prefect-dbt |
| Best for | Industry standard | Data-aware pipelines | Maximum flexibility |
📂 Data File Formats
| Format | Type | Compression | Schema | Use Case |
| Parquet | Columnar | Snappy/Gzip | Embedded | Analytics, DWH |
| Avro | Row-based | Snappy/Deflate | Evolvable | Streaming, Kafka |
| ORC | Columnar | Zlib/Snappy | Embedded | Hive ecosystem |
| JSON | Row-based | No | Flexible | APIs, logs |
| CSV | Row-based | No | None | Legacy, export |
⚠ Rule: ALWAYS use Parquet for analytical storage. Accept CSV and JSON as input formats only.
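Why columnar formats win for analytics: each column is stored contiguously, so an aggregate over one column never touches the others. A toy pure-Python illustration of the two layouts (nothing like Parquet's actual encoding; the data is made up):

```python
rows = [{"id": i, "amount": float(i), "country": "FR"} for i in range(5)]

# Row layout: summing one column still traverses every full record
row_total = sum(r["amount"] for r in rows)

# Columnar layout: each column is its own contiguous list,
# so the aggregate reads only the "amount" column
columns = {k: [r[k] for r in rows] for k in rows[0]}
col_total = sum(columns["amount"])

print(row_total == col_total)   # True
print(columns["amount"])        # [0.0, 1.0, 2.0, 3.0, 4.0]
```

On disk this difference compounds: contiguous same-typed values also compress far better, which is why Parquet pairs the columnar layout with Snappy/Zstd.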
# Python - Read/Write Parquet
import pandas as pd
# Parquet
df.to_parquet('data.parquet', compression='snappy')
df = pd.read_parquet('data.parquet')
# With PyArrow (more performant)
import pyarrow.parquet as pq
table = pq.read_table('data.parquet',
                      columns=['col1', 'col2'])  # selective column read
🗃 Open Table Formats
| Feature | Delta Lake | Apache Iceberg | Apache Hudi |
| Creator | Databricks | Netflix/Apple | Uber |
| ACID | Yes | Yes | Yes |
| Time Travel | Yes | Yes | Yes |
| Schema Evolution | Yes | Excellent | Yes |
| Partition Evolution | No | Yes (hidden) | No |
| Multi-engine | Good | Excellent | Fair |
| Native CDC | CDF | Via Flink | Native |
| Adoption | Databricks ecosystem | Multi-cloud | Streaming |
-- Delta Lake (Databricks SQL)
CREATE TABLE events USING DELTA
LOCATION 's3://bucket/events';
-- Iceberg (Spark)
CREATE TABLE catalog.db.events
USING iceberg
PARTITIONED BY (days(event_date));
-- Iceberg - Schema evolution
ALTER TABLE catalog.db.events
ADD COLUMNS (new_col STRING);
-- No table rewrite needed!
💡 2026 trend: Apache Iceberg dominates multi-engine setups. Delta Lake remains strong in the Databricks ecosystem. Hudi for heavy CDC workloads.
📤 Ingestion & CDC Tools
| Tool | Type | Open Source | Price |
| Fivetran | Managed ELT | No | $$$ (MAR-based) |
| Airbyte | ELT | Yes | Free / Cloud $$ |
| Debezium | CDC | Yes | Free |
| AWS DMS | Migration/CDC | No | $ (instance-based) |
| Stitch | Managed ELT | No | $$ (row-based) |
| Meltano | ELT/Singer | Yes | Free |
# Airbyte - Docker Compose
git clone https://github.com/airbytehq/airbyte.git
cd airbyte
./run-ab-platform.sh
# UI: http://localhost:8000
# Debezium - Kafka Connect config
{
  "name": "postgres-cdc",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.dbname": "mydb",
    "topic.prefix": "cdc",
    "plugin.name": "pgoutput"
  }
}
🚀 Modern Pipeline Architecture
Sources            Ingestion           Storage
┌──────────┐      ┌──────────┐      ┌──────────┐
│ Postgres │─────>│ Airbyte/ │─────>│  Bronze  │
│ APIs     │─────>│ Fivetran │─────>│ (Raw S3) │
│ Files    │─────>│ Debezium │─────>│ Parquet  │
└──────────┘      └──────────┘      └────┬─────┘
                                         │
Transformation        Processing         │
┌──────────────┐     ┌──────────┐        │
│ dbt          │<────│  Silver  │<───────┘
│ (staging)    │────>│ (Clean)  │
│ (marts)      │     └────┬─────┘
└──────────────┘          │
                          v
Serving              ┌──────────┐
┌──────────┐         │   Gold   │
│ Tableau  │<────────│(Business)│
│ Looker   │<────────│          │
│ Metabase │         └──────────┘
└──────────┘
Orchestration: Airflow/Dagster
Quality: Great Expectations/Soda
Catalog: DataHub/Atlan
⌨ dbt CLI Quick Reference
dbt init
Initializes a new dbt project
dbt debug
Tests the DWH connection
dbt deps
Installs packages (packages.yml)
dbt seed
Loads the seed CSV files
dbt compile
Compiles the SQL without running it
dbt run --full-refresh
Full rebuild of incremental models
dbt test --select source:*
Tests the sources only
dbt ls --resource-type model
Lists all models
dbt run -s state:modified+
Slim CI: modified models + descendants
dbt source freshness
Checks source data freshness
🏅 Medallion Architecture avec dbt
-- BRONZE: Raw data (view or table)
-- models/staging/stg_raw_events.sql
{{ config(materialized='view') }}
SELECT
_airbyte_extracted_at as loaded_at,
*
FROM {{ source('raw_airbyte', 'events') }}
-- SILVER: Cleaned & typed
-- models/intermediate/int_events_cleaned.sql
{{ config(materialized='incremental',
unique_key='event_id') }}
SELECT
event_id,
CAST(event_date AS DATE) as event_date,
LOWER(TRIM(user_email)) as user_email,
COALESCE(amount, 0) as amount,
CASE
WHEN status IN ('active','completed') THEN status
ELSE 'unknown'
END as status
FROM {{ ref('stg_raw_events') }}
{% if is_incremental() %}
WHERE loaded_at > (SELECT MAX(loaded_at) FROM {{ this }})
{% endif %}
-- GOLD: Business metrics
-- models/marts/fct_daily_revenue.sql
{{ config(materialized='table') }}
SELECT
event_date,
COUNT(DISTINCT user_email) as unique_users,
COUNT(*) as total_events,
SUM(amount) as total_revenue,
AVG(amount) as avg_order_value
FROM {{ ref('int_events_cleaned') }}
WHERE status = 'completed'
GROUP BY event_date
🗜 Compression Algorithms
| Algorithm | Ratio | Speed | Recommended for |
| Snappy | 2-4x | Very fast | Parquet default, good trade-off |
| Zstd | 3-6x | Fast | Best ratio/speed, recommended default |
| Gzip | 4-8x | Slow | Archival, long-term storage |
| LZ4 | 2-3x | Very fast | Streaming, latency-critical |
| Brotli | 5-10x | Very slow | Textual data, web |
# Python - Parquet with Zstd compression
import pandas as pd
df.to_parquet(
    'data/orders/',
    engine='pyarrow',
    compression='zstd',        # best ratio/speed trade-off
    partition_cols=['date'],   # Hive-style partitioning
    row_group_size=100_000,
)
# Reading with predicate pushdown
import pyarrow.parquet as pq
table = pq.read_table('data/orders/',
    columns=['id', 'amount'],               # column pruning
    filters=[('date', '>=', '2024-06-01')]  # partition pruning
)
💡 2025 rule: Zstd (level 3) by default. LZ4 for Kafka. Avoid Gzip for frequently read data.
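To get a feel for the ratio/speed trade-offs above, compare codecs on repetitive data. Snappy, Zstd, and LZ4 all require third-party packages, so this sketch uses Python's stdlib codecs (zlib, gzip, lzma) as stand-ins; the CSV payload is synthetic:

```python
import gzip
import lzma
import zlib

# Synthetic, highly repetitive data - the best case for any codec
data = b"order_id,amount,status\n" + b"123,49.90,shipped\n" * 5000

results = {}
for name, compress in [("zlib", zlib.compress),
                       ("gzip", gzip.compress),
                       ("lzma", lzma.compress)]:
    out = compress(data)
    results[name] = len(out)
    print(f"{name}: {len(data) / len(out):.0f}x smaller")
```

On real, less repetitive data the ratios drop sharply, which is why benchmarking on your own workload matters more than published numbers.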
🔄 CDC Patterns (Change Data Capture)
ETL (Legacy)
Source ──> Transform ──> Warehouse
(ETL server = bottleneck)
ELT (Modern Data Stack)
Source ──> Warehouse (raw) ──> Transform (dbt)
Leverages the cloud DWH's compute power
CDC (Real-time)
Source DB ──> WAL/Binlog ──> Kafka ──> Consumer ──> DWH
             (Debezium)     (events)  (Flink/Spark)
| CDC type | Method | Advantage | Drawback |
| Log-based | Reads the WAL (Debezium) | Zero source impact, captures DELETEs | Requires replication-log access |
| Query-based | WHERE updated_at > last | Simple to set up | Misses DELETEs, loads the source |
| Trigger-based | DB triggers | Real-time | Major perf impact, maintenance burden |
⚠ 90% of analytics use cases are served by 15-minute batches (Airbyte/Fivetran). Use Kafka-based CDC only when sub-minute latency is required.
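Query-based CDC from the table above is just a watermark loop: select rows newer than the last `updated_at` seen, then advance the watermark. A pure-Python sketch with an in-memory stand-in for the table (names are illustrative); note why it can never observe a hard DELETE:

```python
def incremental_extract(table, last_seen):
    """Query-based CDC: the equivalent of
    SELECT * FROM t WHERE updated_at > :last_seen.
    A hard-DELETEd row simply never appears again - invisible here."""
    batch = [r for r in table if r["updated_at"] > last_seen]
    new_watermark = max((r["updated_at"] for r in batch), default=last_seen)
    return batch, new_watermark

table = [{"id": 1, "updated_at": 10},
         {"id": 2, "updated_at": 20},
         {"id": 3, "updated_at": 30}]

batch, wm = incremental_extract(table, last_seen=15)
print([r["id"] for r in batch], wm)  # [2, 3] 30
```

Log-based CDC avoids both weaknesses: the WAL records DELETEs explicitly, and reading it does not query the source tables at all.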
⚙ dbt CI/CD Pipeline
# .github/workflows/dbt-ci.yml
name: dbt CI/CD
on:
  pull_request:
    paths: ['dbt_project/**']
  push:
    branches: [main]
jobs:
  dbt-ci:
    if: github.event_name == 'pull_request'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install dbt-core dbt-snowflake
      - run: dbt deps
      # Slim CI: build only the modified models
      - run: |
          dbt build --target ci \
            --select state:modified+ \
            --defer --state target-prod/ \
            --fail-fast
  dbt-deploy:
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install dbt-core dbt-snowflake
      - run: dbt build --target prod
      - run: dbt docs generate
state:modified
Models changed relative to production state
state:modified+
Changed models + all their descendants
--defer
Uses prod tables for unmodified refs
--fail-fast
Stops at the first failure
💡 Slim CI can cut builds from 45 min to 3 min on a 500-model project.
🌐 Airflow Executors & Managed Services
| Executor | Workers | Scalability | Use Case |
| Sequential | 1 (serial) | None | Dev/debug only |
| Local | Processes | Single machine | Small teams, 10-50 DAGs |
| Celery | Redis/RabbitMQ | Horizontal | Enterprise, 100+ DAGs |
| Kubernetes | 1 pod/task | Elastic | Cloud-native, isolation |
| Service | Cloud | Approx. small-env price | Advantage |
| MWAA | AWS | ~$0.49/h | Native AWS integration |
| Cloud Composer | GCP | ~$0.35/h | Native GCP integration |
| Astronomer | Multi | Custom | Cosmos + expert support |
| Dagster+ | Multi | Free/Pro | Asset-centric, superior DX |
| Prefect Cloud | Multi | Free/Pro | Zero-infra, Python-native |
🧊 Advanced Apache Iceberg
-- Partition Evolution (unique to Iceberg!)
ALTER TABLE db.orders
ADD PARTITION FIELD months(ordered_at);
-- The old partitioning coexists with the new one
-- No rewrite of existing data!
-- Hidden Partitioning (transforms)
CREATE TABLE db.events
USING iceberg
PARTITIONED BY (
    days(event_ts),      -- partition by day
    bucket(16, user_id)  -- hash partitioning
);
-- The user just writes WHERE event_ts = '...'
-- Iceberg applies partition pruning automatically
-- Compaction (small files)
CALL system.rewrite_data_files(
    table => 'db.orders',
    options => map(
        'target-file-size-bytes', '134217728'  -- 128 MB
    )
);
-- Expire old snapshots
CALL system.expire_snapshots(
    table => 'db.orders',
    older_than => TIMESTAMP '2024-06-01'
);
-- Delta UniForm (Databricks)
-- Automatically generates Iceberg metadata
ALTER TABLE delta_table
SET TBLPROPERTIES ('delta.universalFormat.enabledFormats' = 'iceberg');
💡 2025 convergence: Iceberg is the de facto standard; Snowflake, BigQuery, and Databricks all support it. Delta UniForm makes Delta tables readable as Iceberg.