This deep dive explains how technical teams can integrate a local LLM into an existing data pipeline without replacing any core systems. It expands on the architecture and process outlined in the previous overview and uses Python for every stage of the implementation. The guide walks through the complete production setup, including data storage, model serving, microservices, orchestration, monitoring, and governance. The goal is to help engineers build a secure, efficient AI layer that enhances existing workflows and delivers measurable operational impact.
0. Reference architecture
1. Technical stack, Python first
- Language: Python 3.11
- Packaging: uv, or pip plus venv; Poetry is fine too
- Data transport: Kafka or Redpanda with a Python client such as the Confluent client (confluent-kafka); the examples below use aiokafka and kafka-python
- Storage: an S3-compatible object store holding Delta Lake or Apache Iceberg tables
- Batch and stream compute: Apache Spark 3.5 with PySpark, or Flink if you already use it
- Features and cache: Redis for hot features, DuckDB for local ad hoc joins
- Vector database: Milvus, or FAISS for single-node setups
- LLM serving: vLLM or TensorRT-LLM on GPU, exposed through an OpenAI-compatible API
- Microservices: FastAPI with Uvicorn and Pydantic v2 models
- Orchestration: Prefect or Airflow for batch jobs, simple async workers for the online flow
- Observability: Prometheus, Grafana, Loki, OpenTelemetry
- CI/CD: GitHub Actions, plus ArgoCD or Flux if you are on Kubernetes
- Secrets and config: HashiCorp Vault, or Kubernetes Secrets encrypted with SOPS
- Security controls: OAuth2 service tokens, network policies, and PII redaction best practices (OWASP)
Step 1: Normalize and model your records
Create a curated Delta or Iceberg table that joins raw events with your known features. This is the contract the LLM layer will consume.
```sql
-- Delta example (Spark SQL)
CREATE TABLE IF NOT EXISTS curated.cases
USING DELTA
PARTITIONED BY (event_date)
AS
SELECT
  e.event_id,
  e.source_system,
  e.event_ts,
  CAST(e.event_ts AS DATE) AS event_date,  -- the partition column must be part of the SELECT
  e.partner_id,
  e.payload_raw,
  f.device_id,
  f.customer_hash,
  f.partner_risk_tier,
  COALESCE(s.is_exception, false) AS is_exception,
  COALESCE(s.exception_type, 'none') AS exception_type
FROM raw.events e
LEFT JOIN features.entity_signals f ON e.entity_id = f.entity_id
LEFT JOIN signals.exceptions s ON e.event_id = s.event_id;
```

In Python, validate the schema at the boundary so downstream services always receive a predictable shape.
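Whichever validator you use, decide up front what happens to rows that break the contract, so a single malformed record cannot stall the stream. The sketch below is a stdlib-only illustration of dead-letter routing under stated assumptions: `split_valid`, `REQUIRED_FIELDS`, and `DEFAULTS` are hypothetical names whose contents mirror the required fields and defaults of the `CaseRecord` model that follows. In a Pydantic-based service the same shape would wrap `CaseRecord.model_validate` in a try/except for `ValidationError`.

```python
from typing import Any

# Hypothetical: mirrors the required (no-default) fields of CaseRecord.
REQUIRED_FIELDS = ("event_id", "source_system", "event_ts", "partner_id", "payload_raw")
# Hypothetical: the same defaults CaseRecord applies to optional columns.
DEFAULTS: dict[str, Any] = {
    "device_id": None,
    "customer_hash": None,
    "partner_risk_tier": "unknown",
    "is_exception": False,
    "exception_type": "none",
}


def split_valid(rows: list[dict[str, Any]]) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """Return (valid, dead_letter); each dead-letter entry keeps the row and the reason it failed."""
    valid: list[dict[str, Any]] = []
    dead_letter: list[dict[str, Any]] = []
    for row in rows:
        bad = [f for f in REQUIRED_FIELDS if not isinstance(row.get(f), str)]
        if bad:
            dead_letter.append({"row": row, "error": "missing or non-string: " + ", ".join(bad)})
            continue
        record = {**DEFAULTS, **row}
        record["features"] = dict(row.get("features") or {})
        valid.append(record)
    return valid, dead_letter
```

Dead-letter entries can go to their own topic or table together with the error text, so data owners can fix the source instead of the LLM layer guessing around bad input.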
```python
# contracts.py
from typing import Dict, Optional

from pydantic import BaseModel, Field


class CaseRecord(BaseModel):
    event_id: str
    source_system: str
    event_ts: str
    partner_id: str
    payload_raw: str
    device_id: Optional[str] = None
    customer_hash: Optional[str] = None
    partner_risk_tier: Optional[str] = "unknown"
    is_exception: bool = False
    exception_type: str = "none"
    features: Dict[str, float] = Field(default_factory=dict)
```

Step 2: Build your retrieval knowledge base
Collect internal policies, data dictionaries, runbooks, resolved cases, and escalation templates. Chunk, embed, and store vectors.
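The indexing script that follows embeds each JSONL record's `content` as one vector, so long documents should be split before they are written to `/kb`. A minimal sketch of fixed-size word windows with overlap, assuming whitespace splitting as a rough stand-in for model tokens (E5 models truncate input at 512 tokens, so windows should stay well below that); `chunk_words` and its defaults are hypothetical and untuned.

```python
def chunk_words(text: str, size: int = 200, overlap: int = 40) -> list[str]:
    """Hypothetical chunker: windows of `size` words, consecutive windows share `overlap` words."""
    if size <= 0 or not 0 <= overlap < size:
        raise ValueError("need size > 0 and 0 <= overlap < size")
    words = text.split()
    step = size - overlap
    chunks: list[str] = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + size]))
        if start + size >= len(words):
            break  # this window already reaches the end of the text
    return chunks
```

Each chunk would then be written as its own JSONL record, for example with an id such as `f"{doc_id}#{i}"`, so a retrieved chunk can still be traced back to its source document. The overlap keeps a sentence that straddles a boundary retrievable from either side.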
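```python
# kb_index.py
import glob
import json

from pymilvus import Collection, CollectionSchema, DataType, FieldSchema, connections, utility
from sentence_transformers import SentenceTransformer

MODEL = SentenceTransformer("intfloat/multilingual-e5-base")  # 768-dimensional embeddings
connections.connect("default", host="milvus", port="19530")

if utility.has_collection("kb"):
    col = Collection("kb")
else:
    schema = CollectionSchema(
        fields=[
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
            FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=768),
            # Store the chunk text itself so prompts get content, not just identifiers.
            FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=8192),
            FieldSchema(name="meta", dtype=DataType.VARCHAR, max_length=2048),
        ],
        description="Policies, runbooks, resolved cases",
    )
    col = Collection("kb", schema)
    # IP on normalized vectors equals cosine similarity. Size nlist to the corpus;
    # Milvus guidance is roughly 4 * sqrt(number of vectors).
    col.create_index("vector", {"index_type": "IVF_FLAT", "metric_type": "IP", "params": {"nlist": 1024}})


def iter_docs():
    """Yield (text, meta) pairs; each JSONL line is assumed to hold one already-chunked passage."""
    for path in glob.glob("/kb/**/*.jsonl", recursive=True):
        with open(path, "r", encoding="utf8") as fh:
            for line in fh:
                rec = json.loads(line)
                yield rec["content"], {"source": rec["source"], "id": rec["id"], "type": rec["type"]}


def insert_batch(texts, metas):
    # E5 models expect a "passage: " prefix on indexed text; encode the whole batch in one call.
    vecs = MODEL.encode(["passage: " + t for t in texts], normalize_embeddings=True).tolist()
    # Column order follows the schema, skipping the auto_id primary key.
    col.insert([vecs, texts, [json.dumps(m) for m in metas]])


batch_text, batch_meta = [], []
for text, meta in iter_docs():
    batch_text.append(text)
    batch_meta.append(meta)
    if len(batch_text) == 512:
        insert_batch(batch_text, batch_meta)
        batch_text, batch_meta = [], []
if batch_text:
    insert_batch(batch_text, batch_meta)
col.flush()
```

Retrieval utility for services:

```python
# kb_retrieve.py
import json

from pymilvus import Collection, connections
from sentence_transformers import SentenceTransformer

EMB = SentenceTransformer("intfloat/multilingual-e5-base")
connections.connect("default", host="milvus", port="19530")
KB = Collection("kb")
KB.load()  # a collection must be loaded into memory before it can be searched


def retrieve(query: str, k: int = 6) -> list[dict]:
    """Return the top-k chunks as metadata dicts that also carry the chunk text."""
    # E5 models expect a "query: " prefix on search text.
    qv = EMB.encode("query: " + query, normalize_embeddings=True).tolist()
    res = KB.search([qv], "vector", {"metric_type": "IP", "params": {"nprobe": 16}},
                    limit=k, output_fields=["text", "meta"])
    return [{**json.loads(hit.entity.get("meta")), "text": hit.entity.get("text")} for hit in res[0]]
```

Step 3: Serve the local model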
Run vLLM with a local open-weight model, for example Llama 3 or Mixtral, and expose it through vLLM's OpenAI-compatible endpoint.
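Before choosing `--tensor-parallel-size`, a back-of-the-envelope check that the weights fit is worth doing. The sketch below counts only weight memory at 2 bytes per parameter (BF16/FP16), split evenly across tensor-parallel GPUs; whatever remains of the `--gpu-memory-utilization` budget is what vLLM has for KV cache and activations. The 48 GB figure is the memory of one L40S, the GPU used in the sizing example later on, and the helper names are hypothetical. Real headroom is lower once CUDA context and activation overheads are counted.

```python
def weight_gb_per_gpu(params: float, bytes_per_param: float, tp_size: int) -> float:
    """Hypothetical helper: weight memory per GPU in GB (1e9 bytes) when sharded over tp_size GPUs."""
    return params * bytes_per_param / tp_size / 1e9


def kv_budget_gb(gpu_mem_gb: float, utilization: float, weights_gb: float) -> float:
    """Hypothetical helper: upper bound on memory left for KV cache within vLLM's utilization cap."""
    return gpu_mem_gb * utilization - weights_gb


# Llama 3 70B (about 70e9 parameters) in BF16 over 8 GPUs of 48 GB each, at 0.90 utilization.
w = weight_gb_per_gpu(70e9, 2, 8)  # 17.5 GB of weights per GPU
kv = kv_budget_gb(48, 0.90, w)     # about 25.7 GB left for KV cache and activations
```

Rerunning with `bytes_per_param=0.5` gives a rough figure for 4-bit AWQ or GPTQ weights, ignoring the small overhead of quantization scales; the freed memory becomes KV cache, which is what lets vLLM batch more concurrent requests.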
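```yaml
# docker-compose.yml
services:
  vllm:
    image: vllm/vllm-openai:latest
    # The image's entrypoint is vLLM's OpenAI-compatible server; these are its arguments.
    # --served-model-name is the name clients must pass in the "model" field.
    command: >
      --model /models/llama-3-70b-instruct
      --served-model-name llama-3-70b-instruct
      --tensor-parallel-size 8
      --max-model-len 8192
      --gpu-memory-utilization 0.90
    # Tensor-parallel worker processes exchange data through shared memory.
    ipc: host
    ports:
      - "8000:8000"
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: all
              capabilities: [gpu]
    volumes:
      - /srv/models:/models
```

Smoke test: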
```shell
curl http://localhost:8000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "llama-3-70b-instruct",
    "messages": [{"role": "user", "content": "reply: ok"}],
    "max_tokens": 8
  }'
```

Step 4: Build Python microservices for LLM skills
Use FastAPI and a shared client. Add PII redaction, retrieval, and caching.
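The card rule in `llm_client.py` masks any 12 to 19 digit run, which also hides order numbers and other long IDs that may matter to a summary. One common refinement, sketched here as an assumption rather than part of the original client, is to treat a digit run as a card number only when it passes the Luhn checksum that payment card numbers carry. `luhn_ok` and `redact_cards` are hypothetical helpers.

```python
import re

# Same candidate pattern as the card rule: a standalone run of 12 to 19 digits.
CARD_CANDIDATE = re.compile(r"\b\d{12,19}\b")


def luhn_ok(digits: str) -> bool:
    """Hypothetical helper. Luhn: from the right, double every second digit (minus 9 if > 9); sum % 10 == 0."""
    total = 0
    for i, ch in enumerate(reversed(digits)):
        d = int(ch)
        if i % 2 == 1:
            d *= 2
            if d > 9:
                d -= 9
        total += d
    return total % 10 == 0


def redact_cards(text: str) -> str:
    """Hypothetical helper: replace only Luhn-valid candidates with <CARD>, leave other IDs intact."""
    return CARD_CANDIDATE.sub(lambda m: "<CARD>" if luhn_ok(m.group()) else m.group(), text)
```

Because `re.Pattern.sub` accepts a function as the replacement, the card entry in `PII_RULES` could hold this lambda instead of the plain `"<CARD>"` string without changing `redact` itself.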
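```python
# llm_client.py
import os
import re
from functools import lru_cache

import httpx

from kb_retrieve import retrieve

VLLM = os.getenv("VLLM_URL", "http://vllm:8000/v1/chat/completions")
MODEL = os.getenv("MODEL_NAME", "llama-3-70b-instruct")

# Applied in order, so card-like digit runs are masked before the broader phone rule runs.
PII_RULES = [
    (re.compile(r"\b\d{12,19}\b"), "<CARD>"),
    (re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"), "<EMAIL>"),
    (re.compile(r"\+?\d{8,15}"), "<PHONE>"),
]

# One connection pool per process, shared by every request this service handles.
_client = httpx.AsyncClient(timeout=30)


def redact(text: str) -> str:
    """Mask card numbers, e-mail addresses, and phone-like digit runs before text reaches the model."""
    out = text
    for pat, rep in PII_RULES:
        out = pat.sub(rep, out)
    return out


async def chat(system: str, user: str, max_tokens: int = 512, temperature: float = 0.1) -> str:
    """Send one chat completion request to vLLM and return the first choice's text."""
    payload = {
        "model": MODEL,
        "messages": [{"role": "system", "content": system}, {"role": "user", "content": user}],
        "max_tokens": max_tokens,
        "temperature": temperature,
    }
    r = await _client.post(VLLM, json=payload)
    r.raise_for_status()
    return r.json()["choices"][0]["message"]["content"]


@lru_cache(maxsize=2048)
def policy_context(topic: str) -> str:
    """Policy chunks for a topic, cached per process; call policy_context.cache_clear() after re-indexing."""
    docs = retrieve(topic, k=6)
    # Include the chunk text when retrieval returns it, not only the document identifiers.
    return "\n\n".join(f"[{d['type']}::{d['source']}::{d['id']}]\n{d.get('text', '')}" for d in docs)
```

Summarizer service: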
```python
# svc_summarizer.py
from fastapi import FastAPI
from pydantic import BaseModel, Field

from llm_client import chat, policy_context, redact

app = FastAPI()


class Req(BaseModel):
    case_id: str
    payload_raw: str
    features: dict = Field(default_factory=dict)


SYSTEM = (
    "You are an operations triage assistant. Summarize the case in 6 bullets, "
    "list key signals as bullet points, and finish with a severity from 1 to 5 and a short reason."
)


@app.post("/summarize")
async def summarize(req: Req):
    ctx = policy_context("triage policies and exception handling")
    # Only the redacted payload reaches the model.
    user = f"Context:\n{ctx}\n\nPayload:\n{redact(req.payload_raw)}\n\nFeatures:\n{req.features}"
    out = await chat(SYSTEM, user, max_tokens=400)
    return {"case_id": req.case_id, "summary": out}
```

Classifier service:
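```python
# svc_classifier.py
import json

from fastapi import FastAPI
from pydantic import BaseModel

from llm_client import chat

app = FastAPI()
LABELS = {"data_quality", "partner_mismatch", "fraud_suspected", "routing_error", "other"}


class Req(BaseModel):
    case_id: str
    summary: str


SYSTEM = (
    "Classify the case into one of: data_quality, partner_mismatch, fraud_suspected, routing_error, other. "
    'Respond with JSON only, for example {"label": "other", "confidence": 0.4}, '
    "where confidence is between 0 and 1."
)


@app.post("/classify")
async def classify(req: Req):
    user = f"Case summary:\n{req.summary}"
    out = await chat(SYSTEM, user, max_tokens=128, temperature=0.0)
    # Guard against invalid JSON, unknown labels, and out-of-range confidence.
    try:
        data = json.loads(out)
        if not isinstance(data, dict) or data.get("label") not in LABELS:
            raise ValueError("unexpected label")
        data["confidence"] = min(max(float(data.get("confidence", 0.0)), 0.0), 1.0)
    except (ValueError, TypeError):
        data = {"label": "other", "confidence": 0.0, "raw": out}
    data["case_id"] = req.case_id
    return data
```

Validation and action suggestions: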
```python
# svc_actions.py
from fastapi import FastAPI
from pydantic import BaseModel

from llm_client import chat, policy_context

app = FastAPI()


class Req(BaseModel):
    case_id: str
    summary: str
    label: str
    partner_risk_tier: str = "unknown"


SYSTEM = (
    "Propose up to 3 next actions as a JSON list. Each item should have action, reason, approval_level. "
    "Use verbs like correct, reprocess, escalate, notify, request."
)


@app.post("/actions")
async def actions(req: Req):
    ctx = policy_context(f"actions for {req.partner_risk_tier} risk tier")
    user = f"Summary:\n{req.summary}\n\nLabel: {req.label}\n\nPolicy context:\n{ctx}"
    out = await chat(SYSTEM, user, max_tokens=256)
    # The raw model text is returned unchanged; validate it before anything acts on it.
    return {"case_id": req.case_id, "actions": out}
```

Run all three with Uvicorn, for example `uvicorn svc_summarizer:app --host 0.0.0.0 --port 9001`, behind a Kubernetes Service or a Docker Compose network. The orchestrator in Step 5 expects the summarizer, classifier, and actions services on ports 9001, 9002, and 9003.
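Even at temperature 0, instruction-tuned models often wrap JSON in prose or Markdown code fences, and the classifier's plain `json.loads` then falls back to `other`. A more forgiving parser, sketched under the assumption that the first JSON object or array in the reply is the answer, scans for it with the standard library's `json.JSONDecoder.raw_decode`. `extract_json` is a hypothetical helper that the classifier and actions services could share.

```python
import json
from typing import Any

_DECODER = json.JSONDecoder()


def extract_json(text: str) -> Any:
    """Hypothetical helper: return the first JSON object or array embedded in text, or None."""
    for i, ch in enumerate(text):
        if ch in "{[":
            try:
                obj, _end = _DECODER.raw_decode(text[i:])  # parses one value, ignores trailing text
                return obj
            except json.JSONDecodeError:
                continue  # not valid JSON starting here; keep scanning
    return None
```

In the classifier, `extract_json(out)` would take the place of `json.loads(out)`, keeping the fallback to `other` whenever the result is not a dict with a known label.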
Step 5: Connect the services to your stream
An async orchestrator consumes curated cases from Kafka, calls services, and publishes enriched results.
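The orchestrator calls three services in sequence, so one transient timeout or 5xx from any of them would otherwise surface as an exception in the consume loop. A small retry wrapper with exponential backoff can wrap each call. This is a sketch under assumptions: `with_retries` and its defaults are hypothetical, and in practice you would pass `retry_on` the exception types you consider transient (timeouts and 5xx responses) rather than 4xx validation errors, which will not succeed on retry.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def with_retries(
    fn: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
) -> T:
    """Hypothetical helper: await fn(), retrying with delays base_delay, 2*base_delay, 4*base_delay, ..."""
    for attempt in range(attempts):
        try:
            return await fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: the caller decides whether to dead-letter, skip, or stop
            await asyncio.sleep(base_delay * 2 ** attempt)
    raise RuntimeError("attempts must be >= 1")
```

Each service call then becomes `await with_retries(lambda: call(...))`; the lambda makes a fresh coroutine on every attempt, because a coroutine object can only be awaited once.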
```python
# orchestrator.py
import asyncio
import json
import os

import httpx
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

SUMM = os.getenv("SUMM_URL", "http://summarizer:9001/summarize")
CLSF = os.getenv("CLSF_URL", "http://classifier:9002/classify")
ACTS = os.getenv("ACTS_URL", "http://actions:9003/actions")
BOOT = os.getenv("KAFKA_BOOTSTRAP", "kafka:9092")


async def call(client: httpx.AsyncClient, url: str, payload: dict) -> dict:
    """POST a JSON payload to one skill service and return its JSON response."""
    r = await client.post(url, json=payload)
    r.raise_for_status()
    return r.json()


async def run() -> None:
    consumer = AIOKafkaConsumer(
        "cases.curated",
        bootstrap_servers=BOOT,
        group_id="llm-orchestrator",  # committed offsets let a restart resume where it stopped
        enable_auto_commit=False,  # commit only after the enriched record is published
        value_deserializer=lambda b: json.loads(b.decode()),
    )
    producer = AIOKafkaProducer(bootstrap_servers=BOOT, value_serializer=lambda d: json.dumps(d).encode())
    # start() and stop() are coroutines, so each one must be awaited on its own.
    await consumer.start()
    await producer.start()
    try:
        async with httpx.AsyncClient(timeout=30) as client:
            async for msg in consumer:
                case = msg.value
                case_id = case["event_id"]
                summ = await call(client, SUMM, {
                    "case_id": case_id,
                    "payload_raw": case["payload_raw"],
                    "features": case.get("features") or {},
                })
                clsf = await call(client, CLSF, {"case_id": case_id, "summary": summ["summary"]})
                label = clsf.get("label", "other")
                acts = await call(client, ACTS, {
                    "case_id": case_id,
                    "summary": summ["summary"],
                    "label": label,
                    "partner_risk_tier": case.get("partner_risk_tier") or "unknown",
                })
                enriched = {
                    "event_id": case_id,
                    "summary": summ["summary"],
                    "label": label,
                    "confidence": clsf.get("confidence", 0.0),
                    "actions": acts["actions"],
                }
                # Keying by event_id gives every sink a deterministic key for de-duplication.
                await producer.send_and_wait("cases.enriched", enriched, key=case_id.encode())
                await consumer.commit()
    finally:
        await consumer.stop()
        await producer.stop()


if __name__ == "__main__":
    asyncio.run(run())
```

Idempotency tip: give every message a deterministic key, for example event_id. Use exactly-once semantics (Kafka transactions) if your cluster and client support them; otherwise assume at-least-once delivery and handle duplicates in the sink.
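Handling duplicates in the sink can be as simple as remembering recently seen keys. A minimal in-memory sketch, assuming duplicates arrive close together, as they do when a restarted consumer replays uncommitted offsets. The bounded `SeenKeys` class is hypothetical; across restarts or multiple replicas you would need shared state instead, such as a Redis `SET` with `NX` and an expiry, or a Delta `MERGE` on `event_id`.

```python
from collections import OrderedDict


class SeenKeys:
    """Hypothetical helper: bounded memory of recently processed keys, oldest evicted first."""

    def __init__(self, capacity: int = 100_000) -> None:
        self.capacity = capacity
        self._keys: OrderedDict[str, None] = OrderedDict()

    def first_time(self, key: str) -> bool:
        """True the first time a key is seen; False for a duplicate that is still remembered."""
        if key in self._keys:
            self._keys.move_to_end(key)  # refresh, so frequently repeated keys stay remembered
            return False
        self._keys[key] = None
        if len(self._keys) > self.capacity:
            self._keys.popitem(last=False)  # forget the least recently seen key
        return True
```

In the Jira sink, `if not seen.first_time(e["event_id"]): continue` before creating the issue keeps a replay from opening the same ticket twice.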
Step 6: Write back to your warehouse and case tools
For analytics and reporting, append enriched results into Delta or Iceberg.
```python
# sink_delta.py (needs the spark-sql-kafka-0-10 and delta-spark packages on the Spark classpath)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json

spark = SparkSession.builder.appName("enriched-sink").getOrCreate()
# Explicit schema; "actions" is the raw JSON text returned by the actions service.
schema = "event_id STRING, summary STRING, label STRING, confidence DOUBLE, actions STRING"
# readStream, not read: writeStream only exists on streaming DataFrames.
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka:9092").option("subscribe", "cases.enriched").load()
parsed = df.select(from_json(col("value").cast("string"), schema).alias("v")).select("v.*")
(parsed.writeStream.format("delta")
    .option("checkpointLocation", "/chk/enriched")
    .option("path", "/delta/enriched")
    .outputMode("append")
    .start()
    .awaitTermination())
```

For case management, open tickets through a single integration point, for example Jira or your internal tool.
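```python
# sink_jira.py
import json
import os

import httpx
from kafka import KafkaConsumer

JIRA_URL = os.environ["JIRA_URL"]
JIRA_AUTH = (os.environ["JIRA_USER"], os.environ["JIRA_TOKEN"])


def adf(text: str) -> dict:
    """REST API v3 expects descriptions in Atlassian Document Format: here, one paragraph per non-empty line."""
    lines = [ln for ln in text.splitlines() if ln.strip()]
    return {"type": "doc", "version": 1,
            "content": [{"type": "paragraph", "content": [{"type": "text", "text": ln}]} for ln in lines]}


c = KafkaConsumer("cases.enriched", bootstrap_servers=["kafka:9092"], group_id="jira-sink",
                  value_deserializer=lambda b: json.loads(b.decode()))
for m in c:
    e = m.value
    payload = {
        "fields": {
            "project": {"key": "OPS"},
            "summary": f"[{e['label']}] {e['event_id']}",
            "description": adf(e["summary"] + "\n\nProposed actions:\n" + str(e["actions"])),
            "issuetype": {"name": "Task"},
        }
    }
    r = httpx.post(f"{JIRA_URL}/rest/api/3/issue", auth=JIRA_AUTH, json=payload, timeout=30)
    r.raise_for_status()
```

Step 7: Quality, evaluation, and gates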
Add an evaluation stream that samples records for human rating. Track precision, recall, and action acceptance.
```python
# eval_sampler.py
import json
import os
import random

from kafka import KafkaConsumer, KafkaProducer

SAMPLE_RATE = float(os.getenv("SAMPLE_RATE", "0.01"))
c = KafkaConsumer("cases.enriched", bootstrap_servers=["kafka:9092"], group_id="eval-sampler",
                  value_deserializer=lambda b: json.loads(b.decode()))
p = KafkaProducer(bootstrap_servers=["kafka:9092"], value_serializer=lambda d: json.dumps(d).encode())
for m in c:
    # Forward roughly SAMPLE_RATE of enriched cases to the human-rating queue.
    if random.random() < SAMPLE_RATE:
        p.send("cases.eval", m.value)
```

Add a simple web form or internal tool where analysts rate correctness. Store the ratings in a table for weekly reports and drift detection.
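For the weekly report, the analyst ratings reduce to a few counts. A sketch, assuming each stored rating row carries the model's `label`, the analyst's `true_label`, and an `action_accepted` flag that is `None` when no action was rated; these column names and both helpers are hypothetical. Per-label precision and recall follow the usual definitions, precision = TP / (TP + FP) and recall = TP / (TP + FN).

```python
from collections import Counter


def label_metrics(ratings: list[dict]) -> dict[str, dict[str, float]]:
    """Hypothetical helper: per-label precision and recall from model vs. analyst labels."""
    tp: Counter = Counter()
    fp: Counter = Counter()
    fn: Counter = Counter()
    for r in ratings:
        if r["label"] == r["true_label"]:
            tp[r["label"]] += 1
        else:
            fp[r["label"]] += 1       # the model claimed this label wrongly
            fn[r["true_label"]] += 1  # the model missed the analyst's label
    out: dict[str, dict[str, float]] = {}
    for label in set(tp) | set(fp) | set(fn):
        p_den, r_den = tp[label] + fp[label], tp[label] + fn[label]
        out[label] = {
            "precision": tp[label] / p_den if p_den else 0.0,
            "recall": tp[label] / r_den if r_den else 0.0,
        }
    return out


def acceptance_rate(ratings: list[dict]) -> float:
    """Hypothetical helper: share of rated cases whose proposed actions the analyst accepted."""
    rated = [r for r in ratings if r.get("action_accepted") is not None]
    return sum(bool(r["action_accepted"]) for r in rated) / len(rated) if rated else 0.0
```

Tracking these numbers per week, per label, gives a simple drift signal: a label whose precision or recall drops between reviews is the first place to look at prompts, retrieval, or upstream data.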
Step 8: Monitoring and tracing
Expose Prometheus metrics from every service.
```python
# metrics.py
from prometheus_client import Counter, Histogram, start_http_server

TOKENS = Counter("llm_tokens_total", "Total tokens", ["service", "kind"])
LAT = Histogram("llm_latency_seconds", "Latency", ["service"])
ERRORS = Counter("svc_errors_total", "Errors", ["service", "type"])


def start(port: int) -> None:
    """Serve /metrics for Prometheus on the given port (use a distinct port per service)."""
    start_http_server(port)
```

Use the metrics in services:
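```python
# in svc_summarizer.py
import time

from metrics import ERRORS, LAT, start

start(9101)


@app.post("/summarize")
async def summarize(req: Req):
    t0 = time.perf_counter()
    ...  # retrieval and prompt assembly as in svc_summarizer.py above
    try:
        out = await chat(SYSTEM, user, max_tokens=400)
    except Exception as exc:
        ERRORS.labels("summarizer", type(exc).__name__).inc()
        raise
    finally:
        LAT.labels("summarizer").observe(time.perf_counter() - t0)
    # Token counts: vLLM's OpenAI-compatible responses include a "usage" object
    # (prompt_tokens, completion_tokens); have chat() return it and feed the TOKENS counter.
    return {"case_id": req.case_id, "summary": out}
```

vLLM's OpenAI-compatible server already exposes its own Prometheus metrics at `/metrics`. Have Prometheus scrape it along with every service, and build Grafana dashboards that show QPS, P50 and P95 latency, error rates, token throughput, and retrieval hit ratios.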
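For the token counters, server logs are not the only source: chat completion responses from vLLM's OpenAI-compatible endpoint include a `usage` object with `prompt_tokens` and `completion_tokens`. A sketch of turning that object into counter increments; `token_increments` is a hypothetical helper, and `chat()` in `llm_client.py` would have to return the parsed response (or its `usage`) alongside the text for a service to call it.

```python
def token_increments(response: dict) -> dict[str, int]:
    """Hypothetical helper: map an OpenAI-style chat completion response to {kind: tokens}."""
    usage = response.get("usage") or {}  # tolerate responses without usage
    return {
        "prompt": int(usage.get("prompt_tokens", 0)),
        "completion": int(usage.get("completion_tokens", 0)),
    }
```

In a service, `for kind, n in token_increments(resp).items(): TOKENS.labels("summarizer", kind).inc(n)` then feeds the `llm_tokens_total` counter from `metrics.py`, split by prompt and completion tokens.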
Step 9: Security, privacy, and audit
- Keep the model and vector store on a private network segment with no egress
- Enforce mutual TLS between the services and vLLM where possible
- Redact PII in prompts; log only redacted prompts and keep originals in the case tables
- Version prompts and templates, and include the prompt version and the KB index version in every output
- Write a signed audit bundle that lists retrieved documents, chosen actions, and approvals
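One way to make the audit bundle tamper-evident is an HMAC over a canonical JSON encoding. A sketch using only the standard library, assuming a signing key held in your secrets store (Vault or Kubernetes Secrets); the bundle fields and the `sign_bundle` / `verify_bundle` names are illustrative. An HMAC proves integrity to anyone who holds the key; if auditors must verify bundles without the key, you would need an asymmetric signature instead.

```python
import hashlib
import hmac
import json


def canonical(bundle: dict) -> bytes:
    """Stable byte encoding: sorted keys, no extra whitespace, UTF-8."""
    return json.dumps(bundle, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")


def sign_bundle(bundle: dict, key: bytes) -> dict:
    """Hypothetical helper: copy of the bundle plus an HMAC-SHA256 signature over its canonical form."""
    sig = hmac.new(key, canonical(bundle), hashlib.sha256).hexdigest()
    return {**bundle, "signature": sig}


def verify_bundle(signed: dict, key: bytes) -> bool:
    """Hypothetical helper: recompute the HMAC without the signature field, compare in constant time."""
    body = {k: v for k, v in signed.items() if k != "signature"}
    expected = hmac.new(key, canonical(body), hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signed.get("signature", ""))
```

Canonical encoding matters: without sorted keys, the same bundle serialized by two services could produce different bytes and fail verification even though nothing was altered.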
Prompt version tag example:
```python
PROMPT_VERSION = "summarizer_v3.2"
# include in every response payload for traceability
```

Step 10: Performance and cost tuning
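- vLLM uses PagedAttention and continuous batching by default, but they only pay off with many requests in flight; a single sequential orchestrator sends one request at a time, so run several orchestrator instances (up to the topic's partition count) or process cases concurrently
- Prefer shorter prompts; use retrieval to supply only the context each call needs
- Quantize models where feasible (AWQ or GPTQ) and measure the accuracy impact on your evaluation set
- Cap max_tokens, use temperature 0 for classification and low values for summarization
- Add a small supervised classifier on embeddings to gate LLM calls: if its confidence is high, skip the LLM, which reduces token volume and latency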
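Keeping prompts short mostly comes down to deciding how much retrieved context to include. A minimal sketch of a budget trimmer, assuming chunks arrive in relevance order and using a whitespace word count as a rough proxy for tokens (a real deployment would count with the model's tokenizer); `fit_context` is a hypothetical helper.

```python
def fit_context(chunks: list[str], max_words: int) -> str:
    """Hypothetical helper: join chunks in relevance order, stopping before the word budget is exceeded."""
    kept: list[str] = []
    used = 0
    for chunk in chunks:
        n = len(chunk.split())
        if used + n > max_words:
            break  # later chunks are less relevant, so stop instead of skipping ahead
        kept.append(chunk)
        used += n
    return "\n\n".join(kept)
```

Stopping at the first chunk that does not fit, rather than packing smaller, less relevant chunks behind it, keeps the context ordered by relevance and the prompt length predictable.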
Embedding gate example:
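```python
# clf_gate.py
import joblib
from sentence_transformers import SentenceTransformer

EMB = SentenceTransformer("intfloat/multilingual-e5-base")
# Offline training fits a scikit-learn classifier (for example LogisticRegression) on
# E5 embeddings of past cases and saves it as clf_emb.joblib.
CLF = joblib.load("clf_emb.joblib")
THRESHOLD = 0.85


def gate(text: str) -> tuple[bool, float, str]:
    """Return (call_llm, confidence, label); the LLM is called only when confidence < THRESHOLD."""
    vec = EMB.encode(["query: " + text], normalize_embeddings=True)
    proba = CLF.predict_proba(vec)[0]
    best = int(proba.argmax())
    return bool(proba[best] < THRESHOLD), float(proba[best]), str(CLF.classes_[best])
```

Step 11: Deployment checklist, no skipped steps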
- Provision two GPU nodes, install NVIDIA drivers and a GPU-enabled container runtime, and test with nvidia-smi
- Pull model weights to local storage, verify checksums, and restrict permissions
- Deploy vLLM with the OpenAI-compatible API and confirm that readiness and health checks pass
- Stand up Milvus on NVMe, create a collection and an index, and load the policy corpus
- Build and run the FastAPI services with PII redaction, retrieval, and Prometheus metrics
- Connect Kafka topics and publish a small batch of curated cases in a staging namespace
- Run the orchestrator in staging and verify the end-to-end path to the enriched topic, the Delta sink, and the ticket sink
- Create Grafana dashboards and set alerts on P95 latency, error rate, and consumer lag
- Enable sampling to the evaluation queue, and set a weekly review and retraining cadence
- Write the policy allow list and approval rules, block actions not on the list, and keep humans in the loop
- Run failure drills: kill one service and verify graceful degradation to the rules engine alone
- Promote to production behind a feature flag, ramp traffic from 5 percent to 100 percent with guardrails
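The allow-list item can be enforced in code after the actions service responds, so the model can propose but never trigger anything outside policy. A sketch, assuming the reply has already been parsed into a list of dicts and a hypothetical `ALLOWED` mapping from action verb to the minimum approval level your policy requires; the verbs come from the actions prompt, while the numeric levels are placeholders.

```python
# Hypothetical policy: allowed verbs and the minimum approval level each one requires.
ALLOWED = {"correct": 1, "reprocess": 1, "notify": 1, "request": 1, "escalate": 2}


def enforce_allow_list(proposed: list[dict]) -> tuple[list[dict], list[dict]]:
    """Hypothetical helper: split proposals into (kept, blocked), raising approval levels to the policy floor."""
    kept: list[dict] = []
    blocked: list[dict] = []
    for item in proposed:
        words = str(item.get("action", "")).strip().lower().split()
        verb = words[0] if words else ""
        if verb not in ALLOWED:
            blocked.append(item)  # unknown verb: never executed, logged for review
            continue
        try:
            level = int(item.get("approval_level", 0))
        except (TypeError, ValueError):
            level = 0  # unparseable level from the model: fall back to the policy minimum
        kept.append({**item, "approval_level": max(level, ALLOWED[verb])})
    return kept, blocked
```

The model's suggested approval level can raise the bar but never lower it below the policy floor, which keeps humans in the loop for the riskier verbs.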
Step 12: Example sizing and throughput math
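- Two nodes with 8 L40S GPUs each, at about 200 tokens per second per GPU, give roughly 3,200 tokens per second in total
- A typical call of 700 prompt tokens plus 100 output tokens is about 800 tokens, so the cluster sustains about 4 requests per second, or about 345 thousand calls per day
- If your pipeline processes 50 thousand cases per day and each case triggers two calls, for example summarization and actions, that is 100 thousand calls per day, under a third of capacity. The Step 5 orchestrator also calls the LLM for classification, which makes 150 thousand calls, still under half, so you are in the safe range as long as traffic is not concentrated in a few peak hours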
- Add a third node for burst or scheduled heavy jobs like backfills
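The sizing arithmetic above fits in a few lines, which makes it easy to rerun with your own measured throughput. A sketch in which every input is an assumption to replace with benchmark numbers; the helper names are hypothetical, and the result is a 24-hour average with no headroom for bursts.

```python
SECONDS_PER_DAY = 86_400


def calls_per_day(gpus: int, tokens_per_sec_per_gpu: float, tokens_per_call: int) -> float:
    """Hypothetical helper: sustained LLM calls per day if load is spread evenly over 24 hours."""
    return gpus * tokens_per_sec_per_gpu / tokens_per_call * SECONDS_PER_DAY


def utilization(cases_per_day: int, calls_per_case: int, capacity_per_day: float) -> float:
    """Hypothetical helper: fraction of daily capacity the pipeline's average load would use."""
    return cases_per_day * calls_per_case / capacity_per_day


capacity = calls_per_day(gpus=16, tokens_per_sec_per_gpu=200, tokens_per_call=800)  # 345,600
two_calls = utilization(50_000, 2, capacity)    # about 0.29
three_calls = utilization(50_000, 3, capacity)  # about 0.43, with classification on the LLM too
```

If most cases arrive during business hours, divide capacity by the share of the day that carries the load before comparing; for example, if all traffic lands in 8 hours, effective capacity is a third of the figure above.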
Step 13: What to ship in week one
- Summarizer and classifier only, wired to the case view, with no automatic actions
- Index the last 3 months of policies and resolved cases in Milvus
- Enrich 10 percent of live cases and compare analyst time and accuracy against the cases that were not enriched
- Present Grafana dashboards that show latency, QPS, and action acceptance rate
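For the 10 percent enrichment, and later for the 5 to 100 percent production ramp, a deterministic bucket per case keeps the decision stable across retries, replays, and service replicas, which `random.random()` would not. A sketch assuming SHA-256 of the `event_id` mapped to 10,000 buckets; `bucket`, `in_rollout`, and the salt value are hypothetical.

```python
import hashlib

BUCKETS = 10_000


def bucket(event_id: str, salt: str = "llm-rollout") -> int:
    """Hypothetical helper: stable bucket in [0, BUCKETS); the same event_id always lands in the same bucket."""
    digest = hashlib.sha256(f"{salt}:{event_id}".encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % BUCKETS


def in_rollout(event_id: str, percent: float, salt: str = "llm-rollout") -> bool:
    """Hypothetical helper: True when the case falls inside the first `percent` percent of buckets."""
    return bucket(event_id, salt) < percent * BUCKETS / 100
```

Because the enrolled buckets are nested, raising `percent` from 5 to 100 only adds cases; nothing enrolled earlier drops out, so week-one comparisons stay valid as the ramp proceeds. Changing the salt reshuffles the assignment if you need a fresh sample.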
Appendix, developer-friendly make targets
```shell
make venv          # create venv, install deps
make serve-llm     # run vLLM with local weights
make kb            # index knowledge base into Milvus
make run-svcs      # run summarizer, classifier, actions
make orchestrate   # start orchestrator
make sinks         # start sinks for delta and jira
make dashboards    # import Grafana dashboards
```

Conclusion
Building a local LLM layer inside your existing data pipeline is no longer a research experiment; it is a practical way to bring reasoning and automation directly into your operational systems. By following this process, you keep full control over infrastructure, security, and data while giving your analysts and engineers a smarter foundation to work with. Every existing component, from ingestion to reporting, stays in place but gains awareness of context and intent. The result is faster decisions, cleaner data, and fewer repetitive tasks handled by people. With the right stack and disciplined execution, a local LLM becomes part of your organization’s nervous system, grounded in your own data and improving the accuracy and speed of every process it touches as your knowledge base, evaluations, and prompts mature.