Introduction au Streaming de Donnees

45 min Intermediaire

Objectifs de la lecon

  • Comprendre le spectre batch-to-streaming et les differents paradigmes de traitement
  • Distinguer les types de latence : event-time, processing-time, ingestion-time
  • Comparer les architectures Lambda et Kappa avec leurs compromis
  • Identifier les cas d'usage reels necessitant du streaming

Le streaming n'est pas simplement "du batch plus rapide". C'est un changement fondamental de paradigme : au lieu de traiter des donnees au repos, vous traitez des donnees en mouvement. Cette distinction change tout, de la semantique de traitement a la gestion des erreurs.

Le Spectre Batch-to-Streaming

Le traitement de donnees n'est pas binaire entre batch et streaming. Il existe un spectre complet de modes de traitement, chacun avec ses caracteristiques propres :

ModeLatenceComplexiteCas d'usage
Batch classiqueHeures / JoursFaibleRapports mensuels, ETL nocturne
Micro-batchSecondes / MinutesMoyenneDashboards quasi temps reel
Near Real-TimeMillisecondes / SecondesEleveeAlertes, monitoring
Real-TimeMillisecondesTres eleveeFraude, trading, IoT

Les Trois Temps du Streaming

Event-time : le moment ou l'evenement s'est reellement produit. C'est le temps metier.
Ingestion-time : le moment ou l'evenement arrive dans le systeme de streaming.
Processing-time : le moment ou l'evenement est traite par votre application.
La difference entre ces temps s'appelle le skew et constitue un defi majeur du streaming.

Architecture Lambda

L'architecture Lambda, proposee par Nathan Marz, combine une couche batch et une couche temps reel pour offrir des vues completes et a faible latence :

Architecture Lambda
                    +------------------+
                    |  Sources Donnees |
                    +--------+---------+
                             |
              +--------------+--------------+
              |                             |
     +--------v--------+          +--------v--------+
     |   Batch Layer   |          |  Speed Layer    |
     |  (MapReduce/    |          |  (Storm/Flink)  |
     |   Spark Batch)  |          |                 |
     +--------+--------+          +--------+--------+
              |                             |
     +--------v--------+          +--------v--------+
     |  Batch Views    |          | Real-time Views |
     +--------+--------+          +--------+--------+
              |                             |
              +--------------+--------------+
                             |
                    +--------v--------+
                    |  Serving Layer  |
                    |  (Merge Views)  |
                    +-----------------+

Avantages Lambda

  • Tolerante aux pannes : le batch corrige les erreurs du speed layer
  • Resultats exacts via la couche batch
  • Latence faible via la couche speed

Inconvenients Lambda

  • Double logique metier a maintenir (batch + stream)
  • Complexite operationnelle elevee
  • Synchronisation difficile entre les couches

Architecture Kappa

L'architecture Kappa, proposee par Jay Kreps (co-createur de Kafka chez LinkedIn), simplifie en eliminant la couche batch :

Architecture Kappa
     +------------------+
     |  Sources Donnees |
     +--------+---------+
              |
     +--------v---------+
     |    Stream Layer   |
     |  (Kafka + Flink)  |
     |                   |
     |  Reprocessing =   |
     |  Replay du log    |
     +--------+----------+
              |
     +--------v---------+
     |   Serving Layer   |
     |  (Vues unifiees)  |
     +-------------------+

Quand choisir Kappa ?

Privilegiez Kappa quand votre log immutable (Kafka) peut servir de source de verite et que vous pouvez retraiter l'historique en rejouant le stream. C'est l'approche dominante dans l'industrie moderne.

Cas d'Usage du Streaming

Scenarios Concrets

  • Detection de fraude bancaire : chaque transaction doit etre evaluee en <100ms avant autorisation
  • IoT industriel : des milliers de capteurs envoient des metriques en continu, les anomalies doivent declencher des alertes instantanees
  • Personnalisation e-commerce : recommandations basees sur le parcours en temps reel de l'utilisateur
  • Tarification dynamique : ajustement des prix en fonction de l'offre et la demande en temps reel (Uber Surge Pricing)

LinkedIn - La Genese de Kafka

En 2010, LinkedIn faisait face a un probleme majeur : des centaines de systemes generaient des donnees qui devaient etre consommees par des dizaines d'autres systemes. Le modele point-a-point creait un graphe de N x M connexions ingerables.

Jay Kreps, Neha Narkhede et Jun Rao ont concu Apache Kafka comme un log distribue unifie : chaque producteur ecrit dans un topic, chaque consommateur lit independamment. Le nombre de connexions passe de N x M a N + M.

Resultats : Kafka traite aujourd'hui plus de 7 trillions de messages par jour chez LinkedIn, avec une latence mediane inferieure a 5ms.

Tout en Streaming

L'erreur classique est de vouloir tout traiter en streaming. Si votre rapport mensuel peut attendre 24h, un batch simple sera plus fiable, moins couteux et plus facile a deboguer. Le streaming ajoute de la complexite : ne l'utilisez que quand la latence est un besoin metier reel.

Quelle est la difference fondamentale entre Lambda et Kappa ?
Lambda maintient deux chemins de traitement paralleles (batch + speed) fusionnes au serving layer. Kappa n'utilise qu'un seul chemin streaming et retraite l'historique en rejouant le log. Kappa est plus simple a maintenir mais necessite un log immutable performant comme Kafka.
Pourquoi l'event-time est-il si important en streaming ?
L'event-time represente le moment reel ou un evenement s'est produit. Les evenements peuvent arriver en desordre ou en retard (donnees mobiles, reseaux instables). Si vous utilisez uniquement le processing-time, vos fenetres temporelles et aggregations seront incorrectes. Les watermarks permettent de gerer ce desordre.

Kafka Architecture

60 min Avance

Objectifs de la lecon

  • Comprendre l'architecture interne de Kafka : brokers, topics, partitions
  • Maitriser les mecanismes de replication et le concept d'ISR
  • Comprendre le fonctionnement des consumer groups
  • Comparer ZooKeeper et KRaft pour la gestion du cluster

Kafka n'est pas qu'un simple message broker. C'est un log distribue, append-only, concu pour la durabilite et le debit. Comprendre cette nature fondamentale est la cle pour bien l'utiliser et eviter les pieges classiques de configuration.

Concepts Fondamentaux

Kafka = Log Distribue Immutable

Un topic Kafka est un log append-only partitionne. Chaque message recoit un offset sequentiel unique au sein de sa partition. Les messages ne sont jamais modifies ni supprimes (avant expiration du retention). Cette immutabilite est la base de toute l'architecture.

Architecture d'un Cluster Kafka
  Producers                    Kafka Cluster                    Consumers
  =========          ================================          =========

  +-------+         Broker 1          Broker 2                 +-------+
  | App A +-------> +-----------+    +-----------+  +--------> | App X |
  +-------+         | Topic T   |    | Topic T   |  |          +-------+
                    | Part 0 (L)|    | Part 0 (F)|  |
  +-------+         | Part 2 (F)|    | Part 1 (L)|  |          +-------+
  | App B +-------> +-----------+    +-----------+  +--------> | App Y |
  +-------+                                         |          +-------+
                    Broker 3                         |
                    +-----------+                    |          +-------+
                    | Topic T   |                    +--------> | App Z |
                    | Part 1 (F)|                               +-------+
                    | Part 2 (L)|
                    +-----------+
                    (L)=Leader (F)=Follower

Brokers et Partitions

Un cluster Kafka est compose de brokers (serveurs). Chaque topic est divise en partitions distribuees sur les brokers. Les partitions sont l'unite de parallelisme : plus de partitions = plus de throughput potentiel.

Bash
# Creer un topic avec 6 partitions et facteur de replication 3
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic commandes \
  --partitions 6 \
  --replication-factor 3

# Decrire un topic existant
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic commandes

# Resultat typique :
# Topic: commandes  Partitions: 6  ReplicationFactor: 3
# Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
# Partition: 1  Leader: 2  Replicas: 2,3,1  Isr: 2,3,1
# ...

Replication et ISR

Chaque partition a un leader et des followers. Toutes les lectures et ecritures passent par le leader. Les followers repliquent le log de maniere asynchrone.

In-Sync Replicas (ISR)

L'ISR est l'ensemble des replicas qui sont "a jour" avec le leader (dans la limite de replica.lag.time.max.ms, par defaut 30s). Un message n'est considere comme committed que lorsqu'il a ete replique sur tous les ISR. Si un follower prend du retard, il sort de l'ISR. Le parametre min.insync.replicas definit le nombre minimum d'ISR requis pour accepter une ecriture.

Mecanisme de Replication
  Producer --acks=all---> Partition 0 Leader (Broker 1)
                               |
                    +----------+----------+
                    |                     |
              Follower (Broker 2)   Follower (Broker 3)
              [ISR: oui]            [ISR: oui]

  min.insync.replicas=2 : il faut au moins 2 ISR
  pour que l'ecriture soit acceptee avec acks=all

  Si Broker 3 tombe :
  - ISR = {Broker 1, Broker 2} -> ecriture OK (2 >= 2)
  Si Broker 2 tombe aussi :
  - ISR = {Broker 1} -> ecriture REFUSEE (1 < 2)
  - NotEnoughReplicasException

Consumer Groups

Les consumer groups permettent le traitement parallele. Chaque partition est assignee a exactement un consumer du groupe. Si un consumer tombe, ses partitions sont redistribuees aux survivants (rebalancing).

Consumer Groups et Partitions
  Topic "commandes" (6 partitions)

  Consumer Group "payment-service" :
  +------------+  +------------+  +------------+
  | Consumer 1 |  | Consumer 2 |  | Consumer 3 |
  | P0, P1     |  | P2, P3     |  | P4, P5     |
  +------------+  +------------+  +------------+

  Consumer Group "analytics-service" :
  +------------+  +------------+
  | Consumer A |  | Consumer B |
  | P0, P1, P2 |  | P3, P4, P5 |
  +------------+  +------------+

  Regle : #consumers <= #partitions dans un groupe
  Un consumer en trop sera idle (inactif)

ZooKeeper vs KRaft

ZooKeeper (legacy)

  • Service externe pour les metadonnees du cluster
  • Election du controller via ZooKeeper
  • Limite a ~200K partitions par cluster
  • Composant supplementaire a operer et monitorer
  • Deprecie depuis Kafka 3.3

KRaft (moderne)

  • Metadonnees gerees nativement par Kafka via Raft
  • Quorum controller integre
  • Supporte des millions de partitions
  • Demarrage plus rapide, architecture simplifiee
  • Mode par defaut depuis Kafka 3.6+
Properties
# Configuration KRaft (server.properties)
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@broker1:9093,2@broker2:9093,3@broker3:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
log.dirs=/var/kafka-logs
metadata.log.dir=/var/kafka-metadata

LinkedIn - Scaling Kafka a l'Extreme

LinkedIn opere plus de 100 clusters Kafka avec des milliers de brokers. Leur plus grand cluster gere plus de 100 000 partitions. La migration vers KRaft a ete motivee par les limitations de ZooKeeper : au-dela de 200K partitions, le temps de recovery apres panne d'un controller depassait plusieurs minutes. Avec KRaft, ce temps est reduit a quelques secondes.

Lecon cle : Le nombre de partitions est le levier principal de scalabilite, mais chaque partition a un cout en memoire et en temps de recovery. Dimensionnez selon votre throughput cible, pas "au cas ou".

Trop de Partitions

Creer des centaines de partitions "par precaution" est un anti-pattern courant. Chaque partition consomme de la memoire (file descriptors, index segments), augmente le temps de rebalancing des consumers, et complexifie le monitoring. Commencez avec nombre de partitions = 2x le nombre de consumers et augmentez si necessaire. On ne peut pas reduire le nombre de partitions d'un topic existant.

Ordonnancement

L'ordre des messages n'est garanti que au sein d'une meme partition. Si l'ordre global est critique, utilisez une seule partition (au detriment du parallelisme) ou une cle de partitionnement pertinente pour regrouper les messages lies dans la meme partition.

Que se passe-t-il si le nombre de consumers dans un groupe depasse le nombre de partitions ?
Les consumers en surplus restent inactifs (idle). Chaque partition ne peut etre assignee qu'a un seul consumer dans un meme groupe. Par exemple, avec 4 partitions et 6 consumers, 2 consumers ne recevront aucun message. C'est pour cela que le nombre de partitions definit le parallelisme maximal.

Quiz rapide

Quel parametre garantit qu'un message est ecrit sur tous les ISR avant d'etre confirme au producer ?

min.insync.replicas=all
acks=all
replication.factor=all
Correct ! acks=all (ou acks=-1) attend la confirmation de tous les ISR. min.insync.replicas definit combien d'ISR sont necessaires.

Kafka Producers & Consumers

55 min Avance

Objectifs de la lecon

  • Configurer un producer Kafka avec les bons niveaux de garantie (acks, idempotence)
  • Comprendre la semantique exactly-once de bout en bout
  • Maitriser la gestion des offsets cote consumer
  • Gerer le rebalancing et ses impacts sur les applications

La configuration du producer et du consumer est souvent negligee par les developpeurs. Pourtant, c'est la que se joue la difference entre un systeme qui perd des messages, qui en duplique, ou qui offre des garanties exactly-once. Chaque parametre a un impact direct sur la fiabilite et les performances.

Producer : Niveaux de Garantie

Parametre acksComportementDurabiliteLatence
acks=0Fire and forget, pas d'attente de confirmationPerte possibleTres faible
acks=1Confirmation du leader uniquementPerte si le leader tombe avant replicationFaible
acks=allConfirmation de tous les ISRMaximale (avec min.insync.replicas >= 2)Plus elevee

Idempotence du Producer

Sans idempotence, un retry apres timeout reseau peut causer des doublons. L'idempotence garantit que chaque message est ecrit exactement une fois dans sa partition, meme en cas de retry.

Java
// Configuration d'un producer idempotent
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Idempotence : elimine les doublons en cas de retry
props.put("enable.idempotence", true);  // implicitement acks=all
props.put("max.in.flight.requests.per.connection", 5); // max 5 avec idempotence
props.put("retries", Integer.MAX_VALUE);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Envoi avec callback pour gestion d'erreur
producer.send(new ProducerRecord<>("commandes", orderId, orderJson),
    (metadata, exception) -> {
        if (exception != null) {
            log.error("Echec envoi commande {}: {}", orderId, exception.getMessage());
            // Logique de compensation
        } else {
            log.info("Commande {} envoyee - partition:{} offset:{}",
                orderId, metadata.partition(), metadata.offset());
        }
    });

Comment fonctionne l'idempotence ?

Kafka attribue un Producer ID (PID) et un sequence number a chaque message. Le broker rejette tout message dont le sequence number a deja ete vu pour ce PID+partition. Ainsi, meme si le producer renvoie un message apres un timeout, le broker le detecte comme doublon et repond OK sans l'ecrire une seconde fois.

Exactly-Once Semantics (EOS)

L'idempotence protege contre les doublons au sein d'une partition. Pour une garantie exactly-once de bout en bout (lire depuis un topic, traiter, ecrire dans un autre topic), il faut les transactions Kafka :

Java
// Producer transactionnel pour exactly-once
props.put("transactional.id", "payment-processor-1");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

try {
    producer.beginTransaction();

    // Lire les messages d'entree
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        // Traitement metier
        String result = processPayment(record.value());

        // Ecriture du resultat dans le topic de sortie
        producer.send(new ProducerRecord<>("paiements-valides", record.key(), result));
    }

    // Commit des offsets ET de la production dans une seule transaction
    producer.sendOffsetsToTransaction(
        getOffsetsToCommit(records),
        consumer.groupMetadata()
    );

    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}

Consumer : Gestion des Offsets

L'offset est la position du consumer dans chaque partition. Kafka offre trois strategies de gestion :

Auto-commit (defaut)

  • enable.auto.commit=true
  • Commit automatique toutes les 5s
  • Risque : perte de messages si crash entre poll et commit
  • Risque : doublons si crash entre traitement et commit

Commit manuel

  • enable.auto.commit=false
  • commitSync() ou commitAsync()
  • Controle total sur quand le commit se fait
  • Commit apres traitement complet = at-least-once
Java
// Consumer avec commit manuel - at-least-once semantics
Properties cProps = new Properties();
cProps.put("bootstrap.servers", "broker1:9092");
cProps.put("group.id", "payment-service");
cProps.put("enable.auto.commit", false);
cProps.put("auto.offset.reset", "earliest"); // latest | earliest | none
cProps.put("isolation.level", "read_committed"); // pour EOS

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
consumer.subscribe(Arrays.asList("commandes"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        processOrder(record);  // Traitement metier
    }
    consumer.commitSync();  // Commit apres traitement complet
}

Rebalancing des Consumers

Impact du Rebalancing

Un rebalancing se declenche quand un consumer rejoint ou quitte un groupe, ou quand les partitions changent. Pendant un rebalancing, aucun consumer du groupe ne traite de messages. Cela peut durer de quelques secondes a plusieurs minutes selon la taille du groupe. Strategies pour minimiser l'impact :

  • Static group membership : group.instance.id evite un rebalance lors de redemarrages courts
  • Cooperative rebalancing : partition.assignment.strategy=CooperativeStickyAssignor ne reassigne que les partitions necessaires
  • Heartbeat tuning : ajustez session.timeout.ms et heartbeat.interval.ms

Incident : Rebalancing en Cascade chez un E-Commerce

Un retailer en ligne avait 50 consumers dans un groupe traitant les commandes. Un deploiement rolling restart a declenche un rebalancing a chaque redemarrage de consumer. Avec session.timeout.ms=10s par defaut, chaque rebalance prenait ~45 secondes.

Resultat : 50 rebalancings consecutifs = plus de 30 minutes sans traitement de commandes pendant le Black Friday.

Solution : Passage au CooperativeStickyAssignor et utilisation de group.instance.id unique par instance, reduisant le temps de rebalancing a moins de 2 secondes par instance.

Ignorer les Erreurs de Deserialization

Un message mal forme (schema incompatible, corruption) bloque le consumer en boucle infinie par defaut. Configurez toujours un DeserializationExceptionHandler ou utilisez un dead letter topic pour isoler les messages problematiques sans bloquer le pipeline entier.

Quelle est la difference entre idempotence et transactions Kafka ?
L'idempotence (enable.idempotence=true) garantit qu'un message n'est ecrit qu'une fois dans une partition, meme en cas de retry. Les transactions etendent cette garantie a des operations multi-partitions et multi-topics : lire d'un topic, traiter, ecrire dans un autre, et committer les offsets, le tout de maniere atomique (tout ou rien).

Kafka Connect & Schema Registry

55 min Avance

Objectifs de la lecon

  • Configurer des connecteurs source et sink avec Kafka Connect
  • Appliquer des Single Message Transforms (SMT) et gerer les dead letter queues
  • Utiliser le Schema Registry pour gouverner les schemas Avro, Protobuf et JSON Schema
  • Comprendre les modes de compatibilite et leur impact sur l'evolution des schemas

Kafka Connect et le Schema Registry sont les deux piliers souvent negliges de l'ecosysteme Kafka. Connect elimine le code boilerplate d'integration, et le Schema Registry previent les ruptures de contrat entre producteurs et consommateurs. Ensemble, ils transforment Kafka d'un simple bus de messages en une plateforme d'integration d'entreprise.

Kafka Connect : Architecture

Kafka Connect - Architecture Distribuee
  Sources Externes              Kafka Connect Cluster           Kafka         Destinations
  ================    ======================================   =======    ================

  +------------+      +----------------------------------+                 +------------+
  | PostgreSQL |----->| Worker 1                         |                 | Elastic    |
  +------------+      |  [JDBC Source Task 0]            |     +------+   | Search     |
                      |  [JDBC Source Task 1]            |---->|Topics|-->|            |
  +------------+      +----------------------------------+     +------+   +------------+
  | MongoDB    |----->|                                  |
  +------------+      | Worker 2                         |                 +------------+
                      |  [MongoDB Source Task 0]         |                 | S3 Bucket  |
  +------------+      |  [S3 Sink Task 0]                |<----|Topics|-->|            |
  | API REST   |----->|  [Elasticsearch Sink Task 0]     |     +------+   +------------+
  +------------+      +----------------------------------+

Concepts Cles de Connect

Worker : processus JVM qui execute les connecteurs. Mode standalone (dev) ou distribue (prod).
Connector : definition logique d'une integration (config JSON).
Task : unite d'execution parallelisable. Un connecteur peut avoir N tasks.
Converter : serialise/deserialise les donnees (Avro, JSON, Protobuf).
SMT : transformation legere appliquee message par message.

JSON
{
  "name": "postgres-source-commandes",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "db-prod.internal",
    "database.port": "5432",
    "database.user": "cdc_reader",
    "database.password": "${vault:secret/kafka/db-password}",
    "database.dbname": "ecommerce",
    "table.include.list": "public.commandes,public.lignes_commande",
    "topic.prefix": "cdc.ecommerce",
    "plugin.name": "pgoutput",

    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",

    "transforms": "route,unwrap,addTimestamp",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "cdc\\.ecommerce\\.(.*)",
    "transforms.route.replacement": "streaming.$1",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addTimestamp.timestamp.field": "processed_at",

    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq-postgres-commandes",
    "errors.deadletterqueue.topic.replication.factor": 3,
    "errors.deadletterqueue.context.headers.enable": true
  }
}

Single Message Transforms (SMT)

Les SMTs permettent de transformer les messages au vol sans ecrire de code. Voici les SMTs les plus utiles :

SMTUsageExemple
RegexRouterRenommer les topicsRerouter cdc.db.table vers stream.table
ExtractFieldExtraire un champExtraire uniquement la cle depuis la valeur
InsertFieldAjouter des champsAjouter un timestamp de traitement
MaskFieldMasquer des donneesMasquer email, telephone (RGPD)
TimestampConverterConvertir les datesUnix timestamp vers ISO 8601
Filter (Predicate)Filtrer les messagesIgnorer les messages de type DELETE

