Writing a custom score-compose provisioner for Apache Kafka

Interested in building a custom resource provisioner for your Score implementation? Learn step-by-step how to create a provisioner based on the example of Apache Kafka.

Score is a workload specification that allows developers to configure their application in a generic way without having to code for a specific deployment platform. We often use score-compose as the local development runtime to validate and test Score files. Score-compose provides a set of default resource provisioners for things like databases, Redis, RabbitMQ, etc., but at the time of writing doesn't provide a Kafka resource! In this post I want to use this as an opportunity to show you how to write a custom resource provisioner, share some guidelines to keep in mind, and offer some suggestions on how to write and test these.

Setting the stage

Let's start with the simple application we're going to use for testing today. It publishes a periodic heartbeat event on a given Kafka topic. We'll build it into a small Docker image to run under score-compose.

package main

import (
	"context"
	"fmt"
	"log"
	"log/slog"
	"os"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	host, port, topicName := os.Getenv("KAFKA_HOST"), os.Getenv("KAFKA_PORT"), os.Getenv("KAFKA_TOPIC")
	if host == "" {
		log.Fatal("KAFKA_HOST is empty or not set")
	} else if port == "" {
		log.Fatal("KAFKA_PORT is empty or not set")
	} else if topicName == "" {
		log.Fatal("KAFKA_TOPIC is empty or not set")
	}

	log.Printf("connecting to %s:%s/%s and sending 'ping' every 5s", host, port, topicName)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()
	conn, err := kafka.DialLeader(ctx, "tcp", fmt.Sprintf("%s:%s", host, port), topicName, 0)
	if err != nil {
		log.Fatalf("failed to dial to kafka: %v", err)
	}
	defer conn.Close()

	for {
		_ = conn.SetWriteDeadline(time.Now().Add(time.Second * 5))
		if r, err := conn.Write([]byte("ping")); err != nil {
			log.Printf("error: failed to ping: %v", err)
		} else {
			slog.Info("successfully sent message", slog.Int("#bytes", r))
		}
		time.Sleep(time.Second * 5)
	}
}

Dockerfile

FROM golang:1.22-alpine AS builder

WORKDIR /app
ENV CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GOWORK=off
COPY go.mod go.sum ./
RUN --mount=type=cache,target=/go/pkg/mod \
    go mod download
RUN --mount=target=. \
    --mount=type=cache,target=/go/pkg/mod \
    --mount=type=cache,target=/root/.cache/go-build \
    go build -o /sample .


FROM gcr.io/distroless/static AS final
COPY --from=builder /sample .
ENTRYPOINT ["/sample"]

Easy enough! How about our target Score file:

score.yaml

apiVersion: score.dev/v1b1
metadata:
  name: sample-app
  
containers:
  main:
    image: .
    variables:
      KAFKA_HOST: ${resources.bus.host}
      KAFKA_PORT: ${resources.bus.port}
      KAFKA_TOPIC: ${resources.bus.name}
      
resources:
  bus:
    type: kafka-topic

Now if we run score-compose init && score-compose generate score.yaml --build main=. we get the expected error. Oh no!

Error: failed to provision: resource 'kafka-topic.default#sample-app.bus' is not supported by any provisioner

Laying the groundwork for the provisioner

Now that we have a .score-compose directory, we can start work on a custom provisioner file. I'll work in .score-compose/00-kafka-topic-provisioner.provisioners.yaml, and I'll make sure it is not ignored by adding an exception to our .gitignore file.

echo '{}' > .score-compose/00-kafka-topic-provisioner.provisioners.yaml
echo '!.score-compose/00-kafka-topic-provisioner.provisioners.yaml' >> .gitignore
Q: Why the 00- prefix?
A: Provisioner files are loaded in lexicographic order. I want this one to take precedence over other provisioner files.

Now we can get started modifying this file. It's good to first decide what outputs the provisioner will have. This usually depends on the resource type you're using and what conventions exist for it. Since we're building a kafka-topic here, we're going to output host, port, and name (the topic name). These match the Humanitec resource type of the same name; keeping them compatible allows folks to deploy the same Score workload to Humanitec. We start with an initial provisioner declaration with fake outputs to see what it does to our workload.

