Kafka Deployment (MODE 3)

Status: Preview

Overview

MODE 3 is a Kafka-based event ingestion service that provides an alternative to MODE 1 and MODE 2.

Use MODE 3 when:

  • Kafka infrastructure already exists in your environment

  • Security requirements forbid writing events to log files on disk (for example, events containing credit card numbers or PII)

  • You need encrypted transport (SSL/SASL_SSL)

  • Direct stream processing is preferred over log-based ingestion

  • You want to leverage Kafka’s at-least-once delivery guarantees

Event Pipeline

Quarkus Flow
    → Kafka (CloudEvents, topic: flow-lifecycle-out)
        → Data Index Ingestion Service
            → PostgreSQL (workflow_instances, task_instances)
                → Data Index GraphQL API

(failed records → data-index-events-dlq)
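The dead-letter path can be illustrated with a minimal in-memory sketch in Java. This is not the service's code. The class and method names are hypothetical, a Java `List` stands in for the DLQ topic, and the only validation shown is the presence of the four required CloudEvents 1.0 context attributes (`specversion`, `id`, `source`, `type`). The real service presumably relies on SmallRye Reactive Messaging's dead-letter-queue failure strategy rather than hand-written routing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory sketch of dead-letter routing; names are illustrative.
class DeadLetterRoutingSketch {

    // Required context attributes in CloudEvents 1.0.
    static final List<String> REQUIRED = List.of("specversion", "id", "source", "type");

    // A record that could not be processed, kept together with the failure reason.
    record DeadLetter(Map<String, String> event, String reason) {}

    // Rejects events that lack a required CloudEvents attribute.
    static void validate(Map<String, String> event) {
        for (String attr : REQUIRED) {
            if (event.get(attr) == null) {
                throw new IllegalArgumentException("missing CloudEvents attribute: " + attr);
            }
        }
    }

    // Processes every event; a failure is routed to the DLQ instead of stopping consumption.
    static List<DeadLetter> consumeAll(List<Map<String, String>> events,
                                       Consumer<Map<String, String>> store) {
        List<DeadLetter> dlq = new ArrayList<>();
        for (Map<String, String> event : events) {
            try {
                validate(event);
                store.accept(event);
            } catch (RuntimeException e) {
                dlq.add(new DeadLetter(event, e.getMessage()));
            }
        }
        return dlq;
    }

    public static void main(String[] args) {
        // Hypothetical attribute values; "bad" lacks "source" and "type".
        Map<String, String> good = Map.of("specversion", "1.0", "id", "1",
                "source", "/orders", "type", "workflow.started");
        Map<String, String> bad = Map.of("specversion", "1.0", "id", "2");
        List<Map<String, String>> stored = new ArrayList<>();
        List<DeadLetter> dlq = consumeAll(List.of(good, bad), stored::add);
        System.out.println("stored=" + stored.size() + ", dead letters=" + dlq.size());
    }
}
```

Running `main` stores the well-formed event and routes the incomplete one to the DLQ list with its failure reason, while consumption continues past the bad record.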

Kafka Topic Configuration

Required Topics

  • flow-lifecycle-out - Main event topic (published by Quarkus Flow applications)

  • data-index-events-dlq - Dead-letter queue for failed records

You can override the topic names with these environment variables:

  • MP_MESSAGING_INCOMING_DATA_INDEX_EVENTS_TOPIC (default is flow-lifecycle-out)

  • MP_MESSAGING_INCOMING_DATA_INDEX_EVENTS_DEAD_LETTER_QUEUE_TOPIC (default is data-index-events-dlq)

Creating Topics

In non-production environments, topics are typically auto-created (this requires auto.create.topics.enable=true on the brokers). In production, create them explicitly according to your Kafka cluster management practices.

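# Main topic: replication factor 3, 3 partitions, 1-day retention (86400000 ms).
# min.insync.replicas=2 keeps acks=all writes available with one replica down.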
kafka-topics.sh --create \
  --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
  --topic flow-lifecycle-out \
  --replication-factor 3 \
  --partitions 3 \
  --config retention.ms=86400000 \
  --config min.insync.replicas=2

# DLQ topic: replication factor 3, 1 partition, 7-day retention (604800000 ms)
kafka-topics.sh --create \
  --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
  --topic data-index-events-dlq \
  --replication-factor 3 \
  --partitions 1 \
  --config retention.ms=604800000
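The numeric settings above can be checked with a short Java calculation. `retention.ms` is expressed in milliseconds, so 86400000 is 1 day and 604800000 is 7 days. With `min.insync.replicas=2` on a replication factor of 3, producers that use `acks=all` keep writing while at most one replica is unavailable. The class and method names are illustrative only.

```java
import java.time.Duration;

// Illustrative arithmetic for the topic settings used above.
class TopicSettingsArithmetic {

    // retention.ms expects milliseconds; derive the value from a readable duration.
    static long retentionMs(Duration retention) {
        return retention.toMillis();
    }

    // An acks=all write needs at least min.insync.replicas in-sync copies, so the
    // partition keeps accepting such writes while at most (RF - minISR) replicas are down.
    static int tolerableReplicaFailures(int replicationFactor, int minInsyncReplicas) {
        return replicationFactor - minInsyncReplicas;
    }

    public static void main(String[] args) {
        System.out.println("main topic retention.ms = " + retentionMs(Duration.ofDays(1)));
        System.out.println("DLQ topic retention.ms  = " + retentionMs(Duration.ofDays(7)));
        System.out.println("replica failures tolerated (RF=3, minISR=2) = "
                + tolerableReplicaFailures(3, 2));
    }
}
```

Deriving `retention.ms` from a `Duration` avoids off-by-a-factor mistakes when adjusting retention for your environment.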

Kubernetes Deployment

Basic Manifest

apiVersion: apps/v1
kind: Deployment
metadata:
  name: data-index-ingestion-kafka
  namespace: data-index
spec:
  replicas: 1
  selector:
    matchLabels:
      app: data-index-ingestion-kafka
  template:
    metadata:
      labels:
        app: data-index-ingestion-kafka
    spec:
      containers:
      - name: kafka-ingestion
        image: kubesmarts/data-index-ingestion-kafka-service:999-SNAPSHOT
        ports:
        - containerPort: 8080
          name: http
        env:
        - name: KAFKA_BOOTSTRAP_SERVERS
          value: "kafka.kafka.svc.cluster.local:9092"
        - name: QUARKUS_DATASOURCE_JDBC_URL
          value: "jdbc:postgresql://postgresql:5432/data-index"
        - name: QUARKUS_DATASOURCE_USERNAME
          valueFrom:
            secretKeyRef:
              name: database-credentials
              key: username
        - name: QUARKUS_DATASOURCE_PASSWORD
          valueFrom:
            secretKeyRef:
              name: database-credentials
              key: password
        livenessProbe:
          httpGet:
            path: /q/health/live
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /q/health/ready
            port: 8080
          initialDelaySeconds: 10
          periodSeconds: 5
        resources:
          requests:
            cpu: 500m
            memory: 512Mi
          limits:
            cpu: 2000m
            memory: 2Gi
---
apiVersion: v1
kind: Service
metadata:
  name: data-index-ingestion-kafka
  namespace: data-index
spec:
  selector:
    app: data-index-ingestion-kafka
  ports:
  - port: 8080
    targetPort: 8080
    name: http

Configuration

Required Environment Variables

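  • KAFKA_BOOTSTRAP_SERVERS (default: localhost:29092): Kafka bootstrap servers, as a comma-separated list of host:port pairs

  • QUARKUS_DATASOURCE_JDBC_URL (default: jdbc:h2:mem:test): PostgreSQL JDBC connection string

  • QUARKUS_DATASOURCE_USERNAME (default: set by dev services): Database username

  • QUARKUS_DATASOURCE_PASSWORD (default: set by dev services): Database password

The defaults are development and test values (an in-memory H2 database and dev services), so set all four variables explicitly in production.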

Optional Configuration

  • MP_MESSAGING_INCOMING_DATA_INDEX_EVENTS_TOPIC (default: flow-lifecycle-out): Kafka topic name

  • MP_MESSAGING_INCOMING_DATA_INDEX_EVENTS_GROUP_ID (default: data-index-ingestion): Consumer group

  • MP_MESSAGING_INCOMING_DATA_INDEX_EVENTS_HEALTH_ENABLED (default: true): Enable channel health checks

  • MP_MESSAGING_INCOMING_DATA_INDEX_EVENTS_HEALTH_READINESS_ENABLED (default: true): Include channel in readiness checks

  • MP_MESSAGING_INCOMING_DATA_INDEX_EVENTS_DEAD_LETTER_QUEUE_TOPIC (default: data-index-events-dlq): DLQ topic name
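The MP_MESSAGING_INCOMING_DATA_INDEX_EVENTS_* variables follow the MicroProfile Config rule for environment variables: a variable overrides a property whose name, with every character other than ASCII letters, digits and `_` replaced by `_` and then upper-cased, equals the variable name. The sketch below applies that rule. The channel name `data-index-events` and the exact property names are assumptions inferred from the variable names, not taken from the service's source.

```java
import java.util.Locale;

// Sketch of the MicroProfile Config mapping from a property name to the
// environment variable that can override it.
class EnvVarNameSketch {

    static String toEnvVarName(String propertyName) {
        StringBuilder sb = new StringBuilder(propertyName.length());
        for (char c : propertyName.toCharArray()) {
            // Keep ASCII letters, digits and underscores; replace everything else with '_'.
            boolean keep = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')
                    || (c >= '0' && c <= '9') || c == '_';
            sb.append(keep ? c : '_');
        }
        return sb.toString().toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        // Assumed property names; the channel name "data-index-events" is inferred.
        String[] properties = {
            "mp.messaging.incoming.data-index-events.topic",
            "mp.messaging.incoming.data-index-events.group.id",
            "mp.messaging.incoming.data-index-events.dead-letter-queue.topic",
            "kafka.bootstrap.servers"
        };
        for (String p : properties) {
            System.out.println(p + " -> " + toEnvVarName(p));
        }
    }
}
```

For example, `kafka.bootstrap.servers` maps to `KAFKA_BOOTSTRAP_SERVERS`, the variable set in the Deployment manifest. The mapping is one-way: `DATA_INDEX_EVENTS` alone cannot tell you whether the original name used dots or dashes.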

Monitoring

Health Checks

# Liveness (service is running)
curl http://localhost:8080/q/health/live

# Readiness (ready to consume events)
curl http://localhost:8080/q/health/ready

# All health checks (aggregated)
curl http://localhost:8080/q/health

# Metrics (Prometheus format)
curl http://localhost:8080/q/metrics

Kubernetes Monitoring

# Follow logs
kubectl logs -f deployment/data-index-ingestion-kafka -n data-index

# Inspect DLQ messages
kafka-console-consumer.sh \
  --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
  --topic data-index-events-dlq \
  --from-beginning \
  --max-messages 10

Event Processing

Field-Level Idempotency

MODE 3 processes duplicate and out-of-order events idempotently by applying these field-level rules:

Immutable fields (first value wins)
  • start, input, name, version, namespace

  • Never overwritten once a non-null value has been stored

Terminal fields (last non-null wins)
  • end, output, error fields

  • Updated only if the incoming event's timestamp is newer than that of the stored value

Status precedence
  • COMPLETED, FAULTED, CANCELLED > RUNNING > CREATED

  • A status is never replaced by one of lower precedence (for example, a late RUNNING event does not overwrite COMPLETED)
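The rules above can be sketched as a pure merge function. This is a hypothetical illustration, not the service's JDBC code. It models only a few columns (`name` and `start` as immutable fields, `end` as a terminal field), keeps a per-field timestamp (`endTime`) to decide whether an incoming terminal value is newer, treats a field as unset while it is null, and assumes that ties in status precedence (for example FAULTED after COMPLETED) keep the stored status, which the rules above do not specify.

```java
import java.time.Instant;

// Hypothetical sketch of field-level merge rules; not the service's implementation.
class FieldLevelMergeSketch {

    enum Status {
        CREATED(0), RUNNING(1), COMPLETED(2), FAULTED(2), CANCELLED(2);

        final int precedence;

        Status(int precedence) {
            this.precedence = precedence;
        }
    }

    // Stored row (subset of columns). endTime records the time of the event that supplied 'end'.
    record Row(String name, Instant start, Instant end, Instant endTime, Status status) {}

    // Incoming event; null fields were not present in the event.
    record Event(Instant time, String name, Instant start, Instant end, Status status) {}

    static Row merge(Row row, Event e) {
        if (row == null) {
            return new Row(e.name(), e.start(), e.end(), e.end() != null ? e.time() : null, e.status());
        }
        // Immutable fields: the first non-null value wins and is never overwritten.
        String name = row.name() != null ? row.name() : e.name();
        Instant start = row.start() != null ? row.start() : e.start();

        // Terminal field: a non-null value replaces the stored one only if its event is newer.
        Instant end = row.end();
        Instant endTime = row.endTime();
        if (e.end() != null && (endTime == null || e.time().isAfter(endTime))) {
            end = e.end();
            endTime = e.time();
        }

        // Status: only a strictly higher-precedence status replaces the stored one
        // (keeping the stored status on ties is an assumption of this sketch).
        Status status = row.status();
        if (e.status() != null && (status == null || e.status().precedence > status.precedence)) {
            status = e.status();
        }
        return new Row(name, start, end, endTime, status);
    }

    public static void main(String[] args) {
        Instant t1 = Instant.parse("2024-01-01T00:00:00Z");
        Instant t2 = t1.plusSeconds(5);
        Event started = new Event(t1, "order", t1, null, Status.RUNNING);
        Event completed = new Event(t2, "order", null, t2, Status.COMPLETED);

        // Apply the events out of order, followed by a duplicate of the older event.
        Row row = null;
        for (Event e : new Event[] {completed, started, started}) {
            row = merge(row, e);
        }
        System.out.println(row);
    }
}
```

Running `main` applies the completion event first, then the start event twice; the row still ends with status COMPLETED, the start time from the start event and the end time from the completion event. Because any arrival order or duplicate produces the same row, Kafka's at-least-once redelivery is harmless.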

Out-of-Order Recovery

If a task event arrives before the parent workflow:

  1. Task event consumed → INSERT fails (foreign key constraint)

  2. Transaction rolled back to the savepoint taken before the INSERT

  3. Placeholder workflow created with minimal data

  4. Task event retried → INSERT succeeds

  5. Workflow event arrives later → updates placeholder with full data

This ensures no task events are lost due to event ordering.
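The recovery sequence can be simulated in memory. In this hypothetical sketch, a `HashMap` stands in for each table and a custom exception stands in for the PostgreSQL foreign key violation. Because the failed insert writes nothing, no explicit rollback is needed here; in the real service the savepoint rollback provides that guarantee. Names such as `onTaskEvent` are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory simulation of the placeholder-and-retry recovery.
// HashMaps stand in for the workflow_instances and task_instances tables.
class OutOfOrderRecoverySketch {

    // Stands in for the PostgreSQL foreign key violation on task_instances.
    static class ForeignKeyViolation extends RuntimeException {
        ForeignKeyViolation(String message) {
            super(message);
        }
    }

    record Workflow(String id, String name, boolean placeholder) {}

    record Task(String id, String workflowId, String name) {}

    final Map<String, Workflow> workflows = new HashMap<>();
    final Map<String, Task> tasks = new HashMap<>();

    // Enforces the foreign key before writing, so a failed insert changes nothing.
    void insertTask(Task task) {
        if (!workflows.containsKey(task.workflowId())) {
            throw new ForeignKeyViolation("no workflow " + task.workflowId());
        }
        tasks.put(task.id(), task);
    }

    // Steps 1-4: try the insert; on a foreign key failure, create a placeholder and retry.
    void onTaskEvent(Task task) {
        try {
            insertTask(task);
        } catch (ForeignKeyViolation e) {
            workflows.putIfAbsent(task.workflowId(), new Workflow(task.workflowId(), null, true));
            insertTask(task);
        }
    }

    // Step 5: the real workflow event fills in the placeholder (or is a duplicate).
    void onWorkflowEvent(Workflow incoming) {
        workflows.merge(incoming.id(), incoming, (stored, event) -> new Workflow(
                stored.id(),
                stored.name() != null ? stored.name() : event.name(),
                false));
    }

    public static void main(String[] args) {
        OutOfOrderRecoverySketch db = new OutOfOrderRecoverySketch();
        db.onTaskEvent(new Task("t1", "wf1", "charge-card")); // task event arrives first
        System.out.println(db.workflows.get("wf1"));
        db.onWorkflowEvent(new Workflow("wf1", "order", false)); // workflow event arrives later
        System.out.println(db.workflows.get("wf1") + ", tasks stored: " + db.tasks.size());
    }
}
```

Running `main` prints the placeholder row (`placeholder=true`) and then the filled-in row (`name=order`, `placeholder=false`), with the task still stored.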

Troubleshooting

Service won’t start

Check logs:

kubectl logs deployment/data-index-ingestion-kafka -n data-index

Common causes:

  • PostgreSQL unreachable → verify QUARKUS_DATASOURCE_JDBC_URL

  • Kafka unreachable → verify KAFKA_BOOTSTRAP_SERVERS

  • Database schema missing → run Flyway migrations

Events not consumed

Check that the pod is ready (the READY column should show 1/1):

kubectl get pods -n data-index -l app=data-index-ingestion-kafka

# Check logs for errors
kubectl logs deployment/data-index-ingestion-kafka -n data-index | grep -i error

DLQ messages pile up

Inspect failed events, starting from the beginning of the topic:

kafka-console-consumer.sh \
  --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
  --topic data-index-events-dlq \
  --from-beginning \
  --max-messages 5 | jq .

Common causes:

  • Malformed CloudEvents → fix event publisher

  • Database unavailable → events will retry once recovered

  • Schema mismatch → upgrade service or downgrade event publisher

Comparison

Feature         MODE 1 (FluentBit + Triggers)   MODE 2 (FluentBit + ES)   MODE 3 (Kafka)
Event source    Log files                       Log files                 Kafka topics
Ingestion       FluentBit DaemonSet             FluentBit DaemonSet       SmallRye Reactive Messaging
Normalization   PostgreSQL triggers             ES transforms             Java processors (JDBC)
Raw storage     workflow_events_raw             workflow-events index     None (direct to normalized)
Latency         ~10ms                           ~1s                       ~100ms
DLQ             N/A                             N/A                       Yes (data-index-events-dlq)
Security        Disk files                      Disk files                Kafka (SSL/SASL capable)