Dead Letter Queue (DLQ)

Configurez TOUJOURS une dead letter queue en production. Sans DLQ, un seul message mal forme peut bloquer tout votre pipeline. Avec errors.tolerance=all et une DLQ, les messages en erreur sont isoles pour investigation ulterieure sans impacter le flux principal. Activez context.headers.enable=true pour capturer la cause de l'erreur dans les headers du message DLQ.

Schema Registry

Le Schema Registry est un service centralise qui stocke et valide les schemas des messages Kafka. Il garantit que producteurs et consommateurs parlent le meme "langage".

Schema Registry - Flux de Validation
  Producer                    Schema Registry              Consumer
  ========                    ===============              ========

  1. Serialiser message  ---> 2. Verifier compatibilite
     avec schema v2           du schema v2 vs v1
                              |
                              +--compatible--> 3. Enregistrer schema v2
                              |                   retourner schema_id=42
                              +--incompatible--> ERREUR: schema rejete

  4. Envoyer message          5. Consumer recoit
     [magic_byte][schema_id]     message, extrait
     [payload_avro]              schema_id=42

                              6. Recuperer schema v2 ----> 7. Deserialiser
                                 (cache local)               avec schema v2

Formats de Serialisation

Apache Avro

  • Format binaire compact, schema integre
  • Evolution de schema native
  • Le plus utilise avec Kafka
  • Support excellent dans l'ecosysteme Java/JVM

Protobuf / JSON Schema

  • Protobuf : plus performant qu'Avro, genere du code type-safe, populaire en gRPC
  • JSON Schema : lisible par l'humain, facile a deboguer, mais plus volumineux
  • Choix selon l'ecosysteme existant
