Elasticsearch Mode Architecture (MODE 2)

Data Index with Elasticsearch storage uses ES Transforms for continuous normalization, enabling full-text search, complex aggregations, and horizontal scaling.

Status: Production Ready

MODE 2 provides the same GraphQL API as MODE 1 (PostgreSQL), but with Elasticsearch as the storage backend.

Overview

Elasticsearch MODE 2 uses Elasticsearch Transforms for near-real-time event normalization. Raw events shipped by FluentBit are continuously aggregated into normalized indices, which trail the raw events by about the 1-second transform frequency and serve queries with sub-second response times and full-text search capabilities.

Key advantages:

  • Full-text search on workflow and task data

  • Complex aggregations and analytics

  • Horizontal scaling (shard-based distribution)

  • Automatic lifecycle management (ILM policies)

  • Multi-tenancy via index-per-tenant pattern

When to use:

  • Need full-text search capabilities

  • Require complex aggregations or analytics

  • Large-scale deployments (1M+ workflows)

  • Prefer Elasticsearch ecosystem

  • Want automatic retention management with ILM

Architecture Diagram

Quarkus Flow App
    ↓ (structured logging → stdout)
FluentBit DaemonSet
    ↓ (tail Kubernetes logs, filter JSON events)
Elasticsearch Raw Indices
    ↓ (ES Transform - continuous, 1s frequency)
Elasticsearch Normalized Indices
    ↓ (Elasticsearch Java Client)
GraphQL API (SmallRye GraphQL)

Data Flow

  1. Quarkus Flow emits events - Structured JSON logs to stdout

  2. FluentBit collects - Tails Kubernetes container logs, filters events by eventType field

  3. FluentBit routes by event type - Workflow events vs. task events

  4. FluentBit writes to raw indices - Date-based indices with ILM policies

    • workflow-events-2026.04.27

    • task-events-2026.04.27

  5. ES Transform processes - Continuous aggregation (1s frequency)

    • Groups events by instance ID

    • Applies field-level idempotency rules

    • Reads all raw events (match_all query); checkpoints limit each run to instances with new events

  6. Write to normalized indices - Single document per workflow/task

    • workflow-instances

    • task-executions

  7. GraphQL queries - Via Elasticsearch Java Client

ES Transform-Based Normalization

Elasticsearch Transforms provide continuous aggregation of raw events into normalized documents. This removes the need for a separate Event Processor service.

Why ES Transforms?

FluentBit’s architecture (direct Elasticsearch output) makes ES Transforms the natural choice:

  • Built-in Elasticsearch feature - No additional services needed

  • Continuous processing - Runs automatically every 1 second

  • Painless scripting - Transform logic inside Elasticsearch

  • Horizontal scaling - Scales with Elasticsearch cluster

  • Fault-tolerant - Built-in checkpointing and retry

Transform Configuration

Transforms are defined in JSON and applied via ElasticsearchSchemaInitializer:

Workflow instances transform:

{
  "source": {
    "index": "workflow-events-*",
    "query": { "match_all": {} }
  },
  "dest": {
    "index": "workflow-instances"
  },
  "frequency": "1s",
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "0s"
    }
  },
  "pivot": {
    "group_by": {
      "id": { "terms": { "field": "instanceId" } }
    },
    "aggregations": { /* field-level idempotency */ }
  }
}
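The pivot above emits one destination document per distinct group_by value and fills the other fields from aggregations over that instance's events. A minimal in-memory Python sketch of that shape, assuming raw events are plain dicts with the field names from the raw index template; `pivot`, the sample events, and the `last_update` rule are hypothetical, and Elasticsearch computes the real result incrementally rather than in a single pass like this:

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List

Event = Dict[str, Any]
Aggregation = Callable[[List[Event]], Any]


def pivot(events: Iterable[Event], group_field: str,
          aggregations: Dict[str, Aggregation]) -> Dict[str, Dict[str, Any]]:
    """In-memory model of a pivot: one output document per distinct group_field value."""
    groups: Dict[str, List[Event]] = defaultdict(list)
    for event in events:
        key = event.get(group_field)
        if key is not None:  # like a terms group_by without missing_bucket: no key, no bucket
            groups[key].append(event)

    docs: Dict[str, Dict[str, Any]] = {}
    for key, bucket in groups.items():
        doc: Dict[str, Any] = {"id": key}  # group_by name "id", as in the transform above
        for name, agg in aggregations.items():
            doc[name] = agg(bucket)
        docs[key] = doc
    return docs


events = [
    {"instanceId": "wf-1", "workflowName": "order", "@timestamp": 1},
    {"instanceId": "wf-2", "workflowName": "refund", "@timestamp": 2},
    {"instanceId": "wf-1", "workflowName": "order", "@timestamp": 3},
]
# "last_update" as max(@timestamp) is a hypothetical aggregation for illustration.
docs = pivot(events, "instanceId", {
    "name": lambda bucket: bucket[0]["workflowName"],
    "last_update": lambda bucket: max(e["@timestamp"] for e in bucket),
})
print(docs["wf-1"])  # {'id': 'wf-1', 'name': 'order', 'last_update': 3}
```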

Location: data-index-storage-elasticsearch-schema/src/main/resources/elasticsearch/transforms/workflow-instances-transform.json

Field-Level Idempotency

ES Transforms combine terms and scripted_metric aggregations to apply field-specific rules that keep results correct when events arrive out of order:

Immutable Fields (First Value Wins)

These fields never change after initial creation:

  • name, version, namespace - Workflow metadata

  • start - First event timestamp

  • input - Initial workflow/task input data

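Implementation (name example). Every event of an instance carries the same workflowName, so a terms aggregation with size 1 yields that value; terms buckets are ordered by document count rather than event time, so this pattern relies on the value never changing:

{
  "name": {
    "terms": {
      "field": "workflowName",
      "size": 1
    }
  }
}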

Implementation (input - first-wins with event timestamp):

{
  "input": {
    "scripted_metric": {
      "init_script": "state.data = null; state.ts = 'ZZZZ'",
      "map_script": "if (params._source.input != null) { String ts = params._source.eventTime != null ? params._source.eventTime : String.valueOf(params._source.timestamp); if (ts != null && ts.compareTo(state.ts) < 0) { state.data = params._source.input; state.ts = ts } }",
      "combine_script": "return state",
      "reduce_script": "def earliest = ['data': null, 'ts': 'ZZZZ']; for (s in states) { if (s != null && s.get('ts') != null && s.ts.compareTo(earliest.ts) < 0) { earliest = s } } return earliest.data"
    }
  }
}
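The four scripts follow the scripted_metric lifecycle: init_script and map_script run on each shard, combine_script returns that shard's state, and reduce_script merges the shard states. A Python model of the first-wins rule for input, assuming eventTime is always present as a fixed-format ISO-8601 UTC string (so string order equals time order); the shard split, function names, and sample data are made up for illustration:

```python
from typing import Any, Dict, List, Optional

SENTINEL = "ZZZZ"  # sorts after any ISO-8601 timestamp, like the init_script value


def shard_state(docs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """init_script + map_script + combine_script for the documents on one shard."""
    state: Dict[str, Any] = {"data": None, "ts": SENTINEL}
    for doc in docs:
        ts = doc.get("eventTime")
        if doc.get("input") is not None and ts is not None and ts < state["ts"]:
            state = {"data": doc["input"], "ts": ts}  # earlier event wins
    return state


def reduce_first(states: List[Optional[Dict[str, Any]]]) -> Any:
    """reduce_script: keep the shard state with the earliest timestamp."""
    earliest: Dict[str, Any] = {"data": None, "ts": SENTINEL}
    for s in states:
        if s is not None and s.get("ts") is not None and s["ts"] < earliest["ts"]:
            earliest = s
    return earliest["data"]


shard_a = [{"eventTime": "2026-04-27T10:00:05Z", "input": {"orderId": 2}}]
shard_b = [
    {"eventTime": "2026-04-27T10:00:09Z", "input": None},
    {"eventTime": "2026-04-27T10:00:01Z", "input": {"orderId": 1}},
]
print(reduce_first([shard_state(shard_a), shard_state(shard_b)]))  # {'orderId': 1}
```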

Terminal Fields (Latest Non-Null Wins)

These fields update with the most recent event:

  • end - Latest completion timestamp

  • output - Latest output data (tracks event timestamp)

  • error - Latest error (tracks event timestamp)

Implementation (output - last-wins with event timestamp):

{
  "output": {
    "scripted_metric": {
      "init_script": "state.data = null; state.ts = ''",
      "map_script": "if (params._source.output != null) { String ts = params._source.eventTime != null ? params._source.eventTime : String.valueOf(params._source.timestamp); if (ts != null && ts.compareTo(state.ts) > 0) { state.data = params._source.output; state.ts = ts } }",
      "combine_script": "return state",
      "reduce_script": "def latest = ['data': null, 'ts': '']; for (s in states) { if (s != null && s.get('ts') != null && s.ts.compareTo(latest.ts) > 0) { latest = s } } return latest.data"
    }
  }
}

Implementation (error - last-wins with event timestamp):

{
  "error": {
    "scripted_metric": {
      "init_script": "state.data = null; state.ts = ''",
      "map_script": "if (params._source.error != null) { String ts = params._source.eventTime != null ? params._source.eventTime : String.valueOf(params._source.timestamp); if (ts != null && ts.compareTo(state.ts) > 0) { state.data = params._source.error; state.ts = ts } }",
      "combine_script": "return state",
      "reduce_script": "def latest = ['data': null, 'ts': '']; for (s in states) { if (s != null && s.get('ts') != null && s.ts.compareTo(latest.ts) > 0) { latest = s } } return latest.data"
    }
  }
}
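The output and error rules reverse the comparison and start from an empty-string sentinel. Because all of these scripts compare timestamps as strings, they rely on eventTime having one fixed ISO-8601 format; the String.valueOf(params._source.timestamp) fallback renders a double through Java's Double.toString, which switches to scientific notation for large values (a value such as 1.7457120001E9) that does not sort chronologically. A Python sketch of the last-wins pick, with map and reduce collapsed into one loop, plus a check of that ordering pitfall; `last_wins` and the sample values are illustrative:

```python
from typing import Any, Dict, Iterable, Optional


def last_wins(docs: Iterable[Dict[str, Any]], field: str) -> Optional[Any]:
    """Keep the non-null value whose eventTime string is greatest (map + reduce collapsed)."""
    latest: Dict[str, Any] = {"data": None, "ts": ""}  # "" sorts before any timestamp
    for doc in docs:
        value, ts = doc.get(field), doc.get("eventTime")
        if value is not None and ts is not None and ts > latest["ts"]:
            latest = {"data": value, "ts": ts}
    return latest["data"]


events = [
    {"eventTime": "2026-04-27T10:00:09Z", "output": {"total": 42}},
    {"eventTime": "2026-04-27T10:00:03Z", "output": {"total": 0}},  # delivered late
]
print(last_wins(events, "output"))  # {'total': 42}
print(last_wins(events, "error"))   # None

# Double.toString-style strings break string ordering:
# 1745712000.1 < 1745712000.12 numerically, yet the strings sort the other way.
print("1.7457120001E9" > "1.74571200012E9")  # True
```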

Status Field (Terminal State Precedence)

Status prioritizes terminal states over transient ones using a priority-based aggregation:

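Priority order (workflow scale; the task script applies the same precedence on a 2/1/0 scale, treating COMPLETED, FAULTED, and FAILED as terminal and RUNNING as transient):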

  1. Terminal states: COMPLETED, FAULTED, CANCELLED, FAILED (priority 3)

  2. Transient states: RUNNING, SUSPENDED (priority 2)

  3. Initial states: Others (priority 1)

Implementation (workflows):

{
  "status": {
    "terms": {
      "field": "status",
      "size": 1,
      "order": {
        "priority": "desc"
      }
    },
    "aggs": {
      "priority": {
        "max": {
          "script": {
            "source": "String s = doc['status'].value; if (s == 'COMPLETED' || s == 'FAULTED' || s == 'CANCELLED' || s == 'FAILED') { return 3; } else if (s == 'RUNNING' || s == 'SUSPENDED') { return 2; } else { return 1; }"
          }
        }
      }
    }
  }
}

Implementation (tasks):

{
  "status": {
    "scripted_metric": {
      "init_script": "state.status = null; state.priority = -1",
      "map_script": "String s = params._source.status; if (s != null) { int p = (s == 'COMPLETED' || s == 'FAULTED' || s == 'FAILED') ? 2 : (s == 'RUNNING' ? 1 : 0); if (p > state.priority) { state.status = s; state.priority = p } }",
      "combine_script": "return state",
      "reduce_script": "def max = ['status': null, 'priority': -1]; for (s in states) { if (s != null && s.priority > max.priority) { max = s } } return max.status"
    }
  }
}

This ensures that:

  • Once a workflow/task reaches a terminal state, late-arriving RUNNING events won’t overwrite it

  • Original status values from events are preserved (e.g., FAILED stays FAILED, not converted)

  • Out-of-order event processing doesn’t affect final state correctness
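The precedence rule can be modeled in a few lines of Python. The sketch below uses the workflow scale (terminal 3, transient 2, others 1) and shows that a late RUNNING event does not displace COMPLETED; `priority` and `resolve_status` are hypothetical names, and the sketch does not reproduce how the terms aggregation breaks ties between statuses of equal priority (for example RUNNING and SUSPENDED):

```python
from typing import Iterable, Optional

TERMINAL = {"COMPLETED", "FAULTED", "CANCELLED", "FAILED"}
TRANSIENT = {"RUNNING", "SUSPENDED"}


def priority(status: str) -> int:
    """Workflow scale: terminal 3, transient 2, everything else 1."""
    if status in TERMINAL:
        return 3
    if status in TRANSIENT:
        return 2
    return 1


def resolve_status(statuses: Iterable[Optional[str]]) -> Optional[str]:
    """Return the highest-priority status; on ties the first one seen is kept."""
    best: Optional[str] = None
    for s in statuses:
        if s is not None and (best is None or priority(s) > priority(best)):
            best = s
    return best


# Events in arrival order: a RUNNING event is delivered after COMPLETED.
print(resolve_status(["PENDING", "RUNNING", "COMPLETED", "RUNNING"]))  # COMPLETED
```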

Transform Processing

Transforms use a match_all query to process all events, with continuous sync:

{
  "source": {
    "index": "workflow-events-*",
    "query": {
      "match_all": {}
    }
  }
}

Processing characteristics:

  • Continuous sync - Checks for new events every second (frequency: 1s)

  • Incremental - Only re-aggregates instances that received events since the last checkpoint

  • Idempotent - Field-level idempotency ensures correctness

  • Fault-tolerant - Automatic checkpointing and retry

Benefits:

  • Simpler configuration - no complex filtering logic

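  • Tolerates out-of-order delivery, since each changed instance is re-aggregated from all of its raw events (with delay: 0s, an event indexed after its @timestamp has already passed a checkpoint is only picked up once that instance receives another event)

  • Built-in Elasticsearch fault tolerance

  • Per-checkpoint work grows with the number of changed instances rather than with total index size (checkpointing mechanism)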
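A simplified Python model of how a continuous transform with a time-based sync picks the instances to recompute at each checkpoint. It assumes the change window is [previous upper bound, now - delay) on @timestamp and that only already-indexed events are visible; the real implementation has more steps, and `RawEvent` and `checkpoint` are hypothetical names. The scenario has one event indexed 4 seconds after its @timestamp, which a 0-second delay misses and a 5-second delay catches:

```python
from dataclasses import dataclass
from typing import List, Set, Tuple


@dataclass(frozen=True)
class RawEvent:
    instance_id: str
    timestamp: float   # @timestamp, the sync field
    indexed_at: float  # when the event became searchable


def checkpoint(events: List[RawEvent], last_upper: float, now: float,
               delay: float) -> Tuple[Set[str], float]:
    """Return the instances to re-aggregate and the new upper bound for one checkpoint."""
    upper = now - delay
    changed = {
        e.instance_id
        for e in events
        if e.indexed_at <= now and last_upper <= e.timestamp < upper
    }
    return changed, upper


events = [
    RawEvent("wf-1", timestamp=5, indexed_at=6),
    RawEvent("wf-2", timestamp=8, indexed_at=12),  # indexed 4s after its @timestamp
]

# delay 0s: wf-2 is not searchable at now=10 and is behind the window at now=20.
c1, upper = checkpoint(events, last_upper=0, now=10, delay=0)   # c1 == {"wf-1"}
c2, _ = checkpoint(events, last_upper=upper, now=20, delay=0)   # c2 == set()

# delay 5s: the second window [5, 15) still covers wf-2.
d1, upper = checkpoint(events, last_upper=0, now=10, delay=5)   # d1 == set()
d2, _ = checkpoint(events, last_upper=upper, now=20, delay=5)   # d2 == {"wf-1", "wf-2"}
```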

Index Structure

Raw Event Indices

Date-based indices store complete events with ILM lifecycle:

workflow-events-YYYY.MM.DD:

  • Contains all workflow lifecycle events

  • Flattened input/output fields (queryable JSON)

  • ILM policy: 7-day retention, auto-delete

  • Template: workflow-events.json

task-events-YYYY.MM.DD:

  • Contains all task execution events

  • Same structure as workflow events

  • ILM policy: 7-day retention

  • Template: task-events.json

Index template structure (workflow-events):

{
  "index_patterns": ["workflow-events-*"],
  "template": {
    "settings": {
      "index.lifecycle.name": "data-index-events-retention",
      "number_of_shards": 3,
      "number_of_replicas": 1
    },
    "mappings": {
      "properties": {
        "eventType": { "type": "keyword" },
        "instanceId": { "type": "keyword" },
        "workflowName": { "type": "keyword" },
        "workflowNamespace": { "type": "keyword" },
        "workflowVersion": { "type": "keyword" },
        "status": { "type": "keyword" },
        "startTime": { "type": "long" },
        "endTime": { "type": "long" },
        "eventTime": { "type": "keyword" },
        "timestamp": { "type": "double" },
        "@timestamp": { "type": "date" },
        "input": { "type": "flattened" },
        "output": { "type": "flattened" },
        "error": {
          "properties": {
            "type": { "type": "keyword" },
            "title": { "type": "text" },
            "detail": { "type": "text" },
            "status": { "type": "integer" },
            "instance": { "type": "keyword" }
          }
        }
      }
    }
  }
}

Location: data-index-storage-elasticsearch-schema/src/main/resources/elasticsearch/index-templates/

Normalized Indices

Single-document indices for GraphQL queries:

workflow-instances:

  • One document per workflow execution

  • Updated by the transform at each checkpoint (checked every second)

  • No ILM policy (keep forever)

  • Template: workflow-instances.json

task-executions:

  • One document per task execution

  • Composite ID: instance_id:task_position

  • No ILM policy

  • Template: task-executions.json

Normalized index template (workflow-instances):

{
  "index_patterns": ["workflow-instances"],
  "template": {
    "settings": {
      "number_of_shards": 3,
      "number_of_replicas": 1
    },
    "mappings": {
      "properties": {
        "id": { "type": "keyword" },
        "name": { "type": "keyword" },
        "version": { "type": "keyword" },
        "namespace": { "type": "keyword" },
        "status": { "type": "keyword" },
        "start": { "type": "date" },
        "end": { "type": "date" },
        "input": { "type": "flattened" },
        "output": { "type": "flattened" },
        "error": { /* nested object */ },
        "last_update": { "type": "date" }
      }
    }
  }
}

ILM Policies

data-index-events-retention:

Automatically manages raw event lifecycle:

{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_age": "1d",
            "max_primary_shard_size": "50GB"
          }
        }
      },
      "delete": {
        "min_age": "7d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}

What this means:

  • Raw events automatically deleted after 7 days

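  • New raw indices are created daily by FluentBit's Logstash_Format naming; the rollover action (1d or 50GB) only takes effect for indices written through a rollover alias or data stream, so check that these indices do not stall at the rollover step before reaching the delete phase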

  • Normalized indices kept forever (no ILM policy)

  • Reduces storage costs for high-volume deployments

Location: data-index-storage-elasticsearch-schema/src/main/resources/elasticsearch/ilm/data-index-events-retention.json
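ILM measures a phase's min_age from the rollover time when an index has rolled over and from its creation time otherwise, and it evaluates policies periodically (indices.lifecycle.poll_interval, 10 minutes by default), so a raw index is deleted some time after its threshold rather than exactly at it. A small Python helper, hypothetical and assuming a never-rolled-over index created at 00:00 UTC on the date in its name, computes that threshold:

```python
from datetime import datetime, timedelta, timezone

DELETE_MIN_AGE = timedelta(days=7)  # "min_age": "7d" in data-index-events-retention


def earliest_delete(index_name: str, prefix: str) -> datetime:
    """Earliest time the delete phase can start for a daily raw index (hypothetical helper).

    Assumes the index was created at 00:00 UTC on the date in its name and was
    never rolled over, so min_age counts from the index creation date.
    """
    if not index_name.startswith(prefix + "-"):
        raise ValueError(f"{index_name} does not start with {prefix}-")
    day = datetime.strptime(index_name[len(prefix) + 1:], "%Y.%m.%d")
    return day.replace(tzinfo=timezone.utc) + DELETE_MIN_AGE


print(earliest_delete("workflow-events-2026.04.27", "workflow-events"))
# 2026-05-04 00:00:00+00:00
```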

Flattened Field Type

Input and output data use Elasticsearch’s flattened field type:

{
  "input": { "type": "flattened" },
  "output": { "type": "flattened" }
}

Benefits:

  • Supports arbitrary nested JSON structures

  • Queryable via dot notation: input.orderId

  • No mapping explosion from dynamic fields

  • Efficient storage for heterogeneous data
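Queries address keys inside a flattened field by their full dotted path, and every leaf value is indexed as a keyword, so range queries on these keys compare strings rather than numbers. The Python sketch below approximates which keyed paths a document produces under input; `flattened_keys` and the sample document are hypothetical, and this is not Elasticsearch's indexing code:

```python
import json
from typing import Any, Dict, List


def flattened_keys(value: Any, path: str) -> Dict[str, List[str]]:
    """Approximate the keyed paths and keyword values a flattened field indexes.

    Objects extend the dotted path, array elements share their parent's path,
    nulls are skipped, and every leaf becomes a string.
    """
    out: Dict[str, List[str]] = {}
    if isinstance(value, dict):
        children = [(f"{path}.{k}", v) for k, v in value.items()]
    elif isinstance(value, list):
        children = [(path, v) for v in value]
    elif value is None:
        return out
    else:
        leaf = value if isinstance(value, str) else json.dumps(value)
        return {path: [leaf]}
    for child_path, child in children:
        for key, leaves in flattened_keys(child, child_path).items():
            out.setdefault(key, []).extend(leaves)
    return out


doc_input = {"orderId": "A-17", "customer": {"tier": "gold"}, "items": [{"sku": 1}, {"sku": 2}]}
print(flattened_keys(doc_input, "input"))
# {'input.orderId': ['A-17'], 'input.customer.tier': ['gold'], 'input.items.sku': ['1', '2']}
```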

GraphQL exposure:

  • Returned as JSON strings via getInputData() / getOutputData() methods

  • Client-side JSON parsing required

  • Same approach as PostgreSQL MODE 1

Schema Initialization

ElasticsearchSchemaInitializer applies all schema resources at startup:

What it does:

  1. Creates ILM policies (if they do not exist)

  2. Creates index templates (if they do not exist)

  3. Creates transforms (if they do not exist)

  4. Logs success/skip for each resource
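The create-if-missing steps above can be sketched against the Elasticsearch REST endpoints for each resource type (GET returns 404 when a resource is missing, PUT creates it). This is a dry-run sketch with an injected HTTP function, not the ElasticsearchSchemaInitializer code; the resource IDs are assumed to match the file names listed under the schema resources location. Creating templates before transforms lets the templates apply when the destination indices are first created. Two details to check in a real initializer: creating a transform before any workflow-events-* index exists may need the defer_validation=true query parameter, and a created transform stays stopped until POST _transform/<id>/_start is called.

```python
from typing import Callable, Dict, List, Optional, Tuple

# (method, path, body) -> HTTP status; injected so the sketch runs without a cluster.
Http = Callable[[str, str, Optional[dict]], int]

# Hypothetical resource IDs, assumed to match the JSON file names.
RESOURCES: List[Tuple[str, str]] = [
    ("ILM policy", "/_ilm/policy/data-index-events-retention"),
    ("index template", "/_index_template/workflow-events"),
    ("index template", "/_index_template/workflow-instances"),
    ("index template", "/_index_template/task-events"),
    ("index template", "/_index_template/task-executions"),
    ("transform", "/_transform/workflow-instances-transform"),
    ("transform", "/_transform/task-executions-transform"),
]


def init_schema(http: Http, bodies: Dict[str, dict]) -> List[str]:
    """Create each missing resource in dependency order and log create/skip per resource."""
    log: List[str] = []
    for kind, path in RESOURCES:  # policies, then templates, then transforms
        if http("GET", path, None) == 200:
            log.append(f"skip {kind} {path}")
            continue
        status = http("PUT", path, bodies.get(path, {}))
        if status not in (200, 201):
            raise RuntimeError(f"creating {kind} {path} failed: HTTP {status}")
        log.append(f"created {kind} {path}")
    return log


# Fake cluster for a dry run: only the ILM policy exists up front.
existing = {"/_ilm/policy/data-index-events-retention"}


def fake_http(method: str, path: str, body: Optional[dict]) -> int:
    if method == "GET":
        return 200 if path in existing else 404
    existing.add(path)
    return 200


first_run = init_schema(fake_http, {})
print(first_run[0])  # skip ILM policy /_ilm/policy/data-index-events-retention
print(first_run[1])  # created index template /_index_template/workflow-events
```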

Configuration flags:

# Universal flag (both PostgreSQL and Elasticsearch)
data-index.storage.skip-init-schema=false

# Backend-specific flag
data-index.elasticsearch.schema.init.enabled=true

Schema resources location:

data-index-storage-elasticsearch-schema/
└── src/main/resources/elasticsearch/
    ├── ilm/
    │   └── data-index-events-retention.json
    ├── index-templates/
    │   ├── workflow-events.json
    │   ├── workflow-instances.json
    │   ├── task-events.json
    │   └── task-executions.json
    └── transforms/
        ├── workflow-instances-transform.json
        └── task-executions-transform.json

Manual schema creation:

Set data-index.storage.skip-init-schema=true for production environments where schema is managed externally (operators, GitOps, etc.).

Configuration

Quarkus Flow Application

Same configuration as PostgreSQL MODE 1:

# Structured logging (emits JSON events to stdout)
quarkus.flow.structured-logging.enabled=true
quarkus.flow.structured-logging.timestamp-format=epoch-seconds

Data Index Service

Elasticsearch backend configuration:

# Elasticsearch connection
quarkus.elasticsearch.hosts=elasticsearch:9200

# Optional authentication and TLS configuration
quarkus.elasticsearch.username=elastic
quarkus.elasticsearch.password=changeme
quarkus.elasticsearch.protocol=https

# Schema initialization (set skip-init-schema=true when the schema is managed externally)
data-index.storage.skip-init-schema=false
data-index.elasticsearch.schema.init.enabled=true

# Dev Services (auto-starts Elasticsearch in dev mode)
%dev.quarkus.elasticsearch.devservices.enabled=true
%dev.quarkus.elasticsearch.devservices.image-name=docker.elastic.co/elasticsearch/elasticsearch:8.11.0

FluentBit DaemonSet

FluentBit configuration for Elasticsearch output:

[INPUT]
    Name              tail
    Path              /var/log/containers/*_${WORKFLOW_NAMESPACE}_*.log
    Parser            cri
    Tag               kube.*
    Refresh_Interval  5
    Mem_Buf_Limit     5MB

[FILTER]
    Name              parser
    Match             kube.*
    Key_Name          log
    Parser            json
    Reserve_Data      On

[FILTER]
    Name              grep
    Match             kube.*
    Regex             eventType ^io\.serverlessworkflow\.

[FILTER]
    Name          rewrite_tag
    Match         kube.*
    Rule          $eventType ^io\.serverlessworkflow\.workflow\. workflow.instance false

[FILTER]
    Name          rewrite_tag
    Match         kube.*
    Rule          $eventType ^io\.serverlessworkflow\.task\. workflow.task false

[OUTPUT]
    Name            es
    Match           workflow.instance
    Host            ${ELASTICSEARCH_HOST}
    Port            ${ELASTICSEARCH_PORT}
    Index           workflow-events
    Logstash_Format On
    Logstash_Prefix workflow-events
    Logstash_DateFormat %Y.%m.%d
    Retry_Limit     5
    Suppress_Type_Name On
    tls             ${ELASTICSEARCH_TLS}
    tls.verify      ${ELASTICSEARCH_TLS_VERIFY}

[OUTPUT]
    Name            es
    Match           workflow.task
    Host            ${ELASTICSEARCH_HOST}
    Port            ${ELASTICSEARCH_PORT}
    Index           task-events
    Logstash_Format On
    Logstash_Prefix task-events
    Logstash_DateFormat %Y.%m.%d
    Retry_Limit     5
    Suppress_Type_Name On
    tls             ${ELASTICSEARCH_TLS}
    tls.verify      ${ELASTICSEARCH_TLS_VERIFY}

Location: data-index/scripts/fluentbit/mode2-elasticsearch-transforms/fluent-bit.conf
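The filters and outputs above decide each record's destination from its eventType and date. A Python sketch of that routing, assuming the parsed log line is the JSON event and that the es output dates Logstash_Format indices in UTC; `target_index` is a hypothetical name and the event types are examples in the io.serverlessworkflow namespace. Records whose eventType passes the grep filter but matches neither rewrite_tag rule keep their kube.* tag, match no [OUTPUT], and are dropped:

```python
import json
import re
from datetime import datetime, timedelta, timezone
from typing import Optional

# Mirrors the two rewrite_tag rules and the Logstash_Prefix of each [OUTPUT].
ROUTES = [
    (re.compile(r"^io\.serverlessworkflow\.workflow\."), "workflow-events"),
    (re.compile(r"^io\.serverlessworkflow\.task\."), "task-events"),
]


def target_index(log_line: str, record_time: datetime) -> Optional[str]:
    """Daily raw index a structured log line would be written to, or None if dropped."""
    try:
        record = json.loads(log_line)
    except json.JSONDecodeError:
        return None  # not JSON, so no eventType for the grep filter to match
    event_type = record.get("eventType") if isinstance(record, dict) else None
    if not isinstance(event_type, str):
        return None
    for pattern, prefix in ROUTES:
        if pattern.search(event_type):
            # Logstash_Format: <Logstash_Prefix>-<Logstash_DateFormat>, dated in UTC
            return f"{prefix}-{record_time.astimezone(timezone.utc):%Y.%m.%d}"
    return None  # matched no rewrite_tag rule, so no OUTPUT picks it up


cest = timezone(timedelta(hours=2))
started = '{"eventType": "io.serverlessworkflow.workflow.started.v1"}'
print(target_index(started, datetime(2026, 4, 28, 1, 0, tzinfo=cest)))
# workflow-events-2026.04.27  (01:00 at +02:00 is 23:00 UTC the previous day)
print(target_index("plain text log line", datetime.now(timezone.utc)))  # None
```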

Scaling Considerations

Elasticsearch MODE 2 scales horizontally by design:

Horizontal Scaling

Add Elasticsearch nodes:

  • Distribute shards across nodes

  • Use replica shards for read scaling

  • Transform searches fan out across the shards of the source indices (each transform itself runs as a task on a single node)

Shard configuration:

  • Raw indices: 3 primary shards (configurable in templates)

  • Normalized indices: 3 primary shards

  • 1 replica shard per primary (high availability)

Data Index service:

  • Stateless service (scale horizontally)

  • Multiple instances share same Elasticsearch cluster

  • No coordination needed between instances

Performance Tuning

Transform frequency:

  • Configured: 1 second (normalized data trails raw events by about 1s)

  • Use a longer interval (for example 5s or 30s) if higher latency is acceptable

  • Trade-off: latency vs Elasticsearch load

Index refresh interval:

  • Default: 1 second

  • Increase for write-heavy workloads

  • Trade-off: query visibility vs write throughput

Time-series optimization:

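  • Daily raw indices (created by FluentBit's Logstash_Format naming, deleted by ILM after 7 days)

  • Compress older indices with the forcemerge action's index_codec: best_compression option

  • Force merge in a warm phase before the delete phase (the default policy defines only hot and delete phases)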

Painless script optimization:

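  • Scripts run inside the Elasticsearch JVM and make no external calls

  • Elasticsearch caches compiled scripts automatically

  • Scripts that read params._source (such as the scripted_metric rules above) load each document's full source, which costs more than reading doc values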

Capacity Planning

Raw events retention:

  • 7-day retention (configurable in ILM policy)

  • Estimate: ~1KB per event

  • 1M workflows/day × 5 events/workflow × 7 days × ~1KB = ~35GB of primary data (~70GB with 1 replica)

Normalized indices:

  • No auto-delete (keep forever)

  • Estimate: ~2KB per workflow document

  • 1M workflows × 2KB = ~2GB of primary data, growing by about 2GB per day at 1M workflows/day

Recommended cluster size:

  • Small (< 100K workflows/day): 3-node cluster

  • Medium (100K-1M workflows/day): 5-7 node cluster

  • Large (> 1M workflows/day): 10+ node cluster
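The estimates above are straightforward multiplication; a Python helper makes it easy to rerun them with other volumes. It uses decimal units (1 GB = 1,000,000 KB), the per-event and per-document sizes from this section, and a replica count, and it ignores task-executions documents and index overhead; the function names are hypothetical:

```python
def raw_events_gb(workflows_per_day: int, events_per_workflow: int, kb_per_event: float,
                  retention_days: int, replicas: int = 0) -> float:
    """Steady-state size of the raw event indices in GB (1 GB = 1,000,000 KB)."""
    kb = workflows_per_day * events_per_workflow * kb_per_event * retention_days
    return kb * (1 + replicas) / 1_000_000


def normalized_gb(total_workflows: int, kb_per_doc: float, replicas: int = 0) -> float:
    """Size of workflow-instances in GB; keeps growing because no ILM policy deletes it."""
    return total_workflows * kb_per_doc * (1 + replicas) / 1_000_000


print(raw_events_gb(1_000_000, 5, 1, 7))              # 35.0
print(raw_events_gb(1_000_000, 5, 1, 7, replicas=1))  # 70.0
print(normalized_gb(30 * 1_000_000, 2, replicas=1))   # 120.0 (30 days at 1M/day)
```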

Comparison with PostgreSQL Mode

Aspect                 MODE 1 (PostgreSQL)           MODE 2 (Elasticsearch)
Normalization          PostgreSQL triggers (< 1ms)   ES Transforms (~1s frequency)
Consistency            ACID guarantees               Eventual consistency
Latency                < 1ms (real-time)             ~1s (near real-time)
Search                 Basic SQL queries             Full-text search, aggregations
Scaling                Vertical (larger instance)    Horizontal (add nodes)
Throughput             ~10K workflows/day            100K+ workflows/day
Complexity             Simple (triggers, backups)    Moderate (transforms, ILM)
Dependencies           PostgreSQL, Flyway            Elasticsearch cluster
Storage Model          Relational tables             Document indices
Lifecycle Management   Manual archival               Automatic (ILM policies)
Multi-tenancy          Schema-per-tenant             Index-per-tenant
GraphQL API            ✅ Identical                  ✅ Identical

Choose MODE 1 when:

  • Standard SQL queries are sufficient

  • ACID guarantees required

  • Existing PostgreSQL infrastructure

  • Simpler operations preferred

Choose MODE 2 when:

  • Full-text search needed

  • Complex aggregations required

  • Large-scale deployments (1M+ workflows)

  • Automatic retention management (ILM) desired

  • Multi-tenancy requirements