Chapter 25: Case Study -- Autonomous Data Pipeline #
The previous two case studies featured agents that respond to user requests. In this chapter, we build something fundamentally different: a system of autonomous agents that manage a data pipeline with minimal human intervention. The agents ingest data on a schedule, detect anomalies using reasoning, alert operators when thresholds are breached, stay within budget constraints, coordinate across distributed replicas via leader election, and export telemetry to OpenTelemetry collectors.
This case study exercises Neam's most advanced features: autonomy with goals and triggers, budget controls, distributed state backends, the LLM gateway with circuit breaking, and multi-cloud deployment. By the end, you will have a self-managing data pipeline that runs 24/7 without human babysitting.
25.1 Requirements #
An autonomous data pipeline has six core requirements:
- Scheduled data ingestion. A data ingestion agent runs every hour, pulls data from external sources (APIs, databases, S3 buckets), validates the data, and stores it in a staging area.
- Anomaly detection. An anomaly detection agent analyzes freshly ingested data, compares it against historical baselines, and flags statistical outliers. The agent must reason about what constitutes a "real" anomaly versus normal variance.
- Alerting with initiative. An alert agent monitors for anomalies and, when it detects a critical issue, proactively sends notifications to the operations team. The agent uses initiative -- it acts without being asked.
- Budget constraints. Autonomous agents run continuously and can accumulate significant LLM costs. Each agent must operate within daily limits for API calls, token consumption, and dollar cost.
- Distributed execution. When the pipeline runs across multiple Kubernetes pods (for high availability), only one replica should execute scheduled tasks at a time. This requires distributed leader election.
- Observability. Every agent action, tool call, and anomaly detection must be traced with OpenTelemetry for debugging and audit purposes.
25.2 Architecture #
The architecture has three autonomous agents operating independently on their own schedules, coordinated through a shared state backend (PostgreSQL) and a distributed leader election mechanism. A monitoring layer exports all activity to OpenTelemetry.
25.3 Step 1: Data Ingestion Agent with Triggers #
The ingestion agent runs every hour. It pulls data from an external API, validates the schema, and stores the data in a staging table. The agent uses chain-of-thought reasoning to identify data quality issues during ingestion.
// ================================================================
// STEP 1: Data Ingestion Agent
// ================================================================
// Tool: Fetch data from the external metrics API.
tool fetch_metrics {
description: "Fetch the latest metrics data from the external API. Returns JSON with timestamp, cpu_usage, memory_usage, request_count, error_rate, and latency_p99."
parameters: {
"source": {
"type": "string",
"description": "Data source identifier (e.g., production, staging)"
},
"hours": {
"type": "number",
"description": "Number of hours of data to fetch (1-24)"
}
}
execute: fun(args) {
let source = args["source"];
let hours = args["hours"];
let url = "https://metrics.internal/api/v1/query"
+ "?source=" + source
+ "&hours=" + str(hours);
emit "[Tool] Fetching " + str(hours) + "h of metrics from " + source;
let response = http_get(url);
let data = json_parse(response);
emit "[Tool] Received " + str(len(data["datapoints"])) + " datapoints";
return data;
}
}
// Tool: Store validated data in the staging table.
tool store_metrics {
description: "Store validated metrics data in the staging database."
parameters: {
"datapoints": {
"type": "string",
"description": "JSON array of validated metric datapoints"
},
"source": {
"type": "string",
"description": "Data source identifier"
}
}
execute: fun(args) {
let payload = json_stringify({
"datapoints": json_parse(args["datapoints"]),
"source": args["source"],
"ingested_at": time_now()
});
let response = http_request({
"method": "POST",
"url": "https://metrics.internal/api/v1/ingest",
"headers": { "Content-Type": "application/json" },
"body": payload
});
let result = json_parse(response);
emit "[Tool] Stored " + str(result["count"]) + " datapoints";
return result;
}
}
// The ingestion agent runs autonomously every hour.
// It fetches data, validates it, and stores it.
agent IngestionAgent {
provider: "openai"
model: "gpt-4o-mini"
temperature: 0.2
system: "You are a data ingestion agent. Your job is to:
1. Fetch the latest metrics data from the production source
2. Validate the data for completeness and schema correctness
3. Identify any obvious data quality issues (missing fields, impossible values)
4. Store validated data in the staging database
5. Report a summary of what was ingested
If you detect data quality issues (e.g., negative CPU usage, error rates above 100%),
log the issues but still ingest the valid datapoints. Never silently drop data.
Report format:
- Datapoints fetched: N
- Datapoints validated: N
- Data quality issues: N (list issues if any)
- Datapoints stored: N"
reasoning: chain_of_thought
tools: [fetch_metrics, store_metrics]
goals: [
"Ingest production metrics data every hour",
"Validate data quality before storage",
"Report ingestion summaries"
]
triggers: {
on_schedule: "every 1h"
}
initiative: true
budget: {
max_daily_calls: 50
max_daily_cost: 2.0
max_daily_tokens: 25000
}
memory: "ingestion_memory"
}
What happens at runtime:
- The VM registers IngestionAgent with the AutonomousExecutor.
- Every hour, the executor checks the distributed lock (see Step 5).
- If this pod holds the leader lock, the executor constructs a prompt from the agent's goals and calls call_agent_internal().
- The agent uses chain-of-thought to plan: "First fetch, then validate, then store."
- It calls fetch_metrics with source: "production" and hours: 1.
- It analyzes the returned data and identifies any quality issues.
- It calls store_metrics with the validated data.
- The action is logged to autonomous_actions, and budget counters are updated.
- Daily budgets reset at midnight UTC.
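The budget bookkeeping in the last two steps can be modeled in a few lines. This is a sketch of the accounting logic, not Neam's internals: the class and field names are ours, and the real counters live in the state backend rather than in memory.

```python
from dataclasses import dataclass
from datetime import datetime, timezone

@dataclass
class DailyBudget:
    """Per-agent daily limits, mirroring the budget block in an agent definition."""
    max_calls: int
    max_cost: float
    max_tokens: int
    calls_used: int = 0
    cost_used: float = 0.0
    tokens_used: int = 0
    day: str = ""  # UTC date the counters belong to

    def _reset_if_new_day(self, now: datetime) -> None:
        today = now.astimezone(timezone.utc).date().isoformat()
        if today != self.day:  # first action after midnight UTC wipes the counters
            self.day = today
            self.calls_used = 0
            self.cost_used = 0.0
            self.tokens_used = 0

    def try_spend(self, now: datetime, cost: float, tokens: int) -> bool:
        """Record one autonomous action; refuse if any daily limit would be exceeded."""
        self._reset_if_new_day(now)
        if (self.calls_used + 1 > self.max_calls
                or self.cost_used + cost > self.max_cost
                or self.tokens_used + tokens > self.max_tokens):
            return False
        self.calls_used += 1
        self.cost_used += cost
        self.tokens_used += tokens
        return True
```

The key property is that the reset happens lazily, on the first action after midnight UTC -- no separate timer is needed.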
25.4 Step 2: Anomaly Detection Agent with Reasoning #
The anomaly detection agent runs every 15 minutes. It analyzes the most recent
ingested data, compares it against historical baselines, and flags anomalies. It
uses tree_of_thought reasoning to evaluate multiple hypotheses about why a metric
might be abnormal.
// ================================================================
// STEP 2: Anomaly Detection Agent
// ================================================================
// Tool: Query historical baselines for comparison.
tool query_baselines {
description: "Query historical baseline statistics for a metric. Returns mean, stddev, min, max, and percentiles over the last 30 days."
parameters: {
"metric": {
"type": "string",
"description": "Metric name (cpu_usage, memory_usage, request_count, error_rate, latency_p99)"
},
"source": {
"type": "string",
"description": "Data source identifier"
}
}
execute: fun(args) {
let url = "https://metrics.internal/api/v1/baselines"
+ "?metric=" + args["metric"]
+ "&source=" + args["source"]
+ "&days=30";
let response = http_get(url);
return json_parse(response);
}
}
// Tool: Record a detected anomaly.
tool record_anomaly {
description: "Record a detected anomaly in the anomaly tracking system."
parameters: {
"metric": {
"type": "string",
"description": "The metric that is anomalous"
},
"current_value": {
"type": "number",
"description": "The current observed value"
},
"baseline_mean": {
"type": "number",
"description": "The historical baseline mean"
},
"baseline_stddev": {
"type": "number",
"description": "The historical baseline standard deviation"
},
"severity": {
"type": "string",
"description": "Severity: low, medium, high, critical"
},
"hypothesis": {
"type": "string",
"description": "The most likely explanation for the anomaly"
}
}
execute: fun(args) {
let payload = json_stringify({
"metric": args["metric"],
"current_value": args["current_value"],
"baseline_mean": args["baseline_mean"],
"baseline_stddev": args["baseline_stddev"],
"severity": args["severity"],
"hypothesis": args["hypothesis"],
"detected_at": time_now()
});
let response = http_request({
"method": "POST",
"url": "https://metrics.internal/api/v1/anomalies",
"headers": { "Content-Type": "application/json" },
"body": payload
});
return json_parse(response);
}
}
// The anomaly detection agent uses tree_of_thought reasoning
// to evaluate multiple hypotheses when it detects an outlier.
agent AnomalyAgent {
provider: "openai"
model: "gpt-4o"
temperature: 0.3
system: "You are an anomaly detection agent for a production metrics pipeline.
Your job is to:
1. Analyze the latest ingested metrics data
2. Compare each metric against its 30-day historical baseline
3. Flag values that deviate more than 2 standard deviations from the mean
4. For each anomaly, generate multiple hypotheses about the cause
5. Score each hypothesis on likelihood
6. Record the anomaly with the most likely hypothesis
Use these severity levels:
- LOW: 2-3 standard deviations from mean
- MEDIUM: 3-4 standard deviations
- HIGH: >4 standard deviations or affecting multiple metrics
- CRITICAL: >4 standard deviations AND error_rate > 5%
Do not raise false alarms. Consider normal patterns:
- Traffic drops at night (00:00-06:00 UTC)
- CPU spikes during deployment windows (Tuesday/Thursday 14:00-16:00 UTC)
- Slightly elevated latency during batch processing (03:00-04:00 UTC)
Think carefully before classifying something as an anomaly."
reasoning: tree_of_thought
tools: [fetch_metrics, query_baselines, record_anomaly]
reflect: {
after: each_response
evaluate: [precision, recall, reasoning_quality]
min_confidence: 0.75
on_low_quality: {
strategy: "revise"
max_revisions: 1
}
}
learning: {
strategy: "experience_replay"
review_interval: 20
max_adaptations: 100
rollback_on_decline: true
}
goals: [
"Analyze latest metrics for anomalies every 15 minutes",
"Compare against 30-day baselines",
"Record anomalies with severity and hypothesis",
"Minimize false positives while catching real issues"
]
triggers: {
on_schedule: "every 15m"
}
initiative: true
budget: {
max_daily_calls: 200
max_daily_cost: 15.0
max_daily_tokens: 150000
}
memory: "anomaly_memory"
}
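The severity ladder in the system prompt is a rule over the z-score -- how many standard deviations the observed value sits from the baseline mean. A minimal Python sketch of that rule (function and argument names are ours, not part of Neam; the "affecting multiple metrics" escalation for HIGH is omitted for brevity):

```python
from typing import Optional

def classify_severity(value: float, mean: float, stddev: float,
                      error_rate: float = 0.0) -> Optional[str]:
    """Map a metric observation to the severity ladder from the system prompt.

    Returns None when the value is within 2 standard deviations (no anomaly).
    """
    if stddev <= 0:
        raise ValueError("stddev must be positive")
    z = abs(value - mean) / stddev
    if z <= 2:
        return None
    if z > 4:
        # CRITICAL requires both a >4 sigma deviation and a high error rate
        return "critical" if error_rate > 0.05 else "high"
    return "medium" if z > 3 else "low"
```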
How tree_of_thought works for anomaly detection:
When the agent detects that CPU usage is 3.5 standard deviations above the mean:
- The VM generates 3 hypotheses:
  - Hypothesis A: "Deployment in progress -- this is within the Tuesday deployment window."
  - Hypothesis B: "Traffic spike from a viral social media post."
  - Hypothesis C: "Memory leak causing increased garbage collection CPU overhead."
- The VM scores each hypothesis:
  - A: Feasibility 0.3 (it is Thursday, not Tuesday), Impact 0.2, Risk 0.1 = 0.20
  - B: Feasibility 0.6 (possible), Impact 0.7, Risk 0.5 = 0.60
  - C: Feasibility 0.8 (correlates with memory usage trend), Impact 0.8, Risk 0.9 = 0.83
- The agent selects Hypothesis C and records the anomaly as MEDIUM severity.
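The scores above are consistent with an unweighted mean of the three dimensions. Whether the VM actually weights them is an implementation detail; treat this sketch as a model of the worked example rather than Neam's exact formula (both helper names are ours):

```python
def score(feasibility: float, impact: float, risk: float) -> float:
    """Unweighted mean of the three scoring dimensions, rounded to 2 places."""
    return round((feasibility + impact + risk) / 3, 2)

def select_hypothesis(hypotheses: dict) -> tuple:
    """Return (name, score) for the highest-scoring hypothesis."""
    scored = {name: score(*dims) for name, dims in hypotheses.items()}
    best = max(scored, key=scored.get)
    return best, scored[best]

# The three hypotheses from the example, as (feasibility, impact, risk)
example = {"A": (0.3, 0.2, 0.1), "B": (0.6, 0.7, 0.5), "C": (0.8, 0.8, 0.9)}
```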
25.5 Step 3: Alert Agent with Initiative #
The alert agent checks for unacknowledged anomalies and sends notifications.
It uses initiative: true to act proactively -- it does not wait for a user to
ask it to send alerts.
// ================================================================
// STEP 3: Alert Agent
// ================================================================
// Tool: Check for unacknowledged anomalies.
tool check_anomalies {
description: "Check for unacknowledged anomalies in the tracking system. Returns a list of anomalies that have not been acknowledged by an operator."
parameters: {
"severity_min": {
"type": "string",
"description": "Minimum severity to include: low, medium, high, critical"
}
}
execute: fun(args) {
let url = "https://metrics.internal/api/v1/anomalies/unacknowledged"
+ "?severity_min=" + args["severity_min"];
let response = http_get(url);
return json_parse(response);
}
}
// Tool: Send an alert notification.
tool send_alert {
description: "Send an alert notification to the operations team via Slack and PagerDuty."
parameters: {
"channel": {
"type": "string",
"description": "Alert channel: slack, pagerduty, or both"
},
"severity": {
"type": "string",
"description": "Alert severity: low, medium, high, critical"
},
"title": {
"type": "string",
"description": "Alert title"
},
"message": {
"type": "string",
"description": "Detailed alert message with context"
}
}
execute: fun(args) {
let payload = json_stringify({
"channel": args["channel"],
"severity": args["severity"],
"title": args["title"],
"message": args["message"],
"timestamp": time_now()
});
let response = http_request({
"method": "POST",
"url": "https://alerts.internal/api/v1/notify",
"headers": { "Content-Type": "application/json" },
"body": payload
});
return json_parse(response);
}
}
// The alert agent runs every 5 minutes and sends notifications
// for any unacknowledged anomalies.
agent AlertAgent {
provider: "openai"
model: "gpt-4o-mini"
temperature: 0.2
system: "You are an alert agent for a production data pipeline. Your job is to:
1. Check for unacknowledged anomalies
2. Compose clear, actionable alert messages
3. Route alerts to the appropriate channel based on severity:
- LOW/MEDIUM: Slack only (#ops-alerts channel)
- HIGH: Slack + PagerDuty (on-call engineer)
- CRITICAL: PagerDuty with immediate escalation
Alert message format:
- Title: [SEVERITY] Metric: brief description
- Body: current value, baseline, deviation, hypothesis, suggested action
Do not send duplicate alerts for the same anomaly within a 30-minute window.
If the same anomaly persists for more than 1 hour, escalate the severity by one level.
Be concise. Operators receive many alerts -- make yours easy to triage."
tools: [check_anomalies, send_alert]
goals: [
"Monitor for unacknowledged anomalies every 5 minutes",
"Send clear, actionable alerts to the appropriate channel",
"Escalate persistent anomalies"
]
triggers: {
on_schedule: "every 5m"
}
initiative: true
budget: {
max_daily_calls: 300
max_daily_cost: 3.0
max_daily_tokens: 30000
}
memory: "alert_memory"
}
25.6 Step 4: Budget Constraints and Daily Limits #
Budget management is critical for autonomous agents. Without limits, a runaway agent could consume hundreds of dollars of LLM API credits in a single day.
Here is a summary of the budget configuration across all three agents:
| Agent | Schedule | Max Calls/Day | Max Cost/Day | Max Tokens/Day |
|---|---|---|---|---|
| IngestionAgent | every 1h | 50 | $2.00 | 25,000 |
| AnomalyAgent | every 15m | 200 | $15.00 | 150,000 |
| AlertAgent | every 5m | 300 | $3.00 | 30,000 |
| Total | -- | 550 | $20.00 | 205,000 |
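It is worth sanity-checking that each call budget actually covers its agent's schedule. The intervals imply 24, 96, and 288 runs per day respectively; the sketch below verifies that every budget leaves headroom (note that a single run can spend several calls on tool round-trips, so real headroom is smaller than the raw difference):

```python
MINUTES_PER_DAY = 24 * 60

def runs_per_day(every_minutes: int) -> int:
    """How many scheduled runs fit in one day at the given interval."""
    return MINUTES_PER_DAY // every_minutes

# (schedule interval in minutes, max_daily_calls) from the table above
budgets = {
    "IngestionAgent": (60, 50),
    "AnomalyAgent": (15, 200),
    "AlertAgent": (5, 300),
}

for agent, (interval, max_calls) in budgets.items():
    runs = runs_per_day(interval)
    assert runs <= max_calls, f"{agent} budget cannot cover its schedule"
    print(f"{agent}: {runs} runs/day, {max_calls - runs} spare calls")
```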
How budget enforcement works:
// Budget enforcement is automatic, but you can monitor it:
{
// Check current budget usage for an agent
let status = agent_status("AnomalyAgent");
emit "Daily calls used: " + str(status["budget_calls_used"]);
emit "Daily cost used: $" + str(status["budget_cost_used"]);
emit "Daily tokens used: " + str(status["budget_tokens_used"]);
// If budget is exhausted, autonomous execution pauses.
// Interactive calls (.ask()) are NOT affected by autonomous budgets.
// Budget resets at midnight UTC.
// You can also adjust budgets at runtime:
// (This requires modifying the agent configuration, which
// is not currently supported at runtime; planned for v0.7.)
}
Budget limits apply only to autonomous (scheduled) execution. If
you call AnomalyAgent.ask(...) interactively, it bypasses the autonomous budget.
This is intentional -- it lets operators query agents during incidents without
worrying about budget exhaustion. But it means you should monitor interactive
usage separately.
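One way to keep an eye on interactive spend is to ledger the two execution modes separately. This is a monitoring sketch with invented names -- Neam itself only enforces the autonomous bucket:

```python
from collections import defaultdict

class CostLedger:
    """Track LLM spend per agent, split by execution mode."""

    def __init__(self):
        self.spend = defaultdict(lambda: {"autonomous": 0.0, "interactive": 0.0})

    def record(self, agent: str, mode: str, cost: float) -> None:
        if mode not in ("autonomous", "interactive"):
            raise ValueError(f"unknown mode: {mode}")
        self.spend[agent][mode] += cost

    def interactive_share(self, agent: str) -> float:
        """Fraction of an agent's total spend that bypassed the autonomous budget."""
        s = self.spend[agent]
        total = s["autonomous"] + s["interactive"]
        return s["interactive"] / total if total else 0.0
```

An alerting rule on interactive_share (say, above 0.5 during an incident) catches exactly the blind spot this section warns about.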
25.7 Step 5: Distributed Execution with Leader Election #
When the pipeline runs across multiple Kubernetes pods, only one pod should execute scheduled tasks at any given time. Otherwise, every pod would independently run the same ingestion, anomaly detection, and alerting -- causing duplicate alerts and redundant data processing.
Neam's distributed state backend (PostgreSQL) provides a try_acquire_lock method
that implements leader election. The leader holds the lock, renews it periodically,
and standby pods wait to acquire it if the leader fails.
Configuration: neam.toml #
[project]
name = "data-pipeline"
version = "1.0.0"
entry = "data_pipeline.neam"
[state]
backend = "postgres"
connection-string = "${DATABASE_URL}"
ttl = "30d"
prefix = "pipeline"
[llm]
default-provider = "openai"
default-model = "gpt-4o"
[llm.rate-limits.openai]
requests-per-minute = 200
[llm.circuit-breaker]
failure-threshold = 3
reset-timeout = "60s"
half-open-max = 1
[llm.cost]
daily-budget-usd = 25.0
[telemetry]
enabled = true
endpoint = "http://otel-collector.monitoring:4318"
service-name = "data-pipeline"
sampling-rate = 1.0
[secrets]
provider = "env"
The distributed autonomous executor (a v0.6.0 feature) handles leader election internally. When the VM starts with a distributed state backend (PostgreSQL, Redis, or DynamoDB), the autonomous executor automatically:
- Generates a unique holder_id based on hostname and process ID.
- Attempts to acquire the pipeline-leader lock with a 60-second TTL.
- If acquired, the executor becomes the leader and runs scheduled agents.
- The leader renews the lock every 30 seconds (half the TTL).
- Standby pods retry acquisition every 30 seconds.
- If the leader crashes or loses connectivity, the lock expires after 60 seconds and a standby pod acquires it.
No application code is needed for leader election -- it is a runtime behavior
configured through neam.toml.
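The lock semantics the executor relies on are simple to state: acquisition succeeds if the lock is free, expired, or already held by the caller (that last case is renewal). The in-memory model below captures those rules; a real backend performs the same check-and-set atomically in one PostgreSQL statement, and the table/column names in the comment are illustrative, not Neam's actual schema:

```python
import time
from typing import Optional

class LeaderLock:
    """In-memory model of distributed leader-election lock semantics.

    A real deployment does the equivalent atomically, e.g.:
      INSERT INTO locks (name, holder_id, expires_at) VALUES (...)
      ON CONFLICT (name) DO UPDATE SET holder_id = ..., expires_at = ...
      WHERE locks.expires_at < now() OR locks.holder_id = excluded.holder_id
    """

    def __init__(self, ttl_seconds: float = 60.0):
        self.ttl = ttl_seconds
        self.holder: Optional[str] = None
        self.expires_at = 0.0

    def try_acquire(self, holder_id: str, now: Optional[float] = None) -> bool:
        """Acquire or renew the lock; return False if another holder is alive."""
        now = time.time() if now is None else now
        if self.holder is None or now >= self.expires_at or self.holder == holder_id:
            self.holder = holder_id
            self.expires_at = now + self.ttl
            return True
        return False
```

Because renewal is just re-acquisition by the current holder, the leader's 30-second renew loop and the standby's 30-second retry loop can share the same code path.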
25.8 Step 6: Monitoring with OpenTelemetry #
With [telemetry] enabled in neam.toml, every agent action, tool call, and
handoff is automatically traced. The VM generates spans for:
- neam.agent.autonomous_action -- each scheduled execution
- neam.agent.ask -- each LLM call
- neam.tool.execute -- each tool invocation
- neam.agent.reflect -- each reflection evaluation
- neam.agent.learn -- each learning review
Example Trace (as seen in Jaeger) #
Trace: data-pipeline/anomaly-check
|
+-- neam.agent.autonomous_action [AnomalyAgent] (12.4s)
| |
| +-- neam.agent.ask [AnomalyAgent] (8.2s)
| | |
| | +-- neam.llm.request [openai/gpt-4o] (3.1s)
| | | provider=openai, model=gpt-4o, tokens=1842
| | |
| | +-- neam.tool.execute [fetch_metrics] (1.2s)
| | | source=production, hours=1, datapoints=240
| | |
| | +-- neam.tool.execute [query_baselines] (0.8s)
| | | metric=cpu_usage, source=production
| | |
| | +-- neam.tool.execute [query_baselines] (0.7s)
| | | metric=error_rate, source=production
| | |
| | +-- neam.tool.execute [record_anomaly] (0.4s)
| | metric=cpu_usage, severity=medium
| |
| +-- neam.agent.reflect [AnomalyAgent] (3.8s)
| precision=0.88, recall=0.82, reasoning_quality=0.91
| avg=0.87 (above min_confidence=0.75)
Custom Metrics #
In addition to automatic tracing, you can define custom metrics for monitoring:
// Custom metrics for the data pipeline
fun report_pipeline_metrics() {
// Check all agent statuses
let ingestion_status = agent_status("IngestionAgent");
let anomaly_status = agent_status("AnomalyAgent");
let alert_status = agent_status("AlertAgent");
emit "=== Pipeline Health Report ===";
emit "Ingestion - calls: " + str(ingestion_status["budget_calls_used"]) + "/50";
emit "Anomaly - calls: " + str(anomaly_status["budget_calls_used"]) + "/200";
emit "Alert - calls: " + str(alert_status["budget_calls_used"]) + "/300";
emit "";
// Check learning statistics for the anomaly agent
let learning = agent_learning_stats("AnomalyAgent");
emit "Anomaly agent learning:";
emit " Total interactions: " + str(learning["total_interactions"]);
emit " Avg reflection score: " + str(learning["avg_reflection_score"]);
emit " Reviews completed: " + str(learning["reviews_completed"]);
}
25.9 Step 7: Multi-Cloud Deployment #
For high availability, we deploy the pipeline to AWS ECS as the primary environment and GCP Cloud Run as a warm standby. Both environments connect to the same PostgreSQL database (hosted on AWS RDS with a read replica for GCP).
AWS ECS Deployment #
# neam.toml additions for AWS ECS
[deploy.aws]
region = "us-east-1"
account-id = "123456789012"
[deploy.aws.ecs]
cluster = "data-pipeline-cluster"
service = "data-pipeline-service"
task-family = "data-pipeline-task"
cpu = "512"
memory = "1024"
desired-count = 3
Generate and deploy:
# Generate ECS task definition and service configuration
neamc deploy aws-ecs --output ./deploy/aws/
# Files generated:
# deploy/aws/task-definition.json
# deploy/aws/service.json
# deploy/aws/alb.json
# deploy/aws/deploy-ecs.sh
# Deploy to AWS
cd deploy/aws
chmod +x deploy-ecs.sh
./deploy-ecs.sh
GCP Cloud Run Deployment (Warm Standby) #
# neam.toml additions for GCP backup
[deploy.gcp]
project = "my-project-id"
region = "us-central1"
[deploy.gcp.cloud-run]
service = "data-pipeline-backup"
max-instances = 1
memory = "1Gi"
cpu = "1"
Generate and deploy:
# Generate Cloud Run configuration
neamc deploy gcp-cloud-run --output ./deploy/gcp/
# Deploy to GCP
cd deploy/gcp
gcloud run deploy data-pipeline-backup \
--source . \
--region us-central1 \
--set-env-vars "DATABASE_URL=${DATABASE_URL}" \
--set-env-vars "OPENAI_API_KEY=${OPENAI_API_KEY}" \
--min-instances 0 \
--max-instances 1
The GCP deployment runs with min-instances: 0 (cold start on demand). If the
AWS primary goes down, a health check monitor (external to Neam) adjusts the
GCP instance count to 1, activating the standby.
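The health check monitor is outside Neam's scope, but its failover action is a single gcloud invocation. A hedged sketch of that step (the monitor's design -- probes, thresholds, flap damping -- is up to you; the service and region names come from the config above):

```python
import subprocess

def activate_gcp_standby(service: str = "data-pipeline-backup",
                         region: str = "us-central1",
                         dry_run: bool = True) -> list:
    """Build (and optionally run) the gcloud command that warms the standby
    by raising min-instances from 0 to 1."""
    cmd = [
        "gcloud", "run", "services", "update", service,
        "--region", region,
        "--min-instances", "1",
    ]
    if not dry_run:
        # In the real monitor, run this only after N consecutive failed
        # health probes against the AWS primary, to avoid flapping.
        subprocess.run(cmd, check=True)
    return cmd
```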
25.10 Full Code Listing #
// ================================================================
// Autonomous Data Pipeline
// Version: 1.0.0
//
// Self-managing data pipeline with scheduled ingestion,
// anomaly detection, proactive alerting, and budget controls.
// ================================================================
// ── Tools ──────────────────────────────────────────────────────
tool fetch_metrics {
description: "Fetch latest metrics data from external API."
parameters: {
"source": { "type": "string", "description": "Data source (production, staging)" },
"hours": { "type": "number", "description": "Hours of data to fetch (1-24)" }
}
execute: fun(args) {
let url = "https://metrics.internal/api/v1/query?source="
+ args["source"] + "&hours=" + str(args["hours"]);
return json_parse(http_get(url));
}
}
tool store_metrics {
description: "Store validated metrics in staging database."
parameters: {
"datapoints": { "type": "string", "description": "JSON array of datapoints" },
"source": { "type": "string", "description": "Data source" }
}
execute: fun(args) {
let payload = json_stringify({
"datapoints": json_parse(args["datapoints"]),
"source": args["source"],
"ingested_at": time_now()
});
let resp = http_request({
"method": "POST",
"url": "https://metrics.internal/api/v1/ingest",
"headers": { "Content-Type": "application/json" },
"body": payload
});
return json_parse(resp);
}
}
tool query_baselines {
description: "Query 30-day historical baselines for a metric."
parameters: {
"metric": { "type": "string", "description": "Metric name" },
"source": { "type": "string", "description": "Data source" }
}
execute: fun(args) {
let url = "https://metrics.internal/api/v1/baselines?metric="
+ args["metric"] + "&source=" + args["source"] + "&days=30";
return json_parse(http_get(url));
}
}
tool record_anomaly {
description: "Record a detected anomaly."
parameters: {
"metric": { "type": "string", "description": "Anomalous metric" },
"current_value": { "type": "number", "description": "Current value" },
"baseline_mean": { "type": "number", "description": "Baseline mean" },
"baseline_stddev":{ "type": "number", "description": "Baseline stddev" },
"severity": { "type": "string", "description": "low/medium/high/critical" },
"hypothesis": { "type": "string", "description": "Most likely cause" }
}
execute: fun(args) {
let payload = json_stringify(args);
let resp = http_request({
"method": "POST",
"url": "https://metrics.internal/api/v1/anomalies",
"headers": { "Content-Type": "application/json" },
"body": payload
});
return json_parse(resp);
}
}
tool check_anomalies {
description: "Check for unacknowledged anomalies."
parameters: {
"severity_min": { "type": "string", "description": "Minimum severity" }
}
execute: fun(args) {
let url = "https://metrics.internal/api/v1/anomalies/unacknowledged?severity_min="
+ args["severity_min"];
return json_parse(http_get(url));
}
}
tool send_alert {
description: "Send alert to Slack and/or PagerDuty."
parameters: {
"channel": { "type": "string", "description": "slack, pagerduty, or both" },
"severity": { "type": "string", "description": "Alert severity" },
"title": { "type": "string", "description": "Alert title" },
"message": { "type": "string", "description": "Alert message" }
}
execute: fun(args) {
let resp = http_request({
"method": "POST",
"url": "https://alerts.internal/api/v1/notify",
"headers": { "Content-Type": "application/json" },
"body": json_stringify(args)
});
return json_parse(resp);
}
}
// ── Agents ─────────────────────────────────────────────────────
agent IngestionAgent {
provider: "openai"
model: "gpt-4o-mini"
temperature: 0.2
system: "You are a data ingestion agent. Fetch, validate, and store
production metrics every hour. Report quality issues."
reasoning: chain_of_thought
tools: [fetch_metrics, store_metrics]
goals: ["Ingest production metrics hourly", "Validate data quality"]
triggers: { on_schedule: "every 1h" }
initiative: true
budget: { max_daily_calls: 50, max_daily_cost: 2.0, max_daily_tokens: 25000 }
memory: "ingestion_memory"
}
agent AnomalyAgent {
provider: "openai"
model: "gpt-4o"
temperature: 0.3
system: "You are an anomaly detection agent. Analyze metrics against 30-day
baselines. Flag deviations >2 stddev. Use severity: LOW (2-3x), MEDIUM (3-4x),
HIGH (>4x), CRITICAL (>4x + error_rate>5%). Consider normal patterns before alerting."
reasoning: tree_of_thought
tools: [fetch_metrics, query_baselines, record_anomaly]
reflect: {
after: each_response
evaluate: [precision, recall, reasoning_quality]
min_confidence: 0.75
on_low_quality: { strategy: "revise", max_revisions: 1 }
}
learning: {
strategy: "experience_replay"
review_interval: 20
max_adaptations: 100
rollback_on_decline: true
}
goals: ["Detect anomalies every 15 minutes", "Minimize false positives"]
triggers: { on_schedule: "every 15m" }
initiative: true
budget: { max_daily_calls: 200, max_daily_cost: 15.0, max_daily_tokens: 150000 }
memory: "anomaly_memory"
}
agent AlertAgent {
provider: "openai"
model: "gpt-4o-mini"
temperature: 0.2
system: "You are an alert agent. Check for unacknowledged anomalies and
send notifications. LOW/MEDIUM -> Slack. HIGH -> Slack+PagerDuty. CRITICAL ->
PagerDuty immediate. No duplicates within 30 minutes."
tools: [check_anomalies, send_alert]
goals: ["Monitor anomalies every 5 minutes", "Send actionable alerts"]
triggers: { on_schedule: "every 5m" }
initiative: true
budget: { max_daily_calls: 300, max_daily_cost: 3.0, max_daily_tokens: 30000 }
memory: "alert_memory"
}
// ── Main ───────────────────────────────────────────────────────
{
emit "=== Autonomous Data Pipeline v1.0 ===";
emit "Agents registered:";
emit " IngestionAgent - every 1h (budget: $2/day)";
emit " AnomalyAgent - every 15m (budget: $15/day)";
emit " AlertAgent - every 5m (budget: $3/day)";
emit "";
emit "Total daily budget: $20.00";
emit "State backend: PostgreSQL (distributed locks enabled)";
emit "Telemetry: OpenTelemetry -> otel-collector.monitoring:4318";
emit "";
emit "Pipeline running. Agents will execute on their schedules.";
}
25.11 Cognitive Evolution Over Time #
One of the most interesting aspects of this pipeline is watching the anomaly detection agent improve over time. Here is a timeline of its cognitive evolution:
Week 1: Baseline #
Interactions: 672 (96/day x 7 days)
Avg reflection score: 0.72
False positive rate: 18%
Learning reviews: 33
Evolution version: 0 (original prompt)
The agent generates many false positives, particularly during known maintenance
windows and normal traffic fluctuations. The reflection dimension precision
consistently scores below 0.7.
Week 2: First Learning Review #
After 10 reviews, the learning system appends a prompt addendum:
LEARNED PATTERNS:
- CPU spikes between 14:00-16:00 UTC on Tuesday/Thursday are deployment-related
- Memory usage naturally increases 5-10% during batch processing at 03:00 UTC
- Request count drops below 50% of baseline between 00:00-06:00 UTC (normal)
Interactions: 1,344
Avg reflection score: 0.79 (+0.07)
False positive rate: 11% (-7pp)
Learning reviews: 67
Evolution version: 0
Week 4: First Prompt Evolution #
After 50 interactions, the evolution cycle triggers. The system proposes a refined system prompt that incorporates the learned patterns directly:
Evolution v1 proposed prompt (excerpt):
"...When analyzing anomalies, first check against these known patterns
before raising an alert:
1. Deployment windows: Tue/Thu 14:00-16:00 UTC (expect CPU +30-50%)
2. Batch processing: 03:00-04:00 UTC (expect memory +5-10%)
3. Low traffic: 00:00-06:00 UTC (expect request_count -50%)
Only flag anomalies that cannot be explained by these patterns..."
Interactions: 2,688
Avg reflection score: 0.85 (+0.06)
False positive rate: 6% (-5pp)
Learning reviews: 134
Evolution version: 1
Week 8: Mature System #
Interactions: 5,376
Avg reflection score: 0.89
False positive rate: 3%
Learning reviews: 268
Evolution version: 2
The agent now catches genuine anomalies with 97% precision while ignoring known patterns. The second evolution added seasonal awareness (weekend vs. weekday baselines) that the original prompt lacked entirely.
The cognitive evolution cycle is the difference between a static rule-based system and one that adapts to its operating environment. After 8 weeks, the anomaly agent's effective system prompt is significantly different from -- and better than -- the one we wrote by hand on day one.
25.12 Ingest Pipeline Modules #
The standard library includes an ingest/ package with reusable components for
data pipeline construction. Instead of writing custom HTTP-based tools for every
data source, you can compose pipelines from pre-built modules.
Pipeline Architecture #
import ingest/pipeline
import ingest/source
import ingest/parser/json_parser
import ingest/chunk
import ingest/progress
let metrics_pipeline = pipeline.create({
name: "production-metrics",
stages: [
source.http({
url: "https://metrics.internal/api/v1/query",
method: "GET",
schedule: "every 1h",
auth: { type: "bearer", token: env("METRICS_API_TOKEN") }
}),
json_parser.create({
schema_validation: true,
schema: {
required: ["timestamp", "cpu_usage", "memory_usage", "error_rate"],
types: {
"timestamp": "string",
"cpu_usage": "number",
"memory_usage": "number",
"error_rate": "number"
}
}
}),
chunk.create({
strategy: "time_window",
window: "1h",
overlap: "0m"
})
],
on_progress: fun(event) {
progress.log(event)
}
})
Available Source Connectors #
| Connector | Import Path | Description |
|---|---|---|
| HTTP/REST | ingest/source | Polling HTTP endpoints with auth |
| S3 | ingest/source | AWS S3 bucket file watching |
| GCS | ingest/source | Google Cloud Storage |
| Database | ingest/source | PostgreSQL/MySQL query results |
| Kafka | ingest/source | Kafka topic consumption |
| File system | ingest/source | Local directory watching |
Available Parsers #
| Parser | Import Path | Formats |
|---|---|---|
| JSON | ingest/parser/json_parser | JSON, JSONL, JSON Schema validation |
| CSV | ingest/parser/csv_parser | CSV, TSV with header mapping |
| PDF | ingest/parser/pdf_parser | PDF text and table extraction |
| Code | ingest/parser/code_parser | Source code with AST awareness |
| Log | ingest/parser/log_parser | Structured and unstructured logs |
| Office | ingest/parser/office_parser | DOCX, XLSX, PPTX |
Pipeline Metrics #
The ingest/metrics module tracks pipeline performance:
import ingest/metrics
let stats = metrics.get("production-metrics")
emit "Records processed: " + str(stats.records_total)
emit "Records failed: " + str(stats.records_failed)
emit "Avg latency: " + str(stats.avg_latency_ms) + "ms"
emit "Last run: " + stats.last_run_at
These metrics are automatically exported to the OpenTelemetry pipeline when telemetry is enabled, appearing alongside agent traces in your Jaeger and Prometheus dashboards.
25.13 Safety Testing for Autonomous Agents #
Autonomous agents operate without human oversight between scheduled runs. Before deploying them to production, validate their behavior under adversarial and edge-case conditions.
Budget Exhaustion Testing #
Verify that agents stop gracefully when budgets are exhausted:
fun test_budget_exhaustion() {
// Deliberately low budget: the schedule fires every minute, so the
// three-call allowance is consumed within the first three minutes.
agent TestAgent {
provider: "openai"
model: "gpt-4o-mini"
system: "You are a test agent."
triggers: { on_schedule: "every 1m" }
initiative: true
budget: { max_daily_calls: 3, max_daily_cost: 0.01 }
}
// After the third scheduled run, further executions are blocked.
// (Assumes the scheduler has fired at least three times before the
// check below runs.)
let status = agent_status("TestAgent")
assert_true(status["budget_exhausted"])
emit "Budget exhaustion: PASS"
}
Data Quality Edge Cases #
Test the ingestion agent with malformed data:
{"id": "missing-fields", "input": {"cpu_usage": 75.2}, "expected": "quality issue logged", "grader": "contains"}
{"id": "negative-cpu", "input": {"cpu_usage": -5.0, "memory_usage": 60.0}, "expected": "impossible value flagged", "grader": "llm_judge"}
{"id": "extreme-error-rate", "input": {"error_rate": 150.0}, "expected": "out of range", "grader": "contains"}
{"id": "empty-payload", "input": {}, "expected": "no valid datapoints", "grader": "semantic_match"}
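These cases can be exercised directly against the schema validation configured in the pipeline's JSON parser. A sketch for the "missing-fields" case, assuming the parser exposes a `parse` call that returns validation errors (the confirmed API is only `json_parser.create`; `parse` and `result.errors` are assumptions):

```neam
import ingest/parser/json_parser

let parser = json_parser.create({
  schema_validation: true,
  schema: {
    required: ["timestamp", "cpu_usage", "memory_usage", "error_rate"],
    types: {
      "timestamp": "string",
      "cpu_usage": "number",
      "memory_usage": "number",
      "error_rate": "number"
    }
  }
})

// Hypothetical parse call: the record is missing three required fields,
// so we expect validation errors rather than a staged datapoint.
let result = parser.parse({"cpu_usage": 75.2})
emit "Validation errors: " + str(result.errors)
```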
Anomaly Agent Precision Testing #
Evaluate the anomaly agent's false positive rate with scenarios that should not trigger alerts:
{"id": "normal-night-drop", "input": "request_count at 40% of baseline at 02:00 UTC", "expected": "no anomaly", "grader": "llm_judge"}
{"id": "deploy-window-spike", "input": "cpu at 145% of baseline on Tuesday 15:00 UTC", "expected": "no anomaly", "grader": "llm_judge"}
{"id": "batch-processing", "input": "latency at 130% of baseline at 03:30 UTC", "expected": "no anomaly", "grader": "llm_judge"}
{"id": "real-anomaly", "input": "error_rate at 8% (baseline 0.5%) on Monday 10:00 UTC", "expected": "CRITICAL anomaly", "grader": "contains"}
Run these tests after every change to the anomaly agent's system prompt or reasoning strategy, and after every prompt evolution cycle.
25.14 Lessons Learned #
1. Budget is the primary safety mechanism for autonomous agents. Without budget limits, the anomaly agent (running every 15 minutes with gpt-4o) would cost roughly $45/day; the configured budget caps it at $15. Set budgets conservatively and adjust upward only after observing actual usage patterns.
2. Leader election prevents duplicate work. In our initial deployment without leader election, three pods independently ran the same anomaly detection every 15 minutes, tripling our LLM costs and generating duplicate alerts. Leader election reduced costs by 66% and eliminated alert noise.
3. tree_of_thought is worth the cost for anomaly detection. The multi-hypothesis reasoning adds approximately 0.8 extra LLM calls per execution compared to chain_of_thought. But it reduced false positives by 40% in our evaluation, which translates directly to less alert fatigue for the operations team.
4. Learning needs enough data. The anomaly agent did not show meaningful improvement until after approximately 200 interactions (about 2 days of operation). The learning system needs a critical mass of examples to extract useful patterns. Do not expect improvement in the first few hours.
5. Monitor budget consumption trends. We track budget usage as a Prometheus metric and alert when any agent is on pace to exhaust its daily budget before 18:00 UTC. This catches scenarios where an agent enters a high-frequency anomaly detection loop and burns through its budget prematurely.
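The pace check itself is simple arithmetic: extrapolate spend-so-far to find when the budget would run out. A sketch in Neam, assuming `agent_status` exposes the spend and limit fields shown (`daily_cost_used` and `budget.max_daily_cost` are assumed field names, not confirmed API):

```neam
// Returns true if the agent's budget will last past 18:00 UTC at the
// current spend rate. hours_elapsed is hours since midnight UTC.
fun budget_on_pace(name, hours_elapsed) {
  let status = agent_status(name)
  let spent = status["daily_cost_used"]          // assumed field
  let limit = status["budget"]["max_daily_cost"] // assumed field
  if spent <= 0.0 { return true }
  // Hour of day at which the budget runs out, at the current rate.
  let exhaust_at = limit / (spent / hours_elapsed)
  return exhaust_at > 18.0
}

if !budget_on_pace("AnomalyAgent", 9.0) {
  emit "ALERT: AnomalyAgent on pace to exhaust budget before 18:00 UTC"
}
```

A linear projection is crude but catches the failure mode described above: an agent stuck in a high-frequency detection loop shows up as an early projected exhaustion hour.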
6. Cold standby works for disaster recovery. The GCP standby (Cloud Run with min-instances: 0, scaling from zero on demand) adds zero cost during normal operation but can be activated in under 60 seconds. For a data pipeline with hourly ingestion, a 60-second failover window is acceptable.
25.15 Exercises #
- Add a data quality agent. Create a fourth autonomous agent that runs daily and produces a data quality report: completeness, consistency, and timeliness of the ingested data. Use plan_and_execute reasoning to structure the report.
- Implement alert deduplication in Neam. Instead of relying on the external alert API for deduplication, implement a memory-based deduplication check in the alert agent. Use the agent's memory store to track recently sent alerts and skip duplicates.
- Add evolution to the alert agent. Enable learning and evolution on the alert agent. Define core_identity: "You are an alert agent." and let the system refine its alert composition style based on operator feedback (collected via agent_rate).
- Multi-region leader election. Modify the deployment so that the AWS and GCP instances use the same PostgreSQL database for leader election. Test failover by stopping the AWS ECS tasks and verifying that the GCP Cloud Run instance acquires the leader lock.
Summary #
In this chapter, we built an autonomous data pipeline that operates with minimal human intervention. Three agents -- ingestion, anomaly detection, and alerting -- run on independent schedules, coordinate through distributed leader election, stay within budget constraints, and improve through cognitive evolution. We also explored the standard library's ingest pipeline modules for composable data processing, and safety testing strategies for validating autonomous agent behavior under adversarial and edge-case conditions.
The key architectural pattern is the autonomous agent loop: agents define goals, execute on triggers, consume budgets, and learn from experience. Combined with distributed state backends, leader election, reusable ingest modules, and systematic safety testing, this pattern enables reliable 24/7 operation across multiple replicas and even multiple cloud providers.
This case study demonstrates that Neam's cognitive and cloud-native features work together to create systems that are genuinely self-managing. The anomaly detection agent's evolution from 18% false positives to 3% over eight weeks is not something we programmed -- it is something the agent learned.