Writing a custom score-compose provisioner for Apache Kafka
Score is a workload specification that allows developers to configure their application in a generic way without having to code for the specific deployment platform. We often use score-compose as the local development runtime to validate and test Score files. Score-compose provides a set of default resource provisioners for things like databases, Redis, RabbitMQ, and so on, but at the time of writing doesn't provide a Kafka resource! In this post I want to use this as an opportunity to show you how to write a custom resource provisioner, some guidelines to keep in mind, and some suggestions on how to write and test these.
Setting the stage
Let's start with the simple application we're going to use for testing today. It will publish a periodic heartbeat event to a given Kafka topic. We'll build this into a little Docker image to run in score-compose.
package main

import (
    "context"
    "fmt"
    "log"
    "log/slog"
    "os"
    "time"

    "github.com/segmentio/kafka-go"
)

func main() {
    host, port, topicName := os.Getenv("KAFKA_HOST"), os.Getenv("KAFKA_PORT"), os.Getenv("KAFKA_TOPIC")
    if host == "" {
        log.Fatal("KAFKA_HOST is empty or not set")
    } else if port == "" {
        log.Fatal("KAFKA_PORT is empty or not set")
    } else if topicName == "" {
        log.Fatal("KAFKA_TOPIC is empty or not set")
    }

    log.Printf("connecting to %s:%s/%s and sending 'ping' every 5s", host, port, topicName)

    // Dial the leader of partition 0 of the target topic, with a 5-second timeout.
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
    defer cancel()
    conn, err := kafka.DialLeader(ctx, "tcp", fmt.Sprintf("%s:%s", host, port), topicName, 0)
    if err != nil {
        log.Fatalf("failed to dial to kafka: %v", err)
    }
    defer conn.Close()

    // Publish a heartbeat message every 5 seconds, forever.
    for {
        _ = conn.SetWriteDeadline(time.Now().Add(time.Second * 5))
        if r, err := conn.Write([]byte("ping")); err != nil {
            log.Printf("error: failed to ping: %v", err)
        } else {
            slog.Info("successfully sent message", slog.Int("#bytes", r))
        }
        time.Sleep(time.Second * 5)
    }
}
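For completeness, here's a minimal go.mod that would pair with this program (the module path and dependency version here are assumptions; run go mod tidy to fill in the indirect requirements):

module example.com/kafka-sample

go 1.22

require github.com/segmentio/kafka-go v0.4.47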
Dockerfile
FROM golang:1.22-alpine AS builder
WORKDIR /app
ENV CGO_ENABLED=0 GOOS=linux GOARCH=amd64 GOWORK=off

COPY go.mod go.sum ./
RUN --mount=type=cache,target=/go/pkg/mod \
    go mod download

RUN --mount=target=. \
    --mount=type=cache,target=/go/pkg/mod \
    --mount=type=cache,target=/root/.cache/go-build \
    go build -o /sample .

FROM gcr.io/distroless/static AS final
COPY --from=builder /sample .
ENTRYPOINT ["/sample"]
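If you want to sanity-check that the image builds before handing it over to score-compose (which will build it for us via the --build flag later), something like this works (the tag name is arbitrary):

$ docker build -t kafka-sample .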
Easy enough! How about our target Score file:
score.yaml
apiVersion: score.dev/v1b1
metadata:
  name: sample-app
containers:
  main:
    image: .
    variables:
      KAFKA_HOST: ${resources.bus.host}
      KAFKA_PORT: ${resources.bus.port}
      KAFKA_TOPIC: ${resources.bus.name}
resources:
  bus:
    type: kafka-topic
Now if we run score-compose init && score-compose generate score.yaml --build main=., we get the expected error - oh no!
Error: failed to provision: resource 'kafka-topic.default#sample-app.bus' is not supported by any provisioner
Laying the groundwork for the provisioner
Now that we have a .score-compose directory, we can start work on a custom provisioner file. I'll work with .score-compose/00-kafka-topic-provisioner.provisioners.yaml. I'm going to make sure that file is explicitly un-ignored in our .gitignore file so it gets committed with the project.
echo '{}' > .score-compose/00-kafka-topic-provisioner.provisioners.yaml
echo '!.score-compose/00-kafka-topic-provisioner.provisioners.yaml' >> .gitignore
Q: Why the 00- prefix?
A: Provisioner files are loaded in lexicographic order. I want this one to take precedence over other provisioner files.
Now we can get started modifying this file. It's good to first decide what outputs the provisioner will have. This usually depends on the resource type you're using and what conventions exist for it. Since we're building a kafka-topic here, we're going to output host, port, and name (the topic name). These match the Humanitec resource type of the same name. If we make them compatible, then we allow folks to deploy the same Score workload to Humanitec. We start with an initial provisioner declaration with fake outputs to see what it does to our workload.
--- a/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
+++ b/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
@@ -1 +1,6 @@
-{}
+- uri: template://custom-provisioners/kafka-topic
+  type: kafka-topic
+  outputs: |
+    host: unknown
+    port: "9092"
+    name: unknown
If we re-run generate now, we don't have an error.
$ score-compose generate score.yaml --build=main=.
INFO: Loaded state directory with docker compose project 'score-eg-kafka-provisioner'
INFO: Validated workload 'sample-app'
INFO: Successfully loaded 11 resource provisioners
INFO: Provisioned 1 resources
INFO: Converting workload 'sample-app' to Docker compose
INFO: containers.main: overriding container build config to context=.
But when we run docker compose up we get an error.
$ docker compose up
[+] Running 1/0
✔ Container score-eg-kafka-provisioner-sample-app-main-1 Created 0.0s
Attaching to sample-app-main-1
sample-app-main-1 | 2024/06/05 13:31:48 connecting to unknown:9092/unknown and sending 'ping' every 5s
sample-app-main-1 | 2024/06/05 13:31:48 failed to dial to kafka: failed to dial: failed to open connection to unknown:9092: dial tcp: lookup unknown on 127.0.0.11:53: no such host
sample-app-main-1 exited with code 1
Which is totally not surprising!
Diving into the Kafka implementation
Now we need a Kafka implementation. There's a well-documented one on Docker Hub: https://hub.docker.com/r/bitnami/kafka. It can be configured in all sorts of ways, but we're going to focus on the most basic setup: single-node, KRaft (no ZooKeeper), with plaintext security.
One thing to think about is what we want to do when multiple kafka-topic resources are provisioned. The simplest provisioners spin up an entire new container for each instance of the resource type that is requested. This may be fine for simple applications, but most datastores support a shared "instance" hosting multiple individual datastores. Kafka is no different here: a single Kafka instance can and should support multiple Kafka topics. This is much better for efficiency - even in a local desktop testing environment.
This means that we're going to be making use of the shared section of the provisioner, which produces state that is shared between multiple instances of the same resource type.
Looking at the Kafka docker image, we can identify some useful environment variables we want to use:
- KAFKA_CFG_NODE_ID - sets the node ID so we can define the cluster voter layout
- KAFKA_CFG_PROCESS_ROLES - this node must run as both controller and broker in KRaft mode - no ZooKeeper
- KAFKA_CFG_LISTENERS - sets the listener ports
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP - sets plaintext auth
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS - sets the KRaft voter layout
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES - sets the controller listener name
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE - we could use this to automatically create topics as they are requested; HOWEVER, it's better practice to enforce that workloads use the real topic names that we will generate and don't use hardcoded values, so we disable it
Let's get started by creating the Kafka container. We're going to follow the same pattern as the default Postgres provisioner, which has a similar layout:
- In the shared section we determine a stable service and host name for our Kafka instance, or reuse the one already set by a previous generate call.
- In the state section we determine a stable topic name for our kafka-topic instance.
- In the services section, we output a service with our variables set.
- In the outputs section we replace "unknown" with the real generated values.
--- a/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
+++ b/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
@@ -1,6 +1,23 @@
 - uri: template://custom-provisioners/kafka-topic
   type: kafka-topic
+  state: |
+    topic: {{ dig "topic" (print "topic-" (randAlphaNum 6)) .State | quote }}
+  shared: |
+    shared_kafka_instance_name: {{ dig "shared_kafka_instance_name" (print "kafka-" (randAlphaNum 6)) .Shared | quote }}
+  services: |
+    {{ .Shared.shared_kafka_instance_name }}:
+      image: bitnami/kafka:latest
+      restart: always
+      environment:
+        KAFKA_CFG_NODE_ID: "0"
+        KAFKA_CFG_PROCESS_ROLES: controller,broker
+        KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
+        KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
+        KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@{{ .Shared.shared_kafka_instance_name }}:9093"
+        KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
+        KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false"
   outputs: |
-    host: unknown
+    host: {{ .Shared.shared_kafka_instance_name }}
     port: "9092"
-    name: unknown
+    name: {{ .State.topic }}
+    num_partitions: 3
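A quick note on the dig pattern used in the state and shared sections above: it's what keeps the random names stable across repeated generate calls. A minimal annotated sketch of the idiom:

# Look up "topic" in the persisted .State map; if a previous generate
# already chose a name, reuse it. Otherwise fall back to a fresh random
# name like "topic-Ab3dE9". "quote" keeps the output valid YAML.
topic: {{ dig "topic" (print "topic-" (randAlphaNum 6)) .State | quote }}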
Let's see what happens if we score-compose generate and docker compose up again. For one: we see that the Kafka instance starts properly.
...
kafka-inT7pD-1 | [2024-06-05 14:17:35,680] INFO [KafkaRaftServer nodeId=0] Kafka Server started (kafka.server.KafkaRaftServer)
However, our little Go program started first, and crashed!
sample-app-main-1 | 2024/06/05 14:17:31 connecting to kafka-inT7pD:9092/topic-LVJVOX and sending 'ping' every 5s
sample-app-main-1 | 2024/06/05 14:17:31 failed to dial to kafka: failed to dial: failed to open connection to kafka-inT7pD:9092: dial tcp 192.168.224.2:9092: connect: connection refused
sample-app-main-1 exited with code 1
Adding a health check to the Kafka process
To fix the start order, we need to add a health check to the Kafka service, so that we know when it is healthy and can start the workloads:
--- a/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
+++ b/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
@@ -16,6 +16,11 @@
         KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@{{ .Shared.shared_kafka_instance_name }}:9093"
         KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
         KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false"
+      healthcheck:
+        test: ["CMD", "kafka-topics.sh", "--list", "--bootstrap-server=localhost:9092"]
+        interval: 2s
+        timeout: 2s
+        retries: 10
   outputs: |
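If the health check ever misbehaves, you can run the same command by hand against the running service (the service name with its random suffix will differ per project):

$ docker compose exec kafka-inT7pD kafka-topics.sh --list --bootstrap-server=localhost:9092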
Now if we re-generate and start, we see the Kafka server start first, and then our Go program.
kafka-inT7pD-1 | [2024-06-05 14:20:33,007] INFO [KafkaRaftServer nodeId=0] Kafka Server started (kafka.server.KafkaRaftServer)
wait-for-resources-1 |
wait-for-resources-1 exited with code 0
sample-app-main-1 | 2024/06/05 14:20:35 connecting to kafka-inT7pD:9092/topic-LVJVOX and sending 'ping' every 5s
sample-app-main-1 | 2024/06/05 14:20:40 failed to dial to kafka: context deadline exceeded
sample-app-main-1 exited with code 1
But it still failed - this time with a timeout. This is because we disabled KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE, and the broker failed to route our message to an available topic partition.
Setting up an init container to create a Kafka topic
Our next requirement is to set up an init container that creates our topic on start. For now we're going to hard-code the number of partitions to 3; the application can decide whether it wants to target a single partition or distribute across them. We add a new init container to the provisioner to call the kafka-topics.sh script:
--- a/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
+++ b/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
@@ -21,6 +21,17 @@
         interval: 2s
         timeout: 2s
         retries: 10
+    {{ .State.topic }}-init:
+      image: bitnami/kafka:latest
+      entrypoint: ["/bin/sh"]
+      command: ["-c", "kafka-topics.sh --topic={{ .State.topic }} --bootstrap-server=localhost:9092 --describe || kafka-topics.sh --topic={{ .State.topic }} --bootstrap-server=localhost:9092 --create --partitions=3"]
+      network_mode: "service:{{ .Shared.shared_kafka_instance_name }}"
+      labels:
+        dev.score.compose.labels.is-init-container: "true"
+      depends_on:
+        {{ .Shared.shared_kafka_instance_name }}:
+          condition: service_healthy
+          restart: true
   outputs: |
The command checks if the topic exists, and creates it if it does not. The depends_on section ensures this starts only after Kafka is healthy, while the dev.score.compose.labels.is-init-container label ensures this service is included in the wait-for-resources dependencies that gate workload startup.
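Unpacked for readability, the init command boils down to this shell one-liner (shown here with a literal topic name substituted in):

# --describe exits non-zero if the topic doesn't exist yet,
# in which case we create it with 3 partitions.
kafka-topics.sh --topic=topic-LVJVOX --bootstrap-server=localhost:9092 --describe \
  || kafka-topics.sh --topic=topic-LVJVOX --bootstrap-server=localhost:9092 --create --partitions=3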
Now we re-generate and start, and things look much better! Firstly, our topic init container ran successfully:
topic-cKgEgC-init-1 | Error while executing topic command : Topic 'topic-LVJVOX' does not exist as expected
topic-cKgEgC-init-1 | [2024-06-05 14:34:27,320] ERROR java.lang.IllegalArgumentException: Topic 'topic-LVJVOX' does not exist as expected
topic-cKgEgC-init-1 | at org.apache.kafka.tools.TopicCommand.ensureTopicExists(TopicCommand.java:215)
topic-cKgEgC-init-1 | at org.apache.kafka.tools.TopicCommand.access$700(TopicCommand.java:78)
topic-cKgEgC-init-1 | at org.apache.kafka.tools.TopicCommand$TopicService.describeTopic(TopicCommand.java:559)
topic-cKgEgC-init-1 | at org.apache.kafka.tools.TopicCommand.execute(TopicCommand.java:108)
topic-cKgEgC-init-1 | at org.apache.kafka.tools.TopicCommand.mainNoExit(TopicCommand.java:87)
topic-cKgEgC-init-1 | at org.apache.kafka.tools.TopicCommand.main(TopicCommand.java:82)
topic-cKgEgC-init-1 | (org.apache.kafka.tools.TopicCommand)
topic-cKgEgC-init-1 | Created topic topic-LVJVOX.
And our sample app is successfully writing messages!
sample-app-main-1 | 2024/06/05 14:34:29 connecting to kafka-inT7pD:9092/topic-LVJVOX and sending 'ping' every 5s
sample-app-main-1 | 2024/06/05 14:34:29 INFO successfully sent message #bytes=4
sample-app-main-1 | 2024/06/05 14:34:34 INFO successfully sent message #bytes=4
sample-app-main-1 | 2024/06/05 14:34:39 INFO successfully sent message #bytes=4
sample-app-main-1 | 2024/06/05 14:34:44 INFO successfully sent message #bytes=4
sample-app-main-1 | 2024/06/05 14:34:49 INFO successfully sent message #bytes=4
sample-app-main-1 | 2024/06/05 14:34:54 INFO successfully sent message #bytes=4
sample-app-main-1 | 2024/06/05 14:34:59 INFO successfully sent message #bytes=4
sample-app-main-1 | 2024/06/05 14:35:04 INFO successfully sent message #bytes=4
sample-app-main-1 | 2024/06/05 14:35:09 INFO successfully sent message #bytes=4
Inspecting the Kafka topic using a console
We can run a Kafka console like Redpanda Console (https://github.com/redpanda-data/console) to view our topic and the messages being produced.
We can find the Docker network with a docker compose command:
$ echo $(docker compose ls -q)_default
score-eg-kafka-provisioner_default
And the connection details with score-compose:
$ score-compose resources get-outputs 'kafka-topic.default#sample-app.bus' --format '{{.host}}:{{.port}}'
kafka-inT7pD:9092
Putting it together we can attach a console:
$ docker run -p 8080:8080 --network $(docker compose ls -q)_default -e KAFKA_BROKERS=$(score-compose resources get-outputs 'kafka-topic.default#sample-app.bus' --format '{{.host}}:{{.port}}') docker.redpanda.com/redpandadata/console:latest
Adding a persistent volume to store data
The other recommended step is to explicitly add a persistent volume so that Kafka's data is stored in a predictable Docker volume rather than the container's writable layer.
--- a/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
+++ b/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
@@ -21,6 +21,10 @@
         interval: 2s
         timeout: 2s
         retries: 10
+      volumes:
+        - type: volume
+          source: {{ .Shared.shared_kafka_instance_name }}-data
+          target: /bitnami/kafka
     {{ .State.topic }}-init:
       image: bitnami/kafka:latest
       entrypoint: ["/bin/sh"]
@@ -32,6 +36,9 @@
         {{ .Shared.shared_kafka_instance_name }}:
           condition: service_healthy
           restart: true
+  volumes: |
+    {{ .Shared.shared_kafka_instance_name }}-data:
+      driver: local
   outputs: |
When starting, we’ll see a volume being created:
[+] Running 6/6
✔ Network score-eg-kafka-provisioner_default Created
✔ Volume "score-eg-kafka-provisioner_kafka-inT7pD-data" Created <--
✔ Container score-eg-kafka-provisioner-kafka-inT7pD-1 Healthy
✔ Container score-eg-kafka-provisioner-topic-LVJVOX-init-1 Exited
✔ Container score-eg-kafka-provisioner-wait-for-resources-1 Started
✔ Container score-eg-kafka-provisioner-sample-app-main-1 Started
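Because this is a named volume, the topic data now survives docker compose down (as long as you don't pass -v). You can inspect it using the name from the output above:

$ docker volume inspect score-eg-kafka-provisioner_kafka-inT7pD-data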
Adding an annotation to expose Kafka ports
Finally, it’s a good idea to support an annotation which allows us to expose the Kafka ports so that other processes or tests can be run against it from outside Docker. We do this for things like the RabbitMQ provisioner as well.
--- a/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
+++ b/.score-compose/00-kafka-topic-provisioner.provisioners.yaml
@@ -21,6 +21,12 @@
         interval: 2s
         timeout: 2s
         retries: 10
+      {{ $publishPort := (dig "annotations" "compose.score.dev/publish-port" "0" .Metadata | atoi) }}
+      {{ if ne $publishPort 0 }}
+      ports:
+        - target: 9092
+          published: {{ $publishPort }}
+      {{ end }}
       volumes:
Now we add that annotation to the kafka-topic resource in our Score file and re-generate.
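As a sketch, the annotated resource in score.yaml might look like this (the published port value is an arbitrary choice):

resources:
  bus:
    type: kafka-topic
    metadata:
      annotations:
        compose.score.dev/publish-port: "55002"

With that in place, docker compose ps shows the published port: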
score-eg-kafka-provisioner-kafka-inT7pD-1 bitnami/kafka:latest "/opt/bitnami/script…" kafka-inT7pD 27 seconds ago Up 26 seconds (healthy) 127.0.0.1:55002->9092/tcp
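With the port published, we can reach the broker from the host, for example by listing cluster metadata with kcat (using the published port from the output above; note that full produce/consume from the host may additionally require the advertised listeners to be resolvable):

$ kcat -b 127.0.0.1:55002 -L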
Verifying our setup
To be doubly sure we’ve done the right thing, let’s provision another kafka topic in our Score file, and validate that it produces 2 separate topics, but on the same Kafka process.
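A sketch of the updated resources section (the second resource name is arbitrary):

resources:
  bus:
    type: kafka-topic
  bus2:
    type: kafka-topic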
2 init containers, with 2 different topic names:
✔ Container score-eg-kafka-provisioner-kafka-inT7pD-1 Healthy
✔ Container score-eg-kafka-provisioner-topic-LVJVOX-init-1 Exited
✔ Container score-eg-kafka-provisioner-topic-gDyZ62-init-1 Exited
✔ Container score-eg-kafka-provisioner-wait-for-resources-1 Started
✔ Container score-eg-kafka-provisioner-sample-app-main-1 Started
And the console shows both topics created.
Great! Now I could keep this to myself and use it in my projects, or internally in my own team. But really I want to contribute it upstream to the score-compose project so that others can use it. So with some final polishing, a PR is up here: https://github.com/score-spec/score-compose/pull/150.
🎉 And there we have it, a brand new score-compose provisioner for a Kafka topic, complete with multi-topic support and random service and topic names. I hope this helps other folks going through a similar process in the future and contributing to either score-compose or score-k8s.
FAQ
Why don’t we include a username or password in the outputs?
Kafka has many different authentication mechanisms, including mutual TLS, basic username and password, and no auth (like we used today). Since there is no standard authentication mechanism, we suggest handling this with a different resource type, for example a kafka-auth-password or something like that. This would then be provisioned on the shared Kafka instance.
Why not clustering or more complex layouts?
The point of the resource types in score-compose is to provision the simplest implementation that meets the specification. If a user needs a more sophisticated configuration, they should copy the provisioner into their own custom provisioners file and change it to their liking.
What’s the difference between score-compose and score-k8s provisioners?
They look very similar, because score-k8s copied the template mechanism from score-compose. The main difference is that instead of producing services, networks, and volumes, the score-k8s provisioners create Kubernetes manifests in the manifests section. Other than this, they are quite similar.
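For illustration only, here is a rough skeleton of how this provisioner might begin to translate to score-k8s. This is an assumption-laden sketch, not a working port: the broker Deployment/StatefulSet manifests are omitted, and a fixed service name stands in for the shared random one.

- uri: template://custom-provisioners/kafka-topic
  type: kafka-topic
  state: |
    topic: {{ dig "topic" (print "topic-" (randAlphaNum 6)) .State | quote }}
  outputs: |
    host: kafka
    port: "9092"
    name: {{ .State.topic }}
  manifests: |
    # A Service fronting the (omitted) Kafka broker pods.
    - apiVersion: v1
      kind: Service
      metadata:
        name: kafka
      spec:
        selector:
          app: kafka
        ports:
          - port: 9092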