Introduction au SQL Avance
Bienvenue dans le module SQL Avance. Ce module vous transformera d'un utilisateur SQL standard en un expert capable d'ecrire des requetes performantes sur des tables de millions de lignes. Vous apprendrez les techniques utilisees par les plus grandes entreprises tech pour gerer leurs donnees a grande echelle.
Objectifs d'apprentissage
- Comprendre les differences fondamentales entre OLTP et OLAP
- Maitriser l'ordre d'execution reel d'une requete SQL
- Connaitre les types de JOIN et leurs cas d'usage
- Configurer un environnement PostgreSQL pour la pratique
- Identifier les anti-patterns SQL les plus courants
Le SQL est le langage universel de la donnee. Meme avec l'essor du NoSQL, plus de 80% des donnees structurees en entreprise restent dans des bases relationnelles. Maitriser le SQL avance, c'est maitriser l'acces a la majorite des donnees du monde.
OLTP vs OLAP : Deux Mondes Differents
Avant de plonger dans le SQL avance, il est crucial de comprendre les deux grandes familles de systemes de bases de donnees. Chacune a des objectifs, des schemas et des patterns de requetes radicalement differents.
Concept Cle
OLTP (Online Transaction Processing) : Optimise pour les operations transactionnelles rapides (INSERT, UPDATE, DELETE). Pense "application web" - chaque clic d'utilisateur genere une transaction.
OLAP (Online Analytical Processing) : Optimise pour l'analyse de gros volumes de donnees. Pense "rapport de ventes annuel" - peu de requetes mais qui scannent des millions de lignes.
| Critere | OLTP | OLAP |
|---|---|---|
| Objectif | Transactions rapides | Analyse et reporting |
| Operations | INSERT, UPDATE, DELETE | SELECT complexes, agregations |
| Volume par requete | Quelques lignes | Millions de lignes |
| Utilisateurs | Milliers (clients, apps) | Dizaines (analystes, BI) |
| Schema | Normalise (3NF) | Denormalise (Star, Snowflake) |
| Latence | Millisecondes | Secondes a minutes |
| Exemples | PostgreSQL, MySQL, SQL Server | Snowflake, BigQuery, Redshift |
| Taille typique | Go a To | To a Po |
| Concurrence | Haute (milliers de sessions) | Basse (dizaines de sessions) |
OLTP (Transactionnel) OLAP (Analytique)
===================== ==================
[App Web] [App Mobile] [API] [Tableau] [PowerBI] [Jupyter]
| | | | | |
v v v v v v
+---------------------------+ +---------------------------+
| Load Balancer / Pool | | Query Engine |
+---------------------------+ +---------------------------+
| | | |
v v v v
+---------------------------+ +---------------------------+
| PostgreSQL | ETL | Snowflake |
| Schema Normalise (3NF) | --------> | Schema en Etoile |
| Tables: users, orders | Nuit | Faits + Dimensions |
| Index B-tree partout | | Colonnar Storage |
+---------------------------+ +---------------------------+
~ ms par requete ~ secondes par requete
~ 10,000 req/sec ~ 50 req en parallele
L'Ordre d'Execution SQL : Ce Que Vous Devez Savoir
L'une des erreurs les plus frequentes en SQL est de penser que la requete s'execute dans l'ordre ou elle est ecrite. En realite, le moteur SQL suit un ordre logique tres different. Comprendre cet ordre est fondamental pour ecrire des requetes correctes et performantes.
Ordre d'ECRITURE: Ordre d'EXECUTION:
================== ===================
SELECT (5e) 1. FROM / JOIN -- Determine les tables
FROM (1er) 2. WHERE -- Filtre les lignes
WHERE (2e) 3. GROUP BY -- Regroupe les lignes
GROUP BY (3e) 4. HAVING -- Filtre les groupes
HAVING (4e) 5. SELECT -- Projette les colonnes
ORDER BY (6e) 6. ORDER BY -- Trie le resultat
LIMIT (7e) 7. LIMIT / OFFSET -- Limite le resultat
+-----------+
| 1. FROM | "De quelles tables je lis ?"
+-----+-----+
|
+-----v-----+
| 2. WHERE | "Quelles lignes je garde ?"
+-----+-----+
|
+-----v-------+
| 3. GROUP BY | "Comment je regroupe ?"
+-----+-------+
|
+-----v-----+
| 4. HAVING | "Quels groupes je garde ?"
+-----+-----+
|
+-----v-----+
| 5. SELECT | "Quelles colonnes j'affiche ?"
+-----+-----+
|
+-----v-------+
| 6. ORDER BY | "Dans quel ordre ?"
+-----+-------+
|
+-----v-----+
| 7. LIMIT | "Combien de lignes ?"
+-----------+
Consequence Pratique
Vous ne pouvez PAS utiliser un alias defini dans le SELECT dans la clause WHERE, car WHERE s'execute AVANT SELECT. Exemple : SELECT price * 0.8 AS discounted FROM products WHERE discounted > 100 ne fonctionnera PAS. Utilisez une sous-requete ou un CTE a la place.
-- ERREUR : l'alias 'discounted' n'existe pas encore au moment du WHERE
SELECT price * 0.8 AS discounted
FROM products
WHERE discounted > 100; -- ERREUR !
-- SOLUTION 1 : repeter l'expression
SELECT price * 0.8 AS discounted
FROM products
WHERE price * 0.8 > 100;
-- SOLUTION 2 : utiliser un CTE (plus lisible)
WITH priced AS (
SELECT *, price * 0.8 AS discounted
FROM products
)
SELECT * FROM priced WHERE discounted > 100;
-- SOLUTION 3 : sous-requete dans FROM
SELECT * FROM (
SELECT *, price * 0.8 AS discounted
FROM products
) sub
WHERE discounted > 100;
Les Types de JOIN en Detail
Les JOIN sont le coeur du SQL relationnel. Un Data Architect doit maitriser chaque type et savoir quand l'utiliser.
Table A Table B Resultat
+-----+ +-----+
| a1 | | b1 |
| a2 +---------+ b2 | INNER JOIN : a2-b2, a3-b3
| a3 +---------+ b3 | LEFT JOIN : a1-NULL, a2-b2, a3-b3
+-----+ | b4 | RIGHT JOIN : a2-b2, a3-b3, NULL-b4
+-----+ FULL OUTER JOIN : a1-NULL, a2-b2, a3-b3, NULL-b4
CROSS JOIN : a1-b1, a1-b2, a1-b3, a1-b4,
a2-b1, a2-b2, ... (12 lignes)
LEFT ANTI JOIN (lignes de A sans correspondance dans B):
SELECT a.* FROM A LEFT JOIN B ON a.id = b.id WHERE b.id IS NULL;
Resultat: a1
SEMI JOIN (lignes de A avec au moins une correspondance dans B):
SELECT a.* FROM A WHERE EXISTS (SELECT 1 FROM B WHERE B.id = A.id);
Resultat: a2, a3
-- INNER JOIN : Clients avec des commandes
SELECT c.name, o.order_date, o.total
FROM customers c
INNER JOIN orders o ON c.id = o.customer_id;
-- LEFT JOIN : Tous les clients, meme sans commande
SELECT c.name, COALESCE(COUNT(o.id), 0) AS nb_orders
FROM customers c
LEFT JOIN orders o ON c.id = o.customer_id
GROUP BY c.name;
-- LEFT ANTI JOIN : Clients qui n'ont JAMAIS commande
SELECT c.name, c.email
FROM customers c
LEFT JOIN orders o ON c.id = o.customer_id
WHERE o.id IS NULL;
-- SELF JOIN : Employes et leurs managers
SELECT e.name AS employe, m.name AS manager
FROM employees e
LEFT JOIN employees m ON e.manager_id = m.id;
-- CROSS JOIN : Toutes les combinaisons (utile pour calendriers)
SELECT d.date, p.product_name
FROM dates d
CROSS JOIN products p;
-- FULL OUTER JOIN : Union complete avec correspondances
SELECT COALESCE(a.id, b.id) AS id,
a.value AS value_a,
b.value AS value_b
FROM table_a a
FULL OUTER JOIN table_b b ON a.id = b.id;
Performance des JOINs
Le moteur SQL choisit parmi 3 algorithmes de JOIN selon la taille des tables :
Nested Loop : Optimal pour petites tables ou quand un index existe (O(n*m) pire cas)
Hash Join : Optimal pour grosses tables sans index (O(n+m), mais couteux en memoire)
Merge Join : Optimal quand les deux tables sont deja triees (O(n+m))
Verifiez toujours avec EXPLAIN ANALYZE quel algorithme est utilise !
Cas d'Etude : Comment StackOverflow Gere 50M Requetes/Jour
StackOverflow - SQL Server a Echelle Massive
StackOverflow est l'un des sites les plus visites au monde pour les developpeurs, avec des statistiques impressionnantes :
- 50+ millions de requetes SQL par jour sur seulement 2 serveurs SQL Server
- 1.3 milliard de pages vues par mois avec un temps de reponse moyen de 18ms
- Base de donnees principale de ~700 Go avec des tables de 50M+ lignes (Posts, Votes)
- Equipe DBA de seulement 2 personnes
Les secrets de leur performance :
- Index cibles : Pas d'over-indexing. Chaque index est justifie par des patterns de requetes reels, surveilles via les DMV (Dynamic Management Views) de SQL Server
- Pas d'ORM pour les requetes critiques : Dapper (micro-ORM) au lieu d'Entity Framework pour garder le controle du SQL genere
- Mise en cache agressive : Redis devant SQL Server, avec invalidation intelligente
- Denormalisation strategique : Le champ Score sur la table Posts est pre-calcule (pas de COUNT a chaque affichage)
- OLTP pur : L'analytique est faite sur un replica en lecture seule, jamais sur la base primaire
Lecon : On n'a pas besoin de technologies complexes. Un SQL bien ecrit avec de bons index sur du hardware adapte peut aller tres loin. StackOverflow prouve qu'une architecture simple et maitrisee surpasse souvent une architecture distribuee mal comprise.
Anti-Pattern : SELECT * en Production
SELECT * : Le Tueur Silencieux de Performance
L'utilisation de SELECT * en production est l'anti-pattern le plus repandu et le plus couteux. Voici pourquoi :
- Bande passante gaspillee : Si votre table a 50 colonnes et que vous n'en avez besoin que de 3, vous transferez ~16x trop de donnees
- Index covering impossible : Avec SELECT *, le moteur doit TOUJOURS faire un "heap lookup" pour aller chercher les colonnes manquantes dans l'index
- Bugs silencieux : Quand quelqu'un ajoute une colonne a la table, votre application recoit des donnees inattendues
- Memoire serveur : Le buffer pool se remplit de donnees inutiles, evictant les donnees utiles du cache
-- MAUVAIS : SELECT * sur une table avec 40 colonnes (dont 3 colonnes TEXT) SELECT * FROM orders WHERE status = 'pending'; -- Temps: 2.4 secondes | I/O: 45,000 pages lues | Memoire: 180 Mo -- BON : Seulement les colonnes necessaires SELECT order_id, customer_id, total, created_at FROM orders WHERE status = 'pending'; -- Temps: 0.12 secondes | I/O: 3,200 pages lues | Memoire: 12 Mo -- ENCORE MIEUX : Avec un index covering CREATE INDEX idx_orders_status_covering ON orders (status) INCLUDE (order_id, customer_id, total, created_at); SELECT order_id, customer_id, total, created_at FROM orders WHERE status = 'pending'; -- Temps: 0.008 secondes | I/O: 85 pages lues | Memoire: 0.8 Mo
Regle d'or : En production, listez TOUJOURS explicitement vos colonnes. Reservez SELECT * au developpement et a l'exploration.
Configuration de l'Environnement PostgreSQL
Pour ce module, nous utiliserons PostgreSQL, la base de donnees open-source la plus avancee. Voici comment configurer votre environnement.
# Option 1 : Docker (recommande pour la formation) docker run --name pg-training \ -e POSTGRES_PASSWORD=training \ -e POSTGRES_DB=dataarchitect \ -p 5432:5432 \ -d postgres:16 # Option 2 : Installation native # macOS: brew install postgresql@16 # Ubuntu: sudo apt install postgresql-16 # Windows: Telecharger depuis postgresql.org # Connexion psql -h localhost -U postgres -d dataarchitect # Verification de la version SELECT version(); -- PostgreSQL 16.x ...
-- Schema e-commerce pour les exercices du module
CREATE SCHEMA IF NOT EXISTS ecommerce;
SET search_path TO ecommerce;
CREATE TABLE customers (
customer_id SERIAL PRIMARY KEY,
first_name VARCHAR(50) NOT NULL,
last_name VARCHAR(50) NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
city VARCHAR(50),
country VARCHAR(50) DEFAULT 'France',
signup_date DATE NOT NULL DEFAULT CURRENT_DATE,
segment VARCHAR(20) CHECK (segment IN ('Bronze','Silver','Gold','Platinum'))
);
CREATE TABLE categories (
category_id SERIAL PRIMARY KEY,
name VARCHAR(50) NOT NULL,
parent_id INT REFERENCES categories(category_id)
);
CREATE TABLE products (
product_id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
category_id INT REFERENCES categories(category_id),
price DECIMAL(10,2) NOT NULL CHECK (price > 0),
stock INT NOT NULL DEFAULT 0,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE orders (
order_id SERIAL PRIMARY KEY,
customer_id INT REFERENCES customers(customer_id),
order_date DATE NOT NULL DEFAULT CURRENT_DATE,
status VARCHAR(20) DEFAULT 'pending'
CHECK (status IN ('pending','confirmed','shipped','delivered','cancelled')),
total DECIMAL(12,2)
);
CREATE TABLE order_items (
item_id SERIAL PRIMARY KEY,
order_id INT REFERENCES orders(order_id),
product_id INT REFERENCES products(product_id),
quantity INT NOT NULL CHECK (quantity > 0),
unit_price DECIMAL(10,2) NOT NULL
);
-- Index de base pour les requetes frequentes
CREATE INDEX idx_orders_customer ON orders(customer_id);
CREATE INDEX idx_orders_date ON orders(order_date);
CREATE INDEX idx_order_items_order ON order_items(order_id);
CREATE INDEX idx_products_category ON products(category_id);
Scenario : Votre Premier Jour en Tant que Data Architect
Vous venez d'etre embauche comme Data Architect dans une startup e-commerce. Le CTO vous dit : "Notre base PostgreSQL rame, les pages produit mettent 3 secondes a charger, et nos rapports du lundi matin bloquent le site pendant 1 heure."
Questions a vous poser :
- Les rapports du lundi (OLAP) tournent-ils sur la meme base que le site (OLTP) ? Si oui, c'est le premier probleme.
- Les requetes du site utilisent-elles SELECT * ? Probablement.
- Y a-t-il un systeme de cache (Redis) devant la base ? Probablement pas.
- Les index sont-ils adaptes aux requetes reelles ? A verifier avec pg_stat_user_indexes.
Ce module vous donnera toutes les competences pour resoudre ces problemes systematiquement.
Flashcards - Concepts Cles
Window Functions - Classement
Les Window Functions sont l'une des fonctionnalites les plus puissantes du SQL moderne. Elles permettent de faire des calculs sur un ensemble de lignes liees a la ligne courante, sans les regrouper. Pensez-y comme un GROUP BY qui ne collapse pas les lignes.
Objectifs d'apprentissage
- Comprendre la syntaxe OVER(PARTITION BY ... ORDER BY ...)
- Maitriser ROW_NUMBER, RANK, DENSE_RANK et NTILE
- Savoir choisir la bonne fonction de classement selon le besoin
- Appliquer les window functions dans des scenarios reels
Les Window Functions ont ete introduites dans SQL:2003, mais beaucoup de developpeurs les ignorent encore. C'est pourtant LA fonctionnalite qui separe un utilisateur SQL intermediaire d'un expert. Maitrisez-les et vous diviserez la complexite de vos requetes par 2.
Anatomie d'une Window Function
Chaque window function suit la meme structure syntaxique. Comprendre cette structure est la cle pour maitriser toutes les variantes.
fonction(arguments) OVER (
PARTITION BY colonne(s) -- Optionnel : decoupe en groupes
ORDER BY colonne(s) -- Optionnel : ordre dans chaque groupe
frame_clause -- Optionnel : fenetre de lignes
)
Exemple concret :
+-------------------------------------------------+
| RANK() OVER (PARTITION BY dept ORDER BY salary DESC) |
+-------------------------------------------------+
| | |
v v v
Fonction Decoupage par Tri dans chaque
de rang departement departement (desc)
Visualisation sur les donnees :
+--------+--------+--------+------+
| dept | name | salary | RANK |
+--------+--------+--------+------+
| Sales | Alice | 80000 | 1 | <- Partition "Sales"
| Sales | Bob | 70000 | 2 |
| Sales | Carol | 70000 | 2 | <- Meme salaire = meme rang
| Sales | Dave | 60000 | 4 | <- Rang 3 saute !
+--------+--------+--------+------+
| IT | Eve | 95000 | 1 | <- Partition "IT" (repart de 1)
| IT | Frank | 85000 | 2 |
+--------+--------+--------+------+
Les 4 Fonctions de Classement
ROW_NUMBER() - Numero Unique
Attribue un numero sequentiel unique a chaque ligne dans sa partition. Meme en cas d'egalite, chaque ligne recoit un numero different (le choix est arbitraire pour les ex aequo).
-- Numerotation simple de toutes les commandes par client
SELECT
customer_id,
order_id,
order_date,
total,
ROW_NUMBER() OVER (
PARTITION BY customer_id
ORDER BY order_date DESC
) AS order_rank
FROM orders;
-- Resultat :
-- customer_id | order_id | order_date | total | order_rank
-- 1 | 1052 | 2024-12-15 | 299.99 | 1
-- 1 | 1023 | 2024-11-20 | 149.50 | 2
-- 1 | 987 | 2024-10-05 | 89.99 | 3
-- 2 | 1061 | 2024-12-18 | 450.00 | 1 <- repart de 1
-- 2 | 1044 | 2024-12-01 | 175.25 | 2
Astuce : Deduplication avec ROW_NUMBER
ROW_NUMBER est l'outil ideal pour supprimer les doublons. Gardez seulement la ligne avec row_number = 1 :
-- Garder seulement la commande la plus recente par client
WITH ranked AS (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY customer_id
ORDER BY order_date DESC
) AS rn
FROM orders
)
SELECT * FROM ranked WHERE rn = 1;
-- Supprimer les vrais doublons d'une table
DELETE FROM customers
WHERE customer_id IN (
SELECT customer_id FROM (
SELECT customer_id,
ROW_NUMBER() OVER (
PARTITION BY email ORDER BY customer_id
) AS rn
FROM customers
) dupes
WHERE rn > 1
);
RANK() vs DENSE_RANK() - Gestion des Ex Aequo
La difference entre RANK et DENSE_RANK est subtile mais importante : RANK laisse des "trous" apres les ex aequo, DENSE_RANK ne laisse pas de trous.
-- Comparaison des 3 fonctions de classement
SELECT
name,
department,
salary,
ROW_NUMBER() OVER (ORDER BY salary DESC) AS row_num,
RANK() OVER (ORDER BY salary DESC) AS rank_val,
DENSE_RANK() OVER (ORDER BY salary DESC) AS dense_rank_val
FROM employees;
-- Resultat :
-- name | salary | row_num | rank_val | dense_rank_val
-- Alice | 95000 | 1 | 1 | 1
-- Bob | 85000 | 2 | 2 | 2
-- Carol | 85000 | 3 | 2 | 2 <- ex aequo
-- Dave | 75000 | 4 | 4 | 3 <- RANK=4, DENSE=3 !
-- Eve | 70000 | 5 | 5 | 4
-- Frank | 70000 | 6 | 5 | 4 <- ex aequo
-- Grace | 60000 | 7 | 7 | 5 <- RANK=7, DENSE=5 !
Utilisez RANK quand...
- Vous faites un classement sportif (le 3e est 3e meme si 2 personnes sont 1eres, il n'y a pas de "2e")
- Vous voulez montrer la "position reelle" dans le classement
- Le nombre total de rangs doit correspondre au nombre de lignes
Utilisez DENSE_RANK quand...
- Vous cherchez le "Top N valeurs distinctes" (top 3 salaires differents)
- Vous ne voulez pas de trous dans la numerotation
- Vous groupez par niveaux (1er niveau, 2e niveau...)
NTILE(n) - Decoupage en Quantiles
NTILE divise les lignes en N groupes de taille (presque) egale. Indispensable pour creer des quartiles, deciles, percentiles.
-- Diviser les clients en 4 quartiles selon leur CA total
SELECT
c.customer_id,
c.first_name,
c.last_name,
SUM(o.total) AS total_spent,
NTILE(4) OVER (ORDER BY SUM(o.total) DESC) AS quartile
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.first_name, c.last_name;
-- Resultat :
-- customer_id | name | total_spent | quartile
-- 42 | Marie | 12500.00 | 1 <- Top 25%
-- 17 | Jean | 9800.00 | 1
-- 8 | Pierre | 7200.00 | 2 <- 25-50%
-- 31 | Sophie | 5100.00 | 2
-- 55 | Luc | 3400.00 | 3 <- 50-75%
-- 12 | Claire | 1200.00 | 4 <- Bottom 25%
-- Segmentation marketing basee sur les quartiles
WITH customer_quartiles AS (
SELECT
c.customer_id,
c.first_name || ' ' || c.last_name AS name,
SUM(o.total) AS total_spent,
NTILE(4) OVER (ORDER BY SUM(o.total) DESC) AS quartile
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.first_name, c.last_name
)
SELECT
name,
total_spent,
CASE quartile
WHEN 1 THEN 'Platinum - VIP (remise 20%)'
WHEN 2 THEN 'Gold - Fidele (remise 10%)'
WHEN 3 THEN 'Silver - Regulier (remise 5%)'
WHEN 4 THEN 'Bronze - Occasionnel (offre bienvenue)'
END AS segment
FROM customer_quartiles;
Cas d'Etude : Comment Spotify Classe les Top 50 par Pays
Spotify - Classement Musical avec RANK()
Spotify genere des classements "Top 50" pour chaque pays, chaque jour. Avec 180+ pays et des milliards de streams, comment font-ils ?
Le probleme :
- Des milliards d'evenements de streaming par jour
- Classements par pays, par genre, par playlist
- Besoin de gerer les ex aequo (2 chansons avec le meme nombre de streams)
- Mise a jour toutes les heures
L'approche simplifiee (conceptuelle) :
-- Etape 1 : Agreger les streams par pays et par chanson
WITH daily_streams AS (
SELECT
country_code,
track_id,
COUNT(*) AS stream_count
FROM stream_events
WHERE event_date = CURRENT_DATE
GROUP BY country_code, track_id
),
-- Etape 2 : Classer par pays
ranked AS (
SELECT
country_code,
track_id,
stream_count,
RANK() OVER (
PARTITION BY country_code
ORDER BY stream_count DESC
) AS chart_position
FROM daily_streams
)
-- Etape 3 : Garder le Top 50
SELECT r.country_code, t.title, t.artist, r.stream_count, r.chart_position
FROM ranked r
JOIN tracks t ON r.track_id = t.track_id
WHERE r.chart_position <= 50
ORDER BY r.country_code, r.chart_position;
Pourquoi RANK et pas DENSE_RANK ? Spotify utilise RANK car si 2 chansons sont en position 5, la suivante est en position 7 (pas 6). Cela reflète mieux le classement reel : il y a bien 6 chansons devant la 7e.
En realite, Spotify utilise un pipeline Spark/Kafka qui pre-agrege les streams en temps reel, mais la logique SQL sous-jacente est identique. Le pattern PARTITION BY + ORDER BY + RANK est universel.
Anti-Pattern : GROUP BY au Lieu de Window Functions
Quand GROUP BY Devient un Piege
Beaucoup de developpeurs utilisent GROUP BY + sous-requetes pour des problemes que les Window Functions resolvent en une seule passe :
-- ANTI-PATTERN : Trouver le salaire max par departement avec les details
-- Cette requete fait N+1 lectures de la table !
SELECT e.*
FROM employees e
WHERE e.salary = (
SELECT MAX(salary)
FROM employees e2
WHERE e2.department = e.department
);
-- Probleme : sous-requete correlee executee pour CHAQUE ligne
-- Sur 100K employes avec 50 departements = 100K executions de sous-requete
-- CORRECT : Une seule lecture de la table
WITH ranked AS (
SELECT *,
RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rk
FROM employees
)
SELECT * FROM ranked WHERE rk = 1;
-- La table est lue UNE SEULE FOIS
-- Performance : 10x a 100x plus rapide sur de gros volumes
Regle : Si vous ecrivez une sous-requete correlee dans le WHERE qui reference la requete externe, demandez-vous toujours si une Window Function ne serait pas plus adaptee.
Scenario Pratique : Top 3 Produits par Categorie
Votre Manager Demande : "Quels sont les 3 produits les plus vendus par categorie ?"
Contexte : Vous travaillez chez un e-commerΓ§ant. Le directeur marketing veut savoir les 3 best-sellers de chaque categorie pour les mettre en avant sur la homepage.
Contraintes :
- Si 2 produits ont le meme nombre de ventes, les afficher tous les deux
- Afficher le CA genere par chaque produit
- Ne considerer que les commandes des 90 derniers jours
WITH product_sales AS (
SELECT
cat.name AS category_name,
p.name AS product_name,
SUM(oi.quantity) AS total_sold,
SUM(oi.quantity * oi.unit_price) AS revenue,
DENSE_RANK() OVER (
PARTITION BY cat.category_id
ORDER BY SUM(oi.quantity) DESC
) AS sales_rank
FROM order_items oi
JOIN orders o ON oi.order_id = o.order_id
JOIN products p ON oi.product_id = p.product_id
JOIN categories cat ON p.category_id = cat.category_id
WHERE o.order_date >= CURRENT_DATE - INTERVAL '90 days'
AND o.status != 'cancelled'
GROUP BY cat.category_id, cat.name, p.product_id, p.name
)
SELECT
category_name,
product_name,
total_sold,
TO_CHAR(revenue, '999,999.99') AS revenue_formatted,
sales_rank
FROM product_sales
WHERE sales_rank <= 3
ORDER BY category_name, sales_rank;
Pourquoi DENSE_RANK ici ? Parce que si 2 produits sont numero 2, on veut quand meme voir le produit numero 3 (pas qu'il soit saute). DENSE_RANK garantit qu'on a bien 3 niveaux de ventes differents.
Exercices Pratiques
Exercices - Window Functions de Classement
Exercice 1 : Classement des clients par CA
Ecrivez une requete qui affiche chaque client avec son CA total et son rang parmi tous les clients. Utilisez RANK(). Affichez aussi le percentile avec NTILE(100).
Exercice 2 : Deduplication
La table customers contient des doublons (meme email). Ecrivez une requete qui identifie les doublons en gardant le customer_id le plus ancien, et qui retourne les IDs a supprimer.
Exercice 3 : Top N par Groupe
Affichez les 2 commandes les plus cheres par mois (sur les 12 derniers mois). Incluez le mois, le customer_id, le montant, et le rang.
Flashcards
Window Functions - Agregation et Frames
Apres les fonctions de classement, decouvrons les fonctions d'agregation fenΓͺtrees et les fonctions de navigation (LAG, LEAD). Ces outils sont indispensables pour l'analyse temporelle, la detection d'anomalies et les tableaux de bord.
Objectifs d'apprentissage
- Maitriser LAG et LEAD pour comparer des lignes consecutives
- Utiliser SUM/AVG/COUNT OVER pour des agregations fenΓͺtrees
- Comprendre les frames ROWS vs RANGE
- Calculer des moyennes mobiles et des cumuls
- Appliquer ces techniques a des cas concrets
LAG et LEAD : Regarder en Arriere et en Avant
LAG et LEAD permettent d'acceder a la valeur d'une ligne precedente (LAG) ou suivante (LEAD) dans la partition, sans avoir besoin d'un self-join.
Donnees ordonnees par date :
Ligne | date | ventes | LAG(ventes,1) | LEAD(ventes,1)
--------|------------|--------|---------------|----------------
1 | 2024-01 | 100 | NULL | 150
2 | 2024-02 | 150 | 100 | 120
3 | 2024-03 | 120 | 150 | 200
4 | 2024-04 | 200 | 120 | 180
5 | 2024-05 | 180 | 200 | NULL
LAG regarde ^ ^ LEAD regarde
EN ARRIERE | | EN AVANT
(ligne precedente) (ligne suivante)
Syntaxe complete :
LAG(colonne, offset, valeur_par_defaut) OVER (PARTITION BY ... ORDER BY ...)
| | |
v v v
Quelle Combien de Valeur si NULL
colonne lignes avant (debut/fin)
-- Evolution mensuelle du CA avec variation
SELECT
DATE_TRUNC('month', order_date) AS mois,
SUM(total) AS ca_mensuel,
LAG(SUM(total), 1) OVER (ORDER BY DATE_TRUNC('month', order_date)) AS ca_mois_prec,
ROUND(
(SUM(total) - LAG(SUM(total), 1) OVER (ORDER BY DATE_TRUNC('month', order_date)))
/ LAG(SUM(total), 1) OVER (ORDER BY DATE_TRUNC('month', order_date)) * 100
, 1) AS variation_pct
FROM orders
WHERE status != 'cancelled'
GROUP BY DATE_TRUNC('month', order_date)
ORDER BY mois;
-- Resultat :
-- mois | ca_mensuel | ca_mois_prec | variation_pct
-- 2024-01-01 | 45000.00 | NULL | NULL
-- 2024-02-01 | 52000.00 | 45000.00 | 15.6
-- 2024-03-01 | 48000.00 | 52000.00 | -7.7
-- 2024-04-01 | 61000.00 | 48000.00 | 27.1
-- Comparaison Year-over-Year (LAG avec offset de 12 mois)
SELECT
DATE_TRUNC('month', order_date) AS mois,
SUM(total) AS ca_mensuel,
LAG(SUM(total), 12) OVER (ORDER BY DATE_TRUNC('month', order_date)) AS ca_meme_mois_an_prec,
ROUND(
(SUM(total) - LAG(SUM(total), 12) OVER (ORDER BY DATE_TRUNC('month', order_date)))
/ NULLIF(LAG(SUM(total), 12) OVER (ORDER BY DATE_TRUNC('month', order_date)), 0) * 100
, 1) AS yoy_growth_pct
FROM orders
WHERE status != 'cancelled'
GROUP BY DATE_TRUNC('month', order_date)
ORDER BY mois;
-- Detection de chute soudaine avec LEAD
SELECT
order_date,
daily_revenue,
LEAD(daily_revenue, 1) OVER (ORDER BY order_date) AS next_day_revenue,
CASE
WHEN LEAD(daily_revenue, 1) OVER (ORDER BY order_date) < daily_revenue * 0.5
THEN 'ALERTE: Chute >50% le lendemain !'
ELSE 'Normal'
END AS alert
FROM (
SELECT order_date, SUM(total) AS daily_revenue
FROM orders
GROUP BY order_date
) daily;
Cas d'Etude : Detection de Fraude Bancaire avec LAG/LEAD
Detection de Fraude en Temps Reel - Banque BNP Paribas (Approche Simplifiee)
Les systemes de detection de fraude bancaire utilisent massivement LAG/LEAD pour identifier des patterns suspects. Voici les regles typiques :
- Regle 1 : Transaction > 5x le montant moyen des 10 dernieres transactions
- Regle 2 : 2 transactions a plus de 500km de distance en moins de 30 minutes
- Regle 3 : Montant croissant sur 5 transactions consecutives (pattern "test de carte")
-- Regle 1 : Transaction anormalement elevee
WITH transaction_context AS (
SELECT
t.transaction_id,
t.account_id,
t.amount,
t.transaction_time,
t.merchant_city,
LAG(t.amount, 1) OVER w AS prev_amount,
LAG(t.transaction_time, 1) OVER w AS prev_time,
LAG(t.merchant_city, 1) OVER w AS prev_city,
AVG(t.amount) OVER (
PARTITION BY t.account_id
ORDER BY t.transaction_time
ROWS BETWEEN 10 PRECEDING AND 1 PRECEDING
) AS avg_last_10
FROM transactions t
WINDOW w AS (PARTITION BY t.account_id ORDER BY t.transaction_time)
)
SELECT
transaction_id,
account_id,
amount,
avg_last_10,
ROUND(amount / NULLIF(avg_last_10, 0), 1) AS ratio,
CASE
WHEN amount > avg_last_10 * 5 THEN 'FRAUDE POTENTIELLE - Montant anormal'
WHEN EXTRACT(EPOCH FROM (transaction_time - prev_time)) < 1800
AND prev_city != merchant_city THEN 'FRAUDE POTENTIELLE - Deplacement impossible'
ELSE 'OK'
END AS fraud_flag
FROM transaction_context
WHERE amount > avg_last_10 * 5
OR (EXTRACT(EPOCH FROM (transaction_time - prev_time)) < 1800
AND prev_city != merchant_city);
-- Regle 3 : Pattern "test de carte" (montants croissants)
WITH consecutive_increases AS (
SELECT
account_id,
transaction_id,
amount,
transaction_time,
CASE WHEN amount > LAG(amount, 1) OVER w THEN 1 ELSE 0 END AS is_increase
FROM transactions
WINDOW w AS (PARTITION BY account_id ORDER BY transaction_time)
)
SELECT account_id, MIN(transaction_time), MAX(transaction_time),
COUNT(*) AS streak_length
FROM (
SELECT *,
SUM(CASE WHEN is_increase = 0 THEN 1 ELSE 0 END) OVER (
PARTITION BY account_id ORDER BY transaction_time
) AS grp
FROM consecutive_increases
)
GROUP BY account_id, grp
HAVING COUNT(*) >= 5 -- 5+ transactions croissantes consecutives
ORDER BY streak_length DESC;
Impact reel : Ce type de requetes, executees en temps reel sur des flux Kafka/Flink, permet de bloquer des transactions frauduleuses en moins de 100ms. Les banques qui implementent ces regles reduisent la fraude de 60-80%.
Agregations FenΓͺtrees : SUM, AVG, COUNT OVER
Les fonctions d'agregation classiques (SUM, AVG, COUNT, MIN, MAX) deviennent extremement puissantes quand on les combine avec OVER. Contrairement a GROUP BY, elles ne reduisent pas le nombre de lignes.
-- Cumul progressif du CA (running total)
SELECT
order_date,
total,
SUM(total) OVER (ORDER BY order_date) AS cumul_ca,
COUNT(*) OVER (ORDER BY order_date) AS nb_commandes_cumul,
AVG(total) OVER (ORDER BY order_date) AS panier_moyen_cumul
FROM orders
WHERE status = 'delivered'
ORDER BY order_date;
-- Resultat :
-- order_date | total | cumul_ca | nb_commandes | panier_moyen
-- 2024-01-02 | 150.00 | 150.00 | 1 | 150.00
-- 2024-01-03 | 200.00 | 350.00 | 2 | 175.00
-- 2024-01-03 | 75.00 | 425.00 | 3 | 141.67
-- 2024-01-05 | 300.00 | 725.00 | 4 | 181.25
-- Pourcentage du total par categorie
SELECT
cat.name AS categorie,
p.name AS produit,
SUM(oi.quantity * oi.unit_price) AS ca_produit,
SUM(SUM(oi.quantity * oi.unit_price)) OVER (
PARTITION BY cat.category_id
) AS ca_categorie,
ROUND(
SUM(oi.quantity * oi.unit_price) * 100.0
/ SUM(SUM(oi.quantity * oi.unit_price)) OVER (PARTITION BY cat.category_id)
, 1) AS pct_de_la_categorie
FROM order_items oi
JOIN products p ON oi.product_id = p.product_id
JOIN categories cat ON p.category_id = cat.category_id
GROUP BY cat.category_id, cat.name, p.product_id, p.name
ORDER BY categorie, ca_produit DESC;
ROWS vs RANGE : Les Frames de Fenetre
La clause frame definit exactement quelles lignes sont incluses dans le calcul de la window function. C'est ce qui permet les moyennes mobiles, les cumuls avec plage, etc.
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW ========================================= Compte les LIGNES physiques (exactement 2 lignes avant) date | valeur | SUM (ROWS 2 PREC) 2024-01-01 | 10 | 10 (seulement ligne courante) 2024-01-02 | 20 | 30 (10 + 20) 2024-01-03 | 30 | 60 (10 + 20 + 30) 2024-01-04 | 15 | 65 (20 + 30 + 15) <- 10 est sorti 2024-01-05 | 25 | 70 (30 + 15 + 25) RANGE BETWEEN 2 PRECEDING AND CURRENT ROW ========================================== Compte les VALEURS dans une plage (toutes les lignes dont ORDER BY value est dans [current - 2, current]) value | data | SUM (RANGE 2 PREC) 1 | A | A (valeurs 1 a 1 : juste A) 2 | B | A+B (valeurs 0 a 2 : A, B) 2 | C | A+B+C (valeurs 0 a 2 : A, B, C) <- B et C ont value=2 3 | D | A+B+C+D (valeurs 1 a 3 : A, B, C, D) 5 | E | D+E (valeurs 3 a 5 : juste D, E) Frames possibles : +-------------------------------------------------------+ | ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | <- Running total | ROWS BETWEEN 6 PRECEDING AND CURRENT ROW | <- Moyenne mobile 7j | ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING | <- Cumul inverse | ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING | <- Lissage 3 points | RANGE BETWEEN INTERVAL '7 days' PRECEDING AND ... | <- Plage temporelle +-------------------------------------------------------+
-- Moyenne mobile sur 7 jours (methode ROWS - requiert 1 ligne par jour)
SELECT
order_date,
daily_revenue,
ROUND(AVG(daily_revenue) OVER (
ORDER BY order_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
), 2) AS moving_avg_7d,
ROUND(AVG(daily_revenue) OVER (
ORDER BY order_date
ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
), 2) AS moving_avg_30d
FROM (
SELECT order_date, SUM(total) AS daily_revenue
FROM orders
WHERE status != 'cancelled'
GROUP BY order_date
) daily
ORDER BY order_date;
-- Moyenne mobile avec RANGE et INTERVAL (gere les jours manquants)
SELECT
order_date,
daily_revenue,
AVG(daily_revenue) OVER (
ORDER BY order_date
RANGE BETWEEN INTERVAL '6 days' PRECEDING AND CURRENT ROW
) AS moving_avg_7d_range
FROM (
SELECT order_date, SUM(total) AS daily_revenue
FROM orders GROUP BY order_date
) daily
ORDER BY order_date;
ROWS vs RANGE : Piege Courant
Si vous avez des jours sans commande, ROWS BETWEEN 6 PRECEDING donnera une fenetre de 7 LIGNES (qui peut couvrir plus de 7 jours). Utilisez RANGE BETWEEN INTERVAL '6 days' PRECEDING pour une vraie fenetre de 7 jours calendaires. Attention : RANGE avec INTERVAL n'est supporte qu'a partir de PostgreSQL 11+.
Scenario : Dashboard COVID - Moyenne Mobile 7 Jours
Calculer la Moyenne Mobile 7 Jours pour un Dashboard COVID
Contexte : Vous etes data analyst dans une ARS (Agence Regionale de Sante). On vous demande de creer le tableau de bord qui affiche la courbe de cas COVID avec une moyenne mobile sur 7 jours, comme ceux que l'on voyait quotidiennement a la television.
Donnees : Table covid_cases(report_date, department, new_cases, new_deaths, new_tests)
-- Dashboard COVID avec moyennes mobiles et taux
WITH daily_stats AS (
SELECT
report_date,
department,
new_cases,
new_deaths,
new_tests,
-- Moyenne mobile 7 jours des nouveaux cas
ROUND(AVG(new_cases) OVER (
PARTITION BY department
ORDER BY report_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
), 1) AS avg_7d_cases,
-- Moyenne mobile 7 jours des deces
ROUND(AVG(new_deaths) OVER (
PARTITION BY department
ORDER BY report_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
), 1) AS avg_7d_deaths,
-- Taux de positivite glissant 7 jours
ROUND(
SUM(new_cases) OVER (
PARTITION BY department
ORDER BY report_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) * 100.0 / NULLIF(SUM(new_tests) OVER (
PARTITION BY department
ORDER BY report_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
), 0)
, 2) AS taux_positivite_7d,
-- Evolution par rapport a la semaine precedente
LAG(new_cases, 7) OVER (
PARTITION BY department
ORDER BY report_date
) AS cases_7d_ago
FROM covid_cases
)
SELECT
report_date,
department,
new_cases,
avg_7d_cases,
avg_7d_deaths,
taux_positivite_7d,
CASE
WHEN cases_7d_ago IS NULL THEN 'N/A'
WHEN new_cases > cases_7d_ago * 1.5 THEN 'FORTE HAUSSE'
WHEN new_cases > cases_7d_ago THEN 'Hausse'
WHEN new_cases < cases_7d_ago * 0.5 THEN 'FORTE BAISSE'
ELSE 'Baisse'
END AS tendance
FROM daily_stats
ORDER BY department, report_date;
Points cles : La moyenne mobile 7 jours lisse les variations dues aux week-ends (moins de tests). Le taux de positivite est calcule sur une fenetre glissante pour etre statistiquement significatif. LAG(x, 7) compare au meme jour de la semaine precedente.
La Clause WINDOW : Eviter la Repetition
-- Au lieu de repeter OVER(...) 5 fois :
SELECT
order_date,
total,
SUM(total) OVER w AS cumul,
AVG(total) OVER w AS avg_running,
COUNT(*) OVER w AS count_running,
MIN(total) OVER w AS min_running,
MAX(total) OVER w AS max_running
FROM orders
WINDOW w AS (ORDER BY order_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
ORDER BY order_date;
-- On peut meme heriter d'une fenetre nommee
SELECT
department,
name,
salary,
RANK() OVER dept_sal AS rank_in_dept,
SUM(salary) OVER dept AS total_dept_salary,
ROUND(salary * 100.0 / SUM(salary) OVER dept, 1) AS pct_of_dept
FROM employees
WINDOW
dept AS (PARTITION BY department),
dept_sal AS (dept ORDER BY salary DESC)
ORDER BY department, salary DESC;
FIRST_VALUE, LAST_VALUE, NTH_VALUE
-- Comparer chaque employe au mieux et au moins bien paye de son dept
SELECT
name,
department,
salary,
FIRST_VALUE(name) OVER w AS top_earner,
FIRST_VALUE(salary) OVER w AS top_salary,
salary - FIRST_VALUE(salary) OVER w AS ecart_au_top,
LAST_VALUE(name) OVER (
PARTITION BY department
ORDER BY salary DESC
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS lowest_earner
FROM employees
WINDOW w AS (PARTITION BY department ORDER BY salary DESC);
-- ATTENTION : LAST_VALUE a un piege !
-- Par defaut, la frame est ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-- Donc LAST_VALUE retourne... la ligne courante !
-- Il FAUT specifier ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
Piege LAST_VALUE
La frame par defaut est ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. Avec LAST_VALUE, cela signifie que la "derniere valeur" est toujours la ligne courante ! Specifiez toujours explicitement ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING avec LAST_VALUE.
Flashcards
CTEs et CTEs Recursives
Les Common Table Expressions (CTEs) sont l'outil numero 1 pour ecrire du SQL lisible et maintenable. Les CTEs recursives ajoutent la capacite de traverser des hierarchies et de generer des series, ouvrant des possibilites impossibles avec du SQL standard.
Objectifs d'apprentissage
- Maitriser la syntaxe WITH pour structurer des requetes complexes
- Chainer plusieurs CTEs pour decomposer la logique
- Comprendre et ecrire des CTEs recursives
- Traverser des hierarchies (organigrammes, categories)
- Connaitre les limites de performance des CTEs
La regle d'or d'un bon Data Architect : si votre requete depasse 30 lignes, decomposez-la en CTEs. Chaque CTE doit avoir un nom explicite qui decrit ce qu'elle contient. Vos collegues (et votre futur vous) vous remercieront.
CTEs Simples : Le WITH Clause
Un CTE est une "table temporaire nommee" qui n'existe que le temps de la requete. Pensez-y comme une variable en programmation.
-- Sans CTE : requete imbriquee difficile a lire
SELECT customer_id, name, total_spent
FROM (
SELECT c.customer_id, c.first_name || ' ' || c.last_name AS name,
SUM(o.total) AS total_spent
FROM customers c JOIN orders o ON c.customer_id = o.customer_id
WHERE o.status = 'delivered'
GROUP BY c.customer_id, c.first_name, c.last_name
) sub
WHERE total_spent > (
SELECT AVG(total_spent) FROM (
SELECT customer_id, SUM(total) AS total_spent
FROM orders WHERE status = 'delivered'
GROUP BY customer_id
) avg_sub
);
-- Avec CTEs : clair, lisible, maintenable
WITH customer_spending AS (
SELECT
c.customer_id,
c.first_name || ' ' || c.last_name AS name,
SUM(o.total) AS total_spent
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
WHERE o.status = 'delivered'
GROUP BY c.customer_id, c.first_name, c.last_name
),
average_spending AS (
SELECT AVG(total_spent) AS avg_spent
FROM customer_spending
)
SELECT cs.customer_id, cs.name, cs.total_spent
FROM customer_spending cs
CROSS JOIN average_spending avs
WHERE cs.total_spent > avs.avg_spent
ORDER BY cs.total_spent DESC;
Chainage de CTEs
-- Analyse de cohorte : retention mensuelle des clients
WITH first_orders AS (
-- Etape 1 : Date de premiere commande par client
SELECT
customer_id,
DATE_TRUNC('month', MIN(order_date)) AS cohort_month
FROM orders
GROUP BY customer_id
),
monthly_activity AS (
-- Etape 2 : Mois d'activite de chaque client
SELECT DISTINCT
o.customer_id,
DATE_TRUNC('month', o.order_date) AS activity_month
FROM orders o
),
cohort_data AS (
-- Etape 3 : Jointure cohorte + activite avec calcul de l'ecart
SELECT
f.cohort_month,
ma.activity_month,
EXTRACT(YEAR FROM AGE(ma.activity_month, f.cohort_month)) * 12
+ EXTRACT(MONTH FROM AGE(ma.activity_month, f.cohort_month)) AS months_since_first,
COUNT(DISTINCT f.customer_id) AS active_customers
FROM first_orders f
JOIN monthly_activity ma ON f.customer_id = ma.customer_id
GROUP BY f.cohort_month, ma.activity_month
),
cohort_sizes AS (
-- Etape 4 : Taille de chaque cohorte
SELECT cohort_month, COUNT(*) AS cohort_size
FROM first_orders
GROUP BY cohort_month
)
-- Requete finale : Taux de retention
SELECT
cd.cohort_month,
cs.cohort_size,
cd.months_since_first,
cd.active_customers,
ROUND(cd.active_customers * 100.0 / cs.cohort_size, 1) AS retention_pct
FROM cohort_data cd
JOIN cohort_sizes cs ON cd.cohort_month = cs.cohort_month
WHERE cd.months_since_first BETWEEN 0 AND 12
ORDER BY cd.cohort_month, cd.months_since_first;
CTE vs Sous-requete : Quand Utiliser Quoi ?
Utilisez un CTE quand : La sous-requete est referencee plusieurs fois, la requete depasse 20 lignes, vous voulez documenter les etapes logiques, ou vous avez besoin de recursion.
Gardez une sous-requete quand : Elle est simple (< 5 lignes), utilisee une seule fois, et que la performance est critique (certains moteurs n'optimisent pas les CTEs).
CTEs Recursives : Traverser des Hierarchies
Les CTEs recursives permettent de resoudre des problemes qui necessiteraient autrement des boucles : hierarchies, graphes, generation de series.
WITH RECURSIVE cte AS (
-- Membre d'ancrage (base case)
SELECT ... WHERE condition_initiale
UNION ALL
-- Membre recursif (recursive case)
SELECT ... FROM cte JOIN table ON ...
)
Execution :
+-----------------+
| Iteration 0 | Membre d'ancrage -> Resultat initial
+-----------------+
|
+-----------------+
| Iteration 1 | Membre recursif avec resultat de iter 0
+-----------------+
|
+-----------------+
| Iteration 2 | Membre recursif avec resultat de iter 1
+-----------------+
|
... (continue jusqu'a ce que le membre recursif
| ne retourne plus de lignes)
+-----------------+
| Iteration N | Resultat vide -> ARRET
+-----------------+
|
v
Resultat final = UNION de toutes les iterations
Exemple 1 : Organigramme d'Entreprise
-- Table employees avec auto-reference
-- employee_id | name | manager_id | title
-- 1 | CEO | NULL | CEO
-- 2 | VP Sales | 1 | VP
-- 3 | VP Tech | 1 | VP
-- 4 | Dir Mkt | 2 | Director
-- 5 | Dev Lead | 3 | Lead
-- 6 | Dev Sr | 5 | Senior Dev
-- 7 | Dev Jr | 5 | Junior Dev
-- Afficher la hierarchie complete avec le niveau et le chemin
WITH RECURSIVE org_chart AS (
-- Ancrage : le CEO (pas de manager)
SELECT
employee_id,
name,
title,
manager_id,
0 AS level,
name AS path,
ARRAY[employee_id] AS path_ids
FROM employees
WHERE manager_id IS NULL
UNION ALL
-- Recursion : les subordonnes
SELECT
e.employee_id,
e.name,
e.title,
e.manager_id,
oc.level + 1,
oc.path || ' > ' || e.name,
oc.path_ids || e.employee_id
FROM employees e
JOIN org_chart oc ON e.manager_id = oc.employee_id
)
SELECT
REPEAT(' ', level) || name AS organigramme,
title,
level,
path
FROM org_chart
ORDER BY path_ids;
-- Resultat :
-- organigramme | title | level | path
-- CEO | CEO | 0 | CEO
-- VP Sales | VP | 1 | CEO > VP Sales
-- Dir Mkt | Director | 2 | CEO > VP Sales > Dir Mkt
-- VP Tech | VP | 1 | CEO > VP Tech
-- Dev Lead | Lead | 2 | CEO > VP Tech > Dev Lead
-- Dev Sr | Senior Dev | 3 | CEO > VP Tech > Dev Lead > Dev Sr
-- Dev Jr | Junior Dev | 3 | CEO > VP Tech > Dev Lead > Dev Jr
-- Compter les subordonnes directs et totaux
WITH RECURSIVE subordinates AS (
SELECT employee_id, name, manager_id, employee_id AS root_id
FROM employees
UNION ALL
SELECT e.employee_id, e.name, e.manager_id, s.root_id
FROM employees e
JOIN subordinates s ON e.manager_id = s.employee_id
WHERE e.employee_id != s.root_id -- eviter boucle infinie
)
SELECT
e.name,
e.title,
COUNT(DISTINCT s.employee_id) - 1 AS total_subordinates
FROM employees e
LEFT JOIN subordinates s ON e.employee_id = s.root_id
GROUP BY e.employee_id, e.name, e.title
ORDER BY total_subordinates DESC;
Cas d'Etude : Hierarchie de Categories chez Amazon
Amazon - Navigation par Categories avec CTEs Recursives
Amazon organise ses millions de produits dans une hierarchie de categories profonde de 7+ niveaux :
- Electronique > Informatique > Ordinateurs portables > Gaming > 15 pouces > ASUS > ROG Strix
- Cette hierarchie est stockee dans une seule table avec
parent_category_id - Quand un utilisateur clique sur "Electronique", il faut afficher TOUS les produits de TOUTES les sous-categories
-- Trouver toutes les sous-categories d'une categorie donnee
WITH RECURSIVE category_tree AS (
-- Ancrage : la categorie selectionnee
SELECT category_id, name, parent_id, 0 AS depth,
ARRAY[name] AS breadcrumb
FROM categories
WHERE name = 'Electronique'
UNION ALL
-- Recursion : toutes les sous-categories
SELECT c.category_id, c.name, c.parent_id, ct.depth + 1,
ct.breadcrumb || c.name
FROM categories c
JOIN category_tree ct ON c.parent_id = ct.category_id
WHERE ct.depth < 10 -- securite : max 10 niveaux
)
-- Compter les produits par sous-categorie
SELECT
ct.name AS categorie,
ct.depth,
ARRAY_TO_STRING(ct.breadcrumb, ' > ') AS chemin,
COUNT(p.product_id) AS nb_produits
FROM category_tree ct
LEFT JOIN products p ON p.category_id = ct.category_id
GROUP BY ct.category_id, ct.name, ct.depth, ct.breadcrumb
ORDER BY ct.breadcrumb;
-- Obtenir tous les produits sous "Electronique" (y compris sous-categories)
SELECT p.*
FROM products p
WHERE p.category_id IN (
SELECT category_id FROM category_tree
);
Performance : Sur un catalogue de 350M+ produits avec 30,000+ categories, Amazon utilise des materialized paths et du caching plutot que des CTEs recursives en temps reel. Mais pour le batch processing et les rapports, la CTE recursive reste l'approche standard.
Anti-Pattern : CTE pour Gros Volumes
CTE au Lieu de Table Temporaire pour Gros Volumes
Avant PostgreSQL 12, les CTEs etaient des "optimization fences" : le moteur ne pouvait pas pousser les predicats a l'interieur d'un CTE. Ce n'est plus le cas depuis PostgreSQL 12 (grace a CTE inlining), mais il y a encore des cas ou une table temporaire est preferable :
-- ANTI-PATTERN : CTE materialise qui charge 100M de lignes en memoire
WITH huge_cte AS (
SELECT * FROM transactions -- 100M lignes !
WHERE amount > 0
)
SELECT * FROM huge_cte WHERE customer_id = 42;
-- PostgreSQL 12+ : OK car inlining (le WHERE est pousse dans le CTE)
-- PostgreSQL 11- : DESASTRE car materialise les 100M lignes d'abord
-- Si le CTE est reference PLUSIEURS FOIS, il est toujours materialise :
WITH big_data AS MATERIALIZED ( -- Force la materialisation
SELECT customer_id, SUM(amount) AS total
FROM transactions
GROUP BY customer_id
)
SELECT * FROM big_data d1
JOIN big_data d2 ON d1.customer_id != d2.customer_id
WHERE d1.total > d2.total * 2;
-- -> CTE materialise une seule fois, OK
-- MEILLEUR : Table temporaire pour les gros volumes reutilises
CREATE TEMP TABLE tmp_customer_totals AS
SELECT customer_id, SUM(amount) AS total
FROM transactions
GROUP BY customer_id;
CREATE INDEX ON tmp_customer_totals(customer_id);
CREATE INDEX ON tmp_customer_totals(total);
-- Maintenant les requetes sur tmp_customer_totals sont indexees !
SELECT * FROM tmp_customer_totals WHERE total > 10000;
Regle : Si votre CTE est reference une seule fois, laissez PostgreSQL 12+ l'inliner. Si le CTE est reference plusieurs fois et contient beaucoup de donnees, envisagez une table temporaire avec index.
Generation de Series avec CTEs Recursives
-- Generer un calendrier (alternative a generate_series de PostgreSQL)
WITH RECURSIVE calendar AS (
SELECT DATE '2024-01-01' AS date_val
UNION ALL
SELECT date_val + INTERVAL '1 day'
FROM calendar
WHERE date_val < '2024-12-31'
)
SELECT date_val,
EXTRACT(DOW FROM date_val) AS day_of_week,
TO_CHAR(date_val, 'Day') AS day_name,
EXTRACT(WEEK FROM date_val) AS week_number
FROM calendar;
-- Suite de Fibonacci
WITH RECURSIVE fibo AS (
SELECT 1 AS n, 1::BIGINT AS fib_n, 0::BIGINT AS fib_prev
UNION ALL
SELECT n + 1, fib_n + fib_prev, fib_n
FROM fibo
WHERE n < 50
)
SELECT n, fib_n FROM fibo;
-- Remplir les jours manquants dans des donnees temporelles
WITH RECURSIVE date_range AS (
SELECT MIN(order_date) AS dt FROM orders
UNION ALL
SELECT dt + 1 FROM date_range
WHERE dt < (SELECT MAX(order_date) FROM orders)
)
SELECT
dr.dt AS order_date,
COALESCE(SUM(o.total), 0) AS daily_revenue
FROM date_range dr
LEFT JOIN orders o ON o.order_date = dr.dt AND o.status != 'cancelled'
GROUP BY dr.dt
ORDER BY dr.dt;
Securite : Toujours Limiter la Recursion
Une CTE recursive sans condition d'arret boucle a l'infini. Protegez-vous avec :
1. Une clause WHERE dans le membre recursif (ex: WHERE depth < 20)
2. Le parametre PostgreSQL : SET statement_timeout = '30s';
3. La clause CYCLE (PostgreSQL 14+) : CYCLE id SET is_cycle USING path detecte automatiquement les cycles
Flashcards
Introduction au NoSQL
Objectifs de cette lecon
- Comprendre le theoreme CAP et ses implications architecturales
- Differencier BASE vs ACID et savoir quand privilegier l'un ou l'autre
- Maitriser les 4 familles NoSQL : document, cle-valeur, colonne, graphe
- Savoir choisir la bonne technologie selon le cas d'usage
Le NoSQL n'est pas un remplacement du SQL - c'est un complement. En 20 ans de carriere, j'ai vu trop d'equipes choisir MongoDB "parce que c'est moderne" alors qu'un PostgreSQL aurait ete 10x plus adapte. Le vrai talent d'un Data Architect, c'est de savoir quand utiliser quoi. Le theoreme CAP n'est pas qu'une theorie - c'est votre boussole quotidienne.
Le Theoreme CAP
Formule par Eric Brewer en 2000, le theoreme CAP etablit qu'un systeme distribue ne peut garantir simultanement que 2 des 3 proprietes suivantes :
Consistency (C)
/\
/ \
/ \
/ CP \
/systems \
/ββββββββββ\
/ \
/ CA AP \
/ systems systems\
/ββββββββββββββββββ\
Availability (A) ββββββββββ Partition Tolerance (P)
CP : MongoDB, HBase, Redis Cluster
AP : Cassandra, DynamoDB, CouchDB
CA : PostgreSQL (single node), Oracle RAC
Les 3 Proprietes CAP
Consistency : Tous les noeuds voient les memes donnees au meme moment.
Availability : Chaque requete recoit une reponse (succes ou echec).
Partition Tolerance : Le systeme continue de fonctionner malgre des pertes de messages reseau.
ACID vs BASE
| Propriete | ACID (SQL) | BASE (NoSQL) |
|---|---|---|
| Modele | Atomicity, Consistency, Isolation, Durability | Basically Available, Soft state, Eventually consistent |
| Consistance | Forte (immediate) | Eventuelle (convergence) |
| Disponibilite | Peut etre sacrifiee | Prioritaire |
| Scalabilite | Verticale (scale-up) | Horizontale (scale-out) |
| Schema | Rigide, predetermine | Flexible, schema-on-read |
| Cas ideal | Transactions financieres | Big Data, temps reel |
Les 4 Familles NoSQL
NoSQL Databases βββ Document Store βββ Key-Value Store β βββ MongoDB β βββ Redis β βββ CouchDB β βββ Memcached β βββ Amazon DocumentDB β βββ DynamoDB β β βββ Column-Family βββ Graph Database β βββ Cassandra βββ Neo4j β βββ HBase βββ Amazon Neptune β βββ ScyllaDB βββ ArangoDB
| Famille | Modele | Forces | Cas d'usage |
|---|---|---|---|
| Document | JSON/BSON imbriques | Flexibilite, requetes riches | CMS, catalogues, profils |
| Cle-Valeur | Paires simples | Ultra-rapide, cache | Sessions, cache, compteurs |
| Colonne | Familles de colonnes | Ecriture massive, time-series | IoT, logs, analytics |
| Graphe | Noeuds et relations | Traversees complexes | Reseaux sociaux, fraude |
Amazon DynamoDB - Du monolithe au NoSQL
En 2004, Amazon a subi une panne majeure lors du Black Friday a cause de sa base Oracle. Cet incident a pousse l'equipe a creer Dynamo (ancetre de DynamoDB), un systeme AP concu pour ne jamais refuser une ecriture.
- Probleme : Oracle ne scalait plus a 10M+ requetes/seconde
- Solution : Consistent hashing + vector clocks + eventual consistency
- Resultat : 99.99% disponibilite, latence <10ms au p99
- Impact : Le papier Dynamo (2007) a inspire Cassandra, Riak, Voldemort
Le "NoSQL par defaut"
Choisir NoSQL parce que "c'est plus moderne" sans analyser les besoins. Si vos donnees sont relationnelles avec des transactions complexes, un PostgreSQL sera superieur a n'importe quel NoSQL.
Solution : Commencez par definir vos patterns d'acces (access patterns). Si vous avez besoin de JOINs complexes, de transactions ACID multi-tables, ou de schemas rigides - restez en SQL. Le NoSQL brille quand vous avez des volumes massifs, des schemas variables, ou des patterns de lecture previsibles.
Choisir la bonne base pour un e-commerce
Vous etes Data Architect pour une marketplace avec 5M produits, 200K commandes/jour, et un moteur de recommandation. Quelles bases choisissez-vous ?
- Catalogue produits : MongoDB (schemas variables par categorie)
- Panier & sessions : Redis (latence sub-milliseconde)
- Commandes : PostgreSQL (transactions ACID obligatoires)
- Recommandations : Neo4j (graphe utilisateur-produit)
- Logs & recherche : Elasticsearch (full-text search)
MongoDB Deep Dive
Objectifs de cette lecon
- Maitriser le modele de donnees BSON et les schema design patterns
- Construire des pipelines d'aggregation complexes
- Comprendre le sharding et la replication MongoDB
- Optimiser les performances avec les index et le profiling
MongoDB est la base NoSQL la plus populaire, et pour cause - son modele document est incroyablement intuitif. Mais attention, la simplicite apparente cache une vraie complexite. J'ai vu des equipes detruire les performances de MongoDB en repliquant un schema relationnel dans des documents. Le secret ? Pensez "access patterns first".
Le Modele BSON
BSON (Binary JSON) etend JSON avec des types supplementaires : Date, ObjectId, Decimal128, BinData. Les documents peuvent etre imbriques jusqu'a 100 niveaux avec une taille max de 16 Mo.
// Document e-commerce avec embedding
{
"_id": ObjectId("64a7b3f2..."),
"order_id": "ORD-2024-001",
"customer": {
"name": "Marie Dupont",
"email": "marie@example.com",
"tier": "premium"
},
"items": [
{ "sku": "LAPTOP-01", "name": "MacBook Pro", "qty": 1, "price": 2499.00 },
{ "sku": "CASE-03", "name": "Housse", "qty": 1, "price": 49.00 }
],
"total": 2548.00,
"status": "shipped",
"created_at": ISODate("2024-01-15T10:30:00Z"),
"shipping": {
"address": { "street": "12 Rue de la Paix", "city": "Paris", "zip": "75002" },
"tracking": "FR123456789",
"estimated_delivery": ISODate("2024-01-18")
}
}
Schema Design Patterns
| Pattern | Description | Cas d'usage |
|---|---|---|
| Embedding | Imbriquer les sous-documents | Relation 1:few, lecture ensemble |
| Referencing | Stocker des ObjectId references | Relation many:many, documents volumineux |
| Bucket | Regrouper des evenements par periode | Time-series, IoT, logs |
| Computed | Pre-calculer les aggregations | Compteurs, statistiques frequentes |
| Outlier | Gerer les cas extremes separement | Utilisateurs avec millions de followers |
| Subset | Garder un sous-ensemble frequent | Les 10 derniers commentaires |
Pipeline d'Aggregation
// Analyse des ventes par categorie et mois
db.orders.aggregate([
{ $match: { status: "completed", created_at: { $gte: ISODate("2024-01-01") } } },
{ $unwind: "$items" },
{ $lookup: {
from: "products",
localField: "items.sku",
foreignField: "sku",
as: "product_info"
}},
{ $unwind: "$product_info" },
{ $group: {
_id: {
category: "$product_info.category",
month: { $month: "$created_at" }
},
total_revenue: { $sum: { $multiply: ["$items.qty", "$items.price"] } },
order_count: { $sum: 1 },
avg_price: { $avg: "$items.price" }
}},
{ $sort: { total_revenue: -1 } },
{ $project: {
category: "$_id.category",
month: "$_id.month",
total_revenue: { $round: ["$total_revenue", 2] },
order_count: 1,
avg_price: { $round: ["$avg_price", 2] }
}}
])
Sharding & Replication
Application
|
mongos Router ββββ Config Servers (3x)
/ | \ (metadata, chunk maps)
/ | \
Shard1 Shard2 Shard3
(RS) (RS) (RS)
P-S-S P-S-S P-S-S
RS = Replica Set (Primary + 2 Secondary)
Shard Key: { customer_id: "hashed" }
Forbes - Migration vers MongoDB Atlas
Forbes a migre son CMS de 100M+ articles depuis une base relationnelle vers MongoDB Atlas pour gerer les schemas variables de contenu (articles, videos, podcasts, interactifs).
- Gain : Temps de deploiement de nouveaux types de contenu reduit de 2 semaines a 2 heures
- Performance : Latence de lecture reduite de 300ms a 15ms grace au pattern Subset
- Scalabilite : Gestion de 150M pages vues/mois sans probleme
Le "Relational Mindset" en MongoDB
Creer des collections separees pour chaque entite et utiliser $lookup partout revient a reconstruire un SGBDR mais en moins performant. MongoDB n'a pas d'optimiseur de requetes aussi sophistique que PostgreSQL pour les JOINs.
Solution : Modelisez par les access patterns, pas par les entites. Si deux donnees sont toujours lues ensemble, imbriquez-les. Regles : embed si 1:few et acces conjoint, reference si 1:many ou acces independant.
Redis & Cache Distribue
Objectifs de cette lecon
- Maitriser les structures de donnees Redis avancees
- Implementer les patterns de caching (cache-aside, write-through, write-behind)
- Comprendre Redis Cluster et la haute disponibilite
- Utiliser Redis pour le pub/sub, les streams et les compteurs distribues
Redis est l'outil le plus sous-estime de l'ecosysteme data. Tout le monde le connait comme "un cache", mais c'est en realite un serveur de structures de donnees in-memory ultra-polyvalent. Je l'utilise pour le caching, les sessions, les leaderboards, les rate limiters, les queues, et meme comme broker de messages leger. A 100K+ ops/seconde sur un seul noeud, c'est un game changer.
Structures de Donnees Redis
| Type | Description | Commandes | Cas d'usage |
|---|---|---|---|
| String | Valeur simple (max 512 Mo) | SET, GET, INCR, EXPIRE | Cache, compteurs, flags |
| Hash | Map de champs | HSET, HGET, HGETALL | Objets, profils utilisateur |
| List | Liste chainee | LPUSH, RPOP, LRANGE | Queues, feeds, historique |
| Set | Ensemble non-ordonne | SADD, SMEMBERS, SINTER | Tags, amis communs |
| Sorted Set | Set avec score | ZADD, ZRANGE, ZRANK | Leaderboards, top-N |
| Stream | Append-only log | XADD, XREAD, XGROUP | Event sourcing, CDC |
| HyperLogLog | Estimation cardinalite | PFADD, PFCOUNT | Compteurs uniques |
Patterns de Caching
Client ββGETββ> Cache (Redis)
| |
| Cache Miss | Cache Hit
| |
βββSELECTββ> Database return data
|
βββSETββ> Cache (avec TTL)
import redis
import json
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def get_user_profile(user_id: str) -> dict:
"""Cache-aside pattern avec TTL"""
cache_key = f"user:{user_id}:profile"
# 1. Verifier le cache
cached = r.get(cache_key)
if cached:
return json.loads(cached)
# 2. Cache miss - lire la DB
profile = db.query("SELECT * FROM users WHERE id = %s", user_id)
# 3. Populer le cache avec TTL de 15 min
r.setex(cache_key, 900, json.dumps(profile))
return profile
def update_user_profile(user_id: str, data: dict):
"""Write-through: ecrire DB + invalider cache"""
db.update("UPDATE users SET ... WHERE id = %s", data, user_id)
r.delete(f"user:{user_id}:profile") # Invalidation
Cache-Aside
- Application gere le cache
- Donnees potentiellement stale
- Cache miss = latence supplementaire
- Ideal pour lectures frequentes
Write-Through
- Cache toujours synchronise
- Ecriture plus lente (2 writes)
- Pas de donnees stale
- Ideal pour consistance requise
Netflix - Redis pour les sessions et recommandations
Netflix utilise des milliers d'instances Redis (via EVCache, leur wrapper interne) pour servir 200M+ abonnes avec des latences sub-milliseconde.
- Sessions : Etat de lecture de chaque utilisateur en Redis pour reprendre instantanement
- Recommandations : Pre-calcul des listes de recommandation stockees en Sorted Sets
- Scale : ~3M ops/seconde par cluster, ~1500 clusters globalement
- Fiabilite : Replication cross-region avec failover automatique <30s
Redis sans politique d'eviction
Utiliser Redis comme cache sans configurer maxmemory ni eviction policy. Resultat : Redis remplit toute la RAM, OOM killer tue le processus, toutes les sessions sont perdues.
Solution : Toujours configurer maxmemory (typiquement 75% de la RAM disponible) et une politique d'eviction adaptee : allkeys-lru pour du cache general, volatile-ttl si vous utilisez des TTL differencies, noeviction si vous ne pouvez pas perdre de donnees (mais alors surveillez la memoire).
Cassandra & ScyllaDB
Objectifs de cette lecon
- Comprendre l'architecture en anneau et le consistent hashing
- Maitriser la modelisation orientee requetes (query-first design)
- Optimiser les performances avec les strategies de compaction
- Comparer Cassandra et ScyllaDB pour les cas d'usage haute performance
Cassandra est la base de choix quand vous avez besoin d'ecrire massivement avec une disponibilite quasi-absolue. Facebook l'a creee pour son systeme de messaging, et Apple stocke aujourd'hui 400+ Po dans Cassandra. Mais la modelisation est radicalement differente du SQL - vous devez penser "requete d'abord".
Architecture en Anneau
Node A (Token: 0-24)
/ \
Node F Node B
(75-99) (25-49)
| Gossip |
| Protocol |
Node E Node C
(62-74) (50-61)
\ /
Node D (Token: ...)
Write "user:123" (token=42):
β Primary: Node B (owns 25-49)
β Replica: Node C, Node D (RF=3)
β Consistency: QUORUM = 2/3 acks
CQL - Cassandra Query Language
-- Table modelisee pour la requete "derniers evenements par utilisateur"
CREATE TABLE user_events (
user_id UUID,
event_date DATE,
event_time TIMESTAMP,
event_type TEXT,
event_data MAP<TEXT, TEXT>,
PRIMARY KEY ((user_id, event_date), event_time)
) WITH CLUSTERING ORDER BY (event_time DESC)
AND compaction = {'class': 'TimeWindowCompactionStrategy',
'compaction_window_size': 1,
'compaction_window_unit': 'DAYS'};
-- Requete optimisee (suit la partition key)
SELECT * FROM user_events
WHERE user_id = ? AND event_date = '2024-01-15'
ORDER BY event_time DESC LIMIT 50;
-- Anti-pattern : scan sans partition key
-- SELECT * FROM user_events WHERE event_type = 'click'; -- INTERDIT !
La regle d'or Cassandra
En Cassandra, vous ne modelisez PAS vos donnees, vous modelisez vos REQUETES. Chaque table est optimisee pour un pattern de lecture specifique. La denormalisation et la duplication de donnees sont non seulement acceptees mais encouragees. Un meme evenement peut exister dans 3 tables differentes si 3 requetes differentes en ont besoin.
Cassandra vs ScyllaDB
| Critere | Cassandra | ScyllaDB |
|---|---|---|
| Langage | Java (JVM) | C++ (Seastar framework) |
| Latence p99 | 10-50ms | 1-5ms |
| Throughput | ~50K ops/s par noeud | ~1M ops/s par noeud |
| GC Pauses | Oui (problematique) | Non (pas de GC) |
| Compaction | Impact sur la latence | Impact minimal |
| Compatibilite | Reference CQL | 100% CQL compatible |
| Cout serveurs | Baseline | 3-5x moins de noeuds |
Discord - Migration Cassandra vers ScyllaDB
Discord stockait des trillions de messages dans Cassandra mais souffrait de latences imprevisibles dues aux GC pauses Java. En 2023, ils ont migre vers ScyllaDB.
- Avant : 177 noeuds Cassandra, latence p99 a 200ms lors des GC
- Apres : 72 noeuds ScyllaDB, latence p99 stable a 5ms
- Gain : -60% de noeuds, -90% de latence p99, maintenance simplifiee
Startup IoT - Mauvaise modelisation Cassandra
Une startup IoT a modelise sa table de capteurs avec une partition key sur device_id uniquement, sans date. En 6 mois, certaines partitions ont atteint 100 Mo+ (limite recommandee : 10 Mo), causant des timeouts en lecture et des compactions interminables.
- Erreur : Partition key sans bucketing temporel
- Impact : 30% des lectures en timeout, 3 incidents majeurs en production
- Fix : Migration vers PRIMARY KEY ((device_id, date_bucket), timestamp) avec bucketing par jour
Le "SELECT * sans WHERE" en Cassandra
Executer des requetes sans filtre sur la partition key force un full cluster scan (ALLOW FILTERING). Cela impacte tous les noeuds et peut mettre le cluster entier a genoux.
Solution : Toute requete DOIT filtrer sur la partition key complete. Si vous avez besoin d'un acces par un champ non-partition, creez une table materialisee ou un index secondaire local (SASI) pour les cas simples.
Neo4j & Bases de Donnees Graphes
Objectifs de cette lecon
- Comprendre le modele Property Graph et ses avantages
- Ecrire des requetes Cypher pour des traversees complexes
- Appliquer les algorithmes de graphe (PageRank, communautes, chemins)
- Identifier les cas d'usage ideaux pour les bases graphes
Les bases graphes sont le secret le mieux garde de l'industrie. Quand vos donnees sont definies par leurs relations plus que par leurs attributs, un graphe change completement la donne. Detection de fraude, recommandations, analyse de reseaux - la ou un SQL demande des JOINs recursifs couteux sur 5 niveaux, Neo4j repond en millisecondes.
Le Modele Property Graph
(Alice:Person)ββ[:FOLLOWS]ββ>(Bob:Person)
| |
[:LIKES] [:WROTE]
| |
v v
(Post1:Post) (Post2:Post)
| |
[:TAGGED] [:TAGGED]
| |
v v
(Python:Topic) (Data:Topic)
Cypher Query Language
// Recommandation : "Amis de mes amis qui aiment les memes topics"
MATCH (me:Person {name: 'Alice'})-[:FOLLOWS]->(friend)-[:FOLLOWS]->(fof:Person)
WHERE NOT (me)-[:FOLLOWS]->(fof) AND me <> fof
WITH fof, COUNT(friend) AS mutual_friends
MATCH (me)-[:LIKES]->(:Post)-[:TAGGED]->(topic)
MATCH (fof)-[:LIKES]->(:Post)-[:TAGGED]->(topic)
WITH fof, mutual_friends, COUNT(DISTINCT topic) AS common_topics
RETURN fof.name, mutual_friends, common_topics
ORDER BY mutual_friends DESC, common_topics DESC
LIMIT 10;
// Detection de fraude : cycles dans les transactions
MATCH path = (a:Account)-[:TRANSFER*3..6]->(a)
WHERE ALL(r IN relationships(path) WHERE r.amount > 10000)
AND ALL(r IN relationships(path) WHERE
duration.between(r.timestamp, r.timestamp).hours < 24)
RETURN path,
reduce(total = 0, r IN relationships(path) | total + r.amount) AS cycle_total;
Algorithmes de Graphe
| Algorithme | Categorie | Cas d'usage |
|---|---|---|
| PageRank | Centralite | Noeuds les plus influents |
| Betweenness Centrality | Centralite | Points de passage critiques |
| Louvain | Communautes | Detection de clusters |
| Dijkstra | Chemins | Plus court chemin |
| Node Similarity | Similarite | Recommandations |
| Label Propagation | Communautes | Classification non-supervisee |
Panama Papers - ICIJ et Neo4j
L'investigation des Panama Papers (11.5M documents, 2.6 To) a utilise Neo4j pour cartographier les relations entre 214K entites offshore, revelant des reseaux de blanchiment impossibles a detecter en SQL.
- Donnees : 800K noeuds (personnes, societes, comptes) + 2M relations
- Requetes : Traversees 6+ niveaux de profondeur en <1 seconde
- Impact : Demissions de dirigeants politiques dans 10+ pays
Utiliser un graphe pour des donnees tabulaires
Stocker des logs d'acces ou des time-series dans Neo4j parce que "tout est connecte". Les bases graphes excellient pour les traversees de relations, pas pour le scan sequentiel ou l'agregation volumetrique.
Solution : Utilisez un graphe quand la valeur est dans les RELATIONS entre entites (fraude, recommandations, reseaux). Pour des donnees principalement attributaires ou temporelles, PostgreSQL, Cassandra ou ClickHouse seront bien plus performants.
Architecture anti-fraude pour une banque
Vous concevez le systeme anti-fraude d'une banque traitant 500K transactions/jour. Proposez une architecture polyglot :
- Neo4j : Graphe des relations client-compte-beneficiaire pour detecter les cycles suspects
- Redis : Cache des profils de risque, rate limiting des transactions
- Kafka : Stream des transactions en temps reel vers les moteurs de regles
- PostgreSQL : Stockage des alertes validees, audit trail reglementaire
Elasticsearch & Recherche Full-Text
Objectifs de cette lecon
- Comprendre l'index inverse et le fonctionnement interne d'Elasticsearch
- Configurer les mappings, analyzers et tokenizers
- Construire des requetes complexes avec le Query DSL
- Maitriser la stack ELK (Elasticsearch, Logstash, Kibana)
Elasticsearch est devenu incontournable pour 3 cas : la recherche full-text, l'observabilite (logs/metriques/traces), et l'analytics en temps reel. Sa capacite a indexer des teraoctets de donnees et a repondre en millisecondes est remarquable. Mais attention aux couts - un cluster Elasticsearch mal dimensionne peut exploser votre budget cloud.
Index Inverse
Documents: Index Inverse:
Doc1: "data architect" "architect" β [Doc1, Doc3]
Doc2: "data engineer" "cloud" β [Doc3]
Doc3: "cloud architect" "data" β [Doc1, Doc2]
"engineer" β [Doc2]
Recherche "data architect":
β "data" β© "architect" = [Doc1] (score: 2 terms matched)
β "data" seulement = [Doc2] (score: 1 term)
β "architect" seulement = [Doc3] (score: 1 term)
Mapping & Analyzers
{
"mappings": {
"properties": {
"title": {
"type": "text",
"analyzer": "french",
"fields": {
"keyword": { "type": "keyword" },
"autocomplete": {
"type": "text",
"analyzer": "autocomplete_analyzer"
}
}
},
"price": { "type": "float" },
"category": { "type": "keyword" },
"created_at": { "type": "date" },
"location": { "type": "geo_point" },
"description": {
"type": "text",
"analyzer": "french"
}
}
},
"settings": {
"analysis": {
"analyzer": {
"autocomplete_analyzer": {
"type": "custom",
"tokenizer": "edge_ngram_tokenizer",
"filter": ["lowercase", "asciifolding"]
}
},
"tokenizer": {
"edge_ngram_tokenizer": {
"type": "edge_ngram",
"min_gram": 2, "max_gram": 15
}
}
}
}
}
Stack ELK
Applications Serveurs Containers
| | |
ββββββββββββββββΌβββββββββββββ
|
Beats (Filebeat,
Metricbeat, APM)
|
Logstash / Ingest
(parse, enrich,
transform)
|
Elasticsearch
(index, search,
aggregate)
|
Kibana
(dashboards,
alertes, ML)
Wikipedia - Elasticsearch pour la recherche
Wikipedia utilise Elasticsearch (via CirrusSearch) pour servir 300M+ recherches par jour sur 60M+ articles en 300+ langues.
- Architecture : Clusters dedies par langue, replication cross-datacenter
- Features : Did-you-mean, autocomplete, recherche par proximite, highlighting
- Performance : Latence mediane <50ms, p99 <200ms
E-commerce - Cluster Elasticsearch non-dimensionne
Un site e-commerce a deploye 3 noeuds Elasticsearch pour indexer 50M produits. Sans tuning, le cluster a sature lors du Black Friday avec des timeouts en cascade.
- Erreur : Shards de 50 Go (recommande : 10-30 Go), pas de replica, heap JVM sous-dimensionnee
- Impact : Recherche hors service pendant 4h, perte estimee de 2M EUR de ventes
- Fix : Re-indexation avec shards de 20 Go, ajout de replicas, circuit breakers configures
Elasticsearch comme base de donnees primaire
Utiliser Elasticsearch comme seul stockage de donnees. ES n'est pas ACID, les updates sont couteuses (delete + re-index), et les pertes de donnees sont possibles lors des split-brain.
Solution : Toujours garder une source de verite (PostgreSQL, S3) et utiliser ES comme index de recherche secondaire. Implementer un pipeline de re-indexation pour pouvoir reconstruire l'index depuis la source.
NewSQL : CockroachDB & TiDB
Objectifs de cette lecon
- Comprendre la promesse NewSQL : scalabilite horizontale + ACID
- Maitriser l'architecture de CockroachDB (Raft, ranges, distributed SQL)
- Comparer TiDB et CockroachDB pour differents cas d'usage
- Evaluer quand le NewSQL remplace un RDBMS traditionnel
Le NewSQL represente le "meilleur des deux mondes" : la familiarite du SQL avec les transactions ACID, et la scalabilite horizontale du NoSQL. Google Spanner a ouvert la voie, et CockroachDB et TiDB democratisent cette approche. Pour les equipes qui ne veulent pas sacrifier la consistance, c'est une revolution.
L'Evolution des Bases de Donnees
SQL (1970s-2000s) NoSQL (2005-2015) NewSQL (2012+) ββββββββββββββββ ββββββββββββββββ ββββββββββββββββ β β ACID β β β ACID β β β ACID β β β SQL β β β SQL β β β SQL β β β Scale-out β β β Scale-out β β β Scale-out β β β Geo-distribβ β β Geo-distribβ β β Geo-distribβ ββββββββββββββββ ββββββββββββββββ ββββββββββββββββ Oracle, PostgreSQL Cassandra, MongoDB CockroachDB, TiDB
CockroachDB
-- Tables avec geo-partitionnement (donnees EU restent en EU)
CREATE TABLE orders (
id UUID DEFAULT gen_random_uuid(),
customer_id UUID NOT NULL,
region STRING NOT NULL,
total DECIMAL(10,2),
created_at TIMESTAMPTZ DEFAULT now(),
PRIMARY KEY (region, id)
) LOCALITY REGIONAL BY ROW;
-- Transaction distribuee ACID (identique a PostgreSQL)
BEGIN;
UPDATE accounts SET balance = balance - 500 WHERE id = 'sender' AND region = 'eu';
UPDATE accounts SET balance = balance + 500 WHERE id = 'receiver' AND region = 'us';
INSERT INTO transfers (sender, receiver, amount, region)
VALUES ('sender', 'receiver', 500, 'eu');
COMMIT;
CockroachDB vs TiDB
| Critere | CockroachDB | TiDB |
|---|---|---|
| Inspire de | Google Spanner | Google Spanner + F1 |
| Compatibilite | PostgreSQL wire protocol | MySQL wire protocol |
| Consensus | Raft (multi-raft) | Raft (via TiKV) |
| Storage | Pebble (LSM-tree) | RocksDB (via TiKV) |
| Geo-distribution | Natif (locality rules) | TiKV Placement Rules |
| HTAP | Non (OLTP focus) | Oui (TiFlash columnar) |
| Licence | BSL (source-available) | Apache 2.0 |
| Ideal pour | Multi-region, compliance | Migration MySQL, HTAP |
DoorDash - Migration vers CockroachDB
DoorDash a migre de PostgreSQL a CockroachDB pour gerer la croissance explosive de leur service de livraison pendant le COVID.
- Probleme : PostgreSQL a 13 replicas en lecture, ecriture saturee sur le primary
- Solution : CockroachDB multi-region (US-East, US-West, EU)
- Resultat : Scale lineaire, latence d'ecriture <10ms, zero downtime deployments
NewSQL pour tout remplacer
Vouloir migrer toutes ses bases vers du NewSQL. CockroachDB et TiDB sont excellents pour l'OLTP distribue, mais ne remplacent pas un DWH analytique (Snowflake), un cache (Redis), ou une base graphe (Neo4j).
Solution : Le NewSQL est ideal pour les workloads OLTP qui ont depasse les limites d'un PostgreSQL single-node ou qui necessitent une distribution geographique. Pour l'analytics, restez sur des solutions OLAP dediees.
Quiz - NoSQL & NewSQL
Objectifs de cette evaluation
- Valider vos connaissances sur le theoreme CAP et les modeles de consistance
- Verifier la maitrise des 4 familles NoSQL et leurs cas d'usage
- Tester votre capacite a choisir la bonne technologie
Quiz Module 1.2 - NoSQL & NewSQL
1. Selon le theoreme CAP, quel compromis fait Cassandra ?
2. Quel pattern MongoDB est recommande pour gerer un utilisateur avec potentiellement des millions de followers ?
3. Quelle structure Redis est ideale pour un leaderboard en temps reel ?
4. En Cassandra, pourquoi la modelisation "query-first" est-elle obligatoire ?
5. Quel langage de requete utilise Neo4j ?
6. Quel est l'avantage principal de ScyllaDB par rapport a Cassandra ?
7. Pourquoi ne faut-il pas utiliser Elasticsearch comme base de donnees primaire ?
8. Quel est l'avantage principal du NewSQL (CockroachDB, TiDB) sur le NoSQL ?
Ecosysteme Python Data
Python est devenu la lingua franca de la data. De l'analyse exploratoire au machine learning, en passant par l'ingenierie de donnees, l'ecosysteme Python offre des outils pour chaque etape du pipeline. Cette lecon vous guide a travers les librairies essentielles et vous aide a choisir le bon outil pour chaque situation.
Objectifs d'apprentissage
- Connaitre les librairies fondamentales de l'ecosysteme Python Data
- Comprendre quand utiliser pandas, DuckDB, Polars ou PySpark
- Maitriser la matrice de decision basee sur le volume de donnees
- Decouvrir DuckDB comme revolution analytique locale
- Positionner Polars comme alternative moderne a pandas
Vue d'ensemble de l'ecosysteme
L'ecosysteme Python Data est compose de plusieurs couches complementaires. Chaque outil a ete concu pour resoudre un probleme specifique, et le Data Architect doit savoir les assembler de maniere coherente.
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β ORCHESTRATION & WORKFLOW β β Airflow β Prefect β Dagster β Luigi β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€ β MACHINE LEARNING β β scikit-learn β XGBoost β PyTorch β TensorFlow β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€ β TRAITEMENT & ANALYSE β β ββββββββββββ ββββββββββββ ββββββββββββ ββββββββββββ β β β pandas β β Polars β β DuckDB β β PySpark β β β β < 1 GB β β < 50 GB β β < 100 GB β β > 1 TB β β β β In-memory β β In-memoryβ β In-proc β β Cluster β β β ββββββββββββ ββββββββββββ ββββββββββββ ββββββββββββ β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€ β CALCUL NUMERIQUE β β NumPy β SciPy β Apache Arrow β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€ β CONNECTIVITE β β SQLAlchemy β psycopg2 β pymongo β requests β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€ β VISUALISATION β β Matplotlib β Seaborn β Plotly β Altair β βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
NumPy - La fondation
NumPy est la brique de base de tout l'ecosysteme scientifique Python. Il fournit le type ndarray, un tableau multidimensionnel performant ecrit en C, sur lequel toutes les autres librairies s'appuient.
import numpy as np
# Creation de tableaux performants
prix = np.array([29.99, 45.50, 12.00, 89.99, 34.50])
quantites = np.array([100, 50, 200, 25, 75])
# Operations vectorisees (pas de boucle Python!)
chiffre_affaires = prix * quantites
total = np.sum(chiffre_affaires) # 12_361.25
# Statistiques instantanees
print(f"CA moyen par produit: {np.mean(chiffre_affaires):.2f}")
print(f"Ecart-type: {np.std(chiffre_affaires):.2f}")
print(f"Produit max: index {np.argmax(chiffre_affaires)}")
# Filtrage vectorise
produits_rentables = prix[chiffre_affaires > 2000]
pandas - Le couteau suisse de l'analyse
pandas est l'outil le plus utilise pour la manipulation de donnees tabulaires. Son objet central, le DataFrame, est un tableau 2D avec des labels sur les lignes et colonnes, qui s'inspire directement des data.frames de R.
import pandas as pd
# Lecture de multiples formats
df_csv = pd.read_csv("ventes.csv")
df_excel = pd.read_excel("rapport.xlsx", sheet_name="Q4")
df_json = pd.read_json("api_response.json")
df_parquet = pd.read_parquet("datalake/ventes.parquet")
df_sql = pd.read_sql("SELECT * FROM clients", engine)
# Exploration rapide
print(df_csv.shape) # (1000000, 15)
print(df_csv.dtypes) # Types de chaque colonne
print(df_csv.describe()) # Stats descriptives
print(df_csv.info(memory_usage='deep')) # Usage memoire
Limite de pandas
pandas charge TOUT en memoire RAM. Un fichier CSV de 5 GB necessite environ 10-15 GB de RAM (overhead Python). Pour les gros volumes, preferez DuckDB, Polars ou PySpark.
DuckDB - Le SQLite de l'analytique
DuckDB est une revolution recente dans le monde de la data. C'est une base de donnees analytique in-process (pas de serveur), optimisee pour les requetes OLAP. Elle peut traiter des fichiers Parquet de dizaines de GB directement, sans les charger en memoire.
Base traditionnelle (PostgreSQL) DuckDB (In-Process)
ββββββββββββ ββββββββββββ ββββββββββββββββββββ
β Client βββββΆβ Serveur β β Votre Script β
β Python ββββββ PostgreSQLβ β Python β
ββββββββββββ ββββββββββββ β ββββββββββββββ β
β β β β DuckDB β β
β Reseau β β β Engine β β
β (latence) β β ββββββββββββββ β
βΌ βΌ ββββββββββββββββββββ
ββββββββββββββββββββββββ β
β Serialisation / β Acces direct fichiers
β Deserialisation β (zero copie)
β des donnees β β
ββββββββββββββββββββββββ βΌ
ββββββββββββββββββββ
β Parquet / CSV / β
β JSON / S3 β
ββββββββββββββββββββ
import duckdb
# Requete directe sur un fichier Parquet (sans le charger en memoire!)
result = duckdb.sql("""
SELECT
region,
product_category,
SUM(amount) as total_sales,
COUNT(DISTINCT customer_id) as unique_customers,
AVG(amount) as avg_order_value
FROM 'sales_2024/*.parquet' -- Glob sur plusieurs fichiers!
WHERE order_date >= '2024-01-01'
GROUP BY region, product_category
HAVING total_sales > 100000
ORDER BY total_sales DESC
""").df() # Convertit en pandas DataFrame
# Requete sur un fichier CSV distant
duckdb.sql("""
SELECT * FROM read_csv_auto(
'https://data.example.com/dataset.csv',
header=true,
dateformat='%Y-%m-%d'
)
LIMIT 100
""")
# Integration transparente avec pandas
import pandas as pd
df_clients = pd.read_csv("clients.csv")
df_commandes = pd.read_parquet("commandes.parquet")
# DuckDB peut joindre un DataFrame pandas avec un fichier Parquet!
result = duckdb.sql("""
SELECT c.nom, c.ville, SUM(co.montant) as total
FROM df_clients c -- DataFrame pandas
JOIN df_commandes co ON c.id = co.client_id -- Fichier Parquet
GROUP BY c.nom, c.ville
ORDER BY total DESC
LIMIT 20
""").df()
Pourquoi DuckDB est un game-changer
Zero infrastructure : pas de serveur, pas de Docker, pip install suffit. Performances : souvent 10-50x plus rapide que pandas pour les groupby/join. SQL natif : pas besoin d'apprendre une nouvelle API. Multi-format : lit Parquet, CSV, JSON, et meme les DataFrames pandas directement.
Polars - Le pandas moderne
Polars est une librairie de DataFrames ecrite en Rust, concue pour corriger les limitations historiques de pandas. Elle est systematiquement plus rapide et utilise moins de memoire grace a Apache Arrow en backend et une execution lazy par defaut.
import polars as pl
# Lecture de donnees
df = pl.read_parquet("ventes.parquet")
# API d'expressions (plus claire que pandas)
result = (
df.lazy() # Mode lazy
.filter(pl.col("date") >= "2024-01-01") # Filtre
.group_by("region", "categorie") # Groupby
.agg([
pl.col("montant").sum().alias("total_ventes"),
pl.col("client_id").n_unique().alias("clients_uniques"),
pl.col("montant").mean().alias("panier_moyen"),
pl.col("montant").quantile(0.95).alias("p95_montant"),
])
.sort("total_ventes", descending=True)
.collect() # Execute tout d'un coup!
)
# Polars optimise automatiquement le plan d'execution
# - Predicate pushdown (filtre avant groupby)
# - Projection pushdown (ne lit que les colonnes necessaires)
# - Parallelisation automatique sur tous les coeurs
pandas
- Execution eager (immediate)
- Single-threaded par defaut
- API mature, enorme ecosysteme
- Index implicites (source de bugs)
- Copie implicite des donnees
- Backend NumPy (legacy)
- Ideal pour : prototypage, notebooks
Polars
- Execution lazy (optimisee)
- Multi-threaded natif
- API plus jeune mais croissante
- Pas d'index (plus previsible)
- Zero-copy quand possible
- Backend Apache Arrow
- Ideal pour : performance, pipelines
PySpark - Le Big Data distribue
PySpark est l'interface Python pour Apache Spark, le moteur de calcul distribue dominant. Il permet de traiter des petaoctets de donnees sur des clusters de centaines de machines.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder \
.appName("DataAnalysis") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
# Lecture depuis un data lake
df = spark.read.parquet("s3://datalake/ventes/year=2024/")
# API DataFrame similaire a pandas
result = (
df.filter(F.col("status") == "completed")
.groupBy("region", "category")
.agg(
F.sum("amount").alias("total"),
F.countDistinct("customer_id").alias("unique_customers")
)
.orderBy(F.desc("total"))
)
# Ou en SQL pur
df.createOrReplaceTempView("ventes")
spark.sql("""
SELECT region, category, SUM(amount) as total
FROM ventes
WHERE status = 'completed'
GROUP BY region, category
ORDER BY total DESC
""").show()
Matrice de decision : Quel outil pour quel volume ?
| Critere | pandas | Polars | DuckDB | PySpark |
|---|---|---|---|---|
| Volume max | < 1 GB | < 50 GB | < 100 GB | Illimite (cluster) |
| Latence demarrage | Instantane | Instantane | Instantane | 30-60 secondes |
| Infrastructure | Aucune | Aucune | Aucune | Cluster Spark |
| Courbe d'apprentissage | Moyenne | Moyenne | Faible (SQL) | Elevee |
| Ecosysteme ML | Excellent | Bon (via Arrow) | Moyen | MLlib integre |
| Streaming | Non | Non | Non | Structured Streaming |
| Cas d'usage ideal | Notebooks, prototypage, petits CSV | ETL locaux, fichiers moyens | Analytique ad-hoc, exploration de Parquet | Data lakes, pipelines TB-scale |
Quel volume de donnees ?
β
ββββββββββββββΌβββββββββββββββββ
βΌ βΌ βΌ
< 1 GB 1-100 GB > 100 GB
β β β
βΌ βΌ βΌ
pandas Besoin de SQL ? PySpark
β β β (ou Spark SQL)
β Oui Non
β β β
β βΌ βΌ
β DuckDB Polars
β
βΌ
Besoin de compatibilite
ML (scikit-learn) ?
β β
Oui Non
β β
βΌ βΌ
pandas Polars (+ rapide)
'data/*.parquet' nativement. Polars serait aussi viable en mode lazy. pandas serait impossible (depassement memoire).SQLAlchemy - Le pont Python-Base de donnees
SQLAlchemy est la librairie standard pour connecter Python a n'importe quelle base de donnees relationnelle. Elle offre deux niveaux d'abstraction : le Core (SQL generatif) et l'ORM (mapping objet-relationnel).
from sqlalchemy import create_engine
# Connexion a differentes bases
pg_engine = create_engine("postgresql://user:pass@host:5432/db")
mysql_engine = create_engine("mysql+pymysql://user:pass@host:3306/db")
sqlite_engine = create_engine("sqlite:///local.db")
# Integration pandas : le duo parfait
import pandas as pd
df = pd.read_sql("SELECT * FROM clients WHERE actif = true", pg_engine)
df_transforme.to_sql("clients_clean", pg_engine, if_exists="replace", index=False)
Netflix : Python comme lingua franca de la data
Netflix emploie plus de 1500 data scientists et ingenieurs data qui utilisent Python quotidiennement. Leur ecosysteme interne repose sur :
- Metaflow (open-source) : framework Python pour les pipelines ML, developpe en interne chez Netflix
- pandas + PySpark : les data scientists prototypent en pandas sur des echantillons, puis les ingenieurs passent en PySpark pour la production sur le data lake de 100+ PB
- Jupyter Hub interne : chaque employe a acces a des notebooks connectes au data lake, avec des kernels pre-configures (pandas, Spark, TensorFlow)
- Resultat : le systeme de recommandation genere 80% du contenu regarde, l'A/B testing Python traite 200+ experiences simultanees, et la detection d'anomalies Python surveille la qualite de streaming en temps reel
Lecon : Netflix a standardise sur Python non pas parce que c'est le langage le plus performant, mais parce qu'il permet la collaboration entre profils differents (data scientists, ingenieurs, analystes).
Utiliser un seul outil pour tout
Erreur courante du debutant : vouloir tout faire en pandas, ou tout migrer en PySpark. Chaque outil a sa zone de pertinence.
- Erreur : "On utilise Spark pour tout, meme les fichiers de 100 MB" → overhead de demarrage de 30s pour un traitement de 2s en pandas
- Erreur : "pandas suffit pour nos 50 GB de logs" → crash OOM ou temps de traitement de 4 heures au lieu de 2 minutes en DuckDB
- Bonne pratique : evaluez le volume ET la frequence. Un traitement quotidien de 20 GB justifie DuckDB. Un traitement horaire de 500 GB justifie Spark.
Scenario : Choix d'architecture chez une startup e-commerce
Vous etes Data Architect dans une startup qui traite 500K commandes/mois (environ 2 GB de donnees). L'equipe data de 3 personnes hesite entre pandas et PySpark.
Votre recommandation :
- Court terme (6 mois) : pandas + DuckDB. Le volume est largement gerableen local. DuckDB pour les requetes analytiques ad-hoc, pandas pour les transformations et l'integration ML.
- Moyen terme (18 mois) : Si le volume depasse 50 GB/mois, introduire Polars pour les pipelines de transformation. DuckDB reste pour l'analytique.
- Long terme (3+ ans) : Si le volume depasse 1 TB et que l'equipe grandit a 10+, migrer les pipelines critiques vers PySpark sur un cluster manage (EMR, Dataproc).
Erreur a eviter : ne pas sur-ingenierer avec Spark des le depart. Le cout d'un cluster Spark (infrastructure + expertise) est 10x celui d'une solution locale.
Quiz rapide
Vous devez analyser un fichier Parquet de 60 GB sur votre laptop (32 GB RAM). Quel outil est le plus adapte ?
Quel est l'avantage principal de Polars sur pandas ?
En 2024-2025, le paysage a radicalement change. Si vous debutez un nouveau projet, mon conseil : commencez avec DuckDB pour l'analytique et Polars pour les transformations. Ne sortez pandas que pour sa compatibilite avec scikit-learn et les notebooks. Et reservez Spark aux vrais cas Big Data (TB+). Le Data Architect moderne jongle entre ces outils - c'est une competence cle.
pandas DataFrames - Fondamentaux
Le DataFrame pandas est l'objet central de l'analyse de donnees en Python. Maitriser sa creation, la selection de donnees, le filtrage et les transformations est une competence incontournable pour tout Data Architect. Cette lecon approfondie couvre les patterns essentiels et les pieges a eviter.
Objectifs d'apprentissage
- Creer des DataFrames a partir de multiples sources
- Maitriser la selection avec loc, iloc et les conditions booleennes
- Filtrer et transformer les donnees efficacement
- Eviter les anti-patterns courants (iterrows, chained indexing)
- Appliquer les bonnes pratiques de performance et vectorisation
Creation de DataFrames
Un DataFrame est un tableau 2D avec des labels sur les lignes (index) et les colonnes. Il peut etre cree a partir de nombreuses sources.
import pandas as pd
import numpy as np
# 1. Depuis un dictionnaire (le plus courant en dev)
df = pd.DataFrame({
'nom': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
'age': [28, 34, 45, 31, 27],
'ville': ['Paris', 'Lyon', 'Paris', 'Marseille', 'Lyon'],
'salaire': [55000, 62000, 78000, 58000, 51000],
'departement': ['Tech', 'Marketing', 'Tech', 'RH', 'Tech']
})
# 2. Depuis un fichier CSV (le plus courant en data)
df_ventes = pd.read_csv(
"ventes.csv",
sep=";", # Separateur (defaut: virgule)
encoding="utf-8", # Encodage
parse_dates=["date_vente"], # Colonnes date
dtype={"code_postal": str}, # Forcer les types
usecols=["date_vente", "montant", "client_id"], # Colonnes utiles
na_values=["N/A", "NULL", ""], # Valeurs manquantes custom
nrows=100000 # Limiter les lignes (test)
)
# 3. Depuis une base de donnees SQL
from sqlalchemy import create_engine
engine = create_engine("postgresql://user:pass@localhost/ecommerce")
df_clients = pd.read_sql("""
SELECT id, nom, email, date_inscription, ltv
FROM clients
WHERE actif = true
ORDER BY date_inscription DESC
""", engine, parse_dates=["date_inscription"])
# 4. Depuis un fichier Parquet (recommande pour la data)
df_logs = pd.read_parquet(
"logs_2024.parquet",
columns=["timestamp", "event_type", "user_id"] # Projection!
)
# 5. Depuis un fichier JSON (API responses)
df_api = pd.read_json("api_response.json", orient="records")
# 6. Depuis un fichier Excel
df_budget = pd.read_excel(
"budget_2024.xlsx",
sheet_name="Q4",
skiprows=2, # Sauter les lignes d'en-tete custom
header=0 # Ligne d'en-tete apres skip
)
Toujours specifier les types a la lecture
Utilisez dtype et parse_dates dans read_csv(). Sans cela, pandas infere les types, ce qui est lent (double lecture du fichier) et souvent incorrect (codes postaux convertis en entiers, IDs en float si valeurs manquantes).
Exploration des donnees
Avant toute transformation, explorez vos donnees systematiquement. Voici les methodes essentielles :
# Dimensions et structure
print(f"Shape: {df.shape}") # (1000, 5)
print(f"Colonnes: {df.columns.tolist()}")
print(f"Types:\n{df.dtypes}")
# Premieres et dernieres lignes
df.head(10)
df.tail(5)
df.sample(5) # 5 lignes aleatoires
# Statistiques descriptives
df.describe() # Colonnes numeriques
df.describe(include='object') # Colonnes texte
# Informations memoire detaillees
df.info(memory_usage='deep')
# Valeurs manquantes
print(df.isnull().sum()) # Par colonne
print(f"Total NaN: {df.isnull().sum().sum()}")
print(f"% complet: {df.notna().mean() * 100}") # % par colonne
# Valeurs uniques
print(df['ville'].nunique()) # Nombre de valeurs uniques
print(df['ville'].value_counts()) # Distribution des valeurs
# Doublons
print(f"Doublons: {df.duplicated().sum()}")
print(f"Doublons sur nom+ville: {df.duplicated(subset=['nom','ville']).sum()}")
Selection de donnees : loc vs iloc
La selection de donnees est l'operation la plus frequente. pandas offre deux accesseurs principaux : .loc (selection par label) et .iloc (selection par position).
df.loc[lignes, colonnes] df.iloc[lignes, colonnes]
ββββββββββββββββββββββββ βββββββββββββββββββββββββ
Par LABEL (nom) Par POSITION (numero)
Inclusif des deux bornes Exclusif de la borne droite
Exemples : Exemples :
df.loc[0, 'nom'] df.iloc[0, 0]
df.loc[0:5, 'nom':'age'] df.iloc[0:5, 0:2]
df.loc[mask, ['nom','age']] df.iloc[[0,2,4], [0,1]]
βββββββ¬βββββββββββ¬ββββββ¬ββββββββββ
β β nom β age β ville β
β β col 0 β c 1 β col 2 β
βββββββΌβββββββββββΌββββββΌββββββββββ€
β 0 β Alice β 28 β Paris β βββ df.loc[0] = df.iloc[0]
β 1 β Bob β 34 β Lyon β
β 2 β Charlie β 45 β Paris β
β 3 β Diana β 31 β Marseil β
β 4 β Eve β 27 β Lyon β
βββββββ΄βββββββββββ΄ββββββ΄ββββββββββ
df.loc[1:3, 'nom':'age'] β lignes 1,2,3 / colonnes nom, age
df.iloc[1:3, 0:2] β lignes 1,2 / colonnes 0, 1
# ===== SELECTION PAR LABEL : .loc =====
# Une seule valeur
df.loc[0, 'nom'] # 'Alice'
# Une colonne entiere (Series)
df.loc[:, 'nom'] # Equivalent: df['nom']
# Plusieurs colonnes
df.loc[:, ['nom', 'salaire']] # Equivalent: df[['nom', 'salaire']]
# Plage de lignes et colonnes (inclusif!)
df.loc[1:3, 'nom':'ville'] # Lignes 1,2,3 et colonnes nom, age, ville
# Avec condition booleenne (le plus utile!)
df.loc[df['age'] > 30, ['nom', 'salaire']]
# ===== SELECTION PAR POSITION : .iloc =====
# Premiere ligne, premiere colonne
df.iloc[0, 0] # 'Alice'
# 5 premieres lignes, 2 premieres colonnes
df.iloc[:5, :2]
# Lignes et colonnes specifiques
df.iloc[[0, 2, 4], [0, 3]] # Alice et Eve, nom et salaire
# Derniere ligne
df.iloc[-1]
# ===== SELECTION BOOLEENNE (MASQUES) =====
# Condition simple
jeunes = df[df['age'] < 30]
# Conditions combinees (& = AND, | = OR, ~ = NOT)
tech_seniors = df[(df['departement'] == 'Tech') & (df['age'] > 30)]
paris_ou_lyon = df[df['ville'].isin(['Paris', 'Lyon'])]
pas_paris = df[~(df['ville'] == 'Paris')]
# Condition sur texte
contient_a = df[df['nom'].str.contains('a', case=False)]
commence_par = df[df['nom'].str.startswith('A')]
# Condition avec query() - plus lisible
tech_seniors2 = df.query("departement == 'Tech' and age > 30")
Chained Indexing : df[col][row] vs .loc
Le chained indexing est une source majeure de bugs en pandas. Il cree une copie intermediaire et peut produire des resultats inattendus lors de l'affectation.
# MAUVAIS : Chained indexing df['salaire'][df['ville'] == 'Paris'] = 60000 # SettingWithCopyWarning! # Cela ne modifie PAS forcement le DataFrame original. # Pandas cree parfois une copie intermediaire. # MAUVAIS : Double crochet valeur = df['salaire'][0] # Fonctionne mais fragile
# BON : Utiliser .loc pour toute modification df.loc[df['ville'] == 'Paris', 'salaire'] = 60000 # Garanti! # BON : .loc pour lire aussi valeur = df.loc[0, 'salaire'] # Explicite et fiable # BON : .at pour une valeur scalaire (plus rapide que .loc) valeur = df.at[0, 'salaire']
Regle d'or : utilisez TOUJOURS .loc pour modifier un DataFrame. Jamais df[col][condition] = valeur.
Transformation de donnees
Les transformations sont le coeur du travail de preparation de donnees. Voici les operations les plus courantes.
# ===== CREATION DE COLONNES =====
# Calcul simple
df['salaire_mensuel'] = df['salaire'] / 12
# Calcul conditionnel
df['seniorite'] = np.where(df['age'] >= 35, 'Senior', 'Junior')
# Conditions multiples avec np.select
conditions = [
df['salaire'] < 40000,
df['salaire'] < 60000,
df['salaire'] < 80000,
df['salaire'] >= 80000
]
choix = ['Bas', 'Moyen', 'Eleve', 'Tres eleve']
df['tranche_salaire'] = np.select(conditions, choix, default='Inconnu')
# Depuis une autre colonne (string operations)
df['prenom'] = df['nom'].str.split(' ').str[0]
df['email_domain'] = df['email'].str.extract(r'@(.+)$')
df['nom_upper'] = df['nom'].str.upper()
# ===== GESTION DES VALEURS MANQUANTES =====
# Detecter
df.isnull().sum()
# Supprimer les lignes avec NaN
df_clean = df.dropna() # Toutes colonnes
df_clean = df.dropna(subset=['salaire']) # Colonnes specifiques
# Remplir les NaN
df['salaire'] = df['salaire'].fillna(df['salaire'].median())
df['ville'] = df['ville'].fillna('Inconnu')
df['date'] = df['date'].ffill() # Forward fill (propager la valeur precedente)
# ===== RENOMMAGE ET SUPPRESSION =====
df = df.rename(columns={'nom': 'name', 'age': 'years'})
df = df.drop(columns=['colonne_inutile', 'temp'])
# ===== TRI =====
df = df.sort_values('salaire', ascending=False)
df = df.sort_values(['ville', 'salaire'], ascending=[True, False])
# ===== CONVERSION DE TYPES =====
df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%d')
df['montant'] = pd.to_numeric(df['montant'], errors='coerce') # NaN si erreur
df['code'] = df['code'].astype(str)
# ===== OPERATIONS SUR LES DATES =====
df['annee'] = df['date'].dt.year
df['mois'] = df['date'].dt.month
df['jour_semaine'] = df['date'].dt.day_name()
df['trimestre'] = df['date'].dt.quarter
df['jours_depuis'] = (pd.Timestamp.now() - df['date']).dt.days
Performance : Vectorisation vs Boucles
La regle numero un de pandas : ne jamais iterer ligne par ligne. Utilisez toujours des operations vectorisees. La difference de performance peut atteindre un facteur 100x a 1000x.
iterrows() pour des operations vectorisables
C'est l'erreur la plus courante chez les developpeurs venant de langages imperatifs. iterrows() force Python a traiter chaque ligne individuellement, perdant tout le benefice de NumPy sous-jacent.
# TERRIBLE : iterrows() - 100x plus lent
# Sur 1M lignes : ~45 secondes
for index, row in df.iterrows():
if row['montant'] > 100:
df.at[index, 'categorie'] = 'Premium'
else:
df.at[index, 'categorie'] = 'Standard'
# MAUVAIS : apply() avec lambda - 10x plus lent
# Sur 1M lignes : ~8 secondes
df['categorie'] = df['montant'].apply(
lambda x: 'Premium' if x > 100 else 'Standard'
)
# BON : np.where() - vectorise
# Sur 1M lignes : ~0.05 secondes (900x plus rapide!)
df['categorie'] = np.where(
df['montant'] > 100, 'Premium', 'Standard'
)
| Methode | 1M lignes | 10M lignes | Facteur |
|---|---|---|---|
| iterrows() | 45s | 450s | 1x (ref) |
| itertuples() | 5s | 50s | ~9x |
| apply(lambda) | 8s | 80s | ~6x |
| np.where() | 0.05s | 0.5s | ~900x |
| np.select() | 0.08s | 0.8s | ~560x |
| Operations vectorisees | 0.03s | 0.3s | ~1500x |
Pourquoi la vectorisation est si rapide
Les operations vectorisees executent le calcul dans du code C/Fortran compile (via NumPy), en une seule passe sur des blocs de memoire contigus. Les boucles Python ajoutent l'overhead de l'interpreteur pour chaque element : type checking, boxing/unboxing, et saut entre zones memoire non contiguees.
# Pattern 1 : Remplacer apply() par des operations vectorisees
# MAUVAIS
df['full_name'] = df.apply(lambda r: f"{r['prenom']} {r['nom']}", axis=1)
# BON
df['full_name'] = df['prenom'] + ' ' + df['nom']
# Pattern 2 : Remplacer les boucles de calcul
# MAUVAIS
totaux = []
for _, row in df.iterrows():
totaux.append(row['prix'] * row['quantite'] * (1 - row['remise']))
df['total'] = totaux
# BON
df['total'] = df['prix'] * df['quantite'] * (1 - df['remise'])
# Pattern 3 : Conditions multiples avec np.select
conditions = [
(df['score'] >= 90),
(df['score'] >= 70),
(df['score'] >= 50),
]
labels = ['A', 'B', 'C']
df['grade'] = np.select(conditions, labels, default='F')
# Pattern 4 : Mapping de valeurs
mapping = {'Paris': 'IDF', 'Lyon': 'ARA', 'Marseille': 'PACA'}
df['region'] = df['ville'].map(mapping)
# Pattern 5 : Binning numerique
df['tranche_age'] = pd.cut(
df['age'],
bins=[0, 25, 35, 50, 100],
labels=['Junior', 'Confirme', 'Senior', 'Expert']
)
Scenario : Nettoyer un CSV avec 15 problemes de qualite
Vous recevez un fichier clients_dirty.csv avec 500K lignes et les problemes suivants. Ecrivez le pipeline de nettoyage complet.
import pandas as pd
import numpy as np
# Lecture avec types explicites
df = pd.read_csv("clients_dirty.csv", dtype={
'code_postal': str, 'telephone': str, 'id': str
}, parse_dates=['date_inscription'], na_values=['N/A', 'NULL', '', 'n/a'])
print(f"Shape initiale: {df.shape}")
print(f"Valeurs manquantes:\n{df.isnull().sum()}")
# 1. Supprimer les doublons exacts
df = df.drop_duplicates()
# 2. Supprimer les doublons sur email (garder le plus recent)
df = df.sort_values('date_inscription', ascending=False)
df = df.drop_duplicates(subset=['email'], keep='first')
# 3. Normaliser les noms (majuscules, espaces)
df['nom'] = df['nom'].str.strip().str.title()
df['prenom'] = df['prenom'].str.strip().str.title()
# 4. Normaliser les emails (minuscules)
df['email'] = df['email'].str.strip().str.lower()
# 5. Valider le format email
email_valide = df['email'].str.match(r'^[\w.+-]+@[\w-]+\.[\w.]+$')
df.loc[~email_valide, 'email'] = np.nan
# 6. Nettoyer les numeros de telephone
df['telephone'] = (df['telephone']
.str.replace(r'[^\d+]', '', regex=True) # Garder chiffres et +
.str.replace(r'^0', '+33', regex=True)) # Format international
# 7. Corriger les codes postaux
df['code_postal'] = df['code_postal'].str.zfill(5) # Padding zeros
df.loc[df['code_postal'].str.len() != 5, 'code_postal'] = np.nan
# 8. Borner les ages aberrants
df.loc[(df['age'] < 0) | (df['age'] > 120), 'age'] = np.nan
df['age'] = df['age'].fillna(df['age'].median())
# 9. Borner les montants negatifs
df.loc[df['ltv'] < 0, 'ltv'] = 0
# 10. Normaliser les villes
df['ville'] = (df['ville'].str.strip().str.upper()
.str.replace('SAINT ', 'ST ', regex=False)
.str.replace('SAINTE ', 'STE ', regex=False))
# 11. Corriger les types
df['age'] = df['age'].astype(int)
# 12. Remplir les valeurs manquantes strategiquement
df['ville'] = df['ville'].fillna('INCONNU')
df['ltv'] = df['ltv'].fillna(0)
# 13. Creer des colonnes derivees
df['annee_inscription'] = df['date_inscription'].dt.year
df['anciennete_jours'] = (pd.Timestamp.now() - df['date_inscription']).dt.days
# 14. Supprimer les lignes sans email ET sans telephone
df = df.dropna(subset=['email', 'telephone'], how='all')
# 15. Reset de l'index
df = df.reset_index(drop=True)
print(f"Shape finale: {df.shape}")
print(f"Valeurs manquantes restantes:\n{df.isnull().sum()}")
Flashcards
df['col'] retourne une Series (1D). df[['col']] retourne un DataFrame (2D avec une seule colonne). Important pour les fonctions qui attendent un type specifique..loc pour modifier un DataFrame : df.loc[condition, 'colonne'] = valeur. Evitez le chained indexing df[col][row] = val. Si vous travaillez sur un sous-ensemble, faites une copie explicite avec .copy()..at[row, col] est optimise pour acceder a une seule valeur scalaire (plus rapide). .loc[rows, cols] est plus general et supporte les slices, conditions booleennes, et selections multiples. Utilisez .at dans les boucles (si vraiment necessaires) et .loc pour tout le reste.chunksize de read_csv() pour lire par morceaux : for chunk in pd.read_csv('big.csv', chunksize=100000): process(chunk). Mais preferez DuckDB ou Polars pour ce volume, qui gerent nativement les donnees plus grandes que la RAM.Quiz
Quel est le resultat de df.loc[1:3] si l'index est [0, 1, 2, 3, 4] ?
Comment appliquer une transformation conditionnelle sur 10M lignes le plus rapidement ?
Quel parametre de read_csv() est critique pour eviter de charger 10 GB en memoire ?
Apres 10 ans de pandas, voici mon conseil : maitrisez .loc, les operations vectorisees et read_csv avec ses parametres. Ces trois competences couvrent 80% du travail quotidien. Et surtout, prenez l'habitude de TOUJOURS regarder df.info(memory_usage='deep') avant de travailler - ca vous evitera beaucoup de surprises de memoire.
pandas Avance - groupby, merge, pivot
Les operations avancees de pandas - groupby, merge, pivot_table et les window functions - sont les briques essentielles de toute analyse de donnees serieuse. Cette lecon approfondit ces concepts avec des cas reels, des optimisations memoire, et les pieges a eviter en production.
Objectifs d'apprentissage
- Maitriser groupby avec des aggregations complexes
- Joindre des DataFrames avec merge et les differents types de jointures
- Creer des tableaux croises avec pivot_table
- Utiliser les window functions pandas (rolling, expanding, shift)
- Optimiser la memoire (dtypes, categories, downcasting)
- Eviter les anti-patterns apply() et les copies inutiles
groupby - Split-Apply-Combine
Le paradigme groupby est au coeur de l'analyse de donnees. Il decoupe les donnees en groupes, applique une fonction a chaque groupe, puis recombine les resultats. Comprendre ce mecanisme est fondamental.
DataFrame original SPLIT APPLY COMBINE
ββββββββ¬βββββββ¬ββββββββ
βregionβproduitβmontantβ ββββββββββββββββ
β Nord β A β 100 ββββΆβ Nord: β sum() ββββββββ¬ββββββββ
β Nord β B β 200 β β 100,200,150 ββββββββΆ 450 βββΆβ Nord β 450 β
β Sud β A β 300 β ββββββββββββββββ β Sud β 550 β
β Sud β B β 250 β ββββββββββββββββ β Est β 180 β
β Est β A β 180 ββββΆβ Sud: β sum() ββββββββ΄ββββββββ
β Nord β A β 150 β β 300, 250 ββββββββΆ 550
β Sud β A β 300 β ββββββββββββββββ
ββββββββ΄βββββββ΄ββββββββ ββββββββββββββββ
βββΆβ Est: β sum()
β 180 ββββββββΆ 180
ββββββββββββββββ
import pandas as pd
import numpy as np
# Donnees exemple : ventes e-commerce
df = pd.DataFrame({
'date': pd.date_range('2024-01-01', periods=1000, freq='h'),
'region': np.random.choice(['IDF', 'PACA', 'ARA', 'NAQ'], 1000),
'categorie': np.random.choice(['Electronique', 'Mode', 'Maison'], 1000),
'montant': np.random.uniform(10, 500, 1000).round(2),
'client_id': np.random.randint(1, 200, 1000),
'quantite': np.random.randint(1, 10, 1000)
})
# ===== AGGREGATION SIMPLE =====
# Chiffre d'affaires par region
ca_region = df.groupby('region')['montant'].sum()
# Plusieurs metriques a la fois
stats = df.groupby('region')['montant'].agg(['sum', 'mean', 'median', 'std', 'count'])
# ===== AGGREGATION MULTIPLE (named agg) =====
rapport = df.groupby('region').agg(
ca_total=('montant', 'sum'),
panier_moyen=('montant', 'mean'),
nb_commandes=('montant', 'count'),
clients_uniques=('client_id', 'nunique'),
montant_max=('montant', 'max'),
quantite_totale=('quantite', 'sum')
).round(2)
# ===== GROUPBY MULTI-NIVEAUX =====
croise = df.groupby(['region', 'categorie']).agg(
ca=('montant', 'sum'),
nb=('montant', 'count')
).reset_index() # Important pour avoir un DataFrame plat
# ===== AGGREGATION AVEC FONCTIONS CUSTOM =====
def coefficient_variation(x):
return x.std() / x.mean() * 100
volatilite = df.groupby('region')['montant'].agg(
moyenne='mean',
ecart_type='std',
cv=coefficient_variation
)
# ===== TRANSFORM (ajouter le resultat au DataFrame original) =====
# Ajouter le CA total de la region a chaque ligne
df['ca_region'] = df.groupby('region')['montant'].transform('sum')
# Pourcentage du CA regional
df['pct_region'] = df['montant'] / df.groupby('region')['montant'].transform('sum') * 100
# Z-score par region (normalisation)
df['zscore'] = df.groupby('region')['montant'].transform(
lambda x: (x - x.mean()) / x.std()
)
# ===== FILTER (garder/exclure des groupes entiers) =====
# Regions avec plus de 200 commandes
regions_actives = df.groupby('region').filter(lambda g: len(g) > 200)
# Clients avec un panier moyen > 200
gros_clients = df.groupby('client_id').filter(
lambda g: g['montant'].mean() > 200
)
merge - Jointures de DataFrames
La jointure de DataFrames est l'equivalent du JOIN SQL. pandas offre merge() qui supporte les 4 types de jointures classiques.
# DataFrames a joindre
clients = pd.DataFrame({
'client_id': [1, 2, 3, 4, 5],
'nom': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
'ville': ['Paris', 'Lyon', 'Paris', 'Marseille', 'Lyon'],
'segment': ['Gold', 'Silver', 'Gold', 'Bronze', 'Silver']
})
commandes = pd.DataFrame({
'order_id': [101, 102, 103, 104, 105, 106],
'client_id': [1, 2, 1, 3, 6, 2], # client 6 n'existe pas!
'montant': [150, 200, 75, 300, 50, 120],
'date': pd.date_range('2024-01-01', periods=6, freq='W')
})
# INNER JOIN (par defaut) - uniquement les correspondances
inner = pd.merge(commandes, clients, on='client_id', how='inner')
# 5 lignes (client_id=6 exclu car pas dans clients)
# LEFT JOIN - tout le DataFrame de gauche
left = pd.merge(commandes, clients, on='client_id', how='left')
# 6 lignes, client_id=6 avec NaN pour nom/ville/segment
# RIGHT JOIN - tout le DataFrame de droite
right = pd.merge(commandes, clients, on='client_id', how='right')
# inclut Diana et Eve (pas de commandes) avec NaN pour order_id/montant
# FULL OUTER JOIN - tout des deux cotes
outer = pd.merge(commandes, clients, on='client_id', how='outer')
# Inclut client_id=6 ET Diana/Eve
# Jointure sur colonnes differentes
pd.merge(df1, df2, left_on='id_client', right_on='customer_id')
# Jointure sur plusieurs colonnes
pd.merge(df1, df2, on=['annee', 'mois', 'region'])
# Verifier les doublons de jointure (indicator)
result = pd.merge(commandes, clients, on='client_id', how='outer', indicator=True)
print(result['_merge'].value_counts())
# both 5 (dans les deux)
# left_only 1 (client_id=6, uniquement dans commandes)
# right_only 2 (Diana, Eve, uniquement dans clients)
INNER JOIN LEFT JOIN RIGHT JOIN OUTER JOIN βββββ¬ββββ βββββ¬ββββ βββββ¬ββββ βββββ¬ββββ β A β B β β A β B β β A β B β β A β B β β ββΌβ β ββββββΌβ β β ββΌββββ ββββββΌββββ β βββ β βββββββ β β ββββββ ββββββββββ β ββΌβ β ββββββΌβ β β ββΌββββ ββββββΌββββ βββββ΄ββββ βββββ΄ββββ βββββ΄ββββ βββββ΄ββββ Intersection Tout A + Tout B + Tout A + uniquement match B match A Tout B
Piege : Explosion de lignes lors de merge
Si les cles de jointure ne sont pas uniques dans un des DataFrames, merge produit un produit cartesien : 3 lignes x 4 lignes = 12 lignes! Verifiez toujours avec df['cle'].duplicated().sum() avant un merge. Utilisez validate='one_to_many' pour detecter les problemes.
pivot_table - Tableaux croises dynamiques
pivot_table est l'equivalent Python des tableaux croises dynamiques Excel. C'est un outil puissant pour resumer des donnees multidimensionnelles.
# Pivot table simple : CA par region et categorie
pivot = df.pivot_table(
values='montant',
index='region', # Lignes
columns='categorie', # Colonnes
aggfunc='sum', # Fonction d'agregation
fill_value=0, # Remplacer NaN par 0
margins=True, # Ajouter les totaux
margins_name='Total'
)
# Pivot avec plusieurs metriques
pivot_multi = df.pivot_table(
values=['montant', 'quantite'],
index='region',
columns='categorie',
aggfunc={
'montant': ['sum', 'mean'],
'quantite': 'sum'
}
)
# Pivot temporel (mensuel par region)
df['mois'] = df['date'].dt.to_period('M')
pivot_time = df.pivot_table(
values='montant',
index='mois',
columns='region',
aggfunc='sum'
)
# Crosstab : frequences croisees (raccourci)
freq = pd.crosstab(
df['region'],
df['categorie'],
normalize='index' # Pourcentages par ligne
).round(3) * 100
Window Functions - rolling, expanding, shift
Les window functions pandas sont l'equivalent des fonctions fenetres SQL. Elles calculent des metriques sur des fenetres glissantes ou cumulatives.
# Donnees temporelles triees
df_ts = df.sort_values('date').set_index('date')
# ===== ROLLING (fenetre glissante) =====
# Moyenne mobile sur 7 jours
df_ts['mm7'] = df_ts['montant'].rolling(window=7).mean()
# Moyenne mobile ponderee
df_ts['mm_exp'] = df_ts['montant'].ewm(span=7).mean()
# Rolling par groupe
df['mm7_region'] = (df.sort_values('date')
.groupby('region')['montant']
.rolling(7).mean()
.reset_index(level=0, drop=True))
# ===== EXPANDING (cumul depuis le debut) =====
df_ts['cumul'] = df_ts['montant'].expanding().sum()
df_ts['max_historique'] = df_ts['montant'].expanding().max()
# ===== SHIFT (decalage temporel) =====
df_ts['montant_precedent'] = df_ts['montant'].shift(1) # Jour precedent
df_ts['montant_suivant'] = df_ts['montant'].shift(-1) # Jour suivant
df_ts['variation'] = df_ts['montant'].pct_change() # % variation
# ===== RANK (classement) =====
df['rank_montant'] = df.groupby('region')['montant'].rank(
method='dense', ascending=False
)
# Top 3 par region
top3 = df[df['rank_montant'] <= 3].sort_values(
['region', 'rank_montant']
)
Optimisation memoire
Sur des DataFrames volumineux, l'optimisation memoire peut reduire l'empreinte de 50 a 90%. C'est la difference entre un traitement qui passe en memoire et un crash OOM.
def optimize_dataframe(df, verbose=True):
"""Reduit l'empreinte memoire d'un DataFrame."""
start_mem = df.memory_usage(deep=True).sum() / 1024**2
for col in df.columns:
col_type = df[col].dtype
if col_type == 'object':
# Convertir en category si peu de valeurs uniques
ratio = df[col].nunique() / len(df)
if ratio < 0.5: # Moins de 50% de valeurs uniques
df[col] = df[col].astype('category')
elif col_type in ['int64', 'int32']:
# Downcast entiers
col_min = df[col].min()
col_max = df[col].max()
if col_min >= 0:
if col_max < 255:
df[col] = df[col].astype(np.uint8)
elif col_max < 65535:
df[col] = df[col].astype(np.uint16)
elif col_max < 4294967295:
df[col] = df[col].astype(np.uint32)
else:
if col_min > -128 and col_max < 127:
df[col] = df[col].astype(np.int8)
elif col_min > -32768 and col_max < 32767:
df[col] = df[col].astype(np.int16)
elif col_min > -2147483648 and col_max < 2147483647:
df[col] = df[col].astype(np.int32)
elif col_type == 'float64':
df[col] = pd.to_numeric(df[col], downcast='float')
end_mem = df.memory_usage(deep=True).sum() / 1024**2
if verbose:
print(f"Memoire: {start_mem:.1f} MB -> {end_mem:.1f} MB "
f"(-{(1-end_mem/start_mem)*100:.0f}%)")
return df
# Exemple : 1M lignes
# Avant: 450 MB -> Apres: 85 MB (-81%)
df = optimize_dataframe(df)
Le type Category : votre meilleur allie memoire
Une colonne object (string) avec 10 valeurs uniques sur 1M lignes stocke 1M de pointeurs vers des strings. En category, elle stocke 10 strings + 1M d'entiers de mapping. Gain : 70-95% de memoire. Bonus : les groupby sur les categories sont 2-5x plus rapides.
apply() avec lambda vs operations vectorisees
Le reflexe df.apply(lambda x: ...) est tentant mais presque toujours sous-optimal. Chaque appel apply() boucle en Python sur chaque ligne, annulant les benefices de NumPy.
# Scenario : calculer un prix TTC avec remise conditionnelle
# MAUVAIS : apply + lambda (~12 sec sur 5M lignes)
df['ttc'] = df.apply(
lambda row: row['prix'] * 1.2 * (0.9 if row['premium'] else 1.0),
axis=1
)
# BON : vectorise (~0.1 sec sur 5M lignes, 120x plus rapide)
remise = np.where(df['premium'], 0.9, 1.0)
df['ttc'] = df['prix'] * 1.2 * remise
# Scenario : extraire le domaine d'un email
# MAUVAIS : apply
df['domain'] = df['email'].apply(lambda x: x.split('@')[1])
# BON : str accessor vectorise
df['domain'] = df['email'].str.split('@').str[1]
# Scenario : concat de colonnes
# MAUVAIS : apply
df['full'] = df.apply(lambda r: f"{r['prenom']} {r['nom']}", axis=1)
# BON : operations string vectorisees
df['full'] = df['prenom'] + ' ' + df['nom']
Regle : avant d'ecrire apply(), demandez-vous s'il existe une methode vectorisee equivalente. Dans 90% des cas, la reponse est oui.
Pandas en production : les pieges chez Instacart
Instacart, le service de livraison de courses, a partage ses retours d'experience sur l'utilisation de pandas en production :
- Probleme 1 - Memoire : un pipeline pandas consommait 64 GB de RAM pour traiter 20 GB de donnees. Cause : copies implicites lors des chained operations. Solution :
inplace=Truela ou c'est possible, et surtout restructurer en etapes avec liberation memoire (del df_temp). - Probleme 2 - Reproductibilite : les resultats de
groupby().apply()variaient selon l'ordre des donnees. Cause : la fonction custom n'etait pas deterministe. Solution : toujours trier avant groupby (sort_values). - Probleme 3 - Performance : un rapport journalier prenait 45 minutes. L'equipe a profile et decouvert que 80% du temps etait dans 3 lignes d'
apply(). Apres vectorisation : 3 minutes. - Lecon : pandas est excellent pour le prototypage, mais chaque pipeline en production doit etre profile avec
%%timeitet optimise avant deploiement. Instacart a fini par migrer les pipelines critiques vers PySpark tout en gardant pandas pour l'exploration.
Scenario : Rapport de ventes mensuel automatise
Vous devez creer un rapport automatise qui, a partir des donnees brutes de commandes, produit un tableau croise des ventes par region et categorie, avec les variations mois par mois.
def generer_rapport_mensuel(df_commandes):
"""Genere un rapport de ventes mensuel."""
# 1. Preparation
df = df_commandes.copy()
df['mois'] = df['date'].dt.to_period('M')
# 2. Tableau croise : CA par region et mois
pivot_ca = df.pivot_table(
values='montant', index='region',
columns='mois', aggfunc='sum', fill_value=0
)
# 3. Variation mensuelle (%)
variation = pivot_ca.pct_change(axis=1) * 100
# 4. Top 5 clients par region
top_clients = (df.groupby(['region', 'client_id'])
.agg(ca=('montant', 'sum'), nb_commandes=('montant', 'count'))
.reset_index()
.sort_values('ca', ascending=False)
.groupby('region').head(5))
# 5. KPIs globaux
kpis = {
'ca_total': df['montant'].sum(),
'panier_moyen': df['montant'].mean(),
'nb_commandes': len(df),
'clients_actifs': df['client_id'].nunique(),
'meilleure_region': df.groupby('region')['montant'].sum().idxmax()
}
return pivot_ca, variation, top_clients, kpis
Flashcards
agg() retourne un DataFrame reduit (une ligne par groupe). transform() retourne un DataFrame de meme taille que l'original, en "broadcast" le resultat du groupe sur chaque ligne. Utilisez transform() pour ajouter des statistiques de groupe a chaque ligne (ex: % du total du groupe).validate : pd.merge(df1, df2, on='key', validate='one_to_many'). Options : 'one_to_one', 'one_to_many', 'many_to_one', 'many_to_many'. Si la contrainte est violee, pandas leve une MergeError. Verifiez aussi len(result) vs len(df1) apres le merge.category : df['col'] = df['col'].astype('category'). Cela reduit l'empreinte memoire de 70-95% et accelere les groupby de 2-5x. Ideal pour les colonnes comme 'pays', 'statut', 'categorie' avec un nombre fixe de valeurs.Quiz
Que retourne df.groupby('region')['montant'].transform('mean') ?
Un merge entre df1 (100K lignes) et df2 (50K lignes) produit 5M lignes. Que s'est-il passe ?
Le trio groupby/merge/pivot_table est l'equivalent pandas du SQL analytique. Si vous savez ecrire un GROUP BY avec HAVING, un JOIN avec des conditions, et un tableau croise en SQL, vous saurez faire la meme chose en pandas. Mon conseil : pensez toujours en termes de colonnes (vectorise), jamais en termes de lignes (boucle). C'est le changement de paradigme fondamental de pandas.
SQLAlchemy - Connexion & ORM
SQLAlchemy est la librairie de reference pour connecter Python aux bases de donnees relationnelles. Elle offre deux niveaux : le Core (SQL generatif type-safe) et l'ORM (mapping objet-relationnel). Pour un Data Architect, comprendre ces deux couches et savoir quand utiliser l'une ou l'autre est essentiel.
Objectifs d'apprentissage
- Configurer un Engine et comprendre le connection pooling
- Ecrire des requetes avec le Core (SQL generatif)
- Modeliser des tables avec l'ORM
- Integrer SQLAlchemy avec pandas
- Eviter l'anti-pattern N+1 queries
- Choisir entre raw SQL, Core, ORM et pandas read_sql
Engine et Connection Pooling
L'Engine est le point d'entree de SQLAlchemy. Il gere la connexion a la base et, surtout, le pool de connexions qui evite d'ouvrir/fermer une connexion a chaque requete.
Votre code Python
β
βΌ
ββββββββββββββββ
β Engine βββ URL de connexion: "postgresql://user:pass@host:5432/db"
β β
β ββββββββββ β Pool de connexions (par defaut: 5 connexions)
β β Pool β β βββββββ βββββββ βββββββ βββββββ βββββββ
β β βββββββΆβconn1β βconn2β βconn3β βconn4β βconn5β
β ββββββββββ β ββββ¬βββ ββββ¬βββ ββββ¬βββ ββββ¬βββ ββββ¬βββ
ββββββββββββββββ β β β β β
β βββββββββ΄ββββββββ΄ββββββββ΄ββββββββ
βΌ β
ββββββββββββ βΌ
β Core β ββββββββββββββββββββ
β (SQL β β Base de donnees β
β generatifβ β (PostgreSQL, β
ββββββββββββ β MySQL, etc.) β
β ββββββββββββββββββββ
βΌ
ββββββββββββ
β ORM β
β (objets β
β Python) β
ββββββββββββ
from sqlalchemy import create_engine
# ===== CREATION D'ENGINE =====
# PostgreSQL
engine = create_engine(
"postgresql+psycopg2://user:password@localhost:5432/ecommerce",
pool_size=10, # Connexions permanentes
max_overflow=20, # Connexions supplementaires en pic
pool_timeout=30, # Timeout si pas de connexion dispo (sec)
pool_recycle=3600, # Recycler apres 1h (evite timeout serveur)
echo=False # True pour voir le SQL genere (debug)
)
# MySQL
mysql_engine = create_engine(
"mysql+pymysql://user:pass@host:3306/db?charset=utf8mb4"
)
# SQLite (fichier local, ideal pour tests)
sqlite_engine = create_engine("sqlite:///local_database.db")
# DuckDB (via SQLAlchemy!)
duckdb_engine = create_engine("duckdb:///analytics.duckdb")
# ===== UTILISATION BASIQUE =====
# Context manager (recommande - ferme automatiquement la connexion)
from sqlalchemy import text
with engine.connect() as conn:
result = conn.execute(text("SELECT * FROM clients LIMIT 10"))
for row in result:
print(row.nom, row.email)
# Transaction explicite
with engine.begin() as conn: # Auto-commit a la fin, rollback si exception
conn.execute(text("UPDATE clients SET actif = false WHERE ltv = 0"))
conn.execute(text("DELETE FROM sessions WHERE expire < NOW()"))
Pourquoi le Connection Pooling est critique
Ouvrir une connexion PostgreSQL prend 50-100ms (negotiation TCP, authentification, allocation memoire). Dans une API web traitant 1000 req/sec, cela ajouterait 50-100 secondes de latence par seconde! Le pool pre-ouvre N connexions et les reutilise, reduisant la latence a ~0.1ms par requete.
SQLAlchemy Core - SQL generatif
Le Core permet de construire des requetes SQL de maniere programmatique et type-safe, sans ecrire de SQL brut. C'est un compromis excellent entre le raw SQL et l'ORM complet.
from sqlalchemy import MetaData, Table, Column, Integer, String, Float, DateTime
from sqlalchemy import select, insert, update, delete, func, and_, or_
# Reflexion : charger les tables existantes
metadata = MetaData()
metadata.reflect(bind=engine)
clients = metadata.tables['clients']
commandes = metadata.tables['commandes']
# SELECT avec conditions
stmt = (
select(clients.c.nom, clients.c.email, clients.c.ville)
.where(and_(
clients.c.actif == True,
clients.c.ltv > 1000
))
.order_by(clients.c.ltv.desc())
.limit(50)
)
with engine.connect() as conn:
result = conn.execute(stmt)
rows = result.fetchall()
# JOIN + GROUP BY
stmt = (
select(
clients.c.ville,
func.count(commandes.c.id).label('nb_commandes'),
func.sum(commandes.c.montant).label('ca_total'),
func.avg(commandes.c.montant).label('panier_moyen')
)
.join(commandes, clients.c.id == commandes.c.client_id)
.group_by(clients.c.ville)
.having(func.count(commandes.c.id) > 10)
.order_by(func.sum(commandes.c.montant).desc())
)
# INSERT
stmt = insert(clients).values(
nom='Jean Dupont',
email='jean@example.com',
ville='Paris',
actif=True
)
# Bulk INSERT (beaucoup plus rapide)
with engine.begin() as conn:
conn.execute(
insert(clients),
[
{'nom': 'Alice', 'email': 'alice@ex.com', 'ville': 'Lyon'},
{'nom': 'Bob', 'email': 'bob@ex.com', 'ville': 'Paris'},
# ... des milliers de lignes
]
)
SQLAlchemy ORM - Mapping objet-relationnel
L'ORM mappe les tables SQL sur des classes Python. Chaque ligne devient un objet, chaque colonne un attribut. C'est puissant mais avec des pieges de performance.
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy import ForeignKey
from datetime import datetime
from typing import List, Optional
# Base declarative (SQLAlchemy 2.0+)
class Base(DeclarativeBase):
pass
class Client(Base):
__tablename__ = 'clients'
id: Mapped[int] = mapped_column(primary_key=True)
nom: Mapped[str] = mapped_column(String(100))
email: Mapped[str] = mapped_column(String(255), unique=True)
ville: Mapped[Optional[str]] = mapped_column(String(100))
date_inscription: Mapped[datetime] = mapped_column(default=datetime.utcnow)
actif: Mapped[bool] = mapped_column(default=True)
# Relation : un client a plusieurs commandes
commandes: Mapped[List["Commande"]] = relationship(back_populates="client")
def __repr__(self):
return f"Client(id={self.id}, nom='{self.nom}')"
class Commande(Base):
__tablename__ = 'commandes'
id: Mapped[int] = mapped_column(primary_key=True)
client_id: Mapped[int] = mapped_column(ForeignKey('clients.id'))
montant: Mapped[float] = mapped_column(Float)
date: Mapped[datetime] = mapped_column(default=datetime.utcnow)
statut: Mapped[str] = mapped_column(String(20), default='pending')
# Relation inverse
client: Mapped["Client"] = relationship(back_populates="commandes")
# Creation des tables
Base.metadata.create_all(engine)
# Session pour interagir
SessionLocal = sessionmaker(bind=engine)
# ===== OPERATIONS CRUD =====
with SessionLocal() as session:
# CREATE
nouveau_client = Client(nom="Marie Martin", email="marie@ex.com", ville="Paris")
session.add(nouveau_client)
session.commit()
# READ
client = session.query(Client).filter_by(email="marie@ex.com").first()
print(client.nom) # Marie Martin
# UPDATE
client.ville = "Lyon"
session.commit()
# Requetes avancees
clients_paris = (
session.query(Client)
.filter(Client.ville == "Paris", Client.actif == True)
.order_by(Client.nom)
.limit(20)
.all()
)
N+1 Queries avec l'ORM
Le probleme N+1 est le piege le plus dangereux de tout ORM. Il se produit quand vous accedez a une relation dans une boucle, generant une requete SQL par element.
# MAUVAIS : N+1 queries (1 + 100 = 101 requetes SQL!)
clients = session.query(Client).limit(100).all() # 1 requete
for client in clients:
print(f"{client.nom}: {len(client.commandes)} commandes")
# ^ Chaque acces a client.commandes declenche 1 requete!
# Total: 1 (clients) + 100 (commandes par client) = 101 requetes
# BON : Eager loading avec joinedload (1 seule requete avec JOIN)
from sqlalchemy.orm import joinedload
clients = (
session.query(Client)
.options(joinedload(Client.commandes)) # JOIN en une seule requete
.limit(100)
.all()
)
for client in clients:
print(f"{client.nom}: {len(client.commandes)} commandes")
# Aucune requete supplementaire!
# ALTERNATIF : subqueryload (2 requetes, mieux pour de gros resultats)
from sqlalchemy.orm import subqueryload
clients = (
session.query(Client)
.options(subqueryload(Client.commandes))
.limit(100)
.all()
)
N+1 (MAUVAIS) Eager Loading (BON)
βββββββββββββ βββββββββββββββββββ
Query 1: SELECT * FROM clients Query 1: SELECT clients.*, commandes.*
Query 2: SELECT * FROM commandes FROM clients
WHERE client_id = 1 LEFT JOIN commandes
Query 3: SELECT * FROM commandes ON clients.id = commandes.client_id
WHERE client_id = 2
Query 4: SELECT * FROM commandes Total: 1 requete, ~50ms
WHERE client_id = 3
...
Query 101: SELECT * FROM commandes
WHERE client_id = 100
Total: 101 requetes, ~2000ms
Integration avec pandas
L'integration SQLAlchemy + pandas est le workflow le plus courant pour les Data Architects : lire depuis la base, transformer en Python, ecrire le resultat.
import pandas as pd
from sqlalchemy import create_engine, text
engine = create_engine("postgresql://user:pass@localhost/ecommerce")
# ===== LIRE DEPUIS LA BASE =====
# Methode 1 : SQL brut (le plus courant et flexible)
df = pd.read_sql("""
SELECT c.nom, c.ville, SUM(co.montant) as ca_total
FROM clients c
JOIN commandes co ON c.id = co.client_id
WHERE co.date >= '2024-01-01'
GROUP BY c.nom, c.ville
HAVING SUM(co.montant) > 1000
ORDER BY ca_total DESC
""", engine)
# Methode 2 : avec parametres (securise contre SQL injection!)
df = pd.read_sql(
text("SELECT * FROM clients WHERE ville = :ville AND actif = :actif"),
engine,
params={"ville": "Paris", "actif": True}
)
# Methode 3 : lire une table entiere
df = pd.read_sql_table("clients", engine, columns=["nom", "email", "ville"])
# ===== ECRIRE DANS LA BASE =====
# Ecriture simple
df_clean.to_sql(
"clients_clean",
engine,
if_exists="replace", # 'fail', 'replace', 'append'
index=False,
dtype={ # Types SQL explicites
'nom': String(100),
'montant': Float,
'date': DateTime
},
chunksize=10000, # Insertion par lots (memoire)
method='multi' # INSERT multi-valeurs (plus rapide)
)
# Ecriture performante avec COPY (PostgreSQL)
from io import StringIO
def to_sql_copy(df, table_name, engine):
"""Utilise COPY pour un insert ultra-rapide (PostgreSQL)."""
buffer = StringIO()
df.to_csv(buffer, index=False, header=False, sep='\t')
buffer.seek(0)
with engine.raw_connection() as conn:
with conn.cursor() as cur:
cur.copy_from(buffer, table_name, sep='\t', null='')
conn.commit()
# 10x plus rapide que to_sql() pour les gros volumes
to_sql_copy(df_million_rows, 'staging_table', engine)
Raw SQL (pd.read_sql)
- Controle total sur la requete
- Performance maximale
- Pas de couche d'abstraction
- Ideal pour : analytique, rapports
- Risque : SQL injection si mal parametrise
ORM (session.query)
- Type-safe et refactorable
- Relations automatiques
- Overhead de mapping (plus lent)
- Ideal pour : applications web, CRUD
- Risque : N+1 queries, over-fetching
Scenario : Architecture de connexion multi-bases
Vous etes Data Architect dans une entreprise avec 3 bases de donnees : PostgreSQL (transactionnel), MySQL (legacy CRM), et DuckDB (analytique). Concevez le module de connexion.
from sqlalchemy import create_engine
from contextlib import contextmanager
import logging
logger = logging.getLogger(__name__)
class DatabaseManager:
"""Gestionnaire centralise de connexions multi-bases."""
def __init__(self, config: dict):
self.engines = {}
for name, url in config.items():
self.engines[name] = create_engine(
url,
pool_size=5,
max_overflow=10,
pool_recycle=3600,
pool_pre_ping=True # Verifie que la connexion est active
)
logger.info(f"Engine '{name}' configure")
@contextmanager
def get_connection(self, db_name: str):
"""Context manager pour obtenir une connexion."""
if db_name not in self.engines:
raise ValueError(f"Base '{db_name}' non configuree")
conn = self.engines[db_name].connect()
try:
yield conn
finally:
conn.close()
def read_sql(self, query: str, db_name: str, **kwargs):
"""Lire depuis n'importe quelle base en DataFrame."""
import pandas as pd
return pd.read_sql(query, self.engines[db_name], **kwargs)
def dispose_all(self):
"""Fermer tous les pools de connexions."""
for name, engine in self.engines.items():
engine.dispose()
logger.info(f"Pool '{name}' ferme")
# Utilisation
db = DatabaseManager({
'transactional': 'postgresql://user:pass@pg-host:5432/app',
'crm': 'mysql+pymysql://user:pass@mysql-host:3306/crm',
'analytics': 'duckdb:///analytics.duckdb'
})
# Lire depuis le CRM, transformer, ecrire dans l'analytique
df_clients = db.read_sql("SELECT * FROM contacts WHERE actif=1", 'crm')
df_commandes = db.read_sql("SELECT * FROM orders WHERE status='paid'", 'transactional')
# ... transformation pandas ...
Flashcards
connect() ouvre une connexion en mode autocommit (chaque requete est commitee individuellement). begin() ouvre une connexion avec une transaction explicite : tout est commite a la fin du bloc with, ou rollback en cas d'exception. Utilisez begin() pour les operations d'ecriture qui doivent etre atomiques.pool_pre_ping envoie un SELECT 1 avant de reutiliser une connexion du pool. Cela detecte les connexions "mortes" (timeout serveur, redemarrage). Sans ce parametre, votre code peut recevoir une erreur "connection closed" sur une connexion qui a ete deconnectee cote serveur pendant qu'elle dormait dans le pool.method='multi' genere un seul INSERT avec plusieurs VALUES au lieu d'un INSERT par ligne. C'est 5-10x plus rapide pour les insertions en masse. Attention : certaines bases limitent le nombre de parametres par requete (ex: SQLite a 999). Combinez avec chunksize pour gerer cette limite.Quiz
Qu'est-ce que le probleme N+1 en ORM ?
Quelle methode d'ecriture pandas est la plus rapide pour inserer 1M de lignes dans PostgreSQL ?
En tant que Data Architect, vous n'ecrirez probablement pas beaucoup d'ORM vous-meme - c'est davantage le terrain des developpeurs backend. Mais vous devez comprendre les implications : N+1 queries, connection pooling, et surtout, l'importance d'utiliser pd.read_sql() avec des requetes SQL optimisees plutot que de charger des tables entieres. Le Data Architect est le gardien de la performance d'acces aux donnees.
PySpark Fondamentaux
Apache Spark est le moteur de calcul distribue dominant pour le Big Data. PySpark, son interface Python, permet de traiter des petaoctets de donnees sur des clusters de centaines de machines. Cette lecon couvre l'architecture Spark, l'API DataFrame, les transformations vs actions, et les techniques d'optimisation essentielles.
Objectifs d'apprentissage
- Comprendre l'architecture Spark (Driver, Executors, Tasks)
- Creer une SparkSession et lire des donnees
- Maitriser l'API DataFrame PySpark
- Distinguer transformations (lazy) et actions (eager)
- Utiliser l'API SQL de Spark
- Optimiser un job Spark (partitioning, caching, broadcast)
Architecture Apache Spark
Comprendre l'architecture de Spark est fondamental pour ecrire du code performant. Spark suit un modele master-worker avec une evaluation paresseuse (lazy).
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β SPARK APPLICATION β β β β ββββββββββββββββββββ ββββββββββββββββββββββββ β β β DRIVER β β CLUSTER MANAGER β β β β ββββββββββΆβ (YARN / K8s / β β β β - SparkContext β β Standalone) β β β β - DAG Scheduler β ββββββββββββ¬ββββββββββββ β β β - Task Scheduler β β β β β - UI (port 4040) β β Alloue β β ββββββββββββββββββββ β ressources β β β β β β β Envoie les tasks βΌ β β β β β ββββββββββΌβββββββββββββββββββββββββββββββββββββββββββ β β β βΌ EXECUTORS β β β β β β β β ββββββββββββ ββββββββββββ ββββββββββββ β β β β βExecutor 1β βExecutor 2β βExecutor 3β ... β β β β β β β β β β β β β β β ββββββββ β β ββββββββ β β ββββββββ β β β β β β βTask 1β β β βTask 3β β β βTask 5β β β β β β β βTask 2β β β βTask 4β β β βTask 6β β β β β β β ββββββββ β β ββββββββ β β ββββββββ β β β β β β [Cache] β β [Cache] β β [Cache] β β β β β ββββββββββββ ββββββββββββ ββββββββββββ β β β ββββββββββββββββββββββββββββββββββββββββββββββββββββββ β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ FLUX D'EXECUTION : 1. Driver recoit le code PySpark 2. Construit un DAG (Directed Acyclic Graph) des operations 3. Optimise le DAG (Catalyst optimizer) 4. Decoupe en stages et tasks 5. Envoie les tasks aux executors 6. Executors traitent les partitions en parallele 7. Resultats renvoyes au Driver
Concepts cles de Spark
Partition : un bloc de donnees (typiquement 128 MB). Un DataFrame de 10 GB = ~80 partitions traitees en parallele. Stage : un ensemble de tasks qui peuvent s'executer sans shuffle (redistribution des donnees). Shuffle : redistribution des donnees entre executors (GROUP BY, JOIN) - l'operation la plus couteuse. DAG : graphe de toutes les operations, optimise avant execution.
SparkSession et lecture de donnees
from pyspark.sql import SparkSession
# Creation de la SparkSession (point d'entree unique)
spark = SparkSession.builder \
.appName("DataArchitectTraining") \
.master("local[*]") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.sql.parquet.compression.codec", "snappy") \
.getOrCreate()
# ===== LECTURE DE DONNEES =====
# Parquet (format recommande pour Spark)
df = spark.read.parquet("s3://datalake/ventes/year=2024/")
# CSV
df_csv = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.option("sep", ";") \
.csv("hdfs:///data/exports/ventes.csv")
# CSV avec schema explicite (recommande - evite inferSchema qui est lent)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, \
FloatType, TimestampType
schema = StructType([
StructField("order_id", StringType(), False),
StructField("customer_id", IntegerType(), True),
StructField("amount", FloatType(), True),
StructField("order_date", TimestampType(), True),
StructField("region", StringType(), True),
StructField("category", StringType(), True)
])
df_typed = spark.read.schema(schema).csv("data/ventes.csv", header=True)
# JSON
df_json = spark.read.json("s3://bucket/events/*.json")
# Delta Lake
df_delta = spark.read.format("delta").load("s3://datalake/delta/clients")
# Base de donnees via JDBC
df_pg = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://host:5432/db") \
.option("dbtable", "clients") \
.option("user", "user") \
.option("password", "pass") \
.option("numPartitions", 10) \
.option("partitionColumn", "id") \
.option("lowerBound", 1) \
.option("upperBound", 1000000) \
.load()
Transformations vs Actions
La distinction entre transformations et actions est LE concept fondamental de Spark. Les transformations sont lazy (rien ne s'execute), les actions sont eager (declenchent l'execution).
TRANSFORMATIONS (Lazy) ACTIONS (Eager)
Construisent le plan Declenchent l'execution
ββββββββββββββββββββ βββββββββββββββββββββ
.filter() .select() .show() .count()
.groupBy() .join() .collect() .take(n)
.orderBy() .withColumn() .write() .first()
.distinct() .drop() .toPandas() .foreach()
.union() .repartition() .head() .reduce()
Code: Execution:
df2 = df.filter(...) ββ df2.show() ββββββΆ Execute tout
df3 = df2.groupBy() ββ€ Rien df3.count() ββββββΆ Execute tout
df4 = df3.orderBy() ββ ne se df4.write() ββββββΆ Execute tout
execute!
from pyspark.sql import functions as F
# ===== TRANSFORMATIONS (rien ne s'execute encore!) =====
df_filtered = df.filter(F.col("amount") > 100)
df_selected = df_filtered.select("customer_id", "amount", "region")
df_grouped = df_selected.groupBy("region").agg(
F.sum("amount").alias("total_sales"),
F.count("*").alias("order_count"),
F.avg("amount").alias("avg_amount"),
F.countDistinct("customer_id").alias("unique_customers")
)
df_sorted = df_grouped.orderBy(F.desc("total_sales"))
# A ce stade, RIEN n'a ete execute! Spark a juste construit le plan.
# Vous pouvez verifier le plan :
df_sorted.explain(True) # Affiche le plan logique et physique
# ===== ACTIONS (declenchent l'execution!) =====
df_sorted.show(20) # Affiche les 20 premieres lignes
total = df.count() # Compte les lignes
result = df_sorted.collect() # Ramene TOUT au Driver (attention!)
first_row = df_sorted.first() # Premiere ligne
pandas_df = df_sorted.toPandas() # Convertit en pandas (attention!)
# ===== API DataFrame COMPLETE =====
result = (
df
# Filtrage
.filter(F.col("order_date") >= "2024-01-01")
.filter(F.col("status").isin(["completed", "shipped"]))
# Nouvelles colonnes
.withColumn("year", F.year("order_date"))
.withColumn("month", F.month("order_date"))
.withColumn("amount_eur", F.col("amount") * F.lit(0.92))
.withColumn("is_premium", F.when(F.col("amount") > 500, True).otherwise(False))
# Colonnes string
.withColumn("region_upper", F.upper(F.col("region")))
.withColumn("email_domain", F.regexp_extract("email", r"@(.+)$", 1))
# Aggregation
.groupBy("year", "month", "region")
.agg(
F.sum("amount_eur").alias("revenue"),
F.count("*").alias("orders"),
F.avg("amount_eur").alias("avg_order"),
F.percentile_approx("amount_eur", 0.95).alias("p95_amount")
)
# Tri
.orderBy("year", "month", F.desc("revenue"))
)
API SQL de Spark
Spark permet d'ecrire des requetes SQL standard sur les DataFrames. C'est souvent plus lisible pour les analystes et les requetes complexes.
# Enregistrer un DataFrame comme vue temporaire
df.createOrReplaceTempView("orders")
# Requete SQL standard
result = spark.sql("""
WITH monthly_stats AS (
SELECT
region,
DATE_TRUNC('month', order_date) as month,
SUM(amount) as revenue,
COUNT(*) as order_count,
COUNT(DISTINCT customer_id) as unique_customers
FROM orders
WHERE status = 'completed'
AND order_date >= '2024-01-01'
GROUP BY region, DATE_TRUNC('month', order_date)
),
ranked AS (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY month ORDER BY revenue DESC) as rank,
LAG(revenue) OVER (PARTITION BY region ORDER BY month) as prev_revenue
FROM monthly_stats
)
SELECT
region, month, revenue, order_count, unique_customers,
rank,
ROUND((revenue - prev_revenue) / prev_revenue * 100, 1) as growth_pct
FROM ranked
WHERE rank <= 5
ORDER BY month DESC, rank
""")
result.show(50, truncate=False)
Comment Uber traite 1 PB/jour avec Spark
Uber est l'un des plus gros utilisateurs de Spark au monde, avec des milliers de jobs traitant plus d'un petaoctet par jour :
- Architecture : 10 000+ noeuds Spark repartis sur plusieurs clusters, geres par YARN et Kubernetes
- Cas d'usage : tarification dynamique (surge pricing) calculee toutes les 2 minutes sur les donnees de trajet en temps reel, detection de fraude sur des milliards de transactions, ETL du data lake (Parquet sur HDFS et S3)
- Optimisations cles : Uber a developpe des outils internes pour optimiser Spark - auto-tuning des configurations (nombre d'executors, memoire), repartitionnement intelligent base sur les patterns d'acces, et un systeme de cache distribue pour les datasets frequemment utilises
- Migration : Uber est passe de Hive (MapReduce) a Spark, reduisant le temps de leurs pipelines ETL de 12 heures a 2 heures
- Lecon : meme a l'echelle d'Uber, 80% de l'optimisation Spark vient de 3 choses : bon partitionnement, eviter les shuffles inutiles, et bien dimensionner les executors
collect() sur un DataFrame de 100M lignes
collect() ramene TOUTES les donnees du cluster vers le Driver (une seule machine). Sur un gros DataFrame, cela provoque un OutOfMemoryError et potentiellement crash le Driver, ce qui tue le job entier.
# CATASTROPHIQUE : ne JAMAIS faire ca sur un gros DataFrame
all_data = df.collect() # 100M lignes x 20 colonnes = crash memoire!
# DANGEREUX aussi :
pandas_df = df.toPandas() # Meme probleme, ramene tout au Driver
# BON : limiter avant de collect
sample = df.limit(1000).collect()
top10 = df.orderBy(F.desc("revenue")).limit(10).collect()
# BON : ecrire directement dans le stockage
df.write.mode("overwrite").parquet("s3://output/result/")
# BON : utiliser show() pour debugger
df.show(20, truncate=False)
# BON : toPandas() seulement sur un resultat agrege petit
stats = df.groupBy("region").agg(F.sum("amount")).toPandas()
# Seulement ~20 lignes (1 par region) β OK pour toPandas()
Optimisation de jobs Spark
# ===== 1. BROADCAST JOIN (petite table jointe a une grande) =====
from pyspark.sql.functions import broadcast
# Table de reference petite (< 100 MB)
dim_regions = spark.read.parquet("dim_regions.parquet") # 1000 lignes
# MAUVAIS : shuffle join (redistribue les 100M lignes)
result = big_df.join(dim_regions, "region_id")
# BON : broadcast join (envoie la petite table a tous les executors)
result = big_df.join(broadcast(dim_regions), "region_id")
# 10-100x plus rapide car evite le shuffle!
# ===== 2. REPARTITIONNEMENT =====
# Trop de petites partitions (slow)
df.rdd.getNumPartitions() # 10000 partitions de 1 MB
# Reduire le nombre de partitions (coalesce, sans shuffle)
df = df.coalesce(200) # 200 partitions de 50 MB
# Repartitionner par cle (optimise les joins/groupby suivants)
df = df.repartition(200, "region")
# ===== 3. CACHING =====
# Si un DataFrame est utilise plusieurs fois
df_clients = spark.read.parquet("clients/").filter(F.col("actif"))
df_clients.cache() # Stocke en memoire apres la premiere action
# ou
df_clients.persist(StorageLevel.MEMORY_AND_DISK) # Spillover sur disque
# Utilise plusieurs fois sans relire le Parquet
rapport1 = df_clients.groupBy("region").count()
rapport2 = df_clients.groupBy("segment").agg(F.sum("ltv"))
df_clients.unpersist() # Liberer la memoire quand fini
# ===== 4. PREDICATE PUSHDOWN =====
# Spark pousse les filtres vers la source (Parquet/JDBC)
# Parquet : ne lit que les colonnes et row groups necessaires
df = spark.read.parquet("big_table/") \
.select("col1", "col2") \ # Projection pushdown
.filter(F.col("date") >= "2024-01-01") # Predicate pushdown
# ===== 5. AQE (Adaptive Query Execution) =====
# Active par defaut depuis Spark 3.2
spark.conf.set("spark.sql.adaptive.enabled", "true")
# AQE optimise le plan PENDANT l'execution :
# - Fusionne les petites partitions apres shuffle
# - Convertit sort-merge join en broadcast join si une side est petite
# - Gere le data skew automatiquement
Scenario : Optimiser un job Spark de 4 heures
Votre equipe a un job PySpark quotidien qui joint une table de 500M de lignes avec une table de 1M de lignes, puis fait un groupby. Le job prend 4 heures et echoue parfois en OOM.
Diagnostic et solutions :
- Etape 1 - Analyse : Regarder le Spark UI (port 4040). Identifier quel stage est le plus long. Verifier le "data skew" (quelques partitions beaucoup plus grosses que les autres).
- Etape 2 - Broadcast Join : La table de 1M de lignes fait probablement < 100 MB. Utiliser
broadcast()pour l'envoyer a tous les executors au lieu d'un shuffle join. Gain attendu : 3-5x. - Etape 3 - Partitionnement : Repartitionner la table de 500M par la cle de jointure avant le job. Si les donnees sont en Parquet, utiliser le partitionnement Hive (par date/region).
- Etape 4 - Caching : Si la table de 1M est lue plusieurs fois, la cacher en memoire.
- Etape 5 - Configuration : Ajuster spark.executor.memory (4-8 GB), spark.sql.shuffle.partitions (200-400 au lieu du defaut 200 si data skew).
- Resultat attendu : de 4 heures a 30-45 minutes. Les broadcast joins et le bon partitionnement sont les deux leviers les plus puissants.
Flashcards
repartition(n) redistribue les donnees via un shuffle complet (couteux mais equilibre). coalesce(n) fusionne les partitions existantes sans shuffle (rapide mais ne peut que reduire le nombre). Utilisez coalesce() pour reduire les partitions (apres un filtre), repartition() pour augmenter ou equilibrer.Quiz
Quelle ligne de code declenche reellement l'execution dans Spark ?
Vous joignez un DataFrame de 500M lignes avec un DataFrame de 50K lignes. Quelle optimisation est la plus impactante ?
Que fait spark.sql.adaptive.enabled = true ?
Spark est un outil puissant mais complexe. Mon conseil pour un Data Architect : ne recommandez jamais Spark si le volume est < 100 GB - DuckDB ou Polars feront mieux avec zero infrastructure. Mais au-dela de 100 GB, Spark devient incontournable. Maitrisez les 3 concepts cles : lazy evaluation, partitionnement, et broadcast joins. Avec ces trois outils, vous resoudrez 90% des problemes de performance Spark.
Formats de Fichiers Data
Le choix du format de fichier est une des decisions les plus impactantes en data engineering. Un mauvais choix peut multiplier les couts de stockage par 10 et les temps de lecture par 100. Cette lecon compare CSV, JSON, Avro, Parquet, ORC et Arrow, avec des benchmarks reels et des recommandations actionnables.
Objectifs d'apprentissage
- Comprendre les differences entre formats ligne et colonne
- Comparer CSV, JSON, Avro, Parquet et ORC avec des metriques reelles
- Choisir le bon format selon le cas d'usage
- Comprendre le role d'Apache Arrow dans l'ecosysteme
- Eviter l'anti-pattern CSV en production
Formats ligne vs colonne
La distinction fondamentale est entre les formats orientes ligne (CSV, JSON, Avro) et les formats orientes colonne (Parquet, ORC). Ce choix determine les performances de lecture, ecriture, et compression.
DONNEES ORIGINALES :
ββββββββββ¬βββββββ¬βββββββββ¬βββββββββββ
β nom β age β ville β salaire β
ββββββββββΌβββββββΌβββββββββΌβββββββββββ€
β Alice β 28 β Paris β 55000 β
β Bob β 34 β Lyon β 62000 β
β Charlieβ 45 β Paris β 78000 β
ββββββββββ΄βββββββ΄βββββββββ΄βββββββββββ
FORMAT LIGNE (CSV, JSON, Avro) : FORMAT COLONNE (Parquet, ORC) :
βββββββββββββββββββββββββββββββββββ ββββββββββββββββββββββββββββββββ
β Alice, 28, Paris, 55000 β β nom: Alice, Bob, Charlie β
β Bob, 34, Lyon, 62000 β β age: 28, 34, 45 β
β Charlie, 45, Paris, 78000 β β ville: Paris, Lyon, Paris β
βββββββββββββββββββββββββββββββββββ β salaire: 55000, 62000, 78000β
ββββββββββββββββββββββββββββββββ
Ideal pour : Ideal pour :
- Ecriture rapide (append) - Lecture selective (colonnes)
- Lecture de lignes completes - Compression superieure
- Streaming - Analytique (SUM, AVG, GROUP BY)
- OLTP - OLAP
Pourquoi le format colonne compresse mieux
Dans une colonne "ville", les valeurs sont similaires (Paris, Lyon, Paris, Paris...). Le compresseur peut utiliser le dictionary encoding (3 valeurs uniques au lieu de 1M de strings) et le run-length encoding (Paris x 500K, Lyon x 300K). Dans un format ligne, le compresseur voit "Alice, 28, Paris, 55000, Bob, 34, Lyon..." - des types melanges impossibles a compresser efficacement.
Comparaison detaillee des formats
CSV - Comma-Separated Values
nom,age,ville,salaire Alice,28,Paris,55000 Bob,34,Lyon,62000 "Charlie ""Chuck""",45,Paris,78000
Avantages : lisible par un humain, universel, editable dans Excel. Inconvenients : pas de types (tout est string), pas de schema, pas de compression, lent a lire, ambiguite des separateurs.
JSON - JavaScript Object Notation
{"nom": "Alice", "age": 28, "ville": "Paris", "salaire": 55000}
{"nom": "Bob", "age": 34, "ville": "Lyon", "salaire": 62000}
{"nom": "Charlie", "age": 45, "ville": "Paris", "salaire": 78000, "bonus": 5000}
Avantages : semi-structure, supporte les types de base, schema flexible (colonnes optionnelles), nesting natif. Inconvenients : verbeux (repetition des cles), plus gros que CSV, pas de compression native.
Apache Avro
Format binaire oriente ligne avec schema integre. Ideal pour le streaming (Kafka) et les pipelines ou le schema evolue.
Apache Parquet
Le format roi de l'analytique. Format binaire oriente colonne avec compression, metadata, et predicate pushdown.
import pandas as pd
import pyarrow.parquet as pq
# Ecriture Parquet avec compression
df.to_parquet(
"ventes.parquet",
engine="pyarrow",
compression="snappy", # Alternatives: gzip, zstd, lz4
index=False
)
# Ecriture partitionnee (essentiel pour les data lakes!)
df.to_parquet(
"datalake/ventes/",
partition_cols=["year", "month"], # Cree des sous-dossiers
compression="snappy"
)
# Resultat :
# datalake/ventes/year=2024/month=01/part-0.parquet
# datalake/ventes/year=2024/month=02/part-0.parquet
# ...
# Lecture avec projection pushdown (ne lit que les colonnes demandees)
df = pd.read_parquet(
"ventes.parquet",
columns=["date", "montant", "region"], # Projection pushdown!
filters=[("year", "=", 2024)] # Predicate pushdown!
)
# Lecture des metadata sans charger les donnees
parquet_file = pq.read_metadata("ventes.parquet")
print(f"Lignes: {parquet_file.num_rows}")
print(f"Colonnes: {parquet_file.num_columns}")
print(f"Taille: {parquet_file.serialized_size / 1024**2:.1f} MB")
print(f"Row groups: {parquet_file.num_row_groups}")
Apache ORC
Autre format colonne, optimise pour l'ecosysteme Hive/Hadoop. Similaire a Parquet mais moins repandu hors de l'ecosysteme Hadoop.
Benchmarks reels
Voici des benchmarks sur un dataset de 10M lignes (15 colonnes, mix numerique/string) :
| Metrique | CSV | JSON | Avro | Parquet (snappy) | ORC (zlib) |
|---|---|---|---|---|---|
| Taille fichier | 1.2 GB | 2.1 GB | 450 MB | 180 MB | 165 MB |
| Ratio vs CSV | 1x | 1.75x | 0.38x | 0.15x | 0.14x |
| Temps ecriture | 12s | 28s | 8s | 6s | 9s |
| Temps lecture (tout) | 45s | 90s | 15s | 4s | 5s |
| Lecture 3 colonnes | 45s | 90s | 15s | 0.8s | 0.9s |
| Schema integre | Non | Non | Oui | Oui | Oui |
| Types natifs | Non | Partiels | Oui | Oui | Oui |
| Splittable | Oui | Oui (JSONL) | Oui | Oui | Oui |
| Lisible humain | Oui | Oui | Non | Non | Non |
Le chiffre qui compte : 0.8s vs 45s
Parquet met 0.8 seconde pour lire 3 colonnes sur 10M lignes, contre 45 secondes pour CSV. C'est un facteur 56x. Sur un data lake de 1 TB, cela represente la difference entre une requete de 5 secondes et une requete de 5 minutes. C'est pourquoi Parquet est le standard des data lakes.
Apache Arrow - Le format en memoire
Apache Arrow n'est pas un format de fichier mais un format en memoire. Il definit comment les donnees tabulaires sont organisees en RAM, et permet l'echange de donnees entre systemes sans serialisation/deserialisation (zero-copy).
AVANT Arrow (chaque outil a son format memoire) :
ββββββββββ Serialise ββββββββββ Serialise ββββββββββ
β pandas β βββββββββββββββΆ β Spark β βββββββββββββββΆ β R β
β (NumPy)β βββββββββββββββ β (JVM) β βββββββββββββββ β(R obj) β
ββββββββββ Deserialise ββββββββββ Deserialise ββββββββββ
Copie memoire a chaque transfert (lent, couteux)
AVEC Arrow (format memoire commun) :
ββββββββββ ββββββββββ ββββββββββ
β pandas β β Spark β β R β
β ββββ βββββ βββββ βββββ β
ββββββββββ β β ββββββββββ β β ββββββββββ
βΌ βΌ βΌ βΌ
ββββββββββββββββββββββββββββββββββββββββββββββββ
β Apache Arrow (memoire partagee) β
β Format colonne standardise en RAM β
β Zero-copy entre Python, Java, R, Rust β
ββββββββββββββββββββββββββββββββββββββββββββββββ
import pyarrow as pa
import pyarrow.parquet as pq
# Conversion pandas <-> Arrow (quasi-instantane)
arrow_table = pa.Table.from_pandas(df)
df_back = arrow_table.to_pandas()
# Lecture Parquet via Arrow (plus rapide que pd.read_parquet)
table = pq.read_table("ventes.parquet", columns=["date", "montant"])
df = table.to_pandas()
# Arrow est utilise par :
# - Polars (backend natif)
# - DuckDB (echange avec pandas)
# - PySpark (echange Python <-> JVM depuis Spark 3.0)
# - Pandas 2.0 (backend optionnel: pd.ArrowDtype)
# Pandas avec backend Arrow (plus rapide et moins de memoire)
df = pd.read_parquet("data.parquet", dtype_backend="pyarrow")
print(df.dtypes) # int64[pyarrow], string[pyarrow], etc.
Utiliser CSV pour des pipelines data en production
Le CSV est le format de fichier le plus dangereux en production. Voici les problemes concrets :
- Pas de types : "1234" est-il un entier ou un code postal ? Le CSV ne le dit pas. Chaque lecteur infere differemment.
- Pas de schema : si une colonne est ajoutee ou renommee, tous les pipelines en aval cassent silencieusement.
- Encodage ambigu : UTF-8 ? Latin-1 ? Windows-1252 ? Les fichiers CSV francais avec des accents sont un cauchemar.
- Separateur ambigu : virgule ? Point-virgule ? Tab ? Et si une valeur contient le separateur ?
- Pas de compression : un fichier Parquet equivalent est 5-10x plus petit.
- Pas de predicate pushdown : impossible de lire seulement 3 colonnes sur 50 - il faut tout charger.
Regle : CSV pour l'import/export humain et les petits fichiers ad-hoc. Parquet pour tout le reste.
Comment LinkedIn economise 60% de stockage avec Parquet
LinkedIn a migre son data lake de Avro vers Parquet en 2019 :
- Avant : 500 PB de donnees en Avro sur HDFS, cout de stockage enorme
- Migration : conversion progressive des tables les plus volumineuses vers Parquet avec compression Zstandard
- Resultat stockage : reduction de 60% de l'espace (500 PB a 200 PB). Economie estimee : plusieurs millions de dollars/an en infrastructure HDFS
- Resultat performance : les requetes analytiques (Spark, Presto) sont 3-10x plus rapides grace a la projection pushdown et la lecture columnar
- Schema evolution : Parquet supporte l'ajout de colonnes sans recrire les fichiers existants (les anciennes partitions retournent NULL pour les nouvelles colonnes)
- Lecon : le format de fichier est un des leviers les plus sous-estimes en data engineering. Un changement de format peut diviser les couts par 2 et les temps de requete par 5.
Guide de decision des formats
Quel est le cas d'usage ?
β
βββββββββββββββββββΌββββββββββββββββββββ
βΌ βΌ βΌ
Analytique / Streaming / Echange
Data Lake Event Bus Humain
β β β
βΌ βΌ βΌ
PARQUET AVRO CSV
(+ compression (schema (petits
snappy/zstd) evolution, fichiers,
Kafka natif) Excel)
β
βΌ
Besoin de
schema flexible ?
β β
Oui Non
β β
βΌ βΌ
JSON PARQUET
(nested (performance
data, maximale)
APIs)
Quiz
Un data lake de 100 TB en CSV est migre en Parquet (snappy). Quelle taille attendue ?
Quel format est recommande pour les evenements Kafka qui doivent supporter l'evolution de schema ?
Qu'est-ce que le "predicate pushdown" en Parquet ?
Si je ne devais retenir qu'une seule chose de cette lecon : Parquet est le format par defaut pour tout pipeline data. CSV pour l'import/export humain, Avro pour Kafka, JSON pour les APIs. C'est tout. La migration CSV vers Parquet est souvent la premiere action a fort ROI qu'un Data Architect peut proposer dans une entreprise. Facile a implementer, resultats immediats.
Projet Pipeline ETL Complet
Ce lab pratique vous guide dans la construction d'un pipeline ETL complet et robuste : extraction depuis une API REST et des fichiers CSV, transformation avec pandas et DuckDB, chargement dans PostgreSQL. Vous ajouterez la gestion d'erreurs, le retry, le logging et la validation de donnees - les elements qui font la difference entre un prototype et un systeme de production.
Objectifs d'apprentissage
- Concevoir un pipeline ETL end-to-end
- Implementer l'extraction multi-sources (API + fichiers)
- Transformer les donnees avec pandas et DuckDB
- Charger dans PostgreSQL avec gestion des conflits
- Ajouter la gestion d'erreurs, retry, logging et monitoring
- Valider la qualite des donnees a chaque etape
Architecture du pipeline
SOURCES EXTRACT TRANSFORM LOAD
βββββββ βββββββ βββββββββ ββββ
ββββββββββββ ββββββββββββββββββββββββββββββββββββββββββββββββββ
β API RESTββββ β β
β (produits)β β β ββββββββββββ ββββββββββββ ββββββββββββ β
ββββββββββββ β Extract β β Validate β β Transformβ β Validateβ β
βββββββββββΆ β β Schema ββββΆβ pandas ββββΆβ Quality β β
ββββββββββββ β β β (input) β β DuckDB β β (output)β β
β CSV filesββββ€ β ββββββββββββ ββββββββββββ ββββββββββββ β
β (ventes) β β β β β
ββββββββββββ β ββββββββββββββββββββββββββββββββββββββββΌβββββββββ
β β
ββββββββββββ β βΌ
β JSON API ββββ ββββββββββββββββ
β (clients)β β PostgreSQL β
ββββββββββββ β + Parquet β
β (archive) β
MONITORING ββββββββββββββββ
ββββββββββ β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ€
β Logging (fichier + console) β
β Metriques (lignes traitees, erreurs, duree) β
β Alertes (Slack/email si echec) β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Lab : Construction du pipeline ETL
Etape 1 : Configuration et logging
Un bon pipeline commence par une configuration propre et un systeme de logging. Ne jamais utiliser print() en production.
import logging
import os
from dataclasses import dataclass
from datetime import datetime
@dataclass
class PipelineConfig:
"""Configuration centralisee du pipeline."""
# Sources
api_base_url: str = "https://api.example.com/v1"
api_key: str = os.getenv("API_KEY", "")
csv_input_dir: str = "./data/input/"
# Destination
db_url: str = os.getenv("DATABASE_URL",
"postgresql://etl_user:password@localhost:5432/warehouse")
# Pipeline
batch_size: int = 10000
max_retries: int = 3
retry_delay: int = 5 # secondes
output_parquet_dir: str = "./data/archive/"
# Qualite
max_null_ratio: float = 0.05 # Max 5% de nulls
min_rows_expected: int = 100
def setup_logging(log_file: str = None) -> logging.Logger:
"""Configure le logging pour le pipeline."""
logger = logging.getLogger("etl_pipeline")
logger.setLevel(logging.INFO)
# Format
fmt = logging.Formatter(
'%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Console handler
console = logging.StreamHandler()
console.setFormatter(fmt)
logger.addHandler(console)
# File handler
if log_file is None:
log_file = f"logs/etl_{datetime.now():%Y%m%d_%H%M%S}.log"
os.makedirs(os.path.dirname(log_file), exist_ok=True)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(fmt)
logger.addHandler(file_handler)
return logger
Etape 2 : Utilitaire de retry
Les sources externes (APIs, bases) echouent regulierement. Un decorateur retry avec backoff exponentiel est indispensable.
import time
import functools
import logging
logger = logging.getLogger("etl_pipeline")
def retry(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0,
exceptions: tuple = (Exception,)):
"""Decorateur retry avec backoff exponentiel."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
current_delay = delay
for attempt in range(1, max_attempts + 1):
try:
return func(*args, **kwargs)
except exceptions as e:
if attempt == max_attempts:
logger.error(
f"ECHEC {func.__name__} apres {max_attempts} "
f"tentatives: {e}"
)
raise
logger.warning(
f"{func.__name__} tentative {attempt}/{max_attempts} "
f"echouee: {e}. Retry dans {current_delay:.1f}s..."
)
time.sleep(current_delay)
current_delay *= backoff
return wrapper
return decorator
class PipelineMetrics:
"""Collecte les metriques d'execution."""
def __init__(self):
self.start_time = time.time()
self.rows_extracted = 0
self.rows_transformed = 0
self.rows_loaded = 0
self.rows_rejected = 0
self.errors = []
def log_summary(self):
duration = time.time() - self.start_time
logger.info("=" * 60)
logger.info("RESUME DU PIPELINE")
logger.info(f" Duree totale: {duration:.1f}s")
logger.info(f" Lignes extraites: {self.rows_extracted}")
logger.info(f" Lignes transformees: {self.rows_transformed}")
logger.info(f" Lignes chargees: {self.rows_loaded}")
logger.info(f" Lignes rejetees: {self.rows_rejected}")
logger.info(f" Erreurs: {len(self.errors)}")
if self.errors:
for err in self.errors[:5]:
logger.info(f" - {err}")
logger.info("=" * 60)
Etape 3 : Extraction (Extract)
L'extraction gere deux sources : une API REST (produits et clients) et des fichiers CSV (ventes). Chaque source a sa logique d'erreur.
import pandas as pd
import requests
import glob
import logging
from utils import retry
logger = logging.getLogger("etl_pipeline")
@retry(max_attempts=3, delay=2.0, exceptions=(requests.RequestException,))
def extract_from_api(url: str, api_key: str, endpoint: str) -> pd.DataFrame:
"""Extrait des donnees depuis une API REST avec pagination."""
all_data = []
page = 1
per_page = 1000
while True:
response = requests.get(
f"{url}/{endpoint}",
headers={"Authorization": f"Bearer {api_key}"},
params={"page": page, "per_page": per_page},
timeout=30
)
response.raise_for_status()
data = response.json()
if not data.get("results"):
break
all_data.extend(data["results"])
logger.info(f" API {endpoint}: page {page}, {len(data['results'])} records")
if not data.get("has_next"):
break
page += 1
df = pd.DataFrame(all_data)
logger.info(f"API {endpoint}: {len(df)} lignes extraites")
return df
def extract_csv_files(input_dir: str, pattern: str = "ventes_*.csv") -> pd.DataFrame:
"""Extrait et concatene des fichiers CSV."""
files = sorted(glob.glob(f"{input_dir}/{pattern}"))
if not files:
raise FileNotFoundError(f"Aucun fichier {pattern} dans {input_dir}")
dfs = []
for filepath in files:
try:
df = pd.read_csv(
filepath,
sep=";",
encoding="utf-8",
dtype={"code_produit": str, "code_client": str},
parse_dates=["date_vente"],
na_values=["N/A", "NULL", ""]
)
dfs.append(df)
logger.info(f" CSV: {filepath} -> {len(df)} lignes")
except Exception as e:
logger.error(f" ERREUR lecture {filepath}: {e}")
continue
if not dfs:
raise ValueError("Aucun fichier CSV valide")
result = pd.concat(dfs, ignore_index=True)
logger.info(f"CSV total: {len(result)} lignes depuis {len(dfs)} fichiers")
return result
def extract_all(config) -> dict:
"""Orchestre l'extraction de toutes les sources."""
logger.info("=== PHASE EXTRACT ===")
data = {}
# API - Produits
data['produits'] = extract_from_api(
config.api_base_url, config.api_key, "products"
)
# API - Clients
data['clients'] = extract_from_api(
config.api_base_url, config.api_key, "customers"
)
# CSV - Ventes
data['ventes'] = extract_csv_files(config.csv_input_dir)
return data
Etape 4 : Validation des donnees entree
Validez TOUJOURS les donnees apres extraction et avant transformation. Une donnee corrompue en entree produit des resultats faux en sortie.
import pandas as pd
import logging
from dataclasses import dataclass
from typing import List, Optional
logger = logging.getLogger("etl_pipeline")
@dataclass
class ValidationResult:
is_valid: bool
errors: List[str]
warnings: List[str]
rows_before: int
rows_after: int
def validate_dataframe(
df: pd.DataFrame,
name: str,
required_columns: List[str],
unique_columns: Optional[List[str]] = None,
not_null_columns: Optional[List[str]] = None,
min_rows: int = 1,
max_null_ratio: float = 0.05
) -> ValidationResult:
"""Valide un DataFrame selon des regles metier."""
errors = []
warnings = []
rows_before = len(df)
# 1. Colonnes requises
missing = set(required_columns) - set(df.columns)
if missing:
errors.append(f"Colonnes manquantes: {missing}")
# 2. Nombre minimum de lignes
if len(df) < min_rows:
errors.append(f"Seulement {len(df)} lignes (minimum: {min_rows})")
# 3. Doublons sur colonnes uniques
if unique_columns:
dupes = df.duplicated(subset=unique_columns).sum()
if dupes > 0:
warnings.append(f"{dupes} doublons sur {unique_columns}")
# 4. Valeurs manquantes
if not_null_columns:
for col in not_null_columns:
if col in df.columns:
null_ratio = df[col].isnull().mean()
if null_ratio > max_null_ratio:
errors.append(
f"Colonne '{col}': {null_ratio:.1%} nulls "
f"(max: {max_null_ratio:.1%})"
)
elif null_ratio > 0:
warnings.append(f"Colonne '{col}': {null_ratio:.1%} nulls")
# 5. Types attendus
null_total = df.isnull().sum().sum()
total_cells = df.shape[0] * df.shape[1]
if null_total / total_cells > 0.1:
warnings.append(f"Taux global de nulls eleve: {null_total/total_cells:.1%}")
is_valid = len(errors) == 0
# Log
status = "VALIDE" if is_valid else "INVALIDE"
logger.info(f"Validation [{name}]: {status} - {len(df)} lignes, "
f"{len(errors)} erreurs, {len(warnings)} warnings")
for e in errors:
logger.error(f" ERREUR: {e}")
for w in warnings:
logger.warning(f" WARNING: {w}")
return ValidationResult(
is_valid=is_valid, errors=errors, warnings=warnings,
rows_before=rows_before, rows_after=len(df)
)
Etape 5 : Transformation (Transform)
La transformation nettoie, enrichit et structure les donnees. On utilise pandas pour les transformations complexes et DuckDB pour les jointures performantes.
import pandas as pd
import numpy as np
import duckdb
import logging
logger = logging.getLogger("etl_pipeline")
def transform_clients(df: pd.DataFrame) -> pd.DataFrame:
"""Nettoie et normalise les donnees clients."""
logger.info("Transform clients...")
df = df.copy()
# Normalisation noms
df['nom'] = df['nom'].str.strip().str.title()
df['email'] = df['email'].str.strip().str.lower()
# Validation email
email_valid = df['email'].str.match(r'^[\w.+-]+@[\w-]+\.[\w.]+$')
df.loc[~email_valid, 'email'] = np.nan
# Deduplication par email
df = df.sort_values('date_inscription', ascending=False)
df = df.drop_duplicates(subset=['email'], keep='first')
# Segmentation (regles metier)
conditions = [
df['ltv'] >= 10000,
df['ltv'] >= 1000,
df['ltv'] >= 100,
]
segments = ['Platinum', 'Gold', 'Silver']
df['segment'] = np.select(conditions, segments, default='Bronze')
logger.info(f" Clients: {len(df)} lignes apres nettoyage")
return df.reset_index(drop=True)
def transform_ventes(df: pd.DataFrame) -> pd.DataFrame:
"""Nettoie et enrichit les donnees de ventes."""
logger.info("Transform ventes...")
df = df.copy()
# Supprimer les montants invalides
initial = len(df)
df = df[df['montant'] > 0]
df = df[df['quantite'] > 0]
logger.info(f" {initial - len(df)} lignes supprimees (montant/quantite <= 0)")
# Enrichissement temporel
df['annee'] = df['date_vente'].dt.year
df['mois'] = df['date_vente'].dt.month
df['trimestre'] = df['date_vente'].dt.quarter
df['jour_semaine'] = df['date_vente'].dt.dayofweek
df['est_weekend'] = df['jour_semaine'].isin([5, 6])
# Calcul du montant total
df['montant_total'] = df['montant'] * df['quantite']
# Categories de montant
df['tranche_montant'] = pd.cut(
df['montant_total'],
bins=[0, 50, 200, 1000, float('inf')],
labels=['Petit', 'Moyen', 'Grand', 'Premium']
)
logger.info(f" Ventes: {len(df)} lignes apres nettoyage")
return df.reset_index(drop=True)
def create_fact_table(ventes: pd.DataFrame, clients: pd.DataFrame,
produits: pd.DataFrame) -> pd.DataFrame:
"""Cree la table de faits via DuckDB (jointures performantes)."""
logger.info("Creation de la table de faits avec DuckDB...")
fact = duckdb.sql("""
SELECT
v.id as vente_id,
v.date_vente,
v.annee,
v.mois,
v.trimestre,
v.est_weekend,
c.id as client_id,
c.nom as client_nom,
c.segment as client_segment,
c.ville as client_ville,
p.id as produit_id,
p.nom as produit_nom,
p.categorie as produit_categorie,
v.quantite,
v.montant as prix_unitaire,
v.montant_total,
v.tranche_montant
FROM ventes v
LEFT JOIN clients c ON v.code_client = c.code_client
LEFT JOIN produits p ON v.code_produit = p.code_produit
ORDER BY v.date_vente DESC
""").df()
logger.info(f" Table de faits: {len(fact)} lignes")
return fact
Etape 6 : Chargement (Load)
Le chargement ecrit les donnees dans PostgreSQL et archive en Parquet. On gere les conflits (upsert) et les transactions.
import pandas as pd
import logging
import os
from datetime import datetime
from sqlalchemy import create_engine, text
logger = logging.getLogger("etl_pipeline")
def load_to_postgresql(df: pd.DataFrame, table_name: str,
engine, if_exists: str = "append",
chunksize: int = 10000) -> int:
"""Charge un DataFrame dans PostgreSQL."""
logger.info(f"Chargement de {len(df)} lignes dans {table_name}...")
try:
df.to_sql(
table_name,
engine,
if_exists=if_exists,
index=False,
chunksize=chunksize,
method='multi'
)
logger.info(f" {table_name}: {len(df)} lignes chargees")
return len(df)
except Exception as e:
logger.error(f" ERREUR chargement {table_name}: {e}")
raise
def upsert_to_postgresql(df: pd.DataFrame, table_name: str,
engine, conflict_columns: list) -> int:
"""Upsert (INSERT ... ON CONFLICT UPDATE) dans PostgreSQL."""
logger.info(f"Upsert de {len(df)} lignes dans {table_name}...")
# Charger dans une table staging temporaire
staging = f"staging_{table_name}"
df.to_sql(staging, engine, if_exists="replace", index=False,
method='multi', chunksize=10000)
# Upsert depuis staging vers la table finale
columns = df.columns.tolist()
conflict_str = ", ".join(conflict_columns)
update_cols = [c for c in columns if c not in conflict_columns]
update_str = ", ".join([f"{c} = EXCLUDED.{c}" for c in update_cols])
cols_str = ", ".join(columns)
upsert_sql = f"""
INSERT INTO {table_name} ({cols_str})
SELECT {cols_str} FROM {staging}
ON CONFLICT ({conflict_str})
DO UPDATE SET {update_str};
"""
with engine.begin() as conn:
result = conn.execute(text(upsert_sql))
conn.execute(text(f"DROP TABLE IF EXISTS {staging}"))
logger.info(f" Upsert {table_name}: complete")
return len(df)
def archive_to_parquet(df: pd.DataFrame, output_dir: str,
name: str) -> str:
"""Archive un DataFrame en Parquet partitionne."""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filepath = os.path.join(output_dir, f"{name}_{timestamp}.parquet")
os.makedirs(output_dir, exist_ok=True)
df.to_parquet(filepath, compression="snappy", index=False)
size_mb = os.path.getsize(filepath) / 1024**2
logger.info(f" Archive: {filepath} ({size_mb:.1f} MB)")
return filepath
Etape 7 : Orchestrateur principal
L'orchestrateur relie toutes les etapes avec gestion d'erreurs globale et metriques.
import sys
import logging
from datetime import datetime
from sqlalchemy import create_engine
from config import PipelineConfig, setup_logging
from utils import PipelineMetrics
from extract import extract_all
from validate import validate_dataframe
from transform import transform_clients, transform_ventes, create_fact_table
from load import load_to_postgresql, upsert_to_postgresql, archive_to_parquet
def run_pipeline():
"""Execute le pipeline ETL complet."""
config = PipelineConfig()
logger = setup_logging()
metrics = PipelineMetrics()
engine = create_engine(config.db_url)
logger.info("=" * 60)
logger.info(f"DEMARRAGE PIPELINE ETL - {datetime.now():%Y-%m-%d %H:%M:%S}")
logger.info("=" * 60)
try:
# ===== EXTRACT =====
raw_data = extract_all(config)
metrics.rows_extracted = sum(len(df) for df in raw_data.values())
# ===== VALIDATE INPUT =====
v_clients = validate_dataframe(
raw_data['clients'], "clients_raw",
required_columns=['code_client', 'nom', 'email'],
unique_columns=['code_client'],
not_null_columns=['code_client', 'nom'],
min_rows=config.min_rows_expected
)
v_ventes = validate_dataframe(
raw_data['ventes'], "ventes_raw",
required_columns=['date_vente', 'montant', 'code_client'],
not_null_columns=['date_vente', 'montant'],
min_rows=config.min_rows_expected
)
if not v_clients.is_valid or not v_ventes.is_valid:
logger.error("ECHEC validation input - pipeline arrete")
metrics.errors.append("Validation input echouee")
metrics.log_summary()
sys.exit(1)
# ===== TRANSFORM =====
logger.info("=== PHASE TRANSFORM ===")
clients_clean = transform_clients(raw_data['clients'])
ventes_clean = transform_ventes(raw_data['ventes'])
fact_table = create_fact_table(
ventes_clean, clients_clean, raw_data['produits']
)
metrics.rows_transformed = len(fact_table)
# ===== VALIDATE OUTPUT =====
v_output = validate_dataframe(
fact_table, "fact_ventes",
required_columns=['vente_id', 'date_vente', 'montant_total'],
unique_columns=['vente_id'],
not_null_columns=['vente_id', 'date_vente', 'montant_total'],
min_rows=config.min_rows_expected
)
metrics.rows_rejected = (
metrics.rows_extracted - metrics.rows_transformed
)
# ===== LOAD =====
logger.info("=== PHASE LOAD ===")
upsert_to_postgresql(clients_clean, "dim_clients", engine,
conflict_columns=["code_client"])
load_to_postgresql(fact_table, "fact_ventes", engine)
metrics.rows_loaded = len(fact_table)
# Archive Parquet
archive_to_parquet(fact_table, config.output_parquet_dir, "fact_ventes")
logger.info("PIPELINE TERMINE AVEC SUCCES")
except Exception as e:
logger.critical(f"ECHEC PIPELINE: {e}", exc_info=True)
metrics.errors.append(str(e))
# Ici: envoyer alerte Slack/email
send_alert(f"Pipeline ETL ECHEC: {e}")
sys.exit(1)
finally:
metrics.log_summary()
engine.dispose()
def send_alert(message: str):
"""Envoie une alerte (Slack, email, PagerDuty)."""
logger = logging.getLogger("etl_pipeline")
logger.info(f"ALERTE: {message}")
# En production: requests.post(SLACK_WEBHOOK, json={"text": message})
if __name__ == "__main__":
run_pipeline()
Etape 8 : Tests et monitoring
Un pipeline sans tests est une bombe a retardement. Ecrivez des tests unitaires pour chaque etape et des tests d'integration pour le pipeline complet.
import pytest
import pandas as pd
import numpy as np
from transform import transform_clients, transform_ventes
from validate import validate_dataframe
class TestTransformClients:
def setup_method(self):
self.df = pd.DataFrame({
'code_client': ['C001', 'C002', 'C003', 'C002'],
'nom': [' alice dupont ', 'BOB MARTIN', 'charlie', 'Bob Martin'],
'email': ['alice@ex.com', 'bob@ex.com', 'invalid-email', 'bob@ex.com'],
'date_inscription': pd.to_datetime([
'2024-01-01', '2024-06-01', '2024-03-01', '2024-08-01'
]),
'ltv': [15000, 500, 50, 1200],
'ville': ['Paris', 'Lyon', 'Marseille', 'Lyon']
})
def test_nom_normalise(self):
result = transform_clients(self.df)
assert result.iloc[0]['nom'] == 'Alice Dupont'
def test_email_invalide_supprime(self):
result = transform_clients(self.df)
assert result[result['code_client'] == 'C003']['email'].isna().all()
def test_deduplication_email(self):
result = transform_clients(self.df)
assert result['email'].dropna().duplicated().sum() == 0
def test_segmentation(self):
result = transform_clients(self.df)
platinum = result[result['code_client'] == 'C001']
assert platinum.iloc[0]['segment'] == 'Platinum'
class TestValidation:
def test_colonnes_manquantes(self):
df = pd.DataFrame({'nom': ['test']})
result = validate_dataframe(
df, "test",
required_columns=['nom', 'email', 'age']
)
assert not result.is_valid
assert any('manquantes' in e for e in result.errors)
def test_trop_de_nulls(self):
df = pd.DataFrame({
'id': range(100),
'value': [None] * 20 + list(range(80)) # 20% nulls
})
result = validate_dataframe(
df, "test",
required_columns=['id', 'value'],
not_null_columns=['value'],
max_null_ratio=0.10
)
assert not result.is_valid
# pytest test_pipeline.py -v
Scenario : Pipeline fails at 3 AM - Monitoring et alertes
Votre pipeline ETL tourne chaque nuit a 2h du matin. A 3h, il echoue car l'API source renvoie des erreurs 503. Personne n'est au courant jusqu'au lendemain matin quand les analystes demandent pourquoi les donnees ne sont pas a jour.
Plan d'action :
- 1. Alertes immediates : Configurer un webhook Slack/Teams et un email automatique en cas d'echec. Le
send_alert()dans notre pipeline est le minimum. - 2. Dashboard de monitoring : Utiliser un outil comme Grafana pour afficher les metriques du pipeline (duree, lignes traitees, taux d'erreur). Stocker les metriques dans une table
pipeline_runs. - 3. Retry automatique : Le decorateur
@retrygere les erreurs 503 transitoires. Si 3 retries echouent, alerter et replanifier 1h plus tard. - 4. Dead letter queue : Les lignes qui echouent la validation ne sont pas perdues - les stocker dans un fichier
rejected_YYYYMMDD.parquetpour investigation. - 5. SLA monitoring : Si le pipeline n'a pas termine a 5h (SLA interne), declencher une alerte "SLA breach" meme si le pipeline est encore en cours.
Pipeline ETL sans monitoring chez Knight Capital - 440M$ de perte
En 2012, Knight Capital Group a deploye du code de trading sur un serveur de production sans pipeline de monitoring adequat :
- Probleme : un ancien code de test a ete reactive par erreur sur un des 8 serveurs de trading. Le deploiement n'avait pas de verification de coherence entre serveurs.
- Detection tardive : aucune alerte automatique sur les anomalies de volume de trading. Les operateurs ont mis 45 minutes a comprendre le probleme.
- Resultat : en 45 minutes, le code a genere 440 millions de dollars de pertes. Knight Capital a fait faillite.
- Lecon pour nous : tout pipeline en production DOIT avoir des alertes automatiques, des validations de donnees a chaque etape, et un kill switch. Un pipeline ETL sans monitoring n'est pas un pipeline de production.
Ce lab couvre les fondamentaux d'un pipeline ETL de production. En realite, vous utiliserez un orchestrateur comme Airflow, Prefect ou Dagster pour gerer le scheduling, les retries, et les dependances entre taches. Mais les briques de base (extract, validate, transform, load, monitor) restent les memes. Retenez : un bon pipeline n'est pas celui qui transforme les donnees correctement - c'est celui qui ECHOUE proprement quand quelque chose ne va pas.
Examen Final - Phase 1
Felicitations, vous avez parcouru toute la Phase 1 ! Cet examen final couvre les trois modules : SQL Avance, NoSQL & NewSQL, et Python pour la Data. Il melange des questions de connaissance et des scenarios de mise en situation reelle, comme un Data Architect les rencontre au quotidien.
Objectifs de l'examen
- Verifier la maitrise des concepts SQL avances (window functions, CTEs, index)
- Valider la comprehension du theoreme CAP et des choix NoSQL
- Tester les competences Python Data (pandas, DuckDB, PySpark, formats)
- Evaluer la capacite a prendre des decisions architecturales en situation reelle
Partie 1 : Questions de connaissance (6 questions)
Question 1 (SQL) : Quelle est la difference entre ROW_NUMBER(), RANK() et DENSE_RANK() quand deux employes ont le meme salaire ?
Question 2 (SQL) : Quel est l'avantage principal d'une vue materialisee par rapport a une vue standard ?
Question 3 (NoSQL) : Selon le theoreme CAP, qu'est-ce qu'un systeme CP (Consistency + Partition tolerance) sacrifie ?
Question 4 (NoSQL) : Pour stocker des relations de type reseau social (amis d'amis, recommandations, chemins les plus courts), quelle base NoSQL est la plus adaptee ?
Question 5 (Python) : Quel format de fichier offre le meilleur ratio compression + vitesse de lecture pour un data lake analytique ?
Question 6 (Python) : Quelle est la maniere la plus performante de calculer une colonne conditionnelle sur 10 millions de lignes dans pandas ?
Partie 2 : Scenarios Data Architect (6 questions)
Scenario 1 : Vous etes Data Architect chez une fintech. L'equipe veut stocker les transactions bancaires (10M/jour, besoin de requetes analytiques sur l'historique de 2 ans, conformite reglementaire exigeant coherence ACID). Quelle architecture recommandez-vous ?
Scenario 2 : Vous etes Data Architect chez un media en ligne. Le CTO veut migrer de MySQL a MongoDB parce que "NoSQL est plus moderne". Le systeme actuel fonctionne bien avec 5M d'articles et des requetes relationnelles (auteurs, categories, tags). Que conseillez-vous ?
Scenario 3 : Vous etes Data Architect dans une startup e-commerce (50 employes). L'equipe data (3 personnes) traite 5 GB de donnees par jour. Un ingenieur propose de mettre en place un cluster Spark sur AWS EMR. Votre reaction ?
Scenario 4 : Vous etes Data Architect chez un retailer. Votre pipeline ETL pandas prend 6 heures car il utilise des boucles iterrows() sur un DataFrame de 50M lignes. Quelle approche pour reduire a moins de 30 minutes ?
Scenario 5 : Vous etes Data Architect chez une entreprise IoT. 10 000 capteurs envoient des mesures toutes les secondes (10K ecritures/sec). Les donnees doivent etre conservees 5 ans et interrogeables par capteur et par plage de temps. Quelle base de donnees ?
Scenario 6 : Vous etes Data Architect et vous decouvrez que le data lake de votre entreprise stocke tout en CSV (200 TB). Le CEO demande une estimation du ROI d'une migration vers Parquet. Que repondez-vous ?
Bilan et mots du mentor
Si vous avez suivi cette Phase 1 avec attention, vous maitrisez maintenant les fondamentaux essentiels d'un Data Architect :
Module 1.1 - SQL Avance
- Window functions (ROW_NUMBER, RANK, LAG/LEAD)
- CTEs et CTEs recursives
- Plans d'execution et indexation
- Partitionnement et vues materialisees
Module 1.2 - NoSQL & NewSQL
- Theoreme CAP et decisions architecturales
- MongoDB, Redis, Cassandra, Neo4j
- Quand utiliser SQL vs NoSQL
- NewSQL et bases hybrides
Module 1.3 - Python pour la Data
- Ecosysteme Python : pandas, DuckDB, Polars, PySpark
- Matrice de decision basee sur le volume
- Performance : vectorisation vs boucles
- Formats de fichiers : Parquet, Avro, Arrow
- Pipelines ETL robustes avec monitoring
La Phase 1 vous a donne les briques fondamentales. Vous savez maintenant interroger des donnees (SQL), choisir le bon stockage (SQL vs NoSQL), et manipuler les donnees en Python. La Phase 2 - Modelisation va construire sur ces fondations : vous apprendrez a concevoir des modeles de donnees qui servent l'entreprise, a comprendre les schemas en etoile et flocon, et a penser en termes de data warehouse et data lake. C'est la ou le Data Architect se differencie de l'ingenieur data : dans la capacite a modeler l'information pour creer de la valeur. Rendez-vous en Phase 2!
Phase 1 Phase 2 Phase 3 Phase 4 FONDAMENTAUX MODELISATION MODERN STACK ARCHITECTURE ββββββββββββ ββββββββββββ ββββββββββββ ββββββββββββ β Complete! A venir A venir A venir SQL Avance βββΆ Data Modeling βββΆ Kafka/Spark βββΆ Cloud Architecture NoSQL βββΆ Star Schema βββΆ dbt/Airflow βββΆ Lakehouse Python βββΆ Data Vault βββΆ Delta Lake βββΆ Gouvernance