Elasticsearch Mode Architecture (MODE 2)
Data Index with Elasticsearch storage uses ES Transforms for continuous normalization, enabling full-text search, complex aggregations, and horizontal scaling.
Status: Production Ready. MODE 2 provides the same GraphQL API as MODE 1 (PostgreSQL), but with Elasticsearch as the storage backend.
Overview
Elasticsearch MODE 2 uses Elasticsearch Transforms for near-real-time event normalization. Raw events shipped by FluentBit are continuously aggregated into normalized indices (roughly one second behind ingestion), providing sub-second query performance with full-text search capabilities.
Key advantages:
- Full-text search on workflow and task data
- Complex aggregations and analytics
- Horizontal scaling (shard-based distribution)
- Automatic lifecycle management (ILM policies)
- Multi-tenancy via index-per-tenant pattern
When to use:
- You need full-text search capabilities
- You require complex aggregations or analytics
- You run large-scale deployments (1M+ workflows)
- You prefer the Elasticsearch ecosystem
- You want automatic storage lifecycle management with ILM
Architecture Diagram
Quarkus Flow App
↓ (structured logging → stdout)
FluentBit DaemonSet
↓ (tail Kubernetes logs, filter JSON events)
Elasticsearch Raw Indices
↓ (ES Transform - continuous, 1s frequency)
Elasticsearch Normalized Indices
↓ (Elasticsearch Java Client)
GraphQL API (SmallRye GraphQL)
Data Flow
1. Quarkus Flow emits events - Structured JSON logs to stdout
2. FluentBit collects - Tails Kubernetes container logs and filters events by the eventType field
3. FluentBit routes by event type - Workflow events vs task events
4. FluentBit indexes into raw indices - Date-based indices with ILM policies
  - workflow-events-2026.04.27
  - task-events-2026.04.27
5. ES Transform processes - Continuous aggregation (1s frequency)
  - Groups events by instance ID
  - Applies field-level idempotency rules
  - Processes only events added since the last checkpoint
6. Transform writes to normalized indices - Single document per workflow/task
  - workflow-instances
  - task-executions
7. GraphQL queries - Served via the Elasticsearch Java Client
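The routing and index-naming steps can be sketched in a few lines of Python. This is only an illustration of the behaviour described here, not the FluentBit implementation: the function name `raw_index_for` is hypothetical, the sample event type strings are made up, and the record time is passed in explicitly because the source does not state which timestamp FluentBit uses for the date suffix.

```python
import re
from datetime import datetime, timezone
from typing import Optional

# Event-type prefixes used by the FluentBit grep / rewrite_tag rules.
WORKFLOW_EVENT = re.compile(r"^io\.serverlessworkflow\.workflow\.")
TASK_EVENT = re.compile(r"^io\.serverlessworkflow\.task\.")


def raw_index_for(event_type: str, record_time: datetime) -> Optional[str]:
    """Return the daily raw index for an event, or None if it is filtered out.

    Hypothetical helper: mirrors the prefix-plus-date naming
    (workflow-events-YYYY.MM.DD / task-events-YYYY.MM.DD).
    """
    if WORKFLOW_EVENT.match(event_type):
        prefix = "workflow-events"
    elif TASK_EVENT.match(event_type):
        prefix = "task-events"
    else:
        return None  # neither a workflow nor a task event: dropped
    return f"{prefix}-{record_time:%Y.%m.%d}"


# Illustrative event types; real names may differ.
when = datetime(2026, 4, 27, 10, 0, tzinfo=timezone.utc)
workflow_index = raw_index_for("io.serverlessworkflow.workflow.started.v1", when)
task_index = raw_index_for("io.serverlessworkflow.task.completed.v1", when)
ignored = raw_index_for("some.other.log.line", when)
```

With the sample date, the two matching events land in `workflow-events-2026.04.27` and `task-events-2026.04.27`, and the unrelated record is dropped.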
ES Transform-Based Normalization
Elasticsearch Transforms provide continuous aggregation of raw events into normalized documents. This replaces the need for a separate Event Processor service.
Why ES Transforms?
FluentBit’s architecture (direct Elasticsearch output) makes ES Transforms the natural choice:
- Built-in Elasticsearch feature - No additional services needed
- Continuous processing - Runs automatically every 1 second
- Painless scripting - Transform logic inside Elasticsearch
- Horizontal scaling - Scales with Elasticsearch cluster
- Fault-tolerant - Built-in checkpointing and retry
Transform Configuration
Transforms are defined in JSON and applied via ElasticsearchSchemaInitializer:
Workflow instances transform:
{
  "source": {
    "index": "workflow-events-*",
    "query": { "match_all": {} }
  },
  "dest": {
    "index": "workflow-instances"
  },
  "frequency": "1s",
  "sync": {
    "time": {
      "field": "@timestamp",
      "delay": "0s"
    }
  },
  "pivot": {
    "group_by": {
      "id": { "terms": { "field": "instanceId.keyword" } }
    },
    "aggregations": { /* field-level idempotency */ }
  }
}
Location: data-index-storage-elasticsearch-schema/src/main/resources/elasticsearch/transforms/workflow-instances-transform.json
Field-Level Idempotency
The transforms combine terms aggregations and scripted_metric aggregations to handle out-of-order events with field-specific rules:
Immutable Fields (First Value Wins)
These fields never change after initial creation:
- name, version, namespace - Workflow metadata
- start - First event timestamp
- input - Initial workflow/task input data
Implementation (name example):
{
  "name": {
    "terms": {
      "field": "workflowName.keyword",
      "size": 1
    }
  }
}
Implementation (input - first-wins with event timestamp):
{
  "input": {
    "scripted_metric": {
      "init_script": "state.data = null; state.ts = 'ZZZZ'",
      "map_script": "if (params._source.input != null) { String ts = params._source.eventTime != null ? params._source.eventTime : String.valueOf(params._source.timestamp); if (ts != null && ts.compareTo(state.ts) < 0) { state.data = params._source.input; state.ts = ts } }",
      "combine_script": "return state",
      "reduce_script": "def earliest = ['data': null, 'ts': 'ZZZZ']; for (s in states) { if (s != null && s.get('ts') != null && s.ts.compareTo(earliest.ts) < 0) { earliest = s } } return earliest.data"
    }
  }
}
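To make the map/reduce phases easier to follow, here is a rough Python model of the first-wins logic. It is a simplification rather than a port of the Painless script: it assumes every event carries `eventTime` as an ISO-8601 UTC string (so lexicographic order matches chronological order) and skips the `timestamp` fallback. The function names `map_shard` and `reduce_states` are hypothetical.

```python
from typing import Any, Iterable, Optional

SENTINEL = "ZZZZ"  # sorts after any timestamp string that starts with a digit


def map_shard(events: Iterable[dict]) -> dict:
    """Model of init_script + map_script on one shard: keep the earliest non-null input."""
    state = {"data": None, "ts": SENTINEL}
    for src in events:
        ts = src.get("eventTime")  # assumption: ISO-8601 UTC string
        if src.get("input") is not None and ts is not None and ts < state["ts"]:
            state = {"data": src["input"], "ts": ts}
    return state


def reduce_states(states: Iterable[Optional[dict]]) -> Any:
    """Model of reduce_script: pick the earliest per-shard state."""
    earliest = {"data": None, "ts": SENTINEL}
    for s in states:
        if s is not None and s.get("ts") is not None and s["ts"] < earliest["ts"]:
            earliest = s
    return earliest["data"]


# Two shards; the later event happens to be seen first.
shard_a = [{"eventTime": "2026-04-27T10:00:05Z", "input": {"orderId": 2}}]
shard_b = [
    {"eventTime": "2026-04-27T10:00:00Z", "input": None},  # null input is ignored
    {"eventTime": "2026-04-27T10:00:01Z", "input": {"orderId": 1}},
]
first_input = reduce_states([map_shard(shard_a), map_shard(shard_b)])
```

The result is the input from the 10:00:01 event, regardless of which shard is reduced first.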
Terminal Fields (Latest Non-Null Wins)
These fields update with the most recent event:
- end - Latest completion timestamp
- output - Latest output data (tracks event timestamp)
- error - Latest error (tracks event timestamp)
Implementation (output - last-wins with event timestamp):
{
  "output": {
    "scripted_metric": {
      "init_script": "state.data = null; state.ts = ''",
      "map_script": "if (params._source.output != null) { String ts = params._source.eventTime != null ? params._source.eventTime : String.valueOf(params._source.timestamp); if (ts != null && ts.compareTo(state.ts) > 0) { state.data = params._source.output; state.ts = ts } }",
      "combine_script": "return state",
      "reduce_script": "def latest = ['data': null, 'ts': '']; for (s in states) { if (s != null && s.get('ts') != null && s.ts.compareTo(latest.ts) > 0) { latest = s } } return latest.data"
    }
  }
}
Implementation (error - last-wins with event timestamp):
{
  "error": {
    "scripted_metric": {
      "init_script": "state.data = null; state.ts = ''",
      "map_script": "if (params._source.error != null) { String ts = params._source.eventTime != null ? params._source.eventTime : String.valueOf(params._source.timestamp); if (ts != null && ts.compareTo(state.ts) > 0) { state.data = params._source.error; state.ts = ts } }",
      "combine_script": "return state",
      "reduce_script": "def latest = ['data': null, 'ts': '']; for (s in states) { if (s != null && s.get('ts') != null && s.ts.compareTo(latest.ts) > 0) { latest = s } } return latest.data"
    }
  }
}
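The practical consequence of last-wins with an event timestamp is that arrival order stops mattering. The following Python sketch checks this by brute force over every arrival order of a small sample. It is a simplified model, assuming `eventTime` is an ISO-8601 UTC string on every event; the helper name `latest_non_null` is hypothetical.

```python
from itertools import permutations
from typing import Any, Iterable


def latest_non_null(events: Iterable[dict], field: str) -> Any:
    """Value of `field` from the event with the greatest eventTime, ignoring nulls."""
    data, ts = None, ""  # "" sorts before every non-empty timestamp string
    for src in events:
        value, event_time = src.get(field), src.get("eventTime")
        if value is not None and event_time is not None and event_time > ts:
            data, ts = value, event_time
    return data


events = [
    {"eventTime": "2026-04-27T10:00:01Z", "output": None},
    {"eventTime": "2026-04-27T10:00:03Z", "output": {"total": 42}},
    {"eventTime": "2026-04-27T10:00:02Z", "output": {"total": 7}},
    {"eventTime": "2026-04-27T10:00:04Z", "output": None},  # later, but null
]

# Evaluate all 24 arrival orders of the four events.
results = [latest_non_null(order, "output") for order in permutations(events)]
```

Every entry in `results` is the output from the 10:00:03 event: the later null does not erase it, and the older 10:00:02 value never overwrites it.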
Status Field (Terminal State Precedence)
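Status prioritizes terminal states over transient ones using a priority-based aggregation. The workflow and task transforms use slightly different state sets and priority values:
Priority order (workflows):
- Terminal states: COMPLETED, FAULTED, CANCELLED (priority 3)
- Transient states: RUNNING, SUSPENDED (priority 2)
- Initial states: Others (priority 1)
Priority order (tasks):
- Terminal states: COMPLETED, FAULTED, FAILED (priority 2)
- Transient state: RUNNING (priority 1)
- Others (priority 0)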
Implementation (workflows):
{
  "status": {
    "terms": {
      "field": "status.keyword",
      "size": 1,
      "order": {
        "priority": "desc"
      }
    },
    "aggs": {
      "priority": {
        "max": {
          "script": {
            "source": "String s = doc['status.keyword'].value; if (s == 'COMPLETED' || s == 'FAULTED' || s == 'CANCELLED') { return 3; } else if (s == 'RUNNING' || s == 'SUSPENDED') { return 2; } else { return 1; }"
          }
        }
      }
    }
  }
}
Implementation (tasks):
{
  "status": {
    "scripted_metric": {
      "init_script": "state.status = null; state.priority = 0",
      "map_script": "String s = params._source.status; int p = (s == 'COMPLETED' || s == 'FAULTED' || s == 'FAILED') ? 2 : (s == 'RUNNING' ? 1 : 0); if (p > state.priority) { state.status = s; state.priority = p }",
      "combine_script": "return state",
      "reduce_script": "def max = ['status': null, 'priority': 0]; for (s in states) { if (s != null && s.priority > max.priority) { max = s } } return max.status"
    }
  }
}
This ensures that:
- Once a workflow/task reaches a terminal state, late-arriving RUNNING events won’t overwrite it
- Original status values from events are preserved (e.g., FAILED stays FAILED, not converted)
- Out-of-order event processing doesn’t affect final state correctness
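As a quick sanity check of the precedence rules, the two Painless scripts can be modelled in Python. The priority tables are copied from the scripts shown in this section. The helper `resolve_status` is hypothetical, `PENDING` is used only as an example of an "other" status, and how ties between equal priorities are broken (first seen wins here) is an assumption of the sketch.

```python
from typing import Dict, Iterable, Optional

# Priorities as written in the workflow and task scripts.
WORKFLOW_PRIORITY = {"COMPLETED": 3, "FAULTED": 3, "CANCELLED": 3, "RUNNING": 2, "SUSPENDED": 2}
TASK_PRIORITY = {"COMPLETED": 2, "FAULTED": 2, "FAILED": 2, "RUNNING": 1}


def resolve_status(statuses: Iterable[str], priorities: Dict[str, int], default: int) -> Optional[str]:
    """Return the status with the highest priority; ties keep the first one seen."""
    best, best_priority = None, 0
    for status in statuses:
        priority = priorities.get(status, default)
        if priority > best_priority:
            best, best_priority = status, priority
    return best


# A late RUNNING event arrives after COMPLETED: the terminal state is kept.
workflow_status = resolve_status(["PENDING", "COMPLETED", "RUNNING"], WORKFLOW_PRIORITY, default=1)

# FAILED is preserved as-is for tasks, even if RUNNING is processed afterwards.
task_status = resolve_status(["FAILED", "RUNNING"], TASK_PRIORITY, default=0)
```

One detail the model makes visible: in the task script, statuses outside the table get priority 0 and the comparison is strict, so a task that has only such events resolves to a null status.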
Transform Processing
Transforms use a match_all query to process all events, with continuous sync:
{
  "source": {
    "index": "workflow-events-*",
    "query": {
      "match_all": {}
    }
  }
}
Processing characteristics:
- Continuous sync - Processes all new events within 1 second
- Incremental - Only processes events since last checkpoint
- Idempotent - Field-level idempotency ensures correctness
- Fault-tolerant - Automatic checkpointing and retry
Benefits:
- Simpler configuration - no complex filtering logic
- Handles all edge cases correctly (late-arriving events, out-of-order delivery)
- Built-in Elasticsearch fault tolerance
- Stable performance as data grows, because checkpointing limits each run to new events
Index Structure
Raw Event Indices
Date-based indices store complete events with ILM lifecycle:
workflow-events-YYYY.MM.DD:
- Contains all workflow lifecycle events
- Flattened input/output fields (queryable JSON)
- ILM policy: 7-day retention, auto-delete
- Template: workflow-events.json
task-events-YYYY.MM.DD:
- Contains all task execution events
- Same structure as workflow events
- ILM policy: 7-day retention
- Template: task-events.json
Index template structure (workflow-events):
{
  "index_patterns": ["workflow-events-*"],
  "template": {
    "settings": {
      "index.lifecycle.name": "data-index-events-retention",
      "number_of_shards": 3,
      "number_of_replicas": 1
    },
    "mappings": {
      "properties": {
        "eventType": { "type": "keyword" },
        "instanceId": { "type": "keyword" },
        "workflowName": { "type": "keyword" },
        "workflowNamespace": { "type": "keyword" },
        "workflowVersion": { "type": "keyword" },
        "status": { "type": "keyword" },
        "startTime": { "type": "long" },
        "endTime": { "type": "long" },
        "eventTime": { "type": "keyword" },
        "timestamp": { "type": "double" },
        "@timestamp": { "type": "date" },
        "input": { "type": "flattened" },
        "output": { "type": "flattened" },
        "error": {
          "properties": {
            "type": { "type": "keyword" },
            "title": { "type": "text" },
            "detail": { "type": "text" },
            "status": { "type": "integer" },
            "instance": { "type": "keyword" }
          }
        }
      }
    }
  }
}
Location: data-index-storage-elasticsearch-schema/src/main/resources/elasticsearch/index-templates/
Normalized Indices
Single-document indices for GraphQL queries:
workflow-instances:
- One document per workflow execution
- Updated by the transform every 1 second
- No ILM policy (keep forever)
- Template: workflow-instances.json
task-executions:
- One document per task execution
- Composite ID: instance_id:task_position
- No ILM policy
- Template: task-executions.json
Normalized index template:
{
  "index_patterns": ["workflow-instances"],
  "template": {
    "settings": {
      "number_of_shards": 3,
      "number_of_replicas": 1
    },
    "mappings": {
      "properties": {
        "id": { "type": "keyword" },
        "name": { "type": "keyword" },
        "version": { "type": "keyword" },
        "namespace": { "type": "keyword" },
        "status": { "type": "keyword" },
        "start": { "type": "date" },
        "end": { "type": "date" },
        "input": { "type": "flattened" },
        "output": { "type": "flattened" },
        "error": { /* nested object */ },
        "last_update": { "type": "date" }
      }
    }
  }
}
ILM Policies
data-index-events-retention:
Automatically manages raw event lifecycle:
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_age": "1d",
            "max_primary_shard_size": "50GB"
          }
        }
      },
      "delete": {
        "min_age": "7d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
What this means:
- Raw events are deleted automatically after 7 days
- Indices roll over daily (or earlier, when a primary shard reaches 50GB)
- Normalized indices are kept forever (no ILM policy)
- Storage costs stay bounded for high-volume deployments
Location: data-index-storage-elasticsearch-schema/src/main/resources/elasticsearch/ilm/data-index-events-retention.json
Flattened Field Type
Input and output data use Elasticsearch’s flattened field type:
{
  "input": { "type": "flattened" },
  "output": { "type": "flattened" }
}
Benefits:
- Supports arbitrary nested JSON structures
- Queryable via dot notation: input.orderId
- No schema explosion from dynamic fields
- Efficient storage for heterogeneous data
GraphQL exposure:
- Returned as JSON strings via getInputData()/getOutputData() methods
- Client-side JSON parsing required
- Same approach as PostgreSQL MODE 1
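Because input and output come back as JSON strings, a client has to decode them before use. The sketch below shows one way to do that in Python. The response shape, the `workflowInstance` field, and the sample values are hypothetical and only illustrate the decoding step; check the actual GraphQL schema for the real field names.

```python
import json
from typing import Any, Optional

# Hypothetical GraphQL response; field names and values are illustrative only.
response = {
    "data": {
        "workflowInstance": {
            "id": "wf-123",
            "input": "{\"orderId\": \"A-1001\", \"items\": [{\"sku\": \"X1\", \"qty\": 2}]}",
            "output": None,  # workflow has not produced output yet
        }
    }
}


def parse_json_field(raw: Optional[str]) -> Any:
    """Decode a JSON string field, passing missing values through as None."""
    return None if raw is None else json.loads(raw)


instance = response["data"]["workflowInstance"]
input_data = parse_json_field(instance["input"])
output_data = parse_json_field(instance["output"])
order_id = input_data["orderId"]
```

Handling the null case explicitly matters here, since output is typically absent until a workflow reaches a terminal state.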
Schema Initialization
ElasticsearchSchemaInitializer applies all schema resources at startup:
What it does:
- Creates ILM policies (if they do not exist)
- Creates index templates (if they do not exist)
- Creates transforms (if they do not exist)
- Logs success/skip for each resource
Configuration flags:
# Universal flag (both PostgreSQL and Elasticsearch)
data-index.storage.skip-init-schema=false
# Backend-specific flag
data-index.elasticsearch.schema.init.enabled=true
Schema resources location:
data-index-storage-elasticsearch-schema/
└── src/main/resources/elasticsearch/
    ├── ilm/
    │   └── data-index-events-retention.json
    ├── index-templates/
    │   ├── workflow-events.json
    │   ├── workflow-instances.json
    │   ├── task-events.json
    │   └── task-executions.json
    └── transforms/
        ├── workflow-instances-transform.json
        └── task-executions-transform.json
Manual schema creation:
Set data-index.storage.skip-init-schema=true for production environments where schema is managed externally (operators, GitOps, etc.).
Configuration
Quarkus Flow Application
Same configuration as PostgreSQL MODE 1:
# Structured logging (emits JSON events to stdout)
quarkus.flow.structured-logging.enabled=true
quarkus.flow.structured-logging.timestamp-format=epoch-seconds
Data Index Service
Elasticsearch backend configuration:
# Elasticsearch connection
quarkus.elasticsearch.hosts=elasticsearch:9200
# Optional authentication and TLS configuration
quarkus.elasticsearch.username=elastic
quarkus.elasticsearch.password=changeme
quarkus.elasticsearch.protocol=https
# Schema initialization (disable in production)
data-index.storage.skip-init-schema=false
data-index.elasticsearch.schema.init.enabled=true
# Dev Services (auto-starts Elasticsearch in dev mode)
%dev.quarkus.elasticsearch.devservices.enabled=true
%dev.quarkus.elasticsearch.devservices.image-name=docker.elastic.co/elasticsearch/elasticsearch:8.11.0
FluentBit DaemonSet
FluentBit configuration for Elasticsearch output:
[INPUT]
    Name              tail
    Path              /var/log/containers/*_${WORKFLOW_NAMESPACE}_*.log
    Parser            cri
    Tag               kube.*
    Refresh_Interval  5
    Mem_Buf_Limit     5MB

[FILTER]
    Name          parser
    Match         kube.*
    Key_Name      log
    Parser        json
    Reserve_Data  On

[FILTER]
    Name   grep
    Match  kube.*
    Regex  eventType ^io\.serverlessworkflow\.

[FILTER]
    Name   rewrite_tag
    Match  kube.*
    Rule   $eventType ^io\.serverlessworkflow\.workflow\. workflow.instance false

[FILTER]
    Name   rewrite_tag
    Match  kube.*
    Rule   $eventType ^io\.serverlessworkflow\.task\. workflow.task false

[OUTPUT]
    Name                 es
    Match                workflow.instance
    Host                 ${ELASTICSEARCH_HOST}
    Port                 ${ELASTICSEARCH_PORT}
    Index                workflow-events
    Logstash_Format      On
    Logstash_Prefix      workflow-events
    Logstash_DateFormat  %Y.%m.%d
    Retry_Limit          5
    Suppress_Type_Name   On
    tls                  ${ELASTICSEARCH_TLS}
    tls.verify           ${ELASTICSEARCH_TLS_VERIFY}

[OUTPUT]
    Name                 es
    Match                workflow.task
    Host                 ${ELASTICSEARCH_HOST}
    Port                 ${ELASTICSEARCH_PORT}
    Index                task-events
    Logstash_Format      On
    Logstash_Prefix      task-events
    Logstash_DateFormat  %Y.%m.%d
    Retry_Limit          5
    Suppress_Type_Name   On
    tls                  ${ELASTICSEARCH_TLS}
    tls.verify           ${ELASTICSEARCH_TLS_VERIFY}
Location: data-index/scripts/fluentbit/mode2-elasticsearch-transforms/fluent-bit.conf
Scaling Considerations
Elasticsearch MODE 2 scales horizontally by design:
Horizontal Scaling
Add Elasticsearch nodes:
- Distribute shards across nodes
- Use replica shards for read scaling
- Transform processing is distributed automatically
Shard configuration:
- Raw indices: 3 primary shards (configurable in templates)
- Normalized indices: 3 primary shards
- 1 replica shard per primary (high availability)
Data Index service:
- Stateless service (scale horizontally)
- Multiple instances share the same Elasticsearch cluster
- No coordination needed between instances
Performance Tuning
Transform frequency:
- Configured default: 1 second (roughly one second of normalization latency)
- Use a longer interval if higher latency is acceptable
- Trade-off: latency vs Elasticsearch load
Index refresh interval:
- Default: 1 second
- Increase for write-heavy workloads
- Trade-off: query visibility vs write throughput
Time-series optimization:
- Daily index rollover (automatic via ILM)
- Compress old indices
- Force merge before ILM delete phase
Painless script optimization:
- Scripts run inside the Elasticsearch JVM
- Minimal external calls
- Compiled scripts are cached automatically
Capacity Planning
Raw events retention:
- 7-day retention (configurable in ILM policy)
- Estimate: ~1KB per event
- 1M workflows/day × 5 events/workflow × 7 days × ~1KB = ~35GB (primary shards only; about 70GB with one replica)
Normalized indices:
- No auto-delete (keep forever)
- Estimate: ~2KB per workflow document
- 1M workflows × 2KB = ~2GB (primary shards only)
Recommended cluster size:
- Small (< 100K workflows/day): 3-node cluster
- Medium (100K-1M workflows/day): 5-7 node cluster
- Large (> 1M workflows/day): 10+ node cluster
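The storage estimates above are simple multiplication, so they are easy to rerun for other volumes. Here is a small Python sketch of that arithmetic. The function names are hypothetical, the per-event and per-document sizes are the rough figures quoted in this section, and decimal units (1 GB = 1,000,000 KB) are assumed.

```python
def raw_storage_gb(workflows_per_day: int,
                   events_per_workflow: int = 5,
                   event_kb: float = 1.0,
                   retention_days: int = 7,
                   replicas: int = 0) -> float:
    """Estimate raw-event storage held during the retention window, in GB."""
    events = workflows_per_day * events_per_workflow * retention_days
    primary_gb = events * event_kb / 1_000_000  # decimal units: 1 GB = 1,000,000 KB
    return primary_gb * (1 + replicas)


def normalized_storage_gb(workflows: int, doc_kb: float = 2.0, replicas: int = 0) -> float:
    """Estimate storage for normalized workflow documents, in GB."""
    return workflows * doc_kb / 1_000_000 * (1 + replicas)


raw_primary = raw_storage_gb(1_000_000)                   # 35.0
raw_with_replica = raw_storage_gb(1_000_000, replicas=1)  # 70.0
normalized = normalized_storage_gb(1_000_000)             # 2.0
```

Since normalized indices have no ILM policy, the normalized figure grows with the total number of workflows ever recorded, while the raw figure is capped by the retention window.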
Comparison with PostgreSQL Mode
| Aspect | MODE 1 (PostgreSQL) | MODE 2 (Elasticsearch) |
|---|---|---|
| Normalization | PostgreSQL triggers (< 1ms) | ES Transforms (~1s frequency) |
| Consistency | ACID guarantees | Eventual consistency |
| Latency | < 1ms (real-time) | ~1s (near real-time) |
| Search | Basic SQL queries | Full-text search, aggregations |
| Scaling | Vertical (larger instance) | Horizontal (add nodes) |
| Throughput | ~10K workflows/day | 100K+ workflows/day |
| Complexity | Simple (triggers, backups) | Moderate (transforms, ILM) |
| Dependencies | PostgreSQL, Flyway | Elasticsearch cluster |
| Storage Model | Relational tables | Document indices |
| Lifecycle Management | Manual archival | Automatic (ILM policies) |
| Multi-tenancy | Schema-per-tenant | Index-per-tenant |
| GraphQL API | ✅ Identical | ✅ Identical |
Choose MODE 1 when:
- Standard SQL queries are sufficient
- ACID guarantees are required
- You have existing PostgreSQL infrastructure
- Simpler operations are preferred
Choose MODE 2 when:
- Full-text search is needed
- Complex aggregations are required
- You run large-scale deployments (1M+ workflows)
- Automatic storage lifecycle management (ILM) is desired
- You have multi-tenancy requirements that fit an index-per-tenant model