Avro Schema
{
  "type": "record",
  "name": "Commande",
  "namespace": "com.ecommerce.events",
  "fields": [
    {"name": "commande_id", "type": "string"},
    {"name": "client_id", "type": "string"},
    {"name": "montant", "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}},
    {"name": "devise", "type": {"type": "enum", "name": "Devise", "symbols": ["EUR", "USD", "GBP"]}},
    {"name": "statut", "type": "string"},
    {"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "commentaire", "type": ["null", "string"], "default": null}
  ]
}

Modes de Compatibilite

ModeRegleCas d'usage
BACKWARD (defaut)Le nouveau schema peut lire les anciens messagesConsumers mis a jour en premier
FORWARDL'ancien schema peut lire les nouveaux messagesProducers mis a jour en premier
FULLBackward + ForwardMise a jour independante producteur/consommateur
NONEPas de verificationDeveloppement uniquement !

Regles d'Or de l'Evolution de Schema

En mode BACKWARD (le plus courant) : vous pouvez ajouter des champs optionnels (avec default), supprimer des champs qui ont un default. Vous ne pouvez PAS renommer des champs, changer le type d'un champ, ou supprimer un champ sans default. Violer ces regles = messages illisibles en production.

Schema Registry en Mode NONE en Production

Desactiver la validation de compatibilite en production est une bombe a retardement. Un producteur peut publier un changement de schema incompatible qui cassera silencieusement tous les consommateurs. Toujours utiliser au minimum BACKWARD en production et FULL quand possible.

Quelle est la difference entre un converter et une SMT dans Kafka Connect ?
Un converter gere la serialisation/deserialization des messages (ex: AvroConverter transforme les objets Connect en bytes Avro et inversement). Une SMT transforme la structure du message lui-meme (ajout/suppression de champs, renommage de topics, filtrage). Le converter opere au niveau des bytes, la SMT au niveau de la structure logique du message.

Kafka Streams & ksqlDB

60 min Avance

Objectifs de la lecon

  • Distinguer KStream (flux d'evenements) et KTable (table de changements)
  • Maitriser le windowing (tumbling, hopping, session, sliding)
  • Comprendre les joins entre streams et tables avec les state stores
  • Ecrire des requetes ksqlDB : materialized views, pull et push queries

Kafka Streams est la bibliotheque de stream processing la plus elegante de l'ecosysteme : pas de cluster separe a gerer, juste une dependance Java. ksqlDB democratise le streaming en le rendant accessible via SQL. Ensemble, ils couvrent 80% des cas de stream processing sans la complexite d'un Flink ou Spark Streaming.

KStream vs KTable

Dualite Stream-Table

Un KStream est un flux infini d'evenements immutables (chaque record est un fait). Une KTable est un changelog : seule la derniere valeur par cle est conservee (upsert). Cette dualite est fondamentale :

  • Stream : "l'utilisateur X a clique sur le produit Y a 14h03" (chaque clic compte)
  • Table : "le solde du compte X est 1500 EUR" (seule la derniere valeur importe)

Un stream peut devenir une table (via aggregation), et une table peut devenir un stream (via changelog).

Java
// Kafka Streams - Exemple complet de traitement de commandes
StreamsBuilder builder = new StreamsBuilder();

// KStream : flux de commandes (chaque commande est un evenement)
KStream<String, Commande> commandes = builder.stream(
    "commandes",
    Consumed.with(Serdes.String(), commandeSerde)
);

// KTable : table des clients (derniere info connue par client)
KTable<String, Client> clients = builder.table(
    "clients",
    Consumed.with(Serdes.String(), clientSerde)
);

// Enrichir chaque commande avec les infos client (stream-table join)
KStream<String, CommandeEnrichie> commandesEnrichies = commandes
    .selectKey((key, commande) -> commande.getClientId())
    .join(clients,
        (commande, client) -> new CommandeEnrichie(commande, client)
    );

// Aggregation : chiffre d'affaires par categorie sur fenetre de 1 heure
KTable<Windowed<String>, Double> caParCategorie = commandesEnrichies
    .groupBy((key, ce) -> ce.getCategorie(),
        Grouped.with(Serdes.String(), commandeEnrichieSerde))
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
    .aggregate(
        () -> 0.0,
        (categorie, ce, total) -> total + ce.getMontant(),
        Materialized.<String, Double, WindowStore<Bytes, byte[]>>as("ca-par-categorie")
            .withValueSerde(Serdes.Double())
    );

// Ecrire le resultat dans un topic de sortie
caParCategorie.toStream()
    .map((windowedKey, montant) -> KeyValue.pair(windowedKey.key(), montant))
    .to("ca-par-categorie-hourly", Produced.with(Serdes.String(), Serdes.Double()));

Windowing

Types de Fenetres Temporelles
  Temps --->  |--1--|--2--|--3--|--4--|--5--|--6--|

  TUMBLING (fixe, sans overlap) - ofSize(2)
  |=====|=====|=====|
  [1, 2] [3, 4] [5, 6]

  HOPPING (fixe, avec overlap) - ofSize(3), advance(1)
  |========|
     |========|
        |========|
  [1,2,3] [2,3,4] [3,4,5] [4,5,6]

  SESSION (dynamique, basee sur l'activite) - inactivityGap(2)
  |===|     |=======|   |===|
  [1,2]     [4,5,6]     [9]
  (gap >= 2 entre sessions)

  SLIDING (declenchee par evenement) - ofSize(3), grace(0)
  Fenetre recalculee a chaque evenement entrant

State Stores

Gestion de l'Etat dans Kafka Streams

Les aggregations et joins necessitent un etat local. Kafka Streams utilise RocksDB par defaut comme state store local, et sauvegarde les changements dans des changelog topics Kafka pour la tolerance aux pannes. En cas de redemarrage, l'etat est restaure depuis le changelog topic. Le state store est aussi queriable via les Interactive Queries, permettant d'exposer l'etat via une API REST.

ksqlDB : SQL sur les Streams

SQL (ksqlDB)
-- Creer un stream depuis un topic Kafka
CREATE STREAM commandes_stream (
  commande_id VARCHAR KEY,
  client_id VARCHAR,
  montant DECIMAL(10,2),
  categorie VARCHAR,
  created_at TIMESTAMP
) WITH (
  KAFKA_TOPIC = 'commandes',
  VALUE_FORMAT = 'AVRO',
  TIMESTAMP = 'created_at'
);

-- Vue materialisee : total par client (KTable)
CREATE TABLE total_par_client AS
  SELECT client_id,
         COUNT(*) AS nb_commandes,
         SUM(montant) AS total_depense,
         MAX(created_at) AS derniere_commande
  FROM commandes_stream
  GROUP BY client_id
  EMIT CHANGES;

-- Push query : flux continu de mises a jour (long-running)
SELECT * FROM total_par_client EMIT CHANGES;

-- Pull query : etat actuel ponctuel (request/response)
SELECT * FROM total_par_client WHERE client_id = 'CLI-42';

-- Detection d'anomalie : clients avec plus de 10 commandes en 5 min
CREATE TABLE alertes_fraude AS
  SELECT client_id,
         COUNT(*) AS nb_commandes,
         SUM(montant) AS total
  FROM commandes_stream
  WINDOW TUMBLING (SIZE 5 MINUTES)
  GROUP BY client_id
  HAVING COUNT(*) > 10
  EMIT CHANGES;

Push Queries

  • Syntaxe : SELECT ... EMIT CHANGES
  • Flux continu de resultats
  • Comme un abonnement temps reel
  • Utilise pour les dashboards, alertes

Pull Queries

  • Syntaxe : SELECT ... WHERE key = ...
  • Reponse unique, instantanee
  • Interroge l'etat materialise
  • Utilise pour les APIs, lookups

Uber - Tarification Dynamique en Temps Reel

Uber utilise Kafka Streams pour calculer le "surge pricing" en temps reel. Le systeme traite des millions d'evenements par seconde : positions GPS des chauffeurs, demandes de courses, conditions de trafic.

L'architecture repose sur des KTables pour l'etat courant (offre/demande par zone geographique) et des fenetres glissantes pour detecter les tendances. Le multiplicateur de prix est recalcule toutes les 30 secondes par zone.

Pourquoi Kafka Streams ? Pas de cluster separe a gerer, deploiement en tant que microservice standard, scaling horizontal en ajoutant des instances.

Etat Non Borne (Unbounded State)

Creer une aggregation sans fenetre temporelle sur une cle a haute cardinalite (ex: GROUP BY user_id sans fenetre) fait grossir le state store indefiniment. La memoire et le disque augmentent jusqu'a saturation. Toujours utiliser des fenetres temporelles avec une retention explicite, ou s'assurer que la cardinalite des cles est bornee.

Quand utiliser une push query vs une pull query dans ksqlDB ?
Utilisez une push query (EMIT CHANGES) quand vous avez besoin d'un flux continu de mises a jour, par exemple pour alimenter un dashboard temps reel ou declencher des alertes. Utilisez une pull query quand vous avez besoin d'interroger l'etat actuel ponctuellement, comme un appel API pour obtenir le solde d'un client. Les pull queries sont comme un SELECT classique, les push queries comme un abonnement.

Apache Flink

65 min Avance

Objectifs de la lecon

  • Comprendre l'architecture de Flink : JobManager, TaskManager, slots
  • Maitriser la DataStream API avec les operateurs de transformation et windowing
  • Configurer le checkpointing et les savepoints pour la tolerance aux pannes
  • Utiliser les watermarks pour gerer l'event-time et les donnees en retard
  • Decouvrir le Complex Event Processing (CEP) et la Table API

Flink est le moteur de stream processing le plus puissant du marche. Contrairement a Spark qui a ajoute le streaming a posteriori (micro-batch), Flink a ete concu des le depart pour le streaming natif. Son modele de checkpointing (inspire de Chandy-Lamport) offre des garanties exactly-once sans sacrifier les performances.

Architecture de Flink

Architecture d'un Cluster Flink
  Client (CLI / REST)
       |
       v
  +--------------------+
  |    JobManager       |     Coordonne l'execution
  |  +---------------+  |     - Scheduling des taches
  |  | Dispatcher    |  |     - Checkpointing
  |  | ResourceMgr   |  |     - Recovery
  |  | JobMaster     |  |
  +----+-------------+--+
       |           |
  +----v----+ +----v----+
  |TaskMgr 1| |TaskMgr 2|    Executent les operateurs
  | Slot 0  | | Slot 0  |
  | Slot 1  | | Slot 1  |    Slot = unite de ressource
  +---------+ +---------+    (1 thread, memoire dediee)

  Source --> Map --> KeyBy --> Window --> Sink
  [-------- Pipeline parallelise sur les slots --------]

DataStream API

Java
// Flink DataStream - Detection de fraude en temps reel
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configuration du checkpointing (exactly-once)
env.enableCheckpointing(60_000); // checkpoint toutes les 60s
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
env.getCheckpointConfig().setCheckpointTimeout(120_000);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// Source Kafka
KafkaSource<Transaction> source = KafkaSource.<Transaction>builder()
    .setBootstrapServers("broker1:9092")
    .setTopics("transactions")
    .setGroupId("fraud-detector")
    .setStartingOffsets(OffsetsInitializer.committedOffsets())
    .setDeserializer(new TransactionDeserializer())
    .build();

DataStream<Transaction> transactions = env.fromSource(
    source,
    WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(10))
        .withTimestampAssigner((tx, ts) -> tx.getTimestamp()),
    "Kafka Source"
);

// Traitement : detecter les transactions suspectes
DataStream<Alerte> alertes = transactions
    .keyBy(Transaction::getCompteId)
    .window(SlidingEventTimeWindows.of(
        Time.minutes(5),  // taille de fenetre
        Time.minutes(1)   // slide
    ))
    .process(new DetectionFraudeFunction());

// Sink : ecrire les alertes
alertes.sinkTo(
    KafkaSink.<Alerte>builder()
        .setBootstrapServers("broker1:9092")
        .setRecordSerializer(new AlerteSerializer("alertes-fraude"))
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .build()
);

env.execute("Fraud Detection Pipeline");

Watermarks et Event-Time

Comprendre les Watermarks

Un watermark est un signal qui dit : "tous les evenements avec un timestamp <= W sont arrives". Il permet a Flink de savoir quand fermer une fenetre temporelle, meme si les evenements arrivent en desordre.

  • forBoundedOutOfOrderness(Duration.ofSeconds(10)) : tolere jusqu'a 10s de retard
  • Les evenements arrivant apres le watermark sont consideres "en retard" (late events)
  • Strategy allowedLateness() : garde la fenetre ouverte plus longtemps pour les retardataires
  • Side output : redirige les late events vers un flux separe pour traitement ulterieur
Watermarks et Fenetres
  Evenements arrivant :  t=3  t=1  t=7  t=5  t=4  t=12  t=8  t=15

  Avec BoundedOutOfOrderness(5s) :

  Temps reel : ----+----+----+----+----+-----+----+------->
                   3    1    7    5    4    12    8    15

  Watermarks :     -2   -2    2    2    2     7    7    10
                   (max_seen - 5 = watermark)

  Fenetre [0, 10) :
  - Se ferme quand watermark >= 10
  - Inclut : t=3, t=1, t=7, t=5, t=4, t=8
  - t=12 est dans la fenetre suivante

  Si t=6 arrive apres watermark=10 :
  -> Late event ! Redirige vers side output

Checkpointing vs Savepoints

Checkpoints

  • Automatiques et periodiques
  • Geres par Flink en interne
  • Pour la recovery apres panne
  • Supprimes quand le job s'arrete proprement
  • Stockage : HDFS, S3, GCS

Savepoints

  • Declenches manuellement par l'operateur
  • Portables entre versions de Flink
  • Pour les mises a jour de code, migrations
  • Conserves indefiniment
  • Permettent de redemarrer avec un code modifie
Bash
# Creer un savepoint avant une mise a jour
flink savepoint :jobId s3://checkpoints/savepoints/

# Arreter le job avec savepoint
flink stop --savepointPath s3://checkpoints/savepoints/ :jobId

# Redemarrer depuis le savepoint avec le nouveau code
flink run -s s3://checkpoints/savepoints/savepoint-abc123 \
  -c com.example.FraudDetection \
  fraud-detection-v2.jar

Complex Event Processing (CEP)

Java
// CEP : detecter un pattern de fraude
// Pattern : 3 transactions > 500 EUR en moins de 10 minutes
Pattern<Transaction, ?> fraudPattern = Pattern
    .<Transaction>begin("premier")
        .where(new SimpleCondition<Transaction>() {
            public boolean filter(Transaction tx) {
                return tx.getMontant() > 500;
            }
        })
    .next("deuxieme")
        .where(new SimpleCondition<Transaction>() {
            public boolean filter(Transaction tx) {
                return tx.getMontant() > 500;
            }
        })
    .next("troisieme")
        .where(new SimpleCondition<Transaction>() {
            public boolean filter(Transaction tx) {
                return tx.getMontant() > 500;
            }
        })
    .within(Time.minutes(10));

PatternStream<Transaction> patternStream = CEP.pattern(
    transactions.keyBy(Transaction::getCompteId),
    fraudPattern
);

DataStream<Alerte> alertes = patternStream.select(
    (Map<String, List<Transaction>> pattern) -> {
        Transaction t1 = pattern.get("premier").get(0);
        Transaction t3 = pattern.get("troisieme").get(0);
        return new Alerte(t1.getCompteId(), "3 transactions > 500 EUR en 10min",
            t1.getMontant() + t3.getMontant());
    });

Netflix - Traitement d'Evenements a l'Echelle

Netflix utilise Flink pour traiter des milliards d'evenements par jour : lectures de films, metriques de qualite de streaming, interactions utilisateur. Leur pipeline Flink alimente le systeme de recommandation en quasi-temps reel.

Un cas specifique : la detection de degradation de qualite video. Les evenements de buffering sont agreges par region et par CDN en fenetres glissantes de 5 minutes. Si le taux de buffering depasse un seuil, le trafic est automatiquement reroute vers un autre CDN en moins de 30 secondes.

Cle de succes : Les savepoints Flink permettent a Netflix de deployer des mises a jour de leurs pipelines plusieurs fois par jour sans perte de donnees ni interruption.

Ignorer la Backpressure

Flink propage naturellement la backpressure : si un operateur est lent, les operateurs en amont ralentissent automatiquement. Ignorer les metriques de backpressure mene a des pipelines qui semblent fonctionner mais accumulent du retard silencieusement. Monitorez isBackPressured sur chaque operateur et dimensionnez le parallelisme en consequence.

Pourquoi les watermarks sont-ils necessaires pour le windowing en event-time ?
Les evenements arrivent en desordre en raison des latences reseau, des retransmissions et du partitionnement. Sans watermarks, Flink ne saurait jamais quand fermer une fenetre temporelle car il pourrait toujours y avoir un evenement en retard. Le watermark W dit : "je ne recevrai plus d'evenements avec timestamp <= W", permettant a Flink de fermer les fenetres dont la borne superieure est <= W et d'emettre les resultats.

Spark Streaming & Apache Beam

55 min Avance

Objectifs de la lecon

  • Comprendre Structured Streaming de Spark et son modele micro-batch
  • Distinguer micro-batch et continuous processing dans Spark
  • Maitriser le modele unifie d'Apache Beam : PCollections, PTransforms, Runners
  • Choisir le bon moteur de streaming selon le contexte projet

Si Flink est le roi du streaming natif, Spark domine l'ecosysteme batch et analytique. Structured Streaming unifie batch et streaming dans Spark avec une API identique. Apache Beam va encore plus loin : ecrire une fois, executer sur n'importe quel moteur. Le choix entre ces outils depend de votre ecosysteme existant plus que de capacites techniques pures.

Spark Structured Streaming

Le Modele Micro-Batch

Structured Streaming traite un flux de donnees comme une table infinie en appendition. Chaque micro-batch ajoute de nouvelles lignes a cette table et re-execute la requete. Le modele est identique a une requete SQL classique, mais executee incrementalement. La latence typique est de 100ms a quelques secondes selon la configuration du trigger.

Python (PySpark)
# Structured Streaming - Pipeline de traitement de commandes
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("Commandes Streaming") \
    .config("spark.sql.streaming.checkpointLocation", "s3://checkpoints/commandes") \
    .getOrCreate()

# Schema des commandes
schema_commande = StructType([
    StructField("commande_id", StringType()),
    StructField("client_id", StringType()),
    StructField("montant", DoubleType()),
    StructField("categorie", StringType()),
    StructField("timestamp", TimestampType())
])

# Source : lire depuis Kafka
commandes_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092") \
    .option("subscribe", "commandes") \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .load() \
    .select(from_json(col("value").cast("string"), schema_commande).alias("data")) \
    .select("data.*")

# Transformation : CA par categorie avec fenetre de 10 minutes
ca_par_categorie = commandes_df \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(
        window("timestamp", "10 minutes", "5 minutes"),
        "categorie"
    ) \
    .agg(
        count("*").alias("nb_commandes"),
        sum("montant").alias("chiffre_affaires"),
        avg("montant").alias("panier_moyen")
    )

# Sink : ecrire dans Delta Lake
query = ca_par_categorie.writeStream \
    .format("delta") \
    .outputMode("update") \
    .option("checkpointLocation", "s3://checkpoints/ca-categorie") \
    .trigger(processingTime="30 seconds") \
    .start("s3://datalake/gold/ca_par_categorie")

Modes de Sortie et Triggers

Output ModeDescriptionUsage
appendUniquement les nouvelles lignesTransformations sans aggregation
updateLignes modifiees depuis le dernier batchAggregations avec upsert
completeToute la table de resultatsPetites aggregations sans fenetre
TriggerLatenceUsage
processingTime("30s")~30sDashboards quasi temps reel
once()N/A (batch)Rattrapage ponctuel
availableNow()N/ATraiter tout ce qui est disponible puis s'arreter
continuous("1s")~1s (experimental)Faible latence (API limitee)

Continuous Processing : Attention

Le mode continuous de Spark offre des latences de l'ordre de la milliseconde mais reste experimental et ne supporte qu'un sous-ensemble limite d'operations (map, filter, projections). Pas d'aggregations, pas de joins, pas de deduplication. Pour du vrai streaming a faible latence avec des operations complexes, preferez Flink.

Apache Beam : Le Modele Unifie

Apache Beam - Architecture
  +----------------------------------------------------------+
  |                    Beam Pipeline (SDK)                    |
  |                                                          |
  |  PCollection --> PTransform --> PCollection --> ...       |
  |  (donnees)      (traitement)   (donnees)                 |
  +---------------------------+------------------------------+
                              |
                    +---------+---------+
                    |    Beam Runner     |
                    +---------+---------+
                              |
              +---------------+---------------+
              |               |               |
        +-----v-----+  +-----v-----+  +------v----+
        |  Apache    |  |  Google   |  |  Apache   |
        |  Flink     |  |  Dataflow |  |  Spark    |
        |  Runner    |  |  Runner   |  |  Runner   |
        +-----------+  +-----------+  +-----------+

Concepts Fondamentaux de Beam

PCollection : collection distribuee de donnees (bornee pour batch, non bornee pour streaming).
PTransform : operation de transformation (ParDo, GroupByKey, Combine, Flatten, etc.).
Pipeline : graphe acyclique de PTransforms connectes par des PCollections.
Runner : moteur d'execution qui traduit le pipeline en jobs natifs (Flink, Spark, Dataflow).
Windowing : unifie batch (fenetre globale unique) et streaming (fenetres temporelles).

Python (Apache Beam)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows, SlidingWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode

# Pipeline Beam - Analyse de logs en temps reel
options = PipelineOptions([
    '--runner=FlinkRunner',          # Peut etre DataflowRunner, SparkRunner...
    '--flink_master=localhost:8081',
    '--streaming'
])

with beam.Pipeline(options=options) as p:
    # Source : lire depuis Kafka
    logs = (p
        | 'Lire Kafka' >> beam.io.ReadFromKafka(
            consumer_config={'bootstrap.servers': 'broker1:9092'},
            topics=['application-logs']
        )
        | 'Parser JSON' >> beam.Map(parse_log_json)
        | 'Filtrer Erreurs' >> beam.Filter(lambda log: log['level'] == 'ERROR')
    )

    # Aggregation avec fenetre et trigger
    erreurs_par_service = (logs
        | 'Fenetre 5min' >> beam.WindowInto(
            FixedWindows(300),  # 5 minutes
            trigger=AfterWatermark(
                early=AfterProcessingTime(60)  # resultats partiels toutes les 60s
            ),
            accumulation_mode=AccumulationMode.DISCARDING
        )
        | 'Cle par Service' >> beam.Map(lambda log: (log['service'], 1))
        | 'Compter' >> beam.CombinePerKey(sum)
    )

    # Sinks multiples
    erreurs_par_service | 'Vers BigQuery' >> beam.io.WriteToBigQuery(
        'projet:dataset.erreurs_par_service',
        schema='service:STRING,count:INTEGER,window_start:TIMESTAMP'
    )

    (erreurs_par_service
        | 'Filtrer Alertes' >> beam.Filter(lambda kv: kv[1] > 100)
        | 'Vers PagerDuty' >> beam.ParDo(EnvoyerAlerteFn())
    )

Comparaison des Moteurs

CritereSpark StreamingFlinkBeam
ModeleMicro-batchStreaming natifAbstraction unifiee
Latence100ms - secondesMillisecondesDepend du runner
EcosystemeML, SQL, GraphXCEP, Table APIMulti-runner
State managementBasiqueAvance (RocksDB)Depend du runner
Exactly-onceOui (avec checkpoint)Oui (natif)Depend du runner
Courbe d'apprentissageModereeEleveeElevee
Ideal pourEquipes Spark existantesLow-latency critiqueMulti-cloud, portabilite

Comment Choisir ?

  • Vous avez deja Spark : Structured Streaming est le choix naturel. Meme API, meme cluster, meme ecosystem.
  • Latence < 100ms requise : Flink est le seul choix viable pour du streaming natif avec etat complexe.
  • Multi-cloud / portabilite : Beam vous permet de changer de runner sans reecrire le code.
  • Google Cloud : Beam + Dataflow est l'option la plus integree et serverless.
Quel est l'avantage principal d'Apache Beam par rapport a Flink ou Spark ?
L'avantage principal de Beam est la portabilite. Vous ecrivez votre pipeline une seule fois avec le SDK Beam, puis vous l'executez sur n'importe quel runner compatible (Flink, Spark, Google Dataflow, Samza). Cela evite le vendor lock-in et permet de migrer entre moteurs ou cloud providers sans reecrire le code. L'inconvenient est que vous perdez l'acces aux fonctionnalites specifiques de chaque moteur.

Event-Driven Architecture

60 min Avance

Objectifs de la lecon

  • Comprendre les patterns CQRS et Event Sourcing et leurs cas d'application
  • Maitriser le pattern Saga pour les transactions distribuees
  • Concevoir des domain events pertinents avec l'Event Storming
  • Integrer ces patterns dans une architecture microservices avec Kafka

L'Event-Driven Architecture n'est pas une technologie, c'est une philosophie de conception. Au lieu de penser en termes de requetes et de reponses, vous pensez en termes d'evenements et de reactions. Ce changement de paradigme debloque la scalabilite, le decouplage et la resilience, mais introduit de la complexite : eventual consistency, ordering, idempotence.

CQRS : Command Query Responsibility Segregation

Principe de CQRS

CQRS separe le modele d'ecriture (commands) du modele de lecture (queries). Le cote ecriture est optimise pour la validation et la consistance. Le cote lecture est optimise pour les performances de requetes, avec des projections denormalisees adaptees a chaque besoin de consultation.

Architecture CQRS avec Event Sourcing
  Client
  ======
    |                              |
    | Commands (POST/PUT)          | Queries (GET)
    v                              v
  +-------------------+    +-------------------+
  | Command Service   |    | Query Service     |
  | (validation,      |    | (lectures         |
  |  regles metier)   |    |  optimisees)      |
  +--------+----------+    +--------+----------+
           |                        ^
           v                        |
  +--------+----------+    +--------+----------+
  | Event Store       |    | Read Database     |
  | (append-only log  |--->| (projections      |
  |  = source verite) |    |  denormalisees)   |
  +-------------------+    +-------------------+
       Kafka Topics         PostgreSQL / Elastic
                            / Redis / MongoDB
Java
// Exemple CQRS - Cote Command
@Service
public class CommandeCommandService {

    private final KafkaTemplate<String, DomainEvent> kafkaTemplate;

    public void creerCommande(CreerCommandeRequest request) {
        // 1. Validation des regles metier
        validerStock(request.getProduits());
        validerLimiteCredit(request.getClientId(), request.getMontant());

        // 2. Emettre l'evenement (source de verite)
        CommandeCreee event = CommandeCreee.builder()
            .commandeId(UUID.randomUUID().toString())
            .clientId(request.getClientId())
            .produits(request.getProduits())
            .montant(request.getMontant())
            .timestamp(Instant.now())
            .version(1)
            .build();

        kafkaTemplate.send("domain-events.commandes", event.getCommandeId(), event);
    }
}

// Cote Query - Projecteur qui construit la vue de lecture
@Component
public class CommandeProjecteur {

    private final CommandeReadRepository readRepo;

    @KafkaListener(topics = "domain-events.commandes", groupId = "commande-projector")
    public void onEvent(DomainEvent event) {
        if (event instanceof CommandeCreee e) {
            readRepo.save(new CommandeView(e.getCommandeId(), e.getClientId(),
                e.getMontant(), "CREEE", e.getTimestamp()));
        } else if (event instanceof CommandePayee e) {
            readRepo.updateStatut(e.getCommandeId(), "PAYEE");
        } else if (event instanceof CommandeExpediee e) {
            readRepo.updateStatut(e.getCommandeId(), "EXPEDIEE");
            readRepo.setTracking(e.getCommandeId(), e.getNumeroSuivi());
        }
    }
}

Event Sourcing

Event Sourcing : L'Etat = Somme des Evenements

Au lieu de stocker l'etat courant, vous stockez tous les evenements qui ont mene a cet etat. L'etat est reconstitue en rejouant les evenements. Avantages : audit trail complet, capacite de remonter dans le temps (temporal queries), decouplage total entre ecriture et lecture. Inconvenients : complexite de gestion, snapshots necessaires pour les performances, evolution des schemas d'evenements.

Java
// Event Sourcing - Reconstitution de l'etat d'un compte bancaire
public class CompteBancaire {
    private String compteId;
    private BigDecimal solde = BigDecimal.ZERO;
    private String statut = "ACTIF";
    private List<DomainEvent> uncommittedEvents = new ArrayList<>();

    // Reconstituer l'etat depuis les evenements
    public static CompteBancaire fromEvents(List<DomainEvent> events) {
        CompteBancaire compte = new CompteBancaire();
        events.forEach(compte::apply);
        return compte;
    }

    // Commande : debiter le compte
    public void debiter(BigDecimal montant, String motif) {
        if (solde.compareTo(montant) < 0) {
            throw new SoldeInsuffisantException(compteId, solde, montant);
        }
        if (!"ACTIF".equals(statut)) {
            throw new CompteInactifException(compteId);
        }
        // Emettre l'evenement (pas de mutation directe !)
        emit(new CompteDebite(compteId, montant, motif, Instant.now()));
    }

    // Appliquer un evenement (mutation de l'etat)
    private void apply(DomainEvent event) {
        if (event instanceof CompteOuvert e) {
            this.compteId = e.getCompteId();
            this.statut = "ACTIF";
        } else if (event instanceof CompteCredite e) {
            this.solde = this.solde.add(e.getMontant());
        } else if (event instanceof CompteDebite e) {
            this.solde = this.solde.subtract(e.getMontant());
        } else if (event instanceof CompteFerme e) {
            this.statut = "FERME";
        }
    }

    // Snapshot pour eviter de rejouer des milliers d'evenements
    // Snapshot tous les 100 evenements
    public CompteSnapshot toSnapshot() {
        return new CompteSnapshot(compteId, solde, statut, version);
    }
}

Pattern Saga : Transactions Distribuees

Dans un systeme distribue, les transactions ACID classiques ne fonctionnent pas entre services. Le pattern Saga decompose une transaction en etapes locales, chacune avec sa compensation en cas d'echec.

Saga Choregraphiee - Processus de Commande
  Commande Service         Paiement Service        Stock Service        Livraison Service
  =================        =================       ===============      ==================

  1. CommandeCreee -------> 2. Paiement
                               autorise? ---------> 3. Stock
                               |                       reserve? -------> 4. Livraison
                               |                       |                    planifiee
                               |                       |                    |
                               |                       |                    v
  8. Commande              7. Paiement             6. Stock             5. Livraison
     confirmee <----------    capture <-----------    decremente <------   confirmee

  --- En cas d'echec a l'etape 3 (stock insuffisant) ---

  Commande Service         Paiement Service        Stock Service
  =================        =================       ===============

  CommandeAnnulee <-------- PaiementRembourse <---- StockInsuffisant
  (compensation)            (compensation)          (echec)

Saga Choregraphiee

  • Chaque service ecoute et emet des evenements
  • Pas de coordinateur central
  • Plus resilient (pas de SPOF)
  • Plus difficile a suivre et deboguer
  • Convient pour des flux simples (3-5 etapes)

Saga Orchestree

  • Un orchestrateur central coordonne les etapes
  • Logique de compensation centralisee
  • Plus facile a comprendre et tester
  • L'orchestrateur est un point de defaillance potentiel
  • Convient pour des flux complexes (5+ etapes)

Event Storming

L'Event Storming est un atelier collaboratif de modelisation qui reunit developpeurs et experts metier pour decouvrir les evenements du domaine :

Deroulement d'un Event Storming

  1. Orange (Domain Events) : les experts metier identifient tous les evenements significatifs ("CommandeCreee", "PaiementAutorise", "StockReserve")
  2. Bleu (Commands) : quelles actions declenchent ces evenements ("CreerCommande", "AutoriserPaiement")
  3. Jaune (Actors) : qui declenche les commandes (utilisateur, systeme, timer)
  4. Rose (Policies) : quelles regles automatiques reagissent aux evenements ("Quand PaiementAutorise, alors ReserverStock")
  5. Vert (Read Models) : quelles vues sont necessaires pour prendre des decisions
  6. Limites (Bounded Contexts) : regrouper les evenements par domaine metier

Meltdown Temps Reel - Plateforme de Trading

En 2012, Knight Capital a perdu 440 millions de dollars en 45 minutes a cause d'un incident dans leur systeme event-driven. Un deploiement partiel a active un ancien code de test sur un seul serveur parmi huit. Ce serveur a emis des ordres d'achat/vente erratiques a grande vitesse.

Causes racines :

  • Pas de circuit breaker : le systeme a continue a emettre des ordres malgre les pertes
  • Pas d'idempotence : les retries ont amplifie le probleme
  • Deploiement manuel sans rollback automatique
  • Pas de kill switch pour arreter un service en urgence

Lecons : un systeme event-driven a haute frequence DOIT avoir des mecanismes de protection : circuit breakers, rate limiters, kill switches, et surtout des alertes sur les anomalies de volume.

Pas de Dead Letter Queue dans les Sagas

Dans une saga choregraphiee, si un service ne peut pas traiter un evenement (bug, donnees corrompues), il peut bloquer toute la saga indefiniment. Sans dead letter queue ni mecanisme de retry avec backoff exponentiel, les transactions restent dans un etat intermediaire indefini. Chaque service participant a une saga DOIT avoir une DLQ et un processus de reconciliation pour les evenements en echec.

Lab : Concevoir une Architecture Event-Driven

Concevez l'architecture event-driven pour un systeme de reservation de restaurant :

Etape 1 : Identifier les Domain Events

Listez tous les evenements metier : ReservationDemandee, ReservationConfirmee, TableAssignee, ClientArrive, RepasTermine, PaiementEffectue, AvisDepose...

Etape 2 : Definir les Bounded Contexts

Regroupez les evenements par domaine : Reservation, Table Management, Cuisine, Paiement, Feedback. Chaque contexte aura son propre service et ses propres topics Kafka.

Etape 3 : Concevoir la Saga de Reservation

Definissez le flux : ReservationDemandee -> verification disponibilite -> TableReservee -> NotificationClient. Prevoyez les compensations : si le client annule, si la table n'est plus disponible, si le paiement de la caution echoue.

Etape 4 : Definir les Read Models (CQRS)

Quelles vues sont necessaires ? Planning du restaurant (par jour/semaine), historique client, taux d'occupation, revenus. Chaque vue est une projection optimisee alimentee par les evenements.

Quand utiliser Event Sourcing vs un CRUD classique ?
Utilisez Event Sourcing quand : (1) l'audit trail complet est un besoin metier (finance, sante, legal), (2) vous avez besoin de reconstituer l'etat a un moment dans le passe (temporal queries), (3) plusieurs vues de lecture differentes sont necessaires (CQRS). Gardez le CRUD classique quand : la logique est simple, l'historique n'a pas de valeur metier, et la complexite supplementaire n'est pas justifiee. Event Sourcing ajoute de la complexite significative (versioning des evenements, snapshots, eventual consistency).

Quiz : Patterns Event-Driven

Dans une Saga choregraphiee, que se passe-t-il si le service Stock detecte un stock insuffisant apres que le paiement a ete autorise ?

Le paiement est automatiquement annule par la base de donnees
Le service Stock emet un evenement StockInsuffisant, et le service Paiement reagit en emettant une compensation (remboursement)
La transaction entiere est annulee via un rollback distribue
Correct ! Dans une saga choregraphiee, chaque service reagit aux evenements. StockInsuffisant declenche la compensation PaiementRembourse, puis CommandeAnnulee. Il n'y a pas de rollback distribue ni de coordinateur central.

Quel est l'avantage principal de CQRS ?

Il elimine le besoin de base de donnees
Il permet de scaler et optimiser independamment les lectures et les ecritures
Il garantit la consistance forte entre tous les services
Correct ! CQRS permet d'utiliser des modeles differents optimises pour chaque besoin : un modele normalise pour les ecritures (consistance) et des modeles denormalises pour les lectures (performance). On peut scaler les read replicas independamment du write service.

Real-time Analytics

60 min Avance

Objectifs de la lecon

  • Comprendre les moteurs OLAP optimises pour le streaming et les requetes analytiques temps reel
  • Comparer Apache Druid, Apache Pinot, ClickHouse et Materialize
  • Maitriser la dualite stream-table et ses implications pour l'analytique
  • Concevoir une architecture real-time analytics de bout en bout

L'analytique temps reel est le Saint Graal du data engineering moderne. Mais attention : "temps reel" ne signifie pas toujours "sub-seconde". L'enjeu est de choisir le bon moteur OLAP en fonction de vos patterns de requetes, de votre volume de donnees et de votre budget operationnel. J'ai vu des equipes deployer Druid pour des cas ou un simple ClickHouse aurait suffi, et inversement.

Pourquoi un OLAP specifique pour le Streaming ?

Les bases de donnees analytiques classiques (Redshift, BigQuery, Snowflake) sont optimisees pour le batch. Elles excellent sur des requetes complexes sur des donnees au repos, mais ne sont pas concues pour ingerer des millions d'evenements par seconde tout en repondant a des requetes en millisecondes.

CritereOLAP Batch (Snowflake, BQ)OLAP Streaming (Druid, Pinot)
Latence d'ingestionMinutes a heuresSecondes
Latence de requeteSecondes a minutesMillisecondes
FreshnessNear real-time au mieuxTemps reel
Complexite des requetesTres complexe (joins, subqueries)Aggregations rapides, joins limites
Cout operationnelServerless / manageInfrastructure dediee
VolumePetaoctetsTeraoctets a petaoctets

La Dualite Stream-Table

Concept fondamental : un stream est un log immutable d'evenements ordonnes dans le temps. Une table est un snapshot de l'etat courant a un instant T. Ces deux representations sont duales :

  • Stream vers Table : en aggregant un stream (replay), on obtient une table (etat courant)
  • Table vers Stream : en capturant les changements d'une table (CDC), on obtient un stream

Cette dualite, formalisee par Jay Kreps, est la base theorique de toute l'analytique temps reel. Chaque moteur OLAP streaming exploite cette dualite differemment.

Apache Druid

Druid est un datastore analytique distribue concu pour des requetes OLAP sub-seconde sur des donnees de grande echelle. Il est utilise par Airbnb, Netflix, Confluent et des centaines d'entreprises.

Architecture Apache Druid
  Kafka / Kinesis          Batch (S3/HDFS)
       |                        |
       v                        v
  +-----------+          +-----------+
  | Real-time |          |  Batch    |
  | Ingestion |          | Ingestion |
  | (Indexing) |          | (Indexing)|
  +-----+-----+          +-----+-----+
        |                       |
        v                       v
  +-----------+          +-----------+
  | Real-time |          |Historical |
  |   Nodes   |          |   Nodes   |
  | (donnees  |          | (donnees  |
  |  fraiches)|          |  agees)   |
  +-----+-----+          +-----+-----+
        |                       |
        +-----------+-----------+
                    |
              +-----v-----+
              |   Broker   |
              |  (routage  |
              |  requetes) |
              +-----+------+
                    |
              +-----v------+
              |  Router /   |
              |  API Client |
              +-----------+
JSON - Druid Ingestion Spec (Kafka)
{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "clickstream_events",
      "timestampSpec": {
        "column": "event_time",
        "format": "iso"
      },
      "dimensionsSpec": {
        "dimensions": ["user_id", "page_url", "device", "country", "campaign"]
      },
      "metricsSpec": [
        {"type": "count", "name": "event_count"},
        {"type": "longSum", "name": "duration_ms", "fieldName": "duration"},
        {"type": "thetaSketch", "name": "unique_users", "fieldName": "user_id"}
      ],
      "granularitySpec": {
        "segmentGranularity": "HOUR",
        "queryGranularity": "MINUTE",
        "rollup": true
      }
    },
    "ioConfig": {
      "consumerProperties": {
        "bootstrap.servers": "kafka-broker:9092"
      },
      "topic": "clickstream",
      "useEarliestOffset": true
    }
  }
}

Le Rollup Druid : cle de la performance

Le rollup est la fonctionnalite killer de Druid. Au lieu de stocker chaque evenement brut, Druid pre-agrege les donnees a l'ingestion selon la granularite configuree. Si vous avez 1 million de clics par minute mais seulement 10 000 combinaisons uniques de dimensions, le rollup reduit le stockage de 100x tout en permettant des requetes instantanees. Le compromis : vous perdez l'acces aux evenements individuels.

Apache Pinot

Pinot, cree par LinkedIn et donne a Apache, est optimise pour les requetes analytiques a faible latence sur des donnees fraiches. Il alimente le "Who Viewed Your Profile" de LinkedIn (des milliards de requetes/jour).

Druid

  • Rollup natif a l'ingestion
  • Excellent pour les time series
  • HyperLogLog et theta sketches natifs
  • Ecosysteme mature, large communaute
  • Plus complexe a operer

Pinot

  • Star-tree index pour aggregations pre-calculees
  • Upsert natif (mise a jour en place)
  • Supporte les donnees mutables
  • Multi-tenant par design
  • Meilleur support des joins

ClickHouse

ClickHouse, cree par Yandex, est un moteur OLAP columnar extremement rapide. Son approche est differente : au lieu d'une architecture distribuee complexe, il mise sur l'efficacite brute du traitement columnar et la compression.

SQL - ClickHouse + Kafka Engine
-- Table Kafka Engine : consomme directement depuis Kafka
CREATE TABLE clickstream_kafka (
    event_time DateTime64(3),
    user_id String,
    page_url String,
    device LowCardinality(String),
    country LowCardinality(String),
    duration_ms UInt32
) ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka-broker:9092',
    kafka_topic_list = 'clickstream',
    kafka_group_name = 'clickhouse_consumer',
    kafka_format = 'JSONEachRow';

-- Table de destination : MergeTree pour les requetes analytiques
CREATE TABLE clickstream (
    event_time DateTime64(3),
    user_id String,
    page_url String,
    device LowCardinality(String),
    country LowCardinality(String),
    duration_ms UInt32
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(event_time)
ORDER BY (country, device, event_time)
TTL event_time + INTERVAL 90 DAY;

-- Vue materialisee : transfert automatique Kafka -> MergeTree
CREATE MATERIALIZED VIEW clickstream_mv TO clickstream AS
SELECT * FROM clickstream_kafka;

-- Requete analytique sub-seconde
SELECT
    toStartOfMinute(event_time) AS minute,
    country,
    device,
    count() AS events,
    uniqExact(user_id) AS unique_users,
    avg(duration_ms) AS avg_duration
FROM clickstream
WHERE event_time >= now() - INTERVAL 1 HOUR
GROUP BY minute, country, device
ORDER BY minute DESC;

Materialize : SQL Streaming

Materialize adopte une approche radicalement differente : au lieu d'indexer des donnees pour les requeter, il maintient des vues materialisees incrementales qui se mettent a jour automatiquement a chaque nouvel evenement.

SQL - Materialize
-- Source : connexion directe a Kafka
CREATE SOURCE clickstream_source
FROM KAFKA CONNECTION kafka_conn (TOPIC 'clickstream')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn;

-- Vue materialisee incrementale : mise a jour en continu
CREATE MATERIALIZED VIEW dashboard_metrics AS
SELECT
    date_trunc('minute', event_time) AS minute,
    country,
    COUNT(*) AS total_events,
    COUNT(DISTINCT user_id) AS unique_users,
    AVG(duration_ms)::INT AS avg_duration
FROM clickstream_source
GROUP BY 1, 2;

-- Requete instantanee sur l'etat courant (pull)
SELECT * FROM dashboard_metrics
WHERE minute >= mz_now() - INTERVAL '1 hour';

-- Abonnement aux changements (push) via SUBSCRIBE
SUBSCRIBE TO dashboard_metrics WITH (SNAPSHOT = false);

Choisir le bon moteur : arbre de decision

Volume > 1TB/jour + requetes ad-hoc ? ClickHouse.
Dashboard temps reel + metrics pre-definies ? Druid ou Pinot.
Vues materialisees complexes + joins ? Materialize.
Multi-tenant + user-facing analytics ? Pinot.
Time series IoT ? Druid ou ClickHouse.
Le choix depend aussi de votre equipe : ClickHouse est le plus simple a operer, Druid/Pinot necessitent une expertise distribuee, Materialize est le plus innovant mais le moins mature.

Airbnb - Migration vers une Analytique Temps Reel

Airbnb traitait historiquement ses donnees analytiques en batch avec Hive et Spark, avec une latence de plusieurs heures. Pour leur dashboard de monitoring des reservations et de detection de fraude, ils avaient besoin de donnees fraiches en moins de 30 secondes.

Leur solution : un pipeline Kafka -> Flink -> Apache Druid pour l'analytique temps reel, avec un fallback vers Hive pour les requetes historiques complexes. Flink effectue l'enrichissement et le nettoyage en temps reel avant l'ingestion dans Druid.

Resultats :

  • Latence de fraicheur passee de 4 heures a 15 secondes
  • Detection de fraude en temps reel : reduction de 35% des pertes
  • Dashboards operationnels accessibles a 500+ employes
  • Rollup Druid : stockage reduit de 80% par rapport aux donnees brutes

Scenario : Concevoir un Dashboard Temps Reel E-commerce

Votre entreprise e-commerce veut un dashboard montrant en temps reel : le nombre de commandes par minute, le chiffre d'affaires cumule, le top 10 des produits, et des alertes si le taux de conversion chute de plus de 20%.

  • Ingestion : Kafka recoit les evenements clickstream et commandes depuis l'application
  • Traitement : Flink enrichit les evenements (jointure client/produit) et calcule les metriques fenetrees
  • Stockage : ClickHouse pour les requetes ad-hoc du dashboard, avec vues materialisees pour les metriques frequentes
  • Alerting : Flink CEP detecte les patterns de chute de conversion et envoie des alertes Slack

Utiliser un OLAP Streaming comme Data Warehouse

L'erreur frequente est de vouloir remplacer son data warehouse (Snowflake, BigQuery) par un Druid ou ClickHouse. Ces moteurs excellent pour les aggregations rapides sur des dimensions pre-definies, mais ils ne sont pas concus pour des joins complexes ad-hoc, des transformations lourdes, ou le stockage long terme a faible cout. Le bon pattern est de les utiliser en complement : le data warehouse pour l'analytique profonde, le OLAP streaming pour les dashboards temps reel.

Qu'est-ce que la dualite stream-table et pourquoi est-elle fondamentale pour l'analytique temps reel ?
La dualite stream-table signifie qu'un stream peut etre transforme en table (via aggregation/replay) et qu'une table peut etre transformee en stream (via CDC). Un stream capture les evenements dans le temps (le "comment"), une table capture l'etat a un instant T (le "quoi"). Les moteurs OLAP streaming exploitent cette dualite : ils ingerent des streams et materialisent des tables (indexes, vues materialisees) optimisees pour les requetes analytiques.
Quand choisir ClickHouse plutot que Druid pour l'analytique temps reel ?
Choisissez ClickHouse quand : (1) vous avez besoin de requetes SQL ad-hoc complexes, (2) votre equipe prefere une architecture simple sans de nombreux composants distribues, (3) vous avez un gros volume de donnees brutes a stocker (compression excellente), (4) vous n'avez pas besoin de rollup a l'ingestion. Choisissez Druid quand : vous avez des metriques pre-definies avec rollup, un besoin de requetes a latence garantie sub-seconde, et une equipe capable d'operer un systeme distribue complexe.

Quiz : Streaming & Real-time Analytics

25 min Evaluation

Objectifs de l'evaluation

  • Valider votre comprehension des architectures de streaming (Lambda, Kappa)
  • Tester vos connaissances sur Kafka, Flink et les moteurs OLAP temps reel
  • Evaluer votre capacite a choisir les bons outils selon le contexte
  • Verifier votre maitrise des patterns event-driven et de la dualite stream-table

Ce quiz couvre l'ensemble des lecons 0 a 8 sur le streaming et l'analytique temps reel. Prenez le temps de bien lire chaque question. Les questions portent autant sur la theorie que sur les choix architecturaux concrets. Un score de 6/8 ou plus indique une bonne maitrise du sujet.

Quiz Streaming & Real-time Analytics

Quelle est la principale difference entre l'architecture Lambda et l'architecture Kappa ?

Lambda utilise Kafka et Kappa utilise RabbitMQ
Lambda n'a pas de couche serving, Kappa en a une
Lambda maintient deux chemins (batch + speed) tandis que Kappa n'utilise qu'un seul chemin streaming avec replay du log
Kappa est plus ancienne et plus complexe que Lambda
Correct ! L'architecture Lambda combine une couche batch (pour l'exactitude) et une couche speed (pour la faible latence), avec un serving layer qui fusionne les resultats. L'architecture Kappa simplifie en n'utilisant qu'un seul chemin streaming, et retraite l'historique en rejouant le log immutable (Kafka). Kappa est devenue l'approche dominante dans l'industrie moderne.

Dans Kafka, qu'est-ce qui garantit l'ordre des messages ?

L'ordre est garanti au niveau du topic entier
L'ordre est garanti au niveau de chaque partition uniquement
L'ordre est garanti par le consumer group
Kafka ne garantit aucun ordre
Correct ! Kafka garantit l'ordre des messages uniquement au sein d'une meme partition. Les messages avec la meme cle de partitionnement sont envoyes dans la meme partition, donc traites dans l'ordre. Au niveau du topic (multi-partitions), il n'y a aucune garantie d'ordre global. C'est pourquoi le choix de la cle de partitionnement est crucial.

Quel mecanisme Apache Flink utilise-t-il pour assurer la tolerance aux pannes avec une semantique exactly-once ?

Le write-ahead log (WAL)
Le checkpointing base sur l'algorithme de Chandy-Lamport (snapshots distribues asynchrones)
La replication synchrone entre TaskManagers
Le two-phase commit sur chaque evenement
Correct ! Flink utilise un mecanisme de checkpointing inspire de l'algorithme de Chandy-Lamport. Des barriers sont injectees dans le flux de donnees et, quand tous les operateurs ont recu la barrier, un snapshot coherent de l'etat global est persiste (sur HDFS, S3, etc.). En cas de panne, Flink restaure le dernier checkpoint et reprend le traitement, garantissant la semantique exactly-once.

Qu'est-ce que le "rollup" dans Apache Druid ?

Un mecanisme de backup des donnees
La pre-aggregation des donnees a l'ingestion, combinant les lignes qui partagent les memes dimensions et timestamps (selon la granularite configuree)
Un algorithme de compression des colonnes
La migration automatique des donnees chaudes vers le stockage froid
Correct ! Le rollup de Druid pre-agrege les donnees lors de l'ingestion. Les lignes partageant les memes valeurs de dimensions et le meme timestamp (a la granularite configuree) sont fusionnees, et les metriques sont aggregees (count, sum, etc.). Cela reduit considerablement le volume de stockage et accelere les requetes, au prix de la perte d'acces aux evenements individuels.

Dans le contexte de la dualite stream-table, comment obtient-on une table a partir d'un stream ?

En appliquant un filtre sur le stream
En effectuant une aggregation ou un replay du stream pour obtenir l'etat courant
En ecrivant le stream dans un fichier CSV
En compressant le stream avec un codec
Correct ! Un stream peut etre transforme en table via aggregation (par exemple, la somme des debits/credits donne le solde courant) ou via replay complet du log. Inversement, une table peut etre transformee en stream via CDC (Change Data Capture) qui capture chaque modification. Cette dualite, formalisee par Jay Kreps, est fondamentale pour le stream processing.

Quel est le principal avantage de Materialize par rapport a Druid ou ClickHouse ?

Il stocke plus de donnees
Il supporte des requetes SQL plus complexes
Il maintient des vues materialisees incrementales qui se mettent a jour automatiquement a chaque nouvel evenement, sans re-computation complete
Il est plus rapide pour les requetes ad-hoc sur des petaoctets
Correct ! Materialize maintient des vues materialisees qui sont mises a jour de facon incrementale a chaque nouvel evenement. Contrairement a Druid ou ClickHouse qui indexent les donnees puis les requetent, Materialize calcule les resultats en continu. Cela permet des jointures et aggregations complexes sur des streams, avec des resultats toujours a jour sans requete couteuse.

Dans un systeme Event Sourcing, pourquoi utilise-t-on des snapshots ?

Pour compresser les evenements et economiser du stockage
Pour eviter de rejouer tous les evenements depuis l'origine lors de la reconstitution de l'etat d'un agregat
Pour permettre la suppression des evenements obsoletes
Pour synchroniser les replicas en temps reel
Correct ! Sans snapshots, reconstituer l'etat d'un agregat necessite de rejouer tous les evenements depuis la creation. Pour un agregat avec des millions d'evenements, cela peut prendre des minutes. Un snapshot enregistre l'etat complet a un point donne, permettant de ne rejouer que les evenements posterieurs au snapshot. On cree typiquement un snapshot tous les N evenements (ex: 100).

Quelle est la meilleure approche pour gerer un consumer qui doit traiter des evenements en retard (late events) dans Apache Flink ?

Ignorer tous les evenements en retard
Utiliser uniquement le processing-time pour eviter le probleme
Configurer des watermarks avec une tolerance de retard (allowed lateness) et un side output pour les evenements tardifs
Augmenter le nombre de partitions Kafka
Correct ! Les watermarks definissent la progression de l'event-time et permettent de detecter quand une fenetre peut etre declenchee. L'allowed lateness autorise des evenements arrivant apres le watermark a etre integres dans les fenetres deja declenchees (mise a jour du resultat). Les evenements arrivant encore plus tard sont rediriges vers un side output pour un traitement specifique (DLQ, reconciliation batch).

Uber - Evolution de leur Architecture Streaming

Uber a traverse trois generations d'architectures streaming. La premiere generation utilisait un systeme maison base sur des files d'attente simples. La deuxieme generation a adopte Kafka mais avec un modele Lambda (batch Hive + speed Storm). La troisieme generation est une architecture Kappa unifiee avec Kafka + Flink, traitant plus de 5 trillions de messages par jour.

Lecon cle : meme les geants tech evoluent incrementalement. La migration Lambda -> Kappa a pris 2 ans et a ete faite service par service, pas en big bang.

Quiz sans Revision

Passer un quiz sans avoir revise les concepts cles est contre-productif. Si votre score est inferieur a 6/8, relisez les lecons correspondantes avant de continuer. Le streaming est un domaine ou les fondamentaux (event-time, watermarks, exactly-once, dualite stream-table) sont essentiels pour construire des systemes fiables.

Data Quality Dimensions

55 min Avance

Objectifs de la lecon

  • Maitriser les six dimensions fondamentales de la qualite des donnees
  • Savoir mesurer chaque dimension avec des metriques concretes
  • Comprendre l'impact business de chaque dimension de qualite
  • Definir des seuils et des KPIs de qualite adaptes a votre contexte

La qualite des donnees n'est pas un concept abstrait. Chaque dimension peut etre mesuree, monitoree et amelioree. J'ai vu des entreprises perdre des millions a cause de donnees de mauvaise qualite : doublons dans les factures, adresses invalides pour les livraisons, montants incoherents entre systemes. La cle est de comprendre que la qualite est un spectre, pas un binaire, et que chaque cas d'usage a des exigences differentes.

Les Six Dimensions de la Qualite des Donnees

Le framework DAMA (Data Management Association) definit six dimensions fondamentales. Chacune repond a une question differente sur vos donnees :

Les 6 Dimensions de la Qualite des Donnees
                    +------------------+
                    |   DATA QUALITY   |
                    +--------+---------+
                             |
        +----------+---------+--------+----------+
        |          |         |        |          |
   +----v---+ +---v----+ +--v---+ +--v----+ +---v-----+
   |Accuracy| |Complete| |Consis| |Timeli | |Validity |
   |        | |ness    | |tency | |ness   | |         |
   +----+---+ +---+----+ +--+---+ +--+----+ +---+-----+
        |          |         |        |          |
        +----------+---------+--------+----------+
                             |
                      +------v------+
                      | Uniqueness  |
                      +-------------+

1. Accuracy (Exactitude)

L'exactitude mesure si les donnees refletent correctement la realite du monde reel qu'elles representent.

AspectQuestionExempleMetrique
SyntaxiqueLe format est-il correct ?Code postal "7500" au lieu de "75000"% de valeurs conformes au format attendu
SemantiqueLa valeur correspond-elle a la realite ?Date de naissance "2025-01-15" pour un client de 45 ans% de valeurs verifiables contre une source de reference
SQL - Mesurer l'exactitude
-- Exactitude syntaxique : codes postaux francais valides
SELECT
    COUNT(*) AS total_clients,
    SUM(CASE WHEN code_postal ~ '^\d{5}$' THEN 1 ELSE 0 END) AS codes_valides,
    ROUND(
        100.0 * SUM(CASE WHEN code_postal ~ '^\d{5}$' THEN 1 ELSE 0 END)
        / COUNT(*), 2
    ) AS pct_accuracy
FROM clients;

-- Exactitude semantique : montants coherents
SELECT
    COUNT(*) AS total_commandes,
    SUM(CASE
        WHEN montant_total = (quantite * prix_unitaire * (1 - remise / 100.0))
        THEN 1 ELSE 0
    END) AS montants_exacts,
    ROUND(
        100.0 * SUM(CASE
            WHEN montant_total = (quantite * prix_unitaire * (1 - remise / 100.0))
            THEN 1 ELSE 0
        END) / COUNT(*), 2
    ) AS pct_accuracy
FROM lignes_commande;

2. Completeness (Completude)

La completude mesure la proportion de donnees presentes par rapport aux donnees attendues. Elle se decline en trois niveaux :

Les trois niveaux de completude

Schema-level : toutes les colonnes attendues sont-elles presentes dans la table ?
Column-level : quel pourcentage de lignes ont une valeur non-null pour chaque colonne ?
Row-level : toutes les entites attendues sont-elles presentes ? (ex: tous les magasins ont-ils remonte leurs ventes ?)

SQL - Mesurer la completude
-- Completude au niveau colonne
SELECT
    'email' AS colonne,
    COUNT(*) AS total,
    SUM(CASE WHEN email IS NOT NULL AND email != '' THEN 1 ELSE 0 END) AS non_vides,
    ROUND(100.0 * SUM(CASE WHEN email IS NOT NULL AND email != '' THEN 1 ELSE 0 END) / COUNT(*), 2)
        AS pct_completude
FROM clients
UNION ALL
SELECT
    'telephone', COUNT(*),
    SUM(CASE WHEN telephone IS NOT NULL AND telephone != '' THEN 1 ELSE 0 END),
    ROUND(100.0 * SUM(CASE WHEN telephone IS NOT NULL AND telephone != '' THEN 1 ELSE 0 END) / COUNT(*), 2)
FROM clients;

-- Completude au niveau entite : tous les magasins ont-ils remonte ?
SELECT
    m.magasin_id,
    m.nom,
    CASE WHEN v.magasin_id IS NOT NULL THEN 'PRESENT' ELSE 'MANQUANT' END AS statut
FROM magasins m
LEFT JOIN ventes_jour v ON m.magasin_id = v.magasin_id AND v.date_vente = CURRENT_DATE - 1
WHERE v.magasin_id IS NULL;

3. Consistency (Coherence)

La coherence mesure l'absence de contradictions dans les donnees, a la fois au sein d'un meme systeme et entre systemes differents.

Exemples de problemes de coherence

  • Intra-table : la date de fin d'un contrat est anterieure a la date de debut
  • Inter-tables : le nombre de commandes dans la table commandes ne correspond pas au compteur dans la table clients
  • Inter-systemes : le chiffre d'affaires dans le CRM differe de celui dans l'ERP
  • Temporelle : les memes metriques calculees a des moments differents donnent des resultats contradictoires
SQL - Mesurer la coherence
-- Coherence intra-table : dates logiques
SELECT COUNT(*) AS incoherences_dates
FROM contrats
WHERE date_fin IS NOT NULL AND date_fin < date_debut;

-- Coherence inter-tables : totaux correspondants
SELECT
    c.client_id,
    c.nb_commandes AS nb_dans_clients,
    COALESCE(cmd.nb_reel, 0) AS nb_reel_commandes,
    ABS(c.nb_commandes - COALESCE(cmd.nb_reel, 0)) AS ecart
FROM clients c
LEFT JOIN (
    SELECT client_id, COUNT(*) AS nb_reel
    FROM commandes
    GROUP BY client_id
) cmd ON c.client_id = cmd.client_id
WHERE c.nb_commandes != COALESCE(cmd.nb_reel, 0);

4. Timeliness (Fraicheur)

La fraicheur mesure si les donnees sont disponibles dans le delai attendu par les consommateurs. Une donnee exacte mais arrivee trop tard est souvent inutile.

Cas d'usageFraicheur attendueImpact du retard
Detection de fraude< 1 secondeTransaction frauduleuse autorisee
Dashboard operationnel< 5 minutesDecisions basees sur des donnees obsoletes
Reporting financier< 24 heuresRapports retardes, conformite impactee
Analyse marketing< 1 semaineCampagnes mal ciblees
SQL - Mesurer la fraicheur
-- Fraicheur : age des donnees les plus recentes par source
SELECT
    source_system,
    MAX(updated_at) AS derniere_mise_a_jour,
    EXTRACT(EPOCH FROM (NOW() - MAX(updated_at))) / 3600 AS age_heures,
    CASE
        WHEN EXTRACT(EPOCH FROM (NOW() - MAX(updated_at))) / 3600 < 1 THEN 'FRESH'
        WHEN EXTRACT(EPOCH FROM (NOW() - MAX(updated_at))) / 3600 < 24 THEN 'ACCEPTABLE'
        ELSE 'STALE'
    END AS statut_fraicheur
FROM raw_data
GROUP BY source_system
ORDER BY age_heures DESC;

5. Validity (Validite)

La validite verifie que les donnees respectent les regles et contraintes definies (formats, plages, referentiels).

Validite vs Exactitude

Une donnee peut etre valide mais inexacte. Par exemple, un email "jean.dupont@gmail.com" est valide (format correct), mais peut etre inexact si le vrai email est "j.dupont@gmail.com". Inversement, une donnee peut etre exacte mais dans un format invalide. La validite est verifiable automatiquement, l'exactitude necessite souvent une verification externe.

SQL - Mesurer la validite
-- Validite : champs respectant les regles metier
SELECT
    'email_format' AS regle,
    SUM(CASE WHEN email ~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' THEN 1 ELSE 0 END) AS valides,
    COUNT(*) AS total,
    ROUND(100.0 * SUM(CASE WHEN email ~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' THEN 1 ELSE 0 END) / COUNT(*), 2) AS pct
FROM clients
UNION ALL
SELECT
    'age_range',
    SUM(CASE WHEN age BETWEEN 0 AND 150 THEN 1 ELSE 0 END),
    COUNT(*),
    ROUND(100.0 * SUM(CASE WHEN age BETWEEN 0 AND 150 THEN 1 ELSE 0 END) / COUNT(*), 2)
FROM clients
UNION ALL
SELECT
    'statut_enum',
    SUM(CASE WHEN statut IN ('ACTIF', 'INACTIF', 'SUSPENDU', 'FERME') THEN 1 ELSE 0 END),
    COUNT(*),
    ROUND(100.0 * SUM(CASE WHEN statut IN ('ACTIF', 'INACTIF', 'SUSPENDU', 'FERME') THEN 1 ELSE 0 END) / COUNT(*), 2)
FROM clients;

6. Uniqueness (Unicite)

L'unicite garantit que chaque entite n'est representee qu'une seule fois dans le dataset. Les doublons sont l'un des problemes de qualite les plus couteux.

SQL - Mesurer l'unicite
-- Detection de doublons exacts
SELECT
    email, COUNT(*) AS nb_occurrences
FROM clients
GROUP BY email
HAVING COUNT(*) > 1
ORDER BY nb_occurrences DESC;

-- Taux de doublons global
SELECT
    COUNT(*) AS total_lignes,
    COUNT(DISTINCT client_id) AS clients_uniques,
    COUNT(*) - COUNT(DISTINCT client_id) AS doublons,
    ROUND(100.0 * (COUNT(*) - COUNT(DISTINCT client_id)) / COUNT(*), 2) AS pct_doublons
FROM clients;

-- Doublons fuzzy : noms similaires (distance de Levenshtein)
SELECT
    a.client_id AS id_a,
    b.client_id AS id_b,
    a.nom, b.nom,
    a.email, b.email,
    levenshtein(LOWER(a.nom), LOWER(b.nom)) AS distance_nom
FROM clients a
JOIN clients b ON a.client_id < b.client_id
WHERE levenshtein(LOWER(a.nom), LOWER(b.nom)) <= 2
   AND (a.email = b.email OR a.telephone = b.telephone);

Framework de Mesure Global

Data Quality Scorecard
  Dimension       Score   Seuil    Statut
  ============   ======  ======   ========
  Accuracy        97.3%   > 95%    [OK]
  Completeness    94.1%   > 90%    [OK]
  Consistency     99.2%   > 98%    [OK]
  Timeliness      85.0%   > 95%    [ALERTE]
  Validity        98.7%   > 97%    [OK]
  Uniqueness      99.9%   > 99%    [OK]
  ============   ======  ======   ========
  Score Global    95.7%   > 95%    [OK]

Les seuils dependent du contexte

Un taux de completude de 90% peut etre excellent pour des donnees marketing mais catastrophique pour des donnees financieres reglementees. Definissez toujours vos seuils en collaboration avec les equipes metier. Un champ "commentaire" peut tolerer 50% de nulls, mais un champ "montant" sur une facture doit etre a 100%.

Airbnb - Data Quality a l'echelle

Airbnb a developpe en interne un framework de qualite des donnees appele Midas qui mesure systematiquement les six dimensions de qualite sur chaque dataset critique. Leur approche repose sur trois piliers :

  • Certification : chaque dataset est classe Bronze (brut), Silver (nettoye), ou Gold (certifie). Seuls les datasets Gold peuvent alimenter les dashboards de direction et les modeles ML de production.
  • SLAs par dimension : chaque dataset Gold a des SLAs explicites (completude > 99.5%, fraicheur < 1h, unicite > 99.9%).
  • Ownership : chaque dataset a un proprietaire identifie qui est responsable de maintenir les SLAs et qui est alerte en cas de degradation.

Resultat : le nombre d'incidents data impactant les decisions business a ete reduit de 60% en un an.

Mesurer la Qualite sans Agir

De nombreuses organisations mettent en place des dashboards de qualite des donnees... que personne ne consulte. Les scores de qualite baissent progressivement sans que personne ne reagisse. La mesure sans action est inutile. Chaque metrique de qualite doit etre associee a : (1) un seuil d'alerte, (2) un responsable clairement identifie, (3) un processus de remediation documente. Si vous ne pouvez pas agir sur une metrique, ne la mesurez pas.

Quelle est la difference entre validite et exactitude des donnees ?
La validite verifie qu'une donnee respecte un format ou une regle predefinied (ex: un email a le bon format, un age est entre 0 et 150). L'exactitude verifie que la donnee correspond a la realite du monde reel (ex: l'email est bien celui du client). Une donnee peut etre valide mais inexacte (email au bon format mais mauvais email), ou exacte mais dans un format invalide. La validite est automatisable, l'exactitude necessite souvent une source de reference externe.
Pourquoi la fraicheur est-elle consideree comme une dimension de qualite distincte ?
Une donnee peut etre parfaitement exacte, complete et coherente, mais si elle arrive trop tard, elle perd sa valeur. Par exemple, un score de risque credit calcule avec des donnees de la veille peut etre incorrect si le client a fait de grosses transactions aujourd'hui. La fraicheur est critique dans les systemes temps reel (fraude, pricing dynamique) et les dashboards operationnels. Elle se mesure par le delai entre la generation de la donnee et sa disponibilite pour les consommateurs.

Great Expectations & Soda

60 min Avance

Objectifs de la lecon

  • Maitriser Great Expectations : expectation suites, checkpoints, data docs
  • Comprendre SodaCL et Soda Cloud pour la validation des donnees
  • Comparer les deux outils et choisir selon le contexte
  • Integrer les tests de qualite dans un pipeline CI/CD data

Great Expectations et Soda sont les deux leaders open-source de la validation de donnees. GE est puissant mais complexe, Soda est plus simple mais moins flexible. En pratique, beaucoup d'equipes utilisent les deux : GE pour les validations complexes et programmatiques, Soda pour les checks rapides ecrits par les analystes. L'important n'est pas l'outil mais la couverture de vos tests.

Great Expectations : Vue d'Ensemble

Great Expectations (GE) est un framework Python open-source pour la validation, la documentation et le profiling des donnees. Il repose sur trois concepts fondamentaux :

Architecture Great Expectations
  +------------------+     +-------------------+     +----------------+
  |   Data Source     |     | Expectation Suite |     |  Data Docs     |
  | (Pandas, Spark,  |---->| (regles de        |---->| (documentation |
  |  SQL, fichiers)  |     |  validation)      |     |  auto-generee) |
  +------------------+     +-------------------+     +----------------+
                                    |
                                    v
                           +-------------------+
                           |   Checkpoint      |
                           | (orchestration    |
                           |  des validations) |
                           +--------+----------+
                                    |
                                    v
                           +-------------------+
                           | Validation Result |
                           | (succes/echec +   |
                           |  details)         |
                           +-------------------+

Les trois piliers de Great Expectations

Expectations : des assertions declaratives sur vos donnees ("cette colonne ne doit pas avoir de nulls", "les valeurs doivent etre entre 0 et 1000").
Checkpoints : orchestrent l'execution des expectations sur des batches de donnees specifiques, avec des actions conditionnelles (alerte, blocage du pipeline).
Data Docs : documentation HTML auto-generee qui presente les resultats de validation de facon visuelle et partageable.

Python - Great Expectations : Expectation Suite
import great_expectations as gx

# Initialiser le contexte GE
context = gx.get_context()

# Connecter une source de donnees
datasource = context.sources.add_or_update_pandas(name="source_ventes")
data_asset = datasource.add_csv_asset(
    name="ventes_quotidiennes",
    filepath_or_buffer="data/ventes_2024.csv"
)
batch_request = data_asset.build_batch_request()

# Creer une Expectation Suite
suite = context.add_or_update_expectation_suite("ventes_quality_suite")

# Obtenir un validator pour definir les expectations
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name="ventes_quality_suite"
)

# Definir les expectations
# 1. Completude : pas de nulls sur les champs critiques
validator.expect_column_values_to_not_be_null("commande_id")
validator.expect_column_values_to_not_be_null("montant")
validator.expect_column_values_to_not_be_null("date_vente")

# 2. Validite : types et plages
validator.expect_column_values_to_be_between(
    "montant", min_value=0.01, max_value=100000,
    mostly=0.99  # tolerer 1% d'anomalies
)
validator.expect_column_values_to_be_in_set(
    "statut", ["CONFIRMEE", "EXPEDIEE", "LIVREE", "ANNULEE"]
)
validator.expect_column_values_to_match_regex(
    "email", r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
)

# 3. Unicite
validator.expect_column_values_to_be_unique("commande_id")

# 4. Coherence
validator.expect_column_pair_values_a_to_be_greater_than_b(
    "date_livraison", "date_vente"
)

# 5. Volume : nombre de lignes attendu
validator.expect_table_row_count_to_be_between(
    min_value=1000, max_value=50000
)

# Sauvegarder la suite
validator.save_expectation_suite(discard_failed_expectations=False)
Python - Great Expectations : Checkpoint
# Creer un checkpoint pour orchestrer la validation
checkpoint = context.add_or_update_checkpoint(
    name="ventes_daily_checkpoint",
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": "ventes_quality_suite",
        }
    ],
    action_list=[
        # Stocker le resultat
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"}
        },
        # Mettre a jour les Data Docs
        {
            "name": "update_data_docs",
            "action": {"class_name": "UpdateDataDocsAction"}
        },
        # Envoyer une alerte Slack en cas d'echec
        {
            "name": "send_slack_notification",
            "action": {
                "class_name": "SlackNotificationAction",
                "slack_webhook": "${SLACK_WEBHOOK_URL}",
                "notify_on": "failure",
                "renderer": {
                    "class_name": "SlackRenderer"
                }
            }
        }
    ]
)

# Executer le checkpoint
result = checkpoint.run()

# Verifier le resultat
if not result.success:
    print("ECHEC de la validation !")
    for r in result.run_results.values():
        for expectation_result in r["validation_result"].results:
            if not expectation_result.success:
                print(f"  FAILED: {expectation_result.expectation_config.expectation_type}")
                print(f"  Details: {expectation_result.result}")
else:
    print("Validation reussie - pipeline peut continuer")

SodaCL : Validation Declarative

SodaCL (Soda Checks Language) est un langage declaratif YAML pour definir des tests de qualite des donnees. Son approche est plus simple et accessible que GE, ce qui le rend populaire aupres des analystes et data engineers.

YAML - SodaCL Checks
# checks/ventes_checks.yml
checks for ventes:
  # Completude
  - missing_count(commande_id) = 0:
      name: "Commande ID ne doit pas etre null"
  - missing_percent(email) < 5:
      name: "Email manquant tolere a 5%"

  # Volume
  - row_count between 1000 and 50000:
      name: "Volume de ventes quotidien attendu"
  - row_count > 0:
      name: "La table ne doit pas etre vide"

  # Fraicheur
  - freshness(date_vente) < 1d:
      name: "Les donnees doivent avoir moins de 24h"

  # Validite
  - invalid_percent(montant) < 1:
      valid min: 0.01
      valid max: 100000
      name: "Montants dans la plage valide"
  - invalid_count(statut) = 0:
      valid values: ["CONFIRMEE", "EXPEDIEE", "LIVREE", "ANNULEE"]

  # Unicite
  - duplicate_count(commande_id) = 0:
      name: "Pas de doublons sur commande_id"

  # Coherence inter-tables
  - values in (client_id) must exist in clients (client_id):
      name: "Integrite referentielle clients"

  # Custom SQL
  - failed rows:
      fail condition: date_livraison < date_vente
      name: "Date livraison doit etre apres date vente"

  # Schema
  - schema:
      fail:
        when required column missing:
          [commande_id, client_id, montant, date_vente, statut]
        when wrong type:
          montant: decimal
          date_vente: date
YAML - Soda Configuration
# configuration.yml
data_source ma_base:
  type: postgres
  host: ${POSTGRES_HOST}
  port: 5432
  username: ${POSTGRES_USER}
  password: ${POSTGRES_PASSWORD}
  database: production
  schema: public

soda_cloud:
  host: cloud.soda.io
  api_key_id: ${SODA_API_KEY_ID}
  api_key_secret: ${SODA_API_KEY_SECRET}
Bash - Executer Soda
# Installation
pip install soda-core-postgres

# Executer les checks
soda scan -d ma_base -c configuration.yml checks/ventes_checks.yml

# Resultat typique :
# Scan summary:
# 10/10 checks PASSED
# 0 checks WARNED
# 0 checks FAILED
# All is good. No failures. No warnings. No errors.

Comparaison GE vs Soda

Great Expectations

  • Framework Python complet et extensible
  • 200+ expectations built-in
  • Data Docs : documentation auto-generee
  • Profiling automatique pour decouvrir les expectations
  • Courbe d'apprentissage plus raide
  • Ideal pour les equipes Python/engineering
  • Open-source gratuit

Soda

  • Langage declaratif YAML simple
  • Accessible aux analystes non-developpeurs
  • Soda Cloud : UI collaborative, historique, alertes
  • Integration native Airflow, dbt, Prefect
  • Anomaly detection (ML) dans Soda Cloud
  • Soda Core open-source, Cloud payant
  • Ideal pour les equipes mixtes
CritereGreat ExpectationsSoda
LangagePython (programmatique)YAML (declaratif)
ComplexiteEleveeFaible a moyenne
ExtensibiliteTres elevee (custom expectations Python)Moyenne (custom SQL)
DocumentationData Docs HTMLSoda Cloud UI
CI/CDNatif (checkpoint en Python)CLI integrable facilement
Equipe cibleData EngineersAnalystes + Engineers
CoutGratuit (GX Cloud payant)Core gratuit, Cloud payant

GitLab - Qualite des Donnees avec Great Expectations

GitLab a integre Great Expectations dans leur pipeline dbt pour valider les donnees a chaque etape de transformation. Leur approche :

  • Layer Bronze : expectations basiques (schema, nulls, types) executees a chaque ingestion
  • Layer Silver : expectations metier (referential integrity, business rules) executees apres les transformations dbt
  • Layer Gold : expectations de conformite (KPIs coherents, totaux equilibres) executees avant publication
  • Data Docs : publies automatiquement sur un site interne, consultables par toute l'equipe data

Resultats : les incidents de qualite detectes en amont sont passes de 20% a 85%, reduisant significativement les corrections en production.

Scenario : Integration dans un Pipeline dbt + Airflow

Votre pipeline dbt transforme les donnees de ventes chaque nuit. Vous souhaitez ajouter des tests de qualite :

  • Avant les transformations : Soda scan sur les tables source (fraicheur, completude, volume)
  • Apres les transformations : dbt tests natifs (unique, not_null, relationships) + GE checkpoint pour les regles metier complexes
  • Avant la publication : GE checkpoint final sur les tables Gold avec des seuils stricts
  • Alerting : Slack en cas d'echec, blocage du pipeline si les tests critiques echouent

Tester Uniquement en Production

L'erreur la plus courante est de n'executer les tests de qualite qu'en production, sur les donnees finales. A ce stade, les donnees corrompues ont deja traverse tout le pipeline et potentiellement alimente des dashboards ou des modeles ML. Testez a chaque couche : source (ingestion), transformation (dbt), et serving (publication). Plus un probleme est detecte tot, moins il est couteux a corriger.

Qu'est-ce qu'un Checkpoint dans Great Expectations et a quoi sert-il ?
Un Checkpoint est l'unite d'orchestration dans GE. Il combine une ou plusieurs validations (batch de donnees + expectation suite) avec une liste d'actions a executer apres la validation (stocker les resultats, mettre a jour les Data Docs, envoyer une notification Slack, bloquer le pipeline). C'est le point d'entree pour executer des validations dans un pipeline automatise (Airflow, CI/CD). Un checkpoint peut valider plusieurs datasets avec differentes suites d'expectations en une seule execution.
Quand choisir Soda plutot que Great Expectations ?
Choisissez Soda quand : (1) votre equipe inclut des analystes non-Python qui doivent ecrire des tests, (2) vous voulez une syntaxe YAML simple et lisible, (3) vous avez besoin d'une UI collaborative (Soda Cloud), (4) vous voulez de l'anomaly detection ML sans effort. Choisissez GE quand : (1) vous avez des regles de validation complexes necessitant du code Python, (2) vous voulez une extensibilite maximale, (3) vous avez besoin de Data Docs auto-generees, (4) votre equipe est a l'aise avec Python. Les deux outils ne sont pas mutuellement exclusifs.

Data Observability

55 min Avance

Objectifs de la lecon

  • Comprendre les cinq piliers de la data observability : freshness, volume, schema, lineage, distribution
  • Evaluer les solutions du marche : Monte Carlo, Bigeye, Elementary, Metaplane
  • Mettre en place un monitoring proactif avec alertes et data lineage
  • Construire une culture d'observabilite data dans votre organisation

La data observability est a la data ce que l'APM (Application Performance Monitoring) est aux applications. Avant Datadog et New Relic, les equipes SRE decouvrait les pannes quand les utilisateurs se plaignaient. La meme chose se passe aujourd'hui avec les donnees : les equipes data decouvrent les problemes quand un VP demande "pourquoi mon dashboard est faux ?". L'observabilite data change ce paradigme de reactif a proactif.

Les 5 Piliers de la Data Observability

Les 5 Piliers de l'Observabilite Data
  +---------------------------------------------------------+
  |              DATA OBSERVABILITY                         |
  +---------------------------------------------------------+
  |                                                         |
  |  1. FRESHNESS    Les donnees arrivent-elles a temps ?   |
  |     [derniere MAJ il y a 2h - SLA: 1h] --> ALERTE      |
  |                                                         |
  |  2. VOLUME       Le nombre de lignes est-il normal ?    |
  |     [50K lignes au lieu de 200K habituels] --> ALERTE   |
  |                                                         |
  |  3. SCHEMA       La structure a-t-elle change ?         |
  |     [colonne "revenue" supprimee] --> ALERTE            |
  |                                                         |
  |  4. DISTRIBUTION Les valeurs sont-elles normales ?      |
  |     [% de nulls passe de 2% a 45%] --> ALERTE          |
  |                                                         |
  |  5. LINEAGE      Quel est l'impact en aval ?            |
  |     [table source cassee -> 15 dashboards impactes]     |
  |                                                         |
  +---------------------------------------------------------+

Pilier 1 : Freshness (Fraicheur)

Le monitoring de fraicheur detecte quand les donnees cessent d'arriver ou arrivent en retard par rapport aux SLAs definis.

SQL - Monitoring de Fraicheur
-- Monitoring de fraicheur pour toutes les tables critiques
WITH freshness_check AS (
    SELECT
        'commandes' AS table_name,
        MAX(updated_at) AS last_update,
        INTERVAL '1 hour' AS sla_interval
    FROM commandes
    UNION ALL
    SELECT 'paiements', MAX(updated_at), INTERVAL '2 hours'
    FROM paiements
    UNION ALL
    SELECT 'inventory', MAX(updated_at), INTERVAL '30 minutes'
    FROM inventory
)
SELECT
    table_name,
    last_update,
    NOW() - last_update AS age,
    sla_interval,
    CASE
        WHEN NOW() - last_update > sla_interval THEN 'BREACH'
        WHEN NOW() - last_update > sla_interval * 0.8 THEN 'WARNING'
        ELSE 'OK'
    END AS status
FROM freshness_check
ORDER BY
    CASE WHEN NOW() - last_update > sla_interval THEN 0 ELSE 1 END,
    age DESC;

Pilier 2 : Volume

Le monitoring de volume detecte les anomalies dans le nombre de lignes ingerees ou transformees. Un drop soudain peut indiquer une source en panne, un pipeline casse ou un filtre trop agressif.

Python - Detection d'Anomalies de Volume
import pandas as pd
from scipy import stats

def detect_volume_anomaly(table_name, connection):
    """Detecter les anomalies de volume avec Z-score."""
    # Historique des 30 derniers jours
    query = f"""
        SELECT date_trunc('day', created_at) AS jour, COUNT(*) AS volume
        FROM {table_name}
        WHERE created_at >= NOW() - INTERVAL '30 days'
        GROUP BY 1
        ORDER BY 1
    """
    df = pd.read_sql(query, connection)

    if len(df) < 7:
        return {"status": "INSUFFICIENT_DATA"}

    # Calcul du Z-score pour le jour le plus recent
    volumes = df['volume'].values
    current = volumes[-1]
    historical = volumes[:-1]

    z_score = (current - historical.mean()) / historical.std()

    status = "OK"
    if abs(z_score) > 3:
        status = "CRITICAL"  # anomalie forte
    elif abs(z_score) > 2:
        status = "WARNING"   # anomalie moderee

    return {
        "table": table_name,
        "current_volume": int(current),
        "expected_range": f"{int(historical.mean() - 2*historical.std())} - "
                         f"{int(historical.mean() + 2*historical.std())}",
        "z_score": round(z_score, 2),
        "status": status
    }

Pilier 3 : Schema

Le monitoring de schema detecte les changements de structure : colonnes ajoutees, supprimees, renommees, ou avec un type modifie.

Python - Schema Change Detection
def detect_schema_changes(table_name, connection, expected_schema):
    """Comparer le schema actuel avec le schema attendu."""
    query = f"""
        SELECT column_name, data_type, is_nullable
        FROM information_schema.columns
        WHERE table_name = '{table_name}'
        ORDER BY ordinal_position
    """
    current_schema = pd.read_sql(query, connection)
    current_cols = set(current_schema['column_name'])
    expected_cols = set(expected_schema.keys())

    changes = []

    # Colonnes ajoutees
    for col in current_cols - expected_cols:
        changes.append({"type": "ADDED", "column": col, "severity": "WARNING"})

    # Colonnes supprimees
    for col in expected_cols - current_cols:
        changes.append({"type": "REMOVED", "column": col, "severity": "CRITICAL"})

    # Changements de type
    for _, row in current_schema.iterrows():
        col = row['column_name']
        if col in expected_schema:
            if row['data_type'] != expected_schema[col]['type']:
                changes.append({
                    "type": "TYPE_CHANGED",
                    "column": col,
                    "from": expected_schema[col]['type'],
                    "to": row['data_type'],
                    "severity": "CRITICAL"
                })

    return changes

Pilier 4 : Distribution

Le monitoring de distribution detecte les changements statistiques dans les valeurs des colonnes : drift, augmentation de nulls, valeurs hors norme.

Metriques de Distribution a Monitorer

Null rate : pourcentage de valeurs nulles par colonne (alerter si deviation > 5 points).
Distinct count : nombre de valeurs uniques (alerter si changement soudain).
Min / Max / Mean / Median : statistiques descriptives (alerter si hors plage historique).
Value distribution : proportions des categories (alerter si une categorie disparait ou domine soudainement).
Ces metriques sont collectees periodiquement et comparees a l'historique pour detecter les anomalies.

Pilier 5 : Lineage (Lignage)

Le data lineage trace l'origine et les transformations des donnees de bout en bout. En cas d'incident, il permet de repondre instantanement a : "quels dashboards et modeles sont impactes ?"

Data Lineage - Impact Analysis
  Source CRM            Source ERP           Source Web
  (PostgreSQL)          (Oracle)             (Kafka)
       |                    |                    |
       v                    v                    v
  [raw_clients]        [raw_commandes]     [raw_clickstream]
       |                    |                    |
       +----------+---------+                    |
                  |                               |
             [stg_clients]                   [stg_events]
                  |                               |
                  +---------------+---------------+
                                  |
                          [fct_ventes_enrichies]
                                  |
                  +---------------+---------------+
                  |               |               |
          [dashboard_CA]  [model_churn_ML]  [report_finance]
          (Tableau)       (Sagemaker)       (Email PDF)

  Si [raw_commandes] est en retard --> Impact sur :
    - fct_ventes_enrichies
    - dashboard_CA, model_churn_ML, report_finance
    --> Notifier les owners de ces 3 assets

Solutions du Marche

SolutionTypeForcesLimites
Monte CarloSaaSLeader du marche, ML-driven, lineage auto, integrations richesCout eleve, vendor lock-in
BigeyeSaaSFocus sur les metriques data, auto-thresholds MLMoins d'integrations que Monte Carlo
ElementaryOpen-sourceNatif dbt, gratuit, alerts Slack/emailLimite a l'ecosysteme dbt
MetaplaneSaaSSimple a deployer, bon lineage, alertes contextuellesPlus recent, moins de features avancees
OpenMetadataOpen-sourceCatalogue + observabilite, profiling, lineageNecessite infrastructure dediee

Uber - Data Observability a Grande Echelle

Uber gere plus de 10 000 pipelines data et des petaoctets de donnees. Leur systeme de data observability, appele uMonitor, surveille en continu la fraicheur, le volume et la distribution de chaque table critique.

  • Detection automatique : des modeles ML apprennent les patterns normaux de chaque table (saisonnalite hebdomadaire, tendances) et alertent sur les deviations
  • Lineage automatique : construit a partir des logs Hive/Spark, il permet d'identifier en moins de 30 secondes tous les assets impactes par un incident
  • Root cause analysis : quand un dashboard montre des donnees incorrectes, le systeme remonte automatiquement la chaine de lineage pour identifier la source du probleme
  • SLAs data : chaque table critique a un SLA de fraicheur et de volume, avec escalation automatique en cas de breach

Resultat : le MTTD (Mean Time To Detect) des incidents data est passe de 4 heures a 15 minutes, et le MTTR de 8 heures a 1 heure.

Faire Confiance Aveuglements aux Sources

L'erreur la plus dangereuse est de supposer que les systemes source produisent toujours des donnees correctes. "On n'a pas besoin de verifier, le CRM est fiable." En realite, les sources changent sans prevenir : un deploiement modifie un format de date, une migration supprime une colonne, un bug introduit des doublons. Monitorez TOUJOURS vos sources avec au minimum des checks de fraicheur, volume et schema. La confiance aveugle est la premiere cause d'incidents data en production.

Scenario : Mettre en Place l'Observabilite Data en 30 Jours

Votre entreprise n'a aucun monitoring de donnees. Voici un plan en 4 semaines :

  • Semaine 1 : Identifier les 10 tables les plus critiques. Deployer Elementary (gratuit, dbt-natif). Ajouter des tests de freshness et volume.
  • Semaine 2 : Ajouter le monitoring de schema et les alertes Slack. Documenter les SLAs de fraicheur par table.
  • Semaine 3 : Ajouter le monitoring de distribution (null rate, distinct count) sur les colonnes critiques. Configurer les seuils d'alerte.
  • Semaine 4 : Construire le data lineage (dbt docs ou OpenMetadata). Former l'equipe sur le processus d'incident response. Premier post-mortem data.
Quels sont les cinq piliers de la data observability et pourquoi sont-ils tous necessaires ?
Les cinq piliers sont : Freshness (les donnees arrivent-elles a temps ?), Volume (le nombre de lignes est-il normal ?), Schema (la structure a-t-elle change ?), Distribution (les valeurs sont-elles normales ?), et Lineage (quel est l'impact ?). Chacun detecte un type de probleme different. La fraicheur seule ne detecte pas un changement de schema. Le volume seul ne detecte pas un drift de distribution. Le lineage ne detecte rien mais permet de comprendre l'impact. Les cinq piliers ensemble offrent une couverture complete.
Quelle est la difference entre data testing (GE, Soda) et data observability (Monte Carlo, Bigeye) ?
Le data testing est proactif et base sur des regles : vous definissez des assertions explicites qui sont verifiees a chaque execution du pipeline. Le data observability est une surveillance continue basee sur l'apprentissage de patterns : le systeme apprend le comportement normal de vos donnees et alerte sur les anomalies sans que vous ayez a definir chaque regle. Le testing est precis mais ne couvre que ce que vous avez pense a tester. L'observabilite couvre les "unknown unknowns". Les deux approches sont complementaires.

DataOps Principles

55 min Avance

Objectifs de la lecon

  • Comprendre les principes fondamentaux du DataOps
  • Appliquer le Manifeste DataOps a votre organisation
  • Evaluer la maturite DataOps de votre equipe
  • Mettre en place l'automatisation, le monitoring et la collaboration

Le DataOps, c'est le DevOps applique aux donnees. Mais attention : ce n'est pas juste une question d'outils. C'est un changement culturel profond qui transforme la facon dont les equipes data travaillent ensemble. J'ai vu des organisations reduire leurs temps de livraison de 90% simplement en adoptant ces principes.

Qu'est-ce que le DataOps ?

Le DataOps est une methodologie collaborative de gestion des donnees qui s'inspire des pratiques DevOps et Agile. Son objectif principal est d'ameliorer la qualite et de reduire le temps de cycle de l'analytique de donnees.

Definition formelle

Le DataOps est une pratique de gestion des donnees orientee processus et automatisee qui ameliore la qualite et reduit le cycle de livraison de l'analytique de donnees, tout en favorisant la collaboration entre les equipes data, les operations et les metiers.

Evolution : Du Data Management traditionnel au DataOps
  Traditionnel          Agile Data           DataOps
  ============        ============        ============
  Waterfall            Sprints              Flux continu
  Silos                Collaboration        Culture unifiee
  Manuel               Semi-auto            Full automation
  Reactif              Proactif             Predictif
  Mois                 Semaines             Heures/Minutes
  Documentation        User Stories         Data Contracts
  QA en fin            Tests reguliers      Tests continus

Le Manifeste DataOps

Le Manifeste DataOps definit 18 principes fondamentaux, regroupes en categories essentielles :

CategoriePrincipes clesImpact
Individus et interactionsCollaboration cross-fonctionnelle, communication continue, boucles de feedbackReduction des silos de 80%
Analytique fonctionnelleLivrer de la valeur en continu, prioriser les besoins utilisateursTime-to-insight reduit de 60%
Orchestration automatiseeAutomatiser tout ce qui peut l'etre, infrastructure as codeErreurs manuelles eliminees a 95%
Qualite intrinsequeTests continus, monitoring, alertes proactivesIncidents reduits de 70%
TransparenceMetriques visibles, lineage accessible, documentation vivanteConfiance data augmentee de 85%

Les 3 piliers du DataOps

1. Automatisation

L'automatisation est le coeur du DataOps. Chaque tache repetitive doit etre automatisee pour eliminer les erreurs humaines et accelerer les livraisons.

YAML - Pipeline DataOps automatise
# dataops-pipeline.yml
name: DataOps Pipeline
triggers:
  - schedule: "0 */2 * * *"   # Toutes les 2 heures
  - on_push:
      branches: [main, develop]
  - on_data_change:
      sources: [raw_layer]

stages:
  validate:
    - data_quality_checks:
        tool: great_expectations
        suite: production_suite
    - schema_validation:
        tool: soda
        checks: schema_changes

  transform:
    - dbt_run:
        target: production
        select: "tag:incremental"
        fail_fast: true

  test:
    - dbt_test:
        severity: warn
    - custom_tests:
        script: tests/business_rules.py

  deploy:
    - blue_green_swap:
        strategy: zero_downtime
    - notify:
        channels: [slack, email]
        on: [success, failure]

  monitor:
    - freshness_check:
        max_delay: 30m
    - anomaly_detection:
        model: isolation_forest
        sensitivity: 0.95

2. Monitoring et Observabilite

Le monitoring DataOps va bien au-dela de la simple surveillance des jobs. Il couvre les 5 piliers de l'observabilite data :

Les 5 piliers du monitoring DataOps
         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
         β”‚    OBSERVABILITE DATAOPS     β”‚
         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                    β”‚
    β”Œβ”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
    β–Ό       β–Ό       β–Ό       β–Ό          β–Ό
 Fraicheur Volume  Schema  Lineage  Distribution
    β”‚       β”‚       β”‚       β”‚          β”‚
 SLA ok?  +/-20%?  Drift?  Impact?  Anomalie?
    β”‚       β”‚       β”‚       β”‚          β”‚
    β””β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                    β”‚
            β”Œβ”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”
            β”‚   ALERTING    β”‚
            β”‚  PagerDuty /  β”‚
            β”‚  Slack / Emailβ”‚
            β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

3. Collaboration

Le DataOps brise les silos entre les equipes. Data Engineers, Data Analysts, Data Scientists et metiers travaillent en synergie.

Avant DataOps

  • Tickets Jira entre equipes
  • Reunions hebdomadaires
  • Documentation obsolete
  • Blame culture lors d'incidents
  • Chaque equipe a ses propres outils

Avec DataOps

  • Pull Requests collaboratives
  • Standups quotidiens cross-equipe
  • Data Contracts vivants
  • Blameless post-mortems
  • Plateforme data unifiee

Modele de maturite DataOps

Evaluez ou se situe votre organisation sur l'echelle de maturite DataOps :

NiveauNomCaracteristiquesMetriques typiques
1InitialProcessus manuels, pas de tests, deploiements ad hocDeploiement : jours/semaines, Incidents : frequents
2GereQuelques scripts d'automatisation, tests basiques, documentation partielleDeploiement : heures, Incidents : reguliers
3DefiniCI/CD en place, tests automatises, monitoring basiqueDeploiement : minutes, Incidents : occasionnels
4MesureMetriques SLA, data quality automatique, alertes proactivesDeploiement : minutes, Incidents : rares, MTTR < 1h
5OptimiseSelf-healing pipelines, ML-driven monitoring, amelioration continueDeploiement : secondes, Incidents : auto-corriges

Netflix - DataOps a l'echelle

Netflix traite des petaoctets de donnees quotidiennement pour alimenter ses systemes de recommandation et ses dashboards business. Leur approche DataOps repose sur :

  • Maestro : leur orchestrateur maison qui gere des dizaines de milliers de workflows quotidiens avec des SLA stricts
  • Data Mesh interne : chaque equipe (streaming, content, finance) possede et opere ses propres pipelines comme des produits data
  • Metacat : catalogue de metadonnees unifie qui offre une visibilite complete sur tout le lineage
  • Blameless post-mortems : chaque incident data majeur fait l'objet d'une analyse collaborative avec un plan d'action concret

Resultat : temps de livraison d'une nouvelle metrique business passe de 2 semaines a 2 heures, reduction de 75% des incidents data critiques.

Le "Tool-First DataOps"

L'erreur la plus courante est de croire qu'acheter un outil DataOps suffit a transformer votre organisation. J'ai vu des entreprises investir des centaines de milliers d'euros dans des plateformes DataOps sans changer leurs processus ni leur culture.

  • Symptome : on achete un outil d'orchestration mais les deploiements restent manuels
  • Symptome : on installe un outil de monitoring mais personne ne regarde les alertes
  • Correction : commencer par les processus et la culture, puis choisir les outils qui supportent ces processus

Demarrer le DataOps dans votre equipe

Commencez petit : automatisez d'abord un seul pipeline critique de bout en bout (tests, deploy, monitoring). Mesurez l'impact, puis etendez progressivement. Le DataOps est un voyage, pas une destination.

Quelle est la difference principale entre DevOps et DataOps ?
Le DataOps integre les memes principes que le DevOps (automatisation, CI/CD, collaboration) mais ajoute des preoccupations specifiques aux donnees : qualite des donnees, freshness, lineage, schema evolution, et gestion du volume. Le DataOps doit gerer l'incertitude inherente aux donnees (schemas qui changent, volumes variables, sources externes).
Quels sont les 5 piliers du monitoring DataOps ?
Les 5 piliers sont : (1) Fraicheur - les donnees sont-elles a jour selon les SLA, (2) Volume - le nombre de lignes est-il dans les limites attendues, (3) Schema - la structure a-t-elle change de maniere inattendue, (4) Lineage - quel est l'impact en aval d'un probleme, (5) Distribution - les valeurs suivent-elles les patterns attendus.

Quiz - DataOps Principles

Quel est le premier principe du Manifeste DataOps ?

A) Automatiser tous les processus
B) Les individus et les interactions avant les processus et les outils
C) Satisfaire le client par la livraison continue de donnees de valeur
D) Mesurer toutes les metriques possibles
La bonne reponse est B. Comme dans le Manifeste Agile, le DataOps met l'accent sur les individus et les interactions, car ce sont les personnes qui font la difference dans la reussite des projets data.

