Introduction au Modern Data Stack

45 min Intermediaire

Objectifs

  • Comprendre ce qu'est le Modern Data Stack et pourquoi il a emerge
  • Identifier les composants cles : ingestion, transformation, stockage, orchestration, visualisation
  • Analyser l'evolution du legacy vers le cloud-native
  • Evaluer les avantages et limites de cette approche modulaire

En tant qu'architecte data ayant accompagne plus de 30 migrations vers le cloud, je peux vous affirmer que le Modern Data Stack n'est pas qu'un buzzword. C'est un changement fondamental dans la facon dont nous concevons les pipelines de donnees. La cle, c'est de comprendre que chaque composant est interchangeable : vous n'etes plus enferme dans un ecosysteme monolithique. Mais attention, cette modularite a un prix : la complexite d'integration. Mon conseil : commencez simple, puis iterez.

Qu'est-ce que le Modern Data Stack ?

Le Modern Data Stack (MDS) designe un ensemble d'outils cloud-native, modulaires et souvent SaaS, qui couvrent l'ensemble du cycle de vie des donnees. Contrairement aux architectures traditionnelles basees sur des solutions monolithiques on-premise (Informatica, Teradata, Oracle DWH), le MDS privilegie la specialisation : chaque outil excelle dans une tache precise et communique avec les autres via des API et des formats standardises.

Les 5 piliers du Modern Data Stack

1. Ingestion (Fivetran, Airbyte, Stitch) - Extraction et chargement des donnees brutes depuis les sources.
2. Stockage / Compute (Snowflake, BigQuery, Databricks) - Entrepot cloud elastique separant stockage et calcul.
3. Transformation (dbt, Dataform) - Transformation SQL-first dans l'entrepot (ELT vs ETL).
4. Orchestration (Airflow, Dagster, Prefect) - Planification et monitoring des pipelines.
5. Visualisation (Looker, Tableau, Metabase, Power BI) - Consommation et exploration des donnees.

Architecture Modern Data Stack
  SOURCES                INGESTION          STOCKAGE/COMPUTE        TRANSFORMATION       CONSOMMATION
 +----------+         +------------+       +-----------------+      +-------------+     +-------------+
 | SaaS APIs|-------->|            |       |                 |      |             |     |  Looker     |
 | (Stripe, |         | Fivetran   |------>|   Snowflake     |----->|    dbt      |---->|  Tableau    |
 |  HubSpot)|         | Airbyte    |       |   BigQuery      |      |   Dataform  |     |  Metabase   |
 +----------+         | Stitch     |       |   Databricks    |      |             |     |  Power BI   |
 | Databases|-------->|            |       |                 |      +-------------+     +-------------+
 | (Postgres|         +------------+       +-----------------+            |                    |
 |  MySQL)  |                                     |                      v                    v
 +----------+                               +-----+------+        +-----------+       +-----------+
 | Fichiers |                               | Data Lake  |        | Data      |       | Reverse   |
 | (S3, GCS)|                               | (Parquet,  |        | Catalog   |       | ETL       |
 +----------+                               |  Delta)    |        | (dbt docs)|       | (Census,  |
                                            +------------+        +-----------+       |  Hightouch)|
                                                                                      +-----------+

De l'ETL au ELT : un changement de paradigme

L'approche traditionnelle ETL (Extract, Transform, Load) imposait de transformer les donnees avant de les charger dans l'entrepot, principalement parce que le compute etait couteux et limite. Avec le cloud, le stockage est devenu quasi-gratuit et le compute elastique. Le paradigme ELT (Extract, Load, Transform) s'impose naturellement :

CritereETL TraditionnelELT Moderne
Ou se fait la transformation ?Serveur ETL dedie (Informatica, Talend)Dans l'entrepot cloud (via dbt, SQL)
Donnees brutes conservees ?Souvent non (transformees avant chargement)Oui, couche raw preservee
ScalabiliteLimitee par le serveur ETLElastique (scale-out cloud)
Cout initialEleve (licences, serveurs)Pay-as-you-go
Time-to-valueSemaines a moisHeures a jours
Flexibilite schemaSchema-on-write rigideSchema-on-read flexible

Netflix : Migration vers le Modern Data Stack

Netflix a ete l'un des pionniers de l'adoption du cloud pour ses donnees. En migrant d'Oracle on-premise vers un ecosysteme base sur AWS (S3 pour le stockage, Spark pour le compute, et des outils internes pour la transformation), Netflix a pu :

  • Reduire le temps de traitement de ses pipelines de recommandation de 12h a 45 minutes
  • Passer de 50 TB a plus de 100 PB de donnees gerees sans augmentation proportionnelle des couts
  • Permettre a plus de 500 data engineers et analysts de travailler en parallele sans contention de ressources
  • Deployer des modeles de ML de personnalisation alimentes en quasi temps reel

RetailCorp (anonymise) : Migration precipitee vers le MDS

Une grande chaine de distribution a voulu migrer l'ensemble de son infrastructure data en 6 mois vers un Modern Data Stack complet (Fivetran + Snowflake + dbt + Looker). Le projet a connu un depassement budgetaire de 300% car :

  • Les pipelines legacy avaient des logiques metier complexes non documentees, impossibles a reproduire en SQL pur
  • L'absence de data governance a entraine des doublons et des incoherences dans les dashboards
  • Les equipes n'avaient pas ete formees aux nouveaux outils, generant une resistance au changement massive

Anti-pattern : le "Shiny Object Syndrome"

Adopter chaque nouvel outil du MDS parce qu'il est a la mode, sans evaluer s'il repond a un besoin reel. On se retrouve avec 15 outils SaaS, des couts d'abonnement exponentiels, et une equipe incapable de maitriser l'ensemble.

Solution : Commencez avec un stack minimal (un ingestion tool, un DWH cloud, dbt, un outil de BI). Ajoutez des composants uniquement quand un besoin concret est identifie et que l'equipe a la capacite de l'absorber. Evaluez le TCO sur 3 ans avant chaque ajout.

Scenario : Votre premier audit Modern Data Stack

Vous etes recrute comme Data Architect dans une scale-up de 200 employes. L'equipe data (3 personnes) utilise actuellement : des scripts Python pour l'extraction, un PostgreSQL de 500 Go comme "data warehouse", des notebooks Jupyter pour la transformation, et Google Sheets pour le reporting. Le CEO veut "passer au cloud". Comment structurez-vous votre recommandation ?

Pensez aux criteres suivants : volume de donnees prevu a 2 ans, competences de l'equipe actuelle, budget disponible, cas d'usage prioritaires (reporting operationnel vs analytics avancee vs ML), et contraintes reglementaires (RGPD, residences des donnees).

Conseil pratique : le Medallion Architecture

Quelle que soit la solution choisie, structurez vos donnees en 3 couches :
Bronze (Raw) : donnees brutes telles qu'extraites des sources, immutables.
Silver (Cleaned) : donnees nettoyees, dedupliquees, typees correctement.
Gold (Business) : modeles metier agrages, prets pour la consommation BI et ML.
Cette approche, popularisee par Databricks, est universellement applicable.

Quelle est la difference fondamentale entre ETL et ELT ?
En ETL, la transformation se fait avant le chargement dans l'entrepot (sur un serveur intermediaire). En ELT, les donnees sont d'abord chargees brutes dans l'entrepot cloud, puis transformees sur place en utilisant la puissance de calcul elastique du DWH. L'ELT est devenu dominant grace a la separation stockage/compute du cloud.
Citez les 5 piliers du Modern Data Stack.
1. Ingestion (Fivetran, Airbyte) - 2. Stockage/Compute (Snowflake, BigQuery, Databricks) - 3. Transformation (dbt) - 4. Orchestration (Airflow, Dagster) - 5. Visualisation (Looker, Tableau, Power BI).
Qu'est-ce que le Medallion Architecture ?
C'est une architecture en 3 couches : Bronze (donnees brutes), Silver (donnees nettoyees et normalisees), Gold (modeles metier agrages pour la consommation). Elle permet la tracabilite, la reproductibilite et la qualite des donnees.

Snowflake Deep Dive

45 min Intermediaire

Objectifs

  • Maitriser l'architecture multi-cluster de Snowflake (separation stockage, compute, services)
  • Comprendre et utiliser le Time Travel et le Zero-Copy Cloning
  • Configurer le Data Sharing et les materialized views
  • Optimiser les couts avec les warehouse sizing et les resource monitors

J'ai deploye Snowflake chez plus de 15 clients, et je peux vous dire que la puissance de cet outil est aussi sa faiblesse potentielle. La separation compute/stockage est revolutionnaire, mais si vous ne mettez pas en place des resource monitors des le premier jour, la facture peut exploser en quelques heures. J'ai vu un client depenser 40 000 euros en un week-end parce qu'un warehouse XL etait reste allume. Mon premier reflexe : toujours configurer l'auto-suspend a 5 minutes et mettre des alertes budgetaires.

Architecture Snowflake en profondeur

Snowflake repose sur une architecture unique en trois couches independantes, ce qui le differencie fondamentalement des solutions traditionnelles (Teradata, Oracle) ou meme de certains concurrents cloud.

Architecture 3 couches de Snowflake
 +=========================================================+
 |              COUCHE SERVICES (Cloud Services)            |
 |  +------------------+  +------------+  +--------------+ |
 |  | Query Optimizer  |  | Metadata   |  | Security &   | |
 |  | & Compiler       |  | Management |  | Auth (RBAC)  | |
 |  +------------------+  +------------+  +--------------+ |
 |  +------------------+  +------------+  +--------------+ |
 |  | Transaction Mgmt |  | Result     |  | Infrastructure| |
 |  |                  |  | Cache      |  | Management   | |
 |  +------------------+  +------------+  +--------------+ |
 +=========================================================+
              |                    |                |
 +=========================================================+
 |          COUCHE COMPUTE (Virtual Warehouses)             |
 |  +-------------+  +-------------+  +-------------+     |
 |  | WH_ANALYST  |  | WH_ETL      |  | WH_ML       |     |
 |  | (X-Small)   |  | (Large)     |  | (X-Large)   |     |
 |  | 1 credit/h  |  | 8 credits/h |  | 16 credits/h|     |
 |  +-------------+  +-------------+  +-------------+     |
 |  Chaque warehouse = cluster independant, pas de contention|
 +=========================================================+
              |                    |                |
 +=========================================================+
 |            COUCHE STOCKAGE (Cloud Storage)               |
 |  +---------------------------------------------------+  |
 |  | Micro-partitions compressees (columnar, immutables)|  |
 |  | Stockage : AWS S3 / Azure Blob / GCP GCS          |  |
 |  | Format interne optimise (pas Parquet, pas ORC)     |  |
 |  +---------------------------------------------------+  |
 +=========================================================+

Les Micro-partitions : le secret de Snowflake

Snowflake stocke les donnees dans des micro-partitions de 50 a 500 Mo (compressees). Chaque micro-partition est immutable et contient les metadonnees suivantes : valeurs min/max par colonne, nombre de valeurs distinctes, et null count. Cela permet un pruning extremement efficace : le query optimizer peut eliminer des partitions entieres sans les lire, grace aux metadonnees. Contrairement a un index traditionnel, ce pruning est automatique et ne necessite aucune maintenance.

Time Travel : remonter dans le temps

Le Time Travel de Snowflake permet d'acceder aux donnees telles qu'elles etaient a un moment passe. C'est possible grace a l'immutabilite des micro-partitions : quand une donnee est modifiee, une nouvelle micro-partition est creee, mais l'ancienne est conservee pendant la periode de retention.

SQL
-- Interroger une table telle qu'elle etait il y a 30 minutes
SELECT * FROM orders
AT(OFFSET => -60*30);

-- Interroger a un timestamp precis
SELECT * FROM orders
AT(TIMESTAMP => '2025-12-01 10:00:00'::TIMESTAMP);

-- Restaurer une table supprimee par erreur
UNDROP TABLE orders;

-- Creer une table a partir d'un etat passe
CREATE TABLE orders_backup AS
SELECT * FROM orders
AT(TIMESTAMP => '2025-12-01 08:00:00'::TIMESTAMP);

