RAG: Principe & Architecture
Le Retrieval-Augmented Generation (RAG) est devenu le pattern architectural dominant pour les applications d'IA générative en entreprise. Cette leçon explore pourquoi RAG révolutionne l'utilisation des LLMs avec vos données propriétaires.
Objectifs de la Leçon
- Comprendre les limites des LLMs seuls et pourquoi RAG est nécessaire
- Maîtriser l'architecture Retrieve-Augment-Generate
- Identifier les cas d'usage optimaux pour RAG vs Fine-Tuning
- Construire votre premier pipeline RAG simple
Le Problème: Pourquoi les LLMs Seuls Ne Suffisent Pas
Imaginez demander à GPT-4 "Quel est le chiffre d'affaires de notre entreprise ce trimestre?" Il ne peut pas répondre car ces données ne sont pas dans ses paramètres. RAG résout ce problème en permettant au LLM de "consulter" vos documents en temps réel.
L'Architecture RAG: Retrieve-Augment-Generate
┌────────────────────────────────────────────────────────────────────┐
│ PIPELINE RAG COMPLET │
└────────────────────────────────────────────────────────────────────┘
USER QUERY: "Quelle est notre politique de remboursement?"
│
↓
┌──────────────────────────────────────────────────────────────────┐
│ PHASE 1: RETRIEVE (Retrieval) │
│ │
│ Query → Embedding → Vector Search → Top-K Documents │
│ [0.2, 0.8, ...] ──→ Qdrant/Chroma ──→ 5 chunks pertinents │
└──────────────────────────────────────────────────────────────────┘
│
↓
┌──────────────────────────────────────────────────────────────────┐
│ PHASE 2: AUGMENT (Context Building) │
│ │
│ Construit le prompt enrichi: │
│ """ │
│ Contexte: [Doc 1: politique retours...] [Doc 2: délais...] │
│ Question: Quelle est notre politique de remboursement? │
│ Réponds en te basant UNIQUEMENT sur le contexte fourni. │
│ """ │
└──────────────────────────────────────────────────────────────────┘
│
↓
┌──────────────────────────────────────────────────────────────────┐
│ PHASE 3: GENERATE (LLM Response) │
│ │
│ LLM → Génère réponse fondée sur documents → Response + Sources │
│ "Selon notre politique (doc-42.pdf), remboursements sous 30j..." │
└──────────────────────────────────────────────────────────────────┘
│
↓
RESPONSE avec sources traçables
RAG vs Fine-Tuning vs Prompting: La Matrice de Décision
| Critère | Prompting Avancé | RAG | Fine-Tuning |
|---|---|---|---|
| Coût Initial | 50-200€ (API) | 500-2000€ (infra) | 5000-50000€ (GPU+données) |
| Temps Setup | Heures | 1-3 jours | 2-4 semaines |
| Données Changeantes | ✅ Bon (via few-shot) | ✅✅ Excellent (update temps réel) | ❌ Nécessite retraining |
| Précision Domaine | 60-75% | 80-92% | 90-98% |
| Explicabilité | ⭐⭐⭐ (voir prompt) | ⭐⭐⭐⭐⭐ (sources traçables) | ⭐ (boîte noire) |
| Volume Données | 5-50 exemples | 100+ documents | 1000-100K+ exemples |
| Compliance/Audit | ⭐⭐ | ⭐⭐⭐⭐⭐ (parfait RGPD) | ⭐⭐ (données dans modèle) |
| Latence | 50-300ms | 200-800ms (retrieval overhead) | 50-300ms |
Votre Premier Pipeline RAG en Python
# Pipeline RAG minimal avec LangChain et Chroma
from langchain.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.llms import Ollama
# 1. CHARGER les documents
loader = TextLoader("documentation.txt")
documents = loader.load()
# 2. SPLITTER en chunks
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500, # Taille chunk optimal: 300-800 tokens
chunk_overlap=50, # Overlap pour contexte continu
separators=["\n\n", "\n", " ", ""]
)
chunks = text_splitter.split_documents(documents)
print(f"Documents splitté en {len(chunks)} chunks")
# 3. EMBEDDINGS: Transformer texte en vecteurs
embeddings = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-MiniLM-L6-v2",
model_kwargs={'device': 'cpu'}
)
# 4. VECTOR STORE: Créer base vectorielle
vectorstore = Chroma.from_documents(
documents=chunks,
embedding=embeddings,
persist_directory="./chroma_db"
)
print("Base vectorielle créée!")
# 5. RETRIEVER: Configurer recherche
retriever = vectorstore.as_retriever(
search_type="similarity", # similarity, mmr, similarity_score_threshold
search_kwargs={"k": 3} # Top-3 documents
)
# 6. LLM: Modèle génératif
llm = Ollama(model="llama3.1:8b", temperature=0.1)
# 7. CHAIN RAG: Connecter retriever + LLM
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff", # "stuff", "map_reduce", "refine", "map_rerank"
retriever=retriever,
return_source_documents=True
)
# 8. QUERY: Poser question
query = "Quelle est la politique de remboursement?"
result = qa_chain({"query": query})
print("\n=== RÉPONSE ===")
print(result['result'])
print("\n=== SOURCES ===")
for i, doc in enumerate(result['source_documents'], 1):
print(f"\nSource {i}:")
print(doc.page_content[:200] + "...")
Les 3 Générations de RAG
1. Naive RAG (2020-2022)
Architecture la plus simple: embed → retrieve → generate. Fonctionne pour des cas simples mais souffre de problèmes de précision et de recall.
# Naive RAG: Embed → Retrieve → Generate
query = "Comment configurer X?"
embedded_query = embed(query)
docs = vector_db.search(embedded_query, k=5)
prompt = f"Context: {docs}\nQuestion: {query}"
answer = llm(prompt)
2. Advanced RAG (2023)
Ajoute pre-retrieval (query rewriting, HyDE) et post-retrieval (reranking, fusion). Améliore la qualité de 15-30%.
# Advanced RAG avec Query Rewriting et Reranking
original_query = "Comment configurer X?"
# Pre-retrieval: Améliorer la query
rewritten_queries = [
"Configuration de X: étapes",
"Guide configuration X",
"X setup tutorial"
]
# Retrieval multiple
all_docs = []
for q in rewritten_queries:
docs = vector_db.search(embed(q), k=3)
all_docs.extend(docs)
# Post-retrieval: Reranking avec cross-encoder
reranked_docs = cross_encoder.rerank(original_query, all_docs)
top_docs = reranked_docs[:5]
# Generate avec meilleurs documents
answer = llm(f"Context: {top_docs}\nQuestion: {original_query}")
3. Modular RAG (2024+)
Architecture modulaire avec routing, agents, et retrieval adaptatif. Le système décide quelle stratégie utiliser selon le contexte.
MODULAR RAG ARCHITECTURE
User Query → Query Analyzer
│
┌───────────┼───────────┐
↓ ↓ ↓
Factual? Complex? Recent?
│ │ │
↓ ↓ ↓
Vector DB Multi-hop Web Search
Retrieval Reasoning + Cache
│ │ │
└───────────┼───────────┘
↓
Response Synthesis
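À titre d'illustration seulement, voici une esquisse minimale de la brique « routing » du Modular RAG. La fonction route_query et ses mots-clés sont des hypothèses pédagogiques (pas une API existante) ; en production, ce routage est généralement confié à un classifieur ou à un appel LLM dédié.
# Esquisse (hypothétique) d'un routeur de requêtes façon Modular RAG.
# Les heuristiques par mots-clés sont volontairement simplistes.

def route_query(query: str) -> str:
    """Choisit une stratégie de retrieval selon la nature de la question."""
    q = query.lower()
    if any(mot in q for mot in ["aujourd'hui", "cette semaine", "récent", "dernier"]):
        return "web_search"   # Info récente → recherche web + cache
    if any(mot in q for mot in ["pourquoi", "compare", "impact", "analyse"]):
        return "multi_hop"    # Question complexe → raisonnement multi-étapes
    return "vector_db"        # Question factuelle → retrieval vectoriel classique

def answer(query: str) -> str:
    strategy = route_query(query)
    contexts = {
        "vector_db": "chunks du vector store",
        "multi_hop": "résultats de plusieurs retrievals enchaînés",
        "web_search": "résultats web récents + cache",
    }
    return f"[{strategy}] Réponse synthétisée à partir de: {contexts[strategy]}"

print(answer("Quelle est notre politique de remboursement?"))       # → vector_db
print(answer("Pourquoi nos ventes ont-elles baissé cette année?"))  # → multi_hop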
Cas d'Usage RAG en Entreprise
| Secteur | Use Case | Bénéfices |
|---|---|---|
| Juridique | Recherche dans contrats, jurisprudence | 90% temps gagné sur recherche manuelle |
| Support Client | Assistant basé sur docs produit, FAQ, tickets | 70% tickets auto-résolus |
| Finance | Analyse rapports, réglementations, compliance | Audit temps réel, risque réduit |
| RH | Chatbot politiques internes, onboarding | 50% questions RH automatisées |
| Santé | Recherche littérature médicale, dossiers patients | Diagnostic assisté, conformité HIPAA |
| Développement | Search codebase, documentation technique | 30% productivité devs |
Métriques Clés pour Évaluer votre RAG
- Retrieval Precision: % documents retournés qui sont pertinents (objectif: >80%)
- Retrieval Recall: % documents pertinents qui sont retournés (objectif: >85%)
- Answer Relevance: La réponse répond-elle à la question? (objectif: >90%)
- Faithfulness: La réponse est-elle fidèle aux sources? (objectif: >95%)
- Context Precision: Les chunks retournés sont-ils tous utiles? (objectif: >75%)
- Latence: Temps total retrieve + generate (objectif: <1s)
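Esquisse d'évaluation à adapter à votre pipeline : on annote quelques requêtes de test avec leurs sources pertinentes, puis on mesure Retrieval Precision/Recall et la latence du retriever. Ici ground_truth et les noms de sources sont des exemples hypothétiques ; Faithfulness et Answer Relevance nécessitent un juge (humain ou LLM), par exemple via un framework d'évaluation comme RAGAS.
import time

# Hypothèse: chaque requête de test est annotée avec les sources attendues
ground_truth = {
    "Quel est le délai de remboursement?": {"politique_remboursement"},
    "Quelles sont les heures de support le weekend?": {"heures_support"},
}

def evaluer_retrieval(retriever, ground_truth, k=3):
    precisions, recalls, latences = [], [], []
    for question, sources_attendues in ground_truth.items():
        debut = time.time()
        docs = retriever.get_relevant_documents(question)[:k]
        latences.append(time.time() - debut)
        sources_retournees = {d.metadata.get("source") for d in docs}
        pertinents = sources_retournees & sources_attendues
        precisions.append(len(pertinents) / max(len(sources_retournees), 1))
        recalls.append(len(pertinents) / len(sources_attendues))
    n = len(ground_truth)
    print(f"Precision@{k}: {sum(precisions)/n:.0%} | "
          f"Recall@{k}: {sum(recalls)/n:.0%} | "
          f"Latence retrieval moyenne: {sum(latences)/n*1000:.0f} ms")

# evaluer_retrieval(retriever, ground_truth)  # retriever = celui du pipeline ci-dessus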
Lab: Construisez votre Premier RAG
Étape 1: Setup Environnement
# Installer dépendances
pip install langchain chromadb sentence-transformers ollama
# Pull modèle Ollama local
ollama pull llama3.1:8b
Étape 2: Préparer Documents
# Créer fichier documentation.txt avec contenu de test
cat > documentation.txt << EOF
Politique de Remboursement
Notre entreprise offre des remboursements complets sous 30 jours pour tout produit défectueux.
Les retours doivent être dans leur emballage d'origine avec preuve d'achat.
Contactez support@example.com pour initier un retour.
Heures de Support
Notre équipe support est disponible:
- Lundi-Vendredi: 9h-18h
- Weekend: 10h-16h
- Support 24/7 pour clients Premium
Garantie Produits
Tous nos produits incluent une garantie de 2 ans contre les défauts de fabrication.
La garantie ne couvre pas les dommages causés par une mauvaise utilisation.
EOF
Étape 3: Exécuter Pipeline RAG
Utilisez le code du premier exemple et testez avec ces questions:
- "Quel est le délai de remboursement?"
- "Quelles sont les heures de support le weekend?"
- "La garantie couvre-t-elle la casse accidentelle?"
Étape 4: Analyser les Sources
Vérifiez que chaque réponse cite les chunks corrects. C'est essentiel pour la confiance et l'audit.
Embeddings: La Clé du RAG
Les embeddings sont le fondement de tout système RAG. Ils transforment du texte en vecteurs numériques qui capturent le sens sémantique, permettant de trouver des documents similaires par calcul mathématique plutôt que par mot-clé.
Objectifs de la Leçon
- Comprendre comment les embeddings encodent le sens sémantique
- Maîtriser les métriques de similarité (cosine, dot product, euclidean)
- Choisir le bon modèle d'embedding pour votre use case
- Optimiser la qualité et la vitesse des embeddings en production
Qu'est-ce qu'un Embedding?
REPRÉSENTATION VECTORIELLE DU SENS
Texte Original:
"Le chat dort sur le tapis"
"Le félin sommeille sur le sol"
"La voiture roule vite"
↓ Embedding Model ↓
Vector Space (simplifié 2D, réel = 384-1536D):
^
│ ● "chat dort"
│ ● "félin sommeille" ← Proche = similaire
│
│
│ ● "voiture roule" ← Loin = différent
│
└─────────────────────────────────────→
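Pour rendre le schéma concret, une vérification rapide avec sentence-transformers (même modèle all-MiniLM-L6-v2 qu'utilisé plus loin) : les valeurs exactes varient selon le modèle, mais les phrases proches en sens donnent des vecteurs proches, et la phrase hors sujet s'éloigne.
from sentence_transformers import SentenceTransformer, util

model = SentenceTransformer("all-MiniLM-L6-v2")     # vecteurs de 384 dimensions

phrases = [
    "Le chat dort sur le tapis",
    "Le félin sommeille sur le sol",
    "La voiture roule vite",
]
vecteurs = model.encode(phrases)                    # shape: (3, 384)
print("Dimensions:", vecteurs.shape)

sims = util.cos_sim(vecteurs, vecteurs)             # matrice de similarités cosinus
print(f"chat vs félin   : {sims[0][1]:.2f}")        # élevé → proches dans l'espace
print(f"chat vs voiture : {sims[0][2]:.2f}")        # faible → éloignés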
Les Métriques de Similarité
1. Cosine Similarity (La Plus Utilisée)
Mesure l'angle entre deux vecteurs. Range de -1 (opposés) à +1 (identiques). Insensible à la magnitude.
import numpy as np
from numpy.linalg import norm
def cosine_similarity(vec1, vec2):
"""
Calcule similarité cosinus entre deux vecteurs
Range: -1 à +1 (1 = identique, 0 = orthogonal, -1 = opposé)
"""
return np.dot(vec1, vec2) / (norm(vec1) * norm(vec2))
# Exemple
vec_chat = np.array([0.8, 0.6, 0.1, 0.0]) # "Le chat dort"
vec_felin = np.array([0.75, 0.65, 0.15, 0.05]) # "Le félin sommeille"
vec_voiture = np.array([0.1, 0.0, 0.9, 0.85]) # "La voiture roule"
print(f"Chat vs Félin: {cosine_similarity(vec_chat, vec_felin):.3f}")
# → ≈ 0.995 (très similaire!)
print(f"Chat vs Voiture: {cosine_similarity(vec_chat, vec_voiture):.3f}")
# → ≈ 0.136 (différent)
2. Dot Product (Plus Rapide)
Produit scalaire simple. Plus rapide que cosine mais sensible à la magnitude des vecteurs.
def dot_product(vec1, vec2):
"""
Produit scalaire: sum(a[i] * b[i])
Plus rapide que cosine mais magnitude-dependent
"""
return np.dot(vec1, vec2)
# Dot product favorise vecteurs de grande magnitude
# → Normaliser vos embeddings d'abord!
normalized_vec1 = vec_chat / norm(vec_chat)
normalized_vec2 = vec_felin / norm(vec_felin)
similarity = dot_product(normalized_vec1, normalized_vec2)
print(f"Dot Product (normalized): {similarity:.3f}")
3. Euclidean Distance (Distance L2)
Distance "géométrique" entre points. Utile pour clustering mais moins pour RAG.
def euclidean_distance(vec1, vec2):
"""
Distance euclidienne: sqrt(sum((a[i] - b[i])^2))
Plus petit = plus similaire (inverse des autres)
"""
return norm(vec1 - vec2)
distance = euclidean_distance(vec_chat, vec_felin)
print(f"Euclidean Distance: {distance:.3f}")
# Plus petit = plus proche
| Métrique | Range | Vitesse | Use Case |
|---|---|---|---|
| Cosine | -1 à +1 | ⭐⭐⭐ | RAG, search sémantique (défaut) |
| Dot Product | -∞ à +∞ | ⭐⭐⭐⭐⭐ | Si embeddings normalisés |
| Euclidean | 0 à +∞ | ⭐⭐⭐⭐ | Clustering, anomaly detection |
Panorama des Modèles d'Embedding
| Modèle | Dimensions | Performance | Vitesse | Use Case |
|---|---|---|---|---|
| all-MiniLM-L6-v2 | 384 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | Développement, prototypes rapides |
| all-mpnet-base-v2 | 768 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | Balance qualité/vitesse |
| bge-large-en-v1.5 | 1024 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | Production, haute précision |
| gte-large | 1024 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | Multilingual, retrieval |
| OpenAI text-embedding-3-small | 1536 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | API, pas de GPU local |
| OpenAI text-embedding-3-large | 3072 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | Production, budget API |
| Cohere embed-multilingual-v3 | 1024 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 100+ langues, international |
Implémentation: Comparer Différents Modèles
from sentence_transformers import SentenceTransformer
import numpy as np
from time import time
# Charger plusieurs modèles pour comparaison
models = {
"MiniLM": SentenceTransformer('all-MiniLM-L6-v2'),
"MPNet": SentenceTransformer('all-mpnet-base-v2'),
"BGE": SentenceTransformer('BAAI/bge-large-en-v1.5')
}
# Corpus de test
documents = [
"Le chat dort paisiblement sur le canapé",
"Le félin sommeille tranquillement sur le sofa",
"La voiture roule rapidement sur l'autoroute",
"Le véhicule circule vite sur la route"
]
query = "Où dort le chat?"
print("=== COMPARAISON MODÈLES D'EMBEDDING ===\n")
for model_name, model in models.items():
print(f"\n{model_name} (dimensions: {model.get_sentence_embedding_dimension()})")
print("-" * 60)
# Mesurer temps d'embedding
start = time()
doc_embeddings = model.encode(documents)
query_embedding = model.encode([query])[0]
elapsed = time() - start
print(f"Temps: {elapsed:.3f}s pour {len(documents)} docs")
# Calculer similarités
similarities = []
for i, doc_emb in enumerate(doc_embeddings):
sim = np.dot(query_embedding, doc_emb) / (
np.linalg.norm(query_embedding) * np.linalg.norm(doc_emb)
)
similarities.append((documents[i][:50], sim))
# Trier par similarité
similarities.sort(key=lambda x: x[1], reverse=True)
print("\nTop-3 documents similaires:")
for i, (doc, sim) in enumerate(similarities[:3], 1):
print(f" {i}. [{sim:.3f}] {doc}...")
# Résultats attendus:
# MiniLM: Rapide mais peut confondre "chat" et "voiture"
# MPNet: Meilleur équilibre
# BGE: Plus précis, distingue bien sémantique
Embeddings Multilingues
Pour applications internationales, utilisez des modèles entraînés sur 50+ langues:
from sentence_transformers import SentenceTransformer
# Modèles multilingues performants
multilingual_models = {
"paraphrase-multilingual-mpnet-base-v2": "50+ langues",
"LaBSE": "109 langues (Google)",
"distiluse-base-multilingual-cased-v2": "50+ langues, rapide"
}
model = SentenceTransformer('paraphrase-multilingual-mpnet-base-v2')
# Tester sur plusieurs langues
texts = [
"The cat is sleeping", # Anglais
"Le chat dort", # Français
"El gato duerme", # Espagnol
"Die Katze schläft", # Allemand
"The car is fast" # Anglais (différent)
]
embeddings = model.encode(texts)
# Vérifier que phrases similaires en différentes langues sont proches
print("Similarités cross-lingues:")
for i in range(len(texts)):  # comparer la phrase EN de référence à toutes les autres (y compris "car")
    sim = cosine_similarity(embeddings[0], embeddings[i])  # réutilise la fonction définie plus haut
    print(f"EN vs {texts[i][:20]}: {sim:.3f}")
# Output attendu:
# EN vs EN: 1.000
# EN vs FR: 0.85-0.92
# EN vs ES: 0.85-0.92
# EN vs DE: 0.85-0.92
# EN vs "car": 0.30-0.45 (différent!)
Embeddings Spécialisés
| Domaine | Modèle Recommandé | Avantage |
|---|---|---|
| Code Source | microsoft/codebert-base | Comprend syntaxe programming |
| Médical | cambridgeltl/BioRedditBERT-uncased | Vocabulaire médical |
| Juridique | nlpaueb/legal-bert-base-uncased | Terminologie légale |
| Finance | ProsusAI/finbert | Contexte financier |
| Scientifique | allenai/scibert_scivocab_uncased | Publications académiques |
Optimisations Performance
1. Quantization: Réduire Taille des Embeddings
import numpy as np
def quantize_embeddings(embeddings, bits=8):
"""
Quantize embeddings float32 → int8/int16
Réduit taille de 4x (32bit → 8bit)
Perte précision minime (<2%) mais gain énorme stockage/vitesse
"""
# Normaliser à [-1, 1]
embeddings_norm = embeddings / np.abs(embeddings).max(axis=1, keepdims=True)
if bits == 8:
# Quantize to int8 [-127, 127]
quantized = (embeddings_norm * 127).astype(np.int8)
elif bits == 16:
# Quantize to int16 [-32767, 32767]
quantized = (embeddings_norm * 32767).astype(np.int16)
return quantized
# Exemple: Réduire 1M embeddings de 1536D
original_size = 1_000_000 * 1536 * 4 # float32 = 4 bytes
quantized_size = 1_000_000 * 1536 * 1 # int8 = 1 byte
print(f"Taille originale: {original_size / 1e9:.2f} GB")
print(f"Taille quantized: {quantized_size / 1e9:.2f} GB")
print(f"Compression: {original_size / quantized_size:.1f}x")
# Output:
# Taille originale: 6.14 GB
# Taille quantized: 1.54 GB
# Compression: 4.0x
2. Matryoshka Embeddings: Dimensions Flexibles
# Modèle Matryoshka (ex: nomic-embed-text)
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('nomic-ai/nomic-embed-text-v1', trust_remote_code=True)  # ce modèle requiert trust_remote_code
text = "Le chat dort sur le canapé"
full_embedding = model.encode([text])[0] # 768 dimensions
# Tronquer à différentes dimensions (toujours utilisable!)
dims_to_test = [64, 128, 256, 512, 768]
for dim in dims_to_test:
truncated = full_embedding[:dim]
storage_reduction = (1 - dim/768) * 100
print(f"{dim}D: {storage_reduction:.1f}% storage saved")
# Résultats typiques:
# 64D: 91.7% storage saved, ~85% de la précision
# 128D: 83.3% saved, ~92% précision
# 256D: 66.7% saved, ~96% précision
# 512D: 33.3% saved, ~98% précision
3. Batch Processing: Embedder Massivement
from sentence_transformers import SentenceTransformer
from tqdm import tqdm
import numpy as np
model = SentenceTransformer('all-MiniLM-L6-v2')
# Simuler 100K documents
documents = [f"Document {i} contient du texte..." for i in range(100_000)]
# ❌ MAUVAIS: Un par un (très lent)
# embeddings = [model.encode([doc])[0] for doc in documents]
# ✅ BON: Par batch
batch_size = 256 # Optimal: 128-512 selon RAM/GPU
embeddings = []
for i in tqdm(range(0, len(documents), batch_size)):
batch = documents[i:i + batch_size]
batch_embeddings = model.encode(
batch,
batch_size=batch_size,
show_progress_bar=False,
convert_to_numpy=True,
normalize_embeddings=True # Pré-normaliser pour dot product
)
embeddings.append(batch_embeddings)
embeddings = np.vstack(embeddings)
print(f"Embedded {len(embeddings)} documents")
print(f"Shape: {embeddings.shape}")
print(f"Memory: {embeddings.nbytes / 1e6:.2f} MB")
# Avec GPU: 50-100x plus rapide qu'un par un!
Lab: Benchmark d'Embeddings
Objectif
Comparer 3 modèles d'embedding sur votre dataset et mesurer qualité + vitesse.
Setup
pip install sentence-transformers scikit-learn tqdm
Code de Benchmark
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from time import time
# Définir dataset de test
queries = [
"Comment réinitialiser mon mot de passe?",
"Quels sont les horaires d'ouverture?",
"Politique de remboursement"
]
documents = [
"Pour réinitialiser votre mot de passe, cliquez sur 'Mot de passe oublié'",
"Nous sommes ouverts du lundi au vendredi de 9h à 18h",
"Les remboursements sont possibles sous 30 jours",
"Notre politique de confidentialité protège vos données",
"Contactez le support à support@example.com"
]
# Vérité terrain (quel doc répond à quelle query)
ground_truth = {
0: [0], # Query 0 → Doc 0
1: [1], # Query 1 → Doc 1
2: [2] # Query 2 → Doc 2
}
models_to_test = [
'all-MiniLM-L6-v2',
'all-mpnet-base-v2',
'BAAI/bge-base-en-v1.5'
]
for model_name in models_to_test:
print(f"\n{'='*60}")
print(f"Modèle: {model_name}")
print('='*60)
model = SentenceTransformer(model_name)
# Mesurer vitesse
start = time()
doc_embs = model.encode(documents)
query_embs = model.encode(queries)
elapsed = time() - start
print(f"Temps embedding: {elapsed:.3f}s")
print(f"Dimensions: {doc_embs.shape[1]}")
# Évaluer qualité
correct = 0
total = len(queries)
for q_idx, query_emb in enumerate(query_embs):
# Trouver doc le plus similaire
sims = cosine_similarity([query_emb], doc_embs)[0]
predicted_doc = np.argmax(sims)
print(f"\nQuery {q_idx}: {queries[q_idx][:40]}")
print(f" Predicted: Doc {predicted_doc} (sim: {sims[predicted_doc]:.3f})")
print(f" Expected: Doc {ground_truth[q_idx][0]}")
if predicted_doc in ground_truth[q_idx]:
correct += 1
print(" ✅ Correct")
else:
print(" ❌ Incorrect")
accuracy = correct / total * 100
print(f"\nAccuracy: {accuracy:.1f}%")
Interprétation
Analysez les résultats:
- Quel modèle a la meilleure accuracy?
- Quel modèle est le plus rapide?
- Quel est le meilleur compromis pour votre use case?
Chunking Strategies
Le chunking est l'art de découper vos documents en morceaux optimaux pour le retrieval. Un mauvais chunking peut détruire la performance de votre RAG, même avec les meilleurs embeddings et LLMs.
Objectifs de la Leçon
- Comprendre pourquoi le chunking est critique pour RAG
- Maîtriser les 5 stratégies principales (fixed, semantic, recursive, sentence, document-aware)
- Choisir la bonne taille de chunk selon votre use case
- Implémenter chunk overlap et metadata enrichment
Pourquoi le Chunking est Crucial?
- Chunks trop petits: Manquent de contexte, retrieval incomplet
- Chunks trop grands: Dilution sémantique, mauvais ranking
- Optimal: Balance contextuelle qui capture une "idée complète"
IMPACT DE LA TAILLE DE CHUNK
Document: 5000 tokens
┌─────────────────────────────────────────────────────────┐
│ Chunk Size = 100 tokens (TROP PETIT) │
├─────────────────────────────────────────────────────────┤
│ [Chunk 1: 100t] [Chunk 2: 100t] ... [Chunk 50: 100t] │
│ │
│ ❌ Problème: Contexte fragmenté │
│ "Le président a annoncé..." → Quel président? │
│ Phrase coupée en milieu │
│ Retrieval peu pertinent │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ Chunk Size = 2000 tokens (TROP GRAND) │
├─────────────────────────────────────────────────────────┤
│ [Chunk 1: 2000t] [Chunk 2: 2000t] [Chunk 3: 1000t] │
│ │
│ ❌ Problème: Dilution sémantique │
│ Un chunk parle de 5 sujets différents │
│ Embedding "moyenné" sur tous les sujets │
│ Mauvais retrieval (trop générique) │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ Chunk Size = 400-600 tokens (OPTIMAL) │
├─────────────────────────────────────────────────────────┤
│ [Chunk 1: 500t] [Chunk 2: 550t] ... [Chunk 10: 450t] │
│ │
│ ✅ Sweet Spot: 1-2 paragraphes │
│ Contexte suffisant │
│ Focus sémantique précis │
│ Bon retrieval │
└─────────────────────────────────────────────────────────┘
Stratégie 1: Fixed-Size Chunking
Méthode la plus simple: découper tous les N caractères/tokens. Rapide mais peut couper au milieu des phrases.
from langchain.text_splitter import CharacterTextSplitter
# Fixed-size chunking
text_splitter = CharacterTextSplitter(
separator=" ", # Séparer sur espaces (pas milieu de mots)
chunk_size=500, # 500 caractères par chunk
chunk_overlap=50, # 50 caractères overlap entre chunks
length_function=len
)
document = """
L'intelligence artificielle transforme les entreprises modernes.
Les systèmes RAG permettent aux LLMs d'accéder à des données à jour.
Cette technologie combine retrieval et génération pour créer des réponses précises.
Les applications incluent support client, recherche documentaire, et analyse.
"""
chunks = text_splitter.split_text(document)
for i, chunk in enumerate(chunks):
print(f"Chunk {i} ({len(chunk)} chars):")
print(chunk)
print("-" * 60)
# Avantages: Simple, rapide, prévisible
# Inconvénients: Peut couper contexte logique, ne respecte pas structure
Stratégie 2: Recursive Character Splitting (Recommandé)
Divise récursivement par différents séparateurs (paragraphes → lignes → phrases → mots). Préserve mieux la structure.
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Recursive splitting: Meilleure préservation contexte
text_splitter = RecursiveCharacterTextSplitter(
# Liste de séparateurs, testés dans l'ordre
separators=[
"\n\n", # Paragraphes d'abord
"\n", # Puis lignes
". ", # Puis phrases
" ", # Puis mots
"" # Puis caractères (dernier recours)
],
chunk_size=500,
chunk_overlap=50,
length_function=len
)
document = """
# Introduction au RAG
Le RAG (Retrieval-Augmented Generation) est une architecture qui combine:
- Retrieval: Recherche dans base vectorielle
- Augmentation: Enrichissement du prompt
- Generation: Production par LLM
## Avantages
Le RAG offre plusieurs bénéfices:
1. Accès à données à jour
2. Sources traçables
3. Pas de retraining nécessaire
## Applications
Les use cases incluent:
- Support client automatisé
- Recherche documentaire intelligente
- Assistant interne entreprise
"""
chunks = text_splitter.split_text(document)
print(f"Document splitté en {len(chunks)} chunks\n")
for i, chunk in enumerate(chunks):
print(f"=== CHUNK {i} ===")
print(f"Longueur: {len(chunk)} caractères")
print(chunk[:200] + "..." if len(chunk) > 200 else chunk)
print()
# ✅ Avantages:
# - Respecte structure markdown/text
# - Ne coupe pas au milieu des phrases
# - Maintient hiérarchie logique
# ⚠️ Limites:
# - Ne comprend pas sémantique
# - Peut séparer info liée sur plusieurs paragraphes
Stratégie 3: Semantic Chunking
Découpe basée sur similarité sémantique. Groupe les phrases similaires ensemble, crée nouveau chunk quand le sujet change.
from langchain_experimental.text_splitter import SemanticChunker
from langchain.embeddings import HuggingFaceEmbeddings
# Semantic chunking: Découpe basée sur changements de sujet
embeddings = HuggingFaceEmbeddings(
model_name="all-MiniLM-L6-v2"
)
semantic_chunker = SemanticChunker(
embeddings=embeddings,
breakpoint_threshold_type="percentile", # "percentile", "standard_deviation", "interquartile"
breakpoint_threshold_amount=90 # Seuil de changement sémantique (0-100)
)
document = """
Le machine learning transforme l'industrie. Les algorithmes apprennent des patterns dans les données.
Les entreprises utilisent ML pour prédire la demande.
Le changement climatique menace notre planète. Les températures augmentent chaque année.
Il est urgent d'agir pour réduire les émissions.
Python est le langage préféré pour la data science. Pandas permet de manipuler les données.
NumPy offre des opérations vectorielles efficaces.
"""
chunks = semantic_chunker.split_text(document)
print(f"Semantic chunking: {len(chunks)} chunks\n")
for i, chunk in enumerate(chunks):
print(f"=== CHUNK SÉMANTIQUE {i} ===")
print(chunk)
print()
# Résultat attendu:
# Chunk 0: Tout sur ML (sujet cohérent)
# Chunk 1: Tout sur climat (sujet change)
# Chunk 2: Tout sur Python (sujet change)
# ✅ Avantages:
# - Chunks sémantiquement cohérents
# - Préserve sujets complets
# - Meilleure qualité retrieval
# ⚠️ Limites:
# - Plus lent (calcule embeddings)
# - Taille chunks variable
# - Peut créer chunks très grands/petits
Stratégie 4: Sentence-Based Chunking
Découpe par phrases, puis regroupe jusqu'à atteindre taille cible. Garantit que les chunks ne coupent jamais au milieu d'une phrase.
import spacy
from typing import List
# Charger modèle spacy pour segmentation phrases
nlp = spacy.load("fr_core_news_sm") # ou "en_core_web_sm" pour anglais
def sentence_chunker(text: str, target_chunk_size: int = 500) -> List[str]:
"""
Découpe texte en chunks basés sur phrases complètes
Ne coupe JAMAIS au milieu d'une phrase
"""
doc = nlp(text)
sentences = [sent.text.strip() for sent in doc.sents]
chunks = []
current_chunk = []
current_size = 0
for sentence in sentences:
sentence_size = len(sentence)
# Si ajouter cette phrase dépasse target, créer nouveau chunk
if current_size + sentence_size > target_chunk_size and current_chunk:
chunks.append(" ".join(current_chunk))
current_chunk = [sentence]
current_size = sentence_size
else:
current_chunk.append(sentence)
current_size += sentence_size
# Ajouter dernier chunk
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
# Exemple
text = """
Le RAG est une architecture puissante. Il combine retrieval et génération.
Les entreprises l'adoptent rapidement. La précision est excellente.
Les coûts sont maîtrisés. L'implémentation est accessible.
Les résultats sont mesurables. L'adoption continue de croître.
"""
chunks = sentence_chunker(text, target_chunk_size=150)
print(f"Sentence chunking: {len(chunks)} chunks\n")
for i, chunk in enumerate(chunks):
print(f"Chunk {i} ({len(chunk)} chars):")
print(chunk)
print()
# ✅ Avantages:
# - Jamais de phrase coupée
# - Lisibilité optimale
# - Contexte préservé
# ⚠️ Limites:
# - Nécessite modèle spacy
# - Phrases longues peuvent créer chunks trop grands
Stratégie 5: Document-Aware Chunking
Respecte la structure du document (headings, sections, code blocks). Idéal pour Markdown, code, ou documents structurés.
from langchain.text_splitter import MarkdownHeaderTextSplitter
# Document-aware: Respecte structure markdown
markdown_text = """
# Guide RAG
## Introduction
Le RAG combine retrieval et génération.
### Architecture
Le pipeline comprend trois phases:
- Retrieve: Recherche vectorielle
- Augment: Construction contexte
- Generate: Réponse LLM
## Implémentation
### Setup
Installez les dépendances:
```bash
pip install langchain chromadb
```
### Code
Voici un exemple minimal:
```python
from langchain import RAG
rag = RAG(model="llama3")
```
## Conclusion
RAG est essentiel en 2024.
"""
# Splitter basé sur headers markdown
headers_to_split_on = [
("#", "Header 1"),
("##", "Header 2"),
("###", "Header 3"),
]
markdown_splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=headers_to_split_on
)
chunks = markdown_splitter.split_text(markdown_text)
print(f"Document-aware chunking: {len(chunks)} chunks\n")
for i, chunk in enumerate(chunks):
print(f"=== CHUNK {i} ===")
print(f"Metadata: {chunk.metadata}")
print(f"Content:\n{chunk.page_content[:200]}...")
print()
# Résultat: Chaque chunk a metadata avec hiérarchie headers
# Permet filtrage et contexte enrichi
# ✅ Avantages:
# - Préserve structure document
# - Metadata riches (headers path)
# - Idéal pour docs techniques
# ⚠️ Limites:
# - Spécifique à markdown
# - Chunks taille très variable
Chunk Overlap: Pourquoi et Comment?
OVERLAP ENTRE CHUNKS
Sans Overlap (❌):
[Chunk 1: 0-500] [Chunk 2: 500-1000] [Chunk 3: 1000-1500]
↑
Info coupée ici!
Avec Overlap 50 chars (✅):
[Chunk 1: 0-500]
[Chunk 2: 450-950]
[Chunk 3: 900-1400]
→ Info à cheval entre chunks est capturée par les deux
from langchain.text_splitter import RecursiveCharacterTextSplitter
# Overlap optimal: 10-20% de chunk_size
chunk_size = 500
overlap = 100 # 20% overlap
splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=overlap,
separators=["\n\n", "\n", ". ", " "]
)
# Visualiser overlap
text = "A" * 500 + "B" * 500 + "C" * 500
chunks = splitter.split_text(text)
print(f"Nombre de chunks: {len(chunks)}")
for i, chunk in enumerate(chunks):
print(f"Chunk {i}: {len(chunk)} chars, commence par '{chunk[0]}', finit par '{chunk[-1]}'")
# Output attendu (4 chunks):
# Chunk 0: 500 chars, commence par 'A', finit par 'A'
# Chunk 1: 500 chars, commence par 'A', finit par 'B' ← Overlap avec chunk 0
# Chunk 2: 500 chars, commence par 'B', finit par 'C' ← Overlap avec chunk 1
# Chunk 3: 300 chars, commence par 'C', finit par 'C'
Comparaison des Stratégies
| Stratégie | Vitesse | Qualité | Use Case | Taille Recommandée |
|---|---|---|---|---|
| Fixed-Size | ⭐⭐⭐⭐⭐ | ⭐⭐ | Prototypage rapide | 400-600 chars |
| Recursive | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | Usage général (DÉFAUT) | 500-800 chars |
| Semantic | ⭐⭐ | ⭐⭐⭐⭐⭐ | Haute précision requise | Variable (300-1000) |
| Sentence-Based | ⭐⭐⭐ | ⭐⭐⭐⭐ | FAQ, support client | 300-500 chars (2-4 phrases) |
| Document-Aware | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | Docs techniques, code | Variable selon structure |
Guidelines par Type de Document
| Type Document | Stratégie | Chunk Size | Overlap |
|---|---|---|---|
| FAQ / Q&A | Sentence-Based | 200-400 chars (1-2 Q&A) | 50 chars |
| Documentation Technique | Document-Aware (MD) | 600-1000 chars | 100-150 chars |
| Articles Longs | Semantic | 800-1200 chars | 150-200 chars |
| Code Source | Document-Aware (fonction/classe) | 500-800 tokens | 50-100 tokens |
| Emails / Messages | Fixed ou Recursive | 300-500 chars | 50 chars |
| Contrats Juridiques | Sentence-Based | 400-600 chars | 100 chars |
| Rapports Financiers | Document-Aware (sections) | 700-1000 chars | 150 chars |
Metadata Enrichment: Le Secret des RAG Avancés
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from datetime import datetime
# Splitter avec metadata preservation
splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=50
)
# Document avec metadata riches
doc_text = """
Le RAG permet d'accéder à des données à jour.
Les entreprises l'adoptent pour support client et recherche documentaire.
"""
# Créer Document avec metadata
document = Document(
page_content=doc_text,
metadata={
"source": "guide_rag_v2.pdf",
"page": 15,
"section": "3.2 Applications Entreprise",
"author": "Jean Dupont",
"date": "2024-01-15",
"doc_type": "technical_guide",
"language": "fr",
"version": "2.0"
}
)
# Split preserve metadata
chunks = splitter.split_documents([document])
print(f"Nombre de chunks: {len(chunks)}\n")
for i, chunk in enumerate(chunks):
print(f"=== CHUNK {i} ===")
print(f"Content: {chunk.page_content[:100]}...")
print(f"Metadata: {chunk.metadata}")
print()
# ✅ Avantages metadata:
# 1. Filtrage: "Cherche dans docs de Jean uniquement"
# 2. Ranking: Préférer docs récents
# 3. Citation: "Source: guide_rag_v2.pdf, page 15"
# 4. Versioning: Ignorer vieilles versions
# 5. Multi-tenancy: Filter by client/org
Lab: Tester 5 Stratégies de Chunking
Objectif
Comparer les 5 stratégies sur votre propre document et mesurer la qualité du retrieval.
Setup
pip install langchain langchain-experimental spacy sentence-transformers chromadb
# Télécharger modèle spacy français
python -m spacy download fr_core_news_sm
Code de Comparaison
Créez un fichier test_document.txt avec ~2000 mots, puis exécutez:
from langchain.text_splitter import (
CharacterTextSplitter,
RecursiveCharacterTextSplitter,
MarkdownHeaderTextSplitter
)
from langchain_experimental.text_splitter import SemanticChunker
from langchain.embeddings import HuggingFaceEmbeddings
# Charger document
with open("test_document.txt") as f:
text = f.read()
print(f"Document: {len(text)} caractères\n")
# Tester 3 des 5 stratégies (Sentence-Based et Document-Aware suivent le même schéma)
strategies = {
"Fixed-Size": CharacterTextSplitter(
separator=" ",
chunk_size=500,
chunk_overlap=50
),
"Recursive": RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=50
),
"Semantic": SemanticChunker(
embeddings=HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2"),
breakpoint_threshold_type="percentile"
)
}
for name, splitter in strategies.items():
chunks = splitter.split_text(text)
print(f"=== {name} ===")
print(f"Nombre de chunks: {len(chunks)}")
sizes = [len(c) for c in chunks]
print(f"Taille moyenne: {sum(sizes)/len(sizes):.0f} chars")
print(f"Taille min: {min(sizes)} chars")
print(f"Taille max: {max(sizes)} chars")
# Afficher 1er chunk
print(f"Premier chunk:\n{chunks[0][:200]}...")
print("-" * 60)
print()
# Analysez: Quelle stratégie produit chunks les plus cohérents?
Vector Databases: Le Coeur du RAG
Les vector databases sont spécialement conçues pour stocker et rechercher des embeddings efficacement. Cette leçon explore Qdrant, Chroma, Weaviate, pgvector, FAISS et Milvus.
Objectifs de la Leçon
- Comprendre l'architecture d'une vector database vs SQL classique
- Maîtriser les algorithmes ANN (Approximate Nearest Neighbors)
- Choisir la bonne vector DB selon votre use case
- Implémenter indexation et retrieval avec Qdrant et Chroma
Pourquoi les Bases Vectorielles?
VECTOR DATABASE vs TRADITIONAL DATABASE
SQL Database:
┌──────────────────────────────────┐
│ id │ name │ description │
├──────────────────────────────────┤
│ 1 │ Doc A │ Guide RAG... │
│ 2 │ Doc B │ Tutorial LLM... │
└──────────────────────────────────┘
Query: WHERE description LIKE '%RAG%'
→ Recherche lexicale, pas sémantique
Vector Database:
┌─────────────────────────────────────────────────────┐
│ id │ embedding (384D) │ metadata │
├─────────────────────────────────────────────────────┤
│ 1 │ [0.2, 0.8, ..., 0.3] │ {source: "A"} │
│ 2 │ [0.1, 0.6, ..., 0.9] │ {source: "B"} │
└─────────────────────────────────────────────────────┘
Query: embed("RAG architecture") → [0.25, 0.75, ...]
→ Recherche sémantique: trouve vecteurs similaires
Panorama des Vector Databases
| Database | Type | Scalabilité | Facilité | Use Case |
|---|---|---|---|---|
| Chroma | Embedded | ⭐⭐⭐ (100K-1M) | ⭐⭐⭐⭐⭐ | Prototypage, dev local |
| FAISS | Library | ⭐⭐⭐⭐ (1M-100M) | ⭐⭐ | Recherche haute perf, batch |
| Qdrant | Server | ⭐⭐⭐⭐⭐ (10M+) | ⭐⭐⭐⭐ | Production, filtering avancé |
| Weaviate | Server | ⭐⭐⭐⭐⭐ (10M+) | ⭐⭐⭐ | GraphQL, multi-modal |
| Pinecone | Cloud | ⭐⭐⭐⭐⭐ (100M+) | ⭐⭐⭐⭐⭐ | Managed, zero-ops |
| pgvector | Postgres ext | ⭐⭐⭐ (100K-1M) | ⭐⭐⭐⭐ | Si déjà Postgres |
| Milvus | Server | ⭐⭐⭐⭐⭐ (1B+) | ⭐⭐ | Enterprise, très large scale |
Chroma: La Plus Simple pour Débuter
# Chroma: Vector DB embedded, parfait pour prototypage
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.schema import Document
# 1. Créer embeddings model
embeddings = HuggingFaceEmbeddings(
model_name="all-MiniLM-L6-v2"
)
# 2. Créer documents
documents = [
Document(page_content="Le RAG combine retrieval et génération",
metadata={"source": "doc1", "page": 1}),
Document(page_content="Les embeddings capturent le sens sémantique",
metadata={"source": "doc2", "page": 5}),
Document(page_content="Chroma est une vector database simple",
metadata={"source": "doc3", "page": 12}),
]
# 3. Créer vector store
vectorstore = Chroma.from_documents(
documents=documents,
embedding=embeddings,
persist_directory="./chroma_db" # Persiste sur disque
)
print("✅ Vector store créée!")
# 4. Recherche par similarité
query = "Qu'est-ce qu'un embedding?"
results = vectorstore.similarity_search(
query,
k=2 # Top-2 résultats
)
print(f"\nQuery: {query}\n")
for i, doc in enumerate(results, 1):
print(f"{i}. {doc.page_content}")
print(f" Metadata: {doc.metadata}\n")
# 5. Recherche avec scores
results_with_scores = vectorstore.similarity_search_with_score(query, k=2)
for doc, score in results_with_scores:
print(f"Score: {score:.3f} | {doc.page_content[:50]}...")
# 6. Recherche avec filtre metadata
filtered_results = vectorstore.similarity_search(
query,
k=5,
filter={"source": "doc2"} # Seulement doc2
)
Qdrant: Production-Ready Vector DB
# Qdrant: Vector DB professionnelle avec filtering avancé
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
from sentence_transformers import SentenceTransformer
import uuid
# 1. Connexion Qdrant (local ou cloud)
client = QdrantClient(
    url="http://localhost:6333"  # ou QdrantClient(":memory:") pour un mode in-memory
)
# 2. Créer collection
collection_name = "rag_documents"
client.create_collection(
collection_name=collection_name,
vectors_config=VectorParams(
size=384, # Dimension embeddings (dépend du modèle)
distance=Distance.COSINE # COSINE, DOT, EUCLID
)
)
# 3. Préparer embeddings
model = SentenceTransformer('all-MiniLM-L6-v2')
documents = [
"Le RAG améliore les LLMs avec des données externes",
"Qdrant est optimisé pour la recherche vectorielle",
"Les vector databases utilisent des index HNSW"
]
embeddings = model.encode(documents)
# 4. Insérer points
points = [
PointStruct(
id=str(uuid.uuid4()),
vector=embeddings[i].tolist(),
payload={
"text": documents[i],
"source": f"doc_{i}",
"category": "technical" if i % 2 == 0 else "general",
"timestamp": "2024-01-15"
}
)
for i in range(len(documents))
]
client.upsert(
collection_name=collection_name,
points=points
)
print(f"✅ {len(points)} points insérés!\n")
# 5. Recherche simple
query = "Comment améliorer un LLM?"
query_vector = model.encode([query])[0]
search_results = client.search(
collection_name=collection_name,
query_vector=query_vector.tolist(),
limit=2
)
print(f"Query: {query}\n")
for result in search_results:
print(f"Score: {result.score:.3f}")
print(f"Text: {result.payload['text']}")
print(f"Source: {result.payload['source']}\n")
# 6. Recherche avec filtres (TRÈS PUISSANT)
from qdrant_client.models import Filter, FieldCondition, MatchValue
filtered_results = client.search(
collection_name=collection_name,
query_vector=query_vector.tolist(),
query_filter=Filter(
must=[
FieldCondition(
key="category",
match=MatchValue(value="technical")
)
]
),
limit=5
)
print("Résultats filtrés (catégorie=technical):")
for result in filtered_results:
print(f"- {result.payload['text']}")
pgvector: Si Vous Utilisez Déjà Postgres
# pgvector: Extension Postgres pour embeddings
import psycopg2
from pgvector.psycopg2 import register_vector  # adaptateur numpy array ↔ type vector
from sentence_transformers import SentenceTransformer
import numpy as np
# 1. Connexion Postgres
conn = psycopg2.connect(
host="localhost",
database="rag_db",
user="postgres",
password="password"
)
cur = conn.cursor()
# 2. Installer l'extension pgvector, puis enregistrer l'adaptateur côté client
cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
conn.commit()
register_vector(conn)
# 3. Créer table avec colonne vector
cur.execute("""
CREATE TABLE IF NOT EXISTS documents (
id SERIAL PRIMARY KEY,
content TEXT,
embedding vector(384), -- 384 dimensions
source VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""")
# 4. Créer index pour recherche rapide
cur.execute("""
CREATE INDEX IF NOT EXISTS documents_embedding_idx
ON documents
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
""")
conn.commit()
# 5. Insérer documents avec embeddings
model = SentenceTransformer('all-MiniLM-L6-v2')
documents = [
("Le RAG transforme les applications LLM", "guide.pdf"),
("pgvector permet vector search dans Postgres", "tutorial.pdf"),
("Les embeddings encodent le sens sémantique", "docs.pdf")
]
for content, source in documents:
embedding = model.encode([content])[0]
cur.execute("""
INSERT INTO documents (content, embedding, source)
VALUES (%s, %s, %s);
""", (content, embedding.tolist(), source))
conn.commit()
print(f"✅ {len(documents)} documents insérés!")
# 6. Recherche par similarité
query = "Qu'est-ce que le RAG?"
query_embedding = model.encode([query])[0]
cur.execute("""
SELECT
content,
source,
1 - (embedding <=> %s::vector) AS similarity
FROM documents
ORDER BY embedding <=> %s::vector
LIMIT 3;
""", (query_embedding.tolist(), query_embedding.tolist()))
print(f"\nQuery: {query}\n")
for content, source, similarity in cur.fetchall():
print(f"Similarity: {similarity:.3f}")
print(f"Content: {content}")
print(f"Source: {source}\n")
cur.close()
conn.close()
Algorithmes d'Index: HNSW, IVF, PQ
ALGORITHMES APPROXIMATE NEAREST NEIGHBORS (ANN)
1. HNSW (Hierarchical Navigable Small World)
┌──────────────────────────────────┐
│ Layer 2: [○────○────○] (sparse)│
│ ↓ ↓ ↓ │
│ Layer 1: [○─○─○─○─○─○] (medium) │
│ ↓ ↓ ↓ ↓ ↓ ↓ │
│ Layer 0: [○○○○○○○○○○○] (dense) │
└──────────────────────────────────┘
Navigation: Top → Bottom, saute entre layers
Performance: ⭐⭐⭐⭐⭐ (best recall/speed tradeoff)
Mémoire: Élevée (graph complet)
2. IVF (Inverted File Index)
┌──────────────────────────────────┐
│ Cluster 1: [○ ○ ○ ○] │
│ Cluster 2: [○ ○ ○] │
│ Cluster 3: [○ ○ ○ ○ ○] │
└──────────────────────────────────┘
Query → Trouve N clusters proches → Cherche dans clusters
Performance: ⭐⭐⭐⭐ (bon pour >1M vectors)
Mémoire: Moyenne
3. PQ (Product Quantization)
Vector [0.1, 0.8, 0.3, 0.5, ...]
↓ Compression
Codes [12, 45, 78, ...] (10x plus petit)
Performance: ⭐⭐⭐ (perd précision)
Mémoire: ⭐⭐⭐⭐⭐ (très compacte)
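Pour situer ces trois familles d'index en pratique, voici une esquisse avec FAISS (hors pipeline LangChain, sur des vecteurs aléatoires) ; les paramètres M=32, nlist=100 et le découpage PQ sont des valeurs d'exemple à ajuster à votre corpus.
import faiss
import numpy as np

d = 384                                                 # dimension des embeddings
xb = np.random.random((10_000, d)).astype("float32")   # corpus factice
xq = np.random.random((5, d)).astype("float32")        # requêtes factices

# 1. HNSW: graphe hiérarchique, meilleur compromis recall/vitesse
index_hnsw = faiss.IndexHNSWFlat(d, 32)                 # 32 = liens par noeud (M)
index_hnsw.add(xb)

# 2. IVF: clustering du corpus, puis recherche dans les clusters proches
quantizer = faiss.IndexFlatL2(d)
index_ivf = faiss.IndexIVFFlat(quantizer, d, 100)       # 100 clusters (nlist)
index_ivf.train(xb)                                     # IVF doit être entraîné avant add()
index_ivf.add(xb)
index_ivf.nprobe = 10                                   # clusters explorés par requête

# 3. PQ: compression des vecteurs (mémoire minimale, perte de précision)
index_pq = faiss.IndexPQ(d, 48, 8)                      # 48 sous-vecteurs codés sur 8 bits
index_pq.train(xb)
index_pq.add(xb)

for nom, index in [("HNSW", index_hnsw), ("IVF", index_ivf), ("PQ", index_pq)]:
    distances, ids = index.search(xq, 5)                # top-5 voisins par requête
    print(f"{nom}: voisins de la 1re requête → {ids[0]}")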
Comparaison Détaillée
| Critère | Chroma | FAISS | Qdrant | Pinecone | pgvector |
|---|---|---|---|---|---|
| Setup | pip install (5 min) | pip install (5 min) | Docker (10 min) | Cloud signup (2 min) | Postgres + ext (15 min) |
| Coût | Gratuit (local) | Gratuit (local) | Gratuit (local) / Cloud | $$ (0.096$/1M queries) | Gratuit (si Postgres) |
| Scalabilité | 100K vectors | 100M vectors | 10M+ vectors | 1B+ vectors | 1M vectors |
| Filtering | ⭐⭐ | ❌ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ (SQL) |
| Latence (p99) | 10-50ms | 5-20ms | 10-30ms | 50-100ms | 20-80ms |
| Hybrid Search | ❌ | ❌ | ✅ (BM25 + vector) | ✅ | ✅ (FTS + vector) |
| Multi-tenancy | ⭐⭐ | ❌ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
- Prototypage: Chroma (zéro config)
- Production <1M docs: Qdrant (excellent filtering)
- Production >10M docs: Pinecone (managed, scalable)
- Déjà Postgres: pgvector (réutilise infra)
- Recherche batch: FAISS (plus rapide)
Optimisations Performance
1. Tuning HNSW Parameters
from qdrant_client.models import Distance, VectorParams, HnswConfigDiff, SearchParams
# Configuration HNSW optimale selon use case
configs = {
"balanced": {
"m": 16, # Liens par node (défaut)
"ef_construct": 100 # Construction quality
},
"high_recall": {
"m": 32, # Plus de liens = meilleur recall
"ef_construct": 200 # Construction plus lente mais meilleure
},
"fast_build": {
"m": 8, # Moins de liens = build rapide
"ef_construct": 50 # Construction rapide
}
}
# Créer collection avec config custom
client.create_collection(
collection_name="optimized",
vectors_config=VectorParams(
size=384,
distance=Distance.COSINE
),
hnsw_config=HnswConfigDiff(
m=32,
ef_construct=200
)
)
# Search-time tuning
results = client.search(
collection_name="optimized",
query_vector=query_vector,
limit=10,
search_params={"hnsw_ef": 128} # Higher = better recall, slower
)
Lab: Benchmark Vector Databases
Objectif
Comparer Chroma, Qdrant et pgvector sur vitesse d'indexation, latence recherche, et recall@k.
Setup
# Setup environnement
pip install chromadb qdrant-client psycopg2-binary pgvector sentence-transformers
# Démarrer Qdrant (Docker)
docker run -p 6333:6333 qdrant/qdrant
# Setup Postgres avec pgvector
docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=password ankane/pgvector
Code Benchmark
Testez insertion de 10K documents et 100 queries sur chaque DB. Mesurez temps et recall.
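Squelette de benchmark, hypothétique et illustré ici avec Chroma (le même schéma se transpose à Qdrant et pgvector) : on chronomètre l'insertion, puis on mesure la latence de recherche et le recall@10 par rapport à une recherche exacte brute-force qui sert de vérité terrain. Le corpus synthétique est à remplacer par vos propres documents.
import time
import numpy as np
import chromadb
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("all-MiniLM-L6-v2")

# Corpus et requêtes synthétiques (à remplacer par vos données)
docs = [f"Document numéro {i} sur un sujet quelconque" for i in range(10_000)]
queries = [f"sujet numéro {i}" for i in range(100)]
doc_embs = model.encode(docs, batch_size=256, normalize_embeddings=True)
query_embs = model.encode(queries, normalize_embeddings=True)

# Vérité terrain: top-10 exact par produit scalaire (brute force)
exact_top10 = np.argsort(doc_embs @ query_embs.T, axis=0)[::-1][:10].T

# --- Chroma ---
client = chromadb.Client()
collection = client.create_collection("bench")

start = time.time()
collection.add(
    ids=[str(i) for i in range(len(docs))],
    embeddings=doc_embs.tolist(),
    documents=docs,
)
print(f"Insertion Chroma: {time.time() - start:.1f}s pour {len(docs)} docs")

start = time.time()
recalls = []
for q_emb, verite in zip(query_embs, exact_top10):
    res = collection.query(query_embeddings=[q_emb.tolist()], n_results=10)
    trouves = {int(i) for i in res["ids"][0]}
    recalls.append(len(trouves & set(verite.tolist())) / 10)
print(f"Latence moyenne: {(time.time() - start) / len(queries) * 1000:.1f} ms/query")
print(f"Recall@10 moyen: {np.mean(recalls):.2%}")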
Indexation & Retrieval
Le retrieval est le coeur du RAG. Cette leçon explore les différentes stratégies: dense retrieval (embeddings), sparse retrieval (BM25), hybrid search, et Maximum Marginal Relevance (MMR) pour diversifier les résultats.
Objectifs de la Leçon
- Maîtriser dense vs sparse retrieval et leurs cas d'usage
- Implémenter hybrid search pour combiner sémantique et lexical
- Utiliser MMR pour éviter la redondance dans les résultats
- Optimiser les pipelines de retrieval pour précision et rappel
Dense Retrieval: Recherche par Embeddings
DENSE RETRIEVAL PIPELINE
Query: "comment réduire la latence?"
↓
Embedding Model (BERT, etc.)
↓
Query Vector: [0.2, 0.8, ..., 0.5] (384-1536 dimensions)
↓
Vector Database (Cosine Similarity)
↓
Top-K Documents (k=5):
1. "optimiser performance système" ← Match sémantique!
2. "réduire temps de réponse"
3. "améliorer vitesse application"
4. "latency optimization guide"
5. "cache strategies"
✅ Trouve documents même sans mot "latence" exact
# Dense Retrieval avec LangChain
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.schema import Document
# 1. Setup embeddings
embeddings = HuggingFaceEmbeddings(
model_name="BAAI/bge-large-en-v1.5",
model_kwargs={'device': 'cpu'},
encode_kwargs={'normalize_embeddings': True} # Important pour cosine
)
# 2. Documents
documents = [
Document(page_content="Optimiser les performances nécessite du caching",
metadata={"source": "perf-guide.pdf", "page": 12}),
Document(page_content="Réduire la latence avec CDN et compression",
metadata={"source": "perf-guide.pdf", "page": 34}),
Document(page_content="Le chat dort sur le tapis",
metadata={"source": "random.txt", "page": 1}),
Document(page_content="Monitoring système avec Prometheus",
metadata={"source": "ops.pdf", "page": 5}),
]
# 3. Créer vector store
vectorstore = Chroma.from_documents(
documents=documents,
embedding=embeddings,
persist_directory="./chroma_dense"
)
# 4. Dense retrieval
query = "comment améliorer la vitesse?"
results = vectorstore.similarity_search(query, k=3)
print(f"Query: {query}\n")
for i, doc in enumerate(results, 1):
print(f"{i}. {doc.page_content}")
print(f" Source: {doc.metadata['source']}\n")
# Résultat: Trouve docs sur performance/latence même sans mot "vitesse"
Sparse Retrieval: BM25 (Best Match 25)
# BM25 Retrieval avec rank_bm25
from rank_bm25 import BM25Okapi
import numpy as np
# 1. Corpus
documents = [
"Le RAG combine retrieval et génération",
"BM25 est un algorithme de ranking lexical",
"Les embeddings capturent le sens sémantique",
"Error code E404 indicates resource not found",
"Configuration de PostgreSQL pour production"
]
# 2. Tokenize documents
tokenized_docs = [doc.lower().split() for doc in documents]
# 3. Créer index BM25
bm25 = BM25Okapi(tokenized_docs)
# 4. Query
query = "error code E404"
tokenized_query = query.lower().split()
# 5. Calculer scores BM25
scores = bm25.get_scores(tokenized_query)
# 6. Ranking
top_n = 3
top_indices = np.argsort(scores)[::-1][:top_n]
print(f"Query: {query}\n")
for i, idx in enumerate(top_indices, 1):
print(f"{i}. [Score: {scores[idx]:.2f}] {documents[idx]}")
# Output (scores indicatifs):
# 1. [Score: ~3.1] Error code E404 indicates resource not found
# 2. [Score: 0.00] (les autres documents ne contiennent aucun terme de la query)
# 3. [Score: 0.00] ...
# ✅ Seul le document contenant l'exact match "E404" obtient un score non nul
Hybrid Search: Le Meilleur des Deux Mondes
HYBRID SEARCH ARCHITECTURE
Query: "erreur E404 dans API REST"
↓
┌─────┴─────┐
↓ ↓
Dense Sparse
(BGE) (BM25)
↓ ↓
Score: 0.85 Score: 4.2
Docs: [A,B,C] Docs: [D,A,E]
↓ ↓
└─────┬─────┘
↓
Score Fusion
(RRF / Weighted)
↓
Final Ranking:
1. Doc A (dans les deux!) → Score combiné: 0.92
2. Doc D (BM25 #1) → 0.78
3. Doc B (Dense #2) → 0.71
# Hybrid Search: Dense + Sparse
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.schema import Document
# Documents
docs = [
Document(page_content="Error E404 means resource not found in REST API"),
Document(page_content="Les APIs REST utilisent HTTP status codes"),
Document(page_content="Debugging API errors requires logging"),
Document(page_content="PostgreSQL connection error troubleshooting"),
]
# 1. Dense retriever (embeddings)
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
vectorstore = Chroma.from_documents(docs, embeddings)
dense_retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
# 2. Sparse retriever (BM25)
bm25_retriever = BM25Retriever.from_documents(docs)
bm25_retriever.k = 3
# 3. Ensemble retriever (combine both)
ensemble_retriever = EnsembleRetriever(
retrievers=[dense_retriever, bm25_retriever],
weights=[0.5, 0.5] # 50% dense, 50% sparse (ajustable!)
)
# 4. Query
query = "error E404 REST API"
results = ensemble_retriever.get_relevant_documents(query)
print(f"Hybrid Search Results for: {query}\n")
for i, doc in enumerate(results[:3], 1):
print(f"{i}. {doc.page_content[:80]}...")
# ✅ Combine:
# - BM25 trouve "E404" exact
# - Dense trouve concepts liés "REST API errors"
# → Meilleur recall + precision
Reciprocal Rank Fusion (RRF)
Méthode de fusion qui combine rankings de plusieurs retrievers sans besoin de normaliser les scores.
def reciprocal_rank_fusion(results_lists, k=60):
"""
RRF: Combine rankings from multiple retrievers
Formula: score(d) = Σ 1/(k + rank(d))
k = constant (typically 60)
rank(d) = position of doc d in ranking (1, 2, 3, ...)
"""
doc_scores = {}
for results in results_lists:
for rank, doc_id in enumerate(results, start=1):
if doc_id not in doc_scores:
doc_scores[doc_id] = 0
doc_scores[doc_id] += 1 / (k + rank)
# Sort by score descending
sorted_docs = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
return sorted_docs
# Exemple
dense_results = ["doc_A", "doc_B", "doc_C", "doc_D"] # Dense ranking
sparse_results = ["doc_C", "doc_A", "doc_E", "doc_F"] # BM25 ranking
fused_results = reciprocal_rank_fusion([dense_results, sparse_results])
print("RRF Fusion Results:")
for doc_id, score in fused_results[:5]:
print(f"{doc_id}: {score:.4f}")
# Output:
# doc_A: 0.0325 (rank 1 dense + rank 2 sparse)
# doc_C: 0.0323 (rank 3 dense + rank 1 sparse)
# doc_B: 0.0161 (rank 2 dense only)
# doc_E: 0.0159 (rank 3 sparse only)
Maximum Marginal Relevance (MMR)
MMR: DIVERSIFICATION DES RÉSULTATS
Similarity Search (sans MMR):
Query: "Python tutorial"
Results:
1. "Python tutorial for beginners" (0.95)
2. "Python beginners guide" (0.94) ← Redondant!
3. "Python tutorial step by step" (0.93) ← Redondant!
4. "Python basics tutorial" (0.92) ← Redondant!
5. "Learn Python fast" (0.90) ← Redondant!
MMR (avec diversification):
Query: "Python tutorial"
Results:
1. "Python tutorial for beginners" (0.95) ← Most relevant
2. "Advanced Python patterns" (0.80) ← Different aspect
3. "Python vs Java comparison" (0.75) ← Related but diverse
4. "Python deployment guide" (0.73) ← Complementary
5. "Python testing best practices" (0.70) ← Different topic
✅ Résultats plus utiles car couvrent plusieurs aspects
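Avant la version LangChain ci-dessous, voici une implémentation directe et simplifiée de la formule MMR, pour visualiser le rôle de lambda_mult : score = λ·sim(doc, query) − (1−λ)·max(sim(doc, docs déjà sélectionnés)). Les vecteurs aléatoires servent uniquement de démonstration.
import numpy as np

def mmr(query_emb, doc_embs, k=5, lambda_mult=0.5):
    """Sélection MMR (embeddings supposés normalisés): pertinence vs diversité."""
    sim_query = doc_embs @ query_emb            # similarité doc ↔ requête
    sim_docs = doc_embs @ doc_embs.T            # similarité doc ↔ doc
    selected, candidats = [], list(range(len(doc_embs)))
    while candidats and len(selected) < k:
        if not selected:
            best = candidats[int(np.argmax(sim_query[candidats]))]
        else:
            scores = [
                lambda_mult * sim_query[c]
                - (1 - lambda_mult) * max(sim_docs[c][s] for s in selected)
                for c in candidats
            ]
            best = candidats[int(np.argmax(scores))]
        selected.append(best)
        candidats.remove(best)
    return selected

# Démo sur vecteurs aléatoires normalisés
rng = np.random.default_rng(0)
docs = rng.normal(size=(7, 384))
docs /= np.linalg.norm(docs, axis=1, keepdims=True)
q = docs[0]
print(mmr(q, docs, k=3, lambda_mult=0.5))   # lambda_mult=1.0 ≈ similarity search pur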
# MMR avec LangChain
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
documents = [
"Python tutorial for beginners step by step",
"Learn Python basics quickly",
"Python beginner guide complete",
"Advanced Python design patterns",
"Python vs JavaScript comparison",
"Python deployment best practices",
"Machine learning with Python",
]
vectorstore = Chroma.from_texts(documents, embeddings)
query = "Python tutorial"
# 1. Similarity search (sans diversification)
print("=== SIMILARITY SEARCH ===")
sim_results = vectorstore.similarity_search(query, k=5)
for i, doc in enumerate(sim_results, 1):
print(f"{i}. {doc.page_content}")
print("\n=== MMR SEARCH (diversifié) ===")
# 2. MMR search (avec diversification)
mmr_results = vectorstore.max_marginal_relevance_search(
query,
k=5,
fetch_k=20, # Fetch 20 candidats, puis diversifier à 5
lambda_mult=0.5 # 0=max diversity, 1=max relevance (0.5=balance)
)
for i, doc in enumerate(mmr_results, 1):
print(f"{i}. {doc.page_content}")
# lambda_mult tuning:
# 0.0 → Maximum diversity (peut sacrifier relevance)
# 0.5 → Balance (recommandé)
# 1.0 → Maximum relevance (= similarity search classique)
Stratégies de Retrieval: Comparaison
| Stratégie | Avantages | Inconvénients | Use Case |
|---|---|---|---|
| Dense Only | Recherche sémantique, synonymes, langues | Manque mots-clés exacts, noms propres | Questions générales, concepts |
| Sparse Only (BM25) | Exact match, rapide, explicable | Pas de compréhension sémantique | Codes erreur, IDs, noms techniques |
| Hybrid | Meilleur recall, couvre les deux cas | Plus complexe, légèrement plus lent | Production (recommandé!) |
| MMR | Résultats diversifiés, moins redondance | Peut sacrifier top relevance | Exploration, découverte |
Pipeline de Retrieval Optimisé
# Pipeline complet: Hybrid + MMR + Metadata Filtering
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from langchain.schema import Document
from datetime import datetime, timedelta
# Documents avec metadata riches
docs = [
Document(
page_content="Guide de déploiement Kubernetes en production",
metadata={"source": "k8s-guide.pdf", "date": "2024-01-15",
"author": "Expert DevOps", "category": "infrastructure"}
),
Document(
page_content="Error E404: Resource not found in REST API",
metadata={"source": "api-errors.md", "date": "2024-01-10",
"author": "Dev Team", "category": "troubleshooting"}
),
Document(
page_content="Optimisation des performances PostgreSQL",
metadata={"source": "postgres-perf.pdf", "date": "2023-12-01",
"author": "DBA Team", "category": "database"}
),
]
# Setup hybrid retriever
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
vectorstore = Chroma.from_documents(docs, embeddings)
dense_retriever = vectorstore.as_retriever(search_kwargs={"k": 10})
bm25_retriever = BM25Retriever.from_documents(docs, k=10)
hybrid_retriever = EnsembleRetriever(
retrievers=[dense_retriever, bm25_retriever],
weights=[0.6, 0.4] # Favoriser légèrement dense
)
# Query avec pipeline complet
def advanced_retrieval(query, category_filter=None, date_after=None, use_mmr=True):
"""
Retrieval pipeline avancé:
1. Hybrid search (dense + sparse)
2. Metadata filtering
3. MMR diversification (optionnel)
4. Reranking (leçon suivante)
"""
# 1. Retrieve candidats
if use_mmr:
candidates = vectorstore.max_marginal_relevance_search(
query, k=20, fetch_k=50, lambda_mult=0.5
)
else:
candidates = hybrid_retriever.get_relevant_documents(query)
# 2. Filter by metadata
filtered = candidates
if category_filter:
filtered = [doc for doc in filtered
if doc.metadata.get("category") == category_filter]
if date_after:
filtered = [doc for doc in filtered
if doc.metadata.get("date", "") >= date_after]
return filtered[:5] # Top-5
# Exemples
print("=== Query 1: General (no filters) ===")
results = advanced_retrieval("kubernetes deployment")
for doc in results:
print(f"- {doc.page_content[:60]}...")
print("\n=== Query 2: Category filter ===")
results = advanced_retrieval("optimization", category_filter="database")
for doc in results:
print(f"- {doc.page_content[:60]}...")
print("\n=== Query 3: Recent docs only ===")
results = advanced_retrieval("guide", date_after="2024-01-01")
for doc in results:
print(f"- {doc.page_content[:60]}... ({doc.metadata['date']})")
Métriques de Retrieval
| Métrique | Formule | Objectif | Interprétation |
|---|---|---|---|
| Precision@K | Pertinents retournés / K | >80% | % résultats utiles |
| Recall@K | Pertinents retournés / Total pertinents | >85% | % pertinents trouvés |
| MRR | 1 / Rank premier pertinent | >0.7 | Position 1er bon résultat |
| NDCG@K | DCG / IDCG (normalized) | >0.75 | Qualité du ranking |
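Pour fixer les idées, voici une esquisse de calcul de ces métriques sur des données jouets (relevance binaire; les listes `retrieved` et `relevant` sont inventées pour l'exemple):
# Esquisse: Precision@K, Recall@K, MRR et NDCG@K sur des IDs de documents
import math

def precision_at_k(retrieved, relevant, k):
    return len([d for d in retrieved[:k] if d in relevant]) / k

def recall_at_k(retrieved, relevant, k):
    return len([d for d in retrieved[:k] if d in relevant]) / len(relevant)

def mrr(retrieved, relevant):
    for rank, d in enumerate(retrieved, 1):
        if d in relevant:
            return 1 / rank
    return 0.0

def ndcg_at_k(retrieved, relevant, k):
    dcg = sum(1 / math.log2(i + 2) for i, d in enumerate(retrieved[:k]) if d in relevant)
    idcg = sum(1 / math.log2(i + 2) for i in range(min(len(relevant), k)))
    return dcg / idcg if idcg else 0.0

retrieved = ["doc3", "doc7", "doc1", "doc9", "doc2"]  # sortie du retriever
relevant = {"doc1", "doc3", "doc5"}                   # vérité terrain (jugements humains)

print(f"Precision@5: {precision_at_k(retrieved, relevant, 5):.2f}")
print(f"Recall@5:    {recall_at_k(retrieved, relevant, 5):.2f}")
print(f"MRR:         {mrr(retrieved, relevant):.2f}")
print(f"NDCG@5:      {ndcg_at_k(retrieved, relevant, 5):.2f}")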
Reranking & Cross-Encoders
Le reranking améliore nettement la précision en réordonnant les résultats du retrieval initial. Cette leçon compare bi-encoders et cross-encoders, puis présente des solutions de reranking comme Cohere Rerank et BGE Reranker.
Objectifs de la Leçon
- Comprendre pourquoi le reranking améliore la précision de 20-40%
- Maîtriser bi-encoders vs cross-encoders et leurs trade-offs
- Implémenter reranking avec Cohere, BGE, et cross-encoders locaux
- Optimiser le pipeline retrieve → rerank pour latence et qualité
Le Problème: Embeddings Bi-Encoders
BI-ENCODER (Retrieval Initial)
Query: "Python tutorial" → Encoder → [0.2, 0.8, ...]
↓
Cosine Sim
↓
Doc: "Learn Python" → Encoder → [0.25, 0.75, ...]
❌ Problème: Encode séparément, pas d'interaction
"Python tutorial" vs "Java tutorial" → similaires!
Car "tutorial" domine l'embedding
CROSS-ENCODER (Reranking)
[Query + Doc] concaténés → Encoder → Attention → Score 0.92
"Python tutorial [SEP] Learn Python basics..."
↑ ↑
└────── Attention croisée ───────────┘
✅ Modélise interactions: "Python" dans query + "Python" dans doc
Bi-Encoder vs Cross-Encoder
| Aspect | Bi-Encoder | Cross-Encoder |
|---|---|---|
| Fonctionnement | Encode query et doc séparément | Encode [query + doc] ensemble |
| Précision | ⭐⭐⭐ (70-85%) | ⭐⭐⭐⭐⭐ (85-95%) |
| Vitesse | ⭐⭐⭐⭐⭐ (pré-compute embeddings) | ⭐⭐ (doit encoder chaque paire) |
| Scalabilité | ✅ Million+ docs (vector search) | ❌ Max 100-1000 docs (trop lent) |
| Use Case | Retrieval initial (large corpus) | Reranking final (top-K results) |
| Latence | 10-50ms (search vectoriel) | 50-500ms (N inférences) |
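Pour rendre la différence concrète, une esquisse avec sentence-transformers (les modèles "all-MiniLM-L6-v2" et "cross-encoder/ms-marco-MiniLM-L-6-v2" sont des choix illustratifs, les scores exacts varient):
# Esquisse: bi-encoder (cosine sur embeddings séparés) vs cross-encoder (score de paire)
from sentence_transformers import SentenceTransformer, CrossEncoder, util

query = "Python tutorial"
docs = ["Learn Python basics step by step", "Java tutorial for beginners"]

# Bi-encoder: encode query et docs séparément, puis similarité cosinus
bi_encoder = SentenceTransformer("all-MiniLM-L6-v2")
q_emb = bi_encoder.encode(query, convert_to_tensor=True)
d_embs = bi_encoder.encode(docs, convert_to_tensor=True)
cos_scores = util.cos_sim(q_emb, d_embs)[0]

# Cross-encoder: encode chaque paire [query, doc] ensemble (attention croisée)
cross_encoder = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
ce_scores = cross_encoder.predict([[query, d] for d in docs])

for doc, cos, ce in zip(docs, cos_scores, ce_scores):
    print(f"{doc:40} | bi-encoder: {cos.item():.3f} | cross-encoder: {ce:.3f}")
# Attendu: l'écart entre le doc Python et le doc Java est bien plus net côté cross-encoder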
Pipeline Retrieve → Rerank
PIPELINE COMPLET AVEC RERANKING
1 Million Documents
↓
┌─────────────────┐
│ BI-ENCODER │ Dense retrieval (vector search)
│ Retrieval │ Rapide: 50ms
└─────────────────┘
↓
Top-100 candidats (Recall: 90%, Precision: 65%)
↓
┌─────────────────┐
│ CROSS-ENCODER │ Reranking (score chaque paire)
│ Reranking │ Plus lent: 200ms
└─────────────────┘
↓
Top-5 résultats (Recall: 90%, Precision: 92%) ✅
Amélioration: +27% precision avec seulement +150ms latence!
Implémentation: BGE Reranker (Open Source)
# BGE Reranker: Cross-encoder open-source performant
from sentence_transformers import CrossEncoder
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
# 1. Setup retrieval (bi-encoder)
embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-base-en-v1.5")
documents = [
"Python is a high-level programming language",
"Java is used for enterprise applications",
"Python tutorial for machine learning",
"JavaScript runs in web browsers",
"Learn Python programming step by step",
"Ruby on Rails web framework",
"Python data science libraries like Pandas",
]
vectorstore = Chroma.from_texts(documents, embeddings)
# 2. Initial retrieval (top-20 candidats)
query = "Python programming tutorial"
candidates = vectorstore.similarity_search(query, k=20)
print("=== INITIAL RETRIEVAL (bi-encoder) ===")
for i, doc in enumerate(candidates[:5], 1):
print(f"{i}. {doc.page_content}")
# 3. Load cross-encoder for reranking
reranker = CrossEncoder('BAAI/bge-reranker-base')
# 4. Rerank candidats
pairs = [[query, doc.page_content] for doc in candidates]
scores = reranker.predict(pairs)
# 5. Sort by reranker scores
reranked_results = sorted(
zip(candidates, scores),
key=lambda x: x[1],
reverse=True
)
print("\n=== AFTER RERANKING (cross-encoder) ===")
for i, (doc, score) in enumerate(reranked_results[:5], 1):
print(f"{i}. [Score: {score:.4f}] {doc.page_content}")
# Résultat attendu:
# Bi-encoder peut mettre "Java" ou "JavaScript" dans top-5
# Cross-encoder repositionne correctement docs Python en haut!
Cohere Rerank: API Cloud (Meilleure Qualité)
# Cohere Rerank: API cloud, meilleure précision
import cohere
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
# 1. Setup Cohere
co = cohere.Client("your-cohere-api-key") # Gratuit: 100 calls/min
# 2. Initial retrieval
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
vectorstore = Chroma.from_texts(documents, embeddings)  # réutilise la liste "documents" de l'exemple BGE ci-dessus
query = "how to optimize database performance"
candidates = vectorstore.similarity_search(query, k=50) # Top-50
# 3. Prepare documents for reranking
docs_text = [doc.page_content for doc in candidates]
# 4. Rerank with Cohere
rerank_results = co.rerank(
query=query,
documents=docs_text,
top_n=5, # Return top-5 after reranking
model="rerank-english-v2.0" # ou "rerank-multilingual-v2.0"
)
print("=== COHERE RERANK RESULTS ===")
for result in rerank_results.results:
print(f"Score: {result.relevance_score:.4f}")
print(f"Doc: {docs_text[result.index][:80]}...")
print()
# Avantages Cohere:
# - Meilleure précision que BGE (modèle plus grand)
# - Multilingue disponible
# - API simple
# - Gratuit jusqu'à 100 calls/min
# Inconvénients:
# - Coût après free tier
# - Latence API (50-200ms)
# - Dépendance externe
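Dans une chaîne LangChain, Cohere Rerank peut aussi se brancher comme compresseur de retriever (esquisse; suppose la variable d'environnement COHERE_API_KEY et le vectorstore défini ci-dessus):
# Esquisse: retriever LangChain avec étape de reranking Cohere
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CohereRerank

base_retriever = vectorstore.as_retriever(search_kwargs={"k": 50})  # retrieval initial large
compressor = CohereRerank(model="rerank-english-v2.0", top_n=5)     # garde les 5 meilleurs

rerank_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=base_retriever
)
docs = rerank_retriever.get_relevant_documents("how to optimize database performance")
for doc in docs:
    print(doc.page_content[:80])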
Reranker Models: Comparaison
| Modèle | Type | Performance | Vitesse | Coût |
|---|---|---|---|---|
| BGE Reranker Base | Local | ⭐⭐⭐⭐ | ⭐⭐⭐ (100ms/20 docs) | Gratuit |
| BGE Reranker Large | Local | ⭐⭐⭐⭐⭐ | ⭐⭐ (200ms/20 docs) | Gratuit |
| MS MARCO Cross-Encoder | Local | ⭐⭐⭐⭐ | ⭐⭐⭐ | Gratuit |
| Cohere Rerank v2 | API | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ (API latency) | $1/1000 searches |
| Voyage Rerank | API | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | $0.05/1000 searches |
| Jina Reranker | API/Local | ⭐⭐⭐⭐ | ⭐⭐⭐ | Gratuit (local) |
Pipeline Complet avec Reranking
# Pipeline production: Hybrid Retrieval + Reranking + Metadata
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.retrievers import BM25Retriever, EnsembleRetriever
from sentence_transformers import CrossEncoder
from langchain.schema import Document
class ProductionRAGRetriever:
"""
Pipeline optimisé:
1. Hybrid search (dense + BM25)
2. Metadata filtering
3. Cross-encoder reranking
4. Diversity (optionnel)
"""
def __init__(self, documents):
# Setup embeddings et retrievers
self.embeddings = HuggingFaceEmbeddings(
model_name="BAAI/bge-base-en-v1.5"
)
# Vector store
self.vectorstore = Chroma.from_documents(
documents,
self.embeddings
)
# Hybrid retriever
dense = self.vectorstore.as_retriever(search_kwargs={"k": 50})
sparse = BM25Retriever.from_documents(documents, k=50)
self.hybrid_retriever = EnsembleRetriever(
retrievers=[dense, sparse],
weights=[0.6, 0.4]
)
# Reranker
self.reranker = CrossEncoder('BAAI/bge-reranker-base')
def retrieve(self, query, top_k=5, category=None, min_score=0.5):
"""
Retrieve avec reranking complet
"""
# 1. Hybrid retrieval (get many candidates)
candidates = self.hybrid_retriever.get_relevant_documents(query)
# 2. Filter by metadata
if category:
candidates = [
doc for doc in candidates
if doc.metadata.get('category') == category
]
# 3. Rerank with cross-encoder
pairs = [[query, doc.page_content] for doc in candidates]
scores = self.reranker.predict(pairs)
# 4. Combine and sort
scored_docs = [
(doc, score)
for doc, score in zip(candidates, scores)
if score >= min_score # Threshold
]
scored_docs.sort(key=lambda x: x[1], reverse=True)
# 5. Return top-k
return scored_docs[:top_k]
# Usage
docs = [
Document(
page_content="Python tutorial for beginners with examples",
metadata={"category": "programming", "lang": "python"}
),
Document(
page_content="Java enterprise development guide",
metadata={"category": "programming", "lang": "java"}
),
Document(
page_content="Machine learning with Python and scikit-learn",
metadata={"category": "ml", "lang": "python"}
),
]
retriever = ProductionRAGRetriever(docs)
# Query
results = retriever.retrieve(
query="Python programming tutorial",
top_k=3,
category="programming",
min_score=0.3
)
print("=== PRODUCTION RETRIEVAL RESULTS ===")
for doc, score in results:
print(f"\nScore: {score:.4f}")
print(f"Content: {doc.page_content}")
print(f"Metadata: {doc.metadata}")
Optimisations Performance
1. Batch Reranking
# Reranker par batch pour vitesse
reranker = CrossEncoder('BAAI/bge-reranker-base')
# ❌ LENT: Un par un
scores = [reranker.predict([[query, doc]])[0] for doc in docs]
# ✅ RAPIDE: Batch
pairs = [[query, doc] for doc in docs]
scores = reranker.predict(pairs, batch_size=32) # 3-5x plus rapide!
2. Reranking Seulement si Nécessaire
# Esquisse rendue exécutable: on suppose que `candidates` est une liste de textes
# déjà triés par le retrieval initial (bi-encoder)
from sentence_transformers import SentenceTransformer, CrossEncoder, util

bi_encoder = SentenceTransformer("all-MiniLM-L6-v2")
reranker = CrossEncoder("BAAI/bge-reranker-base")

def smart_rerank(query, candidates, confidence_threshold=0.9):
    """
    Rerank seulement si le top-1 du bi-encoder n'est pas très confiant
    """
    # Calculer le score de similarité du top-1
    query_emb = bi_encoder.encode(query, convert_to_tensor=True)
    top1_emb = bi_encoder.encode(candidates[0], convert_to_tensor=True)
    top1_score = util.cos_sim(query_emb, top1_emb).item()
    # Si très confiant, skip reranking
    if top1_score > confidence_threshold:
        return candidates[:5]  # Return direct
    # Sinon, rerank avec le cross-encoder
    pairs = [[query, doc] for doc in candidates]
    scores = reranker.predict(pairs)
    ranked = sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)
    return [doc for doc, _ in ranked[:5]]
# Économise 50%+ des appels reranker en production!
Impact du Reranking: Métriques
| Métrique | Sans Reranking | Avec Reranking | Amélioration |
|---|---|---|---|
| Precision@5 | 68% | 89% | +31% |
| NDCG@10 | 0.72 | 0.91 | +26% |
| MRR | 0.65 | 0.87 | +34% |
| Latence | 50ms | 180ms | +130ms |
LangChain pour RAG
LangChain est le framework le plus populaire pour construire des applications RAG. Cette leçon explore Documents, text splitters, retrievers, chains, et LangChain Expression Language (LCEL).
Objectifs de la Leçon
- Maîtriser l'écosystème LangChain pour RAG (loaders, splitters, retrievers)
- Construire des chains RAG avec RetrievalQA et ConversationalRetrievalChain
- Utiliser LCEL (LangChain Expression Language) pour pipelines modernes
- Intégrer sources multiples et créer RAG production-ready
Architecture LangChain pour RAG
LANGCHAIN RAG ECOSYSTEM
┌─────────────────────────────────────────────────────────┐
│ DOCUMENT LOADERS │
│ PDFLoader │ WebLoader │ CSVLoader │ NotionLoader... │
└─────────────────────┬───────────────────────────────────┘
↓
[Raw Documents]
↓
┌─────────────────────────────────────────────────────────┐
│ TEXT SPLITTERS │
│ RecursiveCharacterTextSplitter │ MarkdownSplitter... │
└─────────────────────┬───────────────────────────────────┘
↓
[Document Chunks]
↓
┌─────────────────────────────────────────────────────────┐
│ EMBEDDINGS │
│ OpenAI │ HuggingFace │ Cohere │ Ollama... │
└─────────────────────┬───────────────────────────────────┘
↓
[Embedded Chunks]
↓
┌─────────────────────────────────────────────────────────┐
│ VECTOR STORES │
│ Chroma │ Qdrant │ Pinecone │ FAISS │ Weaviate... │
└─────────────────────┬───────────────────────────────────┘
↓
[Retriever]
↓
┌─────────────────────────────────────────────────────────┐
│ CHAINS │
│ RetrievalQA │ ConversationalRetrievalChain │ LCEL │
└─────────────────────┬───────────────────────────────────┘
↓
[Response]
Document Loaders: Charger Différentes Sources
from langchain.document_loaders import (
TextLoader,
PyPDFLoader,
CSVLoader,
UnstructuredMarkdownLoader,
WebBaseLoader,
DirectoryLoader
)
# 1. Load simple text file
text_loader = TextLoader("documentation.txt")
docs_text = text_loader.load()
print(f"Loaded {len(docs_text)} text documents")
# 2. Load PDF
pdf_loader = PyPDFLoader("guide.pdf")
docs_pdf = pdf_loader.load()
print(f"Loaded {len(docs_pdf)} pages from PDF")
# 3. Load all PDFs in directory
dir_loader = DirectoryLoader(
"./documents/",
glob="**/*.pdf",
loader_cls=PyPDFLoader
)
docs_all = dir_loader.load()
print(f"Loaded {len(docs_all)} documents from directory")
# 4. Load from web
web_loader = WebBaseLoader("https://example.com/docs")
docs_web = web_loader.load()
# 5. Load CSV
csv_loader = CSVLoader(
file_path="data.csv",
source_column="url" # Column to use as source metadata
)
docs_csv = csv_loader.load()
# 6. Load Markdown
md_loader = UnstructuredMarkdownLoader("README.md")
docs_md = md_loader.load()
# Chaque loader retourne List[Document] avec:
# - page_content: str (le texte)
# - metadata: dict (source, page number, etc.)
for doc in docs_pdf[:1]:
print(f"\nContent: {doc.page_content[:200]}...")
print(f"Metadata: {doc.metadata}")
Text Splitters: Découper Intelligemment
from langchain.text_splitter import (
RecursiveCharacterTextSplitter,
CharacterTextSplitter,
TokenTextSplitter,
MarkdownHeaderTextSplitter,
PythonCodeTextSplitter
)
# 1. RecursiveCharacterTextSplitter (RECOMMANDÉ)
recursive_splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=50,
length_function=len,
separators=["\n\n", "\n", ". ", " ", ""]
)
chunks = recursive_splitter.split_documents(docs_pdf)
print(f"Split into {len(chunks)} chunks")
# 2. TokenTextSplitter (pour limites API)
token_splitter = TokenTextSplitter(
chunk_size=100, # 100 tokens
chunk_overlap=10
)
# 3. MarkdownHeaderTextSplitter (preserve structure)
md_splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=[
("#", "Header 1"),
("##", "Header 2"),
("###", "Header 3"),
]
)
md_text = """
# Installation
Installation is simple.
## Requirements
Python 3.8+
## Steps
1. Install package
2. Run setup
"""
md_chunks = md_splitter.split_text(md_text)
for chunk in md_chunks:
print(f"Metadata: {chunk.metadata}")
print(f"Content: {chunk.page_content}\n")
# 4. PythonCodeTextSplitter (for code)
code_splitter = PythonCodeTextSplitter(
chunk_size=500,
chunk_overlap=0
)
code = """
def hello():
print("Hello")
class MyClass:
def __init__(self):
self.value = 0
"""
code_chunks = code_splitter.split_text(code)
Embeddings: Plusieurs Providers
from langchain.embeddings import (
OpenAIEmbeddings,
HuggingFaceEmbeddings,
CohereEmbeddings,
OllamaEmbeddings
)
# 1. HuggingFace (local, gratuit)
hf_embeddings = HuggingFaceEmbeddings(
model_name="BAAI/bge-large-en-v1.5",
model_kwargs={'device': 'cpu'},
encode_kwargs={'normalize_embeddings': True}
)
# 2. OpenAI (API, payant)
openai_embeddings = OpenAIEmbeddings(
model="text-embedding-3-small",
openai_api_key="your-key"
)
# 3. Ollama (local, gratuit)
ollama_embeddings = OllamaEmbeddings(
model="nomic-embed-text"
)
# 4. Cohere (API, payant)
cohere_embeddings = CohereEmbeddings(
model="embed-english-v3.0",
cohere_api_key="your-key"
)
# Utilisation identique pour tous
text = "Le RAG combine retrieval et génération"
embedding = hf_embeddings.embed_query(text)
print(f"Embedding dimensions: {len(embedding)}")
Vector Stores: Stockage et Recherche
from langchain.vectorstores import Chroma, FAISS, Qdrant
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.schema import Document
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
documents = [
Document(page_content="Le RAG améliore les LLMs",
metadata={"source": "doc1"}),
Document(page_content="Vector databases stockent embeddings",
metadata={"source": "doc2"}),
]
# 1. Chroma (simple, local)
chroma_db = Chroma.from_documents(
documents,
embeddings,
persist_directory="./chroma_db"
)
# 2. FAISS (rapide, in-memory)
faiss_db = FAISS.from_documents(
documents,
embeddings
)
faiss_db.save_local("./faiss_index")
# 3. Qdrant (production)
from qdrant_client import QdrantClient
qdrant_db = Qdrant.from_documents(
documents,
embeddings,
url="http://localhost:6333",
collection_name="rag_docs"
)
# Recherche (API identique)
query = "Qu'est-ce que RAG?"
results_chroma = chroma_db.similarity_search(query, k=2)
results_faiss = faiss_db.similarity_search(query, k=2)
results_qdrant = qdrant_db.similarity_search(query, k=2)
# Avec scores
results_with_scores = chroma_db.similarity_search_with_score(query, k=2)
for doc, score in results_with_scores:
print(f"Score: {score:.3f} | {doc.page_content}")
Retrievers: Abstraction sur Vector Stores
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
vectorstore = Chroma.from_texts(
["Doc 1 content", "Doc 2 content", "Doc 3 content"],
HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
)
# 1. Basic retriever
retriever = vectorstore.as_retriever()
# 2. Retriever avec paramètres
retriever = vectorstore.as_retriever(
search_type="similarity", # "similarity", "mmr", "similarity_score_threshold"
search_kwargs={"k": 5} # Top-5 results
)
# 3. MMR retriever (diversification)
mmr_retriever = vectorstore.as_retriever(
search_type="mmr",
search_kwargs={
"k": 5,
"fetch_k": 20, # Fetch 20, return 5 diversified
"lambda_mult": 0.5 # Diversity vs relevance
}
)
# 4. Score threshold retriever
threshold_retriever = vectorstore.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={
"score_threshold": 0.7, # Minimum score
"k": 5
}
)
# Usage: get_relevant_documents()
query = "What is RAG?"
docs = retriever.get_relevant_documents(query)
for doc in docs:
print(doc.page_content)
RetrievalQA Chain: RAG Basique
from langchain.chains import RetrievalQA
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.llms import Ollama
# 1. Setup components
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
documents = [
"Le RAG combine retrieval et génération pour améliorer les LLMs",
"Les embeddings transforment texte en vecteurs numériques",
"Chroma est une vector database simple et efficace"
]
vectorstore = Chroma.from_texts(documents, embeddings)
retriever = vectorstore.as_retriever(search_kwargs={"k": 2})
# 2. Setup LLM
llm = Ollama(model="llama3.1:8b", temperature=0.1)
# 3. Create RetrievalQA chain
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff", # "stuff", "map_reduce", "refine", "map_rerank"
retriever=retriever,
return_source_documents=True, # Return sources
verbose=True # Debug mode
)
# 4. Query
query = "Qu'est-ce qu'un embedding?"
result = qa_chain({"query": query})
print("=== RÉPONSE ===")
print(result['result'])
print("\n=== SOURCES ===")
for i, doc in enumerate(result['source_documents'], 1):
print(f"{i}. {doc.page_content}")
# Chain types explained:
# - stuff: Concat all docs in prompt (simple, best for <4K tokens)
# - map_reduce: Summarize each doc, then combine (for many docs)
# - refine: Iterative refinement (best quality, slow)
# - map_rerank: Score each doc, use best (experimental)
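Par exemple, pour interroger beaucoup de chunks à la fois, on peut réutiliser les mêmes composants avec chain_type="map_reduce" (esquisse à titre indicatif):
# Esquisse: même llm/retriever, mais chaque doc est résumé puis les résumés sont combinés
map_reduce_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="map_reduce",
    retriever=vectorstore.as_retriever(search_kwargs={"k": 8}),
    return_source_documents=True
)
result = map_reduce_chain({"query": "Résume les concepts couverts par ces documents"})
print(result['result'])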
ConversationalRetrievalChain: RAG avec Mémoire
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.llms import Ollama
# 1. Setup
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
vectorstore = Chroma.from_texts(
["RAG doc 1", "RAG doc 2", "RAG doc 3"],
embeddings
)
llm = Ollama(model="llama3.1:8b")
# 2. Memory for conversation history
memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True,
output_key="answer"
)
# 3. Create conversational chain
conv_chain = ConversationalRetrievalChain.from_llm(
llm=llm,
retriever=vectorstore.as_retriever(),
memory=memory,
return_source_documents=True,
verbose=True
)
# 4. Multi-turn conversation
print("=== Turn 1 ===")
result1 = conv_chain({"question": "Qu'est-ce que le RAG?"})
print(result1['answer'])
print("\n=== Turn 2 (avec contexte) ===")
result2 = conv_chain({"question": "Quels sont ses avantages?"})
# "ses" fait référence au RAG grâce à la mémoire!
print(result2['answer'])
print("\n=== Turn 3 ===")
result3 = conv_chain({"question": "Comment l'implémenter?"})
print(result3['answer'])
# La mémoire maintient le contexte conversationnel!
LCEL: LangChain Expression Language
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.llms import Ollama
from langchain.prompts import ChatPromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
# 1. Setup components
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
vectorstore = Chroma.from_texts(
["RAG combines retrieval and generation",
"Embeddings encode semantic meaning"],
embeddings
)
retriever = vectorstore.as_retriever()
llm = Ollama(model="llama3.1:8b")
# 2. Prompt template
prompt = ChatPromptTemplate.from_template("""
Answer the question based only on the following context:
Context: {context}
Question: {question}
Answer:
""")
# 3. LCEL Chain (modern syntax!)
rag_chain = (
{"context": retriever, "question": RunnablePassthrough()}
| prompt
| llm
| StrOutputParser()
)
# 4. Invoke
response = rag_chain.invoke("What is RAG?")
print(response)
# Avantages LCEL:
# - Syntaxe claire (pipe | operator)
# - Streaming automatique
# - Parallélisation
# - Debugging facile
# - Composabilité
# Streaming example
for chunk in rag_chain.stream("What are embeddings?"):
print(chunk, end="", flush=True)
RAG Avancé avec LCEL: Custom Chain
from langchain.schema.runnable import RunnableLambda, RunnableParallel, RunnablePassthrough
from langchain.prompts import ChatPromptTemplate
from langchain.llms import Ollama
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
# Setup
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
vectorstore = Chroma.from_texts(["Doc 1", "Doc 2"], embeddings)
llm = Ollama(model="llama3.1:8b")
# Custom functions
def format_docs(docs):
"""Format retrieved docs for prompt"""
return "\n\n".join(doc.page_content for doc in docs)
def add_sources(result):
"""Add source citations"""
return {
"answer": result["answer"],
"sources": result["sources"]
}
# Complex LCEL chain
advanced_chain = (
# 1. Parallel: retrieve + pass question
RunnableParallel(
context=vectorstore.as_retriever() | RunnableLambda(format_docs),
question=RunnablePassthrough(),
sources=vectorstore.as_retriever() # Keep sources
)
# 2. Build prompt
| RunnableLambda(lambda x: {
"answer": ChatPromptTemplate.from_template(
"Context: {context}\n\nQuestion: {question}\n\nAnswer:"
).format(**x),
"sources": x["sources"]
})
# 3. Generate answer
| RunnableLambda(lambda x: {
"answer": llm.invoke(x["answer"]),
"sources": x["sources"]
})
)
# Usage
result = advanced_chain.invoke("What is in the documents?")
print(f"Answer: {result['answer']}")
print(f"Sources: {result['sources']}")
LangChain vs LlamaIndex
| Aspect | LangChain | LlamaIndex |
|---|---|---|
| Focus | Applications LLM générales | Spécialisé RAG/search |
| Learning Curve | ⭐⭐⭐ (modéré) | ⭐⭐ (plus simple) |
| RAG Features | ⭐⭐⭐⭐ (très bon) | ⭐⭐⭐⭐⭐ (excellent) |
| Agents | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ |
| Ecosystem | Très large (500+ integrations) | Moyen (100+ integrations) |
| Best For | Apps complexes, agents, chains | RAG pur, indexation, query |
Quand choisir LangChain:
- Applications complexes avec agents, tools, mémoire
- Besoin de nombreuses intégrations (Notion, Google Docs, etc.)
- Chains personnalisées avec LCEL
- Écosystème mature et large communauté
Quiz Module 5.1: Fondamentaux du RAG
Testez vos connaissances sur les fondamentaux du RAG: principes, embeddings, chunking, vector databases, retrieval, reranking et LangChain.
Question 1: Quelle est la principale différence entre RAG et Fine-Tuning?
Question 2: Quelle métrique de similarité est la plus utilisée pour les embeddings?
Question 3: Quelle est la taille de chunk optimale pour la plupart des use cases RAG?
Question 4: Pourquoi ajouter du chunk overlap?
Question 5: Quelle vector database est recommandée pour prototypage rapide?
Question 6: Qu'est-ce que l'algorithme HNSW?
Question 7: Quelle est la différence entre dense et sparse retrieval?
Question 8: Pourquoi utiliser Hybrid Search (dense + sparse)?
Question 9: Qu'est-ce que MMR (Maximum Marginal Relevance)?
Question 10: Quelle est la différence entre bi-encoder et cross-encoder?
Question 11: Le reranking améliore typiquement la précision de combien?
Question 12: Quel est le pipeline optimal retrieve → rerank?
Question 13: Quelle chain LangChain utiliser pour RAG conversationnel?
Question 14: Qu'est-ce que LCEL (LangChain Expression Language)?
Question 15: Quel est l'avantage principal de metadata enrichment dans les chunks?
LlamaIndex: Framework Spécialisé RAG
LlamaIndex (anciennement GPT Index) est un framework spécialement conçu pour le RAG et l'indexation de données. Cette leçon explore les Nodes, indices, query engines, et response synthesizers.
Objectifs de la Leçon
- Comprendre l'architecture LlamaIndex et ses différences avec LangChain
- Maîtriser les concepts de Nodes, Documents, et Indices
- Construire des query engines avec différentes stratégies
- Utiliser response synthesizers pour générer des réponses optimales
LlamaIndex vs LangChain
| Aspect | LangChain | LlamaIndex |
|---|---|---|
| Focus Principal | Applications LLM générales, agents | RAG, indexation, query sur data |
| Abstraction | Chains, Prompts, Memory | Indices, Query Engines, Synthesizers |
| Complexité | ⭐⭐⭐⭐ (plus de concepts) | ⭐⭐⭐ (plus focalisé) |
| RAG Features | ⭐⭐⭐⭐ (bon) | ⭐⭐⭐⭐⭐ (excellent) |
| Structured Data | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ (SQL, Pandas, etc.) |
| Learning Curve | 3-5 jours | 1-2 jours |
| Best For | Apps complexes, production | RAG pur, data apps, prototypes |
Architecture LlamaIndex
LLAMAINDEX ARCHITECTURE
┌─────────────────────────────────────────────────────────┐
│ DATA SOURCES │
│ PDF │ Web │ SQL │ API │ Notion │ Google Docs... │
└────────────────────┬────────────────────────────────────┘
↓
┌─────────────┐
│ LOADERS │ SimpleDirectoryReader, etc.
└──────┬──────┘
↓
┌─────────────┐
│ DOCUMENTS │ Metadata + Content
└──────┬──────┘
↓
┌─────────────┐
│ SPLITTER │ SentenceSplitter, etc.
└──────┬──────┘
↓
┌─────────────┐
│ NODES │ Atomic units with relationships
└──────┬──────┘
↓
┌─────────────┐
│ INDICES │ VectorStoreIndex, TreeIndex, etc.
└──────┬──────┘
↓
┌─────────────┐
│QUERY ENGINE │ Retrieve + Synthesize
└──────┬──────┘
↓
┌─────────────┐
│ RESPONSE │ Final answer
└─────────────┘
Démarrage Rapide avec LlamaIndex
# Installation
# pip install llama-index llama-index-llms-ollama llama-index-embeddings-huggingface
# Premier exemple LlamaIndex
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader, Settings
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.ollama import Ollama
# 1. Configuration globale
Settings.llm = Ollama(model="llama3.1:8b", request_timeout=120.0)
Settings.embed_model = HuggingFaceEmbedding(
model_name="BAAI/bge-small-en-v1.5"
)
# 2. Charger documents
documents = SimpleDirectoryReader("./data").load_data()
print(f"Loaded {len(documents)} documents")
# 3. Créer index (auto-split + embed + store)
index = VectorStoreIndex.from_documents(documents)
# 4. Query engine
query_engine = index.as_query_engine()
# 5. Query!
response = query_engine.query("What is RAG?")
print(response)
# C'est tout! LlamaIndex gère automatiquement:
# - Splitting des documents
# - Embedding
# - Vector storage
# - Retrieval
# - Response synthesis
Documents et Nodes
- Document: Unité de données brute (fichier, page, etc.)
- Node: Chunk atomique avec metadata et relations
from llama_index.core import Document
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import TextNode, NodeRelationship, RelatedNodeInfo
# 1. Créer documents manuellement
doc1 = Document(
text="Le RAG combine retrieval et génération.",
metadata={"source": "guide.pdf", "page": 1, "author": "Expert"}
)
doc2 = Document(
text="Les embeddings encodent le sens sémantique des textes.",
metadata={"source": "guide.pdf", "page": 2, "author": "Expert"}
)
documents = [doc1, doc2]
# 2. Parser en Nodes
parser = SentenceSplitter(
chunk_size=512,
chunk_overlap=50
)
nodes = parser.get_nodes_from_documents(documents)
print(f"Created {len(nodes)} nodes from {len(documents)} documents")
# 3. Examiner un Node
node = nodes[0]
print(f"\nNode ID: {node.node_id}")
print(f"Text: {node.text}")
print(f"Metadata: {node.metadata}")
print(f"Embedding: {node.embedding}") # None avant indexation
# 4. Créer Node manuellement
custom_node = TextNode(
text="Custom content here",
metadata={"category": "technical", "importance": "high"},
excluded_embed_metadata_keys=["importance"], # Ne pas embedder
excluded_llm_metadata_keys=["category"] # Ne pas envoyer au LLM
)
# Nodes peuvent avoir des relationships (clés NodeRelationship, valeurs RelatedNodeInfo)
custom_node.relationships[NodeRelationship.NEXT] = RelatedNodeInfo(
    node_id=nodes[0].node_id  # Lien vers node suivant
)
custom_node.relationships[NodeRelationship.PARENT] = RelatedNodeInfo(
    node_id=doc1.doc_id  # Lien vers document parent
)
Types d'Indices
1. VectorStoreIndex (Le Plus Utilisé)
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
# Charger docs
documents = SimpleDirectoryReader("./data").load_data()
# VectorStoreIndex: Recherche par similarité embeddings
index = VectorStoreIndex.from_documents(
documents,
show_progress=True
)
# Query avec similarity search
query_engine = index.as_query_engine(
similarity_top_k=5 # Top-5 chunks
)
response = query_engine.query("Explain RAG")
print(response)
# Sauvegarder index
index.storage_context.persist(persist_dir="./storage")
# Charger index existant
from llama_index.core import StorageContext, load_index_from_storage
storage_context = StorageContext.from_defaults(persist_dir="./storage")
loaded_index = load_index_from_storage(storage_context)
2. TreeIndex (Hiérarchique)
from llama_index.core import TreeIndex
# TreeIndex: Construit arbre hiérarchique de summaries
# Utile pour documents longs avec structure
tree_index = TreeIndex.from_documents(documents)
# Query remonte l'arbre du spécifique au général
query_engine = tree_index.as_query_engine()
response = query_engine.query("Summarize the main points")
print(response)
# Avantages:
# - Bon pour summarization
# - Navigue hiérarchie
# Inconvénients:
# - Plus lent que VectorStore
# - Moins précis pour questions spécifiques
3. ListIndex (Séquentiel)
from llama_index.core import SummaryIndex # Anciennement ListIndex
# ListIndex: Itère séquentiellement sur tous les nodes
list_index = SummaryIndex.from_documents(documents)
# Query utilise tous les nodes (pas de retrieval sélectif)
query_engine = list_index.as_query_engine()
response = query_engine.query("Give me a comprehensive summary")
# Use case:
# - Summarization complète
# - Petits datasets (<100 docs)
# Éviter pour:
# - Large datasets (trop lent)
# - Questions spécifiques
4. KeywordTableIndex (Sparse)
from llama_index.core import KeywordTableIndex
# KeywordTableIndex: Extrait keywords, recherche par mots-clés
keyword_index = KeywordTableIndex.from_documents(documents)
query_engine = keyword_index.as_query_engine()
response = query_engine.query("error code E404")
# Similaire à BM25
# Bon pour: Codes, IDs, noms techniques
Query Engines: Stratégies de Requête
from llama_index.core import VectorStoreIndex
index = VectorStoreIndex.from_documents(documents)
# 1. Retriever Mode (défaut): Retrieve then synthesize
query_engine = index.as_query_engine(
similarity_top_k=5,
response_mode="compact" # Voir response modes ci-dessous
)
# 2. Response Modes
modes = {
"refine": "Itérative refinement (meilleure qualité, lent)",
"compact": "Concat chunks jusqu'à limite (rapide, bon)",
"tree_summarize": "Bottom-up tree (bon pour longs docs)",
"simple_summarize": "Truncate to fit context (rapide, perd info)",
"no_text": "Juste retrieval, pas de synthesis"
}
# Exemple: Refine mode
refine_engine = index.as_query_engine(
similarity_top_k=10,
response_mode="refine"
)
# Le LLM va:
# 1. Générer réponse avec chunk 1
# 2. Raffiner avec chunk 2
# 3. Raffiner avec chunk 3...
# = Meilleure qualité mais 10x plus lent
response = refine_engine.query("Detailed explanation of RAG")
print(response)
# 3. Streaming responses
streaming_engine = index.as_query_engine(
streaming=True
)
streaming_response = streaming_engine.query("What is RAG?")
for text in streaming_response.response_gen:
print(text, end="", flush=True)
Custom Query Engines
from llama_index.core import VectorStoreIndex
from llama_index.core.query_engine import RetrieverQueryEngine
from llama_index.core.retrievers import VectorIndexRetriever
from llama_index.core.response_synthesizers import get_response_synthesizer
# Construire query engine custom
index = VectorStoreIndex.from_documents(documents)
# 1. Retriever personnalisé
retriever = VectorIndexRetriever(
index=index,
similarity_top_k=10,
)
# 2. Response synthesizer personnalisé
response_synthesizer = get_response_synthesizer(
response_mode="compact",
use_async=True # Parallélise les appels LLM
)
# 3. Assembler
custom_query_engine = RetrieverQueryEngine(
retriever=retriever,
response_synthesizer=response_synthesizer,
)
# 4. Query
response = custom_query_engine.query("Explain embeddings")
print(response)
# Accéder aux source nodes
for node in response.source_nodes:
print(f"Score: {node.score:.3f}")
print(f"Text: {node.text[:100]}...")
print(f"Metadata: {node.metadata}\n")
Composability: Combiner Indices
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core.tools import QueryEngineTool
from llama_index.core.query_engine import SubQuestionQueryEngine
# 1. Créer plusieurs indices (différentes sources)
docs_rag = SimpleDirectoryReader("./docs/rag").load_data()
docs_llm = SimpleDirectoryReader("./docs/llm").load_data()
index_rag = VectorStoreIndex.from_documents(docs_rag)
index_llm = VectorStoreIndex.from_documents(docs_llm)
# 2. Créer query engines
query_engine_rag = index_rag.as_query_engine()
query_engine_llm = index_llm.as_query_engine()
# 3. Wrapper en Tools
query_engine_tools = [
QueryEngineTool.from_defaults(
query_engine=query_engine_rag,
name="rag_docs",
description="Documentation sur RAG: retrieval, embeddings, vector DB"
),
QueryEngineTool.from_defaults(
query_engine=query_engine_llm,
name="llm_docs",
description="Documentation sur LLMs: prompting, fine-tuning, inference"
),
]
# 4. SubQuestionQueryEngine: Décompose question complexe
sub_question_engine = SubQuestionQueryEngine.from_defaults(
query_engine_tools=query_engine_tools,
use_async=True
)
# 5. Query complexe
response = sub_question_engine.query(
"Compare RAG and fine-tuning for adapting LLMs"
)
# Le système va:
# 1. Décomposer en sub-questions:
# - "What is RAG?" → query rag_docs
# - "What is fine-tuning?" → query llm_docs
# 2. Combiner réponses
print(response)
Metadata Filtering Avancé
from llama_index.core import VectorStoreIndex, Document
from llama_index.core.vector_stores import MetadataFilters, ExactMatchFilter
# Documents avec metadata riches
documents = [
Document(
text="Guide RAG version 2024",
metadata={"year": 2024, "category": "rag", "author": "Expert"}
),
Document(
text="Guide RAG version 2023",
metadata={"year": 2023, "category": "rag", "author": "Novice"}
),
Document(
text="Guide LLM version 2024",
metadata={"year": 2024, "category": "llm", "author": "Expert"}
),
]
index = VectorStoreIndex.from_documents(documents)
# Query avec filters
filters = MetadataFilters(
filters=[
ExactMatchFilter(key="year", value=2024),
ExactMatchFilter(key="category", value="rag"),
]
)
query_engine = index.as_query_engine(
filters=filters,
similarity_top_k=5
)
# Ne cherchera que dans docs RAG 2024
response = query_engine.query("Latest RAG techniques")
print(response)
LlamaIndex avec Différents LLMs
from llama_index.core import VectorStoreIndex, Settings
from llama_index.llms.openai import OpenAI
from llama_index.llms.ollama import Ollama
from llama_index.llms.huggingface import HuggingFaceLLM
# 1. OpenAI
Settings.llm = OpenAI(model="gpt-4o", temperature=0.1)
# 2. Ollama (local)
Settings.llm = Ollama(model="llama3.1:8b", request_timeout=120.0)
# 3. HuggingFace (local)
Settings.llm = HuggingFaceLLM(
model_name="meta-llama/Llama-2-7b-chat-hf",
tokenizer_name="meta-llama/Llama-2-7b-chat-hf",
context_window=4096,
max_new_tokens=256,
)
# Puis utiliser normalement
index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine()
response = query_engine.query("What is RAG?")
Observability: Debugging Queries
from llama_index.core import set_global_handler
# 1. Enable callback handler
set_global_handler("simple") # Ou "wandb", "arize", etc.
# 2. Query avec logs
index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine()
response = query_engine.query("Explain RAG")
# Verra dans console:
# - Query reçue
# - Nodes retrieved
# - Prompts envoyés au LLM
# - Réponse générée
# - Tokens utilisés
print(response)
# 3. Accéder metadata
print(f"\nSource nodes: {len(response.source_nodes)}")
for node in response.source_nodes:
print(f"- {node.metadata['source']}: {node.score:.3f}")
Quand choisir LlamaIndex:
- Focus sur RAG pur sans besoin d'agents complexes
- Prototypage rapide de systèmes de recherche
- Structured data (SQL, Pandas) + unstructured
- Besoin de composability entre différentes sources
RAG Multi-Sources
Les applications RAG modernes doivent ingérer des données de sources multiples: PDFs, sites web, bases SQL, APIs, images, et fichiers structurés. Cette leçon explore comment construire un système RAG unifié sur données hétérogènes.
Objectifs de la Leçon
- Intégrer PDFs, Word, PowerPoint avec parsers avancés
- Scraper et indexer sites web automatiquement
- Connecter bases de données SQL et APIs comme sources RAG
- Traiter images et documents multimodaux
Architecture Multi-Sources
RAG MULTI-SOURCES ARCHITECTURE
┌─────────────────────────────────────────────────────────┐
│ DATA SOURCES │
├──────────┬──────────┬──────────┬──────────┬─────────────┤
│ PDF │ Web │ SQL │ API │ Images │
│ Word │ Notion │ Sheets │ JSON │ Audio │
└────┬─────┴────┬─────┴────┬─────┴────┬─────┴──────┬──────┘
│ │ │ │ │
↓ ↓ ↓ ↓ ↓
┌────────────────────────────────────────────────────────┐
│ SOURCE-SPECIFIC LOADERS │
│ PyPDF │ Selenium │ SQLAlchemy │ Requests │ PIL/CLIP │
└────────────────────┬───────────────────────────────────┘
↓
[Unified Documents]
↓
┌────────────────────────────────────────────────────────┐
│ PREPROCESSING LAYER │
│ Chunking │ Cleaning │ Metadata Enrichment │
└────────────────────┬───────────────────────────────────┘
↓
[Normalized Chunks]
↓
┌────────────────────────────────────────────────────────┐
│ UNIFIED VECTOR STORE │
│ (All sources indexed together) │
└────────────────────┬───────────────────────────────────┘
↓
[Query Engine]
↓
Answers from ALL sources
Source 1: PDFs et Documents Office
# Loaders avancés pour documents
from langchain.document_loaders import (
PyPDFLoader,
UnstructuredPDFLoader,
PDFPlumberLoader,
UnstructuredWordDocumentLoader,
UnstructuredPowerPointLoader,
UnstructuredExcelLoader
)
# 1. PDF Simple (PyPDF)
pdf_loader = PyPDFLoader("document.pdf")
docs_pdf = pdf_loader.load()
# 2. PDF Avancé (Unstructured - meilleur layout)
unstructured_loader = UnstructuredPDFLoader(
"complex_document.pdf",
mode="elements", # Préserve structure (headers, tables, etc.)
strategy="hi_res" # High resolution extraction
)
docs_unstructured = unstructured_loader.load()
# 3. PDF avec tables (PDFPlumber)
plumber_loader = PDFPlumberLoader("tables_document.pdf")
docs_plumber = plumber_loader.load()
# 4. Word documents
word_loader = UnstructuredWordDocumentLoader("report.docx")
docs_word = word_loader.load()
# 5. PowerPoint
ppt_loader = UnstructuredPowerPointLoader("presentation.pptx")
docs_ppt = ppt_loader.load()
# 6. Excel
excel_loader = UnstructuredExcelLoader("data.xlsx")
docs_excel = excel_loader.load()
# Combiner tous les documents
all_docs = docs_pdf + docs_word + docs_ppt + docs_excel
print(f"Loaded {len(all_docs)} documents from multiple sources")
# Enrichir metadata
for doc in all_docs:
doc.metadata["ingestion_date"] = "2024-01-15"
doc.metadata["source_type"] = doc.metadata.get("source", "").split(".")[-1]
# Indexer ensemble
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-base-en-v1.5")
vectorstore = Chroma.from_documents(all_docs, embeddings)
# Query recherche dans TOUTES les sources
results = vectorstore.similarity_search("quarterly revenue", k=5)
for doc in results:
print(f"Source: {doc.metadata['source']} ({doc.metadata['source_type']})")
print(f"Content: {doc.page_content[:150]}...\n")
Source 2: Sites Web et Documentation en Ligne
from langchain.document_loaders import (
WebBaseLoader,
SeleniumURLLoader,
SitemapLoader,
RecursiveUrlLoader
)
# 1. Simple webpage
web_loader = WebBaseLoader("https://example.com/docs")
docs_web = web_loader.load()
# 2. JavaScript-heavy sites (Selenium)
selenium_loader = SeleniumURLLoader(
urls=["https://app.example.com/dashboard"]
)
docs_selenium = selenium_loader.load()
# 3. Sitemap complet (crawler automatique)
sitemap_loader = SitemapLoader(
"https://example.com/sitemap.xml",
filter_urls=["https://example.com/docs/"] # Seulement /docs
)
docs_sitemap = sitemap_loader.load()
# 4. Recursive crawler
from bs4 import BeautifulSoup
def custom_extractor(html):
"""Extract only main content, remove nav/footer"""
soup = BeautifulSoup(html, 'html.parser')
# Remove navigation, footer, ads
for tag in soup(['nav', 'footer', 'aside', 'script', 'style']):
tag.decompose()
return soup.get_text(separator='\n', strip=True)
recursive_loader = RecursiveUrlLoader(
url="https://docs.example.com",
max_depth=3,
extractor=custom_extractor,
prevent_outside=True # Ne pas sortir du domaine
)
docs_recursive = recursive_loader.load()
# 5. Documentation spécifique
from langchain.document_loaders import (
GitbookLoader,
ConfluenceLoader,
NotionDBLoader
)
# Gitbook
gitbook_loader = GitbookLoader("https://docs.company.com")
docs_gitbook = gitbook_loader.load()
# Confluence
confluence_loader = ConfluenceLoader(
url="https://company.atlassian.net/wiki",
username="user@company.com",
api_key="your-api-key"
)
docs_confluence = confluence_loader.load()
# Notion
notion_loader = NotionDBLoader(
integration_token="secret_token",
database_id="database-id"
)
docs_notion = notion_loader.load()
# Combiner tout
all_web_docs = (docs_web + docs_sitemap + docs_recursive +
docs_gitbook + docs_confluence + docs_notion)
print(f"Loaded {len(all_web_docs)} documents from web sources")
Source 3: Bases de Données SQL
from langchain.document_loaders import SQLDatabaseLoader
from langchain.utilities import SQLDatabase
from langchain.agents import create_sql_agent
from langchain.agents.agent_toolkits import SQLDatabaseToolkit
from langchain.llms import Ollama
# 1. Connexion DB
db = SQLDatabase.from_uri("postgresql://user:pass@localhost:5432/company_db")
# 2. Loader SQL basique
sql_loader = SQLDatabaseLoader(
"SELECT id, title, content, created_at FROM articles",
db
)
docs_sql = sql_loader.load()
# 3. SQL Agent pour queries dynamiques
llm = Ollama(model="llama3.1:8b")
toolkit = SQLDatabaseToolkit(db=db, llm=llm)
sql_agent = create_sql_agent(
llm=llm,
toolkit=toolkit,
verbose=True,
agent_type="openai-tools"  # nécessite un LLM avec tool calling; avec Ollama, omettre pour l'agent ReAct par défaut
)
# Query naturelle convertie en SQL
response = sql_agent.run(
"What were the top 5 selling products last quarter?"
)
print(response)
# 4. Hybrid: SQL + Vector search
# Charger tables dans vector store pour recherche sémantique
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
# Enrichir metadata SQL
for doc in docs_sql:
doc.metadata["source_type"] = "sql"
doc.metadata["table"] = "articles"
vectorstore = Chroma.from_documents(docs_sql, embeddings)
# Maintenant on peut faire recherche sémantique sur données SQL!
results = vectorstore.similarity_search(
"articles about AI safety",
k=3
)
Source 4: APIs REST et JSON
import requests
from langchain.schema import Document
from langchain.document_loaders import JSONLoader
# 1. API REST simple
def load_from_api(api_url, headers=None):
"""Load documents from REST API"""
response = requests.get(api_url, headers=headers)
data = response.json()
documents = []
for item in data['results']:
doc = Document(
page_content=item['content'],
metadata={
"source": "api",
"api_url": api_url,
"item_id": item['id'],
"timestamp": item.get('created_at'),
}
)
documents.append(doc)
return documents
# Usage
api_docs = load_from_api(
"https://api.company.com/v1/knowledge-base",
headers={"Authorization": "Bearer YOUR_TOKEN"}
)
# 2. JSON files
json_loader = JSONLoader(
file_path="data.json",
jq_schema=".messages[]", # jq pour extraire nested data
content_key="text"
)
docs_json = json_loader.load()
# 3. Pagination API
def load_paginated_api(base_url, max_pages=10):
"""Load all pages from paginated API"""
all_docs = []
page = 1
while page <= max_pages:
response = requests.get(f"{base_url}?page={page}")
data = response.json()
if not data['results']:
break
for item in data['results']:
doc = Document(
page_content=item['content'],
metadata={
"source": "api",
"page": page,
"id": item['id']
}
)
all_docs.append(doc)
page += 1
return all_docs
# 4. Webhook-based updates (temps réel)
from flask import Flask, request
from datetime import datetime
app = Flask(__name__)
@app.route('/webhook/new-document', methods=['POST'])
def webhook_new_document():
"""Receive new documents via webhook"""
data = request.json
# Créer document
doc = Document(
page_content=data['content'],
metadata={
"source": "webhook",
"received_at": datetime.now().isoformat(),
"doc_id": data['id']
}
)
# Ajouter au vector store (voir leçon 18 temps réel)
vectorstore.add_documents([doc])
return {"status": "indexed"}, 200
# app.run(port=5000)
Source 5: Images et Multimodal
# RAG Multimodal: Images + Texte
from PIL import Image
import base64
from io import BytesIO
# 1. Image captions avec CLIP/BLIP
from transformers import BlipProcessor, BlipForConditionalGeneration
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-base")
model = BlipForConditionalGeneration.from_pretrained(
"Salesforce/blip-image-captioning-base"
)
def generate_image_caption(image_path):
"""Generate caption for image"""
image = Image.open(image_path).convert('RGB')
inputs = processor(image, return_tensors="pt")
outputs = model.generate(**inputs)
caption = processor.decode(outputs[0], skip_special_tokens=True)
return caption
# 2. Indexer images avec captions
from pathlib import Path
from langchain.schema import Document
image_dir = Path("./images")
image_docs = []
for img_path in image_dir.glob("*.jpg"):
caption = generate_image_caption(str(img_path))
# Encoder image en base64 pour storage
with open(img_path, "rb") as f:
img_b64 = base64.b64encode(f.read()).decode()
doc = Document(
page_content=f"Image: {caption}",
metadata={
"source": str(img_path),
"type": "image",
"caption": caption,
"image_b64": img_b64[:100] # Truncate for metadata
}
)
image_docs.append(doc)
# 3. Multimodal retrieval
vectorstore = Chroma.from_documents(image_docs, embeddings)
# Query trouve images pertinentes par caption!
results = vectorstore.similarity_search("cat sleeping on couch", k=3)
for doc in results:
print(f"Image: {doc.metadata['source']}")
print(f"Caption: {doc.metadata['caption']}\n")
# 4. OCR pour images avec texte
import pytesseract
def extract_text_from_image(image_path):
"""OCR to extract text from images"""
image = Image.open(image_path)
text = pytesseract.image_to_string(image)
return text
# Combiner OCR + Caption
ocr_text = extract_text_from_image("document_scan.jpg")
caption = generate_image_caption("document_scan.jpg")
doc = Document(
page_content=f"OCR: {ocr_text}\nCaption: {caption}",
metadata={"source": "document_scan.jpg", "type": "image_with_text"}
)
Pipeline Multi-Sources Unifié
# Architecture complète multi-sources
from langchain.document_loaders import (
PyPDFLoader, WebBaseLoader, SQLDatabaseLoader
)
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from datetime import datetime
class MultiSourceRAG:
"""Unified RAG system for multiple data sources"""
def __init__(self, persist_directory="./multi_source_db"):
self.embeddings = HuggingFaceEmbeddings(
model_name="BAAI/bge-large-en-v1.5"
)
self.persist_directory = persist_directory
self.vectorstore = None
self.splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=50
)
def ingest_pdfs(self, pdf_dir):
"""Ingest all PDFs from directory"""
from pathlib import Path
all_docs = []
for pdf_path in Path(pdf_dir).glob("*.pdf"):
loader = PyPDFLoader(str(pdf_path))
docs = loader.load()
# Enrich metadata
for doc in docs:
doc.metadata["source_type"] = "pdf"
doc.metadata["ingestion_date"] = datetime.now().isoformat()
all_docs.extend(docs)
return self._process_documents(all_docs)
def ingest_web(self, urls):
"""Ingest web pages"""
all_docs = []
for url in urls:
loader = WebBaseLoader(url)
docs = loader.load()
for doc in docs:
doc.metadata["source_type"] = "web"
doc.metadata["url"] = url
doc.metadata["ingestion_date"] = datetime.now().isoformat()
all_docs.extend(docs)
return self._process_documents(all_docs)
def ingest_sql(self, db_uri, query):
"""Ingest from SQL database"""
from langchain.utilities import SQLDatabase
db = SQLDatabase.from_uri(db_uri)
loader = SQLDatabaseLoader(query, db)
docs = loader.load()
for doc in docs:
doc.metadata["source_type"] = "sql"
doc.metadata["ingestion_date"] = datetime.now().isoformat()
return self._process_documents(docs)
def ingest_api(self, api_url, headers=None):
"""Ingest from REST API"""
import requests
response = requests.get(api_url, headers=headers)
data = response.json()
docs = []
for item in data.get('results', []):
doc = Document(
page_content=item.get('content', ''),
metadata={
"source_type": "api",
"api_url": api_url,
"item_id": item.get('id'),
"ingestion_date": datetime.now().isoformat()
}
)
docs.append(doc)
return self._process_documents(docs)
def _process_documents(self, documents):
"""Process and index documents"""
# Split
chunks = self.splitter.split_documents(documents)
# Index
if self.vectorstore is None:
self.vectorstore = Chroma.from_documents(
chunks,
self.embeddings,
persist_directory=self.persist_directory
)
else:
self.vectorstore.add_documents(chunks)
return len(chunks)
def query(self, question, source_type=None, k=5):
"""Query across all sources or specific type"""
if source_type:
# Filter by source type
results = self.vectorstore.similarity_search(
question,
k=k,
filter={"source_type": source_type}
)
else:
# Search all sources
results = self.vectorstore.similarity_search(question, k=k)
return results
# Usage
rag = MultiSourceRAG()
# Ingest from multiple sources
rag.ingest_pdfs("./documents/pdfs")
rag.ingest_web(["https://docs.company.com", "https://blog.company.com"])
rag.ingest_sql(
"postgresql://user:pass@localhost/db",
"SELECT * FROM knowledge_base"
)
rag.ingest_api("https://api.company.com/v1/articles")
# Query across ALL sources
results = rag.query("What is our refund policy?")
for doc in results:
print(f"Source Type: {doc.metadata['source_type']}")
print(f"Content: {doc.page_content[:200]}...\n")
# Query specific source
pdf_results = rag.query("refund policy", source_type="pdf")
Défis Multi-Sources et Solutions
| Défi | Impact | Solution |
|---|---|---|
| Formats hétérogènes | Qualité variable d'extraction | Source-specific parsers, normalisation |
| Metadata inconsistante | Difficile de filter/trier | Schema metadata unifié, enrichment |
| Freshness variable | Données périmées mixées | Timestamp tracking, TTL, refresh jobs |
| Contrôle d'accès | Risque de fuite de données | Per-document ACL, user-based filtering |
| Scale différent | SQL=milliers, Web=millions | Hybrid storage, tiered indexing |
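À titre d'illustration du contrôle d'accès, voici une esquisse (noms de champs hypothétiques) où chaque chunk porte ses rôles autorisés en metadata et où le filtrage se fait au moment de la requête:
# Esquisse: filtrage ACL post-retrieval basé sur les metadata
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.schema import Document

embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
docs = [
    Document(page_content="Grilles de salaires 2024",
             metadata={"allowed_roles": "hr,admin", "source_type": "pdf"}),
    Document(page_content="Guide d'onboarding des nouveaux employés",
             metadata={"allowed_roles": "all", "source_type": "web"}),
]
vectorstore = Chroma.from_documents(docs, embeddings)

def query_with_acl(question, user_roles, k=5):
    """Retrieve large, puis ne garde que les documents visibles par l'utilisateur"""
    candidates = vectorstore.similarity_search(question, k=k * 3)  # sur-échantillonner
    visible = [
        doc for doc in candidates
        if doc.metadata.get("allowed_roles") == "all"
        or any(role in doc.metadata.get("allowed_roles", "").split(",") for role in user_roles)
    ]
    return visible[:k]

# Un employé standard ne remonte pas les documents réservés aux RH
for doc in query_with_acl("politique de rémunération", user_roles=["employee"]):
    print(doc.page_content)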
RAG Conversationnel
Le RAG conversationnel maintient un historique de dialogue et reformule les questions en utilisant le contexte. Cette leçon explore la gestion de mémoire, la reformulation de queries, et les patterns conversationnels.
Objectifs de la Leçon
- Implémenter gestion d'historique conversationnel avec différentes stratégies de mémoire
- Maîtriser la reformulation de queries pour résoudre les coréférences
- Construire un chatbot RAG avec follow-up questions
- Optimiser window size et summarization pour mémoire long-terme
Le Problème: Contexte Conversationnel
RAG SANS vs AVEC CONTEXTE
❌ RAG Basique (sans mémoire):
User: "Parle-moi de notre politique de remboursement"
RAG: "Remboursement sous 30 jours..." ✅
User: "Quelles sont les exceptions?"
RAG: "Exceptions de quoi?" ❌ (a oublié le contexte)
✅ RAG Conversationnel (avec mémoire):
User: "Parle-moi de notre politique de remboursement"
RAG: "Remboursement sous 30 jours..." ✅
User: "Quelles sont les exceptions?"
↓ Reformulation automatique
"Quelles sont les exceptions à la politique de remboursement?"
RAG: "Exceptions: produits personnalisés, soldes..." ✅
ConversationalRetrievalChain (LangChain)
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.llms import Ollama
# 1. Setup vectorstore
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
vectorstore = Chroma.from_texts(
[
"Notre politique de remboursement permet retours sous 30 jours",
"Exceptions: produits personnalisés, articles soldés, articles hygiéniques",
"Le remboursement est crédité sous 5-7 jours ouvrables",
"Frais de retour à la charge du client sauf défaut produit"
],
embeddings
)
# 2. Memory pour historique
memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True,
output_key="answer" # Important!
)
# 3. LLM
llm = Ollama(model="llama3.1:8b", temperature=0.1)
# 4. Conversational chain
qa_chain = ConversationalRetrievalChain.from_llm(
llm=llm,
retriever=vectorstore.as_retriever(search_kwargs={"k": 3}),
memory=memory,
return_source_documents=True,
verbose=True
)
# 5. Conversation multi-turn
print("=== Turn 1 ===")
result1 = qa_chain({"question": "Quelle est votre politique de remboursement?"})
print(f"Answer: {result1['answer']}\n")
print("=== Turn 2 (avec coréférence) ===")
result2 = qa_chain({"question": "Quelles sont les exceptions?"})
# Chain reformule automatiquement en:
# "Quelles sont les exceptions à la politique de remboursement?"
print(f"Answer: {result2['answer']}\n")
print("=== Turn 3 ===")
result3 = qa_chain({"question": "Combien de temps pour recevoir le remboursement?"})
print(f"Answer: {result3['answer']}\n")
# La mémoire maintient tout le contexte!
print("=== Chat History ===")
print(memory.chat_memory.messages)
Types de Mémoire
1. ConversationBufferMemory (Simple)
from langchain.memory import ConversationBufferMemory
# Stocke TOUT l'historique
memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
memory.save_context(
{"input": "Bonjour"},
{"output": "Bonjour! Comment puis-je vous aider?"}
)
memory.save_context(
{"input": "Parle-moi de RAG"},
{"output": "RAG combine retrieval et génération..."}
)
# Récupérer historique
print(memory.load_memory_variables({}))
# Avantages: Simple, contexte complet
# Inconvénients: Croît indéfiniment, coûteux après 10+ turns
2. ConversationBufferWindowMemory (Fenêtre Glissante)
from langchain.memory import ConversationBufferWindowMemory
# Garde seulement les K derniers messages
memory = ConversationBufferWindowMemory(
k=5, # Garde 5 dernières paires (user + assistant)
memory_key="chat_history",
return_messages=True
)
# Après 10 turns, seuls les 5 derniers restent
for i in range(10):
memory.save_context(
{"input": f"Question {i}"},
{"output": f"Réponse {i}"}
)
# Ne contient que turns 5-9
messages = memory.load_memory_variables({})
print(f"Messages in memory: {len(messages['chat_history'])}")
# Avantages: Mémoire bornée, bon pour conversations longues
# Inconvénients: Perd contexte ancien (peut oublier sujet initial)
3. ConversationSummaryMemory (Summarization)
from langchain.memory import ConversationSummaryMemory
from langchain.llms import Ollama
llm = Ollama(model="llama3.1:8b")
# Summarize conversation au fur et à mesure
memory = ConversationSummaryMemory(
llm=llm,
memory_key="chat_history"
)
memory.save_context(
{"input": "Parle-moi de notre politique de remboursement"},
{"output": "Nous offrons remboursements sous 30 jours..."}
)
memory.save_context(
{"input": "Quelles sont les exceptions?"},
{"output": "Exceptions incluent produits personnalisés..."}
)
# Génère summary progressif
summary = memory.load_memory_variables({})
print(summary['chat_history'])
# → "User asked about refund policy. We explained 30-day returns
# with exceptions for custom products..."
# Avantages: Mémoire compacte, résume long contexte
# Inconvénients: Coût LLM pour summarization, peut perdre détails
4. ConversationSummaryBufferMemory (Hybride)
from langchain.memory import ConversationSummaryBufferMemory
# Combine buffer récent + summary ancien
memory = ConversationSummaryBufferMemory(
llm=llm,
max_token_limit=500, # Limite tokens
memory_key="chat_history",
return_messages=True
)
# Messages récents: garde verbatim
# Messages anciens: summarize
# → Best of both worlds!
# Avantages: Contexte récent précis, ancien résumé
# Inconvénients: Plus complexe, coût summarization
Query Reformulation
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.llms import Ollama
# Custom query reformulation
reformulation_template = """Given the following conversation and a follow-up question,
rephrase the follow-up question to be a standalone question.
Chat History:
{chat_history}
Follow-up Question: {question}
Standalone Question:"""
reformulation_prompt = PromptTemplate(
input_variables=["chat_history", "question"],
template=reformulation_template
)
llm = Ollama(model="llama3.1:8b")
reformulation_chain = LLMChain(
llm=llm,
prompt=reformulation_prompt
)
# Exemple
chat_history = """
Human: Tell me about your refund policy
Assistant: We offer full refunds within 30 days of purchase.
"""
follow_up = "What are the exceptions?"
# Reformuler
standalone_question = reformulation_chain.run(
chat_history=chat_history,
question=follow_up
)
print(f"Original: {follow_up}")
print(f"Reformulated: {standalone_question}")
# → "What are the exceptions to the refund policy?"
# Maintenant on peut query avec question standalone
results = vectorstore.similarity_search(standalone_question, k=3)
Architecture RAG Conversationnel Complète
# Pipeline conversationnel production-ready
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationSummaryBufferMemory
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.llms import Ollama
from langchain.prompts import PromptTemplate
class ConversationalRAG:
"""Production RAG conversationnel avec reformulation"""
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.llm = llm
# Memory hybride
self.memory = ConversationSummaryBufferMemory(
llm=llm,
max_token_limit=1000,
memory_key="chat_history",
return_messages=True,
output_key="answer"
)
# Custom condense prompt pour reformulation
condense_template = """Given the conversation history and new question,
rephrase the question to be standalone. Keep technical terms.
Chat History:
{chat_history}
New Question: {question}
Standalone Question:"""
condense_prompt = PromptTemplate(
input_variables=["chat_history", "question"],
template=condense_template
)
# QA prompt
qa_template = """Use the following context to answer the question.
If you don't know, say so. Cite sources.
Context: {context}
Question: {question}
Answer with sources:"""
qa_prompt = PromptTemplate(
input_variables=["context", "question"],
template=qa_template
)
# Chain
self.chain = ConversationalRetrievalChain.from_llm(
llm=llm,
retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
memory=self.memory,
condense_question_prompt=condense_prompt,
combine_docs_chain_kwargs={"prompt": qa_prompt},
return_source_documents=True,
verbose=True
)
def chat(self, question):
"""Chat avec contexte conversationnel"""
result = self.chain({"question": question})
return {
"answer": result["answer"],
"sources": result["source_documents"],
"chat_history": self.memory.load_memory_variables({})
}
def reset(self):
"""Reset conversation"""
self.memory.clear()
# Usage
embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-base-en-v1.5")
vectorstore = Chroma.from_texts([...], embeddings)
llm = Ollama(model="llama3.1:8b")
rag = ConversationalRAG(vectorstore, llm)
# Conversation
print("Turn 1:")
r1 = rag.chat("What is your refund policy?")
print(r1["answer"])
print("\nTurn 2:")
r2 = rag.chat("What are the exceptions?") # Auto-reformulé
print(r2["answer"])
print("\nTurn 3:")
r3 = rag.chat("How long for the refund?") # Utilise contexte
print(r3["answer"])
Best Practices RAG Conversationnel
| Aspect | Recommandation | Raison |
|---|---|---|
| Memory Type | ConversationSummaryBuffer | Balance contexte récent + summary ancien |
| Window Size | 5-10 derniers turns | Évite context window overflow |
| Reformulation | Toujours activer | Résout 80% des problèmes de coréférence |
| Source Citation | Inclure dans réponse | Traçabilité et confiance |
| Reset Button | Offrir aux users | Changement de sujet propre |
Self-RAG & Adaptive RAG
Les systèmes RAG avancés peuvent s'auto-évaluer et adapter leur stratégie. Cette leçon explore Self-RAG (auto-correction), Adaptive RAG (routing intelligent), et iterative retrieval.
Objectifs de la Leçon
- Comprendre Self-RAG: génération avec auto-critique et refinement
- Implémenter Adaptive RAG avec routing conditionnel
- Maîtriser iterative retrieval pour questions complexes
- Construire un système RAG qui s'améliore automatiquement
Self-RAG: Auto-Évaluation et Correction
SELF-RAG PIPELINE
Query: "What is the capital of France?"
↓
┌──────────────────────────┐
│ 1. RETRIEVE │ → Top-5 documents
└───────────┬──────────────┘
↓
┌──────────────────────────┐
│ 2. GENERATE │ → "The capital of France is Paris"
└───────────┬──────────────┘
↓
┌──────────────────────────┐
│ 3. SELF-CRITIQUE │
│ Q: Is answer supported? │ → YES (found in docs)
│ Q: Is answer relevant? │ → YES (answers question)
│ Q: Is answer complete? │ → NO (could add details)
└───────────┬──────────────┘
↓
┌──────────────────────────┐
│ 4. REFINE (if needed) │ → "The capital of France is Paris,
│ │ located in north-central France"
└──────────────────────────┘
vs RAG classique: génère et retourne directement (pas de critique)
# Self-RAG Implementation
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
from langchain.llms import Ollama
class SelfRAG:
"""RAG with self-critique and refinement"""
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.llm = llm
# Generate prompt
self.generate_prompt = PromptTemplate(
input_variables=["context", "question"],
template="""Context: {context}
Question: {question}
Answer based on context:"""
)
# Critique prompts
self.faithfulness_prompt = PromptTemplate(
input_variables=["context", "answer"],
template="""Context: {context}
Answer: {answer}
Is the answer fully supported by the context? Answer YES or NO:"""
)
self.relevance_prompt = PromptTemplate(
input_variables=["question", "answer"],
template="""Question: {question}
Answer: {answer}
Does the answer address the question? Answer YES or NO:"""
)
# Refine prompt
self.refine_prompt = PromptTemplate(
input_variables=["context", "question", "answer", "critique"],
template="""Context: {context}
Question: {question}
Previous Answer: {answer}
Issues: {critique}
Provide an improved answer:"""
)
def query(self, question, max_iterations=2):
"""Self-RAG query with critique loop"""
# 1. Retrieve
docs = self.vectorstore.similarity_search(question, k=5)
context = "\n\n".join([doc.page_content for doc in docs])
# 2. Generate initial answer
answer = self.llm(self.generate_prompt.format(
context=context,
question=question
))
# 3. Self-critique loop
for iteration in range(max_iterations):
print(f"\n=== Iteration {iteration + 1} ===")
print(f"Answer: {answer}")
# Check faithfulness
faithfulness = self.llm(self.faithfulness_prompt.format(
context=context,
answer=answer
)).strip()
# Check relevance
relevance = self.llm(self.relevance_prompt.format(
question=question,
answer=answer
)).strip()
print(f"Faithfulness: {faithfulness}")
print(f"Relevance: {relevance}")
# If both pass, we're done
if "YES" in faithfulness and "YES" in relevance:
print("✅ Answer validated!")
break
# Otherwise, refine
critique = []
if "NO" in faithfulness:
critique.append("not fully supported by context")
if "NO" in relevance:
critique.append("doesn't fully address question")
critique_text = ", ".join(critique)
answer = self.llm(self.refine_prompt.format(
context=context,
question=question,
answer=answer,
critique=critique_text
))
return {
"answer": answer,
"sources": docs,
"iterations": iteration + 1
}
# Usage
llm = Ollama(model="llama3.1:8b", temperature=0.1)
self_rag = SelfRAG(vectorstore, llm)
result = self_rag.query("What is the refund policy?")
print(f"\n=== Final Answer ===\n{result['answer']}")
print(f"\nIterations: {result['iterations']}")
Adaptive RAG: Routing Intelligent
ADAPTIVE RAG ROUTING
Query: "What is 2+2?"
↓
┌────────────────────┐
│ QUERY ANALYZER │ → Type: Math (factual)
│ (LLM classifier) │ → Complexity: Simple
└─────────┬──────────┘ → Needs retrieval: NO
↓
┌─────┴─────┐
│ ROUTER │
└─────┬─────┘
↓
DIRECT ANSWER (no RAG needed)
"4"
Query: "Explain quantum computing in the context of our product"
↓
QUERY ANALYZER → Type: Complex, Domain-specific
→ Needs: Multi-source RAG
↓
ROUTER → Multi-hop retrieval + synthesis
Query: "Latest news about AI regulation"
↓
QUERY ANALYZER → Type: Current events
→ Needs: Fresh data
↓
ROUTER → Web search (not internal docs)
# Adaptive RAG with intelligent routing
from langchain.llms import Ollama
from langchain.prompts import PromptTemplate
class AdaptiveRAG:
"""RAG that adapts strategy based on query type"""
def __init__(self, vectorstore, llm, web_search_tool=None):
self.vectorstore = vectorstore
self.llm = llm
self.web_search = web_search_tool
# Query analysis prompt
self.analysis_prompt = PromptTemplate(
input_variables=["question"],
template="""Analyze this question and classify:
Question: {question}
Classify as:
- SIMPLE_FACTUAL: Simple fact that LLM likely knows
- DOMAIN_SPECIFIC: Requires company/domain documents
- CURRENT_EVENTS: Needs recent/real-time information
- COMPLEX_REASONING: Multi-step reasoning needed
Classification:"""
)
def analyze_query(self, question):
"""Analyze query to determine strategy"""
analysis = self.llm(self.analysis_prompt.format(
question=question
)).strip()
return analysis
def query(self, question):
"""Adaptive query with routing"""
# 1. Analyze query
query_type = self.analyze_query(question)
print(f"Query Type: {query_type}")
# 2. Route based on type
if "SIMPLE_FACTUAL" in query_type:
# Direct LLM answer (no retrieval)
print("→ Strategy: Direct LLM")
answer = self.llm(f"Question: {question}\nAnswer:")
return {"answer": answer, "strategy": "direct", "sources": []}
elif "CURRENT_EVENTS" in query_type:
# Web search
print("→ Strategy: Web Search")
if self.web_search:
results = self.web_search.run(question)
return {"answer": results, "strategy": "web_search", "sources": []}
else:
return {"answer": "Web search not available", "strategy": "none"}
elif "COMPLEX_REASONING" in query_type:
# Multi-hop retrieval
print("→ Strategy: Multi-hop RAG")
return self._multi_hop_rag(question)
else: # DOMAIN_SPECIFIC
# Standard RAG
print("→ Strategy: Standard RAG")
return self._standard_rag(question)
def _standard_rag(self, question):
"""Standard RAG retrieval"""
docs = self.vectorstore.similarity_search(question, k=5)
context = "\n\n".join([d.page_content for d in docs])
prompt = f"""Context: {context}
Question: {question}
Answer based on context:"""
answer = self.llm(prompt)
return {"answer": answer, "strategy": "rag", "sources": docs}
def _multi_hop_rag(self, question):
"""Multi-hop retrieval for complex questions"""
# Decompose question
decompose_prompt = f"""Break down this complex question into simpler sub-questions:
Question: {question}
Sub-questions (one per line):"""
sub_questions = self.llm(decompose_prompt).strip().split("\n")
print(f"Sub-questions: {sub_questions}")
# Answer each sub-question
sub_answers = []
all_sources = []
for sq in sub_questions[:3]: # Limit to 3
if not sq.strip():
continue
docs = self.vectorstore.similarity_search(sq, k=3)
context = "\n\n".join([d.page_content for d in docs])
answer = self.llm(f"Context: {context}\nQuestion: {sq}\nAnswer:")
sub_answers.append(f"Q: {sq}\nA: {answer}")
all_sources.extend(docs)
# Synthesize final answer
synthesis_prompt = f"""Original Question: {question}
Sub-question answers:
{chr(10).join(sub_answers)}
Synthesize a complete answer to the original question:"""
final_answer = self.llm(synthesis_prompt)
return {
"answer": final_answer,
"strategy": "multi_hop",
"sources": all_sources,
"sub_questions": sub_questions
}
# Usage
adaptive_rag = AdaptiveRAG(vectorstore, llm)
# Different query types
r1 = adaptive_rag.query("What is 2+2?") # Direct
r2 = adaptive_rag.query("What is our refund policy?") # RAG
r3 = adaptive_rag.query("How does our pricing compare to competitors and what are the strategic implications?") # Multi-hop
Iterative Retrieval
# Iterative Retrieval: Retrieve → Assess → Retrieve More if Needed
class IterativeRAG:
"""RAG with iterative retrieval until sufficient context"""
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.llm = llm
def query(self, question, max_iterations=3):
"""Iterative retrieval with sufficiency check"""
all_docs = []
k = 3 # Start with 3 docs
for iteration in range(max_iterations):
print(f"\n=== Iteration {iteration + 1} ===")
            # Retrieve (en excluant les documents déjà récupérés, si le filtre est applicable)
            seen_ids = [d.metadata.get("id") for d in all_docs if d.metadata.get("id")]
            exclusion_filter = {"id": {"$nin": seen_ids}} if seen_ids else None
            new_docs = self.vectorstore.similarity_search(
                question,
                k=k,
                filter=exclusion_filter
            )
all_docs.extend(new_docs)
print(f"Retrieved {len(new_docs)} new docs (total: {len(all_docs)})")
# Build context
context = "\n\n".join([d.page_content for d in all_docs])
# Check sufficiency
sufficiency_prompt = f"""Context: {context}
Question: {question}
Is there enough information in the context to answer the question completely?
Answer YES or NO:"""
is_sufficient = self.llm(sufficiency_prompt).strip()
print(f"Sufficient: {is_sufficient}")
if "YES" in is_sufficient:
print("✅ Sufficient context found!")
break
# Need more docs
k = 5 # Retrieve more next iteration
# Generate final answer
answer_prompt = f"""Context: {context}
Question: {question}
Comprehensive answer:"""
answer = self.llm(answer_prompt)
return {
"answer": answer,
"sources": all_docs,
"iterations": iteration + 1
}
# Usage
iterative_rag = IterativeRAG(vectorstore, llm)
result = iterative_rag.query("Explain our complete refund and exchange process")
Comparaison Approches RAG
| Approche | Avantages | Coût | Latence | Qualité |
|---|---|---|---|---|
| Naive RAG | Simple, rapide | 💰 | 200ms | ⭐⭐⭐ |
| Self-RAG | Auto-correction, +20% qualité | 💰💰💰 | 800ms | ⭐⭐⭐⭐ |
| Adaptive RAG | Routing optimal, efficace | 💰💰 | 300ms | ⭐⭐⭐⭐ |
| Iterative RAG | Questions complexes | 💰💰💰 | 1000ms | ⭐⭐⭐⭐⭐ |
Graph RAG: Knowledge Graphs + RAG
Graph RAG combine retrieval vectoriel avec knowledge graphs pour capturer relations et raisonnement structuré. Cette leçon explore Neo4j + RAG, extraction de relations, et Cypher queries.
Objectifs de la Leçon
- Comprendre pourquoi combiner graphs et RAG améliore raisonnement relationnel
- Construire un knowledge graph depuis documents avec LLM
- Implémenter Graph RAG avec Neo4j et Cypher
- Maîtriser hybrid retrieval: vector + graph traversal
Pourquoi Graph RAG?
VECTOR RAG vs GRAPH RAG
Vector RAG:
Query: "Who is the CEO of Tesla?"
↓ Similarity search
Documents: ["Elon Musk leads...", "Tesla CEO...", "SpaceX and Tesla..."]
↓ Generate
Answer: "Elon Musk" ✅ (trouve l'info)
Query: "What companies does the CEO of Tesla also lead?"
↓ Similarity search
Documents: ["Tesla CEO...", "SpaceX founded...", "Elon biography..."]
↓ Generate
Answer: ❓ (doit inférer relations entre chunks séparés)
Graph RAG:
Query: "What companies does the CEO of Tesla also lead?"
↓ Graph query
Cypher: MATCH (p:Person)-[:CEO_OF]->(c1:Company {name: "Tesla"})
MATCH (p)-[:CEO_OF|FOUNDER_OF]->(c2:Company)
RETURN c2.name
↓
Answer: ["Tesla", "SpaceX", "The Boring Company", "Neuralink"] ✅
→ Graph capture relations explicites, meilleur pour raisonnement multi-hop
Construction Knowledge Graph depuis Documents
# Extract entities and relations with LLM
from langchain.llms import Ollama
from langchain.prompts import PromptTemplate
from neo4j import GraphDatabase
class KnowledgeGraphBuilder:
"""Build knowledge graph from documents"""
def __init__(self, neo4j_uri, neo4j_user, neo4j_password):
self.driver = GraphDatabase.driver(
neo4j_uri,
auth=(neo4j_user, neo4j_password)
)
self.llm = Ollama(model="llama3.1:8b")
# Extraction prompt
self.extraction_prompt = PromptTemplate(
input_variables=["text"],
template="""Extract entities and relationships from this text.
Text: {text}
Format as:
ENTITIES:
- [Type] Name
RELATIONSHIPS:
- Entity1 -> RELATION -> Entity2
Output:"""
)
def extract_graph_from_text(self, text):
"""Extract entities and relations with LLM"""
# LLM extraction
result = self.llm(self.extraction_prompt.format(text=text))
# Parse result
entities = []
relationships = []
current_section = None
for line in result.split("\n"):
line = line.strip()
if "ENTITIES:" in line:
current_section = "entities"
continue
elif "RELATIONSHIPS:" in line:
current_section = "relationships"
continue
            if not line:
                continue
            # Les lignes de données commencent par "- " : retirer la puce avant le parsing
            if line.startswith("-"):
                line = line.lstrip("- ").strip()
if current_section == "entities":
# Parse: [Type] Name
if "[" in line and "]" in line:
entity_type = line[line.find("[")+1:line.find("]")]
name = line[line.find("]")+1:].strip()
entities.append({"type": entity_type, "name": name})
elif current_section == "relationships":
# Parse: Entity1 -> RELATION -> Entity2
if "->" in line:
parts = line.split("->")
if len(parts) == 3:
source = parts[0].strip()
relation = parts[1].strip()
target = parts[2].strip()
relationships.append({
"source": source,
"relation": relation,
"target": target
})
return entities, relationships
def add_to_graph(self, entities, relationships):
"""Add entities and relationships to Neo4j"""
with self.driver.session() as session:
# Create entities
for entity in entities:
session.run(
f"""
MERGE (n:{entity['type']} {{name: $name}})
""",
name=entity['name']
)
# Create relationships
for rel in relationships:
session.run(
f"""
MATCH (a {{name: $source}})
MATCH (b {{name: $target}})
MERGE (a)-[:{rel['relation']}]->(b)
""",
source=rel['source'],
target=rel['target']
)
def build_graph_from_documents(self, documents):
"""Process multiple documents"""
for doc in documents:
entities, relationships = self.extract_graph_from_text(
doc.page_content
)
print(f"Extracted {len(entities)} entities, {len(relationships)} relations")
self.add_to_graph(entities, relationships)
# Usage
kg_builder = KnowledgeGraphBuilder(
"bolt://localhost:7687",
"neo4j",
"password"
)
# Example document
doc_text = """
Elon Musk is the CEO of Tesla and SpaceX.
Tesla is headquartered in Austin, Texas.
SpaceX was founded in 2002 and focuses on space exploration.
"""
entities, relations = kg_builder.extract_graph_from_text(doc_text)
print("Entities:", entities)
# → [{"type": "Person", "name": "Elon Musk"},
# {"type": "Company", "name": "Tesla"}, ...]
print("Relations:", relations)
# → [{"source": "Elon Musk", "relation": "CEO_OF", "target": "Tesla"}, ...]
kg_builder.add_to_graph(entities, relations)
Graph RAG: Hybrid Retrieval
# Graph RAG: Vector + Graph retrieval
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from neo4j import GraphDatabase
from langchain.llms import Ollama
class GraphRAG:
"""Hybrid RAG with vector and graph retrieval"""
def __init__(self, vectorstore, neo4j_driver, llm):
self.vectorstore = vectorstore
self.neo4j = neo4j_driver
self.llm = llm
def detect_query_type(self, question):
"""Detect if query needs graph traversal"""
graph_keywords = [
"relationship", "connected", "related to",
"works for", "CEO of", "founded by",
"between", "links", "network"
]
question_lower = question.lower()
for keyword in graph_keywords:
if keyword in question_lower:
return "graph"
return "vector"
def vector_retrieval(self, question, k=5):
"""Standard vector retrieval"""
docs = self.vectorstore.similarity_search(question, k=k)
context = "\n\n".join([d.page_content for d in docs])
return context
def graph_retrieval(self, question):
"""Graph-based retrieval"""
# Generate Cypher query with LLM
cypher_prompt = f"""Generate a Neo4j Cypher query for this question.
Use nodes with properties and relationships.
Question: {question}
Cypher query:"""
cypher_query = self.llm(cypher_prompt).strip()
# Clean up query
if "```" in cypher_query:
cypher_query = cypher_query.split("```")[1]
if cypher_query.startswith("cypher"):
cypher_query = cypher_query[6:]
cypher_query = cypher_query.strip()
print(f"Generated Cypher: {cypher_query}")
# Execute query
try:
with self.neo4j.session() as session:
result = session.run(cypher_query)
records = list(result)
# Format results
context = "Graph Query Results:\n"
for record in records:
context += f"{dict(record)}\n"
return context
except Exception as e:
print(f"Graph query error: {e}")
return ""
def query(self, question):
"""Hybrid query: vector + graph"""
query_type = self.detect_query_type(question)
print(f"Query type: {query_type}")
# Always get vector context
vector_context = self.vector_retrieval(question, k=3)
# Add graph context if relational query
graph_context = ""
if query_type == "graph":
graph_context = self.graph_retrieval(question)
# Combine contexts
full_context = f"""Vector Search Results:
{vector_context}
{graph_context if graph_context else ""}"""
# Generate answer
answer_prompt = f"""Context: {full_context}
Question: {question}
Answer using both vector and graph information:"""
answer = self.llm(answer_prompt)
return {
"answer": answer,
"query_type": query_type,
"vector_context": vector_context,
"graph_context": graph_context
}
# Usage
neo4j_driver = GraphDatabase.driver(
"bolt://localhost:7687",
auth=("neo4j", "password")
)
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
vectorstore = Chroma.from_texts([...], embeddings)
llm = Ollama(model="llama3.1:8b")
graph_rag = GraphRAG(vectorstore, neo4j_driver, llm)
# Vector query
r1 = graph_rag.query("What is Tesla's mission?")
# Graph query
r2 = graph_rag.query("What companies does the CEO of Tesla also lead?")
print(r2["answer"])
Cypher Queries Courantes
# Common Cypher patterns for RAG
# 1. Find person's relationships
"""
MATCH (p:Person {name: "Elon Musk"})-[r]->(target)
RETURN type(r) as relationship, target.name as entity
"""
# 2. Multi-hop: Find companies of CEO's companies
"""
MATCH (p:Person)-[:CEO_OF]->(c1:Company {name: "Tesla"})
MATCH (p)-[:CEO_OF|FOUNDER_OF]->(c2:Company)
RETURN DISTINCT c2.name
"""
# 3. Shortest path between entities
"""
MATCH path = shortestPath(
(a:Person {name: "Elon Musk"})-[*]-(b:Person {name: "Tim Cook"})
)
RETURN path
"""
# 4. Find all related documents
"""
MATCH (e:Entity {name: "Tesla"})-[:MENTIONED_IN]->(d:Document)
RETURN d.content, d.source
"""
# 5. Aggregate relationships
"""
MATCH (c:Company)<-[:WORKS_FOR]-(p:Person)
RETURN c.name, count(p) as employee_count
ORDER BY employee_count DESC
"""
# 6. Temporal queries
"""
MATCH (p:Person)-[r:CEO_OF]->(c:Company)
WHERE r.start_date <= date() AND
(r.end_date IS NULL OR r.end_date >= date())
RETURN p.name, c.name
"""
Graph RAG vs Vector RAG
| Aspect | Vector RAG | Graph RAG |
|---|---|---|
| Retrieval | Similarité sémantique | Traversal de relations |
| Questions Multi-Hop | ⭐⭐ (difficile) | ⭐⭐⭐⭐⭐ (excellent) |
| Raisonnement Relationnel | ❌ Implicite | ✅ Explicite |
| Setup | Simple (embeddings) | Complexe (extraction + graph) |
| Scalabilité | Excellent (millions docs) | Bon (milliers entités) |
| Best For | Questions simples, FAQ | Domaines structurés, relations |
Évaluation de RAG avec RAGAS
Évaluer la qualité d'un système RAG est crucial pour l'optimiser. Cette leçon explore RAGAS (RAG Assessment), le framework standard pour mesurer faithfulness, relevance, context precision et recall.
Objectifs de la Leçon
- Comprendre les métriques clés: Faithfulness, Answer Relevance, Context Precision/Recall
- Implémenter évaluation automatisée avec le framework RAGAS
- Créer des datasets d'évaluation (questions + ground truth)
- Optimiser RAG basé sur métriques mesurées
Les 4 Métriques RAGAS Essentielles
MÉTRIQUES RAGAS
Query: "Quelle est notre politique de remboursement?"
↓
Retrieved Context: [Doc1, Doc2, Doc3]
↓
Generated Answer: "Remboursement sous 30 jours avec exceptions..."
↓
Ground Truth: "30 jours, exceptions produits soldés"
┌─────────────────────────────────────────────────────────┐
│ 1. FAITHFULNESS (Fidélité) │
│ Question: La réponse est-elle supportée par context? │
│ Score: 0.95 (95% statements supportés) │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ 2. ANSWER RELEVANCE (Pertinence Réponse) │
│ Question: La réponse répond-elle à la question? │
│ Score: 0.92 (très pertinent) │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ 3. CONTEXT PRECISION (Précision Contexte) │
│ Question: Les docs retrieved sont-ils pertinents? │
│ Score: 0.80 (80% chunks utiles) │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
│ 4. CONTEXT RECALL (Rappel Contexte) │
│ Question: Tous les docs nécessaires sont retrieved? │
│ Score: 0.90 (90% info nécessaire trouvée) │
└─────────────────────────────────────────────────────────┘
Installation et Setup RAGAS
# Installation
pip install ragas langchain openai
# Imports
from ragas import evaluate
from ragas.metrics import (
faithfulness,
answer_relevancy,
context_precision,
context_recall
)
from datasets import Dataset
Créer un Dataset d'Évaluation
# Dataset format pour RAGAS
eval_data = {
"question": [
"Quelle est votre politique de remboursement?",
"Quels sont les délais de livraison?",
"Comment contacter le support?"
],
"answer": [
"Nous offrons des remboursements complets sous 30 jours pour tout produit défectueux. Exceptions: produits personnalisés et articles soldés.",
"Livraison standard: 3-5 jours ouvrables. Express: 24-48h. Gratuite dès 50€.",
"Support disponible par email à support@company.com, chat 9h-18h, ou téléphone au 01-23-45-67-89."
],
"contexts": [
[
"Politique remboursement: 30 jours, produits défectueux remboursés intégralement",
"Exceptions remboursement: personnalisés, soldés, hygiène"
],
[
"Livraison standard 3-5 jours, express 24-48h",
"Frais port gratuits >50€"
],
[
"Contact: support@company.com",
"Horaires chat: 9h-18h lun-ven",
"Téléphone: 01-23-45-67-89"
]
],
"ground_truth": [
"Remboursement sous 30 jours pour défauts. Pas pour personnalisés ou soldés.",
"Standard 3-5 jours, express 24-48h, gratuit >50€",
"Email support@company.com, chat 9h-18h, tel 01-23-45-67-89"
]
}
# Convertir en Dataset
eval_dataset = Dataset.from_dict(eval_data)
print(eval_dataset)
Évaluation avec RAGAS
from ragas import evaluate
from ragas.metrics import (
faithfulness,
answer_relevancy,
context_precision,
context_recall
)
# Évaluer avec toutes les métriques
result = evaluate(
eval_dataset,
metrics=[
faithfulness,
answer_relevancy,
context_precision,
context_recall,
],
)
# Résultats
print("=== RAGAS Evaluation Results ===")
print(result)
# Convertir en DataFrame pour analyse
df = result.to_pandas()
print("\n=== Detailed Scores ===")
print(df[['question', 'faithfulness', 'answer_relevancy',
'context_precision', 'context_recall']])
# Moyennes
print("\n=== Average Scores ===")
print(f"Faithfulness: {df['faithfulness'].mean():.3f}")
print(f"Answer Relevancy: {df['answer_relevancy'].mean():.3f}")
print(f"Context Precision: {df['context_precision'].mean():.3f}")
print(f"Context Recall: {df['context_recall'].mean():.3f}")
# Identifier questions problématiques
low_scores = df[df['faithfulness'] < 0.7]
if not low_scores.empty:
print("\n=== Low Faithfulness Scores ===")
print(low_scores[['question', 'faithfulness']])
Optimisation Basée sur Métriques
| Métrique Faible | Problème | Solution |
|---|---|---|
| Faithfulness < 0.7 | LLM hallucine, invente infos | Prompt "stick to context", température basse, reranking |
| Answer Relevancy < 0.7 | Réponses hors-sujet | Améliorer prompt, query reformulation, better LLM |
| Context Precision < 0.6 | Trop de chunks non-pertinents | Reranking, MMR, meilleurs embeddings |
| Context Recall < 0.7 | Manque d'infos clés | Augmenter k, hybrid search, chunking adapté |
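À titre d'illustration, voici une esquisse (non normative) combinant deux corrections du tableau pour un Faithfulness ou une Context Precision faibles: reranking par cross-encoder, prompt strict et température basse. Les noms de modèles sont indicatifs et vectorstore est supposé déjà construit.
# Esquisse: améliorer Faithfulness / Context Precision (hypothèse: vectorstore existant)
from langchain.llms import Ollama
from sentence_transformers import CrossEncoder
llm = Ollama(model="llama3.1:8b", temperature=0.0)  # température basse → moins d'hallucinations
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")  # modèle de reranking indicatif
STRICT_PROMPT = """Réponds UNIQUEMENT à partir du contexte ci-dessous.
Si l'information n'y figure pas, réponds "Je ne sais pas".
Contexte:
{context}
Question: {question}
Réponse:"""
def answer_strict(question, k=10, top_n=3):
    # 1. Sur-échantillonner puis reranker → meilleure Context Precision
    docs = vectorstore.similarity_search(question, k=k)
    scores = reranker.predict([(question, d.page_content) for d in docs])
    best = [d for _, d in sorted(zip(scores, docs), key=lambda x: -x[0])[:top_n]]
    # 2. Prompt strict + température 0 → meilleure Faithfulness
    context = "\n\n".join(d.page_content for d in best)
    return llm(STRICT_PROMPT.format(context=context, question=question))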
Haystack & Dify: Frameworks Alternatifs
Au-delà de LangChain et LlamaIndex, d'autres frameworks offrent des approches différentes pour construire des systèmes RAG. Cette leçon explore Haystack (pipelines modulaires) et Dify (low-code platform).
Objectifs de la Leçon
- Découvrir Haystack et son architecture pipeline-based
- Construire des RAG pipelines avec Haystack components
- Explorer Dify pour RAG low-code/no-code
- Comparer les 4 frameworks principaux et choisir selon use case
Haystack: Architecture Pipeline
HAYSTACK PIPELINE ARCHITECTURE
┌──────────────────────────────────────────────────────┐
│ HAYSTACK PIPELINE │
└──────────────────────────────────────────────────────┘
Query: "What is RAG?"
↓
┌────────────────┐
│ QueryProcessor │ → Clean, expand query
└───────┬────────┘
↓
┌────────────────┐
│ EmbeddingModel │ → Embed query
└───────┬────────┘
↓
┌────────────────┐
│ Retriever │ → Dense + Sparse retrieval
│ (BM25+Dense) │
└───────┬────────┘
↓
┌────────────────┐
│ Ranker │ → Rerank results
└───────┬────────┘
↓
┌────────────────┐
│ PromptBuilder │ → Build context
└───────┬────────┘
↓
┌────────────────┐
│ Generator │ → LLM response
└───────┬────────┘
↓
┌────────────────┐
│ AnswerBuilder │ → Format final answer
└────────────────┘
→ Chaque component est indépendant et remplaçable
Pipeline RAG Basic avec Haystack
from haystack import Pipeline, Document
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers import InMemoryBM25Retriever
from haystack.components.builders import PromptBuilder
from haystack.components.generators import HuggingFaceLocalGenerator
# 1. Create document store
document_store = InMemoryDocumentStore()
# 2. Add documents
documents = [
Document(content="RAG combines retrieval and generation for better LLM responses"),
Document(content="Vector databases store embeddings for semantic search"),
Document(content="LangChain and Haystack are popular RAG frameworks"),
]
document_store.write_documents(documents)
# 3. Create components
retriever = InMemoryBM25Retriever(document_store=document_store)
template = """
Given these documents, answer the question.
Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Question: {{ question }}
Answer:
"""
prompt_builder = PromptBuilder(template=template)
generator = HuggingFaceLocalGenerator(model="google/flan-t5-base")
# 4. Build pipeline
rag_pipeline = Pipeline()
rag_pipeline.add_component("retriever", retriever)
rag_pipeline.add_component("prompt_builder", prompt_builder)
rag_pipeline.add_component("llm", generator)
# Connect components
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")
# 5. Run pipeline
result = rag_pipeline.run({
"retriever": {"query": "What is RAG?"},
"prompt_builder": {"question": "What is RAG?"}
})
print("Answer:", result["llm"]["replies"][0])
Dify: Platform Low-Code RAG
Features Dify
- Visual Workflow Builder: Drag-and-drop pipelines RAG
- Multi-LLM Support: OpenAI, Anthropic, local models
- Document Management: Upload PDFs, sync Google Docs, Notion
- Built-in Vector DB: Qdrant, Weaviate, Pinecone integration
- API Generation: Auto-génération d'une REST API pour votre RAG (exemple d'appel ci-dessous)
- Monitoring: Logs, analytics, user feedback
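À titre indicatif, l'API REST générée par Dify s'appelle typiquement comme ci-dessous; l'URL, les champs exacts et la clé d'application sont des hypothèses à vérifier dans la documentation de votre instance.
# Exemple indicatif d'appel à l'API générée par Dify (endpoint et clé hypothétiques)
import requests
DIFY_API_URL = "https://api.dify.ai/v1/chat-messages"  # ou l'URL de votre instance self-hosted
DIFY_API_KEY = "app-xxxxxxxx"                           # clé d'application (hypothétique)
response = requests.post(
    DIFY_API_URL,
    headers={"Authorization": f"Bearer {DIFY_API_KEY}"},
    json={
        "query": "Quelle est notre politique de remboursement?",
        "inputs": {},
        "user": "user-123",
        "response_mode": "blocking"  # ou "streaming"
    },
    timeout=30
)
print(response.json().get("answer"))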
Comparaison des 4 Frameworks RAG
| Aspect | LangChain | LlamaIndex | Haystack | Dify |
|---|---|---|---|---|
| Approche | Chains, Agents | Data framework | Pipelines modulaires | Low-code platform |
| Learning Curve | ⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐ (très simple) |
| Flexibilité | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ |
| Production Ready | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| Best For | Apps complexes | RAG pur, prototypes | Production pipelines | Prototypes rapides |
Quiz Module 5.2: RAG Avancé
Testez vos connaissances sur le RAG avancé: LlamaIndex, multi-sources, conversationnel, Self-RAG, Graph RAG, évaluation RAGAS, et frameworks alternatifs.
Question 1: Quelle est la différence principale entre LangChain et LlamaIndex?
Question 2: Pour un RAG multi-sources (PDF + SQL + API), quelle est la meilleure approche?
Question 3: Quelle mémoire LangChain utiliser pour conversations RAG longues (50+ turns)?
Question 4: Qu'est-ce que Self-RAG améliore principalement?
Question 5: Adaptive RAG route les queries basé sur:
Question 6: Pourquoi Graph RAG est meilleur pour questions multi-hop?
Question 7: RAGAS mesure quelles 4 métriques principales?
Question 8: Qu'est-ce que Faithfulness mesure?
Question 9: Score Faithfulness < 0.7 indique:
Question 10: Haystack utilise quelle approche architecturale?
Question 11: Dify est principalement:
Question 12: Pour production enterprise avec pipelines complexes, quel framework?
Question 13: Context Precision mesure:
Question 14: Graph RAG utilise typiquement quelle base de données?
Question 15: VectorStoreIndex dans LlamaIndex est équivalent à quoi dans LangChain?
RAG pour l'Entreprise
Déployer RAG en entreprise nécessite conformité (GDPR), contrôle d'accès, audit trails, et gestion de documents internes sensibles. Cette leçon explore les patterns enterprise pour RAG sécurisé et conforme.
Objectifs de la Leçon
- Implémenter contrôle d'accès granulaire (RBAC) pour RAG
- Assurer conformité GDPR, SOC2, HIPAA
- Créer audit trails complets de toutes les queries
- Gérer documents confidentiels et classifications de sécurité
Architecture RAG Enterprise
ENTERPRISE RAG ARCHITECTURE
┌──────────────┐
│ Users │
│ (différents │
│ rôles/dept) │
└──────┬───────┘
↓
┌──────────────┐
│ Auth Layer │
│ (SSO/SAML) │
└──────┬───────┘
↓
┌──────────────┐
│ RAG Gateway │
│ + RBAC Check │
└──────┬───────┘
↓
┌─────────────┴─────────────┐
↓ ↓
┌─────────────┐ ┌─────────────┐
│ Vector DB │ │ Audit Log │
│ (filtered │ │ (queries, │
│ by ACL) │ │ answers) │
└──────┬──────┘ └─────────────┘
↓
┌─────────────┐
│ LLM + PII │
│ Filtering │
└─────────────┘
Key Features:
- Per-document access control
- Department-level filtering
- Complete audit trails
- PII detection & redaction
- Compliance reporting
Role-Based Access Control (RBAC)
# RBAC pour RAG Enterprise
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.schema import Document
from typing import List
import hashlib
import datetime
import json
class EnterpriseRAG:
"""RAG with RBAC and audit trails"""
def __init__(self):
self.embeddings = HuggingFaceEmbeddings(
model_name="BAAI/bge-base-en-v1.5"
)
self.vectorstore = None
# Role definitions
self.roles = {
"employee": {"departments": ["public"]},
"manager": {"departments": ["public", "internal"]},
"hr": {"departments": ["public", "internal", "hr"]},
"executive": {"departments": ["public", "internal", "hr", "executive"]},
"admin": {"departments": ["*"]}
}
def query(self, question: str, user_id: str, user_role: str):
"""Query with RBAC filtering"""
# Check user permissions
allowed_departments = self.roles.get(user_role, {}).get("departments", [])
if not allowed_departments:
return {"answer": "Access denied: Invalid role"}
# Build metadata filter
if "*" not in allowed_departments:
filter_dict = {"department": {"$in": allowed_departments}}
else:
filter_dict = None
# Retrieve with filtering
results = self.vectorstore.similarity_search(
question,
k=5,
filter=filter_dict
)
# Generate answer
answer = self._generate_answer(results, question)
# Audit log
self._log_query(user_id, user_role, question, answer, results)
return {"answer": answer, "sources": results}
def _log_query(self, user_id, user_role, question, answer, results):
"""Log query for audit trail"""
audit_entry = {
"timestamp": datetime.datetime.now().isoformat(),
"user_id": user_id,
"user_role": user_role,
"question": question,
"answer_preview": answer[:200],
"num_results": len(results)
}
with open("audit_log.jsonl", "a") as f:
f.write(json.dumps(audit_entry) + "\n")
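Le filtrage RBAC ci-dessus suppose que chaque chunk est indexé avec une métadonnée department (et que _generate_answer appelle le LLM sur le contexte filtré). Esquisse d'ingestion correspondante, avec des noms de champs hypothétiques:
# Esquisse (hypothétique): méthode d'ingestion à ajouter à EnterpriseRAG
from langchain.vectorstores import Chroma
from langchain.schema import Document
def ingest_documents(self, raw_docs):
    """raw_docs: liste de dicts {"content": ..., "department": ..., "source": ...}"""
    documents = [
        Document(
            page_content=d["content"],
            metadata={
                "department": d.get("department", "public"),  # clé utilisée par le filtre RBAC
                "source": d.get("source", "unknown")
            }
        )
        for d in raw_docs
    ]
    self.vectorstore = Chroma.from_documents(
        documents=documents,
        embedding=self.embeddings,
        persist_directory="./chroma_enterprise_db"
    )
EnterpriseRAG.ingest_documents = ingest_documents  # rattachement pour la démonstration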
GDPR Compliance
- Right to Access: les utilisateurs peuvent demander quelles données les concernant sont indexées
- Right to Erasure: possibilité de supprimer des documents sur demande (esquisse ci-dessous)
- Data Minimization: n'indexer que les données nécessaires
- Audit Trail: logs de qui accède à quoi, et quand
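Esquisse des deux premiers droits avec l'API chromadb, en supposant que chaque chunk indexé porte une métadonnée owner_id (nom de champ hypothétique):
# Esquisse GDPR: Right to Access / Right to Erasure avec chromadb (métadonnée owner_id supposée)
import chromadb
client = chromadb.PersistentClient(path="./chroma_enterprise_db")
collection = client.get_or_create_collection("enterprise_docs")
def export_user_data(owner_id):
    """Right to Access: lister ce qui est indexé pour un utilisateur"""
    return collection.get(where={"owner_id": owner_id}, include=["documents", "metadatas"])
def erase_user_data(owner_id):
    """Right to Erasure: supprimer tous les chunks d'un utilisateur"""
    collection.delete(where={"owner_id": owner_id})
# Exemple
print(export_user_data("user-123"))
erase_user_data("user-123")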
Conformité Multi-Réglementaire
| Réglementation | Requirements Clés | Implementation RAG |
|---|---|---|
| GDPR (EU) | Droit accès, effacement, consentement | Audit logs, delete API, consent tracking |
| HIPAA (US Healthcare) | PHI encryption, access logs, BAAs | Encryption at rest/transit, RBAC, audit |
| SOC 2 | Security controls, monitoring, incidents | Access control, logging, incident response |
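La couche « PII Filtering » de l'architecture peut s'esquisser par une détection regex appliquée avant indexation et avant renvoi de la réponse; les patterns ci-dessous sont indicatifs et à adapter à vos formats (en production, préférer un outil dédié comme Presidio).
# Esquisse: détection et caviardage de PII par regex (patterns indicatifs)
import re
PII_PATTERNS = {
    "EMAIL": re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+"),
    "PHONE_FR": re.compile(r"\b0[1-9](?:[ .-]?\d{2}){4}\b"),
    "IBAN": re.compile(r"\b[A-Z]{2}\d{2}(?: ?[A-Z0-9]{4}){3,7}\b"),
}
def redact_pii(text):
    """Remplace les PII détectées par un tag [REDACTED_*]"""
    for label, pattern in PII_PATTERNS.items():
        text = pattern.sub(f"[REDACTED_{label}]", text)
    return text
# Exemple
print(redact_pii("Contactez jean.dupont@acme.fr ou le 06 12 34 56 78"))
# → "Contactez [REDACTED_EMAIL] ou le [REDACTED_PHONE_FR]"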
RAG pour le Code
RAG appliqué aux codebases permet de répondre à des questions sur l'architecture, générer de la documentation, et assister au développement. Cette leçon explore l'indexation de code, la recherche sémantique dans les repos, et les outils comme GitHub Copilot.
Objectifs de la Leçon
- Indexer des codebases entières avec chunking intelligent
- Effectuer recherche sémantique dans le code (fonctions, classes, patterns)
- Générer documentation automatique via RAG
- Créer assistants de code RAG (questions sur architecture, debugging)
Architecture RAG Code
RAG CODE PIPELINE
┌────────────────────────────────────────────┐
│ STEP 1: CODEBASE INGESTION │
│ │
│ Git Repo → Parse (AST) → Chunk by: │
│ - Functions │
│ - Classes │
│ - Modules │
│ - Docstrings │
└────────────────────────────────────────────┘
↓
┌────────────────────────────────────────────┐
│ STEP 2: ENRICHMENT │
│ │
│ For each chunk, add metadata: │
│ - File path │
│ - Function/class signature │
│ - Dependencies (imports) │
│ - Language │
│ - Complexity metrics │
└────────────────────────────────────────────┘
↓
┌────────────────────────────────────────────┐
│ STEP 3: EMBEDDING │
│ │
│ Use code-specific embeddings: │
│ - CodeBERT │
│ - GraphCodeBERT │
│ - UniXcoder │
│ → Store in Vector DB │
└────────────────────────────────────────────┘
↓
┌────────────────────────────────────────────┐
│ STEP 4: RETRIEVAL + GENERATION │
│ │
│ Query: "How does auth work?" │
│ → Retrieve relevant functions │
│ → LLM synthesizes explanation │
└────────────────────────────────────────────┘
Use Cases:
- Documentation generation
- Code Q&A chatbot
- Onboarding new developers
- Finding usage examples
- Debugging assistance
Chunking Stratégique pour Code
- Function-level: Chaque fonction = 1 chunk (avec signature + docstring)
- Class-level: Classes complètes ou méthodes individuelles selon taille
- Context Window: Inclure imports et type hints
- Metadata: File path, line numbers, language, dependencies
Implementation: Indexer un Repo Python
# Indexer un codebase Python avec LangChain
import os
import ast
from pathlib import Path
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.schema import Document
from langchain.text_splitter import Language, RecursiveCharacterTextSplitter
class CodebaseIndexer:
"""Index Python codebases with function-level chunking"""
def __init__(self, repo_path: str):
self.repo_path = Path(repo_path)
self.embeddings = HuggingFaceEmbeddings(
model_name="microsoft/codebert-base"
)
self.vectorstore = None
def parse_python_file(self, file_path: Path):
"""Parse Python file with AST"""
with open(file_path, 'r', encoding='utf-8') as f:
source = f.read()
try:
tree = ast.parse(source)
except SyntaxError:
return []
documents = []
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef):
# Extract function
func_source = ast.get_source_segment(source, node)
if func_source:
doc = Document(
page_content=func_source,
metadata={
"type": "function",
"name": node.name,
"file": str(file_path.relative_to(self.repo_path)),
"line": node.lineno,
"language": "python"
}
)
documents.append(doc)
elif isinstance(node, ast.ClassDef):
# Extract class
class_source = ast.get_source_segment(source, node)
if class_source:
doc = Document(
page_content=class_source,
metadata={
"type": "class",
"name": node.name,
"file": str(file_path.relative_to(self.repo_path)),
"line": node.lineno,
"language": "python"
}
)
documents.append(doc)
return documents
def index_repository(self):
"""Index entire repository"""
all_documents = []
# Walk through repo
for py_file in self.repo_path.rglob("*.py"):
if ".venv" in str(py_file) or "__pycache__" in str(py_file):
continue
docs = self.parse_python_file(py_file)
all_documents.extend(docs)
print(f"Indexed {len(all_documents)} code chunks")
# Create vectorstore
self.vectorstore = Chroma.from_documents(
documents=all_documents,
embedding=self.embeddings,
persist_directory="./chroma_code_db"
)
self.vectorstore.persist()
return self.vectorstore
# Usage
indexer = CodebaseIndexer("/path/to/your/repo")
vectorstore = indexer.index_repository()
# Query
results = vectorstore.similarity_search(
"How is authentication implemented?",
k=5
)
for doc in results:
print(f"File: {doc.metadata['file']}")
print(f"Type: {doc.metadata['type']}")
print(f"Name: {doc.metadata['name']}")
print(f"Code:\n{doc.page_content[:200]}...\n")
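Le point « inclure les imports » de la stratégie de chunking n'est pas couvert par l'indexeur ci-dessus. Esquisse (extension hypothétique) pour extraire le bloc d'imports d'un fichier et le préfixer à chaque chunk:
# Esquisse: extraire les imports d'un fichier pour enrichir les chunks (extension hypothétique)
import ast
def extract_imports(source):
    """Retourne le bloc d'imports d'un fichier Python sous forme de texte"""
    tree = ast.parse(source)
    lines = []
    for node in tree.body:  # seulement le niveau module
        if isinstance(node, (ast.Import, ast.ImportFrom)):
            segment = ast.get_source_segment(source, node)
            if segment:
                lines.append(segment)
    return "\n".join(lines)
# Utilisation possible dans parse_python_file, avant de créer chaque Document:
# imports_block = extract_imports(source)
# page_content = imports_block + "\n\n" + func_source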
Code-Specific Embeddings
| Model | Specialization | Best For |
|---|---|---|
| CodeBERT | Pre-trained on code + NL | Code search, documentation matching |
| GraphCodeBERT | Understands code structure (AST) | Semantic code search, clone detection |
| UniXcoder | Multi-language (6 languages) | Cross-language code search |
| CodeT5 | Code generation + understanding | Code summarization, generation |
RAG Code Q&A System
# Système Q&A sur codebase
from langchain.chat_models import ChatOpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
# Custom prompt pour code
code_qa_template = """You are a code expert assistant. Use the following code snippets to answer the question.
Code Context:
{context}
Question: {question}
Provide:
1. A clear explanation
2. Reference to relevant files/functions
3. Code examples if applicable
Answer:"""
PROMPT = PromptTemplate(
template=code_qa_template,
input_variables=["context", "question"]
)
# Create QA chain
llm = ChatOpenAI(model="gpt-4", temperature=0)
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
chain_type_kwargs={"prompt": PROMPT}
)
# Example queries
questions = [
"How is user authentication implemented?",
"Show me all database models",
"How does the API rate limiting work?",
"Where is error handling done?",
]
for question in questions:
print(f"\nQ: {question}")
answer = qa_chain.run(question)
print(f"A: {answer}\n")
Documentation Auto-Generation
# Générer documentation avec RAG
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate
class DocumentationGenerator:
"""Generate docs from codebase using RAG"""
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.llm = llm
def generate_module_docs(self, module_name: str):
"""Generate documentation for a module"""
        # Retrieve code chunks related to the module
        # NB: Chroma ne supporte pas de filtre $regex; on filtre le chemin côté Python
        results = self.vectorstore.similarity_search(module_name, k=20)
        results = [
            doc for doc in results
            if doc.metadata.get("file", "").startswith(module_name)
        ]
# Aggregate code
all_code = "\n\n".join([doc.page_content for doc in results])
# Generate documentation
prompt = PromptTemplate(
template="""Generate comprehensive documentation for this module.
Module Code:
{code}
Generate:
1. Overview (what the module does)
2. Key Classes and Functions
3. Usage Examples
4. Dependencies
Documentation:""",
input_variables=["code"]
)
chain = LLMChain(llm=self.llm, prompt=prompt)
docs = chain.run(code=all_code[:8000]) # Limit context
return docs
# Usage
doc_gen = DocumentationGenerator(vectorstore, llm)
auth_docs = doc_gen.generate_module_docs("authentication")
print(auth_docs)
Advanced: Graph RAG pour Code
CODE KNOWLEDGE GRAPH
┌─────────────┐
│ User.py │
│ class User │
└──────┬──────┘
│ imports
↓
┌─────────────┐ calls ┌──────────────┐
│ auth.py │─────────────────────→ │ database.py │
│ login() │ │ query() │
└──────┬──────┘ └──────────────┘
│ calls
↓
┌─────────────┐
│ token.py │
│ generate() │
└─────────────┘
Query: "What happens when user logs in?"
→ Traverse graph: login() → token.generate() → database.query()
→ Retrieve all 3 functions
→ LLM explains full flow
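Esquisse simplifiée du graphe d'appels ci-dessus, construite en mémoire avec ast (version réduite, sans Neo4j), pour retrouver les fonctions impliquées dans un flux comme le login:
# Esquisse: graphe d'appels en mémoire avec ast (simplifié, sans Neo4j)
import ast
from collections import defaultdict, deque
def build_call_graph(source):
    """Construit un dict {fonction: {fonctions appelées}} pour un fichier Python"""
    tree = ast.parse(source)
    graph = defaultdict(set)
    for node in ast.walk(tree):
        if isinstance(node, ast.FunctionDef):
            for call in ast.walk(node):
                if isinstance(call, ast.Call) and isinstance(call.func, ast.Name):
                    graph[node.name].add(call.func.id)
    return graph
def related_functions(graph, start):
    """Parcours BFS: toutes les fonctions atteignables depuis `start`"""
    seen, queue = {start}, deque([start])
    while queue:
        for callee in graph.get(queue.popleft(), ()):
            if callee not in seen:
                seen.add(callee)
                queue.append(callee)
    return seen
source = """
def generate_token(): ...
def query_db(): ...
def login():
    query_db()
    return generate_token()
"""
graph = build_call_graph(source)
print(related_functions(graph, "login"))  # {'login', 'query_db', 'generate_token'}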
RAG en Temps Réel
Le RAG classique fonctionne sur des données statiques. Pour les cas d'usage nécessitant des données actualisées (support client, monitoring, news), il faut une ingestion en streaming et des mises à jour continues. Cette leçon couvre le real-time RAG avec webhooks, CDC, et incremental indexing.
Objectifs de la Leçon
- Implémenter l'ingestion continue de nouvelles données
- Utiliser le CDC (Change Data Capture) pour des mises à jour automatiques
- Créer pipelines streaming avec Kafka/RabbitMQ
- Gérer incremental indexing sans reindex complet
Architecture Real-Time RAG
REAL-TIME RAG ARCHITECTURE
┌──────────────┐ Webhook ┌──────────────┐
│ Data Source │─────────────────→│ Ingestion │
│ (CRM, Docs, │ │ Service │
│ Support) │ └──────┬───────┘
└──────────────┘ ↓
┌──────────────┐
┌──────────────┐ CDC │ Message │
│ Database │─────────────────→│ Queue │
│ (Postgres) │ (Debezium) │ (Kafka/RMQ) │
└──────────────┘ └──────┬───────┘
↓
┌──────────────┐ RSS/API ┌──────────────┐
│ External │─────────────────→│ Embedding │
│ (News, Web) │ Poll │ Worker │
└──────────────┘ └──────┬───────┘
↓
┌──────────────┐
│ Vector DB │
│ (Qdrant) │
│ + Incremental│
│ Updates │
└──────────────┘
Features:
- <5 min latency from source to searchable
- No full reindex required
- Automatic updates
- Handles high throughput (1000s docs/min)
Continuous Ingestion avec Webhooks
# Real-time ingestion endpoint
from fastapi import FastAPI, BackgroundTasks
from langchain.embeddings import HuggingFaceEmbeddings
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct, VectorParams, Distance
import uuid
from datetime import datetime
app = FastAPI()
# Initialize clients
embeddings = HuggingFaceEmbeddings(model_name="BAAI/bge-base-en-v1.5")
qdrant = QdrantClient(host="localhost", port=6333)
# Ensure collection exists
try:
qdrant.create_collection(
collection_name="realtime_docs",
vectors_config=VectorParams(size=768, distance=Distance.COSINE)
)
except:
pass
def process_and_index(doc_id: str, content: str, metadata: dict):
"""Process and index document asynchronously"""
# Generate embedding
vector = embeddings.embed_query(content)
# Create point
point = PointStruct(
id=doc_id,
vector=vector,
payload={
"content": content,
"metadata": metadata,
"indexed_at": datetime.now().isoformat()
}
)
# Upsert to Qdrant (idempotent)
qdrant.upsert(
collection_name="realtime_docs",
points=[point]
)
print(f"Indexed document {doc_id}")
@app.post("/webhook/new-document")
async def webhook_new_document(
doc_id: str,
content: str,
metadata: dict,
background_tasks: BackgroundTasks
):
"""Webhook endpoint for new documents"""
# Process asynchronously
background_tasks.add_task(process_and_index, doc_id, content, metadata)
return {"status": "queued", "doc_id": doc_id}
@app.post("/webhook/update-document")
async def webhook_update_document(
doc_id: str,
content: str,
metadata: dict,
background_tasks: BackgroundTasks
):
"""Webhook for document updates"""
# Upsert (overwrites if exists)
background_tasks.add_task(process_and_index, doc_id, content, metadata)
return {"status": "queued", "doc_id": doc_id}
@app.delete("/webhook/delete-document/{doc_id}")
async def webhook_delete_document(doc_id: str):
"""Webhook for document deletion"""
qdrant.delete(
collection_name="realtime_docs",
points_selector=[doc_id]
)
return {"status": "deleted", "doc_id": doc_id}
# Run: uvicorn realtime_rag:app --reload
Change Data Capture (CDC) avec Debezium
- Zero code changes: Pas besoin modifier app existante
- Real-time sync: Updates reflétés en <5 secondes
- Guaranteed delivery: Pas de data loss (transaction log)
- Scalable: Supporte millions d'events/jour
# CDC consumer with Kafka
from kafka import KafkaConsumer
import json
# Debezium CDC config (docker-compose.yml)
"""
services:
debezium:
image: debezium/connect:2.4
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=connect_configs
- OFFSET_STORAGE_TOPIC=connect_offsets
# Connect to Postgres
# Connector config sends CDC events to Kafka topic
"""
# Consumer for CDC events
consumer = KafkaConsumer(
'dbserver1.public.documents', # Debezium topic
bootstrap_servers='localhost:9092',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
def handle_cdc_event(event):
"""Process CDC event and update RAG"""
op = event['payload']['op'] # c=create, u=update, d=delete
after = event['payload'].get('after', {})
if op in ['c', 'u']: # Create or Update
doc_id = str(after['id'])
content = after['content']
metadata = {
"title": after.get('title'),
"category": after.get('category')
}
# Index document
process_and_index(doc_id, content, metadata)
elif op == 'd': # Delete
doc_id = str(event['payload']['before']['id'])
# Remove from vector DB
qdrant.delete(
collection_name="realtime_docs",
points_selector=[doc_id]
)
# Consume events
for message in consumer:
event = message.value
handle_cdc_event(event)
Streaming Pipeline avec RabbitMQ
# Producer: Enqueue documents for indexing
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='documents_to_index', durable=True)
def enqueue_document(doc_id, content, metadata):
"""Add document to indexing queue"""
message = {
"doc_id": doc_id,
"content": content,
"metadata": metadata
}
channel.basic_publish(
exchange='',
routing_key='documents_to_index',
body=json.dumps(message),
properties=pika.BasicProperties(delivery_mode=2) # Persistent
)
# Consumer: Process queue
def callback(ch, method, properties, body):
"""Process document from queue"""
data = json.loads(body)
try:
process_and_index(
data['doc_id'],
data['content'],
data['metadata']
)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"Error: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
channel.basic_qos(prefetch_count=10)
channel.basic_consume(queue='documents_to_index', on_message_callback=callback)
print('Waiting for documents...')
channel.start_consuming()
Incremental Indexing
| Strategy | Use Case | Latency |
|---|---|---|
| Upsert | Updates fréquents (docs existants modifiés) | <1 sec |
| Append-only | Nouveaux docs uniquement (logs, articles) | <500ms |
| Batch micro-batching | High throughput (1000s docs/min) | 5-30 sec |
| Time-based partitioning | Time-series data (news, monitoring) | <5 sec |
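La stratégie « Batch micro-batching » du tableau peut s'esquisser ainsi: accumuler les documents puis les upserter par lots vers Qdrant (taille de lot indicative), en réutilisant embeddings et qdrant définis dans le service d'ingestion plus haut.
# Esquisse: micro-batching des upserts vers Qdrant (paramètres indicatifs)
# Réutilise `embeddings` et `qdrant` définis dans le service d'ingestion ci-dessus
from qdrant_client.models import PointStruct
from datetime import datetime
BATCH_SIZE = 200
_buffer = []
def enqueue_for_indexing(doc_id, content, metadata):
    """Ajoute un document au buffer et flush quand le lot est plein"""
    _buffer.append((doc_id, content, metadata))
    if len(_buffer) >= BATCH_SIZE:
        flush_batch()
def flush_batch():
    """Upsert de tout le buffer en un seul appel réseau"""
    global _buffer
    if not _buffer:
        return
    vectors = embeddings.embed_documents([content for _, content, _ in _buffer])
    points = [
        PointStruct(
            id=doc_id,
            vector=vec,
            payload={"content": content, "metadata": meta,
                     "indexed_at": datetime.now().isoformat()}
        )
        for (doc_id, content, meta), vec in zip(_buffer, vectors)
    ]
    qdrant.upsert(collection_name="realtime_docs", points=points)
    _buffer = []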
Monitoring Real-Time RAG
# Monitoring metrics
from prometheus_client import Counter, Histogram, Gauge, start_http_server
# Metrics
docs_indexed = Counter('documents_indexed_total', 'Total documents indexed')
indexing_latency = Histogram('indexing_latency_seconds', 'Time to index document')
queue_size = Gauge('indexing_queue_size', 'Current queue size')
def monitored_index(doc_id, content, metadata):
"""Index with monitoring"""
with indexing_latency.time():
process_and_index(doc_id, content, metadata)
docs_indexed.inc()
# Start metrics server
start_http_server(8000)
# Metrics available at http://localhost:8000/metrics
Optimisation de Performance
Un RAG en production doit être rapide (<1 s end-to-end), scalable (milliers de requêtes/sec) et économe en coûts. Cette leçon couvre caching, query optimization, batch processing, hardware acceleration, et les techniques pour atteindre une latence P95 <100 ms.
Objectifs de la Leçon
- Réduire latency retrieval à <50ms avec indexing optimal
- Implémenter un caching multi-layer (semantic + exact match)
- Utiliser batch processing pour throughput élevé
- Optimiser coûts LLM avec caching et prompt compression
Performance Bottlenecks RAG
RAG LATENCY BREAKDOWN (typical)
┌──────────────────────────────────────────────┐
│ TOTAL: ~2000ms │
│ │
│ ┌────────────────────────────┐ 50ms │
│ │ Query Embedding │ │
│ └────────────────────────────┘ │
│ │
│ ┌────────────────────────────┐ 100ms │
│ │ Vector Search │ │
│ └────────────────────────────┘ │
│ │
│ ┌────────────────────────────┐ 1800ms ⚠️ │
│ │ LLM Generation │ │
│ └────────────────────────────┘ │
│ │
│ ┌────────────────────────────┐ 50ms │
│ │ Network/Overhead │ │
│ └────────────────────────────┘ │
└──────────────────────────────────────────────┘
OPTIMIZED:
┌──────────────────────────────────────────────┐
│ TOTAL: ~400ms ✅ │
│ │
│ ┌──────┐ 10ms (batch embedding) │
│ │ Embed│ │
│ └──────┘ │
│ │
│ ┌──────┐ 30ms (ANN + GPU) │
│ │Search│ │
│ └──────┘ │
│ │
│ ┌─────────────────────┐ 350ms (cached/vLLM)│
│ │ LLM Generation │ │
│ └─────────────────────┘ │
│ │
│ ┌──┐ 10ms │
│ │OH│ │
│ └──┘ │
└──────────────────────────────────────────────┘
Optimization 1: Caching Multi-Layer
# Multi-layer caching for RAG
import hashlib
from functools import lru_cache
import redis
import json
# Initialize Redis
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
class CachedRAG:
"""RAG with multi-layer caching"""
    def __init__(self, vectorstore, llm, embeddings):
        self.vectorstore = vectorstore
        self.llm = llm
        self.embeddings = embeddings  # nécessaire pour le cache d'embeddings (_embed_query_cached)
def _query_hash(self, query: str) -> str:
"""Generate cache key"""
return hashlib.md5(query.lower().encode()).hexdigest()
@lru_cache(maxsize=1000)
def _embed_query_cached(self, query: str):
"""Cache embeddings in memory"""
return self.embeddings.embed_query(query)
def query_with_cache(self, query: str):
"""Query with 3-level cache"""
cache_key = self._query_hash(query)
# Level 1: Exact match cache (Redis)
cached_response = redis_client.get(f"rag:exact:{cache_key}")
if cached_response:
print("✅ Cache HIT (exact)")
return json.loads(cached_response)
# Level 2: Semantic cache (similar queries)
similar_query = self._find_similar_cached_query(query)
if similar_query:
cached_response = redis_client.get(f"rag:exact:{self._query_hash(similar_query)}")
if cached_response:
print("✅ Cache HIT (semantic)")
return json.loads(cached_response)
# Level 3: No cache - full RAG
print("❌ Cache MISS - running full RAG")
# Retrieve (with cached embeddings)
query_embedding = self._embed_query_cached(query)
results = self.vectorstore.similarity_search_by_vector(
query_embedding,
k=5
)
# Generate
context = "\n\n".join([doc.page_content for doc in results])
answer = self.llm.predict(
f"Context:\n{context}\n\nQuestion: {query}\n\nAnswer:"
)
response = {
"answer": answer,
"sources": [doc.metadata for doc in results]
}
# Cache response (TTL 1 hour)
redis_client.setex(
f"rag:exact:{cache_key}",
3600,
json.dumps(response)
)
return response
def _find_similar_cached_query(self, query: str, threshold=0.9):
"""Find semantically similar cached query"""
# TODO: Implement semantic similarity check
# Could use separate vector DB for cached queries
return None
Optimization 2: Vector Search Acceleration
| Technique | Speedup | Trade-off |
|---|---|---|
| HNSW index | 10-100x vs brute-force | Slight recall loss (~98%) |
| Quantization (PQ) | 4-8x memory reduction | Small accuracy loss |
| GPU acceleration | 5-10x vs CPU | Cost (GPU instances) |
| Sharding | Linear with shards | Infrastructure complexity |
# Qdrant with optimized indexing
from qdrant_client import QdrantClient
from qdrant_client.models import (
    VectorParams,
    Distance,
    HnswConfigDiff,
    OptimizersConfigDiff,
    ScalarQuantization,
    ScalarQuantizationConfig,
    ScalarType,
    SearchParams
)
client = QdrantClient(host="localhost", port=6333)
# Create collection with optimizations
client.create_collection(
collection_name="optimized_docs",
vectors_config=VectorParams(
size=768,
distance=Distance.COSINE,
# HNSW parameters
hnsw_config=HnswConfigDiff(
m=16, # Connections per layer (higher = better recall)
ef_construct=100, # Build-time search depth
)
),
optimizers_config=OptimizersConfigDiff(
indexing_threshold=10000, # Index after 10k vectors
),
    quantization_config=ScalarQuantization(
        scalar=ScalarQuantizationConfig(
            type=ScalarType.INT8,    # Quantize to int8 (4x compression)
            quantile=0.99,
            always_ram=True          # Keep quantized vectors in RAM
        )
    )
)
# Search with tuned ef parameter
results = client.search(
collection_name="optimized_docs",
query_vector=query_embedding,
limit=5,
    search_params=SearchParams(hnsw_ef=128)  # Search-time depth (higher = better recall)
)
# Benchmark: 50ms → 15ms on 1M vectors
Optimization 3: LLM Acceleration
- vLLM: Continuous batching pour 10-20x throughput
- Flash Attention: 2-4x faster generation
- Quantization (AWQ/GPTQ): Run 70B models sur single GPU
- Speculative Decoding: 2x faster avec small draft model
- Prompt Caching: Cache system prompts (GPT-4 Turbo)
# vLLM for high-throughput inference
from vllm import LLM, SamplingParams
# Initialize vLLM (loads model once)
llm = LLM(
model="meta-llama/Llama-2-13b-chat-hf",
tensor_parallel_size=2, # Multi-GPU
max_num_batched_tokens=8192, # Large batch
)
sampling_params = SamplingParams(
temperature=0.7,
top_p=0.9,
max_tokens=256
)
# Batch inference (10-20x faster than sequential)
prompts = [
f"Context: {doc1}\n\nQuestion: {q1}\n\nAnswer:",
f"Context: {doc2}\n\nQuestion: {q2}\n\nAnswer:",
# ... hundreds of prompts
]
outputs = llm.generate(prompts, sampling_params)
for output in outputs:
print(output.outputs[0].text)
# Throughput: 2000 tokens/sec vs 100 tokens/sec (HF)
Optimization 4: Batch Processing
# Batch embedding generation
import numpy as np
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('BAAI/bge-base-en-v1.5')
# Instead of encoding one by one:
# for doc in documents:
# embedding = model.encode(doc) # SLOW
# Batch encode (10x faster)
documents = [doc.page_content for doc in all_docs]
embeddings = model.encode(
documents,
batch_size=128, # Large batch
show_progress_bar=True,
normalize_embeddings=True,
device='cuda' # GPU acceleration
)
# Result: 1000 docs/sec vs 100 docs/sec
Performance Benchmarking
# Benchmark RAG performance
import time
import statistics
def benchmark_rag(rag_system, test_queries, num_runs=10):
"""Comprehensive RAG benchmarking"""
results = {
"retrieval_latency": [],
"generation_latency": [],
"total_latency": [],
"cache_hit_rate": 0
}
cache_hits = 0
for _ in range(num_runs):
for query in test_queries:
# Time retrieval
t0 = time.time()
docs = rag_system.retrieve(query)
retrieval_time = (time.time() - t0) * 1000 # ms
# Time generation
t0 = time.time()
answer = rag_system.generate(query, docs)
generation_time = (time.time() - t0) * 1000
results["retrieval_latency"].append(retrieval_time)
results["generation_latency"].append(generation_time)
results["total_latency"].append(retrieval_time + generation_time)
# Calculate statistics
print("=== RAG Performance Report ===")
print(f"Retrieval P50: {statistics.median(results['retrieval_latency']):.2f}ms")
print(f"Retrieval P95: {sorted(results['retrieval_latency'])[int(len(results['retrieval_latency']) * 0.95)]:.2f}ms")
print(f"Generation P50: {statistics.median(results['generation_latency']):.2f}ms")
print(f"Generation P95: {sorted(results['generation_latency'])[int(len(results['generation_latency']) * 0.95)]:.2f}ms")
print(f"Total P50: {statistics.median(results['total_latency']):.2f}ms")
print(f"Total P95: {sorted(results['total_latency'])[int(len(results['total_latency']) * 0.95)]:.2f}ms")
return results
# Run benchmark
test_queries = [
"What is the return policy?",
"How do I reset my password?",
# ... add 50+ realistic queries
]
results = benchmark_rag(rag_system, test_queries)
Cost Optimization
| Technique | Cost Saving | Implementation |
|---|---|---|
| Prompt Caching | 50% (GPT-4 Turbo) | Cache system/context prompts |
| Smaller Model | 90% (GPT-3.5 vs GPT-4) | Use GPT-3.5 for simple queries (see the routing sketch below) |
| Self-Hosted | 70-90% | Llama 2 on AWS/GCP GPUs |
| Batch API | 50% (OpenAI Batch) | Non-realtime workloads |
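The "Smaller Model" row is typically implemented as a routing step in front of the generator. A minimal sketch with a crude length-based heuristic; the thresholds and model names are assumptions to adapt to your own traffic.
# Sketch: route simple queries to a cheaper model (thresholds and model names are assumptions)
from openai import OpenAI

client = OpenAI()

def pick_model(query: str, retrieved_docs: list[str]) -> str:
    """Crude heuristic: short questions over few documents rarely need the large model."""
    if len(query) < 120 and len(retrieved_docs) <= 3:
        return "gpt-3.5-turbo"
    return "gpt-4-turbo"

def routed_answer(query: str, retrieved_docs: list[str]) -> str:
    context = "\n\n".join(retrieved_docs)
    completion = client.chat.completions.create(
        model=pick_model(query, retrieved_docs),
        messages=[{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {query}\n\nAnswer:"}]
    )
    return completion.choices[0].message.content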
Security & Guardrails
A production RAG system is exposed to real risks: data poisoning, prompt injection, PII leakage, and jailbreaks. This lesson covers hardening RAG systems with input validation, output filtering, guardrails, and security best practices for a safe deployment.
Lesson Objectives
- Identify and mitigate attacks against RAG (poisoning, injection)
- Implement guardrails for output safety
- Detect and redact PII automatically
- Set up security monitoring and alerting
Threat Model for RAG
RAG SECURITY THREATS
┌─────────────────────────────────────────────┐
│ 1. DATA POISONING │
│ Attacker injects malicious documents │
│ → RAG retrieves & uses poisoned content │
│ → LLM generates harmful/false answers │
└─────────────────────────────────────────────┘
┌─────────────────────────────────────────────┐
│ 2. PROMPT INJECTION │
│ Attacker crafts query to manipulate LLM │
│ "Ignore previous instructions and..." │
│ → Bypasses guardrails │
└─────────────────────────────────────────────┘
┌─────────────────────────────────────────────┐
│ 3. PII LEAKAGE │
│ Sensitive data indexed (SSN, emails) │
│ → Retrieval exposes PII │
│ → GDPR/CCPA violations │
└─────────────────────────────────────────────┘
┌─────────────────────────────────────────────┐
│ 4. JAILBREAKING │
│ Query designed to bypass content filters │
│ → LLM generates restricted content │
│ → Harmful outputs │
└─────────────────────────────────────────────┘
┌─────────────────────────────────────────────┐
│ 5. CONTEXT CONFUSION │
│ Retrieved docs contain contradictions │
│ → LLM hallucinates or gives wrong answer │
│ → Misinformation spread │
└─────────────────────────────────────────────┘
Defense 1: Input Validation & Sanitization
# Input validation for RAG
import re
from typing import Optional
class InputValidator:
"""Validate and sanitize user queries"""
# Prompt injection patterns
INJECTION_PATTERNS = [
r"ignore previous instructions",
r"disregard.*previous",
r"forget.*instructions",
r"new instructions:",
r"system prompt:",
r"you are now",
r"