A quel niveau de maturite DataOps une organisation dispose-t-elle de pipelines "self-healing" ?

A) Niveau 3 - Defini
B) Niveau 4 - Mesure
C) Niveau 5 - Optimise
D) Niveau 2 - Gere
La bonne reponse est C. Le niveau 5 (Optimise) est le stade le plus avance ou les pipelines peuvent se reparer automatiquement grace au ML et a l'automatisation poussee.

CI/CD pour dbt

60 min Expert

Objectifs de la lecon

  • Configurer un pipeline CI/CD complet pour dbt avec GitHub Actions
  • Implementer des pre-commit hooks pour dbt
  • Maitriser les strategies de build dbt (slim CI, deferred)
  • Gerer les environnements (dev, staging, prod) et les deploiements blue/green

Le CI/CD pour dbt est un game-changer. Avant, les equipes data deployaient en production en priant pour que tout fonctionne. Avec un pipeline CI/CD bien configure, chaque Pull Request est testee, validee et deployee de maniere fiable. C'est la difference entre l'artisanat et l'ingenierie.

Architecture CI/CD pour dbt

Un pipeline CI/CD pour dbt comprend plusieurs etapes qui garantissent la qualite du code et des donnees avant chaque deploiement en production.

Pipeline CI/CD dbt complet
  Developer         GitHub              CI/CD              Warehouse
  =========        ========           ========            ==========
      β”‚                β”‚                  β”‚                    β”‚
      │─── git push ──▢│                  β”‚                    β”‚
      β”‚                │── trigger ──────▢│                    β”‚
      β”‚                β”‚                  │── dbt compile ───▢│
      β”‚                β”‚                  │── dbt test ──────▢│
      β”‚                β”‚                  │── slim CI ───────▢│
      β”‚                β”‚                  β”‚                    β”‚
      β”‚                │◀── PR comment ───│                    β”‚
      │◀── review ─────│                  β”‚                    β”‚
      β”‚                β”‚                  β”‚                    β”‚
      │─── merge ─────▢│                  β”‚                    β”‚
      β”‚                │── trigger prod ─▢│                    β”‚
      β”‚                β”‚                  │── dbt build ────▢│
      β”‚                β”‚                  │── swap schemas ──▢│
      β”‚                β”‚                  │── notify ────────▢│
      β”‚                β”‚                  β”‚                    β”‚

