Kafka with Strimzi

Apache Kafka deployment and management using the Strimzi operator for cloud-native Kafka operations.

Overview

Strimzi provides a Kubernetes-native way to deploy and manage Apache Kafka clusters, offering custom resource definitions (CRDs) for Kafka, topics, users, and connectors.

Strimzi Architecture

Core Components

  • Cluster Operator: Manages Kafka clusters and resources
  • Topic Operator: Manages Kafka topics as Kubernetes resources
  • User Operator: Manages Kafka users and access control
  • Kafka Connect: Manages connector instances for data integration

Custom Resources

  • Kafka: Defines Kafka cluster configuration
  • KafkaTopic: Defines topic specifications
  • KafkaUser: Defines user credentials and ACLs
  • KafkaConnect: Defines connector cluster configuration
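
A quick way to get these CRDs and the Cluster Operator onto a cluster is the Strimzi quickstart install manifests; the kafka namespace below is an example, and the latest URL can be pinned to a specific release:

# Create a namespace and install the Cluster Operator (quickstart manifests)
kubectl create namespace kafka
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

# Verify that the Strimzi CRDs are registered
kubectl get crds | grep strimzi.io

# Wait for the Cluster Operator to become available
kubectl wait deployment/strimzi-cluster-operator -n kafka --for=condition=Available --timeout=300s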

Kafka Cluster Configuration

Basic Cluster Setup

Kafka Cluster Resource:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.6.0
    replicas: 3
    listeners:
    - name: plain
      port: 9092
      type: internal
      tls: false
    - name: tls
      port: 9093
      type: internal
      tls: true
    - name: external
      port: 9094
      type: nodeport
      tls: false
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.6"
      log.message.format.version: "3.6"
    storage:
      type: persistent-claim
      size: 100Gi
      class: fast-ssd
    resources:
      requests:
        memory: 2Gi
        cpu: 500m
      limits:
        memory: 4Gi
        cpu: 2000m
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 20Gi
      class: fast-ssd
    resources:
      requests:
        memory: 1Gi
        cpu: 250m
      limits:
        memory: 2Gi
        cpu: 1000m
  entityOperator:
    topicOperator:
      resources:
        requests:
          memory: 256Mi
          cpu: 100m
        limits:
          memory: 512Mi
          cpu: 500m
    userOperator:
      resources:
        requests:
          memory: 256Mi
          cpu: 100m
        limits:
          memory: 512Mi
          cpu: 500m
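
To create the cluster, apply the resource and wait for Strimzi to report the Ready condition (the file name is illustrative):

# Apply the cluster definition
kubectl apply -f kafka-cluster.yaml -n kafka

# Wait until the Cluster Operator reconciles the cluster
kubectl wait kafka/kafka-cluster --for=condition=Ready --timeout=300s -n kafka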

Production Configuration

High Availability Setup:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-production
  namespace: kafka
spec:
  kafka:
    version: 3.6.0
    replicas: 5
    listeners:
    - name: internal
      port: 9092
      type: internal
      tls: true
      authentication:
        type: tls
    - name: external
      port: 9094
      type: loadbalancer
      tls: true
      authentication:
        type: tls
    config:
      # Performance optimization
      num.network.threads: 8
      num.io.threads: 16
      socket.send.buffer.bytes: 102400
      socket.receive.buffer.bytes: 102400
      socket.request.max.bytes: 104857600

      # Replication settings
      default.replication.factor: 3
      min.insync.replicas: 2
      unclean.leader.election.enable: false

      # Log settings
      log.retention.hours: 168
      log.segment.bytes: 1073741824
      log.retention.check.interval.ms: 300000

      # Compression
      compression.type: producer

    storage:
      type: persistent-claim
      size: 500Gi
      class: fast-ssd
    resources:
      requests:
        memory: 8Gi
        cpu: 2000m
      limits:
        memory: 16Gi
        cpu: 4000m
    jvmOptions:
      "-Xms": "4g"
      "-Xmx": "4g"
      "-XX":
        "UseG1GC": "true"
        "MaxGCPauseMillis": "20"
        "InitiatingHeapOccupancyPercent": "35"
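
With a load balancer listener, TLS clients need the external bootstrap address and the cluster CA certificate. Both can be read back from the cluster; the service and secret names follow Strimzi's <cluster>-kafka-<listener>-bootstrap and <cluster>-cluster-ca-cert conventions:

# External bootstrap address assigned to the load balancer
kubectl get service kafka-production-kafka-external-bootstrap -n kafka \
  -o jsonpath='{.status.loadBalancer.ingress[0].ip}'

# Cluster CA certificate for the client truststore
kubectl get secret kafka-production-cluster-ca-cert -n kafka \
  -o jsonpath='{.data.ca\.crt}' | base64 -d > ca.crt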

Topic Management

Standard Topics

Customs Declaration Topics:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: customs-declarations
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  partitions: 12
  replicas: 3
  config:
    retention.ms: 604800000  # 7 days
    segment.ms: 86400000     # 1 day
    cleanup.policy: delete
    min.insync.replicas: 2
    compression.type: lz4

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: customs-approvals
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  partitions: 6
  replicas: 3
  config:
    retention.ms: 2592000000  # 30 days
    cleanup.policy: delete
    min.insync.replicas: 2

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: customs-notifications
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  partitions: 3
  replicas: 3
  config:
    retention.ms: 86400000   # 1 day
    cleanup.policy: delete
    min.insync.replicas: 2

Compacted Topics

Reference Data Topics:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: customs-reference-data
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  partitions: 1
  replicas: 3
  config:
    cleanup.policy: compact
    segment.ms: 604800000    # 7 days
    min.cleanable.dirty.ratio: 0.1
    delete.retention.ms: 86400000  # 1 day
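
Because a compacted topic retains only the latest record per key, every message must carry a key. A quick way to publish keyed reference data is the console producer with key parsing enabled (key and value separated by a colon):

kubectl exec -it kafka-cluster-kafka-0 -n kafka -- \
  bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic customs-reference-data \
  --property parse.key=true \
  --property key.separator=: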

User Management and Security

Service Users

Application Service Users:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: nucleus-service
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
    # Producer permissions
    - resource:
        type: topic
        name: customs-declarations
        patternType: literal
      operation: Write
      host: "*"
    - resource:
        type: topic
        name: customs-notifications
        patternType: literal
      operation: Write
      host: "*"

    # Consumer permissions
    - resource:
        type: topic
        name: customs-approvals
        patternType: literal
      operation: Read
      host: "*"
    - resource:
        type: group
        name: nucleus-consumer-group
        patternType: literal
      operation: Read
      host: "*"

    # Schema registry access
    - resource:
        type: topic
        name: _schemas
        patternType: literal
      operation: Read
      host: "*"

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: approval-service
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
    # Consumer permissions
    - resource:
        type: topic
        name: customs-declarations
        patternType: literal
      operation: Read
      host: "*"
    - resource:
        type: group
        name: approval-service-group
        patternType: literal
      operation: Read
      host: "*"

    # Producer permissions
    - resource:
        type: topic
        name: customs-approvals
        patternType: literal
      operation: Write
      host: "*"

Administrative Users

Kafka Admin User:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: kafka-admin
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
    # Cluster admin permissions
    - resource:
        type: cluster
      operation: All
      host: "*"
    - resource:
        type: topic
        name: "*"
        patternType: literal
      operation: All
      host: "*"
    - resource:
        type: group
        name: "*"
        patternType: literal
      operation: All
      host: "*"
    - resource:
        type: transactionalId
        name: "*"
        patternType: literal
      operation: All
      host: "*"

Kafka Connect Integration

Connect Cluster

Kafka Connect Deployment:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect-cluster
  namespace: kafka
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.6.0
  replicas: 3
  bootstrapServers: kafka-cluster-kafka-bootstrap:9093
  tls:
    trustedCertificates:
    - secretName: kafka-cluster-cluster-ca-cert
      certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: kafka-connect-user
      certificate: user.crt
      key: user.key
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
    # File config provider used to resolve ${file:...} references in connector configs
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
    - name: connector-config   # mounted at /opt/kafka/external-configuration/connector-config
      secret:
        secretName: mssql-credentials   # example secret holding the database password
  resources:
    requests:
      cpu: 500m
      memory: 1Gi
    limits:
      cpu: 1000m
      memory: 2Gi
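
The Debezium SQL Server connector used below is not included in the default Connect image, so the plugin must be added. One option is Strimzi's build mechanism; the output registry and plugin version here are examples only:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect-cluster
  namespace: kafka
spec:
  # ... bootstrapServers, tls, authentication, config and resources as above ...
  build:
    output:
      type: docker
      # Example registry; replace with a registry the cluster can push to
      image: registry.example.com/kafka-connect-debezium:latest
    plugins:
    - name: debezium-sqlserver
      artifacts:
      - type: maven
        group: io.debezium
        artifact: debezium-connector-sqlserver
        version: 1.9.7.Final   # example version matching the 1.x-style connector config below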

Database Connector

SQL Server Source Connector:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mssql-source-connector
  namespace: kafka
  labels:
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: io.debezium.connector.sqlserver.SqlServerConnector
  tasksMax: 2
  config:
    database.hostname: mssql-server
    database.port: 1433
    database.user: kafka_user
    database.password: ${file:/opt/kafka/external-configuration/connector-config/password:password}
    database.dbname: CustomsDB
    database.server.name: customs-db
    table.include.list: dbo.declarations,dbo.approvals
    database.history.kafka.bootstrap.servers: kafka-cluster-kafka-bootstrap:9093
    database.history.kafka.topic: schema-changes.customs-db
    database.history.consumer.security.protocol: SSL
    database.history.producer.security.protocol: SSL
    transforms: route
    transforms.route.type: org.apache.kafka.connect.transforms.RegexRouter
    transforms.route.regex: "([^.]+)\\.([^.]+)\\.([^.]+)"
    transforms.route.replacement: "$3"
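
Once applied, the operator creates the connector through the Connect REST API and mirrors its state into the resource status:

# Check connector and task state
kubectl get kafkaconnector mssql-source-connector -n kafka -o yaml

# Look for the Ready condition in the events and status
kubectl describe kafkaconnector mssql-source-connector -n kafka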

Monitoring and Operations

Metrics Collection

Kafka Metrics:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-cluster
spec:
  kafka:
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yml
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-metrics
  namespace: kafka
data:
  kafka-metrics-config.yml: |
    lowercaseOutputName: true
    rules:
    # Broker metrics
    - pattern: kafka.server<type=(.+), name=(.+)><>Value
      name: kafka_server_$1_$2
    - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+)><>Value
      name: kafka_server_$1_$2
      labels:
        clientId: "$3"
    # Topic metrics
    - pattern: kafka.log<type=Log, name=Size, topic=(.+), partition=(.+)><>Value
      name: kafka_log_size
      labels:
        topic: "$1"
        partition: "$2"

Health Monitoring

Cluster Health Checks:

# Check cluster status
kubectl get kafka kafka-cluster -n kafka

# Check broker pods
kubectl get pods -l strimzi.io/cluster=kafka-cluster -n kafka

# Check topic status
kubectl get kafkatopic -n kafka

# Check user status
kubectl get kafkauser -n kafka

# View cluster events
kubectl get events -n kafka --sort-by=.metadata.creationTimestamp

Performance Monitoring

Key Performance Metrics:

# Message throughput
rate(kafka_server_brokertopicmetrics_messagesinpersec[5m])

# Byte throughput
rate(kafka_server_brokertopicmetrics_bytesinpersec[5m])

# Consumer group lag (exposed by the Strimzi Kafka Exporter, if deployed)
sum(kafka_consumergroup_lag) by (consumergroup, topic)

# Broker CPU usage
rate(process_cpu_seconds_total{job="kafka-brokers"}[5m])

# JVM memory usage
jvm_memory_bytes_used / jvm_memory_bytes_max

Troubleshooting

Common Issues

Broker Not Starting:

# Check broker logs
kubectl logs kafka-cluster-kafka-0 -n kafka

# Check persistent volume claims
kubectl get pvc -n kafka

# Check resource constraints
kubectl describe pod kafka-cluster-kafka-0 -n kafka

Topic Creation Issues:

# Check topic operator logs
kubectl logs deployment/kafka-cluster-entity-operator -c topic-operator -n kafka

# Verify topic configuration
kubectl describe kafkatopic <topic-name> -n kafka

# Check cluster configuration
kubectl get kafka kafka-cluster -o yaml -n kafka

Consumer Lag Issues:

# Check consumer group status
kubectl exec kafka-cluster-kafka-0 -n kafka -- \
  bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe --group <group-id>

# Reset consumer group offset
kubectl exec kafka-cluster-kafka-0 -n kafka -- \
  bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --group <group-id> \
  --reset-offsets --to-earliest \
  --topic <topic-name> --execute

Diagnostic Commands

# List topics
kubectl exec kafka-cluster-kafka-0 -n kafka -- \
  bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

# Describe topic
kubectl exec kafka-cluster-kafka-0 -n kafka -- \
  bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic <topic-name>

# Test producer
kubectl exec kafka-cluster-kafka-0 -n kafka -- \
  bin/kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic <topic-name>

# Test consumer
kubectl exec kafka-cluster-kafka-0 -n kafka -- \
  bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic <topic-name> --from-beginning

Best Practices

Cluster Configuration

  1. Replication Factor: Use minimum of 3 for production
  2. Partition Strategy: Plan partitions based on throughput requirements
  3. Resource Allocation: Adequate CPU and memory for brokers
  4. Storage: Use fast SSDs for Kafka logs

Topic Design

  1. Naming Convention: Consistent topic naming standards
  2. Partition Count: Balance between parallelism and overhead
  3. Retention Policy: Appropriate retention based on use case
  4. Compaction: Use compacted topics for reference data

Security

  1. TLS Encryption: Enable TLS for all communications
  2. Authentication: Use TLS client certificates or SASL
  3. Authorization: Implement ACLs with least privilege
  4. Network Policies: Restrict network access to Kafka (see the listener example below)
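
For the network policy point, Strimzi can generate per-listener NetworkPolicies from the listener's networkPolicyPeers field (placed under spec.kafka.listeners in the Kafka resource); the client label below is an example:

listeners:
- name: tls
  port: 9093
  type: internal
  tls: true
  networkPolicyPeers:
  # Only pods carrying this label in the Kafka namespace may connect;
  # add a namespaceSelector for clients in other namespaces
  - podSelector:
      matchLabels:
        kafka-client: "true"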

Operations

  1. Monitoring: Comprehensive metrics and alerting
  2. Backup: Regular backup of topic configurations
  3. Capacity Planning: Monitor growth and plan scaling
  4. Documentation: Maintain current operational procedures

For advanced Kafka and Strimzi configurations, refer to the Strimzi documentation.