Kafka Mode Architecture (MODE 3)

Status: Production Ready

Overview

MODE 3 is a Kafka-based event ingestion service that provides an alternative to the log-file ingestion used by MODE 1 and MODE 2.

Event pipeline:

Quarkus Flow
    ↓ (publishes CloudEvents to Kafka)
Kafka topic: flow-lifecycle-out
    ↓ (SmallRye Reactive Messaging)
KafkaLifecycleConsumer
    ↓ (event validation + routing)
WorkflowEventProcessor / TaskExecutionProcessor
    ↓ (JDBC UPSERT with field-level idempotency)
PostgreSQL normalized tables
    ↓ (JPA/Hibernate)
Data Index GraphQL API

(failed records → data-index-events-dlq topic)

Components

KafkaLifecycleConsumer

Listens to the flow-lifecycle-out topic and:

  • Validates incoming CloudEvents (specversion, type, time)

  • Routes to processors based on event type prefix:

    • io.serverlessworkflow.workflow.* → WorkflowEventProcessor

    • io.serverlessworkflow.task.* → TaskExecutionProcessor

  • Throws ProcessEventFailedException on errors (triggers DLQ)
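The validation and routing rules above can be sketched in plain Java. This is a minimal illustration, not the service's actual KafkaLifecycleConsumer. The class name LifecycleRoutingSketch, the Event record, and the nested ProcessEventFailedException are hypothetical stand-ins. The SmallRye Reactive Messaging wiring is left out so the code runs on its own, and the two processors are modeled as plain Consumer callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of validate-then-route; not the real KafkaLifecycleConsumer.
class LifecycleRoutingSketch {
    static final String WORKFLOW_PREFIX = "io.serverlessworkflow.workflow.";
    static final String TASK_PREFIX = "io.serverlessworkflow.task.";

    // Hypothetical stand-in: throwing this is what sends the record to the DLQ.
    static class ProcessEventFailedException extends RuntimeException {
        ProcessEventFailedException(String message, Throwable cause) { super(message, cause); }
    }

    // Hypothetical minimal CloudEvent view: only the attributes the consumer validates.
    record Event(String specversion, String type, String time) {}

    private final Consumer<Event> workflowProcessor;
    private final Consumer<Event> taskProcessor;

    LifecycleRoutingSketch(Consumer<Event> workflowProcessor, Consumer<Event> taskProcessor) {
        this.workflowProcessor = workflowProcessor;
        this.taskProcessor = taskProcessor;
    }

    void onEvent(Event event) {
        try {
            validate(event);
            if (event.type().startsWith(WORKFLOW_PREFIX)) {
                workflowProcessor.accept(event);
            } else if (event.type().startsWith(TASK_PREFIX)) {
                taskProcessor.accept(event);
            } else {
                throw new IllegalArgumentException("Unsupported event type: " + event.type());
            }
        } catch (RuntimeException e) {
            // Validation, routing, and processor failures are all wrapped the same way.
            throw new ProcessEventFailedException("Failed to process event " + event, e);
        }
    }

    static void validate(Event event) {
        if (!"1.0".equals(event.specversion())) {
            throw new IllegalArgumentException("Unsupported specversion: " + event.specversion());
        }
        if (event.type() == null || event.type().isBlank()) {
            throw new IllegalArgumentException("Missing type");
        }
        if (event.time() == null || event.time().isBlank()) {
            throw new IllegalArgumentException("Missing time");
        }
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        LifecycleRoutingSketch router = new LifecycleRoutingSketch(
                e -> calls.add("workflow"), e -> calls.add("task"));
        router.onEvent(new Event("1.0", "io.serverlessworkflow.task.started.v1", "2026-05-25T22:40:10Z"));
        router.onEvent(new Event("1.0", "io.serverlessworkflow.workflow.completed.v1", "2026-05-25T22:45:00Z"));
        System.out.println(calls); // prints [task, workflow]
    }
}
```

Wrapping every failure in one exception type keeps the DLQ decision in a single place. The consumer does not need to tell validation errors apart from processor errors.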

Event Processors

WorkflowEventProcessor

Normalizes workflow lifecycle events:

  • workflow.started → INSERT/UPDATE workflow_instances with status=RUNNING

  • workflow.completed → UPDATE status=COMPLETED, set end time and output

  • workflow.faulted → UPDATE status=FAULTED, set error fields

  • workflow.suspended → UPDATE status=SUSPENDED

  • workflow.cancelled → UPDATE status=CANCELLED

Uses field-level idempotency (see Idempotency Guarantees below).

TaskExecutionProcessor

Normalizes task lifecycle events:

  • task.started → INSERT/UPDATE task_instances with status=RUNNING

  • task.completed → UPDATE status=COMPLETED, set end time and output

  • task.faulted → UPDATE status=FAULTED, set error fields

  • task.suspended → UPDATE status=SUSPENDED

  • task.cancelled → UPDATE status=CANCELLED

Also handles out-of-order recovery (see Out-of-Order Event Handling below).
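Both processors map the lifecycle phase in the event type to a status. The following is a minimal sketch of that mapping. It assumes the type layout io.serverlessworkflow.<workflow|task>.<phase>.v<N> seen in the examples in this document. LifecycleStatusSketch and statusFor are hypothetical names. The real processors also write end time, output, and error fields, which this sketch omits.

```java
import java.util.Locale;

// Hypothetical sketch: derive the target status from a lifecycle event type.
class LifecycleStatusSketch {
    enum Status { RUNNING, COMPLETED, FAULTED, SUSPENDED, CANCELLED }

    static Status statusFor(String eventType) {
        // Assumed layout: io.serverlessworkflow.<workflow|task>.<phase>[.v<N>]
        String[] parts = eventType.split("\\.");
        if (parts.length < 4) {
            throw new IllegalArgumentException("Unexpected event type: " + eventType);
        }
        String phase = parts[3].toLowerCase(Locale.ROOT);
        return switch (phase) {
            case "started" -> Status.RUNNING;
            case "completed" -> Status.COMPLETED;
            case "faulted" -> Status.FAULTED;
            case "suspended" -> Status.SUSPENDED;
            case "cancelled" -> Status.CANCELLED;
            default -> throw new IllegalArgumentException("Unsupported phase: " + phase);
        };
    }

    public static void main(String[] args) {
        System.out.println(statusFor("io.serverlessworkflow.workflow.started.v1")); // RUNNING
        System.out.println(statusFor("io.serverlessworkflow.task.faulted.v1"));     // FAULTED
    }
}
```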

Persistence Layer

WorkflowPersistence

JDBC-based UPSERT for workflow_instances:

INSERT INTO workflow_instances (
  id, namespace, name, version, status, start, "end", last_update,
  input, output, error_type, error_title, error_detail, error_status, error_instance,
  last_event_time, created_at, updated_at
) VALUES (...)
ON CONFLICT (id) DO UPDATE SET
  status = CASE
    WHEN EXCLUDED.last_event_time >= workflow_instances.last_event_time
    THEN EXCLUDED.status
    ELSE workflow_instances.status
  END,
  ...

TaskPersistence

Similar UPSERT for task_instances, but with FK recovery:

INSERT INTO task_instances (
  task_execution_id, instance_id, task_name, task_position, status,
  start, "end", input, output, error_type, error_title, error_detail, error_status,
  error_instance, last_event_time, ...
) VALUES (...)
ON CONFLICT (instance_id, task_position) DO UPDATE SET
  ...

Key design decisions:

  • Composite key: (instance_id, task_position) uniquely identifies a task

    • Needed because Quarkus Flow’s taskExecutionId can change from one event to the next for the same task

    • task_position stays stable across the task lifecycle

  • FK Recovery: Savepoint-based retry if the parent workflow row doesn’t exist yet

    • Creates a placeholder workflow row when the first insert attempt fails

    • Retries the task insert once the placeholder is in place

    • A later workflow.started event fills in the placeholder
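The effect of the composite key can be shown with an in-memory map standing in for task_instances. This is a hypothetical sketch: TaskKeySketch, its records, and the task position strings such as "do/0" are illustrative values, not real Quarkus Flow positions. Two events for the same task position with different taskExecutionId values collapse into one row. The first task_execution_id is kept because it is an immutable field. Timestamp ordering of the status is deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for task_instances keyed by (instance_id, task_position).
class TaskKeySketch {
    record TaskKey(String instanceId, String taskPosition) {}

    record TaskRow(String taskExecutionId, String status) {}

    private final Map<TaskKey, TaskRow> rows = new HashMap<>();

    // Mirrors ON CONFLICT (instance_id, task_position): insert if absent, otherwise update
    // the status but keep the first task_execution_id (immutable, first value wins).
    // Ordering by last_event_time is intentionally omitted in this sketch.
    void upsert(String instanceId, String taskPosition, String taskExecutionId, String status) {
        rows.merge(new TaskKey(instanceId, taskPosition),
                new TaskRow(taskExecutionId, status),
                (existing, incoming) -> new TaskRow(existing.taskExecutionId(), incoming.status()));
    }

    int size() { return rows.size(); }

    TaskRow get(String instanceId, String taskPosition) {
        return rows.get(new TaskKey(instanceId, taskPosition));
    }

    public static void main(String[] args) {
        TaskKeySketch table = new TaskKeySketch();
        table.upsert("wf-1", "do/0", "exec-a", "RUNNING");
        table.upsert("wf-1", "do/0", "exec-b", "COMPLETED"); // new execution id, same position
        System.out.println(table.size());             // 1
        System.out.println(table.get("wf-1", "do/0")); // TaskRow[taskExecutionId=exec-a, status=COMPLETED]
    }
}
```

Keying on taskExecutionId instead would have produced two rows for one task in this example.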

Event Format

CloudEvent (v1.0)

{
  "specversion": "1.0",
  "type": "io.serverlessworkflow.workflow.started.v1",
  "source": "/workflow/executions/01KSGKY66DMS0KPPMFMMR3BJZX",
  "id": "event-123",
  "time": "2026-05-25T22:40:10.676900Z",
  "datacontenttype": "application/json",
  "data": {
    "instanceId": "01KSGKY66DMS0KPPMFMMR3BJZX",
    "workflowName": "order-processing",
    "workflowNamespace": "org.acme",
    "workflowVersion": "1.0.0",
    "status": "RUNNING",
    "startTime": "2026-05-25T19:40:10.676802-03:00",
    "lastUpdateTime": "2026-05-25T19:40:10.676802-03:00",
    "input": { "orderId": "ORD-789" }
  }
}

Timestamp Handling

All timestamp fields are automatically converted to UTC OffsetDateTime (TIMESTAMP WITH TIME ZONE).

Accepted formats:

  • ISO-8601 with offset: 2026-05-25T19:40:10.676802-03:00 (recommended)

  • ISO-8601 UTC: 2026-05-25T22:40:10.676900Z

  • Unix epoch seconds: 1747486200
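The following is a minimal sketch of normalizing the three accepted formats with java.time. It assumes that a value made only of digits is epoch seconds and that anything else is ISO-8601 with an offset or a trailing Z. TimestampSketch and toUtc are hypothetical names, not the service's own converter.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Hypothetical sketch: normalize accepted timestamp formats to a UTC OffsetDateTime.
class TimestampSketch {
    static OffsetDateTime toUtc(String raw) {
        String value = raw.trim();
        if (value.matches("\\d+")) {
            // Assumed rule: digits only means Unix epoch seconds, e.g. 1747486200
            return Instant.ofEpochSecond(Long.parseLong(value)).atOffset(ZoneOffset.UTC);
        }
        // ISO-8601 with an explicit offset (-03:00) or 'Z'; shift the same instant to UTC
        return OffsetDateTime.parse(value).withOffsetSameInstant(ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        System.out.println(toUtc("2026-05-25T19:40:10.676802-03:00")); // 2026-05-25T22:40:10.676802Z
        System.out.println(toUtc("2026-05-25T22:40:10.676900Z"));      // 2026-05-25T22:40:10.676900Z
        System.out.println(toUtc("1747486200"));                       // 2025-05-17T12:50Z
    }
}
```

The offset example and the CloudEvent time attribute shown earlier describe nearly the same instant once both are in UTC. This is why a stored TIMESTAMP WITH TIME ZONE compares correctly regardless of the offset the publisher used.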

Idempotency Guarantees

MODE 3 implements field-level idempotency to handle out-of-order and duplicate events.

Immutable Fields (First Value Wins)

Once set to a non-null value, never updated:

Workflow:

  • start, input, name, version, namespace

Task:

  • start, input, task_name, task_position, task_execution_id

Example: If workflow.started sets start = 10:00, later events cannot change it.
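For immutable fields, the rule behaves like COALESCE(existing, incoming), presumably implemented with COALESCE in the UPSERT: the stored value is kept unless it is still null. This matters for placeholder rows created during FK recovery, because those are minimal rows whose descriptive fields start out empty. The following is a minimal sketch. ImmutableFieldSketch and firstValueWins are hypothetical names, and times are plain strings for brevity.

```java
// Hypothetical sketch of "first non-null value wins" for immutable fields.
class ImmutableFieldSketch {
    // Same result as SQL COALESCE(existing, incoming).
    static <T> T firstValueWins(T existing, T incoming) {
        return existing != null ? existing : incoming;
    }

    public static void main(String[] args) {
        String start = null;                    // placeholder row: start not known yet
        start = firstValueWins(start, "10:00"); // workflow.started fills it in
        start = firstValueWins(start, "10:03"); // a later event cannot change it
        System.out.println(start);              // 10:00
    }
}
```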

Terminal Fields (Last Non-Null Wins)

Updated only if the incoming event is newer (compared against last_event_time):

Workflow:

  • end, output, last_update

Task:

  • end, output

Both:

  • error_type, error_title, error_detail, error_status, error_instance

Example: If workflow.completed at 10:05 sets end = 10:05, an event timestamped 10:01 that arrives afterward does not override the end time.
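The following sketch of the terminal-field rule makes two stated assumptions. First, an event with the same last_event_time counts as newer, matching the >= in the status CASE expression shown for WorkflowPersistence. Second, a null incoming value never clears a stored one. TerminalFieldSketch and lastNonNullWins are hypothetical names.

```java
import java.time.Instant;

// Hypothetical sketch of "last non-null wins" for terminal fields (end, output, error_*).
class TerminalFieldSketch {
    static <T> T lastNonNullWins(T existing, Instant existingEventTime,
                                 T incoming, Instant incomingEventTime) {
        // Assumption: equal timestamps count as newer (same as >= in the status CASE).
        boolean newer = existingEventTime == null || !incomingEventTime.isBefore(existingEventTime);
        // Assumption: a null incoming value never overwrites a stored one.
        return (newer && incoming != null) ? incoming : existing;
    }

    public static void main(String[] args) {
        Instant t1005 = Instant.parse("2026-05-25T10:05:00Z");
        Instant t1001 = Instant.parse("2026-05-25T10:01:00Z");
        // Stored: end = 10:05 from workflow.completed; a late event stamped 10:01 arrives.
        String end = lastNonNullWins("10:05", t1005, "10:01", t1001);
        System.out.println(end); // 10:05
    }
}
```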

Status Field

Updated based on timestamp and precedence:

  • Terminal states take precedence over non-terminal ones: COMPLETED/FAULTED/CANCELLED > RUNNING > CREATED

  • If the incoming event is newer, the status is updated

  • If the incoming event is older, the status is preserved

Example:

t=10:00: workflow.started
  → status = RUNNING, last_event_time = 10:00

t=10:05: workflow.completed
  → status = COMPLETED, end = 10:05, last_event_time = 10:05

t=10:01: workflow.completed (OUT OF ORDER)
  → 10:01 < 10:05, so status stays COMPLETED, end not overwritten
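Replaying this example against the timestamp rule from the WorkflowPersistence CASE expression gives the same result. In this hypothetical sketch (StatusMergeSketch and its Row record are illustrative names), status and last_event_time change only when the incoming event time is greater than or equal to the stored one. The state-precedence rule is not modeled, because this document does not say exactly how it combines with the timestamp check.

```java
import java.time.Instant;

// Hypothetical sketch of the status CASE: take EXCLUDED.status only if its event time is >= stored.
class StatusMergeSketch {
    record Row(String status, Instant lastEventTime) {}

    static Row apply(Row current, String incomingStatus, Instant incomingTime) {
        if (current == null) {
            return new Row(incomingStatus, incomingTime); // first event inserts the row
        }
        if (!incomingTime.isBefore(current.lastEventTime())) {
            return new Row(incomingStatus, incomingTime); // newer or equal: take the new status
        }
        return current; // older: keep status and last_event_time (assumed not to move backwards)
    }

    public static void main(String[] args) {
        Row row = apply(null, "RUNNING", Instant.parse("2026-05-25T10:00:00Z"));
        row = apply(row, "COMPLETED", Instant.parse("2026-05-25T10:05:00Z"));
        row = apply(row, "COMPLETED", Instant.parse("2026-05-25T10:01:00Z")); // out of order
        System.out.println(row.status() + " @ " + row.lastEventTime()); // COMPLETED @ 2026-05-25T10:05:00Z
    }
}
```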

Out-of-Order Event Handling

MODE 3 guarantees that task events aren’t lost even if they arrive before their parent workflow’s events.

FK Recovery Flow

  1. Task event consumed: The parent workflow instance is not yet in the database

  2. Initial INSERT fails: Foreign key constraint violation (SQL state 23503)

  3. Savepoint rolled back: Transaction restored to the savepoint taken before the insert

  4. Placeholder workflow created: Minimal row inserted

    • id = task.instanceId

    • created_at = NOW()

    • last_event_time = task.eventTimestamp

  5. Task INSERT retried: Now succeeds (FK satisfied)

  6. Workflow event arrives later: Updates placeholder with full data
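The recovery steps can be sketched as follows. The Store interface and FakeStore class are hypothetical stand-ins for the JDBC layer, so the example runs without a database. Against a real java.sql.Connection, the savepoint steps would be Connection.setSavepoint() and Connection.rollback(Savepoint). The rollback matters because PostgreSQL aborts the whole transaction after a failed statement, so nothing else can run in it until it is rolled back to a savepoint.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of FK recovery; Store and FakeStore stand in for JDBC.
class FkRecoverySketch {
    static final String FOREIGN_KEY_VIOLATION = "23503"; // PostgreSQL SQLSTATE foreign_key_violation

    // Hypothetical persistence operations used by the sketch.
    interface Store {
        void setSavepoint();
        void rollbackToSavepoint();
        void insertTask(String instanceId, String taskPosition) throws SQLException;
        void insertPlaceholderWorkflow(String instanceId, long lastEventTime) throws SQLException;
    }

    static void upsertTaskWithRecovery(Store store, String instanceId, String taskPosition,
                                       long eventTime) throws SQLException {
        store.setSavepoint();
        try {
            store.insertTask(instanceId, taskPosition);
        } catch (SQLException e) {
            if (!FOREIGN_KEY_VIOLATION.equals(e.getSQLState())) {
                throw e; // not an FK problem: propagate (eventually reaching the DLQ)
            }
            store.rollbackToSavepoint();                            // restore the transaction
            store.insertPlaceholderWorkflow(instanceId, eventTime); // minimal parent row
            store.insertTask(instanceId, taskPosition);             // retry; FK now satisfied
        }
    }

    // In-memory fake that enforces the FK and records the calls made against it.
    static class FakeStore implements Store {
        final Set<String> workflows = new HashSet<>();
        final Set<String> tasks = new HashSet<>();
        final List<String> log = new ArrayList<>();

        public void setSavepoint() { log.add("savepoint"); }

        public void rollbackToSavepoint() { log.add("rollback"); }

        public void insertTask(String instanceId, String taskPosition) throws SQLException {
            if (!workflows.contains(instanceId)) {
                log.add("fk-violation");
                throw new SQLException("parent workflow missing", FOREIGN_KEY_VIOLATION);
            }
            tasks.add(instanceId + "#" + taskPosition);
            log.add("task-inserted");
        }

        public void insertPlaceholderWorkflow(String instanceId, long lastEventTime) {
            workflows.add(instanceId);
            log.add("placeholder");
        }
    }

    public static void main(String[] args) throws SQLException {
        FakeStore store = new FakeStore();
        upsertTaskWithRecovery(store, "wf-1", "do/0", 1_000L);
        System.out.println(store.log); // [savepoint, fk-violation, rollback, placeholder, task-inserted]
    }
}
```

The printed call order mirrors steps 1 to 5 above. Any other SQL state is rethrown, so non-FK errors still take the normal failure path.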

Example

Queue: [task.started(wf-1), task.completed(wf-1), workflow.started(wf-1)]

Processing:

1. task.started(wf-1)
   → INSERT INTO task_instances ... [FK fails, wf-1 doesn't exist]
   → Insert placeholder: workflow_instances (id='wf-1', created_at=NOW(), ...)
   → Retry task insert → OK

2. task.completed(wf-1)
   → UPDATE task_instances ... [OK, wf-1 exists]

3. workflow.started(wf-1)
   → UPDATE workflow_instances SET namespace='...', name='...', ... [OK, replaces placeholder]

Result:
   workflow_instances: 1 row (placeholder filled in with real data)
   task_instances: 1 row (created by task.started, then updated by task.completed)

Error Handling

Failed Event Processing

When an event cannot be processed (deserialization error, DB constraint violation, etc.):

  1. Exception thrown: ProcessEventFailedException wraps the underlying error

  2. Dead-letter queue: Record automatically sent to data-index-events-dlq topic

  3. Consumer continues: Next message is processed immediately (fail-fast disabled)

  4. Monitoring: Check DLQ topic to inspect and replay failed events

Failure reasons:

  • Malformed CloudEvent (missing type, time, or data)

  • Network errors

  • Serialization/deserialization errors
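The fail-and-continue behavior can be modeled with plain lists standing in for the main and DLQ topics. In the service itself, the forwarding happens automatically once ProcessEventFailedException is thrown. In this hypothetical sketch (DeadLetterSketch, DeadLetter, and consumeAll are illustrative names), the loop does it explicitly so the control flow is visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: failed records are parked with a reason and consumption continues.
class DeadLetterSketch {
    record DeadLetter(String payload, String reason) {}

    static List<DeadLetter> consumeAll(List<String> payloads, Consumer<String> processor) {
        List<DeadLetter> dlq = new ArrayList<>();
        for (String payload : payloads) {
            try {
                processor.accept(payload);
            } catch (RuntimeException e) {
                dlq.add(new DeadLetter(payload, e.getMessage())); // park it, move on (no fail-fast)
            }
        }
        return dlq;
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        List<DeadLetter> dlq = consumeAll(List.of("evt-1", "bad", "evt-3"), p -> {
            if (p.equals("bad")) {
                throw new IllegalArgumentException("Malformed CloudEvent: missing type");
            }
            processed.add(p);
        });
        System.out.println(processed);  // [evt-1, evt-3]
        System.out.println(dlq.size()); // 1
    }
}
```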

Monitoring & Recovery

Monitor the DLQ:

kafka-console-consumer.sh \
  --bootstrap-server kafka:9092 \
  --topic data-index-events-dlq \
  --from-beginning

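Replay failed events:

  1. Fix the root cause (fix the event publisher, upgrade the service, or resolve database issues)

  2. Copy the failed event from the DLQ back to the main topic (flow-lifecycle-out)

  3. The service reprocesses it automatically; field-level idempotency makes replaying a duplicate event safe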

Comparison: MODE 1 vs MODE 2 vs MODE 3

Aspect               MODE 1 (PostgreSQL)      MODE 2 (Elasticsearch)     MODE 3 (Kafka)
Event source         Log files                Log files                  Kafka topics
Ingestion layer      FluentBit                FluentBit                  SmallRye Messaging
Normalization        SQL triggers             ES transforms              Java JDBC
Raw storage          PostgreSQL tables        ES indices                 None (direct)
Normalized storage   PostgreSQL               Elasticsearch              PostgreSQL
Query API            GraphQL on PostgreSQL    GraphQL on Elasticsearch   GraphQL on PostgreSQL
Latency              ~10ms                    ~1000ms                    -
Idempotency          SQL COALESCE             Painless script            SQL CASE/COALESCE
DLQ support          N/A                      N/A                        Yes
Security             File-based (log files)   File-based (log files)     Kafka (SSL/SASL)
Query capabilities   Standard SQL             Full-text, aggregations    Standard SQL

Choose MODE 3 when:

  • Kafka is already deployed

  • Security policy requires avoiding log files

  • Encrypted transport is needed (Kafka SSL/SASL)

  • Stream-based ingestion is preferred

Choose MODE 1 when:

  • The simplest setup is wanted (triggers are atomic)

  • Low latency is critical (~10ms)

  • No Kafka infrastructure is available

  • Log-based ingestion is acceptable

Choose MODE 2 when:

  • Full-text search is needed

  • Complex aggregations are required

  • Scale is large (1M+ workflows)

  • Multi-tenancy is needed