GitHub Actions Workflow pour dbt

YAML - .github/workflows/dbt-ci.yml
name: dbt CI/CD Pipeline

on:
  pull_request:
    branches: [main]
    paths:
      - 'models/**'
      - 'macros/**'
      - 'tests/**'
      - 'dbt_project.yml'
      - 'packages.yml'

env:
  DBT_PROFILES_DIR: ./ci
  DBT_TARGET: ci
  SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
  SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
  SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}

jobs:
  lint:
    name: "Lint & Format Check"
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install sqlfluff dbt-core dbt-snowflake
      - name: SQLFluff Lint
        run: sqlfluff lint models/ --dialect snowflake

  slim-ci:
    name: "Slim CI - Modified Models Only"
    needs: lint
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install dbt-core dbt-snowflake

      - name: Get production manifest
        run: |
          aws s3 cp s3://dbt-artifacts/prod/manifest.json \
            ./target/manifest.json

      - name: dbt deps
        run: dbt deps

      - name: dbt build (modified only)
        run: |
          dbt build \
            --select state:modified+ \
            --defer \
            --state ./target \
            --target ci \
            --fail-fast

      - name: Generate docs
        run: dbt docs generate

      - name: Comment PR with results
        uses: actions/github-script@v7
        with:
          script: |
            const results = require('./target/run_results.json');
            const summary = results.results.map(r =>
              `| ${r.unique_id} | ${r.status} | ${r.execution_time}s |`
            ).join('\n');
            github.rest.issues.createComment({
              owner: context.repo.owner,
              repo: context.repo.repo,
              issue_number: context.issue.number,
              body: `## dbt CI Results\n| Model | Status | Time |\n|---|---|---|\n${summary}`
            });

