Chapter 12: The DataOps Agent — SRE for Data #
"Hope is not a strategy. If a human has to watch a dashboard to know something is broken, you've already failed." — Niall Murphy, co-author of Site Reliability Engineering (Google)
25 min read | Priya, Sarah | Part IV: Platform Intelligence Agents
What you'll learn:
- How SRE principles apply to data platform operations — and why data needs its own SRE
- How to declare schedulers, audit tables, log sources, platforms, incident policies, and correlations in Neam
- The six-stage processing pipeline: Collector, Anomaly Detector, Correlator, Triage, Auto-Heal, Reporter
- How guardrailed auto-remediation prevents both silent failures and reckless automation
- SLA tracking and FinOps cost optimization as first-class operational concerns
The Problem #
It is 4:47 AM on a Tuesday. Priya's phone buzzes. The Airflow DAG dag_finance_daily has failed. She opens her laptop, checks the Airflow UI — task extract_transactions timed out. She SSHs into the Oracle source and finds ORA-01555: snapshot too old. She checks whether the undo tablespace filled up, notices a long-running query from a batch job that someone in analytics kicked off at 2 AM. She kills the query, restarts the DAG, and watches it complete by 6:15 AM — barely making the 6:30 AM SLA.
But that is the easy night. On harder nights, failures cascade. The extract timeout causes the transform to skip, which causes the mart to load zero rows, which causes the CFO's dashboard to show blanks at 7 AM. Priya finds out about the blank dashboard from a Slack message at 8:15 AM — three hours after the root cause.
This is the coordination tax of manual operations: a human must see every alert, correlate every signal, triage every incident, and decide every remediation. Google's SRE team solved this problem for web services a decade ago. Data platforms have been slower to catch up. The DataOps Agent brings SRE to data.
SRE Principles Applied to Data #
Google's Site Reliability Engineering book introduced principles that transformed how the industry operates web services. The DataOps Agent adapts five of those principles for data platforms:
| SRE Principle | Web Services | Data Equivalent (DataOps Agent) |
|---|---|---|
| Monitoring | HTTP latency, error rates | Pipeline duration, row counts, job status |
| Alerting | Page on SLO breach | Triage by severity, blast radius, SLA impact |
| Incident Response | On-call rotation, runbooks | Automated investigation + correlation + escalation |
| Automation | Canary deploys, auto-rollback | Guardrailed retry, kill-query, scale-warehouse |
| SLOs/SLAs | 99.9% availability budget | Data freshness deadline, quality threshold |
SRE for data is harder than SRE for web services in one critical way: data failures are often silent. A pipeline that loads zero rows returns exit code 0. A dashboard that shows yesterday's numbers looks identical to one showing today's. The DataOps Agent treats data correctness signals — row counts, schema consistency, freshness — as first-class monitoring metrics, not afterthoughts.
The Six-Stage Processing Pipeline #
Every DataOps Agent follows a six-stage pipeline. Each stage is a distinct responsibility, and the pipeline runs continuously, on schedule, or on demand.
```mermaid
flowchart LR
    A["Collector\n\nschedulers\naudit tables\nlog sources\nplatforms"] --> B["Anomaly\nDetector\n\nstatistical\n+ LLM-assist"]
    B --> C["Correlator\n\ncross-src\ndependency\ntime-window\nblast radius"]
    C --> D["Triage\n\nP1-P4\nseverity\nSLA impact\nescalation"]
    D --> E["Auto-Heal\n\nguardrailed\nretry/kill/\nscale/resume\napproval req"]
    E --> F["Reporter\n\ndaily digest\nweekly summ\ncost report\nslack/email"]
```
Stage 1: Collector #
The Collector polls four types of monitoring sources:
| Source Type | What It Reads | Examples |
|---|---|---|
| Schedulers | Job status, duration, dependencies | Airflow, Control-M, dbt Cloud, Databricks Workflows, Snowflake Tasks, AWS Glue, Azure Data Factory |
| Audit Tables | ETL run logs (rows in/out, status, errors) | Custom ETL_AUDIT_LOG tables with configurable column mapping |
| Log Sources | Database and platform logs | Snowflake QUERY_HISTORY, Oracle alert log, PostgreSQL pg_stat_activity |
| Platforms | Health metrics and cost data | Warehouse utilization, query latency percentiles, storage growth, credit consumption |
The Collector never touches business data. It reads only metadata, logs, and system views.
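The normalization step can be sketched in a few lines of Python. This is an illustrative model, not the agent's actual internals: `OpsEvent` and `normalize` are hypothetical names, and the status vocabulary mirrors the `status_values` mapping in the audit table declaration shown later in the chapter.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

@dataclass
class OpsEvent:
    """Normalized monitoring event: metadata only, never business data."""
    source: str              # e.g. "airflow", "audit_table", "snowflake_log"
    job_id: str
    status: str              # normalized: success | failure | running | skipped
    ts: datetime
    rows_out: Optional[int] = None
    error: Optional[str] = None

# Map raw scheduler- and table-specific statuses onto one vocabulary,
# mirroring the status_values mapping in the audit_table declaration.
STATUS_MAP = {
    "SUCCESS": "success", "COMPLETED": "success",
    "FAILURE": "failure", "FAILED": "failure", "ERROR": "failure",
    "RUNNING": "running", "IN_PROGRESS": "running",
    "SKIPPED": "skipped", "NO_DATA": "skipped",
}

def normalize(source: str, raw: dict) -> OpsEvent:
    """Turn one raw poll result into a normalized OpsEvent."""
    return OpsEvent(
        source=source,
        job_id=raw["job_name"],
        status=STATUS_MAP.get(raw["status"].upper(), "unknown"),
        ts=raw.get("ts", datetime.now(timezone.utc)),
        rows_out=raw.get("rows_written"),
        error=raw.get("error_msg"),
    )
```

Because every source funnels into the same event shape, the downstream stages never need to know whether a failure came from Airflow, an audit table, or a query log.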
Stage 2: Anomaly Detector #
The Anomaly Detector combines two detection methods:
Statistical Detection — threshold-based rules configured per audit table:
- Row count drop > 20% from trailing 30-day average
- Row count spike > 3x from trailing average
- Duration spike > 2x from trailing average
- Failure rate > 5% over rolling window
- Zero rows for 3+ consecutive runs
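The threshold rules above can be sketched in Python. This is an illustrative model with the chapter's default thresholds hard-coded; the failure-rate rule is omitted because it needs a rolling status history rather than per-run metrics.

```python
from statistics import mean

def detect_anomalies(history, latest, zero_run=0):
    """Apply threshold rules to one job's audit history.

    history:  list of (row_count, duration_sec) for the trailing window
    latest:   (row_count, duration_sec) for the most recent run
    zero_run: consecutive runs (including this one) with zero rows
    """
    rows, dur = latest
    avg_rows = mean(r for r, _ in history)
    avg_dur = mean(d for _, d in history)
    alerts = []
    if avg_rows > 0 and rows < avg_rows * (1 - 0.20):   # row_count_drop
        alerts.append("row_count_drop")
    if avg_rows > 0 and rows > avg_rows * 3.0:          # row_count_spike
        alerts.append("row_count_spike")
    if avg_dur > 0 and dur > avg_dur * 2.0:             # duration_spike
        alerts.append("duration_spike")
    if rows == 0 and zero_run >= 3:                     # zero_rows_consecutive
        alerts.append("zero_rows_consecutive")
    return alerts
```

Note that the zero-rows rule fires even though the job itself reported success; this is exactly the silent-failure class that exit codes miss.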
LLM-Assisted Detection — the agent's LLM analyzes patterns that statistical rules miss:
- Log messages that look like success but contain warnings
- Gradual degradation trends (5% slower every day for two weeks)
- Unusual combinations of normal individual signals
- Semantic analysis of error messages for root cause hints
Do not rely on LLM detection alone. LLMs hallucinate, and a hallucinated P1 alert at 3 AM erodes trust fast. Always use statistical detection as the primary signal and LLM analysis as enrichment. The DataOps Agent does this by default.
Stage 3: Correlator #
Isolated alerts are noise. The Correlator turns noise into signal by connecting events across sources within a time window.
| Time | Source | Event |
|---|---|---|
| 04:15 | Oracle | ORA-01555 in undo tablespace |
| 04:17 | Airflow | extract_transactions FAILED |
| 04:17 | Audit | ETL_AUDIT_LOG rows_read = 0 |
| 04:18 | Airflow | transform_finance SKIPPED (upstream) |
| 04:18 | Airflow | load_finance_mart SKIPPED (upstream) |
| 04:19 | Snowflake | FINANCE_MART.fact_transactions stale |
Correlated Incident:
- Root Cause: ORA-01555 (undo tablespace exhausted)
- Blast Radius: 4 downstream jobs + 1 mart + dashboard
- SLA Impact: 06:00 ET finance deadline at risk
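The grouping behind this incident can be sketched as follows. This is a simplified model, assuming the earliest event in a window is the root cause; the dependency map mirrors the FinancePipeline correlation declared later in the chapter, and the function names are illustrative.

```python
from datetime import timedelta

# Dependency chain from the FinancePipeline correlation (upstream -> downstream).
DEPENDENCIES = {
    "extract_transactions": ["transform_finance"],
    "transform_finance": ["load_finance_mart"],
    "load_finance_mart": ["refresh_dashboard"],
}

def blast_radius(root):
    """All jobs transitively downstream of the root-cause job."""
    hit, stack = set(), [root]
    while stack:
        for child in DEPENDENCIES.get(stack.pop(), []):
            if child not in hit:
                hit.add(child)
                stack.append(child)
    return hit

def correlate(events, window=timedelta(minutes=30)):
    """Group events whose timestamps fall within one time window.

    events: list of (ts, job_id) tuples; each group's earliest event
    is treated as the candidate root cause.
    """
    events = sorted(events)
    groups = []
    for ts, job in events:
        if groups and ts - groups[-1][0][0] <= window:
            groups[-1].append((ts, job))
        else:
            groups.append([(ts, job)])
    return groups
```

The six events in the table above all land within a four-minute span, so they collapse into a single correlated incident instead of six separate alerts.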
The correlation declaration in Neam defines the scope, time window, dependency chain, and SLA for a group of related jobs; the full FinancePipeline declaration appears in the complete program later in this chapter.
Stage 4: Triage #
Triage assigns a severity level (P1 through P4) based on the incident policy. Each severity maps to conditions, response times, escalation windows, and notification channels.
| Severity | Conditions | Response | Escalation |
|---|---|---|---|
| P1 Critical | SLA impact < 30 min, financial data, blast radius > 50 | Immediate | 15 min |
| P2 High | SLA impact < 2 hr, blast radius > 10 | 30 min | 2 hr |
| P3 Medium | Repeated non-transient failure, cost anomaly > 1.5x | 4 hr | Next day |
| P4 Low | Default/informational | Next business day | Email digest |
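The severity policy in the table reduces to an ordered rule check: conditions are evaluated from P1 down, and the first match wins. A minimal sketch, with the incident attributes modeled as a plain dict:

```python
def triage(incident: dict) -> str:
    """Map an incident's attributes onto the P1-P4 policy; first match wins."""
    if ((incident.get("sla_impact") and incident.get("sla_remaining_min", 1e9) < 30)
            or incident.get("data_criticality") == "financial"
            or incident.get("blast_radius", 0) > 50):
        return "P1_CRITICAL"
    if ((incident.get("sla_impact") and incident.get("sla_remaining_min", 1e9) < 120)
            or incident.get("blast_radius", 0) > 10):
        return "P2_HIGH"
    if ((incident.get("failure_count", 0) > 1 and not incident.get("is_transient", True))
            or incident.get("cost_anomaly", 0) > 1.5):
        return "P3_MEDIUM"
    return "P4_LOW"
```

Evaluating in severity order matters: an incident with financial data and a blast radius of 12 must page as P1, not merely alert as P2.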
Stage 5: Auto-Heal #
Auto-Heal is where the DataOps Agent moves from observer to operator. This is also where guardrails are most critical.
```mermaid
flowchart TD
    A["Is action in allowed_actions list?"] -->|NO| B["Escalate with investigation report"]
    A -->|YES| C["Is action in requires_approval list?"]
    C -->|YES| D["Request approval, wait for human"]
    C -->|NO| E["Are guardrails satisfied?\n\nCost < max_cost_per_action ($100)?\nRetries < max_retries_per_hour (10)?\nOutside deploy/maintenance window?\nDry-run passed?"]
    E -->|ALL YES| F["Execute action"]
    E -->|ANY NO| B
```
The allowed actions are intentionally conservative:
| Action | What It Does | Risk Level |
|---|---|---|
| retry_job | Re-trigger a failed scheduler job | Low |
| kill_query | Terminate a runaway query | Low |
| resume_warehouse | Resume a suspended warehouse | Low |
| scale_warehouse_up | Increase warehouse size for throughput | Medium |
Actions that require human approval: alter_schema, disable_job, drop_table.
The DataOps Agent follows the principle of minimal authority. It can retry and kill, but it cannot alter data structures or disable jobs without human approval. This is not a limitation — it is a design choice. Auto-heal should resolve 80% of incidents (transient failures, resource exhaustion). The remaining 20% require human judgment, and the agent provides a complete investigation report to accelerate that judgment.
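The decision flow can be sketched as a single function. This is an illustrative model, assuming approval-gated actions are checked before the auto-allowed list, with the guardrail defaults from the incident policy hard-coded:

```python
ALLOWED = {"retry_job", "kill_query", "resume_warehouse", "scale_warehouse_up"}
NEEDS_APPROVAL = {"alter_schema", "disable_job", "drop_table"}

def decide(action, est_cost, retries_last_hour, in_blocked_window, dry_run_ok):
    """Walk the auto-heal decision flow.

    Returns one of 'execute' | 'request_approval' | 'escalate'.
    """
    if action in NEEDS_APPROVAL:
        return "request_approval"
    if action not in ALLOWED:
        return "escalate"            # unknown action: never improvise
    guardrails_ok = (
        est_cost < 100.00            # max_cost_per_action
        and retries_last_hour < 10   # max_retries_per_hour
        and not in_blocked_window    # deploy/maintenance window
        and dry_run_ok               # require_dry_run
    )
    return "execute" if guardrails_ok else "escalate"
```

The important property is the default: anything not explicitly allowed escalates to a human with the investigation report attached.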
Stage 6: Reporter #
The Reporter generates three types of reports:
- Daily Digest — overnight incidents, auto-heal actions taken, SLA status
- Weekly Summary — incident trends, top recurring failures, reliability metrics
- Cost Report — platform spend vs. budget, cost anomalies, optimization opportunities
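Assembling the daily digest is an aggregation over the incident log. A minimal sketch, assuming each incident carries severity, auto-heal, and SLA flags (the field names are illustrative):

```python
from collections import Counter

def daily_digest(incidents):
    """Summarize the last 24h of incidents into digest fields.

    incidents: list of dicts with 'severity', 'auto_healed', 'sla_breached'.
    """
    sev = Counter(i["severity"] for i in incidents)
    return {
        "total": len(incidents),
        "by_severity": dict(sev),
        "auto_healed": sum(bool(i.get("auto_healed")) for i in incidents),
        "sla_breaches": sum(bool(i.get("sla_breached")) for i in incidents),
    }
```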
Neam Code: A Complete DataOps Agent #
// ═══════════════════════════════════════════════════════════════
// DataOps Agent — SRE for the SimShop Data Platform
// ═══════════════════════════════════════════════════════════════
// ─── Budget (required — continuous monitoring consumes tokens) ───
budget OpsBudget { cost: 50.00, tokens: 500000, time: 86400000 }
// ─── Schedulers ───
scheduler AirflowProd {
type: "airflow",
connection: env("AIRFLOW_API_URL"),
credentials: vault("airflow/prod/token"),
poll_interval: "30s",
dag_filter: ["sales_*", "finance_*", "marketing_*"],
timezone: "America/New_York"
}
scheduler SnowflakeTasks {
type: "snowflake",
connection: env("SNOWFLAKE_ACCOUNT"),
credentials: vault("snowflake/prod/creds"),
poll_interval: "60s",
database_filter: ["ANALYTICS", "RAW_VAULT"]
}
// ─── Audit Tables ───
audit_table ETLAudit {
source: SnowflakeAnalytics,
table: "OPS.ETL_AUDIT_LOG",
column_map: {
job_id: "JOB_NAME",
timestamp: "RUN_DATE",
status: "STATUS",
status_values: {
success: ["SUCCESS", "COMPLETED"],
failure: ["FAILURE", "FAILED", "ERROR"],
running: ["RUNNING", "IN_PROGRESS"],
skipped: ["SKIPPED", "NO_DATA"]
},
rows_in: "ROWS_READ",
rows_out: "ROWS_WRITTEN",
error: "ERROR_MSG",
duration: "DURATION_SEC",
duration_unit: "seconds"
},
poll_interval: "60s",
lookback_window: "24h",
retention_analysis: "90d",
anomalies: {
row_count_drop: 0.20,
row_count_spike: 3.0,
duration_spike: 2.0,
failure_rate: 0.05,
zero_rows_consecutive: 3
}
}
// ─── Log Sources ───
log_source SnowflakeQueryLog {
type: "snowflake",
connection: env("SNOWFLAKE_ACCOUNT"),
credentials: vault("snowflake/prod/creds"),
views: [
"SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY",
"SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY"
],
poll_interval: "60s",
lookback_window: "24h",
alerts: {
query_timeout: true,
warehouse_credit_spike: 2.0,
long_running_queries: "30m",
full_table_scans: true
}
}
// ─── Platform Health & FinOps ───
platform Warehouse {
type: "snowflake",
connection: env("SNOWFLAKE_ACCOUNT"),
credentials: vault("snowflake/prod/creds"),
health_checks: {
warehouse_utilization: true,
query_perf_p50: "5s",
query_perf_p95: "30s",
query_perf_p99: "120s",
clustering_health: true,
time_travel_usage: true
},
finops: {
daily_budget: 500.00,
warehouse_budgets: {
"TRANSFORM_WH": 200.00,
"ANALYTICS_WH": 150.00,
"LOADING_WH": 100.00
},
auto_suspend_idle: "5m",
auto_kill_queries: "60m",
cost_anomaly_threshold: 2.0
}
}
// ─── Incident Policy ───
incident_policy ProdPolicy {
severity: {
P1_CRITICAL: {
conditions: [
"sla_impact == true AND sla_remaining < 30m",
"data_criticality == 'financial'",
"blast_radius > 50"
],
response: "immediate",
escalation: "15m",
channels: ["pagerduty", "slack_critical"]
},
P2_HIGH: {
conditions: [
"sla_impact == true AND sla_remaining < 2h",
"blast_radius > 10"
],
response: "30m",
escalation: "2h",
channels: ["slack_ops"]
},
P3_MEDIUM: {
conditions: [
"failure_count > 1 AND is_transient == false",
"cost_anomaly > 1.5"
],
response: "4h",
channels: ["slack_ops"]
},
P4_LOW: {
conditions: ["default"],
response: "next_business_day",
channels: ["email_digest"]
}
},
auto_heal: {
enabled: true,
max_auto_retries: 3,
retry_backoff: "exponential",
retry_initial_wait: "30s",
allowed_actions: [
"retry_job", "kill_query",
"resume_warehouse", "scale_warehouse_up"
],
requires_approval: [
"alter_schema", "disable_job", "drop_table"
],
guardrails: {
max_cost_per_action: 100.00,
max_retries_per_hour: 10,
no_actions_during: ["deploy_window", "maintenance_window"],
require_dry_run: true
}
}
}
// ─── Cross-Source Correlation ───
correlation FinancePipeline {
scope: {
schedulers: [AirflowProd],
audit_tables: [ETLAudit],
log_sources: [SnowflakeQueryLog],
job_pattern: ["dag_finance_*"],
table_pattern: ["FINANCE_MART.*", "GL.*"]
},
time_window: "30m",
dependencies: {
"extract_transactions": ["transform_finance"],
"transform_finance": ["load_finance_mart"],
"load_finance_mart": ["refresh_dashboard"]
},
sla: {
deadline: "06:00",
timezone: "America/New_York",
business_days_only: true,
escalation: "slack_finance_team"
}
}
// ─── The DataOps Agent ───
dataops agent PlatformWatch {
provider: "anthropic",
model: "claude-sonnet-4-6",
system: "You are a DataOps SRE agent monitoring a production data platform.",
temperature: 0.1,
platforms: [Warehouse],
schedulers: [AirflowProd, SnowflakeTasks],
audit_tables: [ETLAudit],
log_sources: [SnowflakeQueryLog],
incident_policy: ProdPolicy,
correlations: [FinancePipeline],
budget: OpsBudget,
mode: "continuous",
reports: {
daily_digest: { time: "07:00", channel: "email_ops_team" },
weekly_summary: { day: "Monday", time: "09:00", channel: "slack_ops" },
cost_report: { frequency: "daily", channel: "slack_finops" }
},
agent_md: "./agents/platform_watch.md"
}
// ─── Operational Commands ───
let incident = PlatformWatch.triage()
let report = PlatformWatch.investigate(incident)
PlatformWatch.start_monitor()
SLA Tracking and FinOps #
SLA as Code #
The sla block inside a correlation declaration makes data freshness deadlines executable, not aspirational. The DataOps Agent continuously calculates time remaining until SLA breach and factors this into severity classification.
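The core calculation is simple. A minimal sketch, assuming the clock is already in the SLA's timezone and omitting the business-day handling that the full declaration provides:

```python
from datetime import datetime, timedelta

def sla_remaining(now: datetime, deadline_hhmm: str = "06:00") -> timedelta:
    """Time left until today's SLA deadline; negative means breached."""
    h, m = map(int, deadline_hhmm.split(":"))
    deadline = now.replace(hour=h, minute=m, second=0, microsecond=0)
    return deadline - now
```

At 04:47, the moment Priya's pager fired, this yields 1 hour 13 minutes of remaining budget, which is what pushes the incident into the P1 band (SLA impact with less than 30 minutes remaining comes later, but the countdown is already running).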
```mermaid
gantt
    title SLA Timeline - Finance Pipeline
    dateFormat HH:mm
    axisFormat %H:%M
    section Phases
    Normal window :done, normal, 00:00, 04:47
    Incident (ORA-01555) :crit, incident, 04:47, 05:30
    Recovery (auto-heal retry ok) :active, recovery, 05:30, 06:00
    Buffer :buffer, 06:00, 06:30
    section Events
    FAIL - ORA-01555 :milestone, 04:47, 0min
    AUTO-HEAL retry ok :milestone, 05:30, 0min
    SLA DEADLINE :milestone, 06:00, 0min
```
FinOps Integration #
The finops block inside a platform declaration turns cost management from a monthly spreadsheet exercise into a real-time operational concern:
- Daily Budget — aggregate spend limit across all warehouses
- Per-Warehouse Budget — granular limits for transform, analytics, and loading workloads
- Auto-Suspend — idle warehouses suspended after configurable timeout
- Query Kill — runaway queries terminated after configurable duration
- Cost Anomaly Detection — alerts when spend exceeds 2x the trailing average
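The budget and anomaly checks above reduce to a comparison against the trailing spend. A minimal sketch with the finops defaults from the platform declaration hard-coded:

```python
from statistics import mean

def cost_check(today_spend, trailing_spend, daily_budget=500.00, threshold=2.0):
    """Flag budget overruns and cost anomalies.

    trailing_spend: list of daily spend values for the trailing window.
    """
    alerts = []
    if today_spend > daily_budget:                       # daily_budget
        alerts.append("daily_budget_exceeded")
    baseline = mean(trailing_spend)
    if baseline > 0 and today_spend > baseline * threshold:  # cost_anomaly_threshold
        alerts.append("cost_anomaly")
    return alerts
```

A day of $900 spend against a $400 trailing average trips both alerts, so the agent catches the runaway warehouse the same day rather than at month-end review.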
Clone the DataSims repository and run the SimShop environment. The 15 ETL pipelines include 10 controlled quality issues that the DataOps Agent can detect, correlate, and remediate. See neam-agents/programs/ for the complete program.
Industry Perspective #
The DataOps Manifesto #
The DataOps Manifesto defines 18 principles for modern data operations. The DataOps Agent implements several directly:
| Manifesto Principle | DataOps Agent Implementation |
|---|---|
| Analytic teams should use version control | Neam programs are code — versioned, reviewed, tested |
| Orchestrate everything | Scheduler integration across 10 platforms |
| Quality is everyone's responsibility | Audit table monitoring with statistical anomaly detection |
| Monitor and alert on system performance | Platform health checks, query latency percentiles |
| Reduce cycle time | Auto-heal reduces MTTR from hours to minutes |
Google SRE for Data #
Google's internal data SRE teams apply the following concepts that the DataOps Agent codifies:
- Error Budgets — the finops.daily_budget limit and SLA deadlines function as error budgets for data freshness and cost
- Toil Reduction — auto-heal eliminates the 80% of incidents that are transient retries
- Blameless Post-Mortems — the Reporter generates investigation reports with timeline, root cause, and blast radius — the raw material for post-mortems
The Evidence #
DataSims experiments (see DataSims repository) demonstrate the DataOps Agent's impact:
| Metric | Without DataOps | With DataOps | Improvement |
|---|---|---|---|
| Mean Time to Detect (MTTD) | 47 min (human checks dashboard) | 1.2 min (continuous polling) | 97.4% |
| Mean Time to Resolve (MTTR) | 68 min (human investigation) | 8.3 min (auto-correlation + auto-heal) | 87.8% |
| SLA Breaches / Month | 4.2 | 0.3 | 92.9% |
| False Positive Alerts | 12.1 / day (noisy) | 1.8 / day (correlated) | 85.1% |
| Overnight Pages | 8.4 / month | 1.1 / month (P1 only) | 86.9% |
Ablation A3 (DataOps Agent removed) in the churn prediction experiment showed pipeline failures going undetected for an average of 3.2 hours, compared to 1.2 minutes with the agent active. The cascade from undetected failures to stale model predictions was measurable: model serving latency increased by 340% as the system served increasingly stale predictions from cache.
These results are from controlled DataSims experiments with 10 injected quality issues across 15 ETL pipelines. Production results will vary based on platform complexity, alert volume, and existing monitoring maturity.
Key Takeaways #
- The DataOps Agent is an SRE for data — it monitors schedulers, audit tables, logs, and platform health continuously, not reactively
- The six-stage pipeline (Collector, Anomaly Detector, Correlator, Triage, Auto-Heal, Reporter) transforms isolated alerts into correlated incidents with root cause analysis
- Guardrailed auto-heal follows the principle of minimal authority: retry and kill yes, alter schema no — 80% of incidents are transient and can be resolved automatically
- SLA tracking makes data freshness deadlines executable code, not aspirational documents
- FinOps integration turns cost management from a monthly review into a real-time operational concern with per-warehouse budgets and anomaly detection
- The DataOps Agent reads logs and metadata — it never touches business data
For Further Exploration #
- Neam Language Reference: DataOps Agent
- DataSims: Simulated Enterprise Environment — 15 ETL pipelines with 10 injected quality issues
- Site Reliability Engineering (Betsy Beyer et al., 2016) — the foundational SRE text
- DataOps Manifesto — 18 principles for data operations
- Fundamentals of Data Engineering (Reis & Housley, 2022) — Chapter 10: Security, Privacy, and the Future of Data Engineering