Data Ingestion and Legal Normalization
The ingestion pipeline is built around resilience, scalability, and domain sensitivity. Documents are streamed from MongoDB with a generator pattern so that large collections never have to fit in memory. Each document then undergoes structured parsing; legal acts in particular require recursive extraction of their nested hierarchies (chapters, sections, amendments, schedules).
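To illustrate the recursive extraction step, the sketch below walks a nested act document depth-first and gathers every text fragment. The field names (`text`, `chapters`, `sections`) and the `walk_act` helper are hypothetical; the project's actual schema and extractors may differ.

```python
# Hypothetical sketch of recursive extraction from a nested legal act.
# Field names ("text", "chapters", "sections") are illustrative only.
def walk_act(node, fragments):
    """Depth-first walk that collects every "text" field in document order."""
    if isinstance(node, dict):
        text = node.get("text")
        if isinstance(text, str):
            fragments.append(text)
        for value in node.values():
            walk_act(value, fragments)
    elif isinstance(node, list):
        for item in node:
            walk_act(item, fragments)
    return fragments

act = {
    "title": "Example Act",
    "chapters": [
        {"text": "Chapter 1", "sections": [{"text": "Section 1.1"}]},
        {"text": "Chapter 2"},
    ],
}
walk_act(act, [])  # → ['Chapter 1', 'Section 1.1', 'Chapter 2']
```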
Text normalization includes:
- Preeti-to-Unicode transcoding
- Unicode NFC normalization
- Mixed-language stemming
- ZWJ-aware tokenization
- Regex-based cleaning
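A minimal sketch of two of these steps, NFC normalization and regex cleaning, plus a whitespace tokenizer that leaves ZWJ sequences inside tokens intact. Preeti-to-Unicode transcoding and mixed-language stemming require dedicated mapping tables and are omitted; the function names here are illustrative, not the project's API.

```python
import re
import unicodedata

ZWJ = "\u200d"  # zero-width joiner; meaningful inside Devanagari conjuncts

def normalize_text(raw):
    # Unicode NFC composes decomposed sequences (e.g. "e" + combining acute).
    text = unicodedata.normalize("NFC", raw)
    # Regex-based cleaning: collapse whitespace runs into single spaces.
    return re.sub(r"\s+", " ", text).strip()

def zwj_aware_tokens(text):
    # Split on whitespace only, so ZWJ never acts as a token boundary
    # and conjunct forms survive tokenization intact.
    return text.split()

zwj_aware_tokens(f"नेपाल{ZWJ}सरकार ऐन")  # ZWJ stays inside the first token
```

Because `str.split()` treats only Unicode whitespace as a boundary, and ZWJ is a format character rather than whitespace, a joined form like the one above stays whole.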
A domain-specific flattening engine interprets DSL-like metadata paths and extracts nested legal fields for indexing.
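The flattening engine itself is project-specific, but the path-driven idea can be sketched as a tiny interpreter for dotted paths in which a trailing `[]` fans out over list elements. The `extract_path` helper and the path syntax shown are assumptions for illustration, not the project's actual DSL.

```python
def extract_path(node, path):
    """Resolve a dotted path such as "chapters[].title" against nested data.
    Illustrative sketch only; the real DSL is richer than this."""
    head, _, rest = path.partition(".")
    if head.endswith("[]"):
        # Fan out over every element of the named list.
        results = []
        for item in node.get(head[:-2], []):
            results.extend(extract_path(item, rest) if rest else [item])
        return results
    value = node.get(head)
    if rest:
        return extract_path(value, rest) if isinstance(value, dict) else []
    return [value] if value is not None else []

act = {"chapters": [{"title": "Preliminary"}, {"title": "Offences"}]}
extract_path(act, "chapters[].title")  # → ['Preliminary', 'Offences']
```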
The ingestion system is re-entrant: before pushing embeddings to Qdrant, it scrolls the existing document IDs and skips content that is already indexed. This delta-aware check keeps indexing idempotent and makes incremental updates cheap.
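The delta-aware skip can be sketched as a filter over the document stream. In the real pipeline the set of existing IDs is collected by scrolling Qdrant; here it is modeled as a plain in-memory set, and `filter_new_documents` is a hypothetical name.

```python
def filter_new_documents(stream, existing_ids):
    """Yield only (doc_id, content) pairs that are not already indexed,
    so re-running ingestion is idempotent."""
    for doc_id, content in stream:
        if doc_id in existing_ids:
            continue  # indexed on a previous run; skip it
        yield doc_id, content

existing = {"acts:1"}  # would be gathered by scrolling Qdrant point IDs
stream = [("acts:1", "old text"), ("acts:2", "new text")]
list(filter_new_documents(stream, existing))  # → [('acts:2', 'new text')]
```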
Failures during ingestion are isolated per document, so a single bad record cannot terminate the pipeline, while retries with exponential backoff absorb transient AI-parsing failures.
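A hedged sketch of such a retry policy: exponential backoff with a little jitter, re-raising on the final attempt so the per-document isolation layer can record the failure and move on. The `with_retries` helper and its defaults are illustrative, and `sleep` is injectable so the sketch stays testable.

```python
import random
import time

def with_retries(fn, attempts=4, base_delay=0.5, sleep=time.sleep):
    """Call fn, retrying transient failures with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # let per-document isolation handle the final failure
            # Delays of 0.5s, 1s, 2s, ... plus jitter to avoid retry bursts.
            sleep(base_delay * 2 ** attempt + random.uniform(0, 0.1))
```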
Example in Practice: Flattening Legal Metadata and Streaming
from nep_search.utils import FlattenSpec, FlattenEngine
# 1. DSL extraction for structured JSON acts
# (Grabbing chapters, nested legislative items, schedules)
spec = FlattenSpec(ACT_SPEC_LIST)
engine = FlattenEngine(spec)
extracted_text_fragments = engine.extract(parsed_act_document, mode="list")
# 2. Generator pattern to stream efficiently from MongoDB
def get_documents_stream(self):
    for collection_name, collection in self.collections.items():
        cursor = collection.find(
            {}, {"data": 1, "documentType": 1}
        ).batch_size(100)
        for doc in cursor:
            document_id = doc.get("data", {}).get("id")
            content = self._extract_text_from_document_data(doc)
            yield (f"{collection_name}:{document_id}", content)