Elasticsearch Mode Architecture
Data Index with Elasticsearch storage uses ES Transforms for normalization, enabling full-text search and high throughput.
Architecture Diagram
Quarkus Flow App
↓ (structured logging → stdout; Kubernetes writes it to /var/log/containers/)
FluentBit DaemonSet (tails /var/log/containers/)
↓ (es output plugin)
Elasticsearch Raw Indices
↓ (ES Transform, continuous)
Elasticsearch Normalized Indices
↓ (Elasticsearch Java Client)
GraphQL API
Data Flow
- Quarkus Flow emits events - JSON to stdout
- Kubernetes captures logs - /var/log/containers/<pod>_<namespace>_<container>-<container-id>.log
- FluentBit collects - Tails the log files and keeps only JSON events
- FluentBit indexes into raw indices - workflow-events, task-events
- ES Transform processes - Painless scripts extract and normalize fields
- Write to normalized indices - workflow-instances, task-executions
- GraphQL queries - Via the Elasticsearch Java Client
Key Characteristics
| Characteristic | Details |
|---|---|
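| Latency | Transform checks run every ~1s (`frequency`), but a continuous transform only processes documents older than its `sync.time.delay` (60s in the setup example), so end-to-end latency is roughly that delay plus FluentBit flush and index refresh time |
| Consistency | Eventual consistency (no ACID transactions) |
| Throughput | 100K+ workflows/day (horizontal scaling) |
| Complexity | Moderate - Painless scripts, ES Transform configuration |
| Search | Excellent - full-text search, aggregations, analytics |
| Status | ✅ Production Ready |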
ES Transform-Based Normalization
FluentBit's design requires ES Transforms because:
- FluentBit writes directly to Elasticsearch - No intermediate processor
- ES Transforms are built in - No separate service needed
- Painless scripts - Transform logic runs inside Elasticsearch
- Continuous processing - Transforms run automatically
Why ES Transforms?
Advantages:
- ✅ No Java event processor code on our side
- ✅ Built-in Elasticsearch feature
- ✅ Horizontal scaling (ES cluster scales)
- ✅ Full-text search and analytics
Trade-offs:
- ⚠️ Latency bounded by the transform `frequency` plus `sync.time.delay`, not just the ~1s polling interval
- ⚠️ Eventual consistency
- ⚠️ Painless script complexity
- ⚠️ Elasticsearch-specific (not portable)
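Why the delay dominates latency can be seen with a simplified model of a continuous transform. This is a sketch under stated assumptions, not Elasticsearch's implementation: checks are assumed to run every `frequency` seconds, and each check is assumed to process only documents whose sync field (`timestamp`) is at or before `now - delay`, which is the role `sync.time.delay` plays. The class and method names are hypothetical, and the model ignores FluentBit flush and index refresh time.

```java
// Simplified model of when a continuous transform picks up an event.
// Not Elasticsearch code: the class and method names are hypothetical.
class TransformCheckpointModel {

    // Returns the first check time (in seconds) whose window upper bound,
    // checkTime - delay, has reached the event's sync-field timestamp.
    // Checks are assumed to run at start, start + frequency, start + 2 * frequency, ...
    static long firstVisibleCheck(long eventTs, long start, long frequency, long delay) {
        if (frequency <= 0) {
            throw new IllegalArgumentException("frequency must be positive");
        }
        long check = start;
        while (check - delay < eventTs) {
            check += frequency;
        }
        return check;
    }

    public static void main(String[] args) {
        long eventTs = 1_000; // event timestamp, epoch seconds
        // "frequency": "1s" with "delay": "60s", as in the setup example
        System.out.println(firstVisibleCheck(eventTs, eventTs, 1, 60) - eventTs); // 60
        // Same frequency with a hypothetical 5s delay
        System.out.println(firstVisibleCheck(eventTs, eventTs, 1, 5) - eventTs);  // 5
    }
}
```

Under this model the 1s frequency barely matters once the delay is 60s. Lowering the delay shortens latency but leaves less room for events that reach Elasticsearch late, which a continuous transform does not go back for.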
Raw Indices
Raw indices store complete events for:
- Debugging - Original events preserved
- Replay - Can reprocess if transform logic changes
- Audit - Complete event history
Index approach:
- workflow-events - Workflow lifecycle events
- task-events - Task execution events
- Time-series indices (optional) - e.g., workflow-events-2026.04
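With time-series indices, the index name has to be derived from the event time. A minimal sketch, assuming monthly indices named like the `workflow-events-2026.04` example and UTC epoch-seconds timestamps (matching `timestamp-format=epoch-seconds`); `monthlyIndex` is a hypothetical helper. FluentBit's `es` output can also generate date-suffixed names itself through its `Logstash_Format` and `Logstash_DateFormat` options.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for monthly time-series index names.
class TimeSeriesIndexName {
    // Monthly suffix such as 2026.04, formatted in UTC so every writer agrees.
    private static final DateTimeFormatter MONTHLY =
            DateTimeFormatter.ofPattern("yyyy.MM").withZone(ZoneOffset.UTC);

    // Builds "<prefix>-yyyy.MM" from an epoch-seconds timestamp.
    static String monthlyIndex(String prefix, long epochSeconds) {
        return prefix + "-" + MONTHLY.format(Instant.ofEpochSecond(epochSeconds));
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2026-04-15T10:00:00Z").getEpochSecond();
        System.out.println(monthlyIndex("workflow-events", ts)); // workflow-events-2026.04
    }
}
```

With such indices, the transform's `source.index` would use a wildcard pattern such as `workflow-events-*` instead of a single index name.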
Normalized Indices
ES Transforms extract fields and write them to normalized indices:
- workflow-instances - One document per workflow execution
- task-executions - One document per task execution

Transform logic (Painless scripts):
- Extract fields from nested JSON
- Correlate events by instance ID
- Handle out-of-order events
- COALESCE-like logic for updates
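The transform logic listed above can be reasoned about as a fold over raw events. The sketch below emulates it in plain Java rather than Painless so it can be tested outside Elasticsearch. The `RawEvent` and `WorkflowInstance` shapes are assumptions that loosely mirror `data.instanceId`, `data.workflowName`, `data.status`, `data.startTime`, and `data.endTime`. Events are correlated by instance ID, out-of-order arrival is handled by applying events in `timestamp` order, and null fields never overwrite known values (the COALESCE-like rule).

```java
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java emulation of the normalization rules; not the Painless scripts themselves.
class NormalizationSketch {
    // Hypothetical raw event shape; null means "not present in this event".
    record RawEvent(String instanceId, long timestamp, String workflowName,
                    String status, Long startTime, Long endTime) {}

    // Hypothetical normalized document: one per workflow instance.
    static final class WorkflowInstance {
        String instanceId, name, status;
        Long startTime, endTime;
    }

    static Map<String, WorkflowInstance> normalize(List<RawEvent> events) {
        Map<String, WorkflowInstance> out = new LinkedHashMap<>();
        // Apply events in event-time order so late arrivals don't overwrite newer state.
        events.stream()
              .sorted(Comparator.comparingLong(RawEvent::timestamp))
              .forEach(e -> {
                  // Correlate by instance ID: one document per instance.
                  WorkflowInstance w = out.computeIfAbsent(e.instanceId(), id -> {
                      WorkflowInstance n = new WorkflowInstance();
                      n.instanceId = id;
                      return n;
                  });
                  // COALESCE-like: only non-null values overwrite what is known.
                  if (e.workflowName() != null) w.name = e.workflowName();
                  if (e.status() != null) w.status = e.status(); // latest status wins
                  if (e.startTime() != null)
                      w.startTime = w.startTime == null ? e.startTime() : Math.min(w.startTime, e.startTime());
                  if (e.endTime() != null)
                      w.endTime = w.endTime == null ? e.endTime() : Math.max(w.endTime, e.endTime());
              });
        return out;
    }

    public static void main(String[] args) {
        // The completion event arrives before the start event (out of order).
        List<RawEvent> events = List.of(
                new RawEvent("wf-1", 20, null, "COMPLETED", null, 20L),
                new RawEvent("wf-1", 10, "order", "RUNNING", 10L, null));
        WorkflowInstance w = normalize(events).get("wf-1");
        System.out.println(w.name + " " + w.status + " " + w.startTime + "-" + w.endTime); // order COMPLETED 10-20
    }
}
```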
Configuration
Quarkus Flow Application
# Structured logging (same as PostgreSQL mode)
quarkus.flow.structured-logging.enabled=true
quarkus.flow.structured-logging.timestamp-format=epoch-seconds
FluentBit
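[INPUT]
    Name    tail
    Path    /var/log/containers/*_workflows_*.log
    Parser  docker

# Keep only records whose log line contains an eventType field
[FILTER]
    Name    grep
    Match   *
    Regex   log eventType

# Assumption: each event is a JSON object with top-level timestamp and data,
# matching the raw index mappings; this lifts it out of the log string
[FILTER]
    Name          parser
    Match         *
    Key_Name      log
    Parser        json
    Reserve_Data  On

# Suppress_Type_Name replaces "Type _doc": Elasticsearch 8.x rejects mapping types
[OUTPUT]
    Name                es
    Match               *
    Host                elasticsearch
    Port                9200
    Index               workflow-events
    Suppress_Type_Name  On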
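To sanity-check which files and records this configuration selects, the tail `Path` glob and the grep rule can be emulated locally. A rough sketch, assuming the kubelet's `<pod>_<namespace>_<container>-<container-id>.log` file naming (so `*_workflows_*.log` matches pods in a `workflows` namespace), a plain `eventType` substring match as the intent of the grep filter's `Regex log ...` rule, and Java's glob and regex engines as stand-ins for FluentBit's. The sample file names and records are made up.

```java
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Local stand-in for FluentBit's file selection and grep filtering.
class FluentBitSelectionCheck {
    // Same glob as the tail input's Path, applied to the file name only.
    static final PathMatcher PATH = FileSystems.getDefault().getPathMatcher("glob:*_workflows_*.log");
    // Unanchored match: keep records whose "log" value contains eventType.
    static final Pattern GREP = Pattern.compile("eventType");

    static boolean tailed(String fileName) {
        return PATH.matches(Path.of(fileName));
    }

    static boolean kept(Map<String, String> record) {
        String log = record.get("log");
        return log != null && GREP.matcher(log).find();
    }

    public static void main(String[] args) {
        System.out.println(tailed("orders-7d9f_workflows_app-0123abcd.log"));    // true
        System.out.println(tailed("ingress-x1_kube-system_nginx-0123abcd.log")); // false
        List<Map<String, String>> records = List.of(
                Map.of("log", "{\"eventType\":\"example\",\"data\":{}}"),
                Map.of("log", "plain startup message"));
        records.forEach(r -> System.out.println(kept(r))); // true, then false
    }
}
```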
Data Index Service
# Elasticsearch backend
data-index.storage.backend=elasticsearch
quarkus.elasticsearch.hosts=elasticsearch:9200
# No event processor needed (ES Transform handles it)
data-index.event-processor.enabled=false
ES Transform Setup
Create raw indices:
PUT /workflow-events
{
  "mappings": {
    "properties": {
      "data": { "type": "object", "enabled": true },
      "timestamp": { "type": "date", "format": "epoch_second" }
    }
  }
}

PUT /task-events
{
  "mappings": {
    "properties": {
      "data": { "type": "object", "enabled": true },
      "timestamp": { "type": "date", "format": "epoch_second" }
    }
  }
}
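The application emits `timestamp` as epoch seconds (`timestamp-format=epoch-seconds`), so the date format on the `timestamp` mapping matters. Elasticsearch's default `date` format is `strict_date_optional_time||epoch_millis`, which means a bare number is read as milliseconds unless the mapping declares `"format": "epoch_second"`. The sketch below uses `java.time` to show the size of that error; the timestamp value is arbitrary.

```java
import java.time.Instant;

// Shows how one numeric timestamp is interpreted under two epoch units.
class EpochUnitCheck {
    // Interpretation as epoch seconds: what timestamp-format=epoch-seconds emits.
    static Instant asSeconds(long value) {
        return Instant.ofEpochSecond(value);
    }

    // Interpretation as epoch milliseconds: how a date field without an
    // explicit format reads a bare number.
    static Instant asMillis(long value) {
        return Instant.ofEpochMilli(value);
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2026-04-15T10:00:00Z").getEpochSecond();
        System.out.println(asSeconds(ts)); // 2026-04-15T10:00:00Z
        System.out.println(asMillis(ts));  // a date in January 1970
    }
}
```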
Create and start transforms:
PUT _transform/workflow-instances-transform
{
  "source": {
    "index": ["workflow-events"]
  },
  "dest": {
    "index": "workflow-instances"
  },
  "frequency": "1s",
  "sync": {
    "time": {
      "field": "timestamp",
      "delay": "60s"
    }
  },
  "pivot": {
    "group_by": {
      "instanceId": {
        "terms": { "field": "data.instanceId.keyword" }
      }
    },
    "aggregations": {
      "name": {
        "top_metrics": {
          "metrics": { "field": "data.workflowName.keyword" },
          "sort": { "timestamp": "desc" }
        }
      },
      "status": {
        "top_metrics": {
          "metrics": { "field": "data.status.keyword" },
          "sort": { "timestamp": "desc" }
        }
      },
      "startTime": { "min": { "field": "data.startTime" } },
      "endTime": { "max": { "field": "data.endTime" } }
    }
  }
}

POST _transform/workflow-instances-transform/_start
Scaling Considerations
Elasticsearch mode scales horizontally:
Horizontal scaling:
- Add Elasticsearch nodes to the cluster
- Shard indices across nodes
- Add replica shards for read scaling
- Run multiple Data Index instances (stateless)
Performance tuning:
- Adjust the transform `frequency` (1s in this setup; Elasticsearch's default is 1m) and `sync.time.delay`
- Tune the index refresh interval
- Use time-series indices with ILM
- Optimize Painless scripts
Advantages over PostgreSQL:
- Scales to 100K+ workflows/day
- Distributed processing
- Full-text search capabilities
- Better suited to analytics workloads
Search Capabilities
Elasticsearch mode enables advanced queries:
Full-text search:
{
  searchWorkflows(query: "payment failed") {
    id
    name
    status
  }
}
Aggregations:
{
  workflowStats {
    totalByStatus
    avgDuration
    failureRate
  }
}
Time-series analytics:
- Workflow execution trends
- Task duration percentiles
- Error rate over time
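The `workflowStats` fields and the error-rate trend are computed by Elasticsearch aggregations, but their arithmetic is easy to sketch over normalized documents. This is a hedged illustration only: the `Instance` shape, the `FAILED` status value, and the metric definitions (failure rate as failed over total, average duration over finished instances, hourly buckets on `startTime` in epoch seconds) are assumptions, not the Data Index's resolver logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.TreeMap;

// Illustrative arithmetic only; Elasticsearch computes the real aggregations.
class WorkflowStatsSketch {
    // Hypothetical normalized document: times in epoch seconds, endTime null while running.
    record Instance(String status, long startTime, Long endTime) {}

    // Like a terms aggregation on status: count of instances per status.
    static Map<String, Long> totalByStatus(List<Instance> xs) {
        Map<String, Long> counts = new TreeMap<>();
        for (Instance i : xs) {
            counts.merge(i.status(), 1L, Long::sum);
        }
        return counts;
    }

    // Average duration over instances that have finished (endTime present).
    static OptionalDouble avgDurationSeconds(List<Instance> xs) {
        return xs.stream()
                 .filter(i -> i.endTime() != null)
                 .mapToLong(i -> i.endTime() - i.startTime())
                 .average();
    }

    // Assumed definition: FAILED instances divided by all instances.
    static double failureRate(List<Instance> xs) {
        if (xs.isEmpty()) {
            return 0.0;
        }
        long failed = xs.stream().filter(i -> "FAILED".equals(i.status())).count();
        return (double) failed / xs.size();
    }

    // Like a fixed 1h date histogram on startTime, with failureRate per bucket.
    static Map<Long, Double> hourlyFailureRate(List<Instance> xs) {
        Map<Long, List<Instance>> buckets = new TreeMap<>();
        for (Instance i : xs) {
            long bucketStart = i.startTime() - Math.floorMod(i.startTime(), 3600L);
            buckets.computeIfAbsent(bucketStart, b -> new ArrayList<>()).add(i);
        }
        Map<Long, Double> rates = new TreeMap<>();
        buckets.forEach((start, group) -> rates.put(start, failureRate(group)));
        return rates;
    }

    public static void main(String[] args) {
        List<Instance> xs = List.of(
                new Instance("COMPLETED", 0, 30L),
                new Instance("FAILED", 100, 110L),
                new Instance("RUNNING", 3700, null),
                new Instance("COMPLETED", 3800, 3820L));
        System.out.println(totalByStatus(xs));                    // {COMPLETED=2, FAILED=1, RUNNING=1}
        System.out.println(avgDurationSeconds(xs).getAsDouble()); // 20.0
        System.out.println(failureRate(xs));                      // 0.25
        System.out.println(hourlyFailureRate(xs));                // {0=0.5, 3600=0.0}
    }
}
```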