Elasticsearch Deployment
Status: Production Ready. The Elasticsearch storage backend is fully implemented and production ready. Use this mode for high-throughput deployments with full-text search capabilities.
Overview
Elasticsearch mode (MODE 2) uses ES Transform-based normalization for continuous aggregation of workflow events. FluentBit captures events from container logs and writes to Elasticsearch raw indices, then ES Transforms aggregate them into normalized indices for GraphQL queries.
Architecture:
Quarkus Flow Apps → Container Logs → FluentBit DaemonSet
↓
Elasticsearch (raw indices)
↓ (ES Transform ~1s)
Elasticsearch (normalized indices)
↓
Data Index GraphQL API
Characteristics:
- Latency: ~1s normalization, sub-second query performance
- Throughput: 100K+ workflows/day
- Search: Full-text search, complex aggregations, analytics
- Scaling: Horizontal (add Elasticsearch nodes)
- Lifecycle: Automatic ILM policies for raw event retention
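As a rough sizing aid, the daily throughput figure above can be turned into a sustained event rate. The sketch below is plain arithmetic; the function name and the events-per-workflow value are assumptions for illustration, since the number of events a workflow emits depends on its definition.

```shell
# Convert workflows/day into an average events/second rate.
# $1 = workflows per day
# $2 = events emitted per workflow (assumed value, not from this guide)
events_per_second() {
  awk -v w="$1" -v e="$2" 'BEGIN { printf "%.2f\n", (w * e) / 86400 }'
}

# Example: 100K workflows/day with an assumed 5 events per workflow
# events_per_second 100000 5
```

The result is an average; bursty workloads will peak well above it, so treat the number as a lower bound when sizing FluentBit buffers and Elasticsearch nodes.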
When to use:
- Need full-text search capabilities
- High event volume (> 50K workflows/day)
- Complex aggregations or analytics required
- Existing Elasticsearch infrastructure
- Multi-tenancy requirements
See Elasticsearch Mode Architecture for detailed design information.
Prerequisites
Before deploying Elasticsearch mode, ensure you have:
- Kubernetes cluster (1.21+)
  - kubectl configured and connected
  - Sufficient resources (3+ nodes recommended)
- Elasticsearch cluster (8.11+)
  - Running and accessible from Kubernetes
  - HTTP API enabled (port 9200)
  - Optional: TLS/authentication configured
- Container images built
  - Data Index service: kubesmarts/data-index-service-elasticsearch:999-SNAPSHOT
  - FluentBit: fluent/fluent-bit:latest
- FluentBit RBAC permissions
  - Service account with pod metadata read access
  - ClusterRole for Kubernetes API access
Local Development
Quick Start with Dev Services
Quarkus Dev Services automatically starts Elasticsearch 8.11.1 in Docker for local development:
# Navigate to the Elasticsearch service module
cd data-index/data-index-service/data-index-service-elasticsearch
# Start in development mode (Dev Services auto-starts Elasticsearch)
mvn quarkus:dev
# What happens automatically:
# 1. Elasticsearch 8.11.1 container starts
# 2. Schema initializer creates ILM policies, index templates, transforms
# 3. GraphQL API available at http://localhost:8080/graphql
# 4. GraphQL UI available at http://localhost:8080/q/graphql-ui
Dev Services features:
- Auto-starts docker.elastic.co/elasticsearch/elasticsearch:8.11.1
- Exposes Elasticsearch on the port set by devservices.port (9200 in the configuration below); if that property is removed, a random port is used (check logs for URL)
- Schema initialization runs automatically
- Live coding enabled (code changes trigger reload)
- Container stops when dev mode exits
Configuration Files
Application properties (dev mode):
data-index-service-elasticsearch/src/main/resources/application.properties
# Elasticsearch Dev Services (enabled in dev mode)
%dev.quarkus.elasticsearch.devservices.enabled=true
%dev.quarkus.elasticsearch.devservices.image-name=docker.elastic.co/elasticsearch/elasticsearch:8.11.1
%dev.quarkus.elasticsearch.devservices.port=9200
# Schema initialization (enabled in dev, disabled in production)
%dev.data-index.storage.skip-init-schema=false
%dev.data-index.elasticsearch.schema.init.enabled=true
%prod.data-index.storage.skip-init-schema=true
# Logging
quarkus.log.category."org.kubesmarts.logic.dataindex".level=INFO
%dev.quarkus.log.category."org.kubesmarts.logic.dataindex.storage.elasticsearch".level=DEBUG
Verify Schema Initialization
When the service starts, you should see log messages like:
INFO [org.kub...ElasticsearchSchemaInitializer] Initializing Elasticsearch schema...
INFO [org.kub...ElasticsearchSchemaInitializer] Applying ILM policy 'data-index-events-retention'...
INFO [org.kub...ElasticsearchSchemaInitializer] ILM policy 'data-index-events-retention' applied successfully
INFO [org.kub...ElasticsearchSchemaInitializer] Applying index template 'workflow-events'...
INFO [org.kub...ElasticsearchSchemaInitializer] Index template 'workflow-events' applied successfully
INFO [org.kub...ElasticsearchSchemaInitializer] Applying transform 'workflow-instances-transform'...
INFO [org.kub...ElasticsearchSchemaInitializer] Transform 'workflow-instances-transform' applied successfully
INFO [org.kub...ElasticsearchSchemaInitializer] Elasticsearch schema initialization complete
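To check for the final message without reading the whole log, a small helper can scan captured output. This is a minimal sketch; the function name and the `dev.log` file in the usage note are hypothetical.

```shell
# Return 0 if the log text on stdin contains the completion message
# shown in the sample output above, non-zero otherwise.
schema_init_complete() {
  grep -q "Elasticsearch schema initialization complete"
}
```

For example, after capturing output with `mvn quarkus:dev 2>&1 | tee dev.log`, running `schema_init_complete < dev.log && echo "schema ready"` in a second terminal reports whether initialization finished.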
Kubernetes Deployment
Step 1: Deploy Elasticsearch Cluster
Option A: ECK Operator (Recommended)
# Install ECK operator
kubectl create -f https://download.elastic.co/downloads/eck/2.10.0/crds.yaml
kubectl apply -f https://download.elastic.co/downloads/eck/2.10.0/operator.yaml
# Create the namespace referenced by the manifest below
kubectl create namespace elasticsearch
# Deploy Elasticsearch cluster
kubectl apply -f - <<EOF
apiVersion: elasticsearch.k8s.elastic.co/v1
kind: Elasticsearch
metadata:
  name: data-index-es
  namespace: elasticsearch
spec:
  version: 8.11.1
  nodeSets:
  - name: default
    count: 3
    config:
      node.store.allow_mmap: false
    volumeClaimTemplates:
    - metadata:
        name: elasticsearch-data
      spec:
        accessModes:
        - ReadWriteOnce
        resources:
          requests:
            storage: 10Gi
EOF
# Wait for cluster to be ready
kubectl wait --namespace elasticsearch \
--for=condition=ready elasticsearch/data-index-es \
--timeout=600s
Option B: Helm Chart
# Add Elastic Helm repository
helm repo add elastic https://helm.elastic.co
helm repo update
# Install Elasticsearch
# (check that the pinned chart version is published first:
#  helm search repo elastic/elasticsearch --versions)
helm install elasticsearch elastic/elasticsearch \
--namespace elasticsearch \
--create-namespace \
--set replicas=3 \
--set minimumMasterNodes=2 \
--set resources.requests.memory=2Gi \
--set volumeClaimTemplate.resources.requests.storage=10Gi \
--version 8.11.1
# Wait for pods to be ready
kubectl wait --namespace elasticsearch \
--for=condition=ready pod \
--selector=app=elasticsearch-master \
--timeout=600s
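Verify Elasticsearch:
# Port-forward Elasticsearch service
# (adjust the Service name to your install option: ECK names it <cluster-name>-es-http,
#  i.e. data-index-es-es-http here; the Helm chart default is elasticsearch-master)
kubectl port-forward -n elasticsearch svc/elasticsearch 9200:9200 &
# Check cluster health
curl -s http://localhost:9200/_cluster/health | jq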
# Expected output:
# {
# "cluster_name": "elasticsearch",
# "status": "green",
# "number_of_nodes": 3,
# ...
# }
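When scripting the deployment, it can help to block until the cluster reports green rather than checking by hand. The following is a sketch under assumptions: the function names are illustrative, the base URL assumes the port-forward above, and the `sed` parser is deliberately small (`jq -r .status` is the sturdier choice when jq is available).

```shell
# Extract the "status" field from a _cluster/health JSON document on stdin.
health_status() {
  sed -n 's/.*"status"[[:space:]]*:[[:space:]]*"\([a-z]*\)".*/\1/p' | head -n 1
}

# Poll until the cluster reports green, or give up after a number of attempts.
# $1 = base URL (defaults to the port-forwarded http://localhost:9200)
# $2 = maximum attempts, 2 seconds apart (defaults to 30)
wait_for_green() {
  url="${1:-http://localhost:9200}"
  attempts="${2:-30}"
  i=0
  while [ "$i" -lt "$attempts" ]; do
    status="$(curl -s "$url/_cluster/health" | health_status)"
    [ "$status" = "green" ] && return 0
    i=$((i + 1))
    sleep 2
  done
  return 1
}
```

Calling `wait_for_green || echo "cluster not green yet"` before Step 2 avoids deploying the Data Index service against a cluster that is still forming.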
Step 2: Deploy Data Index Service
Build and push container image:
cd data-index/data-index-service/data-index-service-elasticsearch
# Build container image
mvn clean package \
-Dquarkus.container-image.build=true \
-Dquarkus.container-image.push=true \
-DskipTests
# For KIND: Load image to cluster
kind load docker-image \
kubesmarts/data-index-service-elasticsearch:999-SNAPSHOT \
--name data-index-test
Create namespace and ConfigMap:
# Create namespace
kubectl create namespace data-index
# Create ConfigMap with Elasticsearch connection
kubectl create configmap data-index-config \
--namespace data-index \
--from-literal=QUARKUS_ELASTICSEARCH_HOSTS=elasticsearch.elasticsearch.svc.cluster.local:9200 \
--from-literal=DATA_INDEX_STORAGE_SKIP_INIT_SCHEMA=true
Deploy service:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: data-index-service
  namespace: data-index
  labels:
    app: data-index-service
    mode: elasticsearch
spec:
  replicas: 1
  selector:
    matchLabels:
      app: data-index-service
  template:
    metadata:
      labels:
        app: data-index-service
        mode: elasticsearch
    spec:
      containers:
      - name: data-index-service
        image: kubesmarts/data-index-service-elasticsearch:999-SNAPSHOT
        imagePullPolicy: Never # For KIND; use IfNotPresent for production
        ports:
        - containerPort: 8080
          name: http
          protocol: TCP
        env:
        - name: QUARKUS_ELASTICSEARCH_HOSTS
          valueFrom:
            configMapKeyRef:
              name: data-index-config
              key: QUARKUS_ELASTICSEARCH_HOSTS
        - name: DATA_INDEX_STORAGE_SKIP_INIT_SCHEMA
          valueFrom:
            configMapKeyRef:
              name: data-index-config
              key: DATA_INDEX_STORAGE_SKIP_INIT_SCHEMA
        - name: QUARKUS_HTTP_PORT
          value: "8080"
        - name: QUARKUS_LOG_LEVEL
          value: "INFO"
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /q/health/live
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /q/health/ready
            port: 8080
          initialDelaySeconds: 20
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: data-index-service
  namespace: data-index
  labels:
    app: data-index-service
spec:
  type: NodePort
  selector:
    app: data-index-service
  ports:
  - port: 8080
    targetPort: 8080
    nodePort: 30080 # For KIND; omit for production
    protocol: TCP
    name: http
Apply deployment:
kubectl apply -f data-index-deployment.yaml
# Wait for deployment to be ready
kubectl wait --namespace data-index \
--for=condition=available deployment/data-index-service \
--timeout=300s
Step 3: Initialize Schema
Option A: Automatic (First Startup)
If you deploy with schema initialization enabled, the service will create all resources on first startup:
# Enable schema initialization in ConfigMap
kubectl create configmap data-index-config \
--namespace data-index \
--from-literal=DATA_INDEX_STORAGE_SKIP_INIT_SCHEMA=false \
--dry-run=client -o yaml | kubectl apply -f -
# Restart deployment to trigger initialization
kubectl rollout restart deployment/data-index-service -n data-index
# Watch logs to verify schema creation
kubectl logs -n data-index -l app=data-index-service -f
Option B: Manual (Production Recommended)
For production, manage schema externally (GitOps, operators, etc.):
# Run schema initialization from local dev environment
cd data-index/data-index-service/data-index-service-elasticsearch
# Port-forward to Elasticsearch
kubectl port-forward -n elasticsearch svc/elasticsearch 9200:9200 &
# Run service locally to initialize schema
QUARKUS_ELASTICSEARCH_HOSTS=localhost:9200 \
DATA_INDEX_STORAGE_SKIP_INIT_SCHEMA=false \
mvn quarkus:dev
# Schema is created, then stop the service (Ctrl+C)
# Deploy to Kubernetes with skip-init-schema=true
Verify schema resources:
# Check ILM policy
curl -s http://localhost:9200/_ilm/policy/data-index-events-retention | jq
# Check index templates
curl -s http://localhost:9200/_index_template/workflow-events | jq
curl -s http://localhost:9200/_index_template/workflow-instances | jq
# Check transforms
curl -s http://localhost:9200/_transform/workflow-instances-transform | jq
# Start transforms (if not auto-started)
curl -X POST http://localhost:9200/_transform/workflow-instances-transform/_start
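The individual checks above can be folded into one loop that reports an HTTP status per resource. This is a sketch, assuming the port-forward from Option B is still active; the function names are illustrative and the resource list simply repeats the names used in this step.

```shell
# Elasticsearch API paths for the schema resources listed above.
schema_resource_paths() {
  cat <<'EOF'
_ilm/policy/data-index-events-retention
_index_template/workflow-events
_index_template/workflow-instances
_transform/workflow-instances-transform
EOF
}

# Print "<http-status> <path>" for each resource; 200 means it exists,
# 404 means schema initialization has not created it.
# $1 = base URL (defaults to the port-forwarded http://localhost:9200)
check_schema() {
  url="${1:-http://localhost:9200}"
  schema_resource_paths | while read -r path; do
    code="$(curl -s -o /dev/null -w '%{http_code}' "$url/$path")"
    echo "$code $path"
  done
}
```

Running `check_schema` prints four lines; any line that does not start with 200 points at the resource to investigate.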
Step 4: Deploy FluentBit DaemonSet
FluentBit captures workflow events from container logs and sends them to Elasticsearch.
Create FluentBit namespace and RBAC:
kubectl create namespace logging
# Create service account and RBAC
kubectl apply -f - <<EOF
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluent-bit
  namespace: logging
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: fluent-bit
rules:
- apiGroups: [""]
  resources:
  - namespaces
  - pods
  - nodes
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: fluent-bit
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: fluent-bit
subjects:
- kind: ServiceAccount
  name: fluent-bit
  namespace: logging
EOF
Deploy FluentBit using provided scripts:
cd data-index/scripts/fluentbit/elasticsearch
# Option 1: Quick deploy (uses existing ConfigMap)
./deploy.sh
# Option 2: Regenerate ConfigMap from source files
./deploy.sh regenerate
# Verify deployment
kubectl get pods -n logging -l app=workflows-fluent-bit-mode2
kubectl logs -n logging -l app=workflows-fluent-bit-mode2 --tail=50
Manual deployment (alternative):
# Generate ConfigMap
./generate-configmap.sh
# Apply ConfigMap and DaemonSet
kubectl apply -f kubernetes/configmap.yaml
kubectl apply -f kubernetes/daemonset.yaml
# Wait for DaemonSet to be ready
kubectl rollout status daemonset/workflows-fluent-bit-mode2 -n logging
Environment variables (configure in DaemonSet):
env:
- name: WORKFLOW_NAMESPACE
  value: "workflows" # Namespace where workflow apps run
- name: ELASTICSEARCH_HOST
  value: "elasticsearch.elasticsearch.svc.cluster.local"
- name: ELASTICSEARCH_PORT
  value: "9200"
- name: ELASTICSEARCH_TLS
  value: "Off" # Set to "On" for HTTPS
- name: ELASTICSEARCH_TLS_VERIFY
  value: "Off" # Set to "On" to verify certificates
For detailed FluentBit configuration, see: scripts/fluentbit/elasticsearch/README.md
Step 5: Deploy Test Workflow Application
Deploy a test workflow application to generate events:
cd data-index/scripts/kind
# Deploy test workflow app
./deploy-workflow-app.sh
# Verify deployment
kubectl get pods -n workflows
# Port-forward to test app
kubectl port-forward -n workflows svc/workflow-test-app 8080:8080 &
# Trigger a test workflow
curl -X POST http://localhost:8080/test-workflows/simple-set \
-H "Content-Type: application/json" \
-d '{"name": "Test Workflow"}'
Configuration
Data Index Service Properties
Common configuration:
# Elasticsearch connection
quarkus.elasticsearch.hosts=elasticsearch:9200
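# Schema initialization flags
# skip-init-schema=false lets the service create the schema on first startup.
# (Keep comments on their own line: .properties files do not support trailing comments.)
data-index.storage.skip-init-schema=false
data-index.elasticsearch.schema.init.enabled=true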
# Index names (optional, defaults provided)
data-index.elasticsearch.workflow-instance-index=workflow-instances
data-index.elasticsearch.task-execution-index=task-executions
TLS/Authentication configuration:
# HTTPS connection
quarkus.elasticsearch.protocol=https
quarkus.elasticsearch.username=elastic
quarkus.elasticsearch.password=changeme
# Optional: Trust store configuration
quarkus.elasticsearch.trust-store=/path/to/truststore.jks
quarkus.elasticsearch.trust-store-password=truststore-password
Environment variables (Kubernetes):
# Set via ConfigMap
QUARKUS_ELASTICSEARCH_HOSTS=elasticsearch.elasticsearch.svc.cluster.local:9200
# Set via Secret (for credentials)
QUARKUS_ELASTICSEARCH_USERNAME=elastic
QUARKUS_ELASTICSEARCH_PASSWORD=<from-secret>
# Control schema initialization (true skips it; recommended in production)
DATA_INDEX_STORAGE_SKIP_INIT_SCHEMA=true
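For the credentials that come from a Secret, one possible approach is to render a Secret manifest and pipe it to kubectl. This is a sketch under assumptions: the Secret name `data-index-es-credentials` and the function name are hypothetical, and the simple quoting only suits passwords without double quotes or backslashes.

```shell
# Print a Secret manifest holding the Elasticsearch credentials.
# The Secret name below is a hypothetical choice, not defined by this guide.
# $1 = username, $2 = password
render_es_secret() {
  cat <<EOF
apiVersion: v1
kind: Secret
metadata:
  name: data-index-es-credentials
  namespace: data-index
type: Opaque
stringData:
  QUARKUS_ELASTICSEARCH_USERNAME: "$1"
  QUARKUS_ELASTICSEARCH_PASSWORD: "$2"
EOF
}
```

A typical use would be `render_es_secret elastic "$ES_PASSWORD" | kubectl apply -f -`, followed by an `envFrom` entry with a `secretRef` to that Secret in the Deployment's container spec so both variables reach the service.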
FluentBit Configuration
Key configuration sections in fluent-bit.conf:
Input (container logs):
[INPUT]
    Name tail
    Path /var/log/containers/*_${WORKFLOW_NAMESPACE}_*.log
    Parser cri
    Tag kube.*
    Refresh_Interval 5
    Mem_Buf_Limit 5MB
Filter (extract workflow events):
[FILTER]
    Name grep
    Match kube.*
    Regex eventType ^io\.serverlessworkflow\.
[FILTER]
    Name rewrite_tag
    Match kube.*
    Rule $eventType ^io\.serverlessworkflow\.workflow\. workflow.instance false
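To see which event types these two filters let through and how they are tagged, the regular expressions can be tried locally. The sketch below mirrors the patterns with `grep -E`; the function names are illustrative, and FluentBit itself uses a different regex engine, although these simple patterns should behave the same way.

```shell
# Mirror of the grep filter: keep only serverlessworkflow event types.
# $1 = eventType value
is_workflow_event() {
  printf '%s\n' "$1" | grep -Eq '^io\.serverlessworkflow\.'
}

# Mirror of the rewrite_tag rule: workflow.* events are re-tagged as
# workflow.instance; anything else keeps the tag passed as $2.
# $1 = eventType value, $2 = current tag
route_tag() {
  if printf '%s\n' "$1" | grep -Eq '^io\.serverlessworkflow\.workflow\.'; then
    echo "workflow.instance"
  else
    echo "$2"
  fi
}
```

For instance, `route_tag io.serverlessworkflow.workflow.started kube.app` yields `workflow.instance`, while a task-level event type keeps its original `kube.*` tag and is therefore not matched by the `workflow.instance` output.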
Output (Elasticsearch):
[OUTPUT]
    Name es
    Match workflow.instance
    Host ${ELASTICSEARCH_HOST}
    Port ${ELASTICSEARCH_PORT}
    Index workflow-instance-events-raw
    Logstash_Format On
    Logstash_Prefix workflow-instance-events-raw
    Logstash_DateFormat %Y.%m.%d
    Retry_Limit 5
For full configuration details, see: scripts/fluentbit/elasticsearch/README.md
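With `Logstash_Format On`, events land in one index per day, named from the prefix and the date format in the output section. A small helper can compute the expected index name for a given day, which is handy when querying a single day's raw events. This is a sketch; the function name is illustrative and it assumes the date is passed as YYYY-MM-DD in UTC.

```shell
# Build the daily raw index name for a date given as YYYY-MM-DD.
# Follows Logstash_Prefix + "-" + Logstash_DateFormat (%Y.%m.%d).
raw_index_for_date() {
  printf 'workflow-instance-events-raw-%s\n' "$(printf '%s' "$1" | sed 's/-/./g')"
}

# Example: today's index
# raw_index_for_date "$(date -u +%Y-%m-%d)"
```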
Verification
Verify FluentBit Event Capture
# Check FluentBit logs
kubectl logs -n logging -l app=workflows-fluent-bit-mode2 | grep "workflow.started"
# Should see events like:
# [workflow.instance] {"instanceId":"123","eventType":"io.serverlessworkflow.workflow.started",...}
Verify Raw Events in Elasticsearch
# Port-forward Elasticsearch
kubectl port-forward -n elasticsearch svc/elasticsearch 9200:9200 &
# Count raw workflow events (URL quoted so the shell does not expand the *)
curl -s "http://localhost:9200/workflow-instance-events-raw-*/_count" | jq
# View recent events
curl -s "http://localhost:9200/workflow-instance-events-raw-*/_search?size=5&sort=@timestamp:desc" | jq '.hits.hits[]._source'
Verify Transform Processing
# Check transform status
curl -s http://localhost:9200/_transform/workflow-instances-transform/_stats | jq '.transforms[0].state'
# Expected: "started" or "indexing"
# Check documents processed
curl -s http://localhost:9200/_transform/workflow-instances-transform/_stats | jq '.transforms[0].stats'
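In a script, the state check above can be reduced to a pass/fail result. The following sketch treats the two expected states as healthy and everything else as a problem; the function name is illustrative.

```shell
# Return 0 when the transform state is one of the expected values
# ("started" or "indexing"), non-zero for anything else.
# $1 = state string as returned by the _stats call above
transform_healthy() {
  case "$1" in
    started|indexing) return 0 ;;
    *) return 1 ;;
  esac
}
```

It combines with the existing command as `transform_healthy "$(curl -s http://localhost:9200/_transform/workflow-instances-transform/_stats | jq -r '.transforms[0].state')" || echo "transform needs attention"`.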
Verify Normalized Indices
# Count normalized workflow instances
curl -s http://localhost:9200/workflow-instances/_count | jq
# View workflow instances
curl -s "http://localhost:9200/workflow-instances/_search?size=5&sort=start:desc" | jq '.hits.hits[]._source'
Test GraphQL API
# Port-forward Data Index service
kubectl port-forward -n data-index svc/data-index-service 8080:8080 &
# Test GraphQL introspection
curl http://localhost:8080/graphql \
-H "Content-Type: application/json" \
-d '{"query":"{ __schema { queryType { name } } }"}'
# Query workflow instances
curl http://localhost:8080/graphql \
-H "Content-Type: application/json" \
-d '{"query":"{ getWorkflowInstances { id name status start } }"}'
# Access GraphQL UI (open is macOS; use xdg-open on Linux)
open http://localhost:8080/q/graphql-ui
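Hand-escaping queries inside `-d '...'` gets error-prone once a query contains double quotes. A small helper can build the JSON payload instead. This is a sketch; the function name is illustrative and it handles single-line queries only.

```shell
# Wrap a single-line GraphQL query in the JSON envelope expected by /graphql.
# Backslashes and double quotes in the query are escaped for JSON.
# $1 = GraphQL query text
graphql_payload() {
  escaped="$(printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g')"
  printf '{"query":"%s"}\n' "$escaped"
}
```

With the port-forward above in place, a query can then be sent as `curl http://localhost:8080/graphql -H "Content-Type: application/json" -d "$(graphql_payload '{ getWorkflowInstances { id name status start } }')"`.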
Monitoring
FluentBit Metrics
FluentBit exposes Prometheus metrics:
# Port-forward to FluentBit pod
kubectl port-forward -n logging <pod-name> 2020:2020 &
# View metrics
curl http://localhost:2020/api/v1/metrics/prometheus
Key metrics:
- fluentbit_input_records_total - Records read from logs
- fluentbit_output_proc_records_total - Records sent to Elasticsearch
- fluentbit_output_errors_total - Elasticsearch errors
- fluentbit_output_retries_total - Retry attempts
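The metrics endpoint returns Prometheus text format, with one sample line per plugin instance. To get a single number per metric, the samples can be summed. This is a sketch; the function name is illustrative and it assumes label values contain no spaces, so that the value is the second whitespace-separated field.

```shell
# Sum every sample of one metric from Prometheus text format on stdin.
# $1 = metric name; label sets such as {name="es.0"} are ignored.
metric_total() {
  awk -v m="$1" '
    /^#/ { next }                       # skip HELP and TYPE comment lines
    {
      name = $1
      sub(/[{].*/, "", name)            # strip the label block, if any
      if (name == m) total += $2        # second field is the sample value
    }
    END { printf "%d\n", total }
  '
}
```

For example, `curl -s http://localhost:2020/api/v1/metrics/prometheus | metric_total fluentbit_output_errors_total` prints the total output error count across all outputs; a value that keeps growing suggests Elasticsearch is rejecting or not receiving records.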
Troubleshooting
No Events in Elasticsearch
Check 1: FluentBit is running
kubectl get pods -n logging -l app=workflows-fluent-bit-mode2
Check 2: FluentBit can read container logs
kubectl exec -n logging <pod-name> -- ls -la /var/log/containers/*_workflows_*.log
Check 3: FluentBit is parsing events
kubectl logs -n logging <pod-name> | grep "eventType"
Check 4: Elasticsearch connectivity
kubectl logs -n logging <pod-name> | grep -i "elasticsearch"
kubectl logs -n logging <pod-name> | grep -i "connection refused"
Raw Events Exist But Normalized Indices Empty
Check 1: Transform is started
curl -s http://localhost:9200/_transform/workflow-instances-transform/_stats | jq '.transforms[0].state'
# If stopped, start it:
curl -X POST http://localhost:9200/_transform/workflow-instances-transform/_start
Check 2: Transform errors
curl -s http://localhost:9200/_transform/workflow-instances-transform/_stats | jq '.transforms[0].stats'
# Look for index_failures or search_failures
Check 3: Normalized indices exist
curl -s "http://localhost:9200/_cat/indices/workflow-instances?v"
# If missing, run schema initialization
GraphQL Query Returns Empty Results
Check 1: Events reached Elasticsearch
curl -s "http://localhost:9200/workflow-instance-events-raw-*/_count" | jq
Check 2: Transform processed events
curl -s http://localhost:9200/workflow-instances/_count | jq
Check 3: Data Index can connect to Elasticsearch
kubectl logs -n data-index -l app=data-index-service | grep -i elasticsearch
High Elasticsearch Resource Usage
Check 1: ILM policy is active
curl -s http://localhost:9200/_ilm/status | jq
curl -s "http://localhost:9200/workflow-instance-events-raw-*/_ilm/explain" | jq
Check 2: Old indices being deleted
curl -s "http://localhost:9200/_cat/indices/workflow-*?v&s=index"
# Look for indices older than 7 days (should be deleted automatically)
Check 3: Transform frequency
Default is 1 second. Increase for lower resource usage:
# Update transform (requires stopping first)
curl -X POST http://localhost:9200/_transform/workflow-instances-transform/_stop
curl -X POST http://localhost:9200/_transform/workflow-instances-transform/_update \
-H "Content-Type: application/json" \
-d '{"frequency":"10s"}'
curl -X POST http://localhost:9200/_transform/workflow-instances-transform/_start
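When the frequency is set from a script or pipeline variable, validating it before calling the update API avoids sending a malformed body. This is a sketch; the function name is illustrative and it accepts only whole seconds, minutes, or hours, which is a simplification of the time units Elasticsearch understands.

```shell
# Print the JSON body for the _update call, or fail on a malformed value.
# $1 = frequency such as 10s, 1m, or 1h
frequency_body() {
  if printf '%s\n' "$1" | grep -Eq '^[1-9][0-9]*(s|m|h)$'; then
    printf '{"frequency":"%s"}\n' "$1"
  else
    echo "invalid frequency: $1" >&2
    return 1
  fi
}
```

The body can then be passed to the update command above with `-d "$(frequency_body 10s)"`.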
Production Recommendations
Schema Management
- Disable auto-initialization: Set data-index.storage.skip-init-schema=true
- Manage externally: Use GitOps (ArgoCD, Flux) or Kubernetes operators
- Version control: Store schema JSON files in source control
- Apply via CI/CD: Run schema updates in deployment pipeline
Security
- Enable TLS: Use HTTPS for Elasticsearch connections
- Use authentication: Configure username/password or API keys
- Store credentials in Secrets: Never hardcode credentials
- Network policies: Restrict traffic between namespaces
- RBAC: Limit FluentBit permissions to necessary APIs
High Availability
- Elasticsearch cluster: 3+ nodes (multi-AZ deployment)
- Data Index replicas: Run 2+ instances behind load balancer
- FluentBit: DaemonSet runs on every node automatically
- Persistent volumes: Use dynamic provisioning with backup
Monitoring & Alerting
- Prometheus scraping: Enable for FluentBit and Data Index
- Alert on:
  - FluentBit output errors
  - Transform failures
  - Elasticsearch cluster health
  - Data Index service availability
- Dashboards: Create Grafana dashboards for metrics
- Log aggregation: Ship logs to centralized logging system
Performance Tuning
- Elasticsearch shards: Adjust based on data volume (default: 3 per index)
- Transform frequency: Balance latency vs resource usage (default: 1s)
- ILM retention: Adjust raw event retention period (default: 7 days)
- FluentBit buffer: Increase for high event volume
- Data Index resources: Scale based on query load
Comparison with PostgreSQL Mode
| Aspect | PostgreSQL Mode (MODE 1) | Elasticsearch Mode (MODE 2) |
|---|---|---|
| Normalization | PostgreSQL triggers (< 1ms) | ES Transforms (~1s frequency) |
| Latency | < 1ms (real-time) | ~1s (near real-time) |
| Search | Basic SQL queries | Full-text search, aggregations |
| Scaling | Vertical (larger instance) | Horizontal (add nodes) |
| Throughput | Up to 50K workflows/day | 100K+ workflows/day |
| Complexity | Simple (triggers, backups) | Moderate (transforms, ILM) |
| Lifecycle Management | Manual archival | Automatic (ILM policies) |
| GraphQL API | Identical | Identical |
Choose PostgreSQL mode when:
- Standard SQL queries are sufficient
- Lower event volume (< 50K workflows/day)
- Existing PostgreSQL infrastructure
- Simpler operations preferred
Choose Elasticsearch mode when:
- Full-text search needed
- High event volume (> 50K workflows/day)
- Complex aggregations required
- Existing Elasticsearch infrastructure
- Auto-scaling storage desired
Next Steps
- PostgreSQL Deployment - Alternative storage backend
- Elasticsearch Architecture - Detailed design
- Configuration Guide - All configuration options
- Deployment Overview - Compare storage backends