Elasticsearch Mode Architecture

Data Index with Elasticsearch storage uses ES Transforms for normalization, enabling full-text search and high throughput.

Architecture Diagram

Quarkus Flow App
    ↓ (structured logging → stdout)
FluentBit DaemonSet
    ↓ (tail /var/log/containers/)
Elasticsearch Raw Indices
    ↓ (ES Transform, ~1s)
Elasticsearch Normalized Indices
    ↓ (Elasticsearch Java Client)
GraphQL API

Data Flow

  1. Quarkus Flow emits events - JSON to stdout

  2. Kubernetes captures logs - /var/log/containers/POD_NAME.log

  3. FluentBit collects - Tails log files, filters JSON events

  4. FluentBit indexes into raw indices - workflow-events, task-events

  5. ES Transform processes - Painless scripts extract and normalize

  6. Write to normalized indices - workflow-instances, task-executions

  7. GraphQL queries - Via Elasticsearch Java Client

Key Characteristics

  • Latency - ~1s for normalization (ES Transform), 5-10s end-to-end

  • Consistency - Eventual consistency (no ACID)

  • Throughput - 100K+ workflows/day (horizontal scaling)

  • Complexity - Moderate (Painless scripts, ES Transform configuration)

  • Search - Excellent (full-text search, aggregations, analytics)

  • Status - ✅ Production Ready

ES Transform-Based Normalization

FluentBit's direct-to-Elasticsearch design leads us to ES Transforms for normalization because:

  1. FluentBit inserts directly to Elasticsearch - No intermediate processor

  2. ES Transforms are built-in - No separate service needed

  3. Painless scripts - Transform logic runs inside Elasticsearch

  4. Continuous processing - Transforms run automatically

Why ES Transforms?

Advantages:

  • ✅ No Java event processor code on our side

  • ✅ Built-in Elasticsearch feature

  • ✅ Horizontal scaling (ES cluster scales)

  • ✅ Full-text search and analytics

Trade-offs:

  • ⚠️ ~1s latency (transform polling interval)

  • ⚠️ Eventual consistency

  • ⚠️ Painless script complexity

  • ⚠️ Elasticsearch-specific (not portable)

Raw Indices

Raw indices store complete events for:

  • Debugging - Original events preserved

  • Replay - Can reprocess if transform logic changes

  • Audit - Complete event history

Index approach:

  • workflow-events - Workflow lifecycle events

  • task-events - Task execution events

  • Time-series indices (optional) - e.g., workflow-events-2026.04
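The optional time-series naming can be sketched as a small helper. This is an illustration only: the function name raw_index_name and the choice of monthly buckets in UTC are assumptions based on the workflow-events-2026.04 example, not part of Data Index.

```python
from datetime import datetime, timezone


def raw_index_name(base: str, epoch_seconds: float) -> str:
    """Return a monthly index name such as workflow-events-2026.04."""
    ts = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return f"{base}-{ts:%Y.%m}"


# 2026-04-15T00:00:00Z expressed as epoch seconds
april_15 = datetime(2026, 4, 15, tzinfo=timezone.utc).timestamp()
print(raw_index_name("workflow-events", april_15))  # workflow-events-2026.04
```

In a real deployment the date suffix would more likely come from the FluentBit es output (its Logstash_Format, Logstash_Prefix, and Logstash_DateFormat options) than from application code.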

Normalized Indices

ES Transforms extract fields and write to normalized indices:

  • workflow-instances - One document per workflow execution

  • task-executions - One document per task execution

Transform logic (Painless scripts):

  • Extract fields from nested JSON

  • Correlate events by instance ID

  • Handle out-of-order events

  • COALESCE-like logic for updates
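The transform logic listed above can be modeled outside Elasticsearch. The sketch below is plain Python, not Painless, and the event shape (a timestamp plus a data object with instanceId, workflowName, status, startTime, endTime) is an assumption inferred from the field names used elsewhere in this page.

```python
from typing import Any, Dict, Iterable

# Hypothetical event shape: {"timestamp": ..., "data": {"instanceId": ..., ...}}
Event = Dict[str, Any]


def normalize(events: Iterable[Event]) -> Dict[str, Dict[str, Any]]:
    """Fold raw events into one document per instance ID."""
    instances: Dict[str, Dict[str, Any]] = {}
    # Sorting by timestamp makes the result independent of arrival order.
    for event in sorted(events, key=lambda e: e["timestamp"]):
        data = event["data"]
        doc = instances.setdefault(data["instanceId"], {"id": data["instanceId"]})
        for field in ("workflowName", "status", "startTime", "endTime"):
            value = data.get(field)
            # COALESCE-like: a newer event overwrites a field only if it carries a value.
            if value is not None:
                doc[field] = value
    return instances


events = [
    # The completion event arrives before the start event.
    {"timestamp": 20, "data": {"instanceId": "wf-1", "status": "COMPLETED", "endTime": 20}},
    {"timestamp": 10, "data": {"instanceId": "wf-1", "workflowName": "payment",
                               "status": "RUNNING", "startTime": 10}},
]
print(normalize(events)["wf-1"])
```

Although the completion event arrives first, the resulting document keeps workflowName and startTime from the start event and ends with status COMPLETED, which is the behavior the normalized indices are meant to provide.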

Configuration

Quarkus Flow Application

# Structured logging (same as PostgreSQL mode)
quarkus.flow.structured-logging.enabled=true
quarkus.flow.structured-logging.timestamp-format=epoch-seconds

FluentBit

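# The docker parser suits Docker's JSON log files; containerd and CRI-O nodes need a CRI parser
[INPUT]
    Name              tail
    Path              /var/log/containers/*_workflows_*.log
    Parser            docker

# Keep only records whose log field looks like a JSON event with an eventType key
[FILTER]
    Name              grep
    Match             *
    Regex             log {".*eventType.*}

# Assumption: decode the JSON string in log so nested fields such as data.instanceId are indexed
[FILTER]
    Name              parser
    Match             *
    Key_Name          log
    Parser            json

# Elasticsearch 8.x rejects mapping types: remove Type and set Suppress_Type_Name On
[OUTPUT]
    Name              es
    Match             *
    Host              elasticsearch
    Port              9200
    Index             workflow-events
    Type              _doc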
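To make the collection step concrete, here is a rough Python model of what happens to a single container log line. It assumes Docker's JSON log format (one object per line with log, stream, and time keys) and an invented eventType value; the final decoding step is an assumption about how nested fields such as data.instanceId reach Elasticsearch.

```python
import json
import re
from typing import Optional

# Same pattern as the grep filter's Regex value, with braces escaped for Python's re.
EVENT_PATTERN = re.compile(r'\{".*eventType.*\}')


def extract_event(container_line: str) -> Optional[dict]:
    """Return the decoded event, or None if the line is not a workflow event."""
    record = json.loads(container_line)   # docker parser: one JSON object per line
    log = record.get("log", "")
    if not EVENT_PATTERN.search(log):     # grep filter: drop non-matching records
        return None
    return json.loads(log)                # decode the embedded event JSON


app_line = json.dumps({
    "log": '{"eventType": "workflow.started", "data": {"instanceId": "wf-1"}}\n',
    "stream": "stdout",
    "time": "2026-04-15T00:00:00Z",
})
noise = json.dumps({"log": "Started application in 1.2s\n", "stream": "stdout",
                    "time": "2026-04-15T00:00:01Z"})

print(extract_event(app_line))
print(extract_event(noise))
```

The first call yields the event dictionary and the second yields None, mirroring how ordinary application log lines never reach the raw indices.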

Data Index Service

# Elasticsearch backend
data-index.storage.backend=elasticsearch
quarkus.elasticsearch.hosts=elasticsearch:9200

# No event processor needed (ES Transform handles it)
data-index.event-processor.enabled=false

ES Transform Setup

Create raw indices:

PUT /workflow-events
{
  "mappings": {
    "properties": {
      "data": { "type": "object", "enabled": true },
      "timestamp": { "type": "date", "format": "epoch_second" }
    }
  }
}

PUT /task-events
{
  "mappings": {
    "properties": {
      "data": { "type": "object", "enabled": true },
      "timestamp": { "type": "date", "format": "epoch_second" }
    }
  }
}

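Create and start the workflow-instances transform. The sync delay holds back events newer than 60s, so lower it if you need the ~1s normalization latency quoted above: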

PUT _transform/workflow-instances-transform
{
  "source": {
    "index": ["workflow-events"]
  },
  "dest": {
    "index": "workflow-instances"
  },
  "frequency": "1s",
  "sync": {
    "time": {
      "field": "timestamp",
      "delay": "60s"
    }
  },
  "pivot": {
    "group_by": {
      "instanceId": {
        "terms": {
          "field": "data.instanceId.keyword"
        }
      }
    },
    "aggregations": {
      "name": { "terms": { "field": "data.workflowName.keyword" } },
      "status": { "terms": { "field": "data.status.keyword" } },
      "startTime": { "min": { "field": "data.startTime" } },
      "endTime": { "max": { "field": "data.endTime" } }
    }
  }
}

POST _transform/workflow-instances-transform/_start
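To see what the pivot produces, the group_by and aggregations can be emulated in plain Python. This is a sketch under assumptions: the sample events are invented, and it reflects my understanding that a terms aggregation inside a pivot yields a map of value to document count rather than a single value.

```python
from collections import Counter, defaultdict
from typing import Any, Dict, List


def pivot(events: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Emulate group_by instanceId with the aggregations from the transform."""
    groups: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for event in events:
        groups[event["data"]["instanceId"]].append(event["data"])

    result = {}
    for instance_id, docs in groups.items():
        starts = [d["startTime"] for d in docs if "startTime" in d]
        ends = [d["endTime"] for d in docs if "endTime" in d]
        result[instance_id] = {
            "instanceId": instance_id,
            # terms aggregations report document counts per value
            "name": dict(Counter(d["workflowName"] for d in docs if "workflowName" in d)),
            "status": dict(Counter(d["status"] for d in docs if "status" in d)),
            "startTime": min(starts) if starts else None,
            "endTime": max(ends) if ends else None,
        }
    return result


events = [
    {"data": {"instanceId": "wf-1", "workflowName": "payment",
              "status": "RUNNING", "startTime": 10}},
    {"data": {"instanceId": "wf-1", "workflowName": "payment",
              "status": "COMPLETED", "endTime": 20}},
]
print(pivot(events)["wf-1"]["status"])  # {'RUNNING': 1, 'COMPLETED': 1}
```

If the normalized document should carry only the latest status, a scripted_metric aggregation with Painless or a top_metrics aggregation sorted by timestamp is a likely alternative to terms.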

Scaling Considerations

Elasticsearch mode scales horizontally:

Horizontal scaling:

  • Add Elasticsearch nodes to the cluster

  • Shard indices across nodes

  • Add replica shards for read scaling

  • Run multiple Data Index instances (stateless)

Performance tuning:

  • Adjust transform frequency (1s in the configuration above, the minimum Elasticsearch accepts; the default is 1m)

  • Tune index refresh interval

  • Use time-series indices with ILM

  • Optimize Painless scripts

Advantages over PostgreSQL:

  • Scales to 100K+ workflows/day

  • Distributed processing

  • Full-text search capabilities

  • Better suited to analytics workloads
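For capacity planning it helps to turn the daily figure into an average indexing rate. The calculation below is a back-of-the-envelope sketch; the value of 20 events per workflow is an illustrative assumption, not a measurement.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def events_per_second(workflows_per_day: int, events_per_workflow: int) -> float:
    """Average raw-event indexing rate for a given daily workflow volume."""
    return workflows_per_day * events_per_workflow / SECONDS_PER_DAY


# events_per_workflow=20 is an illustrative assumption, not a measured value.
rate = events_per_second(100_000, 20)
print(f"{rate:.1f} events/s on average")  # 23.1 events/s on average
```

Peak load is usually several times the average, so size shards and nodes against expected bursts rather than this mean.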

Search Capabilities

Elasticsearch mode enables advanced queries:

Full-text search:

{
  searchWorkflows(query: "payment failed") {
    id
    name
    status
  }
}
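One plausible way for the service to answer such a query is a multi_match search over the normalized index. The sketch below only builds the request body; the function name, the searched field list, and the returned field names are assumptions, and the actual Data Index resolver may differ.

```python
import json
from typing import Any, Dict, List


def search_workflows_body(query: str, fields: List[str], size: int = 10) -> Dict[str, Any]:
    """Build a _search request body for a free-text workflow query."""
    return {
        "size": size,
        "query": {
            "multi_match": {
                "query": query,
                "fields": fields,
                # "and" requires every term (payment AND failed) to match
                "operator": "and",
            }
        },
        # Return only the fields the GraphQL selection asks for
        "_source": ["id", "name", "status"],
    }


# The field names searched here are hypothetical.
body = search_workflows_body("payment failed", ["name", "status", "error.message"])
print(json.dumps(body, indent=2))
```

The printed body could be sent as a POST to the _search endpoint of the workflow-instances index.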

Aggregations:

{
  workflowStats {
    totalByStatus
    avgDuration
    failureRate
  }
}
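The three statistics can be illustrated with a small in-memory calculation. This is a sketch under assumptions: the status values, the numeric startTime and endTime fields, and the definition of failure rate as FAILED instances over all instances are not confirmed by the schema.

```python
from collections import Counter
from typing import Any, Dict, List


def workflow_stats(instances: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Compute per-status totals, mean duration, and failure rate."""
    total_by_status = Counter(doc["status"] for doc in instances)
    # Only instances with both timestamps contribute to the average duration.
    durations = [doc["endTime"] - doc["startTime"]
                 for doc in instances
                 if doc.get("endTime") is not None and doc.get("startTime") is not None]
    total = len(instances)
    return {
        "totalByStatus": dict(total_by_status),
        "avgDuration": sum(durations) / len(durations) if durations else None,
        # Assumption: failure rate is FAILED instances over all instances
        "failureRate": total_by_status["FAILED"] / total if total else 0.0,
    }


instances = [
    {"status": "COMPLETED", "startTime": 0, "endTime": 30},
    {"status": "FAILED", "startTime": 10, "endTime": 20},
    {"status": "RUNNING", "startTime": 40},
    {"status": "COMPLETED", "startTime": 50, "endTime": 100},
]
print(workflow_stats(instances))
```

In Elasticsearch the same numbers would come from terms and avg aggregations over workflow-instances, computed in the cluster rather than in application memory.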

Time-series analytics:

  • Workflow execution trends

  • Task duration percentiles

  • Error rate over time
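These analytics map naturally onto a date_histogram with sub-aggregations. The sketch below builds such a request body; the field names startTime, durationMs, and status, and the FAILED value, are assumptions about the task-executions mapping.

```python
import json
from typing import Any, Dict


def task_duration_trend_body(interval: str = "1h") -> Dict[str, Any]:
    """Build a _search body that buckets task executions over time."""
    return {
        "size": 0,  # only aggregation results are needed, no hits
        "aggs": {
            "over_time": {
                "date_histogram": {"field": "startTime", "fixed_interval": interval},
                "aggs": {
                    # Task duration percentiles per time bucket
                    "duration_percentiles": {
                        "percentiles": {"field": "durationMs", "percents": [50, 95, 99]}
                    },
                    # Count of failed executions per bucket, for an error rate
                    "failed": {"filter": {"term": {"status": "FAILED"}}},
                },
            }
        },
    }


print(json.dumps(task_duration_trend_body("1d"), indent=2))
```

Dividing each bucket's failed count by its doc_count gives the error rate over time.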