Introduction au SQL Avance

45 min Fondamental

Bienvenue dans le module SQL Avance. Ce module vous transformera d'un utilisateur SQL standard en un expert capable d'ecrire des requetes performantes sur des tables de millions de lignes. Vous apprendrez les techniques utilisees par les plus grandes entreprises tech pour gerer leurs donnees a grande echelle.

Objectifs d'apprentissage

  • Comprendre les differences fondamentales entre OLTP et OLAP
  • Maitriser l'ordre d'execution reel d'une requete SQL
  • Connaitre les types de JOIN et leurs cas d'usage
  • Configurer un environnement PostgreSQL pour la pratique
  • Identifier les anti-patterns SQL les plus courants

Le SQL est le langage universel de la donnee. Meme avec l'essor du NoSQL, plus de 80% des donnees structurees en entreprise restent dans des bases relationnelles. Maitriser le SQL avance, c'est maitriser l'acces a la majorite des donnees du monde.

OLTP vs OLAP : Deux Mondes Differents

Avant de plonger dans le SQL avance, il est crucial de comprendre les deux grandes familles de systemes de bases de donnees. Chacune a des objectifs, des schemas et des patterns de requetes radicalement differents.

Concept Cle

OLTP (Online Transaction Processing) : Optimise pour les operations transactionnelles rapides (INSERT, UPDATE, DELETE). Pense "application web" - chaque clic d'utilisateur genere une transaction.

OLAP (Online Analytical Processing) : Optimise pour l'analyse de gros volumes de donnees. Pense "rapport de ventes annuel" - peu de requetes mais qui scannent des millions de lignes.

CritereOLTPOLAP
ObjectifTransactions rapidesAnalyse et reporting
OperationsINSERT, UPDATE, DELETESELECT complexes, agregations
Volume par requeteQuelques lignesMillions de lignes
UtilisateursMilliers (clients, apps)Dizaines (analystes, BI)
SchemaNormalise (3NF)Denormalise (Star, Snowflake)
LatenceMillisecondesSecondes a minutes
ExemplesPostgreSQL, MySQL, SQL ServerSnowflake, BigQuery, Redshift
Taille typiqueGo a ToTo a Po
ConcurrenceHaute (milliers de sessions)Basse (dizaines de sessions)
Architecture OLTP vs OLAP
  OLTP (Transactionnel)                    OLAP (Analytique)
  =====================                    ==================

  [App Web] [App Mobile] [API]            [Tableau] [PowerBI] [Jupyter]
      |          |         |                   |         |         |
      v          v         v                   v         v         v
  +---------------------------+           +---------------------------+
  |    Load Balancer / Pool   |           |      Query Engine         |
  +---------------------------+           +---------------------------+
      |          |         |                          |
      v          v         v                          v
  +---------------------------+           +---------------------------+
  |      PostgreSQL           |    ETL    |       Snowflake           |
  |   Schema Normalise (3NF) | --------> |   Schema en Etoile        |
  |   Tables: users, orders  |   Nuit    |   Faits + Dimensions      |
  |   Index B-tree partout   |           |   Colonnar Storage        |
  +---------------------------+           +---------------------------+
       ~ ms par requete                        ~ secondes par requete
       ~ 10,000 req/sec                        ~ 50 req en parallele

L'Ordre d'Execution SQL : Ce Que Vous Devez Savoir

L'une des erreurs les plus frequentes en SQL est de penser que la requete s'execute dans l'ordre ou elle est ecrite. En realite, le moteur SQL suit un ordre logique tres different. Comprendre cet ordre est fondamental pour ecrire des requetes correctes et performantes.

Ordre d'Execution Reel d'une Requete SQL
   Ordre d'ECRITURE:              Ordre d'EXECUTION:
   ==================             ===================

   SELECT    (5e)                 1. FROM / JOIN     -- Determine les tables
   FROM      (1er)                2. WHERE            -- Filtre les lignes
   WHERE     (2e)                 3. GROUP BY         -- Regroupe les lignes
   GROUP BY  (3e)                 4. HAVING           -- Filtre les groupes
   HAVING    (4e)                 5. SELECT           -- Projette les colonnes
   ORDER BY  (6e)                 6. ORDER BY         -- Trie le resultat
   LIMIT     (7e)                 7. LIMIT / OFFSET   -- Limite le resultat

   +-----------+
   | 1. FROM   |  "De quelles tables je lis ?"
   +-----+-----+
         |
   +-----v-----+
   | 2. WHERE  |  "Quelles lignes je garde ?"
   +-----+-----+
         |
   +-----v-------+
   | 3. GROUP BY |  "Comment je regroupe ?"
   +-----+-------+
         |
   +-----v-----+
   | 4. HAVING |  "Quels groupes je garde ?"
   +-----+-----+
         |
   +-----v-----+
   | 5. SELECT |  "Quelles colonnes j'affiche ?"
   +-----+-----+
         |
   +-----v-------+
   | 6. ORDER BY |  "Dans quel ordre ?"
   +-----+-------+
         |
   +-----v-----+
   | 7. LIMIT  |  "Combien de lignes ?"
   +-----------+

Consequence Pratique

Vous ne pouvez PAS utiliser un alias defini dans le SELECT dans la clause WHERE, car WHERE s'execute AVANT SELECT. Exemple : SELECT price * 0.8 AS discounted FROM products WHERE discounted > 100 ne fonctionnera PAS. Utilisez une sous-requete ou un CTE a la place.

SQL
-- ERREUR : l'alias 'discounted' n'existe pas encore au moment du WHERE
SELECT price * 0.8 AS discounted
FROM products
WHERE discounted > 100;  -- ERREUR !

-- SOLUTION 1 : repeter l'expression
SELECT price * 0.8 AS discounted
FROM products
WHERE price * 0.8 > 100;

-- SOLUTION 2 : utiliser un CTE (plus lisible)
WITH priced AS (
    SELECT *, price * 0.8 AS discounted
    FROM products
)
SELECT * FROM priced WHERE discounted > 100;

-- SOLUTION 3 : sous-requete dans FROM
SELECT * FROM (
    SELECT *, price * 0.8 AS discounted
    FROM products
) sub
WHERE discounted > 100;

Les Types de JOIN en Detail

Les JOIN sont le coeur du SQL relationnel. Un Data Architect doit maitriser chaque type et savoir quand l'utiliser.

Diagramme des JOIN SQL
  Table A          Table B              Resultat
  +-----+         +-----+
  | a1  |         | b1  |
  | a2  +---------+ b2  |    INNER JOIN      : a2-b2, a3-b3
  | a3  +---------+ b3  |    LEFT JOIN       : a1-NULL, a2-b2, a3-b3
  +-----+         | b4  |    RIGHT JOIN      : a2-b2, a3-b3, NULL-b4
                  +-----+    FULL OUTER JOIN  : a1-NULL, a2-b2, a3-b3, NULL-b4
                              CROSS JOIN       : a1-b1, a1-b2, a1-b3, a1-b4,
                                                 a2-b1, a2-b2, ... (12 lignes)

  LEFT ANTI JOIN (lignes de A sans correspondance dans B):
  SELECT a.* FROM A LEFT JOIN B ON a.id = b.id WHERE b.id IS NULL;
  Resultat: a1

  SEMI JOIN (lignes de A avec au moins une correspondance dans B):
  SELECT a.* FROM A WHERE EXISTS (SELECT 1 FROM B WHERE B.id = A.id);
  Resultat: a2, a3
SQL
-- INNER JOIN : Clients avec des commandes
SELECT c.name, o.order_date, o.total
FROM customers c
INNER JOIN orders o ON c.id = o.customer_id;

-- LEFT JOIN : Tous les clients, meme sans commande
SELECT c.name, COALESCE(COUNT(o.id), 0) AS nb_orders
FROM customers c
LEFT JOIN orders o ON c.id = o.customer_id
GROUP BY c.name;

-- LEFT ANTI JOIN : Clients qui n'ont JAMAIS commande
SELECT c.name, c.email
FROM customers c
LEFT JOIN orders o ON c.id = o.customer_id
WHERE o.id IS NULL;

-- SELF JOIN : Employes et leurs managers
SELECT e.name AS employe, m.name AS manager
FROM employees e
LEFT JOIN employees m ON e.manager_id = m.id;

-- CROSS JOIN : Toutes les combinaisons (utile pour calendriers)
SELECT d.date, p.product_name
FROM dates d
CROSS JOIN products p;

-- FULL OUTER JOIN : Union complete avec correspondances
SELECT COALESCE(a.id, b.id) AS id,
       a.value AS value_a,
       b.value AS value_b
FROM table_a a
FULL OUTER JOIN table_b b ON a.id = b.id;

Performance des JOINs

Le moteur SQL choisit parmi 3 algorithmes de JOIN selon la taille des tables :
Nested Loop : Optimal pour petites tables ou quand un index existe (O(n*m) pire cas)
Hash Join : Optimal pour grosses tables sans index (O(n+m), mais couteux en memoire)
Merge Join : Optimal quand les deux tables sont deja triees (O(n+m))

Verifiez toujours avec EXPLAIN ANALYZE quel algorithme est utilise !

Cas d'Etude : Comment StackOverflow Gere 50M Requetes/Jour

StackOverflow - SQL Server a Echelle Massive

StackOverflow est l'un des sites les plus visites au monde pour les developpeurs, avec des statistiques impressionnantes :

  • 50+ millions de requetes SQL par jour sur seulement 2 serveurs SQL Server
  • 1.3 milliard de pages vues par mois avec un temps de reponse moyen de 18ms
  • Base de donnees principale de ~700 Go avec des tables de 50M+ lignes (Posts, Votes)
  • Equipe DBA de seulement 2 personnes

Les secrets de leur performance :

  • Index cibles : Pas d'over-indexing. Chaque index est justifie par des patterns de requetes reels, surveilles via les DMV (Dynamic Management Views) de SQL Server
  • Pas d'ORM pour les requetes critiques : Dapper (micro-ORM) au lieu d'Entity Framework pour garder le controle du SQL genere
  • Mise en cache agressive : Redis devant SQL Server, avec invalidation intelligente
  • Denormalisation strategique : Le champ Score sur la table Posts est pre-calcule (pas de COUNT a chaque affichage)
  • OLTP pur : L'analytique est faite sur un replica en lecture seule, jamais sur la base primaire

Lecon : On n'a pas besoin de technologies complexes. Un SQL bien ecrit avec de bons index sur du hardware adapte peut aller tres loin. StackOverflow prouve qu'une architecture simple et maitrisee surpasse souvent une architecture distribuee mal comprise.

Anti-Pattern : SELECT * en Production

SELECT * : Le Tueur Silencieux de Performance

L'utilisation de SELECT * en production est l'anti-pattern le plus repandu et le plus couteux. Voici pourquoi :

  • Bande passante gaspillee : Si votre table a 50 colonnes et que vous n'en avez besoin que de 3, vous transferez ~16x trop de donnees
  • Index covering impossible : Avec SELECT *, le moteur doit TOUJOURS faire un "heap lookup" pour aller chercher les colonnes manquantes dans l'index
  • Bugs silencieux : Quand quelqu'un ajoute une colonne a la table, votre application recoit des donnees inattendues
  • Memoire serveur : Le buffer pool se remplit de donnees inutiles, evictant les donnees utiles du cache
SQL - Impact Mesurable
-- MAUVAIS : SELECT * sur une table avec 40 colonnes (dont 3 colonnes TEXT)
SELECT * FROM orders WHERE status = 'pending';
-- Temps: 2.4 secondes | I/O: 45,000 pages lues | Memoire: 180 Mo

-- BON : Seulement les colonnes necessaires
SELECT order_id, customer_id, total, created_at
FROM orders WHERE status = 'pending';
-- Temps: 0.12 secondes | I/O: 3,200 pages lues | Memoire: 12 Mo

-- ENCORE MIEUX : Avec un index covering
CREATE INDEX idx_orders_status_covering
ON orders (status) INCLUDE (order_id, customer_id, total, created_at);

SELECT order_id, customer_id, total, created_at
FROM orders WHERE status = 'pending';
-- Temps: 0.008 secondes | I/O: 85 pages lues | Memoire: 0.8 Mo

Regle d'or : En production, listez TOUJOURS explicitement vos colonnes. Reservez SELECT * au developpement et a l'exploration.

Configuration de l'Environnement PostgreSQL

Pour ce module, nous utiliserons PostgreSQL, la base de donnees open-source la plus avancee. Voici comment configurer votre environnement.

Bash - Installation
# Option 1 : Docker (recommande pour la formation)
docker run --name pg-training \
  -e POSTGRES_PASSWORD=training \
  -e POSTGRES_DB=dataarchitect \
  -p 5432:5432 \
  -d postgres:16

# Option 2 : Installation native
# macOS: brew install postgresql@16
# Ubuntu: sudo apt install postgresql-16
# Windows: Telecharger depuis postgresql.org

# Connexion
psql -h localhost -U postgres -d dataarchitect

# Verification de la version
SELECT version();
-- PostgreSQL 16.x ...
SQL - Creation du Schema de Pratique
-- Schema e-commerce pour les exercices du module
CREATE SCHEMA IF NOT EXISTS ecommerce;
SET search_path TO ecommerce;

CREATE TABLE customers (
    customer_id   SERIAL PRIMARY KEY,
    first_name    VARCHAR(50) NOT NULL,
    last_name     VARCHAR(50) NOT NULL,
    email         VARCHAR(100) UNIQUE NOT NULL,
    city          VARCHAR(50),
    country       VARCHAR(50) DEFAULT 'France',
    signup_date   DATE NOT NULL DEFAULT CURRENT_DATE,
    segment       VARCHAR(20) CHECK (segment IN ('Bronze','Silver','Gold','Platinum'))
);

CREATE TABLE categories (
    category_id   SERIAL PRIMARY KEY,
    name          VARCHAR(50) NOT NULL,
    parent_id     INT REFERENCES categories(category_id)
);

CREATE TABLE products (
    product_id    SERIAL PRIMARY KEY,
    name          VARCHAR(100) NOT NULL,
    category_id   INT REFERENCES categories(category_id),
    price         DECIMAL(10,2) NOT NULL CHECK (price > 0),
    stock         INT NOT NULL DEFAULT 0,
    created_at    TIMESTAMP DEFAULT NOW()
);

CREATE TABLE orders (
    order_id      SERIAL PRIMARY KEY,
    customer_id   INT REFERENCES customers(customer_id),
    order_date    DATE NOT NULL DEFAULT CURRENT_DATE,
    status        VARCHAR(20) DEFAULT 'pending'
                  CHECK (status IN ('pending','confirmed','shipped','delivered','cancelled')),
    total         DECIMAL(12,2)
);

CREATE TABLE order_items (
    item_id       SERIAL PRIMARY KEY,
    order_id      INT REFERENCES orders(order_id),
    product_id    INT REFERENCES products(product_id),
    quantity      INT NOT NULL CHECK (quantity > 0),
    unit_price    DECIMAL(10,2) NOT NULL
);

-- Index de base pour les requetes frequentes
CREATE INDEX idx_orders_customer ON orders(customer_id);
CREATE INDEX idx_orders_date ON orders(order_date);
CREATE INDEX idx_order_items_order ON order_items(order_id);
CREATE INDEX idx_products_category ON products(category_id);

Scenario : Votre Premier Jour en Tant que Data Architect

Vous venez d'etre embauche comme Data Architect dans une startup e-commerce. Le CTO vous dit : "Notre base PostgreSQL rame, les pages produit mettent 3 secondes a charger, et nos rapports du lundi matin bloquent le site pendant 1 heure."

Questions a vous poser :

  • Les rapports du lundi (OLAP) tournent-ils sur la meme base que le site (OLTP) ? Si oui, c'est le premier probleme.
  • Les requetes du site utilisent-elles SELECT * ? Probablement.
  • Y a-t-il un systeme de cache (Redis) devant la base ? Probablement pas.
  • Les index sont-ils adaptes aux requetes reelles ? A verifier avec pg_stat_user_indexes.

Ce module vous donnera toutes les competences pour resoudre ces problemes systematiquement.

Flashcards - Concepts Cles

Quelle est la difference principale entre OLTP et OLAP ?
OLTP est optimise pour les transactions rapides (INSERT/UPDATE/DELETE sur quelques lignes), tandis que OLAP est optimise pour l'analyse de gros volumes (SELECT avec agregations sur des millions de lignes). OLTP utilise un schema normalise, OLAP un schema denormalise (etoile/flocon).
Quel est l'ordre reel d'execution d'une requete SQL ?
FROM/JOIN -> WHERE -> GROUP BY -> HAVING -> SELECT -> ORDER BY -> LIMIT/OFFSET. C'est pourquoi on ne peut pas utiliser un alias du SELECT dans le WHERE.
Quels sont les 3 algorithmes de JOIN et quand sont-ils utilises ?
Nested Loop : petites tables ou avec index. Hash Join : grosses tables sans index (couteux en memoire). Merge Join : tables deja triees sur la cle de jointure. Le moteur choisit automatiquement via le plan d'execution.
Pourquoi SELECT * est-il un anti-pattern en production ?
Il gaspille la bande passante (transfert de colonnes inutiles), empeche les index covering (force un heap lookup), cause des bugs silencieux si la table change, et pollue le buffer pool (cache) avec des donnees inutiles.

Window Functions - Classement

60 min Intermediaire

Les Window Functions sont l'une des fonctionnalites les plus puissantes du SQL moderne. Elles permettent de faire des calculs sur un ensemble de lignes liees a la ligne courante, sans les regrouper. Pensez-y comme un GROUP BY qui ne collapse pas les lignes.

Objectifs d'apprentissage

  • Comprendre la syntaxe OVER(PARTITION BY ... ORDER BY ...)
  • Maitriser ROW_NUMBER, RANK, DENSE_RANK et NTILE
  • Savoir choisir la bonne fonction de classement selon le besoin
  • Appliquer les window functions dans des scenarios reels

Les Window Functions ont ete introduites dans SQL:2003, mais beaucoup de developpeurs les ignorent encore. C'est pourtant LA fonctionnalite qui separe un utilisateur SQL intermediaire d'un expert. Maitrisez-les et vous diviserez la complexite de vos requetes par 2.

Anatomie d'une Window Function

Chaque window function suit la meme structure syntaxique. Comprendre cette structure est la cle pour maitriser toutes les variantes.

Structure d'une Window Function
  fonction(arguments) OVER (
      PARTITION BY colonne(s)     -- Optionnel : decoupe en groupes
      ORDER BY colonne(s)        -- Optionnel : ordre dans chaque groupe
      frame_clause               -- Optionnel : fenetre de lignes
  )

  Exemple concret :
  +-------------------------------------------------+
  | RANK() OVER (PARTITION BY dept ORDER BY salary DESC) |
  +-------------------------------------------------+
       |              |                    |
       v              v                    v
   Fonction     Decoupage par        Tri dans chaque
   de rang      departement          departement (desc)

  Visualisation sur les donnees :
  +--------+--------+--------+------+
  | dept   | name   | salary | RANK |
  +--------+--------+--------+------+
  | Sales  | Alice  | 80000  |  1   |  <- Partition "Sales"
  | Sales  | Bob    | 70000  |  2   |
  | Sales  | Carol  | 70000  |  2   |  <- Meme salaire = meme rang
  | Sales  | Dave   | 60000  |  4   |  <- Rang 3 saute !
  +--------+--------+--------+------+
  | IT     | Eve    | 95000  |  1   |  <- Partition "IT" (repart de 1)
  | IT     | Frank  | 85000  |  2   |
  +--------+--------+--------+------+

Les 4 Fonctions de Classement

ROW_NUMBER() - Numero Unique

Attribue un numero sequentiel unique a chaque ligne dans sa partition. Meme en cas d'egalite, chaque ligne recoit un numero different (le choix est arbitraire pour les ex aequo).

SQL
-- Numerotation simple de toutes les commandes par client
SELECT
    customer_id,
    order_id,
    order_date,
    total,
    ROW_NUMBER() OVER (
        PARTITION BY customer_id
        ORDER BY order_date DESC
    ) AS order_rank
FROM orders;

-- Resultat :
-- customer_id | order_id | order_date | total  | order_rank
-- 1           | 1052     | 2024-12-15 | 299.99 | 1
-- 1           | 1023     | 2024-11-20 | 149.50 | 2
-- 1           | 987      | 2024-10-05 | 89.99  | 3
-- 2           | 1061     | 2024-12-18 | 450.00 | 1   <- repart de 1
-- 2           | 1044     | 2024-12-01 | 175.25 | 2

Astuce : Deduplication avec ROW_NUMBER

ROW_NUMBER est l'outil ideal pour supprimer les doublons. Gardez seulement la ligne avec row_number = 1 :

SQL - Deduplication
-- Garder seulement la commande la plus recente par client
WITH ranked AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY customer_id
            ORDER BY order_date DESC
        ) AS rn
    FROM orders
)
SELECT * FROM ranked WHERE rn = 1;

-- Supprimer les vrais doublons d'une table
DELETE FROM customers
WHERE customer_id IN (
    SELECT customer_id FROM (
        SELECT customer_id,
            ROW_NUMBER() OVER (
                PARTITION BY email ORDER BY customer_id
            ) AS rn
        FROM customers
    ) dupes
    WHERE rn > 1
);

RANK() vs DENSE_RANK() - Gestion des Ex Aequo

La difference entre RANK et DENSE_RANK est subtile mais importante : RANK laisse des "trous" apres les ex aequo, DENSE_RANK ne laisse pas de trous.

SQL
-- Comparaison des 3 fonctions de classement
SELECT
    name,
    department,
    salary,
    ROW_NUMBER() OVER (ORDER BY salary DESC) AS row_num,
    RANK()       OVER (ORDER BY salary DESC) AS rank_val,
    DENSE_RANK() OVER (ORDER BY salary DESC) AS dense_rank_val
FROM employees;

-- Resultat :
-- name    | salary | row_num | rank_val | dense_rank_val
-- Alice   | 95000  | 1       | 1        | 1
-- Bob     | 85000  | 2       | 2        | 2
-- Carol   | 85000  | 3       | 2        | 2     <- ex aequo
-- Dave    | 75000  | 4       | 4        | 3     <- RANK=4, DENSE=3 !
-- Eve     | 70000  | 5       | 5        | 4
-- Frank   | 70000  | 6       | 5        | 4     <- ex aequo
-- Grace   | 60000  | 7       | 7        | 5     <- RANK=7, DENSE=5 !