--- a/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
+++ b/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
@@ -1 +1,6 @@
-{}
+- uri: template://custom-provisioners/kafka-topic
+  type: kafka-topic
+  outputs: |
+    host: unknown
+    port: "9092"
+    name: unknown

If we re-run generate now, we no longer get an error.

$ score-compose generate score.yaml --build=main=.
INFO: Loaded state directory with docker compose project 'score-eg-kafka-provisioner'
INFO: Validated workload 'sample-app'
INFO: Successfully loaded 11 resource provisioners
INFO: Provisioned 1 resources
INFO: Converting workload 'sample-app' to Docker compose
INFO: containers.main: overriding container build config to context=.

But when we run docker compose up we get an error.

$ docker compose up
[+] Running 1/0
 ✔ Container score-eg-kafka-provisioner-sample-app-main-1  Created                                                                                                                                           0.0s 
Attaching to sample-app-main-1
sample-app-main-1  | 2024/06/05 13:31:48 connecting to unknown:9092/unknown and sending 'ping' every 5s
sample-app-main-1  | 2024/06/05 13:31:48 failed to dial to kafka: failed to dial: failed to open connection to unknown:9092: dial tcp: lookup unknown on 127.0.0.11:53: no such host
sample-app-main-1 exited with code 1

Which is totally not surprising!

Diving into the Kafka implementation

Now we need a Kafka implementation. There's a well-documented one on Docker Hub: https://hub.docker.com/r/bitnami/kafka. It can be configured in all sorts of ways, but we're going to focus on the most basic setup: single node, KRaft (no ZooKeeper), plaintext security.

One thing to think about is what we want to happen when multiple kafka-topic resources are provisioned. The simplest provisioners spin up an entirely new container for every instance of the resource type that is requested. This may be fine for simple applications, but most datastores support a shared "instance" hosting multiple individual datastores. Kafka is no different here: a single Kafka instance can and should support multiple Kafka topics. This is much better for efficiency, even in a local desktop testing environment.

This means that we're going to make use of the shared section of the provisioner, which produces state that is shared between multiple instances of the same resource type.

Looking at the Kafka docker image, we can identify some useful environment variables we want to use:

  • KAFKA_CFG_NODE_ID - sets the node id so we can define the cluster voters layout
  • KAFKA_CFG_PROCESS_ROLES - this node must run as both controller and broker in KRaft mode (no ZooKeeper)
  • KAFKA_CFG_LISTENERS - sets the listener ports
  • KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP - sets plaintext auth
  • KAFKA_CFG_CONTROLLER_QUORUM_VOTERS - sets the KRaft quorum layout
  • KAFKA_CFG_CONTROLLER_LISTENER_NAMES - sets the controller listener name
  • KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE - we could use this to automatically create topics as they are requested. HOWEVER, it's better practice to enforce that workloads use the real topic names that we generate and don't rely on hardcoded values.

Let's get started by creating the Kafka container. We're going to follow the same pattern as the default Postgres provisioner, which has a similar layout:

  1. In the shared section we determine a stable service and hostname for our Kafka instance, or reuse one if it was already set by a previous generate call.
  2. In the state section we determine a stable topic name for our kafka-topic instance.
  3. In the services section, we output a service with our variables set.
  4. In the outputs section we replace "unknown" with the real generated values.
--- a/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
+++ b/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
@@ -1,6 +1,22 @@
 - uri: template://custom-provisioners/kafka-topic
   type: kafka-topic
+  state: |
+    topic: {{ dig "topic" (print "topic-" (randAlphaNum 6)) .State | quote }}
+  shared: |
+    shared_kafka_instance_name: {{ dig "shared_kafka_instance_name" (print "kafka-" (randAlphaNum 6)) .Shared | quote }}
+  services: |
+    {{ .Shared.shared_kafka_instance_name }}:
+      image: bitnami/kafka:latest
+      restart: always
+      environment:
+        KAFKA_CFG_NODE_ID: "0"
+        KAFKA_CFG_PROCESS_ROLES: controller,broker
+        KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
+        KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
+        KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@{{ .Shared.shared_kafka_instance_name }}:9093"
+        KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
+        KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false"
   outputs: |