-- Retention configurable (0 a 90 jours selon l'edition)
ALTER TABLE orders SET DATA_RETENTION_TIME_IN_DAYS = 30;

Zero-Copy Cloning

Le Zero-Copy Cloning permet de creer une copie logique d'une table, d'un schema ou d'une base entiere sans dupliquer physiquement les donnees. Les deux objets partagent les memes micro-partitions sous-jacentes. Ce n'est que lorsqu'une modification est effectuee sur l'un des deux objets qu'une nouvelle micro-partition est creee (copy-on-write).

SQL
-- Cloner une base entiere pour un environnement de dev
CREATE DATABASE dev_analytics CLONE prod_analytics;

-- Cloner un schema specifique
CREATE SCHEMA staging.test_schema CLONE staging.prod_schema;

-- Cloner une table pour des tests
CREATE TABLE orders_test CLONE orders;

-- Verifier l'espace reel consomme (quasi-zero au debut)
SELECT TABLE_NAME, ACTIVE_BYTES, TIME_TRAVEL_BYTES, RETAINED_FOR_CLONE_BYTES
FROM INFORMATION_SCHEMA.TABLE_STORAGE_METRICS
WHERE TABLE_NAME IN ('ORDERS', 'ORDERS_TEST');

Spotify : Data Sharing et multi-cluster

Spotify utilise Snowflake pour partager des donnees d'ecoute agregees avec ses partenaires (labels musicaux, artistes). Grace au Secure Data Sharing, Spotify expose des vues securisees sans copier les donnees :

  • Les labels accedent aux statistiques de streaming via leur propre compte Snowflake, sans transfert de donnees
  • Les couts de compute sont supportes par le consommateur (le label), pas par Spotify
  • Les donnees sont toujours fraiches car il n'y a pas de copie a synchroniser
  • La gouvernance est centralisee : Spotify peut revoquer l'acces instantanement

Data Sharing

SQL
-- Creer un share pour un partenaire
CREATE SHARE partner_analytics;

-- Ajouter des objets au share (vues securisees recommandees)
CREATE SECURE VIEW analytics.public.v_partner_metrics AS
SELECT region, product_category, SUM(revenue) as total_revenue,
       COUNT(DISTINCT customer_id) as unique_customers
FROM analytics.public.sales
GROUP BY region, product_category;

GRANT USAGE ON DATABASE analytics TO SHARE partner_analytics;
GRANT USAGE ON SCHEMA analytics.public TO SHARE partner_analytics;
GRANT SELECT ON VIEW analytics.public.v_partner_metrics TO SHARE partner_analytics;

-- Ajouter le compte consommateur
ALTER SHARE partner_analytics ADD ACCOUNTS = 'PARTNER_ACCT_123';

Anti-pattern : Warehouse surdimensionne permanent

Utiliser un warehouse X-Large (16 credits/h ~ 48$/h) en permanence "au cas ou" alors que 95% des requetes pourraient tourner sur un Small (2 credits/h ~ 6$/h). Certaines equipes configurent un seul gros warehouse partage par tous les utilisateurs, recreant exactement les problemes de contention qu'on voulait eviter.

Solution : Creer des warehouses dedies par usage (WH_ETL pour les pipelines, WH_ANALYST pour les analyses ad-hoc, WH_BI pour les dashboards). Configurer l'auto-suspend a 1-5 minutes, et l'auto-resume. Utiliser les resource monitors pour alerter a 75% et suspendre a 100% du budget mensuel.

SQL
-- Configuration optimale des warehouses
CREATE WAREHOUSE wh_etl WITH
  WAREHOUSE_SIZE = 'LARGE'
  AUTO_SUSPEND = 60          -- 1 minute sans activite
  AUTO_RESUME = TRUE
  MIN_CLUSTER_COUNT = 1
  MAX_CLUSTER_COUNT = 3      -- Multi-cluster auto-scaling
  SCALING_POLICY = 'ECONOMY';

-- Resource monitor pour controler les couts
CREATE RESOURCE MONITOR monthly_budget WITH
  CREDIT_QUOTA = 5000
  FREQUENCY = MONTHLY
  START_TIMESTAMP = IMMEDIATELY
  TRIGGERS
    ON 75 PERCENT DO NOTIFY
    ON 90 PERCENT DO NOTIFY
    ON 100 PERCENT DO SUSPEND;

Piege du clustering key

Ne definissez un clustering key que sur les tables de plus de 1 To avec des patterns de requetes tres previsibles. Le re-clustering automatique consomme des credits en arriere-plan. Pour les tables de moins de 1 To, le pruning naturel des micro-partitions est generalement suffisant. Verifiez d'abord avec SYSTEM$CLUSTERING_INFORMATION('table').

Scenario : Optimisation des couts Snowflake

Vous etes Data Architect chez un e-commerce. La facture Snowflake est passee de 5 000 a 25 000 euros/mois en 3 mois. Le CFO vous demande de reduire de 50% sans impacter les performances. Vous decouvrez : un warehouse XL qui ne s'eteint jamais, 200 tables avec Time Travel a 90 jours, des requetes COPY INTO qui rechargent les memes fichiers, et 50 clones abandonnes de databases de production. Quel plan d'action proposez-vous ?

Quelle est la taille d'une micro-partition Snowflake et pourquoi est-elle importante ?
Une micro-partition fait entre 50 et 500 Mo (compressees). Elle est immutable et contient des metadonnees (min/max, distinct count) qui permettent le partition pruning automatique : le moteur peut eliminer des partitions entieres sans les lire, rendant les requetes rapides sans index.
Quelle est la difference entre Time Travel et Fail-safe dans Snowflake ?
Le Time Travel (0-90 jours, configurable) permet a l'utilisateur de requeter ou restaurer des donnees passees via SQL. Le Fail-safe est une periode supplementaire de 7 jours APRES le Time Travel, accessible uniquement par le support Snowflake en cas de catastrophe. Il ne consomme pas de credits mais coute en stockage.

Google BigQuery

45 min Intermediaire

Objectifs

  • Comprendre l'architecture serverless de BigQuery et le systeme de slots
  • Maitriser le partitionnement, le clustering et les tables materialisees
  • Utiliser BI Engine, les streaming inserts et l'integration ML (BQML)
  • Optimiser les couts avec les modeles de facturation on-demand vs flat-rate

BigQuery est la solution que je recommande quand le client veut zero gestion d'infrastructure. Pas de warehouse a dimensionner, pas de cluster a monitorer. Vous ecrivez du SQL, Google gere le reste. Mais la contrepartie, c'est que vous avez moins de controle sur les performances granulaires. Mon astuce principale : toujours partitionner par date et clusterer par les colonnes les plus filtrees. J'ai vu des factures divisees par 10 simplement en ajoutant un partitionnement sur une table de 5 To.

Architecture Serverless de BigQuery

BigQuery est un data warehouse entierement serverless. Contrairement a Snowflake ou vous gerez des virtual warehouses, BigQuery abstrait completement le compute. Il repose sur trois composants internes de Google :

Architecture interne de BigQuery
  +------------------------------------------------------------------+
  |                     BigQuery API / Console                        |
  +------------------------------------------------------------------+
              |                    |                    |
  +-------------------+  +------------------+  +------------------+
  |    Dremel Engine   |  |   Colossus       |  |   Jupiter Network|
  | (Execution moteur) |  | (Stockage dist.) |  | (Reseau interne) |
  |                    |  |                  |  |                  |
  | - Arbre de servers |  | - Columnar format|  | - 1 Petabit/sec  |
  |   (mixers + leaf)  |  | - Capacitor      |  | - Transfert      |
  | - Execution SQL    |  | - Replique auto  |  |   stockage<>calc |
  | - Slots = unites   |  | - Compression    |  |                  |
  |   de calcul        |  |   automatique    |  |                  |
  +-------------------+  +------------------+  +------------------+
              |                    |                    |
  +------------------------------------------------------------------+
  |                    Borg (Orchestration Google)                     |
  |         Allocation dynamique des ressources compute               |
  +------------------------------------------------------------------+

Les Slots BigQuery

Un slot est une unite de calcul virtuelle dans BigQuery. Chaque requete consomme un certain nombre de slots qui varient dynamiquement. En mode on-demand, vous partagez un pool de 2 000 slots avec d'autres utilisateurs de votre projet. En mode flat-rate (editions), vous reservez un nombre fixe de slots (100 minimum) pour un tarif mensuel previsible. Le choix entre les deux depend du volume de requetes : au-dela de ~30 To/mois scannes, le flat-rate devient generalement plus economique.

Partitionnement et Clustering

SQL
-- Table partitionnee par date + clusterisee
CREATE TABLE `project.dataset.events`
(
  event_id STRING,
  event_timestamp TIMESTAMP,
  user_id STRING,
  event_type STRING,
  country STRING,
  device_type STRING,
  revenue NUMERIC
)
PARTITION BY DATE(event_timestamp)       -- Partitionnement par jour
CLUSTER BY country, event_type, device_type  -- Clustering (max 4 colonnes)
OPTIONS (
  partition_expiration_days = 365,        -- Supprime les partitions > 1 an
  require_partition_filter = TRUE         -- OBLIGE a filtrer par date
);

-- Requete optimisee : scanne seulement la partition du jour + cluster prune
SELECT country, event_type, SUM(revenue) as total_revenue
FROM `project.dataset.events`
WHERE DATE(event_timestamp) = '2025-12-01'  -- Partition pruning
  AND country = 'FR'                         -- Cluster pruning
GROUP BY country, event_type;

-- Verifier les octets scannes (estimation)
-- Utiliser l'option --dry_run dans bq CLI ou l'estimation dans la console

BI Engine : acceleration in-memory

BI Engine est un service d'acceleration in-memory integre a BigQuery. Il maintient un cache intelligent des donnees les plus requetees, reduisant la latence des requetes de secondes a millisecondes. Il est particulierement utile pour les dashboards Looker ou Data Studio.

SQL
-- Creer une reservation BI Engine (via SQL admin)
-- Reserve 10 Go de memoire pour le projet
CREATE BI_ENGINE_RESERVATION
  PROJECT_ID = 'my-analytics-project',
  LOCATION = 'EU',
  SIZE_GB = 10;

-- Les tables materialisees sont automatiquement candidates pour BI Engine
CREATE MATERIALIZED VIEW `project.dataset.mv_daily_sales` AS
SELECT
  DATE(order_timestamp) as order_date,
  product_category,
  region,
  COUNT(*) as order_count,
  SUM(amount) as total_revenue,
  AVG(amount) as avg_order_value
FROM `project.dataset.orders`
GROUP BY 1, 2, 3;

Streaming Inserts et BigQuery ML

SQL
-- BigQuery ML : creer un modele de prediction directement en SQL
CREATE OR REPLACE MODEL `project.dataset.churn_model`
OPTIONS (
  model_type = 'LOGISTIC_REG',
  input_label_cols = ['churned'],
  auto_class_weights = TRUE,
  data_split_method = 'AUTO_SPLIT'
) AS
SELECT
  days_since_last_purchase,
  total_orders,
  avg_order_value,
  support_tickets_count,
  email_open_rate,
  churned
FROM `project.dataset.customer_features`
WHERE signup_date < '2025-01-01';

-- Predire le churn sur les nouveaux clients
SELECT customer_id, predicted_churned, predicted_churned_probs
FROM ML.PREDICT(MODEL `project.dataset.churn_model`,
  (SELECT * FROM `project.dataset.customer_features`
   WHERE signup_date >= '2025-01-01'))
ORDER BY predicted_churned_probs[OFFSET(1)].prob DESC
LIMIT 100;

Uber : BigQuery pour l'analyse temps reel

Uber utilise BigQuery pour analyser les patterns de deplacement et optimiser la tarification dynamique (surge pricing). Grace aux streaming inserts et BI Engine :

  • Plus de 100 milliards d'evenements de trajet analyses par jour
  • Latence d'ingestion en streaming reduite a moins de 5 secondes
  • Les analystes pricing peuvent requeter les donnees en temps quasi reel pour ajuster les modeles
  • BQML permet de prototyper des modeles de demande directement en SQL, accelerant le cycle d'experimentation de semaines a heures

StartupTech (anonymise) : Explosion des couts BigQuery

Une startup SaaS a migre vers BigQuery en mode on-demand sans mettre en place de guardrails. Un data analyst a lance un SELECT * sur une table de 20 To par erreur depuis un notebook, generant une facture de 100$ en une seule requete. Le probleme s'est aggrave quand des requetes automatisees non optimisees ont ete planifiees :

  • Pas de require_partition_filter sur les grandes tables
  • Aucune limite de bytes scannes configuree au niveau du projet
  • Facture mensuelle passee de 500$ a 15 000$ en un mois

Anti-pattern : SELECT * sans partition filter

Lancer des requetes SELECT * sur des tables de plusieurs teraoctets sans filtrer par la colonne de partition. En mode on-demand, BigQuery facture 6.25$/To scanne. Une requete full-scan sur 10 To coute 62.50$ a chaque execution.

Solution : Activer require_partition_filter = TRUE sur toutes les tables partitionnees. Configurer des custom quotas par utilisateur pour limiter les To scannes par jour. Utiliser des vues materialisees pour les requetes recurrentes. Former les equipes a toujours verifier l'estimation de couts avant d'executer.

Optimisation : colonnes imbriquees (STRUCT et ARRAY)

BigQuery excelle avec les types STRUCT et ARRAY, qui evitent les jointures couteuses. Au lieu de normaliser en etoile classique, denormalisez en utilisant des colonnes imbriquees. Exemple : au lieu d'une table orders jointe a order_items, creez une seule table orders avec une colonne items ARRAY<STRUCT<product_id STRING, qty INT64, price NUMERIC>>. Cela reduit les shuffles et accelere les requetes.

Scenario : Migration Redshift vers BigQuery

Vous etes sollicite par un client media qui veut migrer de Redshift vers BigQuery. Leur cluster Redshift (ra3.4xlarge, 6 noeuds) coute 18 000$/mois et les analystes se plaignent de requetes lentes aux heures de pointe. Le dataset fait 50 To, avec 200 utilisateurs concurrents. Comment structurez-vous la migration ? Quel modele de facturation recommandez-vous (on-demand vs editions) ? Comment gerez-vous la compatibilite SQL (Redshift SQL vs BigQuery Standard SQL) ?

Quelle est la difference entre partitionnement et clustering dans BigQuery ?
Le partitionnement divise physiquement la table en segments (par date, entier ou ingestion time), permettant d'eliminer des partitions entieres. Le clustering trie les donnees a l'interieur de chaque partition selon 1 a 4 colonnes, permettant un scan plus cible. Le partitionnement est obligatoire pour les grandes tables, le clustering est complementaire.
A partir de quel volume de scan mensuel le mode flat-rate BigQuery devient-il plus economique que le on-demand ?
En regle generale, au-dela de 30 a 50 To scannes par mois, le flat-rate (editions) devient plus economique. Le calcul exact depend du nombre de slots reserves et du pattern de requetes (continu vs burst). Le mode on-demand facture 6.25$/To scanne, tandis que le flat-rate offre un tarif mensuel fixe pour un nombre garanti de slots.

Databricks & Spark

45 min Intermediaire

Objectifs

  • Comprendre le concept de Lakehouse et son positionnement entre Data Lake et Data Warehouse
  • Maitriser Delta Lake : transactions ACID, time travel, schema evolution
  • Decouvrir Unity Catalog pour la gouvernance unifiee
  • Evaluer les performances du moteur Photon et ses cas d'usage

Databricks est la solution que je recommande quand le client a des besoins a la fois en analytics et en machine learning avance. Le concept de Lakehouse est brillant : vous gardez la flexibilite du data lake (stocker tout type de donnees) avec les garanties du data warehouse (transactions ACID, schema enforcement). Cependant, Databricks demande plus de competences techniques que Snowflake ou BigQuery. Si votre equipe est composee principalement d'analystes SQL, orientez-vous vers les deux autres. Si vous avez des data engineers et des ML engineers, Databricks sera votre terrain de jeu ideal.

Le Paradigme Lakehouse

Le Lakehouse est une architecture qui combine les avantages du Data Lake (stockage pas cher, formats ouverts, support ML) et du Data Warehouse (transactions ACID, performances SQL, gouvernance). Databricks a popularise ce concept avec Delta Lake, une couche transactionnelle open-source au-dessus de Parquet.

Evolution : Data Warehouse -> Data Lake -> Lakehouse
  DATA WAREHOUSE (1990-2010)     DATA LAKE (2010-2020)        LAKEHOUSE (2020+)
  +------------------+           +------------------+         +------------------+
  | Schema rigide    |           | Schema flexible  |         | Schema flexible  |
  | SQL performant   |           | Tout format      |         | + ACID transactions|
  | ACID transactions|           | Pas cher (S3)    |         | SQL performant   |
  | Gouvernance      |           | ML/DS friendly   |         | ML/DS friendly   |
  +------------------+           +------------------+         | Gouvernance      |
  | MAIS:            |           | MAIS:            |         | Formats ouverts  |
  | - Cher           |           | - Pas de ACID    |         +------------------+
  | - Formats proprio|           | - "Data Swamp"   |         | = Delta Lake +   |
  | - Pas ML-friendly|           | - Pas de gouver. |         |   Spark Engine + |
  +------------------+           | - Perf SQL faible|         |   Unity Catalog  |
                                 +------------------+         +------------------+

Delta Lake en profondeur

Delta Lake est le format de table open-source qui rend le Lakehouse possible. Il ajoute une couche de metadonnees transactionnelles (le transaction log ou _delta_log) au-dessus de fichiers Parquet standards.

Les 4 garanties de Delta Lake

Atomicite : chaque operation (INSERT, UPDATE, DELETE, MERGE) est entierement reussie ou entierement annulee.
Coherence : le schema est enforce, les contraintes sont respectees (CHECK, NOT NULL).
Isolation : les ecritures concurrentes sont gerees via un optimistic concurrency control.
Durabilite : les donnees validees sont persistees de maniere fiable sur le stockage cloud.

Python
# Creer une table Delta avec Spark
from delta.tables import DeltaTable

# Ecriture initiale
df_events = spark.read.json("s3://raw-data/events/2025/12/")
df_events.write.format("delta") \
    .partitionBy("event_date") \
    .mode("overwrite") \
    .save("s3://lakehouse/bronze/events")

# MERGE (upsert) - operation impossible avec un Data Lake classique
delta_table = DeltaTable.forPath(spark, "s3://lakehouse/silver/customers")
delta_table.alias("target").merge(
    df_new_customers.alias("source"),
    "target.customer_id = source.customer_id"
).whenMatchedUpdate(set={
    "email": "source.email",
    "last_active": "source.last_active",
    "updated_at": "current_timestamp()"
}).whenNotMatchedInsert(values={
    "customer_id": "source.customer_id",
    "email": "source.email",
    "last_active": "source.last_active",
    "created_at": "current_timestamp()",
    "updated_at": "current_timestamp()"
}).execute()
SQL
-- Time Travel avec Delta Lake
-- Voir l'historique des modifications
DESCRIBE HISTORY lakehouse.silver.customers;

-- Requeter une version precedente
SELECT * FROM lakehouse.silver.customers VERSION AS OF 42;

-- Requeter a un timestamp
SELECT * FROM lakehouse.silver.customers TIMESTAMP AS OF '2025-12-01 10:00:00';

-- Restaurer une version precedente
RESTORE TABLE lakehouse.silver.customers TO VERSION AS OF 42;

-- Schema Evolution : ajouter une colonne automatiquement
ALTER TABLE lakehouse.silver.customers
SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name');

-- OPTIMIZE : compacter les petits fichiers (small file problem)
OPTIMIZE lakehouse.silver.customers
ZORDER BY (country, signup_date);

-- VACUUM : supprimer les fichiers obsoletes (liberer du stockage)
VACUUM lakehouse.silver.customers RETAIN 168 HOURS;

Unity Catalog : gouvernance unifiee

Unity Catalog est la solution de gouvernance de Databricks qui offre un namespace a 3 niveaux : catalog.schema.table. Il centralise les permissions, le lineage, et la decouverte des donnees sur l'ensemble des workspaces.

Hierarchie Unity Catalog
  METASTORE (niveau compte)
  |
  +-- CATALOG: prod_analytics
  |   +-- SCHEMA: bronze
  |   |   +-- TABLE: raw_events
  |   |   +-- TABLE: raw_users
  |   +-- SCHEMA: silver
  |   |   +-- TABLE: clean_events
  |   |   +-- TABLE: dim_users
  |   +-- SCHEMA: gold
  |       +-- TABLE: fact_revenue
  |       +-- TABLE: agg_daily_metrics
  |
  +-- CATALOG: dev_analytics (clone ou separe)
  |
  +-- CATALOG: ml_features
      +-- SCHEMA: features
          +-- TABLE: user_features
          +-- FUNCTION: compute_rfm_score

Moteur Photon

Photon est le moteur d'execution vectorise de Databricks, ecrit en C++ pour maximiser les performances. Il remplace le moteur Spark JVM pour les operations SQL et DataFrame, offrant des accelerations de 2x a 8x sur les requetes analytiques.

Airbnb : Lakehouse pour la plateforme data unifiee

Airbnb a adopte l'architecture Lakehouse avec Databricks pour unifier ses workflows analytics et ML. Avant, les data scientists devaient copier les donnees du warehouse Hive vers des notebooks, creant des incoherences :

  • Reduction de 60% du temps de preparation des donnees pour les equipes ML
  • Un seul point de verite pour les features utilisees en production et en experimentation
  • Lineage complet via Unity Catalog : chaque prediction peut etre tracee jusqu'a la donnee source
  • Le moteur Photon a reduit les couts de compute de 40% sur les pipelines ETL les plus lourds

FinanceGroup (anonymise) : le Data Lake devenu Data Swamp

Un groupe financier avait investi 2 ans dans un Data Lake sur S3 avec Spark (sans Delta Lake). Le resultat : des milliers de fichiers Parquet sans schema coherent, des donnees corrompues apres des ecritures concurrentes, et aucune possibilite d'UPDATE ou DELETE (requis par RGPD).

  • Migration vers Delta Lake en 4 mois, permettant enfin les operations MERGE pour le droit a l'oubli
  • Les transactions ACID ont elimine les corruptions de donnees qui survenaient 2-3 fois par semaine

Anti-pattern : le "Small File Problem"

Avec Spark et Delta Lake, les ecritures streaming ou les pipelines avec beaucoup de petits batchs generent des milliers de petits fichiers (< 1 Mo chacun). Cela degrade enormement les performances de lecture car le moteur doit ouvrir des milliers de connexions fichier au lieu de lire quelques gros fichiers.

Solution : Planifier des OPTIMIZE reguliers qui compactent les petits fichiers en fichiers de ~1 Go. Utiliser ZORDER BY sur les colonnes frequemment filtrees pour co-localiser les donnees. Configurer l'auto-optimize (delta.autoOptimize.optimizeWrite = true) pour les tables critiques.

Scenario : Choix Lakehouse vs Data Warehouse

Vous etes Data Architect dans une entreprise pharma. L'equipe data (15 personnes : 5 data engineers, 5 analystes, 5 data scientists) doit gerer : des donnees cliniques structurees (SQL), des images medicales (non structure), des modeles ML de detection de molecules, et du reporting reglementaire strict. Le budget est de 30 000$/mois. Recommandez-vous Databricks Lakehouse, Snowflake, ou une architecture hybride ? Justifiez avec les cas d'usage specifiques.

Qu'est-ce que Delta Lake et quel probleme resout-il ?
Delta Lake est un format de table open-source qui ajoute des transactions ACID, le schema enforcement, le time travel et les operations MERGE/UPDATE/DELETE au-dessus de fichiers Parquet sur un Data Lake. Il resout le probleme du "Data Swamp" en apportant les garanties d'un Data Warehouse a la flexibilite du Data Lake.
A quoi sert la commande OPTIMIZE ZORDER BY dans Delta Lake ?
OPTIMIZE compacte les petits fichiers en fichiers plus grands (~1 Go) pour ameliorer les performances de lecture. ZORDER BY reorganise physiquement les donnees pour co-localiser les valeurs similaires des colonnes specifiees, permettant un data skipping plus efficace lors des requetes filtrees sur ces colonnes.

Comparaison Cloud DWH

45 min Intermediaire

Objectifs

  • Comparer objectivement Snowflake, BigQuery et Databricks sur des criteres techniques et business
  • Identifier les forces et faiblesses de chaque plateforme selon le cas d'usage
  • Construire une grille de decision pour choisir la bonne solution
  • Comprendre les strategies multi-cloud et de portabilite

La question que j'entends le plus souvent est : "Snowflake, BigQuery ou Databricks ?". Ma reponse est toujours la meme : ca depend. Pas du marketing, pas des benchmarks biaises publies par chaque vendor, mais de VOTRE contexte : votre equipe, vos cas d'usage, votre cloud provider actuel, et votre budget. J'ai accompagne des clients qui etaient parfaitement heureux avec chacune des trois solutions. Le vrai piege, c'est de choisir en se basant sur un seul critere (le cout, ou la performance brute) sans considerer l'ensemble.

Tableau comparatif detaille

CritereSnowflakeBigQueryDatabricks
ArchitectureSeparation stockage/compute avec virtual warehousesServerless, slots partages ou reservesLakehouse, clusters Spark manages
Modele de facturationCredits (compute) + stockage au To/moisPar To scanne (on-demand) ou slots reservesDBU (Databricks Units) + compute cloud
Gestion infraMoyenne (warehouses a dimensionner)Faible (serverless)Elevee (clusters a configurer)
SQL natifExcellent (ANSI SQL complet)Excellent (Standard SQL + extensions)Bon (Spark SQL, en progres avec DBSQL)
Support ML/DSLimite (Snowpark recente)Bon (BQML integre)Excellent (MLflow, notebooks, GPU)
Donnees non structureesLimite (VARIANT semi-structure)Bon (STRUCT, ARRAY, JSON)Excellent (tout format, images, texte)
StreamingSnowpipe (micro-batch)Streaming inserts natifStructured Streaming natif
Data SharingExcellent (natif, cross-cloud)Analytics HubDelta Sharing (open protocol)
GouvernanceBonne (RBAC, masking, tags)Bonne (IAM GCP, column-level)Excellente (Unity Catalog)
Vendor lock-inMoyen (format proprietaire)Eleve (ecosysteme GCP)Faible (formats ouverts Delta/Parquet)
Multi-cloudOui (AWS, Azure, GCP)GCP uniquementOui (AWS, Azure, GCP)

Analyse par cas d'usage

Arbre de decision simplifie

Choisissez Snowflake si : votre equipe est SQL-first, vous avez besoin de data sharing intensif, vous voulez une solution multi-cloud mature, et vos cas d'usage sont principalement analytics/BI.

Choisissez BigQuery si : vous etes deja sur GCP, vous voulez zero gestion d'infrastructure, vous avez des besoins en ML legers (BQML), et votre budget est variable (on-demand).

Choisissez Databricks si : vous avez des besoins forts en ML/AI, vous gerez des donnees non structurees, vous voulez eviter le vendor lock-in (formats ouverts), et votre equipe a des competences Spark/Python.

Comparaison des couts : scenario concret

Cout mensuel estime - 10 To de donnees, 50 utilisateurs
  Scenario : 10 To stockes, 500 requetes/jour, 5h de compute/jour

  SNOWFLAKE (Enterprise, AWS)
  +------------------------------------------+
  | Stockage  : 10 To x 23$/To   =    230$  |
  | Compute   : WH Medium x 5h x 30j        |
  |             = 150h x 4 credits x 3$      |
  |             =                    1 800$  |
  | Cloud Svcs: ~10% compute       =    180$ |
  | TOTAL                         ~  2 210$  |
  +------------------------------------------+

  BIGQUERY (On-demand, US)
  +------------------------------------------+
  | Stockage  : 10 To x 20$/To   =    200$  |
  | Compute   : ~50 To scannes/mois         |
  |             x 6.25$/To        =    312$  |
  | TOTAL                         ~    512$  |
  +------------------------------------------+
  (Attention: depend enormement du volume scanne)

  DATABRICKS (Premium, AWS)
  +------------------------------------------+
  | Stockage S3: 10 To x 23$/To  =    230$  |
  | Compute    : cluster i3.xlarge           |
  |              5h/jour x 30j = 150h        |
  |              x 0.75$/DBU x 4 DBU/h       |
  |              =                    450$   |
  | EC2 infra  : 150h x 0.312$  =      47$  |
  | TOTAL                         ~    727$  |
  +------------------------------------------+
  (Hors SQL Warehouse serverless, couts variables)

Les couts caches a surveiller

Snowflake : les cloud services au-dela de 10% du compute sont factures. Le clustering automatique et la materialisation consomment des credits en arriere-plan.
BigQuery : les streaming inserts coutent 0.05$/Go insere. Le stockage "active" (modifie dans les 90 derniers jours) coute 20$/To vs 10$/To pour le stockage long-term.
Databricks : le cout de l'infrastructure cloud (EC2/VM) s'ajoute aux DBU. Les jobs clusters qui restent allumes coutent meme sans requetes.

Capital One : approche multi-plateforme

Capital One, l'une des plus grandes banques americaines, utilise une approche hybride pour ses differents cas d'usage :

  • Snowflake pour le reporting reglementaire et le partage de donnees avec les regulateurs
  • Databricks pour la detection de fraude en temps reel (ML avance, streaming Spark)
  • Cette approche multi-plateforme permet d'utiliser le meilleur outil pour chaque cas d'usage
  • Le cout supplementaire de gestion de deux plateformes est compense par les gains de performance et d'adequation fonctionnelle

Anti-pattern : choisir uniquement sur le benchmark de performance

Se baser sur des benchmarks TPC-DS pour choisir sa plateforme. Ces benchmarks mesurent des scenarios standardises qui ne representent pas votre charge de travail reelle. De plus, chaque vendor optimise specifiquement pour ces benchmarks.

Solution : Effectuez un Proof of Concept (POC) de 4 a 6 semaines avec vos donnees reelles et vos requetes reelles sur les 2-3 plateformes preselectionnees. Evaluez : performance, facilite d'utilisation pour votre equipe, cout reel, et qualite du support. Impliquez les utilisateurs finaux dans l'evaluation, pas uniquement les ingenieurs.

Strategies de portabilite et multi-cloud

SQL
-- Strategie : utiliser dbt comme couche d'abstraction
-- Le meme modele dbt peut cibler Snowflake, BigQuery ou Databricks
-- en changeant simplement le profil de connexion

-- models/marts/finance/revenue_monthly.sql
-- Ce modele fonctionne sur les 3 plateformes avec dbt
{{ config(
    materialized='table',
    partition_by={'field': 'month_date', 'data_type': 'date'}  -- BigQuery
    -- cluster_by=['region', 'product_line']  -- Snowflake/Databricks
) }}

SELECT
    DATE_TRUNC('month', order_date) AS month_date,  -- ANSI SQL
    region,
    product_line,
    COUNT(DISTINCT customer_id) AS unique_customers,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_order_value
FROM {{ ref('stg_orders') }}
WHERE order_status = 'completed'
GROUP BY 1, 2, 3

Scenario : Recommandation pour un groupe international

Vous etes consultant Data Architect mandate par un groupe industriel present dans 30 pays. Contraintes : equipes data reparties (Paris, New York, Singapour), donnees RGPD en Europe et CCPA aux USA, 500 To de donnees, 200 utilisateurs BI et 30 data scientists. Le groupe utilise AWS pour l'Europe et Azure pour les USA. Budget : 100 000$/mois tout compris. Quelle architecture recommandez-vous et pourquoi ?

Quel est le principal avantage de BigQuery par rapport a Snowflake en termes de gestion ?
BigQuery est entierement serverless : il n'y a aucun warehouse ou cluster a dimensionner, demarrer ou eteindre. Le compute est automatiquement alloue et facture a l'usage (par To scanne). Snowflake requiert de configurer et gerer des virtual warehouses avec des tailles appropriees.
Pourquoi Databricks est-il considere comme ayant le moins de vendor lock-in ?
Databricks utilise des formats ouverts (Delta Lake/Parquet, open-source), Apache Spark (open-source), et le protocole Delta Sharing (open). Vos donnees restent sur votre propre stockage cloud (S3, ADLS, GCS) dans des formats lisibles par d'autres outils. Snowflake utilise un format interne proprietaire et BigQuery est lie a GCP.

Quiz Cloud DWH

30 min Intermediaire

Objectifs

  • Valider vos connaissances sur Snowflake, BigQuery et Databricks
  • Tester votre capacite a choisir la bonne plateforme selon le contexte
  • Verifier votre comprehension des architectures et modeles de couts
  • Identifier vos axes d'amelioration avant de passer a la suite

Ce quiz couvre les concepts essentiels des trois plateformes cloud que nous avons etudiees. Prenez votre temps pour chaque question. Si vous obtenez moins de 6/8, je vous recommande de relire les lecons precedentes avant de continuer. Les questions sont concues pour tester votre comprehension profonde, pas juste la memorisation.

Quiz : Cloud Data Warehouses

Question 1 : Quelle est la caracteristique principale de l'architecture Snowflake qui la differencie des Data Warehouses traditionnels ?

A) L'utilisation du format Parquet pour le stockage
B) La separation complete du stockage et du compute en couches independantes
C) L'utilisation de GPU pour accelerer les requetes
D) Le stockage des donnees exclusivement en memoire RAM
Correct : B) Snowflake se distingue par sa separation en 3 couches independantes (stockage, compute, services cloud). Cela permet de scaler le compute sans impacter le stockage et vice-versa. Les DWH traditionnels (Teradata, Oracle) couplent etroitement stockage et compute sur les memes noeuds.

Question 2 : Dans BigQuery, quelle est la difference fondamentale entre le mode de facturation "on-demand" et le mode "flat-rate" (editions) ?

A) On-demand est gratuit pour les petites requetes, flat-rate est toujours payant
B) On-demand facture par To scanne, flat-rate reserve un nombre fixe de slots pour un tarif mensuel
C) On-demand ne permet pas le streaming, flat-rate le permet
D) On-demand stocke les donnees sur disque, flat-rate utilise la memoire
Correct : B) En mode on-demand, vous payez 6.25$ par To de donnees scannees par vos requetes. En mode flat-rate (editions), vous reservez un nombre fixe de slots (unites de calcul) pour un tarif mensuel previsible. Le flat-rate devient avantageux au-dela de ~30-50 To scannes par mois.

Question 3 : Qu'est-ce que le Zero-Copy Cloning de Snowflake ?

A) Une methode pour supprimer les donnees sans laisser de trace
B) Un mecanisme de replication synchrone entre deux regions
C) La creation d'une copie logique partageant les micro-partitions sans duplication physique
D) Une compression qui reduit la taille des donnees a zero octet temporairement
Correct : C) Le Zero-Copy Cloning cree une copie logique (metadata-only) qui pointe vers les memes micro-partitions que l'original. Aucune donnee n'est physiquement copiee. Ce n'est que lorsqu'une modification est faite sur l'un des objets qu'une nouvelle micro-partition est creee (copy-on-write). C'est ideal pour creer des environnements de dev ou de test instantanement.

Question 4 : Quel probleme principal Delta Lake resout-il par rapport a un Data Lake classique base sur Parquet ?

A) Delta Lake permet de stocker des donnees dans le cloud alors que Parquet ne le peut pas
B) Delta Lake offre des transactions ACID et les operations UPDATE/DELETE/MERGE impossibles avec Parquet seul
C) Delta Lake est 100 fois plus rapide en lecture que Parquet
D) Delta Lake remplace totalement le besoin d'un Data Warehouse
Correct : B) Un Data Lake base sur Parquet ne supporte pas nativement les transactions ACID ni les operations UPDATE/DELETE/MERGE. Delta Lake ajoute un transaction log au-dessus de Parquet pour garantir l'atomicite, la coherence, l'isolation et la durabilite. Cela permet notamment le respect du RGPD (droit a l'oubli) et la gestion fiable des ecritures concurrentes.

Question 5 : Vous devez choisir une plateforme pour une equipe composee a 80% d'analystes SQL et 20% de data engineers, avec des besoins principalement en reporting BI. Quel est le choix le plus pertinent ?

A) Databricks, car il offre le meilleur rapport qualite/prix
B) Snowflake ou BigQuery, car ils sont optimises pour les workflows SQL-first et la BI
C) Apache Hadoop on-premise pour garder le controle total
D) Un Data Lake S3 avec Athena pour minimiser les couts
Correct : B) Pour une equipe SQL-first avec des besoins BI, Snowflake et BigQuery sont les choix les plus adaptes. Ils offrent une experience SQL native, une integration BI facile, et ne necessitent pas de competences Spark/Python. Databricks serait surdimensionne et complexe pour ce profil d'equipe. Hadoop on-premise est obsolete pour ce cas d'usage.

Question 6 : Quelle fonctionnalite de BigQuery permet de creer des modeles de machine learning directement en SQL ?

A) BigQuery Streaming
B) BI Engine
C) BigQuery ML (BQML)
D) Dataflow
Correct : C) BigQuery ML (BQML) permet de creer, entrainer et deployer des modeles de ML directement avec des commandes SQL (CREATE MODEL). Il supporte la regression, la classification, le clustering, les series temporelles, et meme l'import de modeles TensorFlow. BI Engine est un cache in-memory pour la BI, et Dataflow est un service de traitement streaming separe.

Question 7 : Quel est le role d'Unity Catalog dans l'ecosysteme Databricks ?

A) Optimiser les performances des requetes Spark
B) Fournir une gouvernance unifiee avec gestion des permissions, lineage et decouverte des donnees
C) Remplacer Apache Hive Metastore pour le stockage des donnees
D) Gerer le deploiement des clusters Spark automatiquement
Correct : B) Unity Catalog est la solution de gouvernance centralisee de Databricks. Il fournit un namespace a 3 niveaux (catalog.schema.table), la gestion fine des permissions (GRANT/REVOKE sur chaque objet), le lineage automatique des donnees, et la decouverte des assets data. Il remplace effectivement le Hive Metastore mais sa fonction principale est la gouvernance, pas le stockage.

