Context Propagation

Context flows through data systems as queryable metadata that validates decisions at each execution point.

Context Flow

A pipeline filters user events to exclude incomplete records. An agent queries the same table to analyze behavior. A dashboard displays event metrics. Each system needs to know what "valid event data" means. When quality rules change, all three systems must adapt identically or produce inconsistent results.

1. Define rules (once)

2. Store as structured metadata

3. Query at decision points

4. Validate before execution

5. Update rules without code changes

Example: Data Quality Context

1. Define:

rule: user_event_quality
version: "1.3"
valid_from: "2024-03-01"
required_fields:
  - session_id
  - user_id
  - event_type
  - timestamp
completeness_threshold: 0.95
max_null_percentage: 5

2. Query in Pipeline:

def process_user_events():
    rules = context.get("user_event_quality")

    # Filter records that meet quality thresholds
    return db.query(f"""
        SELECT * FROM raw_events
        WHERE {' AND '.join([f"{f} IS NOT NULL" for f in rules.required_fields])}
    """)

3. Query in Agent:

def agent_analyze_events(query):
    # Agent generates SQL
    sql = llm.generate_sql(query)

    # Validate SQL includes quality filters
    rules = context.get("user_event_quality")
    if not rules.validate_sql(sql):
        raise ValidationError("Query missing required quality filters")

    return db.query(sql)

4. Query in Dashboard:

def dashboard_event_metrics():
    rules = context.get("user_event_quality")

    # Dashboard uses same quality rules as pipeline and agent
    return metric_widget(
        source="raw_events",
        required_fields=rules.required_fields,
        completeness_threshold=rules.completeness_threshold
    )

What This Enables

Pipeline, agent, and dashboard use identical logic

Update rule once, all systems adapt

No SQL changes needed when business rules change

Agents validate they're using current definitions

Implementation Pattern

# Context provider
class Context:
    def get(self, rule_name, date=None):
        # Fetch current or historical rule
        rule = self.store.get(rule_name, effective_date=date)
        return ExecutableRule(rule)

# Executable rule
class ExecutableRule:
    def validate_sql(self, sql):
        # Check if SQL follows current rules
        pass

    def build_conditions(self):
        # Generate SQL WHERE clause from rules
        pass

    def is_valid_for_period(self, start, end):
        # Check if rule applies to date range
        pass

Context becomes infrastructure that pipelines, agents, and applications query before making decisions. This prevents the problems that occur when orchestration tools execute tasks without validating current business state.