Kafka Mode Architecture (MODE 3)
Status: Production Ready
Overview
MODE 3 is a Kafka-based event ingestion service that provides an alternative to log-file ingestion.
Event pipeline:
Quarkus Flow
↓ (publishes CloudEvents to Kafka)
Kafka topic: flow-lifecycle-out
↓ (SmallRye Reactive Messaging)
KafkaLifecycleConsumer
↓ (event validation + routing)
WorkflowEventProcessor / TaskExecutionProcessor
↓ (JDBC UPSERT with field-level idempotency)
PostgreSQL normalized tables
↓ (JPA/Hibernate)
Data Index GraphQL API
(failed records → data-index-events-dlq topic)
Components
KafkaLifecycleConsumer
Listens to the flow-lifecycle-out topic and:
- Validates incoming CloudEvents (specversion, type, time)
- Routes to processors based on event type prefix:
  - io.serverlessworkflow.workflow.* → WorkflowEventProcessor
  - io.serverlessworkflow.task.* → TaskExecutionProcessor
- Throws ProcessEventFailedException on errors (triggers DLQ)
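The validate-then-route step can be sketched without any Kafka or SmallRye dependency. This is a minimal illustration, not the service's actual code: the class name `LifecycleRouter`, the `Map`-based event representation, and the decision to reject unknown type prefixes are all assumptions, and the nested exception only stands in for the real ProcessEventFailedException.

```java
import java.util.Map;

/** Hypothetical, dependency-free sketch of the validate-then-route step. */
final class LifecycleRouter {
    static final String WORKFLOW_PREFIX = "io.serverlessworkflow.workflow.";
    static final String TASK_PREFIX = "io.serverlessworkflow.task.";

    /** Stand-in for the ProcessEventFailedException named in this document. */
    static final class ProcessEventFailedException extends RuntimeException {
        ProcessEventFailedException(String message) {
            super(message);
        }
    }

    /** Returns the name of the processor that should handle the event. */
    static String route(Map<String, Object> event) {
        // Validation: the three attributes the consumer is described as checking.
        for (String attr : new String[] {"specversion", "type", "time"}) {
            Object value = event.get(attr);
            if (!(value instanceof String) || ((String) value).isBlank()) {
                throw new ProcessEventFailedException("missing CloudEvent attribute: " + attr);
            }
        }
        // Routing: match on the event type prefix.
        String type = (String) event.get("type");
        if (type.startsWith(WORKFLOW_PREFIX)) {
            return "WorkflowEventProcessor";
        }
        if (type.startsWith(TASK_PREFIX)) {
            return "TaskExecutionProcessor";
        }
        // Assumption: an unrecognized prefix is treated as a processing failure.
        throw new ProcessEventFailedException("unsupported event type: " + type);
    }
}
```

In this sketch, any thrown exception corresponds to the point at which the real consumer would hand the record to the dead-letter queue.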
Event Processors
WorkflowEventProcessor
Normalizes workflow lifecycle events:
- workflow.started → INSERT/UPDATE workflow_instances with status=RUNNING
- workflow.completed → UPDATE status=COMPLETED, set end time and output
- workflow.faulted → UPDATE status=FAULTED, set error fields
- workflow.suspended → UPDATE status=SUSPENDED
- workflow.cancelled → UPDATE status=CANCELLED
Uses field-level idempotency (see Idempotency section below).
TaskExecutionProcessor
Normalizes task lifecycle events:
- task.started → INSERT/UPDATE task_instances with status=RUNNING
- task.completed → UPDATE status=COMPLETED, set end time and output
- task.faulted → UPDATE status=FAULTED, set error fields
- task.suspended → UPDATE status=SUSPENDED
- task.cancelled → UPDATE status=CANCELLED
Also handles out-of-order recovery (see Out-of-Order Handling below).
Persistence Layer
WorkflowPersistence
JDBC-based UPSERT for workflow_instances:
INSERT INTO workflow_instances (
id, namespace, name, version, status, start, "end", last_update,
input, output, error_type, error_title, error_detail, error_status, error_instance,
last_event_time, created_at, updated_at
) VALUES (...)
ON CONFLICT (id) DO UPDATE SET
status = CASE
WHEN EXCLUDED.last_event_time >= workflow_instances.last_event_time
THEN EXCLUDED.status
ELSE workflow_instances.status
END,
...
TaskPersistence
Similar UPSERT for task_instances, but with FK recovery:
INSERT INTO task_instances (
task_execution_id, instance_id, task_name, task_position, status,
start, "end", input, output, error fields, last_event_time, ...
) VALUES (...)
ON CONFLICT (instance_id, task_position) DO UPDATE SET
...
Key design decisions:
- Composite key: (instance_id, task_position) uniquely identifies a task
  - Handles Quarkus Flow's changing taskExecutionId per event
  - task_position is stable across the task lifecycle
- FK Recovery: Savepoint-based retry if the parent workflow doesn't exist
  - Creates a placeholder workflow when the first attempt fails
  - Retries the task insert with the placeholder in place
  - A later workflow.started event updates the placeholder
Event Format
CloudEvent (v1.0)
{
"specversion": "1.0",
"type": "io.serverlessworkflow.workflow.started.v1",
"source": "/workflow/executions/01KSGKY66DMS0KPPMFMMR3BJZX",
"id": "event-123",
"time": "2026-05-25T22:40:10.676900Z",
"datacontenttype": "application/json",
"data": {
"instanceId": "01KSGKY66DMS0KPPMFMMR3BJZX",
"workflowName": "order-processing",
"workflowNamespace": "org.acme",
"workflowVersion": "1.0.0",
"status": "RUNNING",
"startTime": "2026-05-25T19:40:10.676802-03:00",
"lastUpdateTime": "2026-05-25T19:40:10.676802-03:00",
"input": { "orderId": "ORD-789" }
}
}
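Note that the envelope `time` above is expressed in UTC while `data.startTime` carries a `-03:00` offset, so any ordering logic has to compare instants rather than raw strings. The helper below is a small sketch of that comparison using `java.time`; the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.OffsetDateTime;

/** Hypothetical helper: compares ISO-8601 timestamps as instants, ignoring their offsets. */
final class EventTimes {
    /** Nanoseconds elapsed from the first timestamp to the second. */
    static long nanosBetween(String earlier, String later) {
        return Duration.between(
                OffsetDateTime.parse(earlier).toInstant(),
                OffsetDateTime.parse(later).toInstant()).toNanos();
    }
}
```

For the sample event, `EventTimes.nanosBetween("2026-05-25T19:40:10.676802-03:00", "2026-05-25T22:40:10.676900Z")` yields 98000, i.e. the event was emitted 98 microseconds after the workflow's start time even though the two strings show different wall-clock hours.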
Idempotency Guarantees
MODE 3 implements field-level idempotency to handle out-of-order and duplicate events.
Immutable Fields (First Value Wins)
Once set, never updated:
Workflow:
- start, input, name, version, namespace
Task:
- start, input, task_name, task_position, task_execution_id
Example: If workflow.started sets start = 10:00, later events cannot change it.
Terminal Fields (Last Non-Null Wins)
Updated only if incoming event is newer (based on last_event_time):
Workflow:
- end, output, last_update
Task:
- end, output
Both:
- error_type, error_title, error_detail, error_status, error_instance
Example: If workflow.completed with event time 10:05 sets end = 10:05, an event that arrives afterwards but carries event time 10:01 does not override the end time.
Status Field
Updated based on timestamp and precedence:
- Terminal states override less-terminal: COMPLETED/FAULTED/CANCELLED > RUNNING > CREATED
- If incoming event is newer: status is updated
- If incoming event is older: status is preserved
Example:
t=10:00: workflow.started
→ status = RUNNING, last_event_time = 10:00
t=10:05: workflow.completed
→ status = COMPLETED, end = 10:05, last_event_time = 10:05
t=10:01: workflow.completed (OUT OF ORDER)
→ 10:01 < 10:05, so status stays COMPLETED, end not overwritten
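The per-field rules can be modeled in memory to make the example above concrete. The sketch below is an assumption-laden illustration, not the persistence code: `WorkflowRow` and `merge` are hypothetical names, only four fields are modeled, and the status rule mirrors just the timestamp comparison from the UPSERT fragment (state precedence is not modeled).

```java
import java.time.Instant;

/** Hypothetical in-memory model of the per-field merge rules for a workflow row. */
final class WorkflowRow {
    final Instant start;         // immutable field: first value wins
    final Instant end;           // terminal field: newer non-null value wins
    final String status;         // follows the newest event
    final Instant lastEventTime; // event time of the newest event applied

    WorkflowRow(Instant start, Instant end, String status, Instant lastEventTime) {
        this.start = start;
        this.end = end;
        this.status = status;
        this.lastEventTime = lastEventTime;
    }

    /** Merges an incoming event (expressed as a row) into the stored row. */
    static WorkflowRow merge(WorkflowRow stored, WorkflowRow incoming) {
        if (stored == null) {
            return incoming; // no conflict: behaves like a plain INSERT
        }
        // Mirrors EXCLUDED.last_event_time >= workflow_instances.last_event_time.
        boolean newer = !incoming.lastEventTime.isBefore(stored.lastEventTime);
        Instant start = stored.start != null ? stored.start : incoming.start;
        Instant end = (newer && incoming.end != null) ? incoming.end : stored.end;
        String status = newer ? incoming.status : stored.status;
        Instant last = newer ? incoming.lastEventTime : stored.lastEventTime;
        return new WorkflowRow(start, end, status, last);
    }
}
```

Replaying the three events from the example through `merge` in arrival order leaves `status` at COMPLETED, `end` at 10:05, and `start` at 10:00, because the late 10:01 event fails the `newer` check and therefore changes nothing.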
Out-of-Order Event Handling
MODE 3 guarantees task events aren’t lost even if they arrive before the parent workflow.
FK Recovery Flow
- Task event consumed: The parent workflow instance is not yet in the database
- Initial INSERT fails: Foreign key constraint violation (SQL state 23503)
- Savepoint rolled back: Transaction restored to a known point
- Placeholder workflow created: Minimal row inserted
  - id = task.instanceId
  - created_at = NOW()
  - last_event_time = task.eventTimestamp
- Task INSERT retried: Now succeeds (FK satisfied)
- Workflow event arrives later: Updates placeholder with full data
Example
Queue: [task.started(wf-1), task.completed(wf-1), workflow.started(wf-1)]
Processing:
1. task.started(wf-1)
→ INSERT INTO task_instances ... [FK fails, wf-1 doesn't exist]
→ Insert placeholder: workflow_instances (id='wf-1', created_at=NOW(), ...)
→ Retry task insert → OK
2. task.completed(wf-1)
→ UPDATE task_instances ... [OK, wf-1 exists]
3. workflow.started(wf-1)
→ UPDATE workflow_instances SET namespace='...', name='...', ... [OK, replaces placeholder]
Result:
workflow_instances: 1 row (placeholder replaced with real data)
task_instances: one row per distinct (instance_id, task_position), so 2 rows if the two task events belong to different tasks of wf-1
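The recovery flow can be simulated with two in-memory maps standing in for the tables. Everything in this sketch is hypothetical (the class `FkRecoverySim`, the `IllegalStateException` used to mimic SQL state 23503, and the task position value in the usage note), and it omits the savepoint and the timestamp checks of the real UPSERT.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for the two tables and the FK between them. */
final class FkRecoverySim {
    // workflow id -> workflow name (a null name marks a placeholder row)
    final Map<String, String> workflows = new HashMap<>();
    // composite key (instanceId, taskPosition) -> task status
    final Map<List<String>, String> tasks = new HashMap<>();
    int placeholdersCreated = 0;

    /** Simulates the FK check: fails when the parent workflow row is missing. */
    private void upsertTask(String instanceId, String position, String status) {
        if (!workflows.containsKey(instanceId)) {
            throw new IllegalStateException("23503: parent workflow missing");
        }
        tasks.put(List.of(instanceId, position), status); // upsert on the composite key
    }

    void onTaskEvent(String instanceId, String position, String status) {
        try {
            upsertTask(instanceId, position, status);
        } catch (IllegalStateException fkViolation) {
            // A real implementation would roll back to its savepoint at this point.
            workflows.put(instanceId, null); // placeholder row: id only
            placeholdersCreated++;
            upsertTask(instanceId, position, status); // retry succeeds, FK is satisfied
        }
    }

    void onWorkflowStarted(String instanceId, String name) {
        workflows.put(instanceId, name); // fills in the placeholder, or inserts a new row
    }
}
```

Feeding it `onTaskEvent("wf-1", "do/0", "RUNNING")`, `onTaskEvent("wf-1", "do/0", "COMPLETED")`, and then `onWorkflowStarted("wf-1", "order-processing")` creates exactly one placeholder, which the workflow event later replaces; because both task events share one composite key, they collapse into a single task row.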
Error Handling
Failed Event Processing
When an event cannot be processed (deserialization error, DB constraint violation, etc.):
- Exception thrown: ProcessEventFailedException wraps the underlying error
- Dead-letter queue: Record automatically sent to the data-index-events-dlq topic
- Consumer continues: Next message is processed immediately (fail-fast disabled)
- Monitoring: Check the DLQ topic to inspect and replay failed events
Failure reasons:
- Malformed CloudEvent (missing type, time, or data)
- Network errors
- Serialization/deserialization errors
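The "divert and continue" behavior described above can be illustrated with a plain loop. This is only a sketch under stated assumptions: `DlqSim` is a hypothetical class, records are plain strings, and the `deadLetters` list stands in for publishing to the data-index-events-dlq topic. In a SmallRye Reactive Messaging deployment this kind of behavior is normally provided by the Kafka connector's dead-letter-queue failure strategy rather than by hand-written code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: failed records are diverted while the loop keeps going. */
final class DlqSim {
    final List<String> processed = new ArrayList<>();
    final List<String> deadLetters = new ArrayList<>();

    void consumeAll(List<String> records, Consumer<String> processor) {
        for (String record : records) {
            try {
                processor.accept(record);
                processed.add(record);
            } catch (RuntimeException failure) {
                // Stands in for sending the record to data-index-events-dlq.
                deadLetters.add(record);
            }
        }
    }
}
```

With a processor that throws for one record out of three, the other two still end up in `processed` and only the failing record lands in `deadLetters`, which is the property that keeps a single bad event from blocking the partition.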
Monitoring & Recovery
Monitor the DLQ:
kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--topic data-index-events-dlq \
--from-beginning
Replay failed events:
1. Fix the root cause (fix the event publisher, upgrade the service, resolve DB issues)
2. Copy the failed event from the DLQ back to the main topic
3. The service reprocesses it automatically
Comparison: MODE 1 vs MODE 2 vs MODE 3
| Aspect | MODE 1 (PostgreSQL) | MODE 2 (Elasticsearch) | MODE 3 (Kafka) |
|---|---|---|---|
| Event source | Log files | Log files | Kafka topics |
| Ingestion layer | FluentBit | FluentBit | SmallRye Messaging |
| Normalization | SQL triggers | ES transforms | Java JDBC |
| Raw storage | PostgreSQL tables | ES indices | None (direct) |
| Normalized storage | PostgreSQL | Elasticsearch | PostgreSQL |
| Query API | GraphQL on PostgreSQL | GraphQL on Elasticsearch | GraphQL on PostgreSQL |
| Latency | ~10ms | ~1000ms | - |
| Idempotency | SQL COALESCE | Painless script | SQL CASE/COALESCE |
| DLQ support | N/A | N/A | Yes |
| Security | File-based (log files) | File-based (log files) | Kafka (SSL/SASL) |
| Query capabilities | Standard SQL | Full-text, aggregations | Standard SQL |
Choose MODE 3 when:
- Kafka already deployed
- Security concern: avoid log files
- Need encrypted Kafka transport
- Prefer stream-based ingestion
Choose MODE 1 when:
- Simplest setup (triggers are atomic)
- Low latency critical (~10ms)
- No Kafka infrastructure
- Log-based ingestion acceptable
Choose MODE 2 when:
- Need full-text search
- Complex aggregations required
- Large scale (1M+ workflows)
- Multi-tenancy needed