Question 8 : Quelle strategie permet de reduire le vendor lock-in lorsqu'on utilise un Cloud Data Warehouse ?

A) Utiliser exclusivement les fonctions SQL proprietaires de chaque plateforme
B) Stocker toutes les donnees en CSV pour garantir la portabilite
C) Utiliser dbt comme couche d'abstraction SQL et privilegier les formats ouverts (Parquet, Delta)
D) Eviter le cloud et rester on-premise
Correct : C) dbt agit comme une couche d'abstraction : vos modeles SQL sont portables entre Snowflake, BigQuery et Databricks. Combiner dbt avec des formats ouverts (Parquet, Delta Lake) et des protocoles standards (Delta Sharing) minimise la dependance a un vendor specifique. Le CSV est un format pauvre sans typage ni compression, et rester on-premise n'est pas une strategie de portabilite.

Interpretation de vos resultats

8/8 : Excellent ! Vous maitrisez les fondamentaux des Cloud Data Warehouses. Passez directement aux lecons dbt.
6-7/8 : Bon niveau. Relisez les sections correspondant aux questions manquees.
4-5/8 : Des bases a consolider. Reprenez les lecons 1 a 4 en vous concentrant sur les concepts cles.
0-3/8 : Il est recommande de revoir en detail les lecons precedentes avant de continuer.

dbt Fondamentaux

45 min Intermediaire

Objectifs

  • Comprendre le role de dbt dans le Modern Data Stack et le paradigme ELT
  • Maitriser les concepts fondamentaux : models, sources, refs, materializations
  • Structurer un projet dbt avec les bonnes pratiques (staging, intermediate, marts)
  • Utiliser les macros, les packages et les variables pour un code reutilisable

dbt a revolutionne ma facon de travailler en tant que data architect. Avant, les transformations etaient des scripts SQL eparpilles, sans versioning, sans tests, sans documentation. Avec dbt, on applique enfin les pratiques du genie logiciel aux pipelines de donnees : git, CI/CD, tests unitaires, documentation generee automatiquement. Mon conseil numero un : adoptez les conventions de nommage des le premier jour. J'ai vu trop de projets dbt devenir ingerables parce que chacun nommait ses modeles a sa facon.

Qu'est-ce que dbt ?

dbt (data build tool) est un outil de transformation qui permet aux analystes et data engineers d'ecrire des transformations en SQL (ou Python), de les organiser en modeles, de les tester et de les documenter. dbt s'execute dans le data warehouse : il ne fait que du T (Transform) dans le paradigme ELT.

Positionnement de dbt dans le pipeline
  SOURCES          INGESTION          DATA WAREHOUSE              CONSOMMATION
  (APIs, DBs,     (Fivetran,         +---------------------------+  (Looker,
   fichiers)       Airbyte)           |  RAW (sources)            |   Tableau,
      |               |               |    |                      |   Metabase)
      +--------->-----+--------->-----+    v                      |
                                      |  STAGING (stg_*)          |-------->
                                      |    |    [dbt models]      |
                                      |    v                      |
                                      |  INTERMEDIATE (int_*)     |
                                      |    |    [dbt models]      |
                                      |    v                      |
                                      |  MARTS (fct_*, dim_*)     |
                                      |         [dbt models]      |
                                      +---------------------------+
                                              ^
                                              |
                                      dbt run, dbt test, dbt docs

Structure d'un projet dbt

YAML
# dbt_project.yml - Configuration du projet
name: 'analytics'
version: '1.0.0'
config-version: 2

profile: 'analytics'  # Connexion au DWH (Snowflake, BigQuery, etc.)

model-paths: ["models"]
test-paths: ["tests"]
macro-paths: ["macros"]
seed-paths: ["seeds"]

models:
  analytics:
    staging:
      +materialized: view        # Les staging models sont des vues
      +schema: staging
    intermediate:
      +materialized: ephemeral   # Pas materialise, inline dans les requetes
    marts:
      +materialized: table       # Les marts sont des tables physiques
      +schema: analytics
YAML
# models/staging/sources.yml - Declaration des sources
version: 2

sources:
  - name: raw_ecommerce
    database: raw_database
    schema: ecommerce
    description: "Donnees brutes extraites par Fivetran depuis Shopify"
    tables:
      - name: orders
        description: "Table des commandes Shopify"
        loaded_at_field: _fivetran_synced
        freshness:
          warn_after: {count: 12, period: hour}
          error_after: {count: 24, period: hour}
        columns:
          - name: order_id
            description: "Identifiant unique de la commande"
            tests:
              - unique
              - not_null
      - name: customers
        description: "Table des clients Shopify"
      - name: products
        description: "Catalogue produits"

Les modeles dbt : staging, intermediate, marts

SQL
-- models/staging/stg_orders.sql
-- Staging : nettoyage, renommage, typage. 1 modele par source.
WITH source AS (
    SELECT * FROM {{ source('raw_ecommerce', 'orders') }}
),

renamed AS (
    SELECT
        id AS order_id,
        customer_id,
        CAST(created_at AS TIMESTAMP) AS order_timestamp,
        DATE(created_at) AS order_date,
        status AS order_status,
        CAST(total_price AS NUMERIC) AS order_amount,
        currency,
        LOWER(TRIM(shipping_country)) AS shipping_country,
        _fivetran_synced AS loaded_at
    FROM source
    WHERE id IS NOT NULL  -- Exclure les lignes corrompues
)

SELECT * FROM renamed
SQL
-- models/intermediate/int_orders_enriched.sql
-- Intermediate : jointures, logique metier, enrichissement
WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

customers AS (
    SELECT * FROM {{ ref('stg_customers') }}
),

products AS (
    SELECT * FROM {{ ref('stg_order_items') }}
)

SELECT
    o.order_id,
    o.order_timestamp,
    o.order_date,
    o.order_status,
    o.order_amount,
    c.customer_id,
    c.customer_segment,
    c.first_order_date,
    c.country AS customer_country,
    -- Calcul metier : est-ce une premiere commande ?
    CASE WHEN o.order_date = c.first_order_date
         THEN TRUE ELSE FALSE END AS is_first_order,
    -- Nombre de jours depuis la derniere commande
    DATEDIFF('day', c.last_order_date, o.order_date) AS days_since_last_order
FROM orders o
LEFT JOIN customers c ON o.customer_id = c.customer_id
SQL
-- models/marts/finance/fct_revenue.sql
-- Mart : modele final pret pour la BI
{{ config(
    materialized='incremental',
    unique_key='order_id',
    on_schema_change='append_new_columns'
) }}

SELECT
    order_id,
    order_date,
    customer_id,
    customer_segment,
    customer_country,
    order_amount,
    is_first_order,
    -- Metriques derivees
    SUM(order_amount) OVER (
        PARTITION BY customer_id
        ORDER BY order_date
        ROWS UNBOUNDED PRECEDING
    ) AS customer_lifetime_value
FROM {{ ref('int_orders_enriched') }}
WHERE order_status = 'completed'

{% if is_incremental() %}
    AND order_date > (SELECT MAX(order_date) FROM {{ this }})
{% endif %}

Les 4 types de materialisation

view : cree une vue SQL (pas de stockage, recalculee a chaque requete). Ideal pour les modeles staging legers.
table : cree une table physique (recree entierement a chaque dbt run). Ideal pour les marts finaux de petite/moyenne taille.
incremental : insere uniquement les nouvelles lignes a chaque run. Ideal pour les grandes tables factuelles.
ephemeral : pas de materialisation, le SQL est injecte comme CTE dans les modeles dependants. Ideal pour les modeles intermediaires reutilisables.

La fonction ref() et le DAG

La fonction {{ ref('model_name') }} est au coeur de dbt. Elle cree une dependance declarative entre les modeles, permettant a dbt de construire automatiquement le DAG (Directed Acyclic Graph) d'execution. dbt garantit que les modeles sont executes dans le bon ordre.

DAG d'un projet dbt typique
  source('raw','orders')  source('raw','customers')  source('raw','products')
         |                        |                        |
         v                        v                        v
    stg_orders              stg_customers            stg_products
         |                        |                        |
         +----------+-------------+                        |
                    |                                       |
                    v                                       |
          int_orders_enriched <-----------------------------+
                    |
         +----------+----------+
         |                     |
         v                     v
    fct_revenue          dim_customers
         |                     |
         +----------+----------+
                    |
                    v
          rpt_monthly_revenue (exposure -> Looker)

GitLab : dbt a grande echelle

GitLab a construit l'un des plus grands projets dbt open-source, avec plus de 800 modeles et une transparence totale (le repo est public). Leurs pratiques sont une reference :

  • Plus de 800 modeles dbt organises en staging, intermediate et marts
  • Convention de nommage stricte : stg_, int_, fct_, dim_, rpt_
  • CI/CD sur chaque merge request : dbt test + dbt build sur le schema de la branche
  • Documentation generee automatiquement et accessible a toute l'entreprise via dbt docs
  • Temps de build complet reduit de 4h a 45 min grace aux modeles incrementaux

Anti-pattern : le modele monolithique de 500 lignes

Ecrire un seul modele dbt contenant toute la logique metier : jointures, nettoyage, calculs, agregations. Ce modele devient impossible a maintenir, tester et deboguer. Les performances se degradent car le moteur SQL ne peut pas optimiser une requete aussi complexe.

Solution : Decomposez en couches. Staging (1 modele par source, nettoyage uniquement), Intermediate (jointures et logique metier), Marts (agregations finales). Chaque modele doit idealement faire moins de 100 lignes de SQL et avoir une responsabilite unique. Utilisez les CTEs pour structurer le code a l'interieur de chaque modele.

Anti-pattern : hardcoder les noms de tables

Ecrire FROM raw_database.ecommerce.orders directement au lieu d'utiliser {{ source('raw_ecommerce', 'orders') }} ou {{ ref('stg_orders') }}. Cela casse la tracabilite du DAG et rend le projet fragile aux changements d'environnement.

Solution : Toujours utiliser {{ ref() }} pour les modeles dbt et {{ source() }} pour les tables externes. dbt gerera automatiquement les noms de schemas et databases selon l'environnement (dev, staging, prod).

Lab : Creer votre premier projet dbt

Etape 1 : Installation et initialisation

Installez dbt-core avec l'adaptateur de votre choix et initialisez un nouveau projet :

Bash
# Installation avec pip (adaptateur Snowflake)
pip install dbt-snowflake

# Initialisation d'un nouveau projet
dbt init my_analytics_project

# Verification de la connexion
cd my_analytics_project
dbt debug

Etape 2 : Declarer vos sources

Creez le fichier models/staging/sources.yml et declarez au moins une source avec ses tables. Ajoutez des tests de freshness.

Etape 3 : Creer un modele staging

Creez models/staging/stg_orders.sql qui selectionne depuis votre source, renomme les colonnes et caste les types. Utilisez {{ source() }}.

Etape 4 : Creer un modele mart

Creez models/marts/fct_daily_revenue.sql qui agregue les commandes par jour en utilisant {{ ref('stg_orders') }}. Materialisez-le en table.

Etape 5 : Executer et valider

Bash
# Executer tous les modeles
dbt run

# Executer un modele specifique et ses dependances
dbt run --select fct_daily_revenue+

# Generer et ouvrir la documentation
dbt docs generate
dbt docs serve
Quelle est la difference entre {{ ref() }} et {{ source() }} dans dbt ?
{{ source('source_name', 'table_name') }} reference une table externe declaree dans un fichier sources.yml (donnees brutes chargees par un outil d'ingestion). {{ ref('model_name') }} reference un autre modele dbt (transformation). Les deux creent des dependances dans le DAG, mais source() pointe vers des objets que dbt ne gere pas, tandis que ref() pointe vers des objets crees par dbt.
Quand utiliser la materialisation "incremental" plutot que "table" ?
Utilisez "incremental" pour les tables factuelles volumineuses (millions de lignes) ou le recalcul complet serait trop couteux/long. Le modele doit avoir une colonne de date ou d'identifiant permettant d'identifier les "nouvelles" lignes. Utilisez "table" pour les tables de dimensions ou les marts de petite/moyenne taille ou le recalcul complet est acceptable.

dbt Tests & Documentation

45 min Intermediaire

Objectifs

  • Maitriser les tests schema (unique, not_null, accepted_values, relationships) et les tests custom
  • Utiliser dbt-expectations et dbt-utils pour des tests avances
  • Generer et publier une documentation interactive avec dbt docs
  • Mettre en place une CI/CD pour valider automatiquement les changements

La documentation et les tests sont les parents pauvres de la data. J'ai travaille avec des equipes brillantes qui avaient des centaines de modeles dbt sans un seul test. Resultats : des dashboards qui affichaient des chiffres faux pendant des semaines sans que personne ne le detecte. Avec dbt, il n'y a plus d'excuse. Les tests sont declaratifs (quelques lignes de YAML), la documentation est generee automatiquement. Je considere qu'un modele sans tests est un modele en dette technique. Minimum : unique + not_null sur chaque primary key, et accepted_values sur les colonnes de statut.

Les tests dbt : votre filet de securite

dbt propose deux types de tests : les tests schema (declaratifs, en YAML) et les tests data (requetes SQL personnalisees). Les tests schema sont les plus courants et couvrent la majorite des besoins.

Tests schema (YAML)

YAML
# models/marts/schema.yml
version: 2

models:
  - name: fct_revenue
    description: "Table de faits des revenus par commande completee"
    columns:
      - name: order_id
        description: "Identifiant unique de la commande"
        tests:
          - unique
          - not_null

      - name: order_date
        description: "Date de la commande"
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: "'2020-01-01'"
              max_value: "CURRENT_DATE()"

      - name: order_status
        description: "Statut de la commande"
        tests:
          - accepted_values:
              values: ['completed', 'refunded', 'partially_refunded']

      - name: customer_id
        description: "Reference vers la dimension client"
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id

      - name: order_amount
        description: "Montant total de la commande en euros"
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"

Tests data custom (SQL)

SQL
-- tests/assert_revenue_not_negative.sql
-- Un test custom passe si la requete retourne 0 ligne
-- Ici on verifie qu'aucun client n'a un revenue cumule negatif

SELECT
    customer_id,
    SUM(order_amount) AS total_revenue
FROM {{ ref('fct_revenue') }}
GROUP BY customer_id
HAVING SUM(order_amount) < 0
SQL
-- tests/assert_daily_revenue_consistency.sql
-- Verifier que le revenue journalier ne s'ecarte pas
-- de plus de 50% de la moyenne mobile sur 7 jours

WITH daily_revenue AS (
    SELECT
        order_date,
        SUM(order_amount) AS daily_total,
        AVG(SUM(order_amount)) OVER (
            ORDER BY order_date
            ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING
        ) AS avg_7d
    FROM {{ ref('fct_revenue') }}
    GROUP BY order_date
)

SELECT
    order_date,
    daily_total,
    avg_7d,
    ABS(daily_total - avg_7d) / NULLIF(avg_7d, 0) AS pct_deviation
FROM daily_revenue
WHERE avg_7d IS NOT NULL
  AND ABS(daily_total - avg_7d) / NULLIF(avg_7d, 0) > 0.5
  AND order_date >= DATEADD('day', -7, CURRENT_DATE())

Packages dbt : dbt-utils et dbt-expectations

YAML
# packages.yml - Dependances du projet
packages:
  - package: dbt-labs/dbt_utils
    version: ">=1.1.0"

  - package: calogica/dbt_expectations
    version: ">=0.10.0"

  - package: dbt-labs/codegen
    version: ">=0.12.0"

# Installer avec : dbt deps

Tests les plus utiles de dbt-expectations

expect_column_values_to_be_between : les valeurs sont dans un intervalle (dates, montants).
expect_column_values_to_match_regex : les valeurs respectent un format (email, UUID, code postal).
expect_table_row_count_to_be_between : le nombre de lignes est dans une fourchette attendue.
expect_column_proportion_of_unique_values_to_be_between : detection de doublons anormaux.
expect_column_values_to_not_be_null : version avancee de not_null avec seuil de tolerance.

Documentation dbt : auto-generee et interactive

dbt genere automatiquement une documentation interactive a partir des descriptions YAML, du DAG, et du schema des tables. C'est un atout majeur pour la gouvernance et l'onboarding des nouveaux membres.

YAML
# models/marts/schema.yml - Documentation enrichie
version: 2

models:
  - name: fct_revenue
    description: |
      Table de faits contenant une ligne par commande completee.
      **Grain** : une ligne par order_id.
      **Source** : donnees Shopify via Fivetran.
      **Rafraichissement** : quotidien a 06h UTC.
      **Owner** : equipe Finance Data (finance-data@company.com).

      ### Logique metier
      - Seules les commandes avec statut 'completed' sont incluses
      - Le montant est en euros, apres conversion si necessaire
      - Le customer_lifetime_value est calcule cumulativement

      ### Exemples de requetes
      ```sql
      -- Revenue mensuel par segment
      SELECT customer_segment, DATE_TRUNC('month', order_date),
             SUM(order_amount) FROM fct_revenue GROUP BY 1, 2
      ```
    columns:
      - name: order_id
        description: "Cle primaire - identifiant unique Shopify de la commande"
      - name: customer_lifetime_value
        description: |
          Somme cumulee de tous les montants de commande du client,
          ordonnee par date de commande. Calcul : window function
          SUM(order_amount) OVER (PARTITION BY customer_id ORDER BY order_date)
YAML
# models/exposures.yml - Documenter les consommateurs
version: 2

exposures:
  - name: monthly_revenue_dashboard
    type: dashboard
    description: "Dashboard Looker utilise par la direction financiere"
    owner:
      name: Marie Dupont
      email: marie.dupont@company.com
    url: https://looker.company.com/dashboards/42
    depends_on:
      - ref('fct_revenue')
      - ref('dim_customers')
      - ref('dim_products')
    maturity: high

  - name: churn_prediction_model
    type: ml
    description: "Modele ML de prediction du churn client"
    owner:
      name: Ahmed Ben Ali
      email: ahmed.benali@company.com
    depends_on:
      - ref('fct_revenue')
      - ref('dim_customers')

CI/CD avec dbt

Pipeline CI/CD dbt typique
  DEVELOPPEUR                    CI (GitHub Actions)                    PRODUCTION
  +-----------+                 +------------------------+             +-----------+
  | 1. Code   |  Pull Request  | 3. dbt build --select  |  Merge PR  | 5. dbt run |
  |    modele |---------------->|    state:modified+     |------------>|    complet |
  | 2. dbt run|                 | 4. dbt test --select   |             | 6. dbt test|
  |    local  |                 |    state:modified+     |             |    complet |
  +-----------+                 +------------------------+             +-----------+
       |                               |                                    |
   dev schema                    CI schema temporaire                  prod schema
  (dev_jean_*)                 (ci_pr_123_*)                         (analytics.*)
                                       |
                                 Si echec -> PR bloquee
                                 Si succes -> PR approuvable
YAML
# .github/workflows/dbt-ci.yml
name: dbt CI
on:
  pull_request:
    branches: [main]
    paths:
      - 'models/**'
      - 'tests/**'
      - 'macros/**'

jobs:
  dbt-test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install dbt-snowflake
      - run: dbt deps
      - run: dbt build --select state:modified+ --defer --state ./prod-manifest
        env:
          DBT_PROFILES_DIR: ./ci_profiles
          SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
          SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}

Shopify : dbt pour la qualite des donnees a grande echelle

Shopify utilise dbt pour transformer les donnees de plus de 2 millions de marchands. Leur approche des tests est exemplaire :

  • Plus de 3 000 tests dbt executes quotidiennement, couvrant l'integrite referentielle, les volumes attendus et les anomalies statistiques
  • Un systeme d'alertes gradue : les tests critiques (primary keys, not null) bloquent le pipeline, les tests d'anomalie envoient des alertes Slack
  • La documentation dbt est le point d'entree unique pour decouvrir les donnees disponibles
  • Chaque pull request declenche un dbt build sur les modeles modifies, empechant les regressions

DataStartup (anonymise) : les donnees fausses dans les dashboards

Une startup SaaS B2B a decouvert que son dashboard de MRR (Monthly Recurring Revenue) affichait des chiffres errones depuis 3 mois. La cause : un changement dans le format d'un champ de la source (le statut d'abonnement est passe de "active" a "Active") qui a fait disparaitre 30% des abonnements actifs des calculs.

  • Si un test accepted_values avait ete en place, l'anomalie aurait ete detectee en moins de 24h
  • L'impact business : des decisions d'investissement basees sur des chiffres sous-estimes de 30%
  • Apres l'incident, l'equipe a implemente 200+ tests dbt et un monitoring de freshness

Anti-pattern : tests uniquement sur les modeles finaux

Ne tester que les modeles marts (fct_, dim_) et ignorer les modeles staging et intermediate. Si une anomalie entre dans la couche staging sans etre detectee, elle se propage a travers tout le DAG et peut impacter des dizaines de modeles en aval.

Solution : Appliquez la strategie de tests en couches. Sur les staging : unique et not_null sur les primary keys, accepted_values sur les statuts. Sur les intermediate : relationships pour verifier les jointures. Sur les marts : tests metier custom (coherence des totaux, fourchettes attendues). Ajoutez des tests de volume (expect_table_row_count_to_be_between) sur les tables critiques.

Anti-pattern : documentation "une fois et on oublie"

Documenter les modeles une seule fois lors de la creation, puis ne jamais mettre a jour les descriptions quand la logique evolue. Au bout de 6 mois, la documentation ne correspond plus a la realite et personne ne lui fait confiance.

Solution : Integrez la documentation dans le processus de code review. Chaque PR qui modifie un modele doit aussi mettre a jour sa description. Utilisez des hooks pre-commit pour verifier que les modeles et colonnes critiques ont une description non vide. Publiez la doc automatiquement apres chaque deploy en production.

Astuce : le macro generate_schema_name

Par defaut, dbt prefixe les schemas avec le nom de l'utilisateur en dev (ex: dev_jean_staging). Personnalisez le macro generate_schema_name pour controler ou vos modeles sont materialises selon l'environnement. Cela evite les conflits entre developpeurs et assure une separation propre dev/staging/prod.

Lab : Ajouter des tests et de la documentation

Etape 1 : Tests schema de base

Ajoutez un fichier schema.yml pour chaque couche de votre projet. Minimum : unique + not_null sur chaque primary key, accepted_values sur les colonnes de statut.

Etape 2 : Tests custom

Creez un test SQL dans le dossier tests/ qui verifie une regle metier specifique a votre domaine (par exemple : le total des debits egale le total des credits dans une table comptable).

Etape 3 : Installer dbt-expectations

Ajoutez le package dbt-expectations dans packages.yml et executez dbt deps. Ajoutez un test de fourchette de valeurs et un test de volume de table.

Etape 4 : Enrichir la documentation

Ajoutez des descriptions detaillees (grain, source, logique metier, exemples) pour au moins 3 modeles. Declarez une exposure pour un dashboard ou un modele ML.

Etape 5 : Generer et explorer

Bash
# Executer tous les tests
dbt test

# Executer uniquement les tests d'un modele
dbt test --select fct_revenue

# Generer la documentation
dbt docs generate

# Ouvrir dans le navigateur (inclut le DAG interactif)
dbt docs serve --port 8080

Scenario : Mise en place de la qualite des donnees

Vous rejoignez une equipe qui a 150 modeles dbt en production mais zero test. Le CTO vous donne 2 semaines pour mettre en place une strategie de qualite des donnees. Comment priorisez-vous ? Par quels modeles commencez-vous ? Quel niveau de couverture visez-vous a court terme vs long terme ? Comment embarquez-vous l'equipe dans cette demarche sans ralentir la velocite de developpement ?

Quels sont les 4 tests schema natifs de dbt ?
1. unique : verifie qu'il n'y a pas de doublons dans une colonne. 2. not_null : verifie qu'aucune valeur n'est NULL. 3. accepted_values : verifie que les valeurs sont dans une liste definie. 4. relationships : verifie l'integrite referentielle (chaque valeur existe dans une autre table).
Qu'est-ce qu'une exposure dans dbt et a quoi sert-elle ?
Une exposure documente un consommateur externe des modeles dbt (dashboard, modele ML, application). Elle declare quels modeles sont utilises par ce consommateur, qui en est responsable, et son URL. Cela permet de visualiser dans le DAG quels modeles alimentent quels produits finaux, et d'evaluer l'impact d'un changement sur les consommateurs en aval.
Pourquoi est-il important de tester les modeles staging et pas uniquement les marts ?
Les modeles staging sont le point d'entree des donnees dans le DAG dbt. Si une anomalie (doublon, valeur inattendue, NULL) passe au niveau staging sans etre detectee, elle se propage a tous les modeles intermediaires et marts en aval, potentiellement impactant des dizaines de tables et dashboards. Tester tot dans le pipeline permet de detecter les problemes a la source.

