Chapter 8: The Data Agent — Source, Schema, Pipeline #
"The best data engineers don't move data. They declare what data should look like, where it should flow, and what 'good' means — then let the system figure out the rest." — Maxime Beauchemin, creator of Apache Airflow
Reading time: 25 min | Personas: Priya (Data Engineer), Marcus (Data Scientist), David (VP of Data) | Part III: Data Infrastructure Agents
What You'll Learn #
- How to declare data sources across JDBC, API, file, and streaming connectors
- How to define typed schema contracts with field-level constraints
- How to build Extract / Transform / Load pipeline stages in Neam
- How quality gates enforce data correctness before downstream consumption
- How compute routing moves workloads across local, Spark, and Snowflake engines
- How automatic lineage and catalog registration work without manual effort
The Problem: Priya's 3 AM Pager #
Priya is a Senior Data Engineer at a mid-size e-commerce company. Last Tuesday at 3 AM, her phone buzzed: the customer churn model was producing nonsensical predictions. After two hours of investigation, she traced the problem to a silent schema change in the upstream CRM system. A column that once held ISO country codes now contained free-text country names. No validation caught it. No alert fired. The pipeline happily loaded garbage into the warehouse, the feature pipeline built features on garbage, and the model scored garbage.
The root cause was not technical complexity. It was the absence of a contract. Nobody had declared what country_code was supposed to look like, what was allowed, or what should happen when the contract was violated.
Priya's problem is universal. According to Gartner, poor data quality costs organizations an average of $12.9 million per year. The Data Agent exists to make that class of failure structurally impossible.
The Data Agent: Foundation of the Data Lifecycle #
The Data Agent is Neam's fourth agent type, purpose-built for data lifecycle operations. Unlike general-purpose agents that produce text, Data Agents perceive sources, validate schemas, execute pipelines, enforce quality, track lineage, and register assets in a catalog — all within Neam's compiled, type-safe model.
flowchart LR
    A["PERCEIVE"] --> B["PLAN"] --> C["EXTRACT"] --> D["TRANSFORM"] --> E["LOAD"] --> F["VALIDATE"]
    A1["Connect to sources\nDiscover schemas\nProfile data"] -.-> A
    B1["Build pipeline\nstrategy"] -.-> B
    C1["Pull data\nfrom sources"] -.-> C
    D1["Apply business rules\nClean, conform"] -.-> D
    E1["Write to\nsinks"] -.-> E
    F1["Run quality gates\nRecord lineage\nRegister in catalog"] -.-> F
When to Use a Data Agent #
Use a data agent when you need generic data lifecycle operations: ingesting from diverse sources, validating schemas, running E/T/L pipelines, enforcing quality, and routing compute. If you specifically need SQL-first warehouse modeling (star schemas, semantic layers, NL2SQL), use the `etl` agent instead (Chapter 9). The `etl` agent inherits all of the data agent's capabilities and adds warehouse intelligence.
Declaring Sources #
A source tells the Data Agent where data lives and how to connect. Neam supports four connector types, each with its own connection semantics.
// ── PostgreSQL transactional database ──
source CRM_Database {
type: "postgres",
connection: env("CRM_DB_URL"),
refresh: "5m",
classification: "pii"
}
// ── S3 data lake with partitioned Parquet files ──
source ClickstreamLake {
type: "s3",
connection: "s3://analytics-raw/clickstream/",
format: "parquet",
partition_by: ["year", "month", "day"]
}
// ── REST API with hourly refresh ──
source PaymentGateway {
type: "http",
connection: "https://api.stripe.com/v1/charges",
format: "json",
refresh: "1h"
}
// ── Kafka stream for real-time events ──
source UserEvents {
type: "kafka",
connection: "kafka://broker:9092/user.activity",
mode: "stream"
}
| Connector | Protocol | Connection Format | Use Case |
|---|---|---|---|
| `postgres` | JDBC | `pg://host/database` or `env(...)` | Transactional databases |
| `s3` | Object storage | `s3://bucket/prefix/` | Data lakes, file-based |
| `http` | REST | `https://endpoint` | APIs, webhooks |
| `kafka` | Streaming | `kafka://broker:port/topic` | Real-time event streams |
Insight: The `env("VAR_NAME")` function reads credentials from environment variables at runtime, keeping secrets out of source code. This is the recommended pattern for all connection strings in production.
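The pattern is easy to emulate in any language. Here is a minimal Python sketch of what an `env()`-style lookup does at runtime; the function name mirrors Neam's and the connection string is made up for illustration:

```python
import os

# Illustrative sketch of the env() pattern: resolve the connection string
# from the environment at runtime so the secret never lands in source control.
def env(var_name: str) -> str:
    value = os.environ.get(var_name)
    if value is None:
        raise RuntimeError(f"missing required environment variable: {var_name}")
    return value

os.environ["CRM_DB_URL"] = "pg://db.internal/crm"  # normally set by the deploy environment
print(env("CRM_DB_URL"))  # -> pg://db.internal/crm
```

Failing fast on a missing variable is deliberate: a pipeline that starts with an empty connection string fails later and more confusingly.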
Typed Schema Contracts #
Schemas are the backbone of data quality in Neam. A schema is a compile-time contract that declares the shape, types, and constraints of data flowing through the system. Unlike documentation that drifts from reality, schemas are enforced by the compiler and the runtime.
Field Types #
| Type | Description | Example |
|---|---|---|
| `string` | Text data | `name: string` |
| `int` | Integer numbers | `age: int` |
| `float` | Floating-point numbers | `balance: float` |
| `bool` | Boolean values | `active: bool` |
| `datetime` | Date and time | `created_at: datetime` |
Constraint Annotations #
Constraints are attached to fields with the @ prefix. Multiple constraints can be chained on a single field.
schema CustomerSchema {
version: 2,
// Identity
customer_id: string @primary_key,
// Contact (with validation)
email: string @pattern("^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$") @unique @not_null,
name: string @length(1, 200) @not_null,
// Demographics (with range checks)
age: int @range(0, 150),
country_code: string @length(2, 3) @not_null,
status: string @enum(["active", "inactive", "suspended"]),
// Financial (with positivity)
lifetime_value: float @positive,
// Temporal (with defaults)
created_at: datetime @default(now()),
updated_at: datetime @default(now())
}
| Constraint | Syntax | What It Enforces |
|---|---|---|
| `@primary_key` | `id: int @primary_key` | Unique + not null |
| `@foreign_key` | `cust_id: string @foreign_key(CustomerSchema)` | Referential integrity |
| `@unique` | `email: string @unique` | No duplicates |
| `@not_null` | `name: string @not_null` | Non-null values |
| `@positive` | `revenue: float @positive` | Must be > 0 |
| `@range` | `age: int @range(0, 150)` | Numeric bounds (min <= max) |
| `@enum` | `status: string @enum(["active", "inactive"])` | Allowed value set |
| `@pattern` | `email: string @pattern("^.+@.+$")` | Regex match |
| `@length` | `name: string @length(1, 200)` | String length bounds |
| `@default` | `created: datetime @default(now())` | Default value |
Anti-Pattern: Declaring schemas without constraints. A schema with only types and no `@not_null`, `@range`, or `@pattern` constraints is just documentation — it won't catch the kind of silent corruption that woke Priya at 3 AM. Be specific about what "valid" means.
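To see concretely what the constraints buy you, here is an illustrative Python sketch (not Neam's implementation) of the checks that a `@not_null` plus `@length(2, 3)` contract would run against Priya's `country_code` column. The uppercase-letters regex is an extra assumption layered on top, standing in for a `@pattern` constraint:

```python
import re

# Hypothetical stand-ins for Neam's field constraints. A contract like
# country_code: string @length(2, 3) @not_null would have caught the
# free-text country names from Priya's 3 AM incident.
CONSTRAINTS = {
    "country_code": [
        lambda v: v is not None,                            # @not_null
        lambda v: 2 <= len(v) <= 3,                         # @length(2, 3)
        lambda v: re.fullmatch(r"[A-Z]+", v) is not None,   # assumed @pattern: ISO codes are uppercase
    ],
}

def violations(row: dict) -> list[str]:
    """Return the names of fields that violate their declared constraints."""
    return [
        field
        for field, checks in CONSTRAINTS.items()
        if field in row and not all(check(row[field]) for check in checks)
    ]

print(violations({"country_code": "US"}))             # valid ISO code -> no violations
print(violations({"country_code": "United States"}))  # free text -> flagged
```

The checks short-circuit, so a null value fails `@not_null` without ever reaching the length check.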
Cross-Schema References #
Foreign keys create compile-time validated relationships between schemas:
schema OrderSchema {
order_id: string @primary_key,
customer_id: string @foreign_key(CustomerSchema),
product_id: string @foreign_key(ProductSchema),
quantity: int @positive,
amount: float @positive,
order_date: datetime @not_null
}
The compiler verifies that CustomerSchema and ProductSchema exist and that the referenced fields are primary keys. At runtime, the quality gate validates referential integrity.
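Conceptually, the runtime side of a foreign-key check reduces to orphan detection: find child rows whose key has no matching parent. A hedged Python sketch (the function name and sample data are invented, not Neam's API):

```python
# Illustrative sketch of referential-integrity validation: a @foreign_key
# violation is a child row whose key value has no matching parent row.
def find_orphans(child_rows, fk_field, parent_rows, pk_field):
    """Return child rows whose foreign key has no matching primary key."""
    parent_keys = {row[pk_field] for row in parent_rows}
    return [row for row in child_rows if row[fk_field] not in parent_keys]

customers = [{"customer_id": "C1"}, {"customer_id": "C2"}]
orders = [
    {"order_id": "O1", "customer_id": "C1"},
    {"order_id": "O2", "customer_id": "C9"},  # orphan: no such customer
]

print(find_orphans(orders, "customer_id", customers, "customer_id"))
# -> [{'order_id': 'O2', 'customer_id': 'C9'}]
```

Building the parent-key set once keeps the check linear in the number of rows rather than quadratic.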
E/T/L Pipeline Stages #
The Data Agent's pipeline is a three-stage process: Extract, Transform, Load. Each stage is declarative — you specify what should happen, not how to implement it.
data agent CustomerPipeline {
provider: "openai", model: "gpt-4o-mini",
system: "You are a data pipeline engineer.",
temperature: 0.1,
sources: [CRM_Database, ClickstreamLake],
pipeline: {
extract: {
from: [CRM_Database, ClickstreamLake],
schema: CustomerSchema,
mode: "incremental",
watermark: "updated_at"
},
transform: {
// Cleaning rules
deduplicate: { key: ["customer_id"], strategy: "latest" },
cast: {
age: "int",
lifetime_value: "float"
},
derive: {
days_since_signup: "DATEDIFF(now(), created_at)",
is_high_value: "lifetime_value > 1000.0"
},
filter: "status != 'deleted'"
},
load: {
target: CustomerWarehouse,
mode: "upsert",
key: ["customer_id"],
schema: CustomerSchema
}
},
quality: CustomerQuality,
lineage: true,
catalog: DataCatalog
}
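The `mode: "incremental"` and `watermark` pair is worth unpacking. The sketch below shows the watermark logic in Python, assuming the agent persists the last watermark between runs; the names and sample data are illustrative, not Neam's runtime:

```python
from datetime import datetime

# Sketch of incremental extraction: pull only rows whose watermark column
# is newer than the last stored watermark, then advance the watermark.
def extract_incremental(rows, watermark_field, last_watermark):
    """Return (new_rows, new_watermark) for one incremental pull."""
    new_rows = [r for r in rows if r[watermark_field] > last_watermark]
    new_watermark = max(
        (r[watermark_field] for r in new_rows), default=last_watermark
    )
    return new_rows, new_watermark

rows = [
    {"customer_id": "C1", "updated_at": datetime(2026, 3, 20, 9)},
    {"customer_id": "C2", "updated_at": datetime(2026, 3, 20, 11)},
]
batch, wm = extract_incremental(rows, "updated_at", datetime(2026, 3, 20, 10))
print(len(batch), wm)  # only the row newer than the watermark; watermark advances
```

The `default=last_watermark` fallback matters: an empty batch must not reset the watermark, or the next run would re-extract everything.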
Pipeline Flow #
flowchart TD
    E["EXTRACT\nCRM_Database\nClickstream Lake"] --> T["TRANSFORM\nDeduplicate\nCast types\nDerive columns\nFilter deleted"]
    T --> L["LOAD\nUpsert into warehouse\nValidate schema"]
    L --> Q["QUALITY GATE\nCompleteness\nUniqueness\nFreshness\nReferential integrity"]
    Q -->|PASS| R["Register in catalog"]
    Q -->|FAIL| A["Alert + block"]
Quality Gates #
Quality gates are the Data Agent's enforcement mechanism. They run after load and before downstream consumption. A gate can be advisory (log and continue) or blocking (halt the pipeline).
quality CustomerQuality {
level: "strict",
// Freshness: data must be < 1 hour old
freshness: "1h",
// Completeness: >= 98% non-null on required fields
completeness: 0.98,
// Uniqueness: no duplicate customer_ids
uniqueness: ["customer_id"],
// Referential integrity across schemas
referential_integrity: true,
// Statistical drift detection
drift_detection: true,
// What happens on failure
on_fail: "gate_and_alert"
}
| Check | What It Validates | Failure Behavior |
|---|---|---|
| `freshness` | Data age vs. SLA | Block if stale |
| `completeness` | Non-null ratio per field | Block below threshold |
| `uniqueness` | Duplicate detection on key columns | Block on duplicates |
| `referential_integrity` | Foreign key validity | Block on orphans |
| `drift_detection` | Statistical distribution shift | Alert (advisory) |
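As a rough illustration of how the two simplest checks might be computed, here is a Python sketch (not Neam's gate implementation; the function names are invented, and the 0.98 threshold mirrors `CustomerQuality` above):

```python
# Sketch of the two simplest gate checks: completeness (non-null ratio
# per required field) and uniqueness (no duplicate values on a key column).
def completeness(rows, field):
    non_null = sum(1 for r in rows if r.get(field) is not None)
    return non_null / len(rows)

def is_unique(rows, key):
    keys = [r[key] for r in rows]
    return len(keys) == len(set(keys))

rows = [
    {"customer_id": "C1", "email": "a@x.com"},
    {"customer_id": "C2", "email": None},       # null email
    {"customer_id": "C2", "email": "b@x.com"},  # duplicate key
]

passed = completeness(rows, "email") >= 0.98 and is_unique(rows, "customer_id")
print("PASS" if passed else "FAIL")  # -> FAIL: a blocking gate would halt the load here
```

With `level: "strict"` and `on_fail: "gate_and_alert"`, a FAIL like this one blocks the pipeline and pages someone before the bad batch reaches a consumer.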
Try It: In the DataSims environment, SimShop intentionally injects quality issues starting at month 13: 3% null values, 1% duplicates, and 2% late-arriving data. Run a Data Agent against the `simshop_oltp.customers` table and watch the quality gate catch these injected defects.
Compute Routing #
Not all data fits in memory. The Data Agent's compute routing automatically dispatches workloads to the right engine based on data volume, complexity, and cost.
compute LocalEngine {
engine: "local",
max_rows: 100000
}
compute SparkCluster {
engine: "spark",
cluster: env("SPARK_MASTER_URL"),
executors: 8,
memory: "16g"
}
compute SnowflakeWH {
engine: "snowflake",
account: env("SNOWFLAKE_ACCOUNT"),
warehouse: "ANALYTICS_WH",
database: "ANALYTICS",
role: "TRANSFORM_ROLE"
}
Routing Decision Tree #
flowchart TD
    Q["Data volume?"]
    Q -->|"< 100K rows"| L["LocalEngine\n(in-process, zero cost)"]
    Q -->|"100K - 10M rows"| S["SparkCluster\n(distributed, moderate cost)"]
    Q -->|"> 10M rows"| W["SnowflakeWH\n(cloud warehouse, pay-per-query)"]
The Data Agent selects compute based on the `max_rows` threshold of each engine and falls back to larger engines when smaller ones would exceed capacity. You declare the engines; the agent routes.
Insight: Compute routing is one of the Data Agent's most cost-effective features. Running a 50-row validation query against a Snowflake XL warehouse costs $0.02. Running it locally costs $0. Over thousands of pipeline runs per day, routing small operations locally saves significant compute spend.
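The routing rule itself is simple enough to sketch. A hypothetical Python version, with engine names and thresholds taken from the decision tree above (both are illustrative, not Neam's scheduler):

```python
# Sketch of max_rows-based routing: pick the first declared engine whose
# capacity covers the workload, falling back to the unbounded warehouse.
ENGINES = [
    ("local", 100_000),        # in-process, zero cost
    ("spark", 10_000_000),     # distributed, moderate cost
    ("snowflake", None),       # cloud warehouse, effectively unbounded
]

def route(row_count: int) -> str:
    for engine, max_rows in ENGINES:
        if max_rows is None or row_count <= max_rows:
            return engine
    return ENGINES[-1][0]  # unreachable while the last engine is unbounded

print(route(50))          # -> local
print(route(2_000_000))   # -> spark
print(route(50_000_000))  # -> snowflake
```

Ordering the engines cheapest-first is what makes the routing cost-effective: the expensive engine is only reached when nothing smaller can handle the volume.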
Automatic Lineage Recording #
Every data movement the Data Agent performs is automatically recorded as a lineage edge. Lineage answers three questions:
- Where did this data come from? (upstream provenance)
- What transformations were applied? (processing history)
- What downstream systems depend on this data? (impact analysis)
data agent TrackedPipeline {
provider: "openai", model: "gpt-4o-mini",
sources: [CRM_Database],
pipeline: {
extract: { from: [CRM_Database], schema: CustomerSchema },
transform: { deduplicate: { key: ["customer_id"] } },
load: { target: CustomerWarehouse, mode: "upsert", key: ["customer_id"] }
},
// Enable automatic lineage recording
lineage: true
}
When `lineage: true` is set, the agent records:
flowchart LR
    A["CRM_Database\nsource_type: postgres\ntimestamp: 2026-03-20"] -->|extract| B["stg_customers\ndedup applied\nrows_in: 105,230"]
    B -->|transform| C["int_customers\nderive: days_since_signup\nrows_out: 104,890"]
    C -->|load| D["CustomerWarehouse\ntarget: snowflake\nschema: v2"]
This lineage graph is queryable. When the CRM schema changes, you can trace forward to see every downstream table, feature, and model affected — answering the question "what breaks if this column changes?" in seconds rather than hours.
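That forward trace is a plain graph traversal. An illustrative Python sketch using the lineage edges from the example above (the edge data is hand-written here; in Neam it is recorded automatically by the agent):

```python
from collections import deque

# Lineage as a directed graph of dataset -> downstream dataset edges.
# "What breaks if this changes?" is a breadth-first walk forward.
EDGES = {
    "CRM_Database": ["stg_customers"],
    "stg_customers": ["int_customers"],
    "int_customers": ["CustomerWarehouse"],
}

def downstream(node: str) -> set[str]:
    """Return every dataset reachable downstream of `node`."""
    seen, queue = set(), deque([node])
    while queue:
        for nxt in EDGES.get(queue.popleft(), []):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen

print(downstream("CRM_Database"))  # every dataset affected by a CRM schema change
```

In a real lineage store each node would fan out to many children (staging tables, features, models), but the traversal is the same.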
Catalog Registration #
The Data Agent can automatically register processed datasets in a data catalog, making them discoverable by other agents, analysts, and tools.
catalog DataCatalog {
type: "unity",
connection: env("UNITY_CATALOG_URI"),
auto_register: true,
tags: ["customer", "churn", "pipeline-managed"]
}
When `auto_register: true` is set, the Data Agent registers each pipeline output with:
- Schema metadata (columns, types, constraints)
- Lineage graph (upstream sources, transformations)
- Quality scores (latest gate results)
- Freshness timestamp
- Tags for discovery
Full Working Example: SimShop Customer Pipeline #
This complete example declares every component needed for a production customer data pipeline, using the DataSims SimShop environment.
// ════════════════════════════════════════════════
// SimShop Customer Pipeline — Full Data Agent
// ════════════════════════════════════════════════
// ── Budget ──
budget PipelineBudget { cost: 5.00, tokens: 50000 }
// ── Schema Contract ──
schema SimShopCustomer {
version: 1,
customer_id: string @primary_key,
email: string @pattern("^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$") @unique,
first_name: string @not_null @length(1, 100),
last_name: string @not_null @length(1, 100),
date_of_birth: datetime,
country_code: string @length(2, 3) @not_null,
signup_date: datetime @not_null,
status: string @enum(["active", "inactive", "churned"]),
lifetime_value: float @positive,
segment: string @enum(["premium", "standard", "basic"]),
last_activity: datetime
}
schema SimShopOrder {
order_id: string @primary_key,
customer_id: string @foreign_key(SimShopCustomer),
order_date: datetime @not_null,
total_amount: float @positive,
item_count: int @positive,
channel: string @enum(["web", "mobile", "app"]),
status: string @enum(["completed", "pending", "cancelled", "returned"])
}
// ── Sources ──
source SimShopOLTP {
type: "postgres",
connection: env("SIMSHOP_PG_URL"),
refresh: "5m",
schema: SimShopCustomer,
classification: "pii"
}
// ── Compute Engines ──
compute LocalDev {
engine: "local",
max_rows: 50000
}
compute SimShopWarehouse {
engine: "postgres",
connection: env("SIMSHOP_PG_URL"),
database: "simshop"
}
// ── Quality Gate ──
quality CustomerQualityGate {
level: "strict",
freshness: "1h",
completeness: 0.98,
uniqueness: ["customer_id"],
referential_integrity: true,
drift_detection: true,
on_fail: "gate_and_alert"
}
// ── Catalog ──
catalog SimShopCatalog {
type: "unity",
connection: env("UNITY_CATALOG_URI"),
auto_register: true,
tags: ["simshop", "customer", "pipeline-managed"]
}
// ── The Data Agent ──
data agent SimShopCustomerPipeline {
provider: "openai",
model: "gpt-4o-mini",
system: "You are a data pipeline engineer for SimShop e-commerce.",
temperature: 0.1,
sources: [SimShopOLTP],
pipeline: {
extract: {
from: [SimShopOLTP],
schema: SimShopCustomer,
mode: "incremental",
watermark: "last_activity"
},
transform: {
deduplicate: { key: ["customer_id"], strategy: "latest" },
cast: {
lifetime_value: "float",
date_of_birth: "datetime"
},
derive: {
days_since_signup: "DATEDIFF(now(), signup_date)",
days_since_activity: "DATEDIFF(now(), last_activity)",
is_high_value: "lifetime_value > 500.0"
},
filter: "status != 'deleted'"
},
load: {
target: SimShopWarehouse,
mode: "upsert",
key: ["customer_id"],
schema: SimShopCustomer
}
},
quality: CustomerQualityGate,
lineage: true,
catalog: SimShopCatalog,
budget: PipelineBudget
}
// ── Execute ──
let result = run_pipeline(SimShopCustomerPipeline)
print(result)
What Happens at Runtime #
- Extract: The agent connects to `SimShopOLTP`, reads rows where `last_activity` > last watermark
- Transform: Deduplicates on `customer_id`, casts types, derives `days_since_signup` and `is_high_value`
- Load: Upserts into `SimShopWarehouse` with schema validation against `SimShopCustomer`
- Quality Gate: Runs freshness, completeness, uniqueness, referential integrity, and drift checks
- Lineage: Records the full source-to-target provenance graph
- Catalog: Registers the output dataset in Unity Catalog with metadata and quality scores
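The whole sequence can be compressed into a toy end-to-end sketch; every name below is an illustrative stand-in for Neam's runtime, not its actual API:

```python
# Toy end-to-end pipeline: extract + filter, dedupe, load, gate.
# Assumes later rows are newer, so "keep latest" keeps the last occurrence.
def run_pipeline(rows):
    extracted = [r for r in rows if r["status"] != "deleted"]    # extract + filter
    deduped = {r["customer_id"]: r for r in extracted}           # dedupe, keep latest
    loaded = list(deduped.values())                              # load (upsert)
    gate_ok = all(r["customer_id"] is not None for r in loaded)  # minimal quality gate
    return {"rows_loaded": len(loaded), "quality": "PASS" if gate_ok else "FAIL"}

rows = [
    {"customer_id": "C1", "status": "active"},
    {"customer_id": "C1", "status": "active"},   # duplicate, collapsed by dedupe
    {"customer_id": "C2", "status": "deleted"},  # filtered out
]
print(run_pipeline(rows))  # -> {'rows_loaded': 1, 'quality': 'PASS'}
```

The real agent adds watermarking, schema validation, lineage, and catalog registration around this skeleton, but the stage ordering is the same.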
Industry Perspective #
The Data Agent's design draws from three industry trends:
Data Contracts (popularized by Andrew Jones, 2022): The idea that producers and consumers of data should agree on a typed schema with SLAs. Neam schemas are data contracts that the compiler enforces at build time and the runtime enforces at run time.
Data Mesh (Zhamak Dehghani, 2019): The principle that domain teams should own their data products. Each Data Agent is a self-contained data product with its own source, schema, quality, and catalog registration — exactly the "data product" primitive that Data Mesh describes but rarely implements.
DataOps (DataKitchen, 2018): The practice of applying DevOps principles to data pipelines. The Data Agent's quality gates, lineage, and catalog registration automate the testing, monitoring, and documentation that DataOps mandates.
The Evidence #
The DataSims environment provides a controlled testbed for Data Agent evaluation. SimShop's 164 tables across 12 PostgreSQL schemas, with 10 intentionally injected quality issues, test every capability described in this chapter:
- Schema enforcement: Month 18 introduces a column rename in `simshop_oltp.events`. The Data Agent's schema contract catches this immediately.
- Quality gates: Months 13-18 inject 3% null values and 1% duplicates. The quality gate catches both within one pipeline run.
- Lineage: When the Data Agent traces the impact of the schema change, it identifies all 15 downstream ETL pipelines affected.
- Compute routing: With the `small` scale (10K customers), the agent routes to `LocalDev`. At the `large` scale (1M customers), it routes to the warehouse.
To reproduce: see the DataSims Quick Start.
For the complete language reference, see Neam: The AI-Native Programming Language.
Key Takeaways #
- Sources declare where data lives — JDBC, S3, HTTP, Kafka — with credentials kept in environment variables via `env()`.
- Schemas are typed contracts with field-level constraints (`@primary_key`, `@not_null`, `@pattern`, `@range`, `@enum`) that the compiler validates and the runtime enforces.
- Pipelines are three-stage (Extract, Transform, Load) and declarative: you state what should happen, not how.
- Quality gates block bad data from reaching downstream systems, with configurable checks for freshness, completeness, uniqueness, referential integrity, and statistical drift.
- Compute routing automatically dispatches workloads to the cheapest capable engine (local, Spark, or warehouse).
- Lineage and catalog registration happen automatically when enabled, making every pipeline output discoverable and traceable.