Pre-commit Hooks pour dbt

Les pre-commit hooks garantissent que le code est valide avant meme d'etre pousse sur le depot distant.

YAML - .pre-commit-config.yaml
repos:
  - repo: https://github.com/sqlfluff/sqlfluff
    rev: 3.0.0
    hooks:
      - id: sqlfluff-lint
        args: [--dialect, snowflake]
        files: ^models/
      - id: sqlfluff-fix
        args: [--dialect, snowflake, --force]
        files: ^models/

  - repo: https://github.com/dbt-checkpoint/dbt-checkpoint
    rev: 2.0.0
    hooks:
      - id: check-model-has-description
      - id: check-model-has-tests
        args: [--test-cnt, "2"]
      - id: check-model-has-properties-file
      - id: check-model-name-contract
        args: [--pattern, "(staging|intermediate|marts)_.*"]
      - id: check-source-has-freshness
      - id: check-script-semicolon

  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.6.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-json

Strategies de build dbt

Slim CI vs Full Build

Le Slim CI (avec --select state:modified+) ne rebuild que les modeles modifies et leurs dependances en aval, en utilisant le manifest de production comme reference. Cela reduit drastiquement le temps de CI (de 45 min a 5 min typiquement).

StrategieCommande dbtCas d'usageDuree typique
Slim CIdbt build --select state:modified+ --deferPull Requests3-10 min
Full Builddbt buildDeploiement production30-90 min
Tag-baseddbt build --select tag:dailyScheduled runsVariable
Freshness-firstdbt source freshness && dbt buildValidation sourcesVariable + build

Gestion des environnements

YAML - profiles.yml multi-environnement
data_warehouse:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      database: ANALYTICS_DEV
      schema: "dbt_{{ env_var('USER', 'default') }}"
      warehouse: DEV_WH_XS
      threads: 4

    ci:
      type: snowflake
      account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      database: ANALYTICS_CI
      schema: "pr_{{ env_var('PR_NUMBER', '0') }}"
      warehouse: CI_WH_S
      threads: 8

    staging:
      type: snowflake
      account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      database: ANALYTICS_STAGING
      schema: public
      warehouse: STAGING_WH_M
      threads: 16

    prod:
      type: snowflake
      account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      database: ANALYTICS_PROD
      schema: public
      warehouse: PROD_WH_L
      threads: 32

Deploiements Blue/Green pour dbt

Le deploiement blue/green permet de deployer sans interruption de service en construisant les modeles dans un schema secondaire puis en effectuant un swap atomique.

SQL - Blue/Green Swap dans Snowflake
-- Etape 1: Build dans le schema "green" (secondaire)
-- dbt build --target prod --vars '{schema_suffix: _green}'

-- Etape 2: Verification de sante (health check)
SELECT
    'stg_orders' AS model_name,
    COUNT(*) AS row_count,
    MAX(updated_at) AS max_updated
FROM analytics_prod.public_green.stg_orders

UNION ALL

SELECT
    'fct_revenue' AS model_name,
    COUNT(*) AS row_count,
    MAX(updated_at) AS max_updated
FROM analytics_prod.public_green.fct_revenue;

-- Etape 3: Swap atomique des schemas
ALTER SCHEMA analytics_prod.public
  RENAME TO analytics_prod.public_old;
ALTER SCHEMA analytics_prod.public_green
  RENAME TO analytics_prod.public;

-- Etape 4: Cleanup (apres validation)
DROP SCHEMA IF EXISTS analytics_prod.public_old CASCADE;

Spotify - CI/CD pour pipelines data

Spotify gere plus de 20 000 pipelines data avec une approche CI/CD mature :

  • Backstage : leur portail developpeur open-source integre les pipelines data comme des "software components" avec CI/CD automatise
  • Golden Paths : des templates pre-configures avec CI/CD inclus pour que chaque nouvelle pipeline soit automatiquement testee et deployee
  • Automated Canary Analysis : les nouvelles versions de pipelines sont d'abord deployees sur un sous-ensemble de donnees avant le rollout complet
  • Resultat : 200+ data engineers deployent en production plusieurs fois par jour avec un taux d'echec inferieur a 2%

Incident de production sans CI/CD

Une fintech europeenne a subi une panne de 3 jours sur ses rapports reglementaires apres qu'un data engineer a deploye directement en production un modele dbt non teste :

  • Cause racine : un JOIN sans condition de deduplication a cree une explosion de lignes (x1000) dans les tables de reporting
  • Impact : rapports reglementaires incorrects envoyes au regulateur, amende potentielle de 500K euros
  • Duree : 3 jours pour identifier le probleme car aucun monitoring sur le volume des tables
  • Resolution : mise en place immediate d'un pipeline CI/CD avec tests de volume, dbt slim CI, et deploiement blue/green