dbt Patterns Avances

45 min Avance

Objectifs

  • Maitriser les modeles incrementaux et leurs strategies
  • Comprendre et implementer les snapshots pour le suivi historique
  • Creer des macros reutilisables et des packages personnalises
  • Utiliser les hooks et les materialisations custom
  • Appliquer les bonnes pratiques de performance dbt

Les patterns avances de dbt sont ce qui differencie un analyste qui utilise dbt d'un Data Engineer qui maitrise dbt. Les modeles incrementaux peuvent diviser votre temps de build par 10, mais mal configures, ils peuvent introduire des bugs silencieux dans vos donnees. Prenez le temps de comprendre chaque strategie.

Modeles Incrementaux

Les modeles incrementaux permettent de ne traiter que les nouvelles donnees au lieu de reconstruire l'integralite de la table a chaque execution. C'est le pattern le plus important pour les pipelines a grande echelle.

Principe Fondamental

Un modele incremental verifie si la table cible existe deja. Si oui, il ne traite que les lignes nouvelles ou modifiees depuis la derniere execution. Sinon, il effectue un full refresh comme une table classique.

SQL - Modele Incremental
-- models/staging/stg_events.sql
{{
  config(
    materialized='incremental',
    unique_key='event_id',
    incremental_strategy='merge',
    on_schema_change='append_new_columns'
  )
}}

SELECT
    event_id,
    user_id,
    event_type,
    event_properties,
    created_at,
    _loaded_at
FROM {{ source('raw', 'events') }}

{% if is_incremental() %}
  WHERE _loaded_at > (SELECT MAX(_loaded_at) FROM {{ this }})
{% endif %}

Strategies Incrementales

StrategieMecanismeCas d'usagePerformance
appendINSERT INTO simpleLogs, evenements immutablesTres rapide
mergeMERGE (upsert)Donnees avec mises a jourModeree
delete+insertDELETE puis INSERT par partitionGrandes partitions Redshift/PostgresRapide sur partitions
insert_overwriteEcrase les partitions concerneesBigQuery, SparkTres rapide sur partitions
SQL - Insert Overwrite (BigQuery)
-- models/intermediate/int_daily_revenue.sql
{{
  config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by={
      "field": "order_date",
      "data_type": "date",
      "granularity": "day"
    },
    cluster_by=["region", "product_category"]
  )
}}

SELECT
    DATE(order_timestamp) AS order_date,
    region,
    product_category,
    COUNT(*) AS order_count,
    SUM(amount) AS total_revenue
FROM {{ ref('stg_orders') }}

{% if is_incremental() %}
  WHERE DATE(order_timestamp) >= DATE_SUB(
    (SELECT MAX(order_date) FROM {{ this }}),
    INTERVAL 3 DAY  -- Lookback pour rattraper les retards
  )
{% endif %}

GROUP BY 1, 2, 3

Incremental sans Lookback

Ne jamais utiliser un WHERE strict comme _loaded_at > (SELECT MAX(_loaded_at) FROM {{ this }}) sans marge de securite. Les donnees en retard (late-arriving data) seront perdues definitivement.

Solution : Ajoutez toujours un lookback de 1 a 3 jours et utilisez un unique_key pour eviter les doublons lors du retraitement.

Snapshots : Historisation des Donnees

Les snapshots capturent les changements dans les tables source au fil du temps en implementant une Slowly Changing Dimension Type 2 (SCD2).

SQL - Snapshot SCD2
-- snapshots/snap_customers.sql
{% snapshot snap_customers %}
{{
  config(
    target_schema='snapshots',
    unique_key='customer_id',
    strategy='timestamp',
    updated_at='updated_at',
    invalidate_hard_deletes=True
  )
}}

SELECT
    customer_id,
    company_name,
    subscription_plan,
    mrr,
    status,
    updated_at
FROM {{ source('crm', 'customers') }}

{% endsnapshot %}
Fonctionnement du Snapshot SCD2
  Source: customers (t0)          Source: customers (t1)
  +---------+-------+-----+      +---------+-------+-----+
  | cust_id | plan  | mrr |      | cust_id | plan  | mrr |
  +---------+-------+-----+      +---------+-------+-----+
  | 101     | Basic | 29  |      | 101     | Pro   | 99  |  <-- changement
  | 102     | Pro   | 99  |      | 102     | Pro   | 99  |
  +---------+-------+-----+      +---------+-------+-----+

  Snapshot resultant :
  +---------+-------+-----+---------------------+---------------------+
  | cust_id | plan  | mrr | dbt_valid_from      | dbt_valid_to        |
  +---------+-------+-----+---------------------+---------------------+
  | 101     | Basic | 29  | 2024-01-01 00:00:00 | 2024-02-01 00:00:00 |
  | 101     | Pro   | 99  | 2024-02-01 00:00:00 | NULL                |
  | 102     | Pro   | 99  | 2024-01-01 00:00:00 | NULL                |
  +---------+-------+-----+---------------------+---------------------+

Macros et Packages

Les macros permettent de creer du SQL reutilisable. Elles sont essentielles pour maintenir un projet dbt DRY (Don't Repeat Yourself).

SQL - Macro Personnalisee
-- macros/generate_surrogate_key.sql
{% macro generate_surrogate_key(field_list) %}
    {% set fields = [] %}
    {% for field in field_list %}
        {% do fields.append(
            "COALESCE(CAST(" ~ field ~ " AS VARCHAR), '_null_')"
        ) %}
    {% endfor %}
    {{ dbt_utils.hash(fields | join(" || '|' || ")) }}
{% endmacro %}

-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
    ROUND({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}

-- Utilisation dans un modele :
-- SELECT {{ generate_surrogate_key(['order_id', 'line_item_id']) }} AS sk,
--        {{ cents_to_dollars('amount_cents') }} AS amount_dollars
YAML - packages.yml
packages:
  - package: dbt-labs/dbt_utils
    version: [">=1.0.0", "<2.0.0"]

  - package: dbt-labs/dbt_expectations
    version: [">=0.8.0", "<1.0.0"]

  - package: calogica/dbt_date
    version: [">=0.7.0", "<1.0.0"]

  - git: "https://github.com/mon-org/dbt-internal-macros.git"
    revision: v1.2.0

Hooks et Materialisations Custom

SQL - Hooks (pre et post)
-- dbt_project.yml
models:
  my_project:
    marts:
      +pre-hook:
        - "SET query_tag = 'dbt_marts_{{ this.name }}'"
      +post-hook:
        - "GRANT SELECT ON {{ this }} TO ROLE analyst_role"
        - "ALTER TABLE {{ this }} SET DATA_RETENTION_TIME_IN_DAYS = 7"

-- Ou directement dans un modele :
{{
  config(
    materialized='table',
    post_hook=[
      "CREATE INDEX IF NOT EXISTS idx_{{ this.name }}_user
       ON {{ this }} (user_id)",
      "ANALYZE {{ this }}"
    ]
  )
}}

Shopify : dbt a Grande Echelle

Shopify gere plus de 5 000 modeles dbt dans un mono-repo. Leur approche des patterns avances :

  • Modeles incrementaux partitionnes pour traiter +100 milliards de lignes d'evenements sans full refresh
  • Macros partagees dans un package interne utilise par 30+ equipes data
  • Snapshots automatises pour tracker les changements de prix et d'inventaire sur des millions de produits
  • Materialisation custom "streaming_table" pour les cas ou le batch classique n'etait pas assez rapide

Resultat : temps de build passe de 8h a 45min grace aux incrementaux bien configures.

Startup Fintech : Snapshots Non Testes

Une startup fintech a deploye des snapshots sur les tables de transactions sans tester le comportement lors des changements de schema source.

  • Un ajout de colonne dans la source a casse tous les snapshots existants
  • 3 mois d'historique perdus car les snapshots echouaient silencieusement
  • Le invalidate_hard_deletes n'etait pas active, donc les clients supprimes apparaissaient toujours comme actifs dans les rapports

Lecon : Toujours ajouter des tests dbt sur les snapshots (unicite, non-null sur dbt_valid_from) et monitorer les echecs de snapshot.

Quelle est la difference entre les strategies de snapshot "timestamp" et "check" ?
timestamp compare une colonne updated_at pour detecter les changements : rapide mais necessite une colonne fiable. check compare les valeurs de colonnes specifiees : plus robuste mais plus lent car il doit scanner et comparer chaque ligne.
Quand faut-il faire un --full-refresh sur un modele incremental ?
Lors d'un changement de schema (ajout/suppression de colonne), apres une correction de logique metier, en cas de donnees corrompues, ou periodiquement (hebdomadaire/mensuel) pour garantir la coherence. Utilisez on_schema_change='sync_all_columns' pour les changements mineurs.

dbt CI/CD

40 min Avance

Objectifs

  • Configurer un pipeline CI/CD complet pour dbt avec GitHub Actions
  • Implementer le Slim CI avec comparaison de manifests
  • Maitriser les strategies de deploiement (blue-green, canary)
  • Automatiser les tests, le linting et la documentation
  • Gerer les environnements (dev, staging, prod) avec dbt

Le CI/CD pour dbt est souvent neglige, pourtant c'est lui qui vous protege contre les regressions en production. Un pipeline bien configure vous permet de fusionner vos PR en toute confiance. Investissez du temps dans le Slim CI : il reduit vos couts cloud de 80% sur les builds CI.

Architecture CI/CD pour dbt

Pipeline CI/CD dbt Complet
  PR Ouverte          Merge dans main        Deploiement
  =========          ===============        ============

  [Lint SQL]          [dbt build           [dbt run
   sqlfluff           --target staging]      --target prod]
      |                     |                     |
  [dbt compile]       [dbt test             [dbt test
   --target ci]        --target staging]     --target prod]
      |                     |                     |
  [Slim CI:            [Generate docs]       [Notify Slack]
   dbt build                |                     |
   --select             [Deploy docs          [Update
   state:modified+]      to S3/GCS]           monitoring]
      |
  [dbt test
   --select
   state:modified+]
      |
  [Check results
   & PR comment]

GitHub Actions pour dbt

YAML - .github/workflows/dbt-ci.yml
name: dbt CI

on:
  pull_request:
    branches: [main]
    paths:
      - 'models/**'
      - 'macros/**'
      - 'tests/**'
      - 'dbt_project.yml'

env:
  DBT_PROFILES_DIR: ./ci_profiles
  SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
  SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
  SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}

jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install sqlfluff dbt-sqlfluff
      - run: sqlfluff lint models/ --dialect snowflake

  slim-ci:
    needs: lint
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - run: pip install dbt-snowflake~=1.7.0

      - name: Telecharger manifest de production
        run: |
          aws s3 cp s3://dbt-artifacts/prod/manifest.json \
            ./target-defer/manifest.json

      - name: dbt deps
        run: dbt deps

      - name: Slim CI - Build modeles modifies
        run: |
          dbt build \
            --select state:modified+ \
            --defer \
            --state ./target-defer \
            --target ci \
            --fail-fast

      - name: Commenter la PR
        if: always()
        uses: actions/github-script@v7
        with:
          script: |
            const fs = require('fs');
            const results = JSON.parse(
              fs.readFileSync('target/run_results.json')
            );
            const summary = results.results.map(r =>
              `| ${r.unique_id} | ${r.status} | ${r.execution_time}s |`
            ).join('\n');
            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: `## Resultats dbt CI\n| Modele | Statut | Temps |\n|---|---|---|\n${summary}`
            });

Slim CI : Comparaison de Manifests

Qu'est-ce que le Slim CI ?

Le Slim CI compare le manifest de production (etat actuel) avec le manifest de la branche PR (etat futur). Seuls les modeles modifies et leurs descendants sont executes, reduisant drastiquement le temps et le cout des builds CI.

Bash - Commandes Slim CI
# Selectionner uniquement les modeles modifies
dbt run --select state:modified --state ./prod-manifest/

# Modeles modifies + leurs enfants (downstream)
dbt run --select state:modified+ --state ./prod-manifest/

# Tester les modeles modifies et 1 niveau d'enfants
dbt test --select state:modified+1 --state ./prod-manifest/

# Combiner avec defer pour utiliser les tables prod non-modifiees
dbt build --select state:modified+ \
          --defer \
          --state ./prod-manifest/ \
          --target ci

# Verifier les changements de schema uniquement
dbt run --select state:modified --state ./prod-manifest/ \
        --vars '{"schema_check_only": true}'

Strategies de Deploiement

Blue-Green Deployment

Deux schemas identiques (blue/green). On build dans le schema inactif, puis on permute.

  • Zero downtime
  • Rollback instantane
  • Double cout de stockage
SQL
-- Permutation atomique
ALTER SCHEMA prod_blue
  RENAME TO prod_old;
ALTER SCHEMA prod_green
  RENAME TO prod_blue;

Rolling Deployment

Les modeles sont mis a jour incrementalement dans le schema de production directement.

  • Simple a implementer
  • Pas de surcharge stockage
  • Risque d'etat inconsistant
Bash
# Build couche par couche
dbt run --select tag:staging
dbt run --select tag:intermediate
dbt run --select tag:marts

GitLab : CI/CD dbt avec Merge Request Pipelines

GitLab utilise dbt en interne pour leur data warehouse analytique et a partage leur approche CI/CD :

  • Slim CI sur chaque MR : seuls les modeles impactes sont testes dans un schema ephemere dedie a la MR
  • Schema CI unique par MR : ci_mr_{{ merge_request_id }} evite les collisions entre branches
  • Cleanup automatique : un job CRON supprime les schemas CI de plus de 48h
  • Comparaison des row counts : un script compare le nombre de lignes entre CI et prod pour detecter les anomalies

Impact : le temps moyen de revue des MR data est passe de 3 jours a 4 heures grace aux commentaires automatiques de resultats CI.

Full Build en CI a chaque PR

Executer dbt build sans selecteurs sur chaque pull request est un gaspillage massif de ressources. Pour un projet de 500 modeles, cela peut couter des centaines de dollars par jour en credits cloud.

Couts observes : Une equipe a depense 15 000 $/mois en credits Snowflake uniquement pour le CI, avant de passer au Slim CI (reduit a 1 200 $/mois).

Solution : Toujours utiliser state:modified+ avec --defer en CI. Planifier un full build uniquement en staging hebdomadaire.

Mise en place d'un CI/CD dbt pour une equipe de 10 analystes

Votre equipe utilise dbt avec Snowflake. Actuellement, chacun execute dbt run manuellement et deploie en prod via dbt run --target prod depuis son poste.

Objectif : Implementer un pipeline CI/CD avec les contraintes suivantes :

  • Chaque PR doit etre validee automatiquement avant merge
  • Le deploiement en prod ne doit se faire que depuis la branche main
  • Le cout CI ne doit pas depasser 500 $/mois
  • Les resultats CI doivent etre visibles directement dans la PR

Approche recommandee : Slim CI avec schemas ephemeres, deploiement Blue-Green automatise via GitHub Actions, et nettoyage CRON des schemas obsoletes.

Securite des Secrets

Ne jamais stocker les identifiants de base de donnees dans le code. Utilisez les secrets GitHub Actions (${{ secrets.* }}), les variables d'environnement, ou un gestionnaire de secrets comme HashiCorp Vault. Verifiez que profiles.yml est bien dans .gitignore.

Que fait l'option --defer dans dbt ?
L'option --defer permet a dbt de referencer les tables de production pour les modeles qui ne sont pas inclus dans la selection courante. Ainsi, un modele modifie peut s'appuyer sur les tables prod existantes sans avoir a les reconstruire. C'est essentiel pour le Slim CI.

Lab : Projet dbt Complet

90 min Avance

Objectifs

  • Construire un projet dbt complet de bout en bout
  • Implementer les couches staging, intermediate et mart
  • Appliquer les conventions de nommage et les tests
  • Configurer la documentation et les exposures
  • Creer un DAG coherent et performant

Ce lab est votre meilleur entrainement avant un projet reel. Prenez le temps de bien structurer vos couches. Un bon projet dbt se reconnait a la clarte de son DAG : staging en entree, marts en sortie, et des couches intermediaires bien nommees entre les deux. Ne sautez pas l'etape des tests et de la documentation !

Contexte du Projet

Vous etes Data Engineer chez un e-commerce. Vous devez construire un data warehouse analytique a partir de trois sources : le systeme de commandes (PostgreSQL), le CRM (Salesforce), et les evenements web (Segment). L'objectif final est de fournir des marts pour le marketing, la finance et le produit.

Architecture du Projet dbt
  Sources                  Staging              Intermediate              Marts
  ========                =========            ==============           ========

  [PostgreSQL]  ------>  stg_orders            int_orders_enriched      fct_orders
  [orders,               stg_order_items       int_customer_orders      fct_daily_revenue
   products,             stg_products                                   dim_products
   customers]            stg_customers

  [Salesforce]  ------>  stg_sf_accounts       int_customer_360         dim_customers
  [accounts,             stg_sf_contacts                                fct_customer_ltv
   opportunities]        stg_sf_opportunities

  [Segment]     ------>  stg_web_events        int_session_events       fct_sessions
  [pages,                stg_web_pages         int_user_journey         fct_funnel_analysis
   identifies,
   tracks]

Etape 1 : Initialisation du Projet

1.1 Structure des Dossiers

Bash
dbt init ecommerce_analytics
cd ecommerce_analytics

# Structure recommandee
mkdir -p models/staging/postgres
mkdir -p models/staging/salesforce
mkdir -p models/staging/segment
mkdir -p models/intermediate
mkdir -p models/marts/marketing
mkdir -p models/marts/finance
mkdir -p models/marts/product
mkdir -p tests/generic
mkdir -p macros
mkdir -p snapshots
mkdir -p seeds

1.2 Configuration dbt_project.yml

YAML - dbt_project.yml
name: 'ecommerce_analytics'
version: '1.0.0'
config-version: 2
profile: 'ecommerce'

model-paths: ["models"]
test-paths: ["tests"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
seed-paths: ["seeds"]

models:
  ecommerce_analytics:
    staging:
      +materialized: view
      +schema: staging
      postgres:
        +tags: ['postgres', 'staging']
      salesforce:
        +tags: ['salesforce', 'staging']
      segment:
        +tags: ['segment', 'staging']
    intermediate:
      +materialized: ephemeral
      +schema: intermediate
    marts:
      +materialized: table
      marketing:
        +schema: marketing
        +tags: ['marketing']
      finance:
        +schema: finance
        +tags: ['finance']
      product:
        +schema: product
        +tags: ['product']

1.3 Definir les Sources

YAML - models/staging/postgres/_sources.yml
version: 2
sources:
  - name: postgres
    database: raw_db
    schema: ecommerce
    description: "Systeme transactionnel e-commerce"
    freshness:
      warn_after: {count: 12, period: hour}
      error_after: {count: 24, period: hour}
    loaded_at_field: _loaded_at
    tables:
      - name: orders
        description: "Commandes clients"
        columns:
          - name: order_id
            description: "Identifiant unique de commande"
            tests:
              - unique
              - not_null
      - name: order_items
        description: "Lignes de commande"
      - name: products
        description: "Catalogue produits"
      - name: customers
        description: "Comptes clients"

Etape 2 : Couche Staging

2.1 Modele stg_orders.sql

SQL - models/staging/postgres/stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('postgres', 'orders') }}
),

renamed AS (
    SELECT
        -- Cles
        order_id,
        customer_id,

        -- Dimensions
        LOWER(status) AS order_status,
        LOWER(payment_method) AS payment_method,
        shipping_country,

        -- Metriques
        {{ cents_to_dollars('total_amount_cents') }} AS order_total,
        {{ cents_to_dollars('shipping_cost_cents') }} AS shipping_cost,
        {{ cents_to_dollars('discount_cents') }} AS discount_amount,

        -- Timestamps
        ordered_at,
        shipped_at,
        delivered_at,
        cancelled_at,

        -- Meta
        _loaded_at

    FROM source
    WHERE order_id IS NOT NULL
)

SELECT * FROM renamed

2.2 Tests et Documentation Staging

YAML - models/staging/postgres/_stg_postgres.yml
version: 2
models:
  - name: stg_orders
    description: "Commandes nettoyees et renommees"
    columns:
      - name: order_id
        description: "PK - Identifiant unique"
        tests:
          - unique
          - not_null
      - name: order_status
        tests:
          - accepted_values:
              values: ['pending', 'confirmed', 'shipped',
                       'delivered', 'cancelled', 'refunded']
      - name: order_total
        tests:
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 100000

Etape 3 : Couche Intermediate

3.1 Enrichissement des Commandes

SQL - models/intermediate/int_orders_enriched.sql
WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
),

order_items AS (
    SELECT * FROM {{ ref('stg_order_items') }}
),

item_summary AS (
    SELECT
        order_id,
        COUNT(*) AS item_count,
        COUNT(DISTINCT product_id) AS unique_products,
        SUM(quantity) AS total_units
    FROM order_items
    GROUP BY order_id
),

enriched AS (
    SELECT
        o.*,
        s.item_count,
        s.unique_products,
        s.total_units,
        DATEDIFF('day', o.ordered_at, o.shipped_at) AS days_to_ship,
        DATEDIFF('day', o.shipped_at, o.delivered_at) AS days_to_deliver,
        CASE
            WHEN o.order_total >= 500 THEN 'high_value'
            WHEN o.order_total >= 100 THEN 'medium_value'
            ELSE 'low_value'
        END AS order_tier
    FROM orders o
    LEFT JOIN item_summary s ON o.order_id = s.order_id
)

SELECT * FROM enriched

Etape 4 : Couche Marts

4.1 Fact Table : fct_daily_revenue

SQL - models/marts/finance/fct_daily_revenue.sql
{{
  config(
    materialized='incremental',
    unique_key='revenue_date',
    incremental_strategy='merge'
  )
}}

WITH orders AS (
    SELECT * FROM {{ ref('int_orders_enriched') }}
    WHERE order_status NOT IN ('cancelled', 'refunded')
    {% if is_incremental() %}
      AND ordered_at >= (
        SELECT DATEADD('day', -3, MAX(revenue_date))
        FROM {{ this }}
      )
    {% endif %}
),

daily_agg AS (
    SELECT
        DATE(ordered_at) AS revenue_date,
        COUNT(DISTINCT order_id) AS order_count,
        COUNT(DISTINCT customer_id) AS unique_customers,
        SUM(order_total) AS gross_revenue,
        SUM(discount_amount) AS total_discounts,
        SUM(order_total - discount_amount) AS net_revenue,
        SUM(shipping_cost) AS total_shipping,
        AVG(order_total) AS avg_order_value,
        SUM(total_units) AS total_units_sold
    FROM orders
    GROUP BY DATE(ordered_at)
)

SELECT * FROM daily_agg

Convention de Nommage dbt Labs

Les conventions officielles recommandees par dbt Labs, utilisees par la majorite des projets matures :

  • stg_[source]_[entite] : Staging, nettoyage 1:1 avec la source
  • int_[entite]_[verbe] : Intermediate, enrichissement et jointures
  • fct_[entite] : Fact table dans les marts (evenements, transactions)
  • dim_[entite] : Dimension table dans les marts (entites de reference)
  • Pas de logique metier dans staging : uniquement renommage, typage, filtrage null

Marts qui referent directement les sources

Ne jamais faire {{ source('raw', 'orders') }} dans un mart. Les marts doivent toujours passer par staging (et idealement intermediate). Sinon :

  • Toute modification de schema source casse N modeles au lieu d'un seul
  • La logique de nettoyage est dupliquee dans chaque mart
  • Le DAG devient un plat de spaghetti impossible a maintenir

Conseil de Performance

Materialisez vos modeles intermediate en ephemeral par defaut. Ils seront inlines comme CTE dans les modeles downstream. Si un intermediate est reference par 3+ modeles, passez-le en view pour eviter la duplication de calcul.

Quiz : dbt Maitrise

20 min Avance

Objectifs

  • Valider la comprehension des concepts dbt fondamentaux et avances
  • Tester les connaissances sur les patterns, le CI/CD et les bonnes pratiques
  • Identifier les lacunes a approfondir avant le module suivant

Ce quiz couvre l'ensemble du module dbt. Prenez votre temps et reflechissez bien a chaque question. Si vous obtenez moins de 6/8, relisez les lecons correspondantes avant de passer au module Orchestration.

Quiz dbt - 8 Questions

1. Quelle materialisation dbt est la plus adaptee pour une table de faits de 500 millions de lignes alimentee quotidiennement ?

A. table
B. view
C. incremental
D. ephemeral
Correct : C. incremental. Pour les grandes tables alimentees regulierement, la materialisation incremental permet de ne traiter que les nouvelles donnees, evitant un full rebuild couteux.

2. Que fait la commande dbt build --select state:modified+ --defer --state ./prod-manifest/ ?