-    host: unknown
+    host: {{ .Shared.shared_kafka_instance_name }}
     port: "9092"
-    name: unknown
+    name: {{ .State.topic }}
+    num_partitions: 3

Let's see what happens if we score-compose generate and docker compose up again. For one: we see that the Kafka instance starts properly.

...
kafka-inT7pD-1        | [2024-06-05 14:17:35,680] INFO [KafkaRaftServer nodeId=0] Kafka Server started (kafka.server.KafkaRaftServer)

However, our little Go program started first, and crashed!

sample-app-main-1     | 2024/06/05 14:17:31 connecting to kafka-inT7pD:9092/topic-LVJVOX and sending 'ping' every 5s
sample-app-main-1     | 2024/06/05 14:17:31 failed to dial to kafka: failed to dial: failed to open connection to kafka-inT7pD:9092: dial tcp 192.168.224.2:9092: connect: connection refused
sample-app-main-1 exited with code 1

Adding a health check to the Kafka process

To fix the start order, we need to add a health check to the Kafka process so that we know when it is healthy and can start the workloads:

--- a/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
+++ b/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
@@ -16,6 +16,11 @@
         KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@{{ .Shared.shared_kafka_instance_name }}:9093"
         KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
         KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false"
+      healthcheck:
+        test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server=localhost:9092"]
+        interval: 2s
+        timeout: 2s
+        retries: 10
   outputs: |

Now if we re-generate and start, we see the Kafka server start first, and then our Go program.

kafka-inT7pD-1        | [2024-06-05 14:20:33,007] INFO [KafkaRaftServer nodeId=0] Kafka Server started (kafka.server.KafkaRaftServer)
wait-for-resources-1  | 
wait-for-resources-1 exited with code 0
sample-app-main-1     | 2024/06/05 14:20:35 connecting to kafka-inT7pD:9092/topic-LVJVOX and sending 'ping' every 5s
sample-app-main-1     | 2024/06/05 14:20:40 failed to dial to kafka: context deadline exceeded
sample-app-main-1 exited with code 1

But it still failed, this time with a timeout. This is because we disabled KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE, so the topic doesn't exist yet and the broker has no partition leader to route our message to.

Setting up an init container to create a Kafka topic

Our next requirement is to set up an init container that creates our topic on start. For now we're going to hard-code the number of partitions as 3; the application can decide whether it wants to target a single partition or distribute across them. We add a new init container to the provisioner to call the kafka-topics.sh script:

--- a/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
+++ b/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
@@ -21,6 +21,17 @@
         interval: 2s
         timeout: 2s
         retries: 10
+    {{ .State.topic }}-init:
+      image: bitnami/kafka:latest
+      entrypoint: ["/bin/sh"]
+      command: ["-c", "kafka-topics.sh --topic={{.State.topic}} --bootstrap-server=localhost:9092 --describe || kafka-topics.sh --topic={{.State.topic}} --bootstrap-server=localhost:9092 --create --partitions=3"]
+      network_mode: "service:{{ .Shared.shared_kafka_instance_name }}"
+      labels:
+        dev.score.compose.labels.is-init-container: "true"
+      depends_on:
+        {{ .Shared.shared_kafka_instance_name }}:
+          condition: service_healthy
+          restart: true
   outputs: |

The command checks if the topic exists, and creates it if it does not. The depends_on section ensures this starts after Kafka, while the dev.score.compose.labels.is-init-container label ensures this service is included in the wait-for-resources dependencies before the workloads start.

Now we re-generate and start, and things look much better! Firstly, our topic init container ran successfully:

