#
# Copyright RODMENA LIMITED 2025
# SPDX-License-Identifier: Apache-2.0
#
"""
Ragit Experiment - Core RAG optimization engine.
This module provides the main experiment class for optimizing RAG hyperparameters.
"""
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from itertools import product
from typing import Any
import numpy as np
from tqdm import tqdm
from ragit.core.experiment.results import EvaluationResult
from ragit.providers.base import BaseEmbeddingProvider, BaseLLMProvider
from ragit.providers.function_adapter import FunctionProvider
@dataclass
class RAGConfig:
"""Configuration for a RAG pattern."""
name: str
chunk_size: int
chunk_overlap: int
num_chunks: int # Number of chunks to retrieve
embedding_model: str
llm_model: str
[docs]
@dataclass
class Document:
"""A document in the knowledge base."""
id: str
content: str
metadata: dict[str, Any] = field(default_factory=dict)
[docs]
@dataclass
class Chunk:
"""A document chunk with optional rich metadata.
Metadata can include:
- document_id: SHA256 hash for deduplication and window search
- sequence_number: Order within the document
- chunk_start/chunk_end: Character positions in original text
"""
content: str
doc_id: str
chunk_index: int
embedding: tuple[float, ...] | list[float] | None = None
metadata: dict[str, Any] = field(default_factory=dict)
[docs]
@dataclass
class BenchmarkQuestion:
"""A benchmark question for evaluation."""
question: str
ground_truth: str
relevant_doc_ids: list[str] = field(default_factory=list)
@dataclass
class EvaluationScores:
"""Scores from evaluating a RAG response."""
answer_correctness: float
context_relevance: float
faithfulness: float
@property
def combined_score(self) -> float:
"""Combined score (weighted average)."""
return 0.4 * self.answer_correctness + 0.3 * self.context_relevance + 0.3 * self.faithfulness
[docs]
class SimpleVectorStore:
"""Simple in-memory vector store with pre-normalized embeddings for fast search.
Note: This class is NOT thread-safe.
"""
[docs]
def __init__(self) -> None:
self.chunks: list[Chunk] = []
self._embedding_matrix: np.ndarray[Any, np.dtype[np.float64]] | None = None # Pre-normalized
[docs]
def add(self, chunks: list[Chunk]) -> None:
"""Add chunks to the store and rebuild pre-normalized embedding matrix."""
self.chunks.extend(chunks)
self._rebuild_matrix()
def _rebuild_matrix(self) -> None:
"""Rebuild and pre-normalize the embedding matrix from chunks."""
embeddings = [c.embedding for c in self.chunks if c.embedding is not None]
if embeddings:
matrix = np.array(embeddings, dtype=np.float64)
# Pre-normalize for fast cosine similarity
norms = np.linalg.norm(matrix, axis=1, keepdims=True)
norms[norms == 0] = 1 # Avoid division by zero
self._embedding_matrix = matrix / norms
else:
self._embedding_matrix = None
[docs]
def clear(self) -> None:
"""Clear all chunks."""
self.chunks = []
self._embedding_matrix = None
[docs]
def search(self, query_embedding: tuple[float, ...] | list[float], top_k: int = 5) -> list[tuple[Chunk, float]]:
"""Search for similar chunks using pre-normalized cosine similarity."""
if not self.chunks or self._embedding_matrix is None:
return []
# Normalize query vector
query_vec = np.array(query_embedding, dtype=np.float64)
query_norm = np.linalg.norm(query_vec)
if query_norm == 0:
return []
query_normalized = query_vec / query_norm
# Fast cosine similarity: matrix is pre-normalized, just dot product
similarities = self._embedding_matrix @ query_normalized
# Get top_k indices efficiently
if len(similarities) <= top_k:
top_indices = np.argsort(similarities)[::-1]
else:
top_indices = np.argpartition(similarities, -top_k)[-top_k:]
top_indices = top_indices[np.argsort(similarities[top_indices])[::-1]]
return [(self.chunks[i], float(similarities[i])) for i in top_indices]
[docs]
class RagitExperiment:
"""
Ragit Experiment - Automatic RAG Hyperparameter Optimization.
This class orchestrates the optimization of RAG pipeline hyperparameters
by systematically evaluating different configurations.
Parameters
----------
documents : list[Document]
Documents to use as the knowledge base.
benchmark : list[BenchmarkQuestion]
Benchmark questions for evaluation.
embed_fn : Callable[[str], list[float]], optional
Function that takes text and returns an embedding vector.
generate_fn : Callable, optional
Function for text generation.
provider : BaseEmbeddingProvider, optional
Provider for embeddings and LLM. If embed_fn is provided, this is
ignored for embeddings but can be used for LLM.
Raises
------
ValueError
If neither embed_fn nor provider is provided.
Examples
--------
>>> # With custom functions
>>> experiment = RagitExperiment(docs, benchmark, embed_fn=my_embed, generate_fn=my_llm)
>>>
>>> # With explicit provider
>>> from ragit.providers import OllamaProvider
>>> experiment = RagitExperiment(docs, benchmark, provider=OllamaProvider())
>>>
>>> results = experiment.run()
>>> print(results[0].config) # Best configuration
"""
[docs]
def __init__(
self,
documents: list[Document],
benchmark: list[BenchmarkQuestion],
embed_fn: Callable[[str], list[float]] | None = None,
generate_fn: Callable[..., str] | None = None,
provider: BaseEmbeddingProvider | BaseLLMProvider | None = None,
):
self.documents = documents
self.benchmark = benchmark
self.vector_store = SimpleVectorStore()
self.results: list[EvaluationResult] = []
# Resolve provider from functions or explicit provider
self._embedding_provider: BaseEmbeddingProvider
self._llm_provider: BaseLLMProvider | None = None
if embed_fn is not None:
# Create FunctionProvider from provided functions
function_provider = FunctionProvider(
embed_fn=embed_fn,
generate_fn=generate_fn,
)
self._embedding_provider = function_provider
if generate_fn is not None:
self._llm_provider = function_provider
elif provider is not None and isinstance(provider, BaseLLMProvider):
self._llm_provider = provider
elif provider is not None:
if not isinstance(provider, BaseEmbeddingProvider):
raise ValueError(
"Provider must implement BaseEmbeddingProvider for embeddings. Alternatively, provide embed_fn."
)
self._embedding_provider = provider
if isinstance(provider, BaseLLMProvider):
self._llm_provider = provider
else:
raise ValueError(
"Must provide embed_fn or provider for embeddings. "
"Examples:\n"
" RagitExperiment(docs, benchmark, embed_fn=my_embed, generate_fn=my_llm)\n"
" RagitExperiment(docs, benchmark, provider=OllamaProvider())"
)
# LLM is required for evaluation
if self._llm_provider is None:
raise ValueError(
"RagitExperiment requires LLM for evaluation. Provide generate_fn or a provider with LLM support."
)
@property
def provider(self) -> BaseEmbeddingProvider:
"""Return the embedding provider (for backwards compatibility)."""
return self._embedding_provider
[docs]
def define_search_space(
self,
chunk_sizes: list[int] | None = None,
chunk_overlaps: list[int] | None = None,
num_chunks_options: list[int] | None = None,
embedding_models: list[str] | None = None,
llm_models: list[str] | None = None,
) -> list[RAGConfig]:
"""
Define the hyperparameter search space.
Parameters
----------
chunk_sizes : list[int], optional
Chunk sizes to test. Default: [256, 512]
chunk_overlaps : list[int], optional
Chunk overlaps to test. Default: [50, 100]
num_chunks_options : list[int], optional
Number of chunks to retrieve. Default: [2, 3]
embedding_models : list[str], optional
Embedding models to test. Default: ["default"]
llm_models : list[str], optional
LLM models to test. Default: ["default"]
Returns
-------
list[RAGConfig]
List of configurations to evaluate.
"""
chunk_sizes = chunk_sizes or [256, 512]
chunk_overlaps = chunk_overlaps or [50, 100]
num_chunks_options = num_chunks_options or [2, 3]
embedding_models = embedding_models or ["default"]
llm_models = llm_models or ["default"]
configs = []
pattern_num = 1
for cs, co, nc, em, lm in product(
chunk_sizes, chunk_overlaps, num_chunks_options, embedding_models, llm_models
):
# Ensure overlap is less than chunk size
if co >= cs:
continue
configs.append(
RAGConfig(
name=f"Pattern_{pattern_num}",
chunk_size=cs,
chunk_overlap=co,
num_chunks=nc,
embedding_model=em,
llm_model=lm,
)
)
pattern_num += 1
return configs
def _chunk_document(self, doc: Document, chunk_size: int, overlap: int) -> list[Chunk]:
"""Split document into overlapping chunks."""
chunks = []
text = doc.content
start = 0
chunk_idx = 0
while start < len(text):
end = start + chunk_size
chunk_text = text[start:end].strip()
if chunk_text:
chunks.append(
Chunk(
content=chunk_text,
doc_id=doc.id,
chunk_index=chunk_idx,
)
)
chunk_idx += 1
start = end - overlap
if start >= len(text) - overlap:
break
return chunks
def _build_index(self, config: RAGConfig) -> None:
"""Build vector index with given configuration using batch embedding."""
self.vector_store.clear()
all_chunks: list[Chunk] = []
# Chunk all documents
for doc in self.documents:
chunks = self._chunk_document(doc, config.chunk_size, config.chunk_overlap)
all_chunks.extend(chunks)
if not all_chunks:
return
# Batch embed all chunks at once (single API call)
texts = [chunk.content for chunk in all_chunks]
responses = self._embedding_provider.embed_batch(texts, config.embedding_model)
for chunk, response in zip(all_chunks, responses, strict=True):
chunk.embedding = response.embedding
self.vector_store.add(all_chunks)
def _retrieve(self, query: str, config: RAGConfig) -> list[Chunk]:
"""Retrieve relevant chunks for a query."""
query_response = self._embedding_provider.embed(query, config.embedding_model)
results = self.vector_store.search(query_response.embedding, top_k=config.num_chunks)
return [chunk for chunk, _ in results]
def _generate(self, question: str, context: str, config: RAGConfig) -> str:
"""Generate answer using RAG."""
if self._llm_provider is None:
raise ValueError("LLM provider is required for generation")
system_prompt = """You are a helpful assistant. Answer questions based ONLY on the provided context.
If the context doesn't contain enough information, say so. Be concise and accurate."""
prompt = f"""Context:
{context}
Question: {question}
Answer:"""
response = self._llm_provider.generate(
prompt=prompt,
model=config.llm_model,
system_prompt=system_prompt,
temperature=0.7,
)
return response.text
def _evaluate_response(
self,
question: str,
generated: str,
ground_truth: str,
context: str,
config: RAGConfig,
) -> EvaluationScores:
"""Evaluate a RAG response using LLM-as-judge."""
if self._llm_provider is None:
raise ValueError("LLM provider is required for evaluation")
def extract_score(response: str) -> float:
"""Extract numeric score from LLM response."""
try:
# Find first number in response
nums = "".join(c for c in response if c.isdigit() or c == ".")
if nums:
score = float(nums.split(".")[0]) # Take integer part
return min(100, max(0, score)) / 100
except (ValueError, IndexError):
pass
return 0.5
# Evaluate answer correctness
correctness_prompt = f"""Rate how correct this answer is compared to ground truth (0-100):
Question: {question}
Ground Truth: {ground_truth}
Generated Answer: {generated}
Respond with ONLY a number 0-100."""
resp = self._llm_provider.generate(correctness_prompt, config.llm_model)
correctness = extract_score(resp.text)
# Evaluate context relevance
relevance_prompt = f"""Rate how relevant this context is for answering the question (0-100):
Question: {question}
Context: {context[:1000]}
Respond with ONLY a number 0-100."""
resp = self._llm_provider.generate(relevance_prompt, config.llm_model)
relevance = extract_score(resp.text)
# Evaluate faithfulness
faithfulness_prompt = f"""Rate if this answer is grounded in the context (0-100):
Context: {context[:1000]}
Answer: {generated}
Respond with ONLY a number 0-100."""
resp = self._llm_provider.generate(faithfulness_prompt, config.llm_model)
faithfulness = extract_score(resp.text)
return EvaluationScores(
answer_correctness=correctness,
context_relevance=relevance,
faithfulness=faithfulness,
)
[docs]
def evaluate_config(self, config: RAGConfig, verbose: bool = False) -> EvaluationResult:
"""
Evaluate a single RAG configuration.
Parameters
----------
config : RAGConfig
Configuration to evaluate.
verbose : bool
Print progress information.
Returns
-------
EvaluationResult
Evaluation results for this configuration.
"""
if verbose:
print(f"\nEvaluating {config.name}:")
print(f" chunk_size={config.chunk_size}, overlap={config.chunk_overlap}, num_chunks={config.num_chunks}")
start_time = time.time()
# Build index
self._build_index(config)
# Evaluate on benchmark
all_scores = []
for qa in self.benchmark:
# Retrieve
chunks = self._retrieve(qa.question, config)
context = "\n\n".join(f"[{c.doc_id}]: {c.content}" for c in chunks)
# Generate
answer = self._generate(qa.question, context, config)
# Evaluate
scores = self._evaluate_response(qa.question, answer, qa.ground_truth, context, config)
all_scores.append(scores)
# Aggregate scores (use generators for memory efficiency)
avg_correctness = np.mean([s.answer_correctness for s in all_scores])
avg_relevance = np.mean([s.context_relevance for s in all_scores])
avg_faithfulness = np.mean([s.faithfulness for s in all_scores])
combined = float(np.mean([s.combined_score for s in all_scores]))
execution_time = time.time() - start_time
if verbose:
print(
f" Scores: correctness={avg_correctness:.2f}, "
f"relevance={avg_relevance:.2f}, faithfulness={avg_faithfulness:.2f}"
)
print(f" Combined: {combined:.3f} | Time: {execution_time:.1f}s")
return EvaluationResult(
pattern_name=config.name,
indexing_params={
"chunk_size": config.chunk_size,
"chunk_overlap": config.chunk_overlap,
"embedding_model": config.embedding_model,
},
inference_params={
"num_chunks": config.num_chunks,
"llm_model": config.llm_model,
},
scores={
"answer_correctness": {"mean": float(avg_correctness)},
"context_relevance": {"mean": float(avg_relevance)},
"faithfulness": {"mean": float(avg_faithfulness)},
},
execution_time=execution_time,
final_score=float(combined),
)
[docs]
def run(
self,
configs: list[RAGConfig] | None = None,
max_configs: int | None = None,
verbose: bool = True,
) -> list[EvaluationResult]:
"""
Run the RAG optimization experiment.
Parameters
----------
configs : list[RAGConfig], optional
Configurations to evaluate. If None, uses default search space.
max_configs : int, optional
Maximum number of configurations to evaluate.
verbose : bool
Print progress information.
Returns
-------
list[EvaluationResult]
Results sorted by combined score (best first).
"""
if configs is None:
configs = self.define_search_space()
if max_configs:
configs = configs[:max_configs]
if verbose:
print("=" * 60)
print("RAGIT: RAG Optimization Experiment")
print("=" * 60)
print(f"Configurations to test: {len(configs)}")
print(f"Documents: {len(self.documents)}")
print(f"Benchmark questions: {len(self.benchmark)}")
print()
self.results = []
for cfg in tqdm(configs, desc="Evaluating configs", disable=not verbose):
result = self.evaluate_config(cfg, verbose=verbose)
self.results.append(result)
# Sort by combined score (best first)
self.results.sort(key=lambda x: x.final_score, reverse=True)
if verbose:
print("\n" + "=" * 60)
print("RESULTS (sorted by score)")
print("=" * 60)
for i, result in enumerate(self.results[:5], 1):
print(f"{i}. {result.pattern_name}: {result.final_score:.3f}")
print(
f" chunk_size={result.indexing_params['chunk_size']}, "
f"num_chunks={result.inference_params['num_chunks']}"
)
return self.results
[docs]
def get_best_config(self) -> EvaluationResult | None:
"""Get the best configuration from results."""
if not self.results:
return None
return self.results[0]