A. Reconstruit tous les modeles du projet
B. Execute uniquement les modeles modifies et leurs descendants, en utilisant les tables prod pour les dependances non-modifiees
C. Compare les schemas sans executer de requetes
D. Supprime les modeles obsoletes et reconstruit les modifies
Correct : B. Le selecteur state:modified+ cible les modeles modifies et leurs descendants (le + signifie downstream). L'option --defer permet de referencer les tables de production pour les dependances non incluses dans la selection.

3. Dans un snapshot dbt avec la strategie "timestamp", que se passe-t-il si la colonne updated_at n'est pas fiable ?

A. Le snapshot echoue avec une erreur
B. Des changements peuvent etre manques silencieusement
C. dbt bascule automatiquement sur la strategie "check"
D. Les doublons sont automatiquement elimines
Correct : B. Si updated_at n'est pas mis a jour lors de certaines modifications, le snapshot ne detectera pas ces changements. Les lignes modifiees ne seront pas historisees, entrainant une perte de donnees silencieuse.

4. Quelle couche du projet dbt doit contenir la logique metier complexe (jointures, calculs derives) ?

A. staging
B. intermediate
C. marts
D. sources
Correct : B. intermediate. La couche staging fait uniquement du renommage et nettoyage (1:1 avec la source). La couche intermediate est le lieu ideal pour les jointures, enrichissements et transformations metier. Les marts consomment ces modeles pour exposer les tables finales.

5. Quel est le role principal d'une macro dbt ?

A. Planifier l'execution des modeles a intervalles reguliers
B. Generer du SQL reutilisable et parametrable via Jinja
C. Gerer les connexions aux bases de donnees
D. Creer des visualisations de donnees
Correct : B. Les macros dbt sont des blocs de code Jinja reutilisables qui generent du SQL. Elles permettent d'eviter la duplication de code et d'appliquer le principe DRY (Don't Repeat Yourself) a l'echelle du projet.

6. Quel est l'avantage principal du deploiement Blue-Green pour dbt ?

A. Il reduit le cout de stockage de moitie
B. Il permet un rollback instantane en cas de probleme
C. Il accelere le temps de build de 50%
D. Il elimine le besoin de tests
Correct : B. Le deploiement Blue-Green maintient deux environnements identiques. Si le nouveau deploiement (green) echoue, on peut revenir instantanement a l'ancien (blue) par un simple swap de schema. Le cout est un doublement du stockage.

7. Quelle incremental_strategy est la plus adaptee pour BigQuery avec des donnees partitionnees par date ?

A. append
B. merge
C. insert_overwrite
D. delete+insert
Correct : C. insert_overwrite. Sur BigQuery, insert_overwrite est la strategie recommandee pour les tables partitionnees. Elle ecrase uniquement les partitions concernees, ce qui est plus performant et moins couteux qu'un merge sur l'ensemble de la table.

8. Que signifie le selecteur state:modified+1 dans une commande dbt ?

A. Execute les modeles modifies et tous leurs descendants
B. Execute les modeles modifies et 1 niveau de descendants directs
C. Execute 1 seul modele modifie
D. Execute les modeles modifies et 1 niveau d'ancetres
Correct : B. Le +1 apres le selecteur indique de prendre les modeles correspondants (ici state:modified) plus 1 niveau de descendants directs (enfants). Le + sans chiffre prend tous les descendants.

Evaluation

8/8 : Excellent ! Vous etes pret pour le module Orchestration.
6-7/8 : Bon niveau. Relisez les points manques avant de continuer.
Moins de 6 : Revoyez les lecons 7 a 10 avant de poursuivre.

Apache Airflow

50 min Avance

Objectifs

  • Comprendre l'architecture d'Airflow et ses composants cles
  • Creer et configurer des DAGs avec operators, sensors et XCom
  • Maitriser les connections, pools et variables
  • Comparer les executors : Local, Celery et Kubernetes
  • Appliquer les bonnes pratiques de production

Airflow est le standard de facto pour l'orchestration de pipelines data. Meme si des alternatives plus modernes existent, 80% des equipes data utilisent Airflow en production. Comprendre ses forces ET ses limites est essentiel pour un Data Architect. La cle : un DAG Airflow orchestre, il ne transforme pas. Gardez la logique metier dans dbt ou Spark, pas dans vos DAGs.

Architecture d'Airflow

Composants Airflow
                         +------------------+
                         |   Webserver      |
                         |   (Flask UI)     |
                         +--------+---------+
                                  |
                                  v
  +------------------+   +------------------+   +------------------+
  |   Scheduler      |-->|   Metadata DB    |<--|   Triggerer      |
  |   (Planification)|   |   (PostgreSQL)   |   |   (Async Sensors)|
  +--------+---------+   +------------------+   +------------------+
           |
           v
  +------------------+   +------------------+   +------------------+
  |   Executor       |-->|   Worker 1       |   |   Worker N       |
  |   (Celery/K8s)   |   |   (Task exec)   |   |   (Task exec)   |
  +------------------+   +------------------+   +------------------+
           |
           v
  +------------------+
  |   Message Broker |
  |   (Redis/RabbitMQ)|
  +------------------+

Anatomie d'un DAG

Python - DAG Airflow Complet
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['data-alerts@company.com'],
    'email_on_failure': True,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
    'sla': timedelta(hours=4),
}

