Introduction au Modern Data Stack
Objectifs
- Comprendre ce qu'est le Modern Data Stack et pourquoi il a emerge
- Identifier les composants cles : ingestion, transformation, stockage, orchestration, visualisation
- Analyser l'evolution du legacy vers le cloud-native
- Evaluer les avantages et limites de cette approche modulaire
En tant qu'architecte data ayant accompagne plus de 30 migrations vers le cloud, je peux vous affirmer que le Modern Data Stack n'est pas qu'un buzzword. C'est un changement fondamental dans la facon dont nous concevons les pipelines de donnees. La cle, c'est de comprendre que chaque composant est interchangeable : vous n'etes plus enferme dans un ecosysteme monolithique. Mais attention, cette modularite a un prix : la complexite d'integration. Mon conseil : commencez simple, puis iterez.
Qu'est-ce que le Modern Data Stack ?
Le Modern Data Stack (MDS) designe un ensemble d'outils cloud-native, modulaires et souvent SaaS, qui couvrent l'ensemble du cycle de vie des donnees. Contrairement aux architectures traditionnelles basees sur des solutions monolithiques on-premise (Informatica, Teradata, Oracle DWH), le MDS privilegie la specialisation : chaque outil excelle dans une tache precise et communique avec les autres via des API et des formats standardises.
Les 5 piliers du Modern Data Stack
1. Ingestion (Fivetran, Airbyte, Stitch) - Extraction et chargement des donnees brutes depuis les sources.
2. Stockage / Compute (Snowflake, BigQuery, Databricks) - Entrepot cloud elastique separant stockage et calcul.
3. Transformation (dbt, Dataform) - Transformation SQL-first dans l'entrepot (ELT vs ETL).
4. Orchestration (Airflow, Dagster, Prefect) - Planification et monitoring des pipelines.
5. Visualisation (Looker, Tableau, Metabase, Power BI) - Consommation et exploration des donnees.
SOURCES INGESTION STOCKAGE/COMPUTE TRANSFORMATION CONSOMMATION
+----------+ +------------+ +-----------------+ +-------------+ +-------------+
| SaaS APIs|-------->| | | | | | | Looker |
| (Stripe, | | Fivetran |------>| Snowflake |----->| dbt |---->| Tableau |
| HubSpot)| | Airbyte | | BigQuery | | Dataform | | Metabase |
+----------+ | Stitch | | Databricks | | | | Power BI |
| Databases|-------->| | | | +-------------+ +-------------+
| (Postgres| +------------+ +-----------------+ | |
| MySQL) | | v v
+----------+ +-----+------+ +-----------+ +-----------+
| Fichiers | | Data Lake | | Data | | Reverse |
| (S3, GCS)| | (Parquet, | | Catalog | | ETL |
+----------+ | Delta) | | (dbt docs)| | (Census, |
+------------+ +-----------+ | Hightouch)|
+-----------+
De l'ETL au ELT : un changement de paradigme
L'approche traditionnelle ETL (Extract, Transform, Load) imposait de transformer les donnees avant de les charger dans l'entrepot, principalement parce que le compute etait couteux et limite. Avec le cloud, le stockage est devenu quasi-gratuit et le compute elastique. Le paradigme ELT (Extract, Load, Transform) s'impose naturellement :
| Critere | ETL Traditionnel | ELT Moderne |
|---|---|---|
| Ou se fait la transformation ? | Serveur ETL dedie (Informatica, Talend) | Dans l'entrepot cloud (via dbt, SQL) |
| Donnees brutes conservees ? | Souvent non (transformees avant chargement) | Oui, couche raw preservee |
| Scalabilite | Limitee par le serveur ETL | Elastique (scale-out cloud) |
| Cout initial | Eleve (licences, serveurs) | Pay-as-you-go |
| Time-to-value | Semaines a mois | Heures a jours |
| Flexibilite schema | Schema-on-write rigide | Schema-on-read flexible |
Netflix : Migration vers le Modern Data Stack
Netflix a ete l'un des pionniers de l'adoption du cloud pour ses donnees. En migrant d'Oracle on-premise vers un ecosysteme base sur AWS (S3 pour le stockage, Spark pour le compute, et des outils internes pour la transformation), Netflix a pu :
- Reduire le temps de traitement de ses pipelines de recommandation de 12h a 45 minutes
- Passer de 50 TB a plus de 100 PB de donnees gerees sans augmentation proportionnelle des couts
- Permettre a plus de 500 data engineers et analysts de travailler en parallele sans contention de ressources
- Deployer des modeles de ML de personnalisation alimentes en quasi temps reel
RetailCorp (anonymise) : Migration precipitee vers le MDS
Une grande chaine de distribution a voulu migrer l'ensemble de son infrastructure data en 6 mois vers un Modern Data Stack complet (Fivetran + Snowflake + dbt + Looker). Le projet a connu un depassement budgetaire de 300% car :
- Les pipelines legacy avaient des logiques metier complexes non documentees, impossibles a reproduire en SQL pur
- L'absence de data governance a entraine des doublons et des incoherences dans les dashboards
- Les equipes n'avaient pas ete formees aux nouveaux outils, generant une resistance au changement massive
Anti-pattern : le "Shiny Object Syndrome"
Adopter chaque nouvel outil du MDS parce qu'il est a la mode, sans evaluer s'il repond a un besoin reel. On se retrouve avec 15 outils SaaS, des couts d'abonnement exponentiels, et une equipe incapable de maitriser l'ensemble.
Solution : Commencez avec un stack minimal (un ingestion tool, un DWH cloud, dbt, un outil de BI). Ajoutez des composants uniquement quand un besoin concret est identifie et que l'equipe a la capacite de l'absorber. Evaluez le TCO sur 3 ans avant chaque ajout.
Scenario : Votre premier audit Modern Data Stack
Vous etes recrute comme Data Architect dans une scale-up de 200 employes. L'equipe data (3 personnes) utilise actuellement : des scripts Python pour l'extraction, un PostgreSQL de 500 Go comme "data warehouse", des notebooks Jupyter pour la transformation, et Google Sheets pour le reporting. Le CEO veut "passer au cloud". Comment structurez-vous votre recommandation ?
Pensez aux criteres suivants : volume de donnees prevu a 2 ans, competences de l'equipe actuelle, budget disponible, cas d'usage prioritaires (reporting operationnel vs analytics avancee vs ML), et contraintes reglementaires (RGPD, residences des donnees).
Conseil pratique : le Medallion Architecture
Quelle que soit la solution choisie, structurez vos donnees en 3 couches :
Bronze (Raw) : donnees brutes telles qu'extraites des sources, immutables.
Silver (Cleaned) : donnees nettoyees, dedupliquees, typees correctement.
Gold (Business) : modeles metier agrages, prets pour la consommation BI et ML.
Cette approche, popularisee par Databricks, est universellement applicable.
Snowflake Deep Dive
Objectifs
- Maitriser l'architecture multi-cluster de Snowflake (separation stockage, compute, services)
- Comprendre et utiliser le Time Travel et le Zero-Copy Cloning
- Configurer le Data Sharing et les materialized views
- Optimiser les couts avec les warehouse sizing et les resource monitors
J'ai deploye Snowflake chez plus de 15 clients, et je peux vous dire que la puissance de cet outil est aussi sa faiblesse potentielle. La separation compute/stockage est revolutionnaire, mais si vous ne mettez pas en place des resource monitors des le premier jour, la facture peut exploser en quelques heures. J'ai vu un client depenser 40 000 euros en un week-end parce qu'un warehouse XL etait reste allume. Mon premier reflexe : toujours configurer l'auto-suspend a 5 minutes et mettre des alertes budgetaires.
Architecture Snowflake en profondeur
Snowflake repose sur une architecture unique en trois couches independantes, ce qui le differencie fondamentalement des solutions traditionnelles (Teradata, Oracle) ou meme de certains concurrents cloud.
+=========================================================+
| COUCHE SERVICES (Cloud Services) |
| +------------------+ +------------+ +--------------+ |
| | Query Optimizer | | Metadata | | Security & | |
| | & Compiler | | Management | | Auth (RBAC) | |
| +------------------+ +------------+ +--------------+ |
| +------------------+ +------------+ +--------------+ |
| | Transaction Mgmt | | Result | | Infrastructure| |
| | | | Cache | | Management | |
| +------------------+ +------------+ +--------------+ |
+=========================================================+
| | |
+=========================================================+
| COUCHE COMPUTE (Virtual Warehouses) |
| +-------------+ +-------------+ +-------------+ |
| | WH_ANALYST | | WH_ETL | | WH_ML | |
| | (X-Small) | | (Large) | | (X-Large) | |
| | 1 credit/h | | 8 credits/h | | 16 credits/h| |
| +-------------+ +-------------+ +-------------+ |
| Chaque warehouse = cluster independant, pas de contention|
+=========================================================+
| | |
+=========================================================+
| COUCHE STOCKAGE (Cloud Storage) |
| +---------------------------------------------------+ |
| | Micro-partitions compressees (columnar, immutables)| |
| | Stockage : AWS S3 / Azure Blob / GCP GCS | |
| | Format interne optimise (pas Parquet, pas ORC) | |
| +---------------------------------------------------+ |
+=========================================================+
Les Micro-partitions : le secret de Snowflake
Snowflake stocke les donnees dans des micro-partitions de 50 a 500 Mo (compressees). Chaque micro-partition est immutable et contient les metadonnees suivantes : valeurs min/max par colonne, nombre de valeurs distinctes, et null count. Cela permet un pruning extremement efficace : le query optimizer peut eliminer des partitions entieres sans les lire, grace aux metadonnees. Contrairement a un index traditionnel, ce pruning est automatique et ne necessite aucune maintenance.
Time Travel : remonter dans le temps
Le Time Travel de Snowflake permet d'acceder aux donnees telles qu'elles etaient a un moment passe. C'est possible grace a l'immutabilite des micro-partitions : quand une donnee est modifiee, une nouvelle micro-partition est creee, mais l'ancienne est conservee pendant la periode de retention.
-- Interroger une table telle qu'elle etait il y a 30 minutes SELECT * FROM orders AT(OFFSET => -60*30); -- Interroger a un timestamp precis SELECT * FROM orders AT(TIMESTAMP => '2025-12-01 10:00:00'::TIMESTAMP); -- Restaurer une table supprimee par erreur UNDROP TABLE orders; -- Creer une table a partir d'un etat passe CREATE TABLE orders_backup AS SELECT * FROM orders AT(TIMESTAMP => '2025-12-01 08:00:00'::TIMESTAMP); -- Retention configurable (0 a 90 jours selon l'edition) ALTER TABLE orders SET DATA_RETENTION_TIME_IN_DAYS = 30;
Zero-Copy Cloning
Le Zero-Copy Cloning permet de creer une copie logique d'une table, d'un schema ou d'une base entiere sans dupliquer physiquement les donnees. Les deux objets partagent les memes micro-partitions sous-jacentes. Ce n'est que lorsqu'une modification est effectuee sur l'un des deux objets qu'une nouvelle micro-partition est creee (copy-on-write).
-- Cloner une base entiere pour un environnement de dev
CREATE DATABASE dev_analytics CLONE prod_analytics;
-- Cloner un schema specifique
CREATE SCHEMA staging.test_schema CLONE staging.prod_schema;
-- Cloner une table pour des tests
CREATE TABLE orders_test CLONE orders;
-- Verifier l'espace reel consomme (quasi-zero au debut)
SELECT TABLE_NAME, ACTIVE_BYTES, TIME_TRAVEL_BYTES, RETAINED_FOR_CLONE_BYTES
FROM INFORMATION_SCHEMA.TABLE_STORAGE_METRICS
WHERE TABLE_NAME IN ('ORDERS', 'ORDERS_TEST');
Spotify : Data Sharing et multi-cluster
Spotify utilise Snowflake pour partager des donnees d'ecoute agregees avec ses partenaires (labels musicaux, artistes). Grace au Secure Data Sharing, Spotify expose des vues securisees sans copier les donnees :
- Les labels accedent aux statistiques de streaming via leur propre compte Snowflake, sans transfert de donnees
- Les couts de compute sont supportes par le consommateur (le label), pas par Spotify
- Les donnees sont toujours fraiches car il n'y a pas de copie a synchroniser
- La gouvernance est centralisee : Spotify peut revoquer l'acces instantanement
Data Sharing
-- Creer un share pour un partenaire
CREATE SHARE partner_analytics;
-- Ajouter des objets au share (vues securisees recommandees)
CREATE SECURE VIEW analytics.public.v_partner_metrics AS
SELECT region, product_category, SUM(revenue) as total_revenue,
COUNT(DISTINCT customer_id) as unique_customers
FROM analytics.public.sales
GROUP BY region, product_category;
GRANT USAGE ON DATABASE analytics TO SHARE partner_analytics;
GRANT USAGE ON SCHEMA analytics.public TO SHARE partner_analytics;
GRANT SELECT ON VIEW analytics.public.v_partner_metrics TO SHARE partner_analytics;
-- Ajouter le compte consommateur
ALTER SHARE partner_analytics ADD ACCOUNTS = 'PARTNER_ACCT_123';
Anti-pattern : Warehouse surdimensionne permanent
Utiliser un warehouse X-Large (16 credits/h ~ 48$/h) en permanence "au cas ou" alors que 95% des requetes pourraient tourner sur un Small (2 credits/h ~ 6$/h). Certaines equipes configurent un seul gros warehouse partage par tous les utilisateurs, recreant exactement les problemes de contention qu'on voulait eviter.
Solution : Creer des warehouses dedies par usage (WH_ETL pour les pipelines, WH_ANALYST pour les analyses ad-hoc, WH_BI pour les dashboards). Configurer l'auto-suspend a 1-5 minutes, et l'auto-resume. Utiliser les resource monitors pour alerter a 75% et suspendre a 100% du budget mensuel.
-- Configuration optimale des warehouses
CREATE WAREHOUSE wh_etl WITH
WAREHOUSE_SIZE = 'LARGE'
AUTO_SUSPEND = 60 -- 1 minute sans activite
AUTO_RESUME = TRUE
MIN_CLUSTER_COUNT = 1
MAX_CLUSTER_COUNT = 3 -- Multi-cluster auto-scaling
SCALING_POLICY = 'ECONOMY';
-- Resource monitor pour controler les couts
CREATE RESOURCE MONITOR monthly_budget WITH
CREDIT_QUOTA = 5000
FREQUENCY = MONTHLY
START_TIMESTAMP = IMMEDIATELY
TRIGGERS
ON 75 PERCENT DO NOTIFY
ON 90 PERCENT DO NOTIFY
ON 100 PERCENT DO SUSPEND;
Piege du clustering key
Ne definissez un clustering key que sur les tables de plus de 1 To avec des patterns de requetes tres previsibles. Le re-clustering automatique consomme des credits en arriere-plan. Pour les tables de moins de 1 To, le pruning naturel des micro-partitions est generalement suffisant. Verifiez d'abord avec SYSTEM$CLUSTERING_INFORMATION('table').
Scenario : Optimisation des couts Snowflake
Vous etes Data Architect chez un e-commerce. La facture Snowflake est passee de 5 000 a 25 000 euros/mois en 3 mois. Le CFO vous demande de reduire de 50% sans impacter les performances. Vous decouvrez : un warehouse XL qui ne s'eteint jamais, 200 tables avec Time Travel a 90 jours, des requetes COPY INTO qui rechargent les memes fichiers, et 50 clones abandonnes de databases de production. Quel plan d'action proposez-vous ?
Google BigQuery
Objectifs
- Comprendre l'architecture serverless de BigQuery et le systeme de slots
- Maitriser le partitionnement, le clustering et les tables materialisees
- Utiliser BI Engine, les streaming inserts et l'integration ML (BQML)
- Optimiser les couts avec les modeles de facturation on-demand vs flat-rate
BigQuery est la solution que je recommande quand le client veut zero gestion d'infrastructure. Pas de warehouse a dimensionner, pas de cluster a monitorer. Vous ecrivez du SQL, Google gere le reste. Mais la contrepartie, c'est que vous avez moins de controle sur les performances granulaires. Mon astuce principale : toujours partitionner par date et clusterer par les colonnes les plus filtrees. J'ai vu des factures divisees par 10 simplement en ajoutant un partitionnement sur une table de 5 To.
Architecture Serverless de BigQuery
BigQuery est un data warehouse entierement serverless. Contrairement a Snowflake ou vous gerez des virtual warehouses, BigQuery abstrait completement le compute. Il repose sur trois composants internes de Google :
+------------------------------------------------------------------+
| BigQuery API / Console |
+------------------------------------------------------------------+
| | |
+-------------------+ +------------------+ +------------------+
| Dremel Engine | | Colossus | | Jupiter Network|
| (Execution moteur) | | (Stockage dist.) | | (Reseau interne) |
| | | | | |
| - Arbre de servers | | - Columnar format| | - 1 Petabit/sec |
| (mixers + leaf) | | - Capacitor | | - Transfert |
| - Execution SQL | | - Replique auto | | stockage<>calc |
| - Slots = unites | | - Compression | | |
| de calcul | | automatique | | |
+-------------------+ +------------------+ +------------------+
| | |
+------------------------------------------------------------------+
| Borg (Orchestration Google) |
| Allocation dynamique des ressources compute |
+------------------------------------------------------------------+
Les Slots BigQuery
Un slot est une unite de calcul virtuelle dans BigQuery. Chaque requete consomme un certain nombre de slots qui varient dynamiquement. En mode on-demand, vous partagez un pool de 2 000 slots avec d'autres utilisateurs de votre projet. En mode flat-rate (editions), vous reservez un nombre fixe de slots (100 minimum) pour un tarif mensuel previsible. Le choix entre les deux depend du volume de requetes : au-dela de ~30 To/mois scannes, le flat-rate devient generalement plus economique.
Partitionnement et Clustering
-- Table partitionnee par date + clusterisee CREATE TABLE `project.dataset.events` ( event_id STRING, event_timestamp TIMESTAMP, user_id STRING, event_type STRING, country STRING, device_type STRING, revenue NUMERIC ) PARTITION BY DATE(event_timestamp) -- Partitionnement par jour CLUSTER BY country, event_type, device_type -- Clustering (max 4 colonnes) OPTIONS ( partition_expiration_days = 365, -- Supprime les partitions > 1 an require_partition_filter = TRUE -- OBLIGE a filtrer par date ); -- Requete optimisee : scanne seulement la partition du jour + cluster prune SELECT country, event_type, SUM(revenue) as total_revenue FROM `project.dataset.events` WHERE DATE(event_timestamp) = '2025-12-01' -- Partition pruning AND country = 'FR' -- Cluster pruning GROUP BY country, event_type; -- Verifier les octets scannes (estimation) -- Utiliser l'option --dry_run dans bq CLI ou l'estimation dans la console
BI Engine : acceleration in-memory
BI Engine est un service d'acceleration in-memory integre a BigQuery. Il maintient un cache intelligent des donnees les plus requetees, reduisant la latence des requetes de secondes a millisecondes. Il est particulierement utile pour les dashboards Looker ou Data Studio.
-- Creer une reservation BI Engine (via SQL admin) -- Reserve 10 Go de memoire pour le projet CREATE BI_ENGINE_RESERVATION PROJECT_ID = 'my-analytics-project', LOCATION = 'EU', SIZE_GB = 10; -- Les tables materialisees sont automatiquement candidates pour BI Engine CREATE MATERIALIZED VIEW `project.dataset.mv_daily_sales` AS SELECT DATE(order_timestamp) as order_date, product_category, region, COUNT(*) as order_count, SUM(amount) as total_revenue, AVG(amount) as avg_order_value FROM `project.dataset.orders` GROUP BY 1, 2, 3;
Streaming Inserts et BigQuery ML
-- BigQuery ML : creer un modele de prediction directement en SQL CREATE OR REPLACE MODEL `project.dataset.churn_model` OPTIONS ( model_type = 'LOGISTIC_REG', input_label_cols = ['churned'], auto_class_weights = TRUE, data_split_method = 'AUTO_SPLIT' ) AS SELECT days_since_last_purchase, total_orders, avg_order_value, support_tickets_count, email_open_rate, churned FROM `project.dataset.customer_features` WHERE signup_date < '2025-01-01'; -- Predire le churn sur les nouveaux clients SELECT customer_id, predicted_churned, predicted_churned_probs FROM ML.PREDICT(MODEL `project.dataset.churn_model`, (SELECT * FROM `project.dataset.customer_features` WHERE signup_date >= '2025-01-01')) ORDER BY predicted_churned_probs[OFFSET(1)].prob DESC LIMIT 100;
Uber : BigQuery pour l'analyse temps reel
Uber utilise BigQuery pour analyser les patterns de deplacement et optimiser la tarification dynamique (surge pricing). Grace aux streaming inserts et BI Engine :
- Plus de 100 milliards d'evenements de trajet analyses par jour
- Latence d'ingestion en streaming reduite a moins de 5 secondes
- Les analystes pricing peuvent requeter les donnees en temps quasi reel pour ajuster les modeles
- BQML permet de prototyper des modeles de demande directement en SQL, accelerant le cycle d'experimentation de semaines a heures
StartupTech (anonymise) : Explosion des couts BigQuery
Une startup SaaS a migre vers BigQuery en mode on-demand sans mettre en place de guardrails. Un data analyst a lance un SELECT * sur une table de 20 To par erreur depuis un notebook, generant une facture de 100$ en une seule requete. Le probleme s'est aggrave quand des requetes automatisees non optimisees ont ete planifiees :
- Pas de
require_partition_filtersur les grandes tables - Aucune limite de bytes scannes configuree au niveau du projet
- Facture mensuelle passee de 500$ a 15 000$ en un mois
Anti-pattern : SELECT * sans partition filter
Lancer des requetes SELECT * sur des tables de plusieurs teraoctets sans filtrer par la colonne de partition. En mode on-demand, BigQuery facture 6.25$/To scanne. Une requete full-scan sur 10 To coute 62.50$ a chaque execution.
Solution : Activer require_partition_filter = TRUE sur toutes les tables partitionnees. Configurer des custom quotas par utilisateur pour limiter les To scannes par jour. Utiliser des vues materialisees pour les requetes recurrentes. Former les equipes a toujours verifier l'estimation de couts avant d'executer.
Optimisation : colonnes imbriquees (STRUCT et ARRAY)
BigQuery excelle avec les types STRUCT et ARRAY, qui evitent les jointures couteuses. Au lieu de normaliser en etoile classique, denormalisez en utilisant des colonnes imbriquees. Exemple : au lieu d'une table orders jointe a order_items, creez une seule table orders avec une colonne items ARRAY<STRUCT<product_id STRING, qty INT64, price NUMERIC>>. Cela reduit les shuffles et accelere les requetes.
Scenario : Migration Redshift vers BigQuery
Vous etes sollicite par un client media qui veut migrer de Redshift vers BigQuery. Leur cluster Redshift (ra3.4xlarge, 6 noeuds) coute 18 000$/mois et les analystes se plaignent de requetes lentes aux heures de pointe. Le dataset fait 50 To, avec 200 utilisateurs concurrents. Comment structurez-vous la migration ? Quel modele de facturation recommandez-vous (on-demand vs editions) ? Comment gerez-vous la compatibilite SQL (Redshift SQL vs BigQuery Standard SQL) ?
Databricks & Spark
Objectifs
- Comprendre le concept de Lakehouse et son positionnement entre Data Lake et Data Warehouse
- Maitriser Delta Lake : transactions ACID, time travel, schema evolution
- Decouvrir Unity Catalog pour la gouvernance unifiee
- Evaluer les performances du moteur Photon et ses cas d'usage
Databricks est la solution que je recommande quand le client a des besoins a la fois en analytics et en machine learning avance. Le concept de Lakehouse est brillant : vous gardez la flexibilite du data lake (stocker tout type de donnees) avec les garanties du data warehouse (transactions ACID, schema enforcement). Cependant, Databricks demande plus de competences techniques que Snowflake ou BigQuery. Si votre equipe est composee principalement d'analystes SQL, orientez-vous vers les deux autres. Si vous avez des data engineers et des ML engineers, Databricks sera votre terrain de jeu ideal.
Le Paradigme Lakehouse
Le Lakehouse est une architecture qui combine les avantages du Data Lake (stockage pas cher, formats ouverts, support ML) et du Data Warehouse (transactions ACID, performances SQL, gouvernance). Databricks a popularise ce concept avec Delta Lake, une couche transactionnelle open-source au-dessus de Parquet.
DATA WAREHOUSE (1990-2010) DATA LAKE (2010-2020) LAKEHOUSE (2020+)
+------------------+ +------------------+ +------------------+
| Schema rigide | | Schema flexible | | Schema flexible |
| SQL performant | | Tout format | | + ACID transactions|
| ACID transactions| | Pas cher (S3) | | SQL performant |
| Gouvernance | | ML/DS friendly | | ML/DS friendly |
+------------------+ +------------------+ | Gouvernance |
| MAIS: | | MAIS: | | Formats ouverts |
| - Cher | | - Pas de ACID | +------------------+
| - Formats proprio| | - "Data Swamp" | | = Delta Lake + |
| - Pas ML-friendly| | - Pas de gouver. | | Spark Engine + |
+------------------+ | - Perf SQL faible| | Unity Catalog |
+------------------+ +------------------+
Delta Lake en profondeur
Delta Lake est le format de table open-source qui rend le Lakehouse possible. Il ajoute une couche de metadonnees transactionnelles (le transaction log ou _delta_log) au-dessus de fichiers Parquet standards.
Les 4 garanties de Delta Lake
Atomicite : chaque operation (INSERT, UPDATE, DELETE, MERGE) est entierement reussie ou entierement annulee.
Coherence : le schema est enforce, les contraintes sont respectees (CHECK, NOT NULL).
Isolation : les ecritures concurrentes sont gerees via un optimistic concurrency control.
Durabilite : les donnees validees sont persistees de maniere fiable sur le stockage cloud.
# Creer une table Delta avec Spark
from delta.tables import DeltaTable
# Ecriture initiale
df_events = spark.read.json("s3://raw-data/events/2025/12/")
df_events.write.format("delta") \
.partitionBy("event_date") \
.mode("overwrite") \
.save("s3://lakehouse/bronze/events")
# MERGE (upsert) - operation impossible avec un Data Lake classique
delta_table = DeltaTable.forPath(spark, "s3://lakehouse/silver/customers")
delta_table.alias("target").merge(
df_new_customers.alias("source"),
"target.customer_id = source.customer_id"
).whenMatchedUpdate(set={
"email": "source.email",
"last_active": "source.last_active",
"updated_at": "current_timestamp()"
}).whenNotMatchedInsert(values={
"customer_id": "source.customer_id",
"email": "source.email",
"last_active": "source.last_active",
"created_at": "current_timestamp()",
"updated_at": "current_timestamp()"
}).execute()
-- Time Travel avec Delta Lake
-- Voir l'historique des modifications
DESCRIBE HISTORY lakehouse.silver.customers;
-- Requeter une version precedente
SELECT * FROM lakehouse.silver.customers VERSION AS OF 42;
-- Requeter a un timestamp
SELECT * FROM lakehouse.silver.customers TIMESTAMP AS OF '2025-12-01 10:00:00';
-- Restaurer une version precedente
RESTORE TABLE lakehouse.silver.customers TO VERSION AS OF 42;
-- Schema Evolution : ajouter une colonne automatiquement
ALTER TABLE lakehouse.silver.customers
SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name');
-- OPTIMIZE : compacter les petits fichiers (small file problem)
OPTIMIZE lakehouse.silver.customers
ZORDER BY (country, signup_date);
-- VACUUM : supprimer les fichiers obsoletes (liberer du stockage)
VACUUM lakehouse.silver.customers RETAIN 168 HOURS;
Unity Catalog : gouvernance unifiee
Unity Catalog est la solution de gouvernance de Databricks qui offre un namespace a 3 niveaux : catalog.schema.table. Il centralise les permissions, le lineage, et la decouverte des donnees sur l'ensemble des workspaces.
METASTORE (niveau compte)
|
+-- CATALOG: prod_analytics
| +-- SCHEMA: bronze
| | +-- TABLE: raw_events
| | +-- TABLE: raw_users
| +-- SCHEMA: silver
| | +-- TABLE: clean_events
| | +-- TABLE: dim_users
| +-- SCHEMA: gold
| +-- TABLE: fact_revenue
| +-- TABLE: agg_daily_metrics
|
+-- CATALOG: dev_analytics (clone ou separe)
|
+-- CATALOG: ml_features
+-- SCHEMA: features
+-- TABLE: user_features
+-- FUNCTION: compute_rfm_score
Moteur Photon
Photon est le moteur d'execution vectorise de Databricks, ecrit en C++ pour maximiser les performances. Il remplace le moteur Spark JVM pour les operations SQL et DataFrame, offrant des accelerations de 2x a 8x sur les requetes analytiques.
Airbnb : Lakehouse pour la plateforme data unifiee
Airbnb a adopte l'architecture Lakehouse avec Databricks pour unifier ses workflows analytics et ML. Avant, les data scientists devaient copier les donnees du warehouse Hive vers des notebooks, creant des incoherences :
- Reduction de 60% du temps de preparation des donnees pour les equipes ML
- Un seul point de verite pour les features utilisees en production et en experimentation
- Lineage complet via Unity Catalog : chaque prediction peut etre tracee jusqu'a la donnee source
- Le moteur Photon a reduit les couts de compute de 40% sur les pipelines ETL les plus lourds
FinanceGroup (anonymise) : le Data Lake devenu Data Swamp
Un groupe financier avait investi 2 ans dans un Data Lake sur S3 avec Spark (sans Delta Lake). Le resultat : des milliers de fichiers Parquet sans schema coherent, des donnees corrompues apres des ecritures concurrentes, et aucune possibilite d'UPDATE ou DELETE (requis par RGPD).
- Migration vers Delta Lake en 4 mois, permettant enfin les operations MERGE pour le droit a l'oubli
- Les transactions ACID ont elimine les corruptions de donnees qui survenaient 2-3 fois par semaine
Anti-pattern : le "Small File Problem"
Avec Spark et Delta Lake, les ecritures streaming ou les pipelines avec beaucoup de petits batchs generent des milliers de petits fichiers (< 1 Mo chacun). Cela degrade enormement les performances de lecture car le moteur doit ouvrir des milliers de connexions fichier au lieu de lire quelques gros fichiers.
Solution : Planifier des OPTIMIZE reguliers qui compactent les petits fichiers en fichiers de ~1 Go. Utiliser ZORDER BY sur les colonnes frequemment filtrees pour co-localiser les donnees. Configurer l'auto-optimize (delta.autoOptimize.optimizeWrite = true) pour les tables critiques.
Scenario : Choix Lakehouse vs Data Warehouse
Vous etes Data Architect dans une entreprise pharma. L'equipe data (15 personnes : 5 data engineers, 5 analystes, 5 data scientists) doit gerer : des donnees cliniques structurees (SQL), des images medicales (non structure), des modeles ML de detection de molecules, et du reporting reglementaire strict. Le budget est de 30 000$/mois. Recommandez-vous Databricks Lakehouse, Snowflake, ou une architecture hybride ? Justifiez avec les cas d'usage specifiques.
Comparaison Cloud DWH
Objectifs
- Comparer objectivement Snowflake, BigQuery et Databricks sur des criteres techniques et business
- Identifier les forces et faiblesses de chaque plateforme selon le cas d'usage
- Construire une grille de decision pour choisir la bonne solution
- Comprendre les strategies multi-cloud et de portabilite
La question que j'entends le plus souvent est : "Snowflake, BigQuery ou Databricks ?". Ma reponse est toujours la meme : ca depend. Pas du marketing, pas des benchmarks biaises publies par chaque vendor, mais de VOTRE contexte : votre equipe, vos cas d'usage, votre cloud provider actuel, et votre budget. J'ai accompagne des clients qui etaient parfaitement heureux avec chacune des trois solutions. Le vrai piege, c'est de choisir en se basant sur un seul critere (le cout, ou la performance brute) sans considerer l'ensemble.
Tableau comparatif detaille
| Critere | Snowflake | BigQuery | Databricks |
|---|---|---|---|
| Architecture | Separation stockage/compute avec virtual warehouses | Serverless, slots partages ou reserves | Lakehouse, clusters Spark manages |
| Modele de facturation | Credits (compute) + stockage au To/mois | Par To scanne (on-demand) ou slots reserves | DBU (Databricks Units) + compute cloud |
| Gestion infra | Moyenne (warehouses a dimensionner) | Faible (serverless) | Elevee (clusters a configurer) |
| SQL natif | Excellent (ANSI SQL complet) | Excellent (Standard SQL + extensions) | Bon (Spark SQL, en progres avec DBSQL) |
| Support ML/DS | Limite (Snowpark recente) | Bon (BQML integre) | Excellent (MLflow, notebooks, GPU) |
| Donnees non structurees | Limite (VARIANT semi-structure) | Bon (STRUCT, ARRAY, JSON) | Excellent (tout format, images, texte) |
| Streaming | Snowpipe (micro-batch) | Streaming inserts natif | Structured Streaming natif |
| Data Sharing | Excellent (natif, cross-cloud) | Analytics Hub | Delta Sharing (open protocol) |
| Gouvernance | Bonne (RBAC, masking, tags) | Bonne (IAM GCP, column-level) | Excellente (Unity Catalog) |
| Vendor lock-in | Moyen (format proprietaire) | Eleve (ecosysteme GCP) | Faible (formats ouverts Delta/Parquet) |
| Multi-cloud | Oui (AWS, Azure, GCP) | GCP uniquement | Oui (AWS, Azure, GCP) |
Analyse par cas d'usage
Arbre de decision simplifie
Choisissez Snowflake si : votre equipe est SQL-first, vous avez besoin de data sharing intensif, vous voulez une solution multi-cloud mature, et vos cas d'usage sont principalement analytics/BI.
Choisissez BigQuery si : vous etes deja sur GCP, vous voulez zero gestion d'infrastructure, vous avez des besoins en ML legers (BQML), et votre budget est variable (on-demand).
Choisissez Databricks si : vous avez des besoins forts en ML/AI, vous gerez des donnees non structurees, vous voulez eviter le vendor lock-in (formats ouverts), et votre equipe a des competences Spark/Python.
Comparaison des couts : scenario concret
Scenario : 10 To stockes, 500 requetes/jour, 5h de compute/jour SNOWFLAKE (Enterprise, AWS) +------------------------------------------+ | Stockage : 10 To x 23$/To = 230$ | | Compute : WH Medium x 5h x 30j | | = 150h x 4 credits x 3$ | | = 1 800$ | | Cloud Svcs: ~10% compute = 180$ | | TOTAL ~ 2 210$ | +------------------------------------------+ BIGQUERY (On-demand, US) +------------------------------------------+ | Stockage : 10 To x 20$/To = 200$ | | Compute : ~50 To scannes/mois | | x 6.25$/To = 312$ | | TOTAL ~ 512$ | +------------------------------------------+ (Attention: depend enormement du volume scanne) DATABRICKS (Premium, AWS) +------------------------------------------+ | Stockage S3: 10 To x 23$/To = 230$ | | Compute : cluster i3.xlarge | | 5h/jour x 30j = 150h | | x 0.75$/DBU x 4 DBU/h | | = 450$ | | EC2 infra : 150h x 0.312$ = 47$ | | TOTAL ~ 727$ | +------------------------------------------+ (Hors SQL Warehouse serverless, couts variables)
Les couts caches a surveiller
Snowflake : les cloud services au-dela de 10% du compute sont factures. Le clustering automatique et la materialisation consomment des credits en arriere-plan.
BigQuery : les streaming inserts coutent 0.05$/Go insere. Le stockage "active" (modifie dans les 90 derniers jours) coute 20$/To vs 10$/To pour le stockage long-term.
Databricks : le cout de l'infrastructure cloud (EC2/VM) s'ajoute aux DBU. Les jobs clusters qui restent allumes coutent meme sans requetes.
Capital One : approche multi-plateforme
Capital One, l'une des plus grandes banques americaines, utilise une approche hybride pour ses differents cas d'usage :
- Snowflake pour le reporting reglementaire et le partage de donnees avec les regulateurs
- Databricks pour la detection de fraude en temps reel (ML avance, streaming Spark)
- Cette approche multi-plateforme permet d'utiliser le meilleur outil pour chaque cas d'usage
- Le cout supplementaire de gestion de deux plateformes est compense par les gains de performance et d'adequation fonctionnelle
Anti-pattern : choisir uniquement sur le benchmark de performance
Se baser sur des benchmarks TPC-DS pour choisir sa plateforme. Ces benchmarks mesurent des scenarios standardises qui ne representent pas votre charge de travail reelle. De plus, chaque vendor optimise specifiquement pour ces benchmarks.
Solution : Effectuez un Proof of Concept (POC) de 4 a 6 semaines avec vos donnees reelles et vos requetes reelles sur les 2-3 plateformes preselectionnees. Evaluez : performance, facilite d'utilisation pour votre equipe, cout reel, et qualite du support. Impliquez les utilisateurs finaux dans l'evaluation, pas uniquement les ingenieurs.
Strategies de portabilite et multi-cloud
-- Strategie : utiliser dbt comme couche d'abstraction
-- Le meme modele dbt peut cibler Snowflake, BigQuery ou Databricks
-- en changeant simplement le profil de connexion
-- models/marts/finance/revenue_monthly.sql
-- Ce modele fonctionne sur les 3 plateformes avec dbt
{{ config(
materialized='table',
partition_by={'field': 'month_date', 'data_type': 'date'} -- BigQuery
-- cluster_by=['region', 'product_line'] -- Snowflake/Databricks
) }}
SELECT
DATE_TRUNC('month', order_date) AS month_date, -- ANSI SQL
region,
product_line,
COUNT(DISTINCT customer_id) AS unique_customers,
SUM(amount) AS total_revenue,
AVG(amount) AS avg_order_value
FROM {{ ref('stg_orders') }}
WHERE order_status = 'completed'
GROUP BY 1, 2, 3
Scenario : Recommandation pour un groupe international
Vous etes consultant Data Architect mandate par un groupe industriel present dans 30 pays. Contraintes : equipes data reparties (Paris, New York, Singapour), donnees RGPD en Europe et CCPA aux USA, 500 To de donnees, 200 utilisateurs BI et 30 data scientists. Le groupe utilise AWS pour l'Europe et Azure pour les USA. Budget : 100 000$/mois tout compris. Quelle architecture recommandez-vous et pourquoi ?
Quiz Cloud DWH
Objectifs
- Valider vos connaissances sur Snowflake, BigQuery et Databricks
- Tester votre capacite a choisir la bonne plateforme selon le contexte
- Verifier votre comprehension des architectures et modeles de couts
- Identifier vos axes d'amelioration avant de passer a la suite
Ce quiz couvre les concepts essentiels des trois plateformes cloud que nous avons etudiees. Prenez votre temps pour chaque question. Si vous obtenez moins de 6/8, je vous recommande de relire les lecons precedentes avant de continuer. Les questions sont concues pour tester votre comprehension profonde, pas juste la memorisation.
Quiz : Cloud Data Warehouses
Question 1 : Quelle est la caracteristique principale de l'architecture Snowflake qui la differencie des Data Warehouses traditionnels ?
Question 2 : Dans BigQuery, quelle est la difference fondamentale entre le mode de facturation "on-demand" et le mode "flat-rate" (editions) ?
Question 3 : Qu'est-ce que le Zero-Copy Cloning de Snowflake ?
Question 4 : Quel probleme principal Delta Lake resout-il par rapport a un Data Lake classique base sur Parquet ?
Question 5 : Vous devez choisir une plateforme pour une equipe composee a 80% d'analystes SQL et 20% de data engineers, avec des besoins principalement en reporting BI. Quel est le choix le plus pertinent ?
Question 6 : Quelle fonctionnalite de BigQuery permet de creer des modeles de machine learning directement en SQL ?
Question 7 : Quel est le role d'Unity Catalog dans l'ecosysteme Databricks ?
Question 8 : Quelle strategie permet de reduire le vendor lock-in lorsqu'on utilise un Cloud Data Warehouse ?
Interpretation de vos resultats
8/8 : Excellent ! Vous maitrisez les fondamentaux des Cloud Data Warehouses. Passez directement aux lecons dbt.
6-7/8 : Bon niveau. Relisez les sections correspondant aux questions manquees.
4-5/8 : Des bases a consolider. Reprenez les lecons 1 a 4 en vous concentrant sur les concepts cles.
0-3/8 : Il est recommande de revoir en detail les lecons precedentes avant de continuer.
dbt Fondamentaux
Objectifs
- Comprendre le role de dbt dans le Modern Data Stack et le paradigme ELT
- Maitriser les concepts fondamentaux : models, sources, refs, materializations
- Structurer un projet dbt avec les bonnes pratiques (staging, intermediate, marts)
- Utiliser les macros, les packages et les variables pour un code reutilisable
dbt a revolutionne ma facon de travailler en tant que data architect. Avant, les transformations etaient des scripts SQL eparpilles, sans versioning, sans tests, sans documentation. Avec dbt, on applique enfin les pratiques du genie logiciel aux pipelines de donnees : git, CI/CD, tests unitaires, documentation generee automatiquement. Mon conseil numero un : adoptez les conventions de nommage des le premier jour. J'ai vu trop de projets dbt devenir ingerables parce que chacun nommait ses modeles a sa facon.
Qu'est-ce que dbt ?
dbt (data build tool) est un outil de transformation qui permet aux analystes et data engineers d'ecrire des transformations en SQL (ou Python), de les organiser en modeles, de les tester et de les documenter. dbt s'execute dans le data warehouse : il ne fait que du T (Transform) dans le paradigme ELT.
SOURCES INGESTION DATA WAREHOUSE CONSOMMATION
(APIs, DBs, (Fivetran, +---------------------------+ (Looker,
fichiers) Airbyte) | RAW (sources) | Tableau,
| | | | | Metabase)
+--------->-----+--------->-----+ v |
| STAGING (stg_*) |-------->
| | [dbt models] |
| v |
| INTERMEDIATE (int_*) |
| | [dbt models] |
| v |
| MARTS (fct_*, dim_*) |
| [dbt models] |
+---------------------------+
^
|
dbt run, dbt test, dbt docs
Structure d'un projet dbt
# dbt_project.yml - Configuration du projet
name: 'analytics'
version: '1.0.0'
config-version: 2
profile: 'analytics' # Connexion au DWH (Snowflake, BigQuery, etc.)
model-paths: ["models"]
test-paths: ["tests"]
macro-paths: ["macros"]
seed-paths: ["seeds"]
models:
analytics:
staging:
+materialized: view # Les staging models sont des vues
+schema: staging
intermediate:
+materialized: ephemeral # Pas materialise, inline dans les requetes
marts:
+materialized: table # Les marts sont des tables physiques
+schema: analytics
# models/staging/sources.yml - Declaration des sources
version: 2
sources:
- name: raw_ecommerce
database: raw_database
schema: ecommerce
description: "Donnees brutes extraites par Fivetran depuis Shopify"
tables:
- name: orders
description: "Table des commandes Shopify"
loaded_at_field: _fivetran_synced
freshness:
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}
columns:
- name: order_id
description: "Identifiant unique de la commande"
tests:
- unique
- not_null
- name: customers
description: "Table des clients Shopify"
- name: products
description: "Catalogue produits"
Les modeles dbt : staging, intermediate, marts
-- models/staging/stg_orders.sql
-- Staging : nettoyage, renommage, typage. 1 modele par source.
WITH source AS (
SELECT * FROM {{ source('raw_ecommerce', 'orders') }}
),
renamed AS (
SELECT
id AS order_id,
customer_id,
CAST(created_at AS TIMESTAMP) AS order_timestamp,
DATE(created_at) AS order_date,
status AS order_status,
CAST(total_price AS NUMERIC) AS order_amount,
currency,
LOWER(TRIM(shipping_country)) AS shipping_country,
_fivetran_synced AS loaded_at
FROM source
WHERE id IS NOT NULL -- Exclure les lignes corrompues
)
SELECT * FROM renamed
-- models/intermediate/int_orders_enriched.sql
-- Intermediate : jointures, logique metier, enrichissement
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
),
customers AS (
SELECT * FROM {{ ref('stg_customers') }}
),
products AS (
SELECT * FROM {{ ref('stg_order_items') }}
)
SELECT
o.order_id,
o.order_timestamp,
o.order_date,
o.order_status,
o.order_amount,
c.customer_id,
c.customer_segment,
c.first_order_date,
c.country AS customer_country,
-- Calcul metier : est-ce une premiere commande ?
CASE WHEN o.order_date = c.first_order_date
THEN TRUE ELSE FALSE END AS is_first_order,
-- Nombre de jours depuis la derniere commande
DATEDIFF('day', c.last_order_date, o.order_date) AS days_since_last_order
FROM orders o
LEFT JOIN customers c ON o.customer_id = c.customer_id
-- models/marts/finance/fct_revenue.sql
-- Mart : modele final pret pour la BI
{{ config(
materialized='incremental',
unique_key='order_id',
on_schema_change='append_new_columns'
) }}
SELECT
order_id,
order_date,
customer_id,
customer_segment,
customer_country,
order_amount,
is_first_order,
-- Metriques derivees
SUM(order_amount) OVER (
PARTITION BY customer_id
ORDER BY order_date
ROWS UNBOUNDED PRECEDING
) AS customer_lifetime_value
FROM {{ ref('int_orders_enriched') }}
WHERE order_status = 'completed'
{% if is_incremental() %}
AND order_date > (SELECT MAX(order_date) FROM {{ this }})
{% endif %}
Les 4 types de materialisation
view : cree une vue SQL (pas de stockage, recalculee a chaque requete). Ideal pour les modeles staging legers.
table : cree une table physique (recree entierement a chaque dbt run). Ideal pour les marts finaux de petite/moyenne taille.
incremental : insere uniquement les nouvelles lignes a chaque run. Ideal pour les grandes tables factuelles.
ephemeral : pas de materialisation, le SQL est injecte comme CTE dans les modeles dependants. Ideal pour les modeles intermediaires reutilisables.
La fonction ref() et le DAG
La fonction {{ ref('model_name') }} est au coeur de dbt. Elle cree une dependance declarative entre les modeles, permettant a dbt de construire automatiquement le DAG (Directed Acyclic Graph) d'execution. dbt garantit que les modeles sont executes dans le bon ordre.
source('raw','orders') source('raw','customers') source('raw','products')
| | |
v v v
stg_orders stg_customers stg_products
| | |
+----------+-------------+ |
| |
v |
int_orders_enriched <-----------------------------+
|
+----------+----------+
| |
v v
fct_revenue dim_customers
| |
+----------+----------+
|
v
rpt_monthly_revenue (exposure -> Looker)
GitLab : dbt a grande echelle
GitLab a construit l'un des plus grands projets dbt open-source, avec plus de 800 modeles et une transparence totale (le repo est public). Leurs pratiques sont une reference :
- Plus de 800 modeles dbt organises en staging, intermediate et marts
- Convention de nommage stricte :
stg_,int_,fct_,dim_,rpt_ - CI/CD sur chaque merge request : dbt test + dbt build sur le schema de la branche
- Documentation generee automatiquement et accessible a toute l'entreprise via dbt docs
- Temps de build complet reduit de 4h a 45 min grace aux modeles incrementaux
Anti-pattern : le modele monolithique de 500 lignes
Ecrire un seul modele dbt contenant toute la logique metier : jointures, nettoyage, calculs, agregations. Ce modele devient impossible a maintenir, tester et deboguer. Les performances se degradent car le moteur SQL ne peut pas optimiser une requete aussi complexe.
Solution : Decomposez en couches. Staging (1 modele par source, nettoyage uniquement), Intermediate (jointures et logique metier), Marts (agregations finales). Chaque modele doit idealement faire moins de 100 lignes de SQL et avoir une responsabilite unique. Utilisez les CTEs pour structurer le code a l'interieur de chaque modele.
Anti-pattern : hardcoder les noms de tables
Ecrire FROM raw_database.ecommerce.orders directement au lieu d'utiliser {{ source('raw_ecommerce', 'orders') }} ou {{ ref('stg_orders') }}. Cela casse la tracabilite du DAG et rend le projet fragile aux changements d'environnement.
Solution : Toujours utiliser {{ ref() }} pour les modeles dbt et {{ source() }} pour les tables externes. dbt gerera automatiquement les noms de schemas et databases selon l'environnement (dev, staging, prod).
Lab : Creer votre premier projet dbt
Etape 1 : Installation et initialisation
Installez dbt-core avec l'adaptateur de votre choix et initialisez un nouveau projet :
# Installation avec pip (adaptateur Snowflake) pip install dbt-snowflake # Initialisation d'un nouveau projet dbt init my_analytics_project # Verification de la connexion cd my_analytics_project dbt debug
Etape 2 : Declarer vos sources
Creez le fichier models/staging/sources.yml et declarez au moins une source avec ses tables. Ajoutez des tests de freshness.
Etape 3 : Creer un modele staging
Creez models/staging/stg_orders.sql qui selectionne depuis votre source, renomme les colonnes et caste les types. Utilisez {{ source() }}.
Etape 4 : Creer un modele mart
Creez models/marts/fct_daily_revenue.sql qui agregue les commandes par jour en utilisant {{ ref('stg_orders') }}. Materialisez-le en table.
Etape 5 : Executer et valider
# Executer tous les modeles dbt run # Executer un modele specifique et ses dependances dbt run --select fct_daily_revenue+ # Generer et ouvrir la documentation dbt docs generate dbt docs serve
dbt Tests & Documentation
Objectifs
- Maitriser les tests schema (unique, not_null, accepted_values, relationships) et les tests custom
- Utiliser dbt-expectations et dbt-utils pour des tests avances
- Generer et publier une documentation interactive avec dbt docs
- Mettre en place une CI/CD pour valider automatiquement les changements
La documentation et les tests sont les parents pauvres de la data. J'ai travaille avec des equipes brillantes qui avaient des centaines de modeles dbt sans un seul test. Resultats : des dashboards qui affichaient des chiffres faux pendant des semaines sans que personne ne le detecte. Avec dbt, il n'y a plus d'excuse. Les tests sont declaratifs (quelques lignes de YAML), la documentation est generee automatiquement. Je considere qu'un modele sans tests est un modele en dette technique. Minimum : unique + not_null sur chaque primary key, et accepted_values sur les colonnes de statut.
Les tests dbt : votre filet de securite
dbt propose deux types de tests : les tests schema (declaratifs, en YAML) et les tests data (requetes SQL personnalisees). Les tests schema sont les plus courants et couvrent la majorite des besoins.
Tests schema (YAML)
# models/marts/schema.yml
version: 2
models:
- name: fct_revenue
description: "Table de faits des revenus par commande completee"
columns:
- name: order_id
description: "Identifiant unique de la commande"
tests:
- unique
- not_null
- name: order_date
description: "Date de la commande"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_between:
min_value: "'2020-01-01'"
max_value: "CURRENT_DATE()"
- name: order_status
description: "Statut de la commande"
tests:
- accepted_values:
values: ['completed', 'refunded', 'partially_refunded']
- name: customer_id
description: "Reference vers la dimension client"
tests:
- not_null
- relationships:
to: ref('dim_customers')
field: customer_id
- name: order_amount
description: "Montant total de la commande en euros"
tests:
- not_null
- dbt_utils.expression_is_true:
expression: ">= 0"
Tests data custom (SQL)
-- tests/assert_revenue_not_negative.sql
-- Un test custom passe si la requete retourne 0 ligne
-- Ici on verifie qu'aucun client n'a un revenue cumule negatif
SELECT
customer_id,
SUM(order_amount) AS total_revenue
FROM {{ ref('fct_revenue') }}
GROUP BY customer_id
HAVING SUM(order_amount) < 0
-- tests/assert_daily_revenue_consistency.sql
-- Verifier que le revenue journalier ne s'ecarte pas
-- de plus de 50% de la moyenne mobile sur 7 jours
WITH daily_revenue AS (
SELECT
order_date,
SUM(order_amount) AS daily_total,
AVG(SUM(order_amount)) OVER (
ORDER BY order_date
ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING
) AS avg_7d
FROM {{ ref('fct_revenue') }}
GROUP BY order_date
)
SELECT
order_date,
daily_total,
avg_7d,
ABS(daily_total - avg_7d) / NULLIF(avg_7d, 0) AS pct_deviation
FROM daily_revenue
WHERE avg_7d IS NOT NULL
AND ABS(daily_total - avg_7d) / NULLIF(avg_7d, 0) > 0.5
AND order_date >= DATEADD('day', -7, CURRENT_DATE())
Packages dbt : dbt-utils et dbt-expectations
# packages.yml - Dependances du projet
packages:
- package: dbt-labs/dbt_utils
version: ">=1.1.0"
- package: calogica/dbt_expectations
version: ">=0.10.0"
- package: dbt-labs/codegen
version: ">=0.12.0"
# Installer avec : dbt deps
Tests les plus utiles de dbt-expectations
expect_column_values_to_be_between : les valeurs sont dans un intervalle (dates, montants).
expect_column_values_to_match_regex : les valeurs respectent un format (email, UUID, code postal).
expect_table_row_count_to_be_between : le nombre de lignes est dans une fourchette attendue.
expect_column_proportion_of_unique_values_to_be_between : detection de doublons anormaux.
expect_column_values_to_not_be_null : version avancee de not_null avec seuil de tolerance.
Documentation dbt : auto-generee et interactive
dbt genere automatiquement une documentation interactive a partir des descriptions YAML, du DAG, et du schema des tables. C'est un atout majeur pour la gouvernance et l'onboarding des nouveaux membres.
# models/marts/schema.yml - Documentation enrichie
version: 2
models:
- name: fct_revenue
description: |
Table de faits contenant une ligne par commande completee.
**Grain** : une ligne par order_id.
**Source** : donnees Shopify via Fivetran.
**Rafraichissement** : quotidien a 06h UTC.
**Owner** : equipe Finance Data (finance-data@company.com).
### Logique metier
- Seules les commandes avec statut 'completed' sont incluses
- Le montant est en euros, apres conversion si necessaire
- Le customer_lifetime_value est calcule cumulativement
### Exemples de requetes
```sql
-- Revenue mensuel par segment
SELECT customer_segment, DATE_TRUNC('month', order_date),
SUM(order_amount) FROM fct_revenue GROUP BY 1, 2
```
columns:
- name: order_id
description: "Cle primaire - identifiant unique Shopify de la commande"
- name: customer_lifetime_value
description: |
Somme cumulee de tous les montants de commande du client,
ordonnee par date de commande. Calcul : window function
SUM(order_amount) OVER (PARTITION BY customer_id ORDER BY order_date)
# models/exposures.yml - Documenter les consommateurs
version: 2
exposures:
- name: monthly_revenue_dashboard
type: dashboard
description: "Dashboard Looker utilise par la direction financiere"
owner:
name: Marie Dupont
email: marie.dupont@company.com
url: https://looker.company.com/dashboards/42
depends_on:
- ref('fct_revenue')
- ref('dim_customers')
- ref('dim_products')
maturity: high
- name: churn_prediction_model
type: ml
description: "Modele ML de prediction du churn client"
owner:
name: Ahmed Ben Ali
email: ahmed.benali@company.com
depends_on:
- ref('fct_revenue')
- ref('dim_customers')
CI/CD avec dbt
DEVELOPPEUR CI (GitHub Actions) PRODUCTION
+-----------+ +------------------------+ +-----------+
| 1. Code | Pull Request | 3. dbt build --select | Merge PR | 5. dbt run |
| modele |---------------->| state:modified+ |------------>| complet |
| 2. dbt run| | 4. dbt test --select | | 6. dbt test|
| local | | state:modified+ | | complet |
+-----------+ +------------------------+ +-----------+
| | |
dev schema CI schema temporaire prod schema
(dev_jean_*) (ci_pr_123_*) (analytics.*)
|
Si echec -> PR bloquee
Si succes -> PR approuvable
# .github/workflows/dbt-ci.yml
name: dbt CI
on:
pull_request:
branches: [main]
paths:
- 'models/**'
- 'tests/**'
- 'macros/**'
jobs:
dbt-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- run: pip install dbt-snowflake
- run: dbt deps
- run: dbt build --select state:modified+ --defer --state ./prod-manifest
env:
DBT_PROFILES_DIR: ./ci_profiles
SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
Shopify : dbt pour la qualite des donnees a grande echelle
Shopify utilise dbt pour transformer les donnees de plus de 2 millions de marchands. Leur approche des tests est exemplaire :
- Plus de 3 000 tests dbt executes quotidiennement, couvrant l'integrite referentielle, les volumes attendus et les anomalies statistiques
- Un systeme d'alertes gradue : les tests critiques (primary keys, not null) bloquent le pipeline, les tests d'anomalie envoient des alertes Slack
- La documentation dbt est le point d'entree unique pour decouvrir les donnees disponibles
- Chaque pull request declenche un dbt build sur les modeles modifies, empechant les regressions
DataStartup (anonymise) : les donnees fausses dans les dashboards
Une startup SaaS B2B a decouvert que son dashboard de MRR (Monthly Recurring Revenue) affichait des chiffres errones depuis 3 mois. La cause : un changement dans le format d'un champ de la source (le statut d'abonnement est passe de "active" a "Active") qui a fait disparaitre 30% des abonnements actifs des calculs.
- Si un test
accepted_valuesavait ete en place, l'anomalie aurait ete detectee en moins de 24h - L'impact business : des decisions d'investissement basees sur des chiffres sous-estimes de 30%
- Apres l'incident, l'equipe a implemente 200+ tests dbt et un monitoring de freshness
Anti-pattern : tests uniquement sur les modeles finaux
Ne tester que les modeles marts (fct_, dim_) et ignorer les modeles staging et intermediate. Si une anomalie entre dans la couche staging sans etre detectee, elle se propage a travers tout le DAG et peut impacter des dizaines de modeles en aval.
Solution : Appliquez la strategie de tests en couches. Sur les staging : unique et not_null sur les primary keys, accepted_values sur les statuts. Sur les intermediate : relationships pour verifier les jointures. Sur les marts : tests metier custom (coherence des totaux, fourchettes attendues). Ajoutez des tests de volume (expect_table_row_count_to_be_between) sur les tables critiques.
Anti-pattern : documentation "une fois et on oublie"
Documenter les modeles une seule fois lors de la creation, puis ne jamais mettre a jour les descriptions quand la logique evolue. Au bout de 6 mois, la documentation ne correspond plus a la realite et personne ne lui fait confiance.
Solution : Integrez la documentation dans le processus de code review. Chaque PR qui modifie un modele doit aussi mettre a jour sa description. Utilisez des hooks pre-commit pour verifier que les modeles et colonnes critiques ont une description non vide. Publiez la doc automatiquement apres chaque deploy en production.
Astuce : le macro generate_schema_name
Par defaut, dbt prefixe les schemas avec le nom de l'utilisateur en dev (ex: dev_jean_staging). Personnalisez le macro generate_schema_name pour controler ou vos modeles sont materialises selon l'environnement. Cela evite les conflits entre developpeurs et assure une separation propre dev/staging/prod.
Lab : Ajouter des tests et de la documentation
Etape 1 : Tests schema de base
Ajoutez un fichier schema.yml pour chaque couche de votre projet. Minimum : unique + not_null sur chaque primary key, accepted_values sur les colonnes de statut.
Etape 2 : Tests custom
Creez un test SQL dans le dossier tests/ qui verifie une regle metier specifique a votre domaine (par exemple : le total des debits egale le total des credits dans une table comptable).
Etape 3 : Installer dbt-expectations
Ajoutez le package dbt-expectations dans packages.yml et executez dbt deps. Ajoutez un test de fourchette de valeurs et un test de volume de table.
Etape 4 : Enrichir la documentation
Ajoutez des descriptions detaillees (grain, source, logique metier, exemples) pour au moins 3 modeles. Declarez une exposure pour un dashboard ou un modele ML.
Etape 5 : Generer et explorer
# Executer tous les tests dbt test # Executer uniquement les tests d'un modele dbt test --select fct_revenue # Generer la documentation dbt docs generate # Ouvrir dans le navigateur (inclut le DAG interactif) dbt docs serve --port 8080
Scenario : Mise en place de la qualite des donnees
Vous rejoignez une equipe qui a 150 modeles dbt en production mais zero test. Le CTO vous donne 2 semaines pour mettre en place une strategie de qualite des donnees. Comment priorisez-vous ? Par quels modeles commencez-vous ? Quel niveau de couverture visez-vous a court terme vs long terme ? Comment embarquez-vous l'equipe dans cette demarche sans ralentir la velocite de developpement ?
dbt Patterns Avances
Objectifs
- Maitriser les modeles incrementaux et leurs strategies
- Comprendre et implementer les snapshots pour le suivi historique
- Creer des macros reutilisables et des packages personnalises
- Utiliser les hooks et les materialisations custom
- Appliquer les bonnes pratiques de performance dbt
Les patterns avances de dbt sont ce qui differencie un analyste qui utilise dbt d'un Data Engineer qui maitrise dbt. Les modeles incrementaux peuvent diviser votre temps de build par 10, mais mal configures, ils peuvent introduire des bugs silencieux dans vos donnees. Prenez le temps de comprendre chaque strategie.
Modeles Incrementaux
Les modeles incrementaux permettent de ne traiter que les nouvelles donnees au lieu de reconstruire l'integralite de la table a chaque execution. C'est le pattern le plus important pour les pipelines a grande echelle.
Principe Fondamental
Un modele incremental verifie si la table cible existe deja. Si oui, il ne traite que les lignes nouvelles ou modifiees depuis la derniere execution. Sinon, il effectue un full refresh comme une table classique.
-- models/staging/stg_events.sql
{{
config(
materialized='incremental',
unique_key='event_id',
incremental_strategy='merge',
on_schema_change='append_new_columns'
)
}}
SELECT
event_id,
user_id,
event_type,
event_properties,
created_at,
_loaded_at
FROM {{ source('raw', 'events') }}
{% if is_incremental() %}
WHERE _loaded_at > (SELECT MAX(_loaded_at) FROM {{ this }})
{% endif %}
Strategies Incrementales
| Strategie | Mecanisme | Cas d'usage | Performance |
|---|---|---|---|
| append | INSERT INTO simple | Logs, evenements immutables | Tres rapide |
| merge | MERGE (upsert) | Donnees avec mises a jour | Moderee |
| delete+insert | DELETE puis INSERT par partition | Grandes partitions Redshift/Postgres | Rapide sur partitions |
| insert_overwrite | Ecrase les partitions concernees | BigQuery, Spark | Tres rapide sur partitions |
-- models/intermediate/int_daily_revenue.sql
{{
config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by={
"field": "order_date",
"data_type": "date",
"granularity": "day"
},
cluster_by=["region", "product_category"]
)
}}
SELECT
DATE(order_timestamp) AS order_date,
region,
product_category,
COUNT(*) AS order_count,
SUM(amount) AS total_revenue
FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE DATE(order_timestamp) >= DATE_SUB(
(SELECT MAX(order_date) FROM {{ this }}),
INTERVAL 3 DAY -- Lookback pour rattraper les retards
)
{% endif %}
GROUP BY 1, 2, 3
Incremental sans Lookback
Ne jamais utiliser un WHERE strict comme _loaded_at > (SELECT MAX(_loaded_at) FROM {{ this }}) sans marge de securite. Les donnees en retard (late-arriving data) seront perdues definitivement.
Solution : Ajoutez toujours un lookback de 1 a 3 jours et utilisez un unique_key pour eviter les doublons lors du retraitement.
Snapshots : Historisation des Donnees
Les snapshots capturent les changements dans les tables source au fil du temps en implementant une Slowly Changing Dimension Type 2 (SCD2).
-- snapshots/snap_customers.sql
{% snapshot snap_customers %}
{{
config(
target_schema='snapshots',
unique_key='customer_id',
strategy='timestamp',
updated_at='updated_at',
invalidate_hard_deletes=True
)
}}
SELECT
customer_id,
company_name,
subscription_plan,
mrr,
status,
updated_at
FROM {{ source('crm', 'customers') }}
{% endsnapshot %}
Source: customers (t0) Source: customers (t1) +---------+-------+-----+ +---------+-------+-----+ | cust_id | plan | mrr | | cust_id | plan | mrr | +---------+-------+-----+ +---------+-------+-----+ | 101 | Basic | 29 | | 101 | Pro | 99 | <-- changement | 102 | Pro | 99 | | 102 | Pro | 99 | +---------+-------+-----+ +---------+-------+-----+ Snapshot resultant : +---------+-------+-----+---------------------+---------------------+ | cust_id | plan | mrr | dbt_valid_from | dbt_valid_to | +---------+-------+-----+---------------------+---------------------+ | 101 | Basic | 29 | 2024-01-01 00:00:00 | 2024-02-01 00:00:00 | | 101 | Pro | 99 | 2024-02-01 00:00:00 | NULL | | 102 | Pro | 99 | 2024-01-01 00:00:00 | NULL | +---------+-------+-----+---------------------+---------------------+
Macros et Packages
Les macros permettent de creer du SQL reutilisable. Elles sont essentielles pour maintenir un projet dbt DRY (Don't Repeat Yourself).
-- macros/generate_surrogate_key.sql
{% macro generate_surrogate_key(field_list) %}
{% set fields = [] %}
{% for field in field_list %}
{% do fields.append(
"COALESCE(CAST(" ~ field ~ " AS VARCHAR), '_null_')"
) %}
{% endfor %}
{{ dbt_utils.hash(fields | join(" || '|' || ")) }}
{% endmacro %}
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
ROUND({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}
-- Utilisation dans un modele :
-- SELECT {{ generate_surrogate_key(['order_id', 'line_item_id']) }} AS sk,
-- {{ cents_to_dollars('amount_cents') }} AS amount_dollars
packages:
- package: dbt-labs/dbt_utils
version: [">=1.0.0", "<2.0.0"]
- package: dbt-labs/dbt_expectations
version: [">=0.8.0", "<1.0.0"]
- package: calogica/dbt_date
version: [">=0.7.0", "<1.0.0"]
- git: "https://github.com/mon-org/dbt-internal-macros.git"
revision: v1.2.0
Hooks et Materialisations Custom
-- dbt_project.yml
models:
my_project:
marts:
+pre-hook:
- "SET query_tag = 'dbt_marts_{{ this.name }}'"
+post-hook:
- "GRANT SELECT ON {{ this }} TO ROLE analyst_role"
- "ALTER TABLE {{ this }} SET DATA_RETENTION_TIME_IN_DAYS = 7"
-- Ou directement dans un modele :
{{
config(
materialized='table',
post_hook=[
"CREATE INDEX IF NOT EXISTS idx_{{ this.name }}_user
ON {{ this }} (user_id)",
"ANALYZE {{ this }}"
]
)
}}
Shopify : dbt a Grande Echelle
Shopify gere plus de 5 000 modeles dbt dans un mono-repo. Leur approche des patterns avances :
- Modeles incrementaux partitionnes pour traiter +100 milliards de lignes d'evenements sans full refresh
- Macros partagees dans un package interne utilise par 30+ equipes data
- Snapshots automatises pour tracker les changements de prix et d'inventaire sur des millions de produits
- Materialisation custom "streaming_table" pour les cas ou le batch classique n'etait pas assez rapide
Resultat : temps de build passe de 8h a 45min grace aux incrementaux bien configures.
Startup Fintech : Snapshots Non Testes
Une startup fintech a deploye des snapshots sur les tables de transactions sans tester le comportement lors des changements de schema source.
- Un ajout de colonne dans la source a casse tous les snapshots existants
- 3 mois d'historique perdus car les snapshots echouaient silencieusement
- Le
invalidate_hard_deletesn'etait pas active, donc les clients supprimes apparaissaient toujours comme actifs dans les rapports
Lecon : Toujours ajouter des tests dbt sur les snapshots (unicite, non-null sur dbt_valid_from) et monitorer les echecs de snapshot.
on_schema_change='sync_all_columns' pour les changements mineurs.dbt CI/CD
Objectifs
- Configurer un pipeline CI/CD complet pour dbt avec GitHub Actions
- Implementer le Slim CI avec comparaison de manifests
- Maitriser les strategies de deploiement (blue-green, canary)
- Automatiser les tests, le linting et la documentation
- Gerer les environnements (dev, staging, prod) avec dbt
Le CI/CD pour dbt est souvent neglige, pourtant c'est lui qui vous protege contre les regressions en production. Un pipeline bien configure vous permet de fusionner vos PR en toute confiance. Investissez du temps dans le Slim CI : il reduit vos couts cloud de 80% sur les builds CI.
Architecture CI/CD pour dbt
PR Ouverte Merge dans main Deploiement
========= =============== ============
[Lint SQL] [dbt build [dbt run
sqlfluff --target staging] --target prod]
| | |
[dbt compile] [dbt test [dbt test
--target ci] --target staging] --target prod]
| | |
[Slim CI: [Generate docs] [Notify Slack]
dbt build | |
--select [Deploy docs [Update
state:modified+] to S3/GCS] monitoring]
|
[dbt test
--select
state:modified+]
|
[Check results
& PR comment]
GitHub Actions pour dbt
name: dbt CI
on:
pull_request:
branches: [main]
paths:
- 'models/**'
- 'macros/**'
- 'tests/**'
- 'dbt_project.yml'
env:
DBT_PROFILES_DIR: ./ci_profiles
SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- run: pip install sqlfluff dbt-sqlfluff
- run: sqlfluff lint models/ --dialect snowflake
slim-ci:
needs: lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- run: pip install dbt-snowflake~=1.7.0
- name: Telecharger manifest de production
run: |
aws s3 cp s3://dbt-artifacts/prod/manifest.json \
./target-defer/manifest.json
- name: dbt deps
run: dbt deps
- name: Slim CI - Build modeles modifies
run: |
dbt build \
--select state:modified+ \
--defer \
--state ./target-defer \
--target ci \
--fail-fast
- name: Commenter la PR
if: always()
uses: actions/github-script@v7
with:
script: |
const fs = require('fs');
const results = JSON.parse(
fs.readFileSync('target/run_results.json')
);
const summary = results.results.map(r =>
`| ${r.unique_id} | ${r.status} | ${r.execution_time}s |`
).join('\n');
github.rest.issues.createComment({
issue_number: context.issue.number,
owner: context.repo.owner,
repo: context.repo.repo,
body: `## Resultats dbt CI\n| Modele | Statut | Temps |\n|---|---|---|\n${summary}`
});
Slim CI : Comparaison de Manifests
Qu'est-ce que le Slim CI ?
Le Slim CI compare le manifest de production (etat actuel) avec le manifest de la branche PR (etat futur). Seuls les modeles modifies et leurs descendants sont executes, reduisant drastiquement le temps et le cout des builds CI.
# Selectionner uniquement les modeles modifies
dbt run --select state:modified --state ./prod-manifest/
# Modeles modifies + leurs enfants (downstream)
dbt run --select state:modified+ --state ./prod-manifest/
# Tester les modeles modifies et 1 niveau d'enfants
dbt test --select state:modified+1 --state ./prod-manifest/
# Combiner avec defer pour utiliser les tables prod non-modifiees
dbt build --select state:modified+ \
--defer \
--state ./prod-manifest/ \
--target ci
# Verifier les changements de schema uniquement
dbt run --select state:modified --state ./prod-manifest/ \
--vars '{"schema_check_only": true}'
Strategies de Deploiement
Blue-Green Deployment
Deux schemas identiques (blue/green). On build dans le schema inactif, puis on permute.
- Zero downtime
- Rollback instantane
- Double cout de stockage
-- Permutation atomique ALTER SCHEMA prod_blue RENAME TO prod_old; ALTER SCHEMA prod_green RENAME TO prod_blue;
Rolling Deployment
Les modeles sont mis a jour incrementalement dans le schema de production directement.
- Simple a implementer
- Pas de surcharge stockage
- Risque d'etat inconsistant
# Build couche par couche dbt run --select tag:staging dbt run --select tag:intermediate dbt run --select tag:marts
GitLab : CI/CD dbt avec Merge Request Pipelines
GitLab utilise dbt en interne pour leur data warehouse analytique et a partage leur approche CI/CD :
- Slim CI sur chaque MR : seuls les modeles impactes sont testes dans un schema ephemere dedie a la MR
- Schema CI unique par MR :
ci_mr_{{ merge_request_id }}evite les collisions entre branches - Cleanup automatique : un job CRON supprime les schemas CI de plus de 48h
- Comparaison des row counts : un script compare le nombre de lignes entre CI et prod pour detecter les anomalies
Impact : le temps moyen de revue des MR data est passe de 3 jours a 4 heures grace aux commentaires automatiques de resultats CI.
Full Build en CI a chaque PR
Executer dbt build sans selecteurs sur chaque pull request est un gaspillage massif de ressources. Pour un projet de 500 modeles, cela peut couter des centaines de dollars par jour en credits cloud.
Couts observes : Une equipe a depense 15 000 $/mois en credits Snowflake uniquement pour le CI, avant de passer au Slim CI (reduit a 1 200 $/mois).
Solution : Toujours utiliser state:modified+ avec --defer en CI. Planifier un full build uniquement en staging hebdomadaire.
Mise en place d'un CI/CD dbt pour une equipe de 10 analystes
Votre equipe utilise dbt avec Snowflake. Actuellement, chacun execute dbt run manuellement et deploie en prod via dbt run --target prod depuis son poste.
Objectif : Implementer un pipeline CI/CD avec les contraintes suivantes :
- Chaque PR doit etre validee automatiquement avant merge
- Le deploiement en prod ne doit se faire que depuis la branche main
- Le cout CI ne doit pas depasser 500 $/mois
- Les resultats CI doivent etre visibles directement dans la PR
Approche recommandee : Slim CI avec schemas ephemeres, deploiement Blue-Green automatise via GitHub Actions, et nettoyage CRON des schemas obsoletes.
Securite des Secrets
Ne jamais stocker les identifiants de base de donnees dans le code. Utilisez les secrets GitHub Actions (${{ secrets.* }}), les variables d'environnement, ou un gestionnaire de secrets comme HashiCorp Vault. Verifiez que profiles.yml est bien dans .gitignore.
--defer permet a dbt de referencer les tables de production pour les modeles qui ne sont pas inclus dans la selection courante. Ainsi, un modele modifie peut s'appuyer sur les tables prod existantes sans avoir a les reconstruire. C'est essentiel pour le Slim CI.Lab : Projet dbt Complet
Objectifs
- Construire un projet dbt complet de bout en bout
- Implementer les couches staging, intermediate et mart
- Appliquer les conventions de nommage et les tests
- Configurer la documentation et les exposures
- Creer un DAG coherent et performant
Ce lab est votre meilleur entrainement avant un projet reel. Prenez le temps de bien structurer vos couches. Un bon projet dbt se reconnait a la clarte de son DAG : staging en entree, marts en sortie, et des couches intermediaires bien nommees entre les deux. Ne sautez pas l'etape des tests et de la documentation !
Contexte du Projet
Vous etes Data Engineer chez un e-commerce. Vous devez construire un data warehouse analytique a partir de trois sources : le systeme de commandes (PostgreSQL), le CRM (Salesforce), et les evenements web (Segment). L'objectif final est de fournir des marts pour le marketing, la finance et le produit.
Sources Staging Intermediate Marts ======== ========= ============== ======== [PostgreSQL] ------> stg_orders int_orders_enriched fct_orders [orders, stg_order_items int_customer_orders fct_daily_revenue products, stg_products dim_products customers] stg_customers [Salesforce] ------> stg_sf_accounts int_customer_360 dim_customers [accounts, stg_sf_contacts fct_customer_ltv opportunities] stg_sf_opportunities [Segment] ------> stg_web_events int_session_events fct_sessions [pages, stg_web_pages int_user_journey fct_funnel_analysis identifies, tracks]
Etape 1 : Initialisation du Projet
1.1 Structure des Dossiers
dbt init ecommerce_analytics cd ecommerce_analytics # Structure recommandee mkdir -p models/staging/postgres mkdir -p models/staging/salesforce mkdir -p models/staging/segment mkdir -p models/intermediate mkdir -p models/marts/marketing mkdir -p models/marts/finance mkdir -p models/marts/product mkdir -p tests/generic mkdir -p macros mkdir -p snapshots mkdir -p seeds
1.2 Configuration dbt_project.yml
name: 'ecommerce_analytics'
version: '1.0.0'
config-version: 2
profile: 'ecommerce'
model-paths: ["models"]
test-paths: ["tests"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
seed-paths: ["seeds"]
models:
ecommerce_analytics:
staging:
+materialized: view
+schema: staging
postgres:
+tags: ['postgres', 'staging']
salesforce:
+tags: ['salesforce', 'staging']
segment:
+tags: ['segment', 'staging']
intermediate:
+materialized: ephemeral
+schema: intermediate
marts:
+materialized: table
marketing:
+schema: marketing
+tags: ['marketing']
finance:
+schema: finance
+tags: ['finance']
product:
+schema: product
+tags: ['product']
1.3 Definir les Sources
version: 2
sources:
- name: postgres
database: raw_db
schema: ecommerce
description: "Systeme transactionnel e-commerce"
freshness:
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}
loaded_at_field: _loaded_at
tables:
- name: orders
description: "Commandes clients"
columns:
- name: order_id
description: "Identifiant unique de commande"
tests:
- unique
- not_null
- name: order_items
description: "Lignes de commande"
- name: products
description: "Catalogue produits"
- name: customers
description: "Comptes clients"
Etape 2 : Couche Staging
2.1 Modele stg_orders.sql
WITH source AS (
SELECT * FROM {{ source('postgres', 'orders') }}
),
renamed AS (
SELECT
-- Cles
order_id,
customer_id,
-- Dimensions
LOWER(status) AS order_status,
LOWER(payment_method) AS payment_method,
shipping_country,
-- Metriques
{{ cents_to_dollars('total_amount_cents') }} AS order_total,
{{ cents_to_dollars('shipping_cost_cents') }} AS shipping_cost,
{{ cents_to_dollars('discount_cents') }} AS discount_amount,
-- Timestamps
ordered_at,
shipped_at,
delivered_at,
cancelled_at,
-- Meta
_loaded_at
FROM source
WHERE order_id IS NOT NULL
)
SELECT * FROM renamed
2.2 Tests et Documentation Staging
version: 2
models:
- name: stg_orders
description: "Commandes nettoyees et renommees"
columns:
- name: order_id
description: "PK - Identifiant unique"
tests:
- unique
- not_null
- name: order_status
tests:
- accepted_values:
values: ['pending', 'confirmed', 'shipped',
'delivered', 'cancelled', 'refunded']
- name: order_total
tests:
- dbt_expectations.expect_column_values_to_be_between:
min_value: 0
max_value: 100000
Etape 3 : Couche Intermediate
3.1 Enrichissement des Commandes
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
),
order_items AS (
SELECT * FROM {{ ref('stg_order_items') }}
),
item_summary AS (
SELECT
order_id,
COUNT(*) AS item_count,
COUNT(DISTINCT product_id) AS unique_products,
SUM(quantity) AS total_units
FROM order_items
GROUP BY order_id
),
enriched AS (
SELECT
o.*,
s.item_count,
s.unique_products,
s.total_units,
DATEDIFF('day', o.ordered_at, o.shipped_at) AS days_to_ship,
DATEDIFF('day', o.shipped_at, o.delivered_at) AS days_to_deliver,
CASE
WHEN o.order_total >= 500 THEN 'high_value'
WHEN o.order_total >= 100 THEN 'medium_value'
ELSE 'low_value'
END AS order_tier
FROM orders o
LEFT JOIN item_summary s ON o.order_id = s.order_id
)
SELECT * FROM enriched
Etape 4 : Couche Marts
4.1 Fact Table : fct_daily_revenue
{{
config(
materialized='incremental',
unique_key='revenue_date',
incremental_strategy='merge'
)
}}
WITH orders AS (
SELECT * FROM {{ ref('int_orders_enriched') }}
WHERE order_status NOT IN ('cancelled', 'refunded')
{% if is_incremental() %}
AND ordered_at >= (
SELECT DATEADD('day', -3, MAX(revenue_date))
FROM {{ this }}
)
{% endif %}
),
daily_agg AS (
SELECT
DATE(ordered_at) AS revenue_date,
COUNT(DISTINCT order_id) AS order_count,
COUNT(DISTINCT customer_id) AS unique_customers,
SUM(order_total) AS gross_revenue,
SUM(discount_amount) AS total_discounts,
SUM(order_total - discount_amount) AS net_revenue,
SUM(shipping_cost) AS total_shipping,
AVG(order_total) AS avg_order_value,
SUM(total_units) AS total_units_sold
FROM orders
GROUP BY DATE(ordered_at)
)
SELECT * FROM daily_agg
Convention de Nommage dbt Labs
Les conventions officielles recommandees par dbt Labs, utilisees par la majorite des projets matures :
- stg_[source]_[entite] : Staging, nettoyage 1:1 avec la source
- int_[entite]_[verbe] : Intermediate, enrichissement et jointures
- fct_[entite] : Fact table dans les marts (evenements, transactions)
- dim_[entite] : Dimension table dans les marts (entites de reference)
- Pas de logique metier dans staging : uniquement renommage, typage, filtrage null
Marts qui referent directement les sources
Ne jamais faire {{ source('raw', 'orders') }} dans un mart. Les marts doivent toujours passer par staging (et idealement intermediate). Sinon :
- Toute modification de schema source casse N modeles au lieu d'un seul
- La logique de nettoyage est dupliquee dans chaque mart
- Le DAG devient un plat de spaghetti impossible a maintenir
Conseil de Performance
Materialisez vos modeles intermediate en ephemeral par defaut. Ils seront inlines comme CTE dans les modeles downstream. Si un intermediate est reference par 3+ modeles, passez-le en view pour eviter la duplication de calcul.
Quiz : dbt Maitrise
Objectifs
- Valider la comprehension des concepts dbt fondamentaux et avances
- Tester les connaissances sur les patterns, le CI/CD et les bonnes pratiques
- Identifier les lacunes a approfondir avant le module suivant
Ce quiz couvre l'ensemble du module dbt. Prenez votre temps et reflechissez bien a chaque question. Si vous obtenez moins de 6/8, relisez les lecons correspondantes avant de passer au module Orchestration.
Quiz dbt - 8 Questions
1. Quelle materialisation dbt est la plus adaptee pour une table de faits de 500 millions de lignes alimentee quotidiennement ?
2. Que fait la commande dbt build --select state:modified+ --defer --state ./prod-manifest/ ?
state:modified+ cible les modeles modifies et leurs descendants (le + signifie downstream). L'option --defer permet de referencer les tables de production pour les dependances non incluses dans la selection.3. Dans un snapshot dbt avec la strategie "timestamp", que se passe-t-il si la colonne updated_at n'est pas fiable ?
4. Quelle couche du projet dbt doit contenir la logique metier complexe (jointures, calculs derives) ?
5. Quel est le role principal d'une macro dbt ?
6. Quel est l'avantage principal du deploiement Blue-Green pour dbt ?
7. Quelle incremental_strategy est la plus adaptee pour BigQuery avec des donnees partitionnees par date ?
8. Que signifie le selecteur state:modified+1 dans une commande dbt ?
+1 apres le selecteur indique de prendre les modeles correspondants (ici state:modified) plus 1 niveau de descendants directs (enfants). Le + sans chiffre prend tous les descendants.Evaluation
8/8 : Excellent ! Vous etes pret pour le module Orchestration.
6-7/8 : Bon niveau. Relisez les points manques avant de continuer.
Moins de 6 : Revoyez les lecons 7 a 10 avant de poursuivre.
Apache Airflow
Objectifs
- Comprendre l'architecture d'Airflow et ses composants cles
- Creer et configurer des DAGs avec operators, sensors et XCom
- Maitriser les connections, pools et variables
- Comparer les executors : Local, Celery et Kubernetes
- Appliquer les bonnes pratiques de production
Airflow est le standard de facto pour l'orchestration de pipelines data. Meme si des alternatives plus modernes existent, 80% des equipes data utilisent Airflow en production. Comprendre ses forces ET ses limites est essentiel pour un Data Architect. La cle : un DAG Airflow orchestre, il ne transforme pas. Gardez la logique metier dans dbt ou Spark, pas dans vos DAGs.
Architecture d'Airflow
+------------------+
| Webserver |
| (Flask UI) |
+--------+---------+
|
v
+------------------+ +------------------+ +------------------+
| Scheduler |-->| Metadata DB |<--| Triggerer |
| (Planification)| | (PostgreSQL) | | (Async Sensors)|
+--------+---------+ +------------------+ +------------------+
|
v
+------------------+ +------------------+ +------------------+
| Executor |-->| Worker 1 | | Worker N |
| (Celery/K8s) | | (Task exec) | | (Task exec) |
+------------------+ +------------------+ +------------------+
|
v
+------------------+
| Message Broker |
| (Redis/RabbitMQ)|
+------------------+
Anatomie d'un DAG
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'email': ['data-alerts@company.com'],
'email_on_failure': True,
'retries': 2,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(hours=2),
'sla': timedelta(hours=4),
}
with DAG(
dag_id='ecommerce_daily_pipeline',
default_args=default_args,
description='Pipeline quotidien e-commerce ELT',
schedule_interval='0 6 * * *', # Tous les jours a 6h
start_date=datetime(2024, 1, 1),
catchup=False,
max_active_runs=1,
tags=['ecommerce', 'daily', 'production'],
) as dag:
# Sensor : attendre que l'ingestion soit terminee
wait_for_ingestion = ExternalTaskSensor(
task_id='wait_for_ingestion',
external_dag_id='raw_data_ingestion',
external_task_id='load_complete',
timeout=3600,
poke_interval=60,
mode='reschedule', # Libere le worker entre les checks
)
# Verifier la fraicheur des donnees
check_freshness = BashOperator(
task_id='check_source_freshness',
bash_command='cd /opt/dbt && dbt source freshness --select source:postgres',
pool='dbt_pool',
)
# Groupe de taches : dbt build
with TaskGroup('dbt_build', tooltip='Build dbt par couche') as dbt_group:
staging = BashOperator(
task_id='dbt_staging',
bash_command='cd /opt/dbt && dbt run --select tag:staging',
pool='dbt_pool',
)
intermediate = BashOperator(
task_id='dbt_intermediate',
bash_command='cd /opt/dbt && dbt run --select tag:intermediate',
pool='dbt_pool',
)
marts = BashOperator(
task_id='dbt_marts',
bash_command='cd /opt/dbt && dbt run --select tag:marts',
pool='dbt_pool',
)
staging >> intermediate >> marts
# Tests dbt
dbt_test = BashOperator(
task_id='dbt_test',
bash_command='cd /opt/dbt && dbt test --select tag:marts',
pool='dbt_pool',
)
# Notification
def notify_success(**context):
execution_date = context['execution_date']
print(f"Pipeline complete pour {execution_date}")
notify = PythonOperator(
task_id='notify_success',
python_callable=notify_success,
)
wait_for_ingestion >> check_freshness >> dbt_group >> dbt_test >> notify
XCom : Communication entre Taches
# Pousser des donnees vers XCom
def extract_stats(**context):
stats = {'total_rows': 150000, 'new_rows': 1200, 'errors': 3}
context['ti'].xcom_push(key='pipeline_stats', value=stats)
return stats
# Tirer des donnees depuis XCom
def check_quality(**context):
stats = context['ti'].xcom_pull(
task_ids='extract_stats',
key='pipeline_stats'
)
if stats['errors'] > 100:
raise ValueError(f"Trop d'erreurs : {stats['errors']}")
print(f"Qualite OK : {stats['new_rows']} nouvelles lignes")
Limites de XCom
XCom stocke les donnees dans la metadata DB (PostgreSQL). Ne transmettez jamais de gros volumes via XCom (max quelques Ko). Pour les DataFrames, utilisez un stockage intermediaire (S3, GCS) et passez uniquement le chemin via XCom.
Connections, Pools et Variables
| Concept | Role | Exemple |
|---|---|---|
| Connection | Identifiants de connexion aux systemes externes | Snowflake, S3, Slack, SMTP |
| Pool | Limite le nombre de taches concurrentes pour une ressource | dbt_pool: max 4 slots |
| Variable | Configuration dynamique accessible dans les DAGs | env=production, lookback_days=3 |
Comparaison des Executors
| Executor | Scalabilite | Isolation | Complexite | Cas d'usage |
|---|---|---|---|---|
| LocalExecutor | Faible (1 machine) | Aucune | Simple | Dev, petites equipes |
| CeleryExecutor | Haute (N workers) | Processus | Moyenne | Production standard |
| KubernetesExecutor | Tres haute | Container | Elevee | Multi-tenant, cloud-native |
Spotify : Airflow a l'Echelle Massive
Spotify est l'un des plus grands utilisateurs d'Airflow au monde avec un deploiement impressionnant :
- +30 000 DAGs geres par plus de 600 ingenieurs data a travers l'entreprise
- KubernetesExecutor pour l'isolation complete entre les equipes et la scalabilite dynamique
- Systeme de "Golden Paths" : templates de DAGs standardises que les equipes peuvent reutiliser
- Monitoring custom avec Datadog : alertes sur les SLA, les retries excessifs, et les DAGs bloques
- Migration progressive vers une architecture event-driven pour les cas temps reel
Lecon cle : A grande echelle, la standardisation et le monitoring sont aussi importants que l'outil lui-meme.
E-commerce : DAGs Monolithiques
Un e-commerce francais a deploye Airflow avec des DAGs contenant plus de 200 taches chacun, melangeant extraction, transformation et chargement dans un seul pipeline.
- Le scheduler mettait 15 minutes a parser un seul DAG au lieu de quelques secondes
- Un echec sur la tache 180 necessitait de relancer tout le DAG (pas de clear granulaire pratique)
- Les dependances entre taches formaient un graphe tellement complexe que personne ne comprenait le flux
- La metadata DB a atteint 200 Go, ralentissant l'ensemble du cluster
Solution appliquee : Decomposition en 40 DAGs modulaires avec ExternalTaskSensor, nettoyage regulier de la metadata DB, et delegation de la transformation a dbt.
Logique Metier dans les DAGs
Placer des transformations SQL ou des calculs complexes directement dans les PythonOperator est un anti-pattern majeur. Le DAG Airflow doit uniquement orchestrer (declencher, ordonnancer, monitorer), pas transformer.
Problemes : Code non testable, pas de versioning des transformations, impossible de re-executer une transformation sans Airflow.
Solution : Utilisez dbt pour les transformations SQL, Spark pour le processing, et Airflow uniquement pour les declencher au bon moment.
catchup=True, Airflow execute retroactivement toutes les executions manquees entre start_date et maintenant. Si vous avez un DAG quotidien avec un start_date d'il y a un an, cela declenchera 365 executions simultanees, surchargeant votre cluster et potentiellement votre base de donnees cible.Dagster
Objectifs
- Comprendre la philosophie "Software-Defined Assets" de Dagster
- Maitriser les concepts cles : assets, ops, graphs, jobs et IO managers
- Utiliser Dagit (l'interface web) pour le monitoring et le debug
- Configurer les resources et les schedules
- Comparer l'approche Dagster vs Airflow
Dagster represente un changement de paradigme par rapport a Airflow. Au lieu de definir "comment orchestrer des taches", vous definissez "quels assets de donnees existent et comment ils sont produits". C'est une approche plus declarative qui correspond mieux a la realite du travail data. Si vous demarrez un nouveau projet en 2024+, Dagster merite serieusement d'etre considere.
Philosophie : Software-Defined Assets
Changement de Paradigme
Airflow : "Je definis des taches et leur ordre d'execution." (Task-centric)
Dagster : "Je definis des assets de donnees et comment ils dependent les uns des autres." (Asset-centric)
Dagster deduit automatiquement le graphe d'execution a partir des dependances entre assets.
Airflow (Task-centric) Dagster (Asset-centric)
====================== ======================
DAG: Assets:
task_extract ----+ raw_orders (source)
task_transform ---+---> task_load |
task_notify ------+ clean_orders (depend de raw_orders)
|
"Execute A puis B puis C" daily_revenue (depend de clean_orders)
"Ces assets existent, voici
comment les produire"
Software-Defined Assets
import dagster as dg
import pandas as pd
# Asset source
@dg.asset(
description="Commandes brutes depuis PostgreSQL",
group_name="staging",
compute_kind="sql",
metadata={"owner": "data-team", "freshness_policy": "daily"},
)
def raw_orders(context: dg.AssetExecutionContext) -> pd.DataFrame:
"""Extraction des commandes depuis la base transactionnelle."""
query = "SELECT * FROM orders WHERE date >= CURRENT_DATE - INTERVAL '1 day'"
df = pd.read_sql(query, context.resources.postgres_conn)
context.log.info(f"Extrait {len(df)} commandes")
context.add_output_metadata({
"row_count": len(df),
"date_range": f"{df['ordered_at'].min()} - {df['ordered_at'].max()}"
})
return df
# Asset dependant
@dg.asset(
description="Commandes nettoyees et enrichies",
group_name="intermediate",
compute_kind="pandas",
deps=[raw_orders], # Dependance explicite
)
def clean_orders(context: dg.AssetExecutionContext, raw_orders: pd.DataFrame) -> pd.DataFrame:
"""Nettoyage : suppression des doublons, typage, filtrage."""
df = raw_orders.copy()
df = df.drop_duplicates(subset=['order_id'])
df['order_total'] = df['total_amount_cents'] / 100.0
df['ordered_at'] = pd.to_datetime(df['ordered_at'])
df = df[df['order_total'] > 0]
context.log.info(f"Apres nettoyage : {len(df)} commandes valides")
return df
# Asset mart
@dg.asset(
description="Revenu quotidien agrege",
group_name="marts",
compute_kind="pandas",
freshness_policy=dg.FreshnessPolicy(maximum_lag_minutes=60 * 8),
)
def daily_revenue(context: dg.AssetExecutionContext, clean_orders: pd.DataFrame) -> pd.DataFrame:
"""Agregation du revenu par jour pour le dashboard finance."""
df = clean_orders.groupby(clean_orders['ordered_at'].dt.date).agg(
order_count=('order_id', 'nunique'),
gross_revenue=('order_total', 'sum'),
avg_order_value=('order_total', 'mean'),
).reset_index()
context.add_output_metadata({
"row_count": len(df),
"total_revenue": float(df['gross_revenue'].sum()),
})
return df
Ops, Graphs et Jobs
Pour les cas ou les assets ne suffisent pas (operations sans sortie durable, side-effects), Dagster propose les ops et graphs :
from dagster import op, graph, job, In, Out, String
@op(
description="Valide la qualite des donnees",
ins={"table_name": In(String)},
out={"validation_result": Out(bool)},
)
def validate_data_quality(context, table_name: str) -> bool:
"""Execute des checks de qualite sur une table."""
checks = {
'null_check': f"SELECT COUNT(*) FROM {table_name} WHERE id IS NULL",
'dup_check': f"SELECT COUNT(*) - COUNT(DISTINCT id) FROM {table_name}",
}
all_passed = True
for name, query in checks.items():
result = context.resources.db.execute(query)
if result > 0:
context.log.warning(f"Check {name} echoue : {result} problemes")
all_passed = False
return all_passed
@op
def send_alert(context, validation_passed: bool):
if not validation_passed:
context.resources.slack.send_message(
channel="#data-alerts",
text="Probleme de qualite detecte !"
)
@graph
def quality_check_pipeline():
result = validate_data_quality()
send_alert(result)
# Transformer le graph en job executable
quality_job = quality_check_pipeline.to_job(
resource_defs={
"db": snowflake_resource,
"slack": slack_resource,
}
)
IO Managers
from dagster import IOManager, io_manager, InputContext, OutputContext
import pandas as pd
class SnowflakeIOManager(IOManager):
"""Stocke et charge les assets depuis Snowflake."""
def __init__(self, connection_string: str, schema: str):
self.connection_string = connection_string
self.schema = schema
def handle_output(self, context: OutputContext, obj: pd.DataFrame):
table_name = context.asset_key.to_user_string().replace("/", "_")
full_table = f"{self.schema}.{table_name}"
obj.to_sql(full_table, self.connection_string, if_exists='replace')
context.log.info(f"Ecrit {len(obj)} lignes dans {full_table}")
def load_input(self, context: InputContext) -> pd.DataFrame:
table_name = context.asset_key.to_user_string().replace("/", "_")
full_table = f"{self.schema}.{table_name}"
return pd.read_sql(f"SELECT * FROM {full_table}", self.connection_string)
@io_manager(config_schema={"connection_string": str, "schema": str})
def snowflake_io_manager(context):
return SnowflakeIOManager(
connection_string=context.resource_config["connection_string"],
schema=context.resource_config["schema"],
)
Dagit : Interface Web
Fonctionnalites de Dagit
Dagit offre une experience developpeur superieure a l'UI Airflow :
- Global Asset Lineage : graphe complet de tous les assets et leurs dependances
- Asset Catalog : documentation automatique de chaque asset avec metadata
- Freshness monitoring : alerte si un asset n'est pas rafraichi a temps
- Backfill partitionne : reprocesser des partitions specifiques en quelques clics
- Launchpad : configuration et lancement interactif des jobs
Elementl (Createurs de Dagster) : Migration chez de Grands Clients
Plusieurs entreprises majeures ont migre d'Airflow vers Dagster avec des resultats mesurables :
- Drizly (e-commerce alcool, acquis par Uber) : migration de 200+ DAGs Airflow vers Dagster en 3 mois. Reduction de 60% du temps de debug grace au lineage global des assets
- Materialize : utilise Dagster pour orchestrer ses pipelines internes. Les Software-Defined Assets ont reduit le nombre de lignes de code de pipeline de 40%
- Le pattern "asset-centric" a naturellement encourage les equipes a mieux documenter leurs donnees, car la documentation fait partie de la definition de l'asset
Tout Mettre dans des Ops au lieu d'Assets
L'erreur classique des equipes migrant depuis Airflow : reproduire le pattern task-centric avec des ops au lieu d'adopter les Software-Defined Assets.
- Perte du lineage automatique des donnees
- Pas de freshness monitoring
- Pas de backfill intelligent par partition
- Code plus complexe sans benefice
Regle : Si votre operation produit un dataset persistant (table, fichier), c'est un asset. Reservez les ops pour les side-effects purs (notifications, appels API sans stockage).
Airflow - Forces
- Ecosysteme mature (800+ providers)
- Communaute massive
- Services manages (MWAA, Cloud Composer)
- Documentation abondante
- Ideal pour orchestration pure
Dagster - Forces
- Asset-centric (paradigme moderne)
- Testabilite native
- UI/UX superieure (Dagit)
- Freshness policies
- Ideal pour equipes data modernes
Prefect
Objectifs
- Comprendre la philosophie de Prefect et son approche "Pythonic"
- Maitriser les flows, tasks, deployments et work pools
- Utiliser les blocks pour la configuration et les secrets
- Configurer Prefect Cloud et le monitoring
- Comparer Prefect avec Airflow et Dagster
Prefect se positionne comme l'orchestrateur le plus "developer-friendly". Son approche est radicalement differente d'Airflow : pas de DAG a declarer, votre code Python devient automatiquement un workflow en ajoutant des decorateurs. C'est l'outil ideal si vous voulez la puissance d'un orchestrateur sans la complexite operationnelle d'Airflow.
Philosophie de Prefect
Les 3 Principes de Prefect
1. Code natif Python : Pas de DSL, pas de fichier de config YAML complexe. Votre code Python + decorateurs = workflow.
2. Negative engineering : Prefect gere tout ce qui peut mal tourner (retries, timeouts, logging, notifications) sans que vous ayez a le coder.
3. Hybrid execution : L'orchestration dans le cloud, l'execution dans votre infrastructure. Vos donnees ne transitent jamais par Prefect Cloud.
+-------------------------------------------+
| Prefect Cloud |
| +----------+ +-----------+ +---------+ |
| | API | | Dashboard | | Automations| |
| | Server | | (UI) | | & Alerts | |
| +----+-----+ +-----------+ +---------+ |
+-------|-----------------------------------+
| API calls (metadata seulement)
v
+-------------------------------------------+
| Votre Infrastructure |
| +-------------+ +------------------+ |
| | Work Pool |--->| Worker | |
| | (Process/ | | (execute flows) | |
| | Docker/K8s)| +------------------+ |
| +-------------+ |
| | |
| +----v------+ +------------------+ |
| | Flow Run |--->| Vos Donnees | |
| | (local) | | (DB, S3, DWH) | |
| +-----------+ +------------------+ |
+-------------------------------------------+
Flows et Tasks
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import httpx
@task(
name="extract-api-data",
description="Extrait les donnees depuis l'API externe",
retries=3,
retry_delay_seconds=30,
cache_key_fn=task_input_hash,
cache_expiration=timedelta(hours=1),
tags=["extract", "api"],
)
def extract_from_api(endpoint: str, date: str) -> dict:
"""Appel API avec retry automatique et cache."""
logger = get_run_logger()
response = httpx.get(
f"https://api.ecommerce.com/v2/{endpoint}",
params={"date": date},
timeout=30,
)
response.raise_for_status()
data = response.json()
logger.info(f"Extrait {len(data['results'])} enregistrements depuis {endpoint}")
return data
@task(
name="transform-orders",
tags=["transform"],
)
def transform_orders(raw_data: dict) -> pd.DataFrame:
"""Transforme les donnees brutes en DataFrame structure."""
logger = get_run_logger()
df = pd.DataFrame(raw_data['results'])
df['order_total'] = df['amount_cents'] / 100.0
df['ordered_at'] = pd.to_datetime(df['ordered_at'])
df = df[df['order_total'] > 0]
logger.info(f"Transforme {len(df)} commandes valides")
return df
@task(
name="load-to-warehouse",
retries=2,
retry_delay_seconds=60,
tags=["load", "snowflake"],
)
def load_to_snowflake(df: pd.DataFrame, table_name: str):
"""Charge les donnees dans Snowflake."""
logger = get_run_logger()
from prefect_snowflake import SnowflakeConnector
connector = SnowflakeConnector.load("prod-snowflake")
with connector.get_connection() as conn:
df.to_sql(table_name, conn, if_exists='append', index=False)
logger.info(f"Charge {len(df)} lignes dans {table_name}")
@flow(
name="ecommerce-daily-pipeline",
description="Pipeline ELT quotidien e-commerce",
version="2.1.0",
retries=1,
retry_delay_seconds=300,
log_prints=True,
)
def daily_pipeline(target_date: str = None):
"""Pipeline principal : extract, transform, load."""
from datetime import date
target = target_date or str(date.today())
# Extract (parallel par defaut si independant)
orders_raw = extract_from_api("orders", target)
products_raw = extract_from_api("products", target)
# Transform
orders_clean = transform_orders(orders_raw)
# Load
load_to_snowflake(orders_clean, "staging.orders")
print(f"Pipeline termine pour {target}")
# Execution locale pour test
if __name__ == "__main__":
daily_pipeline(target_date="2024-03-15")
Deployments et Work Pools
# prefect.yaml
deployments:
- name: ecommerce-daily
entrypoint: pipelines/daily.py:daily_pipeline
work_pool:
name: production-k8s
job_variables:
image: registry.company.com/data-pipelines:latest
cpu: "2000m"
memory: "4Gi"
schedule:
cron: "0 6 * * *"
timezone: "Europe/Paris"
parameters:
target_date: null # Sera calcule automatiquement
tags: ["production", "daily", "ecommerce"]
- name: ecommerce-backfill
entrypoint: pipelines/daily.py:daily_pipeline
work_pool:
name: production-k8s
job_variables:
image: registry.company.com/data-pipelines:latest
cpu: "4000m"
memory: "8Gi"
tags: ["backfill", "ecommerce"]
# Creer un work pool Kubernetes
prefect work-pool create production-k8s --type kubernetes
# Demarrer un worker
prefect worker start --pool production-k8s
# Deployer les flows
prefect deploy --all
# Lancer manuellement un flow
prefect deployment run 'ecommerce-daily-pipeline/ecommerce-daily' \
--param target_date=2024-03-01
# Backfill sur une plage de dates
for d in $(seq -w 1 31); do
prefect deployment run 'ecommerce-daily-pipeline/ecommerce-backfill' \
--param target_date=2024-03-$d
done
Blocks : Configuration et Secrets
from prefect_snowflake import SnowflakeConnector
from prefect.blocks.notifications import SlackWebhook
from prefect.blocks.system import Secret
# Configurer un block Snowflake (via UI ou code)
snowflake_block = SnowflakeConnector(
account="myorg-account",
user="dbt_user",
password="...", # Ou reference a un Secret block
database="analytics",
warehouse="transform_wh",
schema="public",
role="transformer",
)
snowflake_block.save("prod-snowflake", overwrite=True)
# Block Slack pour les notifications
slack = SlackWebhook(url="https://hooks.slack.com/services/...")
slack.save("data-alerts", overwrite=True)
# Utilisation dans un flow
@flow
def monitored_pipeline():
connector = SnowflakeConnector.load("prod-snowflake")
slack = SlackWebhook.load("data-alerts")
try:
# ... logique pipeline
pass
except Exception as e:
slack.notify(f"Pipeline echoue : {str(e)}")
raise
Automations et Monitoring
Prefect Cloud Automations
Prefect Cloud offre un systeme d'automations declenchees par des evenements :
- Flow run failed : Envoyer une alerte Slack et creer un ticket Jira
- Flow run late : Notifier l'equipe si un flow planifie n'a pas demarre
- Work pool unhealthy : Alerter l'ops si aucun worker n'est disponible
- Custom events : Declencher un flow B a la completion du flow A
Shopify avec Prefect : Orchestration Cloud-Native
Shopify a adopte Prefect pour certains de ses pipelines data en complement d'Airflow, notamment pour les cas d'usage necessitant une grande flexibilite :
- Pipelines ML : Les flows Prefect orchestrent l'entrainement et le deploiement des modeles de recommandation, avec des retries intelligents sur les echecs GPU
- Work pools differencies : Un pool CPU pour l'ETL standard, un pool GPU pour le ML, un pool spot pour les backfills (cout reduit de 70%)
- Execution hybride : L'orchestration dans Prefect Cloud, l'execution dans leurs clusters Kubernetes prives -- zero donnees dans le cloud Prefect
- Integration dbt : Les flows Prefect declenchent dbt Cloud via l'API, avec monitoring des resultats et alertes automatiques
Ignorer le Cache des Tasks
Ne pas utiliser le cache de Prefect sur les tasks d'extraction est un gaspillage majeur de ressources, surtout lors du developpement et des re-runs.
- Chaque re-run rappelle les APIs externes inutilement
- Les quotas API sont consommes pour rien
- Le temps de test est multiplie par le nombre de retries
Solution : Utilisez cache_key_fn=task_input_hash avec un cache_expiration adapte. En dev, un cache d'1 heure evite des centaines d'appels API inutiles.
Choix d'Orchestrateur pour un Nouveau Projet
Votre entreprise (50 personnes, equipe data de 5) lance un nouveau data warehouse. Vous devez choisir un orchestrateur. Voici les criteres :
| Critere | Airflow | Dagster | Prefect |
|---|---|---|---|
| Courbe d'apprentissage | Moyenne | Moyenne-haute | Faible |
| Ops overhead | Eleve | Moyen | Faible (Cloud) |
| Ecosysteme | Tres riche | En croissance | Riche |
| Testabilite | Faible | Excellente | Bonne |
| Service manage | MWAA, Composer, Astronomer | Dagster Cloud | Prefect Cloud |
| Ideal pour | Grandes equipes, legacy | Equipes data modernes | Petites equipes, rapidite |
Recommandation : Pour une petite equipe, Prefect Cloud offre le meilleur ratio valeur/effort. Pour un projet data-centric avec lineage important, Dagster. Pour un environnement existant avec beaucoup d'integrations, Airflow.
Startup SaaS : Migration Prematuree vers Prefect 2
Une startup SaaS a migre de Prefect 1 (Orion) vers Prefect 2 des sa sortie beta, sans planification adequte :
- L'API Prefect 2 etait incompatible avec Prefect 1 -- 100% des flows a reecrire
- Les blocks remplacaient les anciens secrets et connections -- migration manuelle de 50+ configurations
- Le nouveau work pool system necessitait une reconfiguration complete de l'infrastructure
- 3 semaines de downtime partiel des pipelines pendant la migration
Lecon : Ne jamais migrer un outil d'orchestration en production sur une version beta. Planifier la migration sur 2-3 sprints avec une periode de cohabitation ou les deux versions tournent en parallele.
.submit() au lieu de l'appel direct, ce qui soumet la task a un task runner (ConcurrentTaskRunner pour l'async, DaskTaskRunner pour le parallelisme distribue, RayTaskRunner pour le calcul GPU). Exemple : result = my_task.submit(arg).Resume du Module Orchestration
Les trois orchestrateurs (Airflow, Dagster, Prefect) resolvent le meme probleme fondamental -- coordonner l'execution de pipelines data -- mais avec des philosophies tres differentes. Airflow est le standard etabli, Dagster apporte l'approche asset-centric, et Prefect mise sur la simplicite Pythonic. Le choix depend de la taille de l'equipe, de la complexite des pipelines, et de l'appetence pour le DevOps.
16. Comparaison Orchestrateurs : Airflow vs Dagster vs Prefect
Objectifs
- Comparer en profondeur Apache Airflow, Dagster et Prefect sur des criteres objectifs
- Construire une matrice de decision adaptee a votre contexte
- Evaluer le TCO (Total Cost of Ownership) de chaque solution
- Identifier le meilleur orchestrateur selon le profil d'equipe et le cas d'usage
Le choix d'un orchestrateur est une decision structurante qui engage votre equipe pour 3 a 5 ans. Ne choisissez jamais sur la base d'un tutoriel ou d'un benchmark isole. Evaluez la maturite de l'ecosysteme, la taille de la communaute, et surtout la capacite de votre equipe a operer l'outil au quotidien.
Vue d'ensemble des trois orchestrateurs
Apache Airflow, Dagster et Prefect representent trois generations d'orchestrateurs de donnees. Airflow, cree par Airbnb en 2014, est le standard de facto avec la plus grande communaute. Dagster, lance en 2019 par Elementl, introduit le concept de Software-Defined Assets. Prefect, fonde en 2018, mise sur la simplicite et l'experience developpeur.
| Critere | Apache Airflow | Dagster | Prefect |
|---|---|---|---|
| Annee de creation | 2014 (Airbnb) | 2019 (Elementl) | 2018 (Prefect Technologies) |
| Paradigme | DAG de taches (task-centric) | Assets materialisables (asset-centric) | Flows et taches (task-centric moderne) |
| Langage | Python | Python | Python |
| Licence | Apache 2.0 | Apache 2.0 | Apache 2.0 (OSS) / Proprietary (Cloud) |
| Offre Cloud | Astronomer, MWAA, Cloud Composer | Dagster Cloud | Prefect Cloud |
| Communaute GitHub Stars | ~37 000 | ~12 000 | ~16 000 |
| Scheduling | Cron natif, Timetables | Cron, Sensors, Freshness policies | Cron, RRule, event-driven |
| UI / Interface | Web UI complete, Graph view | Dagit/Dagster UI, Asset lineage | Prefect UI, dashboard cloud |
| Testing | Difficile (mocking complexe) | Natif (execution in-process) | Natif (execution locale simple) |
| Scalabilite | CeleryExecutor, KubernetesExecutor | Multi-process, K8s | Workers distribues, K8s |
| Courbe d'apprentissage | Moyenne a elevee | Elevee (concepts uniques) | Faible a moyenne |
| Plugins / Connecteurs | 2000+ providers | Integrations croissantes | Collections/blocks modulaires |
Comparaison architecturale detaillee
Airflow : Architecture classique
Airflow repose sur un Scheduler central qui parse les fichiers DAG, un Webserver Flask pour l'UI, et un Executor qui delegue les taches aux workers. La metabase PostgreSQL stocke l'etat de toutes les executions.
- Architecture eprouvee, bien documentee
- Separation claire scheduler/worker
- Nombreux executors (Local, Celery, Kubernetes)
- XCom pour passage de donnees inter-taches
Dagster : Architecture asset-centric
Dagster introduit le Dagster Daemon pour le scheduling, Dagit pour la visualisation, et un systeme d'IO Managers pour gerer la persistance des assets. Chaque asset declare ses dependances et son type de donnees.
- Software-Defined Assets (SDA)
- Type system integre avec validation
- IO Managers abstraient le stockage
- Partitioning natif et puissant
AIRFLOW DAGSTER PREFECT
ββββββββββββββββ ββββββββββββββββ ββββββββββββββββ
β Scheduler β β Daemon β β Orion Serverβ
β (parse DAGs)β β (sensors, β β (API + UI) β
β β β schedules) β β β
ββββββββ¬ββββββββ ββββββββ¬ββββββββ ββββββββ¬ββββββββ
β β β
βΌ βΌ βΌ
ββββββββββββββββ ββββββββββββββββ ββββββββββββββββ
β Executor β β Run Launcherβ β Work Pool β
β (Celery/K8s)β β (process/K8sβ β (agents/ β
β β β /Docker) β β workers) β
ββββββββ¬ββββββββ ββββββββ¬ββββββββ ββββββββ¬ββββββββ
β β β
βΌ βΌ βΌ
ββββββββββββββββ ββββββββββββββββ ββββββββββββββββ
β Workers β β User Code β β Flow Runs β
β (tasks) β β (assets + β β (tasks) β
β β β resources) β β β
ββββββββββββββββ ββββββββββββββββ ββββββββββββββββ
β β β
βΌ βΌ βΌ
PostgreSQL PostgreSQL PostgreSQL/
(metastore) (event log) SQLite/Cloud
Matrice de decision
Utilisez cette matrice pour evaluer chaque orchestrateur selon vos criteres prioritaires. Notez de 1 (faible) a 5 (excellent) :
| Critere de decision | Poids | Airflow | Dagster | Prefect |
|---|---|---|---|---|
| Maturite et stabilite | Eleve | 5/5 | 3/5 | 3/5 |
| Ecosysteme de plugins | Eleve | 5/5 | 3/5 | 3/5 |
| Facilite de deploiement | Moyen | 2/5 | 3/5 | 5/5 |
| Experience developpeur | Moyen | 3/5 | 4/5 | 5/5 |
| Testabilite du code | Moyen | 2/5 | 5/5 | 4/5 |
| Data Lineage natif | Eleve | 2/5 | 5/5 | 3/5 |
| Gestion des erreurs | Moyen | 3/5 | 4/5 | 5/5 |
| Offre managed cloud | Moyen | 4/5 (multi) | 4/5 | 5/5 |
| Recrutement talent | Eleve | 5/5 | 2/5 | 3/5 |
| Support grandes equipes | Eleve | 5/5 | 4/5 | 3/5 |
Analyse TCO (Total Cost of Ownership)
Le cout total ne se limite pas aux frais de licence. Voici une estimation sur 12 mois pour une equipe de 5 data engineers avec 200 pipelines :
| Poste de cout | Airflow (self-hosted) | Airflow (Astronomer) | Dagster Cloud | Prefect Cloud |
|---|---|---|---|---|
| Infrastructure | 1 200 EUR/mois | Inclus | Inclus | Inclus |
| Licence / SaaS | 0 EUR | ~3 000 EUR/mois | ~2 500 EUR/mois | ~1 500 EUR/mois |
| Ops / Maintenance | 0.5 ETP (~3 500 EUR) | ~0.1 ETP (~700 EUR) | ~0.1 ETP (~700 EUR) | ~0.05 ETP (~350 EUR) |
| Formation equipe | 2 semaines | 1.5 semaines | 3 semaines | 1 semaine |
| Total mensuel estime | ~4 700 EUR | ~3 700 EUR | ~3 200 EUR | ~1 850 EUR |
Attention au cout cache du self-hosting
Le self-hosting d'Airflow parait gratuit mais consomme enormement de temps d'ingenierie : mises a jour, gestion des dependances, tuning du scheduler, monitoring. De nombreuses equipes sous-estiment ce cout de 40 a 60 % la premiere annee.
Quand choisir quel orchestrateur ?
Scenario A : Grande entreprise, 50+ pipelines, equipe de 10+
Recommandation : Apache Airflow (managed via Astronomer ou MWAA). La maturite, l'ecosysteme de providers, et la facilite de recrutement en font le choix le plus sur pour les grandes organisations. Le pattern DAG est bien compris par la majorite des data engineers.
Scenario B : Equipe data-centric, focus qualite et lineage
Recommandation : Dagster. Si la qualite des donnees et la tracabilite des assets sont vos priorites absolues, Dagster apporte un cadre rigoureux avec les Software-Defined Assets, le type system, et le lineage integre. Ideal pour les equipes analytiques avancees.
Scenario C : Startup/PME, rapidite de mise en oeuvre
Recommandation : Prefect. La courbe d'apprentissage la plus douce, le deploiement le plus rapide, et le cout le plus bas. Parfait pour les equipes restreintes qui veulent etre operationnelles en quelques jours sans investissement infrastructure lourd.
Spotify : Migration d'Airflow vers une architecture hybride
Spotify exploite plus de 30 000 DAGs Airflow pour orchestrer ses pipelines de recommandation musicale. L'equipe a developpe des extensions internes (Backstage integration, custom operators) mais a rencontre des limites de scalabilite du scheduler. Leur approche : garder Airflow comme orchestrateur principal tout en evaluant Dagster pour les pipelines ML ou le lineage asset-centric est critique.
- Lecon : un orchestrateur unique ne suffit pas toujours a grande echelle
- La coexistence de plusieurs outils est viable si les frontieres sont claires
Choisir un orchestrateur uniquement sur la hype
Un anti-pattern frequent consiste a migrer vers Dagster ou Prefect simplement parce qu'Airflow semble "vieux". Si vos DAGs Airflow fonctionnent et que votre equipe les maitrise, une migration represente un cout enorme (reecriture, formation, risques de regression). Migrez uniquement si vous avez identifie des limitations concretes et quantifiees.
Echec : Migration precipitee vers Prefect 2.0
Une fintech europeenne a decide de migrer ses 120 DAGs Airflow vers Prefect 2.0 en 3 mois. L'equipe a sous-estime les differences fondamentales : absence de sensors equivalents, gestion differente des retries, et pattern d'execution hybride (cloud + agent). Resultat : 6 mois de retard, pipelines instables pendant 2 mois en production, et retour partiel a Airflow pour les pipelines critiques.
- Lecon : toujours faire un POC sur 5-10 pipelines representatifs avant de migrer
- Prevoir un plan de rollback et une phase de coexistence
Code compare : meme pipeline, trois syntaxes
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract(): ...
def transform(): ...
def load(): ...
with DAG("etl_pipeline", start_date=datetime(2024,1,1),
schedule_interval="@daily", catchup=False) as dag:
t1 = PythonOperator(task_id="extract", python_callable=extract)
t2 = PythonOperator(task_id="transform", python_callable=transform)
t3 = PythonOperator(task_id="load", python_callable=load)
t1 >> t2 >> t3
from dagster import asset, define_asset_job, ScheduleDefinition
@asset
def raw_data():
"""Extract raw data from source."""
return extract()
@asset(deps=[raw_data])
def transformed_data(raw_data):
"""Transform raw data."""
return transform(raw_data)
@asset(deps=[transformed_data])
def loaded_data(transformed_data):
"""Load transformed data to warehouse."""
return load(transformed_data)
etl_job = define_asset_job("etl_pipeline", selection="*")
etl_schedule = ScheduleDefinition(job=etl_job, cron_schedule="@daily")
from prefect import flow, task
from prefect.deployments import Deployment
@task(retries=3, retry_delay_seconds=60)
def extract(): ...
@task
def transform(data): ...
@task
def load(data): ...
@flow(name="etl_pipeline", log_prints=True)
def etl_pipeline():
raw = extract()
transformed = transform(raw)
load(transformed)
# Deploiement avec schedule
if __name__ == "__main__":
etl_pipeline.serve(name="daily-etl", cron="0 0 * * *")
17. Quiz Orchestration
Objectifs
- Valider vos connaissances sur Airflow, Dagster et Prefect
- Tester votre comprehension des patterns d'orchestration
- Verifier votre capacite a choisir le bon outil selon le contexte
Ce quiz couvre l'ensemble du module orchestration. Prenez le temps de bien lire chaque question. Si vous obtenez moins de 6/8, revisez les lecons 13 a 16 avant de continuer.
Quiz Module 3.3 - Orchestration
Q1. Quel composant Airflow est responsable de parser les fichiers DAG et de planifier les executions ?
Q2. Quel concept unique differencie fondamentalement Dagster d'Airflow ?
Q3. Dans Prefect, quel decorateur est utilise pour definir un point d'entree de pipeline ?
Q4. Quel Executor Airflow est recommande pour une scalabilite maximale en production sur Kubernetes ?
Q5. Quelle est la methode recommandee pour passer des donnees entre taches dans Airflow 2.x ?
Q6. Quelle approche est un anti-pattern lors du choix d'un orchestrateur ?
Q7. Quel mecanisme Dagster utilise-t-il pour abstraire la couche de persistance des assets ?
Q8. Quel orchestrateur offre le TCO le plus bas pour une petite equipe (3-5 personnes) avec 50 pipelines ?
18. Outils ELT/CDC : Fivetran, Airbyte, Debezium, Kafka Connect, Meltano
Objectifs
- Comprendre la difference entre ELT batch et CDC (Change Data Capture) en temps reel
- Maitriser les architectures et cas d'usage de Fivetran, Airbyte, Debezium, Kafka Connect et Meltano
- Savoir choisir le bon outil d'ingestion selon le volume, la latence et le budget
- Identifier les anti-patterns courants en ingestion de donnees
L'ingestion est le maillon le plus fragile d'un pipeline de donnees. Un outil CDC mal configure peut saturer votre base source, et un ELT sans deduplication peut corrompre vos donnees. Choisissez toujours en fonction de vos contraintes de latence reelles, pas de celles que vous imaginez avoir besoin.
ELT vs CDC : deux paradigmes complementaires
ELT (Extract-Load-Transform)
L'ELT extrait les donnees de la source, les charge telles quelles dans le data warehouse, puis applique les transformations dans le warehouse lui-meme. C'est le pattern dominant du Modern Data Stack car il exploite la puissance de calcul du warehouse (Snowflake, BigQuery) pour les transformations.
CDC (Change Data Capture)
Le CDC capture les modifications en temps reel depuis les logs de transaction (WAL PostgreSQL, binlog MySQL). Seules les insertions, mises a jour et suppressions sont propagees, reduisant drastiquement le volume de donnees transferees et la latence de replication.
ELT BATCH (Fivetran / Airbyte / Meltano)
ββββββββββββ Full/Incremental ββββββββββββββββ dbt ββββββββββββ
β Source ββββββββββExtractβββββββΊβ Raw Layer βββTransformββΊβ Marts β
β (API/DB) β (toutes les 5min β (Warehouse) β β(Warehouseβ
ββββββββββββ a 24h) ββββββββββββββββ ββββββββββββ
CDC STREAMING (Debezium / Kafka Connect)
ββββββββββββ WAL/Binlog βββββββββββββ Stream ββββββββββββββββ
β Source ββββRead logsβββββββΊβ Kafka βββProcessingβββΊβ Target β
β (DB) β (temps reel) β (broker) β (Flink/ β (DWH/Lake/ β
ββββββββββββ βββββββββββββ ksqlDB) β service) β
ββββββββββββββββ
Comparaison detaillee des outils
| Outil | Type | Modele | Connecteurs | Latence | Cout |
|---|---|---|---|---|---|
| Fivetran | ELT SaaS | Proprietaire, managed | 400+ | 5 min - 24h | $$$$ (MAR-based) |
| Airbyte | ELT OSS/Cloud | Open-source + Cloud | 350+ | 5 min - 24h | $ (self) / $$ (cloud) |
| Meltano | ELT OSS | Open-source CLI | 600+ (Singer) | Batch schedule | $ (self-hosted) |
| Debezium | CDC OSS | Open-source | 15+ (DB focus) | Secondes | $ (self-hosted) |
| Kafka Connect | Integration | Open-source + managed | 200+ | Secondes | $$ (Confluent) |
Fivetran : le leader ELT managed
Fivetran est la reference du ELT managed. Son approche "zero configuration" permet de connecter une source en quelques clics. Le pricing base sur les Monthly Active Rows (MAR) peut cependant devenir couteux a grande echelle.
Forces de Fivetran
Connecteurs certifies et maintenus par Fivetran, schema drift detection automatique, historisation native avec les tables _fivetran_deleted et _fivetran_synced. Ideal pour les equipes qui ne veulent pas gerer l'infrastructure d'ingestion.
Airbyte : l'alternative open-source
Airbyte offre une experience similaire a Fivetran mais avec un modele open-source. Le deploiement self-hosted permet de maitriser les couts mais requiert une expertise Docker/Kubernetes. Airbyte Cloud offre une alternative managed.
# airbyte-source-postgres.yaml
source:
sourceType: postgres
host: db.production.internal
port: 5432
database: ecommerce
username: ${AIRBYTE_PG_USER}
password: ${AIRBYTE_PG_PASSWORD}
ssl_mode: require
replication_method:
method: CDC # utilise le WAL PostgreSQL
replication_slot: airbyte_slot
publication: airbyte_publication
initial_waiting_seconds: 300
destination:
destinationType: bigquery
project_id: my-data-project
dataset_id: raw_ecommerce
credentials_json: ${GCP_CREDENTIALS}
loading_method:
method: GCS Staging
gcs_bucket_name: airbyte-staging-bucket
Debezium : CDC en temps reel
Debezium est le standard open-source du CDC. Il lit les journaux de transactions (WAL, binlog, redo log) sans impacter les performances de la base source. Deploye comme connecteur Kafka Connect, il publie chaque modification dans un topic Kafka.
{
"name": "ecommerce-cdc-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "db.production.internal",
"database.port": "5432",
"database.user": "debezium",
"database.password": "${CDC_PASSWORD}",
"database.dbname": "ecommerce",
"database.server.name": "prod-ecommerce",
"schema.include.list": "public",
"table.include.list": "public.orders,public.customers,public.products",
"plugin.name": "pgoutput",
"slot.name": "debezium_slot",
"publication.name": "debezium_publication",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "cdc.$3"
}
}
Meltano : ELT en ligne de commande
Meltano, porte par la communaute Singer, offre une approche CLI-first de l'ELT. Il est particulierement adapte aux equipes DevOps qui veulent gerer leurs pipelines d'ingestion comme du code (Infrastructure as Code).
# Installation et configuration meltano init my-elt-project cd my-elt-project # Ajouter un extracteur (tap) et un loader (target) meltano add extractor tap-postgres meltano add loader target-snowflake # Configurer la source meltano config tap-postgres set host db.production.internal meltano config tap-postgres set database ecommerce meltano config tap-postgres set default_replication_method LOG_BASED # Lancer l'extraction meltano run tap-postgres target-snowflake # Ajouter un schedule orchestable par Airflow meltano schedule add daily-sync --extractor tap-postgres \ --loader target-snowflake --interval "@daily"
Matrice de decision : quel outil pour quel besoin ?
| Besoin | Outil recommande | Justification |
|---|---|---|
| ELT SaaS, zero maintenance | Fivetran | Connecteurs certifies, support premium, zero ops |
| ELT open-source, budget limite | Airbyte (self-hosted) | Gratuit, large catalogue, communaute active |
| ELT as Code, integration CI/CD | Meltano | CLI-first, GitOps natif, ecosysteme Singer |
| CDC temps reel, stack Kafka | Debezium + Kafka Connect | Standard CDC, sub-second latency, scalable |
| Integration multi-source managed | Confluent Cloud + connectors | Kafka managed, connecteurs managed, monitoring integre |
| SaaS + APIs REST | Fivetran ou Airbyte Cloud | Connecteurs SaaS pre-construits (Salesforce, Stripe, etc.) |
Shopify : Airbyte pour democratiser l'acces aux donnees
Shopify a adopte Airbyte en interne pour permettre a chaque equipe produit de configurer ses propres pipelines d'ingestion sans dependre de l'equipe data platform. L'approche self-service a reduit le backlog de demandes d'ingestion de 6 semaines a 2 jours. Les connecteurs custom sont developpes en interne et partages via un registre prive.
CDC sans Schema Registry
Deployer Debezium sans Schema Registry (Avro ou Protobuf) est un anti-pattern critique. Sans gestion des schemas, une modification de colonne en base source (ajout, renommage, changement de type) provoque des erreurs de deserialization en cascade chez tous les consommateurs. Le Schema Registry garantit la compatibilite ascendante et descendante des schemas.
Echec : Fivetran et l'explosion des couts MAR
Une societe de e-commerce a connecte sa base transactionnelle (50M lignes, 200k updates/jour) via Fivetran en mode incremental. Le calcul MAR incluant toutes les lignes modifiees, la facture est passee de 2 000 EUR/mois a 18 000 EUR/mois en 3 mois apres ajout de tables supplementaires. L'equipe a du migrer les tables les plus volumineuses vers Airbyte self-hosted en urgence.
- Lecon : toujours estimer le volume MAR avant de connecter une source a Fivetran
- Utiliser le CDC Fivetran (log-based) plutot que le mode incremental quand possible
Full refresh quotidien sur des tables volumineuses
Faire un full refresh (extraction complete) quotidien sur une table de 100M+ lignes est un anti-pattern courant chez les debutants. Cela surcharge la base source, consomme de la bande passante, et augmente les couts warehouse. Privilegiez toujours l'incremental (basee sur un curseur updated_at) ou le CDC quand la source le supporte.
19. Formats de Fichiers : Parquet, ORC, Avro, JSON, CSV
Objectifs
- Comprendre les differences entre formats colonnaires et formats en lignes
- Maitriser les mecanismes de compression et d'encodage de chaque format
- Analyser les benchmarks de performance pour choisir le format optimal
- Eviter les anti-patterns de choix de format dans un data lake
Le choix du format de fichier impacte directement les performances de requete (x10 a x100), les couts de stockage (x3 a x10), et la maintenabilite a long terme. C'est une decision fondatrice de votre data lake qui affecte toutes les couches en aval. Ne la negligez jamais.
Colonnaire vs Ligne : le principe fondamental
DONNEES ORIGINALES (table orders)
ββββββββββ¬βββββββββββββ¬βββββββββ¬ββββββββ
β id β date β amount β statusβ
ββββββββββΌβββββββββββββΌβββββββββΌββββββββ€
β 1 β 2024-01-15 β 150.00 β paid β
β 2 β 2024-01-16 β 89.50 β paid β
β 3 β 2024-01-16 β 220.00 β pend β
ββββββββββ΄βββββββββββββ΄βββββββββ΄ββββββββ
FORMAT LIGNE (CSV, JSON, Avro) FORMAT COLONNAIRE (Parquet, ORC)
ββββββββββββββββββββββββββββββββββ ββββββββββββββββββββββββββββββββββ
β 1, 2024-01-15, 150.00, paid β β id: [1, 2, 3] β
β 2, 2024-01-16, 89.50, paid β β date: [2024-01-15, ...] β
β 3, 2024-01-16, 220.00, pend β β amount: [150.00, 89.50, ...] β
ββββββββββββββββββββββββββββββββββ β status: [paid, paid, pend] β
ββββββββββββββββββββββββββββββββββ
Avantage: ecriture rapide, Avantage: lecture selective,
lecture ligne complete compression superieure,
Cas: OLTP, streaming, logs aggregations rapides
Cas: OLAP, analytics, data lake
Comparaison detaillee des formats
| Critere | Parquet | ORC | Avro | JSON | CSV |
|---|---|---|---|---|---|
| Type | Colonnaire | Colonnaire | Ligne | Ligne | Ligne |
| Schema | Integre (Thrift) | Integre (Protobuf) | Integre (JSON) | Aucun | Aucun |
| Compression | Snappy, Gzip, Zstd, LZ4 | Zlib, Snappy, LZO, Zstd | Snappy, Deflate, Zstd | Gzip externe | Gzip externe |
| Taux compression | Excellent (x5-x10) | Excellent (x5-x10) | Bon (x3-x5) | Faible (x2-x3) | Faible (x2-x3) |
| Predicate pushdown | Oui (row groups) | Oui (stripes + bloom) | Non | Non | Non |
| Column pruning | Oui | Oui | Non | Non | Non |
| Types complexes | Oui (nested, arrays) | Oui (structs, maps) | Oui (unions, arrays) | Oui (natif) | Non |
| Schema evolution | Ajout colonnes | Ajout colonnes | Full evolution | Flexible | Fragile |
| Splittable | Oui | Oui | Oui | Non (sauf JSONL) | Oui |
| Ecosysteme | Spark, Presto, BigQuery, Snowflake | Hive, Spark, Presto | Kafka, Spark, Flink | Universel | Universel |
Apache Parquet en profondeur
Parquet est le format dominant du Modern Data Stack. Sa structure en Row Groups (128 MB par defaut) contenant des Column Chunks permet un acces selectif extremement performant. Chaque column chunk contient des Pages de donnees avec des statistiques (min/max) pour le predicate pushdown.
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β FICHIER PARQUET β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€ β Magic Number: PAR1 β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€ β ROW GROUP 0 (128 MB par defaut) β β βββββββββββββββββββ¬ββββββββββββββββββ¬βββββββββββββββββ β β β Column Chunk β Column Chunk β Column Chunk β β β β (id) β (date) β (amount) β β β β βββββββββββββββ β βββββββββββββββ β βββββββββββββββ β β β β Page 0 β β β Page 0 β β β Page 0 ββ β β β β min=1,max=50β β β min=Jan β β β min=10.0 ββ β β β β data=[...] β β β data=[...] β β β data=[...] ββ β β β βββββββββββββββ β βββββββββββββββ β βββββββββββββββ β β β βββββββββββββββ β βββββββββββββββ β βββββββββββββββ β β β β Page 1 β β β Page 1 β β β Page 1 ββ β β β β min=51... β β β ... β β β ... ββ β β β βββββββββββββββ β βββββββββββββββ β βββββββββββββββ β β βββββββββββββββββββ΄ββββββββββββββββββ΄βββββββββββββββββ β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€ β ROW GROUP 1 ... β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€ β FOOTER (schema, row group metadata, statistics) β β Footer Length (4 bytes) β β Magic Number: PAR1 β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Encodages et compression
Parquet et ORC utilisent des encodages specialises par type de donnees qui precedent la compression generale :
| Encodage | Type de donnees | Principe | Gain typique |
|---|---|---|---|
| Dictionary Encoding | Strings cardinalite faible | Remplace les valeurs par des index dans un dictionnaire | x5-x20 |
| Run-Length Encoding (RLE) | Valeurs repetees | Stocke (valeur, nombre de repetitions) | x10-x100 |
| Delta Encoding | Integers tries | Stocke les differences entre valeurs consecutives | x3-x10 |
| Bit Packing | Petits entiers | Utilise le nombre minimal de bits | x2-x4 |
import pyarrow as pa
import pyarrow.parquet as pq
# Schema explicite avec types optimaux
schema = pa.schema([
pa.field("order_id", pa.int64()),
pa.field("customer_id", pa.int32()), # int32 si < 2 milliards
pa.field("order_date", pa.date32()), # date32 au lieu de string
pa.field("amount", pa.decimal128(10, 2)), # decimal pour la precision
pa.field("status", pa.dictionary(pa.int8(), pa.string())), # dict encoding
pa.field("country", pa.dictionary(pa.int8(), pa.string())), # cardinalite faible
])
# Ecriture avec options optimisees
table = pa.table(data, schema=schema)
pq.write_table(
table,
"orders.parquet",
compression="zstd", # meilleur ratio compression/vitesse
compression_level=3, # 1-22, 3 = bon compromis
row_group_size=128 * 1024 * 1024, # 128 MB par row group
use_dictionary=True, # activer le dict encoding
write_statistics=True, # statistiques pour predicate pushdown
)
# Lecture selective : seulement 2 colonnes sur 6
table = pq.read_table(
"orders.parquet",
columns=["order_id", "amount"], # column pruning
filters=[("amount", ">", 100)], # predicate pushdown
)
Benchmarks comparatifs
Voici des benchmarks representatifs sur un dataset de 1 milliard de lignes (TPC-DS scale 100) :
| Metrique | Parquet+Zstd | ORC+Zlib | Avro+Snappy | JSON.gz | CSV.gz |
|---|---|---|---|---|---|
| Taille fichier | 12.3 GB | 11.8 GB | 38.2 GB | 85.4 GB | 72.1 GB |
| Ratio compression | x8.1 | x8.5 | x2.6 | x1.2 | x1.4 |
| Scan 3 colonnes | 2.1 s | 2.4 s | 18.5 s | 45.2 s | 38.7 s |
| Scan complet | 15.3 s | 14.8 s | 12.1 s | 52.3 s | 41.6 s |
| Aggregation SUM | 0.8 s | 0.9 s | 14.2 s | 48.7 s | 39.1 s |
| Ecriture | 45 s | 52 s | 28 s | 35 s | 22 s |
Conclusion des benchmarks
Pour l'analytique (OLAP) : Parquet ou ORC sont 10 a 20 fois plus rapides que CSV/JSON grace au column pruning et au predicate pushdown. Pour le streaming et l'echange de messages : Avro est le choix optimal grace a son schema integre et son support natif dans Kafka.
Stocker un data lake en CSV ou JSON
Stocker les donnees analytiques d'un data lake en CSV ou JSON est l'un des anti-patterns les plus couteux. Une entreprise avec 10 TB de CSV dans S3 paierait ~230 EUR/mois en stockage. Le meme dataset en Parquet Zstd occuperait ~1.5 TB, soit ~35 EUR/mois. Sur les requetes Athena (facturees au volume scanne), l'economie est encore plus spectaculaire : x10 a x20 de reduction.
Twitter/X : Migration de JSON vers Parquet
Twitter stockait ses logs d'activite (impressions, clics, engagements) en JSON dans HDFS, representant plus de 500 TB par jour. La migration vers Parquet a reduit le stockage de 75%, accelere les requetes analytiques d'un facteur 15, et economise des millions de dollars annuels en infrastructure. La cle du succes : une migration progressive table par table avec validation des resultats a chaque etape.
Echec : Mauvais choix de row group size
Une equipe data a configure ses fichiers Parquet avec un row_group_size de 8 MB (au lieu des 128 MB recommandes) pour un dataset de 500 millions de lignes. Resultat : des milliers de row groups par fichier, un footer metadata enorme, et des performances de lecture degradees de 60%. Le predicate pushdown, bien que fonctionnel, etait inefficace car les plages min/max de chaque petit row group se chevauchaient trop.
20. Open Table Formats : Delta Lake, Apache Iceberg, Apache Hudi
Objectifs
- Comprendre pourquoi les Open Table Formats revolutionnent le data lakehouse
- Maitriser les transactions ACID, le time travel et le schema evolution
- Comparer Delta Lake, Apache Iceberg et Apache Hudi sur des criteres objectifs
- Identifier le bon format selon l'ecosysteme et le cas d'usage
Les Open Table Formats transforment un simple stockage objet (S3, GCS, ADLS) en un veritable data lakehouse avec des garanties transactionnelles. C'est l'evolution la plus importante du data engineering depuis l'apparition du cloud data warehouse. Maitrisez ces concepts : ils definissent l'architecture data de la prochaine decennie.
Le probleme que resolvent les Open Table Formats
Un data lake classique (fichiers Parquet dans S3) souffre de limitations majeures :
- Pas de transactions ACID : une ecriture partielle laisse le lake dans un etat inconsistant
- Pas de schema enforcement : n'importe quel fichier peut etre depose sans validation
- Pas de time travel : impossible de consulter l'etat des donnees a un instant T passe
- Updates/deletes impossibles : les fichiers Parquet sont immutables
- Small files problem : des milliers de petits fichiers degradent les performances
DATA LAKE CLASSIQUE DATA LAKEHOUSE (Open Table Format)
ββββββββββββββββββββββββββββ ββββββββββββββββββββββββββββββββββββ
β S3 / GCS / ADLS β β S3 / GCS / ADLS β
β βββββββ βββββββ ββββββ β β ββββββββββββββββββββββββββββββββ
β β.parqβ β.parqβ β.csvβ β β β METADATA LAYER (JSON/Avro) ββ
β βββββββ βββββββ ββββββ β β β - Transaction log ββ
β Pas de transactions β β β - Schema versioning ββ
β Pas de schema β β β - Partition metadata ββ
β Pas d'historique β β β - Statistics (min/max) ββ
β Pas d'updates β β ββββββββββββββββββββββββββββββββ
ββββββββββββββββββββββββββββ β βββββββ βββββββ βββββββ β
β β.parqβ β.parqβ β.parqβ β
β βββββββ βββββββ βββββββ β
β ACID + Time Travel + Updates β
ββββββββββββββββββββββββββββββββββββ
Delta Lake (Databricks)
Delta Lake, cree par Databricks, est le format le plus repandu grace a l'adoption massive de Spark et de la plateforme Databricks. Il utilise un transaction log (dossier _delta_log/) compose de fichiers JSON sequentiels qui enregistrent chaque operation.
# Ecriture ACID dans une table Delta
df.write.format("delta") \
.mode("overwrite") \
.partitionBy("date") \
.save("s3://datalake/orders_delta")
# MERGE (upsert) - INSERT ou UPDATE selon la cle
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "s3://datalake/orders_delta")
delta_table.alias("target").merge(
new_data.alias("source"),
"target.order_id = source.order_id"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
# Time Travel : lire la version 5 de la table
df_v5 = spark.read.format("delta") \
.option("versionAsOf", 5) \
.load("s3://datalake/orders_delta")
# Time Travel : lire a un timestamp specifique
df_yesterday = spark.read.format("delta") \
.option("timestampAsOf", "2024-06-15T10:00:00") \
.load("s3://datalake/orders_delta")
# VACUUM : supprimer les anciens fichiers (> 7 jours)
delta_table.vacuum(retentionHours=168)
Apache Iceberg
Apache Iceberg, initie chez Netflix, se distingue par son architecture de metadata a trois niveaux (catalog, metadata files, manifest files) qui permet une planification de requetes extremement rapide meme sur des tables de milliards de lignes. Il est vendor-neutral et supporte nativement Spark, Trino, Flink, et Dremio.
-- Creer une table Iceberg
CREATE TABLE catalog.db.orders (
order_id BIGINT,
customer_id INT,
order_date DATE,
amount DECIMAL(10,2),
status STRING
) USING iceberg
PARTITIONED BY (days(order_date)) -- partition transform!
TBLPROPERTIES (
'write.format.default' = 'parquet',
'write.parquet.compression-codec' = 'zstd'
);
-- MERGE INTO (upsert)
MERGE INTO catalog.db.orders AS target
USING new_orders AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- Time Travel par snapshot ID
SELECT * FROM catalog.db.orders
VERSION AS OF 123456789;
-- Time Travel par timestamp
SELECT * FROM catalog.db.orders
TIMESTAMP AS OF '2024-06-15 10:00:00';
-- Schema Evolution : ajouter une colonne
ALTER TABLE catalog.db.orders
ADD COLUMNS (shipping_address STRING);
-- Hidden Partitioning : requete sans connaitre le partitionnement
SELECT * FROM catalog.db.orders
WHERE order_date = '2024-06-15';
-- Iceberg applique automatiquement le filtre sur la partition days(order_date)
Apache Hudi
Apache Hudi (Hadoop Upserts Deletes and Incrementals), cree chez Uber, est optimise pour les workloads de CDC et les upserts massifs. Il propose deux types de tables : Copy-on-Write (CoW) pour les lectures rapides et Merge-on-Read (MoR) pour les ecritures rapides.
CoW vs MoR dans Hudi
Copy-on-Write : chaque mise a jour reecrit le fichier Parquet entier. Les lectures sont rapides (fichiers Parquet natifs) mais les ecritures sont couteuses. Ideal pour les tables avec peu de mises a jour.
Merge-on-Read : les mises a jour sont ecrites dans des fichiers log (delta logs). Les lectures doivent merger les fichiers de base avec les delta logs. Ideal pour les tables avec beaucoup de mises a jour (CDC).
Comparaison des trois formats
| Critere | Delta Lake | Apache Iceberg | Apache Hudi |
|---|---|---|---|
| Createur | Databricks | Netflix | Uber |
| Transaction ACID | Oui (optimistic concurrency) | Oui (snapshot isolation) | Oui (MVCC) |
| Time Travel | Par version ou timestamp | Par snapshot ou timestamp | Par commit timeline |
| Schema Evolution | Ajout/renommage colonnes | Full evolution (ajout, renommage, reorder, promotion type) | Ajout/suppression colonnes |
| Hidden Partitioning | Non (partition explicite) | Oui (partition transforms) | Non (partition explicite) |
| Upsert Performance | Bon | Bon (v2 row-level deletes) | Excellent (optimise pour CDC) |
| Metadata Layer | JSON transaction log | 3 niveaux (catalog/metadata/manifest) | Timeline + .hoodie metadata |
| Query Planning | Bon | Excellent (manifest files) | Bon |
| Compaction | OPTIMIZE + Z-ORDER | Rewrite + sort | Inline/async compaction |
| Ecosysteme principal | Databricks, Spark | Multi-engine (Spark, Trino, Flink, Dremio, Snowflake) | Spark, Flink, Presto |
| Adoption Cloud | Databricks, Azure, AWS (limitee) | AWS (table format par defaut), Snowflake, GCP | AWS EMR, Glue |
Netflix : Iceberg pour le data lakehouse a l'echelle
Netflix a cree Apache Iceberg pour resoudre les problemes de scalabilite de Hive Metastore sur ses tables de plusieurs petabytes. L'architecture de metadata a trois niveaux permet de planifier des requetes sur des tables de billions de lignes en quelques millisecondes. Netflix utilise Iceberg comme format standard pour toutes ses tables analytiques, couplees a Spark et Trino.
- Plus de 10 000 tables Iceberg en production
- Reduction du temps de planning de requetes de minutes a millisecondes
- Hidden partitioning elimine 90% des erreurs de partitionnement utilisateur
Uber : Hudi pour le CDC a grande echelle
Uber a developpe Apache Hudi pour repliquer en quasi-temps reel ses bases transactionnelles (rides, drivers, payments) vers le data lake. Avec plus de 150 petabytes de donnees geres, Hudi traite des millions d'upserts par seconde. Le mode Merge-on-Read est ideal pour leur pattern CDC ou les mises a jour sont frequentes et les lectures analytiques tolerent un leger delta merge.
Databricks : Delta Lake et l'ecosysteme Unity Catalog
Databricks a construit tout son ecosysteme autour de Delta Lake : Unity Catalog pour la gouvernance, Delta Sharing pour le partage de donnees inter-organisations, et Photon comme moteur d'execution optimise pour Delta. Avec l'adoption de Delta Lake par Azure Synapse et AWS Glue, le format s'est impose comme le plus utilise globalement.
Utiliser un Open Table Format sans strategie de compaction
Sans compaction reguliere, les Open Table Formats accumulent des milliers de petits fichiers (surtout avec les upserts et le CDC). Une table Delta Lake sans OPTIMIZE peut voir ses performances de lecture se degrader de 80% en quelques semaines. Planifiez des jobs de compaction quotidiens ou hebdomadaires et surveillez le nombre de fichiers par partition.
Convergence des formats
L'industrie tend vers une convergence. Apache XTable (anciennement OneTable) permet la traduction entre les trois formats. Databricks a annonce UniForm, qui permet aux tables Delta d'etre lues comme Iceberg ou Hudi. A terme, le choix de format pourrait devenir moins critique, mais comprendre les differences reste essentiel pour l'optimisation.
21. Projet : Pipeline Modern Data Stack de bout en bout
Objectifs
- Concevoir et implementer un pipeline complet : ingestion, transformation, orchestration
- Integrer Airbyte, dbt, Airflow et Snowflake dans une architecture coherente
- Appliquer les bonnes pratiques de monitoring, tests et documentation
- Deployer un pipeline pret pour la production
Ce projet synthetise tout ce que vous avez appris dans la Phase 3. Prenez le temps de comprendre chaque etape. Un bon data architect ne se contente pas de faire fonctionner un pipeline : il le rend observable, testable, et maintenable. Documentez vos choix de conception.
Contexte du projet
Scenario : Plateforme e-commerce DataShop
DataShop est une plateforme e-commerce avec 500 000 clients actifs et 50 000 commandes par jour. L'equipe data doit construire un pipeline analytique complet pour alimenter les dashboards de la direction et les modeles de recommandation.
Sources de donnees :
- Base PostgreSQL transactionnelle : orders, customers, products, order_items
- API Stripe : paiements et remboursements
- Google Analytics 4 : trafic web et conversions
Objectifs analytiques :
- Chiffre d'affaires quotidien par categorie de produit
- Taux de conversion par canal d'acquisition
- Lifetime Value (LTV) client
- Taux de remboursement par vendeur
Architecture du pipeline
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β ORCHESTRATION (Airflow) β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β DAG: datashop_daily_pipeline (schedule: 0 6 * * *) β β
β β β β
β β [1. Ingestion]βββΊ[2. dbt run]βββΊ[3. dbt test]βββΊ[4. Alert] β β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β β β
βΌ βΌ βΌ βΌ
βββββββββββββββββ ββββββββββββββββ ββββββββββββββ βββββββββββββ
β INGESTION β β TRANSFORM β β QUALITY β β ALERTING β
β (Airbyte) β β (dbt) β β (dbt test)β β (Slack) β
β β β β β β β β
β PostgreSQLβββΊβ β staging/ β β not_null β β on_failureβ
β Stripe βββΊββββΊβ intermediateββββΊβ unique βββΊβ channel β
β GA4 βββΊβ β marts/ β β accepted β β #data- β
β β β β β relations β β alerts β
βββββββββ¬ββββββββ ββββββββ¬ββββββββ ββββββββββββββ βββββββββββββ
β β
βΌ βΌ
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β SNOWFLAKE β
β ββββββββββββ ββββββββββββββββ βββββββββββββββββββββββββ β
β β RAW β β TRANSFORMED β β MARTS β β
β β (landing)βββββΊβ (staging + βββββΊβ (business-ready) β β
β β β β intermediateβ β fct_orders β β
β β raw_pg_* β β stg_* β β fct_payments β β
β β raw_str_*β β int_* β β dim_customers β β
β β raw_ga4_*β β β β dim_products β β
β ββββββββββββ ββββββββββββββββ β mart_revenue_daily β β
β β mart_customer_ltv β β
β βββββββββββββββββββββββββ β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Lab : Implementation pas a pas
Etape 1 : Configuration Airbyte - Ingestion des sources
Configurez les trois connexions Airbyte pour charger les donnees brutes dans Snowflake :
# Connection PostgreSQL -> Snowflake
source:
name: datashop-postgres
sourceDefinitionId: postgres
connectionConfiguration:
host: db.datashop.internal
port: 5432
database: datashop_prod
username: ${AIRBYTE_PG_USER}
password: ${AIRBYTE_PG_PASSWORD}
ssl_mode: require
replication_method:
method: CDC
replication_slot: airbyte_datashop
publication: airbyte_publication
destination:
name: snowflake-raw
destinationDefinitionId: snowflake
connectionConfiguration:
host: xy12345.eu-west-1.snowflakecomputing.com
role: AIRBYTE_ROLE
warehouse: AIRBYTE_WH
database: DATASHOP
schema: RAW_POSTGRES
sync:
streams:
- name: orders
syncMode: incremental
destinationSyncMode: append_dedup
primaryKey: [["id"]]
cursorField: ["updated_at"]
- name: customers
syncMode: incremental
destinationSyncMode: append_dedup
primaryKey: [["id"]]
cursorField: ["updated_at"]
- name: products
syncMode: incremental
destinationSyncMode: append_dedup
primaryKey: [["id"]]
cursorField: ["updated_at"]
- name: order_items
syncMode: incremental
destinationSyncMode: append_dedup
primaryKey: [["id"]]
cursorField: ["updated_at"]
schedule:
scheduleType: cron
cronExpression: "0 5 * * *" # 5h UTC chaque jour
Etape 2 : Projet dbt - Structure et modeles
Organisez le projet dbt selon la convention staging / intermediate / marts :
dbt_datashop/
βββ dbt_project.yml
βββ models/
β βββ staging/
β β βββ stg_postgres/
β β β βββ _stg_postgres__sources.yml
β β β βββ stg_postgres__orders.sql
β β β βββ stg_postgres__customers.sql
β β β βββ stg_postgres__products.sql
β β β βββ stg_postgres__order_items.sql
β β βββ stg_stripe/
β β βββ _stg_stripe__sources.yml
β β βββ stg_stripe__payments.sql
β βββ intermediate/
β β βββ int_orders_enriched.sql
β β βββ int_customer_orders.sql
β βββ marts/
β βββ finance/
β β βββ fct_orders.sql
β β βββ fct_payments.sql
β β βββ mart_revenue_daily.sql
β βββ marketing/
β βββ dim_customers.sql
β βββ mart_customer_ltv.sql
βββ tests/
β βββ assert_positive_revenue.sql
β βββ assert_orders_have_items.sql
βββ macros/
βββ generate_schema_name.sql
{{ config(materialized='view') }}
with source as (
select * from {{ source('postgres', 'orders') }}
),
renamed as (
select
id as order_id,
customer_id,
status as order_status,
total_amount_cents / 100.0 as order_total,
currency,
created_at as ordered_at,
updated_at,
_airbyte_extracted_at as _loaded_at
from source
where _airbyte_meta ->> 'changes' is null -- exclure les lignes en erreur
)
select * from renamed
{{ config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='merge',
cluster_by=['ordered_at']
) }}
with orders as (
select * from {{ ref('stg_postgres__orders') }}
{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
),
order_items as (
select * from {{ ref('stg_postgres__order_items') }}
),
payments as (
select * from {{ ref('stg_stripe__payments') }}
),
enriched as (
select
o.order_id,
o.customer_id,
o.order_status,
o.order_total,
o.currency,
o.ordered_at,
count(distinct oi.product_id) as nb_products,
sum(oi.quantity) as total_items,
coalesce(p.total_paid, 0) as total_paid,
coalesce(p.total_refunded, 0) as total_refunded,
case
when p.total_paid >= o.order_total then 'fully_paid'
when p.total_paid > 0 then 'partially_paid'
else 'unpaid'
end as payment_status,
o.updated_at
from orders o
left join order_items oi on o.order_id = oi.order_id
left join (
select
order_id,
sum(case when type = 'charge' then amount else 0 end) as total_paid,
sum(case when type = 'refund' then amount else 0 end) as total_refunded
from payments
group by order_id
) p on o.order_id = p.order_id
group by 1,2,3,4,5,6,o.updated_at,p.total_paid,p.total_refunded
)
select * from enriched
{{ config(materialized='table') }}
with customer_orders as (
select
c.customer_id,
c.customer_name,
c.email,
c.signup_date,
count(distinct o.order_id) as total_orders,
sum(o.order_total) as total_revenue,
sum(o.total_refunded) as total_refunds,
sum(o.order_total) - sum(o.total_refunded) as net_revenue,
min(o.ordered_at) as first_order_date,
max(o.ordered_at) as last_order_date,
datediff('day', min(o.ordered_at), max(o.ordered_at)) as customer_lifespan_days,
avg(o.order_total) as avg_order_value
from {{ ref('dim_customers') }} c
left join {{ ref('fct_orders') }} o on c.customer_id = o.customer_id
where o.order_status != 'cancelled'
group by 1,2,3,4
),
ltv_segments as (
select
*,
case
when net_revenue >= 5000 then 'VIP'
when net_revenue >= 1000 then 'High Value'
when net_revenue >= 200 then 'Medium Value'
else 'Low Value'
end as ltv_segment,
case
when last_order_date >= dateadd('day', -30, current_date()) then 'Active'
when last_order_date >= dateadd('day', -90, current_date()) then 'At Risk'
else 'Churned'
end as activity_status
from customer_orders
)
select * from ltv_segments
Etape 3 : Tests dbt et qualite des donnees
version: 2
models:
- name: fct_orders
description: "Table de faits des commandes enrichies avec paiements"
columns:
- name: order_id
description: "Identifiant unique de la commande"
tests:
- unique
- not_null
- name: order_total
description: "Montant total de la commande en EUR"
tests:
- not_null
- dbt_utils.accepted_range:
min_value: 0
max_value: 100000
- name: payment_status
tests:
- accepted_values:
values: ['fully_paid', 'partially_paid', 'unpaid']
- name: customer_id
tests:
- not_null
- relationships:
to: ref('dim_customers')
field: customer_id
-- Test custom : chaque commande doit avoir au moins un article
-- Si cette requete retourne des lignes, le test echoue
select
o.order_id,
o.ordered_at,
o.total_items
from {{ ref('fct_orders') }} o
where o.total_items = 0
and o.order_status not in ('cancelled', 'draft')
Etape 4 : Orchestration Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime, timedelta
default_args = {
"owner": "data-team",
"retries": 2,
"retry_delay": timedelta(minutes=5),
"on_failure_callback": lambda ctx: SlackWebhookOperator(
task_id="slack_alert",
slack_webhook_conn_id="slack_data_alerts",
message=f":red_circle: Pipeline echec: {ctx['task_instance'].task_id}",
).execute(ctx),
}
with DAG(
dag_id="datashop_daily_pipeline",
default_args=default_args,
description="Pipeline quotidien DataShop: ingestion + transformation + tests",
schedule_interval="0 6 * * *",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["datashop", "production", "daily"],
) as dag:
# Etape 1: Declenchement des syncs Airbyte
sync_postgres = AirbyteTriggerSyncOperator(
task_id="sync_postgres",
airbyte_conn_id="airbyte_default",
connection_id="pg-to-snowflake-connection-id",
asynchronous=True,
)
wait_postgres = AirbyteJobSensor(
task_id="wait_postgres_sync",
airbyte_conn_id="airbyte_default",
airbyte_job_id=sync_postgres.output,
timeout=3600,
poke_interval=60,
)
sync_stripe = AirbyteTriggerSyncOperator(
task_id="sync_stripe",
airbyte_conn_id="airbyte_default",
connection_id="stripe-to-snowflake-connection-id",
asynchronous=True,
)
wait_stripe = AirbyteJobSensor(
task_id="wait_stripe_sync",
airbyte_conn_id="airbyte_default",
airbyte_job_id=sync_stripe.output,
timeout=3600,
poke_interval=60,
)
# Etape 2: dbt run + test via Cosmos
dbt_transform = DbtTaskGroup(
group_id="dbt_transform",
project_config=ProjectConfig("/opt/airflow/dbt/dbt_datashop"),
profile_config=ProfileConfig(
profile_name="datashop",
target_name="prod",
),
default_args={"retries": 1},
)
# Etape 3: Notification de succes
notify_success = SlackWebhookOperator(
task_id="notify_success",
slack_webhook_conn_id="slack_data_alerts",
message=":white_check_mark: Pipeline DataShop quotidien termine avec succes !",
)
# Dependencies
[sync_postgres >> wait_postgres, sync_stripe >> wait_stripe] >> dbt_transform >> notify_success
Etape 5 : Monitoring et observabilite
Un pipeline de production necessite un monitoring proactif. Voici les metriques a surveiller :
| Metrique | Seuil d'alerte | Outil |
|---|---|---|
| Duree du pipeline | > 2x la duree moyenne | Airflow SLA |
| Volume de lignes ingere | Variation > 30% vs J-1 | dbt source freshness |
| Nombre de tests dbt en echec | > 0 en production | dbt test + Slack |
| Credits Snowflake consommes | > budget quotidien | Snowflake Resource Monitor |
| Latence end-to-end | > 2 heures | Airflow + custom sensor |
-- Creer un resource monitor pour le warehouse dbt
CREATE RESOURCE MONITOR datashop_monitor
WITH CREDIT_QUOTA = 50 -- 50 credits par mois
FREQUENCY = MONTHLY
START_TIMESTAMP = IMMEDIATELY
TRIGGERS
ON 75 PERCENT DO NOTIFY -- alerte a 75%
ON 90 PERCENT DO NOTIFY -- alerte a 90%
ON 100 PERCENT DO SUSPEND; -- suspendre a 100%
ALTER WAREHOUSE DBT_WH SET RESOURCE_MONITOR = datashop_monitor;
Checklist de mise en production
Avant de deployer en production, verifiez : (1) Tous les tests dbt passent sur un jeu de donnees complet, (2) Le DAG Airflow a ete execute 3 fois sans erreur en staging, (3) Les alertes Slack sont configurees et testees, (4) Le resource monitor Snowflake est actif, (5) La documentation dbt est generee et accessible, (6) Le plan de rollback est documente.
Echec : Pipeline sans idempotence
Une equipe a deploye un pipeline Airflow + dbt sans s'assurer de l'idempotence. Lors d'un retry automatique apres un timeout reseau, les modeles dbt en mode append ont duplique 2 millions de lignes dans la table de faits. Le dashboard de revenus affichait un CA double pendant 6 heures avant detection. La correction : passer en mode incremental avec unique_key et ajouter un test unique sur la cle primaire.
22. Examen Final - Phase 3 : Modern Data Stack
Objectifs
- Valider l'ensemble des connaissances acquises dans la Phase 3
- Demontrer votre maitrise des Cloud Data Warehouses, dbt, orchestrateurs, ELT/CDC et formats de fichiers
- Obtenir la certification Phase 3 - Modern Data Stack
Cet examen final couvre toute la Phase 3. Vous devez obtenir au moins 9/12 pour valider cette phase. Prenez votre temps, relisez les questions attentivement. Si vous echouez, revisez les lecons correspondantes et repassez l'examen. Bonne chance !
Examen Final Phase 3 (12 questions)
Q1. Quel modele de tarification utilise Snowflake qui le differencie de BigQuery ?
Q2. Quelle strategie de materialisation dbt est la plus adaptee pour un modele staging qui transforme des donnees brutes ?
Q3. Quel est l'avantage principal des Deferrable Operators dans Airflow 2.x ?
Q4. Quelle commande dbt genere la documentation interactive du projet et la sert localement ?
Q5. Quel format de fichier est optimal pour les messages Kafka dans une architecture CDC avec Debezium ?
Q6. Quelle fonctionnalite d'Apache Iceberg elimine la necessite pour les utilisateurs de connaitre le schema de partitionnement ?
Q7. Dans un pipeline dbt, quel test detecterait une violation de cle etrangere entre fct_orders et dim_customers ?
Q8. Quel est l'avantage principal du mode Merge-on-Read (MoR) d'Apache Hudi ?
Q9. Quelle est la bonne pratique pour gerer la taille des fichiers dans un data lake Parquet ?
Q10. Quel outil ELT utilise la facturation Monthly Active Rows (MAR) et peut devenir tres couteux sur des tables volumineuses ?
Q11. Dans une architecture Modern Data Stack, quel est l'ordre correct des couches de transformation dbt ?
Q12. Quelle propriete garantit qu'un pipeline peut etre reexecute plusieurs fois sans effet de bord (duplication, corruption) ?
Resultats et prochaines etapes
Si vous avez obtenu 9/12 ou plus : felicitations, vous maitrisez le Modern Data Stack ! Vous etes pret pour la Phase 4 : Architecture Cloud qui approfondira AWS, GCP et Azure pour le data engineering.
Si vous avez obtenu moins de 9/12 : revisez les lecons du module concerne et repassez l'examen. Concentrez-vous sur les concepts que vous avez manques.