Creating A Local Kafka Cluster for Testing: A Weekend Project
I have implemented a local Kafka cluster to use in tests. Source code is here.
I’ve been working on the Ballerina Kafka module for some time now, and it’s been a PITA to test this connector! I needed a Kafka cluster that I could start, shut down, start again, and so on.
Some of the requirements I had are:
- To start a Kafka server programmatically with custom settings, such as a custom port or a custom security protocol.
- To consume messages (which I produced using my Ballerina producers) programmatically and check them.
- To produce messages to a specific topic programmatically (so that I can consume them using my Ballerina consumers).
- To create topics in the cluster programmatically.
I searched for a solution, and most of the solutions involved using a Docker image, which wasn’t helpful in my case. I did not have a way to run these tests in a Docker environment.
Then I used Debezium, which is not intended for this kind of situation but turned out to be helpful. Although it worked, there were some complexities in working with Debezium in the tests.
Then I got to know about Apache Flume, which creates a local Kafka cluster for its tests. It is almost what I was searching for, but it still lacks some functionality.
Then I thought of implementing my own Kafka cluster by combining both approaches. I did this using the builder design pattern, so that I can create the cluster step by step and reduce the complexity. The cluster has the following components:
- A ZooKeeper server
- One or more Kafka Servers
- An optional Kafka consumer
- An optional Kafka Producer
Note: This setup is intended only for testing. It is not recommended for use in a production environment at all!
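To illustrate the step-by-step builder idea, here is a minimal, JDK-only sketch. It is not the code from the repository: the class name ClusterPlanSketch, the method signatures, and the rule that ZooKeeper must be configured before any broker are all assumptions made for illustration. It only records what would be started and does not start anything.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical skeleton of a step-by-step cluster builder (records settings only). */
final class ClusterPlanSketch {
    private final String dataDir;
    private final String host;
    private int zooKeeperPort = -1; // -1 means "ZooKeeper not configured yet"
    private final List<String> brokerListeners = new ArrayList<>();

    ClusterPlanSketch(String dataDir, String host) {
        this.dataDir = dataDir;
        // The host is optional; fall back to localhost when it is missing.
        this.host = (host == null || host.isEmpty()) ? "localhost" : host;
    }

    /** Step 1: remember the ZooKeeper client port. */
    ClusterPlanSketch withZooKeeper(int clientPort) {
        this.zooKeeperPort = clientPort;
        return this;
    }

    /** Step 2 (repeatable): add one broker; assumed to require ZooKeeper first. */
    ClusterPlanSketch withBroker(String protocol, int port) {
        if (zooKeeperPort < 0) {
            throw new IllegalStateException("Call withZooKeeper() before withBroker()");
        }
        brokerListeners.add(protocol + "://" + host + ":" + port);
        return this;
    }

    List<String> brokerListeners() {
        return new ArrayList<>(brokerListeners);
    }

    int zooKeeperPort() {
        return zooKeeperPort;
    }

    String dataDir() {
        return dataDir;
    }
}
```

A chained call such as new ClusterPlanSketch("/tmp/kafka-test", null).withZooKeeper(2181).withBroker("PLAINTEXT", 9092) reads like the list of components above, which is the main appeal of the builder pattern here.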
First I implemented LocalKafkaServer, which is the Kafka server used in the Kafka cluster. I used the KafkaServer class from the Apache Kafka library for this. The code is available in the repository.
Then I created the ZooKeeper server, using Apache ZooKeeper. ZooKeeper has to run in a separate thread, so we start it as soon as we instantiate it. The code is available in the repository.
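The "start it in its own thread on construction" idea can be sketched with the JDK alone. This is only an illustration and not the repository's ZooKeeper wrapper: BackgroundServerSketch is a hypothetical name, and the Runnable stands in for whatever blocking call actually runs the server.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical wrapper that runs a blocking server loop in a background thread. */
final class BackgroundServerSketch {
    private final CountDownLatch started = new CountDownLatch(1);
    private final Thread thread;

    BackgroundServerSketch(Runnable blockingServerLoop) {
        thread = new Thread(() -> {
            started.countDown();      // signal that the background thread is running
            blockingServerLoop.run(); // blocks until the server loop returns
        }, "local-zookeeper");
        thread.setDaemon(true);       // do not keep the test JVM alive
        thread.start();               // started as soon as the object is created
    }

    /** Waits until the background thread has begun; returns false on timeout. */
    boolean awaitStarted(long timeoutMillis) {
        try {
            return started.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Requests shutdown by interrupting the background thread. */
    void stop() {
        thread.interrupt();
    }
}
```

The latch only proves that the thread has begun running; a real test would still need to wait until the server accepts connections.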
The next part is where I created the Kafka cluster, which is the most important part, in my opinion. The code is available in the repository.
Let’s go through the implementation of the Kafka cluster.
First, in the constructor, we initialize the Kafka cluster. We need to set the following for this:
- dataDir, which will be used as the root directory of the ZooKeeper and the Kafka server logs.
- The host where the Kafka cluster is running. This is optional; if it is not provided, localhost will be used.
Then, inside the constructor, we initialize default properties for the ZooKeeper and the Kafka server by using the default properties files shipped with Kafka. (Note: This method will not work as shown in the gist, since it reads the files as resource streams. Check the original GitHub project for a working sample. The gist is only used to show the implementation steps.)
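Reading a properties file as a resource stream generally looks like the sketch below. The class and method names are hypothetical and the resource name is whatever file is actually on the classpath. Returning an empty Properties object when the resource is missing is an assumption of this sketch, not necessarily what the project does.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper that loads default settings from a classpath resource. */
final class DefaultPropertiesSketch {

    /** Loads properties from a stream; a null stream yields empty properties. */
    static Properties load(InputStream in) {
        Properties props = new Properties();
        if (in == null) {
            return props; // resource not found on the classpath
        }
        try (InputStream stream = in) {
            props.load(stream);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Looks the resource up on the classpath, which only works if it is packaged there. */
    static Properties loadFromClasspath(String resourceName) {
        ClassLoader loader = DefaultPropertiesSketch.class.getClassLoader();
        return load(loader.getResourceAsStream(resourceName));
    }
}
```

This also shows why the gist does not run on its own: getResourceAsStream returns null unless the properties files are bundled with the code.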
We also create a List of brokers in the constructor. This list will keep all the brokers we create inside the cluster.
Then we have the withZooKeeper() method. This method will create and run our local ZooKeeper server. It needs the following parameters:
- The client port of the ZooKeeper server, on which ZooKeeper will listen.
- Additional properties for the ZooKeeper server, if there are any. You can omit this if the default settings are adequate.
Once we call this method, ZooKeeper will start running in a separate thread. It will create a directory named zookeeper inside the dataDir provided in the constructor.
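As a rough sketch of the settings involved, the ZooKeeper configuration could be assembled as below. clientPort and dataDir are standard ZooKeeper configuration keys. The class name, the method signature, and the choice to apply custom properties last (so that they override everything else) are assumptions.

```java
import java.nio.file.Paths;
import java.util.Properties;

/** Hypothetical helper that builds the ZooKeeper settings described above. */
final class ZooKeeperConfigSketch {

    static Properties zooKeeperProperties(Properties defaults, String dataDir,
                                          int clientPort, Properties custom) {
        Properties props = new Properties();
        if (defaults != null) {
            props.putAll(defaults);   // start from the shipped defaults
        }
        // Port on which ZooKeeper listens for client connections.
        props.setProperty("clientPort", Integer.toString(clientPort));
        // ZooKeeper data goes into <dataDir>/zookeeper.
        props.setProperty("dataDir", Paths.get(dataDir, "zookeeper").toString());
        if (custom != null) {
            props.putAll(custom);     // assumption: custom settings win
        }
        return props;
    }
}
```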
After that, we have the withBroker() method, which creates a broker. This method needs the following parameters:
- The security protocol used in the Kafka broker. Providing this will not set any security protocol on the broker itself; the value is only used to build the listener configuration of the broker. (If you want to configure the security protocol, use customProperties to set it.) As per the Kafka convention, the listener should be prefixed with the security protocol. Otherwise, we have to provide the listener.security.protocol.map configuration. Since we use this for testing, we don’t need to go that far.
- The port on which the Kafka server listens for Kafka clients.
- Additional properties for the Kafka server, if there are any. You can omit this if the default settings are adequate, which is the case most of the time.
This method will create a broker with the given settings. It additionally sets the following:
- A data directory named kafka inside the provided dataDir
- The ZooKeeper connect configuration, using the previously created ZooKeeper configuration
- The broker ID, derived from the number of existing brokers.
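The broker settings described above can be sketched as plain properties. listeners, log.dirs, zookeeper.connect and broker.id are real Kafka broker configuration keys. The class name, the method signature, and the exact way the values are derived are assumptions, not the code from the repository.

```java
import java.nio.file.Paths;
import java.util.Properties;

/** Hypothetical helper that builds the per-broker settings described above. */
final class BrokerConfigSketch {

    static Properties brokerProperties(String dataDir, String host, String protocol,
                                       int port, int zooKeeperPort,
                                       int existingBrokers, Properties custom) {
        Properties props = new Properties();
        // Listener prefixed with the protocol name, e.g. PLAINTEXT://localhost:9092.
        props.setProperty("listeners", protocol + "://" + host + ":" + port);
        // Kafka log segments go into <dataDir>/kafka.
        props.setProperty("log.dirs", Paths.get(dataDir, "kafka").toString());
        // Point the broker at the local ZooKeeper created earlier.
        props.setProperty("zookeeper.connect", host + ":" + zooKeeperPort);
        // First broker gets ID 0, the second gets 1, and so on.
        props.setProperty("broker.id", Integer.toString(existingBrokers));
        if (custom != null) {
            props.putAll(custom); // assumption: custom settings override the above
        }
        return props;
    }
}
```

Using the current broker count as the ID keeps IDs unique without asking the caller to supply them.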
After calling these methods, we can start our Kafka cluster using the start() method. By calling the start() method, we start all the brokers we created.
Additionally we can create Kafka Consumers, or Kafka Producers to communicate with the cluster. For this, we have two additional methods.
The first one is the withConsumer method. You have to create a consumer using this method in order to consume messages from the created Kafka cluster. This method requires the following parameters:
- The group ID used for the consumer.
- The list of topics to subscribe to. The consumer can subscribe to one or more topics.
This consumer will automatically connect to the created Kafka cluster. The connection details are handled internally. In addition to that, there are some other settings which are set automatically.
The first is that this consumer will consume one message at a time. This is because the consumer is used in testing, and things get complicated if the consumer consumes messages in batches. Hence it is limited to a single message at a time.
The next is that the offset reset policy is set to earliest. The reason is that this consumer is used when the Kafka cluster is started and is closed when the cluster is closed, so there’s no point in missing any previous messages. Hence the consumer will start consuming from the beginning.
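In terms of Kafka consumer configuration, the two automatic settings above most likely correspond to max.poll.records and auto.offset.reset. The sketch below builds such a configuration as plain properties. The configuration keys are real Kafka consumer settings, but the class name, the method signature, and the use of String deserializers are assumptions.

```java
import java.util.Properties;

/** Hypothetical helper that builds the test consumer settings described above. */
final class ConsumerConfigSketch {

    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // Connection details, handled internally by the cluster.
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Return at most one record per poll, so tests can check messages one by one.
        props.setProperty("max.poll.records", "1");
        // With no committed offset, start from the beginning of the topic.
        props.setProperty("auto.offset.reset", "earliest");
        // Assumption: keys and values are plain strings.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```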
The next method is the withProducer() method. You have to create a producer using this method in order to send messages to the created Kafka cluster. I’m not going to explain its parameters since they are self-explanatory.
This method will create a producer with the provided settings, but it also adds some other settings. The first is the Kafka broker connection settings, which are handled internally. Another is that the batch.size setting is set to 0. This means that the producer does not wait to create batches of messages; instead it sends the messages immediately.
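A matching sketch for the producer side is shown below. bootstrap.servers and batch.size are real Kafka producer configuration keys, and a batch size of 0 disables batching. The class name, the method signature, and the String serializers are assumptions made for illustration.

```java
import java.util.Properties;

/** Hypothetical helper that builds the test producer settings described above. */
final class ProducerConfigSketch {

    static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        // Connection details, handled internally by the cluster.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A batch size of 0 disables batching, so each record is sent on its own.
        props.setProperty("batch.size", "0");
        // Assumption: keys and values are plain strings.
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```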
Finally, I want to mention the following additional functions in the Kafka cluster:
- Stopping the cluster: this stops the Kafka consumer, Kafka producer, and Kafka server, and deletes all the log directories too.
- Creating topics: this method can be used to create topics in our Kafka cluster. It requires the topic name, the number of partitions, and the replication factor of the topic.
- Consuming a message: this consumes exactly one message at a time and returns the String value of the message.
- Sending a message: there are two overloaded methods to send a message. One sends with a key, the other sends without a key.
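The cleanup part of stopping the cluster, deleting the log directories, can be sketched with the JDK alone. This is an illustration rather than the project's code: CleanupSketch and deleteRecursively are hypothetical names, and the real implementation may delete the directories differently.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical helper that removes a data directory and everything inside it. */
final class CleanupSketch {

    static void deleteRecursively(Path root) {
        if (!Files.exists(root)) {
            return; // nothing to clean up
        }
        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse order puts children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.delete(path);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Deleting the directories only after the brokers and ZooKeeper have shut down avoids failures caused by files that are still open.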
Hope this will help someone. There is also a proposal to provide a test kit for Kafka, but this is still a work in progress. Once it is completed, I hope we can use it.
You can go to the repository and find out more details about the implementation. Please let me know if there are any issues or improvements that need to be added.