with DAG(
    dag_id='ecommerce_daily_pipeline',
    default_args=default_args,
    description='Pipeline quotidien e-commerce ELT',
    schedule_interval='0 6 * * *',  # Tous les jours a 6h
    start_date=datetime(2024, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=['ecommerce', 'daily', 'production'],
) as dag:

    # Sensor : attendre que l'ingestion soit terminee
    wait_for_ingestion = ExternalTaskSensor(
        task_id='wait_for_ingestion',
        external_dag_id='raw_data_ingestion',
        external_task_id='load_complete',
        timeout=3600,
        poke_interval=60,
        mode='reschedule',  # Libere le worker entre les checks
    )

    # Verifier la fraicheur des donnees
    check_freshness = BashOperator(
        task_id='check_source_freshness',
        bash_command='cd /opt/dbt && dbt source freshness --select source:postgres',
        pool='dbt_pool',
    )

    # Groupe de taches : dbt build
    with TaskGroup('dbt_build', tooltip='Build dbt par couche') as dbt_group:
        staging = BashOperator(
            task_id='dbt_staging',
            bash_command='cd /opt/dbt && dbt run --select tag:staging',
            pool='dbt_pool',
        )
        intermediate = BashOperator(
            task_id='dbt_intermediate',
            bash_command='cd /opt/dbt && dbt run --select tag:intermediate',
            pool='dbt_pool',
        )
        marts = BashOperator(
            task_id='dbt_marts',
            bash_command='cd /opt/dbt && dbt run --select tag:marts',
            pool='dbt_pool',
        )
        staging >> intermediate >> marts

    # Tests dbt
    dbt_test = BashOperator(
        task_id='dbt_test',
        bash_command='cd /opt/dbt && dbt test --select tag:marts',
        pool='dbt_pool',
    )

    # Notification
    def notify_success(**context):
        execution_date = context['execution_date']
        print(f"Pipeline complete pour {execution_date}")

    notify = PythonOperator(
        task_id='notify_success',
        python_callable=notify_success,
    )

    wait_for_ingestion >> check_freshness >> dbt_group >> dbt_test >> notify

XCom : Communication entre Taches

Python - XCom
# Pousser des donnees vers XCom
def extract_stats(**context):
    stats = {'total_rows': 150000, 'new_rows': 1200, 'errors': 3}
    context['ti'].xcom_push(key='pipeline_stats', value=stats)
    return stats

# Tirer des donnees depuis XCom
def check_quality(**context):
    stats = context['ti'].xcom_pull(
        task_ids='extract_stats',
        key='pipeline_stats'
    )
    if stats['errors'] > 100:
        raise ValueError(f"Trop d'erreurs : {stats['errors']}")
    print(f"Qualite OK : {stats['new_rows']} nouvelles lignes")

Limites de XCom

XCom stocke les donnees dans la metadata DB (PostgreSQL). Ne transmettez jamais de gros volumes via XCom (max quelques Ko). Pour les DataFrames, utilisez un stockage intermediaire (S3, GCS) et passez uniquement le chemin via XCom.

Connections, Pools et Variables

ConceptRoleExemple
ConnectionIdentifiants de connexion aux systemes externesSnowflake, S3, Slack, SMTP
PoolLimite le nombre de taches concurrentes pour une ressourcedbt_pool: max 4 slots
VariableConfiguration dynamique accessible dans les DAGsenv=production, lookback_days=3

Comparaison des Executors

ExecutorScalabiliteIsolationComplexiteCas d'usage
LocalExecutorFaible (1 machine)AucuneSimpleDev, petites equipes
CeleryExecutorHaute (N workers)ProcessusMoyenneProduction standard
KubernetesExecutorTres hauteContainerEleveeMulti-tenant, cloud-native

Spotify : Airflow a l'Echelle Massive

Spotify est l'un des plus grands utilisateurs d'Airflow au monde avec un deploiement impressionnant :

  • +30 000 DAGs geres par plus de 600 ingenieurs data a travers l'entreprise
  • KubernetesExecutor pour l'isolation complete entre les equipes et la scalabilite dynamique
  • Systeme de "Golden Paths" : templates de DAGs standardises que les equipes peuvent reutiliser
  • Monitoring custom avec Datadog : alertes sur les SLA, les retries excessifs, et les DAGs bloques
  • Migration progressive vers une architecture event-driven pour les cas temps reel

Lecon cle : A grande echelle, la standardisation et le monitoring sont aussi importants que l'outil lui-meme.

E-commerce : DAGs Monolithiques

Un e-commerce francais a deploye Airflow avec des DAGs contenant plus de 200 taches chacun, melangeant extraction, transformation et chargement dans un seul pipeline.

  • Le scheduler mettait 15 minutes a parser un seul DAG au lieu de quelques secondes
  • Un echec sur la tache 180 necessitait de relancer tout le DAG (pas de clear granulaire pratique)
  • Les dependances entre taches formaient un graphe tellement complexe que personne ne comprenait le flux
  • La metadata DB a atteint 200 Go, ralentissant l'ensemble du cluster

Solution appliquee : Decomposition en 40 DAGs modulaires avec ExternalTaskSensor, nettoyage regulier de la metadata DB, et delegation de la transformation a dbt.

Logique Metier dans les DAGs

Placer des transformations SQL ou des calculs complexes directement dans les PythonOperator est un anti-pattern majeur. Le DAG Airflow doit uniquement orchestrer (declencher, ordonnancer, monitorer), pas transformer.

Problemes : Code non testable, pas de versioning des transformations, impossible de re-executer une transformation sans Airflow.

Solution : Utilisez dbt pour les transformations SQL, Spark pour le processing, et Airflow uniquement pour les declencher au bon moment.

Quelle est la difference entre mode='poke' et mode='reschedule' pour un Sensor Airflow ?
poke : le worker reste occupe et verifie periodiquement la condition (bloque un slot). reschedule : le worker est libere entre les verifications, la tache est replanifiee a chaque intervalle. Utilisez reschedule pour les attentes longues (>5 min) afin de ne pas gaspiller des slots worker.
Pourquoi ne faut-il jamais utiliser catchup=True en production sans precaution ?
Avec catchup=True, Airflow execute retroactivement toutes les executions manquees entre start_date et maintenant. Si vous avez un DAG quotidien avec un start_date d'il y a un an, cela declenchera 365 executions simultanees, surchargeant votre cluster et potentiellement votre base de donnees cible.

Dagster

45 min Avance

Objectifs

  • Comprendre la philosophie "Software-Defined Assets" de Dagster
  • Maitriser les concepts cles : assets, ops, graphs, jobs et IO managers
  • Utiliser Dagit (l'interface web) pour le monitoring et le debug
  • Configurer les resources et les schedules
  • Comparer l'approche Dagster vs Airflow

Dagster represente un changement de paradigme par rapport a Airflow. Au lieu de definir "comment orchestrer des taches", vous definissez "quels assets de donnees existent et comment ils sont produits". C'est une approche plus declarative qui correspond mieux a la realite du travail data. Si vous demarrez un nouveau projet en 2024+, Dagster merite serieusement d'etre considere.

Philosophie : Software-Defined Assets

Changement de Paradigme

Airflow : "Je definis des taches et leur ordre d'execution." (Task-centric)
Dagster : "Je definis des assets de donnees et comment ils dependent les uns des autres." (Asset-centric)
Dagster deduit automatiquement le graphe d'execution a partir des dependances entre assets.

Comparaison des Approches
  Airflow (Task-centric)              Dagster (Asset-centric)
  ======================              ======================

  DAG:                                Assets:
    task_extract  ----+               raw_orders (source)
    task_transform ---+---> task_load     |
    task_notify ------+               clean_orders (depend de raw_orders)
                                          |
  "Execute A puis B puis C"           daily_revenue (depend de clean_orders)

                                      "Ces assets existent, voici
                                       comment les produire"

Software-Defined Assets

Python - Assets Dagster
import dagster as dg
import pandas as pd

# Asset source
@dg.asset(
    description="Commandes brutes depuis PostgreSQL",
    group_name="staging",
    compute_kind="sql",
    metadata={"owner": "data-team", "freshness_policy": "daily"},
)
def raw_orders(context: dg.AssetExecutionContext) -> pd.DataFrame:
    """Extraction des commandes depuis la base transactionnelle."""
    query = "SELECT * FROM orders WHERE date >= CURRENT_DATE - INTERVAL '1 day'"
    df = pd.read_sql(query, context.resources.postgres_conn)
    context.log.info(f"Extrait {len(df)} commandes")
    context.add_output_metadata({
        "row_count": len(df),
        "date_range": f"{df['ordered_at'].min()} - {df['ordered_at'].max()}"
    })
    return df


# Asset dependant
@dg.asset(
    description="Commandes nettoyees et enrichies",
    group_name="intermediate",
    compute_kind="pandas",
    deps=[raw_orders],  # Dependance explicite
)
def clean_orders(context: dg.AssetExecutionContext, raw_orders: pd.DataFrame) -> pd.DataFrame:
    """Nettoyage : suppression des doublons, typage, filtrage."""
    df = raw_orders.copy()
    df = df.drop_duplicates(subset=['order_id'])
    df['order_total'] = df['total_amount_cents'] / 100.0
    df['ordered_at'] = pd.to_datetime(df['ordered_at'])
    df = df[df['order_total'] > 0]

    context.log.info(f"Apres nettoyage : {len(df)} commandes valides")
    return df


# Asset mart
@dg.asset(
    description="Revenu quotidien agrege",
    group_name="marts",
    compute_kind="pandas",
    freshness_policy=dg.FreshnessPolicy(maximum_lag_minutes=60 * 8),
)
def daily_revenue(context: dg.AssetExecutionContext, clean_orders: pd.DataFrame) -> pd.DataFrame:
    """Agregation du revenu par jour pour le dashboard finance."""
    df = clean_orders.groupby(clean_orders['ordered_at'].dt.date).agg(
        order_count=('order_id', 'nunique'),
        gross_revenue=('order_total', 'sum'),
        avg_order_value=('order_total', 'mean'),
    ).reset_index()

    context.add_output_metadata({
        "row_count": len(df),
        "total_revenue": float(df['gross_revenue'].sum()),
    })
    return df

Ops, Graphs et Jobs

Pour les cas ou les assets ne suffisent pas (operations sans sortie durable, side-effects), Dagster propose les ops et graphs :

Python - Ops et Graphs
from dagster import op, graph, job, In, Out, String

@op(
    description="Valide la qualite des donnees",
    ins={"table_name": In(String)},
    out={"validation_result": Out(bool)},
)
def validate_data_quality(context, table_name: str) -> bool:
    """Execute des checks de qualite sur une table."""
    checks = {
        'null_check': f"SELECT COUNT(*) FROM {table_name} WHERE id IS NULL",
        'dup_check': f"SELECT COUNT(*) - COUNT(DISTINCT id) FROM {table_name}",
    }
    all_passed = True
    for name, query in checks.items():
        result = context.resources.db.execute(query)
        if result > 0:
            context.log.warning(f"Check {name} echoue : {result} problemes")
            all_passed = False
    return all_passed


@op
def send_alert(context, validation_passed: bool):
    if not validation_passed:
        context.resources.slack.send_message(
            channel="#data-alerts",
            text="Probleme de qualite detecte !"
        )


@graph
def quality_check_pipeline():
    result = validate_data_quality()
    send_alert(result)


# Transformer le graph en job executable
quality_job = quality_check_pipeline.to_job(
    resource_defs={
        "db": snowflake_resource,
        "slack": slack_resource,
    }
)

IO Managers

Python - IO Manager Custom
from dagster import IOManager, io_manager, InputContext, OutputContext
import pandas as pd

class SnowflakeIOManager(IOManager):
    """Stocke et charge les assets depuis Snowflake."""

    def __init__(self, connection_string: str, schema: str):
        self.connection_string = connection_string
        self.schema = schema

    def handle_output(self, context: OutputContext, obj: pd.DataFrame):
        table_name = context.asset_key.to_user_string().replace("/", "_")
        full_table = f"{self.schema}.{table_name}"
        obj.to_sql(full_table, self.connection_string, if_exists='replace')
        context.log.info(f"Ecrit {len(obj)} lignes dans {full_table}")

    def load_input(self, context: InputContext) -> pd.DataFrame:
        table_name = context.asset_key.to_user_string().replace("/", "_")
        full_table = f"{self.schema}.{table_name}"
        return pd.read_sql(f"SELECT * FROM {full_table}", self.connection_string)

@io_manager(config_schema={"connection_string": str, "schema": str})
def snowflake_io_manager(context):
    return SnowflakeIOManager(
        connection_string=context.resource_config["connection_string"],
        schema=context.resource_config["schema"],
    )

Dagit : Interface Web

Fonctionnalites de Dagit

Dagit offre une experience developpeur superieure a l'UI Airflow :

  • Global Asset Lineage : graphe complet de tous les assets et leurs dependances
  • Asset Catalog : documentation automatique de chaque asset avec metadata
  • Freshness monitoring : alerte si un asset n'est pas rafraichi a temps
  • Backfill partitionne : reprocesser des partitions specifiques en quelques clics
  • Launchpad : configuration et lancement interactif des jobs

Elementl (Createurs de Dagster) : Migration chez de Grands Clients

Plusieurs entreprises majeures ont migre d'Airflow vers Dagster avec des resultats mesurables :

  • Drizly (e-commerce alcool, acquis par Uber) : migration de 200+ DAGs Airflow vers Dagster en 3 mois. Reduction de 60% du temps de debug grace au lineage global des assets
  • Materialize : utilise Dagster pour orchestrer ses pipelines internes. Les Software-Defined Assets ont reduit le nombre de lignes de code de pipeline de 40%
  • Le pattern "asset-centric" a naturellement encourage les equipes a mieux documenter leurs donnees, car la documentation fait partie de la definition de l'asset

Tout Mettre dans des Ops au lieu d'Assets

L'erreur classique des equipes migrant depuis Airflow : reproduire le pattern task-centric avec des ops au lieu d'adopter les Software-Defined Assets.

  • Perte du lineage automatique des donnees
  • Pas de freshness monitoring
  • Pas de backfill intelligent par partition
  • Code plus complexe sans benefice

Regle : Si votre operation produit un dataset persistant (table, fichier), c'est un asset. Reservez les ops pour les side-effects purs (notifications, appels API sans stockage).

Airflow - Forces

  • Ecosysteme mature (800+ providers)
  • Communaute massive
  • Services manages (MWAA, Cloud Composer)
  • Documentation abondante
  • Ideal pour orchestration pure

Dagster - Forces

  • Asset-centric (paradigme moderne)
  • Testabilite native
  • UI/UX superieure (Dagit)
  • Freshness policies
  • Ideal pour equipes data modernes
Qu'est-ce qu'un IO Manager dans Dagster et pourquoi est-ce important ?
Un IO Manager gere la serialisation/deserialisation des donnees entre les assets. Il abstrait le stockage (Snowflake, S3, local) du code metier. Cela permet de changer de stockage sans modifier les assets, et de tester localement avec un IO Manager fichier tout en utilisant Snowflake en production.

Prefect

45 min Avance

Objectifs

  • Comprendre la philosophie de Prefect et son approche "Pythonic"
  • Maitriser les flows, tasks, deployments et work pools
  • Utiliser les blocks pour la configuration et les secrets
  • Configurer Prefect Cloud et le monitoring
  • Comparer Prefect avec Airflow et Dagster

Prefect se positionne comme l'orchestrateur le plus "developer-friendly". Son approche est radicalement differente d'Airflow : pas de DAG a declarer, votre code Python devient automatiquement un workflow en ajoutant des decorateurs. C'est l'outil ideal si vous voulez la puissance d'un orchestrateur sans la complexite operationnelle d'Airflow.

Philosophie de Prefect

Les 3 Principes de Prefect

1. Code natif Python : Pas de DSL, pas de fichier de config YAML complexe. Votre code Python + decorateurs = workflow.
2. Negative engineering : Prefect gere tout ce qui peut mal tourner (retries, timeouts, logging, notifications) sans que vous ayez a le coder.
3. Hybrid execution : L'orchestration dans le cloud, l'execution dans votre infrastructure. Vos donnees ne transitent jamais par Prefect Cloud.

Architecture Hybride Prefect
  +-------------------------------------------+
  |            Prefect Cloud                   |
  |  +----------+  +-----------+  +---------+ |
  |  | API      |  | Dashboard |  | Automations| |
  |  | Server   |  | (UI)      |  | & Alerts  | |
  |  +----+-----+  +-----------+  +---------+ |
  +-------|-----------------------------------+
          | API calls (metadata seulement)
          v
  +-------------------------------------------+
  |        Votre Infrastructure                |
  |  +-------------+    +------------------+   |
  |  | Work Pool   |--->| Worker           |   |
  |  | (Process/   |    | (execute flows)  |   |
  |  |  Docker/K8s)|    +------------------+   |
  |  +-------------+                           |
  |       |                                    |
  |  +----v------+    +------------------+     |
  |  | Flow Run  |--->| Vos Donnees      |     |
  |  | (local)   |    | (DB, S3, DWH)    |     |
  |  +-----------+    +------------------+     |
  +-------------------------------------------+

Flows et Tasks

Python - Flow Prefect Complet
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta
import pandas as pd
import httpx

@task(
    name="extract-api-data",
    description="Extrait les donnees depuis l'API externe",
    retries=3,
    retry_delay_seconds=30,
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    tags=["extract", "api"],
)
def extract_from_api(endpoint: str, date: str) -> dict:
    """Appel API avec retry automatique et cache."""
    logger = get_run_logger()
    response = httpx.get(
        f"https://api.ecommerce.com/v2/{endpoint}",
        params={"date": date},
        timeout=30,
    )
    response.raise_for_status()
    data = response.json()
    logger.info(f"Extrait {len(data['results'])} enregistrements depuis {endpoint}")
    return data


@task(
    name="transform-orders",
    tags=["transform"],
)
def transform_orders(raw_data: dict) -> pd.DataFrame:
    """Transforme les donnees brutes en DataFrame structure."""
    logger = get_run_logger()
    df = pd.DataFrame(raw_data['results'])
    df['order_total'] = df['amount_cents'] / 100.0
    df['ordered_at'] = pd.to_datetime(df['ordered_at'])
    df = df[df['order_total'] > 0]

    logger.info(f"Transforme {len(df)} commandes valides")
    return df


@task(
    name="load-to-warehouse",
    retries=2,
    retry_delay_seconds=60,
    tags=["load", "snowflake"],
)
def load_to_snowflake(df: pd.DataFrame, table_name: str):
    """Charge les donnees dans Snowflake."""
    logger = get_run_logger()
    from prefect_snowflake import SnowflakeConnector
    connector = SnowflakeConnector.load("prod-snowflake")
    with connector.get_connection() as conn:
        df.to_sql(table_name, conn, if_exists='append', index=False)
    logger.info(f"Charge {len(df)} lignes dans {table_name}")


@flow(
    name="ecommerce-daily-pipeline",
    description="Pipeline ELT quotidien e-commerce",
    version="2.1.0",
    retries=1,
    retry_delay_seconds=300,
    log_prints=True,
)
def daily_pipeline(target_date: str = None):
    """Pipeline principal : extract, transform, load."""
    from datetime import date
    target = target_date or str(date.today())

    # Extract (parallel par defaut si independant)
    orders_raw = extract_from_api("orders", target)
    products_raw = extract_from_api("products", target)

    # Transform
    orders_clean = transform_orders(orders_raw)

    # Load
    load_to_snowflake(orders_clean, "staging.orders")

    print(f"Pipeline termine pour {target}")


# Execution locale pour test
if __name__ == "__main__":
    daily_pipeline(target_date="2024-03-15")

Deployments et Work Pools

Python - Deployment avec prefect.yaml
# prefect.yaml
deployments:
  - name: ecommerce-daily
    entrypoint: pipelines/daily.py:daily_pipeline
    work_pool:
      name: production-k8s
      job_variables:
        image: registry.company.com/data-pipelines:latest
        cpu: "2000m"
        memory: "4Gi"
    schedule:
      cron: "0 6 * * *"
      timezone: "Europe/Paris"
    parameters:
      target_date: null  # Sera calcule automatiquement
    tags: ["production", "daily", "ecommerce"]

  - name: ecommerce-backfill
    entrypoint: pipelines/daily.py:daily_pipeline
    work_pool:
      name: production-k8s
      job_variables:
        image: registry.company.com/data-pipelines:latest
        cpu: "4000m"
        memory: "8Gi"
    tags: ["backfill", "ecommerce"]
Bash - Commandes Prefect CLI
# Creer un work pool Kubernetes
prefect work-pool create production-k8s --type kubernetes

# Demarrer un worker
prefect worker start --pool production-k8s

# Deployer les flows
prefect deploy --all

# Lancer manuellement un flow
prefect deployment run 'ecommerce-daily-pipeline/ecommerce-daily' \
  --param target_date=2024-03-01

# Backfill sur une plage de dates
for d in $(seq -w 1 31); do
  prefect deployment run 'ecommerce-daily-pipeline/ecommerce-backfill' \
    --param target_date=2024-03-$d
done

Blocks : Configuration et Secrets

Python - Blocks Prefect
from prefect_snowflake import SnowflakeConnector
from prefect.blocks.notifications import SlackWebhook
from prefect.blocks.system import Secret

# Configurer un block Snowflake (via UI ou code)
snowflake_block = SnowflakeConnector(
    account="myorg-account",
    user="dbt_user",
    password="...",  # Ou reference a un Secret block
    database="analytics",
    warehouse="transform_wh",
    schema="public",
    role="transformer",
)
snowflake_block.save("prod-snowflake", overwrite=True)

# Block Slack pour les notifications
slack = SlackWebhook(url="https://hooks.slack.com/services/...")
slack.save("data-alerts", overwrite=True)

# Utilisation dans un flow
@flow
def monitored_pipeline():
    connector = SnowflakeConnector.load("prod-snowflake")
    slack = SlackWebhook.load("data-alerts")

    try:
        # ... logique pipeline
        pass
    except Exception as e:
        slack.notify(f"Pipeline echoue : {str(e)}")
        raise

Automations et Monitoring

Prefect Cloud Automations

Prefect Cloud offre un systeme d'automations declenchees par des evenements :

  • Flow run failed : Envoyer une alerte Slack et creer un ticket Jira
  • Flow run late : Notifier l'equipe si un flow planifie n'a pas demarre
  • Work pool unhealthy : Alerter l'ops si aucun worker n'est disponible
  • Custom events : Declencher un flow B a la completion du flow A

Shopify avec Prefect : Orchestration Cloud-Native

Shopify a adopte Prefect pour certains de ses pipelines data en complement d'Airflow, notamment pour les cas d'usage necessitant une grande flexibilite :

  • Pipelines ML : Les flows Prefect orchestrent l'entrainement et le deploiement des modeles de recommandation, avec des retries intelligents sur les echecs GPU
  • Work pools differencies : Un pool CPU pour l'ETL standard, un pool GPU pour le ML, un pool spot pour les backfills (cout reduit de 70%)
  • Execution hybride : L'orchestration dans Prefect Cloud, l'execution dans leurs clusters Kubernetes prives -- zero donnees dans le cloud Prefect
  • Integration dbt : Les flows Prefect declenchent dbt Cloud via l'API, avec monitoring des resultats et alertes automatiques

Ignorer le Cache des Tasks

Ne pas utiliser le cache de Prefect sur les tasks d'extraction est un gaspillage majeur de ressources, surtout lors du developpement et des re-runs.

  • Chaque re-run rappelle les APIs externes inutilement
  • Les quotas API sont consommes pour rien
  • Le temps de test est multiplie par le nombre de retries

Solution : Utilisez cache_key_fn=task_input_hash avec un cache_expiration adapte. En dev, un cache d'1 heure evite des centaines d'appels API inutiles.

Choix d'Orchestrateur pour un Nouveau Projet

Votre entreprise (50 personnes, equipe data de 5) lance un nouveau data warehouse. Vous devez choisir un orchestrateur. Voici les criteres :

CritereAirflowDagsterPrefect
Courbe d'apprentissageMoyenneMoyenne-hauteFaible
Ops overheadEleveMoyenFaible (Cloud)
EcosystemeTres richeEn croissanceRiche
TestabiliteFaibleExcellenteBonne
Service manageMWAA, Composer, AstronomerDagster CloudPrefect Cloud
Ideal pourGrandes equipes, legacyEquipes data modernesPetites equipes, rapidite

Recommandation : Pour une petite equipe, Prefect Cloud offre le meilleur ratio valeur/effort. Pour un projet data-centric avec lineage important, Dagster. Pour un environnement existant avec beaucoup d'integrations, Airflow.

Startup SaaS : Migration Prematuree vers Prefect 2

Une startup SaaS a migre de Prefect 1 (Orion) vers Prefect 2 des sa sortie beta, sans planification adequte :

  • L'API Prefect 2 etait incompatible avec Prefect 1 -- 100% des flows a reecrire
  • Les blocks remplacaient les anciens secrets et connections -- migration manuelle de 50+ configurations
  • Le nouveau work pool system necessitait une reconfiguration complete de l'infrastructure
  • 3 semaines de downtime partiel des pipelines pendant la migration

Lecon : Ne jamais migrer un outil d'orchestration en production sur une version beta. Planifier la migration sur 2-3 sprints avec une periode de cohabitation ou les deux versions tournent en parallele.

Quelle est la principale difference architecturale entre Prefect Cloud et un Airflow auto-heberge ?
Prefect Cloud suit un modele hybride : le control plane (API, UI, scheduling, automations) est dans le cloud, mais l'execution (workers, flows, donnees) reste dans votre infrastructure. Airflow auto-heberge necessite de gerer l'integralite de la stack (webserver, scheduler, metadata DB, workers) soi-meme, ce qui represente un effort operationnel significatif.
Comment Prefect gere-t-il la concurrence et le parallelisme des tasks ?
Par defaut, les tasks independantes s'executent sequentiellement. Pour le parallelisme, utilisez .submit() au lieu de l'appel direct, ce qui soumet la task a un task runner (ConcurrentTaskRunner pour l'async, DaskTaskRunner pour le parallelisme distribue, RayTaskRunner pour le calcul GPU). Exemple : result = my_task.submit(arg).

Resume du Module Orchestration

Les trois orchestrateurs (Airflow, Dagster, Prefect) resolvent le meme probleme fondamental -- coordonner l'execution de pipelines data -- mais avec des philosophies tres differentes. Airflow est le standard etabli, Dagster apporte l'approche asset-centric, et Prefect mise sur la simplicite Pythonic. Le choix depend de la taille de l'equipe, de la complexite des pipelines, et de l'appetence pour le DevOps.

16. Comparaison Orchestrateurs : Airflow vs Dagster vs Prefect

18 min Avance

Objectifs

  • Comparer en profondeur Apache Airflow, Dagster et Prefect sur des criteres objectifs
  • Construire une matrice de decision adaptee a votre contexte
  • Evaluer le TCO (Total Cost of Ownership) de chaque solution
  • Identifier le meilleur orchestrateur selon le profil d'equipe et le cas d'usage

Le choix d'un orchestrateur est une decision structurante qui engage votre equipe pour 3 a 5 ans. Ne choisissez jamais sur la base d'un tutoriel ou d'un benchmark isole. Evaluez la maturite de l'ecosysteme, la taille de la communaute, et surtout la capacite de votre equipe a operer l'outil au quotidien.

Vue d'ensemble des trois orchestrateurs

Apache Airflow, Dagster et Prefect representent trois generations d'orchestrateurs de donnees. Airflow, cree par Airbnb en 2014, est le standard de facto avec la plus grande communaute. Dagster, lance en 2019 par Elementl, introduit le concept de Software-Defined Assets. Prefect, fonde en 2018, mise sur la simplicite et l'experience developpeur.

CritereApache AirflowDagsterPrefect
Annee de creation2014 (Airbnb)2019 (Elementl)2018 (Prefect Technologies)
ParadigmeDAG de taches (task-centric)Assets materialisables (asset-centric)Flows et taches (task-centric moderne)
LangagePythonPythonPython
LicenceApache 2.0Apache 2.0Apache 2.0 (OSS) / Proprietary (Cloud)
Offre CloudAstronomer, MWAA, Cloud ComposerDagster CloudPrefect Cloud
Communaute GitHub Stars~37 000~12 000~16 000
SchedulingCron natif, TimetablesCron, Sensors, Freshness policiesCron, RRule, event-driven
UI / InterfaceWeb UI complete, Graph viewDagit/Dagster UI, Asset lineagePrefect UI, dashboard cloud
TestingDifficile (mocking complexe)Natif (execution in-process)Natif (execution locale simple)
ScalabiliteCeleryExecutor, KubernetesExecutorMulti-process, K8sWorkers distribues, K8s
Courbe d'apprentissageMoyenne a eleveeElevee (concepts uniques)Faible a moyenne
Plugins / Connecteurs2000+ providersIntegrations croissantesCollections/blocks modulaires

Comparaison architecturale detaillee

Airflow : Architecture classique

Airflow repose sur un Scheduler central qui parse les fichiers DAG, un Webserver Flask pour l'UI, et un Executor qui delegue les taches aux workers. La metabase PostgreSQL stocke l'etat de toutes les executions.

  • Architecture eprouvee, bien documentee
  • Separation claire scheduler/worker
  • Nombreux executors (Local, Celery, Kubernetes)
  • XCom pour passage de donnees inter-taches

Dagster : Architecture asset-centric

Dagster introduit le Dagster Daemon pour le scheduling, Dagit pour la visualisation, et un systeme d'IO Managers pour gerer la persistance des assets. Chaque asset declare ses dependances et son type de donnees.

  • Software-Defined Assets (SDA)
  • Type system integre avec validation
  • IO Managers abstraient le stockage
  • Partitioning natif et puissant
Architecture comparee : Scheduler et Execution
AIRFLOW                        DAGSTER                       PREFECT
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Scheduler   β”‚              β”‚  Daemon      β”‚              β”‚  Orion Serverβ”‚
β”‚  (parse DAGs)β”‚              β”‚  (sensors,   β”‚              β”‚  (API + UI)  β”‚
β”‚              β”‚              β”‚   schedules) β”‚              β”‚              β”‚
β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜              β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜              β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜
       β”‚                             β”‚                             β”‚
       β–Ό                             β–Ό                             β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Executor    β”‚              β”‚  Run Launcherβ”‚              β”‚  Work Pool   β”‚
β”‚  (Celery/K8s)β”‚              β”‚  (process/K8sβ”‚              β”‚  (agents/    β”‚
β”‚              β”‚              β”‚   /Docker)   β”‚              β”‚   workers)   β”‚
β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜              β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜              β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜
       β”‚                             β”‚                             β”‚
       β–Ό                             β–Ό                             β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Workers     β”‚              β”‚  User Code   β”‚              β”‚  Flow Runs   β”‚
β”‚  (tasks)     β”‚              β”‚  (assets +   β”‚              β”‚  (tasks)     β”‚
β”‚              β”‚              β”‚   resources) β”‚              β”‚              β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜              β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜              β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
       β”‚                             β”‚                             β”‚
       β–Ό                             β–Ό                             β–Ό
   PostgreSQL                   PostgreSQL                  PostgreSQL/
   (metastore)                 (event log)                  SQLite/Cloud

Matrice de decision

Utilisez cette matrice pour evaluer chaque orchestrateur selon vos criteres prioritaires. Notez de 1 (faible) a 5 (excellent) :

Critere de decisionPoidsAirflowDagsterPrefect
Maturite et stabiliteEleve5/53/53/5
Ecosysteme de pluginsEleve5/53/53/5
Facilite de deploiementMoyen2/53/55/5
Experience developpeurMoyen3/54/55/5
Testabilite du codeMoyen2/55/54/5
Data Lineage natifEleve2/55/53/5
Gestion des erreursMoyen3/54/55/5
Offre managed cloudMoyen4/5 (multi)4/55/5
Recrutement talentEleve5/52/53/5
Support grandes equipesEleve5/54/53/5

Analyse TCO (Total Cost of Ownership)

Le cout total ne se limite pas aux frais de licence. Voici une estimation sur 12 mois pour une equipe de 5 data engineers avec 200 pipelines :

Poste de coutAirflow (self-hosted)Airflow (Astronomer)Dagster CloudPrefect Cloud
Infrastructure1 200 EUR/moisInclusInclusInclus
Licence / SaaS0 EUR~3 000 EUR/mois~2 500 EUR/mois~1 500 EUR/mois
Ops / Maintenance0.5 ETP (~3 500 EUR)~0.1 ETP (~700 EUR)~0.1 ETP (~700 EUR)~0.05 ETP (~350 EUR)
Formation equipe2 semaines1.5 semaines3 semaines1 semaine
Total mensuel estime~4 700 EUR~3 700 EUR~3 200 EUR~1 850 EUR

Attention au cout cache du self-hosting

Le self-hosting d'Airflow parait gratuit mais consomme enormement de temps d'ingenierie : mises a jour, gestion des dependances, tuning du scheduler, monitoring. De nombreuses equipes sous-estiment ce cout de 40 a 60 % la premiere annee.

Quand choisir quel orchestrateur ?

Scenario A : Grande entreprise, 50+ pipelines, equipe de 10+

Recommandation : Apache Airflow (managed via Astronomer ou MWAA). La maturite, l'ecosysteme de providers, et la facilite de recrutement en font le choix le plus sur pour les grandes organisations. Le pattern DAG est bien compris par la majorite des data engineers.

Scenario B : Equipe data-centric, focus qualite et lineage

Recommandation : Dagster. Si la qualite des donnees et la tracabilite des assets sont vos priorites absolues, Dagster apporte un cadre rigoureux avec les Software-Defined Assets, le type system, et le lineage integre. Ideal pour les equipes analytiques avancees.

Scenario C : Startup/PME, rapidite de mise en oeuvre

Recommandation : Prefect. La courbe d'apprentissage la plus douce, le deploiement le plus rapide, et le cout le plus bas. Parfait pour les equipes restreintes qui veulent etre operationnelles en quelques jours sans investissement infrastructure lourd.

Spotify : Migration d'Airflow vers une architecture hybride

Spotify exploite plus de 30 000 DAGs Airflow pour orchestrer ses pipelines de recommandation musicale. L'equipe a developpe des extensions internes (Backstage integration, custom operators) mais a rencontre des limites de scalabilite du scheduler. Leur approche : garder Airflow comme orchestrateur principal tout en evaluant Dagster pour les pipelines ML ou le lineage asset-centric est critique.

  • Lecon : un orchestrateur unique ne suffit pas toujours a grande echelle
  • La coexistence de plusieurs outils est viable si les frontieres sont claires

Choisir un orchestrateur uniquement sur la hype

Un anti-pattern frequent consiste a migrer vers Dagster ou Prefect simplement parce qu'Airflow semble "vieux". Si vos DAGs Airflow fonctionnent et que votre equipe les maitrise, une migration represente un cout enorme (reecriture, formation, risques de regression). Migrez uniquement si vous avez identifie des limitations concretes et quantifiees.

Echec : Migration precipitee vers Prefect 2.0

Une fintech europeenne a decide de migrer ses 120 DAGs Airflow vers Prefect 2.0 en 3 mois. L'equipe a sous-estime les differences fondamentales : absence de sensors equivalents, gestion differente des retries, et pattern d'execution hybride (cloud + agent). Resultat : 6 mois de retard, pipelines instables pendant 2 mois en production, et retour partiel a Airflow pour les pipelines critiques.

  • Lecon : toujours faire un POC sur 5-10 pipelines representatifs avant de migrer
  • Prevoir un plan de rollback et une phase de coexistence

Code compare : meme pipeline, trois syntaxes

Python - Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract(): ...
def transform(): ...
def load(): ...

with DAG("etl_pipeline", start_date=datetime(2024,1,1),
         schedule_interval="@daily", catchup=False) as dag:
    t1 = PythonOperator(task_id="extract", python_callable=extract)
    t2 = PythonOperator(task_id="transform", python_callable=transform)
    t3 = PythonOperator(task_id="load", python_callable=load)
    t1 >> t2 >> t3
Python - Dagster
from dagster import asset, define_asset_job, ScheduleDefinition

@asset
def raw_data():
    """Extract raw data from source."""
    return extract()

@asset(deps=[raw_data])
def transformed_data(raw_data):
    """Transform raw data."""
    return transform(raw_data)

@asset(deps=[transformed_data])
def loaded_data(transformed_data):
    """Load transformed data to warehouse."""
    return load(transformed_data)

etl_job = define_asset_job("etl_pipeline", selection="*")
etl_schedule = ScheduleDefinition(job=etl_job, cron_schedule="@daily")
Python - Prefect
from prefect import flow, task
from prefect.deployments import Deployment

@task(retries=3, retry_delay_seconds=60)
def extract(): ...

@task
def transform(data): ...

@task
def load(data): ...

@flow(name="etl_pipeline", log_prints=True)
def etl_pipeline():
    raw = extract()
    transformed = transform(raw)
    load(transformed)

# Deploiement avec schedule
if __name__ == "__main__":
    etl_pipeline.serve(name="daily-etl", cron="0 0 * * *")
Quelle est la difference fondamentale entre l'approche task-centric et asset-centric ?
L'approche task-centric (Airflow, Prefect) definit le pipeline comme une sequence d'actions a executer. L'approche asset-centric (Dagster) definit le pipeline comme un ensemble de donnees a materialiser. Avec les assets, le focus est sur le "quoi" (les donnees produites) plutot que le "comment" (les taches executees), ce qui facilite le lineage et la reexecution selective.
Pourquoi Airflow reste-t-il dominant malgre des alternatives plus modernes ?
Airflow beneficie de l'effet de reseau : plus de 2000 providers, une communaute massive, une abondance de documentation et de talent disponible sur le marche. Les entreprises privilegient la stabilite et la previsibilite. De plus, Airflow 2.x a corrige beaucoup de faiblesses historiques (TaskFlow API, dynamic task mapping, deferrable operators).

17. Quiz Orchestration

12 min Intermediaire

Objectifs

  • Valider vos connaissances sur Airflow, Dagster et Prefect
  • Tester votre comprehension des patterns d'orchestration
  • Verifier votre capacite a choisir le bon outil selon le contexte

Ce quiz couvre l'ensemble du module orchestration. Prenez le temps de bien lire chaque question. Si vous obtenez moins de 6/8, revisez les lecons 13 a 16 avant de continuer.

Quiz Module 3.3 - Orchestration

Q1. Quel composant Airflow est responsable de parser les fichiers DAG et de planifier les executions ?

A. Le Webserver
B. Le Scheduler
C. L'Executor
D. Le Triggerer
Le Scheduler est le composant central d'Airflow qui parse continuellement les fichiers DAG du dossier dags/, determine les taches eligibles a l'execution, et les transmet a l'Executor.

Q2. Quel concept unique differencie fondamentalement Dagster d'Airflow ?

A. Le support de Python
B. Les Software-Defined Assets
C. Le scheduling par cron
D. L'interface web graphique
Les Software-Defined Assets sont le concept fondateur de Dagster. Contrairement aux DAGs task-centric d'Airflow, Dagster definit les pipelines autour des donnees produites (assets), avec lineage, types, et materialisation declarative.

Q3. Dans Prefect, quel decorateur est utilise pour definir un point d'entree de pipeline ?

A. @task
B. @dag
C. @flow
D. @pipeline
Le decorateur @flow est le point d'entree dans Prefect. Il encapsule la logique du pipeline et peut contenir des appels a des fonctions decorees avec @task. Un flow peut aussi appeler d'autres flows (subflows).

Q4. Quel Executor Airflow est recommande pour une scalabilite maximale en production sur Kubernetes ?

A. LocalExecutor
B. SequentialExecutor
C. CeleryExecutor
D. KubernetesExecutor
Le KubernetesExecutor lance chaque tache dans un Pod Kubernetes isole, offrant une scalabilite maximale et une isolation des ressources. Chaque tache dispose de son propre environnement, evitant les conflits de dependances.

Q5. Quelle est la methode recommandee pour passer des donnees entre taches dans Airflow 2.x ?

A. Variables globales Python
B. Fichiers partages sur le filesystem
C. XCom avec la TaskFlow API
D. Base de donnees externe
La TaskFlow API d'Airflow 2.x simplifie l'utilisation des XComs. Avec le decorateur @task, les valeurs retournees sont automatiquement stockees en XCom et passees comme arguments aux taches dependantes, sans manipulation manuelle.

Q6. Quelle approche est un anti-pattern lors du choix d'un orchestrateur ?

A. Faire un POC sur 5-10 pipelines representatifs
B. Evaluer le TCO sur 12 mois
C. Migrer 100% des pipelines d'un coup vers le nouvel outil
D. Tester la capacite de recrutement sur le marche
Migrer 100% des pipelines d'un coup est un anti-pattern majeur (approche "big bang"). Il faut privilegier une migration incrementale avec une phase de coexistence, en commencant par les pipelines les moins critiques.

Q7. Quel mecanisme Dagster utilise-t-il pour abstraire la couche de persistance des assets ?

A. Operators
B. IO Managers
C. Connections
D. Hooks
Les IO Managers de Dagster gerent la serialisation et la persistance des assets. Ils permettent de changer la destination de stockage (fichier local, S3, BigQuery) sans modifier le code de l'asset, favorisant la separation des preoccupations.

Q8. Quel orchestrateur offre le TCO le plus bas pour une petite equipe (3-5 personnes) avec 50 pipelines ?

A. Airflow self-hosted
B. Airflow sur Astronomer
C. Dagster Cloud
D. Prefect Cloud
Prefect Cloud offre generalement le TCO le plus bas pour les petites equipes grace a un tier gratuit genereux, une courbe d'apprentissage faible (moins de temps de formation), et une maintenance quasi nulle cote infrastructure.

18. Outils ELT/CDC : Fivetran, Airbyte, Debezium, Kafka Connect, Meltano

20 min Avance

Objectifs

  • Comprendre la difference entre ELT batch et CDC (Change Data Capture) en temps reel
  • Maitriser les architectures et cas d'usage de Fivetran, Airbyte, Debezium, Kafka Connect et Meltano
  • Savoir choisir le bon outil d'ingestion selon le volume, la latence et le budget
  • Identifier les anti-patterns courants en ingestion de donnees

L'ingestion est le maillon le plus fragile d'un pipeline de donnees. Un outil CDC mal configure peut saturer votre base source, et un ELT sans deduplication peut corrompre vos donnees. Choisissez toujours en fonction de vos contraintes de latence reelles, pas de celles que vous imaginez avoir besoin.

ELT vs CDC : deux paradigmes complementaires

ELT (Extract-Load-Transform)

L'ELT extrait les donnees de la source, les charge telles quelles dans le data warehouse, puis applique les transformations dans le warehouse lui-meme. C'est le pattern dominant du Modern Data Stack car il exploite la puissance de calcul du warehouse (Snowflake, BigQuery) pour les transformations.

CDC (Change Data Capture)

Le CDC capture les modifications en temps reel depuis les logs de transaction (WAL PostgreSQL, binlog MySQL). Seules les insertions, mises a jour et suppressions sont propagees, reduisant drastiquement le volume de donnees transferees et la latence de replication.

ELT Batch vs CDC Streaming
ELT BATCH (Fivetran / Airbyte / Meltano)
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    Full/Incremental    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    dbt     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Source   │─────────Extract──────►│  Raw Layer    │──Transform─►│  Marts   β”‚
β”‚  (API/DB) β”‚    (toutes les 5min   β”‚  (Warehouse)  β”‚            β”‚(Warehouseβ”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     a 24h)             β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜            β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

CDC STREAMING (Debezium / Kafka Connect)
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    WAL/Binlog     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    Stream    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Source   │───Read logs──────►│  Kafka    │──Processing──►│  Target      β”‚
β”‚  (DB)     β”‚   (temps reel)   β”‚  (broker) β”‚   (Flink/    β”‚  (DWH/Lake/  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    ksqlDB)    β”‚   service)   β”‚
                                                           β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Comparaison detaillee des outils

OutilTypeModeleConnecteursLatenceCout
FivetranELT SaaSProprietaire, managed400+5 min - 24h$$$$ (MAR-based)
AirbyteELT OSS/CloudOpen-source + Cloud350+5 min - 24h$ (self) / $$ (cloud)
MeltanoELT OSSOpen-source CLI600+ (Singer)Batch schedule$ (self-hosted)
DebeziumCDC OSSOpen-source15+ (DB focus)Secondes$ (self-hosted)
Kafka ConnectIntegrationOpen-source + managed200+Secondes$$ (Confluent)

Fivetran : le leader ELT managed

Fivetran est la reference du ELT managed. Son approche "zero configuration" permet de connecter une source en quelques clics. Le pricing base sur les Monthly Active Rows (MAR) peut cependant devenir couteux a grande echelle.

Forces de Fivetran

Connecteurs certifies et maintenus par Fivetran, schema drift detection automatique, historisation native avec les tables _fivetran_deleted et _fivetran_synced. Ideal pour les equipes qui ne veulent pas gerer l'infrastructure d'ingestion.

Airbyte : l'alternative open-source

Airbyte offre une experience similaire a Fivetran mais avec un modele open-source. Le deploiement self-hosted permet de maitriser les couts mais requiert une expertise Docker/Kubernetes. Airbyte Cloud offre une alternative managed.

YAML - Configuration Airbyte (source PostgreSQL)
# airbyte-source-postgres.yaml
source:
  sourceType: postgres
  host: db.production.internal
  port: 5432
  database: ecommerce
  username: ${AIRBYTE_PG_USER}
  password: ${AIRBYTE_PG_PASSWORD}
  ssl_mode: require
  replication_method:
    method: CDC    # utilise le WAL PostgreSQL
    replication_slot: airbyte_slot
    publication: airbyte_publication
    initial_waiting_seconds: 300

destination:
  destinationType: bigquery
  project_id: my-data-project
  dataset_id: raw_ecommerce
  credentials_json: ${GCP_CREDENTIALS}
  loading_method:
    method: GCS Staging
    gcs_bucket_name: airbyte-staging-bucket

Debezium : CDC en temps reel

Debezium est le standard open-source du CDC. Il lit les journaux de transactions (WAL, binlog, redo log) sans impacter les performances de la base source. Deploye comme connecteur Kafka Connect, il publie chaque modification dans un topic Kafka.

JSON - Configuration Debezium PostgreSQL Connector
{
  "name": "ecommerce-cdc-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "db.production.internal",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "${CDC_PASSWORD}",
    "database.dbname": "ecommerce",
    "database.server.name": "prod-ecommerce",
    "schema.include.list": "public",
    "table.include.list": "public.orders,public.customers,public.products",
    "plugin.name": "pgoutput",
    "slot.name": "debezium_slot",
    "publication.name": "debezium_publication",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "cdc.$3"
  }
}

Meltano : ELT en ligne de commande

Meltano, porte par la communaute Singer, offre une approche CLI-first de l'ELT. Il est particulierement adapte aux equipes DevOps qui veulent gerer leurs pipelines d'ingestion comme du code (Infrastructure as Code).

Bash - Pipeline Meltano
# Installation et configuration
meltano init my-elt-project
cd my-elt-project

# Ajouter un extracteur (tap) et un loader (target)
meltano add extractor tap-postgres
meltano add loader target-snowflake

# Configurer la source
meltano config tap-postgres set host db.production.internal
meltano config tap-postgres set database ecommerce
meltano config tap-postgres set default_replication_method LOG_BASED

# Lancer l'extraction
meltano run tap-postgres target-snowflake

# Ajouter un schedule orchestable par Airflow
meltano schedule add daily-sync --extractor tap-postgres \
  --loader target-snowflake --interval "@daily"

Matrice de decision : quel outil pour quel besoin ?

BesoinOutil recommandeJustification
ELT SaaS, zero maintenanceFivetranConnecteurs certifies, support premium, zero ops
ELT open-source, budget limiteAirbyte (self-hosted)Gratuit, large catalogue, communaute active
ELT as Code, integration CI/CDMeltanoCLI-first, GitOps natif, ecosysteme Singer
CDC temps reel, stack KafkaDebezium + Kafka ConnectStandard CDC, sub-second latency, scalable
Integration multi-source managedConfluent Cloud + connectorsKafka managed, connecteurs managed, monitoring integre
SaaS + APIs RESTFivetran ou Airbyte CloudConnecteurs SaaS pre-construits (Salesforce, Stripe, etc.)

Shopify : Airbyte pour democratiser l'acces aux donnees

Shopify a adopte Airbyte en interne pour permettre a chaque equipe produit de configurer ses propres pipelines d'ingestion sans dependre de l'equipe data platform. L'approche self-service a reduit le backlog de demandes d'ingestion de 6 semaines a 2 jours. Les connecteurs custom sont developpes en interne et partages via un registre prive.

CDC sans Schema Registry

Deployer Debezium sans Schema Registry (Avro ou Protobuf) est un anti-pattern critique. Sans gestion des schemas, une modification de colonne en base source (ajout, renommage, changement de type) provoque des erreurs de deserialization en cascade chez tous les consommateurs. Le Schema Registry garantit la compatibilite ascendante et descendante des schemas.

Echec : Fivetran et l'explosion des couts MAR

Une societe de e-commerce a connecte sa base transactionnelle (50M lignes, 200k updates/jour) via Fivetran en mode incremental. Le calcul MAR incluant toutes les lignes modifiees, la facture est passee de 2 000 EUR/mois a 18 000 EUR/mois en 3 mois apres ajout de tables supplementaires. L'equipe a du migrer les tables les plus volumineuses vers Airbyte self-hosted en urgence.

  • Lecon : toujours estimer le volume MAR avant de connecter une source a Fivetran
  • Utiliser le CDC Fivetran (log-based) plutot que le mode incremental quand possible

Full refresh quotidien sur des tables volumineuses

Faire un full refresh (extraction complete) quotidien sur une table de 100M+ lignes est un anti-pattern courant chez les debutants. Cela surcharge la base source, consomme de la bande passante, et augmente les couts warehouse. Privilegiez toujours l'incremental (basee sur un curseur updated_at) ou le CDC quand la source le supporte.

Quelle est la difference entre un replication slot PostgreSQL et un binlog MySQL pour le CDC ?
PostgreSQL utilise le WAL (Write-Ahead Log) via des replication slots qui conservent les modifications tant que le consommateur ne les a pas lues. MySQL utilise le binlog (binary log) qui enregistre toutes les modifications en mode ROW, STATEMENT ou MIXED. Les deux permettent le CDC, mais le WAL PostgreSQL offre une meilleure granularite (pgoutput) tandis que le binlog MySQL necessite le mode ROW pour un CDC fiable.
Pourquoi Meltano est-il qualifie d'ELT "as Code" ?
Meltano stocke toute sa configuration dans des fichiers YAML versionnables dans Git. Les pipelines sont definis comme du code, les environnements (dev, staging, prod) sont geres via des fichiers de configuration, et les deploiements peuvent etre automatises via CI/CD. C'est l'approche GitOps appliquee a l'ingestion de donnees.

19. Formats de Fichiers : Parquet, ORC, Avro, JSON, CSV

18 min Avance

Objectifs

  • Comprendre les differences entre formats colonnaires et formats en lignes
  • Maitriser les mecanismes de compression et d'encodage de chaque format
  • Analyser les benchmarks de performance pour choisir le format optimal
  • Eviter les anti-patterns de choix de format dans un data lake

Le choix du format de fichier impacte directement les performances de requete (x10 a x100), les couts de stockage (x3 a x10), et la maintenabilite a long terme. C'est une decision fondatrice de votre data lake qui affecte toutes les couches en aval. Ne la negligez jamais.

Colonnaire vs Ligne : le principe fondamental

Stockage en lignes vs Stockage en colonnes
DONNEES ORIGINALES (table orders)
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”
β”‚ id     β”‚ date       β”‚ amount β”‚ statusβ”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€
β”‚ 1      β”‚ 2024-01-15 β”‚ 150.00 β”‚ paid  β”‚
β”‚ 2      β”‚ 2024-01-16 β”‚ 89.50  β”‚ paid  β”‚
β”‚ 3      β”‚ 2024-01-16 β”‚ 220.00 β”‚ pend  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”˜

FORMAT LIGNE (CSV, JSON, Avro)           FORMAT COLONNAIRE (Parquet, ORC)
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”       β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ 1, 2024-01-15, 150.00, paid   β”‚       β”‚ id:     [1, 2, 3]             β”‚
β”‚ 2, 2024-01-16, 89.50,  paid   β”‚       β”‚ date:   [2024-01-15, ...]     β”‚
β”‚ 3, 2024-01-16, 220.00, pend   β”‚       β”‚ amount: [150.00, 89.50, ...]  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜       β”‚ status: [paid, paid, pend]    β”‚
                                         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
Avantage: ecriture rapide,               Avantage: lecture selective,
  lecture ligne complete                    compression superieure,
Cas: OLTP, streaming, logs                  aggregations rapides
                                          Cas: OLAP, analytics, data lake

Comparaison detaillee des formats

CritereParquetORCAvroJSONCSV
TypeColonnaireColonnaireLigneLigneLigne
SchemaIntegre (Thrift)Integre (Protobuf)Integre (JSON)AucunAucun
CompressionSnappy, Gzip, Zstd, LZ4Zlib, Snappy, LZO, ZstdSnappy, Deflate, ZstdGzip externeGzip externe
Taux compressionExcellent (x5-x10)Excellent (x5-x10)Bon (x3-x5)Faible (x2-x3)Faible (x2-x3)
Predicate pushdownOui (row groups)Oui (stripes + bloom)NonNonNon
Column pruningOuiOuiNonNonNon
Types complexesOui (nested, arrays)Oui (structs, maps)Oui (unions, arrays)Oui (natif)Non
Schema evolutionAjout colonnesAjout colonnesFull evolutionFlexibleFragile
SplittableOuiOuiOuiNon (sauf JSONL)Oui
EcosystemeSpark, Presto, BigQuery, SnowflakeHive, Spark, PrestoKafka, Spark, FlinkUniverselUniversel

Apache Parquet en profondeur

Parquet est le format dominant du Modern Data Stack. Sa structure en Row Groups (128 MB par defaut) contenant des Column Chunks permet un acces selectif extremement performant. Chaque column chunk contient des Pages de donnees avec des statistiques (min/max) pour le predicate pushdown.

Structure interne d'un fichier Parquet
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    FICHIER PARQUET                        β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚  Magic Number: PAR1                                      β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚  ROW GROUP 0 (128 MB par defaut)                         β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
β”‚  β”‚ Column Chunk    β”‚ Column Chunk    β”‚ Column Chunk   β”‚ β”‚
β”‚  β”‚ (id)            β”‚ (date)          β”‚ (amount)       β”‚ β”‚
β”‚  β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”‚ β”‚
β”‚  β”‚ β”‚ Page 0      β”‚ β”‚ β”‚ Page 0      β”‚ β”‚ β”‚ Page 0     β”‚β”‚ β”‚
β”‚  β”‚ β”‚ min=1,max=50β”‚ β”‚ β”‚ min=Jan     β”‚ β”‚ β”‚ min=10.0   β”‚β”‚ β”‚
β”‚  β”‚ β”‚ data=[...]  β”‚ β”‚ β”‚ data=[...]  β”‚ β”‚ β”‚ data=[...] β”‚β”‚ β”‚
β”‚  β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β”‚ β”‚
β”‚  β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”‚ β”‚
β”‚  β”‚ β”‚ Page 1      β”‚ β”‚ β”‚ Page 1      β”‚ β”‚ β”‚ Page 1     β”‚β”‚ β”‚
β”‚  β”‚ β”‚ min=51...   β”‚ β”‚ β”‚ ...         β”‚ β”‚ β”‚ ...        β”‚β”‚ β”‚
β”‚  β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β”‚ β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚  ROW GROUP 1 ...                                         β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚  FOOTER (schema, row group metadata, statistics)         β”‚
β”‚  Footer Length (4 bytes)                                  β”‚
β”‚  Magic Number: PAR1                                      β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Encodages et compression

Parquet et ORC utilisent des encodages specialises par type de donnees qui precedent la compression generale :

EncodageType de donneesPrincipeGain typique
Dictionary EncodingStrings cardinalite faibleRemplace les valeurs par des index dans un dictionnairex5-x20
Run-Length Encoding (RLE)Valeurs repeteesStocke (valeur, nombre de repetitions)x10-x100
Delta EncodingIntegers triesStocke les differences entre valeurs consecutivesx3-x10
Bit PackingPetits entiersUtilise le nombre minimal de bitsx2-x4
Python - Ecriture et lecture Parquet optimisee
import pyarrow as pa
import pyarrow.parquet as pq

# Schema explicite avec types optimaux
schema = pa.schema([
    pa.field("order_id", pa.int64()),
    pa.field("customer_id", pa.int32()),       # int32 si < 2 milliards
    pa.field("order_date", pa.date32()),        # date32 au lieu de string
    pa.field("amount", pa.decimal128(10, 2)),   # decimal pour la precision
    pa.field("status", pa.dictionary(pa.int8(), pa.string())),  # dict encoding
    pa.field("country", pa.dictionary(pa.int8(), pa.string())), # cardinalite faible
])

# Ecriture avec options optimisees
table = pa.table(data, schema=schema)
pq.write_table(
    table,
    "orders.parquet",
    compression="zstd",              # meilleur ratio compression/vitesse
    compression_level=3,             # 1-22, 3 = bon compromis
    row_group_size=128 * 1024 * 1024,  # 128 MB par row group
    use_dictionary=True,             # activer le dict encoding
    write_statistics=True,           # statistiques pour predicate pushdown
)

# Lecture selective : seulement 2 colonnes sur 6
table = pq.read_table(
    "orders.parquet",
    columns=["order_id", "amount"],  # column pruning
    filters=[("amount", ">", 100)], # predicate pushdown
)

Benchmarks comparatifs

Voici des benchmarks representatifs sur un dataset de 1 milliard de lignes (TPC-DS scale 100) :

MetriqueParquet+ZstdORC+ZlibAvro+SnappyJSON.gzCSV.gz
Taille fichier12.3 GB11.8 GB38.2 GB85.4 GB72.1 GB
Ratio compressionx8.1x8.5x2.6x1.2x1.4
Scan 3 colonnes2.1 s2.4 s18.5 s45.2 s38.7 s
Scan complet15.3 s14.8 s12.1 s52.3 s41.6 s
Aggregation SUM0.8 s0.9 s14.2 s48.7 s39.1 s
Ecriture45 s52 s28 s35 s22 s

Conclusion des benchmarks

Pour l'analytique (OLAP) : Parquet ou ORC sont 10 a 20 fois plus rapides que CSV/JSON grace au column pruning et au predicate pushdown. Pour le streaming et l'echange de messages : Avro est le choix optimal grace a son schema integre et son support natif dans Kafka.

Stocker un data lake en CSV ou JSON

Stocker les donnees analytiques d'un data lake en CSV ou JSON est l'un des anti-patterns les plus couteux. Une entreprise avec 10 TB de CSV dans S3 paierait ~230 EUR/mois en stockage. Le meme dataset en Parquet Zstd occuperait ~1.5 TB, soit ~35 EUR/mois. Sur les requetes Athena (facturees au volume scanne), l'economie est encore plus spectaculaire : x10 a x20 de reduction.

Twitter/X : Migration de JSON vers Parquet

Twitter stockait ses logs d'activite (impressions, clics, engagements) en JSON dans HDFS, representant plus de 500 TB par jour. La migration vers Parquet a reduit le stockage de 75%, accelere les requetes analytiques d'un facteur 15, et economise des millions de dollars annuels en infrastructure. La cle du succes : une migration progressive table par table avec validation des resultats a chaque etape.

Echec : Mauvais choix de row group size

Une equipe data a configure ses fichiers Parquet avec un row_group_size de 8 MB (au lieu des 128 MB recommandes) pour un dataset de 500 millions de lignes. Resultat : des milliers de row groups par fichier, un footer metadata enorme, et des performances de lecture degradees de 60%. Le predicate pushdown, bien que fonctionnel, etait inefficace car les plages min/max de chaque petit row group se chevauchaient trop.

Pourquoi Avro est-il prefere a Parquet pour Kafka et le streaming ?
Avro est un format en lignes, optimise pour l'ecriture sequentielle rapide. Dans un contexte streaming, chaque message est ecrit individuellement et doit etre serialise/deserialise rapidement. Avro offre : schema integre dans chaque message (ou reference au Schema Registry), compatibilite ascendante/descendante native, et une faible empreinte memoire. Parquet, etant colonnaire, necessite d'accumuler un batch de lignes avant l'ecriture, ce qui est incompatible avec le temps reel.
Qu'est-ce que le predicate pushdown et pourquoi est-il crucial ?
Le predicate pushdown permet au moteur de requete de filtrer les donnees au niveau du stockage, avant de les charger en memoire. Dans Parquet, chaque row group et chaque page contiennent des statistiques (min, max, null count). Si un filtre WHERE amount > 1000 est applique et qu'un row group a max(amount) = 500, ce row group entier est ignore. Cela peut reduire le volume de donnees lues de 90% ou plus, accelerant les requetes proportionnellement.

20. Open Table Formats : Delta Lake, Apache Iceberg, Apache Hudi

20 min Avance

Objectifs

  • Comprendre pourquoi les Open Table Formats revolutionnent le data lakehouse
  • Maitriser les transactions ACID, le time travel et le schema evolution
  • Comparer Delta Lake, Apache Iceberg et Apache Hudi sur des criteres objectifs
  • Identifier le bon format selon l'ecosysteme et le cas d'usage

Les Open Table Formats transforment un simple stockage objet (S3, GCS, ADLS) en un veritable data lakehouse avec des garanties transactionnelles. C'est l'evolution la plus importante du data engineering depuis l'apparition du cloud data warehouse. Maitrisez ces concepts : ils definissent l'architecture data de la prochaine decennie.

Le probleme que resolvent les Open Table Formats

Un data lake classique (fichiers Parquet dans S3) souffre de limitations majeures :

  • Pas de transactions ACID : une ecriture partielle laisse le lake dans un etat inconsistant
  • Pas de schema enforcement : n'importe quel fichier peut etre depose sans validation
  • Pas de time travel : impossible de consulter l'etat des donnees a un instant T passe
  • Updates/deletes impossibles : les fichiers Parquet sont immutables
  • Small files problem : des milliers de petits fichiers degradent les performances
Data Lake classique vs Data Lakehouse (Open Table Format)
DATA LAKE CLASSIQUE                   DATA LAKEHOUSE (Open Table Format)
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  S3 / GCS / ADLS         β”‚         β”‚  S3 / GCS / ADLS                β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β” β”‚         β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”‚
β”‚  β”‚.parqβ”‚ β”‚.parqβ”‚ β”‚.csvβ”‚ β”‚         β”‚  β”‚  METADATA LAYER (JSON/Avro) β”‚β”‚
β”‚  β””β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”˜ β”‚         β”‚  β”‚  - Transaction log          β”‚β”‚
β”‚  Pas de transactions     β”‚         β”‚  β”‚  - Schema versioning        β”‚β”‚
β”‚  Pas de schema           β”‚         β”‚  β”‚  - Partition metadata       β”‚β”‚
β”‚  Pas d'historique        β”‚         β”‚  β”‚  - Statistics (min/max)     β”‚β”‚
β”‚  Pas d'updates           β”‚         β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜         β”‚  β”Œβ”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”      β”‚
                                     β”‚  β”‚.parqβ”‚ β”‚.parqβ”‚ β”‚.parqβ”‚      β”‚
                                     β”‚  β””β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”˜      β”‚
                                     β”‚  ACID + Time Travel + Updates  β”‚
                                     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Delta Lake (Databricks)

Delta Lake, cree par Databricks, est le format le plus repandu grace a l'adoption massive de Spark et de la plateforme Databricks. Il utilise un transaction log (dossier _delta_log/) compose de fichiers JSON sequentiels qui enregistrent chaque operation.

Python - Delta Lake avec PySpark
# Ecriture ACID dans une table Delta
df.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("date") \
    .save("s3://datalake/orders_delta")

# MERGE (upsert) - INSERT ou UPDATE selon la cle
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "s3://datalake/orders_delta")

delta_table.alias("target").merge(
    new_data.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

# Time Travel : lire la version 5 de la table
df_v5 = spark.read.format("delta") \
    .option("versionAsOf", 5) \
    .load("s3://datalake/orders_delta")

# Time Travel : lire a un timestamp specifique
df_yesterday = spark.read.format("delta") \
    .option("timestampAsOf", "2024-06-15T10:00:00") \
    .load("s3://datalake/orders_delta")

# VACUUM : supprimer les anciens fichiers (> 7 jours)
delta_table.vacuum(retentionHours=168)

Apache Iceberg

Apache Iceberg, initie chez Netflix, se distingue par son architecture de metadata a trois niveaux (catalog, metadata files, manifest files) qui permet une planification de requetes extremement rapide meme sur des tables de milliards de lignes. Il est vendor-neutral et supporte nativement Spark, Trino, Flink, et Dremio.

SQL - Apache Iceberg avec Spark SQL
-- Creer une table Iceberg
CREATE TABLE catalog.db.orders (
    order_id BIGINT,
    customer_id INT,
    order_date DATE,
    amount DECIMAL(10,2),
    status STRING
) USING iceberg
PARTITIONED BY (days(order_date))  -- partition transform!
TBLPROPERTIES (
    'write.format.default' = 'parquet',
    'write.parquet.compression-codec' = 'zstd'
);

-- MERGE INTO (upsert)
MERGE INTO catalog.db.orders AS target
USING new_orders AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

-- Time Travel par snapshot ID
SELECT * FROM catalog.db.orders
  VERSION AS OF 123456789;

-- Time Travel par timestamp
SELECT * FROM catalog.db.orders
  TIMESTAMP AS OF '2024-06-15 10:00:00';

-- Schema Evolution : ajouter une colonne
ALTER TABLE catalog.db.orders
  ADD COLUMNS (shipping_address STRING);

-- Hidden Partitioning : requete sans connaitre le partitionnement
SELECT * FROM catalog.db.orders
  WHERE order_date = '2024-06-15';
-- Iceberg applique automatiquement le filtre sur la partition days(order_date)

Apache Hudi

Apache Hudi (Hadoop Upserts Deletes and Incrementals), cree chez Uber, est optimise pour les workloads de CDC et les upserts massifs. Il propose deux types de tables : Copy-on-Write (CoW) pour les lectures rapides et Merge-on-Read (MoR) pour les ecritures rapides.

CoW vs MoR dans Hudi

Copy-on-Write : chaque mise a jour reecrit le fichier Parquet entier. Les lectures sont rapides (fichiers Parquet natifs) mais les ecritures sont couteuses. Ideal pour les tables avec peu de mises a jour.
Merge-on-Read : les mises a jour sont ecrites dans des fichiers log (delta logs). Les lectures doivent merger les fichiers de base avec les delta logs. Ideal pour les tables avec beaucoup de mises a jour (CDC).

Comparaison des trois formats

CritereDelta LakeApache IcebergApache Hudi
CreateurDatabricksNetflixUber
Transaction ACIDOui (optimistic concurrency)Oui (snapshot isolation)Oui (MVCC)
Time TravelPar version ou timestampPar snapshot ou timestampPar commit timeline
Schema EvolutionAjout/renommage colonnesFull evolution (ajout, renommage, reorder, promotion type)Ajout/suppression colonnes
Hidden PartitioningNon (partition explicite)Oui (partition transforms)Non (partition explicite)
Upsert PerformanceBonBon (v2 row-level deletes)Excellent (optimise pour CDC)
Metadata LayerJSON transaction log3 niveaux (catalog/metadata/manifest)Timeline + .hoodie metadata
Query PlanningBonExcellent (manifest files)Bon
CompactionOPTIMIZE + Z-ORDERRewrite + sortInline/async compaction
Ecosysteme principalDatabricks, SparkMulti-engine (Spark, Trino, Flink, Dremio, Snowflake)Spark, Flink, Presto
Adoption CloudDatabricks, Azure, AWS (limitee)AWS (table format par defaut), Snowflake, GCPAWS EMR, Glue

Netflix : Iceberg pour le data lakehouse a l'echelle

Netflix a cree Apache Iceberg pour resoudre les problemes de scalabilite de Hive Metastore sur ses tables de plusieurs petabytes. L'architecture de metadata a trois niveaux permet de planifier des requetes sur des tables de billions de lignes en quelques millisecondes. Netflix utilise Iceberg comme format standard pour toutes ses tables analytiques, couplees a Spark et Trino.

  • Plus de 10 000 tables Iceberg en production
  • Reduction du temps de planning de requetes de minutes a millisecondes
  • Hidden partitioning elimine 90% des erreurs de partitionnement utilisateur

Uber : Hudi pour le CDC a grande echelle

Uber a developpe Apache Hudi pour repliquer en quasi-temps reel ses bases transactionnelles (rides, drivers, payments) vers le data lake. Avec plus de 150 petabytes de donnees geres, Hudi traite des millions d'upserts par seconde. Le mode Merge-on-Read est ideal pour leur pattern CDC ou les mises a jour sont frequentes et les lectures analytiques tolerent un leger delta merge.

Databricks : Delta Lake et l'ecosysteme Unity Catalog

Databricks a construit tout son ecosysteme autour de Delta Lake : Unity Catalog pour la gouvernance, Delta Sharing pour le partage de donnees inter-organisations, et Photon comme moteur d'execution optimise pour Delta. Avec l'adoption de Delta Lake par Azure Synapse et AWS Glue, le format s'est impose comme le plus utilise globalement.

Utiliser un Open Table Format sans strategie de compaction

Sans compaction reguliere, les Open Table Formats accumulent des milliers de petits fichiers (surtout avec les upserts et le CDC). Une table Delta Lake sans OPTIMIZE peut voir ses performances de lecture se degrader de 80% en quelques semaines. Planifiez des jobs de compaction quotidiens ou hebdomadaires et surveillez le nombre de fichiers par partition.

Convergence des formats

L'industrie tend vers une convergence. Apache XTable (anciennement OneTable) permet la traduction entre les trois formats. Databricks a annonce UniForm, qui permet aux tables Delta d'etre lues comme Iceberg ou Hudi. A terme, le choix de format pourrait devenir moins critique, mais comprendre les differences reste essentiel pour l'optimisation.

Qu'est-ce que le Hidden Partitioning d'Iceberg et pourquoi est-ce important ?
Le Hidden Partitioning d'Iceberg separe la logique de partitionnement de la syntaxe des requetes. Dans Hive/Delta, les utilisateurs doivent connaitre le schema de partitionnement et filtrer explicitement (WHERE year=2024 AND month=06). Avec Iceberg, la partition est definie via des transforms (days, months, hours, bucket, truncate) et le moteur applique automatiquement le filtre. Cela elimine les erreurs humaines (full table scan accidentel) et permet de modifier le schema de partitionnement sans reecrire les donnees (partition evolution).
Quand choisir Hudi plutot qu'Iceberg ou Delta ?
Choisissez Hudi quand votre cas d'usage principal est le CDC a haute frequence avec des millions d'upserts par heure. Le mode Merge-on-Read de Hudi est specifiquement optimise pour ce pattern : les ecritures sont rapides (append dans des delta logs) et la compaction peut etre faite de maniere asynchrone. Si votre workload est principalement analytique (reads >> writes), Iceberg ou Delta sont generalement preferes.

21. Projet : Pipeline Modern Data Stack de bout en bout

30 min Avance

Objectifs

  • Concevoir et implementer un pipeline complet : ingestion, transformation, orchestration
  • Integrer Airbyte, dbt, Airflow et Snowflake dans une architecture coherente
  • Appliquer les bonnes pratiques de monitoring, tests et documentation
  • Deployer un pipeline pret pour la production

Ce projet synthetise tout ce que vous avez appris dans la Phase 3. Prenez le temps de comprendre chaque etape. Un bon data architect ne se contente pas de faire fonctionner un pipeline : il le rend observable, testable, et maintenable. Documentez vos choix de conception.

Contexte du projet

Scenario : Plateforme e-commerce DataShop

DataShop est une plateforme e-commerce avec 500 000 clients actifs et 50 000 commandes par jour. L'equipe data doit construire un pipeline analytique complet pour alimenter les dashboards de la direction et les modeles de recommandation.

Sources de donnees :

  • Base PostgreSQL transactionnelle : orders, customers, products, order_items
  • API Stripe : paiements et remboursements
  • Google Analytics 4 : trafic web et conversions

Objectifs analytiques :

  • Chiffre d'affaires quotidien par categorie de produit
  • Taux de conversion par canal d'acquisition
  • Lifetime Value (LTV) client
  • Taux de remboursement par vendeur

Architecture du pipeline

Architecture complete du pipeline DataShop
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                        ORCHESTRATION (Airflow)                        β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚
β”‚  β”‚  DAG: datashop_daily_pipeline (schedule: 0 6 * * *)          β”‚    β”‚
β”‚  β”‚                                                               β”‚    β”‚
β”‚  β”‚  [1. Ingestion]──►[2. dbt run]──►[3. dbt test]──►[4. Alert] β”‚    β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
        β”‚                    β”‚                β”‚              β”‚
        β–Ό                    β–Ό                β–Ό              β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  INGESTION    β”‚   β”‚  TRANSFORM   β”‚  β”‚  QUALITY   β”‚ β”‚ ALERTING  β”‚
β”‚  (Airbyte)    β”‚   β”‚  (dbt)       β”‚  β”‚  (dbt test)β”‚ β”‚ (Slack)   β”‚
β”‚               β”‚   β”‚              β”‚  β”‚            β”‚ β”‚           β”‚
β”‚  PostgreSQL──►│   β”‚  staging/    β”‚  β”‚  not_null  β”‚ β”‚ on_failureβ”‚
β”‚  Stripe    ──►│──►│  intermediate│──►│  unique    │─►│  channel β”‚
β”‚  GA4       ──►│   β”‚  marts/      β”‚  β”‚  accepted  β”‚ β”‚  #data-  β”‚
β”‚               β”‚   β”‚              β”‚  β”‚  relations β”‚ β”‚  alerts   β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
        β”‚                  β”‚
        β–Ό                  β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                    SNOWFLAKE                                      β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚ RAW      β”‚    β”‚ TRANSFORMED  β”‚    β”‚ MARTS                 β”‚  β”‚
β”‚  β”‚ (landing)│───►│ (staging +   │───►│ (business-ready)      β”‚  β”‚
β”‚  β”‚          β”‚    β”‚  intermediateβ”‚    β”‚ fct_orders             β”‚  β”‚
β”‚  β”‚ raw_pg_* β”‚    β”‚  stg_*       β”‚    β”‚ fct_payments           β”‚  β”‚
β”‚  β”‚ raw_str_*β”‚    β”‚  int_*       β”‚    β”‚ dim_customers          β”‚  β”‚
β”‚  β”‚ raw_ga4_*β”‚    β”‚              β”‚    β”‚ dim_products            β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚ mart_revenue_daily     β”‚  β”‚
β”‚                                      β”‚ mart_customer_ltv      β”‚  β”‚
β”‚                                      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Lab : Implementation pas a pas

Etape 1 : Configuration Airbyte - Ingestion des sources

Configurez les trois connexions Airbyte pour charger les donnees brutes dans Snowflake :

YAML - airbyte/connections/postgres_to_snowflake.yaml
# Connection PostgreSQL -> Snowflake
source:
  name: datashop-postgres
  sourceDefinitionId: postgres
  connectionConfiguration:
    host: db.datashop.internal
    port: 5432
    database: datashop_prod
    username: ${AIRBYTE_PG_USER}
    password: ${AIRBYTE_PG_PASSWORD}
    ssl_mode: require
    replication_method:
      method: CDC
      replication_slot: airbyte_datashop
      publication: airbyte_publication

destination:
  name: snowflake-raw
  destinationDefinitionId: snowflake
  connectionConfiguration:
    host: xy12345.eu-west-1.snowflakecomputing.com
    role: AIRBYTE_ROLE
    warehouse: AIRBYTE_WH
    database: DATASHOP
    schema: RAW_POSTGRES

sync:
  streams:
    - name: orders
      syncMode: incremental
      destinationSyncMode: append_dedup
      primaryKey: [["id"]]
      cursorField: ["updated_at"]
    - name: customers
      syncMode: incremental
      destinationSyncMode: append_dedup
      primaryKey: [["id"]]
      cursorField: ["updated_at"]
    - name: products
      syncMode: incremental
      destinationSyncMode: append_dedup
      primaryKey: [["id"]]
      cursorField: ["updated_at"]
    - name: order_items
      syncMode: incremental
      destinationSyncMode: append_dedup
      primaryKey: [["id"]]
      cursorField: ["updated_at"]
  schedule:
    scheduleType: cron
    cronExpression: "0 5 * * *"  # 5h UTC chaque jour

Etape 2 : Projet dbt - Structure et modeles

Organisez le projet dbt selon la convention staging / intermediate / marts :

Text - Structure du projet dbt
dbt_datashop/
β”œβ”€β”€ dbt_project.yml
β”œβ”€β”€ models/
β”‚   β”œβ”€β”€ staging/
β”‚   β”‚   β”œβ”€β”€ stg_postgres/
β”‚   β”‚   β”‚   β”œβ”€β”€ _stg_postgres__sources.yml
β”‚   β”‚   β”‚   β”œβ”€β”€ stg_postgres__orders.sql
β”‚   β”‚   β”‚   β”œβ”€β”€ stg_postgres__customers.sql
β”‚   β”‚   β”‚   β”œβ”€β”€ stg_postgres__products.sql
β”‚   β”‚   β”‚   └── stg_postgres__order_items.sql
β”‚   β”‚   └── stg_stripe/
β”‚   β”‚       β”œβ”€β”€ _stg_stripe__sources.yml
β”‚   β”‚       └── stg_stripe__payments.sql
β”‚   β”œβ”€β”€ intermediate/
β”‚   β”‚   β”œβ”€β”€ int_orders_enriched.sql
β”‚   β”‚   └── int_customer_orders.sql
β”‚   └── marts/
β”‚       β”œβ”€β”€ finance/
β”‚       β”‚   β”œβ”€β”€ fct_orders.sql
β”‚       β”‚   β”œβ”€β”€ fct_payments.sql
β”‚       β”‚   └── mart_revenue_daily.sql
β”‚       └── marketing/
β”‚           β”œβ”€β”€ dim_customers.sql
β”‚           └── mart_customer_ltv.sql
β”œβ”€β”€ tests/
β”‚   β”œβ”€β”€ assert_positive_revenue.sql
β”‚   └── assert_orders_have_items.sql
└── macros/
    └── generate_schema_name.sql
SQL - models/staging/stg_postgres/stg_postgres__orders.sql
{{ config(materialized='view') }}

with source as (
    select * from {{ source('postgres', 'orders') }}
),

renamed as (
    select
        id as order_id,
        customer_id,
        status as order_status,
        total_amount_cents / 100.0 as order_total,
        currency,
        created_at as ordered_at,
        updated_at,
        _airbyte_extracted_at as _loaded_at
    from source
    where _airbyte_meta ->> 'changes' is null  -- exclure les lignes en erreur
)

select * from renamed
SQL - models/marts/finance/fct_orders.sql
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge',
    cluster_by=['ordered_at']
) }}

with orders as (
    select * from {{ ref('stg_postgres__orders') }}
    {% if is_incremental() %}
    where updated_at > (select max(updated_at) from {{ this }})
    {% endif %}
),

order_items as (
    select * from {{ ref('stg_postgres__order_items') }}
),

payments as (
    select * from {{ ref('stg_stripe__payments') }}
),

enriched as (
    select
        o.order_id,
        o.customer_id,
        o.order_status,
        o.order_total,
        o.currency,
        o.ordered_at,
        count(distinct oi.product_id) as nb_products,
        sum(oi.quantity) as total_items,
        coalesce(p.total_paid, 0) as total_paid,
        coalesce(p.total_refunded, 0) as total_refunded,
        case
            when p.total_paid >= o.order_total then 'fully_paid'
            when p.total_paid > 0 then 'partially_paid'
            else 'unpaid'
        end as payment_status,
        o.updated_at
    from orders o
    left join order_items oi on o.order_id = oi.order_id
    left join (
        select
            order_id,
            sum(case when type = 'charge' then amount else 0 end) as total_paid,
            sum(case when type = 'refund' then amount else 0 end) as total_refunded
        from payments
        group by order_id
    ) p on o.order_id = p.order_id
    group by 1,2,3,4,5,6,o.updated_at,p.total_paid,p.total_refunded
)

select * from enriched
SQL - models/marts/marketing/mart_customer_ltv.sql
{{ config(materialized='table') }}

with customer_orders as (
    select
        c.customer_id,
        c.customer_name,
        c.email,
        c.signup_date,
        count(distinct o.order_id) as total_orders,
        sum(o.order_total) as total_revenue,
        sum(o.total_refunded) as total_refunds,
        sum(o.order_total) - sum(o.total_refunded) as net_revenue,
        min(o.ordered_at) as first_order_date,
        max(o.ordered_at) as last_order_date,
        datediff('day', min(o.ordered_at), max(o.ordered_at)) as customer_lifespan_days,
        avg(o.order_total) as avg_order_value
    from {{ ref('dim_customers') }} c
    left join {{ ref('fct_orders') }} o on c.customer_id = o.customer_id
    where o.order_status != 'cancelled'
    group by 1,2,3,4
),

ltv_segments as (
    select
        *,
        case
            when net_revenue >= 5000 then 'VIP'
            when net_revenue >= 1000 then 'High Value'
            when net_revenue >= 200 then 'Medium Value'
            else 'Low Value'
        end as ltv_segment,
        case
            when last_order_date >= dateadd('day', -30, current_date()) then 'Active'
            when last_order_date >= dateadd('day', -90, current_date()) then 'At Risk'
            else 'Churned'
        end as activity_status
    from customer_orders
)

select * from ltv_segments

Etape 3 : Tests dbt et qualite des donnees

YAML - models/marts/finance/_finance__models.yml
version: 2

models:
  - name: fct_orders
    description: "Table de faits des commandes enrichies avec paiements"
    columns:
      - name: order_id
        description: "Identifiant unique de la commande"
        tests:
          - unique
          - not_null
      - name: order_total
        description: "Montant total de la commande en EUR"
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: 0
              max_value: 100000
      - name: payment_status
        tests:
          - accepted_values:
              values: ['fully_paid', 'partially_paid', 'unpaid']
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
SQL - tests/assert_orders_have_items.sql
-- Test custom : chaque commande doit avoir au moins un article
-- Si cette requete retourne des lignes, le test echoue
select
    o.order_id,
    o.ordered_at,
    o.total_items
from {{ ref('fct_orders') }} o
where o.total_items = 0
  and o.order_status not in ('cancelled', 'draft')

Etape 4 : Orchestration Airflow

Python - dags/datashop_daily_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "data-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "on_failure_callback": lambda ctx: SlackWebhookOperator(
        task_id="slack_alert",
        slack_webhook_conn_id="slack_data_alerts",
        message=f":red_circle: Pipeline echec: {ctx['task_instance'].task_id}",
    ).execute(ctx),
}