Utilisez RANK quand...

  • Vous faites un classement sportif (le 3e est 3e meme si 2 personnes sont 1eres, il n'y a pas de "2e")
  • Vous voulez montrer la "position reelle" dans le classement
  • Le nombre total de rangs doit correspondre au nombre de lignes

Utilisez DENSE_RANK quand...

  • Vous cherchez le "Top N valeurs distinctes" (top 3 salaires differents)
  • Vous ne voulez pas de trous dans la numerotation
  • Vous groupez par niveaux (1er niveau, 2e niveau...)

NTILE(n) - Decoupage en Quantiles

NTILE divise les lignes en N groupes de taille (presque) egale. Indispensable pour creer des quartiles, deciles, percentiles.

SQL
-- Diviser les clients en 4 quartiles selon leur CA total
SELECT
    c.customer_id,
    c.first_name,
    c.last_name,
    SUM(o.total) AS total_spent,
    NTILE(4) OVER (ORDER BY SUM(o.total) DESC) AS quartile
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.first_name, c.last_name;

-- Resultat :
-- customer_id | name     | total_spent | quartile
-- 42          | Marie    | 12500.00    | 1        <- Top 25%
-- 17          | Jean     | 9800.00     | 1
-- 8           | Pierre   | 7200.00     | 2        <- 25-50%
-- 31          | Sophie   | 5100.00     | 2
-- 55          | Luc      | 3400.00     | 3        <- 50-75%
-- 12          | Claire   | 1200.00     | 4        <- Bottom 25%

-- Segmentation marketing basee sur les quartiles
WITH customer_quartiles AS (
    SELECT
        c.customer_id,
        c.first_name || ' ' || c.last_name AS name,
        SUM(o.total) AS total_spent,
        NTILE(4) OVER (ORDER BY SUM(o.total) DESC) AS quartile
    FROM customers c
    JOIN orders o ON c.customer_id = o.customer_id
    GROUP BY c.customer_id, c.first_name, c.last_name
)
SELECT
    name,
    total_spent,
    CASE quartile
        WHEN 1 THEN 'Platinum - VIP (remise 20%)'
        WHEN 2 THEN 'Gold - Fidele (remise 10%)'
        WHEN 3 THEN 'Silver - Regulier (remise 5%)'
        WHEN 4 THEN 'Bronze - Occasionnel (offre bienvenue)'
    END AS segment
FROM customer_quartiles;

Cas d'Etude : Comment Spotify Classe les Top 50 par Pays

Spotify - Classement Musical avec RANK()

Spotify genere des classements "Top 50" pour chaque pays, chaque jour. Avec 180+ pays et des milliards de streams, comment font-ils ?

Le probleme :

  • Des milliards d'evenements de streaming par jour
  • Classements par pays, par genre, par playlist
  • Besoin de gerer les ex aequo (2 chansons avec le meme nombre de streams)
  • Mise a jour toutes les heures

L'approche simplifiee (conceptuelle) :

SQL - Logique Spotify Simplifiee
-- Etape 1 : Agreger les streams par pays et par chanson
WITH daily_streams AS (
    SELECT
        country_code,
        track_id,
        COUNT(*) AS stream_count
    FROM stream_events
    WHERE event_date = CURRENT_DATE
    GROUP BY country_code, track_id
),
-- Etape 2 : Classer par pays
ranked AS (
    SELECT
        country_code,
        track_id,
        stream_count,
        RANK() OVER (
            PARTITION BY country_code
            ORDER BY stream_count DESC
        ) AS chart_position
    FROM daily_streams
)
-- Etape 3 : Garder le Top 50
SELECT r.country_code, t.title, t.artist, r.stream_count, r.chart_position
FROM ranked r
JOIN tracks t ON r.track_id = t.track_id
WHERE r.chart_position <= 50
ORDER BY r.country_code, r.chart_position;

Pourquoi RANK et pas DENSE_RANK ? Spotify utilise RANK car si 2 chansons sont en position 5, la suivante est en position 7 (pas 6). Cela reflète mieux le classement reel : il y a bien 6 chansons devant la 7e.

En realite, Spotify utilise un pipeline Spark/Kafka qui pre-agrege les streams en temps reel, mais la logique SQL sous-jacente est identique. Le pattern PARTITION BY + ORDER BY + RANK est universel.

Anti-Pattern : GROUP BY au Lieu de Window Functions

Quand GROUP BY Devient un Piege

Beaucoup de developpeurs utilisent GROUP BY + sous-requetes pour des problemes que les Window Functions resolvent en une seule passe :

SQL - MAUVAIS : Sous-requetes correlees
-- ANTI-PATTERN : Trouver le salaire max par departement avec les details
-- Cette requete fait N+1 lectures de la table !
SELECT e.*
FROM employees e
WHERE e.salary = (
    SELECT MAX(salary)
    FROM employees e2
    WHERE e2.department = e.department
);
-- Probleme : sous-requete correlee executee pour CHAQUE ligne
-- Sur 100K employes avec 50 departements = 100K executions de sous-requete
SQL - BON : Window Function en une seule passe
-- CORRECT : Une seule lecture de la table
WITH ranked AS (
    SELECT *,
        RANK() OVER (PARTITION BY department ORDER BY salary DESC) AS rk
    FROM employees
)
SELECT * FROM ranked WHERE rk = 1;
-- La table est lue UNE SEULE FOIS
-- Performance : 10x a 100x plus rapide sur de gros volumes

Regle : Si vous ecrivez une sous-requete correlee dans le WHERE qui reference la requete externe, demandez-vous toujours si une Window Function ne serait pas plus adaptee.

Scenario Pratique : Top 3 Produits par Categorie

Votre Manager Demande : "Quels sont les 3 produits les plus vendus par categorie ?"

Contexte : Vous travaillez chez un e-commerΓ§ant. Le directeur marketing veut savoir les 3 best-sellers de chaque categorie pour les mettre en avant sur la homepage.

Contraintes :

  • Si 2 produits ont le meme nombre de ventes, les afficher tous les deux
  • Afficher le CA genere par chaque produit
  • Ne considerer que les commandes des 90 derniers jours
SQL - Solution
WITH product_sales AS (
    SELECT
        cat.name AS category_name,
        p.name AS product_name,
        SUM(oi.quantity) AS total_sold,
        SUM(oi.quantity * oi.unit_price) AS revenue,
        DENSE_RANK() OVER (
            PARTITION BY cat.category_id
            ORDER BY SUM(oi.quantity) DESC
        ) AS sales_rank
    FROM order_items oi
    JOIN orders o ON oi.order_id = o.order_id
    JOIN products p ON oi.product_id = p.product_id
    JOIN categories cat ON p.category_id = cat.category_id
    WHERE o.order_date >= CURRENT_DATE - INTERVAL '90 days'
      AND o.status != 'cancelled'
    GROUP BY cat.category_id, cat.name, p.product_id, p.name
)
SELECT
    category_name,
    product_name,
    total_sold,
    TO_CHAR(revenue, '999,999.99') AS revenue_formatted,
    sales_rank
FROM product_sales
WHERE sales_rank <= 3
ORDER BY category_name, sales_rank;

Pourquoi DENSE_RANK ici ? Parce que si 2 produits sont numero 2, on veut quand meme voir le produit numero 3 (pas qu'il soit saute). DENSE_RANK garantit qu'on a bien 3 niveaux de ventes differents.

Exercices Pratiques

Exercices - Window Functions de Classement

Exercice 1 : Classement des clients par CA

Ecrivez une requete qui affiche chaque client avec son CA total et son rang parmi tous les clients. Utilisez RANK(). Affichez aussi le percentile avec NTILE(100).

Exercice 2 : Deduplication

La table customers contient des doublons (meme email). Ecrivez une requete qui identifie les doublons en gardant le customer_id le plus ancien, et qui retourne les IDs a supprimer.

Exercice 3 : Top N par Groupe

Affichez les 2 commandes les plus cheres par mois (sur les 12 derniers mois). Incluez le mois, le customer_id, le montant, et le rang.

Flashcards

Quelle est la difference entre ROW_NUMBER et RANK ?
ROW_NUMBER attribue un numero unique a chaque ligne (pas d'ex aequo). RANK attribue le meme rang aux ex aequo et laisse des trous (1, 2, 2, 4). Utilisez ROW_NUMBER pour la deduplication, RANK pour les classements.
Quand utiliser NTILE plutot que RANK ?
NTILE divise en N groupes de taille egale (quartiles, deciles). RANK classe par valeur. Utilisez NTILE pour la segmentation (top 25%, bottom 25%) et RANK pour le classement precis (1er, 2e, 3e...).
Comment trouver le "Top N par groupe" en SQL ?
Utilisez un CTE avec RANK() ou DENSE_RANK() OVER (PARTITION BY groupe ORDER BY critere DESC), puis filtrez WHERE rank <= N dans la requete externe. Ne jamais utiliser une sous-requete correlee pour cela.

Window Functions - Agregation et Frames

60 min Intermediaire

Apres les fonctions de classement, decouvrons les fonctions d'agregation fenΓͺtrees et les fonctions de navigation (LAG, LEAD). Ces outils sont indispensables pour l'analyse temporelle, la detection d'anomalies et les tableaux de bord.

Objectifs d'apprentissage

  • Maitriser LAG et LEAD pour comparer des lignes consecutives
  • Utiliser SUM/AVG/COUNT OVER pour des agregations fenΓͺtrees
  • Comprendre les frames ROWS vs RANGE
  • Calculer des moyennes mobiles et des cumuls
  • Appliquer ces techniques a des cas concrets

LAG et LEAD : Regarder en Arriere et en Avant

LAG et LEAD permettent d'acceder a la valeur d'une ligne precedente (LAG) ou suivante (LEAD) dans la partition, sans avoir besoin d'un self-join.

Fonctionnement de LAG et LEAD
  Donnees ordonnees par date :

  Ligne   | date       | ventes | LAG(ventes,1) | LEAD(ventes,1)
  --------|------------|--------|---------------|----------------
  1       | 2024-01    | 100    | NULL          | 150
  2       | 2024-02    | 150    | 100           | 120
  3       | 2024-03    | 120    | 150           | 200
  4       | 2024-04    | 200    | 120           | 180
  5       | 2024-05    | 180    | 200           | NULL

           LAG regarde     ^                 ^    LEAD regarde
           EN ARRIERE      |                 |    EN AVANT
                    (ligne precedente)  (ligne suivante)

  Syntaxe complete :
  LAG(colonne, offset, valeur_par_defaut) OVER (PARTITION BY ... ORDER BY ...)
       |         |            |
       v         v            v
  Quelle     Combien de   Valeur si NULL
  colonne    lignes avant  (debut/fin)
SQL - LAG et LEAD
-- Evolution mensuelle du CA avec variation
SELECT
    DATE_TRUNC('month', order_date) AS mois,
    SUM(total) AS ca_mensuel,
    LAG(SUM(total), 1) OVER (ORDER BY DATE_TRUNC('month', order_date)) AS ca_mois_prec,
    ROUND(
        (SUM(total) - LAG(SUM(total), 1) OVER (ORDER BY DATE_TRUNC('month', order_date)))
        / LAG(SUM(total), 1) OVER (ORDER BY DATE_TRUNC('month', order_date)) * 100
    , 1) AS variation_pct
FROM orders
WHERE status != 'cancelled'
GROUP BY DATE_TRUNC('month', order_date)
ORDER BY mois;

-- Resultat :
-- mois       | ca_mensuel | ca_mois_prec | variation_pct
-- 2024-01-01 | 45000.00   | NULL         | NULL
-- 2024-02-01 | 52000.00   | 45000.00     | 15.6
-- 2024-03-01 | 48000.00   | 52000.00     | -7.7
-- 2024-04-01 | 61000.00   | 48000.00     | 27.1

-- Comparaison Year-over-Year (LAG avec offset de 12 mois)
SELECT
    DATE_TRUNC('month', order_date) AS mois,
    SUM(total) AS ca_mensuel,
    LAG(SUM(total), 12) OVER (ORDER BY DATE_TRUNC('month', order_date)) AS ca_meme_mois_an_prec,
    ROUND(
        (SUM(total) - LAG(SUM(total), 12) OVER (ORDER BY DATE_TRUNC('month', order_date)))
        / NULLIF(LAG(SUM(total), 12) OVER (ORDER BY DATE_TRUNC('month', order_date)), 0) * 100
    , 1) AS yoy_growth_pct
FROM orders
WHERE status != 'cancelled'
GROUP BY DATE_TRUNC('month', order_date)
ORDER BY mois;

-- Detection de chute soudaine avec LEAD
SELECT
    order_date,
    daily_revenue,
    LEAD(daily_revenue, 1) OVER (ORDER BY order_date) AS next_day_revenue,
    CASE
        WHEN LEAD(daily_revenue, 1) OVER (ORDER BY order_date) < daily_revenue * 0.5
        THEN 'ALERTE: Chute >50% le lendemain !'
        ELSE 'Normal'
    END AS alert
FROM (
    SELECT order_date, SUM(total) AS daily_revenue
    FROM orders
    GROUP BY order_date
) daily;

Cas d'Etude : Detection de Fraude Bancaire avec LAG/LEAD

Detection de Fraude en Temps Reel - Banque BNP Paribas (Approche Simplifiee)

Les systemes de detection de fraude bancaire utilisent massivement LAG/LEAD pour identifier des patterns suspects. Voici les regles typiques :

  • Regle 1 : Transaction > 5x le montant moyen des 10 dernieres transactions
  • Regle 2 : 2 transactions a plus de 500km de distance en moins de 30 minutes
  • Regle 3 : Montant croissant sur 5 transactions consecutives (pattern "test de carte")
SQL - Detection de Fraude
-- Regle 1 : Transaction anormalement elevee
WITH transaction_context AS (
    SELECT
        t.transaction_id,
        t.account_id,
        t.amount,
        t.transaction_time,
        t.merchant_city,
        LAG(t.amount, 1) OVER w AS prev_amount,
        LAG(t.transaction_time, 1) OVER w AS prev_time,
        LAG(t.merchant_city, 1) OVER w AS prev_city,
        AVG(t.amount) OVER (
            PARTITION BY t.account_id
            ORDER BY t.transaction_time
            ROWS BETWEEN 10 PRECEDING AND 1 PRECEDING
        ) AS avg_last_10
    FROM transactions t
    WINDOW w AS (PARTITION BY t.account_id ORDER BY t.transaction_time)
)
SELECT
    transaction_id,
    account_id,
    amount,
    avg_last_10,
    ROUND(amount / NULLIF(avg_last_10, 0), 1) AS ratio,
    CASE
        WHEN amount > avg_last_10 * 5 THEN 'FRAUDE POTENTIELLE - Montant anormal'
        WHEN EXTRACT(EPOCH FROM (transaction_time - prev_time)) < 1800
             AND prev_city != merchant_city THEN 'FRAUDE POTENTIELLE - Deplacement impossible'
        ELSE 'OK'
    END AS fraud_flag
FROM transaction_context
WHERE amount > avg_last_10 * 5
   OR (EXTRACT(EPOCH FROM (transaction_time - prev_time)) < 1800
       AND prev_city != merchant_city);

-- Regle 3 : Pattern "test de carte" (montants croissants)
WITH consecutive_increases AS (
    SELECT
        account_id,
        transaction_id,
        amount,
        transaction_time,
        CASE WHEN amount > LAG(amount, 1) OVER w THEN 1 ELSE 0 END AS is_increase
    FROM transactions
    WINDOW w AS (PARTITION BY account_id ORDER BY transaction_time)
)
SELECT account_id, MIN(transaction_time), MAX(transaction_time),
       COUNT(*) AS streak_length
FROM (
    SELECT *,
        SUM(CASE WHEN is_increase = 0 THEN 1 ELSE 0 END) OVER (
            PARTITION BY account_id ORDER BY transaction_time
        ) AS grp
    FROM consecutive_increases
)
GROUP BY account_id, grp
HAVING COUNT(*) >= 5  -- 5+ transactions croissantes consecutives
ORDER BY streak_length DESC;

Impact reel : Ce type de requetes, executees en temps reel sur des flux Kafka/Flink, permet de bloquer des transactions frauduleuses en moins de 100ms. Les banques qui implementent ces regles reduisent la fraude de 60-80%.

Agregations FenΓͺtrees : SUM, AVG, COUNT OVER

Les fonctions d'agregation classiques (SUM, AVG, COUNT, MIN, MAX) deviennent extremement puissantes quand on les combine avec OVER. Contrairement a GROUP BY, elles ne reduisent pas le nombre de lignes.

SQL - Agregations FenΓͺtrees
-- Cumul progressif du CA (running total)
SELECT
    order_date,
    total,
    SUM(total) OVER (ORDER BY order_date) AS cumul_ca,
    COUNT(*) OVER (ORDER BY order_date) AS nb_commandes_cumul,
    AVG(total) OVER (ORDER BY order_date) AS panier_moyen_cumul
FROM orders
WHERE status = 'delivered'
ORDER BY order_date;

-- Resultat :
-- order_date | total  | cumul_ca  | nb_commandes | panier_moyen
-- 2024-01-02 | 150.00 | 150.00    | 1            | 150.00
-- 2024-01-03 | 200.00 | 350.00    | 2            | 175.00
-- 2024-01-03 | 75.00  | 425.00    | 3            | 141.67
-- 2024-01-05 | 300.00 | 725.00    | 4            | 181.25

-- Pourcentage du total par categorie
SELECT
    cat.name AS categorie,
    p.name AS produit,
    SUM(oi.quantity * oi.unit_price) AS ca_produit,
    SUM(SUM(oi.quantity * oi.unit_price)) OVER (
        PARTITION BY cat.category_id
    ) AS ca_categorie,
    ROUND(
        SUM(oi.quantity * oi.unit_price) * 100.0
        / SUM(SUM(oi.quantity * oi.unit_price)) OVER (PARTITION BY cat.category_id)
    , 1) AS pct_de_la_categorie
FROM order_items oi
JOIN products p ON oi.product_id = p.product_id
JOIN categories cat ON p.category_id = cat.category_id
GROUP BY cat.category_id, cat.name, p.product_id, p.name
ORDER BY categorie, ca_produit DESC;

ROWS vs RANGE : Les Frames de Fenetre

La clause frame definit exactement quelles lignes sont incluses dans le calcul de la window function. C'est ce qui permet les moyennes mobiles, les cumuls avec plage, etc.

ROWS vs RANGE - Visualisation
  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
  =========================================
  Compte les LIGNES physiques (exactement 2 lignes avant)

  date       | valeur | SUM (ROWS 2 PREC)
  2024-01-01 | 10     | 10           (seulement ligne courante)
  2024-01-02 | 20     | 30           (10 + 20)
  2024-01-03 | 30     | 60           (10 + 20 + 30)
  2024-01-04 | 15     | 65           (20 + 30 + 15)  <- 10 est sorti
  2024-01-05 | 25     | 70           (30 + 15 + 25)


  RANGE BETWEEN 2 PRECEDING AND CURRENT ROW
  ==========================================
  Compte les VALEURS dans une plage (toutes les lignes dont ORDER BY
  value est dans [current - 2, current])

  value | data | SUM (RANGE 2 PREC)
  1     | A    | A                    (valeurs 1 a 1 : juste A)
  2     | B    | A+B                  (valeurs 0 a 2 : A, B)
  2     | C    | A+B+C                (valeurs 0 a 2 : A, B, C)  <- B et C ont value=2
  3     | D    | A+B+C+D              (valeurs 1 a 3 : A, B, C, D)
  5     | E    | D+E                  (valeurs 3 a 5 : juste D, E)


  Frames possibles :
  +-------------------------------------------------------+
  | ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW      | <- Running total
  | ROWS BETWEEN 6 PRECEDING AND CURRENT ROW              | <- Moyenne mobile 7j
  | ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING      | <- Cumul inverse
  | ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING              | <- Lissage 3 points
  | RANGE BETWEEN INTERVAL '7 days' PRECEDING AND ...     | <- Plage temporelle
  +-------------------------------------------------------+
SQL - Moyenne Mobile 7 Jours
-- Moyenne mobile sur 7 jours (methode ROWS - requiert 1 ligne par jour)
SELECT
    order_date,
    daily_revenue,
    ROUND(AVG(daily_revenue) OVER (
        ORDER BY order_date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ), 2) AS moving_avg_7d,
    ROUND(AVG(daily_revenue) OVER (
        ORDER BY order_date
        ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
    ), 2) AS moving_avg_30d
FROM (
    SELECT order_date, SUM(total) AS daily_revenue
    FROM orders
    WHERE status != 'cancelled'
    GROUP BY order_date
) daily
ORDER BY order_date;

-- Moyenne mobile avec RANGE et INTERVAL (gere les jours manquants)
SELECT
    order_date,
    daily_revenue,
    AVG(daily_revenue) OVER (
        ORDER BY order_date
        RANGE BETWEEN INTERVAL '6 days' PRECEDING AND CURRENT ROW
    ) AS moving_avg_7d_range
FROM (
    SELECT order_date, SUM(total) AS daily_revenue
    FROM orders GROUP BY order_date
) daily
ORDER BY order_date;

ROWS vs RANGE : Piege Courant

Si vous avez des jours sans commande, ROWS BETWEEN 6 PRECEDING donnera une fenetre de 7 LIGNES (qui peut couvrir plus de 7 jours). Utilisez RANGE BETWEEN INTERVAL '6 days' PRECEDING pour une vraie fenetre de 7 jours calendaires. Attention : RANGE avec INTERVAL n'est supporte qu'a partir de PostgreSQL 11+.

Scenario : Dashboard COVID - Moyenne Mobile 7 Jours

Calculer la Moyenne Mobile 7 Jours pour un Dashboard COVID

Contexte : Vous etes data analyst dans une ARS (Agence Regionale de Sante). On vous demande de creer le tableau de bord qui affiche la courbe de cas COVID avec une moyenne mobile sur 7 jours, comme ceux que l'on voyait quotidiennement a la television.

Donnees : Table covid_cases(report_date, department, new_cases, new_deaths, new_tests)

SQL - Dashboard COVID
-- Dashboard COVID avec moyennes mobiles et taux
WITH daily_stats AS (
    SELECT
        report_date,
        department,
        new_cases,
        new_deaths,
        new_tests,
        -- Moyenne mobile 7 jours des nouveaux cas
        ROUND(AVG(new_cases) OVER (
            PARTITION BY department
            ORDER BY report_date
            ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
        ), 1) AS avg_7d_cases,
        -- Moyenne mobile 7 jours des deces
        ROUND(AVG(new_deaths) OVER (
            PARTITION BY department
            ORDER BY report_date
            ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
        ), 1) AS avg_7d_deaths,
        -- Taux de positivite glissant 7 jours
        ROUND(
            SUM(new_cases) OVER (
                PARTITION BY department
                ORDER BY report_date
                ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
            ) * 100.0 / NULLIF(SUM(new_tests) OVER (
                PARTITION BY department
                ORDER BY report_date
                ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
            ), 0)
        , 2) AS taux_positivite_7d,
        -- Evolution par rapport a la semaine precedente
        LAG(new_cases, 7) OVER (
            PARTITION BY department
            ORDER BY report_date
        ) AS cases_7d_ago
    FROM covid_cases
)
SELECT
    report_date,
    department,
    new_cases,
    avg_7d_cases,
    avg_7d_deaths,
    taux_positivite_7d,
    CASE
        WHEN cases_7d_ago IS NULL THEN 'N/A'
        WHEN new_cases > cases_7d_ago * 1.5 THEN 'FORTE HAUSSE'
        WHEN new_cases > cases_7d_ago THEN 'Hausse'
        WHEN new_cases < cases_7d_ago * 0.5 THEN 'FORTE BAISSE'
        ELSE 'Baisse'
    END AS tendance
FROM daily_stats
ORDER BY department, report_date;

Points cles : La moyenne mobile 7 jours lisse les variations dues aux week-ends (moins de tests). Le taux de positivite est calcule sur une fenetre glissante pour etre statistiquement significatif. LAG(x, 7) compare au meme jour de la semaine precedente.

La Clause WINDOW : Eviter la Repetition

SQL - Named Windows
-- Au lieu de repeter OVER(...) 5 fois :
SELECT
    order_date,
    total,
    SUM(total)   OVER w AS cumul,
    AVG(total)   OVER w AS avg_running,
    COUNT(*)     OVER w AS count_running,
    MIN(total)   OVER w AS min_running,
    MAX(total)   OVER w AS max_running
FROM orders
WINDOW w AS (ORDER BY order_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
ORDER BY order_date;

-- On peut meme heriter d'une fenetre nommee
SELECT
    department,
    name,
    salary,
    RANK() OVER dept_sal AS rank_in_dept,
    SUM(salary) OVER dept AS total_dept_salary,
    ROUND(salary * 100.0 / SUM(salary) OVER dept, 1) AS pct_of_dept
FROM employees
WINDOW
    dept AS (PARTITION BY department),
    dept_sal AS (dept ORDER BY salary DESC)
ORDER BY department, salary DESC;

FIRST_VALUE, LAST_VALUE, NTH_VALUE

SQL
-- Comparer chaque employe au mieux et au moins bien paye de son dept
SELECT
    name,
    department,
    salary,
    FIRST_VALUE(name) OVER w AS top_earner,
    FIRST_VALUE(salary) OVER w AS top_salary,
    salary - FIRST_VALUE(salary) OVER w AS ecart_au_top,
    LAST_VALUE(name) OVER (
        PARTITION BY department
        ORDER BY salary DESC
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) AS lowest_earner
FROM employees
WINDOW w AS (PARTITION BY department ORDER BY salary DESC);

-- ATTENTION : LAST_VALUE a un piege !
-- Par defaut, la frame est ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
-- Donc LAST_VALUE retourne... la ligne courante !
-- Il FAUT specifier ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING

Piege LAST_VALUE

La frame par defaut est ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. Avec LAST_VALUE, cela signifie que la "derniere valeur" est toujours la ligne courante ! Specifiez toujours explicitement ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING avec LAST_VALUE.

Flashcards

Quelle est la difference entre ROWS et RANGE dans une frame de fenetre ?
ROWS compte les lignes physiques (exactement N lignes avant/apres). RANGE considere les valeurs : toutes les lignes dont la valeur ORDER BY est dans la plage specifiee. RANGE inclut donc les doublons. Utilisez ROWS pour les moyennes mobiles par nombre de lignes, RANGE pour les plages de valeurs/dates.
Comment calculer un cumul progressif (running total) en SQL ?
SUM(colonne) OVER (ORDER BY date_col ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW). Le ROWS BETWEEN est optionnel ici car c'est la frame par defaut avec ORDER BY.
Quel piege courant existe avec LAST_VALUE ?
La frame par defaut est ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, donc LAST_VALUE retourne toujours la ligne courante ! Il faut explicitement specifier ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING pour obtenir la vraie derniere valeur de la partition.
A quoi sert LAG(colonne, N, default) ?
LAG accede a la valeur N lignes AVANT la ligne courante dans la partition. Le 3e argument est la valeur par defaut si on depasse le debut de la partition (au lieu de NULL). Exemple : LAG(sales, 1, 0) retourne les ventes de la ligne precedente, ou 0 pour la premiere ligne.

CTEs et CTEs Recursives

60 min Intermediaire

Les Common Table Expressions (CTEs) sont l'outil numero 1 pour ecrire du SQL lisible et maintenable. Les CTEs recursives ajoutent la capacite de traverser des hierarchies et de generer des series, ouvrant des possibilites impossibles avec du SQL standard.

Objectifs d'apprentissage

  • Maitriser la syntaxe WITH pour structurer des requetes complexes
  • Chainer plusieurs CTEs pour decomposer la logique
  • Comprendre et ecrire des CTEs recursives
  • Traverser des hierarchies (organigrammes, categories)
  • Connaitre les limites de performance des CTEs

La regle d'or d'un bon Data Architect : si votre requete depasse 30 lignes, decomposez-la en CTEs. Chaque CTE doit avoir un nom explicite qui decrit ce qu'elle contient. Vos collegues (et votre futur vous) vous remercieront.

CTEs Simples : Le WITH Clause

Un CTE est une "table temporaire nommee" qui n'existe que le temps de la requete. Pensez-y comme une variable en programmation.

SQL - CTE Basique
-- Sans CTE : requete imbriquee difficile a lire
SELECT customer_id, name, total_spent
FROM (
    SELECT c.customer_id, c.first_name || ' ' || c.last_name AS name,
           SUM(o.total) AS total_spent
    FROM customers c JOIN orders o ON c.customer_id = o.customer_id
    WHERE o.status = 'delivered'
    GROUP BY c.customer_id, c.first_name, c.last_name
) sub
WHERE total_spent > (
    SELECT AVG(total_spent) FROM (
        SELECT customer_id, SUM(total) AS total_spent
        FROM orders WHERE status = 'delivered'
        GROUP BY customer_id
    ) avg_sub
);

-- Avec CTEs : clair, lisible, maintenable
WITH customer_spending AS (
    SELECT
        c.customer_id,
        c.first_name || ' ' || c.last_name AS name,
        SUM(o.total) AS total_spent
    FROM customers c
    JOIN orders o ON c.customer_id = o.customer_id
    WHERE o.status = 'delivered'
    GROUP BY c.customer_id, c.first_name, c.last_name
),
average_spending AS (
    SELECT AVG(total_spent) AS avg_spent
    FROM customer_spending
)
SELECT cs.customer_id, cs.name, cs.total_spent
FROM customer_spending cs
CROSS JOIN average_spending avs
WHERE cs.total_spent > avs.avg_spent
ORDER BY cs.total_spent DESC;

Chainage de CTEs

SQL - CTEs Chainees
-- Analyse de cohorte : retention mensuelle des clients
WITH first_orders AS (
    -- Etape 1 : Date de premiere commande par client
    SELECT
        customer_id,
        DATE_TRUNC('month', MIN(order_date)) AS cohort_month
    FROM orders
    GROUP BY customer_id
),
monthly_activity AS (
    -- Etape 2 : Mois d'activite de chaque client
    SELECT DISTINCT
        o.customer_id,
        DATE_TRUNC('month', o.order_date) AS activity_month
    FROM orders o
),
cohort_data AS (
    -- Etape 3 : Jointure cohorte + activite avec calcul de l'ecart
    SELECT
        f.cohort_month,
        ma.activity_month,
        EXTRACT(YEAR FROM AGE(ma.activity_month, f.cohort_month)) * 12
        + EXTRACT(MONTH FROM AGE(ma.activity_month, f.cohort_month)) AS months_since_first,
        COUNT(DISTINCT f.customer_id) AS active_customers
    FROM first_orders f
    JOIN monthly_activity ma ON f.customer_id = ma.customer_id
    GROUP BY f.cohort_month, ma.activity_month
),
cohort_sizes AS (
    -- Etape 4 : Taille de chaque cohorte
    SELECT cohort_month, COUNT(*) AS cohort_size
    FROM first_orders
    GROUP BY cohort_month
)
-- Requete finale : Taux de retention
SELECT
    cd.cohort_month,
    cs.cohort_size,
    cd.months_since_first,
    cd.active_customers,
    ROUND(cd.active_customers * 100.0 / cs.cohort_size, 1) AS retention_pct
FROM cohort_data cd
JOIN cohort_sizes cs ON cd.cohort_month = cs.cohort_month
WHERE cd.months_since_first BETWEEN 0 AND 12
ORDER BY cd.cohort_month, cd.months_since_first;

CTE vs Sous-requete : Quand Utiliser Quoi ?

Utilisez un CTE quand : La sous-requete est referencee plusieurs fois, la requete depasse 20 lignes, vous voulez documenter les etapes logiques, ou vous avez besoin de recursion.

Gardez une sous-requete quand : Elle est simple (< 5 lignes), utilisee une seule fois, et que la performance est critique (certains moteurs n'optimisent pas les CTEs).

CTEs Recursives : Traverser des Hierarchies

Les CTEs recursives permettent de resoudre des problemes qui necessiteraient autrement des boucles : hierarchies, graphes, generation de series.

Fonctionnement d'un CTE Recursif
  WITH RECURSIVE cte AS (
      -- Membre d'ancrage (base case)
      SELECT ... WHERE condition_initiale

      UNION ALL

      -- Membre recursif (recursive case)
      SELECT ... FROM cte JOIN table ON ...
  )

  Execution :
  +-----------------+
  | Iteration 0     |  Membre d'ancrage -> Resultat initial
  +-----------------+
          |
  +-----------------+
  | Iteration 1     |  Membre recursif avec resultat de iter 0
  +-----------------+
          |
  +-----------------+
  | Iteration 2     |  Membre recursif avec resultat de iter 1
  +-----------------+
          |
         ...         (continue jusqu'a ce que le membre recursif
          |           ne retourne plus de lignes)
  +-----------------+
  | Iteration N     |  Resultat vide -> ARRET
  +-----------------+
          |
          v
  Resultat final = UNION de toutes les iterations

Exemple 1 : Organigramme d'Entreprise

SQL - Organigramme Recursif
-- Table employees avec auto-reference
-- employee_id | name     | manager_id | title
-- 1           | CEO      | NULL       | CEO
-- 2           | VP Sales | 1          | VP
-- 3           | VP Tech  | 1          | VP
-- 4           | Dir Mkt  | 2          | Director
-- 5           | Dev Lead | 3          | Lead
-- 6           | Dev Sr   | 5          | Senior Dev
-- 7           | Dev Jr   | 5          | Junior Dev

-- Afficher la hierarchie complete avec le niveau et le chemin
WITH RECURSIVE org_chart AS (
    -- Ancrage : le CEO (pas de manager)
    SELECT
        employee_id,
        name,
        title,
        manager_id,
        0 AS level,
        name AS path,
        ARRAY[employee_id] AS path_ids
    FROM employees
    WHERE manager_id IS NULL

    UNION ALL

    -- Recursion : les subordonnes
    SELECT
        e.employee_id,
        e.name,
        e.title,
        e.manager_id,
        oc.level + 1,
        oc.path || ' > ' || e.name,
        oc.path_ids || e.employee_id
    FROM employees e
    JOIN org_chart oc ON e.manager_id = oc.employee_id
)
SELECT
    REPEAT('  ', level) || name AS organigramme,
    title,
    level,
    path
FROM org_chart
ORDER BY path_ids;

-- Resultat :
-- organigramme        | title      | level | path
-- CEO                 | CEO        | 0     | CEO
--   VP Sales          | VP         | 1     | CEO > VP Sales
--     Dir Mkt         | Director   | 2     | CEO > VP Sales > Dir Mkt
--   VP Tech           | VP         | 1     | CEO > VP Tech
--     Dev Lead        | Lead       | 2     | CEO > VP Tech > Dev Lead
--       Dev Sr        | Senior Dev | 3     | CEO > VP Tech > Dev Lead > Dev Sr
--       Dev Jr        | Junior Dev | 3     | CEO > VP Tech > Dev Lead > Dev Jr

-- Compter les subordonnes directs et totaux
WITH RECURSIVE subordinates AS (
    SELECT employee_id, name, manager_id, employee_id AS root_id
    FROM employees

    UNION ALL

    SELECT e.employee_id, e.name, e.manager_id, s.root_id
    FROM employees e
    JOIN subordinates s ON e.manager_id = s.employee_id
    WHERE e.employee_id != s.root_id  -- eviter boucle infinie
)
SELECT
    e.name,
    e.title,
    COUNT(DISTINCT s.employee_id) - 1 AS total_subordinates
FROM employees e
LEFT JOIN subordinates s ON e.employee_id = s.root_id
GROUP BY e.employee_id, e.name, e.title
ORDER BY total_subordinates DESC;

Cas d'Etude : Hierarchie de Categories chez Amazon

Amazon - Navigation par Categories avec CTEs Recursives

Amazon organise ses millions de produits dans une hierarchie de categories profonde de 7+ niveaux :

  • Electronique > Informatique > Ordinateurs portables > Gaming > 15 pouces > ASUS > ROG Strix
  • Cette hierarchie est stockee dans une seule table avec parent_category_id
  • Quand un utilisateur clique sur "Electronique", il faut afficher TOUS les produits de TOUTES les sous-categories
SQL - Hierarchie Amazon
-- Trouver toutes les sous-categories d'une categorie donnee
WITH RECURSIVE category_tree AS (
    -- Ancrage : la categorie selectionnee
    SELECT category_id, name, parent_id, 0 AS depth,
           ARRAY[name] AS breadcrumb
    FROM categories
    WHERE name = 'Electronique'

    UNION ALL

    -- Recursion : toutes les sous-categories
    SELECT c.category_id, c.name, c.parent_id, ct.depth + 1,
           ct.breadcrumb || c.name
    FROM categories c
    JOIN category_tree ct ON c.parent_id = ct.category_id
    WHERE ct.depth < 10  -- securite : max 10 niveaux
)
-- Compter les produits par sous-categorie
SELECT
    ct.name AS categorie,
    ct.depth,
    ARRAY_TO_STRING(ct.breadcrumb, ' > ') AS chemin,
    COUNT(p.product_id) AS nb_produits
FROM category_tree ct
LEFT JOIN products p ON p.category_id = ct.category_id
GROUP BY ct.category_id, ct.name, ct.depth, ct.breadcrumb
ORDER BY ct.breadcrumb;

-- Obtenir tous les produits sous "Electronique" (y compris sous-categories)
SELECT p.*
FROM products p
WHERE p.category_id IN (
    SELECT category_id FROM category_tree
);

Performance : Sur un catalogue de 350M+ produits avec 30,000+ categories, Amazon utilise des materialized paths et du caching plutot que des CTEs recursives en temps reel. Mais pour le batch processing et les rapports, la CTE recursive reste l'approche standard.

Anti-Pattern : CTE pour Gros Volumes

CTE au Lieu de Table Temporaire pour Gros Volumes

Avant PostgreSQL 12, les CTEs etaient des "optimization fences" : le moteur ne pouvait pas pousser les predicats a l'interieur d'un CTE. Ce n'est plus le cas depuis PostgreSQL 12 (grace a CTE inlining), mais il y a encore des cas ou une table temporaire est preferable :

SQL - CTE vs Table Temporaire
-- ANTI-PATTERN : CTE materialise qui charge 100M de lignes en memoire
WITH huge_cte AS (
    SELECT * FROM transactions  -- 100M lignes !
    WHERE amount > 0
)
SELECT * FROM huge_cte WHERE customer_id = 42;
-- PostgreSQL 12+ : OK car inlining (le WHERE est pousse dans le CTE)
-- PostgreSQL 11- : DESASTRE car materialise les 100M lignes d'abord

-- Si le CTE est reference PLUSIEURS FOIS, il est toujours materialise :
WITH big_data AS MATERIALIZED (  -- Force la materialisation
    SELECT customer_id, SUM(amount) AS total
    FROM transactions
    GROUP BY customer_id
)
SELECT * FROM big_data d1
JOIN big_data d2 ON d1.customer_id != d2.customer_id
WHERE d1.total > d2.total * 2;
-- -> CTE materialise une seule fois, OK

-- MEILLEUR : Table temporaire pour les gros volumes reutilises
CREATE TEMP TABLE tmp_customer_totals AS
SELECT customer_id, SUM(amount) AS total
FROM transactions
GROUP BY customer_id;

CREATE INDEX ON tmp_customer_totals(customer_id);
CREATE INDEX ON tmp_customer_totals(total);

-- Maintenant les requetes sur tmp_customer_totals sont indexees !
SELECT * FROM tmp_customer_totals WHERE total > 10000;

Regle : Si votre CTE est reference une seule fois, laissez PostgreSQL 12+ l'inliner. Si le CTE est reference plusieurs fois et contient beaucoup de donnees, envisagez une table temporaire avec index.

Generation de Series avec CTEs Recursives

SQL - Generation de Donnees
-- Generer un calendrier (alternative a generate_series de PostgreSQL)
WITH RECURSIVE calendar AS (
    SELECT DATE '2024-01-01' AS date_val
    UNION ALL
    SELECT date_val + INTERVAL '1 day'
    FROM calendar
    WHERE date_val < '2024-12-31'
)
SELECT date_val,
    EXTRACT(DOW FROM date_val) AS day_of_week,
    TO_CHAR(date_val, 'Day') AS day_name,
    EXTRACT(WEEK FROM date_val) AS week_number
FROM calendar;

-- Suite de Fibonacci
WITH RECURSIVE fibo AS (
    SELECT 1 AS n, 1::BIGINT AS fib_n, 0::BIGINT AS fib_prev
    UNION ALL
    SELECT n + 1, fib_n + fib_prev, fib_n
    FROM fibo
    WHERE n < 50
)
SELECT n, fib_n FROM fibo;

-- Remplir les jours manquants dans des donnees temporelles
WITH RECURSIVE date_range AS (
    SELECT MIN(order_date) AS dt FROM orders
    UNION ALL
    SELECT dt + 1 FROM date_range
    WHERE dt < (SELECT MAX(order_date) FROM orders)
)
SELECT
    dr.dt AS order_date,
    COALESCE(SUM(o.total), 0) AS daily_revenue
FROM date_range dr
LEFT JOIN orders o ON o.order_date = dr.dt AND o.status != 'cancelled'
GROUP BY dr.dt
ORDER BY dr.dt;

Securite : Toujours Limiter la Recursion

Une CTE recursive sans condition d'arret boucle a l'infini. Protegez-vous avec :
1. Une clause WHERE dans le membre recursif (ex: WHERE depth < 20)
2. Le parametre PostgreSQL : SET statement_timeout = '30s';
3. La clause CYCLE (PostgreSQL 14+) : CYCLE id SET is_cycle USING path detecte automatiquement les cycles

Flashcards

Quels sont les deux parties d'une CTE recursive ?
1. Le membre d'ancrage (anchor member) : la requete initiale qui fournit le point de depart, sans reference au CTE lui-meme. 2. Le membre recursif : la requete qui reference le CTE et s'appuie sur le resultat de l'iteration precedente. Ils sont relies par UNION ALL.
Quelle est la difference entre un CTE et une sous-requete en termes de performance (PostgreSQL 12+) ?
Depuis PostgreSQL 12, un CTE reference une seule fois est automatiquement "inline" (pousse dans la requete principale), donc meme performance qu'une sous-requete. Un CTE reference plusieurs fois est materialise (calcule une seule fois et stocke en memoire). On peut forcer le comportement avec AS MATERIALIZED ou AS NOT MATERIALIZED.
Comment eviter une boucle infinie dans une CTE recursive ?
3 strategies : 1) Clause WHERE avec une limite (WHERE depth < 20). 2) statement_timeout au niveau de la session. 3) Clause CYCLE (PostgreSQL 14+) qui detecte automatiquement les cycles dans les donnees. Toujours combiner au moins 2 de ces strategies.

Introduction au NoSQL

45 min Intermediaire

Objectifs de cette lecon

  • Comprendre le theoreme CAP et ses implications architecturales
  • Differencier BASE vs ACID et savoir quand privilegier l'un ou l'autre
  • Maitriser les 4 familles NoSQL : document, cle-valeur, colonne, graphe
  • Savoir choisir la bonne technologie selon le cas d'usage

Le NoSQL n'est pas un remplacement du SQL - c'est un complement. En 20 ans de carriere, j'ai vu trop d'equipes choisir MongoDB "parce que c'est moderne" alors qu'un PostgreSQL aurait ete 10x plus adapte. Le vrai talent d'un Data Architect, c'est de savoir quand utiliser quoi. Le theoreme CAP n'est pas qu'une theorie - c'est votre boussole quotidienne.

Le Theoreme CAP

Formule par Eric Brewer en 2000, le theoreme CAP etablit qu'un systeme distribue ne peut garantir simultanement que 2 des 3 proprietes suivantes :

Le Triangle CAP
                    Consistency (C)
                         /\
                        /  \
                       /    \
                      / CP   \
                     /systems \
                    /──────────\
                   /            \
                  / CA    AP     \
                 / systems systems\
                /──────────────────\
     Availability (A) ────────── Partition Tolerance (P)

  CP : MongoDB, HBase, Redis Cluster
  AP : Cassandra, DynamoDB, CouchDB
  CA : PostgreSQL (single node), Oracle RAC

Les 3 Proprietes CAP

Consistency : Tous les noeuds voient les memes donnees au meme moment.
Availability : Chaque requete recoit une reponse (succes ou echec).
Partition Tolerance : Le systeme continue de fonctionner malgre des pertes de messages reseau.

ACID vs BASE

ProprieteACID (SQL)BASE (NoSQL)
ModeleAtomicity, Consistency, Isolation, DurabilityBasically Available, Soft state, Eventually consistent
ConsistanceForte (immediate)Eventuelle (convergence)
DisponibilitePeut etre sacrifieePrioritaire
ScalabiliteVerticale (scale-up)Horizontale (scale-out)
SchemaRigide, predetermineFlexible, schema-on-read
Cas idealTransactions financieresBig Data, temps reel

Les 4 Familles NoSQL

Taxonomie NoSQL
NoSQL Databases
β”œβ”€β”€ Document Store          β”œβ”€β”€ Key-Value Store
β”‚   β”œβ”€β”€ MongoDB             β”‚   β”œβ”€β”€ Redis
β”‚   β”œβ”€β”€ CouchDB             β”‚   β”œβ”€β”€ Memcached
β”‚   └── Amazon DocumentDB   β”‚   └── DynamoDB
β”‚                           β”‚
β”œβ”€β”€ Column-Family           └── Graph Database
β”‚   β”œβ”€β”€ Cassandra               β”œβ”€β”€ Neo4j
β”‚   β”œβ”€β”€ HBase                   β”œβ”€β”€ Amazon Neptune
β”‚   └── ScyllaDB                └── ArangoDB
FamilleModeleForcesCas d'usage
DocumentJSON/BSON imbriquesFlexibilite, requetes richesCMS, catalogues, profils
Cle-ValeurPaires simplesUltra-rapide, cacheSessions, cache, compteurs
ColonneFamilles de colonnesEcriture massive, time-seriesIoT, logs, analytics
GrapheNoeuds et relationsTraversees complexesReseaux sociaux, fraude

Amazon DynamoDB - Du monolithe au NoSQL

En 2004, Amazon a subi une panne majeure lors du Black Friday a cause de sa base Oracle. Cet incident a pousse l'equipe a creer Dynamo (ancetre de DynamoDB), un systeme AP concu pour ne jamais refuser une ecriture.

  • Probleme : Oracle ne scalait plus a 10M+ requetes/seconde
  • Solution : Consistent hashing + vector clocks + eventual consistency
  • Resultat : 99.99% disponibilite, latence <10ms au p99
  • Impact : Le papier Dynamo (2007) a inspire Cassandra, Riak, Voldemort

Le "NoSQL par defaut"

Choisir NoSQL parce que "c'est plus moderne" sans analyser les besoins. Si vos donnees sont relationnelles avec des transactions complexes, un PostgreSQL sera superieur a n'importe quel NoSQL.

Solution : Commencez par definir vos patterns d'acces (access patterns). Si vous avez besoin de JOINs complexes, de transactions ACID multi-tables, ou de schemas rigides - restez en SQL. Le NoSQL brille quand vous avez des volumes massifs, des schemas variables, ou des patterns de lecture previsibles.

Choisir la bonne base pour un e-commerce

Vous etes Data Architect pour une marketplace avec 5M produits, 200K commandes/jour, et un moteur de recommandation. Quelles bases choisissez-vous ?

  • Catalogue produits : MongoDB (schemas variables par categorie)
  • Panier & sessions : Redis (latence sub-milliseconde)
  • Commandes : PostgreSQL (transactions ACID obligatoires)
  • Recommandations : Neo4j (graphe utilisateur-produit)
  • Logs & recherche : Elasticsearch (full-text search)
Quelle est la difference fondamentale entre consistance forte et consistance eventuelle ?
Consistance forte : apres une ecriture, toutes les lectures retournent la derniere valeur (linearizabilite). Consistance eventuelle : apres une ecriture, les replicas convergent progressivement vers la meme valeur - il peut y avoir un delai (window of inconsistency) pendant lequel differents noeuds retournent des valeurs differentes.
Pourquoi dit-on que le theoreme CAP est en realite un spectre plutot qu'un choix binaire ?
Les systemes modernes permettent de tuner le niveau de consistance par requete. Par exemple, Cassandra avec QUORUM offre un bon compromis C/A, tandis que ONE privilegue la disponibilite. DynamoDB permet de choisir entre lectures eventually consistent et strongly consistent. Le CAP est donc un curseur, pas un interrupteur.

MongoDB Deep Dive

50 min Intermediaire

Objectifs de cette lecon

  • Maitriser le modele de donnees BSON et les schema design patterns
  • Construire des pipelines d'aggregation complexes
  • Comprendre le sharding et la replication MongoDB
  • Optimiser les performances avec les index et le profiling

MongoDB est la base NoSQL la plus populaire, et pour cause - son modele document est incroyablement intuitif. Mais attention, la simplicite apparente cache une vraie complexite. J'ai vu des equipes detruire les performances de MongoDB en repliquant un schema relationnel dans des documents. Le secret ? Pensez "access patterns first".

Le Modele BSON

BSON (Binary JSON) etend JSON avec des types supplementaires : Date, ObjectId, Decimal128, BinData. Les documents peuvent etre imbriques jusqu'a 100 niveaux avec une taille max de 16 Mo.

MongoDB
// Document e-commerce avec embedding
{
  "_id": ObjectId("64a7b3f2..."),
  "order_id": "ORD-2024-001",
  "customer": {
    "name": "Marie Dupont",
    "email": "marie@example.com",
    "tier": "premium"
  },
  "items": [
    { "sku": "LAPTOP-01", "name": "MacBook Pro", "qty": 1, "price": 2499.00 },
    { "sku": "CASE-03", "name": "Housse", "qty": 1, "price": 49.00 }
  ],
  "total": 2548.00,
  "status": "shipped",
  "created_at": ISODate("2024-01-15T10:30:00Z"),
  "shipping": {
    "address": { "street": "12 Rue de la Paix", "city": "Paris", "zip": "75002" },
    "tracking": "FR123456789",
    "estimated_delivery": ISODate("2024-01-18")
  }
}

Schema Design Patterns

PatternDescriptionCas d'usage
EmbeddingImbriquer les sous-documentsRelation 1:few, lecture ensemble
ReferencingStocker des ObjectId referencesRelation many:many, documents volumineux
BucketRegrouper des evenements par periodeTime-series, IoT, logs
ComputedPre-calculer les aggregationsCompteurs, statistiques frequentes
OutlierGerer les cas extremes separementUtilisateurs avec millions de followers
SubsetGarder un sous-ensemble frequentLes 10 derniers commentaires

Pipeline d'Aggregation

MongoDB Aggregation
// Analyse des ventes par categorie et mois
db.orders.aggregate([
  { $match: { status: "completed", created_at: { $gte: ISODate("2024-01-01") } } },
  { $unwind: "$items" },
  { $lookup: {
      from: "products",
      localField: "items.sku",
      foreignField: "sku",
      as: "product_info"
  }},
  { $unwind: "$product_info" },
  { $group: {
      _id: {
        category: "$product_info.category",
        month: { $month: "$created_at" }
      },
      total_revenue: { $sum: { $multiply: ["$items.qty", "$items.price"] } },
      order_count: { $sum: 1 },
      avg_price: { $avg: "$items.price" }
  }},
  { $sort: { total_revenue: -1 } },
  { $project: {
      category: "$_id.category",
      month: "$_id.month",
      total_revenue: { $round: ["$total_revenue", 2] },
      order_count: 1,
      avg_price: { $round: ["$avg_price", 2] }
  }}
])

Sharding & Replication

Architecture MongoDB Sharded Cluster
   Application
       |
   mongos Router ──── Config Servers (3x)
   /    |    \          (metadata, chunk maps)
  /     |     \
Shard1  Shard2  Shard3
(RS)    (RS)    (RS)
P-S-S   P-S-S   P-S-S

RS = Replica Set (Primary + 2 Secondary)
Shard Key: { customer_id: "hashed" }

Forbes - Migration vers MongoDB Atlas

Forbes a migre son CMS de 100M+ articles depuis une base relationnelle vers MongoDB Atlas pour gerer les schemas variables de contenu (articles, videos, podcasts, interactifs).

  • Gain : Temps de deploiement de nouveaux types de contenu reduit de 2 semaines a 2 heures
  • Performance : Latence de lecture reduite de 300ms a 15ms grace au pattern Subset
  • Scalabilite : Gestion de 150M pages vues/mois sans probleme

Le "Relational Mindset" en MongoDB

Creer des collections separees pour chaque entite et utiliser $lookup partout revient a reconstruire un SGBDR mais en moins performant. MongoDB n'a pas d'optimiseur de requetes aussi sophistique que PostgreSQL pour les JOINs.

Solution : Modelisez par les access patterns, pas par les entites. Si deux donnees sont toujours lues ensemble, imbriquez-les. Regles : embed si 1:few et acces conjoint, reference si 1:many ou acces independant.

Quels sont les criteres pour choisir une bonne shard key MongoDB ?
Une bonne shard key doit avoir : (1) haute cardinalite (beaucoup de valeurs distinctes), (2) bonne distribution (pas de hot spots), (3) correspondre aux query patterns (eviter les scatter-gather). Exemples : hashed(user_id) pour distribution uniforme, compound key {region, timestamp} pour des lectures localisees. Eviter : dates monotones (tout va au dernier shard), booleans (2 shards max).

Redis & Cache Distribue

45 min Intermediaire

Objectifs de cette lecon

  • Maitriser les structures de donnees Redis avancees
  • Implementer les patterns de caching (cache-aside, write-through, write-behind)
  • Comprendre Redis Cluster et la haute disponibilite
  • Utiliser Redis pour le pub/sub, les streams et les compteurs distribues

Redis est l'outil le plus sous-estime de l'ecosysteme data. Tout le monde le connait comme "un cache", mais c'est en realite un serveur de structures de donnees in-memory ultra-polyvalent. Je l'utilise pour le caching, les sessions, les leaderboards, les rate limiters, les queues, et meme comme broker de messages leger. A 100K+ ops/seconde sur un seul noeud, c'est un game changer.

Structures de Donnees Redis

TypeDescriptionCommandesCas d'usage
StringValeur simple (max 512 Mo)SET, GET, INCR, EXPIRECache, compteurs, flags
HashMap de champsHSET, HGET, HGETALLObjets, profils utilisateur
ListListe chaineeLPUSH, RPOP, LRANGEQueues, feeds, historique
SetEnsemble non-ordonneSADD, SMEMBERS, SINTERTags, amis communs
Sorted SetSet avec scoreZADD, ZRANGE, ZRANKLeaderboards, top-N
StreamAppend-only logXADD, XREAD, XGROUPEvent sourcing, CDC
HyperLogLogEstimation cardinalitePFADD, PFCOUNTCompteurs uniques

Patterns de Caching

Cache-Aside (Lazy Loading)
  Client ──GET──> Cache (Redis)
    |               |
    |  Cache Miss    | Cache Hit
    |               |
    └──SELECT──> Database     return data
    |
    └──SET──> Cache (avec TTL)
Python
import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def get_user_profile(user_id: str) -> dict:
    """Cache-aside pattern avec TTL"""
    cache_key = f"user:{user_id}:profile"

    # 1. Verifier le cache
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    # 2. Cache miss - lire la DB
    profile = db.query("SELECT * FROM users WHERE id = %s", user_id)

    # 3. Populer le cache avec TTL de 15 min
    r.setex(cache_key, 900, json.dumps(profile))
    return profile

def update_user_profile(user_id: str, data: dict):
    """Write-through: ecrire DB + invalider cache"""
    db.update("UPDATE users SET ... WHERE id = %s", data, user_id)
    r.delete(f"user:{user_id}:profile")  # Invalidation

Cache-Aside

  • Application gere le cache
  • Donnees potentiellement stale
  • Cache miss = latence supplementaire
  • Ideal pour lectures frequentes

Write-Through

  • Cache toujours synchronise
  • Ecriture plus lente (2 writes)
  • Pas de donnees stale
  • Ideal pour consistance requise

Netflix - Redis pour les sessions et recommandations

Netflix utilise des milliers d'instances Redis (via EVCache, leur wrapper interne) pour servir 200M+ abonnes avec des latences sub-milliseconde.

  • Sessions : Etat de lecture de chaque utilisateur en Redis pour reprendre instantanement
  • Recommandations : Pre-calcul des listes de recommandation stockees en Sorted Sets
  • Scale : ~3M ops/seconde par cluster, ~1500 clusters globalement
  • Fiabilite : Replication cross-region avec failover automatique <30s

Redis sans politique d'eviction

Utiliser Redis comme cache sans configurer maxmemory ni eviction policy. Resultat : Redis remplit toute la RAM, OOM killer tue le processus, toutes les sessions sont perdues.

Solution : Toujours configurer maxmemory (typiquement 75% de la RAM disponible) et une politique d'eviction adaptee : allkeys-lru pour du cache general, volatile-ttl si vous utilisez des TTL differencies, noeviction si vous ne pouvez pas perdre de donnees (mais alors surveillez la memoire).

Quelle est la difference entre Redis Sentinel et Redis Cluster ?
Redis Sentinel assure la haute disponibilite d'un seul master : il monitore, notifie et fait du failover automatique (promotion d'un replica en master). Redis Cluster assure le sharding automatique des donnees sur N noeuds avec 16384 hash slots. Sentinel = HA sans sharding, Cluster = HA + sharding. Pour <25 Go, Sentinel suffit. Au-dela, Cluster est necessaire.

Cassandra & ScyllaDB

50 min Avance

Objectifs de cette lecon

  • Comprendre l'architecture en anneau et le consistent hashing
  • Maitriser la modelisation orientee requetes (query-first design)
  • Optimiser les performances avec les strategies de compaction
  • Comparer Cassandra et ScyllaDB pour les cas d'usage haute performance

Cassandra est la base de choix quand vous avez besoin d'ecrire massivement avec une disponibilite quasi-absolue. Facebook l'a creee pour son systeme de messaging, et Apple stocke aujourd'hui 400+ Po dans Cassandra. Mais la modelisation est radicalement differente du SQL - vous devez penser "requete d'abord".

Architecture en Anneau

Cassandra Ring avec Replication Factor 3
         Node A (Token: 0-24)
        /                    \
  Node F                      Node B
  (75-99)                     (25-49)
    |          Gossip           |
    |        Protocol           |
  Node E                      Node C
  (62-74)                     (50-61)
        \                    /
         Node D (Token: ...)

  Write "user:123" (token=42):
  β†’ Primary: Node B (owns 25-49)
  β†’ Replica: Node C, Node D (RF=3)
  β†’ Consistency: QUORUM = 2/3 acks

CQL - Cassandra Query Language

CQL
-- Table modelisee pour la requete "derniers evenements par utilisateur"
CREATE TABLE user_events (
    user_id UUID,
    event_date DATE,
    event_time TIMESTAMP,
    event_type TEXT,
    event_data MAP<TEXT, TEXT>,
    PRIMARY KEY ((user_id, event_date), event_time)
) WITH CLUSTERING ORDER BY (event_time DESC)
  AND compaction = {'class': 'TimeWindowCompactionStrategy',
                    'compaction_window_size': 1,
                    'compaction_window_unit': 'DAYS'};

-- Requete optimisee (suit la partition key)
SELECT * FROM user_events
WHERE user_id = ? AND event_date = '2024-01-15'
ORDER BY event_time DESC LIMIT 50;

-- Anti-pattern : scan sans partition key
-- SELECT * FROM user_events WHERE event_type = 'click'; -- INTERDIT !

La regle d'or Cassandra

En Cassandra, vous ne modelisez PAS vos donnees, vous modelisez vos REQUETES. Chaque table est optimisee pour un pattern de lecture specifique. La denormalisation et la duplication de donnees sont non seulement acceptees mais encouragees. Un meme evenement peut exister dans 3 tables differentes si 3 requetes differentes en ont besoin.

Cassandra vs ScyllaDB

CritereCassandraScyllaDB
LangageJava (JVM)C++ (Seastar framework)
Latence p9910-50ms1-5ms
Throughput~50K ops/s par noeud~1M ops/s par noeud
GC PausesOui (problematique)Non (pas de GC)
CompactionImpact sur la latenceImpact minimal
CompatibiliteReference CQL100% CQL compatible
Cout serveursBaseline3-5x moins de noeuds

Discord - Migration Cassandra vers ScyllaDB

Discord stockait des trillions de messages dans Cassandra mais souffrait de latences imprevisibles dues aux GC pauses Java. En 2023, ils ont migre vers ScyllaDB.

  • Avant : 177 noeuds Cassandra, latence p99 a 200ms lors des GC
  • Apres : 72 noeuds ScyllaDB, latence p99 stable a 5ms
  • Gain : -60% de noeuds, -90% de latence p99, maintenance simplifiee

Startup IoT - Mauvaise modelisation Cassandra

Une startup IoT a modelise sa table de capteurs avec une partition key sur device_id uniquement, sans date. En 6 mois, certaines partitions ont atteint 100 Mo+ (limite recommandee : 10 Mo), causant des timeouts en lecture et des compactions interminables.

  • Erreur : Partition key sans bucketing temporel
  • Impact : 30% des lectures en timeout, 3 incidents majeurs en production
  • Fix : Migration vers PRIMARY KEY ((device_id, date_bucket), timestamp) avec bucketing par jour

Le "SELECT * sans WHERE" en Cassandra

Executer des requetes sans filtre sur la partition key force un full cluster scan (ALLOW FILTERING). Cela impacte tous les noeuds et peut mettre le cluster entier a genoux.

Solution : Toute requete DOIT filtrer sur la partition key complete. Si vous avez besoin d'un acces par un champ non-partition, creez une table materialisee ou un index secondaire local (SASI) pour les cas simples.

Neo4j & Bases de Donnees Graphes

45 min Intermediaire

Objectifs de cette lecon

  • Comprendre le modele Property Graph et ses avantages
  • Ecrire des requetes Cypher pour des traversees complexes
  • Appliquer les algorithmes de graphe (PageRank, communautes, chemins)
  • Identifier les cas d'usage ideaux pour les bases graphes

Les bases graphes sont le secret le mieux garde de l'industrie. Quand vos donnees sont definies par leurs relations plus que par leurs attributs, un graphe change completement la donne. Detection de fraude, recommandations, analyse de reseaux - la ou un SQL demande des JOINs recursifs couteux sur 5 niveaux, Neo4j repond en millisecondes.

Le Modele Property Graph

Graphe Social Simplifie
  (Alice:Person)──[:FOLLOWS]──>(Bob:Person)
       |                            |
  [:LIKES]                     [:WROTE]
       |                            |
       v                            v
  (Post1:Post)               (Post2:Post)
       |                            |
  [:TAGGED]                    [:TAGGED]
       |                            |
       v                            v
  (Python:Topic)              (Data:Topic)

Cypher Query Language

Cypher
// Recommandation : "Amis de mes amis qui aiment les memes topics"
MATCH (me:Person {name: 'Alice'})-[:FOLLOWS]->(friend)-[:FOLLOWS]->(fof:Person)
WHERE NOT (me)-[:FOLLOWS]->(fof) AND me <> fof
WITH fof, COUNT(friend) AS mutual_friends
MATCH (me)-[:LIKES]->(:Post)-[:TAGGED]->(topic)
MATCH (fof)-[:LIKES]->(:Post)-[:TAGGED]->(topic)
WITH fof, mutual_friends, COUNT(DISTINCT topic) AS common_topics
RETURN fof.name, mutual_friends, common_topics
ORDER BY mutual_friends DESC, common_topics DESC
LIMIT 10;

// Detection de fraude : cycles dans les transactions
MATCH path = (a:Account)-[:TRANSFER*3..6]->(a)
WHERE ALL(r IN relationships(path) WHERE r.amount > 10000)
  AND ALL(r IN relationships(path) WHERE
      duration.between(r.timestamp, r.timestamp).hours < 24)
RETURN path,
       reduce(total = 0, r IN relationships(path) | total + r.amount) AS cycle_total;

Algorithmes de Graphe

AlgorithmeCategorieCas d'usage
PageRankCentraliteNoeuds les plus influents
Betweenness CentralityCentralitePoints de passage critiques
LouvainCommunautesDetection de clusters
DijkstraCheminsPlus court chemin
Node SimilaritySimilariteRecommandations
Label PropagationCommunautesClassification non-supervisee

Panama Papers - ICIJ et Neo4j

L'investigation des Panama Papers (11.5M documents, 2.6 To) a utilise Neo4j pour cartographier les relations entre 214K entites offshore, revelant des reseaux de blanchiment impossibles a detecter en SQL.

  • Donnees : 800K noeuds (personnes, societes, comptes) + 2M relations
  • Requetes : Traversees 6+ niveaux de profondeur en <1 seconde
  • Impact : Demissions de dirigeants politiques dans 10+ pays

Utiliser un graphe pour des donnees tabulaires

Stocker des logs d'acces ou des time-series dans Neo4j parce que "tout est connecte". Les bases graphes excellient pour les traversees de relations, pas pour le scan sequentiel ou l'agregation volumetrique.

Solution : Utilisez un graphe quand la valeur est dans les RELATIONS entre entites (fraude, recommandations, reseaux). Pour des donnees principalement attributaires ou temporelles, PostgreSQL, Cassandra ou ClickHouse seront bien plus performants.

Architecture anti-fraude pour une banque

Vous concevez le systeme anti-fraude d'une banque traitant 500K transactions/jour. Proposez une architecture polyglot :

  • Neo4j : Graphe des relations client-compte-beneficiaire pour detecter les cycles suspects
  • Redis : Cache des profils de risque, rate limiting des transactions
  • Kafka : Stream des transactions en temps reel vers les moteurs de regles
  • PostgreSQL : Stockage des alertes validees, audit trail reglementaire

Elasticsearch & Recherche Full-Text

45 min Intermediaire

Objectifs de cette lecon

  • Comprendre l'index inverse et le fonctionnement interne d'Elasticsearch
  • Configurer les mappings, analyzers et tokenizers
  • Construire des requetes complexes avec le Query DSL
  • Maitriser la stack ELK (Elasticsearch, Logstash, Kibana)

Elasticsearch est devenu incontournable pour 3 cas : la recherche full-text, l'observabilite (logs/metriques/traces), et l'analytics en temps reel. Sa capacite a indexer des teraoctets de donnees et a repondre en millisecondes est remarquable. Mais attention aux couts - un cluster Elasticsearch mal dimensionne peut exploser votre budget cloud.

Index Inverse

Fonctionnement de l'Index Inverse
Documents:                    Index Inverse:
Doc1: "data architect"        "architect" β†’ [Doc1, Doc3]
Doc2: "data engineer"         "cloud"     β†’ [Doc3]
Doc3: "cloud architect"       "data"      β†’ [Doc1, Doc2]
                              "engineer"  β†’ [Doc2]

Recherche "data architect":
β†’ "data" ∩ "architect" = [Doc1]  (score: 2 terms matched)
β†’ "data" seulement = [Doc2]      (score: 1 term)
β†’ "architect" seulement = [Doc3] (score: 1 term)

Mapping & Analyzers

JSON - Elasticsearch
{
  "mappings": {
    "properties": {
      "title": {
        "type": "text",
        "analyzer": "french",
        "fields": {
          "keyword": { "type": "keyword" },
          "autocomplete": {
            "type": "text",
            "analyzer": "autocomplete_analyzer"
          }
        }
      },
      "price": { "type": "float" },
      "category": { "type": "keyword" },
      "created_at": { "type": "date" },
      "location": { "type": "geo_point" },
      "description": {
        "type": "text",
        "analyzer": "french"
      }
    }
  },
  "settings": {
    "analysis": {
      "analyzer": {
        "autocomplete_analyzer": {
          "type": "custom",
          "tokenizer": "edge_ngram_tokenizer",
          "filter": ["lowercase", "asciifolding"]
        }
      },
      "tokenizer": {
        "edge_ngram_tokenizer": {
          "type": "edge_ngram",
          "min_gram": 2, "max_gram": 15
        }
      }
    }
  }
}

Stack ELK

Architecture ELK pour Observabilite
  Applications    Serveurs    Containers
      |              |            |
      β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                     |
              Beats (Filebeat,
              Metricbeat, APM)
                     |
              Logstash / Ingest
              (parse, enrich,
               transform)
                     |
              Elasticsearch
              (index, search,
               aggregate)
                     |
              Kibana
              (dashboards,
               alertes, ML)

Wikipedia - Elasticsearch pour la recherche

Wikipedia utilise Elasticsearch (via CirrusSearch) pour servir 300M+ recherches par jour sur 60M+ articles en 300+ langues.

  • Architecture : Clusters dedies par langue, replication cross-datacenter
  • Features : Did-you-mean, autocomplete, recherche par proximite, highlighting
  • Performance : Latence mediane <50ms, p99 <200ms

E-commerce - Cluster Elasticsearch non-dimensionne

Un site e-commerce a deploye 3 noeuds Elasticsearch pour indexer 50M produits. Sans tuning, le cluster a sature lors du Black Friday avec des timeouts en cascade.

  • Erreur : Shards de 50 Go (recommande : 10-30 Go), pas de replica, heap JVM sous-dimensionnee
  • Impact : Recherche hors service pendant 4h, perte estimee de 2M EUR de ventes
  • Fix : Re-indexation avec shards de 20 Go, ajout de replicas, circuit breakers configures

Elasticsearch comme base de donnees primaire

Utiliser Elasticsearch comme seul stockage de donnees. ES n'est pas ACID, les updates sont couteuses (delete + re-index), et les pertes de donnees sont possibles lors des split-brain.

Solution : Toujours garder une source de verite (PostgreSQL, S3) et utiliser ES comme index de recherche secondaire. Implementer un pipeline de re-indexation pour pouvoir reconstruire l'index depuis la source.

Quelle est la difference entre un type "text" et "keyword" dans Elasticsearch ?
"text" est analyse (tokenise, stemmise) pour la recherche full-text : "Data Architect" β†’ ["data", "architect"]. "keyword" est stocke tel quel pour le filtrage exact, le tri et les aggregations : "Data Architect" reste "Data Architect". Bonne pratique : utiliser les multi-fields pour avoir les deux sur un meme champ.

NewSQL : CockroachDB & TiDB

40 min Avance

Objectifs de cette lecon

  • Comprendre la promesse NewSQL : scalabilite horizontale + ACID
  • Maitriser l'architecture de CockroachDB (Raft, ranges, distributed SQL)
  • Comparer TiDB et CockroachDB pour differents cas d'usage
  • Evaluer quand le NewSQL remplace un RDBMS traditionnel

Le NewSQL represente le "meilleur des deux mondes" : la familiarite du SQL avec les transactions ACID, et la scalabilite horizontale du NoSQL. Google Spanner a ouvert la voie, et CockroachDB et TiDB democratisent cette approche. Pour les equipes qui ne veulent pas sacrifier la consistance, c'est une revolution.

L'Evolution des Bases de Donnees

Evolution : SQL β†’ NoSQL β†’ NewSQL
  SQL (1970s-2000s)         NoSQL (2005-2015)        NewSQL (2012+)
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚ βœ“ ACID       β”‚         β”‚ βœ— ACID       β”‚         β”‚ βœ“ ACID       β”‚
  β”‚ βœ“ SQL        β”‚         β”‚ βœ— SQL        β”‚         β”‚ βœ“ SQL        β”‚
  β”‚ βœ— Scale-out  β”‚         β”‚ βœ“ Scale-out  β”‚         β”‚ βœ“ Scale-out  β”‚
  β”‚ βœ— Geo-distribβ”‚         β”‚ βœ“ Geo-distribβ”‚         β”‚ βœ“ Geo-distribβ”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
   Oracle, PostgreSQL       Cassandra, MongoDB       CockroachDB, TiDB

CockroachDB

SQL - CockroachDB
-- Tables avec geo-partitionnement (donnees EU restent en EU)
CREATE TABLE orders (
    id UUID DEFAULT gen_random_uuid(),
    customer_id UUID NOT NULL,
    region STRING NOT NULL,
    total DECIMAL(10,2),
    created_at TIMESTAMPTZ DEFAULT now(),
    PRIMARY KEY (region, id)
) LOCALITY REGIONAL BY ROW;

-- Transaction distribuee ACID (identique a PostgreSQL)
BEGIN;
  UPDATE accounts SET balance = balance - 500 WHERE id = 'sender' AND region = 'eu';
  UPDATE accounts SET balance = balance + 500 WHERE id = 'receiver' AND region = 'us';
  INSERT INTO transfers (sender, receiver, amount, region)
    VALUES ('sender', 'receiver', 500, 'eu');
COMMIT;

CockroachDB vs TiDB

CritereCockroachDBTiDB
Inspire deGoogle SpannerGoogle Spanner + F1
CompatibilitePostgreSQL wire protocolMySQL wire protocol
ConsensusRaft (multi-raft)Raft (via TiKV)
StoragePebble (LSM-tree)RocksDB (via TiKV)
Geo-distributionNatif (locality rules)TiKV Placement Rules
HTAPNon (OLTP focus)Oui (TiFlash columnar)
LicenceBSL (source-available)Apache 2.0
Ideal pourMulti-region, complianceMigration MySQL, HTAP

DoorDash - Migration vers CockroachDB

DoorDash a migre de PostgreSQL a CockroachDB pour gerer la croissance explosive de leur service de livraison pendant le COVID.

  • Probleme : PostgreSQL a 13 replicas en lecture, ecriture saturee sur le primary
  • Solution : CockroachDB multi-region (US-East, US-West, EU)
  • Resultat : Scale lineaire, latence d'ecriture <10ms, zero downtime deployments

NewSQL pour tout remplacer

Vouloir migrer toutes ses bases vers du NewSQL. CockroachDB et TiDB sont excellents pour l'OLTP distribue, mais ne remplacent pas un DWH analytique (Snowflake), un cache (Redis), ou une base graphe (Neo4j).

Solution : Le NewSQL est ideal pour les workloads OLTP qui ont depasse les limites d'un PostgreSQL single-node ou qui necessitent une distribution geographique. Pour l'analytics, restez sur des solutions OLAP dediees.

Comment CockroachDB garantit-il la consistance forte dans un systeme distribue ?
CockroachDB utilise le consensus Raft sur chaque "range" (partition de 512 Mo) pour garantir que les ecritures sont repliquees avant d'etre commitees. Il utilise aussi des horloges hybrides (HLC) pour ordonner les transactions de maniere causale, et le protocole de commit parallele pour reduire la latence des transactions distribuees. Le resultat est une isolation serializable par defaut - le niveau le plus fort du standard SQL.

Quiz - NoSQL & NewSQL

25 min Evaluation

Objectifs de cette evaluation

  • Valider vos connaissances sur le theoreme CAP et les modeles de consistance
  • Verifier la maitrise des 4 familles NoSQL et leurs cas d'usage
  • Tester votre capacite a choisir la bonne technologie

Quiz Module 1.2 - NoSQL & NewSQL

1. Selon le theoreme CAP, quel compromis fait Cassandra ?

A) CP - Consistance et Tolerance aux partitions
B) AP - Disponibilite et Tolerance aux partitions
C) CA - Consistance et Disponibilite
D) CAP - Les trois simultanement
Correct : B) Cassandra est un systeme AP. Elle privilegiera toujours la disponibilite, meme au prix de retourner des donnees potentiellement non a jour. La consistance peut etre tunee via les consistency levels (ONE, QUORUM, ALL).

