1. Apache Kafka CLI
Topics
# Create a topic
kafka-topics.sh --create --topic orders \
--bootstrap-server broker:9092 \
--partitions 12 --replication-factor 3
# List topics
kafka-topics.sh --list --bootstrap-server broker:9092
# Describe a topic (partitions, ISR, leader)
kafka-topics.sh --describe --topic orders --bootstrap-server broker:9092
# Change the partition count (increase only)
kafka-topics.sh --alter --topic orders --partitions 24 --bootstrap-server broker:9092
Console Producer / Consumer
# Produce with a key
kafka-console-producer.sh --topic orders --bootstrap-server broker:9092 \
--property "parse.key=true" --property "key.separator=:"
# Consume from the beginning
kafka-console-consumer.sh --topic orders --from-beginning \
--bootstrap-server broker:9092
# Consume with key + consumer group
kafka-console-consumer.sh --topic orders --group my-group \
--property print.key=true --property key.separator=":" \
--bootstrap-server broker:9092
Consumer Groups
# List consumer groups
kafka-consumer-groups.sh --list --bootstrap-server broker:9092
# Describe a group (lag, offsets, partitions)
kafka-consumer-groups.sh --describe --group my-group --bootstrap-server broker:9092
# Reset offsets (--to-earliest, --to-latest, --shift-by N)
kafka-consumer-groups.sh --reset-offsets --group my-group \
--topic orders --to-earliest --execute --bootstrap-server broker:9092
2. Kafka Production Configuration
Producer Config
| Parameter | Production value | Why |
| acks | all | All ISRs must acknowledge |
| enable.idempotence | true | Avoid duplicates |
| retries | 5 | Retry on transient errors |
| linger.ms | 20 | Batching for throughput |
| compression.type | lz4 | Good speed/size trade-off |
| batch.size | 32768 | 32 KB per batch |
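A minimal sketch applying these settings with the confluent-kafka Python client (broker address and payload are placeholders):

from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "broker:9092",
    "acks": "all",
    "enable.idempotence": True,
    "retries": 5,
    "linger.ms": 20,
    "compression.type": "lz4",
    "batch.size": 32768,
})
# Delivery callback so failed sends are visible instead of silently lost
producer.produce("orders", key="order-1", value=b'{"amount": 42.0}',
                 on_delivery=lambda err, msg: err and print(f"delivery failed: {err}"))
producer.flush()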
Broker Config
| Parameter | Production value | Why |
| replication.factor | 3 | Tolerate 1 broker down |
| min.insync.replicas | 2 | No writes accepted if < 2 ISRs |
| unclean.leader.election.enable | false | Never elect a non-ISR replica as leader |
| log.retention.hours | 168 | 7 days of retention |
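The replication and retention settings can also be enforced per topic at creation time; a sketch using confluent-kafka's AdminClient (topic name and sizing are illustrative):

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "broker:9092"})
futures = admin.create_topics([NewTopic(
    "orders", num_partitions=12, replication_factor=3,
    # topic-level overrides mirroring the broker defaults above
    config={"min.insync.replicas": "2", "retention.ms": str(168 * 3600 * 1000)},
)])
futures["orders"].result()  # raises if topic creation failed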
3. Kafka Connect & Schema Registry
Connect REST API
# List connectors
GET http://connect:8083/connectors
# Deploy a connector
POST http://connect:8083/connectors
{"name": "pg-cdc", "config": {...}}
# Connector status
GET http://connect:8083/connectors/pg-cdc/status
# Pause / Resume / Delete
PUT http://connect:8083/connectors/pg-cdc/pause
PUT http://connect:8083/connectors/pg-cdc/resume
DELETE http://connect:8083/connectors/pg-cdc
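The same calls from Python with requests (the connector config is trimmed to the class name; a real Debezium deployment needs connection details):

import requests

CONNECT = "http://connect:8083"

# Deploy a connector (returns 409 if the name already exists)
requests.post(f"{CONNECT}/connectors", timeout=10, json={
    "name": "pg-cdc",
    "config": {"connector.class": "io.debezium.connector.postgresql.PostgresConnector"},
}).raise_for_status()

# Check that the connector and all its tasks are RUNNING
status = requests.get(f"{CONNECT}/connectors/pg-cdc/status", timeout=10).json()
print(status["connector"]["state"], [t["state"] for t in status["tasks"]])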
Schema Registry
# List subjects
GET http://schema-registry:8081/subjects
# Get the latest schema
GET http://schema-registry:8081/subjects/orders-value/versions/latest
# Test compatibility
POST http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest
{"schema": "{...}"}
# Change the compatibility mode
PUT http://schema-registry:8081/config/orders-value
{"compatibility": "FULL"}
Schema Compatibility
| Mode | Add field | Remove field | Usage |
| BACKWARD | With default only | Yes | Consumers upgraded first |
| FORWARD | Yes | With default only | Producers upgraded first |
| FULL | With default | With default | Recommended in production |
| NONE | Yes | Yes | Dev/test only |
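A sketch of a BACKWARD-compatible evolution checked against the REST endpoints above (the Avro schema and subject are illustrative):

import json
import requests

SR = "http://schema-registry:8081"
new_schema = {
    "type": "record", "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"},
        # new field added WITH a default → stays BACKWARD compatible
        {"name": "currency", "type": "string", "default": "EUR"},
    ],
}
payload = {"schema": json.dumps(new_schema)}

check = requests.post(f"{SR}/compatibility/subjects/orders-value/versions/latest",
                      json=payload, timeout=10).json()
if check.get("is_compatible"):
    requests.post(f"{SR}/subjects/orders-value/versions", json=payload, timeout=10)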
4. ksqlDB - SQL on Streams
-- Create a stream from a topic
CREATE STREAM orders_stream (
order_id VARCHAR KEY,
customer_id VARCHAR,
amount DOUBLE,
product VARCHAR,
created_at TIMESTAMP
) WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='AVRO');
-- Create a materialized table
CREATE TABLE customers (
customer_id VARCHAR PRIMARY KEY,
name VARCHAR
) WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='AVRO');
-- Persistent query with a join
CREATE STREAM enriched AS
SELECT o.order_id, o.amount, c.name
FROM orders_stream o
LEFT JOIN customers c ON o.customer_id = c.customer_id
EMIT CHANGES;
-- Windowed aggregation
CREATE TABLE revenue_hourly AS
SELECT product, COUNT(*) AS cnt, SUM(amount) AS total
FROM orders_stream
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY product EMIT CHANGES;
-- Pull query (point-in-time lookup)
SELECT * FROM revenue_hourly WHERE product='laptop';
5. Apache Flink - Essentials
Architecture
| Component | Role |
| JobManager | Scheduling, checkpoints, resource management |
| TaskManager | Executes tasks in task slots |
| State Backend | RocksDB (disk) or HashMap (in-memory) |
| Checkpoint | Distributed snapshot (Chandy-Lamport) → S3/HDFS |
Flink SQL
-- Kafka source with a watermark
CREATE TABLE orders (
order_id STRING,
amount DOUBLE,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'broker:9092',
'format' = 'avro-confluent',
'scan.startup.mode' = 'latest-offset'
);
-- Tumbling window aggregation
SELECT product,
TUMBLE_START(event_time, INTERVAL '10' MINUTE) AS w_start,
COUNT(*) AS cnt, SUM(amount) AS revenue
FROM orders
GROUP BY product, TUMBLE(event_time, INTERVAL '10' MINUTE);
Recommended checkpoint interval: 60-300 s. Too frequent = checkpoint overhead; too infrequent = long replay after a failure (see the sketch below).
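A minimal PyFlink sketch setting an interval in that range (DataStream and Table jobs on the same environment pick it up):

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# 120 s checkpoint interval: inside the recommended 60-300 s window
env.enable_checkpointing(120_000)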
6. Spark Structured Streaming
from pyspark.sql.functions import col, count, from_json, sum, window

# Read from Kafka
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "orders") \
.load()
# Deserialize and process
orders = df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")
# Watermark + window aggregation
result = orders \
.withWatermark("timestamp", "5 minutes") \
.groupBy(window("timestamp", "10 minutes"), "product") \
.agg(count("*"), sum("amount"))
# Write the result
query = result.writeStream \
.outputMode("update") \
.format("console") \
.trigger(processingTime="30 seconds") \
.start()
Output Modes
| Mode | Description | Usage |
| Append | New rows only | No aggregation, or aggregation after the watermark closes a window |
| Update | Only rows that changed | Aggregations with in-place updates |
| Complete | Entire result table | Small number of groups |
7. Streaming Framework Comparison
| Criterion | Kafka Streams | Flink | Spark Streaming | Beam |
| Model | True streaming | True streaming | Micro-batch | Abstraction |
| Latency | ms | ms | seconds | Depends on runner |
| Sources | Kafka only | Kafka, JDBC, files | Kafka, files | Kafka, GCS, BQ |
| Deployment | Library (JVM) | Dedicated cluster | Spark cluster | Runner |
| SQL | ksqlDB (separate) | Flink SQL | Spark SQL | Beam SQL |
| Batch+Stream | No | Yes | Yes | Yes |
| Best for | Kafka-to-Kafka microservices | Complex low-latency jobs | Teams already on Spark | GCP / multi-cloud |
8. Event-Driven Architecture Patterns
| Pattern | Description | When |
| Event Notification | Minimal event (ID only); the consumer calls the source back | Loose coupling, lightweight events |
| Event-Carried State Transfer | The event carries all the data | Autonomous consumers, ~80% of cases |
| Event Sourcing | Store ALL events (append-only) | Audit trail, replay, temporal queries |
| CQRS | Separate reads (queries) from writes (commands) | Multiple optimized read models |
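The first two patterns differ mainly in payload size; an illustrative pair of events:

# Event Notification: minimal payload, the consumer calls the order service back for details
order_created_notification = {"event": "OrderCreated", "order_id": "o-123"}

# Event-Carried State Transfer: the event carries everything the consumer needs
order_created_full = {
    "event": "OrderCreated",
    "order_id": "o-123",
    "customer_id": "c-42",
    "amount": 99.90,
    "items": [{"sku": "laptop", "qty": 1}],
}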
Dead Letter Queue (DLQ)
1. The consumer receives a message
2. Processing fails → retry (exponential backoff)
3. Max retries reached → send the message to topic.dlq
4. The DLQ record contains: original message + error reason + stack trace
5. The ops team investigates and replays manually
Always derive DLQ topic names from the source topic: orders.dlq, payments.dlq. Never drop a message silently (consumer-side sketch below).
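A consumer-side sketch of this flow with confluent-kafka (process() stands in for the business logic and is hypothetical):

import time
import traceback
from confluent_kafka import Consumer, Producer

consumer = Consumer({"bootstrap.servers": "broker:9092", "group.id": "my-group",
                     "enable.auto.commit": False, "auto.offset.reset": "earliest"})
producer = Producer({"bootstrap.servers": "broker:9092", "acks": "all"})
consumer.subscribe(["orders"])
MAX_RETRIES = 3

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    for attempt in range(MAX_RETRIES):
        try:
            process(msg.value())          # hypothetical business logic
            break
        except Exception as exc:
            if attempt == MAX_RETRIES - 1:
                # max retries reached: forward to the DLQ with error context
                producer.produce("orders.dlq", value=msg.value(), headers=[
                    ("error.reason", str(exc).encode()),
                    ("error.stacktrace", traceback.format_exc().encode()),
                ])
                producer.flush()
            else:
                time.sleep(2 ** attempt)  # exponential backoff
    consumer.commit(message=msg)          # manual commit only after handling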
9. Real-Time OLAP Engines
| Criterion | ClickHouse | Apache Druid | Apache Pinot |
| Strength | Full SQL, general-purpose | Time-series OLAP | User-facing, high concurrency |
| Latency | 10 ms - 500 ms | 100 ms - 1 s | 10 ms - 200 ms |
| Concurrency | Moderate-high | Moderate | Very high |
| Real-time ingestion | Kafka Engine + MV | Native Kafka | Native Kafka |
| Joins | Full | Limited (lookups) | Limited (lookups) |
| Managed cloud | ClickHouse Cloud | Imply Cloud | StarTree Cloud |
ClickHouse - Materialized View Pattern
-- 1. Kafka Engine (source)
CREATE TABLE events_queue ENGINE = Kafka SETTINGS ...;
-- 2. MergeTree (storage)
CREATE TABLE events (...) ENGINE = MergeTree()
ORDER BY (dimension, timestamp);
-- 3. Materialized View (auto-ingestion)
CREATE MATERIALIZED VIEW events_mv TO events AS SELECT * FROM events_queue;
-- 4. Aggregated MV (pre-aggregation)
CREATE MATERIALIZED VIEW stats_mv ENGINE = AggregatingMergeTree() ORDER BY (...)
AS SELECT ..., countState(), sumState(amount) FROM events GROUP BY ...;
10. Data Quality - 6 Dimensions
| Dimension | Question | Metric | Check |
| Completeness | Is data missing? | % NULLs | NOT NULL, row_count > 0 |
| Accuracy | Is the data correct? | % valid values | age BETWEEN 0 AND 150 |
| Consistency | Consistent across tables? | % matching records | total = SUM(items) |
| Timeliness | Is the data fresh? | Freshness | MAX(updated) > NOW()-1h |
| Validity | Is the format correct? | % conforming | email LIKE '%@%.%' |
| Uniqueness | No duplicates? | % duplicates | COUNT(DISTINCT id)=COUNT(*) |
11. Great Expectations (GX)
import great_expectations as gx
context = gx.get_context()
datasource = context.data_sources.add_postgres(name="prod", connection_string="...")
asset = datasource.add_table_asset(name="orders", table_name="orders")
batch = asset.add_batch_definition_whole_table("full")
# Create a suite
suite = context.suites.add(gx.ExpectationSuite(name="orders_quality"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeUnique(column="order_id"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeBetween(
column="amount", min_value=0, max_value=1000000))
# Bind the suite to the batch with a validation definition, then create a checkpoint
validation = context.validation_definitions.add(
    gx.ValidationDefinition(name="orders_validation", data=batch, suite=suite))
checkpoint = context.checkpoints.add(gx.Checkpoint(
    name="daily", validation_definitions=[validation], actions=[...]))
result = checkpoint.run()
12. Soda - Data Quality in YAML
# soda/checks/orders.yml
checks for orders:
  # Completeness
  - missing_count(order_id) = 0
  - missing_percent(email) < 1
  # Uniqueness
  - duplicate_count(order_id) = 0
  # Validity
  - invalid_percent(amount) < 0.1:
      valid min: 0
      valid max: 1000000
  # Volume
  - row_count between 1000 and 10000000
  # Freshness
  - freshness(updated_at) < 2h
  # Custom SQL
  - my_check:
      my_check query: |
        SELECT COUNT(*) FROM ...
      fail: when > 0
# Run the scan
pip install soda-core-postgres
soda scan -d production -c soda/config.yml soda/checks/orders.yml
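The same scan can be triggered from Python (e.g. inside an orchestrator task), a sketch assuming the soda-core programmatic API:

from soda.scan import Scan

scan = Scan()
scan.set_data_source_name("production")
scan.add_configuration_yaml_file("soda/config.yml")
scan.add_sodacl_yaml_files("soda/checks/orders.yml")
scan.execute()
scan.assert_no_checks_fail()  # raises if any check failed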
13. Data Observability - 5 Pillars
| Pillar | Question | Metric | Alert |
| Freshness | Is the data up to date? | NOW()-MAX(updated_at) | If freshness > SLO |
| Volume | Is the volume normal? | Row count vs baseline | If deviation > 20% |
| Schema | Did the schema change? | Schema hash | Column added/removed |
| Distribution | Are values normal? | Histograms, z-score | If z-score > 3 |
| Lineage | Where does the data come from? | Dependency graph | Impact analysis |
Data Quality SLOs by Tier
| Tier | Completeness | Duplicates | Freshness | Alerting |
| Tier 1 (critical) | 99.9% | < 0.01% | < 1h | PagerDuty |
| Tier 2 (important) | 99% | < 0.1% | < 4h | Slack urgent |
| Tier 3 (best-effort) | 95% | < 1% | < 24h | Email digest |
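A minimal freshness-SLO check, sketched with SQLAlchemy (table, column and DSN are placeholders; assumes updated_at is a timezone-aware timestamp):

from datetime import datetime, timedelta, timezone
from sqlalchemy import create_engine, text

FRESHNESS_SLO = timedelta(hours=1)  # Tier 1 threshold from the table above

engine = create_engine("postgresql://user:pass@prod-db/analytics")
with engine.connect() as conn:
    last_update = conn.execute(text("SELECT MAX(updated_at) FROM orders")).scalar()

if last_update is None or datetime.now(timezone.utc) - last_update > FRESHNESS_SLO:
    raise RuntimeError("Freshness SLO breached for orders")  # page the on-call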
14. Data Contracts
# datacontract.yaml
dataContractSpecification: 0.9
id: orders-contract-v2
info:
  title: Orders Data Contract
  version: 2.1.0
  owner: order-team
models:
  Order:
    fields:
      order_id: {type: string, required: true, unique: true}
      amount: {type: double, required: true, minimum: 0}
      status: {type: string, enum: [CREATED, CONFIRMED, SHIPPED]}
quality:
  type: SodaCL
  specification:
    checks for orders:
      - freshness(created_at) < 1h
      - duplicate_count(order_id) = 0
sla:
  availability: 99.9%
  freshness: 1h
Golden rules: 1) The producer is responsible for quality. 2) The contract is versioned (semver) in Git. 3) Breaking changes go through a deprecation process. 4) CI/CD tests compatibility.
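A producer-side validation sketch mirroring the Order model above (purely illustrative; the field rules come from the contract):

ALLOWED_STATUS = {"CREATED", "CONFIRMED", "SHIPPED"}

def validate_order(event: dict) -> list:
    """Return the list of contract violations for one event (empty = valid)."""
    errors = []
    if not event.get("order_id"):
        errors.append("order_id is required")
    amount = event.get("amount")
    if amount is None or amount < 0:
        errors.append("amount must be a number >= 0")
    if event.get("status") not in ALLOWED_STATUS:
        errors.append("status outside the contract enum")
    return errors

assert validate_order({"order_id": "o-1", "amount": 10.0, "status": "CREATED"}) == []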
15. CI/CD for dbt
Slim CI (essential for large projects)
# Download the production manifest (reference state)
aws s3 cp s3://dbt-artifacts/prod/manifest.json target/manifest.json
# Slim CI: run only modified models and their downstream dependents
dbt run --select state:modified+ --defer --state target --target ci
# Run tests only on modified models
dbt test --select state:modified+ --defer --state target --target ci
# After the prod deploy: upload the new manifest
aws s3 cp target/manifest.json s3://dbt-artifacts/prod/manifest.json
Environment Promotion
| Env | Purpose | Data | Trigger |
| Dev | Local development | Sample | Manual |
| CI | Automated tests (PR) | PR-specific schema, defer to prod | Pull request |
| Staging | Pre-prod validation | Recent copy of prod | Merge to staging |
| Prod | Live | Real data | Merge to main |
16. Data Testing Strategies
| Type | What | Tool | When |
| Unit | Individual SQL logic | dbt unit tests (1.8+), pytest | Every commit |
| Integration | Pipeline on staging | pytest + dbt + Soda | Every PR |
| Data Diff | Compare staging vs prod | Datafold | Every PR |
| E2E | Full pipeline | Custom scripts | Staging deploys |
| Quality | Output data quality | GX, Soda, dbt tests | Continuous |
dbt Unit Test (1.8+)
# tests/unit/test_ltv.yml
unit_tests:
  - name: test_ltv_calculation
    model: fct_customer_ltv
    given:
      - input: ref('stg_orders')
        rows:
          - {customer_id: "C1", amount: 100}
          - {customer_id: "C1", amount: 200}
    expect:
      rows:
        - {customer_id: "C1", total_revenue: 300}
17. lakeFS - Version Control for the Data Lake
# Create a repository
lakectl repo create lakefs://analytics s3://my-data-lake/
# Create a branch
lakectl branch create lakefs://analytics/feature-model \
--source lakefs://analytics/main
# Upload and commit
lakectl fs upload lakefs://analytics/feature-model/raw/orders/ \
--source local/orders.parquet
lakectl commit lakefs://analytics/feature-model -m "Add orders data"
# Merge into main
lakectl merge lakefs://analytics/feature-model lakefs://analytics/main
# Rollback
lakectl branch revert lakefs://analytics/main abc123
lakeFS uses zero-copy branching: creating a branch duplicates no data. Its S3-compatible API means Spark, Presto, and dbt work against it natively (see the Spark sketch below).
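A sketch of reading a lakeFS branch from Spark through its S3-compatible gateway (the endpoint and credentials are placeholders for your lakeFS install):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .config("spark.hadoop.fs.s3a.endpoint", "https://lakefs.example.com")
    .config("spark.hadoop.fs.s3a.access.key", "<lakefs-access-key>")
    .config("spark.hadoop.fs.s3a.secret.key", "<lakefs-secret-key>")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate())

# Path layout: s3a://<repository>/<branch>/<object path>
orders = spark.read.parquet("s3a://analytics/feature-model/raw/orders/")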
18. DVC - Data Version Control
# Init
git init && dvc init
dvc remote add -d myremote s3://bucket/dvc-store
# Track a dataset
dvc add data/training.csv
git add data/training.csv.dvc .gitignore
git commit -m "Track dataset v1"
dvc push
# Go back to a version
git checkout v1.0 -- data/training.csv.dvc
dvc checkout
# Reproducible pipeline (DVC 3.x: define the stage, then reproduce it)
dvc stage add -n preprocess -d src/preprocess.py -d data/raw.csv \
-o data/processed.csv python src/preprocess.py
dvc repro
19. Streaming Delivery Guarantees
| Semantics | Guarantee | Risk | Usage |
| At-most-once | 0 or 1 time | Possible loss | Logs, non-critical metrics |
| At-least-once | 1 to N times | Possible duplicates | Kafka default, most use cases |
| Exactly-once | Exactly 1 time | Complex to implement | Financial transactions |
Exactly-once in practice
1. Producer: enable.idempotence=true + acks=all
2. Kafka transactions: atomic read-process-write
3. Flink: Chandy-Lamport checkpoints
4. Sink: idempotent UPSERT instead of INSERT
→ "Effectively-once" = at-least-once + deduplication at every step (producer sketch below)
20. Checklist - Production Streaming Pipeline
- Kafka: replication.factor=3, min.insync.replicas=2
- Producer: acks=all, enable.idempotence=true
- Consumer: enable.auto.commit=false (manual commits)
- Schema Registry: FULL compatibility mode
- Dead Letter Queue for every consumer
- Monitoring: consumer lag, throughput, error rates
- Data quality: automated Soda/GX checks
- Alerting: freshness SLO, volume anomaly detection
- CI/CD: dbt Slim CI, contract testing
- Version control: lakeFS/DVC for data
- Documentation: lineage, data contracts, SLOs
- Disaster recovery: tested rollback plan