Introduction au Streaming de Donnees
Objectifs de la lecon
- Comprendre le spectre batch-to-streaming et les differents paradigmes de traitement
- Distinguer les types de latence : event-time, processing-time, ingestion-time
- Comparer les architectures Lambda et Kappa avec leurs compromis
- Identifier les cas d'usage reels necessitant du streaming
Le streaming n'est pas simplement "du batch plus rapide". C'est un changement fondamental de paradigme : au lieu de traiter des donnees au repos, vous traitez des donnees en mouvement. Cette distinction change tout, de la semantique de traitement a la gestion des erreurs.
Le Spectre Batch-to-Streaming
Le traitement de donnees n'est pas binaire entre batch et streaming. Il existe un spectre complet de modes de traitement, chacun avec ses caracteristiques propres :
| Mode | Latence | Complexite | Cas d'usage |
|---|---|---|---|
| Batch classique | Heures / Jours | Faible | Rapports mensuels, ETL nocturne |
| Micro-batch | Secondes / Minutes | Moyenne | Dashboards quasi temps reel |
| Near Real-Time | Millisecondes / Secondes | Elevee | Alertes, monitoring |
| Real-Time | Millisecondes | Tres elevee | Fraude, trading, IoT |
Les Trois Temps du Streaming
Event-time : le moment ou l'evenement s'est reellement produit. C'est le temps metier.
Ingestion-time : le moment ou l'evenement arrive dans le systeme de streaming.
Processing-time : le moment ou l'evenement est traite par votre application.
La difference entre ces temps s'appelle le skew et constitue un defi majeur du streaming.
Architecture Lambda
L'architecture Lambda, proposee par Nathan Marz, combine une couche batch et une couche temps reel pour offrir des vues completes et a faible latence :
+------------------+
| Sources Donnees |
+--------+---------+
|
+--------------+--------------+
| |
+--------v--------+ +--------v--------+
| Batch Layer | | Speed Layer |
| (MapReduce/ | | (Storm/Flink) |
| Spark Batch) | | |
+--------+--------+ +--------+--------+
| |
+--------v--------+ +--------v--------+
| Batch Views | | Real-time Views |
+--------+--------+ +--------+--------+
| |
+--------------+--------------+
|
+--------v--------+
| Serving Layer |
| (Merge Views) |
+-----------------+
Avantages Lambda
- Tolerante aux pannes : le batch corrige les erreurs du speed layer
- Resultats exacts via la couche batch
- Latence faible via la couche speed
Inconvenients Lambda
- Double logique metier a maintenir (batch + stream)
- Complexite operationnelle elevee
- Synchronisation difficile entre les couches
Architecture Kappa
L'architecture Kappa, proposee par Jay Kreps (co-createur de Kafka chez LinkedIn), simplifie en eliminant la couche batch :
+------------------+
| Sources Donnees |
+--------+---------+
|
+--------v---------+
| Stream Layer |
| (Kafka + Flink) |
| |
| Reprocessing = |
| Replay du log |
+--------+----------+
|
+--------v---------+
| Serving Layer |
| (Vues unifiees) |
+-------------------+
Quand choisir Kappa ?
Privilegiez Kappa quand votre log immutable (Kafka) peut servir de source de verite et que vous pouvez retraiter l'historique en rejouant le stream. C'est l'approche dominante dans l'industrie moderne.
Cas d'Usage du Streaming
Scenarios Concrets
- Detection de fraude bancaire : chaque transaction doit etre evaluee en <100ms avant autorisation
- IoT industriel : des milliers de capteurs envoient des metriques en continu, les anomalies doivent declencher des alertes instantanees
- Personnalisation e-commerce : recommandations basees sur le parcours en temps reel de l'utilisateur
- Tarification dynamique : ajustement des prix en fonction de l'offre et la demande en temps reel (Uber Surge Pricing)
LinkedIn - La Genese de Kafka
En 2010, LinkedIn faisait face a un probleme majeur : des centaines de systemes generaient des donnees qui devaient etre consommees par des dizaines d'autres systemes. Le modele point-a-point creait un graphe de N x M connexions ingerables.
Jay Kreps, Neha Narkhede et Jun Rao ont concu Apache Kafka comme un log distribue unifie : chaque producteur ecrit dans un topic, chaque consommateur lit independamment. Le nombre de connexions passe de N x M a N + M.
Resultats : Kafka traite aujourd'hui plus de 7 trillions de messages par jour chez LinkedIn, avec une latence mediane inferieure a 5ms.
Tout en Streaming
L'erreur classique est de vouloir tout traiter en streaming. Si votre rapport mensuel peut attendre 24h, un batch simple sera plus fiable, moins couteux et plus facile a deboguer. Le streaming ajoute de la complexite : ne l'utilisez que quand la latence est un besoin metier reel.
Kafka Architecture
Objectifs de la lecon
- Comprendre l'architecture interne de Kafka : brokers, topics, partitions
- Maitriser les mecanismes de replication et le concept d'ISR
- Comprendre le fonctionnement des consumer groups
- Comparer ZooKeeper et KRaft pour la gestion du cluster
Kafka n'est pas qu'un simple message broker. C'est un log distribue, append-only, concu pour la durabilite et le debit. Comprendre cette nature fondamentale est la cle pour bien l'utiliser et eviter les pieges classiques de configuration.
Concepts Fondamentaux
Kafka = Log Distribue Immutable
Un topic Kafka est un log append-only partitionne. Chaque message recoit un offset sequentiel unique au sein de sa partition. Les messages ne sont jamais modifies ni supprimes (avant expiration du retention). Cette immutabilite est la base de toute l'architecture.
Producers Kafka Cluster Consumers
========= ================================ =========
+-------+ Broker 1 Broker 2 +-------+
| App A +-------> +-----------+ +-----------+ +--------> | App X |
+-------+ | Topic T | | Topic T | | +-------+
| Part 0 (L)| | Part 0 (F)| |
+-------+ | Part 2 (F)| | Part 1 (L)| | +-------+
| App B +-------> +-----------+ +-----------+ +--------> | App Y |
+-------+ | +-------+
Broker 3 |
+-----------+ | +-------+
| Topic T | +--------> | App Z |
| Part 1 (F)| +-------+
| Part 2 (L)|
+-----------+
(L)=Leader (F)=Follower
Brokers et Partitions
Un cluster Kafka est compose de brokers (serveurs). Chaque topic est divise en partitions distribuees sur les brokers. Les partitions sont l'unite de parallelisme : plus de partitions = plus de throughput potentiel.
# Creer un topic avec 6 partitions et facteur de replication 3 kafka-topics.sh --bootstrap-server localhost:9092 \ --create --topic commandes \ --partitions 6 \ --replication-factor 3 # Decrire un topic existant kafka-topics.sh --bootstrap-server localhost:9092 \ --describe --topic commandes # Resultat typique : # Topic: commandes Partitions: 6 ReplicationFactor: 3 # Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 # Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 # ...
Replication et ISR
Chaque partition a un leader et des followers. Toutes les lectures et ecritures passent par le leader. Les followers repliquent le log de maniere asynchrone.
In-Sync Replicas (ISR)
L'ISR est l'ensemble des replicas qui sont "a jour" avec le leader (dans la limite de replica.lag.time.max.ms, par defaut 30s). Un message n'est considere comme committed que lorsqu'il a ete replique sur tous les ISR. Si un follower prend du retard, il sort de l'ISR. Le parametre min.insync.replicas definit le nombre minimum d'ISR requis pour accepter une ecriture.
Producer --acks=all---> Partition 0 Leader (Broker 1)
|
+----------+----------+
| |
Follower (Broker 2) Follower (Broker 3)
[ISR: oui] [ISR: oui]
min.insync.replicas=2 : il faut au moins 2 ISR
pour que l'ecriture soit acceptee avec acks=all
Si Broker 3 tombe :
- ISR = {Broker 1, Broker 2} -> ecriture OK (2 >= 2)
Si Broker 2 tombe aussi :
- ISR = {Broker 1} -> ecriture REFUSEE (1 < 2)
- NotEnoughReplicasException
Consumer Groups
Les consumer groups permettent le traitement parallele. Chaque partition est assignee a exactement un consumer du groupe. Si un consumer tombe, ses partitions sont redistribuees aux survivants (rebalancing).
Topic "commandes" (6 partitions) Consumer Group "payment-service" : +------------+ +------------+ +------------+ | Consumer 1 | | Consumer 2 | | Consumer 3 | | P0, P1 | | P2, P3 | | P4, P5 | +------------+ +------------+ +------------+ Consumer Group "analytics-service" : +------------+ +------------+ | Consumer A | | Consumer B | | P0, P1, P2 | | P3, P4, P5 | +------------+ +------------+ Regle : #consumers <= #partitions dans un groupe Un consumer en trop sera idle (inactif)
ZooKeeper vs KRaft
ZooKeeper (legacy)
- Service externe pour les metadonnees du cluster
- Election du controller via ZooKeeper
- Limite a ~200K partitions par cluster
- Composant supplementaire a operer et monitorer
- Deprecie depuis Kafka 3.3
KRaft (moderne)
- Metadonnees gerees nativement par Kafka via Raft
- Quorum controller integre
- Supporte des millions de partitions
- Demarrage plus rapide, architecture simplifiee
- Mode par defaut depuis Kafka 3.6+
# Configuration KRaft (server.properties) process.roles=broker,controller node.id=1 controller.quorum.voters=1@broker1:9093,2@broker2:9093,3@broker3:9093 listeners=PLAINTEXT://:9092,CONTROLLER://:9093 log.dirs=/var/kafka-logs metadata.log.dir=/var/kafka-metadata
LinkedIn - Scaling Kafka a l'Extreme
LinkedIn opere plus de 100 clusters Kafka avec des milliers de brokers. Leur plus grand cluster gere plus de 100 000 partitions. La migration vers KRaft a ete motivee par les limitations de ZooKeeper : au-dela de 200K partitions, le temps de recovery apres panne d'un controller depassait plusieurs minutes. Avec KRaft, ce temps est reduit a quelques secondes.
Lecon cle : Le nombre de partitions est le levier principal de scalabilite, mais chaque partition a un cout en memoire et en temps de recovery. Dimensionnez selon votre throughput cible, pas "au cas ou".
Trop de Partitions
Creer des centaines de partitions "par precaution" est un anti-pattern courant. Chaque partition consomme de la memoire (file descriptors, index segments), augmente le temps de rebalancing des consumers, et complexifie le monitoring. Commencez avec nombre de partitions = 2x le nombre de consumers et augmentez si necessaire. On ne peut pas reduire le nombre de partitions d'un topic existant.
Ordonnancement
L'ordre des messages n'est garanti que au sein d'une meme partition. Si l'ordre global est critique, utilisez une seule partition (au detriment du parallelisme) ou une cle de partitionnement pertinente pour regrouper les messages lies dans la meme partition.
Quiz rapide
Quel parametre garantit qu'un message est ecrit sur tous les ISR avant d'etre confirme au producer ?
acks=all (ou acks=-1) attend la confirmation de tous les ISR. min.insync.replicas definit combien d'ISR sont necessaires.Kafka Producers & Consumers
Objectifs de la lecon
- Configurer un producer Kafka avec les bons niveaux de garantie (acks, idempotence)
- Comprendre la semantique exactly-once de bout en bout
- Maitriser la gestion des offsets cote consumer
- Gerer le rebalancing et ses impacts sur les applications
La configuration du producer et du consumer est souvent negligee par les developpeurs. Pourtant, c'est la que se joue la difference entre un systeme qui perd des messages, qui en duplique, ou qui offre des garanties exactly-once. Chaque parametre a un impact direct sur la fiabilite et les performances.
Producer : Niveaux de Garantie
| Parametre acks | Comportement | Durabilite | Latence |
|---|---|---|---|
acks=0 | Fire and forget, pas d'attente de confirmation | Perte possible | Tres faible |
acks=1 | Confirmation du leader uniquement | Perte si le leader tombe avant replication | Faible |
acks=all | Confirmation de tous les ISR | Maximale (avec min.insync.replicas >= 2) | Plus elevee |
Idempotence du Producer
Sans idempotence, un retry apres timeout reseau peut causer des doublons. L'idempotence garantit que chaque message est ecrit exactement une fois dans sa partition, meme en cas de retry.
// Configuration d'un producer idempotent
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Idempotence : elimine les doublons en cas de retry
props.put("enable.idempotence", true); // implicitement acks=all
props.put("max.in.flight.requests.per.connection", 5); // max 5 avec idempotence
props.put("retries", Integer.MAX_VALUE);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Envoi avec callback pour gestion d'erreur
producer.send(new ProducerRecord<>("commandes", orderId, orderJson),
(metadata, exception) -> {
if (exception != null) {
log.error("Echec envoi commande {}: {}", orderId, exception.getMessage());
// Logique de compensation
} else {
log.info("Commande {} envoyee - partition:{} offset:{}",
orderId, metadata.partition(), metadata.offset());
}
});
Comment fonctionne l'idempotence ?
Kafka attribue un Producer ID (PID) et un sequence number a chaque message. Le broker rejette tout message dont le sequence number a deja ete vu pour ce PID+partition. Ainsi, meme si le producer renvoie un message apres un timeout, le broker le detecte comme doublon et repond OK sans l'ecrire une seconde fois.
Exactly-Once Semantics (EOS)
L'idempotence protege contre les doublons au sein d'une partition. Pour une garantie exactly-once de bout en bout (lire depuis un topic, traiter, ecrire dans un autre topic), il faut les transactions Kafka :
// Producer transactionnel pour exactly-once
props.put("transactional.id", "payment-processor-1");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
// Lire les messages d'entree
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Traitement metier
String result = processPayment(record.value());
// Ecriture du resultat dans le topic de sortie
producer.send(new ProducerRecord<>("paiements-valides", record.key(), result));
}
// Commit des offsets ET de la production dans une seule transaction
producer.sendOffsetsToTransaction(
getOffsetsToCommit(records),
consumer.groupMetadata()
);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
Consumer : Gestion des Offsets
L'offset est la position du consumer dans chaque partition. Kafka offre trois strategies de gestion :
Auto-commit (defaut)
enable.auto.commit=true- Commit automatique toutes les 5s
- Risque : perte de messages si crash entre poll et commit
- Risque : doublons si crash entre traitement et commit
Commit manuel
enable.auto.commit=falsecommitSync()oucommitAsync()- Controle total sur quand le commit se fait
- Commit apres traitement complet = at-least-once
// Consumer avec commit manuel - at-least-once semantics
Properties cProps = new Properties();
cProps.put("bootstrap.servers", "broker1:9092");
cProps.put("group.id", "payment-service");
cProps.put("enable.auto.commit", false);
cProps.put("auto.offset.reset", "earliest"); // latest | earliest | none
cProps.put("isolation.level", "read_committed"); // pour EOS
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
consumer.subscribe(Arrays.asList("commandes"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processOrder(record); // Traitement metier
}
consumer.commitSync(); // Commit apres traitement complet
}
Rebalancing des Consumers
Impact du Rebalancing
Un rebalancing se declenche quand un consumer rejoint ou quitte un groupe, ou quand les partitions changent. Pendant un rebalancing, aucun consumer du groupe ne traite de messages. Cela peut durer de quelques secondes a plusieurs minutes selon la taille du groupe. Strategies pour minimiser l'impact :
- Static group membership :
group.instance.idevite un rebalance lors de redemarrages courts - Cooperative rebalancing :
partition.assignment.strategy=CooperativeStickyAssignorne reassigne que les partitions necessaires - Heartbeat tuning : ajustez
session.timeout.msetheartbeat.interval.ms
Incident : Rebalancing en Cascade chez un E-Commerce
Un retailer en ligne avait 50 consumers dans un groupe traitant les commandes. Un deploiement rolling restart a declenche un rebalancing a chaque redemarrage de consumer. Avec session.timeout.ms=10s par defaut, chaque rebalance prenait ~45 secondes.
Resultat : 50 rebalancings consecutifs = plus de 30 minutes sans traitement de commandes pendant le Black Friday.
Solution : Passage au CooperativeStickyAssignor et utilisation de group.instance.id unique par instance, reduisant le temps de rebalancing a moins de 2 secondes par instance.
Ignorer les Erreurs de Deserialization
Un message mal forme (schema incompatible, corruption) bloque le consumer en boucle infinie par defaut. Configurez toujours un DeserializationExceptionHandler ou utilisez un dead letter topic pour isoler les messages problematiques sans bloquer le pipeline entier.
enable.idempotence=true) garantit qu'un message n'est ecrit qu'une fois dans une partition, meme en cas de retry. Les transactions etendent cette garantie a des operations multi-partitions et multi-topics : lire d'un topic, traiter, ecrire dans un autre, et committer les offsets, le tout de maniere atomique (tout ou rien).Kafka Connect & Schema Registry
Objectifs de la lecon
- Configurer des connecteurs source et sink avec Kafka Connect
- Appliquer des Single Message Transforms (SMT) et gerer les dead letter queues
- Utiliser le Schema Registry pour gouverner les schemas Avro, Protobuf et JSON Schema
- Comprendre les modes de compatibilite et leur impact sur l'evolution des schemas
Kafka Connect et le Schema Registry sont les deux piliers souvent negliges de l'ecosysteme Kafka. Connect elimine le code boilerplate d'integration, et le Schema Registry previent les ruptures de contrat entre producteurs et consommateurs. Ensemble, ils transforment Kafka d'un simple bus de messages en une plateforme d'integration d'entreprise.
Kafka Connect : Architecture
Sources Externes Kafka Connect Cluster Kafka Destinations
================ ====================================== ======= ================
+------------+ +----------------------------------+ +------------+
| PostgreSQL |----->| Worker 1 | | Elastic |
+------------+ | [JDBC Source Task 0] | +------+ | Search |
| [JDBC Source Task 1] |---->|Topics|-->| |
+------------+ +----------------------------------+ +------+ +------------+
| MongoDB |----->| |
+------------+ | Worker 2 | +------------+
| [MongoDB Source Task 0] | | S3 Bucket |
+------------+ | [S3 Sink Task 0] |<----|Topics|-->| |
| API REST |----->| [Elasticsearch Sink Task 0] | +------+ +------------+
+------------+ +----------------------------------+
Concepts Cles de Connect
Worker : processus JVM qui execute les connecteurs. Mode standalone (dev) ou distribue (prod).
Connector : definition logique d'une integration (config JSON).
Task : unite d'execution parallelisable. Un connecteur peut avoir N tasks.
Converter : serialise/deserialise les donnees (Avro, JSON, Protobuf).
SMT : transformation legere appliquee message par message.
{
"name": "postgres-source-commandes",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "db-prod.internal",
"database.port": "5432",
"database.user": "cdc_reader",
"database.password": "${vault:secret/kafka/db-password}",
"database.dbname": "ecommerce",
"table.include.list": "public.commandes,public.lignes_commande",
"topic.prefix": "cdc.ecommerce",
"plugin.name": "pgoutput",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms": "route,unwrap,addTimestamp",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "cdc\\.ecommerce\\.(.*)",
"transforms.route.replacement": "streaming.$1",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "processed_at",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-postgres-commandes",
"errors.deadletterqueue.topic.replication.factor": 3,
"errors.deadletterqueue.context.headers.enable": true
}
}
Single Message Transforms (SMT)
Les SMTs permettent de transformer les messages au vol sans ecrire de code. Voici les SMTs les plus utiles :
| SMT | Usage | Exemple |
|---|---|---|
RegexRouter | Renommer les topics | Rerouter cdc.db.table vers stream.table |
ExtractField | Extraire un champ | Extraire uniquement la cle depuis la valeur |
InsertField | Ajouter des champs | Ajouter un timestamp de traitement |
MaskField | Masquer des donnees | Masquer email, telephone (RGPD) |
TimestampConverter | Convertir les dates | Unix timestamp vers ISO 8601 |
Filter (Predicate) | Filtrer les messages | Ignorer les messages de type DELETE |
Dead Letter Queue (DLQ)
Configurez TOUJOURS une dead letter queue en production. Sans DLQ, un seul message mal forme peut bloquer tout votre pipeline. Avec errors.tolerance=all et une DLQ, les messages en erreur sont isoles pour investigation ulterieure sans impacter le flux principal. Activez context.headers.enable=true pour capturer la cause de l'erreur dans les headers du message DLQ.
Schema Registry
Le Schema Registry est un service centralise qui stocke et valide les schemas des messages Kafka. Il garantit que producteurs et consommateurs parlent le meme "langage".
Producer Schema Registry Consumer
======== =============== ========
1. Serialiser message ---> 2. Verifier compatibilite
avec schema v2 du schema v2 vs v1
|
+--compatible--> 3. Enregistrer schema v2
| retourner schema_id=42
+--incompatible--> ERREUR: schema rejete
4. Envoyer message 5. Consumer recoit
[magic_byte][schema_id] message, extrait
[payload_avro] schema_id=42
6. Recuperer schema v2 ----> 7. Deserialiser
(cache local) avec schema v2
Formats de Serialisation
Apache Avro
- Format binaire compact, schema integre
- Evolution de schema native
- Le plus utilise avec Kafka
- Support excellent dans l'ecosysteme Java/JVM
Protobuf / JSON Schema
- Protobuf : plus performant qu'Avro, genere du code type-safe, populaire en gRPC
- JSON Schema : lisible par l'humain, facile a deboguer, mais plus volumineux
- Choix selon l'ecosysteme existant
{
"type": "record",
"name": "Commande",
"namespace": "com.ecommerce.events",
"fields": [
{"name": "commande_id", "type": "string"},
{"name": "client_id", "type": "string"},
{"name": "montant", "type": {"type": "bytes", "logicalType": "decimal", "precision": 10, "scale": 2}},
{"name": "devise", "type": {"type": "enum", "name": "Devise", "symbols": ["EUR", "USD", "GBP"]}},
{"name": "statut", "type": "string"},
{"name": "created_at", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{"name": "commentaire", "type": ["null", "string"], "default": null}
]
}
Modes de Compatibilite
| Mode | Regle | Cas d'usage |
|---|---|---|
BACKWARD (defaut) | Le nouveau schema peut lire les anciens messages | Consumers mis a jour en premier |
FORWARD | L'ancien schema peut lire les nouveaux messages | Producers mis a jour en premier |
FULL | Backward + Forward | Mise a jour independante producteur/consommateur |
NONE | Pas de verification | Developpement uniquement ! |
Regles d'Or de l'Evolution de Schema
En mode BACKWARD (le plus courant) : vous pouvez ajouter des champs optionnels (avec default), supprimer des champs qui ont un default. Vous ne pouvez PAS renommer des champs, changer le type d'un champ, ou supprimer un champ sans default. Violer ces regles = messages illisibles en production.
Schema Registry en Mode NONE en Production
Desactiver la validation de compatibilite en production est une bombe a retardement. Un producteur peut publier un changement de schema incompatible qui cassera silencieusement tous les consommateurs. Toujours utiliser au minimum BACKWARD en production et FULL quand possible.
Kafka Streams & ksqlDB
Objectifs de la lecon
- Distinguer KStream (flux d'evenements) et KTable (table de changements)
- Maitriser le windowing (tumbling, hopping, session, sliding)
- Comprendre les joins entre streams et tables avec les state stores
- Ecrire des requetes ksqlDB : materialized views, pull et push queries
Kafka Streams est la bibliotheque de stream processing la plus elegante de l'ecosysteme : pas de cluster separe a gerer, juste une dependance Java. ksqlDB democratise le streaming en le rendant accessible via SQL. Ensemble, ils couvrent 80% des cas de stream processing sans la complexite d'un Flink ou Spark Streaming.
KStream vs KTable
Dualite Stream-Table
Un KStream est un flux infini d'evenements immutables (chaque record est un fait). Une KTable est un changelog : seule la derniere valeur par cle est conservee (upsert). Cette dualite est fondamentale :
- Stream : "l'utilisateur X a clique sur le produit Y a 14h03" (chaque clic compte)
- Table : "le solde du compte X est 1500 EUR" (seule la derniere valeur importe)
Un stream peut devenir une table (via aggregation), et une table peut devenir un stream (via changelog).
// Kafka Streams - Exemple complet de traitement de commandes
StreamsBuilder builder = new StreamsBuilder();
// KStream : flux de commandes (chaque commande est un evenement)
KStream<String, Commande> commandes = builder.stream(
"commandes",
Consumed.with(Serdes.String(), commandeSerde)
);
// KTable : table des clients (derniere info connue par client)
KTable<String, Client> clients = builder.table(
"clients",
Consumed.with(Serdes.String(), clientSerde)
);
// Enrichir chaque commande avec les infos client (stream-table join)
KStream<String, CommandeEnrichie> commandesEnrichies = commandes
.selectKey((key, commande) -> commande.getClientId())
.join(clients,
(commande, client) -> new CommandeEnrichie(commande, client)
);
// Aggregation : chiffre d'affaires par categorie sur fenetre de 1 heure
KTable<Windowed<String>, Double> caParCategorie = commandesEnrichies
.groupBy((key, ce) -> ce.getCategorie(),
Grouped.with(Serdes.String(), commandeEnrichieSerde))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
.aggregate(
() -> 0.0,
(categorie, ce, total) -> total + ce.getMontant(),
Materialized.<String, Double, WindowStore<Bytes, byte[]>>as("ca-par-categorie")
.withValueSerde(Serdes.Double())
);
// Ecrire le resultat dans un topic de sortie
caParCategorie.toStream()
.map((windowedKey, montant) -> KeyValue.pair(windowedKey.key(), montant))
.to("ca-par-categorie-hourly", Produced.with(Serdes.String(), Serdes.Double()));
Windowing
Temps ---> |--1--|--2--|--3--|--4--|--5--|--6--|
TUMBLING (fixe, sans overlap) - ofSize(2)
|=====|=====|=====|
[1, 2] [3, 4] [5, 6]
HOPPING (fixe, avec overlap) - ofSize(3), advance(1)
|========|
|========|
|========|
[1,2,3] [2,3,4] [3,4,5] [4,5,6]
SESSION (dynamique, basee sur l'activite) - inactivityGap(2)
|===| |=======| |===|
[1,2] [4,5,6] [9]
(gap >= 2 entre sessions)
SLIDING (declenchee par evenement) - ofSize(3), grace(0)
Fenetre recalculee a chaque evenement entrant
State Stores
Gestion de l'Etat dans Kafka Streams
Les aggregations et joins necessitent un etat local. Kafka Streams utilise RocksDB par defaut comme state store local, et sauvegarde les changements dans des changelog topics Kafka pour la tolerance aux pannes. En cas de redemarrage, l'etat est restaure depuis le changelog topic. Le state store est aussi queriable via les Interactive Queries, permettant d'exposer l'etat via une API REST.
ksqlDB : SQL sur les Streams
-- Creer un stream depuis un topic Kafka
CREATE STREAM commandes_stream (
commande_id VARCHAR KEY,
client_id VARCHAR,
montant DECIMAL(10,2),
categorie VARCHAR,
created_at TIMESTAMP
) WITH (
KAFKA_TOPIC = 'commandes',
VALUE_FORMAT = 'AVRO',
TIMESTAMP = 'created_at'
);
-- Vue materialisee : total par client (KTable)
CREATE TABLE total_par_client AS
SELECT client_id,
COUNT(*) AS nb_commandes,
SUM(montant) AS total_depense,
MAX(created_at) AS derniere_commande
FROM commandes_stream
GROUP BY client_id
EMIT CHANGES;
-- Push query : flux continu de mises a jour (long-running)
SELECT * FROM total_par_client EMIT CHANGES;
-- Pull query : etat actuel ponctuel (request/response)
SELECT * FROM total_par_client WHERE client_id = 'CLI-42';
-- Detection d'anomalie : clients avec plus de 10 commandes en 5 min
CREATE TABLE alertes_fraude AS
SELECT client_id,
COUNT(*) AS nb_commandes,
SUM(montant) AS total
FROM commandes_stream
WINDOW TUMBLING (SIZE 5 MINUTES)
GROUP BY client_id
HAVING COUNT(*) > 10
EMIT CHANGES;
Push Queries
- Syntaxe :
SELECT ... EMIT CHANGES - Flux continu de resultats
- Comme un abonnement temps reel
- Utilise pour les dashboards, alertes
Pull Queries
- Syntaxe :
SELECT ... WHERE key = ... - Reponse unique, instantanee
- Interroge l'etat materialise
- Utilise pour les APIs, lookups
Uber - Tarification Dynamique en Temps Reel
Uber utilise Kafka Streams pour calculer le "surge pricing" en temps reel. Le systeme traite des millions d'evenements par seconde : positions GPS des chauffeurs, demandes de courses, conditions de trafic.
L'architecture repose sur des KTables pour l'etat courant (offre/demande par zone geographique) et des fenetres glissantes pour detecter les tendances. Le multiplicateur de prix est recalcule toutes les 30 secondes par zone.
Pourquoi Kafka Streams ? Pas de cluster separe a gerer, deploiement en tant que microservice standard, scaling horizontal en ajoutant des instances.
Etat Non Borne (Unbounded State)
Creer une aggregation sans fenetre temporelle sur une cle a haute cardinalite (ex: GROUP BY user_id sans fenetre) fait grossir le state store indefiniment. La memoire et le disque augmentent jusqu'a saturation. Toujours utiliser des fenetres temporelles avec une retention explicite, ou s'assurer que la cardinalite des cles est bornee.
EMIT CHANGES) quand vous avez besoin d'un flux continu de mises a jour, par exemple pour alimenter un dashboard temps reel ou declencher des alertes. Utilisez une pull query quand vous avez besoin d'interroger l'etat actuel ponctuellement, comme un appel API pour obtenir le solde d'un client. Les pull queries sont comme un SELECT classique, les push queries comme un abonnement.Apache Flink
Objectifs de la lecon
- Comprendre l'architecture de Flink : JobManager, TaskManager, slots
- Maitriser la DataStream API avec les operateurs de transformation et windowing
- Configurer le checkpointing et les savepoints pour la tolerance aux pannes
- Utiliser les watermarks pour gerer l'event-time et les donnees en retard
- Decouvrir le Complex Event Processing (CEP) et la Table API
Flink est le moteur de stream processing le plus puissant du marche. Contrairement a Spark qui a ajoute le streaming a posteriori (micro-batch), Flink a ete concu des le depart pour le streaming natif. Son modele de checkpointing (inspire de Chandy-Lamport) offre des garanties exactly-once sans sacrifier les performances.
Architecture de Flink
Client (CLI / REST)
|
v
+--------------------+
| JobManager | Coordonne l'execution
| +---------------+ | - Scheduling des taches
| | Dispatcher | | - Checkpointing
| | ResourceMgr | | - Recovery
| | JobMaster | |
+----+-------------+--+
| |
+----v----+ +----v----+
|TaskMgr 1| |TaskMgr 2| Executent les operateurs
| Slot 0 | | Slot 0 |
| Slot 1 | | Slot 1 | Slot = unite de ressource
+---------+ +---------+ (1 thread, memoire dediee)
Source --> Map --> KeyBy --> Window --> Sink
[-------- Pipeline parallelise sur les slots --------]
DataStream API
// Flink DataStream - Detection de fraude en temps reel
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configuration du checkpointing (exactly-once)
env.enableCheckpointing(60_000); // checkpoint toutes les 60s
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);
env.getCheckpointConfig().setCheckpointTimeout(120_000);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Source Kafka
KafkaSource<Transaction> source = KafkaSource.<Transaction>builder()
.setBootstrapServers("broker1:9092")
.setTopics("transactions")
.setGroupId("fraud-detector")
.setStartingOffsets(OffsetsInitializer.committedOffsets())
.setDeserializer(new TransactionDeserializer())
.build();
DataStream<Transaction> transactions = env.fromSource(
source,
WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner((tx, ts) -> tx.getTimestamp()),
"Kafka Source"
);
// Traitement : detecter les transactions suspectes
DataStream<Alerte> alertes = transactions
.keyBy(Transaction::getCompteId)
.window(SlidingEventTimeWindows.of(
Time.minutes(5), // taille de fenetre
Time.minutes(1) // slide
))
.process(new DetectionFraudeFunction());
// Sink : ecrire les alertes
alertes.sinkTo(
KafkaSink.<Alerte>builder()
.setBootstrapServers("broker1:9092")
.setRecordSerializer(new AlerteSerializer("alertes-fraude"))
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build()
);
env.execute("Fraud Detection Pipeline");
Watermarks et Event-Time
Comprendre les Watermarks
Un watermark est un signal qui dit : "tous les evenements avec un timestamp <= W sont arrives". Il permet a Flink de savoir quand fermer une fenetre temporelle, meme si les evenements arrivent en desordre.
forBoundedOutOfOrderness(Duration.ofSeconds(10)): tolere jusqu'a 10s de retard- Les evenements arrivant apres le watermark sont consideres "en retard" (late events)
- Strategy
allowedLateness(): garde la fenetre ouverte plus longtemps pour les retardataires - Side output : redirige les late events vers un flux separe pour traitement ulterieur
Evenements arrivant : t=3 t=1 t=7 t=5 t=4 t=12 t=8 t=15
Avec BoundedOutOfOrderness(5s) :
Temps reel : ----+----+----+----+----+-----+----+------->
3 1 7 5 4 12 8 15
Watermarks : -2 -2 2 2 2 7 7 10
(max_seen - 5 = watermark)
Fenetre [0, 10) :
- Se ferme quand watermark >= 10
- Inclut : t=3, t=1, t=7, t=5, t=4, t=8
- t=12 est dans la fenetre suivante
Si t=6 arrive apres watermark=10 :
-> Late event ! Redirige vers side output
Checkpointing vs Savepoints
Checkpoints
- Automatiques et periodiques
- Geres par Flink en interne
- Pour la recovery apres panne
- Supprimes quand le job s'arrete proprement
- Stockage : HDFS, S3, GCS
Savepoints
- Declenches manuellement par l'operateur
- Portables entre versions de Flink
- Pour les mises a jour de code, migrations
- Conserves indefiniment
- Permettent de redemarrer avec un code modifie
# Creer un savepoint avant une mise a jour flink savepoint :jobId s3://checkpoints/savepoints/ # Arreter le job avec savepoint flink stop --savepointPath s3://checkpoints/savepoints/ :jobId # Redemarrer depuis le savepoint avec le nouveau code flink run -s s3://checkpoints/savepoints/savepoint-abc123 \ -c com.example.FraudDetection \ fraud-detection-v2.jar
Complex Event Processing (CEP)
// CEP : detecter un pattern de fraude
// Pattern : 3 transactions > 500 EUR en moins de 10 minutes
Pattern<Transaction, ?> fraudPattern = Pattern
.<Transaction>begin("premier")
.where(new SimpleCondition<Transaction>() {
public boolean filter(Transaction tx) {
return tx.getMontant() > 500;
}
})
.next("deuxieme")
.where(new SimpleCondition<Transaction>() {
public boolean filter(Transaction tx) {
return tx.getMontant() > 500;
}
})
.next("troisieme")
.where(new SimpleCondition<Transaction>() {
public boolean filter(Transaction tx) {
return tx.getMontant() > 500;
}
})
.within(Time.minutes(10));
PatternStream<Transaction> patternStream = CEP.pattern(
transactions.keyBy(Transaction::getCompteId),
fraudPattern
);
DataStream<Alerte> alertes = patternStream.select(
(Map<String, List<Transaction>> pattern) -> {
Transaction t1 = pattern.get("premier").get(0);
Transaction t3 = pattern.get("troisieme").get(0);
return new Alerte(t1.getCompteId(), "3 transactions > 500 EUR en 10min",
t1.getMontant() + t3.getMontant());
});
Netflix - Traitement d'Evenements a l'Echelle
Netflix utilise Flink pour traiter des milliards d'evenements par jour : lectures de films, metriques de qualite de streaming, interactions utilisateur. Leur pipeline Flink alimente le systeme de recommandation en quasi-temps reel.
Un cas specifique : la detection de degradation de qualite video. Les evenements de buffering sont agreges par region et par CDN en fenetres glissantes de 5 minutes. Si le taux de buffering depasse un seuil, le trafic est automatiquement reroute vers un autre CDN en moins de 30 secondes.
Cle de succes : Les savepoints Flink permettent a Netflix de deployer des mises a jour de leurs pipelines plusieurs fois par jour sans perte de donnees ni interruption.
Ignorer la Backpressure
Flink propage naturellement la backpressure : si un operateur est lent, les operateurs en amont ralentissent automatiquement. Ignorer les metriques de backpressure mene a des pipelines qui semblent fonctionner mais accumulent du retard silencieusement. Monitorez isBackPressured sur chaque operateur et dimensionnez le parallelisme en consequence.
Spark Streaming & Apache Beam
Objectifs de la lecon
- Comprendre Structured Streaming de Spark et son modele micro-batch
- Distinguer micro-batch et continuous processing dans Spark
- Maitriser le modele unifie d'Apache Beam : PCollections, PTransforms, Runners
- Choisir le bon moteur de streaming selon le contexte projet
Si Flink est le roi du streaming natif, Spark domine l'ecosysteme batch et analytique. Structured Streaming unifie batch et streaming dans Spark avec une API identique. Apache Beam va encore plus loin : ecrire une fois, executer sur n'importe quel moteur. Le choix entre ces outils depend de votre ecosysteme existant plus que de capacites techniques pures.
Spark Structured Streaming
Le Modele Micro-Batch
Structured Streaming traite un flux de donnees comme une table infinie en appendition. Chaque micro-batch ajoute de nouvelles lignes a cette table et re-execute la requete. Le modele est identique a une requete SQL classique, mais executee incrementalement. La latence typique est de 100ms a quelques secondes selon la configuration du trigger.
# Structured Streaming - Pipeline de traitement de commandes
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession.builder \
.appName("Commandes Streaming") \
.config("spark.sql.streaming.checkpointLocation", "s3://checkpoints/commandes") \
.getOrCreate()
# Schema des commandes
schema_commande = StructType([
StructField("commande_id", StringType()),
StructField("client_id", StringType()),
StructField("montant", DoubleType()),
StructField("categorie", StringType()),
StructField("timestamp", TimestampType())
])
# Source : lire depuis Kafka
commandes_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker1:9092") \
.option("subscribe", "commandes") \
.option("startingOffsets", "latest") \
.option("failOnDataLoss", "false") \
.load() \
.select(from_json(col("value").cast("string"), schema_commande).alias("data")) \
.select("data.*")
# Transformation : CA par categorie avec fenetre de 10 minutes
ca_par_categorie = commandes_df \
.withWatermark("timestamp", "5 minutes") \
.groupBy(
window("timestamp", "10 minutes", "5 minutes"),
"categorie"
) \
.agg(
count("*").alias("nb_commandes"),
sum("montant").alias("chiffre_affaires"),
avg("montant").alias("panier_moyen")
)
# Sink : ecrire dans Delta Lake
query = ca_par_categorie.writeStream \
.format("delta") \
.outputMode("update") \
.option("checkpointLocation", "s3://checkpoints/ca-categorie") \
.trigger(processingTime="30 seconds") \
.start("s3://datalake/gold/ca_par_categorie")
Modes de Sortie et Triggers
| Output Mode | Description | Usage |
|---|---|---|
append | Uniquement les nouvelles lignes | Transformations sans aggregation |
update | Lignes modifiees depuis le dernier batch | Aggregations avec upsert |
complete | Toute la table de resultats | Petites aggregations sans fenetre |
| Trigger | Latence | Usage |
|---|---|---|
processingTime("30s") | ~30s | Dashboards quasi temps reel |
once() | N/A (batch) | Rattrapage ponctuel |
availableNow() | N/A | Traiter tout ce qui est disponible puis s'arreter |
continuous("1s") | ~1s (experimental) | Faible latence (API limitee) |
Continuous Processing : Attention
Le mode continuous de Spark offre des latences de l'ordre de la milliseconde mais reste experimental et ne supporte qu'un sous-ensemble limite d'operations (map, filter, projections). Pas d'aggregations, pas de joins, pas de deduplication. Pour du vrai streaming a faible latence avec des operations complexes, preferez Flink.
Apache Beam : Le Modele Unifie
+----------------------------------------------------------+
| Beam Pipeline (SDK) |
| |
| PCollection --> PTransform --> PCollection --> ... |
| (donnees) (traitement) (donnees) |
+---------------------------+------------------------------+
|
+---------+---------+
| Beam Runner |
+---------+---------+
|
+---------------+---------------+
| | |
+-----v-----+ +-----v-----+ +------v----+
| Apache | | Google | | Apache |
| Flink | | Dataflow | | Spark |
| Runner | | Runner | | Runner |
+-----------+ +-----------+ +-----------+
Concepts Fondamentaux de Beam
PCollection : collection distribuee de donnees (bornee pour batch, non bornee pour streaming).
PTransform : operation de transformation (ParDo, GroupByKey, Combine, Flatten, etc.).
Pipeline : graphe acyclique de PTransforms connectes par des PCollections.
Runner : moteur d'execution qui traduit le pipeline en jobs natifs (Flink, Spark, Dataflow).
Windowing : unifie batch (fenetre globale unique) et streaming (fenetres temporelles).
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows, SlidingWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode
# Pipeline Beam - Analyse de logs en temps reel
options = PipelineOptions([
'--runner=FlinkRunner', # Peut etre DataflowRunner, SparkRunner...
'--flink_master=localhost:8081',
'--streaming'
])
with beam.Pipeline(options=options) as p:
# Source : lire depuis Kafka
logs = (p
| 'Lire Kafka' >> beam.io.ReadFromKafka(
consumer_config={'bootstrap.servers': 'broker1:9092'},
topics=['application-logs']
)
| 'Parser JSON' >> beam.Map(parse_log_json)
| 'Filtrer Erreurs' >> beam.Filter(lambda log: log['level'] == 'ERROR')
)
# Aggregation avec fenetre et trigger
erreurs_par_service = (logs
| 'Fenetre 5min' >> beam.WindowInto(
FixedWindows(300), # 5 minutes
trigger=AfterWatermark(
early=AfterProcessingTime(60) # resultats partiels toutes les 60s
),
accumulation_mode=AccumulationMode.DISCARDING
)
| 'Cle par Service' >> beam.Map(lambda log: (log['service'], 1))
| 'Compter' >> beam.CombinePerKey(sum)
)
# Sinks multiples
erreurs_par_service | 'Vers BigQuery' >> beam.io.WriteToBigQuery(
'projet:dataset.erreurs_par_service',
schema='service:STRING,count:INTEGER,window_start:TIMESTAMP'
)
(erreurs_par_service
| 'Filtrer Alertes' >> beam.Filter(lambda kv: kv[1] > 100)
| 'Vers PagerDuty' >> beam.ParDo(EnvoyerAlerteFn())
)
Comparaison des Moteurs
| Critere | Spark Streaming | Flink | Beam |
|---|---|---|---|
| Modele | Micro-batch | Streaming natif | Abstraction unifiee |
| Latence | 100ms - secondes | Millisecondes | Depend du runner |
| Ecosysteme | ML, SQL, GraphX | CEP, Table API | Multi-runner |
| State management | Basique | Avance (RocksDB) | Depend du runner |
| Exactly-once | Oui (avec checkpoint) | Oui (natif) | Depend du runner |
| Courbe d'apprentissage | Moderee | Elevee | Elevee |
| Ideal pour | Equipes Spark existantes | Low-latency critique | Multi-cloud, portabilite |
Comment Choisir ?
- Vous avez deja Spark : Structured Streaming est le choix naturel. Meme API, meme cluster, meme ecosystem.
- Latence < 100ms requise : Flink est le seul choix viable pour du streaming natif avec etat complexe.
- Multi-cloud / portabilite : Beam vous permet de changer de runner sans reecrire le code.
- Google Cloud : Beam + Dataflow est l'option la plus integree et serverless.
Event-Driven Architecture
Objectifs de la lecon
- Comprendre les patterns CQRS et Event Sourcing et leurs cas d'application
- Maitriser le pattern Saga pour les transactions distribuees
- Concevoir des domain events pertinents avec l'Event Storming
- Integrer ces patterns dans une architecture microservices avec Kafka
L'Event-Driven Architecture n'est pas une technologie, c'est une philosophie de conception. Au lieu de penser en termes de requetes et de reponses, vous pensez en termes d'evenements et de reactions. Ce changement de paradigme debloque la scalabilite, le decouplage et la resilience, mais introduit de la complexite : eventual consistency, ordering, idempotence.
CQRS : Command Query Responsibility Segregation
Principe de CQRS
CQRS separe le modele d'ecriture (commands) du modele de lecture (queries). Le cote ecriture est optimise pour la validation et la consistance. Le cote lecture est optimise pour les performances de requetes, avec des projections denormalisees adaptees a chaque besoin de consultation.
Client
======
| |
| Commands (POST/PUT) | Queries (GET)
v v
+-------------------+ +-------------------+
| Command Service | | Query Service |
| (validation, | | (lectures |
| regles metier) | | optimisees) |
+--------+----------+ +--------+----------+
| ^
v |
+--------+----------+ +--------+----------+
| Event Store | | Read Database |
| (append-only log |--->| (projections |
| = source verite) | | denormalisees) |
+-------------------+ +-------------------+
Kafka Topics PostgreSQL / Elastic
/ Redis / MongoDB
// Exemple CQRS - Cote Command
@Service
public class CommandeCommandService {
private final KafkaTemplate<String, DomainEvent> kafkaTemplate;
public void creerCommande(CreerCommandeRequest request) {
// 1. Validation des regles metier
validerStock(request.getProduits());
validerLimiteCredit(request.getClientId(), request.getMontant());
// 2. Emettre l'evenement (source de verite)
CommandeCreee event = CommandeCreee.builder()
.commandeId(UUID.randomUUID().toString())
.clientId(request.getClientId())
.produits(request.getProduits())
.montant(request.getMontant())
.timestamp(Instant.now())
.version(1)
.build();
kafkaTemplate.send("domain-events.commandes", event.getCommandeId(), event);
}
}
// Cote Query - Projecteur qui construit la vue de lecture
@Component
public class CommandeProjecteur {
private final CommandeReadRepository readRepo;
@KafkaListener(topics = "domain-events.commandes", groupId = "commande-projector")
public void onEvent(DomainEvent event) {
if (event instanceof CommandeCreee e) {
readRepo.save(new CommandeView(e.getCommandeId(), e.getClientId(),
e.getMontant(), "CREEE", e.getTimestamp()));
} else if (event instanceof CommandePayee e) {
readRepo.updateStatut(e.getCommandeId(), "PAYEE");
} else if (event instanceof CommandeExpediee e) {
readRepo.updateStatut(e.getCommandeId(), "EXPEDIEE");
readRepo.setTracking(e.getCommandeId(), e.getNumeroSuivi());
}
}
}
Event Sourcing
Event Sourcing : L'Etat = Somme des Evenements
Au lieu de stocker l'etat courant, vous stockez tous les evenements qui ont mene a cet etat. L'etat est reconstitue en rejouant les evenements. Avantages : audit trail complet, capacite de remonter dans le temps (temporal queries), decouplage total entre ecriture et lecture. Inconvenients : complexite de gestion, snapshots necessaires pour les performances, evolution des schemas d'evenements.
// Event Sourcing - Reconstitution de l'etat d'un compte bancaire
public class CompteBancaire {
private String compteId;
private BigDecimal solde = BigDecimal.ZERO;
private String statut = "ACTIF";
private List<DomainEvent> uncommittedEvents = new ArrayList<>();
// Reconstituer l'etat depuis les evenements
public static CompteBancaire fromEvents(List<DomainEvent> events) {
CompteBancaire compte = new CompteBancaire();
events.forEach(compte::apply);
return compte;
}
// Commande : debiter le compte
public void debiter(BigDecimal montant, String motif) {
if (solde.compareTo(montant) < 0) {
throw new SoldeInsuffisantException(compteId, solde, montant);
}
if (!"ACTIF".equals(statut)) {
throw new CompteInactifException(compteId);
}
// Emettre l'evenement (pas de mutation directe !)
emit(new CompteDebite(compteId, montant, motif, Instant.now()));
}
// Appliquer un evenement (mutation de l'etat)
private void apply(DomainEvent event) {
if (event instanceof CompteOuvert e) {
this.compteId = e.getCompteId();
this.statut = "ACTIF";
} else if (event instanceof CompteCredite e) {
this.solde = this.solde.add(e.getMontant());
} else if (event instanceof CompteDebite e) {
this.solde = this.solde.subtract(e.getMontant());
} else if (event instanceof CompteFerme e) {
this.statut = "FERME";
}
}
// Snapshot pour eviter de rejouer des milliers d'evenements
// Snapshot tous les 100 evenements
public CompteSnapshot toSnapshot() {
return new CompteSnapshot(compteId, solde, statut, version);
}
}
Pattern Saga : Transactions Distribuees
Dans un systeme distribue, les transactions ACID classiques ne fonctionnent pas entre services. Le pattern Saga decompose une transaction en etapes locales, chacune avec sa compensation en cas d'echec.
Commande Service Paiement Service Stock Service Livraison Service
================= ================= =============== ==================
1. CommandeCreee -------> 2. Paiement
autorise? ---------> 3. Stock
| reserve? -------> 4. Livraison
| | planifiee
| | |
| | v
8. Commande 7. Paiement 6. Stock 5. Livraison
confirmee <---------- capture <----------- decremente <------ confirmee
--- En cas d'echec a l'etape 3 (stock insuffisant) ---
Commande Service Paiement Service Stock Service
================= ================= ===============
CommandeAnnulee <-------- PaiementRembourse <---- StockInsuffisant
(compensation) (compensation) (echec)
Saga Choregraphiee
- Chaque service ecoute et emet des evenements
- Pas de coordinateur central
- Plus resilient (pas de SPOF)
- Plus difficile a suivre et deboguer
- Convient pour des flux simples (3-5 etapes)
Saga Orchestree
- Un orchestrateur central coordonne les etapes
- Logique de compensation centralisee
- Plus facile a comprendre et tester
- L'orchestrateur est un point de defaillance potentiel
- Convient pour des flux complexes (5+ etapes)
Event Storming
L'Event Storming est un atelier collaboratif de modelisation qui reunit developpeurs et experts metier pour decouvrir les evenements du domaine :
Deroulement d'un Event Storming
- Orange (Domain Events) : les experts metier identifient tous les evenements significatifs ("CommandeCreee", "PaiementAutorise", "StockReserve")
- Bleu (Commands) : quelles actions declenchent ces evenements ("CreerCommande", "AutoriserPaiement")
- Jaune (Actors) : qui declenche les commandes (utilisateur, systeme, timer)
- Rose (Policies) : quelles regles automatiques reagissent aux evenements ("Quand PaiementAutorise, alors ReserverStock")
- Vert (Read Models) : quelles vues sont necessaires pour prendre des decisions
- Limites (Bounded Contexts) : regrouper les evenements par domaine metier
Meltdown Temps Reel - Plateforme de Trading
En 2012, Knight Capital a perdu 440 millions de dollars en 45 minutes a cause d'un incident dans leur systeme event-driven. Un deploiement partiel a active un ancien code de test sur un seul serveur parmi huit. Ce serveur a emis des ordres d'achat/vente erratiques a grande vitesse.
Causes racines :
- Pas de circuit breaker : le systeme a continue a emettre des ordres malgre les pertes
- Pas d'idempotence : les retries ont amplifie le probleme
- Deploiement manuel sans rollback automatique
- Pas de kill switch pour arreter un service en urgence
Lecons : un systeme event-driven a haute frequence DOIT avoir des mecanismes de protection : circuit breakers, rate limiters, kill switches, et surtout des alertes sur les anomalies de volume.
Pas de Dead Letter Queue dans les Sagas
Dans une saga choregraphiee, si un service ne peut pas traiter un evenement (bug, donnees corrompues), il peut bloquer toute la saga indefiniment. Sans dead letter queue ni mecanisme de retry avec backoff exponentiel, les transactions restent dans un etat intermediaire indefini. Chaque service participant a une saga DOIT avoir une DLQ et un processus de reconciliation pour les evenements en echec.
Lab : Concevoir une Architecture Event-Driven
Concevez l'architecture event-driven pour un systeme de reservation de restaurant :
Etape 1 : Identifier les Domain Events
Listez tous les evenements metier : ReservationDemandee, ReservationConfirmee, TableAssignee, ClientArrive, RepasTermine, PaiementEffectue, AvisDepose...
Etape 2 : Definir les Bounded Contexts
Regroupez les evenements par domaine : Reservation, Table Management, Cuisine, Paiement, Feedback. Chaque contexte aura son propre service et ses propres topics Kafka.
Etape 3 : Concevoir la Saga de Reservation
Definissez le flux : ReservationDemandee -> verification disponibilite -> TableReservee -> NotificationClient. Prevoyez les compensations : si le client annule, si la table n'est plus disponible, si le paiement de la caution echoue.
Etape 4 : Definir les Read Models (CQRS)
Quelles vues sont necessaires ? Planning du restaurant (par jour/semaine), historique client, taux d'occupation, revenus. Chaque vue est une projection optimisee alimentee par les evenements.
Quiz : Patterns Event-Driven
Dans une Saga choregraphiee, que se passe-t-il si le service Stock detecte un stock insuffisant apres que le paiement a ete autorise ?
Quel est l'avantage principal de CQRS ?
Real-time Analytics
Objectifs de la lecon
- Comprendre les moteurs OLAP optimises pour le streaming et les requetes analytiques temps reel
- Comparer Apache Druid, Apache Pinot, ClickHouse et Materialize
- Maitriser la dualite stream-table et ses implications pour l'analytique
- Concevoir une architecture real-time analytics de bout en bout
L'analytique temps reel est le Saint Graal du data engineering moderne. Mais attention : "temps reel" ne signifie pas toujours "sub-seconde". L'enjeu est de choisir le bon moteur OLAP en fonction de vos patterns de requetes, de votre volume de donnees et de votre budget operationnel. J'ai vu des equipes deployer Druid pour des cas ou un simple ClickHouse aurait suffi, et inversement.
Pourquoi un OLAP specifique pour le Streaming ?
Les bases de donnees analytiques classiques (Redshift, BigQuery, Snowflake) sont optimisees pour le batch. Elles excellent sur des requetes complexes sur des donnees au repos, mais ne sont pas concues pour ingerer des millions d'evenements par seconde tout en repondant a des requetes en millisecondes.
| Critere | OLAP Batch (Snowflake, BQ) | OLAP Streaming (Druid, Pinot) |
|---|---|---|
| Latence d'ingestion | Minutes a heures | Secondes |
| Latence de requete | Secondes a minutes | Millisecondes |
| Freshness | Near real-time au mieux | Temps reel |
| Complexite des requetes | Tres complexe (joins, subqueries) | Aggregations rapides, joins limites |
| Cout operationnel | Serverless / manage | Infrastructure dediee |
| Volume | Petaoctets | Teraoctets a petaoctets |
La Dualite Stream-Table
Concept fondamental : un stream est un log immutable d'evenements ordonnes dans le temps. Une table est un snapshot de l'etat courant a un instant T. Ces deux representations sont duales :
- Stream vers Table : en aggregant un stream (replay), on obtient une table (etat courant)
- Table vers Stream : en capturant les changements d'une table (CDC), on obtient un stream
Cette dualite, formalisee par Jay Kreps, est la base theorique de toute l'analytique temps reel. Chaque moteur OLAP streaming exploite cette dualite differemment.
Apache Druid
Druid est un datastore analytique distribue concu pour des requetes OLAP sub-seconde sur des donnees de grande echelle. Il est utilise par Airbnb, Netflix, Confluent et des centaines d'entreprises.
Kafka / Kinesis Batch (S3/HDFS)
| |
v v
+-----------+ +-----------+
| Real-time | | Batch |
| Ingestion | | Ingestion |
| (Indexing) | | (Indexing)|
+-----+-----+ +-----+-----+
| |
v v
+-----------+ +-----------+
| Real-time | |Historical |
| Nodes | | Nodes |
| (donnees | | (donnees |
| fraiches)| | agees) |
+-----+-----+ +-----+-----+
| |
+-----------+-----------+
|
+-----v-----+
| Broker |
| (routage |
| requetes) |
+-----+------+
|
+-----v------+
| Router / |
| API Client |
+-----------+
{
"type": "kafka",
"spec": {
"dataSchema": {
"dataSource": "clickstream_events",
"timestampSpec": {
"column": "event_time",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": ["user_id", "page_url", "device", "country", "campaign"]
},
"metricsSpec": [
{"type": "count", "name": "event_count"},
{"type": "longSum", "name": "duration_ms", "fieldName": "duration"},
{"type": "thetaSketch", "name": "unique_users", "fieldName": "user_id"}
],
"granularitySpec": {
"segmentGranularity": "HOUR",
"queryGranularity": "MINUTE",
"rollup": true
}
},
"ioConfig": {
"consumerProperties": {
"bootstrap.servers": "kafka-broker:9092"
},
"topic": "clickstream",
"useEarliestOffset": true
}
}
}
Le Rollup Druid : cle de la performance
Le rollup est la fonctionnalite killer de Druid. Au lieu de stocker chaque evenement brut, Druid pre-agrege les donnees a l'ingestion selon la granularite configuree. Si vous avez 1 million de clics par minute mais seulement 10 000 combinaisons uniques de dimensions, le rollup reduit le stockage de 100x tout en permettant des requetes instantanees. Le compromis : vous perdez l'acces aux evenements individuels.
Apache Pinot
Pinot, cree par LinkedIn et donne a Apache, est optimise pour les requetes analytiques a faible latence sur des donnees fraiches. Il alimente le "Who Viewed Your Profile" de LinkedIn (des milliards de requetes/jour).
Druid
- Rollup natif a l'ingestion
- Excellent pour les time series
- HyperLogLog et theta sketches natifs
- Ecosysteme mature, large communaute
- Plus complexe a operer
Pinot
- Star-tree index pour aggregations pre-calculees
- Upsert natif (mise a jour en place)
- Supporte les donnees mutables
- Multi-tenant par design
- Meilleur support des joins
ClickHouse
ClickHouse, cree par Yandex, est un moteur OLAP columnar extremement rapide. Son approche est differente : au lieu d'une architecture distribuee complexe, il mise sur l'efficacite brute du traitement columnar et la compression.
-- Table Kafka Engine : consomme directement depuis Kafka
CREATE TABLE clickstream_kafka (
event_time DateTime64(3),
user_id String,
page_url String,
device LowCardinality(String),
country LowCardinality(String),
duration_ms UInt32
) ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka-broker:9092',
kafka_topic_list = 'clickstream',
kafka_group_name = 'clickhouse_consumer',
kafka_format = 'JSONEachRow';
-- Table de destination : MergeTree pour les requetes analytiques
CREATE TABLE clickstream (
event_time DateTime64(3),
user_id String,
page_url String,
device LowCardinality(String),
country LowCardinality(String),
duration_ms UInt32
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(event_time)
ORDER BY (country, device, event_time)
TTL event_time + INTERVAL 90 DAY;
-- Vue materialisee : transfert automatique Kafka -> MergeTree
CREATE MATERIALIZED VIEW clickstream_mv TO clickstream AS
SELECT * FROM clickstream_kafka;
-- Requete analytique sub-seconde
SELECT
toStartOfMinute(event_time) AS minute,
country,
device,
count() AS events,
uniqExact(user_id) AS unique_users,
avg(duration_ms) AS avg_duration
FROM clickstream
WHERE event_time >= now() - INTERVAL 1 HOUR
GROUP BY minute, country, device
ORDER BY minute DESC;
Materialize : SQL Streaming
Materialize adopte une approche radicalement differente : au lieu d'indexer des donnees pour les requeter, il maintient des vues materialisees incrementales qui se mettent a jour automatiquement a chaque nouvel evenement.
-- Source : connexion directe a Kafka
CREATE SOURCE clickstream_source
FROM KAFKA CONNECTION kafka_conn (TOPIC 'clickstream')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn;
-- Vue materialisee incrementale : mise a jour en continu
CREATE MATERIALIZED VIEW dashboard_metrics AS
SELECT
date_trunc('minute', event_time) AS minute,
country,
COUNT(*) AS total_events,
COUNT(DISTINCT user_id) AS unique_users,
AVG(duration_ms)::INT AS avg_duration
FROM clickstream_source
GROUP BY 1, 2;
-- Requete instantanee sur l'etat courant (pull)
SELECT * FROM dashboard_metrics
WHERE minute >= mz_now() - INTERVAL '1 hour';
-- Abonnement aux changements (push) via SUBSCRIBE
SUBSCRIBE TO dashboard_metrics WITH (SNAPSHOT = false);
Choisir le bon moteur : arbre de decision
Volume > 1TB/jour + requetes ad-hoc ? ClickHouse.
Dashboard temps reel + metrics pre-definies ? Druid ou Pinot.
Vues materialisees complexes + joins ? Materialize.
Multi-tenant + user-facing analytics ? Pinot.
Time series IoT ? Druid ou ClickHouse.
Le choix depend aussi de votre equipe : ClickHouse est le plus simple a operer, Druid/Pinot necessitent une expertise distribuee, Materialize est le plus innovant mais le moins mature.
Airbnb - Migration vers une Analytique Temps Reel
Airbnb traitait historiquement ses donnees analytiques en batch avec Hive et Spark, avec une latence de plusieurs heures. Pour leur dashboard de monitoring des reservations et de detection de fraude, ils avaient besoin de donnees fraiches en moins de 30 secondes.
Leur solution : un pipeline Kafka -> Flink -> Apache Druid pour l'analytique temps reel, avec un fallback vers Hive pour les requetes historiques complexes. Flink effectue l'enrichissement et le nettoyage en temps reel avant l'ingestion dans Druid.
Resultats :
- Latence de fraicheur passee de 4 heures a 15 secondes
- Detection de fraude en temps reel : reduction de 35% des pertes
- Dashboards operationnels accessibles a 500+ employes
- Rollup Druid : stockage reduit de 80% par rapport aux donnees brutes
Scenario : Concevoir un Dashboard Temps Reel E-commerce
Votre entreprise e-commerce veut un dashboard montrant en temps reel : le nombre de commandes par minute, le chiffre d'affaires cumule, le top 10 des produits, et des alertes si le taux de conversion chute de plus de 20%.
- Ingestion : Kafka recoit les evenements clickstream et commandes depuis l'application
- Traitement : Flink enrichit les evenements (jointure client/produit) et calcule les metriques fenetrees
- Stockage : ClickHouse pour les requetes ad-hoc du dashboard, avec vues materialisees pour les metriques frequentes
- Alerting : Flink CEP detecte les patterns de chute de conversion et envoie des alertes Slack
Utiliser un OLAP Streaming comme Data Warehouse
L'erreur frequente est de vouloir remplacer son data warehouse (Snowflake, BigQuery) par un Druid ou ClickHouse. Ces moteurs excellent pour les aggregations rapides sur des dimensions pre-definies, mais ils ne sont pas concus pour des joins complexes ad-hoc, des transformations lourdes, ou le stockage long terme a faible cout. Le bon pattern est de les utiliser en complement : le data warehouse pour l'analytique profonde, le OLAP streaming pour les dashboards temps reel.
Quiz : Streaming & Real-time Analytics
Objectifs de l'evaluation
- Valider votre comprehension des architectures de streaming (Lambda, Kappa)
- Tester vos connaissances sur Kafka, Flink et les moteurs OLAP temps reel
- Evaluer votre capacite a choisir les bons outils selon le contexte
- Verifier votre maitrise des patterns event-driven et de la dualite stream-table
Ce quiz couvre l'ensemble des lecons 0 a 8 sur le streaming et l'analytique temps reel. Prenez le temps de bien lire chaque question. Les questions portent autant sur la theorie que sur les choix architecturaux concrets. Un score de 6/8 ou plus indique une bonne maitrise du sujet.
Quiz Streaming & Real-time Analytics
Quelle est la principale difference entre l'architecture Lambda et l'architecture Kappa ?
Dans Kafka, qu'est-ce qui garantit l'ordre des messages ?
Quel mecanisme Apache Flink utilise-t-il pour assurer la tolerance aux pannes avec une semantique exactly-once ?
Qu'est-ce que le "rollup" dans Apache Druid ?
Dans le contexte de la dualite stream-table, comment obtient-on une table a partir d'un stream ?
Quel est le principal avantage de Materialize par rapport a Druid ou ClickHouse ?
Dans un systeme Event Sourcing, pourquoi utilise-t-on des snapshots ?
Quelle est la meilleure approche pour gerer un consumer qui doit traiter des evenements en retard (late events) dans Apache Flink ?
Uber - Evolution de leur Architecture Streaming
Uber a traverse trois generations d'architectures streaming. La premiere generation utilisait un systeme maison base sur des files d'attente simples. La deuxieme generation a adopte Kafka mais avec un modele Lambda (batch Hive + speed Storm). La troisieme generation est une architecture Kappa unifiee avec Kafka + Flink, traitant plus de 5 trillions de messages par jour.
Lecon cle : meme les geants tech evoluent incrementalement. La migration Lambda -> Kappa a pris 2 ans et a ete faite service par service, pas en big bang.
Quiz sans Revision
Passer un quiz sans avoir revise les concepts cles est contre-productif. Si votre score est inferieur a 6/8, relisez les lecons correspondantes avant de continuer. Le streaming est un domaine ou les fondamentaux (event-time, watermarks, exactly-once, dualite stream-table) sont essentiels pour construire des systemes fiables.
Data Quality Dimensions
Objectifs de la lecon
- Maitriser les six dimensions fondamentales de la qualite des donnees
- Savoir mesurer chaque dimension avec des metriques concretes
- Comprendre l'impact business de chaque dimension de qualite
- Definir des seuils et des KPIs de qualite adaptes a votre contexte
La qualite des donnees n'est pas un concept abstrait. Chaque dimension peut etre mesuree, monitoree et amelioree. J'ai vu des entreprises perdre des millions a cause de donnees de mauvaise qualite : doublons dans les factures, adresses invalides pour les livraisons, montants incoherents entre systemes. La cle est de comprendre que la qualite est un spectre, pas un binaire, et que chaque cas d'usage a des exigences differentes.
Les Six Dimensions de la Qualite des Donnees
Le framework DAMA (Data Management Association) definit six dimensions fondamentales. Chacune repond a une question differente sur vos donnees :
+------------------+
| DATA QUALITY |
+--------+---------+
|
+----------+---------+--------+----------+
| | | | |
+----v---+ +---v----+ +--v---+ +--v----+ +---v-----+
|Accuracy| |Complete| |Consis| |Timeli | |Validity |
| | |ness | |tency | |ness | | |
+----+---+ +---+----+ +--+---+ +--+----+ +---+-----+
| | | | |
+----------+---------+--------+----------+
|
+------v------+
| Uniqueness |
+-------------+
1. Accuracy (Exactitude)
L'exactitude mesure si les donnees refletent correctement la realite du monde reel qu'elles representent.
| Aspect | Question | Exemple | Metrique |
|---|---|---|---|
| Syntaxique | Le format est-il correct ? | Code postal "7500" au lieu de "75000" | % de valeurs conformes au format attendu |
| Semantique | La valeur correspond-elle a la realite ? | Date de naissance "2025-01-15" pour un client de 45 ans | % de valeurs verifiables contre une source de reference |
-- Exactitude syntaxique : codes postaux francais valides
SELECT
COUNT(*) AS total_clients,
SUM(CASE WHEN code_postal ~ '^\d{5}$' THEN 1 ELSE 0 END) AS codes_valides,
ROUND(
100.0 * SUM(CASE WHEN code_postal ~ '^\d{5}$' THEN 1 ELSE 0 END)
/ COUNT(*), 2
) AS pct_accuracy
FROM clients;
-- Exactitude semantique : montants coherents
SELECT
COUNT(*) AS total_commandes,
SUM(CASE
WHEN montant_total = (quantite * prix_unitaire * (1 - remise / 100.0))
THEN 1 ELSE 0
END) AS montants_exacts,
ROUND(
100.0 * SUM(CASE
WHEN montant_total = (quantite * prix_unitaire * (1 - remise / 100.0))
THEN 1 ELSE 0
END) / COUNT(*), 2
) AS pct_accuracy
FROM lignes_commande;
2. Completeness (Completude)
La completude mesure la proportion de donnees presentes par rapport aux donnees attendues. Elle se decline en trois niveaux :
Les trois niveaux de completude
Schema-level : toutes les colonnes attendues sont-elles presentes dans la table ?
Column-level : quel pourcentage de lignes ont une valeur non-null pour chaque colonne ?
Row-level : toutes les entites attendues sont-elles presentes ? (ex: tous les magasins ont-ils remonte leurs ventes ?)
-- Completude au niveau colonne
SELECT
'email' AS colonne,
COUNT(*) AS total,
SUM(CASE WHEN email IS NOT NULL AND email != '' THEN 1 ELSE 0 END) AS non_vides,
ROUND(100.0 * SUM(CASE WHEN email IS NOT NULL AND email != '' THEN 1 ELSE 0 END) / COUNT(*), 2)
AS pct_completude
FROM clients
UNION ALL
SELECT
'telephone', COUNT(*),
SUM(CASE WHEN telephone IS NOT NULL AND telephone != '' THEN 1 ELSE 0 END),
ROUND(100.0 * SUM(CASE WHEN telephone IS NOT NULL AND telephone != '' THEN 1 ELSE 0 END) / COUNT(*), 2)
FROM clients;
-- Completude au niveau entite : tous les magasins ont-ils remonte ?
SELECT
m.magasin_id,
m.nom,
CASE WHEN v.magasin_id IS NOT NULL THEN 'PRESENT' ELSE 'MANQUANT' END AS statut
FROM magasins m
LEFT JOIN ventes_jour v ON m.magasin_id = v.magasin_id AND v.date_vente = CURRENT_DATE - 1
WHERE v.magasin_id IS NULL;
3. Consistency (Coherence)
La coherence mesure l'absence de contradictions dans les donnees, a la fois au sein d'un meme systeme et entre systemes differents.
Exemples de problemes de coherence
- Intra-table : la date de fin d'un contrat est anterieure a la date de debut
- Inter-tables : le nombre de commandes dans la table
commandesne correspond pas au compteur dans la tableclients - Inter-systemes : le chiffre d'affaires dans le CRM differe de celui dans l'ERP
- Temporelle : les memes metriques calculees a des moments differents donnent des resultats contradictoires
-- Coherence intra-table : dates logiques
SELECT COUNT(*) AS incoherences_dates
FROM contrats
WHERE date_fin IS NOT NULL AND date_fin < date_debut;
-- Coherence inter-tables : totaux correspondants
SELECT
c.client_id,
c.nb_commandes AS nb_dans_clients,
COALESCE(cmd.nb_reel, 0) AS nb_reel_commandes,
ABS(c.nb_commandes - COALESCE(cmd.nb_reel, 0)) AS ecart
FROM clients c
LEFT JOIN (
SELECT client_id, COUNT(*) AS nb_reel
FROM commandes
GROUP BY client_id
) cmd ON c.client_id = cmd.client_id
WHERE c.nb_commandes != COALESCE(cmd.nb_reel, 0);
4. Timeliness (Fraicheur)
La fraicheur mesure si les donnees sont disponibles dans le delai attendu par les consommateurs. Une donnee exacte mais arrivee trop tard est souvent inutile.
| Cas d'usage | Fraicheur attendue | Impact du retard |
|---|---|---|
| Detection de fraude | < 1 seconde | Transaction frauduleuse autorisee |
| Dashboard operationnel | < 5 minutes | Decisions basees sur des donnees obsoletes |
| Reporting financier | < 24 heures | Rapports retardes, conformite impactee |
| Analyse marketing | < 1 semaine | Campagnes mal ciblees |
-- Fraicheur : age des donnees les plus recentes par source
SELECT
source_system,
MAX(updated_at) AS derniere_mise_a_jour,
EXTRACT(EPOCH FROM (NOW() - MAX(updated_at))) / 3600 AS age_heures,
CASE
WHEN EXTRACT(EPOCH FROM (NOW() - MAX(updated_at))) / 3600 < 1 THEN 'FRESH'
WHEN EXTRACT(EPOCH FROM (NOW() - MAX(updated_at))) / 3600 < 24 THEN 'ACCEPTABLE'
ELSE 'STALE'
END AS statut_fraicheur
FROM raw_data
GROUP BY source_system
ORDER BY age_heures DESC;
5. Validity (Validite)
La validite verifie que les donnees respectent les regles et contraintes definies (formats, plages, referentiels).
Validite vs Exactitude
Une donnee peut etre valide mais inexacte. Par exemple, un email "jean.dupont@gmail.com" est valide (format correct), mais peut etre inexact si le vrai email est "j.dupont@gmail.com". Inversement, une donnee peut etre exacte mais dans un format invalide. La validite est verifiable automatiquement, l'exactitude necessite souvent une verification externe.
-- Validite : champs respectant les regles metier
SELECT
'email_format' AS regle,
SUM(CASE WHEN email ~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' THEN 1 ELSE 0 END) AS valides,
COUNT(*) AS total,
ROUND(100.0 * SUM(CASE WHEN email ~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' THEN 1 ELSE 0 END) / COUNT(*), 2) AS pct
FROM clients
UNION ALL
SELECT
'age_range',
SUM(CASE WHEN age BETWEEN 0 AND 150 THEN 1 ELSE 0 END),
COUNT(*),
ROUND(100.0 * SUM(CASE WHEN age BETWEEN 0 AND 150 THEN 1 ELSE 0 END) / COUNT(*), 2)
FROM clients
UNION ALL
SELECT
'statut_enum',
SUM(CASE WHEN statut IN ('ACTIF', 'INACTIF', 'SUSPENDU', 'FERME') THEN 1 ELSE 0 END),
COUNT(*),
ROUND(100.0 * SUM(CASE WHEN statut IN ('ACTIF', 'INACTIF', 'SUSPENDU', 'FERME') THEN 1 ELSE 0 END) / COUNT(*), 2)
FROM clients;
6. Uniqueness (Unicite)
L'unicite garantit que chaque entite n'est representee qu'une seule fois dans le dataset. Les doublons sont l'un des problemes de qualite les plus couteux.
-- Detection de doublons exacts
SELECT
email, COUNT(*) AS nb_occurrences
FROM clients
GROUP BY email
HAVING COUNT(*) > 1
ORDER BY nb_occurrences DESC;
-- Taux de doublons global
SELECT
COUNT(*) AS total_lignes,
COUNT(DISTINCT client_id) AS clients_uniques,
COUNT(*) - COUNT(DISTINCT client_id) AS doublons,
ROUND(100.0 * (COUNT(*) - COUNT(DISTINCT client_id)) / COUNT(*), 2) AS pct_doublons
FROM clients;
-- Doublons fuzzy : noms similaires (distance de Levenshtein)
SELECT
a.client_id AS id_a,
b.client_id AS id_b,
a.nom, b.nom,
a.email, b.email,
levenshtein(LOWER(a.nom), LOWER(b.nom)) AS distance_nom
FROM clients a
JOIN clients b ON a.client_id < b.client_id
WHERE levenshtein(LOWER(a.nom), LOWER(b.nom)) <= 2
AND (a.email = b.email OR a.telephone = b.telephone);
Framework de Mesure Global
Dimension Score Seuil Statut ============ ====== ====== ======== Accuracy 97.3% > 95% [OK] Completeness 94.1% > 90% [OK] Consistency 99.2% > 98% [OK] Timeliness 85.0% > 95% [ALERTE] Validity 98.7% > 97% [OK] Uniqueness 99.9% > 99% [OK] ============ ====== ====== ======== Score Global 95.7% > 95% [OK]
Les seuils dependent du contexte
Un taux de completude de 90% peut etre excellent pour des donnees marketing mais catastrophique pour des donnees financieres reglementees. Definissez toujours vos seuils en collaboration avec les equipes metier. Un champ "commentaire" peut tolerer 50% de nulls, mais un champ "montant" sur une facture doit etre a 100%.
Airbnb - Data Quality a l'echelle
Airbnb a developpe en interne un framework de qualite des donnees appele Midas qui mesure systematiquement les six dimensions de qualite sur chaque dataset critique. Leur approche repose sur trois piliers :
- Certification : chaque dataset est classe Bronze (brut), Silver (nettoye), ou Gold (certifie). Seuls les datasets Gold peuvent alimenter les dashboards de direction et les modeles ML de production.
- SLAs par dimension : chaque dataset Gold a des SLAs explicites (completude > 99.5%, fraicheur < 1h, unicite > 99.9%).
- Ownership : chaque dataset a un proprietaire identifie qui est responsable de maintenir les SLAs et qui est alerte en cas de degradation.
Resultat : le nombre d'incidents data impactant les decisions business a ete reduit de 60% en un an.
Mesurer la Qualite sans Agir
De nombreuses organisations mettent en place des dashboards de qualite des donnees... que personne ne consulte. Les scores de qualite baissent progressivement sans que personne ne reagisse. La mesure sans action est inutile. Chaque metrique de qualite doit etre associee a : (1) un seuil d'alerte, (2) un responsable clairement identifie, (3) un processus de remediation documente. Si vous ne pouvez pas agir sur une metrique, ne la mesurez pas.
Great Expectations & Soda
Objectifs de la lecon
- Maitriser Great Expectations : expectation suites, checkpoints, data docs
- Comprendre SodaCL et Soda Cloud pour la validation des donnees
- Comparer les deux outils et choisir selon le contexte
- Integrer les tests de qualite dans un pipeline CI/CD data
Great Expectations et Soda sont les deux leaders open-source de la validation de donnees. GE est puissant mais complexe, Soda est plus simple mais moins flexible. En pratique, beaucoup d'equipes utilisent les deux : GE pour les validations complexes et programmatiques, Soda pour les checks rapides ecrits par les analystes. L'important n'est pas l'outil mais la couverture de vos tests.
Great Expectations : Vue d'Ensemble
Great Expectations (GE) est un framework Python open-source pour la validation, la documentation et le profiling des donnees. Il repose sur trois concepts fondamentaux :
+------------------+ +-------------------+ +----------------+
| Data Source | | Expectation Suite | | Data Docs |
| (Pandas, Spark, |---->| (regles de |---->| (documentation |
| SQL, fichiers) | | validation) | | auto-generee) |
+------------------+ +-------------------+ +----------------+
|
v
+-------------------+
| Checkpoint |
| (orchestration |
| des validations) |
+--------+----------+
|
v
+-------------------+
| Validation Result |
| (succes/echec + |
| details) |
+-------------------+
Les trois piliers de Great Expectations
Expectations : des assertions declaratives sur vos donnees ("cette colonne ne doit pas avoir de nulls", "les valeurs doivent etre entre 0 et 1000").
Checkpoints : orchestrent l'execution des expectations sur des batches de donnees specifiques, avec des actions conditionnelles (alerte, blocage du pipeline).
Data Docs : documentation HTML auto-generee qui presente les resultats de validation de facon visuelle et partageable.
import great_expectations as gx
# Initialiser le contexte GE
context = gx.get_context()
# Connecter une source de donnees
datasource = context.sources.add_or_update_pandas(name="source_ventes")
data_asset = datasource.add_csv_asset(
name="ventes_quotidiennes",
filepath_or_buffer="data/ventes_2024.csv"
)
batch_request = data_asset.build_batch_request()
# Creer une Expectation Suite
suite = context.add_or_update_expectation_suite("ventes_quality_suite")
# Obtenir un validator pour definir les expectations
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name="ventes_quality_suite"
)
# Definir les expectations
# 1. Completude : pas de nulls sur les champs critiques
validator.expect_column_values_to_not_be_null("commande_id")
validator.expect_column_values_to_not_be_null("montant")
validator.expect_column_values_to_not_be_null("date_vente")
# 2. Validite : types et plages
validator.expect_column_values_to_be_between(
"montant", min_value=0.01, max_value=100000,
mostly=0.99 # tolerer 1% d'anomalies
)
validator.expect_column_values_to_be_in_set(
"statut", ["CONFIRMEE", "EXPEDIEE", "LIVREE", "ANNULEE"]
)
validator.expect_column_values_to_match_regex(
"email", r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
)
# 3. Unicite
validator.expect_column_values_to_be_unique("commande_id")
# 4. Coherence
validator.expect_column_pair_values_a_to_be_greater_than_b(
"date_livraison", "date_vente"
)
# 5. Volume : nombre de lignes attendu
validator.expect_table_row_count_to_be_between(
min_value=1000, max_value=50000
)
# Sauvegarder la suite
validator.save_expectation_suite(discard_failed_expectations=False)
# Creer un checkpoint pour orchestrer la validation
checkpoint = context.add_or_update_checkpoint(
name="ventes_daily_checkpoint",
validations=[
{
"batch_request": batch_request,
"expectation_suite_name": "ventes_quality_suite",
}
],
action_list=[
# Stocker le resultat
{
"name": "store_validation_result",
"action": {"class_name": "StoreValidationResultAction"}
},
# Mettre a jour les Data Docs
{
"name": "update_data_docs",
"action": {"class_name": "UpdateDataDocsAction"}
},
# Envoyer une alerte Slack en cas d'echec
{
"name": "send_slack_notification",
"action": {
"class_name": "SlackNotificationAction",
"slack_webhook": "${SLACK_WEBHOOK_URL}",
"notify_on": "failure",
"renderer": {
"class_name": "SlackRenderer"
}
}
}
]
)
# Executer le checkpoint
result = checkpoint.run()
# Verifier le resultat
if not result.success:
print("ECHEC de la validation !")
for r in result.run_results.values():
for expectation_result in r["validation_result"].results:
if not expectation_result.success:
print(f" FAILED: {expectation_result.expectation_config.expectation_type}")
print(f" Details: {expectation_result.result}")
else:
print("Validation reussie - pipeline peut continuer")
SodaCL : Validation Declarative
SodaCL (Soda Checks Language) est un langage declaratif YAML pour definir des tests de qualite des donnees. Son approche est plus simple et accessible que GE, ce qui le rend populaire aupres des analystes et data engineers.
# checks/ventes_checks.yml
checks for ventes:
# Completude
- missing_count(commande_id) = 0:
name: "Commande ID ne doit pas etre null"
- missing_percent(email) < 5:
name: "Email manquant tolere a 5%"
# Volume
- row_count between 1000 and 50000:
name: "Volume de ventes quotidien attendu"
- row_count > 0:
name: "La table ne doit pas etre vide"
# Fraicheur
- freshness(date_vente) < 1d:
name: "Les donnees doivent avoir moins de 24h"
# Validite
- invalid_percent(montant) < 1:
valid min: 0.01
valid max: 100000
name: "Montants dans la plage valide"
- invalid_count(statut) = 0:
valid values: ["CONFIRMEE", "EXPEDIEE", "LIVREE", "ANNULEE"]
# Unicite
- duplicate_count(commande_id) = 0:
name: "Pas de doublons sur commande_id"
# Coherence inter-tables
- values in (client_id) must exist in clients (client_id):
name: "Integrite referentielle clients"
# Custom SQL
- failed rows:
fail condition: date_livraison < date_vente
name: "Date livraison doit etre apres date vente"
# Schema
- schema:
fail:
when required column missing:
[commande_id, client_id, montant, date_vente, statut]
when wrong type:
montant: decimal
date_vente: date
# configuration.yml
data_source ma_base:
type: postgres
host: ${POSTGRES_HOST}
port: 5432
username: ${POSTGRES_USER}
password: ${POSTGRES_PASSWORD}
database: production
schema: public
soda_cloud:
host: cloud.soda.io
api_key_id: ${SODA_API_KEY_ID}
api_key_secret: ${SODA_API_KEY_SECRET}
# Installation pip install soda-core-postgres # Executer les checks soda scan -d ma_base -c configuration.yml checks/ventes_checks.yml # Resultat typique : # Scan summary: # 10/10 checks PASSED # 0 checks WARNED # 0 checks FAILED # All is good. No failures. No warnings. No errors.
Comparaison GE vs Soda
Great Expectations
- Framework Python complet et extensible
- 200+ expectations built-in
- Data Docs : documentation auto-generee
- Profiling automatique pour decouvrir les expectations
- Courbe d'apprentissage plus raide
- Ideal pour les equipes Python/engineering
- Open-source gratuit
Soda
- Langage declaratif YAML simple
- Accessible aux analystes non-developpeurs
- Soda Cloud : UI collaborative, historique, alertes
- Integration native Airflow, dbt, Prefect
- Anomaly detection (ML) dans Soda Cloud
- Soda Core open-source, Cloud payant
- Ideal pour les equipes mixtes
| Critere | Great Expectations | Soda |
|---|---|---|
| Langage | Python (programmatique) | YAML (declaratif) |
| Complexite | Elevee | Faible a moyenne |
| Extensibilite | Tres elevee (custom expectations Python) | Moyenne (custom SQL) |
| Documentation | Data Docs HTML | Soda Cloud UI |
| CI/CD | Natif (checkpoint en Python) | CLI integrable facilement |
| Equipe cible | Data Engineers | Analystes + Engineers |
| Cout | Gratuit (GX Cloud payant) | Core gratuit, Cloud payant |
GitLab - Qualite des Donnees avec Great Expectations
GitLab a integre Great Expectations dans leur pipeline dbt pour valider les donnees a chaque etape de transformation. Leur approche :
- Layer Bronze : expectations basiques (schema, nulls, types) executees a chaque ingestion
- Layer Silver : expectations metier (referential integrity, business rules) executees apres les transformations dbt
- Layer Gold : expectations de conformite (KPIs coherents, totaux equilibres) executees avant publication
- Data Docs : publies automatiquement sur un site interne, consultables par toute l'equipe data
Resultats : les incidents de qualite detectes en amont sont passes de 20% a 85%, reduisant significativement les corrections en production.
Scenario : Integration dans un Pipeline dbt + Airflow
Votre pipeline dbt transforme les donnees de ventes chaque nuit. Vous souhaitez ajouter des tests de qualite :
- Avant les transformations : Soda scan sur les tables source (fraicheur, completude, volume)
- Apres les transformations : dbt tests natifs (unique, not_null, relationships) + GE checkpoint pour les regles metier complexes
- Avant la publication : GE checkpoint final sur les tables Gold avec des seuils stricts
- Alerting : Slack en cas d'echec, blocage du pipeline si les tests critiques echouent
Tester Uniquement en Production
L'erreur la plus courante est de n'executer les tests de qualite qu'en production, sur les donnees finales. A ce stade, les donnees corrompues ont deja traverse tout le pipeline et potentiellement alimente des dashboards ou des modeles ML. Testez a chaque couche : source (ingestion), transformation (dbt), et serving (publication). Plus un probleme est detecte tot, moins il est couteux a corriger.
Data Observability
Objectifs de la lecon
- Comprendre les cinq piliers de la data observability : freshness, volume, schema, lineage, distribution
- Evaluer les solutions du marche : Monte Carlo, Bigeye, Elementary, Metaplane
- Mettre en place un monitoring proactif avec alertes et data lineage
- Construire une culture d'observabilite data dans votre organisation
La data observability est a la data ce que l'APM (Application Performance Monitoring) est aux applications. Avant Datadog et New Relic, les equipes SRE decouvrait les pannes quand les utilisateurs se plaignaient. La meme chose se passe aujourd'hui avec les donnees : les equipes data decouvrent les problemes quand un VP demande "pourquoi mon dashboard est faux ?". L'observabilite data change ce paradigme de reactif a proactif.
Les 5 Piliers de la Data Observability
+---------------------------------------------------------+ | DATA OBSERVABILITY | +---------------------------------------------------------+ | | | 1. FRESHNESS Les donnees arrivent-elles a temps ? | | [derniere MAJ il y a 2h - SLA: 1h] --> ALERTE | | | | 2. VOLUME Le nombre de lignes est-il normal ? | | [50K lignes au lieu de 200K habituels] --> ALERTE | | | | 3. SCHEMA La structure a-t-elle change ? | | [colonne "revenue" supprimee] --> ALERTE | | | | 4. DISTRIBUTION Les valeurs sont-elles normales ? | | [% de nulls passe de 2% a 45%] --> ALERTE | | | | 5. LINEAGE Quel est l'impact en aval ? | | [table source cassee -> 15 dashboards impactes] | | | +---------------------------------------------------------+
Pilier 1 : Freshness (Fraicheur)
Le monitoring de fraicheur detecte quand les donnees cessent d'arriver ou arrivent en retard par rapport aux SLAs definis.
-- Monitoring de fraicheur pour toutes les tables critiques
WITH freshness_check AS (
SELECT
'commandes' AS table_name,
MAX(updated_at) AS last_update,
INTERVAL '1 hour' AS sla_interval
FROM commandes
UNION ALL
SELECT 'paiements', MAX(updated_at), INTERVAL '2 hours'
FROM paiements
UNION ALL
SELECT 'inventory', MAX(updated_at), INTERVAL '30 minutes'
FROM inventory
)
SELECT
table_name,
last_update,
NOW() - last_update AS age,
sla_interval,
CASE
WHEN NOW() - last_update > sla_interval THEN 'BREACH'
WHEN NOW() - last_update > sla_interval * 0.8 THEN 'WARNING'
ELSE 'OK'
END AS status
FROM freshness_check
ORDER BY
CASE WHEN NOW() - last_update > sla_interval THEN 0 ELSE 1 END,
age DESC;
Pilier 2 : Volume
Le monitoring de volume detecte les anomalies dans le nombre de lignes ingerees ou transformees. Un drop soudain peut indiquer une source en panne, un pipeline casse ou un filtre trop agressif.
import pandas as pd
from scipy import stats
def detect_volume_anomaly(table_name, connection):
"""Detecter les anomalies de volume avec Z-score."""
# Historique des 30 derniers jours
query = f"""
SELECT date_trunc('day', created_at) AS jour, COUNT(*) AS volume
FROM {table_name}
WHERE created_at >= NOW() - INTERVAL '30 days'
GROUP BY 1
ORDER BY 1
"""
df = pd.read_sql(query, connection)
if len(df) < 7:
return {"status": "INSUFFICIENT_DATA"}
# Calcul du Z-score pour le jour le plus recent
volumes = df['volume'].values
current = volumes[-1]
historical = volumes[:-1]
z_score = (current - historical.mean()) / historical.std()
status = "OK"
if abs(z_score) > 3:
status = "CRITICAL" # anomalie forte
elif abs(z_score) > 2:
status = "WARNING" # anomalie moderee
return {
"table": table_name,
"current_volume": int(current),
"expected_range": f"{int(historical.mean() - 2*historical.std())} - "
f"{int(historical.mean() + 2*historical.std())}",
"z_score": round(z_score, 2),
"status": status
}
Pilier 3 : Schema
Le monitoring de schema detecte les changements de structure : colonnes ajoutees, supprimees, renommees, ou avec un type modifie.
def detect_schema_changes(table_name, connection, expected_schema):
"""Comparer le schema actuel avec le schema attendu."""
query = f"""
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = '{table_name}'
ORDER BY ordinal_position
"""
current_schema = pd.read_sql(query, connection)
current_cols = set(current_schema['column_name'])
expected_cols = set(expected_schema.keys())
changes = []
# Colonnes ajoutees
for col in current_cols - expected_cols:
changes.append({"type": "ADDED", "column": col, "severity": "WARNING"})
# Colonnes supprimees
for col in expected_cols - current_cols:
changes.append({"type": "REMOVED", "column": col, "severity": "CRITICAL"})
# Changements de type
for _, row in current_schema.iterrows():
col = row['column_name']
if col in expected_schema:
if row['data_type'] != expected_schema[col]['type']:
changes.append({
"type": "TYPE_CHANGED",
"column": col,
"from": expected_schema[col]['type'],
"to": row['data_type'],
"severity": "CRITICAL"
})
return changes
Pilier 4 : Distribution
Le monitoring de distribution detecte les changements statistiques dans les valeurs des colonnes : drift, augmentation de nulls, valeurs hors norme.
Metriques de Distribution a Monitorer
Null rate : pourcentage de valeurs nulles par colonne (alerter si deviation > 5 points).
Distinct count : nombre de valeurs uniques (alerter si changement soudain).
Min / Max / Mean / Median : statistiques descriptives (alerter si hors plage historique).
Value distribution : proportions des categories (alerter si une categorie disparait ou domine soudainement).
Ces metriques sont collectees periodiquement et comparees a l'historique pour detecter les anomalies.
Pilier 5 : Lineage (Lignage)
Le data lineage trace l'origine et les transformations des donnees de bout en bout. En cas d'incident, il permet de repondre instantanement a : "quels dashboards et modeles sont impactes ?"
Source CRM Source ERP Source Web
(PostgreSQL) (Oracle) (Kafka)
| | |
v v v
[raw_clients] [raw_commandes] [raw_clickstream]
| | |
+----------+---------+ |
| |
[stg_clients] [stg_events]
| |
+---------------+---------------+
|
[fct_ventes_enrichies]
|
+---------------+---------------+
| | |
[dashboard_CA] [model_churn_ML] [report_finance]
(Tableau) (Sagemaker) (Email PDF)
Si [raw_commandes] est en retard --> Impact sur :
- fct_ventes_enrichies
- dashboard_CA, model_churn_ML, report_finance
--> Notifier les owners de ces 3 assets
Solutions du Marche
| Solution | Type | Forces | Limites |
|---|---|---|---|
| Monte Carlo | SaaS | Leader du marche, ML-driven, lineage auto, integrations riches | Cout eleve, vendor lock-in |
| Bigeye | SaaS | Focus sur les metriques data, auto-thresholds ML | Moins d'integrations que Monte Carlo |
| Elementary | Open-source | Natif dbt, gratuit, alerts Slack/email | Limite a l'ecosysteme dbt |
| Metaplane | SaaS | Simple a deployer, bon lineage, alertes contextuelles | Plus recent, moins de features avancees |
| OpenMetadata | Open-source | Catalogue + observabilite, profiling, lineage | Necessite infrastructure dediee |
Uber - Data Observability a Grande Echelle
Uber gere plus de 10 000 pipelines data et des petaoctets de donnees. Leur systeme de data observability, appele uMonitor, surveille en continu la fraicheur, le volume et la distribution de chaque table critique.
- Detection automatique : des modeles ML apprennent les patterns normaux de chaque table (saisonnalite hebdomadaire, tendances) et alertent sur les deviations
- Lineage automatique : construit a partir des logs Hive/Spark, il permet d'identifier en moins de 30 secondes tous les assets impactes par un incident
- Root cause analysis : quand un dashboard montre des donnees incorrectes, le systeme remonte automatiquement la chaine de lineage pour identifier la source du probleme
- SLAs data : chaque table critique a un SLA de fraicheur et de volume, avec escalation automatique en cas de breach
Resultat : le MTTD (Mean Time To Detect) des incidents data est passe de 4 heures a 15 minutes, et le MTTR de 8 heures a 1 heure.
Faire Confiance Aveuglements aux Sources
L'erreur la plus dangereuse est de supposer que les systemes source produisent toujours des donnees correctes. "On n'a pas besoin de verifier, le CRM est fiable." En realite, les sources changent sans prevenir : un deploiement modifie un format de date, une migration supprime une colonne, un bug introduit des doublons. Monitorez TOUJOURS vos sources avec au minimum des checks de fraicheur, volume et schema. La confiance aveugle est la premiere cause d'incidents data en production.
Scenario : Mettre en Place l'Observabilite Data en 30 Jours
Votre entreprise n'a aucun monitoring de donnees. Voici un plan en 4 semaines :
- Semaine 1 : Identifier les 10 tables les plus critiques. Deployer Elementary (gratuit, dbt-natif). Ajouter des tests de freshness et volume.
- Semaine 2 : Ajouter le monitoring de schema et les alertes Slack. Documenter les SLAs de fraicheur par table.
- Semaine 3 : Ajouter le monitoring de distribution (null rate, distinct count) sur les colonnes critiques. Configurer les seuils d'alerte.
- Semaine 4 : Construire le data lineage (dbt docs ou OpenMetadata). Former l'equipe sur le processus d'incident response. Premier post-mortem data.
DataOps Principles
Objectifs de la lecon
- Comprendre les principes fondamentaux du DataOps
- Appliquer le Manifeste DataOps a votre organisation
- Evaluer la maturite DataOps de votre equipe
- Mettre en place l'automatisation, le monitoring et la collaboration
Le DataOps, c'est le DevOps applique aux donnees. Mais attention : ce n'est pas juste une question d'outils. C'est un changement culturel profond qui transforme la facon dont les equipes data travaillent ensemble. J'ai vu des organisations reduire leurs temps de livraison de 90% simplement en adoptant ces principes.
Qu'est-ce que le DataOps ?
Le DataOps est une methodologie collaborative de gestion des donnees qui s'inspire des pratiques DevOps et Agile. Son objectif principal est d'ameliorer la qualite et de reduire le temps de cycle de l'analytique de donnees.
Definition formelle
Le DataOps est une pratique de gestion des donnees orientee processus et automatisee qui ameliore la qualite et reduit le cycle de livraison de l'analytique de donnees, tout en favorisant la collaboration entre les equipes data, les operations et les metiers.
Traditionnel Agile Data DataOps ============ ============ ============ Waterfall Sprints Flux continu Silos Collaboration Culture unifiee Manuel Semi-auto Full automation Reactif Proactif Predictif Mois Semaines Heures/Minutes Documentation User Stories Data Contracts QA en fin Tests reguliers Tests continus
Le Manifeste DataOps
Le Manifeste DataOps definit 18 principes fondamentaux, regroupes en categories essentielles :
| Categorie | Principes cles | Impact |
|---|---|---|
| Individus et interactions | Collaboration cross-fonctionnelle, communication continue, boucles de feedback | Reduction des silos de 80% |
| Analytique fonctionnelle | Livrer de la valeur en continu, prioriser les besoins utilisateurs | Time-to-insight reduit de 60% |
| Orchestration automatisee | Automatiser tout ce qui peut l'etre, infrastructure as code | Erreurs manuelles eliminees a 95% |
| Qualite intrinseque | Tests continus, monitoring, alertes proactives | Incidents reduits de 70% |
| Transparence | Metriques visibles, lineage accessible, documentation vivante | Confiance data augmentee de 85% |
Les 3 piliers du DataOps
1. Automatisation
L'automatisation est le coeur du DataOps. Chaque tache repetitive doit etre automatisee pour eliminer les erreurs humaines et accelerer les livraisons.
# dataops-pipeline.yml
name: DataOps Pipeline
triggers:
- schedule: "0 */2 * * *" # Toutes les 2 heures
- on_push:
branches: [main, develop]
- on_data_change:
sources: [raw_layer]
stages:
validate:
- data_quality_checks:
tool: great_expectations
suite: production_suite
- schema_validation:
tool: soda
checks: schema_changes
transform:
- dbt_run:
target: production
select: "tag:incremental"
fail_fast: true
test:
- dbt_test:
severity: warn
- custom_tests:
script: tests/business_rules.py
deploy:
- blue_green_swap:
strategy: zero_downtime
- notify:
channels: [slack, email]
on: [success, failure]
monitor:
- freshness_check:
max_delay: 30m
- anomaly_detection:
model: isolation_forest
sensitivity: 0.95
2. Monitoring et Observabilite
Le monitoring DataOps va bien au-dela de la simple surveillance des jobs. Il couvre les 5 piliers de l'observabilite data :
ββββββββββββββββββββββββββββββββ
β OBSERVABILITE DATAOPS β
ββββββββββββ¬ββββββββββββββββββββ
β
βββββββββ¬ββββββββΌββββββββ¬βββββββββββ
βΌ βΌ βΌ βΌ βΌ
Fraicheur Volume Schema Lineage Distribution
β β β β β
SLA ok? +/-20%? Drift? Impact? Anomalie?
β β β β β
βββββββββ΄ββββββββ΄ββββββββ΄βββββββββββ
β
βββββββββ΄ββββββββ
β ALERTING β
β PagerDuty / β
β Slack / Emailβ
βββββββββββββββββ
3. Collaboration
Le DataOps brise les silos entre les equipes. Data Engineers, Data Analysts, Data Scientists et metiers travaillent en synergie.
Avant DataOps
- Tickets Jira entre equipes
- Reunions hebdomadaires
- Documentation obsolete
- Blame culture lors d'incidents
- Chaque equipe a ses propres outils
Avec DataOps
- Pull Requests collaboratives
- Standups quotidiens cross-equipe
- Data Contracts vivants
- Blameless post-mortems
- Plateforme data unifiee
Modele de maturite DataOps
Evaluez ou se situe votre organisation sur l'echelle de maturite DataOps :
| Niveau | Nom | Caracteristiques | Metriques typiques |
|---|---|---|---|
| 1 | Initial | Processus manuels, pas de tests, deploiements ad hoc | Deploiement : jours/semaines, Incidents : frequents |
| 2 | Gere | Quelques scripts d'automatisation, tests basiques, documentation partielle | Deploiement : heures, Incidents : reguliers |
| 3 | Defini | CI/CD en place, tests automatises, monitoring basique | Deploiement : minutes, Incidents : occasionnels |
| 4 | Mesure | Metriques SLA, data quality automatique, alertes proactives | Deploiement : minutes, Incidents : rares, MTTR < 1h |
| 5 | Optimise | Self-healing pipelines, ML-driven monitoring, amelioration continue | Deploiement : secondes, Incidents : auto-corriges |
Netflix - DataOps a l'echelle
Netflix traite des petaoctets de donnees quotidiennement pour alimenter ses systemes de recommandation et ses dashboards business. Leur approche DataOps repose sur :
- Maestro : leur orchestrateur maison qui gere des dizaines de milliers de workflows quotidiens avec des SLA stricts
- Data Mesh interne : chaque equipe (streaming, content, finance) possede et opere ses propres pipelines comme des produits data
- Metacat : catalogue de metadonnees unifie qui offre une visibilite complete sur tout le lineage
- Blameless post-mortems : chaque incident data majeur fait l'objet d'une analyse collaborative avec un plan d'action concret
Resultat : temps de livraison d'une nouvelle metrique business passe de 2 semaines a 2 heures, reduction de 75% des incidents data critiques.
Le "Tool-First DataOps"
L'erreur la plus courante est de croire qu'acheter un outil DataOps suffit a transformer votre organisation. J'ai vu des entreprises investir des centaines de milliers d'euros dans des plateformes DataOps sans changer leurs processus ni leur culture.
- Symptome : on achete un outil d'orchestration mais les deploiements restent manuels
- Symptome : on installe un outil de monitoring mais personne ne regarde les alertes
- Correction : commencer par les processus et la culture, puis choisir les outils qui supportent ces processus
Demarrer le DataOps dans votre equipe
Commencez petit : automatisez d'abord un seul pipeline critique de bout en bout (tests, deploy, monitoring). Mesurez l'impact, puis etendez progressivement. Le DataOps est un voyage, pas une destination.
Quiz - DataOps Principles
Quel est le premier principe du Manifeste DataOps ?
A quel niveau de maturite DataOps une organisation dispose-t-elle de pipelines "self-healing" ?
CI/CD pour dbt
Objectifs de la lecon
- Configurer un pipeline CI/CD complet pour dbt avec GitHub Actions
- Implementer des pre-commit hooks pour dbt
- Maitriser les strategies de build dbt (slim CI, deferred)
- Gerer les environnements (dev, staging, prod) et les deploiements blue/green
Le CI/CD pour dbt est un game-changer. Avant, les equipes data deployaient en production en priant pour que tout fonctionne. Avec un pipeline CI/CD bien configure, chaque Pull Request est testee, validee et deployee de maniere fiable. C'est la difference entre l'artisanat et l'ingenierie.
Architecture CI/CD pour dbt
Un pipeline CI/CD pour dbt comprend plusieurs etapes qui garantissent la qualite du code et des donnees avant chaque deploiement en production.
Developer GitHub CI/CD Warehouse
========= ======== ======== ==========
β β β β
ββββ git push βββΆβ β β
β βββ trigger βββββββΆβ β
β β βββ dbt compile ββββΆβ
β β βββ dbt test βββββββΆβ
β β βββ slim CI ββββββββΆβ
β β β β
β ββββ PR comment ββββ β
ββββ review ββββββ β β
β β β β
ββββ merge ββββββΆβ β β
β βββ trigger prod ββΆβ β
β β βββ dbt build βββββΆβ
β β βββ swap schemas βββΆβ
β β βββ notify βββββββββΆβ
β β β β
GitHub Actions Workflow pour dbt
name: dbt CI/CD Pipeline
on:
pull_request:
branches: [main]
paths:
- 'models/**'
- 'macros/**'
- 'tests/**'
- 'dbt_project.yml'
- 'packages.yml'
env:
DBT_PROFILES_DIR: ./ci
DBT_TARGET: ci
SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
jobs:
lint:
name: "Lint & Format Check"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- run: pip install sqlfluff dbt-core dbt-snowflake
- name: SQLFluff Lint
run: sqlfluff lint models/ --dialect snowflake
slim-ci:
name: "Slim CI - Modified Models Only"
needs: lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- uses: actions/setup-python@v5
with:
python-version: '3.11'
- run: pip install dbt-core dbt-snowflake
- name: Get production manifest
run: |
aws s3 cp s3://dbt-artifacts/prod/manifest.json \
./target/manifest.json
- name: dbt deps
run: dbt deps
- name: dbt build (modified only)
run: |
dbt build \
--select state:modified+ \
--defer \
--state ./target \
--target ci \
--fail-fast
- name: Generate docs
run: dbt docs generate
- name: Comment PR with results
uses: actions/github-script@v7
with:
script: |
const results = require('./target/run_results.json');
const summary = results.results.map(r =>
`| ${r.unique_id} | ${r.status} | ${r.execution_time}s |`
).join('\n');
github.rest.issues.createComment({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: context.issue.number,
body: `## dbt CI Results\n| Model | Status | Time |\n|---|---|---|\n${summary}`
});
Pre-commit Hooks pour dbt
Les pre-commit hooks garantissent que le code est valide avant meme d'etre pousse sur le depot distant.
repos:
- repo: https://github.com/sqlfluff/sqlfluff
rev: 3.0.0
hooks:
- id: sqlfluff-lint
args: [--dialect, snowflake]
files: ^models/
- id: sqlfluff-fix
args: [--dialect, snowflake, --force]
files: ^models/
- repo: https://github.com/dbt-checkpoint/dbt-checkpoint
rev: 2.0.0
hooks:
- id: check-model-has-description
- id: check-model-has-tests
args: [--test-cnt, "2"]
- id: check-model-has-properties-file
- id: check-model-name-contract
args: [--pattern, "(staging|intermediate|marts)_.*"]
- id: check-source-has-freshness
- id: check-script-semicolon
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.6.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-json
Strategies de build dbt
Slim CI vs Full Build
Le Slim CI (avec --select state:modified+) ne rebuild que les modeles modifies et leurs dependances en aval, en utilisant le manifest de production comme reference. Cela reduit drastiquement le temps de CI (de 45 min a 5 min typiquement).
| Strategie | Commande dbt | Cas d'usage | Duree typique |
|---|---|---|---|
| Slim CI | dbt build --select state:modified+ --defer | Pull Requests | 3-10 min |
| Full Build | dbt build | Deploiement production | 30-90 min |
| Tag-based | dbt build --select tag:daily | Scheduled runs | Variable |
| Freshness-first | dbt source freshness && dbt build | Validation sources | Variable + build |
Gestion des environnements
data_warehouse:
target: dev
outputs:
dev:
type: snowflake
account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
user: "{{ env_var('SNOWFLAKE_USER') }}"
database: ANALYTICS_DEV
schema: "dbt_{{ env_var('USER', 'default') }}"
warehouse: DEV_WH_XS
threads: 4
ci:
type: snowflake
account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
user: "{{ env_var('SNOWFLAKE_USER') }}"
database: ANALYTICS_CI
schema: "pr_{{ env_var('PR_NUMBER', '0') }}"
warehouse: CI_WH_S
threads: 8
staging:
type: snowflake
account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
user: "{{ env_var('SNOWFLAKE_USER') }}"
database: ANALYTICS_STAGING
schema: public
warehouse: STAGING_WH_M
threads: 16
prod:
type: snowflake
account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
user: "{{ env_var('SNOWFLAKE_USER') }}"
database: ANALYTICS_PROD
schema: public
warehouse: PROD_WH_L
threads: 32
Deploiements Blue/Green pour dbt
Le deploiement blue/green permet de deployer sans interruption de service en construisant les modeles dans un schema secondaire puis en effectuant un swap atomique.
-- Etape 1: Build dans le schema "green" (secondaire)
-- dbt build --target prod --vars '{schema_suffix: _green}'
-- Etape 2: Verification de sante (health check)
SELECT
'stg_orders' AS model_name,
COUNT(*) AS row_count,
MAX(updated_at) AS max_updated
FROM analytics_prod.public_green.stg_orders
UNION ALL
SELECT
'fct_revenue' AS model_name,
COUNT(*) AS row_count,
MAX(updated_at) AS max_updated
FROM analytics_prod.public_green.fct_revenue;
-- Etape 3: Swap atomique des schemas
ALTER SCHEMA analytics_prod.public
RENAME TO analytics_prod.public_old;
ALTER SCHEMA analytics_prod.public_green
RENAME TO analytics_prod.public;
-- Etape 4: Cleanup (apres validation)
DROP SCHEMA IF EXISTS analytics_prod.public_old CASCADE;
Spotify - CI/CD pour pipelines data
Spotify gere plus de 20 000 pipelines data avec une approche CI/CD mature :
- Backstage : leur portail developpeur open-source integre les pipelines data comme des "software components" avec CI/CD automatise
- Golden Paths : des templates pre-configures avec CI/CD inclus pour que chaque nouvelle pipeline soit automatiquement testee et deployee
- Automated Canary Analysis : les nouvelles versions de pipelines sont d'abord deployees sur un sous-ensemble de donnees avant le rollout complet
- Resultat : 200+ data engineers deployent en production plusieurs fois par jour avec un taux d'echec inferieur a 2%
Incident de production sans CI/CD
Une fintech europeenne a subi une panne de 3 jours sur ses rapports reglementaires apres qu'un data engineer a deploye directement en production un modele dbt non teste :
- Cause racine : un JOIN sans condition de deduplication a cree une explosion de lignes (x1000) dans les tables de reporting
- Impact : rapports reglementaires incorrects envoyes au regulateur, amende potentielle de 500K euros
- Duree : 3 jours pour identifier le probleme car aucun monitoring sur le volume des tables
- Resolution : mise en place immediate d'un pipeline CI/CD avec tests de volume, dbt slim CI, et deploiement blue/green
Le deploiement "YOLO" en production
Deployer directement en production sans pipeline CI/CD est l'anti-pattern le plus dangereux en data engineering.
- Symptome :
dbt run --target prodexecute depuis le poste d'un developpeur - Risque : pas de review, pas de tests, pas de rollback possible
- Correction : interdire les acces directs en production, tout deploiement passe par le pipeline CI/CD
Secrets et CI/CD
Ne stockez JAMAIS les credentials dans le code ou les fichiers YAML. Utilisez toujours des variables d'environnement ou un gestionnaire de secrets (GitHub Secrets, HashiCorp Vault, AWS Secrets Manager). Un leak de credentials Snowflake peut couter des centaines de milliers d'euros en compute non autorise.
dbt build --select state:modified+ --defer --state ./target pour ne reconstruire que les modeles modifies et leurs dependances en aval. Il utilise le manifest de production comme reference pour les modeles non modifies (--defer). C'est essentiel car cela reduit le temps de CI de 90% (par exemple, de 45 min a 5 min) tout en validant l'impact reel des changements.Quiz - CI/CD pour dbt
Quel flag dbt permet de ne builder que les modeles modifies par rapport a la production ?
--select state:modified+ compare l'etat actuel avec un manifest de reference (typiquement production) et selectionne les modeles modifies et leurs dependances en aval (+).Quel est l'avantage principal du deploiement Blue/Green pour dbt ?
Testing Strategies
Objectifs de la lecon
- Comprendre la pyramide de tests adaptee au Data Engineering
- Implementer des tests unitaires, d'integration et de contrat
- Mettre en place des tests de regression data
- Concevoir une strategie de test complete pour les pipelines data
Le testing est le parent pauvre du Data Engineering. 80% des pipelines data en production n'ont aucun test automatise. C'est comme construire un pont sans verification de la structure. La bonne nouvelle : il n'est jamais trop tard pour commencer. Meme un seul test de volume sur votre table la plus critique est mieux que rien.
La pyramide de tests pour les donnees
Comme en genie logiciel, les tests data suivent une pyramide. Les tests les plus rapides et nombreux sont a la base, les plus lents et complexes au sommet.
β±β²
β± β²
β± E2Eβ² Tests End-to-End
β± Tests β² (Pipeline complete)
β±βββββββββββ² Lents, peu nombreux
β± β²
β± Integration β² Tests d'integration
β± Tests β² (Joins, aggregations)
β±βββββββββββββββββββ² Moyens
β± β²
β± Contract Tests β² Tests de contrat
β± (Schema, SLA) β² (Interfaces data)
β±βββββββββββββββββββββββββββ²
β± β²
β± Unit Tests β² Tests unitaires
β± (Logique transformation) β² Rapides, nombreux
β±βββββββββββββββββββββββββββββββββββ²
Rapidite β² βΌ Couverture
Nombre β² βΌ Complexite
1. Tests unitaires
Les tests unitaires verifient la logique de transformation de maniere isolee, sans avoir besoin de donnees reelles.
unit_tests:
- name: test_calcul_revenue_net
description: "Verifie le calcul du revenu net avec remises et taxes"
model: fct_revenue
given:
- input: ref('stg_orders')
rows:
- { order_id: 1, amount: 100.00, discount: 10.00, tax_rate: 0.20 }
- { order_id: 2, amount: 200.00, discount: 0.00, tax_rate: 0.20 }
- { order_id: 3, amount: 50.00, discount: 50.00, tax_rate: 0.20 }
- input: ref('stg_refunds')
rows:
- { order_id: 3, refund_amount: 0.00 }
expect:
rows:
- { order_id: 1, net_revenue: 108.00 } # (100-10) * 1.20
- { order_id: 2, net_revenue: 240.00 } # (200-0) * 1.20
- { order_id: 3, net_revenue: 0.00 } # (50-50) * 1.20
- name: test_classification_client
description: "Verifie la segmentation client par montant d'achat"
model: dim_customers
given:
- input: ref('stg_customers')
rows:
- { customer_id: 1, total_purchases: 15000 }
- { customer_id: 2, total_purchases: 5000 }
- { customer_id: 3, total_purchases: 500 }
expect:
rows:
- { customer_id: 1, segment: 'premium' }
- { customer_id: 2, segment: 'standard' }
- { customer_id: 3, segment: 'basic' }
2. Tests d'integration
Les tests d'integration verifient que les transformations fonctionnent correctement avec des donnees reelles sur des volumes reduits.
models:
- name: fct_orders
description: "Table de faits des commandes"
tests:
- dbt_utils.fewer_rows_than:
compare_model: ref('stg_orders')
# Apres deduplication, fct doit avoir moins de lignes
- dbt_utils.expression_is_true:
expression: "total_amount >= 0"
config:
severity: error
columns:
- name: order_id
tests:
- unique
- not_null
- name: customer_id
tests:
- not_null
- relationships:
to: ref('dim_customers')
field: customer_id
- name: order_date
tests:
- not_null
- dbt_utils.accepted_range:
min_value: "'2020-01-01'"
max_value: "current_date()"
- name: status
tests:
- accepted_values:
values: ['pending', 'confirmed', 'shipped', 'delivered', 'cancelled']
3. Tests de contrat (Contract Tests)
Les tests de contrat verifient que les interfaces entre producteurs et consommateurs de donnees sont respectees.
models:
- name: fct_revenue
config:
contract:
enforced: true
columns:
- name: revenue_id
data_type: varchar(36)
constraints:
- type: not_null
- type: primary_key
- name: customer_id
data_type: varchar(36)
constraints:
- type: not_null
- type: foreign_key
to: ref('dim_customers')
to_columns: [customer_id]
- name: revenue_date
data_type: date
constraints:
- type: not_null
- name: net_amount
data_type: number(18,2)
constraints:
- type: not_null
- type: check
expression: "net_amount >= 0"
Pourquoi les tests de contrat sont essentiels
Les tests de contrat garantissent que les modifications en amont ne cassent pas les consommateurs en aval. Si un producteur change le type d'une colonne ou supprime un champ, le test de contrat echoue AVANT que les donnees corrompues n'atteignent les dashboards ou les modeles ML.
4. Tests de regression data
Les tests de regression comparent les resultats d'une nouvelle version du pipeline avec la version precedente pour detecter les changements inattendus.
# macros/test_regression_revenue.sql
{% test regression_revenue(model, compare_model) %}
{% set audit_query = dbt_utils.audit_helper.compare_relations(
a_relation=model,
b_relation=compare_model,
primary_key="revenue_id",
columns=["customer_id", "revenue_date", "net_amount"]
) %}
WITH comparison AS (
{{ audit_query }}
)
SELECT *
FROM comparison
WHERE in_a != in_b
-- Tolerance de 1% sur les montants
AND ABS(a_net_amount - b_net_amount) / NULLIF(a_net_amount, 0) > 0.01
{% endtest %}
# Comparaison entre staging et production
# pip install data-diff
from data_diff import connect, diff_tables
db_prod = connect("snowflake://user:pass@account/ANALYTICS_PROD/PUBLIC")
db_staging = connect("snowflake://user:pass@account/ANALYTICS_STAGING/PUBLIC")
diff_result = diff_tables(
table1=db_prod.get_table("fct_revenue"),
table2=db_staging.get_table("fct_revenue"),
key_columns=["revenue_id"],
extra_columns=["net_amount", "customer_id"],
where="revenue_date >= '2024-01-01'"
)
for row_diff in diff_result:
print(f"Difference trouvee: {row_diff}")
# Resultat:
# + row in staging only: revenue_id=X, net_amount=150.00
# ~ changed: revenue_id=Y, net_amount 100->120
# - row in prod only: revenue_id=Z (supprime en staging)
5. Strategie de test complete
| Type de test | Quoi tester | Quand | Outils |
|---|---|---|---|
| Unit | Logique de transformation isolee | Pre-commit, CI | dbt unit tests, pytest |
| Integration | Joins, aggregations, deduplication | CI, post-deploy | dbt tests, Great Expectations |
| Contract | Schema, types, contraintes | CI, chaque build | dbt contracts, Soda |
| Regression | Comparaison avant/apres | Pre-deploy production | data-diff, audit_helper |
| E2E | Pipeline complete bout en bout | Nightly, pre-release | Custom scripts, Airflow tests |
| Performance | Temps d'execution, cout | CI, monitoring | dbt timing, warehouse metrics |
Zero tests en data pipeline
L'anti-pattern le plus repandu en Data Engineering : deployer des pipelines sans aucun test automatise.
- Excuse courante : "Les donnees changent tout le temps, on ne peut pas les tester"
- Realite : ce sont justement les changements imprevisibles qui rendent les tests indispensables
- Consequence : les erreurs sont detectees par les utilisateurs finaux, pas par l'equipe data
- Correction : commencer par les tests basiques (not_null, unique, relationships) puis monter progressivement en complexite
Scenario : Mise en place de tests sur un pipeline existant
Vous heritez d'un pipeline dbt de 150 modeles sans aucun test. Par ou commencer ?
- Semaine 1 : Identifier les 10 tables les plus critiques (celles qui alimentent les dashboards C-level et les rapports reglementaires)
- Semaine 2 : Ajouter des tests basiques (not_null, unique sur les cles primaires, relationships sur les cles etrangeres)
- Semaine 3 : Ajouter des tests de volume (row count dans une plage acceptable) et de fraicheur (source freshness)
- Semaine 4 : Implementer des tests de contrat sur les modeles marts exposes aux consommateurs
- Mois 2+ : Ajouter des unit tests pour les transformations complexes, des tests de regression pour les deploiements
Quiz - Testing Strategies
Quel type de test se trouve a la BASE de la pyramide de tests data ?
Quel est le principal avantage des dbt contracts (enforced: true) ?
Version Control Data
Objectifs de la lecon
- Comprendre les enjeux du versionnement des donnees
- Maitriser DVC, lakeFS et Nessie pour le data versioning
- Implementer des patterns de versionnement adaptes a chaque cas d'usage
- Exploiter le Time Travel pour l'audit et le rollback
Versionner le code est une evidence depuis 20 ans. Versionner les donnees reste un defi majeur en 2024. Pourtant, sans versionnement, impossible de reproduire un resultat d'analyse, de faire un rollback apres une corruption, ou de comparer deux versions d'un dataset. C'est le chapitre qui distingue les data engineers juniors des seniors.
Pourquoi versionner les donnees ?
Sans versionnement
- Impossible de reproduire un resultat
- Pas de rollback apres corruption
- Pas d'audit des modifications
- Conflicts de donnees non detectes
- "Ca marchait hier" sans preuve
Avec versionnement
- Reproductibilite totale
- Rollback instantane
- Historique complet des changements
- Branching et merging de donnees
- Audit reglementaire simplifie
DVC (Data Version Control)
DVC est un outil open-source qui etend Git pour versionner les fichiers de donnees et les modeles ML. Il stocke les metadonnees dans Git et les fichiers volumineux dans un stockage distant (S3, GCS, Azure Blob).
# Initialiser DVC dans un projet Git existant git init && dvc init # Configurer le stockage distant (S3) dvc remote add -d myremote s3://my-bucket/dvc-store dvc remote modify myremote region eu-west-1 # Tracker un dataset dvc add data/raw/transactions_2024.parquet # Cree: data/raw/transactions_2024.parquet.dvc (metadonnees) # Ajoute au .gitignore: data/raw/transactions_2024.parquet # Commiter les metadonnees dans Git git add data/raw/transactions_2024.parquet.dvc data/raw/.gitignore git commit -m "feat: add transactions dataset v1" # Pousser les donnees vers le stockage distant dvc push # --- Plus tard: mettre a jour le dataset --- # (apres modification du fichier parquet) dvc add data/raw/transactions_2024.parquet git add data/raw/transactions_2024.parquet.dvc git commit -m "feat: update transactions dataset v2" dvc push # --- Rollback vers la version precedente --- git checkout HEAD~1 -- data/raw/transactions_2024.parquet.dvc dvc checkout # Les donnees v1 sont restaurees !
stages:
prepare:
cmd: python src/prepare.py
deps:
- data/raw/transactions_2024.parquet
- src/prepare.py
params:
- prepare.min_amount
- prepare.date_range
outs:
- data/prepared/transactions_clean.parquet
features:
cmd: python src/featurize.py
deps:
- data/prepared/transactions_clean.parquet
- src/featurize.py
params:
- features.window_size
- features.aggregations
outs:
- data/features/feature_store.parquet
train:
cmd: python src/train.py
deps:
- data/features/feature_store.parquet
- src/train.py
params:
- train.model_type
- train.learning_rate
outs:
- models/fraud_detector.pkl
metrics:
- metrics/scores.json:
cache: false
lakeFS - Git pour le Data Lake
lakeFS fonctionne comme Git mais directement sur le Data Lake (S3, GCS, Azure). Il permet le branching, committing et merging de donnees a l'echelle du petaoctet.
import lakefs_sdk
from lakefs_sdk.client import LakeFSClient
client = LakeFSClient(
configuration=lakefs_sdk.Configuration(
host="http://localhost:8000/api/v1",
username="AKIAIOSFODNN7EXAMPLE",
password="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)
)
# Creer un repository
client.repositories_api.create_repository(
repository_creation={
"name": "analytics-lake",
"storage_namespace": "s3://my-bucket/analytics/",
"default_branch": "main"
}
)
# Creer une branche pour une experimentation
client.branches_api.create_branch(
repository="analytics-lake",
branch_creation={
"name": "experiment/new-fraud-features",
"source": "main"
}
)
# Uploader des donnees sur la branche
with open("data/fraud_features_v2.parquet", "rb") as f:
client.objects_api.upload_object(
repository="analytics-lake",
branch="experiment/new-fraud-features",
path="features/fraud_features.parquet",
content=f
)
# Commiter
client.commits_api.commit(
repository="analytics-lake",
branch="experiment/new-fraud-features",
commit_creation={
"message": "Add new fraud detection features v2",
"metadata": {
"author": "data-team",
"ticket": "DATA-1234"
}
}
)
# Merger vers main apres validation
client.refs_api.merge_into_branch(
repository="analytics-lake",
source_ref="experiment/new-fraud-features",
destination_branch="main"
)
Nessie (Project Nessie)
Nessie apporte le versionnement Git-like directement au niveau du catalogue de donnees (Apache Iceberg, Delta Lake). Contrairement a lakeFS qui travaille au niveau fichier, Nessie versionne les tables.
-- Creer une branche pour les modifications CREATE BRANCH experiment_branch IN nessie FROM main; -- Travailler sur la branche USE REFERENCE experiment_branch IN nessie; -- Modifier les tables sur la branche ALTER TABLE analytics.fct_revenue ADD COLUMN new_metric DOUBLE; INSERT INTO analytics.fct_revenue SELECT *, calculate_new_metric(*) FROM analytics.fct_revenue@main; -- Reference croisee -- Valider les resultats SELECT COUNT(*), AVG(new_metric) FROM analytics.fct_revenue; -- Merger vers main MERGE BRANCH experiment_branch INTO main IN nessie;
Comparaison des outils
| Critere | DVC | lakeFS | Nessie |
|---|---|---|---|
| Granularite | Fichier | Objet (fichier) | Table (catalogue) |
| Integration Git | Extension Git native | API Git-like separee | API Git-like separee |
| Cas d'usage principal | ML Pipelines | Data Lake versioning | Lakehouse versioning |
| Echelle | GB-TB | TB-PB | TB-PB |
| Branching | Via Git | Zero-copy branches | Zero-copy branches |
| Format | Tout fichier | Tout objet S3/GCS | Iceberg, Delta Lake |
| Complexite setup | Faible | Moyenne | Elevee |
Time Travel Strategies
Le Time Travel permet de requeter les donnees a un point precis dans le temps, sans avoir besoin de snapshots manuels.
-- Snowflake Time Travel (jusqu'a 90 jours)
SELECT * FROM fct_revenue
AT(TIMESTAMP => '2024-03-15 10:00:00'::TIMESTAMP);
-- Comparer deux versions
SELECT
COALESCE(a.revenue_id, b.revenue_id) AS revenue_id,
a.net_amount AS amount_before,
b.net_amount AS amount_after,
b.net_amount - a.net_amount AS delta
FROM fct_revenue AT(OFFSET => -3600) a -- Il y a 1 heure
FULL OUTER JOIN fct_revenue b
ON a.revenue_id = b.revenue_id
WHERE a.net_amount != b.net_amount
OR a.revenue_id IS NULL
OR b.revenue_id IS NULL;
-- Delta Lake Time Travel (versions illimitees)
SELECT * FROM delta.`/data/fct_revenue`
VERSION AS OF 42;
SELECT * FROM delta.`/data/fct_revenue`
TIMESTAMP AS OF '2024-03-15 10:00:00';
-- Apache Iceberg Time Travel
SELECT * FROM analytics.fct_revenue
FOR SYSTEM_TIME AS OF TIMESTAMP '2024-03-15 10:00:00';
-- Iceberg: lister les snapshots disponibles
SELECT * FROM analytics.fct_revenue.snapshots;
ING Bank - Versionnement de donnees reglementaires
ING Bank, l'une des plus grandes banques europeennes, utilise le versionnement de donnees pour repondre aux exigences reglementaires (BCBS 239, GDPR) :
- lakeFS en production : chaque rapport reglementaire est genere a partir d'une branche "figee" du data lake, garantissant la reproductibilite
- Audit trail complet : chaque modification de donnee est tracee avec l'identite de l'auteur, la date et la justification
- Rollback reglementaire : en cas de detection d'erreur dans un rapport soumis, ING peut retrouver l'etat exact des donnees au moment de la generation
- Resultat : temps de reponse aux auditeurs passe de semaines a heures, conformite BCBS 239 atteinte a 98%
Cout du versionnement
Le versionnement de donnees a un cout en stockage. lakeFS et Nessie utilisent le "zero-copy branching" (pas de duplication physique), mais DVC duplique les fichiers. Planifiez votre politique de retention : combien de versions conserver et pendant combien de temps.
Quiz - Version Control Data
Quel outil est le mieux adapte pour versionner un Data Lake a l'echelle du petaoctet ?
Quelle est la granularite de versionnement de Nessie ?
Projet : Pipeline Streaming de Detection de Fraude
Objectifs du projet
- Concevoir et implementer un pipeline de detection de fraude en temps reel
- Integrer Kafka, Flink et un systeme de data quality
- Mettre en place le monitoring et les alertes
- Appliquer les principes DataOps au pipeline complet
Ce projet est le point culminant de la Phase 5. Vous allez combiner tout ce que vous avez appris : streaming, data quality, observabilite et DataOps dans un pipeline de detection de fraude realiste. C'est exactement le type de projet que vous aurez en entretien pour un poste de Data Architect senior.
Architecture du pipeline
Transactions Kafka Flink Actions
============ ======== ======== =========
POS Terminal βββΆ ββββββββββ ββββββββββββββββ ββββββββββββ
β topic: β β β β Alerting β
Mobile App βββΆ β raw-txnββββΆβ Flink Job ββββΆβ (PagerD) β
β β β β ββββββββββββ
Web Payment βββΆ ββββββββββ β βββββββββββββ β
β β Rules ββ ββββββββββββ
ATM βββΆ ββββββββββ β β Engine βββββΆβ Block β
β topic: β β βββββββββββββ β Card API β
Schema βββΆ β enrichedβββββ β ββββββββββββ
Registry β -txn β β βββββββββββββ β
ββββββββββ β β ML Model ββ ββββββββββββ
β β Scoring βββββΆβ Dashboardβ
ββββββββββ β βββββββββββββ β Grafana β
β topic: β β β ββββββββββββ
β fraud- ββββββ βββββββββββββ
β alerts β β β Quality ββ ββββββββββββ
ββββββββββ β β Checks βββββΆβ Data Lakeβ
β βββββββββββββ β (Iceberg)β
ββββββββββ β β ββββββββββββ
β topic: ββββββ β
β metricsβ ββββββββββββββββ
ββββββββββ
Etape 1 : Configuration Kafka
1.1 - Definir les schemas Avro
Commencez par definir les schemas pour les evenements de transaction. L'utilisation d'un Schema Registry garantit la compatibilite entre producteurs et consommateurs.
{
"type": "record",
"name": "Transaction",
"namespace": "com.fraud.detection",
"fields": [
{"name": "transaction_id", "type": "string"},
{"name": "card_id", "type": "string"},
{"name": "customer_id", "type": "string"},
{"name": "merchant_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "currency", "type": "string", "default": "EUR"},
{"name": "merchant_category", "type": "string"},
{"name": "country_code", "type": "string"},
{"name": "channel", "type": {
"type": "enum",
"name": "Channel",
"symbols": ["POS", "ONLINE", "ATM", "MOBILE"]
}},
{"name": "timestamp", "type": {
"type": "long",
"logicalType": "timestamp-millis"
}},
{"name": "geo_lat", "type": ["null", "double"], "default": null},
{"name": "geo_lon", "type": ["null", "double"], "default": null}
]
}
1.2 - Configurer les topics Kafka
# Topic principal : transactions brutes kafka-topics --create \ --topic raw-transactions \ --partitions 12 \ --replication-factor 3 \ --config retention.ms=604800000 \ --config cleanup.policy=delete \ --config min.insync.replicas=2 \ --bootstrap-server kafka:9092 # Topic : transactions enrichies kafka-topics --create \ --topic enriched-transactions \ --partitions 12 \ --replication-factor 3 \ --config retention.ms=2592000000 \ --bootstrap-server kafka:9092 # Topic : alertes de fraude kafka-topics --create \ --topic fraud-alerts \ --partitions 6 \ --replication-factor 3 \ --config retention.ms=-1 \ --config cleanup.policy=compact \ --bootstrap-server kafka:9092 # Topic : metriques pipeline kafka-topics --create \ --topic pipeline-metrics \ --partitions 3 \ --replication-factor 2 \ --bootstrap-server kafka:9092
Etape 2 : Job Flink de detection de fraude
2.1 - Regles de detection basees sur des fenetres temporelles
public class FraudDetectionJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
env.enableCheckpointing(60000); // Checkpoint toutes les 60s
// Source : Kafka raw-transactions
KafkaSource<Transaction> source = KafkaSource
.<Transaction>builder()
.setBootstrapServers("kafka:9092")
.setTopics("raw-transactions")
.setGroupId("fraud-detection-group")
.setStartingOffsets(OffsetsInitializer.latest())
.setDeserializer(new TransactionDeserializer())
.build();
DataStream<Transaction> transactions = env
.fromSource(source, WatermarkStrategy
.<Transaction>forBoundedOutOfOrderness(
Duration.ofSeconds(10))
.withTimestampAssigner((txn, ts) ->
txn.getTimestamp()),
"Kafka Source");
// Regle 1: Velocity Check
// Plus de 5 transactions en 1 minute pour la meme carte
DataStream<FraudAlert> velocityAlerts = transactions
.keyBy(Transaction::getCardId)
.window(TumblingEventTimeWindows.of(
Time.minutes(1)))
.process(new VelocityCheckFunction(5));
// Regle 2: Amount Anomaly
// Montant > 3x la moyenne des 24 dernieres heures
DataStream<FraudAlert> amountAlerts = transactions
.keyBy(Transaction::getCardId)
.window(SlidingEventTimeWindows.of(
Time.hours(24), Time.minutes(5)))
.process(new AmountAnomalyFunction(3.0));
// Regle 3: Geo Velocity
// Transaction dans un pays different en < 2h
DataStream<FraudAlert> geoAlerts = transactions
.keyBy(Transaction::getCardId)
.process(new GeoVelocityFunction(
Duration.ofHours(2)));
// Union des alertes + deduplication
velocityAlerts
.union(amountAlerts, geoAlerts)
.keyBy(FraudAlert::getTransactionId)
.process(new AlertDeduplicationFunction())
.sinkTo(kafkaAlertsSink());
env.execute("Fraud Detection Pipeline v2.1");
}
}
2.2 - Fonction de verification de velocite
public class VelocityCheckFunction
extends ProcessWindowFunction<
Transaction, FraudAlert, String, TimeWindow> {
private final int threshold;
public VelocityCheckFunction(int threshold) {
this.threshold = threshold;
}
@Override
public void process(
String cardId,
Context context,
Iterable<Transaction> transactions,
Collector<FraudAlert> out) {
List<Transaction> txnList = new ArrayList<>();
transactions.forEach(txnList::add);
if (txnList.size() > threshold) {
double totalAmount = txnList.stream()
.mapToDouble(Transaction::getAmount)
.sum();
FraudAlert alert = FraudAlert.builder()
.alertId(UUID.randomUUID().toString())
.transactionId(
txnList.get(txnList.size()-1)
.getTransactionId())
.cardId(cardId)
.ruleTriggered("VELOCITY_CHECK")
.severity(txnList.size() > threshold * 2
? "CRITICAL" : "HIGH")
.details(String.format(
"%d transactions (%.2f EUR) in 1 min",
txnList.size(), totalAmount))
.timestamp(context.window().getEnd())
.build();
out.collect(alert);
}
}
}
Etape 3 : Data Quality en temps reel
3.1 - Controles de qualite dans le flux
public class DataQualityFilter
extends ProcessFunction<Transaction, Transaction> {
// Compteurs de metriques
private transient Counter validCounter;
private transient Counter invalidCounter;
private transient Counter lateCounter;
// Side output pour les enregistrements invalides
static final OutputTag<QualityIssue> QUALITY_ISSUES =
new OutputTag<>("quality-issues"){};
@Override
public void open(Configuration params) {
validCounter = getRuntimeContext()
.getMetricGroup().counter("valid_transactions");
invalidCounter = getRuntimeContext()
.getMetricGroup().counter("invalid_transactions");
lateCounter = getRuntimeContext()
.getMetricGroup().counter("late_transactions");
}
@Override
public void processElement(
Transaction txn,
Context ctx,
Collector<Transaction> out) {
List<String> issues = new ArrayList<>();
// Controle 1: Champs obligatoires
if (txn.getTransactionId() == null)
issues.add("transaction_id is null");
if (txn.getAmount() <= 0)
issues.add("amount must be positive: "
+ txn.getAmount());
if (txn.getCardId() == null)
issues.add("card_id is null");
// Controle 2: Fraicheur
long delay = System.currentTimeMillis()
- txn.getTimestamp();
if (delay > 300_000) { // > 5 min de retard
issues.add("late event: " + delay + "ms");
lateCounter.inc();
}
// Controle 3: Plausibilite
if (txn.getAmount() > 1_000_000)
issues.add("suspiciously high amount: "
+ txn.getAmount());
if (issues.isEmpty()) {
validCounter.inc();
out.collect(txn);
} else {
invalidCounter.inc();
ctx.output(QUALITY_ISSUES,
new QualityIssue(txn, issues));
}
}
}
Etape 4 : Monitoring et alertes
4.1 - Dashboard Grafana et metriques
version: "3.8"
services:
prometheus:
image: prom/prometheus:v2.51.0
ports: ["9090:9090"]
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
grafana:
image: grafana/grafana:10.4.0
ports: ["3000:3000"]
environment:
- GF_SECURITY_ADMIN_PASSWORD=secret
volumes:
- ./dashboards:/var/lib/grafana/dashboards
- ./datasources.yml:/etc/grafana/provisioning/datasources/datasources.yml
alertmanager:
image: prom/alertmanager:v0.27.0
ports: ["9093:9093"]
volumes:
- ./alertmanager.yml:/etc/alertmanager/alertmanager.yml
groups:
- name: fraud_pipeline_alerts
rules:
- alert: HighFraudRate
expr: |
rate(fraud_alerts_total[5m])
/ rate(transactions_total[5m]) > 0.05
for: 5m
labels:
severity: critical
annotations:
summary: "Taux de fraude anormalement eleve"
description: |
Le taux de fraude depasse 5% sur les 5
dernieres minutes. Valeur actuelle: {{ $value }}
- alert: PipelineLatencyHigh
expr: |
histogram_quantile(0.99,
rate(processing_latency_seconds_bucket[5m])
) > 2
for: 3m
labels:
severity: warning
annotations:
summary: "Latence p99 superieure a 2 secondes"
- alert: DataQualityDegradation
expr: |
rate(invalid_transactions[5m])
/ rate(valid_transactions[5m]) > 0.01
for: 10m
labels:
severity: warning
annotations:
summary: "Plus de 1% de transactions invalides"
- alert: ConsumerLag
expr: kafka_consumer_lag > 10000
for: 5m
labels:
severity: critical
annotations:
summary: "Consumer lag depasse 10000 messages"
Etape 5 : Pipeline CI/CD du projet
5.1 - Tests et deploiement automatise
name: Fraud Pipeline CI/CD
on:
pull_request:
branches: [main]
push:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
services:
kafka:
image: confluentinc/cp-kafka:7.6.0
ports: ["9092:9092"]
steps:
- uses: actions/checkout@v4
- uses: actions/setup-java@v4
with:
java-version: '17'
distribution: 'temurin'
- name: Unit Tests
run: mvn test -pl fraud-detection-core
- name: Integration Tests
run: |
mvn verify -pl fraud-detection-integration \
-Dkafka.bootstrap.servers=localhost:9092
- name: Schema Compatibility Check
run: |
curl -X POST \
http://schema-registry:8081/compatibility/subjects/raw-transactions-value/versions/latest \
-H "Content-Type: application/json" \
-d @schemas/transaction-v2.avsc
deploy:
needs: test
if: github.ref == 'refs/heads/main'
runs-on: ubuntu-latest
steps:
- name: Deploy to Kubernetes
run: |
kubectl apply -f k8s/flink-job.yml
kubectl rollout status deployment/fraud-detection
Criteres de validation du projet
Pour valider ce projet, votre pipeline doit : (1) traiter au moins 1000 transactions/seconde, (2) detecter les fraudes en moins de 2 secondes (latence p99), (3) avoir un taux de faux positifs inferieur a 5%, (4) inclure des tests automatises et du monitoring, (5) supporter un deploiement blue/green sans perte de donnees.
Pour aller plus loin
Ajoutez un modele de ML (Random Forest ou XGBoost) entraine sur des donnees historiques pour scorer chaque transaction en complement des regles metier. Utilisez le pattern "Champion/Challenger" pour comparer les performances du modele ML avec les regles classiques en production.
Examen Final - Phase 5 : Data Engineering Avance
Consignes de l'examen
- 12 questions couvrant l'ensemble de la Phase 5
- Streaming, Data Quality, Observabilite, DataOps, CI/CD, Testing, Versionnement
- Prenez le temps de bien lire chaque question avant de repondre
- Un score de 80% ou plus valide la Phase 5
Cet examen couvre tout ce que vous avez appris dans la Phase 5. Pas de piege : si vous avez bien suivi les lecons et fait les exercices, vous etes pret. Concentrez-vous sur les concepts fondamentaux, pas sur la memorisation de syntaxe. Un Data Architect doit comprendre le "pourquoi" autant que le "comment".
Examen Final - Phase 5
Question 1/12 - Streaming : Quelle est la difference fondamentale entre le traitement "at-least-once" et "exactly-once" dans Kafka ?
Question 2/12 - Kafka Architecture : A quoi sert le Schema Registry dans une architecture Kafka ?
Question 3/12 - Apache Flink : Quelle est la difference entre une fenetre Tumbling et une fenetre Sliding dans Flink ?
Question 4/12 - Data Quality : Quelles sont les 6 dimensions de la qualite des donnees selon le framework DAMA ?
Question 5/12 - Data Observability : Quel outil permet de definir des "expectations" sur les donnees de maniere declarative ?
Question 6/12 - Data Contracts : Quel est le role principal d'un Data Contract entre un producteur et un consommateur ?
Question 7/12 - DataOps : Quel est le premier principe du Manifeste DataOps ?
Question 8/12 - CI/CD dbt : Que fait la commande dbt build --select state:modified+ --defer --state ./target ?
state:modified+ selectionne les modeles modifies et leurs dependances en aval (+), --defer utilise le manifest de production pour les modeles non modifies, et --state pointe vers le manifest de reference.Question 9/12 - Testing : Dans la pyramide de tests data, quel type de test est le plus rapide et le plus nombreux ?
Question 10/12 - Version Control : Quelle est la principale difference entre DVC et lakeFS ?
Question 11/12 - Streaming Avance : Dans un pipeline de detection de fraude en temps reel, pourquoi utilise-t-on des "side outputs" dans Flink ?
Question 12/12 - Architecture globale : Vous devez concevoir un pipeline data pour une banque qui doit traiter 100K transactions/seconde avec une latence maximale de 500ms et repondre aux exigences reglementaires BCBS 239. Quelle combinaison de technologies est la plus appropriee ?
Bilan de la Phase 5
Felicitations ! Vous avez parcouru l'ensemble des sujets avances du Data Engineering :
- Streaming : Kafka, Flink, Spark Streaming, architectures event-driven
- Data Quality : dimensions, Great Expectations, Soda, data contracts
- Observabilite : 5 piliers, monitoring, lineage, anomaly detection
- DataOps : manifeste, maturite, automatisation, collaboration
- CI/CD : GitHub Actions, pre-commit hooks, slim CI, blue/green deployments
- Testing : pyramide de tests, unit, integration, contrat, regression
- Versionnement : DVC, lakeFS, Nessie, Time Travel
Vous etes maintenant pret pour la Phase 6 : Specialisation !