2. Quel pattern MongoDB est recommande pour gerer un utilisateur avec potentiellement des millions de followers ?

A) Embedding - Tout imbriquer dans le document utilisateur
B) Referencing simple - Array d'ObjectId
C) Outlier Pattern - Document standard + overflow
D) Computed Pattern - Pre-calculer les followers
Correct : C) L'Outlier Pattern gere les cas extremes en separant les documents normaux (<1000 followers en array) des outliers (collection separee avec pagination). Cela evite de depasser la limite de 16 Mo du document BSON.

3. Quelle structure Redis est ideale pour un leaderboard en temps reel ?

A) Hash
B) List
C) Sorted Set
D) Stream
Correct : C) Le Sorted Set (ZSET) permet de stocker des elements avec un score, avec des operations O(log N) pour l'ajout (ZADD), le classement (ZRANK) et le top-N (ZRANGE). Ideal pour les leaderboards, classements et priorites.

4. En Cassandra, pourquoi la modelisation "query-first" est-elle obligatoire ?

A) Parce que CQL ne supporte pas les JOINs
B) Parce que les lectures sans partition key scannent tout le cluster
C) A et B sont vrais
D) Parce que Cassandra ne supporte pas les index
Correct : C) Cassandra n'a pas de JOINs et les requetes sans filtre sur la partition key necessitent ALLOW FILTERING (scan distribue). Chaque table doit etre concue pour repondre a UNE requete specifique, ce qui implique denormalisation et duplication.

