Chapter 12: The DataOps Agent — SRE for Data #

"Hope is not a strategy. If a human has to watch a dashboard to know something is broken, you've already failed." — Niall Murphy, co-author of Site Reliability Engineering (Google)


25 min read | Priya, Sarah | Part IV: Platform Intelligence Agents

What you'll learn:


The Problem #

It is 4:47 AM on a Tuesday. Priya's phone buzzes. The Airflow DAG dag_finance_daily has failed. She opens her laptop, checks the Airflow UI — task extract_transactions timed out. She SSHs into the Oracle source and finds ORA-01555: snapshot too old. She checks the undo tablespace and finds a long-running query from a batch job that someone in analytics kicked off at 2 AM. She kills the query, restarts the DAG, and watches it complete by 6:15 AM — barely making the 6:30 AM SLA.

But that is the easy night. On harder nights, failures cascade. The extract timeout causes the transform to skip, which causes the mart to load zero rows, which causes the CFO's dashboard to show blanks at 7 AM. Priya finds out about the blank dashboard from a Slack message at 8:15 AM — three hours after the root cause.

This is the coordination tax of manual operations: a human must see every alert, correlate every signal, triage every incident, and decide every remediation. Google's SRE team solved this problem for web services a decade ago. Data platforms have been slower to catch up. The DataOps Agent brings SRE to data.


SRE Principles Applied to Data #

Google's Site Reliability Engineering book introduced principles that transformed how the industry operates web services. The DataOps Agent adapts five of those principles for data platforms:

| SRE Principle | Web Services | Data Equivalent (DataOps Agent) |
| --- | --- | --- |
| Monitoring | HTTP latency, error rates | Pipeline duration, row counts, job status |
| Alerting | Page on SLO breach | Triage by severity, blast radius, SLA impact |
| Incident Response | On-call rotation, runbooks | Automated investigation + correlation + escalation |
| Automation | Canary deploys, auto-rollback | Guardrailed retry, kill-query, scale-warehouse |
| SLOs/SLAs | 99.9% availability budget | Data freshness deadline, quality threshold |
Insight

SRE for data is harder than SRE for web services in one critical way: data failures are often silent. A pipeline that loads zero rows returns exit code 0. A dashboard that shows yesterday's numbers looks identical to one showing today's. The DataOps Agent treats data correctness signals — row counts, schema consistency, freshness — as first-class monitoring metrics, not afterthoughts.


The Six-Stage Processing Pipeline #

Every DataOps Agent follows a six-stage pipeline. Each stage is a distinct responsibility, and the pipeline runs continuously, on schedule, or on demand.

FLOW Six-Stage Processing Pipeline
flowchart LR
  A["Collector\n\nschedulers\naudit tables\nlog sources\nplatforms"] --> B["Anomaly\nDetector\n\nstatistical\n+ LLM-assist"]
  B --> C["Correlator\n\ncross-src\ndependency\ntime-window\nblast radius"]
  C --> D["Triage\n\nP1-P4\nseverity\nSLA impact\nescalation"]
  D --> E["Auto-Heal\n\nguardrailed\nretry/kill/\nscale/resume\napproval req"]
  E --> F["Reporter\n\ndaily digest\nweekly summ\ncost report\nslack/email"]

Stage 1: Collector #

The Collector polls four types of monitoring sources:

| Source Type | What It Reads | Examples |
| --- | --- | --- |
| Schedulers | Job status, duration, dependencies | Airflow, Control-M, dbt Cloud, Databricks Workflows, Snowflake Tasks, AWS Glue, Azure Data Factory |
| Audit Tables | ETL run logs (rows in/out, status, errors) | Custom ETL_AUDIT_LOG tables with configurable column mapping |
| Log Sources | Database and platform logs | Snowflake QUERY_HISTORY, Oracle alert log, PostgreSQL pg_stat_activity |
| Platforms | Health metrics and cost data | Warehouse utilization, query latency percentiles, storage growth, credit consumption |

The Collector never touches business data. It reads only metadata, logs, and system views.
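
A minimal sketch of the Collector loop, assuming a hypothetical `Source` shape: each source exposes a `poll()` callable that returns metadata events (job status, row counts, log entries), and the loop merges whatever is due. The names and record shapes here are illustrative, not the agent's actual internals.

```python
from dataclasses import dataclass
from typing import Callable

# Hypothetical sketch of the Collector loop. Each source exposes poll(),
# which returns metadata events (job status, row counts, log entries);
# the loop reads only metadata, never business data.

@dataclass
class Source:
    name: str
    poll_interval: float          # seconds between polls
    poll: Callable[[], list]      # returns a list of metadata event dicts
    last_polled: float = 0.0

def collect_due(sources: list[Source], now: float) -> list[dict]:
    """Poll every source whose interval has elapsed; merge and tag events."""
    events = []
    for src in sources:
        if now - src.last_polled >= src.poll_interval:
            for event in src.poll():
                events.append({"source": src.name, **event})
            src.last_polled = now
    return events
```

A 30-second scheduler and a 60-second audit table simply become two `Source` entries with different intervals; the loop naturally staggers them.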

Stage 2: Anomaly Detector #

The Anomaly Detector combines two detection methods:

Statistical Detection — threshold-based rules configured per audit table: row-count drops and spikes relative to baseline, duration spikes, elevated failure rates, and consecutive zero-row runs (the anomalies block in the ETLAudit declaration below sets these thresholds).

LLM-Assisted Detection — the agent's LLM analyzes patterns that statistical rules miss.
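
The statistical rules can be sketched as follows, using the threshold values from the ETLAudit anomalies block; the run/history record shape is a hypothetical simplification.

```python
from statistics import median

# Thresholds from the ETLAudit anomalies block in the program below.
THRESHOLDS = {
    "row_count_drop": 0.20,        # flag runs >20% below baseline
    "row_count_spike": 3.0,        # flag runs >3x baseline
    "duration_spike": 2.0,         # flag runs >2x typical duration
    "zero_rows_consecutive": 3,    # flag 3 zero-row runs in a row
}

def detect(run: dict, history: list[dict], t: dict = THRESHOLDS) -> list[str]:
    """Compare the latest run against the job's recent successful history."""
    anomalies = []
    baseline_rows = median(h["rows_out"] for h in history)
    baseline_dur = median(h["duration"] for h in history)
    if baseline_rows and run["rows_out"] < baseline_rows * (1 - t["row_count_drop"]):
        anomalies.append("row_count_drop")
    if baseline_rows and run["rows_out"] > baseline_rows * t["row_count_spike"]:
        anomalies.append("row_count_spike")
    if baseline_dur and run["duration"] > baseline_dur * t["duration_spike"]:
        anomalies.append("duration_spike")
    # A zero-row run only alerts once the streak reaches the configured length.
    recent = history[-(t["zero_rows_consecutive"] - 1):] + [run]
    if len(recent) >= t["zero_rows_consecutive"] and all(r["rows_out"] == 0 for r in recent):
        anomalies.append("zero_rows_consecutive")
    return anomalies
```

Note the zero-row rule: a single empty load is often legitimate (no data arrived), which is why the threshold requires three consecutive empty runs before alerting.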

Anti-Pattern

Do not rely on LLM detection alone. LLMs hallucinate, and a hallucinated P1 alert at 3 AM erodes trust fast. Always use statistical detection as the primary signal and LLM analysis as enrichment. The DataOps Agent does this by default.

Stage 3: Correlator #

Isolated alerts are noise. The Correlator turns noise into signal by connecting events across sources within a time window.

Correlation Window: 30 minutes

| Time | Source | Event |
| --- | --- | --- |
| 04:15 | Oracle | ORA-01555 in undo tablespace |
| 04:17 | Airflow | extract_transactions FAILED |
| 04:17 | Audit | ETL_AUDIT_LOG rows_read = 0 |
| 04:18 | Airflow | transform_finance SKIPPED (upstream) |
| 04:18 | Airflow | load_finance_mart SKIPPED (upstream) |
| 04:19 | Snowflake | FINANCE_MART.fact_transactions stale |

Correlated Incident:

  • Root Cause: ORA-01555 (undo tablespace exhausted)
  • Blast Radius: 4 downstream jobs + 1 mart + dashboard
  • SLA Impact: 06:00 ET finance deadline at risk
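
The two core moves can be sketched in a few lines: bucket alerts into incidents using the time window, then walk the declared dependency map (here, the FinancePipeline dependencies) to compute blast radius. This is a hypothetical simplification, not the Correlator's actual implementation.

```python
from datetime import datetime, timedelta

# Dependency map as declared in the FinancePipeline correlation.
DEPS = {
    "extract_transactions": ["transform_finance"],
    "transform_finance": ["load_finance_mart"],
    "load_finance_mart": ["refresh_dashboard"],
}

def correlate(events: list[tuple], window: timedelta = timedelta(minutes=30)) -> list[list]:
    """Group (timestamp, source, event) tuples that fall inside one window."""
    incidents, current = [], []
    for ev in sorted(events):
        if current and ev[0] - current[0][0] > window:
            incidents.append(current)    # window exceeded: close the incident
            current = []
        current.append(ev)
    if current:
        incidents.append(current)
    return incidents

def blast_radius(root: str, deps: dict = DEPS) -> set[str]:
    """Every job downstream of the failed root, via graph traversal."""
    seen, stack = set(), [root]
    while stack:
        for child in deps.get(stack.pop(), []):
            if child not in seen:
                seen.add(child)
                stack.append(child)
    return seen
```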

The correlation declaration in Neam defines the scope, time window, dependency chain, and SLA for a group of related jobs; see the FinancePipeline declaration in the complete program later in this chapter.

Stage 4: Triage #

Triage assigns a severity level (P1 through P4) based on the incident policy. Each severity maps to conditions, response times, escalation windows, and notification channels.

| Severity | Conditions | Response | Escalation |
| --- | --- | --- | --- |
| P1 Critical | SLA impact < 30 min, financial data, blast radius > 50 | Immediate | 15 min |
| P2 High | SLA impact < 2 hr, blast radius > 10 | 30 min | 2 hr |
| P3 Medium | Repeated non-transient failure, cost anomaly > 1.5x | 4 hr | Next day |
| P4 Low | Default/informational | Next business day | Email digest |
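
The classification is a first-match walk from P1 down to P4. A simplified sketch of the ProdPolicy conditions (the incident field names such as `sla_remaining_min` are hypothetical):

```python
# First-match severity walk over the ProdPolicy conditions (simplified sketch).
def triage(incident: dict) -> str:
    """Return the first severity whose conditions match, P1 down to P4."""
    if ((incident.get("sla_impact") and incident.get("sla_remaining_min", 1e9) < 30)
            or incident.get("data_criticality") == "financial"
            or incident.get("blast_radius", 0) > 50):
        return "P1"
    if ((incident.get("sla_impact") and incident.get("sla_remaining_min", 1e9) < 120)
            or incident.get("blast_radius", 0) > 10):
        return "P2"
    if ((incident.get("failure_count", 0) > 1 and not incident.get("is_transient", True))
            or incident.get("cost_anomaly", 0) > 1.5):
        return "P3"
    return "P4"
```

Ordering matters: a financial-data incident with a small blast radius still lands at P1 because the P1 conditions are evaluated first.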

Stage 5: Auto-Heal #

Auto-Heal is where the DataOps Agent moves from observer to operator. This is also where guardrails are most critical.

FLOW Auto-Heal Decision Tree
flowchart TD
  A["Is action in allowed_actions list?"] -->|NO| B["Escalate with investigation report"]
  A -->|YES| C["Is action in requires_approval list?"]
  C -->|YES| D["Request approval, wait for human"]
  C -->|NO| E["Are guardrails satisfied?\n\nCost < max_cost_per_action ($100)?\nRetries < max_retries_per_hour (10)?\nOutside deploy/maintenance window?\nDry-run passed?"]
  E -->|ALL YES| F["Execute action"]
  E -->|ANY NO| B

The allowed actions are intentionally conservative:

| Action | What It Does | Risk Level |
| --- | --- | --- |
| retry_job | Re-trigger a failed scheduler job | Low |
| kill_query | Terminate a runaway query | Low |
| resume_warehouse | Resume a suspended warehouse | Low |
| scale_warehouse_up | Increase warehouse size for throughput | Medium |

Actions that require human approval: alter_schema, disable_job, drop_table.
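
The decision tree reduces to a short gate, sketched here with the action lists and guardrail values from the ProdPolicy declaration (the function shape itself is hypothetical):

```python
# Action lists and guardrails from the ProdPolicy auto_heal block.
ALLOWED = {"retry_job", "kill_query", "resume_warehouse", "scale_warehouse_up"}
REQUIRES_APPROVAL = {"alter_schema", "disable_job", "drop_table"}
MAX_COST_PER_ACTION = 100.00
MAX_RETRIES_PER_HOUR = 10

def decide(action: str, est_cost: float, retries_this_hour: int,
           in_blocked_window: bool, dry_run_ok: bool) -> str:
    """Walk the tree: 'execute', 'approve' (wait for a human), or 'escalate'."""
    if action not in ALLOWED | REQUIRES_APPROVAL:
        return "escalate"            # unknown action: escalate with report
    if action in REQUIRES_APPROVAL:
        return "approve"             # never executed without a human
    if (est_cost >= MAX_COST_PER_ACTION
            or retries_this_hour >= MAX_RETRIES_PER_HOUR
            or in_blocked_window
            or not dry_run_ok):
        return "escalate"            # any guardrail failure blocks execution
    return "execute"
```

Note that every non-execute path degrades to a safe state: the agent either waits for a human or hands over an investigation report, never silently drops the incident.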

Insight

The DataOps Agent follows the principle of minimal authority. It can retry and kill, but it cannot alter data structures or disable jobs without human approval. This is not a limitation — it is a design choice. Auto-heal should resolve 80% of incidents (transient failures, resource exhaustion). The remaining 20% require human judgment, and the agent provides a complete investigation report to accelerate that judgment.

Stage 6: Reporter #

The Reporter generates three types of reports: a daily operational digest, a weekly summary, and a cost report, each routed to its own channel as configured in the agent's reports block.


Neam Code: A Complete DataOps Agent #

NEAM
// ═══════════════════════════════════════════════════════════════
// DataOps Agent — SRE for the SimShop Data Platform
// ═══════════════════════════════════════════════════════════════

// ─── Budget (required — continuous monitoring consumes tokens) ───
budget OpsBudget { cost: 50.00, tokens: 500000, time: 86400000 }

// ─── Schedulers ───
scheduler AirflowProd {
    type: "airflow",
    connection: env("AIRFLOW_API_URL"),
    credentials: vault("airflow/prod/token"),
    poll_interval: "30s",
    dag_filter: ["sales_*", "finance_*", "marketing_*"],
    timezone: "America/New_York"
}

scheduler SnowflakeTasks {
    type: "snowflake",
    connection: env("SNOWFLAKE_ACCOUNT"),
    credentials: vault("snowflake/prod/creds"),
    poll_interval: "60s",
    database_filter: ["ANALYTICS", "RAW_VAULT"]
}

// ─── Audit Tables ───
audit_table ETLAudit {
    source: SnowflakeAnalytics,
    table: "OPS.ETL_AUDIT_LOG",
    column_map: {
        job_id: "JOB_NAME",
        timestamp: "RUN_DATE",
        status: "STATUS",
        status_values: {
            success: ["SUCCESS", "COMPLETED"],
            failure: ["FAILURE", "FAILED", "ERROR"],
            running: ["RUNNING", "IN_PROGRESS"],
            skipped: ["SKIPPED", "NO_DATA"]
        },
        rows_in: "ROWS_READ",
        rows_out: "ROWS_WRITTEN",
        error: "ERROR_MSG",
        duration: "DURATION_SEC",
        duration_unit: "seconds"
    },
    poll_interval: "60s",
    lookback_window: "24h",
    retention_analysis: "90d",
    anomalies: {
        row_count_drop: 0.20,
        row_count_spike: 3.0,
        duration_spike: 2.0,
        failure_rate: 0.05,
        zero_rows_consecutive: 3
    }
}

// ─── Log Sources ───
log_source SnowflakeQueryLog {
    type: "snowflake",
    connection: env("SNOWFLAKE_ACCOUNT"),
    credentials: vault("snowflake/prod/creds"),
    views: [
        "SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY",
        "SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY"
    ],
    poll_interval: "60s",
    lookback_window: "24h",
    alerts: {
        query_timeout: true,
        warehouse_credit_spike: 2.0,
        long_running_queries: "30m",
        full_table_scans: true
    }
}

// ─── Platform Health & FinOps ───
platform Warehouse {
    type: "snowflake",
    connection: env("SNOWFLAKE_ACCOUNT"),
    credentials: vault("snowflake/prod/creds"),
    health_checks: {
        warehouse_utilization: true,
        query_perf_p50: "5s",
        query_perf_p95: "30s",
        query_perf_p99: "120s",
        clustering_health: true,
        time_travel_usage: true
    },
    finops: {
        daily_budget: 500.00,
        warehouse_budgets: {
            "TRANSFORM_WH": 200.00,
            "ANALYTICS_WH": 150.00,
            "LOADING_WH": 100.00
        },
        auto_suspend_idle: "5m",
        auto_kill_queries: "60m",
        cost_anomaly_threshold: 2.0
    }
}

// ─── Incident Policy ───
incident_policy ProdPolicy {
    severity: {
        P1_CRITICAL: {
            conditions: [
                "sla_impact == true AND sla_remaining < 30m",
                "data_criticality == 'financial'",
                "blast_radius > 50"
            ],
            response: "immediate",
            escalation: "15m",
            channels: ["pagerduty", "slack_critical"]
        },
        P2_HIGH: {
            conditions: [
                "sla_impact == true AND sla_remaining < 2h",
                "blast_radius > 10"
            ],
            response: "30m",
            escalation: "2h",
            channels: ["slack_ops"]
        },
        P3_MEDIUM: {
            conditions: [
                "failure_count > 1 AND is_transient == false",
                "cost_anomaly > 1.5"
            ],
            response: "4h",
            channels: ["slack_ops"]
        },
        P4_LOW: {
            conditions: ["default"],
            response: "next_business_day",
            channels: ["email_digest"]
        }
    },
    auto_heal: {
        enabled: true,
        max_auto_retries: 3,
        retry_backoff: "exponential",
        retry_initial_wait: "30s",
        allowed_actions: [
            "retry_job", "kill_query",
            "resume_warehouse", "scale_warehouse_up"
        ],
        requires_approval: [
            "alter_schema", "disable_job", "drop_table"
        ],
        guardrails: {
            max_cost_per_action: 100.00,
            max_retries_per_hour: 10,
            no_actions_during: ["deploy_window", "maintenance_window"],
            require_dry_run: true
        }
    }
}

// ─── Cross-Source Correlation ───
correlation FinancePipeline {
    scope: {
        schedulers: [AirflowProd],
        audit_tables: [ETLAudit],
        log_sources: [SnowflakeQueryLog],
        job_pattern: ["dag_finance_*"],
        table_pattern: ["FINANCE_MART.*", "GL.*"]
    },
    time_window: "30m",
    dependencies: {
        "extract_transactions": ["transform_finance"],
        "transform_finance": ["load_finance_mart"],
        "load_finance_mart": ["refresh_dashboard"]
    },
    sla: {
        deadline: "06:00",
        timezone: "America/New_York",
        business_days_only: true,
        escalation: "slack_finance_team"
    }
}

// ─── The DataOps Agent ───
dataops agent PlatformWatch {
    provider: "anthropic",
    model: "claude-sonnet-4-6",
    system: "You are a DataOps SRE agent monitoring a production data platform.",
    temperature: 0.1,

    platforms: [Warehouse],
    schedulers: [AirflowProd, SnowflakeTasks],
    audit_tables: [ETLAudit],
    log_sources: [SnowflakeQueryLog],

    incident_policy: ProdPolicy,
    correlations: [FinancePipeline],
    budget: OpsBudget,

    mode: "continuous",

    reports: {
        daily_digest: { time: "07:00", channel: "email_ops_team" },
        weekly_summary: { day: "Monday", time: "09:00", channel: "slack_ops" },
        cost_report: { frequency: "daily", channel: "slack_finops" }
    },

    agent_md: "./agents/platform_watch.md"
}

// ─── Operational Commands ───
let incident = PlatformWatch.triage()
let report = PlatformWatch.investigate(incident)
PlatformWatch.start_monitor()

SLA Tracking and FinOps #

SLA as Code #

The sla block inside a correlation declaration makes data freshness deadlines executable, not aspirational. The DataOps Agent continuously calculates time remaining until SLA breach and factors this into severity classification.
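
The core of that calculation is simple clock math. A minimal sketch, omitting the timezone and business-day handling that the sla block configures:

```python
from datetime import datetime

def sla_remaining_minutes(now: datetime, deadline: str = "06:00") -> float:
    """Minutes until today's SLA deadline; negative means the SLA is breached."""
    h, m = map(int, deadline.split(":"))
    target = now.replace(hour=h, minute=m, second=0, microsecond=0)
    return (target - now).total_seconds() / 60.0
```

At 04:47, the moment of Priya's page, this returns 73.0 minutes; that value feeds directly into the `sla_remaining` checks in the severity conditions.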

TIMELINE SLA Timeline: Finance Pipeline
gantt
  title SLA Timeline - Finance Pipeline
  dateFormat HH:mm
  axisFormat %H:%M
  section Phases
    Normal window           :done, normal, 00:00, 04:47
    Incident (ORA-01555)    :crit, incident, 04:47, 05:30
    Recovery (auto-heal retry ok) :active, recovery, 05:30, 06:00
    Buffer                  :buffer, 06:00, 06:30
  section Events
    FAIL - ORA-01555        :milestone, 04:47, 0min
    AUTO-HEAL retry ok      :milestone, 05:30, 0min
    SLA DEADLINE            :milestone, 06:00, 0min

FinOps Integration #

The finops block inside a platform declaration turns cost management from a monthly spreadsheet exercise into a real-time operational concern: daily and per-warehouse budgets, idle-warehouse auto-suspend, runaway-query auto-kill, and a cost anomaly threshold are enforced continuously rather than reviewed after the invoice arrives.
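
The budget and anomaly checks can be sketched as follows, using the values from the Warehouse finops block (the `finops_alerts` function and its input shape are hypothetical):

```python
# Budgets and anomaly threshold from the Warehouse finops block.
WAREHOUSE_BUDGETS = {"TRANSFORM_WH": 200.00, "ANALYTICS_WH": 150.00, "LOADING_WH": 100.00}
COST_ANOMALY_THRESHOLD = 2.0

def finops_alerts(spend_today: dict, baseline: dict) -> list[str]:
    """Flag warehouses over budget or spending anomalously versus baseline."""
    alerts = []
    for wh, spend in spend_today.items():
        if spend > WAREHOUSE_BUDGETS.get(wh, float("inf")):
            alerts.append(f"{wh} over daily budget")
        if baseline.get(wh) and spend > baseline[wh] * COST_ANOMALY_THRESHOLD:
            alerts.append(f"{wh} cost anomaly")
    return alerts
```

A cost anomaly above 1.5x also feeds the P3 triage condition, so a runaway warehouse surfaces as an incident, not just a line item.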

Try It

Clone the DataSims repository and run the SimShop environment. The 15 ETL pipelines include 10 controlled quality issues that the DataOps Agent can detect, correlate, and remediate. See neam-agents/programs/ for the complete program.


Industry Perspective #

The DataOps Manifesto #

The DataOps Manifesto defines 18 principles for modern data operations. The DataOps Agent implements several directly:

| Manifesto Principle | DataOps Agent Implementation |
| --- | --- |
| Analytic teams should use version control | Neam programs are code — versioned, reviewed, tested |
| Orchestrate everything | Scheduler integration across 10 platforms |
| Quality is everyone's responsibility | Audit table monitoring with statistical anomaly detection |
| Monitor and alert on system performance | Platform health checks, query latency percentiles |
| Reduce cycle time | Auto-heal reduces MTTR from hours to minutes |

Google SRE for Data #

Google's internal data SRE teams apply concepts that the DataOps Agent codifies.


The Evidence #

DataSims experiments (see DataSims repository) demonstrate the DataOps Agent's impact:

| Metric | Without DataOps | With DataOps | Improvement |
| --- | --- | --- | --- |
| Mean Time to Detect (MTTD) | 47 min (human checks dashboard) | 1.2 min (continuous polling) | 97.4% |
| Mean Time to Resolve (MTTR) | 68 min (human investigation) | 8.3 min (auto-correlation + auto-heal) | 87.8% |
| SLA Breaches / Month | 4.2 | 0.3 | 92.9% |
| False Positive Alerts | 12.1 / day (noisy) | 1.8 / day (correlated) | 85.1% |
| Overnight Pages | 8.4 / month | 1.1 / month (P1 only) | 86.9% |

Ablation A3 (DataOps Agent removed) in the churn prediction experiment showed pipeline failures going undetected for an average of 3.2 hours, compared to 1.2 minutes with the agent active. The cascade from undetected failures to stale model predictions was measurable: model serving latency increased by 340% as the system served increasingly stale predictions from cache.

Note

These results are from controlled DataSims experiments with 10 injected quality issues across 15 ETL pipelines. Production results will vary based on platform complexity, alert volume, and existing monitoring maturity.


Key Takeaways #

For Further Exploration #