topic-cKgEgC-init-1  | Error while executing topic command : Topic 'topic-LVJVOX' does not exist as expected
topic-cKgEgC-init-1  | [2024-06-05 14:34:27,320] ERROR java.lang.IllegalArgumentException: Topic 'topic-LVJVOX' does not exist as expected
topic-cKgEgC-init-1  |  at org.apache.kafka.tools.TopicCommand.ensureTopicExists(TopicCommand.java:215)
topic-cKgEgC-init-1  |  at org.apache.kafka.tools.TopicCommand.access$700(TopicCommand.java:78)
topic-cKgEgC-init-1  |  at org.apache.kafka.tools.TopicCommand$TopicService.describeTopic(TopicCommand.java:559)
topic-cKgEgC-init-1  |  at org.apache.kafka.tools.TopicCommand.execute(TopicCommand.java:108)
topic-cKgEgC-init-1  |  at org.apache.kafka.tools.TopicCommand.mainNoExit(TopicCommand.java:87)
topic-cKgEgC-init-1  |  at org.apache.kafka.tools.TopicCommand.main(TopicCommand.java:82)
topic-cKgEgC-init-1  |  (org.apache.kafka.tools.TopicCommand)
topic-cKgEgC-init-1  | Created topic topic-LVJVOX.

And our sample app is successfully writing messages!

sample-app-main-1     | 2024/06/05 14:34:29 connecting to kafka-inT7pD:9092/topic-LVJVOX and sending 'ping' every 5s
sample-app-main-1     | 2024/06/05 14:34:29 INFO successfully sent message #bytes=4
sample-app-main-1     | 2024/06/05 14:34:34 INFO successfully sent message #bytes=4
sample-app-main-1     | 2024/06/05 14:34:39 INFO successfully sent message #bytes=4
sample-app-main-1     | 2024/06/05 14:34:44 INFO successfully sent message #bytes=4
sample-app-main-1     | 2024/06/05 14:34:49 INFO successfully sent message #bytes=4
sample-app-main-1     | 2024/06/05 14:34:54 INFO successfully sent message #bytes=4
sample-app-main-1     | 2024/06/05 14:34:59 INFO successfully sent message #bytes=4
sample-app-main-1     | 2024/06/05 14:35:04 INFO successfully sent message #bytes=4
sample-app-main-1     | 2024/06/05 14:35:09 INFO successfully sent message #bytes=4

Inspecting the Kafka topic using a console