5. Quel langage de requete utilise Neo4j ?

A) GraphQL
B) Gremlin
C) SPARQL
D) Cypher
Correct : D) Cypher est le langage declaratif de Neo4j, inspire de l'ASCII art pour representer les patterns de graphe. Ex: MATCH (a)-[:KNOWS]->(b) RETURN b. Gremlin est utilise par Apache TinkerPop, SPARQL pour les RDF triplestores.

6. Quel est l'avantage principal de ScyllaDB par rapport a Cassandra ?

A) Support du SQL complet
B) Elimination des GC pauses grace au C++
C) Meilleur support des transactions ACID
D) Compatibilite avec MongoDB
Correct : B) ScyllaDB, ecrit en C++ avec le framework Seastar, elimine les GC pauses de la JVM qui causent des pics de latence imprevisibles dans Cassandra. Resultat : latence p99 10x inferieure avec 3-5x moins de noeuds.

7. Pourquoi ne faut-il pas utiliser Elasticsearch comme base de donnees primaire ?

A) Il ne supporte pas le JSON
B) Il n'est pas ACID et les updates sont couteuses (delete + re-index)
C) Il ne peut pas stocker plus de 1 Go
D) Il ne supporte pas les requetes
Correct : B) ES n'offre pas de garanties ACID, les updates internes sont des delete + re-index, et des pertes de donnees sont possibles lors de split-brain. Toujours garder une source de verite (PostgreSQL, S3) et utiliser ES comme index secondaire.

8. Quel est l'avantage principal du NewSQL (CockroachDB, TiDB) sur le NoSQL ?

A) Meilleure performance en lecture
B) Transactions ACID distribuees avec scalabilite horizontale
C) Schema-less
D) Pas besoin de SQL
Correct : B) Le NewSQL combine le meilleur du SQL (ACID, SQL standard, schemas) et du NoSQL (scalabilite horizontale, geo-distribution). CockroachDB offre l'isolation serializable par defaut avec un scale lineaire sur des dizaines de noeuds.

Ecosysteme Python Data

45 min Fondamental

Python est devenu la lingua franca de la data. De l'analyse exploratoire au machine learning, en passant par l'ingenierie de donnees, l'ecosysteme Python offre des outils pour chaque etape du pipeline. Cette lecon vous guide a travers les librairies essentielles et vous aide a choisir le bon outil pour chaque situation.

Objectifs d'apprentissage

  • Connaitre les librairies fondamentales de l'ecosysteme Python Data
  • Comprendre quand utiliser pandas, DuckDB, Polars ou PySpark
  • Maitriser la matrice de decision basee sur le volume de donnees
  • Decouvrir DuckDB comme revolution analytique locale
  • Positionner Polars comme alternative moderne a pandas

Vue d'ensemble de l'ecosysteme

L'ecosysteme Python Data est compose de plusieurs couches complementaires. Chaque outil a ete concu pour resoudre un probleme specifique, et le Data Architect doit savoir les assembler de maniere coherente.

Ecosysteme Python Data - Architecture en couches
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                     ORCHESTRATION & WORKFLOW                        β”‚
β”‚              Airflow  β”‚  Prefect  β”‚  Dagster  β”‚  Luigi              β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                      MACHINE LEARNING                               β”‚
β”‚         scikit-learn  β”‚  XGBoost  β”‚  PyTorch  β”‚  TensorFlow         β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                   TRAITEMENT & ANALYSE                              β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”              β”‚
β”‚  β”‚  pandas   β”‚ β”‚  Polars  β”‚ β”‚  DuckDB  β”‚ β”‚ PySpark  β”‚              β”‚
β”‚  β”‚  < 1 GB   β”‚ β”‚  < 50 GB β”‚ β”‚ < 100 GB β”‚ β”‚  > 1 TB  β”‚              β”‚
β”‚  β”‚ In-memory β”‚ β”‚ In-memoryβ”‚ β”‚ In-proc  β”‚ β”‚ Cluster  β”‚              β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜              β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                    CALCUL NUMERIQUE                                  β”‚
β”‚              NumPy  β”‚  SciPy  β”‚  Apache Arrow                       β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                    CONNECTIVITE                                      β”‚
β”‚        SQLAlchemy  β”‚  psycopg2  β”‚  pymongo  β”‚  requests             β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”‚                    VISUALISATION                                     β”‚
β”‚        Matplotlib  β”‚  Seaborn  β”‚  Plotly  β”‚  Altair                 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

NumPy - La fondation

NumPy est la brique de base de tout l'ecosysteme scientifique Python. Il fournit le type ndarray, un tableau multidimensionnel performant ecrit en C, sur lequel toutes les autres librairies s'appuient.

Python
import numpy as np

# Creation de tableaux performants
prix = np.array([29.99, 45.50, 12.00, 89.99, 34.50])
quantites = np.array([100, 50, 200, 25, 75])

# Operations vectorisees (pas de boucle Python!)
chiffre_affaires = prix * quantites
total = np.sum(chiffre_affaires)  # 12_361.25

# Statistiques instantanees
print(f"CA moyen par produit: {np.mean(chiffre_affaires):.2f}")
print(f"Ecart-type: {np.std(chiffre_affaires):.2f}")
print(f"Produit max: index {np.argmax(chiffre_affaires)}")

# Filtrage vectorise
produits_rentables = prix[chiffre_affaires > 2000]
Pourquoi NumPy est-il 50 a 100x plus rapide que les listes Python pour les calculs numeriques ?
NumPy stocke les donnees dans des blocs de memoire contigus (comme un tableau C), utilise des types fixes (pas de boxing/unboxing Python), et execute les operations en code compile C/Fortran via des instructions SIMD du processeur. Les listes Python sont des tableaux de pointeurs vers des objets disperses en memoire.

pandas - Le couteau suisse de l'analyse

pandas est l'outil le plus utilise pour la manipulation de donnees tabulaires. Son objet central, le DataFrame, est un tableau 2D avec des labels sur les lignes et colonnes, qui s'inspire directement des data.frames de R.

Python
import pandas as pd

# Lecture de multiples formats
df_csv = pd.read_csv("ventes.csv")
df_excel = pd.read_excel("rapport.xlsx", sheet_name="Q4")
df_json = pd.read_json("api_response.json")
df_parquet = pd.read_parquet("datalake/ventes.parquet")
df_sql = pd.read_sql("SELECT * FROM clients", engine)

# Exploration rapide
print(df_csv.shape)          # (1000000, 15)
print(df_csv.dtypes)         # Types de chaque colonne
print(df_csv.describe())     # Stats descriptives
print(df_csv.info(memory_usage='deep'))  # Usage memoire

Limite de pandas

pandas charge TOUT en memoire RAM. Un fichier CSV de 5 GB necessite environ 10-15 GB de RAM (overhead Python). Pour les gros volumes, preferez DuckDB, Polars ou PySpark.

DuckDB - Le SQLite de l'analytique

DuckDB est une revolution recente dans le monde de la data. C'est une base de donnees analytique in-process (pas de serveur), optimisee pour les requetes OLAP. Elle peut traiter des fichiers Parquet de dizaines de GB directement, sans les charger en memoire.

DuckDB vs Base de donnees traditionnelle
    Base traditionnelle (PostgreSQL)          DuckDB (In-Process)
    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”             β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
    β”‚  Client   │───▢│ Serveur  β”‚             β”‚  Votre Script    β”‚
    β”‚  Python   │◀───│ PostgreSQLβ”‚             β”‚  Python          β”‚
    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜             β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
         β”‚              β”‚                     β”‚  β”‚  DuckDB    β”‚  β”‚
         β”‚   Reseau     β”‚                     β”‚  β”‚  Engine    β”‚  β”‚
         β”‚   (latence)  β”‚                     β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
         β–Ό              β–Ό                     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                        β”‚
    β”‚  Serialisation /     β”‚                  Acces direct fichiers
    β”‚  Deserialisation     β”‚                  (zero copie)
    β”‚  des donnees         β”‚                        β”‚
    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                        β–Ό
                                              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                                              β”‚ Parquet / CSV /  β”‚
                                              β”‚ JSON / S3        β”‚
                                              β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
Python
import duckdb

# Requete directe sur un fichier Parquet (sans le charger en memoire!)
result = duckdb.sql("""
    SELECT
        region,
        product_category,
        SUM(amount) as total_sales,
        COUNT(DISTINCT customer_id) as unique_customers,
        AVG(amount) as avg_order_value
    FROM 'sales_2024/*.parquet'       -- Glob sur plusieurs fichiers!
    WHERE order_date >= '2024-01-01'
    GROUP BY region, product_category
    HAVING total_sales > 100000
    ORDER BY total_sales DESC
""").df()  # Convertit en pandas DataFrame

# Requete sur un fichier CSV distant
duckdb.sql("""
    SELECT * FROM read_csv_auto(
        'https://data.example.com/dataset.csv',
        header=true,
        dateformat='%Y-%m-%d'
    )
    LIMIT 100
""")

# Integration transparente avec pandas
import pandas as pd
df_clients = pd.read_csv("clients.csv")
df_commandes = pd.read_parquet("commandes.parquet")

# DuckDB peut joindre un DataFrame pandas avec un fichier Parquet!
result = duckdb.sql("""
    SELECT c.nom, c.ville, SUM(co.montant) as total
    FROM df_clients c                         -- DataFrame pandas
    JOIN df_commandes co ON c.id = co.client_id  -- Fichier Parquet
    GROUP BY c.nom, c.ville
    ORDER BY total DESC
    LIMIT 20
""").df()

Pourquoi DuckDB est un game-changer

Zero infrastructure : pas de serveur, pas de Docker, pip install suffit. Performances : souvent 10-50x plus rapide que pandas pour les groupby/join. SQL natif : pas besoin d'apprendre une nouvelle API. Multi-format : lit Parquet, CSV, JSON, et meme les DataFrames pandas directement.

Polars - Le pandas moderne

Polars est une librairie de DataFrames ecrite en Rust, concue pour corriger les limitations historiques de pandas. Elle est systematiquement plus rapide et utilise moins de memoire grace a Apache Arrow en backend et une execution lazy par defaut.

Python
import polars as pl

# Lecture de donnees
df = pl.read_parquet("ventes.parquet")

# API d'expressions (plus claire que pandas)
result = (
    df.lazy()                                    # Mode lazy
    .filter(pl.col("date") >= "2024-01-01")     # Filtre
    .group_by("region", "categorie")             # Groupby
    .agg([
        pl.col("montant").sum().alias("total_ventes"),
        pl.col("client_id").n_unique().alias("clients_uniques"),
        pl.col("montant").mean().alias("panier_moyen"),
        pl.col("montant").quantile(0.95).alias("p95_montant"),
    ])
    .sort("total_ventes", descending=True)
    .collect()                                   # Execute tout d'un coup!
)

# Polars optimise automatiquement le plan d'execution
# - Predicate pushdown (filtre avant groupby)
# - Projection pushdown (ne lit que les colonnes necessaires)
# - Parallelisation automatique sur tous les coeurs

pandas

  • Execution eager (immediate)
  • Single-threaded par defaut
  • API mature, enorme ecosysteme
  • Index implicites (source de bugs)
  • Copie implicite des donnees
  • Backend NumPy (legacy)
  • Ideal pour : prototypage, notebooks

Polars

  • Execution lazy (optimisee)
  • Multi-threaded natif
  • API plus jeune mais croissante
  • Pas d'index (plus previsible)
  • Zero-copy quand possible
  • Backend Apache Arrow
  • Ideal pour : performance, pipelines

PySpark - Le Big Data distribue

PySpark est l'interface Python pour Apache Spark, le moteur de calcul distribue dominant. Il permet de traiter des petaoctets de donnees sur des clusters de centaines de machines.

Python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("DataAnalysis") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Lecture depuis un data lake
df = spark.read.parquet("s3://datalake/ventes/year=2024/")

# API DataFrame similaire a pandas
result = (
    df.filter(F.col("status") == "completed")
      .groupBy("region", "category")
      .agg(
          F.sum("amount").alias("total"),
          F.countDistinct("customer_id").alias("unique_customers")
      )
      .orderBy(F.desc("total"))
)

# Ou en SQL pur
df.createOrReplaceTempView("ventes")
spark.sql("""
    SELECT region, category, SUM(amount) as total
    FROM ventes
    WHERE status = 'completed'
    GROUP BY region, category
    ORDER BY total DESC
""").show()

Matrice de decision : Quel outil pour quel volume ?

Critere pandas Polars DuckDB PySpark
Volume max < 1 GB < 50 GB < 100 GB Illimite (cluster)
Latence demarrage Instantane Instantane Instantane 30-60 secondes
Infrastructure Aucune Aucune Aucune Cluster Spark
Courbe d'apprentissage Moyenne Moyenne Faible (SQL) Elevee
Ecosysteme ML Excellent Bon (via Arrow) Moyen MLlib integre
Streaming Non Non Non Structured Streaming
Cas d'usage ideal Notebooks, prototypage, petits CSV ETL locaux, fichiers moyens Analytique ad-hoc, exploration de Parquet Data lakes, pipelines TB-scale
Arbre de decision - Choix de l'outil
                    Quel volume de donnees ?
                           β”‚
              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
              β–Ό            β–Ό                β–Ό
          < 1 GB      1-100 GB          > 100 GB
              β”‚            β”‚                β”‚
              β–Ό            β–Ό                β–Ό
          pandas     Besoin de SQL ?     PySpark
              β”‚       β”‚          β”‚       (ou Spark SQL)
              β”‚      Oui        Non
              β”‚       β”‚          β”‚
              β”‚       β–Ό          β–Ό
              β”‚    DuckDB     Polars
              β”‚
              β–Ό
    Besoin de compatibilite
    ML (scikit-learn) ?
       β”‚           β”‚
      Oui         Non
       β”‚           β”‚
       β–Ό           β–Ό
    pandas     Polars (+ rapide)
Vous avez 30 fichiers Parquet totalisant 40 GB sur votre laptop (16 GB RAM). Quel outil choisir ?
DuckDB est le choix ideal. Il peut lire les fichiers Parquet en streaming sans les charger entierement en RAM, executer du SQL directement dessus, et gerer le glob pattern 'data/*.parquet' nativement. Polars serait aussi viable en mode lazy. pandas serait impossible (depassement memoire).
Quelle est la difference fondamentale entre DuckDB et SQLite ?
SQLite est optimise pour les operations OLTP (insertions/mises a jour ligne par ligne), tandis que DuckDB est optimise pour les operations OLAP (scans analytiques sur des colonnes entieres). DuckDB utilise un stockage en colonnes et une execution vectorisee, ce qui le rend 10-100x plus rapide que SQLite pour les requetes analytiques (GROUP BY, JOIN, aggregations).

SQLAlchemy - Le pont Python-Base de donnees

SQLAlchemy est la librairie standard pour connecter Python a n'importe quelle base de donnees relationnelle. Elle offre deux niveaux d'abstraction : le Core (SQL generatif) et l'ORM (mapping objet-relationnel).

Python
from sqlalchemy import create_engine

# Connexion a differentes bases
pg_engine = create_engine("postgresql://user:pass@host:5432/db")
mysql_engine = create_engine("mysql+pymysql://user:pass@host:3306/db")
sqlite_engine = create_engine("sqlite:///local.db")

# Integration pandas : le duo parfait
import pandas as pd
df = pd.read_sql("SELECT * FROM clients WHERE actif = true", pg_engine)
df_transforme.to_sql("clients_clean", pg_engine, if_exists="replace", index=False)

Netflix : Python comme lingua franca de la data

Netflix emploie plus de 1500 data scientists et ingenieurs data qui utilisent Python quotidiennement. Leur ecosysteme interne repose sur :

  • Metaflow (open-source) : framework Python pour les pipelines ML, developpe en interne chez Netflix
  • pandas + PySpark : les data scientists prototypent en pandas sur des echantillons, puis les ingenieurs passent en PySpark pour la production sur le data lake de 100+ PB
  • Jupyter Hub interne : chaque employe a acces a des notebooks connectes au data lake, avec des kernels pre-configures (pandas, Spark, TensorFlow)
  • Resultat : le systeme de recommandation genere 80% du contenu regarde, l'A/B testing Python traite 200+ experiences simultanees, et la detection d'anomalies Python surveille la qualite de streaming en temps reel

Lecon : Netflix a standardise sur Python non pas parce que c'est le langage le plus performant, mais parce qu'il permet la collaboration entre profils differents (data scientists, ingenieurs, analystes).

Utiliser un seul outil pour tout

Erreur courante du debutant : vouloir tout faire en pandas, ou tout migrer en PySpark. Chaque outil a sa zone de pertinence.

  • Erreur : "On utilise Spark pour tout, meme les fichiers de 100 MB" → overhead de demarrage de 30s pour un traitement de 2s en pandas
  • Erreur : "pandas suffit pour nos 50 GB de logs" → crash OOM ou temps de traitement de 4 heures au lieu de 2 minutes en DuckDB
  • Bonne pratique : evaluez le volume ET la frequence. Un traitement quotidien de 20 GB justifie DuckDB. Un traitement horaire de 500 GB justifie Spark.

Scenario : Choix d'architecture chez une startup e-commerce

Vous etes Data Architect dans une startup qui traite 500K commandes/mois (environ 2 GB de donnees). L'equipe data de 3 personnes hesite entre pandas et PySpark.

Votre recommandation :

  • Court terme (6 mois) : pandas + DuckDB. Le volume est largement gerableen local. DuckDB pour les requetes analytiques ad-hoc, pandas pour les transformations et l'integration ML.
  • Moyen terme (18 mois) : Si le volume depasse 50 GB/mois, introduire Polars pour les pipelines de transformation. DuckDB reste pour l'analytique.
  • Long terme (3+ ans) : Si le volume depasse 1 TB et que l'equipe grandit a 10+, migrer les pipelines critiques vers PySpark sur un cluster manage (EMR, Dataproc).

Erreur a eviter : ne pas sur-ingenierer avec Spark des le depart. Le cout d'un cluster Spark (infrastructure + expertise) est 10x celui d'une solution locale.

Quiz rapide

Vous devez analyser un fichier Parquet de 60 GB sur votre laptop (32 GB RAM). Quel outil est le plus adapte ?

pandas (avec chunking)
DuckDB
PySpark en local
NumPy
DuckDB peut traiter des fichiers Parquet bien plus gros que la RAM disponible grace a son execution en streaming et son moteur OLAP columnar. pandas crasherait en memoire, PySpark aurait un overhead inutile en local.

Quel est l'avantage principal de Polars sur pandas ?

Il est ecrit en Python pur
Il supporte plus de formats de fichiers
Il utilise l'execution lazy et le multi-threading natif
Il a un meilleur ecosysteme ML
Polars, ecrit en Rust avec Apache Arrow, offre une execution lazy (optimisation du plan de requete) et un multi-threading natif qui le rendent 5-20x plus rapide que pandas pour les transformations de donnees.

En 2024-2025, le paysage a radicalement change. Si vous debutez un nouveau projet, mon conseil : commencez avec DuckDB pour l'analytique et Polars pour les transformations. Ne sortez pandas que pour sa compatibilite avec scikit-learn et les notebooks. Et reservez Spark aux vrais cas Big Data (TB+). Le Data Architect moderne jongle entre ces outils - c'est une competence cle.

pandas DataFrames - Fondamentaux

75 min Intermediaire

Le DataFrame pandas est l'objet central de l'analyse de donnees en Python. Maitriser sa creation, la selection de donnees, le filtrage et les transformations est une competence incontournable pour tout Data Architect. Cette lecon approfondie couvre les patterns essentiels et les pieges a eviter.

Objectifs d'apprentissage

  • Creer des DataFrames a partir de multiples sources
  • Maitriser la selection avec loc, iloc et les conditions booleennes
  • Filtrer et transformer les donnees efficacement
  • Eviter les anti-patterns courants (iterrows, chained indexing)
  • Appliquer les bonnes pratiques de performance et vectorisation

Creation de DataFrames

Un DataFrame est un tableau 2D avec des labels sur les lignes (index) et les colonnes. Il peut etre cree a partir de nombreuses sources.

Python
import pandas as pd
import numpy as np

# 1. Depuis un dictionnaire (le plus courant en dev)
df = pd.DataFrame({
    'nom': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
    'age': [28, 34, 45, 31, 27],
    'ville': ['Paris', 'Lyon', 'Paris', 'Marseille', 'Lyon'],
    'salaire': [55000, 62000, 78000, 58000, 51000],
    'departement': ['Tech', 'Marketing', 'Tech', 'RH', 'Tech']
})

# 2. Depuis un fichier CSV (le plus courant en data)
df_ventes = pd.read_csv(
    "ventes.csv",
    sep=";",                      # Separateur (defaut: virgule)
    encoding="utf-8",             # Encodage
    parse_dates=["date_vente"],   # Colonnes date
    dtype={"code_postal": str},   # Forcer les types
    usecols=["date_vente", "montant", "client_id"],  # Colonnes utiles
    na_values=["N/A", "NULL", ""],  # Valeurs manquantes custom
    nrows=100000                  # Limiter les lignes (test)
)

# 3. Depuis une base de donnees SQL
from sqlalchemy import create_engine
engine = create_engine("postgresql://user:pass@localhost/ecommerce")
df_clients = pd.read_sql("""
    SELECT id, nom, email, date_inscription, ltv
    FROM clients
    WHERE actif = true
    ORDER BY date_inscription DESC
""", engine, parse_dates=["date_inscription"])

# 4. Depuis un fichier Parquet (recommande pour la data)
df_logs = pd.read_parquet(
    "logs_2024.parquet",
    columns=["timestamp", "event_type", "user_id"]  # Projection!
)

# 5. Depuis un fichier JSON (API responses)
df_api = pd.read_json("api_response.json", orient="records")

# 6. Depuis un fichier Excel
df_budget = pd.read_excel(
    "budget_2024.xlsx",
    sheet_name="Q4",
    skiprows=2,         # Sauter les lignes d'en-tete custom
    header=0            # Ligne d'en-tete apres skip
)

Toujours specifier les types a la lecture

Utilisez dtype et parse_dates dans read_csv(). Sans cela, pandas infere les types, ce qui est lent (double lecture du fichier) et souvent incorrect (codes postaux convertis en entiers, IDs en float si valeurs manquantes).

Exploration des donnees

Avant toute transformation, explorez vos donnees systematiquement. Voici les methodes essentielles :

Python
# Dimensions et structure
print(f"Shape: {df.shape}")          # (1000, 5)
print(f"Colonnes: {df.columns.tolist()}")
print(f"Types:\n{df.dtypes}")

# Premieres et dernieres lignes
df.head(10)
df.tail(5)
df.sample(5)   # 5 lignes aleatoires

# Statistiques descriptives
df.describe()                    # Colonnes numeriques
df.describe(include='object')    # Colonnes texte

# Informations memoire detaillees
df.info(memory_usage='deep')

# Valeurs manquantes
print(df.isnull().sum())                    # Par colonne
print(f"Total NaN: {df.isnull().sum().sum()}")
print(f"% complet: {df.notna().mean() * 100}")  # % par colonne

# Valeurs uniques
print(df['ville'].nunique())         # Nombre de valeurs uniques
print(df['ville'].value_counts())    # Distribution des valeurs

# Doublons
print(f"Doublons: {df.duplicated().sum()}")
print(f"Doublons sur nom+ville: {df.duplicated(subset=['nom','ville']).sum()}")

Selection de donnees : loc vs iloc

La selection de donnees est l'operation la plus frequente. pandas offre deux accesseurs principaux : .loc (selection par label) et .iloc (selection par position).

