1. SQL Window Functions
ROW_NUMBER() - Unique number for each row within the partition
RANK() - Rank with gaps after ties
DENSE_RANK() - Rank without gaps after ties
NTILE(n) - Splits the partition into n equal groups
LAG(col, n) - Value from the row n rows before the current one
LEAD(col, n) - Value from the row n rows after the current one
FIRST_VALUE() - First value of the window frame
LAST_VALUE() - Last value of the window frame
SUM() OVER - Running / per-partition sum
AVG() OVER - Moving / per-partition average
-- General syntax
function() OVER (
    PARTITION BY column
    ORDER BY column
    ROWS BETWEEN UNBOUNDED PRECEDING
         AND CURRENT ROW
)
-- Running total of sales
SELECT date_vente,
       montant,
       SUM(montant) OVER (
           ORDER BY date_vente
           ROWS BETWEEN UNBOUNDED PRECEDING
                AND CURRENT ROW
       ) AS cumul
FROM ventes;
-- Top 3 products per category
SELECT * FROM (
    SELECT nom, categorie, ca,
           ROW_NUMBER() OVER (
               PARTITION BY categorie
               ORDER BY ca DESC
           ) AS rn
    FROM produits
) t WHERE rn <= 3;
-- Month-over-month growth
SELECT mois, ca,
       ca - LAG(ca) OVER (ORDER BY mois)
           AS croissance,
       ROUND(100.0 * (ca - LAG(ca)
               OVER (ORDER BY mois))
           / LAG(ca) OVER (ORDER BY mois), 2)
           AS pct_croissance
FROM ventes_mensuelles;
RANK: 1,1,3 vs DENSE_RANK: 1,1,2. Use RANK for sports-style standings (ties leave gaps), DENSE_RANK when ranks must stay consecutive; for pagination, ROW_NUMBER is usually the better fit.
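A minimal runnable sketch of the difference, using Python's built-in sqlite3 module (window functions need SQLite 3.25+); the scores table and its values are invented for illustration.
import sqlite3
# In-memory database with a tiny, made-up score table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE scores (player TEXT, pts INT)")
conn.executemany("INSERT INTO scores VALUES (?, ?)",
                 [("a", 90), ("b", 90), ("c", 80), ("d", 70)])
rows = conn.execute("""
    SELECT player, pts,
           ROW_NUMBER() OVER (ORDER BY pts DESC) AS row_num,
           RANK()       OVER (ORDER BY pts DESC) AS rnk,
           DENSE_RANK() OVER (ORDER BY pts DESC) AS dense_rnk
    FROM scores
""").fetchall()
for r in rows:
    print(r)
# Ties on 90: rnk goes 1,1,3 (gap) while dense_rnk goes 1,1,2 (no gap).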
2. SQL Optimization
EXPLAIN - Shows the estimated execution plan
EXPLAIN ANALYZE - Runs the query and shows the actual plan
EXPLAIN (ANALYZE, BUFFERS) - Adds I/O statistics (pages read)
EXPLAIN (FORMAT JSON) - JSON output (more detailed)
-- Analyze a query
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT)
SELECT * FROM orders
WHERE customer_id = 42
ORDER BY created_at DESC
LIMIT 10;
-- Plan nodes worth knowing
-- Seq Scan        : sequential scan (slow on large tables)
-- Index Scan      : uses an index (fast)
-- Index Only Scan : reads the index alone, no heap access
-- Bitmap Scan     : combines index and heap reads
-- Hash Join       : join via a hash table
-- Merge Join      : join via sort + merge
-- Nested Loop     : nested-loop join
| Index Type | Usage | When to Use |
| B-tree | =, <, >, BETWEEN, LIKE 'x%' | Default choice, ~90% of cases |
| Hash | = only | Strict equality, large volumes |
| GIN | JSONB, full-text, arrays | Searching inside documents |
| GiST | Geometry, ranges, full-text | Spatial data |
| BRIN | Physically correlated columns | Very large tables, timestamps |
-- Create a partial index
CREATE INDEX idx_orders_active
    ON orders (created_at)
    WHERE status = 'active';
-- Multi-column index
CREATE INDEX idx_cust_date
    ON orders (customer_id, created_at DESC);
-- GIN index for JSONB
CREATE INDEX idx_data_gin
    ON events USING GIN (payload);
Golden rule: a Seq Scan on a large table with a selective filter usually means an index is missing.
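The same diagnosis can be reproduced end to end with SQLite's EXPLAIN QUERY PLAN (used here as a stand-in for PostgreSQL's EXPLAIN); the table, data and index name are invented for this sketch.
import sqlite3
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, customer_id INTEGER, amount REAL)")
conn.executemany("INSERT INTO orders VALUES (?, ?, ?)",
                 [(i, i % 1000, i * 1.5) for i in range(10000)])
query = "SELECT * FROM orders WHERE customer_id = 42"
# Before: the planner can only scan the whole table
print(conn.execute("EXPLAIN QUERY PLAN " + query).fetchall())
# detail column reads something like 'SCAN orders'
conn.execute("CREATE INDEX idx_orders_customer ON orders (customer_id)")
# After: the planner switches to the index
print(conn.execute("EXPLAIN QUERY PLAN " + query).fetchall())
# detail column reads something like 'SEARCH orders USING INDEX idx_orders_customer (customer_id=?)'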
3. PostgreSQL Essential Commands
\l - List all databases
\c dbname - Connect to a database
\dt - List the tables of the current schema
\d table - Describe a table (columns, indexes)
\di - List all indexes
\dn - List schemas
\df - List functions
\timing - Toggle query timing
\x - Expanded (vertical) display mode
\copy - CSV import/export from the client side
-- CTE (Common Table Expression)
WITH monthly_sales AS (
    SELECT date_trunc('month', created_at)
               AS mois,
           SUM(amount) AS ca
    FROM orders
    GROUP BY 1
)
SELECT mois, ca,
       AVG(ca) OVER (
           ORDER BY mois
           ROWS BETWEEN 2 PRECEDING
                AND CURRENT ROW
       ) AS moyenne_mobile_3m
FROM monthly_sales;
-- Recursive CTE (hierarchy)
WITH RECURSIVE org AS (
    SELECT id, nom, manager_id, 1 AS niveau
    FROM employes WHERE manager_id IS NULL
    UNION ALL
    SELECT e.id, e.nom, e.manager_id,
           o.niveau + 1
    FROM employes e
    JOIN org o ON e.manager_id = o.id
)
SELECT * FROM org ORDER BY niveau;
-- Partitioning
CREATE TABLE logs (
    id BIGSERIAL,
    created_at TIMESTAMP,
    message TEXT
) PARTITION BY RANGE (created_at);
CREATE TABLE logs_2024_01
    PARTITION OF logs
    FOR VALUES FROM ('2024-01-01')
              TO ('2024-02-01');
-- Materialized View
CREATE MATERIALIZED VIEW mv_stats AS
SELECT customer_id,
       COUNT(*) AS nb_orders,
       SUM(amount) AS total
FROM orders GROUP BY customer_id;
REFRESH MATERIALIZED VIEW
    CONCURRENTLY mv_stats;
4. MongoDB Commands
insertOne() - Insert a single document
insertMany() - Insert multiple documents
find() - Query documents
updateOne() - Update a single document
deleteOne() - Delete a single document
aggregate() - Run an aggregation pipeline
createIndex() - Create an index
// CRUD Operations
db.users.insertOne({
    nom: "Dupont",
    email: "dupont@mail.com",
    age: 32,
    adresse: { ville: "Paris", cp: "75001" }
});
db.users.find({
    age: { $gte: 25, $lte: 40 },
    "adresse.ville": "Paris"
}).sort({ nom: 1 }).limit(10);
db.users.updateOne(
    { email: "dupont@mail.com" },
    { $set: { age: 33 },
      $push: { tags: "premium" } }
);
// Aggregation Pipeline
db.orders.aggregate([
    { $match: { status: "completed" } },
    { $group: {
        _id: "$customer_id",
        total: { $sum: "$amount" },
        count: { $sum: 1 },
        avg: { $avg: "$amount" }
    }},
    { $sort: { total: -1 } },
    { $limit: 10 },
    { $lookup: {
        from: "customers",
        localField: "_id",
        foreignField: "_id",
        as: "customer"
    }},
    { $unwind: "$customer" },
    { $project: {
        nom: "$customer.nom",
        total: 1, count: 1, avg: 1
    }}
]);
Embedding vs Referencing: embed when the data is always read together and stays well under the 16 MB document limit. Reference when it is updated independently or shared across documents.
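A small illustrative sketch of the two shapes as plain Python dicts; the collection layout and field names are invented for the example.
# Embedded: order lines live inside the order document and are read/rewritten together
order_embedded = {
    "_id": 1001,
    "customer_name": "Dupont",
    "items": [
        {"sku": "A-12", "qty": 2, "price": 19.9},
        {"sku": "B-07", "qty": 1, "price": 49.0},
    ],
}
# Referenced: the customer is its own document, shared by many orders
# and updated independently; the order only stores its _id
customer = {"_id": 42, "name": "Dupont", "email": "dupont@mail.com"}
order_referenced = {
    "_id": 1002,
    "customer_id": 42,  # resolved later with $lookup or a second query
    "items": [{"sku": "C-33", "qty": 3, "price": 5.0}],
}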
5. Redis Commands
SET k v - Set a key to a value
GET k - Read a value
SETEX k s v - SET with an expiration (in seconds)
DEL k - Delete a key
HSET k f v - Set a field in a hash
HGET k f - Read one field of a hash
HGETALL k - Read all fields of a hash
LPUSH k v - Push to the head of a list
RPUSH k v - Push to the tail of a list
LRANGE k 0 -1 - Read the whole list
SADD k v - Add to a set
SMEMBERS k - Read all members of a set
ZADD k s v - Add to a sorted set (with a score)
ZRANGE k 0 -1 - Read the sorted set (ascending by score)
EXPIRE k s - Set a TTL in seconds
TTL k - Show the remaining TTL
# Common Redis patterns
# Cache with TTL
SET user:42:profile '{"nom":"Dupont"}'
EXPIRE user:42:profile 3600
# User session
HSET session:abc user_id 42
HSET session:abc role "admin"
EXPIRE session:abc 1800
# Rate limiting
INCR api:user:42:requests
EXPIRE api:user:42:requests 60
# Leaderboard
ZADD leaderboard 1500 "alice"
ZADD leaderboard 2300 "bob"
ZREVRANGE leaderboard 0 9 WITHSCORES
# Pub/Sub
SUBSCRIBE channel:notifications
PUBLISH channel:notifications "msg"
# Stream (event log)
XADD events * type "click" page "/home"
XREAD COUNT 10 STREAMS events 0
RDB = periodic snapshots (fast restarts). AOF = a log of every write command (more durable). In production, use both together.
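Persistence is normally configured in redis.conf (appendonly, save); as a hedged sketch, the same keys can also be changed at runtime with redis-py, assuming a local Redis on the default port.
import redis  # redis-py client
r = redis.Redis(host="localhost", port=6379)
# AOF: append every write command to the append-only file
r.config_set("appendonly", "yes")
# RDB: snapshot after 900s if >=1 key changed, 300s if >=10, 60s if >=10000
r.config_set("save", "900 1 300 10 60 10000")
print(r.config_get("appendonly"), r.config_get("save"))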
6. Cassandra CQL
CREATE KEYSPACE - Create a keyspace (namespace)
CREATE TABLE - Create a table (with its partition key)
INSERT INTO - Insert a row
SELECT - Read rows (always filter on the partition key)
-- Keyspace with replication
CREATE KEYSPACE ecommerce
    WITH replication = {
        'class': 'NetworkTopologyStrategy',
        'dc1': 3, 'dc2': 3
    };
-- Table optimized for reads
CREATE TABLE ecommerce.orders_by_customer (
    customer_id UUID,
    order_date TIMESTAMP,
    order_id UUID,
    amount DECIMAL,
    status TEXT,
    PRIMARY KEY (customer_id, order_date)
) WITH CLUSTERING ORDER BY (order_date DESC);
-- Queries (always by partition key)
SELECT * FROM orders_by_customer
WHERE customer_id = ?
  AND order_date >= '2024-01-01'
LIMIT 20;
| Consistency Level | Read | Write | Usage |
| ONE | 1 replica | 1 replica | Max performance, stale-read risk |
| QUORUM | N/2 + 1 | N/2 + 1 | Balanced consistency/availability |
| LOCAL_QUORUM | local DC | local DC | Multi-datacenter |
| ALL | all replicas | all replicas | Maximum consistency |
Key rule: model around your queries, not your entities. One table = one query. Denormalization is normal in Cassandra.
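A hedged Python sketch of query-driven denormalization with the DataStax driver: the same order is written once per access path. It assumes a reachable cluster, the orders_by_customer table above, and a hypothetical second table orders_by_status keyed by status.
from decimal import Decimal
from uuid import uuid4
from datetime import datetime, timezone
from cassandra.cluster import Cluster  # DataStax Python driver
cluster = Cluster(["127.0.0.1"])
session = cluster.connect("ecommerce")
order_id, customer_id = uuid4(), uuid4()
now = datetime.now(timezone.utc)
# Access path 1: "orders of a given customer"
session.execute(
    "INSERT INTO orders_by_customer (customer_id, order_date, order_id, amount, status) "
    "VALUES (%s, %s, %s, %s, %s)",
    (customer_id, now, order_id, Decimal("99.90"), "pending"),
)
# Access path 2 (hypothetical table): "all pending orders"
session.execute(
    "INSERT INTO orders_by_status (status, order_date, order_id, customer_id, amount) "
    "VALUES (%s, %s, %s, %s, %s)",
    ("pending", now, order_id, customer_id, Decimal("99.90")),
)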
7. Neo4j Cypher
CREATE (n:Label) - Create a node
MATCH (n) - Find nodes
-[:REL]-> - Directed relationship
WHERE - Filter results
RETURN - Return results
MERGE - Create only if it does not already exist
// Create nodes and relationships
CREATE (alice:Personne {nom: "Alice", age: 30})
CREATE (bob:Personne {nom: "Bob", age: 28})
CREATE (alice)-[:CONNAIT {depuis: 2020}]->(bob);
// Find friends of friends
MATCH (p:Personne {nom: "Alice"})
      -[:CONNAIT]->(ami)
      -[:CONNAIT]->(ami_ami)
WHERE ami_ami <> p
RETURN DISTINCT ami_ami.nom;
// Fraud detection (cycles)
MATCH chemin = (c:Compte)-[:TRANSFERT*3..6]->(c)
WHERE ALL(r IN relationships(chemin)
          WHERE r.montant > 10000)
RETURN chemin;
// Product recommendation
MATCH (u:User {id: 42})-[:ACHETE]->(p)
      <-[:ACHETE]-(autre)
      -[:ACHETE]->(reco)
WHERE NOT (u)-[:ACHETE]->(reco)
RETURN reco.nom, COUNT(*) AS score
ORDER BY score DESC LIMIT 5;
// Shortest path
MATCH chemin = shortestPath(
    (a:Personne {nom:"Alice"})
    -[:CONNAIT*]->
    (b:Personne {nom:"Zoe"})
)
RETURN chemin, length(chemin);
Graph databases excel at: social networks, fraud detection, recommendation engines, dependency management, and knowledge graphs.
8. Python pandas Reference
import pandas as pd
import numpy as np
# Reading data
df = pd.read_csv("data.csv")
df = pd.read_parquet("data.parquet")
df = pd.read_sql(query, engine)
df = pd.read_json("data.json")
# Exploration
df.shape              # (rows, columns)
df.info()             # Dtypes and null counts
df.describe()         # Descriptive statistics
df.head(10)           # First 10 rows
df.dtypes             # Column dtypes
df.isna().sum()       # Null count per column
# Selection and filtering
df["colonne"]         # Series
df[["col1", "col2"]]  # DataFrame
df.loc[0:5, "col1":"col3"]  # By label
df.iloc[0:5, 0:3]           # By position
df[df["age"] > 25]          # Boolean filter
df.query("age > 25 and ville == 'Paris'")
# Transformation
df["new"] = df["a"] + df["b"]
df["cat"] = pd.cut(df["age"],
bins=[0,25,35,50,100],
labels=["junior","mid","senior","exec"])
df["date"] = pd.to_datetime(df["date_str"])
# Aggregation
df.groupby("categorie").agg(
ca_total=("montant", "sum"),
nb_ventes=("id", "count"),
panier_moyen=("montant", "mean")
).reset_index()
# Join / merge
result = pd.merge(
orders, customers,
on="customer_id", how="left"
)
# Pivot Table
pivot = df.pivot_table(
values="montant",
index="mois",
columns="categorie",
aggfunc="sum",
fill_value=0
)
# Apply
df["label"] = df["score"].apply(
lambda x: "bon" if x >= 80 else "moyen"
)
# Export
df.to_csv("out.csv", index=False)
df.to_parquet("out.parquet")
df.to_sql("table", engine, if_exists="replace")
9. SQLAlchemy Reference
from sqlalchemy import (
create_engine, text, Column,
Integer, String, Float, DateTime
)
from sqlalchemy.orm import (
declarative_base, sessionmaker
)
# Connection
engine = create_engine(
    "postgresql://user:pass@host:5432/db",
    pool_size=10,
    max_overflow=20,
    echo=False
)
# Core (raw SQL with bound parameters)
with engine.connect() as conn:
    result = conn.execute(
        text("SELECT * FROM users WHERE id = :id"),
        {"id": 42}
    )
    rows = result.fetchall()
# ORM - Model definition
Base = declarative_base()
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    nom = Column(String(100), nullable=False)
    email = Column(String(200), unique=True)
    age = Column(Integer)
# ORM - Session and queries
Session = sessionmaker(bind=engine)
session = Session()
# Create
user = User(nom="Dupont", email="d@m.com")
session.add(user)
session.commit()
# Read
users = session.query(User)\
.filter(User.age >= 25)\
.order_by(User.nom)\
.limit(10)\
.all()
# Update
session.query(User)\
.filter(User.id == 42)\
.update({"age": 33})
session.commit()
# Delete
session.query(User)\
.filter(User.id == 42)\
.delete()
session.commit()
# pandas integration
import pandas as pd
df = pd.read_sql(
"SELECT * FROM users WHERE age > 25",
engine
)
df.to_sql("users_backup", engine,
if_exists="replace", index=False)
SQLAlchemy Architecture
========================
  Python application
         |
  [SQLAlchemy ORM]   -- Models, Session
         |
  [SQLAlchemy Core]  -- SQL Expression Language, Engine
         |
  [DBAPI / Driver]   -- psycopg2, pymysql
         |
  [Database]         -- PostgreSQL, MySQL
10. PySpark Reference
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Initialisation
spark = SparkSession.builder \
.appName("DataPipeline") \
.config("spark.sql.shuffle.partitions", 200) \
.getOrCreate()
# Reading data
df = spark.read.csv("data.csv",
header=True, inferSchema=True)
df = spark.read.parquet("data.parquet")
df = spark.read.json("data.json")
df = spark.read.jdbc(url, "table",
properties={"user":"u","password":"p"})
# Transformations (lazy)
df2 = df \
.filter(F.col("age") > 25) \
.withColumn("annee",
F.year(F.col("date"))) \
.withColumn("nom_upper",
F.upper(F.col("nom"))) \
.select("id", "nom_upper", "age", "annee")
# Aggregation
stats = df.groupBy("categorie").agg(
F.sum("montant").alias("ca_total"),
F.count("id").alias("nb_ventes"),
F.avg("montant").alias("panier_moyen")
)
# Window Functions
w = Window.partitionBy("dept") \
.orderBy(F.desc("salaire"))
df = df.withColumn("rang",
F.row_number().over(w))
# Join
result = orders.join(
customers,
orders.cust_id == customers.id,
"left"
)
# Actions (trigger execution)
df.show(20)      # Display rows
df.count()       # Count rows
df.collect()     # Fetch everything to the driver
df.toPandas()    # Convert to a pandas DataFrame
# Writing data
df.write.parquet("output/",
mode="overwrite",
partitionBy=["annee", "mois"])
df.write.jdbc(url, "table",
mode="append",
properties=props)
Spark Architecture
===================
        Driver Program
        [SparkContext]
              |
        Cluster Manager
   (YARN / K8s / Standalone)
        /     |     \
  Executor  Executor  Executor
   [Task]    [Task]    [Task]
   [Task]    [Task]    [Task]
   [Cache]   [Cache]   [Cache]
11. Data File Formats
| Format | Layout | Compression | Schema | Usage |
| CSV | Row | No | No | Simple exchange, imports |
| JSON | Document | No | No | APIs, config, logs |
| Parquet | Columnar | Snappy/Gzip | Yes | Analytics, data lakes |
| Avro | Row | Snappy/Deflate | Yes | Streaming, Kafka |
| ORC | Columnar | Zlib/Snappy | Yes | Hive, Hadoop |
| Delta | Columnar+ | Snappy | Yes | Lakehouse, ACID |
Row vs Columnar Storage
==========================
Row-oriented (CSV, Avro):    Column-oriented (Parquet, ORC):
+----+------+-----+          +----+----+----+----+
| id | nom  | age |          |  1 |  2 |  3 |  4 |  id
+----+------+-----+          +----+----+----+----+
|  1 | Anne |  30 |          | An | Bo | Ca | Di |  nom
|  2 | Bob  |  25 |          +----+----+----+----+
|  3 | Carl |  35 |          | 30 | 25 | 35 | 28 |  age
|  4 | Dia  |  28 |          +----+----+----+----+
+----+------+-----+
                             SELECT AVG(age) FROM t;
Good for: INSERT-heavy       Good for: analytics,
writes, streaming            aggregations over specific columns
# Python: reading/writing these formats
import pandas as pd
import pyarrow.parquet as pq
# Parquet
df = pd.read_parquet("data.parquet")
df.to_parquet("out.parquet",
engine="pyarrow",
compression="snappy")
# Avro
from fastavro import reader, writer
with open("data.avro", "rb") as f:
    rdr = reader(f)
    records = [r for r in rdr]
# JSON Lines
df = pd.read_json("data.jsonl",
lines=True)
df.to_json("out.jsonl",
orient="records", lines=True)
12. GROUPING SETS / ROLLUP / CUBE
GROUPING SETS - Explicitly list the aggregation levels
ROLLUP - Hierarchical aggregations (subtotals + grand total)
CUBE - All possible combinations
GROUPING() - Detect subtotal rows (0 = real value, 1 = total)
-- GROUPING SETS: fine-grained control
SELECT region, produit, SUM(ca) AS total
FROM ventes
GROUP BY GROUPING SETS (
    (region, produit),  -- per region + product
    (region),           -- subtotal per region
    (produit),          -- subtotal per product
    ()                  -- grand total
);
-- ROLLUP: automatic hierarchy
-- Generates: (annee,trimestre,mois), (annee,trimestre), (annee), ()
SELECT annee, trimestre, mois,
       SUM(ca) AS total
FROM ventes
GROUP BY ROLLUP (annee, trimestre, mois);
-- CUBE: all combinations
-- Generates: (region,produit), (region), (produit), ()
SELECT region, produit, SUM(ca) AS total
FROM ventes
GROUP BY CUBE (region, produit);
-- Distinguish real NULLs from subtotal rows
SELECT
    CASE WHEN GROUPING(region) = 1
         THEN '** TOTAL **' ELSE region END AS region,
    CASE WHEN GROUPING(produit) = 1
         THEN '** TOUS **' ELSE produit END AS produit,
    SUM(ca) AS total
FROM ventes
GROUP BY CUBE (region, produit);
| Clause | Generated groupings | Use case |
| GROUPING SETS | Exactly the ones listed | Custom reports |
| ROLLUP(a,b,c) | (a,b,c), (a,b), (a), () | Hierarchies (geography, time) |
| CUBE(a,b) | (a,b), (a), (b), () | Full cross-tabulations |
ROLLUP = hierarchical subtotals (ideal for financial reports). CUBE = every combination (ideal for multidimensional / OLAP analysis).
13. PostgreSQL JSON / JSONB
-> - Access by key, returns JSON
->> - Access by key, returns TEXT
#> - Access by path, returns JSON
#>> - Access by path, returns TEXT
? - Does the key exist?
@> - Contains the given JSON (containment)
jsonb_agg() - Aggregate into a JSON array
jsonb_object_agg() - Aggregate into a JSON object
-- Accessing JSONB data
SELECT
    payload->'user'->>'name' AS nom,
    payload->'user'->>'email' AS email,
    (payload->>'amount')::numeric AS montant
FROM events
WHERE payload @> '{"type": "purchase"}'
  AND payload->'user' ? 'email';
-- Updating JSONB fields
UPDATE products
SET metadata = jsonb_set(
    metadata, '{stock}',
    to_jsonb(
        (metadata->>'stock')::int - 1
    )
)
WHERE id = 42;
-- Aggregating into JSON
SELECT customer_id,
       jsonb_agg(
           jsonb_build_object(
               'produit', product_name,
               'montant', amount
           ) ORDER BY created_at DESC
       ) AS commandes
FROM orders
GROUP BY customer_id;
-- Unnesting a JSON array
SELECT e.id,
       item->>'name' AS item_name,
       (item->>'qty')::int AS qty
FROM events e,
     jsonb_array_elements(e.payload->'items')
         AS item;
Always use JSONB (not JSON): it supports GIN indexes and is faster to read. Create a GIN index for queries that use @>, ?, ?|, ?&.
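A hedged psycopg2 sketch putting the tip into practice; it assumes the events(payload JSONB) table from the examples above, and the DSN is a placeholder.
import json
import psycopg2
conn = psycopg2.connect("dbname=app user=app password=secret host=localhost")  # placeholder DSN
cur = conn.cursor()
# GIN index so @> / ? lookups do not fall back to a sequential scan
cur.execute("CREATE INDEX IF NOT EXISTS idx_events_payload_gin "
            "ON events USING GIN (payload)")
# Containment query with a bound parameter: pass JSON as text, cast to jsonb
cur.execute(
    "SELECT id, payload->'user'->>'name' FROM events WHERE payload @> %s::jsonb",
    (json.dumps({"type": "purchase"}),),
)
print(cur.fetchall())
conn.commit()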
14. DuckDB - In-Process OLAP
duckdb - Launch the DuckDB CLI
duckdb.connect() - Python connection (in-memory by default)
read_parquet() - Query Parquet files directly from SQL
read_csv_auto() - Read CSV with automatic schema detection
import duckdb
# SQL directly over files
duckdb.sql("""
SELECT region, SUM(amount) AS total
FROM read_parquet('sales/*.parquet')
WHERE year = 2024
GROUP BY region
ORDER BY total DESC
""").show()
# pandas integration
import pandas as pd
df = pd.DataFrame({
'name': ['Alice', 'Bob', 'Carol'],
'score': [95, 82, 91]
})
# Query a pandas DataFrame with SQL!
result = duckdb.sql("""
SELECT name, score,
RANK() OVER (ORDER BY score DESC) AS rang
FROM df
WHERE score > 80
""").df() # .df() retourne un pandas DataFrame
# Jointure fichier + DataFrame
duckdb.sql("""
SELECT c.name, o.product, o.amount
FROM df AS c
JOIN read_csv_auto('orders.csv') AS o
ON c.name = o.customer_name
""")
# Direct export to Parquet
duckdb.sql("""
COPY (SELECT * FROM read_csv_auto('big.csv'))
TO 'output.parquet' (FORMAT PARQUET)
""")
DuckDB = "the SQLite of analytics". Zero configuration, runs inside the Python process, ideal for data exploration and for prototyping SQL queries over Parquet/CSV.
15. Polars - Fast DataFrames (Rust)
pl.read_parquet() - Read a Parquet file (eager)
pl.scan_parquet() - LazyFrame (deferred execution)
.filter() - Filter rows
.select() - Select / transform columns
.group_by() - Aggregate by group
.collect() - Execute a LazyFrame
import polars as pl
# Reading and transforming
df = pl.read_parquet("sales.parquet")
# Expression API (chainable)
result = df.filter(
pl.col("amount") > 100
).group_by("region").agg(
pl.col("amount").sum().alias("ca_total"),
pl.col("amount").mean().alias("panier_moyen"),
pl.col("order_id").count().alias("nb_ventes")
).sort("ca_total", descending=True)
# Lazy mode (automatic query optimization)
lazy_result = (
pl.scan_parquet("sales/*.parquet")
.filter(pl.col("year") == 2024)
.group_by("region", "product")
.agg(pl.col("amount").sum().alias("total"))
.sort("total", descending=True)
.collect()  # optimized execution
)
# Window functions
df = df.with_columns(
pl.col("amount")
.rank(method="dense")
.over("region")
.alias("rank_in_region")
)
# pandas <-> Polars conversion
pd_df = result.to_pandas()
pl_df = pl.from_pandas(pd_df)
| Criterion | pandas | Polars |
| Core implementation | Python/C | Rust |
| Execution | Eager only | Eager + lazy |
| Multithreading | No (GIL) | Yes (native) |
| Memory (1 GB CSV) | ~3-5 GB | ~1-2 GB |
| Speed (groupby) | 1x | 5-20x |
| Typical use | Prototyping, ML | ETL, large volumes |
Polars is typically 5-20x faster than pandas and uses every CPU core. Prefer lazy mode (scan_*) to benefit from automatic query-plan optimization.
16. Elasticsearch Reference
PUT /index - Create an index
POST /index/_doc - Index a document
GET /index/_search - Search an index
POST /_bulk - Bulk indexing
# Create an index with a mapping
PUT /products
{
  "mappings": {
    "properties": {
      "name":        { "type": "text",
                       "analyzer": "french" },
      "description": { "type": "text" },
      "price":       { "type": "float" },
      "category":    { "type": "keyword" },
      "created_at":  { "type": "date" }
    }
  }
}
# Full-text search with scoring
GET /products/_search
{
  "query": {
    "bool": {
      "must": [
        { "match": {
            "name": "laptop gaming" } }
      ],
      "filter": [
        { "range": {
            "price": { "lte": 2000 } } },
        { "term": {
            "category": "electronics" } }
      ]
    }
  },
  "sort": [{ "_score": "desc" }],
  "size": 10
}
# Python with elasticsearch-py
from elasticsearch import Elasticsearch
es = Elasticsearch("http://localhost:9200")
es.index(index="products", document={
"name": "Laptop Pro",
"price": 1499.99,
"category": "electronics"
})
results = es.search(
index="products",
query={"match": {"name": "laptop"}}
)
Elasticsearch = a distributed inverted index. Ideal for full-text search, logs (ELK stack), and monitoring. Do NOT use it as a primary database.
17. TimescaleDB - Time Series
create_hypertable - Convert a table into a hypertable
time_bucket() - Aggregate by time interval
continuous aggregate - Materialized view kept up to date by a refresh policy
compression - Automatic compression of older data
-- TimescaleDB extension
CREATE EXTENSION IF NOT EXISTS timescaledb;
-- Create a hypertable
CREATE TABLE metrics (
    time TIMESTAMPTZ NOT NULL,
    sensor_id INT,
    temperature DOUBLE PRECISION,
    humidity DOUBLE PRECISION
);
SELECT create_hypertable('metrics', 'time');
-- Insert data (standard SQL)
INSERT INTO metrics VALUES
    (NOW(), 1, 22.5, 65.3),
    (NOW(), 2, 23.1, 62.8);
-- Aggregate by time interval
SELECT
    time_bucket('1 hour', time) AS heure,
    sensor_id,
    AVG(temperature) AS temp_moy,
    MAX(temperature) AS temp_max,
    MIN(temperature) AS temp_min
FROM metrics
WHERE time > NOW() - INTERVAL '24 hours'
GROUP BY heure, sensor_id
ORDER BY heure DESC;
-- Continuous Aggregate (auto-refresh)
CREATE MATERIALIZED VIEW metrics_hourly
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS bucket,
    sensor_id,
    AVG(temperature) AS avg_temp,
    COUNT(*) AS nb_points
FROM metrics
GROUP BY bucket, sensor_id;
-- Automatic compression (older than 7 days)
ALTER TABLE metrics SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'sensor_id'
);
SELECT add_compression_policy('metrics',
    INTERVAL '7 days');
TimescaleDB = PostgreSQL + time series. Key advantage: all of standard SQL keeps working (JOINs, CTEs, window functions). Ideal for IoT, monitoring, and finance.
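As a hedged illustration of "standard SQL keeps working", a plain window function (LAG) layered on time_bucket(), run from Python with psycopg2; it assumes the metrics hypertable above and a placeholder DSN.
import psycopg2
conn = psycopg2.connect("dbname=iot user=app password=secret host=localhost")  # placeholder DSN
cur = conn.cursor()
# Hour-over-hour temperature change per sensor
cur.execute("""
    WITH hourly AS (
        SELECT time_bucket('1 hour', time) AS bucket,
               sensor_id,
               AVG(temperature) AS avg_temp
        FROM metrics
        GROUP BY bucket, sensor_id
    )
    SELECT bucket, sensor_id, avg_temp,
           avg_temp - LAG(avg_temp) OVER (
               PARTITION BY sensor_id ORDER BY bucket
           ) AS delta_vs_previous_hour
    FROM hourly
    ORDER BY sensor_id, bucket DESC
    LIMIT 20
""")
for row in cur.fetchall():
    print(row)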
18. Pandera - Data Validation
pa.DataFrameSchema - Define a validation schema
pa.Column - Define the constraints of a column
pa.Check - Define validation rules
schema.validate() - Validate a DataFrame
import pandera as pa
import pandas as pd
# Define a validation schema
schema = pa.DataFrameSchema({
"customer_id": pa.Column(
int, pa.Check.gt(0),
nullable=False, unique=True
),
"email": pa.Column(
str,
pa.Check.str_matches(
r'^[\w.+-]+@[\w-]+\.[\w.]+$'
),
nullable=False
),
"age": pa.Column(
int,
[pa.Check.in_range(18, 120)],
nullable=True
),
"revenue": pa.Column(
float,
pa.Check.ge(0),
nullable=False
),
"segment": pa.Column(
str,
pa.Check.isin(
["bronze","silver","gold","platinum"]
)
)
})
# Validate a DataFrame
try:
    validated_df = schema.validate(df)
    print("Data is valid!")
except pa.errors.SchemaError as e:
    print(f"Error: {e.failure_cases}")
# Decorator for functions
@pa.check_input(schema)
def process_customers(df: pd.DataFrame):
    return df.groupby("segment").agg(
        total=("revenue", "sum")
    )
Pandera validates pandas/Polars DataFrames. Use it at the entry point of every ETL pipeline to guarantee data quality. Complementary tool: Great Expectations for validation at larger scale.
19. CAP Theorem - Decision Guide
              Consistency (C)
                   /\
              CA  /  \  CP
                 /    \
                /______\
  Availability (A)  AP  Partition Tolerance (P)
CP: PostgreSQL, MongoDB, Redis Cluster
 -> Consistency + partition tolerance
 -> Sacrifices: availability during a partition
AP: Cassandra, DynamoDB, CouchDB
 -> Availability + partition tolerance
 -> Sacrifices: strong consistency (eventual consistency instead)
CA: PostgreSQL (single node), MySQL (single node)
 -> Consistency + availability
 -> Sacrifices: no partition tolerance
| Need | Recommended database | Why |
| ACID transactions | PostgreSQL | Strong consistency, standard SQL |
| Flexible documents | MongoDB | Dynamic schema, horizontal scaling |
| Ultra-fast cache | Redis | In-memory, sub-millisecond |
| Massive write throughput | Cassandra | Linearly scalable, multi-DC |
| Complex relationships | Neo4j | Native graph traversal |
| Time series | TimescaleDB | PostgreSQL + hypertables |
| Full-text search | Elasticsearch | Inverted index, relevance scoring |
| Distributed SQL | CockroachDB | SQL + horizontal scale + ACID |
Database Decision Tree
============================
Relational data?
|-- YES --> Need horizontal scaling?
|            |-- YES --> CockroachDB / TiDB
|            |-- NO  --> PostgreSQL / MySQL
|
|-- NO  --> What kind of data?
             |-- Documents    --> MongoDB
             |-- Key-value    --> Redis
             |-- Wide columns --> Cassandra
             |-- Graphs       --> Neo4j
             |-- Time series  --> TimescaleDB
             |-- Text/search  --> Elasticsearch
Data Architect Academy - Phase 1 Data Fundamentals - Cheatsheet v2.0 (19 sections)