Le deploiement "YOLO" en production

Deployer directement en production sans pipeline CI/CD est l'anti-pattern le plus dangereux en data engineering.

  • Symptome : dbt run --target prod execute depuis le poste d'un developpeur
  • Risque : pas de review, pas de tests, pas de rollback possible
  • Correction : interdire les acces directs en production, tout deploiement passe par le pipeline CI/CD

Secrets et CI/CD

Ne stockez JAMAIS les credentials dans le code ou les fichiers YAML. Utilisez toujours des variables d'environnement ou un gestionnaire de secrets (GitHub Secrets, HashiCorp Vault, AWS Secrets Manager). Un leak de credentials Snowflake peut couter des centaines de milliers d'euros en compute non autorise.

Qu'est-ce que le "Slim CI" dans dbt et pourquoi est-il essentiel ?
Le Slim CI utilise la commande dbt build --select state:modified+ --defer --state ./target pour ne reconstruire que les modeles modifies et leurs dependances en aval. Il utilise le manifest de production comme reference pour les modeles non modifies (--defer). C'est essentiel car cela reduit le temps de CI de 90% (par exemple, de 45 min a 5 min) tout en validant l'impact reel des changements.

Quiz - CI/CD pour dbt

Quel flag dbt permet de ne builder que les modeles modifies par rapport a la production ?

A) --select changed+
B) --select state:modified+
C) --select modified:true
D) --select diff:production
La bonne reponse est B. --select state:modified+ compare l'etat actuel avec un manifest de reference (typiquement production) et selectionne les modeles modifies et leurs dependances en aval (+).

Quel est l'avantage principal du deploiement Blue/Green pour dbt ?

A) Reduire le cout de compute
B) Zero downtime lors des deploiements
C) Accelerer les builds dbt
D) Simplifier les rollbacks
La bonne reponse est B. Le deploiement Blue/Green construit les modeles dans un schema secondaire puis effectue un swap atomique, garantissant zero downtime pour les utilisateurs. C'est aussi utile pour les rollbacks (D), mais l'avantage principal est le zero downtime.

Testing Strategies

55 min Expert

Objectifs de la lecon

  • Comprendre la pyramide de tests adaptee au Data Engineering
  • Implementer des tests unitaires, d'integration et de contrat
  • Mettre en place des tests de regression data
  • Concevoir une strategie de test complete pour les pipelines data

Le testing est le parent pauvre du Data Engineering. 80% des pipelines data en production n'ont aucun test automatise. C'est comme construire un pont sans verification de la structure. La bonne nouvelle : il n'est jamais trop tard pour commencer. Meme un seul test de volume sur votre table la plus critique est mieux que rien.

La pyramide de tests pour les donnees

Comme en genie logiciel, les tests data suivent une pyramide. Les tests les plus rapides et nombreux sont a la base, les plus lents et complexes au sommet.

Pyramide de tests Data Engineering
                    β•±β•²
                   β•±  β•²
                  β•± E2Eβ•²           Tests End-to-End
                 β•± Tests β•²         (Pipeline complete)
                ╱──────────╲       Lents, peu nombreux
               β•±            β•²
              β•±  Integration  β•²    Tests d'integration
             β•±    Tests        β•²   (Joins, aggregations)
            ╱──────────────────╲   Moyens
           β•±                    β•²
          β•±   Contract Tests     β•²  Tests de contrat
         β•±    (Schema, SLA)       β•² (Interfaces data)
        ╱──────────────────────────╲
       β•±                            β•²
      β•±      Unit Tests              β•² Tests unitaires
     β•±   (Logique transformation)     β•² Rapides, nombreux
    ╱──────────────────────────────────╲

  Rapidite β–²                    β–Ό Couverture
  Nombre   β–²                    β–Ό Complexite

1. Tests unitaires

Les tests unitaires verifient la logique de transformation de maniere isolee, sans avoir besoin de donnees reelles.

YAML - dbt unit test (dbt v1.8+)
unit_tests:
  - name: test_calcul_revenue_net
    description: "Verifie le calcul du revenu net avec remises et taxes"
    model: fct_revenue
    given:
      - input: ref('stg_orders')
        rows:
          - { order_id: 1, amount: 100.00, discount: 10.00, tax_rate: 0.20 }
          - { order_id: 2, amount: 200.00, discount: 0.00, tax_rate: 0.20 }
          - { order_id: 3, amount: 50.00, discount: 50.00, tax_rate: 0.20 }
      - input: ref('stg_refunds')
        rows:
          - { order_id: 3, refund_amount: 0.00 }
    expect:
      rows:
        - { order_id: 1, net_revenue: 108.00 }  # (100-10) * 1.20
        - { order_id: 2, net_revenue: 240.00 }  # (200-0) * 1.20
        - { order_id: 3, net_revenue: 0.00 }    # (50-50) * 1.20

  - name: test_classification_client
    description: "Verifie la segmentation client par montant d'achat"
    model: dim_customers
    given:
      - input: ref('stg_customers')
        rows:
          - { customer_id: 1, total_purchases: 15000 }
          - { customer_id: 2, total_purchases: 5000 }
          - { customer_id: 3, total_purchases: 500 }
    expect:
      rows:
        - { customer_id: 1, segment: 'premium' }
        - { customer_id: 2, segment: 'standard' }
        - { customer_id: 3, segment: 'basic' }

2. Tests d'integration

Les tests d'integration verifient que les transformations fonctionnent correctement avec des donnees reelles sur des volumes reduits.

YAML - tests dbt schema.yml
models:
  - name: fct_orders
    description: "Table de faits des commandes"
    tests:
      - dbt_utils.fewer_rows_than:
          compare_model: ref('stg_orders')
          # Apres deduplication, fct doit avoir moins de lignes
      - dbt_utils.expression_is_true:
          expression: "total_amount >= 0"
          config:
            severity: error

    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id
      - name: order_date
        tests:
          - not_null
          - dbt_utils.accepted_range:
              min_value: "'2020-01-01'"
              max_value: "current_date()"
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled']

3. Tests de contrat (Contract Tests)

Les tests de contrat verifient que les interfaces entre producteurs et consommateurs de donnees sont respectees.

YAML - dbt model contract
models:
  - name: fct_revenue
    config:
      contract:
        enforced: true
    columns:
      - name: revenue_id
        data_type: varchar(36)
        constraints:
          - type: not_null
          - type: primary_key
      - name: customer_id
        data_type: varchar(36)
        constraints:
          - type: not_null
          - type: foreign_key
            to: ref('dim_customers')
            to_columns: [customer_id]
      - name: revenue_date
        data_type: date
        constraints:
          - type: not_null
      - name: net_amount
        data_type: number(18,2)
        constraints:
          - type: not_null
          - type: check
            expression: "net_amount >= 0"

Pourquoi les tests de contrat sont essentiels

Les tests de contrat garantissent que les modifications en amont ne cassent pas les consommateurs en aval. Si un producteur change le type d'une colonne ou supprime un champ, le test de contrat echoue AVANT que les donnees corrompues n'atteignent les dashboards ou les modeles ML.

4. Tests de regression data

Les tests de regression comparent les resultats d'une nouvelle version du pipeline avec la version precedente pour detecter les changements inattendus.

Python - Test de regression avec audit_helper
# macros/test_regression_revenue.sql
{% test regression_revenue(model, compare_model) %}

  {% set audit_query = dbt_utils.audit_helper.compare_relations(
      a_relation=model,
      b_relation=compare_model,
      primary_key="revenue_id",
      columns=["customer_id", "revenue_date", "net_amount"]
  ) %}

  WITH comparison AS (
      {{ audit_query }}
  )

  SELECT *
  FROM comparison
  WHERE in_a != in_b
    -- Tolerance de 1% sur les montants
    AND ABS(a_net_amount - b_net_amount) / NULLIF(a_net_amount, 0) > 0.01

{% endtest %}
Python - data_diff pour comparaison cross-DB
# Comparaison entre staging et production
# pip install data-diff

from data_diff import connect, diff_tables

db_prod = connect("snowflake://user:pass@account/ANALYTICS_PROD/PUBLIC")
db_staging = connect("snowflake://user:pass@account/ANALYTICS_STAGING/PUBLIC")

diff_result = diff_tables(
    table1=db_prod.get_table("fct_revenue"),
    table2=db_staging.get_table("fct_revenue"),
    key_columns=["revenue_id"],
    extra_columns=["net_amount", "customer_id"],
    where="revenue_date >= '2024-01-01'"
)

for row_diff in diff_result:
    print(f"Difference trouvee: {row_diff}")

# Resultat:
# + row in staging only: revenue_id=X, net_amount=150.00
# ~ changed: revenue_id=Y, net_amount 100->120
# - row in prod only: revenue_id=Z (supprime en staging)

5. Strategie de test complete

Type de testQuoi testerQuandOutils
UnitLogique de transformation isoleePre-commit, CIdbt unit tests, pytest
IntegrationJoins, aggregations, deduplicationCI, post-deploydbt tests, Great Expectations
ContractSchema, types, contraintesCI, chaque builddbt contracts, Soda
RegressionComparaison avant/apresPre-deploy productiondata-diff, audit_helper
E2EPipeline complete bout en boutNightly, pre-releaseCustom scripts, Airflow tests
PerformanceTemps d'execution, coutCI, monitoringdbt timing, warehouse metrics

Zero tests en data pipeline

L'anti-pattern le plus repandu en Data Engineering : deployer des pipelines sans aucun test automatise.

  • Excuse courante : "Les donnees changent tout le temps, on ne peut pas les tester"
  • Realite : ce sont justement les changements imprevisibles qui rendent les tests indispensables
  • Consequence : les erreurs sont detectees par les utilisateurs finaux, pas par l'equipe data
  • Correction : commencer par les tests basiques (not_null, unique, relationships) puis monter progressivement en complexite

Scenario : Mise en place de tests sur un pipeline existant

Vous heritez d'un pipeline dbt de 150 modeles sans aucun test. Par ou commencer ?

  • Semaine 1 : Identifier les 10 tables les plus critiques (celles qui alimentent les dashboards C-level et les rapports reglementaires)
  • Semaine 2 : Ajouter des tests basiques (not_null, unique sur les cles primaires, relationships sur les cles etrangeres)
  • Semaine 3 : Ajouter des tests de volume (row count dans une plage acceptable) et de fraicheur (source freshness)
  • Semaine 4 : Implementer des tests de contrat sur les modeles marts exposes aux consommateurs
  • Mois 2+ : Ajouter des unit tests pour les transformations complexes, des tests de regression pour les deploiements
Quelle est la difference entre un test d'integration et un test de contrat en Data Engineering ?
Un test d'integration verifie que les transformations produisent des resultats corrects (joins, aggregations, logique metier). Un test de contrat verifie que l'interface de donnees (schema, types, contraintes) est respectee entre producteur et consommateur. Le test de contrat ne verifie pas la logique, il garantit que la structure est conforme au "contrat" agree entre equipes.
Pourquoi les tests de regression data sont-ils plus complexes que les tests de regression logicielle ?
En logiciel, l'output est deterministe pour un input donne. En data, les donnees changent en permanence (nouvelles lignes, valeurs differentes), donc on ne peut pas simplement comparer "output attendu vs output reel". Il faut definir des tolerances (ex: +/- 1% sur les montants), exclure les variations normales, et se concentrer sur les invariants structurels (schema, relations, bornes).

Quiz - Testing Strategies

Quel type de test se trouve a la BASE de la pyramide de tests data ?

A) Tests E2E
B) Tests d'integration
C) Tests unitaires
D) Tests de contrat
La bonne reponse est C. Les tests unitaires constituent la base de la pyramide : ils sont rapides, nombreux et verifient la logique de transformation de maniere isolee.

Quel est le principal avantage des dbt contracts (enforced: true) ?

A) Accelerer les builds dbt
B) Garantir la compatibilite du schema entre producteurs et consommateurs
C) Reduire le nombre de tests necessaires
D) Automatiser la documentation
La bonne reponse est B. Les dbt contracts (avec enforced: true) garantissent que les colonnes, types et contraintes du modele respectent le contrat defini, protegeant ainsi les consommateurs en aval des changements inattendus.

Version Control Data

55 min Expert

Objectifs de la lecon

  • Comprendre les enjeux du versionnement des donnees
  • Maitriser DVC, lakeFS et Nessie pour le data versioning
  • Implementer des patterns de versionnement adaptes a chaque cas d'usage
  • Exploiter le Time Travel pour l'audit et le rollback

Versionner le code est une evidence depuis 20 ans. Versionner les donnees reste un defi majeur en 2024. Pourtant, sans versionnement, impossible de reproduire un resultat d'analyse, de faire un rollback apres une corruption, ou de comparer deux versions d'un dataset. C'est le chapitre qui distingue les data engineers juniors des seniors.

Pourquoi versionner les donnees ?

Sans versionnement

  • Impossible de reproduire un resultat
  • Pas de rollback apres corruption
  • Pas d'audit des modifications
  • Conflicts de donnees non detectes
  • "Ca marchait hier" sans preuve

Avec versionnement

  • Reproductibilite totale
  • Rollback instantane
  • Historique complet des changements
  • Branching et merging de donnees
  • Audit reglementaire simplifie

DVC (Data Version Control)

DVC est un outil open-source qui etend Git pour versionner les fichiers de donnees et les modeles ML. Il stocke les metadonnees dans Git et les fichiers volumineux dans un stockage distant (S3, GCS, Azure Blob).

Bash - Workflow DVC complet
# Initialiser DVC dans un projet Git existant
git init && dvc init

# Configurer le stockage distant (S3)
dvc remote add -d myremote s3://my-bucket/dvc-store
dvc remote modify myremote region eu-west-1

# Tracker un dataset
dvc add data/raw/transactions_2024.parquet
# Cree: data/raw/transactions_2024.parquet.dvc (metadonnees)
# Ajoute au .gitignore: data/raw/transactions_2024.parquet

# Commiter les metadonnees dans Git
git add data/raw/transactions_2024.parquet.dvc data/raw/.gitignore
git commit -m "feat: add transactions dataset v1"

# Pousser les donnees vers le stockage distant
dvc push

# --- Plus tard: mettre a jour le dataset ---
# (apres modification du fichier parquet)
dvc add data/raw/transactions_2024.parquet
git add data/raw/transactions_2024.parquet.dvc
git commit -m "feat: update transactions dataset v2"
dvc push

# --- Rollback vers la version precedente ---
git checkout HEAD~1 -- data/raw/transactions_2024.parquet.dvc
dvc checkout
# Les donnees v1 sont restaurees !
YAML - dvc.yaml : Pipeline reproductible
stages:
  prepare:
    cmd: python src/prepare.py
    deps:
      - data/raw/transactions_2024.parquet
      - src/prepare.py
    params:
      - prepare.min_amount
      - prepare.date_range
    outs:
      - data/prepared/transactions_clean.parquet

  features:
    cmd: python src/featurize.py
    deps:
      - data/prepared/transactions_clean.parquet
      - src/featurize.py
    params:
      - features.window_size
      - features.aggregations
    outs:
      - data/features/feature_store.parquet

  train:
    cmd: python src/train.py
    deps:
      - data/features/feature_store.parquet
      - src/train.py
    params:
      - train.model_type
      - train.learning_rate
    outs:
      - models/fraud_detector.pkl
    metrics:
      - metrics/scores.json:
          cache: false

lakeFS - Git pour le Data Lake

lakeFS fonctionne comme Git mais directement sur le Data Lake (S3, GCS, Azure). Il permet le branching, committing et merging de donnees a l'echelle du petaoctet.

Python - lakeFS operations
import lakefs_sdk
from lakefs_sdk.client import LakeFSClient

client = LakeFSClient(
    configuration=lakefs_sdk.Configuration(
        host="http://localhost:8000/api/v1",
        username="AKIAIOSFODNN7EXAMPLE",
        password="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
    )
)

# Creer un repository
client.repositories_api.create_repository(
    repository_creation={
        "name": "analytics-lake",
        "storage_namespace": "s3://my-bucket/analytics/",
        "default_branch": "main"
    }
)

# Creer une branche pour une experimentation
client.branches_api.create_branch(
    repository="analytics-lake",
    branch_creation={
        "name": "experiment/new-fraud-features",
        "source": "main"
    }
)

# Uploader des donnees sur la branche
with open("data/fraud_features_v2.parquet", "rb") as f:
    client.objects_api.upload_object(
        repository="analytics-lake",
        branch="experiment/new-fraud-features",
        path="features/fraud_features.parquet",
        content=f
    )

# Commiter
client.commits_api.commit(
    repository="analytics-lake",
    branch="experiment/new-fraud-features",
    commit_creation={
        "message": "Add new fraud detection features v2",
        "metadata": {
            "author": "data-team",
            "ticket": "DATA-1234"
        }
    }
)

# Merger vers main apres validation
client.refs_api.merge_into_branch(
    repository="analytics-lake",
    source_ref="experiment/new-fraud-features",
    destination_branch="main"
)

Nessie (Project Nessie)

Nessie apporte le versionnement Git-like directement au niveau du catalogue de donnees (Apache Iceberg, Delta Lake). Contrairement a lakeFS qui travaille au niveau fichier, Nessie versionne les tables.

SQL - Nessie avec Iceberg et Spark
-- Creer une branche pour les modifications
CREATE BRANCH experiment_branch IN nessie FROM main;

-- Travailler sur la branche
USE REFERENCE experiment_branch IN nessie;

-- Modifier les tables sur la branche
ALTER TABLE analytics.fct_revenue
  ADD COLUMN new_metric DOUBLE;

INSERT INTO analytics.fct_revenue
SELECT *, calculate_new_metric(*)
FROM analytics.fct_revenue@main;  -- Reference croisee

-- Valider les resultats
SELECT COUNT(*), AVG(new_metric)
FROM analytics.fct_revenue;

-- Merger vers main
MERGE BRANCH experiment_branch INTO main IN nessie;

Comparaison des outils

CritereDVClakeFSNessie
GranulariteFichierObjet (fichier)Table (catalogue)
Integration GitExtension Git nativeAPI Git-like separeeAPI Git-like separee
Cas d'usage principalML PipelinesData Lake versioningLakehouse versioning
EchelleGB-TBTB-PBTB-PB
BranchingVia GitZero-copy branchesZero-copy branches
FormatTout fichierTout objet S3/GCSIceberg, Delta Lake
Complexite setupFaibleMoyenneElevee

Time Travel Strategies

Le Time Travel permet de requeter les donnees a un point precis dans le temps, sans avoir besoin de snapshots manuels.

SQL - Time Travel dans differents systemes
-- Snowflake Time Travel (jusqu'a 90 jours)
SELECT * FROM fct_revenue
AT(TIMESTAMP => '2024-03-15 10:00:00'::TIMESTAMP);

-- Comparer deux versions
SELECT
    COALESCE(a.revenue_id, b.revenue_id) AS revenue_id,
    a.net_amount AS amount_before,
    b.net_amount AS amount_after,
    b.net_amount - a.net_amount AS delta
FROM fct_revenue AT(OFFSET => -3600) a  -- Il y a 1 heure
FULL OUTER JOIN fct_revenue b
    ON a.revenue_id = b.revenue_id
WHERE a.net_amount != b.net_amount
   OR a.revenue_id IS NULL
   OR b.revenue_id IS NULL;

-- Delta Lake Time Travel (versions illimitees)
SELECT * FROM delta.`/data/fct_revenue`
VERSION AS OF 42;

SELECT * FROM delta.`/data/fct_revenue`
TIMESTAMP AS OF '2024-03-15 10:00:00';

-- Apache Iceberg Time Travel
SELECT * FROM analytics.fct_revenue
FOR SYSTEM_TIME AS OF TIMESTAMP '2024-03-15 10:00:00';

-- Iceberg: lister les snapshots disponibles
SELECT * FROM analytics.fct_revenue.snapshots;

ING Bank - Versionnement de donnees reglementaires

ING Bank, l'une des plus grandes banques europeennes, utilise le versionnement de donnees pour repondre aux exigences reglementaires (BCBS 239, GDPR) :

  • lakeFS en production : chaque rapport reglementaire est genere a partir d'une branche "figee" du data lake, garantissant la reproductibilite
  • Audit trail complet : chaque modification de donnee est tracee avec l'identite de l'auteur, la date et la justification
  • Rollback reglementaire : en cas de detection d'erreur dans un rapport soumis, ING peut retrouver l'etat exact des donnees au moment de la generation
  • Resultat : temps de reponse aux auditeurs passe de semaines a heures, conformite BCBS 239 atteinte a 98%

Cout du versionnement

Le versionnement de donnees a un cout en stockage. lakeFS et Nessie utilisent le "zero-copy branching" (pas de duplication physique), mais DVC duplique les fichiers. Planifiez votre politique de retention : combien de versions conserver et pendant combien de temps.

Quelle est la difference entre DVC et lakeFS ?
DVC est une extension de Git qui versionne des fichiers individuels via des metadonnees (.dvc files) stockees dans Git, avec les fichiers reels dans un stockage distant. lakeFS est un serveur autonome qui fournit une API Git-like directement sur le Data Lake (S3/GCS), avec du zero-copy branching a l'echelle du petaoctet. DVC est ideal pour les projets ML, lakeFS pour le versionnement de Data Lakes entiers.

Quiz - Version Control Data

Quel outil est le mieux adapte pour versionner un Data Lake a l'echelle du petaoctet ?

A) Git LFS
B) DVC
C) lakeFS
D) SVN
La bonne reponse est C. lakeFS est concu specifiquement pour le versionnement de Data Lakes a l'echelle du petaoctet, avec du zero-copy branching directement sur S3/GCS. DVC est mieux adapte aux projets ML de taille moderee.

Quelle est la granularite de versionnement de Nessie ?