loc vs iloc - Regles de selection
    df.loc[lignes, colonnes]         df.iloc[lignes, colonnes]
    ────────────────────────         ─────────────────────────
    Par LABEL (nom)                  Par POSITION (numero)
    Inclusif des deux bornes         Exclusif de la borne droite

    Exemples :                       Exemples :
    df.loc[0, 'nom']                 df.iloc[0, 0]
    df.loc[0:5, 'nom':'age']        df.iloc[0:5, 0:2]
    df.loc[mask, ['nom','age']]     df.iloc[[0,2,4], [0,1]]

    β”Œβ”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
    β”‚     β”‚   nom    β”‚ age β”‚  ville  β”‚
    β”‚     β”‚  col 0   β”‚ c 1 β”‚  col 2  β”‚
    β”œβ”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
    β”‚  0  β”‚  Alice   β”‚ 28  β”‚  Paris  β”‚  ◄── df.loc[0] = df.iloc[0]
    β”‚  1  β”‚  Bob     β”‚ 34  β”‚  Lyon   β”‚
    β”‚  2  β”‚  Charlie β”‚ 45  β”‚  Paris  β”‚
    β”‚  3  β”‚  Diana   β”‚ 31  β”‚ Marseil β”‚
    β”‚  4  β”‚  Eve     β”‚ 27  β”‚  Lyon   β”‚
    β””β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

    df.loc[1:3, 'nom':'age'] β†’ lignes 1,2,3 / colonnes nom, age
    df.iloc[1:3, 0:2]        β†’ lignes 1,2   / colonnes 0, 1
Python
# ===== SELECTION PAR LABEL : .loc =====
# Une seule valeur
df.loc[0, 'nom']                    # 'Alice'

# Une colonne entiere (Series)
df.loc[:, 'nom']                    # Equivalent: df['nom']

# Plusieurs colonnes
df.loc[:, ['nom', 'salaire']]       # Equivalent: df[['nom', 'salaire']]

# Plage de lignes et colonnes (inclusif!)
df.loc[1:3, 'nom':'ville']          # Lignes 1,2,3 et colonnes nom, age, ville

# Avec condition booleenne (le plus utile!)
df.loc[df['age'] > 30, ['nom', 'salaire']]

# ===== SELECTION PAR POSITION : .iloc =====
# Premiere ligne, premiere colonne
df.iloc[0, 0]                       # 'Alice'

# 5 premieres lignes, 2 premieres colonnes
df.iloc[:5, :2]

# Lignes et colonnes specifiques
df.iloc[[0, 2, 4], [0, 3]]          # Alice et Eve, nom et salaire

# Derniere ligne
df.iloc[-1]

# ===== SELECTION BOOLEENNE (MASQUES) =====
# Condition simple
jeunes = df[df['age'] < 30]

# Conditions combinees (& = AND, | = OR, ~ = NOT)
tech_seniors = df[(df['departement'] == 'Tech') & (df['age'] > 30)]
paris_ou_lyon = df[df['ville'].isin(['Paris', 'Lyon'])]
pas_paris = df[~(df['ville'] == 'Paris')]

# Condition sur texte
contient_a = df[df['nom'].str.contains('a', case=False)]
commence_par = df[df['nom'].str.startswith('A')]

# Condition avec query() - plus lisible
tech_seniors2 = df.query("departement == 'Tech' and age > 30")

Chained Indexing : df[col][row] vs .loc

Le chained indexing est une source majeure de bugs en pandas. Il cree une copie intermediaire et peut produire des resultats inattendus lors de l'affectation.

Python - MAUVAIS
# MAUVAIS : Chained indexing
df['salaire'][df['ville'] == 'Paris'] = 60000  # SettingWithCopyWarning!
# Cela ne modifie PAS forcement le DataFrame original.
# Pandas cree parfois une copie intermediaire.

# MAUVAIS : Double crochet
valeur = df['salaire'][0]   # Fonctionne mais fragile
Python - BON
# BON : Utiliser .loc pour toute modification
df.loc[df['ville'] == 'Paris', 'salaire'] = 60000  # Garanti!

# BON : .loc pour lire aussi
valeur = df.loc[0, 'salaire']   # Explicite et fiable

# BON : .at pour une valeur scalaire (plus rapide que .loc)
valeur = df.at[0, 'salaire']

Regle d'or : utilisez TOUJOURS .loc pour modifier un DataFrame. Jamais df[col][condition] = valeur.

Transformation de donnees

Les transformations sont le coeur du travail de preparation de donnees. Voici les operations les plus courantes.

Python
# ===== CREATION DE COLONNES =====
# Calcul simple
df['salaire_mensuel'] = df['salaire'] / 12

# Calcul conditionnel
df['seniorite'] = np.where(df['age'] >= 35, 'Senior', 'Junior')

# Conditions multiples avec np.select
conditions = [
    df['salaire'] < 40000,
    df['salaire'] < 60000,
    df['salaire'] < 80000,
    df['salaire'] >= 80000
]
choix = ['Bas', 'Moyen', 'Eleve', 'Tres eleve']
df['tranche_salaire'] = np.select(conditions, choix, default='Inconnu')

# Depuis une autre colonne (string operations)
df['prenom'] = df['nom'].str.split(' ').str[0]
df['email_domain'] = df['email'].str.extract(r'@(.+)$')
df['nom_upper'] = df['nom'].str.upper()

# ===== GESTION DES VALEURS MANQUANTES =====
# Detecter
df.isnull().sum()

# Supprimer les lignes avec NaN
df_clean = df.dropna()                      # Toutes colonnes
df_clean = df.dropna(subset=['salaire'])     # Colonnes specifiques

# Remplir les NaN
df['salaire'] = df['salaire'].fillna(df['salaire'].median())
df['ville'] = df['ville'].fillna('Inconnu')
df['date'] = df['date'].ffill()   # Forward fill (propager la valeur precedente)

# ===== RENOMMAGE ET SUPPRESSION =====
df = df.rename(columns={'nom': 'name', 'age': 'years'})
df = df.drop(columns=['colonne_inutile', 'temp'])

# ===== TRI =====
df = df.sort_values('salaire', ascending=False)
df = df.sort_values(['ville', 'salaire'], ascending=[True, False])

# ===== CONVERSION DE TYPES =====
df['date'] = pd.to_datetime(df['date'], format='%Y-%m-%d')
df['montant'] = pd.to_numeric(df['montant'], errors='coerce')  # NaN si erreur
df['code'] = df['code'].astype(str)

# ===== OPERATIONS SUR LES DATES =====
df['annee'] = df['date'].dt.year
df['mois'] = df['date'].dt.month
df['jour_semaine'] = df['date'].dt.day_name()
df['trimestre'] = df['date'].dt.quarter
df['jours_depuis'] = (pd.Timestamp.now() - df['date']).dt.days

Performance : Vectorisation vs Boucles

La regle numero un de pandas : ne jamais iterer ligne par ligne. Utilisez toujours des operations vectorisees. La difference de performance peut atteindre un facteur 100x a 1000x.

iterrows() pour des operations vectorisables

C'est l'erreur la plus courante chez les developpeurs venant de langages imperatifs. iterrows() force Python a traiter chaque ligne individuellement, perdant tout le benefice de NumPy sous-jacent.

Python - NE FAITES JAMAIS CA
# TERRIBLE : iterrows() - 100x plus lent
# Sur 1M lignes : ~45 secondes
for index, row in df.iterrows():
    if row['montant'] > 100:
        df.at[index, 'categorie'] = 'Premium'
    else:
        df.at[index, 'categorie'] = 'Standard'

# MAUVAIS : apply() avec lambda - 10x plus lent
# Sur 1M lignes : ~8 secondes
df['categorie'] = df['montant'].apply(
    lambda x: 'Premium' if x > 100 else 'Standard'
)

# BON : np.where() - vectorise
# Sur 1M lignes : ~0.05 secondes (900x plus rapide!)
df['categorie'] = np.where(
    df['montant'] > 100, 'Premium', 'Standard'
)
Methode 1M lignes 10M lignes Facteur
iterrows() 45s 450s 1x (ref)
itertuples() 5s 50s ~9x
apply(lambda) 8s 80s ~6x
np.where() 0.05s 0.5s ~900x
np.select() 0.08s 0.8s ~560x
Operations vectorisees 0.03s 0.3s ~1500x

Pourquoi la vectorisation est si rapide

Les operations vectorisees executent le calcul dans du code C/Fortran compile (via NumPy), en une seule passe sur des blocs de memoire contigus. Les boucles Python ajoutent l'overhead de l'interpreteur pour chaque element : type checking, boxing/unboxing, et saut entre zones memoire non contiguees.

Python - Patterns de vectorisation
# Pattern 1 : Remplacer apply() par des operations vectorisees
# MAUVAIS
df['full_name'] = df.apply(lambda r: f"{r['prenom']} {r['nom']}", axis=1)
# BON
df['full_name'] = df['prenom'] + ' ' + df['nom']

# Pattern 2 : Remplacer les boucles de calcul
# MAUVAIS
totaux = []
for _, row in df.iterrows():
    totaux.append(row['prix'] * row['quantite'] * (1 - row['remise']))
df['total'] = totaux
# BON
df['total'] = df['prix'] * df['quantite'] * (1 - df['remise'])

# Pattern 3 : Conditions multiples avec np.select
conditions = [
    (df['score'] >= 90),
    (df['score'] >= 70),
    (df['score'] >= 50),
]
labels = ['A', 'B', 'C']
df['grade'] = np.select(conditions, labels, default='F')

# Pattern 4 : Mapping de valeurs
mapping = {'Paris': 'IDF', 'Lyon': 'ARA', 'Marseille': 'PACA'}
df['region'] = df['ville'].map(mapping)

# Pattern 5 : Binning numerique
df['tranche_age'] = pd.cut(
    df['age'],
    bins=[0, 25, 35, 50, 100],
    labels=['Junior', 'Confirme', 'Senior', 'Expert']
)

Scenario : Nettoyer un CSV avec 15 problemes de qualite

Vous recevez un fichier clients_dirty.csv avec 500K lignes et les problemes suivants. Ecrivez le pipeline de nettoyage complet.

Python
import pandas as pd
import numpy as np

# Lecture avec types explicites
df = pd.read_csv("clients_dirty.csv", dtype={
    'code_postal': str, 'telephone': str, 'id': str
}, parse_dates=['date_inscription'], na_values=['N/A', 'NULL', '', 'n/a'])

print(f"Shape initiale: {df.shape}")
print(f"Valeurs manquantes:\n{df.isnull().sum()}")

# 1. Supprimer les doublons exacts
df = df.drop_duplicates()

# 2. Supprimer les doublons sur email (garder le plus recent)
df = df.sort_values('date_inscription', ascending=False)
df = df.drop_duplicates(subset=['email'], keep='first')

# 3. Normaliser les noms (majuscules, espaces)
df['nom'] = df['nom'].str.strip().str.title()
df['prenom'] = df['prenom'].str.strip().str.title()

# 4. Normaliser les emails (minuscules)
df['email'] = df['email'].str.strip().str.lower()

# 5. Valider le format email
email_valide = df['email'].str.match(r'^[\w.+-]+@[\w-]+\.[\w.]+$')
df.loc[~email_valide, 'email'] = np.nan

# 6. Nettoyer les numeros de telephone
df['telephone'] = (df['telephone']
    .str.replace(r'[^\d+]', '', regex=True)   # Garder chiffres et +
    .str.replace(r'^0', '+33', regex=True))    # Format international

# 7. Corriger les codes postaux
df['code_postal'] = df['code_postal'].str.zfill(5)  # Padding zeros
df.loc[df['code_postal'].str.len() != 5, 'code_postal'] = np.nan

# 8. Borner les ages aberrants
df.loc[(df['age'] < 0) | (df['age'] > 120), 'age'] = np.nan
df['age'] = df['age'].fillna(df['age'].median())

# 9. Borner les montants negatifs
df.loc[df['ltv'] < 0, 'ltv'] = 0

# 10. Normaliser les villes
df['ville'] = (df['ville'].str.strip().str.upper()
    .str.replace('SAINT ', 'ST ', regex=False)
    .str.replace('SAINTE ', 'STE ', regex=False))

# 11. Corriger les types
df['age'] = df['age'].astype(int)

# 12. Remplir les valeurs manquantes strategiquement
df['ville'] = df['ville'].fillna('INCONNU')
df['ltv'] = df['ltv'].fillna(0)

# 13. Creer des colonnes derivees
df['annee_inscription'] = df['date_inscription'].dt.year
df['anciennete_jours'] = (pd.Timestamp.now() - df['date_inscription']).dt.days

# 14. Supprimer les lignes sans email ET sans telephone
df = df.dropna(subset=['email', 'telephone'], how='all')

# 15. Reset de l'index
df = df.reset_index(drop=True)

print(f"Shape finale: {df.shape}")
print(f"Valeurs manquantes restantes:\n{df.isnull().sum()}")

Flashcards

Quelle est la difference entre df['col'] et df[['col']] ?
df['col'] retourne une Series (1D). df[['col']] retourne un DataFrame (2D avec une seule colonne). Important pour les fonctions qui attendent un type specifique.
Comment eviter le SettingWithCopyWarning ?
Utilisez toujours .loc pour modifier un DataFrame : df.loc[condition, 'colonne'] = valeur. Evitez le chained indexing df[col][row] = val. Si vous travaillez sur un sous-ensemble, faites une copie explicite avec .copy().
Quand utiliser .at[] vs .loc[] ?
.at[row, col] est optimise pour acceder a une seule valeur scalaire (plus rapide). .loc[rows, cols] est plus general et supporte les slices, conditions booleennes, et selections multiples. Utilisez .at dans les boucles (si vraiment necessaires) et .loc pour tout le reste.
Comment lire un CSV de 10 GB avec pandas sur une machine 8 GB RAM ?
Utilisez le parametre chunksize de read_csv() pour lire par morceaux : for chunk in pd.read_csv('big.csv', chunksize=100000): process(chunk). Mais preferez DuckDB ou Polars pour ce volume, qui gerent nativement les donnees plus grandes que la RAM.

Quiz

Quel est le resultat de df.loc[1:3] si l'index est [0, 1, 2, 3, 4] ?

Lignes aux positions 1 et 2 (exclusif)
Lignes avec labels 1, 2 et 3 (inclusif)
Lignes aux positions 1, 2 et 3 (inclusif)
Erreur car loc n'accepte pas de slices
.loc utilise les LABELS, et les slices sont INCLUSIVES des deux cotes. Donc df.loc[1:3] retourne les lignes avec les labels 1, 2, et 3. Avec .iloc, df.iloc[1:3] retournerait les positions 1 et 2 (exclusif a droite).

Comment appliquer une transformation conditionnelle sur 10M lignes le plus rapidement ?

df.apply(lambda row: ..., axis=1)
for index, row in df.iterrows(): ...
np.where(condition, valeur_si_vrai, valeur_si_faux)
df.itertuples() avec traitement par tuple
np.where() est la methode vectorisee la plus rapide pour les conditions binaires. Elle execute le calcul en C/Fortran sur des blocs memoire contigus, evitant l'overhead de l'interpreteur Python. Sur 10M lignes : ~0.5s vs ~80s pour apply() et ~450s pour iterrows().

Quel parametre de read_csv() est critique pour eviter de charger 10 GB en memoire ?

low_memory=True
chunksize=100000
nrows=1000
memory_map=True
chunksize retourne un iterateur de DataFrames au lieu de tout charger. Chaque chunk tient en memoire et peut etre traite puis libere. nrows limiterait les donnees lues (pas tout le fichier). low_memory affecte l'inference de types, pas le chargement.

Apres 10 ans de pandas, voici mon conseil : maitrisez .loc, les operations vectorisees et read_csv avec ses parametres. Ces trois competences couvrent 80% du travail quotidien. Et surtout, prenez l'habitude de TOUJOURS regarder df.info(memory_usage='deep') avant de travailler - ca vous evitera beaucoup de surprises de memoire.

pandas Avance - groupby, merge, pivot

75 min Intermediaire

Les operations avancees de pandas - groupby, merge, pivot_table et les window functions - sont les briques essentielles de toute analyse de donnees serieuse. Cette lecon approfondit ces concepts avec des cas reels, des optimisations memoire, et les pieges a eviter en production.

Objectifs d'apprentissage

  • Maitriser groupby avec des aggregations complexes
  • Joindre des DataFrames avec merge et les differents types de jointures
  • Creer des tableaux croises avec pivot_table
  • Utiliser les window functions pandas (rolling, expanding, shift)
  • Optimiser la memoire (dtypes, categories, downcasting)
  • Eviter les anti-patterns apply() et les copies inutiles

groupby - Split-Apply-Combine

Le paradigme groupby est au coeur de l'analyse de donnees. Il decoupe les donnees en groupes, applique une fonction a chaque groupe, puis recombine les resultats. Comprendre ce mecanisme est fondamental.

groupby - Mecanisme Split-Apply-Combine
  DataFrame original          SPLIT              APPLY            COMBINE
β”Œβ”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”
β”‚regionβ”‚produitβ”‚montantβ”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Nord β”‚  A   β”‚  100  │──▢│ Nord:        β”‚   sum()        β”Œβ”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Nord β”‚  B   β”‚  200  β”‚   β”‚  100,200,150 │──────▢ 450 ──▢│ Nord β”‚  450  β”‚
β”‚ Sud  β”‚  A   β”‚  300  β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                β”‚ Sud  β”‚  550  β”‚
β”‚ Sud  β”‚  B   β”‚  250  β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                β”‚ Est  β”‚  180  β”‚
β”‚ Est  β”‚  A   β”‚  180  │──▢│ Sud:         β”‚   sum()        β””β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”˜
β”‚ Nord β”‚  A   β”‚  150  β”‚   β”‚  300, 250    │──────▢ 550
β”‚ Sud  β”‚  A   β”‚  300  β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
β””β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”˜   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                       ──▢│ Est:         β”‚   sum()
                          β”‚  180         │──────▢ 180
                          β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
Python
import pandas as pd
import numpy as np

# Donnees exemple : ventes e-commerce
df = pd.DataFrame({
    'date': pd.date_range('2024-01-01', periods=1000, freq='h'),
    'region': np.random.choice(['IDF', 'PACA', 'ARA', 'NAQ'], 1000),
    'categorie': np.random.choice(['Electronique', 'Mode', 'Maison'], 1000),
    'montant': np.random.uniform(10, 500, 1000).round(2),
    'client_id': np.random.randint(1, 200, 1000),
    'quantite': np.random.randint(1, 10, 1000)
})

# ===== AGGREGATION SIMPLE =====
# Chiffre d'affaires par region
ca_region = df.groupby('region')['montant'].sum()

# Plusieurs metriques a la fois
stats = df.groupby('region')['montant'].agg(['sum', 'mean', 'median', 'std', 'count'])

# ===== AGGREGATION MULTIPLE (named agg) =====
rapport = df.groupby('region').agg(
    ca_total=('montant', 'sum'),
    panier_moyen=('montant', 'mean'),
    nb_commandes=('montant', 'count'),
    clients_uniques=('client_id', 'nunique'),
    montant_max=('montant', 'max'),
    quantite_totale=('quantite', 'sum')
).round(2)

# ===== GROUPBY MULTI-NIVEAUX =====
croise = df.groupby(['region', 'categorie']).agg(
    ca=('montant', 'sum'),
    nb=('montant', 'count')
).reset_index()  # Important pour avoir un DataFrame plat

# ===== AGGREGATION AVEC FONCTIONS CUSTOM =====
def coefficient_variation(x):
    return x.std() / x.mean() * 100

volatilite = df.groupby('region')['montant'].agg(
    moyenne='mean',
    ecart_type='std',
    cv=coefficient_variation
)

# ===== TRANSFORM (ajouter le resultat au DataFrame original) =====
# Ajouter le CA total de la region a chaque ligne
df['ca_region'] = df.groupby('region')['montant'].transform('sum')

# Pourcentage du CA regional
df['pct_region'] = df['montant'] / df.groupby('region')['montant'].transform('sum') * 100

# Z-score par region (normalisation)
df['zscore'] = df.groupby('region')['montant'].transform(
    lambda x: (x - x.mean()) / x.std()
)

# ===== FILTER (garder/exclure des groupes entiers) =====
# Regions avec plus de 200 commandes
regions_actives = df.groupby('region').filter(lambda g: len(g) > 200)

# Clients avec un panier moyen > 200
gros_clients = df.groupby('client_id').filter(
    lambda g: g['montant'].mean() > 200
)

merge - Jointures de DataFrames

La jointure de DataFrames est l'equivalent du JOIN SQL. pandas offre merge() qui supporte les 4 types de jointures classiques.

Python
# DataFrames a joindre
clients = pd.DataFrame({
    'client_id': [1, 2, 3, 4, 5],
    'nom': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
    'ville': ['Paris', 'Lyon', 'Paris', 'Marseille', 'Lyon'],
    'segment': ['Gold', 'Silver', 'Gold', 'Bronze', 'Silver']
})

commandes = pd.DataFrame({
    'order_id': [101, 102, 103, 104, 105, 106],
    'client_id': [1, 2, 1, 3, 6, 2],      # client 6 n'existe pas!
    'montant': [150, 200, 75, 300, 50, 120],
    'date': pd.date_range('2024-01-01', periods=6, freq='W')
})

# INNER JOIN (par defaut) - uniquement les correspondances
inner = pd.merge(commandes, clients, on='client_id', how='inner')
# 5 lignes (client_id=6 exclu car pas dans clients)

# LEFT JOIN - tout le DataFrame de gauche
left = pd.merge(commandes, clients, on='client_id', how='left')
# 6 lignes, client_id=6 avec NaN pour nom/ville/segment

# RIGHT JOIN - tout le DataFrame de droite
right = pd.merge(commandes, clients, on='client_id', how='right')
# inclut Diana et Eve (pas de commandes) avec NaN pour order_id/montant

# FULL OUTER JOIN - tout des deux cotes
outer = pd.merge(commandes, clients, on='client_id', how='outer')
# Inclut client_id=6 ET Diana/Eve

# Jointure sur colonnes differentes
pd.merge(df1, df2, left_on='id_client', right_on='customer_id')

# Jointure sur plusieurs colonnes
pd.merge(df1, df2, on=['annee', 'mois', 'region'])

# Verifier les doublons de jointure (indicator)
result = pd.merge(commandes, clients, on='client_id', how='outer', indicator=True)
print(result['_merge'].value_counts())
# both          5  (dans les deux)
# left_only     1  (client_id=6, uniquement dans commandes)
# right_only    2  (Diana, Eve, uniquement dans clients)
Types de jointures - Recapitulatif visuel
  INNER JOIN          LEFT JOIN          RIGHT JOIN         OUTER JOIN
  β”Œβ”€β”€β”€β”¬β”€β”€β”€β”          β”Œβ”€β”€β”€β”¬β”€β”€β”€β”          β”Œβ”€β”€β”€β”¬β”€β”€β”€β”          β”Œβ”€β”€β”€β”¬β”€β”€β”€β”
  β”‚ A β”‚ B β”‚          β”‚ A β”‚ B β”‚          β”‚ A β”‚ B β”‚          β”‚ A β”‚ B β”‚
  β”‚   β”Œβ”Όβ”  β”‚          β”‚β–ˆβ–ˆβ–ˆβ”Œβ”Όβ”  β”‚          β”‚   β”Œβ”Όβ”β–ˆβ–ˆβ”‚          β”‚β–ˆβ–ˆβ–ˆβ”Œβ”Όβ”β–ˆβ–ˆβ”‚
  β”‚   β”‚β–ˆβ”‚  β”‚          β”‚β–ˆβ–ˆβ–ˆβ”‚β–ˆβ”‚  β”‚          β”‚   β”‚β–ˆβ”‚β–ˆβ–ˆβ”‚          β”‚β–ˆβ–ˆβ–ˆβ”‚β–ˆβ”‚β–ˆβ–ˆβ”‚
  β”‚   β””β”Όβ”˜  β”‚          β”‚β–ˆβ–ˆβ–ˆβ””β”Όβ”˜  β”‚          β”‚   β””β”Όβ”˜β–ˆβ–ˆβ”‚          β”‚β–ˆβ–ˆβ–ˆβ””β”Όβ”˜β–ˆβ–ˆβ”‚
  β””β”€β”€β”€β”΄β”€β”€β”€β”˜          β””β”€β”€β”€β”΄β”€β”€β”€β”˜          β””β”€β”€β”€β”΄β”€β”€β”€β”˜          β””β”€β”€β”€β”΄β”€β”€β”€β”˜
  Intersection       Tout A +           Tout B +           Tout A +
  uniquement         match B            match A            Tout B

Piege : Explosion de lignes lors de merge

Si les cles de jointure ne sont pas uniques dans un des DataFrames, merge produit un produit cartesien : 3 lignes x 4 lignes = 12 lignes! Verifiez toujours avec df['cle'].duplicated().sum() avant un merge. Utilisez validate='one_to_many' pour detecter les problemes.

pivot_table - Tableaux croises dynamiques

pivot_table est l'equivalent Python des tableaux croises dynamiques Excel. C'est un outil puissant pour resumer des donnees multidimensionnelles.

Python
# Pivot table simple : CA par region et categorie
pivot = df.pivot_table(
    values='montant',
    index='region',           # Lignes
    columns='categorie',      # Colonnes
    aggfunc='sum',            # Fonction d'agregation
    fill_value=0,             # Remplacer NaN par 0
    margins=True,             # Ajouter les totaux
    margins_name='Total'
)

# Pivot avec plusieurs metriques
pivot_multi = df.pivot_table(
    values=['montant', 'quantite'],
    index='region',
    columns='categorie',
    aggfunc={
        'montant': ['sum', 'mean'],
        'quantite': 'sum'
    }
)

# Pivot temporel (mensuel par region)
df['mois'] = df['date'].dt.to_period('M')
pivot_time = df.pivot_table(
    values='montant',
    index='mois',
    columns='region',
    aggfunc='sum'
)

# Crosstab : frequences croisees (raccourci)
freq = pd.crosstab(
    df['region'],
    df['categorie'],
    normalize='index'   # Pourcentages par ligne
).round(3) * 100

Window Functions - rolling, expanding, shift

Les window functions pandas sont l'equivalent des fonctions fenetres SQL. Elles calculent des metriques sur des fenetres glissantes ou cumulatives.

Python
# Donnees temporelles triees
df_ts = df.sort_values('date').set_index('date')

# ===== ROLLING (fenetre glissante) =====
# Moyenne mobile sur 7 jours
df_ts['mm7'] = df_ts['montant'].rolling(window=7).mean()

# Moyenne mobile ponderee
df_ts['mm_exp'] = df_ts['montant'].ewm(span=7).mean()

# Rolling par groupe
df['mm7_region'] = (df.sort_values('date')
    .groupby('region')['montant']
    .rolling(7).mean()
    .reset_index(level=0, drop=True))

# ===== EXPANDING (cumul depuis le debut) =====
df_ts['cumul'] = df_ts['montant'].expanding().sum()
df_ts['max_historique'] = df_ts['montant'].expanding().max()

# ===== SHIFT (decalage temporel) =====
df_ts['montant_precedent'] = df_ts['montant'].shift(1)     # Jour precedent
df_ts['montant_suivant'] = df_ts['montant'].shift(-1)      # Jour suivant
df_ts['variation'] = df_ts['montant'].pct_change()          # % variation

# ===== RANK (classement) =====
df['rank_montant'] = df.groupby('region')['montant'].rank(
    method='dense', ascending=False
)

# Top 3 par region
top3 = df[df['rank_montant'] <= 3].sort_values(
    ['region', 'rank_montant']
)

Optimisation memoire

Sur des DataFrames volumineux, l'optimisation memoire peut reduire l'empreinte de 50 a 90%. C'est la difference entre un traitement qui passe en memoire et un crash OOM.

Python
def optimize_dataframe(df, verbose=True):
    """Reduit l'empreinte memoire d'un DataFrame."""
    start_mem = df.memory_usage(deep=True).sum() / 1024**2

    for col in df.columns:
        col_type = df[col].dtype

        if col_type == 'object':
            # Convertir en category si peu de valeurs uniques
            ratio = df[col].nunique() / len(df)
            if ratio < 0.5:  # Moins de 50% de valeurs uniques
                df[col] = df[col].astype('category')

        elif col_type in ['int64', 'int32']:
            # Downcast entiers
            col_min = df[col].min()
            col_max = df[col].max()
            if col_min >= 0:
                if col_max < 255:
                    df[col] = df[col].astype(np.uint8)
                elif col_max < 65535:
                    df[col] = df[col].astype(np.uint16)
                elif col_max < 4294967295:
                    df[col] = df[col].astype(np.uint32)
            else:
                if col_min > -128 and col_max < 127:
                    df[col] = df[col].astype(np.int8)
                elif col_min > -32768 and col_max < 32767:
                    df[col] = df[col].astype(np.int16)
                elif col_min > -2147483648 and col_max < 2147483647:
                    df[col] = df[col].astype(np.int32)

        elif col_type == 'float64':
            df[col] = pd.to_numeric(df[col], downcast='float')

    end_mem = df.memory_usage(deep=True).sum() / 1024**2
    if verbose:
        print(f"Memoire: {start_mem:.1f} MB -> {end_mem:.1f} MB "
              f"(-{(1-end_mem/start_mem)*100:.0f}%)")
    return df

# Exemple : 1M lignes
# Avant: 450 MB -> Apres: 85 MB (-81%)
df = optimize_dataframe(df)

Le type Category : votre meilleur allie memoire

Une colonne object (string) avec 10 valeurs uniques sur 1M lignes stocke 1M de pointeurs vers des strings. En category, elle stocke 10 strings + 1M d'entiers de mapping. Gain : 70-95% de memoire. Bonus : les groupby sur les categories sont 2-5x plus rapides.

apply() avec lambda vs operations vectorisees

Le reflexe df.apply(lambda x: ...) est tentant mais presque toujours sous-optimal. Chaque appel apply() boucle en Python sur chaque ligne, annulant les benefices de NumPy.

Python - Comparaisons
# Scenario : calculer un prix TTC avec remise conditionnelle
# MAUVAIS : apply + lambda (~12 sec sur 5M lignes)
df['ttc'] = df.apply(
    lambda row: row['prix'] * 1.2 * (0.9 if row['premium'] else 1.0),
    axis=1
)

# BON : vectorise (~0.1 sec sur 5M lignes, 120x plus rapide)
remise = np.where(df['premium'], 0.9, 1.0)
df['ttc'] = df['prix'] * 1.2 * remise

# Scenario : extraire le domaine d'un email
# MAUVAIS : apply
df['domain'] = df['email'].apply(lambda x: x.split('@')[1])
# BON : str accessor vectorise
df['domain'] = df['email'].str.split('@').str[1]

# Scenario : concat de colonnes
# MAUVAIS : apply
df['full'] = df.apply(lambda r: f"{r['prenom']} {r['nom']}", axis=1)
# BON : operations string vectorisees
df['full'] = df['prenom'] + ' ' + df['nom']

Regle : avant d'ecrire apply(), demandez-vous s'il existe une methode vectorisee equivalente. Dans 90% des cas, la reponse est oui.

Pandas en production : les pieges chez Instacart