We can run a Kafka console like Redpanda Console (https://github.com/redpanda-data/console) to view our topic and the messages being produced.

We can find the Docker network name with a docker compose command:

$ echo $(docker compose ls -q)_default
score-eg-kafka-provisioner_default

And the connection details with score-compose:

$ score-compose resources get-outputs 'kafka-topic.default#sample-app.bus' --format '{{.host}}:{{.port}}'
kafka-inT7pD:9092

Putting it together we can attach a console:

$ docker run -p 8080:8080 --network $(docker compose ls -q)_default -e KAFKA_BROKERS=$(score-compose resources get-outputs 'kafka-topic.default#sample-app.bus' --format '{{.host}}:{{.port}}') docker.redpanda.com/redpandadata/console:latest

Adding a persistent volume to store data

It's also recommended to explicitly add a persistent volume so that the data is stored in a predictable Docker volume rather than an anonymous one.

--- a/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
+++ b/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
@@ -21,6 +21,10 @@
         interval: 2s
         timeout: 2s
         retries: 10
+      volumes:
+      - type: volume
+        source: {{ .Shared.shared_kafka_instance_name }}-data
+        target: /bitnami/kafka
     {{ .State.topic }}-init:
       image: bitnami/kafka:latest
       entrypoint: ["/bin/sh"]
@@ -32,6 +36,9 @@
         {{ .Shared.shared_kafka_instance_name }}:
           condition: service_healthy
           restart: true
+  volumes: |
+    {{ .Shared.shared_kafka_instance_name }}-data:
+      driver: local
   outputs: |

When starting, we’ll see a volume being created:

[+] Running 6/6
 ✔ Network score-eg-kafka-provisioner_default                 Created
 ✔ Volume "score-eg-kafka-provisioner_kafka-inT7pD-data"      Created    <--
 ✔ Container score-eg-kafka-provisioner-kafka-inT7pD-1        Healthy
 ✔ Container score-eg-kafka-provisioner-topic-LVJVOX-init-1   Exited
 ✔ Container score-eg-kafka-provisioner-wait-for-resources-1  Started
 ✔ Container score-eg-kafka-provisioner-sample-app-main-1     Started

Adding an annotation to expose Kafka ports

Finally, it's a good idea to support an annotation that allows us to expose the Kafka ports so that other processes or tests can run against the instance from outside Docker. The default RabbitMQ provisioner does the same thing.

--- a/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
+++ b/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
@@ -21,6 +21,12 @@
         interval: 2s
         timeout: 2s
         retries: 10
+      {{ $publishPort := (dig "annotations" "compose.score.dev/publish-port" "0" .Metadata | atoi) }}
+      {{ if ne $publishPort 0 }}
+      ports:
+      - target: 9092
+        published: {{ $publishPort }}
+      {{ end }}
       volumes:

Now if we add that annotation to the kafka-topic resource in our Score file, we’ll see the port exposed:

score-eg-kafka-provisioner-kafka-inT7pD-1      bitnami/kafka:latest                         "/opt/bitnami/script…"   kafka-inT7pD      27 seconds ago   Up 26 seconds (healthy)   127.0.0.1:55002->9092/tcp

Verifying our setup

To be doubly sure we've done the right thing, let's provision a second kafka-topic in our Score file and validate that it produces 2 separate topics on the same Kafka process.

2 init containers, with 2 different topic names:

 ✔ Container score-eg-kafka-provisioner-kafka-inT7pD-1        Healthy
 ✔ Container score-eg-kafka-provisioner-topic-LVJVOX-init-1   Exited
 ✔ Container score-eg-kafka-provisioner-topic-gDyZ62-init-1   Exited
 ✔ Container score-eg-kafka-provisioner-wait-for-resources-1  Started
 ✔ Container score-eg-kafka-provisioner-sample-app-main-1     Started

And the console shows both topics created.

Great! Now I could keep this to myself and use it in my own projects or internally in my team, but really I want to contribute it upstream to the score-compose project so that others can use it. So with some final polishing, a PR is up here: https://github.com/score-spec/score-compose/pull/150.

🎉 And there we have it: a brand new score-compose provisioner for a Kafka topic, complete with multi-topic support and random service and topic names. I hope this helps other folks going through a similar process in the future and contributing to either score-compose or score-k8s.

FAQ

Why don’t we include a username or password in the outputs?

Kafka has many different authentication mechanisms, including mutual TLS, basic username and password, no auth (like we used today), etc. Since there is no standard authentication mechanism, we suggest handling this with a different resource type, for example a kafka-auth-password, which would then be provisioned against the shared Kafka instance.

Why not clustering or more complex layouts?

The point of the resource types in docker compose is to provision the simplest implementation that meets the specification. If a user needs a more sophisticated configuration, they should copy the provisioner into their own custom provisioners file and change it to their liking.

What’s the difference between score-compose and score-k8s provisioners?

They look very similar, because score-k8s copied the template mechanism from score-compose. However, instead of producing services, networks, and volumes, the score-k8s provisioners create Kubernetes manifests in the manifests section. Other than this, they are quite similar.

The team behind Score

The idea for Score organically formed from looking at hundreds of delivery setups, across engineering orgs of all sizes (from startups to Fortune 100). We have all been involved in platform engineering and developer tooling for the past decade, in one way or another. Some of us built Internal Developer Platforms at the likes of Google, Apple, or IBM; some come from the IaC side of things and played leading roles at companies like HashiCorp.

We all share the vision of developer and workload-centric development. We want to reduce cognitive load on developers and make sure engineers can spend their time coding and shipping features, instead of fighting config files.

Configure once. Deploy anywhere. From local to prod.