with DAG(
    dag_id="datashop_daily_pipeline",
    default_args=default_args,
    description="Pipeline quotidien DataShop: ingestion + transformation + tests",
    schedule_interval="0 6 * * *",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["datashop", "production", "daily"],
) as dag:

    # Etape 1: Declenchement des syncs Airbyte
    sync_postgres = AirbyteTriggerSyncOperator(
        task_id="sync_postgres",
        airbyte_conn_id="airbyte_default",
        connection_id="pg-to-snowflake-connection-id",
        asynchronous=True,
    )

    wait_postgres = AirbyteJobSensor(
        task_id="wait_postgres_sync",
        airbyte_conn_id="airbyte_default",
        airbyte_job_id=sync_postgres.output,
        timeout=3600,
        poke_interval=60,
    )

    sync_stripe = AirbyteTriggerSyncOperator(
        task_id="sync_stripe",
        airbyte_conn_id="airbyte_default",
        connection_id="stripe-to-snowflake-connection-id",
        asynchronous=True,
    )

    wait_stripe = AirbyteJobSensor(
        task_id="wait_stripe_sync",
        airbyte_conn_id="airbyte_default",
        airbyte_job_id=sync_stripe.output,
        timeout=3600,
        poke_interval=60,
    )

    # Etape 2: dbt run + test via Cosmos
    dbt_transform = DbtTaskGroup(
        group_id="dbt_transform",
        project_config=ProjectConfig("/opt/airflow/dbt/dbt_datashop"),
        profile_config=ProfileConfig(
            profile_name="datashop",
            target_name="prod",
        ),
        default_args={"retries": 1},
    )

    # Etape 3: Notification de succes
    notify_success = SlackWebhookOperator(
        task_id="notify_success",
        slack_webhook_conn_id="slack_data_alerts",
        message=":white_check_mark: Pipeline DataShop quotidien termine avec succes !",
    )

    # Dependencies
    [sync_postgres >> wait_postgres, sync_stripe >> wait_stripe] >> dbt_transform >> notify_success