Instacart, le service de livraison de courses, a partage ses retours d'experience sur l'utilisation de pandas en production :

  • Probleme 1 - Memoire : un pipeline pandas consommait 64 GB de RAM pour traiter 20 GB de donnees. Cause : copies implicites lors des chained operations. Solution : inplace=True la ou c'est possible, et surtout restructurer en etapes avec liberation memoire (del df_temp).
  • Probleme 2 - Reproductibilite : les resultats de groupby().apply() variaient selon l'ordre des donnees. Cause : la fonction custom n'etait pas deterministe. Solution : toujours trier avant groupby (sort_values).
  • Probleme 3 - Performance : un rapport journalier prenait 45 minutes. L'equipe a profile et decouvert que 80% du temps etait dans 3 lignes d'apply(). Apres vectorisation : 3 minutes.
  • Lecon : pandas est excellent pour le prototypage, mais chaque pipeline en production doit etre profile avec %%timeit et optimise avant deploiement. Instacart a fini par migrer les pipelines critiques vers PySpark tout en gardant pandas pour l'exploration.

Scenario : Rapport de ventes mensuel automatise

Vous devez creer un rapport automatise qui, a partir des donnees brutes de commandes, produit un tableau croise des ventes par region et categorie, avec les variations mois par mois.

Python
def generer_rapport_mensuel(df_commandes):
    """Genere un rapport de ventes mensuel."""
    # 1. Preparation
    df = df_commandes.copy()
    df['mois'] = df['date'].dt.to_period('M')

    # 2. Tableau croise : CA par region et mois
    pivot_ca = df.pivot_table(
        values='montant', index='region',
        columns='mois', aggfunc='sum', fill_value=0
    )

    # 3. Variation mensuelle (%)
    variation = pivot_ca.pct_change(axis=1) * 100

    # 4. Top 5 clients par region
    top_clients = (df.groupby(['region', 'client_id'])
        .agg(ca=('montant', 'sum'), nb_commandes=('montant', 'count'))
        .reset_index()
        .sort_values('ca', ascending=False)
        .groupby('region').head(5))

    # 5. KPIs globaux
    kpis = {
        'ca_total': df['montant'].sum(),
        'panier_moyen': df['montant'].mean(),
        'nb_commandes': len(df),
        'clients_actifs': df['client_id'].nunique(),
        'meilleure_region': df.groupby('region')['montant'].sum().idxmax()
    }

    return pivot_ca, variation, top_clients, kpis

Flashcards

Quelle est la difference entre groupby().agg() et groupby().transform() ?
agg() retourne un DataFrame reduit (une ligne par groupe). transform() retourne un DataFrame de meme taille que l'original, en "broadcast" le resultat du groupe sur chaque ligne. Utilisez transform() pour ajouter des statistiques de groupe a chaque ligne (ex: % du total du groupe).
Comment detecter une explosion de lignes lors d'un merge ?
Utilisez validate : pd.merge(df1, df2, on='key', validate='one_to_many'). Options : 'one_to_one', 'one_to_many', 'many_to_one', 'many_to_many'. Si la contrainte est violee, pandas leve une MergeError. Verifiez aussi len(result) vs len(df1) apres le merge.
Quelle est la maniere la plus efficace de convertir une colonne string avec peu de valeurs uniques ?
Convertir en type category : df['col'] = df['col'].astype('category'). Cela reduit l'empreinte memoire de 70-95% et accelere les groupby de 2-5x. Ideal pour les colonnes comme 'pays', 'statut', 'categorie' avec un nombre fixe de valeurs.

Quiz

Que retourne df.groupby('region')['montant'].transform('mean') ?

Un DataFrame avec une ligne par region et la moyenne
Une Series de meme longueur que df avec la moyenne de chaque region
Un scalaire avec la moyenne globale
Un dictionnaire region: moyenne
transform() retourne une Series de meme longueur que le DataFrame original. Chaque ligne contient la valeur aggregee de son groupe. C'est l'equivalent du OVER(PARTITION BY) en SQL.

Un merge entre df1 (100K lignes) et df2 (50K lignes) produit 5M lignes. Que s'est-il passe ?

pandas a duplique les donnees par erreur
Les cles de jointure ont des doublons des deux cotes (many-to-many)
C'est un CROSS JOIN implicite
Le parametre how='outer' a ete utilise
Quand les cles de jointure ne sont pas uniques dans les deux DataFrames, merge effectue un produit cartesien par groupe : si une cle apparait 100 fois dans df1 et 50 fois dans df2, cela produit 5000 lignes pour cette seule cle. Utilisez validate='one_to_many' pour prevenir ce probleme.

Le trio groupby/merge/pivot_table est l'equivalent pandas du SQL analytique. Si vous savez ecrire un GROUP BY avec HAVING, un JOIN avec des conditions, et un tableau croise en SQL, vous saurez faire la meme chose en pandas. Mon conseil : pensez toujours en termes de colonnes (vectorise), jamais en termes de lignes (boucle). C'est le changement de paradigme fondamental de pandas.

SQLAlchemy - Connexion & ORM

45 min Intermediaire

SQLAlchemy est la librairie de reference pour connecter Python aux bases de donnees relationnelles. Elle offre deux niveaux : le Core (SQL generatif type-safe) et l'ORM (mapping objet-relationnel). Pour un Data Architect, comprendre ces deux couches et savoir quand utiliser l'une ou l'autre est essentiel.

Objectifs d'apprentissage

  • Configurer un Engine et comprendre le connection pooling
  • Ecrire des requetes avec le Core (SQL generatif)
  • Modeliser des tables avec l'ORM
  • Integrer SQLAlchemy avec pandas
  • Eviter l'anti-pattern N+1 queries
  • Choisir entre raw SQL, Core, ORM et pandas read_sql

Engine et Connection Pooling

L'Engine est le point d'entree de SQLAlchemy. Il gere la connexion a la base et, surtout, le pool de connexions qui evite d'ouvrir/fermer une connexion a chaque requete.

Architecture SQLAlchemy
  Votre code Python
       β”‚
       β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚    Engine     │── URL de connexion: "postgresql://user:pass@host:5432/db"
