Dynamic Logic in Workflows

Dynamic logic looks up the current business rules at run time, before each execution, rather than hardcoding assumptions into the workflow.

Static Logic

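During schema migrations, tables contain records in both old and new formats. Hardcoded join logic breaks when different records require different handling. The function below joins only on customer_uuid, so legacy orders that still carry only customer_id match no customer and drop out of the result.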

def join_orders_to_customers():
    # Assumes single schema version
    return db.query("""
        SELECT o.*, c.email, c.name
        FROM orders o
        JOIN customers c ON o.customer_uuid = c.uuid
    """)

Dynamic Logic

def join_orders_to_customers():
    # Query current schema state
    schema_rules = context.get("orders_schema")

    # Migration in progress: some records use legacy customer_id, others use customer_uuid
    if schema_rules.migration_status == "in_progress":
        return db.query("""
            SELECT o.*, c.email, c.name
            FROM orders o
            JOIN customers c ON
                (o.schema_version = 'v1' AND o.customer_id = c.id)
                OR (o.schema_version = 'v2' AND o.customer_uuid = c.uuid)
        """)
    else:
        # Migration complete, use new schema only
        return db.query("""
            SELECT o.*, c.email, c.name
            FROM orders o
            JOIN customers c ON o.customer_uuid = c.uuid
        """)

What Changed

The function now:

Queries the current schema migration state before executing

Applies different join logic based on each record's schema version

Switches to the new schema automatically when the migration completes

As the migration progresses, the join logic adapts without code changes. A pipeline built this way keeps working while records move from the old schema version to the new one, where a static pipeline would break.
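To make the behavior concrete, here is a minimal runnable sketch using an in-memory SQLite database. The table layout, sample rows, and the migration_status argument (passed in directly instead of read from context) are assumptions made for illustration. The per-version condition is written with AND/OR because some SQL dialects do not accept a comparison as the result of a CASE expression.

```python
import sqlite3

# Join used while both record formats coexist (hypothetical table layout).
MIXED_JOIN = """
    SELECT o.id, c.email
    FROM orders o
    JOIN customers c ON
        (o.schema_version = 'v1' AND o.customer_id = c.id)
        OR (o.schema_version = 'v2' AND o.customer_uuid = c.uuid)
    ORDER BY o.id
"""

# Join used once every record is in the new format.
NEW_ONLY_JOIN = """
    SELECT o.id, c.email
    FROM orders o
    JOIN customers c ON o.customer_uuid = c.uuid
    ORDER BY o.id
"""


def join_orders_to_customers(conn, migration_status):
    """Pick the join based on migration state (an argument here, for simplicity)."""
    query = MIXED_JOIN if migration_status == "in_progress" else NEW_ONLY_JOIN
    return conn.execute(query).fetchall()


def build_demo_db():
    """Create one legacy (v1) order and one migrated (v2) order."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE customers (id INTEGER, uuid TEXT, email TEXT);
        CREATE TABLE orders (
            id INTEGER, schema_version TEXT,
            customer_id INTEGER, customer_uuid TEXT
        );
        INSERT INTO customers VALUES (1, 'u-1', 'a@example.com');
        INSERT INTO customers VALUES (2, 'u-2', 'b@example.com');
        INSERT INTO orders VALUES (10, 'v1', 1, NULL);
        INSERT INTO orders VALUES (11, 'v2', NULL, 'u-2');
    """)
    return conn


conn = build_demo_db()
mixed_rows = join_orders_to_customers(conn, "in_progress")
new_only_rows = join_orders_to_customers(conn, "complete")
```

With this sample data the mixed join returns both orders, while the new-schema-only join returns just the migrated order. That is why the new-only join is safe only after the migration has finished.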

Common Patterns

Conditional execution:

rules = context.get("customer_segmentation")
if rules.enabled:
    apply_segmentation(data, rules.parameters)
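The snippet above leaves context and apply_segmentation undefined. One way they might look is sketched below, under the assumption that rules live in a simple in-memory store. RuleContext, its set method, the high_value_threshold parameter, and run_pipeline are all hypothetical names introduced for illustration.

```python
from types import SimpleNamespace


class RuleContext:
    """Hypothetical in-memory rule store; a real one might read from a config service."""

    def __init__(self):
        self._rules = {}

    def set(self, name, **fields):
        # Store each rule as an object so callers can use attribute access.
        self._rules[name] = SimpleNamespace(**fields)

    def get(self, name):
        return self._rules[name]


def apply_segmentation(data, parameters):
    # Hypothetical segmentation: label each customer by a spend threshold.
    threshold = parameters["high_value_threshold"]
    return [
        {**row, "segment": "high" if row["spend"] >= threshold else "standard"}
        for row in data
    ]


def run_pipeline(context, data):
    # Look up the rule on every run instead of hardcoding the decision.
    rules = context.get("customer_segmentation")
    if rules.enabled:
        return apply_segmentation(data, rules.parameters)
    return data


context = RuleContext()
context.set("customer_segmentation", enabled=True,
            parameters={"high_value_threshold": 100})
customers = [{"name": "Ada", "spend": 250}, {"name": "Bo", "spend": 40}]

segmented = run_pipeline(context, customers)

# Disabling the rule changes behavior without touching run_pipeline.
context.set("customer_segmentation", enabled=False, parameters={})
unsegmented = run_pipeline(context, customers)
```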

Fallback logic:

primary_source = context.get("authoritative_source")
if not primary_source.available:
    use_fallback(primary_source.fallback_config)
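A small sketch of both branches of the fallback pattern follows. It assumes the source object exposes a read callable and that fallback_config is a dict naming a replacement reader. Neither shape comes from the original snippet, and load_customers is a hypothetical helper.

```python
from types import SimpleNamespace


def load_customers(source):
    """Read from the primary source, or from its configured fallback."""
    if source.available:
        return source.read()
    # Assumed shape: fallback_config maps "read" to a replacement reader.
    return source.fallback_config["read"]()


def make_source(available):
    # Stand-in for the object that context.get("authoritative_source") returns.
    return SimpleNamespace(
        available=available,
        read=lambda: ["from primary"],
        fallback_config={"read": lambda: ["from replica"]},
    )


rows_when_up = load_customers(make_source(available=True))
rows_when_down = load_customers(make_source(available=False))
```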

Validation before joins:

join_rules = context.get("customer_order_join")
if not join_rules.validate(orders_df, customers_df):
    raise JoinValidationError(join_rules.errors)
# Fails fast on problems such as orders that reference missing customer records

Running the check before the join surfaces these edge cases as explicit errors instead of letting them break the pipeline downstream.
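What validate checks is not specified above. The sketch below assumes two checks, unknown customer references and duplicate customer keys, and works on plain lists of dicts rather than DataFrames so that it stays dependency-free. JoinRules and its constructor arguments are hypothetical.

```python
class JoinValidationError(Exception):
    """Raised when the inputs to a join fail the configured checks."""


class JoinRules:
    """Hypothetical rule object that checks keys before orders join customers."""

    def __init__(self, order_key, customer_key):
        self.order_key = order_key
        self.customer_key = customer_key
        self.errors = []

    def validate(self, orders, customers):
        self.errors = []
        customer_keys = [c[self.customer_key] for c in customers]
        known = set(customer_keys)
        # Duplicate keys on the customer side would multiply order rows.
        if len(known) != len(customer_keys):
            self.errors.append("duplicate customer keys would duplicate order rows")
        # Orders pointing at unknown customers would vanish in an inner join.
        missing = sorted({o[self.order_key] for o in orders} - known)
        if missing:
            self.errors.append(f"orders reference unknown customers: {missing}")
        return not self.errors


join_rules = JoinRules(order_key="customer_uuid", customer_key="uuid")
customers = [{"uuid": "u-1"}, {"uuid": "u-2"}]
good_orders = [{"id": 10, "customer_uuid": "u-1"}]
bad_orders = [{"id": 11, "customer_uuid": "u-9"}]

ok = join_rules.validate(good_orders, customers)

caught = None
try:
    if not join_rules.validate(bad_orders, customers):
        raise JoinValidationError(join_rules.errors)
except JoinValidationError as exc:
    caught = exc.args[0]
```

Collecting every failure in errors before raising lets one run report all problems at once.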

When to Use

Apply dynamic logic when:

Business rules change independently of code

Multiple systems need consistent rule enforcement