Etape 5 : Monitoring et observabilite

Un pipeline de production necessite un monitoring proactif. Voici les metriques a surveiller :

MetriqueSeuil d'alerteOutil
Duree du pipeline> 2x la duree moyenneAirflow SLA
Volume de lignes ingereVariation > 30% vs J-1dbt source freshness
Nombre de tests dbt en echec> 0 en productiondbt test + Slack
Credits Snowflake consommes> budget quotidienSnowflake Resource Monitor
Latence end-to-end> 2 heuresAirflow + custom sensor
SQL - Moniteur Snowflake de credits
-- Creer un resource monitor pour le warehouse dbt
CREATE RESOURCE MONITOR datashop_monitor
  WITH CREDIT_QUOTA = 50           -- 50 credits par mois
  FREQUENCY = MONTHLY
  START_TIMESTAMP = IMMEDIATELY
  TRIGGERS
    ON 75 PERCENT DO NOTIFY        -- alerte a 75%
    ON 90 PERCENT DO NOTIFY        -- alerte a 90%
    ON 100 PERCENT DO SUSPEND;     -- suspendre a 100%

ALTER WAREHOUSE DBT_WH SET RESOURCE_MONITOR = datashop_monitor;

Checklist de mise en production

Avant de deployer en production, verifiez : (1) Tous les tests dbt passent sur un jeu de donnees complet, (2) Le DAG Airflow a ete execute 3 fois sans erreur en staging, (3) Les alertes Slack sont configurees et testees, (4) Le resource monitor Snowflake est actif, (5) La documentation dbt est generee et accessible, (6) Le plan de rollback est documente.

Echec : Pipeline sans idempotence

Une equipe a deploye un pipeline Airflow + dbt sans s'assurer de l'idempotence. Lors d'un retry automatique apres un timeout reseau, les modeles dbt en mode append ont duplique 2 millions de lignes dans la table de faits. Le dashboard de revenus affichait un CA double pendant 6 heures avant detection. La correction : passer en mode incremental avec unique_key et ajouter un test unique sur la cle primaire.

22. Examen Final - Phase 3 : Modern Data Stack

25 min Avance

Objectifs

  • Valider l'ensemble des connaissances acquises dans la Phase 3
  • Demontrer votre maitrise des Cloud Data Warehouses, dbt, orchestrateurs, ELT/CDC et formats de fichiers
  • Obtenir la certification Phase 3 - Modern Data Stack

Cet examen final couvre toute la Phase 3. Vous devez obtenir au moins 9/12 pour valider cette phase. Prenez votre temps, relisez les questions attentivement. Si vous echouez, revisez les lecons correspondantes et repassez l'examen. Bonne chance !

Examen Final Phase 3 (12 questions)

Q1. Quel modele de tarification utilise Snowflake qui le differencie de BigQuery ?

A. Tarification au volume de donnees stockees uniquement
B. Separation compute/storage avec des credits par warehouse
C. Tarification au nombre d'utilisateurs
D. Tarification a la requete (par octets scannes)
Snowflake utilise un modele de credits par virtual warehouse (compute) separe du stockage. BigQuery utilise la tarification a la requete (octets scannes) ou les slots reserves. Cette separation compute/storage est la caracteristique fondamentale de Snowflake.

Q2. Quelle strategie de materialisation dbt est la plus adaptee pour un modele staging qui transforme des donnees brutes ?

A. table
B. view
C. incremental
D. ephemeral
Les modeles staging sont conventionnellement materialises en view. Ils effectuent des transformations legeres (renommage, cast, filtrage) sur les sources brutes. En view, ils n'occupent pas de stockage supplementaire et refletent toujours les donnees les plus recentes de la source.

Q3. Quel est l'avantage principal des Deferrable Operators dans Airflow 2.x ?

A. Ils executent les taches plus rapidement
B. Ils liberent le worker slot pendant l'attente, reduisant la consommation de ressources
C. Ils permettent d'ecrire des DAGs en YAML au lieu de Python
D. Ils remplacent le scheduler Airflow
Les Deferrable Operators utilisent le Triggerer pour liberer le worker slot quand une tache attend un evenement externe (requete API, job Spark). Cela permet de gerer des centaines de taches en attente avec une fraction des ressources traditionnellement requises.

Q4. Quelle commande dbt genere la documentation interactive du projet et la sert localement ?

A. dbt test --generate-docs
B. dbt docs generate && dbt docs serve
C. dbt run --docs
D. dbt compile --with-docs
La commande dbt docs generate cree les fichiers de documentation (catalog.json, manifest.json), puis dbt docs serve lance un serveur web local pour naviguer dans le lineage graph et la documentation de chaque modele.

Q5. Quel format de fichier est optimal pour les messages Kafka dans une architecture CDC avec Debezium ?

A. Parquet
B. CSV
C. Avro avec Schema Registry
D. JSON sans schema
Avro avec Schema Registry est le format recommande pour Kafka + Debezium. Avro est compact, rapide en serialisation, et le Schema Registry assure la compatibilite des schemas lors des evolutions de la base source (ajout/suppression de colonnes).

Q6. Quelle fonctionnalite d'Apache Iceberg elimine la necessite pour les utilisateurs de connaitre le schema de partitionnement ?

A. Time Travel
B. Schema Evolution
C. Hidden Partitioning
D. ACID Transactions
Le Hidden Partitioning d'Iceberg permet de definir des partition transforms (days, months, bucket) au niveau de la table. Les utilisateurs ecrivent des requetes normales (WHERE date = '2024-01-15') et Iceberg applique automatiquement le filtre au niveau des partitions, eliminant les full table scans accidentels.

Q7. Dans un pipeline dbt, quel test detecterait une violation de cle etrangere entre fct_orders et dim_customers ?

A. unique
B. not_null
C. accepted_values
D. relationships
Le test relationships de dbt verifie que chaque valeur d'une colonne existe dans une colonne d'un autre modele, equivalent a une verification de cle etrangere. Par exemple, il verifie que chaque customer_id dans fct_orders existe dans dim_customers.

Q8. Quel est l'avantage principal du mode Merge-on-Read (MoR) d'Apache Hudi ?

A. Les lectures sont plus rapides que Copy-on-Write
B. Les ecritures sont plus rapides car les updates sont stockees dans des delta logs
C. Il ne necessite pas de compaction
D. Il est compatible uniquement avec Spark
Le mode Merge-on-Read optimise la vitesse d'ecriture en appendant les modifications dans des delta logs au lieu de reecrire les fichiers Parquet. Les lectures doivent ensuite merger les fichiers de base avec les delta logs, ce qui est un compromis accepte pour les workloads CDC a haute frequence.

Q9. Quelle est la bonne pratique pour gerer la taille des fichiers dans un data lake Parquet ?

A. Maximiser le nombre de petits fichiers pour la parallelisation
B. Viser des fichiers de 128 MB a 1 GB avec des row groups de 128 MB
C. Un seul fichier par table pour minimiser les metadata
D. Des fichiers de 1 MB pour un predicate pushdown optimal
La regle d'or est de viser des fichiers entre 128 MB et 1 GB avec des row groups de 128 MB. Les fichiers trop petits (small files problem) degradent les performances a cause du overhead de metadata et des appels API au stockage objet. Les fichiers trop gros limitent la parallelisation.

Q10. Quel outil ELT utilise la facturation Monthly Active Rows (MAR) et peut devenir tres couteux sur des tables volumineuses ?

A. Airbyte
B. Meltano
C. Fivetran
D. Debezium
Fivetran utilise le modele MAR (Monthly Active Rows) qui facture chaque ligne modifiee ou ajoutee dans le mois. Sur des tables transactionnelles avec des millions de mises a jour quotidiennes, le cout peut exploser rapidement. Il est essentiel d'estimer le volume MAR avant de connecter une source.

Q11. Dans une architecture Modern Data Stack, quel est l'ordre correct des couches de transformation dbt ?

A. marts -> intermediate -> staging
B. staging -> marts -> intermediate
C. staging -> intermediate -> marts
D. raw -> marts -> staging
L'ordre correct est staging -> intermediate -> marts. Les modeles staging nettoient et renomment les donnees brutes (source). Les modeles intermediate effectuent les jointures et enrichissements complexes. Les marts produisent les tables business-ready consommees par les outils BI et les data analysts.

Q12. Quelle propriete garantit qu'un pipeline peut etre reexecute plusieurs fois sans effet de bord (duplication, corruption) ?

A. La scalabilite
B. L'observabilite
C. L'idempotence
D. La latence
L'idempotence garantit que l'execution repetee d'un pipeline produit toujours le meme resultat. C'est une propriete critique en production : un retry automatique apres un timeout ne doit pas dupliquer les donnees. On l'obtient via des strategies merge/upsert avec unique_key dans dbt, ou des operations MERGE/DELETE+INSERT dans les pipelines.

Resultats et prochaines etapes

Si vous avez obtenu 9/12 ou plus : felicitations, vous maitrisez le Modern Data Stack ! Vous etes pret pour la Phase 4 : Architecture Cloud qui approfondira AWS, GCP et Azure pour le data engineering.

Si vous avez obtenu moins de 9/12 : revisez les lecons du module concerne et repassez l'examen. Concentrez-vous sur les concepts que vous avez manques.