β”‚              β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚   Pool de connexions (par defaut: 5 connexions)
β”‚  β”‚  Pool  β”‚  β”‚   β”Œβ”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”
β”‚  β”‚        │──│──▢│conn1β”‚ β”‚conn2β”‚ β”‚conn3β”‚ β”‚conn4β”‚ β”‚conn5β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚   β””β”€β”€β”¬β”€β”€β”˜ β””β”€β”€β”¬β”€β”€β”˜ β””β”€β”€β”¬β”€β”€β”˜ β””β”€β”€β”¬β”€β”€β”˜ β””β”€β”€β”¬β”€β”€β”˜
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β”‚       β”‚       β”‚       β”‚       β”‚
       β”‚              β””β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”˜
       β–Ό                              β”‚
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                        β–Ό
  β”‚   Core   β”‚              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚ (SQL     β”‚              β”‚  Base de donnees  β”‚
  β”‚ generatifβ”‚              β”‚  (PostgreSQL,     β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜              β”‚   MySQL, etc.)    β”‚
       β”‚                    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
       β–Ό
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚   ORM    β”‚
  β”‚ (objets  β”‚
  β”‚  Python) β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
Python
from sqlalchemy import create_engine

# ===== CREATION D'ENGINE =====
# PostgreSQL
engine = create_engine(
    "postgresql+psycopg2://user:password@localhost:5432/ecommerce",
    pool_size=10,            # Connexions permanentes
    max_overflow=20,         # Connexions supplementaires en pic
    pool_timeout=30,         # Timeout si pas de connexion dispo (sec)
    pool_recycle=3600,       # Recycler apres 1h (evite timeout serveur)
    echo=False               # True pour voir le SQL genere (debug)
)

# MySQL
mysql_engine = create_engine(
    "mysql+pymysql://user:pass@host:3306/db?charset=utf8mb4"
)

# SQLite (fichier local, ideal pour tests)
sqlite_engine = create_engine("sqlite:///local_database.db")

# DuckDB (via SQLAlchemy!)
duckdb_engine = create_engine("duckdb:///analytics.duckdb")

# ===== UTILISATION BASIQUE =====
# Context manager (recommande - ferme automatiquement la connexion)
from sqlalchemy import text

with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM clients LIMIT 10"))
    for row in result:
        print(row.nom, row.email)

# Transaction explicite
with engine.begin() as conn:  # Auto-commit a la fin, rollback si exception
    conn.execute(text("UPDATE clients SET actif = false WHERE ltv = 0"))
    conn.execute(text("DELETE FROM sessions WHERE expire < NOW()"))

Pourquoi le Connection Pooling est critique

Ouvrir une connexion PostgreSQL prend 50-100ms (negotiation TCP, authentification, allocation memoire). Dans une API web traitant 1000 req/sec, cela ajouterait 50-100 secondes de latence par seconde! Le pool pre-ouvre N connexions et les reutilise, reduisant la latence a ~0.1ms par requete.

SQLAlchemy Core - SQL generatif

Le Core permet de construire des requetes SQL de maniere programmatique et type-safe, sans ecrire de SQL brut. C'est un compromis excellent entre le raw SQL et l'ORM complet.

Python
from sqlalchemy import MetaData, Table, Column, Integer, String, Float, DateTime
from sqlalchemy import select, insert, update, delete, func, and_, or_

# Reflexion : charger les tables existantes
metadata = MetaData()
metadata.reflect(bind=engine)
clients = metadata.tables['clients']
commandes = metadata.tables['commandes']

# SELECT avec conditions
stmt = (
    select(clients.c.nom, clients.c.email, clients.c.ville)
    .where(and_(
        clients.c.actif == True,
        clients.c.ltv > 1000
    ))
    .order_by(clients.c.ltv.desc())
    .limit(50)
)

with engine.connect() as conn:
    result = conn.execute(stmt)
    rows = result.fetchall()

# JOIN + GROUP BY
stmt = (
    select(
        clients.c.ville,
        func.count(commandes.c.id).label('nb_commandes'),
        func.sum(commandes.c.montant).label('ca_total'),
        func.avg(commandes.c.montant).label('panier_moyen')
    )
    .join(commandes, clients.c.id == commandes.c.client_id)
    .group_by(clients.c.ville)
    .having(func.count(commandes.c.id) > 10)
    .order_by(func.sum(commandes.c.montant).desc())
)

# INSERT
stmt = insert(clients).values(
    nom='Jean Dupont',
    email='jean@example.com',
    ville='Paris',
    actif=True
)

# Bulk INSERT (beaucoup plus rapide)
with engine.begin() as conn:
    conn.execute(
        insert(clients),
        [
            {'nom': 'Alice', 'email': 'alice@ex.com', 'ville': 'Lyon'},
            {'nom': 'Bob', 'email': 'bob@ex.com', 'ville': 'Paris'},
            # ... des milliers de lignes
        ]
    )

SQLAlchemy ORM - Mapping objet-relationnel

L'ORM mappe les tables SQL sur des classes Python. Chaque ligne devient un objet, chaque colonne un attribut. C'est puissant mais avec des pieges de performance.

Python
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy import ForeignKey
from datetime import datetime
from typing import List, Optional

# Base declarative (SQLAlchemy 2.0+)
class Base(DeclarativeBase):
    pass

class Client(Base):
    __tablename__ = 'clients'

    id: Mapped[int] = mapped_column(primary_key=True)
    nom: Mapped[str] = mapped_column(String(100))
    email: Mapped[str] = mapped_column(String(255), unique=True)
    ville: Mapped[Optional[str]] = mapped_column(String(100))
    date_inscription: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    actif: Mapped[bool] = mapped_column(default=True)

    # Relation : un client a plusieurs commandes
    commandes: Mapped[List["Commande"]] = relationship(back_populates="client")

    def __repr__(self):
        return f"Client(id={self.id}, nom='{self.nom}')"

class Commande(Base):
    __tablename__ = 'commandes'

    id: Mapped[int] = mapped_column(primary_key=True)
    client_id: Mapped[int] = mapped_column(ForeignKey('clients.id'))
    montant: Mapped[float] = mapped_column(Float)
    date: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    statut: Mapped[str] = mapped_column(String(20), default='pending')

    # Relation inverse
    client: Mapped["Client"] = relationship(back_populates="commandes")

# Creation des tables
Base.metadata.create_all(engine)

# Session pour interagir
SessionLocal = sessionmaker(bind=engine)

# ===== OPERATIONS CRUD =====
with SessionLocal() as session:
    # CREATE
    nouveau_client = Client(nom="Marie Martin", email="marie@ex.com", ville="Paris")
    session.add(nouveau_client)
    session.commit()

    # READ
    client = session.query(Client).filter_by(email="marie@ex.com").first()
    print(client.nom)  # Marie Martin

    # UPDATE
    client.ville = "Lyon"
    session.commit()

    # Requetes avancees
    clients_paris = (
        session.query(Client)
        .filter(Client.ville == "Paris", Client.actif == True)
        .order_by(Client.nom)
        .limit(20)
        .all()
    )

N+1 Queries avec l'ORM

Le probleme N+1 est le piege le plus dangereux de tout ORM. Il se produit quand vous accedez a une relation dans une boucle, generant une requete SQL par element.

Python - PROBLEME N+1
# MAUVAIS : N+1 queries (1 + 100 = 101 requetes SQL!)
clients = session.query(Client).limit(100).all()  # 1 requete
for client in clients:
    print(f"{client.nom}: {len(client.commandes)} commandes")
    # ^ Chaque acces a client.commandes declenche 1 requete!
    # Total: 1 (clients) + 100 (commandes par client) = 101 requetes

# BON : Eager loading avec joinedload (1 seule requete avec JOIN)
from sqlalchemy.orm import joinedload

clients = (
    session.query(Client)
    .options(joinedload(Client.commandes))  # JOIN en une seule requete
    .limit(100)
    .all()
)
for client in clients:
    print(f"{client.nom}: {len(client.commandes)} commandes")
    # Aucune requete supplementaire!

# ALTERNATIF : subqueryload (2 requetes, mieux pour de gros resultats)
from sqlalchemy.orm import subqueryload

clients = (
    session.query(Client)
    .options(subqueryload(Client.commandes))
    .limit(100)
    .all()
)
N+1 vs Eager Loading
  N+1 (MAUVAIS)                        Eager Loading (BON)
  ─────────────                        ───────────────────
  Query 1: SELECT * FROM clients       Query 1: SELECT clients.*, commandes.*
  Query 2: SELECT * FROM commandes              FROM clients
           WHERE client_id = 1                  LEFT JOIN commandes
  Query 3: SELECT * FROM commandes              ON clients.id = commandes.client_id
           WHERE client_id = 2
  Query 4: SELECT * FROM commandes     Total: 1 requete, ~50ms
           WHERE client_id = 3
  ...
  Query 101: SELECT * FROM commandes
           WHERE client_id = 100

  Total: 101 requetes, ~2000ms

Integration avec pandas

L'integration SQLAlchemy + pandas est le workflow le plus courant pour les Data Architects : lire depuis la base, transformer en Python, ecrire le resultat.

Python
import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:pass@localhost/ecommerce")

# ===== LIRE DEPUIS LA BASE =====
# Methode 1 : SQL brut (le plus courant et flexible)
df = pd.read_sql("""
    SELECT c.nom, c.ville, SUM(co.montant) as ca_total
    FROM clients c
    JOIN commandes co ON c.id = co.client_id
    WHERE co.date >= '2024-01-01'
    GROUP BY c.nom, c.ville
    HAVING SUM(co.montant) > 1000
    ORDER BY ca_total DESC
""", engine)

# Methode 2 : avec parametres (securise contre SQL injection!)
df = pd.read_sql(
    text("SELECT * FROM clients WHERE ville = :ville AND actif = :actif"),
    engine,
    params={"ville": "Paris", "actif": True}
)

# Methode 3 : lire une table entiere
df = pd.read_sql_table("clients", engine, columns=["nom", "email", "ville"])

# ===== ECRIRE DANS LA BASE =====
# Ecriture simple
df_clean.to_sql(
    "clients_clean",
    engine,
    if_exists="replace",    # 'fail', 'replace', 'append'
    index=False,
    dtype={                  # Types SQL explicites
        'nom': String(100),
        'montant': Float,
        'date': DateTime
    },
    chunksize=10000,         # Insertion par lots (memoire)
    method='multi'           # INSERT multi-valeurs (plus rapide)
)

# Ecriture performante avec COPY (PostgreSQL)
from io import StringIO

def to_sql_copy(df, table_name, engine):
    """Utilise COPY pour un insert ultra-rapide (PostgreSQL)."""
    buffer = StringIO()
    df.to_csv(buffer, index=False, header=False, sep='\t')
    buffer.seek(0)
    with engine.raw_connection() as conn:
        with conn.cursor() as cur:
            cur.copy_from(buffer, table_name, sep='\t', null='')
        conn.commit()

# 10x plus rapide que to_sql() pour les gros volumes
to_sql_copy(df_million_rows, 'staging_table', engine)

Raw SQL (pd.read_sql)

  • Controle total sur la requete
  • Performance maximale
  • Pas de couche d'abstraction
  • Ideal pour : analytique, rapports
  • Risque : SQL injection si mal parametrise

ORM (session.query)

  • Type-safe et refactorable
  • Relations automatiques
  • Overhead de mapping (plus lent)
  • Ideal pour : applications web, CRUD
  • Risque : N+1 queries, over-fetching

Scenario : Architecture de connexion multi-bases

Vous etes Data Architect dans une entreprise avec 3 bases de donnees : PostgreSQL (transactionnel), MySQL (legacy CRM), et DuckDB (analytique). Concevez le module de connexion.

Python
from sqlalchemy import create_engine
from contextlib import contextmanager
import logging

logger = logging.getLogger(__name__)

class DatabaseManager:
    """Gestionnaire centralise de connexions multi-bases."""

    def __init__(self, config: dict):
        self.engines = {}
        for name, url in config.items():
            self.engines[name] = create_engine(
                url,
                pool_size=5,
                max_overflow=10,
                pool_recycle=3600,
                pool_pre_ping=True  # Verifie que la connexion est active
            )
            logger.info(f"Engine '{name}' configure")

    @contextmanager
    def get_connection(self, db_name: str):
        """Context manager pour obtenir une connexion."""
        if db_name not in self.engines:
            raise ValueError(f"Base '{db_name}' non configuree")
        conn = self.engines[db_name].connect()
        try:
            yield conn
        finally:
            conn.close()

    def read_sql(self, query: str, db_name: str, **kwargs):
        """Lire depuis n'importe quelle base en DataFrame."""
        import pandas as pd
        return pd.read_sql(query, self.engines[db_name], **kwargs)

    def dispose_all(self):
        """Fermer tous les pools de connexions."""
        for name, engine in self.engines.items():
            engine.dispose()
            logger.info(f"Pool '{name}' ferme")

# Utilisation
db = DatabaseManager({
    'transactional': 'postgresql://user:pass@pg-host:5432/app',
    'crm': 'mysql+pymysql://user:pass@mysql-host:3306/crm',
    'analytics': 'duckdb:///analytics.duckdb'
})

# Lire depuis le CRM, transformer, ecrire dans l'analytique
df_clients = db.read_sql("SELECT * FROM contacts WHERE actif=1", 'crm')
df_commandes = db.read_sql("SELECT * FROM orders WHERE status='paid'", 'transactional')
# ... transformation pandas ...

Flashcards

Quelle est la difference entre engine.connect() et engine.begin() ?
connect() ouvre une connexion en mode autocommit (chaque requete est commitee individuellement). begin() ouvre une connexion avec une transaction explicite : tout est commite a la fin du bloc with, ou rollback en cas d'exception. Utilisez begin() pour les operations d'ecriture qui doivent etre atomiques.
Pourquoi utiliser pool_pre_ping=True ?
pool_pre_ping envoie un SELECT 1 avant de reutiliser une connexion du pool. Cela detecte les connexions "mortes" (timeout serveur, redemarrage). Sans ce parametre, votre code peut recevoir une erreur "connection closed" sur une connexion qui a ete deconnectee cote serveur pendant qu'elle dormait dans le pool.
Quand utiliser to_sql(method='multi') ?
method='multi' genere un seul INSERT avec plusieurs VALUES au lieu d'un INSERT par ligne. C'est 5-10x plus rapide pour les insertions en masse. Attention : certaines bases limitent le nombre de parametres par requete (ex: SQLite a 999). Combinez avec chunksize pour gerer cette limite.

Quiz

Qu'est-ce que le probleme N+1 en ORM ?

Envoyer N+1 requetes pour recuperer N+1 objets
Charger N objets parent puis 1 requete par parent pour les enfants
Creer N+1 connexions simultanees
Un bug dans SQLAlchemy version N+1
Le N+1 se produit quand on charge N objets parents (1 requete), puis qu'on accede a une relation pour chaque parent, generant N requetes supplementaires. Total: 1 + N requetes. Solution: eager loading (joinedload ou subqueryload).

Quelle methode d'ecriture pandas est la plus rapide pour inserer 1M de lignes dans PostgreSQL ?

to_sql() avec method=None (defaut)
to_sql() avec method='multi' et chunksize=10000
COPY via psycopg2 (copy_from)
ORM avec session.add_all()
COPY (via psycopg2) est la methode la plus rapide pour PostgreSQL : 10-50x plus rapide que to_sql(). Elle ecrit les donnees en format binaire directement dans la table, sans passer par le parseur SQL. C'est l'equivalent d'un \COPY en psql.

En tant que Data Architect, vous n'ecrirez probablement pas beaucoup d'ORM vous-meme - c'est davantage le terrain des developpeurs backend. Mais vous devez comprendre les implications : N+1 queries, connection pooling, et surtout, l'importance d'utiliser pd.read_sql() avec des requetes SQL optimisees plutot que de charger des tables entieres. Le Data Architect est le gardien de la performance d'acces aux donnees.

PySpark Fondamentaux

90 min Avance

Apache Spark est le moteur de calcul distribue dominant pour le Big Data. PySpark, son interface Python, permet de traiter des petaoctets de donnees sur des clusters de centaines de machines. Cette lecon couvre l'architecture Spark, l'API DataFrame, les transformations vs actions, et les techniques d'optimisation essentielles.

Objectifs d'apprentissage

  • Comprendre l'architecture Spark (Driver, Executors, Tasks)
  • Creer une SparkSession et lire des donnees
  • Maitriser l'API DataFrame PySpark
  • Distinguer transformations (lazy) et actions (eager)
  • Utiliser l'API SQL de Spark
  • Optimiser un job Spark (partitioning, caching, broadcast)

Architecture Apache Spark

Comprendre l'architecture de Spark est fondamental pour ecrire du code performant. Spark suit un modele master-worker avec une evaluation paresseuse (lazy).

Architecture d'execution Spark
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚                    SPARK APPLICATION                      β”‚
  β”‚                                                          β”‚
  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
  β”‚  β”‚   DRIVER          β”‚         β”‚   CLUSTER MANAGER    β”‚  β”‚
  β”‚  β”‚                   │────────▢│   (YARN / K8s /      β”‚  β”‚
  β”‚  β”‚  - SparkContext   β”‚         β”‚    Standalone)        β”‚  β”‚
  β”‚  β”‚  - DAG Scheduler  β”‚         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
  β”‚  β”‚  - Task Scheduler β”‚                    β”‚              β”‚
  β”‚  β”‚  - UI (port 4040) β”‚                    β”‚ Alloue       β”‚
  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                    β”‚ ressources   β”‚
  β”‚           β”‚                               β”‚              β”‚
  β”‚           β”‚ Envoie les tasks              β–Ό              β”‚
  β”‚           β”‚                                              β”‚
  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
  β”‚  β”‚        β–Ό           EXECUTORS                       β”‚  β”‚
  β”‚  β”‚                                                    β”‚  β”‚
  β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”        β”‚  β”‚
  β”‚  β”‚  β”‚Executor 1β”‚  β”‚Executor 2β”‚  β”‚Executor 3β”‚  ...   β”‚  β”‚
  β”‚  β”‚  β”‚          β”‚  β”‚          β”‚  β”‚          β”‚        β”‚  β”‚
  β”‚  β”‚  β”‚ β”Œβ”€β”€β”€β”€β”€β”€β” β”‚  β”‚ β”Œβ”€β”€β”€β”€β”€β”€β” β”‚  β”‚ β”Œβ”€β”€β”€β”€β”€β”€β” β”‚        β”‚  β”‚
  β”‚  β”‚  β”‚ β”‚Task 1β”‚ β”‚  β”‚ β”‚Task 3β”‚ β”‚  β”‚ β”‚Task 5β”‚ β”‚        β”‚  β”‚
  β”‚  β”‚  β”‚ β”‚Task 2β”‚ β”‚  β”‚ β”‚Task 4β”‚ β”‚  β”‚ β”‚Task 6β”‚ β”‚        β”‚  β”‚
  β”‚  β”‚  β”‚ β””β”€β”€β”€β”€β”€β”€β”˜ β”‚  β”‚ β””β”€β”€β”€β”€β”€β”€β”˜ β”‚  β”‚ β””β”€β”€β”€β”€β”€β”€β”˜ β”‚        β”‚  β”‚
  β”‚  β”‚  β”‚ [Cache]  β”‚  β”‚ [Cache]  β”‚  β”‚ [Cache]  β”‚        β”‚  β”‚
  β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜        β”‚  β”‚
  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

  FLUX D'EXECUTION :
  1. Driver recoit le code PySpark
  2. Construit un DAG (Directed Acyclic Graph) des operations
  3. Optimise le DAG (Catalyst optimizer)
  4. Decoupe en stages et tasks
  5. Envoie les tasks aux executors
  6. Executors traitent les partitions en parallele
  7. Resultats renvoyes au Driver

Concepts cles de Spark

Partition : un bloc de donnees (typiquement 128 MB). Un DataFrame de 10 GB = ~80 partitions traitees en parallele. Stage : un ensemble de tasks qui peuvent s'executer sans shuffle (redistribution des donnees). Shuffle : redistribution des donnees entre executors (GROUP BY, JOIN) - l'operation la plus couteuse. DAG : graphe de toutes les operations, optimise avant execution.

SparkSession et lecture de donnees

Python
from pyspark.sql import SparkSession

# Creation de la SparkSession (point d'entree unique)
spark = SparkSession.builder \
    .appName("DataArchitectTraining") \
    .master("local[*]") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.parquet.compression.codec", "snappy") \
    .getOrCreate()

# ===== LECTURE DE DONNEES =====
# Parquet (format recommande pour Spark)
df = spark.read.parquet("s3://datalake/ventes/year=2024/")

# CSV
df_csv = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("sep", ";") \
    .csv("hdfs:///data/exports/ventes.csv")

# CSV avec schema explicite (recommande - evite inferSchema qui est lent)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, \
    FloatType, TimestampType

schema = StructType([
    StructField("order_id", StringType(), False),
    StructField("customer_id", IntegerType(), True),
    StructField("amount", FloatType(), True),
    StructField("order_date", TimestampType(), True),
    StructField("region", StringType(), True),
    StructField("category", StringType(), True)
])

df_typed = spark.read.schema(schema).csv("data/ventes.csv", header=True)

# JSON
df_json = spark.read.json("s3://bucket/events/*.json")

# Delta Lake
df_delta = spark.read.format("delta").load("s3://datalake/delta/clients")

# Base de donnees via JDBC
df_pg = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host:5432/db") \
    .option("dbtable", "clients") \
    .option("user", "user") \
    .option("password", "pass") \
    .option("numPartitions", 10) \
    .option("partitionColumn", "id") \
    .option("lowerBound", 1) \
    .option("upperBound", 1000000) \
    .load()

Transformations vs Actions

La distinction entre transformations et actions est LE concept fondamental de Spark. Les transformations sont lazy (rien ne s'execute), les actions sont eager (declenchent l'execution).

Lazy Evaluation : Transformations vs Actions
  TRANSFORMATIONS (Lazy)              ACTIONS (Eager)
  Construisent le plan               Declenchent l'execution
  ────────────────────               ─────────────────────
  .filter()     .select()            .show()        .count()
  .groupBy()    .join()              .collect()     .take(n)
  .orderBy()    .withColumn()        .write()       .first()
  .distinct()   .drop()              .toPandas()    .foreach()
  .union()      .repartition()       .head()        .reduce()

  Code:                              Execution:
  df2 = df.filter(...)  ─┐           df2.show()  ─────▢ Execute tout
  df3 = df2.groupBy()   ── Rien      df3.count() ─────▢ Execute tout
  df4 = df3.orderBy()   β”€β”˜ ne se     df4.write() ─────▢ Execute tout
                           execute!
Python
from pyspark.sql import functions as F

# ===== TRANSFORMATIONS (rien ne s'execute encore!) =====
df_filtered = df.filter(F.col("amount") > 100)
df_selected = df_filtered.select("customer_id", "amount", "region")
df_grouped = df_selected.groupBy("region").agg(
    F.sum("amount").alias("total_sales"),
    F.count("*").alias("order_count"),
    F.avg("amount").alias("avg_amount"),
    F.countDistinct("customer_id").alias("unique_customers")
)
df_sorted = df_grouped.orderBy(F.desc("total_sales"))

# A ce stade, RIEN n'a ete execute! Spark a juste construit le plan.
# Vous pouvez verifier le plan :
df_sorted.explain(True)  # Affiche le plan logique et physique

# ===== ACTIONS (declenchent l'execution!) =====
df_sorted.show(20)                    # Affiche les 20 premieres lignes
total = df.count()                     # Compte les lignes
result = df_sorted.collect()           # Ramene TOUT au Driver (attention!)
first_row = df_sorted.first()          # Premiere ligne
pandas_df = df_sorted.toPandas()       # Convertit en pandas (attention!)

# ===== API DataFrame COMPLETE =====
result = (
    df
    # Filtrage
    .filter(F.col("order_date") >= "2024-01-01")
    .filter(F.col("status").isin(["completed", "shipped"]))

    # Nouvelles colonnes
    .withColumn("year", F.year("order_date"))
    .withColumn("month", F.month("order_date"))
    .withColumn("amount_eur", F.col("amount") * F.lit(0.92))
    .withColumn("is_premium", F.when(F.col("amount") > 500, True).otherwise(False))

    # Colonnes string
    .withColumn("region_upper", F.upper(F.col("region")))
    .withColumn("email_domain", F.regexp_extract("email", r"@(.+)$", 1))

    # Aggregation
    .groupBy("year", "month", "region")
    .agg(
        F.sum("amount_eur").alias("revenue"),
        F.count("*").alias("orders"),
        F.avg("amount_eur").alias("avg_order"),
        F.percentile_approx("amount_eur", 0.95).alias("p95_amount")
    )

    # Tri
    .orderBy("year", "month", F.desc("revenue"))
)

API SQL de Spark

Spark permet d'ecrire des requetes SQL standard sur les DataFrames. C'est souvent plus lisible pour les analystes et les requetes complexes.

Python
# Enregistrer un DataFrame comme vue temporaire
df.createOrReplaceTempView("orders")

# Requete SQL standard
result = spark.sql("""
    WITH monthly_stats AS (
        SELECT
            region,
            DATE_TRUNC('month', order_date) as month,
            SUM(amount) as revenue,
            COUNT(*) as order_count,
            COUNT(DISTINCT customer_id) as unique_customers
        FROM orders
        WHERE status = 'completed'
          AND order_date >= '2024-01-01'
        GROUP BY region, DATE_TRUNC('month', order_date)
    ),
    ranked AS (
        SELECT *,
            ROW_NUMBER() OVER (PARTITION BY month ORDER BY revenue DESC) as rank,
            LAG(revenue) OVER (PARTITION BY region ORDER BY month) as prev_revenue
        FROM monthly_stats
    )
    SELECT
        region, month, revenue, order_count, unique_customers,
        rank,
        ROUND((revenue - prev_revenue) / prev_revenue * 100, 1) as growth_pct
    FROM ranked
    WHERE rank <= 5
    ORDER BY month DESC, rank
""")

result.show(50, truncate=False)

Comment Uber traite 1 PB/jour avec Spark

Uber est l'un des plus gros utilisateurs de Spark au monde, avec des milliers de jobs traitant plus d'un petaoctet par jour :

  • Architecture : 10 000+ noeuds Spark repartis sur plusieurs clusters, geres par YARN et Kubernetes
  • Cas d'usage : tarification dynamique (surge pricing) calculee toutes les 2 minutes sur les donnees de trajet en temps reel, detection de fraude sur des milliards de transactions, ETL du data lake (Parquet sur HDFS et S3)
  • Optimisations cles : Uber a developpe des outils internes pour optimiser Spark - auto-tuning des configurations (nombre d'executors, memoire), repartitionnement intelligent base sur les patterns d'acces, et un systeme de cache distribue pour les datasets frequemment utilises
  • Migration : Uber est passe de Hive (MapReduce) a Spark, reduisant le temps de leurs pipelines ETL de 12 heures a 2 heures
  • Lecon : meme a l'echelle d'Uber, 80% de l'optimisation Spark vient de 3 choses : bon partitionnement, eviter les shuffles inutiles, et bien dimensionner les executors

collect() sur un DataFrame de 100M lignes

collect() ramene TOUTES les donnees du cluster vers le Driver (une seule machine). Sur un gros DataFrame, cela provoque un OutOfMemoryError et potentiellement crash le Driver, ce qui tue le job entier.

Python
# CATASTROPHIQUE : ne JAMAIS faire ca sur un gros DataFrame
all_data = df.collect()  # 100M lignes x 20 colonnes = crash memoire!

# DANGEREUX aussi :
pandas_df = df.toPandas()  # Meme probleme, ramene tout au Driver

# BON : limiter avant de collect
sample = df.limit(1000).collect()
top10 = df.orderBy(F.desc("revenue")).limit(10).collect()

# BON : ecrire directement dans le stockage
df.write.mode("overwrite").parquet("s3://output/result/")

# BON : utiliser show() pour debugger
df.show(20, truncate=False)

# BON : toPandas() seulement sur un resultat agrege petit
stats = df.groupBy("region").agg(F.sum("amount")).toPandas()
# Seulement ~20 lignes (1 par region) β†’ OK pour toPandas()

Optimisation de jobs Spark

Python - Techniques d'optimisation
# ===== 1. BROADCAST JOIN (petite table jointe a une grande) =====
from pyspark.sql.functions import broadcast

# Table de reference petite (< 100 MB)
dim_regions = spark.read.parquet("dim_regions.parquet")  # 1000 lignes

# MAUVAIS : shuffle join (redistribue les 100M lignes)
result = big_df.join(dim_regions, "region_id")

# BON : broadcast join (envoie la petite table a tous les executors)
result = big_df.join(broadcast(dim_regions), "region_id")
# 10-100x plus rapide car evite le shuffle!

# ===== 2. REPARTITIONNEMENT =====
# Trop de petites partitions (slow)
df.rdd.getNumPartitions()  # 10000 partitions de 1 MB

# Reduire le nombre de partitions (coalesce, sans shuffle)
df = df.coalesce(200)  # 200 partitions de 50 MB

# Repartitionner par cle (optimise les joins/groupby suivants)
df = df.repartition(200, "region")

# ===== 3. CACHING =====
# Si un DataFrame est utilise plusieurs fois
df_clients = spark.read.parquet("clients/").filter(F.col("actif"))
df_clients.cache()  # Stocke en memoire apres la premiere action
# ou
df_clients.persist(StorageLevel.MEMORY_AND_DISK)  # Spillover sur disque

# Utilise plusieurs fois sans relire le Parquet
rapport1 = df_clients.groupBy("region").count()
rapport2 = df_clients.groupBy("segment").agg(F.sum("ltv"))

df_clients.unpersist()  # Liberer la memoire quand fini

# ===== 4. PREDICATE PUSHDOWN =====
# Spark pousse les filtres vers la source (Parquet/JDBC)
# Parquet : ne lit que les colonnes et row groups necessaires
df = spark.read.parquet("big_table/") \
    .select("col1", "col2") \     # Projection pushdown
    .filter(F.col("date") >= "2024-01-01")  # Predicate pushdown

# ===== 5. AQE (Adaptive Query Execution) =====
# Active par defaut depuis Spark 3.2
spark.conf.set("spark.sql.adaptive.enabled", "true")
# AQE optimise le plan PENDANT l'execution :
# - Fusionne les petites partitions apres shuffle
# - Convertit sort-merge join en broadcast join si une side est petite
# - Gere le data skew automatiquement

Scenario : Optimiser un job Spark de 4 heures

Votre equipe a un job PySpark quotidien qui joint une table de 500M de lignes avec une table de 1M de lignes, puis fait un groupby. Le job prend 4 heures et echoue parfois en OOM.

Diagnostic et solutions :

  • Etape 1 - Analyse : Regarder le Spark UI (port 4040). Identifier quel stage est le plus long. Verifier le "data skew" (quelques partitions beaucoup plus grosses que les autres).
  • Etape 2 - Broadcast Join : La table de 1M de lignes fait probablement < 100 MB. Utiliser broadcast() pour l'envoyer a tous les executors au lieu d'un shuffle join. Gain attendu : 3-5x.
  • Etape 3 - Partitionnement : Repartitionner la table de 500M par la cle de jointure avant le job. Si les donnees sont en Parquet, utiliser le partitionnement Hive (par date/region).
  • Etape 4 - Caching : Si la table de 1M est lue plusieurs fois, la cacher en memoire.
  • Etape 5 - Configuration : Ajuster spark.executor.memory (4-8 GB), spark.sql.shuffle.partitions (200-400 au lieu du defaut 200 si data skew).
  • Resultat attendu : de 4 heures a 30-45 minutes. Les broadcast joins et le bon partitionnement sont les deux leviers les plus puissants.

Flashcards

Quelle est la difference entre repartition() et coalesce() ?
repartition(n) redistribue les donnees via un shuffle complet (couteux mais equilibre). coalesce(n) fusionne les partitions existantes sans shuffle (rapide mais ne peut que reduire le nombre). Utilisez coalesce() pour reduire les partitions (apres un filtre), repartition() pour augmenter ou equilibrer.
Pourquoi Spark est-il "lazy" (evaluation paresseuse) ?
L'evaluation paresseuse permet a Spark d'optimiser le plan d'execution AVANT de l'executer. Le Catalyst optimizer peut reordonner les operations, pousser les filtres vers les sources, combiner les etapes, et choisir les algorithmes de join optimaux. Si Spark executait chaque ligne immediatement, aucune de ces optimisations ne serait possible.
Quand utiliser l'API SQL vs l'API DataFrame dans PySpark ?
Les deux produisent exactement le meme plan d'execution (meme optimiseur Catalyst). Utilisez SQL pour les requetes complexes avec des CTEs, window functions, et sous-requetes (plus lisible). Utilisez l'API DataFrame pour les pipelines programmatiques avec conditions dynamiques et composition de fonctions. Dans la pratique, melangez les deux.
Qu'est-ce que le "data skew" et comment le resoudre ?
Le data skew est un desequilibre de donnees entre partitions (ex: 99 partitions de 10 MB et 1 partition de 50 GB). Le job attend la plus grosse partition. Solutions : (1) Salting : ajouter un prefixe aleatoire a la cle pour redistribuer, (2) AQE (Spark 3.2+) qui le gere automatiquement, (3) Broadcast join si possible, (4) Filtrer les cles problematiques et les traiter separement.

Quiz

Quelle ligne de code declenche reellement l'execution dans Spark ?

df2 = df.filter(F.col("amount") > 100)
df3 = df2.groupBy("region").agg(F.sum("amount"))
df3.show()
df3.explain()
show() est une ACTION qui declenche l'execution de tout le pipeline (filter + groupBy + agg). filter() et groupBy() sont des TRANSFORMATIONS (lazy). explain() affiche le plan sans executer.

Vous joignez un DataFrame de 500M lignes avec un DataFrame de 50K lignes. Quelle optimisation est la plus impactante ?

Augmenter spark.executor.memory
Utiliser broadcast() sur le petit DataFrame
Repartitionner les deux DataFrames
Utiliser coalesce(1) pour reduire les partitions
broadcast() envoie le petit DataFrame (50K lignes, quelques MB) a tous les executors, evitant un shuffle du gros DataFrame. C'est l'optimisation la plus impactante pour les jointures asymetriques. Gain typique : 5-20x.

Que fait spark.sql.adaptive.enabled = true ?

Active le mode streaming
Optimise le plan d'execution pendant l'execution basee sur les statistiques reelles
Active le machine learning automatique
Permet a Spark de changer de cluster dynamiquement
AQE (Adaptive Query Execution) optimise le plan pendant l'execution en se basant sur les statistiques reelles des donnees (taille des partitions apres shuffle). Il peut convertir un sort-merge join en broadcast join, fusionner des petites partitions, et gerer le data skew automatiquement.

Spark est un outil puissant mais complexe. Mon conseil pour un Data Architect : ne recommandez jamais Spark si le volume est < 100 GB - DuckDB ou Polars feront mieux avec zero infrastructure. Mais au-dela de 100 GB, Spark devient incontournable. Maitrisez les 3 concepts cles : lazy evaluation, partitionnement, et broadcast joins. Avec ces trois outils, vous resoudrez 90% des problemes de performance Spark.

Formats de Fichiers Data

60 min Intermediaire

Le choix du format de fichier est une des decisions les plus impactantes en data engineering. Un mauvais choix peut multiplier les couts de stockage par 10 et les temps de lecture par 100. Cette lecon compare CSV, JSON, Avro, Parquet, ORC et Arrow, avec des benchmarks reels et des recommandations actionnables.

Objectifs d'apprentissage

  • Comprendre les differences entre formats ligne et colonne
  • Comparer CSV, JSON, Avro, Parquet et ORC avec des metriques reelles
  • Choisir le bon format selon le cas d'usage
  • Comprendre le role d'Apache Arrow dans l'ecosysteme
  • Eviter l'anti-pattern CSV en production

Formats ligne vs colonne

La distinction fondamentale est entre les formats orientes ligne (CSV, JSON, Avro) et les formats orientes colonne (Parquet, ORC). Ce choix determine les performances de lecture, ecriture, et compression.

Stockage ligne vs colonne
  DONNEES ORIGINALES :
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚  nom   β”‚ age  β”‚ ville  β”‚ salaire  β”‚
  β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
  β”‚ Alice  β”‚  28  β”‚ Paris  β”‚  55000   β”‚
  β”‚ Bob    β”‚  34  β”‚ Lyon   β”‚  62000   β”‚
  β”‚ Charlieβ”‚  45  β”‚ Paris  β”‚  78000   β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

  FORMAT LIGNE (CSV, JSON, Avro) :           FORMAT COLONNE (Parquet, ORC) :
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”        β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚ Alice, 28, Paris, 55000         β”‚        β”‚ nom:     Alice, Bob, Charlie β”‚
  β”‚ Bob, 34, Lyon, 62000            β”‚        β”‚ age:     28, 34, 45         β”‚
  β”‚ Charlie, 45, Paris, 78000       β”‚        β”‚ ville:   Paris, Lyon, Paris β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜        β”‚ salaire: 55000, 62000, 78000β”‚
                                              β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
  Ideal pour :                               Ideal pour :
  - Ecriture rapide (append)                 - Lecture selective (colonnes)
  - Lecture de lignes completes              - Compression superieure
  - Streaming                                - Analytique (SUM, AVG, GROUP BY)
  - OLTP                                    - OLAP

Pourquoi le format colonne compresse mieux

Dans une colonne "ville", les valeurs sont similaires (Paris, Lyon, Paris, Paris...). Le compresseur peut utiliser le dictionary encoding (3 valeurs uniques au lieu de 1M de strings) et le run-length encoding (Paris x 500K, Lyon x 300K). Dans un format ligne, le compresseur voit "Alice, 28, Paris, 55000, Bob, 34, Lyon..." - des types melanges impossibles a compresser efficacement.

Comparaison detaillee des formats

CSV - Comma-Separated Values

Exemple CSV
nom,age,ville,salaire
Alice,28,Paris,55000
Bob,34,Lyon,62000
"Charlie ""Chuck""",45,Paris,78000

Avantages : lisible par un humain, universel, editable dans Excel. Inconvenients : pas de types (tout est string), pas de schema, pas de compression, lent a lire, ambiguite des separateurs.

JSON - JavaScript Object Notation

Exemple JSON Lines
{"nom": "Alice", "age": 28, "ville": "Paris", "salaire": 55000}
{"nom": "Bob", "age": 34, "ville": "Lyon", "salaire": 62000}
{"nom": "Charlie", "age": 45, "ville": "Paris", "salaire": 78000, "bonus": 5000}

Avantages : semi-structure, supporte les types de base, schema flexible (colonnes optionnelles), nesting natif. Inconvenients : verbeux (repetition des cles), plus gros que CSV, pas de compression native.

Apache Avro

Format binaire oriente ligne avec schema integre. Ideal pour le streaming (Kafka) et les pipelines ou le schema evolue.

Apache Parquet

Le format roi de l'analytique. Format binaire oriente colonne avec compression, metadata, et predicate pushdown.

Python - Lecture/Ecriture Parquet
import pandas as pd
import pyarrow.parquet as pq

# Ecriture Parquet avec compression
df.to_parquet(
    "ventes.parquet",
    engine="pyarrow",
    compression="snappy",     # Alternatives: gzip, zstd, lz4
    index=False
)

# Ecriture partitionnee (essentiel pour les data lakes!)
df.to_parquet(
    "datalake/ventes/",
    partition_cols=["year", "month"],   # Cree des sous-dossiers
    compression="snappy"
)
# Resultat :
# datalake/ventes/year=2024/month=01/part-0.parquet
# datalake/ventes/year=2024/month=02/part-0.parquet
# ...

# Lecture avec projection pushdown (ne lit que les colonnes demandees)
df = pd.read_parquet(
    "ventes.parquet",
    columns=["date", "montant", "region"],   # Projection pushdown!
    filters=[("year", "=", 2024)]             # Predicate pushdown!
)

# Lecture des metadata sans charger les donnees
parquet_file = pq.read_metadata("ventes.parquet")
print(f"Lignes: {parquet_file.num_rows}")
print(f"Colonnes: {parquet_file.num_columns}")
print(f"Taille: {parquet_file.serialized_size / 1024**2:.1f} MB")
print(f"Row groups: {parquet_file.num_row_groups}")

Apache ORC

Autre format colonne, optimise pour l'ecosysteme Hive/Hadoop. Similaire a Parquet mais moins repandu hors de l'ecosysteme Hadoop.

Benchmarks reels

Voici des benchmarks sur un dataset de 10M lignes (15 colonnes, mix numerique/string) :

Metrique CSV JSON Avro Parquet (snappy) ORC (zlib)
Taille fichier 1.2 GB 2.1 GB 450 MB 180 MB 165 MB
Ratio vs CSV 1x 1.75x 0.38x 0.15x 0.14x
Temps ecriture 12s 28s 8s 6s 9s
Temps lecture (tout) 45s 90s 15s 4s 5s
Lecture 3 colonnes 45s 90s 15s 0.8s 0.9s
Schema integre Non Non Oui Oui Oui
Types natifs Non Partiels Oui Oui Oui
Splittable Oui Oui (JSONL) Oui Oui Oui
Lisible humain Oui Oui Non Non Non

Le chiffre qui compte : 0.8s vs 45s

Parquet met 0.8 seconde pour lire 3 colonnes sur 10M lignes, contre 45 secondes pour CSV. C'est un facteur 56x. Sur un data lake de 1 TB, cela represente la difference entre une requete de 5 secondes et une requete de 5 minutes. C'est pourquoi Parquet est le standard des data lakes.

Apache Arrow - Le format en memoire

Apache Arrow n'est pas un format de fichier mais un format en memoire. Il definit comment les donnees tabulaires sont organisees en RAM, et permet l'echange de donnees entre systemes sans serialisation/deserialisation (zero-copy).

Arrow comme lingua franca de la memoire
  AVANT Arrow (chaque outil a son format memoire) :
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”    Serialise     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”    Serialise     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚ pandas β”‚  ──────────────▢ β”‚  Spark β”‚  ──────────────▢ β”‚  R     β”‚
  β”‚ (NumPy)β”‚  ◀────────────── β”‚ (JVM)  β”‚  ◀────────────── β”‚(R obj) β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜    Deserialise   β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜    Deserialise   β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜
       Copie memoire a chaque transfert (lent, couteux)

  AVEC Arrow (format memoire commun) :
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”                  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”                  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚ pandas β”‚                  β”‚  Spark β”‚                  β”‚  R     β”‚
  β”‚        │──┐           β”Œβ”€β”€β”€β”‚        │───┐          β”Œβ”€β”€β”€β”‚        β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚           β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β”‚          β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”˜
              β–Ό           β–Ό                β–Ό          β–Ό
           β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
           β”‚         Apache Arrow (memoire partagee)       β”‚
           β”‚     Format colonne standardise en RAM         β”‚
           β”‚     Zero-copy entre Python, Java, R, Rust     β”‚
           β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
Python
import pyarrow as pa
import pyarrow.parquet as pq

# Conversion pandas <-> Arrow (quasi-instantane)
arrow_table = pa.Table.from_pandas(df)
df_back = arrow_table.to_pandas()

# Lecture Parquet via Arrow (plus rapide que pd.read_parquet)
table = pq.read_table("ventes.parquet", columns=["date", "montant"])
df = table.to_pandas()

# Arrow est utilise par :
# - Polars (backend natif)
# - DuckDB (echange avec pandas)
# - PySpark (echange Python <-> JVM depuis Spark 3.0)
# - Pandas 2.0 (backend optionnel: pd.ArrowDtype)

# Pandas avec backend Arrow (plus rapide et moins de memoire)
df = pd.read_parquet("data.parquet", dtype_backend="pyarrow")
print(df.dtypes)  # int64[pyarrow], string[pyarrow], etc.

Utiliser CSV pour des pipelines data en production

Le CSV est le format de fichier le plus dangereux en production. Voici les problemes concrets :

  • Pas de types : "1234" est-il un entier ou un code postal ? Le CSV ne le dit pas. Chaque lecteur infere differemment.
  • Pas de schema : si une colonne est ajoutee ou renommee, tous les pipelines en aval cassent silencieusement.
  • Encodage ambigu : UTF-8 ? Latin-1 ? Windows-1252 ? Les fichiers CSV francais avec des accents sont un cauchemar.
  • Separateur ambigu : virgule ? Point-virgule ? Tab ? Et si une valeur contient le separateur ?
  • Pas de compression : un fichier Parquet equivalent est 5-10x plus petit.
  • Pas de predicate pushdown : impossible de lire seulement 3 colonnes sur 50 - il faut tout charger.

Regle : CSV pour l'import/export humain et les petits fichiers ad-hoc. Parquet pour tout le reste.

Comment LinkedIn economise 60% de stockage avec Parquet

LinkedIn a migre son data lake de Avro vers Parquet en 2019 :

  • Avant : 500 PB de donnees en Avro sur HDFS, cout de stockage enorme
  • Migration : conversion progressive des tables les plus volumineuses vers Parquet avec compression Zstandard
  • Resultat stockage : reduction de 60% de l'espace (500 PB a 200 PB). Economie estimee : plusieurs millions de dollars/an en infrastructure HDFS
  • Resultat performance : les requetes analytiques (Spark, Presto) sont 3-10x plus rapides grace a la projection pushdown et la lecture columnar
  • Schema evolution : Parquet supporte l'ajout de colonnes sans recrire les fichiers existants (les anciennes partitions retournent NULL pour les nouvelles colonnes)
  • Lecon : le format de fichier est un des leviers les plus sous-estimes en data engineering. Un changement de format peut diviser les couts par 2 et les temps de requete par 5.

Guide de decision des formats

Arbre de decision - Quel format choisir ?
                    Quel est le cas d'usage ?
                           β”‚
         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
         β–Ό                 β–Ό                   β–Ό
     Analytique /      Streaming /          Echange
     Data Lake         Event Bus            Humain
         β”‚                 β”‚                   β”‚
         β–Ό                 β–Ό                   β–Ό
      PARQUET            AVRO                 CSV
   (+ compression      (schema              (petits
    snappy/zstd)       evolution,           fichiers,
                       Kafka natif)         Excel)
         β”‚
         β–Ό
    Besoin de
    schema flexible ?
      β”‚         β”‚
     Oui       Non
      β”‚         β”‚
      β–Ό         β–Ό
    JSON      PARQUET
   (nested    (performance
    data,      maximale)
    APIs)
Pourquoi Parquet est-il "splittable" et pourquoi c'est important ?
Un fichier Parquet est divise en "row groups" independants (typiquement 128 MB). Chaque row group peut etre lu separement, ce qui permet a Spark/Hadoop de distribuer la lecture sur plusieurs machines en parallele. Un fichier gzip CSV ne peut PAS etre splitte - un seul worker doit tout lire sequentiellement.
Quelle compression choisir pour Parquet : Snappy, Gzip, ou Zstandard ?
Snappy : compression moderee, tres rapide en decompression. Ideal pour les lectures frequentes (data lake actif). Gzip : meilleur ratio de compression, mais plus lent. Pour l'archivage. Zstandard (zstd) : le meilleur des deux mondes - compression proche de gzip avec vitesse proche de snappy. Recommande par defaut depuis 2023.
Qu'est-ce que le "dictionary encoding" dans Parquet ?
Si une colonne a peu de valeurs uniques (ex: "region" avec 5 valeurs sur 10M lignes), Parquet cree un dictionnaire {0: "IDF", 1: "PACA", ...} et stocke uniquement les indices (entiers). Gain : une colonne de 10M strings de 10 chars (100 MB) devient 10M d'entiers de 1 byte (10 MB). C'est automatique et transparent.

Quiz

Un data lake de 100 TB en CSV est migre en Parquet (snappy). Quelle taille attendue ?

80-90 TB (peu de gain)
50-60 TB (gain moyen)
10-20 TB (gain important)
1-2 TB (gain extreme)
Parquet avec Snappy compresse typiquement a 10-20% de la taille CSV, soit 10-20 TB. Le gain vient du format colonne (meilleure compression), du dictionary encoding, et de l'encodage binaire des types. Avec Zstandard, on pourrait descendre a 8-15 TB.

Quel format est recommande pour les evenements Kafka qui doivent supporter l'evolution de schema ?

CSV
Parquet
Avro
JSON
Avro est le format natif de Kafka. Il supporte l'evolution de schema (ajout/suppression de champs, valeurs par defaut) via le Schema Registry. Il est oriente ligne (adapte au streaming), compact en binaire, et chaque message porte sa version de schema.

Qu'est-ce que le "predicate pushdown" en Parquet ?

Compresser les predicats SQL dans le fichier
Pousser les filtres WHERE vers le fichier pour ne lire que les row groups necessaires
Precharger les predicats en memoire cache
Indexer automatiquement toutes les colonnes
Le predicate pushdown utilise les statistiques min/max stockees dans les metadata de chaque row group. Si vous filtrez WHERE date >= '2024-06-01', Parquet saute les row groups ou max(date) < '2024-06-01' sans les lire du disque. Combine avec le partitionnement Hive, cela peut reduire les donnees lues de 99%.

Si je ne devais retenir qu'une seule chose de cette lecon : Parquet est le format par defaut pour tout pipeline data. CSV pour l'import/export humain, Avro pour Kafka, JSON pour les APIs. C'est tout. La migration CSV vers Parquet est souvent la premiere action a fort ROI qu'un Data Architect peut proposer dans une entreprise. Facile a implementer, resultats immediats.

Projet Pipeline ETL Complet

120 min Lab Avance

Ce lab pratique vous guide dans la construction d'un pipeline ETL complet et robuste : extraction depuis une API REST et des fichiers CSV, transformation avec pandas et DuckDB, chargement dans PostgreSQL. Vous ajouterez la gestion d'erreurs, le retry, le logging et la validation de donnees - les elements qui font la difference entre un prototype et un systeme de production.

Objectifs d'apprentissage

  • Concevoir un pipeline ETL end-to-end
  • Implementer l'extraction multi-sources (API + fichiers)
  • Transformer les donnees avec pandas et DuckDB
  • Charger dans PostgreSQL avec gestion des conflits
  • Ajouter la gestion d'erreurs, retry, logging et monitoring
  • Valider la qualite des donnees a chaque etape

Architecture du pipeline

Pipeline ETL - Architecture cible
  SOURCES                    EXTRACT              TRANSFORM              LOAD
  ───────                    ───────              ─────────              ────

  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚  API REST│──┐           β”‚                                                β”‚
  β”‚ (produits)β”‚  β”‚          β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚  Extract  β”‚  β”‚  Validate β”‚   β”‚ Transformβ”‚   β”‚  Validateβ”‚  β”‚
                β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β–Ά β”‚  β”‚  Schema   │──▢│  pandas  │──▢│  Quality β”‚  β”‚
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚           β”‚  β”‚  (input)  β”‚   β”‚  DuckDB  β”‚   β”‚  (output)β”‚  β”‚
  β”‚ CSV files│───           β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
  β”‚ (ventes) β”‚  β”‚           β”‚                                      β”‚        β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚           β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”˜
                β”‚                                                  β”‚
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚                                                  β–Ό
  β”‚ JSON API β”‚β”€β”€β”˜                                           β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚ (clients)β”‚                                              β”‚  PostgreSQL  β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                              β”‚  + Parquet   β”‚
                                                            β”‚  (archive)   β”‚
  MONITORING                                                β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
  ──────────                                                       β”‚
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
  β”‚  Logging (fichier + console)                                   β”‚
  β”‚  Metriques (lignes traitees, erreurs, duree)                  β”‚
  β”‚  Alertes (Slack/email si echec)                                β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Lab : Construction du pipeline ETL

Etape 1 : Configuration et logging

Un bon pipeline commence par une configuration propre et un systeme de logging. Ne jamais utiliser print() en production.

Python - config.py
import logging
import os
from dataclasses import dataclass
from datetime import datetime

@dataclass
class PipelineConfig:
    """Configuration centralisee du pipeline."""
    # Sources
    api_base_url: str = "https://api.example.com/v1"
    api_key: str = os.getenv("API_KEY", "")
    csv_input_dir: str = "./data/input/"

    # Destination
    db_url: str = os.getenv("DATABASE_URL",
        "postgresql://etl_user:password@localhost:5432/warehouse")

    # Pipeline
    batch_size: int = 10000
    max_retries: int = 3
    retry_delay: int = 5  # secondes
    output_parquet_dir: str = "./data/archive/"

    # Qualite
    max_null_ratio: float = 0.05  # Max 5% de nulls
    min_rows_expected: int = 100

def setup_logging(log_file: str = None) -> logging.Logger:
    """Configure le logging pour le pipeline."""
    logger = logging.getLogger("etl_pipeline")
    logger.setLevel(logging.INFO)

    # Format
    fmt = logging.Formatter(
        '%(asctime)s | %(levelname)-8s | %(name)s | %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'
    )

    # Console handler
    console = logging.StreamHandler()
    console.setFormatter(fmt)
    logger.addHandler(console)

    # File handler
    if log_file is None:
        log_file = f"logs/etl_{datetime.now():%Y%m%d_%H%M%S}.log"
    os.makedirs(os.path.dirname(log_file), exist_ok=True)
    file_handler = logging.FileHandler(log_file)
    file_handler.setFormatter(fmt)
    logger.addHandler(file_handler)

    return logger

Etape 2 : Utilitaire de retry

Les sources externes (APIs, bases) echouent regulierement. Un decorateur retry avec backoff exponentiel est indispensable.

Python - utils.py
import time
import functools
import logging

logger = logging.getLogger("etl_pipeline")

def retry(max_attempts: int = 3, delay: float = 1.0, backoff: float = 2.0,
          exceptions: tuple = (Exception,)):
    """Decorateur retry avec backoff exponentiel."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        logger.error(
                            f"ECHEC {func.__name__} apres {max_attempts} "
                            f"tentatives: {e}"
                        )
                        raise
                    logger.warning(
                        f"{func.__name__} tentative {attempt}/{max_attempts} "
                        f"echouee: {e}. Retry dans {current_delay:.1f}s..."
                    )
                    time.sleep(current_delay)
                    current_delay *= backoff
        return wrapper
    return decorator

class PipelineMetrics:
    """Collecte les metriques d'execution."""
    def __init__(self):
        self.start_time = time.time()
        self.rows_extracted = 0
        self.rows_transformed = 0
        self.rows_loaded = 0
        self.rows_rejected = 0
        self.errors = []

    def log_summary(self):
        duration = time.time() - self.start_time
        logger.info("=" * 60)
        logger.info("RESUME DU PIPELINE")
        logger.info(f"  Duree totale:     {duration:.1f}s")
        logger.info(f"  Lignes extraites:  {self.rows_extracted}")
        logger.info(f"  Lignes transformees: {self.rows_transformed}")
        logger.info(f"  Lignes chargees:  {self.rows_loaded}")
        logger.info(f"  Lignes rejetees:  {self.rows_rejected}")
        logger.info(f"  Erreurs:          {len(self.errors)}")
        if self.errors:
            for err in self.errors[:5]:
                logger.info(f"    - {err}")
        logger.info("=" * 60)

Etape 3 : Extraction (Extract)

L'extraction gere deux sources : une API REST (produits et clients) et des fichiers CSV (ventes). Chaque source a sa logique d'erreur.

Python - extract.py
import pandas as pd
import requests
import glob
import logging
from utils import retry

logger = logging.getLogger("etl_pipeline")

@retry(max_attempts=3, delay=2.0, exceptions=(requests.RequestException,))
def extract_from_api(url: str, api_key: str, endpoint: str) -> pd.DataFrame:
    """Extrait des donnees depuis une API REST avec pagination."""
    all_data = []
    page = 1
    per_page = 1000

    while True:
        response = requests.get(
            f"{url}/{endpoint}",
            headers={"Authorization": f"Bearer {api_key}"},
            params={"page": page, "per_page": per_page},
            timeout=30
        )
        response.raise_for_status()

        data = response.json()
        if not data.get("results"):
            break

        all_data.extend(data["results"])
        logger.info(f"  API {endpoint}: page {page}, {len(data['results'])} records")

        if not data.get("has_next"):
            break
        page += 1

    df = pd.DataFrame(all_data)
    logger.info(f"API {endpoint}: {len(df)} lignes extraites")
    return df


def extract_csv_files(input_dir: str, pattern: str = "ventes_*.csv") -> pd.DataFrame:
    """Extrait et concatene des fichiers CSV."""
    files = sorted(glob.glob(f"{input_dir}/{pattern}"))

    if not files:
        raise FileNotFoundError(f"Aucun fichier {pattern} dans {input_dir}")

    dfs = []
    for filepath in files:
        try:
            df = pd.read_csv(
                filepath,
                sep=";",
                encoding="utf-8",
                dtype={"code_produit": str, "code_client": str},
                parse_dates=["date_vente"],
                na_values=["N/A", "NULL", ""]
            )
            dfs.append(df)
            logger.info(f"  CSV: {filepath} -> {len(df)} lignes")
        except Exception as e:
            logger.error(f"  ERREUR lecture {filepath}: {e}")
            continue

    if not dfs:
        raise ValueError("Aucun fichier CSV valide")

    result = pd.concat(dfs, ignore_index=True)
    logger.info(f"CSV total: {len(result)} lignes depuis {len(dfs)} fichiers")
    return result


def extract_all(config) -> dict:
    """Orchestre l'extraction de toutes les sources."""
    logger.info("=== PHASE EXTRACT ===")

    data = {}

    # API - Produits
    data['produits'] = extract_from_api(
        config.api_base_url, config.api_key, "products"
    )

    # API - Clients
    data['clients'] = extract_from_api(
        config.api_base_url, config.api_key, "customers"
    )

    # CSV - Ventes
    data['ventes'] = extract_csv_files(config.csv_input_dir)

    return data

Etape 4 : Validation des donnees entree

Validez TOUJOURS les donnees apres extraction et avant transformation. Une donnee corrompue en entree produit des resultats faux en sortie.

Python - validate.py
import pandas as pd
import logging
from dataclasses import dataclass
from typing import List, Optional

logger = logging.getLogger("etl_pipeline")

@dataclass
class ValidationResult:
    is_valid: bool
    errors: List[str]
    warnings: List[str]
    rows_before: int
    rows_after: int

def validate_dataframe(
    df: pd.DataFrame,
    name: str,
    required_columns: List[str],
    unique_columns: Optional[List[str]] = None,
    not_null_columns: Optional[List[str]] = None,
    min_rows: int = 1,
    max_null_ratio: float = 0.05
) -> ValidationResult:
    """Valide un DataFrame selon des regles metier."""
    errors = []
    warnings = []
    rows_before = len(df)

    # 1. Colonnes requises
    missing = set(required_columns) - set(df.columns)
    if missing:
        errors.append(f"Colonnes manquantes: {missing}")

    # 2. Nombre minimum de lignes
    if len(df) < min_rows:
        errors.append(f"Seulement {len(df)} lignes (minimum: {min_rows})")

    # 3. Doublons sur colonnes uniques
    if unique_columns:
        dupes = df.duplicated(subset=unique_columns).sum()
        if dupes > 0:
            warnings.append(f"{dupes} doublons sur {unique_columns}")

    # 4. Valeurs manquantes
    if not_null_columns:
        for col in not_null_columns:
            if col in df.columns:
                null_ratio = df[col].isnull().mean()
                if null_ratio > max_null_ratio:
                    errors.append(
                        f"Colonne '{col}': {null_ratio:.1%} nulls "
                        f"(max: {max_null_ratio:.1%})"
                    )
                elif null_ratio > 0:
                    warnings.append(f"Colonne '{col}': {null_ratio:.1%} nulls")

    # 5. Types attendus
    null_total = df.isnull().sum().sum()
    total_cells = df.shape[0] * df.shape[1]
    if null_total / total_cells > 0.1:
        warnings.append(f"Taux global de nulls eleve: {null_total/total_cells:.1%}")

    is_valid = len(errors) == 0

    # Log
    status = "VALIDE" if is_valid else "INVALIDE"
    logger.info(f"Validation [{name}]: {status} - {len(df)} lignes, "
                f"{len(errors)} erreurs, {len(warnings)} warnings")
    for e in errors:
        logger.error(f"  ERREUR: {e}")
    for w in warnings:
        logger.warning(f"  WARNING: {w}")

    return ValidationResult(
        is_valid=is_valid, errors=errors, warnings=warnings,
        rows_before=rows_before, rows_after=len(df)
    )

Etape 5 : Transformation (Transform)

La transformation nettoie, enrichit et structure les donnees. On utilise pandas pour les transformations complexes et DuckDB pour les jointures performantes.

Python - transform.py
import pandas as pd
import numpy as np
import duckdb
import logging

logger = logging.getLogger("etl_pipeline")

def transform_clients(df: pd.DataFrame) -> pd.DataFrame:
    """Nettoie et normalise les donnees clients."""
    logger.info("Transform clients...")
    df = df.copy()

    # Normalisation noms
    df['nom'] = df['nom'].str.strip().str.title()
    df['email'] = df['email'].str.strip().str.lower()

    # Validation email
    email_valid = df['email'].str.match(r'^[\w.+-]+@[\w-]+\.[\w.]+$')
    df.loc[~email_valid, 'email'] = np.nan

    # Deduplication par email
    df = df.sort_values('date_inscription', ascending=False)
    df = df.drop_duplicates(subset=['email'], keep='first')

    # Segmentation (regles metier)
    conditions = [
        df['ltv'] >= 10000,
        df['ltv'] >= 1000,
        df['ltv'] >= 100,
    ]
    segments = ['Platinum', 'Gold', 'Silver']
    df['segment'] = np.select(conditions, segments, default='Bronze')

    logger.info(f"  Clients: {len(df)} lignes apres nettoyage")
    return df.reset_index(drop=True)


def transform_ventes(df: pd.DataFrame) -> pd.DataFrame:
    """Nettoie et enrichit les donnees de ventes."""
    logger.info("Transform ventes...")
    df = df.copy()

    # Supprimer les montants invalides
    initial = len(df)
    df = df[df['montant'] > 0]
    df = df[df['quantite'] > 0]
    logger.info(f"  {initial - len(df)} lignes supprimees (montant/quantite <= 0)")

    # Enrichissement temporel
    df['annee'] = df['date_vente'].dt.year
    df['mois'] = df['date_vente'].dt.month
    df['trimestre'] = df['date_vente'].dt.quarter
    df['jour_semaine'] = df['date_vente'].dt.dayofweek
    df['est_weekend'] = df['jour_semaine'].isin([5, 6])

    # Calcul du montant total
    df['montant_total'] = df['montant'] * df['quantite']

    # Categories de montant
    df['tranche_montant'] = pd.cut(
        df['montant_total'],
        bins=[0, 50, 200, 1000, float('inf')],
        labels=['Petit', 'Moyen', 'Grand', 'Premium']
    )

    logger.info(f"  Ventes: {len(df)} lignes apres nettoyage")
    return df.reset_index(drop=True)


def create_fact_table(ventes: pd.DataFrame, clients: pd.DataFrame,
                      produits: pd.DataFrame) -> pd.DataFrame:
    """Cree la table de faits via DuckDB (jointures performantes)."""
    logger.info("Creation de la table de faits avec DuckDB...")

    fact = duckdb.sql("""
        SELECT
            v.id as vente_id,
            v.date_vente,
            v.annee,
            v.mois,
            v.trimestre,
            v.est_weekend,
            c.id as client_id,
            c.nom as client_nom,
            c.segment as client_segment,
            c.ville as client_ville,
            p.id as produit_id,
            p.nom as produit_nom,
            p.categorie as produit_categorie,
            v.quantite,
            v.montant as prix_unitaire,
            v.montant_total,
            v.tranche_montant
        FROM ventes v
        LEFT JOIN clients c ON v.code_client = c.code_client
        LEFT JOIN produits p ON v.code_produit = p.code_produit
        ORDER BY v.date_vente DESC
    """).df()

    logger.info(f"  Table de faits: {len(fact)} lignes")
    return fact

Etape 6 : Chargement (Load)

Le chargement ecrit les donnees dans PostgreSQL et archive en Parquet. On gere les conflits (upsert) et les transactions.

Python - load.py
import pandas as pd
import logging
import os
from datetime import datetime
from sqlalchemy import create_engine, text

logger = logging.getLogger("etl_pipeline")

def load_to_postgresql(df: pd.DataFrame, table_name: str,
                       engine, if_exists: str = "append",
                       chunksize: int = 10000) -> int:
    """Charge un DataFrame dans PostgreSQL."""
    logger.info(f"Chargement de {len(df)} lignes dans {table_name}...")

    try:
        df.to_sql(
            table_name,
            engine,
            if_exists=if_exists,
            index=False,
            chunksize=chunksize,
            method='multi'
        )
        logger.info(f"  {table_name}: {len(df)} lignes chargees")
        return len(df)
    except Exception as e:
        logger.error(f"  ERREUR chargement {table_name}: {e}")
        raise


def upsert_to_postgresql(df: pd.DataFrame, table_name: str,
                         engine, conflict_columns: list) -> int:
    """Upsert (INSERT ... ON CONFLICT UPDATE) dans PostgreSQL."""
    logger.info(f"Upsert de {len(df)} lignes dans {table_name}...")

    # Charger dans une table staging temporaire
    staging = f"staging_{table_name}"
    df.to_sql(staging, engine, if_exists="replace", index=False,
              method='multi', chunksize=10000)

    # Upsert depuis staging vers la table finale
    columns = df.columns.tolist()
    conflict_str = ", ".join(conflict_columns)
    update_cols = [c for c in columns if c not in conflict_columns]
    update_str = ", ".join([f"{c} = EXCLUDED.{c}" for c in update_cols])
    cols_str = ", ".join(columns)

    upsert_sql = f"""
        INSERT INTO {table_name} ({cols_str})
        SELECT {cols_str} FROM {staging}
        ON CONFLICT ({conflict_str})
        DO UPDATE SET {update_str};
    """

    with engine.begin() as conn:
        result = conn.execute(text(upsert_sql))
        conn.execute(text(f"DROP TABLE IF EXISTS {staging}"))

    logger.info(f"  Upsert {table_name}: complete")
    return len(df)


def archive_to_parquet(df: pd.DataFrame, output_dir: str,
                       name: str) -> str:
    """Archive un DataFrame en Parquet partitionne."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filepath = os.path.join(output_dir, f"{name}_{timestamp}.parquet")
    os.makedirs(output_dir, exist_ok=True)

    df.to_parquet(filepath, compression="snappy", index=False)
    size_mb = os.path.getsize(filepath) / 1024**2
    logger.info(f"  Archive: {filepath} ({size_mb:.1f} MB)")
    return filepath

Etape 7 : Orchestrateur principal

L'orchestrateur relie toutes les etapes avec gestion d'erreurs globale et metriques.

Python - main.py
import sys
import logging
from datetime import datetime
from sqlalchemy import create_engine

from config import PipelineConfig, setup_logging
from utils import PipelineMetrics
from extract import extract_all
from validate import validate_dataframe
from transform import transform_clients, transform_ventes, create_fact_table
from load import load_to_postgresql, upsert_to_postgresql, archive_to_parquet

def run_pipeline():
    """Execute le pipeline ETL complet."""
    config = PipelineConfig()
    logger = setup_logging()
    metrics = PipelineMetrics()
    engine = create_engine(config.db_url)

    logger.info("=" * 60)
    logger.info(f"DEMARRAGE PIPELINE ETL - {datetime.now():%Y-%m-%d %H:%M:%S}")
    logger.info("=" * 60)

    try:
        # ===== EXTRACT =====
        raw_data = extract_all(config)
        metrics.rows_extracted = sum(len(df) for df in raw_data.values())

        # ===== VALIDATE INPUT =====
        v_clients = validate_dataframe(
            raw_data['clients'], "clients_raw",
            required_columns=['code_client', 'nom', 'email'],
            unique_columns=['code_client'],
            not_null_columns=['code_client', 'nom'],
            min_rows=config.min_rows_expected
        )
        v_ventes = validate_dataframe(
            raw_data['ventes'], "ventes_raw",
            required_columns=['date_vente', 'montant', 'code_client'],
            not_null_columns=['date_vente', 'montant'],
            min_rows=config.min_rows_expected
        )

        if not v_clients.is_valid or not v_ventes.is_valid:
            logger.error("ECHEC validation input - pipeline arrete")
            metrics.errors.append("Validation input echouee")
            metrics.log_summary()
            sys.exit(1)

        # ===== TRANSFORM =====
        logger.info("=== PHASE TRANSFORM ===")
        clients_clean = transform_clients(raw_data['clients'])
        ventes_clean = transform_ventes(raw_data['ventes'])
        fact_table = create_fact_table(
            ventes_clean, clients_clean, raw_data['produits']
        )
        metrics.rows_transformed = len(fact_table)

        # ===== VALIDATE OUTPUT =====
        v_output = validate_dataframe(
            fact_table, "fact_ventes",
            required_columns=['vente_id', 'date_vente', 'montant_total'],
            unique_columns=['vente_id'],
            not_null_columns=['vente_id', 'date_vente', 'montant_total'],
            min_rows=config.min_rows_expected
        )
        metrics.rows_rejected = (
            metrics.rows_extracted - metrics.rows_transformed
        )

        # ===== LOAD =====
        logger.info("=== PHASE LOAD ===")
        upsert_to_postgresql(clients_clean, "dim_clients", engine,
                            conflict_columns=["code_client"])
        load_to_postgresql(fact_table, "fact_ventes", engine)
        metrics.rows_loaded = len(fact_table)

        # Archive Parquet
        archive_to_parquet(fact_table, config.output_parquet_dir, "fact_ventes")

        logger.info("PIPELINE TERMINE AVEC SUCCES")

    except Exception as e:
        logger.critical(f"ECHEC PIPELINE: {e}", exc_info=True)
        metrics.errors.append(str(e))
        # Ici: envoyer alerte Slack/email
        send_alert(f"Pipeline ETL ECHEC: {e}")
        sys.exit(1)

    finally:
        metrics.log_summary()
        engine.dispose()

def send_alert(message: str):
    """Envoie une alerte (Slack, email, PagerDuty)."""
    logger = logging.getLogger("etl_pipeline")
    logger.info(f"ALERTE: {message}")
    # En production: requests.post(SLACK_WEBHOOK, json={"text": message})

if __name__ == "__main__":
    run_pipeline()

Etape 8 : Tests et monitoring

Un pipeline sans tests est une bombe a retardement. Ecrivez des tests unitaires pour chaque etape et des tests d'integration pour le pipeline complet.

Python - test_pipeline.py
import pytest
import pandas as pd
import numpy as np
from transform import transform_clients, transform_ventes
from validate import validate_dataframe

class TestTransformClients:
    def setup_method(self):
        self.df = pd.DataFrame({
            'code_client': ['C001', 'C002', 'C003', 'C002'],
            'nom': ['  alice dupont  ', 'BOB MARTIN', 'charlie', 'Bob Martin'],
            'email': ['alice@ex.com', 'bob@ex.com', 'invalid-email', 'bob@ex.com'],
            'date_inscription': pd.to_datetime([
                '2024-01-01', '2024-06-01', '2024-03-01', '2024-08-01'
            ]),
            'ltv': [15000, 500, 50, 1200],
            'ville': ['Paris', 'Lyon', 'Marseille', 'Lyon']
        })

    def test_nom_normalise(self):
        result = transform_clients(self.df)
        assert result.iloc[0]['nom'] == 'Alice Dupont'

    def test_email_invalide_supprime(self):
        result = transform_clients(self.df)
        assert result[result['code_client'] == 'C003']['email'].isna().all()

    def test_deduplication_email(self):
        result = transform_clients(self.df)
        assert result['email'].dropna().duplicated().sum() == 0

    def test_segmentation(self):
        result = transform_clients(self.df)
        platinum = result[result['code_client'] == 'C001']
        assert platinum.iloc[0]['segment'] == 'Platinum'

class TestValidation:
    def test_colonnes_manquantes(self):
        df = pd.DataFrame({'nom': ['test']})
        result = validate_dataframe(
            df, "test",
            required_columns=['nom', 'email', 'age']
        )
        assert not result.is_valid
        assert any('manquantes' in e for e in result.errors)

    def test_trop_de_nulls(self):
        df = pd.DataFrame({
            'id': range(100),
            'value': [None] * 20 + list(range(80))  # 20% nulls
        })
        result = validate_dataframe(
            df, "test",
            required_columns=['id', 'value'],
            not_null_columns=['value'],
            max_null_ratio=0.10
        )
        assert not result.is_valid

# pytest test_pipeline.py -v

Scenario : Pipeline fails at 3 AM - Monitoring et alertes

Votre pipeline ETL tourne chaque nuit a 2h du matin. A 3h, il echoue car l'API source renvoie des erreurs 503. Personne n'est au courant jusqu'au lendemain matin quand les analystes demandent pourquoi les donnees ne sont pas a jour.

Plan d'action :

  • 1. Alertes immediates : Configurer un webhook Slack/Teams et un email automatique en cas d'echec. Le send_alert() dans notre pipeline est le minimum.
  • 2. Dashboard de monitoring : Utiliser un outil comme Grafana pour afficher les metriques du pipeline (duree, lignes traitees, taux d'erreur). Stocker les metriques dans une table pipeline_runs.
  • 3. Retry automatique : Le decorateur @retry gere les erreurs 503 transitoires. Si 3 retries echouent, alerter et replanifier 1h plus tard.
  • 4. Dead letter queue : Les lignes qui echouent la validation ne sont pas perdues - les stocker dans un fichier rejected_YYYYMMDD.parquet pour investigation.
  • 5. SLA monitoring : Si le pipeline n'a pas termine a 5h (SLA interne), declencher une alerte "SLA breach" meme si le pipeline est encore en cours.

Pipeline ETL sans monitoring chez Knight Capital - 440M$ de perte

En 2012, Knight Capital Group a deploye du code de trading sur un serveur de production sans pipeline de monitoring adequat :

  • Probleme : un ancien code de test a ete reactive par erreur sur un des 8 serveurs de trading. Le deploiement n'avait pas de verification de coherence entre serveurs.
  • Detection tardive : aucune alerte automatique sur les anomalies de volume de trading. Les operateurs ont mis 45 minutes a comprendre le probleme.
  • Resultat : en 45 minutes, le code a genere 440 millions de dollars de pertes. Knight Capital a fait faillite.
  • Lecon pour nous : tout pipeline en production DOIT avoir des alertes automatiques, des validations de donnees a chaque etape, et un kill switch. Un pipeline ETL sans monitoring n'est pas un pipeline de production.

Ce lab couvre les fondamentaux d'un pipeline ETL de production. En realite, vous utiliserez un orchestrateur comme Airflow, Prefect ou Dagster pour gerer le scheduling, les retries, et les dependances entre taches. Mais les briques de base (extract, validate, transform, load, monitor) restent les memes. Retenez : un bon pipeline n'est pas celui qui transforme les donnees correctement - c'est celui qui ECHOUE proprement quand quelque chose ne va pas.

Examen Final - Phase 1

45 min Evaluation

Felicitations, vous avez parcouru toute la Phase 1 ! Cet examen final couvre les trois modules : SQL Avance, NoSQL & NewSQL, et Python pour la Data. Il melange des questions de connaissance et des scenarios de mise en situation reelle, comme un Data Architect les rencontre au quotidien.

Objectifs de l'examen

  • Verifier la maitrise des concepts SQL avances (window functions, CTEs, index)
  • Valider la comprehension du theoreme CAP et des choix NoSQL
  • Tester les competences Python Data (pandas, DuckDB, PySpark, formats)
  • Evaluer la capacite a prendre des decisions architecturales en situation reelle

Partie 1 : Questions de connaissance (6 questions)

Question 1 (SQL) : Quelle est la difference entre ROW_NUMBER(), RANK() et DENSE_RANK() quand deux employes ont le meme salaire ?

Les trois fonctions retournent le meme resultat en cas d'egalite
ROW_NUMBER donne des numeros uniques, RANK laisse un trou apres les ex-aequo, DENSE_RANK n'en laisse pas
RANK et DENSE_RANK sont identiques, seul ROW_NUMBER differe
ROW_NUMBER n'est pas utilisable avec PARTITION BY
Avec deux salaires egaux en 2e position : ROW_NUMBER donne 2,3 (arbitraire), RANK donne 2,2 puis 4 (trou), DENSE_RANK donne 2,2 puis 3 (pas de trou). C'est une question classique d'entretien Data.

Question 2 (SQL) : Quel est l'avantage principal d'une vue materialisee par rapport a une vue standard ?

Elle peut avoir des index
Elle stocke physiquement les resultats de la requete, eliminant le recalcul a chaque lecture
Elle se met a jour automatiquement en temps reel
Elle prend moins d'espace disque qu'une table
Une vue materialisee precalcule et stocke les resultats. Les lectures sont instantanees (comme une table). Une vue standard reexecute la requete a chaque SELECT. Le prix : espace disque et necessite de rafraichissement (REFRESH MATERIALIZED VIEW).

Question 3 (NoSQL) : Selon le theoreme CAP, qu'est-ce qu'un systeme CP (Consistency + Partition tolerance) sacrifie ?

La performance
La disponibilite (Availability) - il peut refuser des requetes en cas de partition reseau
La coherence des donnees
La tolerance aux pannes
Un systeme CP (comme MongoDB, HBase, Redis Cluster) garantit la coherence mais peut refuser de repondre si des noeuds sont injoignables. Un systeme AP (comme Cassandra, DynamoDB) reste disponible mais peut retourner des donnees pas encore synchronisees (eventual consistency).

Question 4 (NoSQL) : Pour stocker des relations de type reseau social (amis d'amis, recommandations, chemins les plus courts), quelle base NoSQL est la plus adaptee ?

MongoDB (document)
Redis (cle-valeur)
Cassandra (colonne large)
Neo4j (graphe)
Neo4j est concu pour les traversees de graphes. Trouver les amis d'amis a profondeur 3 est une operation O(1) par noeud en Neo4j, contre des JOINs recursifs O(n^3) en SQL. LinkedIn utilise un graphe pour les recommandations de connexions.

Question 5 (Python) : Quel format de fichier offre le meilleur ratio compression + vitesse de lecture pour un data lake analytique ?

CSV compresse en gzip
JSON Lines
Apache Parquet avec Snappy
Apache Avro
Parquet (format colonne) avec Snappy (compression rapide) offre le meilleur compromis : 5-10x plus petit que CSV, lecture 10-50x plus rapide grace a la projection pushdown (lecture selective de colonnes), et compression native. C'est le standard de facto des data lakes (Databricks, Snowflake, BigQuery).

Question 6 (Python) : Quelle est la maniere la plus performante de calculer une colonne conditionnelle sur 10 millions de lignes dans pandas ?

df.apply(lambda row: ..., axis=1)
for index, row in df.iterrows(): df.at[index, 'col'] = ...
np.where(condition, valeur_vrai, valeur_faux)
list comprehension puis assignation
np.where() est vectorise : il execute le calcul en C sur des blocs memoire contigus. Sur 10M lignes, il prend ~0.05s contre ~8s pour apply() et ~45s pour iterrows(). Le facteur est de 100x a 900x. C'est le pattern fondamental de performance pandas.

Partie 2 : Scenarios Data Architect (6 questions)

Scenario 1 : Vous etes Data Architect chez une fintech. L'equipe veut stocker les transactions bancaires (10M/jour, besoin de requetes analytiques sur l'historique de 2 ans, conformite reglementaire exigeant coherence ACID). Quelle architecture recommandez-vous ?

MongoDB pour la flexibilite du schema
PostgreSQL pour le transactionnel + Parquet/DuckDB pour l'analytique
Cassandra pour le volume d'ecriture
Redis pour la performance
La fintech a besoin d'ACID pour la conformite (PostgreSQL). Mais les requetes analytiques sur 2 ans (7 milliards de lignes) seraient trop lentes en PostgreSQL. Solution : PostgreSQL comme source de verite OLTP + export regulier en Parquet pour l'analytique avec DuckDB/Spark. C'est l'architecture HTAP (Hybrid Transactional/Analytical).

Scenario 2 : Vous etes Data Architect chez un media en ligne. Le CTO veut migrer de MySQL a MongoDB parce que "NoSQL est plus moderne". Le systeme actuel fonctionne bien avec 5M d'articles et des requetes relationnelles (auteurs, categories, tags). Que conseillez-vous ?

Migrer vers MongoDB pour beneficier de la flexibilite JSON
Rester sur MySQL - le modele relationnel convient parfaitement, la migration serait couteuse sans benefice clair
Migrer vers Cassandra pour la scalabilite
Migrer vers Neo4j pour les relations entre articles
Si le systeme fonctionne bien et que les requetes sont relationnelles (jointures auteurs/categories/tags), MySQL est le bon choix. Migrer vers MongoDB ajouterait de la complexite (denormalisation, perte de JOINs) sans benefice. Le choix de base de donnees doit etre guide par les besoins, pas par les tendances technologiques.

Scenario 3 : Vous etes Data Architect dans une startup e-commerce (50 employes). L'equipe data (3 personnes) traite 5 GB de donnees par jour. Un ingenieur propose de mettre en place un cluster Spark sur AWS EMR. Votre reaction ?

Excellente idee, Spark est la reference pour le Big Data
Non, 5 GB/jour est gerableen local avec DuckDB ou Polars, Spark serait de la sur-ingenierie couteuse
Oui mais utiliser Databricks au lieu de EMR
Mettre en place Hadoop en plus de Spark
5 GB/jour est largement gerable sur un seul serveur avec DuckDB ou Polars. Un cluster Spark sur EMR couterait 500-2000$/mois en infrastructure, plus le temps d'expertise de l'equipe. DuckDB traiterait ce volume en quelques secondes sur un laptop. Le Data Architect doit resister a la tentation de la sur-ingenierie.

Scenario 4 : Vous etes Data Architect chez un retailer. Votre pipeline ETL pandas prend 6 heures car il utilise des boucles iterrows() sur un DataFrame de 50M lignes. Quelle approche pour reduire a moins de 30 minutes ?

Migrer tout vers PySpark sur un cluster
Vectoriser les operations pandas (np.where, .str, .dt) et/ou utiliser DuckDB pour les transformations SQL
Acheter un serveur avec plus de RAM
Paralleliser avec multiprocessing Python
La vectorisation (remplacer iterrows par np.where, operations .str/.dt) peut accelerer de 100-1000x. 6 heures d'iterrows deviennent souvent 5-10 minutes en vectorise. DuckDB peut aussi traiter les transformations SQL directement sur les DataFrames pandas. Pas besoin de Spark pour 50M lignes sur un seul serveur.

Scenario 5 : Vous etes Data Architect chez une entreprise IoT. 10 000 capteurs envoient des mesures toutes les secondes (10K ecritures/sec). Les donnees doivent etre conservees 5 ans et interrogeables par capteur et par plage de temps. Quelle base de donnees ?

PostgreSQL avec partitionnement par mois
MongoDB avec TTL index
Cassandra ou TimescaleDB, concus pour les time-series a haut volume d'ecriture
Redis avec persistence
10K ecritures/sec avec retention de 5 ans (~1.5 trillion de points) est un cas classique de time-series database. Cassandra excelle en ecriture distribuee avec un modele de partition par capteur+jour. TimescaleDB (extension PostgreSQL) offre le confort du SQL avec les performances time-series. PostgreSQL standard ne tiendrait pas le volume d'ecriture.

Scenario 6 : Vous etes Data Architect et vous decouvrez que le data lake de votre entreprise stocke tout en CSV (200 TB). Le CEO demande une estimation du ROI d'une migration vers Parquet. Que repondez-vous ?

Le gain sera negligeable, CSV est un format standard
Reduction de 80% du stockage (200TB a 30-40TB), requetes 10-50x plus rapides, economies de 100K+$/an en infrastructure
La migration est trop risquee, mieux vaut rester en CSV
Migrer vers JSON serait plus benefique
Parquet compresse typiquement a 15-20% de la taille CSV. 200 TB deviendraient 30-40 TB. Sur le cloud (S3/GCS), le stockage coute ~0.023$/GB/mois : economie de ~3 700$/mois en stockage seul. Les requetes 10-50x plus rapides reduisent aussi les couts de compute (EMR/Dataproc). LinkedIn a economise des millions avec cette migration.

Bilan et mots du mentor

Si vous avez suivi cette Phase 1 avec attention, vous maitrisez maintenant les fondamentaux essentiels d'un Data Architect :

Module 1.1 - SQL Avance

  • Window functions (ROW_NUMBER, RANK, LAG/LEAD)
  • CTEs et CTEs recursives
  • Plans d'execution et indexation
  • Partitionnement et vues materialisees

Module 1.2 - NoSQL & NewSQL

  • Theoreme CAP et decisions architecturales
  • MongoDB, Redis, Cassandra, Neo4j
  • Quand utiliser SQL vs NoSQL
  • NewSQL et bases hybrides

Module 1.3 - Python pour la Data

  • Ecosysteme Python : pandas, DuckDB, Polars, PySpark
  • Matrice de decision basee sur le volume
  • Performance : vectorisation vs boucles
  • Formats de fichiers : Parquet, Avro, Arrow
  • Pipelines ETL robustes avec monitoring

La Phase 1 vous a donne les briques fondamentales. Vous savez maintenant interroger des donnees (SQL), choisir le bon stockage (SQL vs NoSQL), et manipuler les donnees en Python. La Phase 2 - Modelisation va construire sur ces fondations : vous apprendrez a concevoir des modeles de donnees qui servent l'entreprise, a comprendre les schemas en etoile et flocon, et a penser en termes de data warehouse et data lake. C'est la ou le Data Architect se differencie de l'ingenieur data : dans la capacite a modeler l'information pour creer de la valeur. Rendez-vous en Phase 2!

Votre parcours Data Architect
  Phase 1            Phase 2           Phase 3           Phase 4
  FONDAMENTAUX       MODELISATION      MODERN STACK      ARCHITECTURE
  β–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆβ–ˆ       β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘      β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘      β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘
  βœ“ Complete!        A venir           A venir           A venir

  SQL Avance ──▢ Data Modeling ──▢ Kafka/Spark ──▢ Cloud Architecture
  NoSQL      ──▢ Star Schema  ──▢ dbt/Airflow ──▢ Lakehouse
  Python     ──▢ Data Vault   ──▢ Delta Lake  ──▢ Gouvernance