Cheatsheet - Phase 5: Advanced Data Engineering

Quick reference: Streaming, Data Quality, DataOps

1. Apache Kafka CLI

Topics

```bash
# Create a topic
kafka-topics.sh --create --topic orders \
  --bootstrap-server broker:9092 \
  --partitions 12 --replication-factor 3

# List topics
kafka-topics.sh --list --bootstrap-server broker:9092

# Describe a topic (partitions, ISR, leader)
kafka-topics.sh --describe --topic orders --bootstrap-server broker:9092

# Change the partition count (increase only)
kafka-topics.sh --alter --topic orders --partitions 24 --bootstrap-server broker:9092
```

Console Producer / Consumer

```bash
# Produce with a key
kafka-console-producer.sh --topic orders --bootstrap-server broker:9092 \
  --property "parse.key=true" --property "key.separator=:"

# Consume from the beginning
kafka-console-consumer.sh --topic orders --from-beginning \
  --bootstrap-server broker:9092

# Consume with key + consumer group
kafka-console-consumer.sh --topic orders --group my-group \
  --property print.key=true --property key.separator=":" \
  --bootstrap-server broker:9092
```

Consumer Groups

```bash
# List consumer groups
kafka-consumer-groups.sh --list --bootstrap-server broker:9092

# Describe a group (lag, offsets, partitions)
kafka-consumer-groups.sh --describe --group my-group --bootstrap-server broker:9092

# Reset offsets (--to-earliest, --to-latest, --shift-by N)
kafka-consumer-groups.sh --reset-offsets --group my-group \
  --topic orders --to-earliest --execute --bootstrap-server broker:9092
```

2. Kafka Production Configuration

Producer Config

| Parameter | Prod value | Why |
|---|---|---|
| acks | all | All in-sync replicas must acknowledge |
| enable.idempotence | true | Avoid duplicates |
| retries | 5 | Retry on transient errors |
| linger.ms | 20 | Batching for performance |
| compression.type | lz4 | Good speed/size ratio |
| batch.size | 32768 | 32 KB per batch |
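As a concrete starting point, here is a minimal sketch of these settings with the confluent-kafka Python client; the broker address and topic come from this cheatsheet, while the key and payload are illustrative.

```python
# Minimal sketch: a production-leaning producer using the settings above
# (confluent-kafka client; key/payload values are illustrative).
from confluent_kafka import Producer

producer = Producer({
    "bootstrap.servers": "broker:9092",
    "acks": "all",                  # wait for all in-sync replicas
    "enable.idempotence": True,     # no duplicates on retry
    "retries": 5,                   # retry transient errors
    "linger.ms": 20,                # short delay to build batches
    "compression.type": "lz4",
    "batch.size": 32768,            # 32 KB batches
})

def on_delivery(err, msg):
    if err is not None:
        print(f"Delivery failed: {err}")

producer.produce("orders", key="order-123", value=b'{"amount": 99.9}', callback=on_delivery)
producer.flush()
```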

Broker Config

| Parameter | Prod value | Why |
|---|---|---|
| replication.factor | 3 | Tolerate 1 broker down |
| min.insync.replicas | 2 | No writes if fewer than 2 ISRs |
| unclean.leader.election.enable | false | Never elect a non-ISR as leader |
| log.retention.hours | 168 | 7 days of retention |

3. Kafka Connect & Schema Registry

Connect REST API

```
# List connectors
GET http://connect:8083/connectors

# Deploy a connector
POST http://connect:8083/connectors
{"name": "pg-cdc", "config": {...}}

# Connector status
GET http://connect:8083/connectors/pg-cdc/status

# Pause / Resume / Delete
PUT http://connect:8083/connectors/pg-cdc/pause
PUT http://connect:8083/connectors/pg-cdc/resume
DELETE http://connect:8083/connectors/pg-cdc
```

Schema Registry

```
# List subjects
GET http://schema-registry:8081/subjects

# Get the latest schema
GET http://schema-registry:8081/subjects/orders-value/versions/latest

# Test compatibility
POST http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest
{"schema": "{...}"}

# Change the compatibility mode
PUT http://schema-registry:8081/config/orders-value
{"compatibility": "FULL"}
```

Schema compatibility

| Mode | Add a field | Remove a field | Usage |
|---|---|---|---|
| BACKWARD | Only with a default | Yes | Consumers upgraded first |
| FORWARD | Yes | Only with a default | Producers upgraded first |
| FULL | Only with a default | Only with a default | Recommended in production |
| NONE | Yes | Yes | Dev/test only |
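For example, adding a field with a default is a BACKWARD-compatible change. Below is a minimal sketch that checks such a change against the Schema Registry compatibility endpoint shown above, using the requests library; the `currency` field is purely illustrative.

```python
# Sketch: a BACKWARD-compatible evolution of the orders-value schema
# (new field with a default), checked against Schema Registry.
import json
import requests

new_schema = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"},
        # New field: the default makes the change BACKWARD-compatible
        {"name": "currency", "type": "string", "default": "EUR"},
    ],
}

resp = requests.post(
    "http://schema-registry:8081/compatibility/subjects/orders-value/versions/latest",
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    json={"schema": json.dumps(new_schema)},
)
print(resp.json())  # {"is_compatible": true} if the change is accepted
```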

4. ksqlDB - SQL on Streams

```sql
-- Create a stream from a topic
CREATE STREAM orders_stream (
  order_id VARCHAR KEY,
  customer_id VARCHAR,
  amount DOUBLE,
  product VARCHAR,
  created_at TIMESTAMP
) WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='AVRO');

-- Create a materialized table
CREATE TABLE customers (
  customer_id VARCHAR PRIMARY KEY,
  name VARCHAR
) WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='AVRO');

-- Continuous query with a join
CREATE STREAM enriched AS
  SELECT o.order_id, o.amount, c.name
  FROM orders_stream o
  LEFT JOIN customers c ON o.customer_id = c.customer_id
  EMIT CHANGES;

-- Windowed aggregation
CREATE TABLE revenue_hourly AS
  SELECT product, COUNT(*) AS cnt, SUM(amount) AS total
  FROM orders_stream
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY product
  EMIT CHANGES;

-- Pull query (one-off)
SELECT * FROM revenue_hourly WHERE product='laptop';
```

5. Apache Flink - Essentials

Architecture

| Component | Role |
|---|---|
| JobManager | Scheduling, checkpoints, resource management |
| TaskManager | Executes tasks in task slots |
| State Backend | RocksDB (disk) or HashMap (memory) |
| Checkpoint | Distributed snapshot (Chandy-Lamport) → S3/HDFS |

Flink SQL

```sql
-- Kafka source with a watermark
CREATE TABLE orders (
  order_id STRING,
  product STRING,
  amount DOUBLE,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'format' = 'avro-confluent',
  'scan.startup.mode' = 'latest-offset'
);

-- Tumbling-window aggregation
SELECT
  product,
  TUMBLE_START(event_time, INTERVAL '10' MINUTE) AS w_start,
  COUNT(*) AS cnt,
  SUM(amount) AS revenue
FROM orders
GROUP BY product, TUMBLE(event_time, INTERVAL '10' MINUTE);
```
Recommended checkpoint interval: 60-300 s. Too frequent means overhead; too infrequent means a long replay after a failure.
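A minimal sketch of that setting, assuming the PyFlink DataStream API; the 120 s interval and 30 s pause are just values inside the recommended range.

```python
# Sketch: enable checkpoints inside the 60-300 s range (PyFlink, values illustrative)
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(120_000)  # checkpoint every 120 s (milliseconds)
# Avoid back-to-back checkpoints when one takes longer than expected
env.get_checkpoint_config().set_min_pause_between_checkpoints(30_000)
```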

6. Spark Structured Streaming

```python
from pyspark.sql.functions import col, count, from_json, sum, window

# Read from Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "orders") \
    .load()

# Deserialize and process
orders = df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")

# Watermark + window aggregation
result = orders \
    .withWatermark("timestamp", "5 minutes") \
    .groupBy(window("timestamp", "10 minutes"), "product") \
    .agg(count("*"), sum("amount"))

# Write
query = result.writeStream \
    .outputMode("update") \
    .format("console") \
    .trigger(processingTime="30 seconds") \
    .start()
```

Output Modes

| Mode | Description | Usage |
|---|---|---|
| Append | New rows only | No aggregation, or once the watermark closes a window |
| Update | Changed rows only | Aggregations with updates |
| Complete | Entire result table | Small number of groups |

7. Streaming Framework Comparison

| Criterion | Kafka Streams | Flink | Spark Streaming | Beam |
|---|---|---|---|---|
| Model | True streaming | True streaming | Micro-batch | Abstraction |
| Latency | ms | ms | seconds | Depends on the runner |
| Sources | Kafka only | Kafka, JDBC, files | Kafka, files | Kafka, GCS, BQ |
| Deployment | Library (JVM) | Dedicated cluster | Spark cluster | Runner |
| SQL | ksqlDB (separate) | Flink SQL | Spark SQL | Beam SQL |
| Batch+Stream | No | Yes | Yes | Yes |
| Best for | Kafka-to-Kafka microservices | Complex low-latency | Spark teams | GCP / multi-cloud |

8. Event-Driven Architecture Patterns

| Pattern | Description | When |
|---|---|---|
| Event Notification | Minimal event (ID only); the consumer calls the source back | Loose coupling, lightweight events |
| Event-Carried State Transfer | The event carries all the data | Autonomous consumers; ~80% of cases |
| Event Sourcing | Store ALL events (append-only) | Audit trail, replay, temporal queries |
| CQRS | Separate reads (queries) from writes (commands) | Multiple optimized read models |
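To make the first two patterns concrete, here is a sketch of the same business fact expressed in both styles; the field names are illustrative.

```python
# Sketch: one OrderCreated fact, two event styles (field names illustrative)

# Event Notification: minimal payload, consumers call the order service back
order_created_notification = {
    "event_type": "OrderCreated",
    "order_id": "ORD-42",
}

# Event-Carried State Transfer: the event carries everything the consumer needs
order_created_full = {
    "event_type": "OrderCreated",
    "order_id": "ORD-42",
    "customer_id": "C1",
    "amount": 300.0,
    "status": "CREATED",
    "created_at": "2024-01-01T12:00:00Z",
}
```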

Dead Letter Queue (DLQ)

1. The consumer receives a message.
2. Processing fails → retry (exponential backoff).
3. Max retries reached → send the message to the topic's .dlq topic.
4. The DLQ record contains: the original message + the error reason + the stack trace.
5. The ops team investigates and replays it manually.
Always suffix DLQ topics with .dlq (orders.dlq, payments.dlq). Never silently drop a message.
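A minimal consumer-side sketch of this flow, assuming the confluent-kafka client; `process()`, the retry limit, and the group id are illustrative.

```python
# Sketch: retry with exponential backoff, then park the message in orders.dlq
# with error metadata in headers (confluent-kafka; names are illustrative).
import time
import traceback
from confluent_kafka import Consumer, Producer

consumer = Consumer({"bootstrap.servers": "broker:9092",
                     "group.id": "order-processor",
                     "auto.offset.reset": "earliest"})
consumer.subscribe(["orders"])
dlq_producer = Producer({"bootstrap.servers": "broker:9092"})

MAX_RETRIES = 3

def process(msg):
    """Business logic; raises on failure (illustrative placeholder)."""
    ...

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    for attempt in range(MAX_RETRIES):
        try:
            process(msg)
            break
        except Exception as exc:
            if attempt == MAX_RETRIES - 1:
                # Park in the DLQ with the reason and the stack trace
                dlq_producer.produce(
                    "orders.dlq", key=msg.key(), value=msg.value(),
                    headers={"error.reason": str(exc),
                             "error.stacktrace": traceback.format_exc()})
                dlq_producer.flush()
            else:
                time.sleep(2 ** attempt)  # exponential backoff
    consumer.commit(message=msg)
```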

9. Real-Time OLAP Engines

| Criterion | ClickHouse | Apache Druid | Apache Pinot |
|---|---|---|---|
| Strength | Full SQL, general-purpose | Time-series OLAP | User-facing, high concurrency |
| Latency | 10 ms - 500 ms | 100 ms - 1 s | 10 ms - 200 ms |
| Concurrency | Moderate to high | Moderate | Very high |
| Real-time ingestion | Kafka engine + MV | Native Kafka | Native Kafka |
| Joins | Full | Limited (lookups) | Limited (lookups) |
| Managed cloud | ClickHouse Cloud | Imply Cloud | StarTree Cloud |

ClickHouse - Materialized View Pattern

```sql
-- 1. Kafka engine (source)
CREATE TABLE events_queue ENGINE = Kafka SETTINGS ...;

-- 2. MergeTree (storage)
CREATE TABLE events (...) ENGINE = MergeTree()
ORDER BY (dimension, timestamp);

-- 3. Materialized view (auto-ingestion)
CREATE MATERIALIZED VIEW events_mv TO events AS
SELECT * FROM events_queue;

-- 4. Aggregated MV (pre-computation)
CREATE MATERIALIZED VIEW stats_mv
ENGINE = AggregatingMergeTree() ORDER BY (...)
AS SELECT ..., countState(), sumState(amount)
FROM events GROUP BY ...;
```

10. Data Quality - 6 Dimensions

| Dimension | Question | Metric | Check |
|---|---|---|---|
| Completeness | Is data missing? | % NULLs | NOT NULL, row_count > 0 |
| Accuracy | Is the data correct? | % valid values | age BETWEEN 0 AND 150 |
| Consistency | Consistent across tables? | % matches | total = SUM(items) |
| Timeliness | Is the data fresh? | Freshness | MAX(updated) > NOW()-1h |
| Validity | Is the format correct? | % conforming | email LIKE '%@%.%' |
| Uniqueness | No duplicates? | % duplicates | COUNT(DISTINCT id)=COUNT(*) |
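One way to operationalize the table is to keep one SQL probe per dimension. A sketch, assuming a hypothetical orders/order_items schema; each query is expected to return 0 (or a value inside the accepted range).

```python
# Sketch: the six dimensions as plain SQL probes (table/column names illustrative)
quality_checks = {
    "completeness": "SELECT COUNT(*) FROM orders WHERE order_id IS NULL",
    "accuracy":     "SELECT COUNT(*) FROM orders WHERE amount NOT BETWEEN 0 AND 1000000",
    "consistency":  """SELECT COUNT(*) FROM orders o
                       JOIN (SELECT order_id, SUM(amount) AS items_total
                             FROM order_items GROUP BY order_id) i USING (order_id)
                       WHERE o.total <> i.items_total""",
    "timeliness":   "SELECT MAX(updated_at) > NOW() - INTERVAL '1 hour' FROM orders",
    "validity":     "SELECT COUNT(*) FROM orders WHERE email NOT LIKE '%@%.%'",
    "uniqueness":   "SELECT COUNT(*) - COUNT(DISTINCT order_id) FROM orders",
}
```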

11. Great Expectations (GX)

```python
import great_expectations as gx

context = gx.get_context()
datasource = context.data_sources.add_postgres(name="prod", connection_string="...")
asset = datasource.add_table_asset(name="orders", table_name="orders")
batch = asset.add_batch_definition_whole_table("full")

# Create a suite
suite = context.suites.add(gx.ExpectationSuite(name="orders_quality"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeUnique(column="order_id"))
suite.add_expectation(gx.expectations.ExpectColumnValuesToBeBetween(
    column="amount", min_value=0, max_value=1000000))

# Bind the suite to the batch in a validation definition
validation = context.validation_definitions.add(
    gx.ValidationDefinition(name="orders_validation", data=batch, suite=suite))

# Checkpoint
checkpoint = context.checkpoints.add(gx.Checkpoint(
    name="daily", validation_definitions=[validation], actions=[...]))
result = checkpoint.run()
```

12. Soda - Data Quality in YAML

```yaml
# soda/checks/orders.yml
checks for orders:
  # Completeness
  - missing_count(order_id) = 0
  - missing_percent(email) < 1
  # Uniqueness
  - duplicate_count(order_id) = 0
  # Validity
  - invalid_percent(amount) < 0.1:
      valid min: 0
      valid max: 1000000
  # Volume
  - row_count between 1000 and 10000000
  # Freshness
  - freshness(updated_at) < 2h
  # Custom SQL
  - my_check:
      my_check query: |
        SELECT COUNT(*) FROM ...
      fail: when > 0
```
```bash
# Run
pip install soda-core-postgres
soda scan -d production -c soda/config.yml soda/checks/orders.yml
```

13. Data Observability - 5 Pillars

| Pillar | Question | Metric | Alert |
|---|---|---|---|
| Freshness | Is the data up to date? | NOW()-MAX(updated_at) | If freshness > SLO |
| Volume | Is the volume normal? | Row count vs baseline | If deviation > 20% |
| Schema | Did the schema change? | Schema hash | Column added/removed |
| Distribution | Are the values normal? | Histograms, z-score | If z-score > 3 |
| Lineage | Where does the data come from? | Dependency graph | Impact analysis |
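A minimal sketch of the first two pillars (freshness against an SLO, volume against a baseline); the thresholds and sample values are illustrative, and the inputs would typically come from MAX(updated_at) and COUNT(*) queries.

```python
# Sketch: freshness and volume monitoring (thresholds/values illustrative)
from datetime import datetime, timedelta, timezone

FRESHNESS_SLO = timedelta(hours=1)
VOLUME_TOLERANCE = 0.20  # alert if row count deviates by more than 20%

def check_freshness(max_updated_at: datetime) -> bool:
    """True if the table was updated within the SLO window."""
    return datetime.now(timezone.utc) - max_updated_at <= FRESHNESS_SLO

def check_volume(row_count: int, baseline: int) -> bool:
    """True if the row count is within 20% of the baseline."""
    return abs(row_count - baseline) / baseline <= VOLUME_TOLERANCE

fresh_ok = check_freshness(datetime.now(timezone.utc) - timedelta(minutes=30))
volume_ok = check_volume(row_count=98_000, baseline=100_000)
print(fresh_ok, volume_ok)  # True True
```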

Data Quality SLOs by Tier

| Tier | Completeness | Duplicates | Freshness | Alerting |
|---|---|---|---|---|
| Tier 1 (critical) | 99.9% | < 0.01% | < 1h | PagerDuty |
| Tier 2 (important) | 99% | < 0.1% | < 4h | Slack urgent |
| Tier 3 (best-effort) | 95% | < 1% | < 24h | Email digest |

14. Data Contracts

```yaml
# datacontract.yaml
dataContractSpecification: 0.9
id: orders-contract-v2
info:
  title: Orders Data Contract
  version: 2.1.0
  owner: order-team
models:
  Order:
    fields:
      order_id: {type: string, required: true, unique: true}
      amount: {type: double, required: true, minimum: 0}
      status: {type: string, enum: [CREATED, CONFIRMED, SHIPPED]}
quality:
  type: SodaCL
  specification:
    checks for orders:
      - freshness(created_at) < 1h
      - duplicate_count(order_id) = 0
sla:
  availability: 99.9%
  freshness: 1h
```
Golden rules: 1) The producer owns data quality. 2) The contract is versioned (semver) in Git. 3) Breaking changes go through a deprecation process. 4) CI/CD tests compatibility.
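As an illustration of rule 4, here is a hand-rolled check of a sample record against the field rules declared in the contract above. This is a hypothetical validator for the sketch only, not the official datacontract tooling; it assumes PyYAML is available.

```python
# Hypothetical sketch: validate a record against the contract's field rules
# (required, minimum, enum). Not the official datacontract-cli tooling.
import yaml

with open("datacontract.yaml") as f:
    contract = yaml.safe_load(f)

fields = contract["models"]["Order"]["fields"]

def validate_record(record: dict) -> list[str]:
    errors = []
    for name, rules in fields.items():
        value = record.get(name)
        if rules.get("required") and value is None:
            errors.append(f"{name}: missing required field")
            continue
        if "minimum" in rules and value is not None and value < rules["minimum"]:
            errors.append(f"{name}: {value} below minimum {rules['minimum']}")
        if "enum" in rules and value is not None and value not in rules["enum"]:
            errors.append(f"{name}: {value} not in {rules['enum']}")
    return errors

print(validate_record({"order_id": "O1", "amount": -5, "status": "PENDING"}))
# -> two errors: amount below minimum, status not in the enum
```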

15. CI/CD for dbt

Slim CI (essential for large projects)

```bash
# Download the production manifest (the reference state)
aws s3 cp s3://dbt-artifacts/prod/manifest.json target/manifest.json

# Slim CI: run only modified models + their dependants
dbt run --select state:modified+ --defer --state target --target ci

# Test only the modified models
dbt test --select state:modified+ --defer --state target --target ci

# After the prod deploy: upload the new manifest
aws s3 cp target/manifest.json s3://dbt-artifacts/prod/manifest.json
```

Environment promotion

| Env | Goal | Data | Trigger |
|---|---|---|---|
| Dev | Local development | Sample | Manual |
| CI | Automated tests (PR) | PR-specific schema, defer to prod | Pull request |
| Staging | Pre-prod validation | Recent copy of prod | Merge to staging |
| Prod | Live | Real data | Merge to main |

16. Data Testing Strategies

| Type | What | Tool | When |
|---|---|---|---|
| Unit | Individual SQL logic | dbt unit tests (1.8+), pytest | Every commit |
| Integration | Staging pipeline | pytest + dbt + Soda | Every PR |
| Data diff | Compare staging vs prod | Datafold | Every PR |
| E2E | Full pipeline | Custom scripts | Staging deploy |
| Quality | Output data quality | GX, Soda, dbt tests | Continuous |

dbt Unit Test (1.8+)

```yaml
# tests/unit/test_ltv.yml
unit_tests:
  - name: test_ltv_calculation
    model: fct_customer_ltv
    given:
      - input: ref('stg_orders')
        rows:
          - {customer_id: "C1", amount: 100}
          - {customer_id: "C1", amount: 200}
    expect:
      rows:
        - {customer_id: "C1", total_revenue: 300}
```

17. lakeFS - Version Control for the Data Lake

```bash
# Create a repository
lakectl repo create lakefs://analytics s3://my-data-lake/

# Create a branch
lakectl branch create lakefs://analytics/feature-model \
  --source lakefs://analytics/main

# Upload and commit
lakectl fs upload lakefs://analytics/feature-model/raw/orders/ \
  --source local/orders.parquet
lakectl commit lakefs://analytics/feature-model -m "Add orders data"

# Merge into main
lakectl merge lakefs://analytics/feature-model lakefs://analytics/main

# Rollback
lakectl branch revert lakefs://analytics/main --commit abc123
```
lakeFS uses zero-copy branching: creating a branch does not duplicate the data. It exposes an S3-compatible interface, so it works natively with Spark, Presto, and dbt.

18. DVC - Data Version Control

```bash
# Init
git init && dvc init
dvc remote add -d myremote s3://bucket/dvc-store

# Track a dataset
dvc add data/training.csv
git add data/training.csv.dvc .gitignore
git commit -m "Track dataset v1"
dvc push

# Go back to a previous version
git checkout v1.0 -- data/training.csv.dvc
dvc checkout

# Reproducible pipeline
dvc run -n preprocess -d src/preprocess.py -d data/raw.csv \
  -o data/processed.csv python src/preprocess.py
```

19. Streaming Delivery Guarantees

| Semantics | Guarantee | Risk | Usage |
|---|---|---|---|
| At-most-once | 0 or 1 delivery | Possible loss | Logs, non-critical metrics |
| At-least-once | 1 to N deliveries | Possible duplicates | Kafka default, most use cases |
| Exactly-once | Exactly 1 delivery | Complex to implement | Financial transactions |

Exactly-once in practice

1. Producer: enable.idempotence=true + acks=all
2. Kafka transactions: atomic read-process-write
3. Flink: Chandy-Lamport checkpoints
4. Sink: idempotent UPSERT instead of INSERT

→ "Effectively-once" = at-least-once + deduplication at every step
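A sketch of steps 1-2 (idempotent producer plus a transactional read-process-write loop) with the confluent-kafka client; the topic names, transactional.id, and `process()` are illustrative.

```python
# Sketch: atomic read-process-write with Kafka transactions
# (confluent-kafka; topic names and process() are illustrative)
from confluent_kafka import Consumer, Producer

consumer = Consumer({"bootstrap.servers": "broker:9092",
                     "group.id": "orders-eos",
                     "enable.auto.commit": False,
                     "isolation.level": "read_committed"})
consumer.subscribe(["orders"])

producer = Producer({"bootstrap.servers": "broker:9092",
                     "transactional.id": "orders-eos-1",
                     "enable.idempotence": True,
                     "acks": "all"})
producer.init_transactions()

def process(msg):
    return msg.value()  # business logic placeholder

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    producer.begin_transaction()
    try:
        producer.produce("orders-enriched", key=msg.key(), value=process(msg))
        # Commit the consumer offsets inside the same transaction
        producer.send_offsets_to_transaction(
            consumer.position(consumer.assignment()),
            consumer.consumer_group_metadata())
        producer.commit_transaction()
    except Exception:
        producer.abort_transaction()
```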

20. Checklist - Production Streaming Pipeline