A) Fichier
B) Ligne
C) Table (catalogue)
D) Partition
La bonne reponse est C. Nessie travaille au niveau du catalogue de donnees (tables Iceberg ou Delta Lake), pas au niveau fichier comme DVC ou lakeFS. Cela permet de versionner les schemas et les donnees ensemble.

Projet : Pipeline Streaming de Detection de Fraude

90 min Expert

Objectifs du projet

  • Concevoir et implementer un pipeline de detection de fraude en temps reel
  • Integrer Kafka, Flink et un systeme de data quality
  • Mettre en place le monitoring et les alertes
  • Appliquer les principes DataOps au pipeline complet

Ce projet est le point culminant de la Phase 5. Vous allez combiner tout ce que vous avez appris : streaming, data quality, observabilite et DataOps dans un pipeline de detection de fraude realiste. C'est exactement le type de projet que vous aurez en entretien pour un poste de Data Architect senior.

Architecture du pipeline

Architecture Pipeline de Detection de Fraude
  Transactions       Kafka           Flink              Actions
  ============     ========        ========           =========

  POS Terminal ──▢ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                   β”‚ topic: β”‚    β”‚              β”‚    β”‚  Alerting β”‚
  Mobile App  ──▢  β”‚ raw-txn│──▢│  Flink Job   │──▢│  (PagerD) β”‚
                   β”‚        β”‚    β”‚              β”‚    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
  Web Payment ──▢  β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”‚         β”‚
                                 β”‚ β”‚ Rules    β”‚β”‚    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  ATM         ──▢  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚ β”‚ Engine   ││──▢│  Block    β”‚
                   β”‚ topic: β”‚    β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β”‚    β”‚  Card API β”‚
  Schema      ──▢  β”‚ enriched│◀──│              β”‚    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
  Registry         β”‚ -txn   β”‚    β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”‚         β”‚
                   β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚ β”‚ ML Model β”‚β”‚    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                                 β”‚ β”‚ Scoring  ││──▢│  Dashboardβ”‚
                   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β”‚    β”‚  Grafana  β”‚
                   β”‚ topic: β”‚    β”‚              β”‚    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                   β”‚ fraud- │◀───│ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”β”‚
                   β”‚ alerts β”‚    β”‚ β”‚ Quality  β”‚β”‚    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                   β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚ β”‚ Checks   ││──▢│  Data Lakeβ”‚
                                 β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜β”‚    β”‚  (Iceberg)β”‚
                   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚              β”‚    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                   β”‚ topic: │◀───│              β”‚
                   β”‚ metricsβ”‚    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                   β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Etape 1 : Configuration Kafka

1.1 - Definir les schemas Avro

Commencez par definir les schemas pour les evenements de transaction. L'utilisation d'un Schema Registry garantit la compatibilite entre producteurs et consommateurs.

JSON - Schema Avro : Transaction
{
  "type": "record",
  "name": "Transaction",
  "namespace": "com.fraud.detection",
  "fields": [
    {"name": "transaction_id", "type": "string"},
    {"name": "card_id", "type": "string"},
    {"name": "customer_id", "type": "string"},
    {"name": "merchant_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "EUR"},
    {"name": "merchant_category", "type": "string"},
    {"name": "country_code", "type": "string"},
    {"name": "channel", "type": {
      "type": "enum",
      "name": "Channel",
      "symbols": ["POS", "ONLINE", "ATM", "MOBILE"]
    }},
    {"name": "timestamp", "type": {
      "type": "long",
      "logicalType": "timestamp-millis"
    }},
    {"name": "geo_lat", "type": ["null", "double"], "default": null},
    {"name": "geo_lon", "type": ["null", "double"], "default": null}
  ]
}

1.2 - Configurer les topics Kafka

Bash - Creation des topics
# Topic principal : transactions brutes
kafka-topics --create \
  --topic raw-transactions \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete \
  --config min.insync.replicas=2 \
  --bootstrap-server kafka:9092

# Topic : transactions enrichies
kafka-topics --create \
  --topic enriched-transactions \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=2592000000 \
  --bootstrap-server kafka:9092

# Topic : alertes de fraude
kafka-topics --create \
  --topic fraud-alerts \
  --partitions 6 \
  --replication-factor 3 \
  --config retention.ms=-1 \
  --config cleanup.policy=compact \
  --bootstrap-server kafka:9092

# Topic : metriques pipeline
kafka-topics --create \
  --topic pipeline-metrics \
  --partitions 3 \
  --replication-factor 2 \
  --bootstrap-server kafka:9092

Etape 2 : Job Flink de detection de fraude

2.1 - Regles de detection basees sur des fenetres temporelles

Java - FraudDetectionJob.java
public class FraudDetectionJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.enableCheckpointing(60000); // Checkpoint toutes les 60s

        // Source : Kafka raw-transactions
        KafkaSource<Transaction> source = KafkaSource
            .<Transaction>builder()
            .setBootstrapServers("kafka:9092")
            .setTopics("raw-transactions")
            .setGroupId("fraud-detection-group")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setDeserializer(new TransactionDeserializer())
            .build();

        DataStream<Transaction> transactions = env
            .fromSource(source, WatermarkStrategy
                .<Transaction>forBoundedOutOfOrderness(
                    Duration.ofSeconds(10))
                .withTimestampAssigner((txn, ts) ->
                    txn.getTimestamp()),
                "Kafka Source");

        // Regle 1: Velocity Check
        // Plus de 5 transactions en 1 minute pour la meme carte
        DataStream<FraudAlert> velocityAlerts = transactions
            .keyBy(Transaction::getCardId)
            .window(TumblingEventTimeWindows.of(
                Time.minutes(1)))
            .process(new VelocityCheckFunction(5));

        // Regle 2: Amount Anomaly
        // Montant > 3x la moyenne des 24 dernieres heures
        DataStream<FraudAlert> amountAlerts = transactions
            .keyBy(Transaction::getCardId)
            .window(SlidingEventTimeWindows.of(
                Time.hours(24), Time.minutes(5)))
            .process(new AmountAnomalyFunction(3.0));

        // Regle 3: Geo Velocity
        // Transaction dans un pays different en < 2h
        DataStream<FraudAlert> geoAlerts = transactions
            .keyBy(Transaction::getCardId)
            .process(new GeoVelocityFunction(
                Duration.ofHours(2)));

        // Union des alertes + deduplication
        velocityAlerts
            .union(amountAlerts, geoAlerts)
            .keyBy(FraudAlert::getTransactionId)
            .process(new AlertDeduplicationFunction())
            .sinkTo(kafkaAlertsSink());

        env.execute("Fraud Detection Pipeline v2.1");
    }
}

2.2 - Fonction de verification de velocite

Java - VelocityCheckFunction.java
public class VelocityCheckFunction
    extends ProcessWindowFunction<
        Transaction, FraudAlert, String, TimeWindow> {

    private final int threshold;

    public VelocityCheckFunction(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void process(
            String cardId,
            Context context,
            Iterable<Transaction> transactions,
            Collector<FraudAlert> out) {

        List<Transaction> txnList = new ArrayList<>();
        transactions.forEach(txnList::add);

        if (txnList.size() > threshold) {
            double totalAmount = txnList.stream()
                .mapToDouble(Transaction::getAmount)
                .sum();

            FraudAlert alert = FraudAlert.builder()
                .alertId(UUID.randomUUID().toString())
                .transactionId(
                    txnList.get(txnList.size()-1)
                        .getTransactionId())
                .cardId(cardId)
                .ruleTriggered("VELOCITY_CHECK")
                .severity(txnList.size() > threshold * 2
                    ? "CRITICAL" : "HIGH")
                .details(String.format(
                    "%d transactions (%.2f EUR) in 1 min",
                    txnList.size(), totalAmount))
                .timestamp(context.window().getEnd())
                .build();

            out.collect(alert);
        }
    }
}

Etape 3 : Data Quality en temps reel

3.1 - Controles de qualite dans le flux

Java - DataQualityFilter.java
public class DataQualityFilter
    extends ProcessFunction<Transaction, Transaction> {

    // Compteurs de metriques
    private transient Counter validCounter;
    private transient Counter invalidCounter;
    private transient Counter lateCounter;

    // Side output pour les enregistrements invalides
    static final OutputTag<QualityIssue> QUALITY_ISSUES =
        new OutputTag<>("quality-issues"){};

    @Override
    public void open(Configuration params) {
        validCounter = getRuntimeContext()
            .getMetricGroup().counter("valid_transactions");
        invalidCounter = getRuntimeContext()
            .getMetricGroup().counter("invalid_transactions");
        lateCounter = getRuntimeContext()
            .getMetricGroup().counter("late_transactions");
    }

    @Override
    public void processElement(
            Transaction txn,
            Context ctx,
            Collector<Transaction> out) {

        List<String> issues = new ArrayList<>();

        // Controle 1: Champs obligatoires
        if (txn.getTransactionId() == null)
            issues.add("transaction_id is null");
        if (txn.getAmount() <= 0)
            issues.add("amount must be positive: "
                + txn.getAmount());
        if (txn.getCardId() == null)
            issues.add("card_id is null");

        // Controle 2: Fraicheur
        long delay = System.currentTimeMillis()
            - txn.getTimestamp();
        if (delay > 300_000) { // > 5 min de retard
            issues.add("late event: " + delay + "ms");
            lateCounter.inc();
        }

        // Controle 3: Plausibilite
        if (txn.getAmount() > 1_000_000)
            issues.add("suspiciously high amount: "
                + txn.getAmount());

        if (issues.isEmpty()) {
            validCounter.inc();
            out.collect(txn);
        } else {
            invalidCounter.inc();
            ctx.output(QUALITY_ISSUES,
                new QualityIssue(txn, issues));
        }
    }
}

Etape 4 : Monitoring et alertes

4.1 - Dashboard Grafana et metriques

YAML - docker-compose monitoring
version: "3.8"
services:
  prometheus:
    image: prom/prometheus:v2.51.0
    ports: ["9090:9090"]
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana:10.4.0
    ports: ["3000:3000"]
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=secret
    volumes:
      - ./dashboards:/var/lib/grafana/dashboards
      - ./datasources.yml:/etc/grafana/provisioning/datasources/datasources.yml

  alertmanager:
    image: prom/alertmanager:v0.27.0
    ports: ["9093:9093"]
    volumes:
      - ./alertmanager.yml:/etc/alertmanager/alertmanager.yml
YAML - Regles d'alerte Prometheus
groups:
  - name: fraud_pipeline_alerts
    rules:
      - alert: HighFraudRate
        expr: |
          rate(fraud_alerts_total[5m])
          / rate(transactions_total[5m]) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Taux de fraude anormalement eleve"
          description: |
            Le taux de fraude depasse 5% sur les 5
            dernieres minutes. Valeur actuelle: {{ $value }}

      - alert: PipelineLatencyHigh
        expr: |
          histogram_quantile(0.99,
            rate(processing_latency_seconds_bucket[5m])
          ) > 2
        for: 3m
        labels:
          severity: warning
        annotations:
          summary: "Latence p99 superieure a 2 secondes"

      - alert: DataQualityDegradation
        expr: |
          rate(invalid_transactions[5m])
          / rate(valid_transactions[5m]) > 0.01
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Plus de 1% de transactions invalides"

      - alert: ConsumerLag
        expr: kafka_consumer_lag > 10000
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Consumer lag depasse 10000 messages"

Etape 5 : Pipeline CI/CD du projet

5.1 - Tests et deploiement automatise

YAML - .github/workflows/fraud-pipeline-ci.yml
name: Fraud Pipeline CI/CD

on:
  pull_request:
    branches: [main]
  push:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    services:
      kafka:
        image: confluentinc/cp-kafka:7.6.0
        ports: ["9092:9092"]
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-java@v4
        with:
          java-version: '17'
          distribution: 'temurin'

      - name: Unit Tests
        run: mvn test -pl fraud-detection-core

      - name: Integration Tests
        run: |
          mvn verify -pl fraud-detection-integration \
            -Dkafka.bootstrap.servers=localhost:9092

      - name: Schema Compatibility Check
        run: |
          curl -X POST \
            http://schema-registry:8081/compatibility/subjects/raw-transactions-value/versions/latest \
            -H "Content-Type: application/json" \
            -d @schemas/transaction-v2.avsc

  deploy:
    needs: test
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - name: Deploy to Kubernetes
        run: |
          kubectl apply -f k8s/flink-job.yml
          kubectl rollout status deployment/fraud-detection

Criteres de validation du projet

Pour valider ce projet, votre pipeline doit : (1) traiter au moins 1000 transactions/seconde, (2) detecter les fraudes en moins de 2 secondes (latence p99), (3) avoir un taux de faux positifs inferieur a 5%, (4) inclure des tests automatises et du monitoring, (5) supporter un deploiement blue/green sans perte de donnees.

Pour aller plus loin

Ajoutez un modele de ML (Random Forest ou XGBoost) entraine sur des donnees historiques pour scorer chaque transaction en complement des regles metier. Utilisez le pattern "Champion/Challenger" pour comparer les performances du modele ML avec les regles classiques en production.

Examen Final - Phase 5 : Data Engineering Avance

45 min Expert

Consignes de l'examen

  • 12 questions couvrant l'ensemble de la Phase 5
  • Streaming, Data Quality, Observabilite, DataOps, CI/CD, Testing, Versionnement
  • Prenez le temps de bien lire chaque question avant de repondre
  • Un score de 80% ou plus valide la Phase 5

Cet examen couvre tout ce que vous avez appris dans la Phase 5. Pas de piege : si vous avez bien suivi les lecons et fait les exercices, vous etes pret. Concentrez-vous sur les concepts fondamentaux, pas sur la memorisation de syntaxe. Un Data Architect doit comprendre le "pourquoi" autant que le "comment".

Examen Final - Phase 5

Question 1/12 - Streaming : Quelle est la difference fondamentale entre le traitement "at-least-once" et "exactly-once" dans Kafka ?

A) "At-least-once" est plus rapide que "exactly-once"
B) "Exactly-once" garantit qu'aucun message n'est duplique ni perdu, au prix d'une latence plus elevee
C) "At-least-once" ne fonctionne qu'avec Kafka Streams
D) Il n'y a pas de difference en pratique
La bonne reponse est B. "Exactly-once" utilise des transactions Kafka et l'idempotence des producteurs pour garantir qu'aucun message n'est duplique ni perdu. Cela ajoute de la latence car chaque ecriture doit etre confirmee de maniere transactionnelle.

Question 2/12 - Kafka Architecture : A quoi sert le Schema Registry dans une architecture Kafka ?

A) A stocker les messages Kafka sur disque
B) A gerer les schemas des messages et garantir la compatibilite entre producteurs et consommateurs
C) A equilibrer la charge entre les brokers
D) A chiffrer les messages en transit
La bonne reponse est B. Le Schema Registry stocke et valide les schemas (Avro, Protobuf, JSON Schema) et verifie la compatibilite (backward, forward, full) entre les versions pour eviter les breaking changes.

Question 3/12 - Apache Flink : Quelle est la difference entre une fenetre Tumbling et une fenetre Sliding dans Flink ?

A) Tumbling: fenetres qui se chevauchent. Sliding: fenetres disjointes
B) Tumbling: fenetres de taille fixe disjointes. Sliding: fenetres qui se chevauchent avec un pas de glissement
C) Tumbling fonctionne en batch, Sliding en streaming
D) Tumbling utilise l'event time, Sliding utilise le processing time
La bonne reponse est B. Une fenetre Tumbling (ex: 5 min) cree des fenetres disjointes qui ne se chevauchent pas. Une fenetre Sliding (ex: 5 min avec un pas de 1 min) cree des fenetres qui se chevauchent, permettant une analyse plus fine.

Question 4/12 - Data Quality : Quelles sont les 6 dimensions de la qualite des donnees selon le framework DAMA ?

A) Precision, Vitesse, Volume, Variete, Veracite, Valeur
B) Completude, Unicite, Validite, Coherence, Exactitude, Temporalite
C) Schema, Volume, Fraicheur, Distribution, Lineage, Format
D) Disponibilite, Fiabilite, Performance, Securite, Conformite, Maintenabilite
La bonne reponse est B. Les 6 dimensions DAMA sont : Completude (pas de valeurs manquantes), Unicite (pas de doublons), Validite (format correct), Coherence (regles metier respectees), Exactitude (valeurs correctes), Temporalite (donnees a jour).

Question 5/12 - Data Observability : Quel outil permet de definir des "expectations" sur les donnees de maniere declarative ?

A) Apache Airflow
B) Prometheus
C) Great Expectations
D) Grafana
La bonne reponse est C. Great Expectations est un framework Python qui permet de definir, documenter et valider des "expectations" (assertions) sur les donnees de maniere declarative, par exemple "cette colonne ne doit jamais etre nulle" ou "le nombre de lignes doit etre entre 1000 et 10000".

Question 6/12 - Data Contracts : Quel est le role principal d'un Data Contract entre un producteur et un consommateur ?

A) Remplacer la documentation technique
B) Definir un accord formel sur le schema, la qualite et les SLA des donnees echangees
C) Automatiser les pipelines de transformation
D) Gerer les acces et la securite des donnees
La bonne reponse est B. Un Data Contract est un accord formel entre un producteur et un consommateur de donnees qui specifie le schema, les regles de qualite, les SLA (fraicheur, disponibilite), le format et les conditions de compatibilite.

Question 7/12 - DataOps : Quel est le premier principe du Manifeste DataOps ?

A) Automatiser tout ce qui peut l'etre
B) Mesurer et monitorer en continu
C) Les individus et interactions avant les processus et outils
D) La satisfaction client par la livraison continue
La bonne reponse est C. Comme le Manifeste Agile, le Manifeste DataOps place les individus et les interactions au-dessus des processus et des outils, car le succes du DataOps depend avant tout de la culture et de la collaboration.

Question 8/12 - CI/CD dbt : Que fait la commande dbt build --select state:modified+ --defer --state ./target ?

A) Reconstruit tous les modeles du projet
B) Reconstruit uniquement les modeles modifies et leurs dependances en aval, en utilisant la production pour les modeles non modifies
C) Supprime et recree tous les schemas
D) Compare les performances entre deux versions du projet
La bonne reponse est B. C'est le "Slim CI" : state:modified+ selectionne les modeles modifies et leurs dependances en aval (+), --defer utilise le manifest de production pour les modeles non modifies, et --state pointe vers le manifest de reference.

Question 9/12 - Testing : Dans la pyramide de tests data, quel type de test est le plus rapide et le plus nombreux ?

A) Tests End-to-End
B) Tests de contrat
C) Tests d'integration
D) Tests unitaires
La bonne reponse est D. Les tests unitaires constituent la base de la pyramide : ils sont les plus rapides (pas d'acces base de donnees reel), les plus nombreux et verifient la logique de transformation de maniere isolee.

Question 10/12 - Version Control : Quelle est la principale difference entre DVC et lakeFS ?

A) DVC est payant, lakeFS est gratuit
B) DVC versionne des fichiers via Git, lakeFS fournit une API Git-like directement sur le Data Lake avec zero-copy branching
C) DVC fonctionne uniquement avec AWS, lakeFS avec GCP
D) lakeFS est un remplacement de Git pour le code
La bonne reponse est B. DVC est une extension de Git qui stocke des metadonnees (.dvc files) dans Git et les fichiers dans un stockage distant. lakeFS est un serveur independant qui offre une interface Git-like (branches, commits, merges) directement sur les objets du Data Lake, avec du zero-copy branching pour une efficacite maximale en stockage.

Question 11/12 - Streaming Avance : Dans un pipeline de detection de fraude en temps reel, pourquoi utilise-t-on des "side outputs" dans Flink ?

A) Pour accelerer le traitement des messages
B) Pour router les evenements invalides ou speciaux vers des flux secondaires sans bloquer le flux principal
C) Pour dupliquer les messages entre topics Kafka
D) Pour compresser les donnees en transit
La bonne reponse est B. Les side outputs permettent de separer les evenements selon des criteres (invalides, retardataires, cas speciaux) vers des flux secondaires dedies, sans impacter le flux principal. C'est essentiel pour le data quality en streaming.

Question 12/12 - Architecture globale : Vous devez concevoir un pipeline data pour une banque qui doit traiter 100K transactions/seconde avec une latence maximale de 500ms et repondre aux exigences reglementaires BCBS 239. Quelle combinaison de technologies est la plus appropriee ?

A) Airflow + Spark Batch + PostgreSQL + DVC
B) Kafka + Flink + Iceberg + lakeFS + Great Expectations + Prometheus/Grafana
C) RabbitMQ + Python Scripts + MySQL + Git LFS
D) AWS Lambda + DynamoDB + S3 + CloudWatch
La bonne reponse est B. Pour 100K txn/s avec latence < 500ms : Kafka (ingestion haute performance) + Flink (traitement streaming temps reel) + Iceberg (stockage analytique avec Time Travel pour BCBS 239) + lakeFS (versionnement pour l'audit reglementaire) + Great Expectations (data quality) + Prometheus/Grafana (observabilite). L'option A est batch et ne respecte pas la latence. C n'est pas assez performante. D pourrait fonctionner mais manque de versionnement data pour BCBS 239.

Bilan de la Phase 5

Felicitations ! Vous avez parcouru l'ensemble des sujets avances du Data Engineering :

  • Streaming : Kafka, Flink, Spark Streaming, architectures event-driven
  • Data Quality : dimensions, Great Expectations, Soda, data contracts
  • Observabilite : 5 piliers, monitoring, lineage, anomaly detection
  • DataOps : manifeste, maturite, automatisation, collaboration
  • CI/CD : GitHub Actions, pre-commit hooks, slim CI, blue/green deployments
  • Testing : pyramide de tests, unit, integration, contrat, regression
  • Versionnement : DVC, lakeFS, Nessie, Time Travel

Vous etes maintenant pret pour la Phase 6 : Specialisation !