Elasticsearch Deployment

Status: Production Ready

The Elasticsearch storage backend is fully implemented and production-ready. Use this mode for high-throughput deployments that need full-text search.

Overview

Elasticsearch mode (MODE 2) normalizes workflow events with continuously running Elasticsearch Transforms. FluentBit tails the container logs and writes the events to raw Elasticsearch indices; the transforms then aggregate those events into the normalized indices that back the GraphQL queries.

Architecture:

Quarkus Flow Apps → Container Logs → FluentBit DaemonSet
                                           ↓
                                    Elasticsearch (raw indices)
                                           ↓ (ES Transform ~1s)
                                    Elasticsearch (normalized indices)
                                           ↓
                                    Data Index GraphQL API

Characteristics:

  • Latency: ~1s normalization, sub-second query performance

  • Throughput: 100K+ workflows/day

  • Search: Full-text search, complex aggregations, analytics

  • Scaling: Horizontal (add Elasticsearch nodes)

  • Lifecycle: Automatic ILM policies for raw event retention
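
To put the throughput figures in perspective, a daily volume can be converted into an average per-second rate. The sketch below is plain arithmetic; the helper name workflows_per_second is illustrative and not part of the project, and real traffic is usually burstier than the average it prints.

```shell
# workflows_per_second DAILY_VOLUME: print the average rate per second.
# Hypothetical helper for capacity estimates only.
workflows_per_second() {
  # 86400 = seconds in one day
  awk -v n="$1" 'BEGIN { printf "%.2f\n", n / 86400 }'
}

workflows_per_second 100000   # prints 1.16
workflows_per_second 50000    # prints 0.58
```

Each workflow produces more than one event, so the event rate that FluentBit and the transforms see is a multiple of these numbers.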

When to use:

  • Need full-text search capabilities

  • High event volume (> 50K workflows/day)

  • Complex aggregations or analytics required

  • Existing Elasticsearch infrastructure

  • Multi-tenancy requirements

See Elasticsearch Mode Architecture for detailed design information.

Prerequisites

Before deploying Elasticsearch mode, ensure you have:

  1. Kubernetes cluster (1.21+)

    • kubectl configured and connected

    • Sufficient resources (3+ nodes recommended)

  2. Elasticsearch cluster (8.11+)

    • Running and accessible from Kubernetes

    • HTTP API enabled (port 9200)

    • Optional: TLS/authentication configured

  3. Container images built

    • Data Index service: kubesmarts/data-index-service-elasticsearch:999-SNAPSHOT

    • FluentBit: fluent/fluent-bit:latest (pin a specific version tag for production)

  4. FluentBit RBAC permissions

    • Service account with pod metadata read access

    • ClusterRole for Kubernetes API access
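
The version minimums above can be checked with a small comparison helper. This is a minimal sketch that assumes a `sort` with the `-V` (version sort) option, as found in GNU coreutils; version_ge and check_min are illustrative names, not tools shipped with the project.

```shell
# version_ge A B: succeed when version A is greater than or equal to B.
# Relies on `sort -V`, which orders dotted version strings numerically.
version_ge() {
  [ "$(printf '%s\n%s\n' "$1" "$2" | sort -V | head -n 1)" = "$2" ]
}

# check_min NAME ACTUAL MINIMUM: print a pass/fail line for one prerequisite.
check_min() {
  if version_ge "$2" "$3"; then
    echo "OK   $1 $2 (>= $3)"
  else
    echo "FAIL $1 $2 (< $3)"
    return 1
  fi
}
```

For example, with Elasticsearch reachable on localhost:9200, the running version could be checked with:

    check_min Elasticsearch "$(curl -s http://localhost:9200 | jq -r .version.number)" 8.11

Version strings with a leading "v" (as kubectl reports them) need that prefix stripped before comparison.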

Local Development

Quick Start with Dev Services

Quarkus Dev Services automatically starts Elasticsearch 8.11.1 in Docker for local development:

# Navigate to the Elasticsearch service module
cd data-index/data-index-service/data-index-service-elasticsearch

# Start in development mode (Dev Services auto-starts Elasticsearch)
mvn quarkus:dev

# What happens automatically:
# 1. Elasticsearch 8.11.1 container starts
# 2. Schema initializer creates ILM policies, index templates, transforms
# 3. GraphQL API available at http://localhost:8080/graphql
# 4. GraphQL UI available at http://localhost:8080/q/graphql-ui

Dev Services features:

  • Auto-starts docker.elastic.co/elasticsearch/elasticsearch:8.11.1

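  • Exposes Elasticsearch on the port set by quarkus.elasticsearch.devservices.port (9200 in the configuration below); if that property is unset, a random port is used (check logs for URL)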

  • Schema initialization runs automatically

  • Live coding enabled (code changes trigger reload)

  • Container stops when dev mode exits

Configuration Files

Application properties (dev mode):

data-index-service-elasticsearch/src/main/resources/application.properties

# Elasticsearch Dev Services (enabled in dev mode)
%dev.quarkus.elasticsearch.devservices.enabled=true
%dev.quarkus.elasticsearch.devservices.image-name=docker.elastic.co/elasticsearch/elasticsearch:8.11.1
%dev.quarkus.elasticsearch.devservices.port=9200

# Schema initialization (enabled in dev, disabled in production)
%dev.data-index.storage.skip-init-schema=false
%dev.data-index.elasticsearch.schema.init.enabled=true
%prod.data-index.storage.skip-init-schema=true

# Logging
quarkus.log.category."org.kubesmarts.logic.dataindex".level=INFO
%dev.quarkus.log.category."org.kubesmarts.logic.dataindex.storage.elasticsearch".level=DEBUG

Verify Schema Initialization

When the service starts, you should see log messages like:

INFO  [org.kub...ElasticsearchSchemaInitializer] Initializing Elasticsearch schema...
INFO  [org.kub...ElasticsearchSchemaInitializer] Applying ILM policy 'data-index-events-retention'...
INFO  [org.kub...ElasticsearchSchemaInitializer] ILM policy 'data-index-events-retention' applied successfully
INFO  [org.kub...ElasticsearchSchemaInitializer] Applying index template 'workflow-events'...
INFO  [org.kub...ElasticsearchSchemaInitializer] Index template 'workflow-events' applied successfully
INFO  [org.kub...ElasticsearchSchemaInitializer] Applying transform 'workflow-instances-transform'...
INFO  [org.kub...ElasticsearchSchemaInitializer] Transform 'workflow-instances-transform' applied successfully
INFO  [org.kub...ElasticsearchSchemaInitializer] Elasticsearch schema initialization complete
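
When the startup output is captured to a file or piped from another command, the same check can be scripted. The sketch below only matches the message texts shown above; the function names are illustrative and the log format is assumed to stay as printed here.

```shell
# schema_init_complete: read service logs on stdin and succeed once the
# initializer's completion message has been logged.
schema_init_complete() {
  grep -q "Elasticsearch schema initialization complete"
}

# applied_resources: read service logs on stdin and print the name of every
# resource reported as "applied successfully", one per line.
applied_resources() {
  sed -n "s/.*'\([^']*\)' applied successfully.*/\1/p"
}
```

Against a captured log file (the file name is a placeholder), this could be used as:

    schema_init_complete < data-index.log && applied_resources < data-index.log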

Kubernetes Deployment

Step 1: Deploy Elasticsearch Cluster

Option A: ECK Operator (Recommended)

# Install ECK operator
kubectl create -f https://download.elastic.co/downloads/eck/2.10.0/crds.yaml
kubectl apply -f https://download.elastic.co/downloads/eck/2.10.0/operator.yaml

# Create the target namespace, then deploy the Elasticsearch cluster
kubectl create namespace elasticsearch
kubectl apply -f - <<EOF
apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
  name: data-index-es
  namespace: elasticsearch
spec:
  version: 8.11.1
  nodeSets:
  - name: default
    count: 3
    config:
      node.store.allow_mmap: false
    volumeClaimTemplates:
    - metadata:
        name: elasticsearch-data
      spec:
        accessModes:
        - ReadWriteOnce
        resources:
          requests:
            storage: 10Gi
EOF

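# Wait for cluster to be ready
# (the Elasticsearch resource reports readiness in .status.phase;
# --for=jsonpath needs kubectl 1.23 or newer)
kubectl wait --namespace elasticsearch \
  --for=jsonpath='{.status.phase}'=Ready elasticsearch/data-index-es \
  --timeout=600s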

Option B: Helm Chart

# Add Elastic Helm repository
helm repo add elastic https://helm.elastic.co
helm repo update

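# Install Elasticsearch
# The elastic/elasticsearch chart was last published as 8.5.1, so pin that
# chart version and override the image tag to run Elasticsearch 8.11.1.
# minimumMasterNodes is omitted: Elasticsearch 7 and later ignore it.
helm install elasticsearch elastic/elasticsearch \
  --namespace elasticsearch \
  --create-namespace \
  --set replicas=3 \
  --set imageTag=8.11.1 \
  --set resources.requests.memory=2Gi \
  --set volumeClaimTemplate.resources.requests.storage=10Gi \
  --version 8.5.1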

# Wait for pods to be ready
kubectl wait --namespace elasticsearch \
  --for=condition=ready pod \
  --selector=app=elasticsearch-master \
  --timeout=600s

Verify Elasticsearch:

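# Port-forward the Elasticsearch HTTP service.
# The service name depends on the install option: ECK creates
# <cluster-name>-es-http (here data-index-es-es-http) and the Helm chart
# creates elasticsearch-master. Substitute the name your cluster exposes.
kubectl port-forward -n elasticsearch svc/elasticsearch 9200:9200 &

# Check cluster health
# (ECK enables TLS and authentication by default; in that case use https://
# and pass the elastic user's credentials with -u)
curl -s http://localhost:9200/_cluster/health | jq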

# Expected output:
# {
#   "cluster_name": "elasticsearch",
#   "status": "green",
#   "number_of_nodes": 3,
#   ...
# }
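
The health check can also be scripted so that later steps run only once the cluster is green. The sketch below extracts the status with sed so that it has no dependency beyond a POSIX shell; `jq -r .status` gives the same value. The function names are illustrative.

```shell
# cluster_status: read a _cluster/health JSON response on stdin and print
# the value of its "status" field (green, yellow or red).
cluster_status() {
  sed -n 's/.*"status"[[:space:]]*:[[:space:]]*"\([a-z]*\)".*/\1/p' | head -n 1
}

# cluster_is_green: succeed only when the reported status is green.
cluster_is_green() {
  [ "$(cluster_status)" = "green" ]
}
```

Typical use:

    curl -s http://localhost:9200/_cluster/health | cluster_is_green && echo "cluster ready"

Elasticsearch can also block on the server side until the status is reached, by adding the wait_for_status=green and timeout=60s query parameters to the _cluster/health request.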

Step 2: Deploy Data Index Service

Build and push container image:

cd data-index/data-index-service/data-index-service-elasticsearch

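# Build container image
# (push=true pushes to the configured registry; omit it for KIND, where the
# image is loaded into the cluster directly in the next command)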
mvn clean package \
  -Dquarkus.container-image.build=true \
  -Dquarkus.container-image.push=true \
  -DskipTests

# For KIND: Load image to cluster
kind load docker-image \
  kubesmarts/data-index-service-elasticsearch:999-SNAPSHOT \
  --name data-index-test

Create namespace and ConfigMap:

# Create namespace
kubectl create namespace data-index

# Create ConfigMap with Elasticsearch connection
kubectl create configmap data-index-config \
  --namespace data-index \
  --from-literal=QUARKUS_ELASTICSEARCH_HOSTS=elasticsearch.elasticsearch.svc.cluster.local:9200 \
  --from-literal=DATA_INDEX_STORAGE_SKIP_INIT_SCHEMA=true

Deploy service (save the manifest as data-index-deployment.yaml):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: data-index-service
  namespace: data-index
  labels:
    app: data-index-service
    mode: elasticsearch
spec:
  replicas: 1
  selector:
    matchLabels:
      app: data-index-service
  template:
    metadata:
      labels:
        app: data-index-service
        mode: elasticsearch
    spec:
      containers:
      - name: data-index-service
        image: kubesmarts/data-index-service-elasticsearch:999-SNAPSHOT
        imagePullPolicy: Never  # For KIND; use IfNotPresent for production
        ports:
        - containerPort: 8080
          name: http
          protocol: TCP
        env:
        - name: QUARKUS_ELASTICSEARCH_HOSTS
          valueFrom:
            configMapKeyRef:
              name: data-index-config
              key: QUARKUS_ELASTICSEARCH_HOSTS
        - name: DATA_INDEX_STORAGE_SKIP_INIT_SCHEMA
          valueFrom:
            configMapKeyRef:
              name: data-index-config
              key: DATA_INDEX_STORAGE_SKIP_INIT_SCHEMA
        - name: QUARKUS_HTTP_PORT
          value: "8080"
        - name: QUARKUS_LOG_LEVEL
          value: "INFO"
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /q/health/live
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /q/health/ready
            port: 8080
          initialDelaySeconds: 20
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: data-index-service
  namespace: data-index
  labels:
    app: data-index-service
spec:
  type: NodePort
  selector:
    app: data-index-service
  ports:
  - port: 8080
    targetPort: 8080
    nodePort: 30080  # For KIND; omit for production
    protocol: TCP
    name: http

Apply deployment:

kubectl apply -f data-index-deployment.yaml

# Wait for deployment to be ready
kubectl wait --namespace data-index \
  --for=condition=available deployment/data-index-service \
  --timeout=300s

Step 3: Initialize Schema

Option A: Automatic (First Startup)

If you deploy with schema initialization enabled, the service will create all resources on first startup:

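# Enable schema initialization in ConfigMap
# (both keys are listed so that the generated ConfigMap is complete on its own
# and the Deployment's QUARKUS_ELASTICSEARCH_HOSTS reference stays valid)
kubectl create configmap data-index-config \
  --namespace data-index \
  --from-literal=QUARKUS_ELASTICSEARCH_HOSTS=elasticsearch.elasticsearch.svc.cluster.local:9200 \
  --from-literal=DATA_INDEX_STORAGE_SKIP_INIT_SCHEMA=false \
  --dry-run=client -o yaml | kubectl apply -f -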

# Restart deployment to trigger initialization
kubectl rollout restart deployment/data-index-service -n data-index

# Watch logs to verify schema creation
kubectl logs -n data-index -l app=data-index-service -f

Option B: Manual (Production Recommended)

For production, manage schema externally (GitOps, operators, etc.):

# Run schema initialization from local dev environment
cd data-index/data-index-service/data-index-service-elasticsearch

# Port-forward to Elasticsearch
kubectl port-forward -n elasticsearch svc/elasticsearch 9200:9200 &

# Run service locally to initialize schema
QUARKUS_ELASTICSEARCH_HOSTS=localhost:9200 \
DATA_INDEX_STORAGE_SKIP_INIT_SCHEMA=false \
mvn quarkus:dev

# Schema is created, then stop the service (Ctrl+C)
# Deploy to Kubernetes with skip-init-schema=true

Verify schema resources:

# Check ILM policy
curl -s http://localhost:9200/_ilm/policy/data-index-events-retention | jq

# Check index templates
curl -s http://localhost:9200/_index_template/workflow-events | jq
curl -s http://localhost:9200/_index_template/workflow-instances | jq

# Check transforms
curl -s http://localhost:9200/_transform/workflow-instances-transform | jq

# Start transforms (if not auto-started)
curl -X POST http://localhost:9200/_transform/workflow-instances-transform/_start
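
The four lookups above can be folded into one pass/fail check based on HTTP status codes. This is a sketch: it assumes an unauthenticated HTTP endpoint, as in the commands above, and the function names are illustrative.

```shell
# http_status URL: print only the HTTP status code of a GET request.
http_status() {
  curl -s -o /dev/null -w '%{http_code}' "$1"
}

# check_schema BASE_URL: report each schema resource named in this guide
# and return non-zero if any of them is missing.
check_schema() {
  cs_base=$1
  cs_missing=0
  for cs_path in \
    _ilm/policy/data-index-events-retention \
    _index_template/workflow-events \
    _index_template/workflow-instances \
    _transform/workflow-instances-transform
  do
    cs_code=$(http_status "$cs_base/$cs_path")
    if [ "$cs_code" = "200" ]; then
      echo "ok      $cs_path"
    else
      echo "MISSING $cs_path (HTTP $cs_code)"
      cs_missing=1
    fi
  done
  return "$cs_missing"
}
```

With the port-forward from the previous step still running, `check_schema http://localhost:9200` prints one line per resource and fails if any lookup does not return 200.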

Step 4: Deploy FluentBit DaemonSet

FluentBit captures workflow events from container logs and sends them to Elasticsearch.

Create FluentBit namespace and RBAC:

kubectl create namespace logging

# Create service account and RBAC
kubectl apply -f - <<EOF
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluent-bit
  namespace: logging
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: fluent-bit
rules:
- apiGroups: [""]
  resources:
  - namespaces
  - pods
  - nodes
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: fluent-bit
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: fluent-bit
subjects:
- kind: ServiceAccount
  name: fluent-bit
  namespace: logging
EOF

Deploy FluentBit using provided scripts:

cd data-index/scripts/fluentbit/elasticsearch

# Option 1: Quick deploy (uses existing ConfigMap)
./deploy.sh

# Option 2: Regenerate ConfigMap from source files
./deploy.sh regenerate

# Verify deployment
kubectl get pods -n logging -l app=workflows-fluent-bit-mode2
kubectl logs -n logging -l app=workflows-fluent-bit-mode2 --tail=50

Manual deployment (alternative):

# Generate ConfigMap
./generate-configmap.sh

# Apply ConfigMap and DaemonSet
kubectl apply -f kubernetes/configmap.yaml
kubectl apply -f kubernetes/daemonset.yaml

# Wait for DaemonSet to be ready
kubectl rollout status daemonset/workflows-fluent-bit-mode2 -n logging

Environment variables (configure in DaemonSet):

env:
- name: WORKFLOW_NAMESPACE
  value: "workflows"  # Namespace where workflow apps run
- name: ELASTICSEARCH_HOST
  value: "elasticsearch.elasticsearch.svc.cluster.local"
- name: ELASTICSEARCH_PORT
  value: "9200"
- name: ELASTICSEARCH_TLS
  value: "Off"  # Set to "On" for HTTPS
- name: ELASTICSEARCH_TLS_VERIFY
  value: "Off"  # Set to "On" to verify certificates

For detailed FluentBit configuration, see: scripts/fluentbit/elasticsearch/README.md

Step 5: Deploy Test Workflow Application

Deploy a test workflow application to generate events:

cd data-index/scripts/kind

# Deploy test workflow app
./deploy-workflow-app.sh

# Verify deployment
kubectl get pods -n workflows

# Port-forward to test app (local port 8081, so that local port 8080 stays
# free for the Data Index service port-forward used during verification)
kubectl port-forward -n workflows svc/workflow-test-app 8081:8080 &

# Trigger a test workflow
curl -X POST http://localhost:8081/test-workflows/simple-set \
  -H "Content-Type: application/json" \
  -d '{"name": "Test Workflow"}'

Configuration

Data Index Service Properties

Common configuration:

# Elasticsearch connection
quarkus.elasticsearch.hosts=elasticsearch:9200

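# Schema initialization flags
# skip-init-schema=false runs schema initialization at startup (use it for the first startup).
# Keep comments on their own line: a trailing "# ..." in a .properties file becomes part of the value.
data-index.storage.skip-init-schema=false
data-index.elasticsearch.schema.init.enabled=true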

# Index names (optional, defaults provided)
data-index.elasticsearch.workflow-instance-index=workflow-instances
data-index.elasticsearch.task-execution-index=task-executions

TLS/Authentication configuration:

# HTTPS connection
quarkus.elasticsearch.protocol=https
quarkus.elasticsearch.username=elastic
quarkus.elasticsearch.password=changeme

# Optional: Trust store configuration
quarkus.elasticsearch.trust-store=/path/to/truststore.jks
quarkus.elasticsearch.trust-store-password=truststore-password

Environment variables (Kubernetes):

# Set via ConfigMap
QUARKUS_ELASTICSEARCH_HOSTS=elasticsearch.elasticsearch.svc.cluster.local:9200

# Set via Secret (for credentials)
QUARKUS_ELASTICSEARCH_USERNAME=elastic
QUARKUS_ELASTICSEARCH_PASSWORD=<from-secret>

# Control schema initialization (true skips initialization; recommended for production)
DATA_INDEX_STORAGE_SKIP_INIT_SCHEMA=true
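
For the credentials, one option is to generate a Secret manifest and pipe it to kubectl. The sketch below is an assumption about how this could be wired up, not the project's shipped manifest: the Secret name data-index-es-credentials and the function name are placeholders.

```shell
# render_es_secret USER PASSWORD: print a Secret manifest on stdout.
# The Secret name data-index-es-credentials is a placeholder.
render_es_secret() {
  # YAML single-quoted scalars escape a single quote by doubling it
  res_user=$(printf '%s' "$1" | sed "s/'/''/g")
  res_pass=$(printf '%s' "$2" | sed "s/'/''/g")
  cat <<EOF
apiVersion: v1
kind: Secret
metadata:
  name: data-index-es-credentials
  namespace: data-index
type: Opaque
stringData:
  QUARKUS_ELASTICSEARCH_USERNAME: '$res_user'
  QUARKUS_ELASTICSEARCH_PASSWORD: '$res_pass'
EOF
}
```

Applied with `render_es_secret elastic "$ES_PASSWORD" | kubectl apply -f -`, the keys can then be exposed to the container through an envFrom entry with a secretRef to that Secret, which keeps the password out of the Deployment manifest.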

FluentBit Configuration

Key configuration sections in fluent-bit.conf:

Input (container logs):

[INPUT]
    Name              tail
    Path              /var/log/containers/*_${WORKFLOW_NAMESPACE}_*.log
    Parser            cri
    Tag               kube.*
    Refresh_Interval  5
    Mem_Buf_Limit     5MB

Filter (extract workflow events):

[FILTER]
    Name              grep
    Match             kube.*
    Regex             eventType ^io\.serverlessworkflow\.

[FILTER]
    Name          rewrite_tag
    Match         kube.*
    Rule          $eventType ^io\.serverlessworkflow\.workflow\. workflow.instance false
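
The effect of the two filters can be illustrated by applying the same regular expressions to sample eventType values. This is a sketch of the routing logic only, written with grep -E rather than FluentBit's regex engine (the patterns shown behave the same in both); the function name and the sample values other than workflow.started are illustrative.

```shell
# route_event_type TYPE: print the tag a record with this eventType carries
# after the two filters above, or "dropped" if the grep filter discards it.
route_event_type() {
  if ! printf '%s\n' "$1" | grep -Eq '^io\.serverlessworkflow\.'; then
    echo "dropped"            # grep filter keeps only matching records
  elif printf '%s\n' "$1" | grep -Eq '^io\.serverlessworkflow\.workflow\.'; then
    echo "workflow.instance"  # rewrite_tag re-emits the record under this tag
  else
    echo "kube.*"             # the rule shown here does not match; tag unchanged
  fi
}

route_event_type io.serverlessworkflow.workflow.started   # prints workflow.instance
route_event_type com.example.other                        # prints dropped
```

The trailing false in the rewrite_tag rule means the original kube.* copy of a matching record is not kept, so each workflow event reaches the output once.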

Output (Elasticsearch):

[OUTPUT]
    Name            es
    Match           workflow.instance
    Host            ${ELASTICSEARCH_HOST}
    Port            ${ELASTICSEARCH_PORT}
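    Index           workflow-instance-events-raw
    # Elasticsearch 8.x removed mapping types, so the _type field must be suppressed
    Suppress_Type_Name On
    Logstash_Format On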
    Logstash_Prefix workflow-instance-events-raw
    Logstash_DateFormat %Y.%m.%d
    Retry_Limit     5
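
With Logstash_Format On, the output writes to one index per day, named from Logstash_Prefix and Logstash_DateFormat. The sketch below shows the resulting name for a given date; the function name is illustrative, and it assumes the prefix and date are joined with a hyphen, which is what the workflow-instance-events-raw-* patterns used in this guide rely on.

```shell
# raw_index_for_date YYYY-MM-DD: print the daily raw index name that
# Logstash_Prefix plus Logstash_DateFormat %Y.%m.%d yields for that date.
raw_index_for_date() {
  printf 'workflow-instance-events-raw-%s\n' "$(printf '%s' "$1" | tr '-' '.')"
}

raw_index_for_date 2024-05-01   # prints workflow-instance-events-raw-2024.05.01
```

This is why the verification and ILM commands address the raw data through a wildcard rather than a single index name.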

For full configuration details, see: scripts/fluentbit/elasticsearch/README.md

Verification

Verify FluentBit Event Capture

# Check FluentBit logs
kubectl logs -n logging -l app=workflows-fluent-bit-mode2 | grep "workflow.started"

# Should see events like:
# [workflow.instance] {"instanceId":"123","eventType":"io.serverlessworkflow.workflow.started",...}

Verify Raw Events in Elasticsearch

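# Port-forward Elasticsearch (skip if the port-forward from an earlier step is still running)
kubectl port-forward -n elasticsearch svc/elasticsearch 9200:9200 &

# Count raw workflow events (URL quoted so the shell does not try to expand the *)
curl -s "http://localhost:9200/workflow-instance-events-raw-*/_count" | jq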

# View recent events
curl -s "http://localhost:9200/workflow-instance-events-raw-*/_search?size=5&sort=@timestamp:desc" | jq '.hits.hits[]._source'

Verify Transform Processing

# Check transform status
curl -s http://localhost:9200/_transform/workflow-instances-transform/_stats | jq '.transforms[0].state'

# Expected: "started" or "indexing"

# Check documents processed
curl -s http://localhost:9200/_transform/workflow-instances-transform/_stats | jq '.transforms[0].stats'
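
The state value can be mapped to a next step when scripting this check. The sketch below covers the transform states Elasticsearch reports; the function name and the action labels are illustrative choices.

```shell
# transform_action STATE: print a suggested next step for a transform state.
transform_action() {
  case "$1" in
    started|indexing)  echo "ok" ;;           # running normally
    stopped)           echo "start" ;;        # call the _start endpoint
    stopping|aborting) echo "wait" ;;         # transitional; check again shortly
    failed)            echo "investigate" ;;  # inspect the failure reason in _stats
    *)                 echo "unknown" ;;
  esac
}
```

For example:

    transform_action "$(curl -s http://localhost:9200/_transform/workflow-instances-transform/_stats | jq -r '.transforms[0].state')"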

Verify Normalized Indices

# Count normalized workflow instances
curl -s http://localhost:9200/workflow-instances/_count | jq

# View workflow instances
curl -s "http://localhost:9200/workflow-instances/_search?size=5&sort=start:desc" | jq '.hits.hits[]._source'

Test GraphQL API

# Port-forward Data Index service
kubectl port-forward -n data-index svc/data-index-service 8080:8080 &

# Test GraphQL introspection
curl http://localhost:8080/graphql \
  -H "Content-Type: application/json" \
  -d '{"query":"{ __schema { queryType { name } } }"}'

# Query workflow instances
curl http://localhost:8080/graphql \
  -H "Content-Type: application/json" \
  -d '{"query":"{ getWorkflowInstances { id name status start } }"}'

# Access GraphQL UI (open is the macOS command; use xdg-open on Linux)
open http://localhost:8080/q/graphql-ui

Monitoring

FluentBit Metrics

FluentBit exposes Prometheus metrics:

# Port-forward to FluentBit pod
kubectl port-forward -n logging <pod-name> 2020:2020 &

# View metrics
curl http://localhost:2020/api/v1/metrics/prometheus

Key metrics:

  • fluentbit_input_records_total - Records read from logs

  • fluentbit_output_proc_records_total - Records sent to Elasticsearch

  • fluentbit_output_errors_total - Elasticsearch errors

  • fluentbit_output_retries_total - Retry attempts
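
Counters from the metrics endpoint can be totalled across plugin instances with a short awk filter. This is a sketch that assumes the Prometheus text format with label values that contain no spaces; the function name is illustrative.

```shell
# sum_metric NAME: add up all samples of one metric read from stdin in
# Prometheus text format (labels are ignored).
sum_metric() {
  awk -v name="$1" '
    /^#/ { next }                     # skip HELP and TYPE comment lines
    {
      metric = $1
      sub(/\{.*/, "", metric)         # strip the {label="..."} part
      if (metric == name) total += $2 # the sample value is the second field
    }
    END { printf "%d\n", total }
  '
}
```

For example, with the port-forward above in place:

    curl -s http://localhost:2020/api/v1/metrics/prometheus | sum_metric fluentbit_output_errors_total

A value that keeps growing between two calls indicates that writes to Elasticsearch are failing.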

Elasticsearch Monitoring

# Check index sizes
curl -s "http://localhost:9200/_cat/indices/workflow-*?v&s=index"

# Check transform stats
curl -s http://localhost:9200/_transform/_stats | jq

# Check ILM policy status (URL quoted so the shell does not try to expand the *)
curl -s "http://localhost:9200/workflow-instance-events-raw-*/_ilm/explain" | jq

Data Index Service Metrics

# Port-forward Data Index service
kubectl port-forward -n data-index svc/data-index-service 8080:8080 &

# Health check
curl http://localhost:8080/q/health

# Metrics (Prometheus format)
curl http://localhost:8080/q/metrics

Troubleshooting

No Events in Elasticsearch

Check 1: FluentBit is running

kubectl get pods -n logging -l app=workflows-fluent-bit-mode2

Check 2: FluentBit can read container logs

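# The glob must be expanded inside the container, so run it through a shell
# (this needs an image that ships one, for example a -debug FluentBit image)
kubectl exec -n logging <pod-name> -- sh -c 'ls -la /var/log/containers/*_workflows_*.log'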

Check 3: FluentBit is parsing events

kubectl logs -n logging <pod-name> | grep "eventType"

Check 4: Elasticsearch connectivity

kubectl logs -n logging <pod-name> | grep -i "elasticsearch"
kubectl logs -n logging <pod-name> | grep -i "connection refused"

Raw Events Exist But Normalized Indices Empty

Check 1: Transform is started

curl -s http://localhost:9200/_transform/workflow-instances-transform/_stats | jq '.transforms[0].state'

# If stopped, start it:
curl -X POST http://localhost:9200/_transform/workflow-instances-transform/_start

Check 2: Transform errors

curl -s http://localhost:9200/_transform/workflow-instances-transform/_stats | jq '.transforms[0].stats'

# Look for index_failures or search_failures

Check 3: Normalized indices exist

curl -s "http://localhost:9200/_cat/indices/workflow-instances?v"

# If missing, run schema initialization

GraphQL Query Returns Empty Results

Check 1: Events reached Elasticsearch

curl -s "http://localhost:9200/workflow-instance-events-raw-*/_count" | jq

Check 2: Transform processed events

curl -s http://localhost:9200/workflow-instances/_count | jq

Check 3: Data Index can connect to Elasticsearch

kubectl logs -n data-index -l app=data-index-service | grep -i elasticsearch

High Elasticsearch Resource Usage

Check 1: ILM policy is active

curl -s http://localhost:9200/_ilm/status | jq
curl -s "http://localhost:9200/workflow-instance-events-raw-*/_ilm/explain" | jq

Check 2: Old indices being deleted

curl -s "http://localhost:9200/_cat/indices/workflow-*?v&s=index"

# Look for indices older than 7 days (should be deleted automatically)
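
Spotting overdue indices by eye gets tedious, so the date suffix can be compared against a cutoff. The sketch below relies on the YYYY.MM.DD suffix sorting the same way as the dates themselves; the function name is illustrative.

```shell
# indices_older_than CUTOFF: read index names on stdin and print those whose
# trailing YYYY.MM.DD date is earlier than CUTOFF (given in the same format).
indices_older_than() {
  awk -v cutoff="$1" '
    match($0, /[0-9][0-9][0-9][0-9]\.[0-9][0-9]\.[0-9][0-9]$/) {
      # Appending "" forces a string comparison on both sides
      if ((substr($0, RSTART) "") < (cutoff "")) print
    }
  '
}
```

With GNU date, the indices past the 7-day retention could be listed as:

    curl -s "http://localhost:9200/_cat/indices/workflow-*?h=index" | indices_older_than "$(date -u -d '7 days ago' +%Y.%m.%d)"

Any name printed here is an index that ILM should already have deleted.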

Check 3: Transform frequency

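The transform in this deployment runs every 1 second by default. Use a longer interval for lower resource usage:

# Update the transform frequency. The stop/start is optional: without it,
# Elasticsearch applies the new value when the next checkpoint starts.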
curl -X POST http://localhost:9200/_transform/workflow-instances-transform/_stop
curl -X POST http://localhost:9200/_transform/workflow-instances-transform/_update \
  -H "Content-Type: application/json" \
  -d '{"frequency":"10s"}'
curl -X POST http://localhost:9200/_transform/workflow-instances-transform/_start

Production Recommendations

Schema Management

  • Disable auto-initialization: Set data-index.storage.skip-init-schema=true

  • Manage externally: Use GitOps (ArgoCD, Flux) or Kubernetes operators

  • Version control: Store schema JSON files in source control

  • Apply via CI/CD: Run schema updates in deployment pipeline

Security

  • Enable TLS: Use HTTPS for Elasticsearch connections

  • Use authentication: Configure username/password or API keys

  • Store credentials in Secrets: Never hardcode credentials

  • Network policies: Restrict traffic between namespaces

  • RBAC: Limit FluentBit permissions to necessary APIs

High Availability

  • Elasticsearch cluster: 3+ nodes (multi-AZ deployment)

  • Data Index replicas: Run 2+ instances behind load balancer

  • FluentBit: DaemonSet runs on every node automatically

  • Persistent volumes: Use dynamic provisioning with backup

Monitoring & Alerting

  • Prometheus scraping: Enable for FluentBit and Data Index

  • Alert on:

    • FluentBit output errors

    • Transform failures

    • Elasticsearch cluster health

    • Data Index service availability

  • Dashboards: Create Grafana dashboards for metrics

  • Log aggregation: Ship logs to centralized logging system

Performance Tuning

  • Elasticsearch shards: Adjust based on data volume (default: 3 per index)

  • Transform frequency: Balance latency vs resource usage (default: 1s)

  • ILM retention: Adjust raw event retention period (default: 7 days)

  • FluentBit buffer: Increase for high event volume

  • Data Index resources: Scale based on query load

Comparison with PostgreSQL Mode

Aspect                PostgreSQL Mode (MODE 1)      Elasticsearch Mode (MODE 2)
--------------------  ----------------------------  ------------------------------
Normalization         PostgreSQL triggers (< 1ms)   ES Transforms (~1s frequency)
Latency               < 1ms (real-time)             ~1s (near real-time)
Search                Basic SQL queries             Full-text search, aggregations
Scaling               Vertical (larger instance)    Horizontal (add nodes)
Throughput            Up to 50K workflows/day       100K+ workflows/day
Complexity            Simple (triggers, backups)    Moderate (transforms, ILM)
Lifecycle Management  Manual archival               Automatic (ILM policies)
GraphQL API           Identical                     Identical

Choose PostgreSQL mode when:

  • Standard SQL queries are sufficient

  • Lower event volume (< 50K workflows/day)

  • Existing PostgreSQL infrastructure

  • Simpler operations preferred

Choose Elasticsearch mode when:

  • Full-text search needed

  • High event volume (> 50K workflows/day)

  • Complex aggregations required

  • Existing Elasticsearch infrastructure

  • Auto-scaling storage desired
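
The two lists can be condensed into a first-pass rule of thumb. The sketch below encodes only the volume threshold and the full-text search criterion from this guide; the function name is illustrative, and the other criteria (existing infrastructure, operational preference) still need a human decision.

```shell
# recommend_mode WORKFLOWS_PER_DAY NEEDS_FULLTEXT
# NEEDS_FULLTEXT is "yes" or "no". Prints the suggested storage mode.
recommend_mode() {
  if [ "$2" = "yes" ] || [ "$1" -gt 50000 ]; then
    echo "elasticsearch"
  else
    echo "postgresql"
  fi
}

recommend_mode 20000 no     # prints postgresql
recommend_mode 120000 no    # prints elasticsearch
recommend_mode 20000 yes    # prints elasticsearch
```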

Next Steps