A DIY Guide to Kafka Connectors

You may have heard of the many advantages of using Apache Kafka as part of your Event Driven System. It’s a fast, scalable and fault-tolerant distributed streaming platform that countless enterprise companies use to build real-time streaming data pipelines and applications. Getting data in and out of a Kafka-powered platform, however, can be a challenge.

Enter the Apache Kafka Connector API. It provides classes for creating custom Source Connectors that import data into Kafka and Sink Connectors that export data out of Kafka. Although there are already a number of connectors available through Confluent Hub, many developers find that they need a custom solution to fit their use case. This guide provides a step-by-step walk-through of the development of a custom connector, from implementing the custom source code to deploying to a Confluent Platform running in Google Kubernetes Engine, along with all the tips, tricks, and gotchas discovered along the way!

Table of Contents

  1. Overview
  2. Pre-Requisites
    • Java 8
    • Docker
    • GKE
    • Kubectl
    • Helm
  3. Custom Source Connector Code
  4. Packaging & Configuration Properties
  5. Deploy Confluent Platform to GKE
  6. Deploy Random Long API to GKE
  7. Standalone vs Distributed Mode
  8. Install and Run Connector in Standalone Mode
  9. Install and Run Connector in Distributed Mode
  10. Install and Run Connector with Docker Image
  11. Install and Run Connector Using an External Volume
  12. Closing Remarks

Overview

Custom Kafka Connector Diagram

Our goal is to create a custom Source Connector. With Confluent’s Helm Charts, we can easily get a Kafka environment up and running by deploying the Confluent Platform to Google Kubernetes Engine. We’ll quickly spin up a Spring Boot API with a single GET endpoint that produces a random number, which our custom Source connector will periodically call before publishing the value to a Kafka topic. We’ll also explore four different ways of installing and running a custom Connector.

Check out our companion github repo!

Pre-Requisites

You’ll need the following dependencies on your workstation.

Note: The instructions below are geared toward Mac users. If you’re on Windows or another machine, please refer to each dependency’s documentation for setup instructions.

Java 8

brew tap caskroom/versions
brew cask install java8

Docker

Install Docker for Mac

GKE

We’ll be using the Google Kubernetes Engine (GKE) for our cloud Kubernetes clusters. If you’re new to the Google Cloud Platform (GCP), you’ll get a free year-long trial. Set up an account and then get the gcloud command-line tool set up by following the Quickstart for macOS guide.

Kubectl

To get the kubectl Kubernetes CLI:

$ gcloud components install kubectl

Tip: Uninstall your existing kubectl if you see warnings about version clash, e.g. brew uninstall kubectl if previously installed via homebrew.

Next, configure your gcloud credentials. This will automatically add an entry in your ~/.kube/config so that your kubectl context is set to your GKE cluster:

$ gcloud container clusters list
$ gcloud container clusters get-credentials <your-cluster-name> --zone <your-zone>

Test out your config by running:

$ kubectl config current-context

Helm

Helm is a package manager for Kubernetes that simplifies the management and deployment of Kubernetes clusters. We’ll be using Helm to easily deploy the Confluent Platform to GKE.

brew install kubernetes-helm

Custom Source Connector Code

To create a custom connector, you need to implement two classes provided by the Kafka Connector API: Connector and Task. Your implementation of Connector will provide some configuration that describes the data to be ingested. The connector itself will divide the job of ingesting data into a set of tasks and send those tasks to Kafka Connect workers.

Configuration

First, our connector will need to provide some configuration to describe the data that is being imported. To do this, we extend the org.apache.kafka.common.config.AbstractConfig class to describe the configuration properties that will be used for our connector. In our case, the connector will need to know the url for the API endpoint that we want to pull data from, the name of the Kafka topic we wish to write the data to, and the time interval that should elapse between polls.

package io.enfuse.kafka.connect.connector.config;

import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class RandomLongSourceConnectorConfig extends AbstractConfig {

    public static final String API_URL_CONFIG = "api.url";
    private static final String API_ENDPOINT_DOC = "API URL";

    public static final String TOPIC_CONFIG = "topic";
    private static final String TOPIC_DOC = "Topic to write to";

    public static final String SLEEP_CONFIG = "sleep.seconds";
    private static final String SLEEP_DOC = "Time in seconds that connector will wait until querying api again";

    private final String url;
    private final String topic;
    private final int sleepInSeconds;

    public RandomLongSourceConnectorConfig(Map<?, ?> originals) {
        super(config(), originals);
        this.url = getString(API_URL_CONFIG);
        this.topic = getString(TOPIC_CONFIG);
        this.sleepInSeconds = getInt(SLEEP_CONFIG);
    }

    public static ConfigDef config() {
        return new ConfigDef()
                .define(API_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, API_ENDPOINT_DOC)
                .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
                .define(SLEEP_CONFIG, ConfigDef.Type.INT, 60, ConfigDef.Importance.MEDIUM, SLEEP_DOC);
    }

    public String getUrl() {
        return url;
    }

    public String getTopic() {
        return topic;
    }

    public int getSleepInSeconds() {
        return sleepInSeconds;
    }
}

Note that the config() method returns a ConfigDef type, which can be used to describe the type of your configuration and any validators that should be used, as well as their level of priority. In our example, we’re keeping it simple and are not using any built-in or custom validators, but in a production connector it is highly recommended that you validate your configs.
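As an illustration of that recommendation, here is a hypothetical variant of our config() method (not part of the repo) that uses the built-in ConfigDef.Range validator so that sleep.seconds must be a positive number:

```java
// Hypothetical tweak to config(): adds a built-in Range validator so that
// sleep.seconds must be at least 1; invalid values are rejected at startup.
public static ConfigDef config() {
    return new ConfigDef()
            .define(API_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, API_ENDPOINT_DOC)
            .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC)
            .define(SLEEP_CONFIG, ConfigDef.Type.INT, 60,
                    ConfigDef.Range.atLeast(1),   // rejects 0 and negative sleep intervals
                    ConfigDef.Importance.MEDIUM, SLEEP_DOC);
}
```

With this in place, constructing a RandomLongSourceConnectorConfig with sleep.seconds=0 throws a ConfigException instead of letting a misconfigured connector run.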

The Source Connector

Our custom Source Connector extends the abstract org.apache.kafka.connect.source.SourceConnector class:

public class RandomLongSourceConnector extends SourceConnector {
}

SourceConnector in turn extends Connector, which is an abstract class with the following unimplemented methods:

  • start
  • stop
  • taskClass
  • taskConfigs
  • config
  • version

In the following sections, we’ll take a close look at each method along with some example implementations.

start

    private RandomLongSourceConnectorConfig randomLongSourceConnectorConfig;

    @Override
    public void start(Map<String, String> props) {
        randomLongSourceConnectorConfig = new RandomLongSourceConnectorConfig(props);
    }

Upon startup, the Connector will create a new instance of our RandomLongSourceConnectorConfig class, passing in the properties it received when invoked either through the Kafka Connect REST API or through the command line.

stop

This is where you’ll release any resources when the Connector is stopped. There’s not much to do in our case:

    @Override
    public void stop() {}

taskClass

This method provides the class name of our custom implementation of Task, which we have yet to implement:

    @Override
    public Class<? extends Task> taskClass() {
        return RandomLongSourceTask.class;
    }

taskConfigs

This method provides a set of configs for tasks. Tasks run on separate threads, so your connector can perform multiple tasks in parallel. In our example, we only need one task for the simple job of getting a random Long value, but in more complex scenarios, it may make sense to break a job down into separate tasks. For example, say that your custom source connector needs to pull user data from several different tables in a database. You can parallelize the job of getting that data by splitting the work between different tasks, say, one task per table.

taskConfigs takes in an int value for maxTasks, which is automatically pulled from the configuration properties you provide for your custom connector via a .properties file (when starting the connector with the connect-standalone command) or through the Kafka Connect REST API. You can use the maxTasks value to determine how many sets of configs you’ll need, with each set being used by a separate task.

Here, our task needs to know three things:

  1. the url to poll to get random Long values,
  2. the kafka topic to write to, and
  3. the number of seconds to wait before the next poll.

The code below allows for multiple tasks (as many as the value of maxTasks), but we really only need one task to run for demo purposes.

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>(maxTasks);
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> config = new HashMap<>(3);
            config.put(API_URL_CONFIG, randomLongSourceConnectorConfig.getString(API_URL_CONFIG));
            config.put(SLEEP_CONFIG, Integer.toString(randomLongSourceConnectorConfig.getInt(SLEEP_CONFIG)));
            config.put(TOPIC_CONFIG, randomLongSourceConnectorConfig.getString(TOPIC_CONFIG));
            configs.add(config);
        }
        return configs;
    }
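The one-task-per-table idea above can be sketched in plain Java: round-robin the table names into at most maxTasks groups, with each group becoming the config for one task. This helper is purely illustrative and not part of our connector.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative helper: splits table names round-robin into at most maxTasks
// groups; each group would become the table list assigned to one task.
public class TablePartitioner {
    public static List<List<String>> partition(List<String> tables, int maxTasks) {
        // Never create more groups than there are tables
        int groups = Math.min(maxTasks, tables.size());
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < groups; i++) {
            result.add(new ArrayList<>());
        }
        // Round-robin assignment keeps the groups evenly sized
        for (int i = 0; i < tables.size(); i++) {
            result.get(i % groups).add(tables.get(i));
        }
        return result;
    }
}
```

Each resulting group would then be serialized into one of the Map entries returned by taskConfigs.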

config

Yep, you guessed it: config returns, well, config. The return value must not be null; otherwise, you will not be able to successfully start up your connector.

    @Override
    public ConfigDef config() {
        return RandomLongSourceConnectorConfig.config();
    }

version

Lastly, we need to override the version method, which supplies the version of your connector:

    public static final String VERSION = "0.1.0";

    [...]

    @Override
    public String version() {
        return VERSION;
    }

To keep things simple, we’ve hard-coded VERSION, but it’s better practice to instead create another class that pulls the version from a .properties file and provides a static method, e.g. getVersion(), that returns the version. Then you can invoke that static method here. To see an example of this, check out the Version class in our github repo.
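A minimal sketch of such a class could look like the following; the resource name and fallback value are illustrative, and the actual Version class in our repo may differ. It reads a version.properties file (e.g. containing version=0.1.0) from the classpath and falls back to "unknown" if the file is missing:

```java
import java.io.InputStream;
import java.util.Properties;

// Illustrative sketch: loads the connector version from a version.properties
// resource on the classpath, falling back to "unknown" when absent.
public class Version {
    private static final String VERSION;

    static {
        String version = "unknown";
        Properties props = new Properties();
        try (InputStream stream = Version.class.getResourceAsStream("/version.properties")) {
            if (stream != null) {
                props.load(stream);
                version = props.getProperty("version", version);
            }
        } catch (Exception e) {
            // swallow and report "unknown" rather than fail connector startup
        }
        VERSION = version;
    }

    public static String getVersion() {
        return VERSION;
    }
}
```

The static initializer runs once per classloader, so the properties file is read only the first time the version is requested.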

The Source Task

As with the Connector, to create a custom Task, you will have to extend a base Task class and provide the implementation for some standard lifecycle methods. You can see that our custom Task inherits from SourceTask:

import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RandomLongSourceTask extends SourceTask {
    private static Logger logger = LoggerFactory.getLogger(RandomLongSourceTask.class);
}

In our RandomLongSourceTask, we will be overriding four methods:

  • start
  • stop
  • poll
  • version

As before, we’ll examine some sample implementations of each method.

start

Kafka Connect Workers start up each task on a dedicated thread. Each task continually calls the poll method, which, as we’ll see shortly, houses the logic for pulling data from your source and prepping that data for import into a Kafka topic. Our poll method will need to know:

  • the url hit to get a random Long value,
  • our desired length of time to block the Task’s thread until its next invocation of poll, and
  • the Kafka topic to publish to.

Remember when we implemented taskConfigs(int maxTasks) in RandomLongSourceConnector? Each Map in the List that taskConfigs returns is passed to a Task that the Kafka Connect Worker spins up. In particular, the configuration Map is passed to the Task’s start method, where you can access the configuration values for later use in your poll method. You could simply grab whatever configuration values you need directly from the Map<String, String> props passed into start; however, it is better practice to make use of the RandomLongSourceConnectorConfig class that we made earlier. If you later add any built-in or custom validators to your Config class, then those validators will be invoked upon task startup as well.

private RandomLongSourceConnectorConfig randomLongSourceConnectorConfig;
private int sleepInSeconds;
private String apiUrl;
private String topic;

@Override
public void start(Map<String, String> props) {
    logger.info("Starting source task with properties {}", props);

    randomLongSourceConnectorConfig = new RandomLongSourceConnectorConfig(props);
    apiUrl = randomLongSourceConnectorConfig.getString(API_URL_CONFIG);
    sleepInSeconds = randomLongSourceConnectorConfig.getInt(SLEEP_CONFIG);
    topic = randomLongSourceConnectorConfig.getString(TOPIC_CONFIG);
}

Note: The getString and getInt methods are provided by the base AbstractConfig class that RandomLongSourceConnectorConfig extends.

stop

Note that the stop method is synchronized; each Task may block its thread indefinitely, so stop needs to be called by a different thread in the Worker. This is where you will want to release any resources.

    @Override
    public synchronized void stop() {
        logger.info("Stopping source task");
        stopLatch.countDown();
    }

poll

This method will be called repeatedly, so note that we introduce a CountDownLatch#await to set the time interval between invocations of poll:

    import static io.enfuse.kafka.connect.connector.util.RandomLongHttpClient.httpClient;

    [...]

    private CountDownLatch stopLatch = new CountDownLatch(1);
    private boolean shouldWait = false;

    [...]

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        boolean shouldStop = false;
        if (shouldWait) {
            logger.debug("Waiting for {} seconds for the next poll", sleepInSeconds);
            shouldStop = stopLatch.await(sleepInSeconds, TimeUnit.SECONDS);
        }
        if (!shouldStop) {
            logger.debug("Started new polling");
            shouldWait = true;
            return getSourceRecords();
        } else {
            logger.debug("Received signal to stop, didn't poll anything");
            return null;
        }
    }

    private List<SourceRecord> getSourceRecords() {
        SourceRecord record = new SourceRecord(
                null,
                null,
                topic,
                VALUE_SCHEMA,
                getRandomLongFromApi());
        return Collections.singletonList(record);
    }

    private Long getRandomLongFromApi() {
        HttpGet httpGet = new HttpGet("http://" + apiUrl + "/random/long");

        try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
            return Long.parseLong(EntityUtils.toString(response.getEntity()));
        } catch (ClientProtocolException e) {
            logger.error("Error consuming GET /random/long", e);
        } catch (IOException e) {
            logger.error("IO Error", e);
        }
        return null;
    }

The poll method returns a List of SourceRecords, which contain information about:

  • the topic to write to
  • the value to write
  • the type of the value being written
  • the source partition (for example, filename or table name) to differentiate the source a record came from
  • the source offset (for example, position in file or value in the timestamp column of a table) for resuming consumption of data in case of restart

In our scenario, it doesn’t make sense to have a source partition, since our source is always the same endpoint. But if we were hitting multiple endpoints to get different types of values, it would be reasonable to pass in a different sourcePartition for each endpoint.

Similarly, since we simply hit an endpoint and either get a random value or not, our sourceOffset is null. However, if your custom Task involves breaking large files into chunks before reading them, then a sourceOffset that indicates the last read position in the file would be helpful! Or if your task involves reading from a table, then a sourceOffset with a timestamp value could help you determine the last consumed row.
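For comparison, a hypothetical file-reading task along those lines might build its records with an explicit sourcePartition and sourceOffset. The map keys, file names, and values here are all illustrative, not part of our connector:

```java
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

// Hypothetical file-reading task: Kafka Connect persists the sourceOffset map,
// so after a restart the task can look up "position" and resume from there.
String fileName = "/var/data/input.log";        // illustrative file being ingested
long lastReadPosition = 1024L;                  // offset of the last line read
String line = "some line read from the file";   // the value to publish

Map<String, String> sourcePartition = Collections.singletonMap("filename", fileName);
Map<String, Long> sourceOffset = Collections.singletonMap("position", lastReadPosition);

SourceRecord record = new SourceRecord(
        sourcePartition,            // identifies which file this record came from
        sourceOffset,               // where to resume in that file after a restart
        "file_lines_topic",         // illustrative topic name
        Schema.STRING_SCHEMA,
        line);
```

On restart, the framework hands the last committed offset for each sourcePartition back to the task (via SourceTaskContext), which is what makes resumable ingestion possible.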

version

As before, return the version of your connector:

    @Override
    public String version() {
        return VERSION;
    }

Packaging & Configuration Properties

There are a number of ways to install and run a Kafka Connector, but in all cases, you will need to provide separate sets of configuration properties for the worker and for your custom connector. One worker property is plugin.path, which specifies the path to either a zip or an uber-jar containing your connector code and its dependencies.

Packaging an uber-jar

To create an uber-jar in a gradle project, first add the following plugin to your build.gradle:

plugins {
    id 'com.github.johnrengelman.shadow' version '5.0.0'
}

To create the uber-jar, run:

$ ./gradlew clean shadowJar

You can find your uber-jar under build/libs/<your jar name>-all.jar

Configuration Properties

This article contains walk-throughs for four ways of installing and running our custom Connector. You may need to provide configuration properties for your Connect Worker and custom Connector differently, depending on the type of installation:

  1. manual installation in standalone mode
    • randomlong-connect-standalone.properties file to configure a standalone worker
    • randomlong-connector.properties file to configure your custom Randomlong Connector
  2. manual installation in distributed mode
    • randomlong-connect-distributed.properties file to configure a distributed mode worker
    • a POST request body with configuration values for your custom Randomlong Connector
  3. docker image with a pre-installed custom connector, and
  4. K8s pod with connector jar in mounted volume
    • env variables to configure a distributed mode worker
    • a POST request body with configuration values for your custom Randomlong Connector

Check out our github repo for sample properties files.

For now, create three .properties files:

  • randomlong-connect-standalone.properties
  • randomlong-connect-distributed.properties
  • randomlong-connector.properties

Standalone Mode Worker Configuration

randomlong-connect-standalone.properties

bootstrap.servers=<kafka broker host>:9092
rest.port=8084
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

plugin.path=/usr/share/java/kafka-connect-randomlong

Don’t forget to modify the host for bootstrap.servers! The default value for rest.port is 8083. You may use the default value if you do not already have a connect worker running on that port.

Distributed Mode Worker Configuration

To start up a connector in distributed mode, you will need several additional configuration properties, including group.id, which identifies the Connect cluster group the worker belongs to, and a set of configs for the kafka topics that store offsets, configs, and status. These topic configs must be the same for all workers sharing a group.id:

randomlong-connect-distributed.properties

bootstrap.servers=<kafka broker host>:9092

group.id=connect-randomlong-cluster

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.topic=connect-randomlong-offsets
offset.storage.replication.factor=1

config.storage.topic=connect-randomlong-configs
config.storage.replication.factor=1

status.storage.topic=connect-randomlong-status
status.storage.replication.factor=1

offset.flush.interval.ms=10000

rest.port=8085

plugin.path=/usr/share/java/kafka-connect-randomlong

Random Long Source Connector Configuration

randomlong-connector.properties

name=randomlong_source_connector
connector.class=io.enfuse.kafka.connect.connector.RandomLongSourceConnector
tasks.max=1
api.url=<host>:8080
topic=randomlong_topic
sleep.seconds=5

Don’t forget to provide the host for the api endpoint you want to poll from.

Deploy Confluent Platform to GKE

You should have a GCP account with access to GKE, the gcloud and kubectl command line tools installed and configured, and helm installed. If not, refer to the Pre-Requisites section above.

  1. We’ll use the Helm Charts provided by Confluent in their cp-helm-charts github repo. First, add the Confluent charts to your helm repo:
    • $ helm repo add confluentinc https://confluentinc.github.io/cp-helm-charts/ 
    • $ helm repo update
  2. Initialize helm and add Tiller to your Kubernetes cluster:
    • $ helm init --history-max 200
    The --history-max option is recommended as it allows for purging of objects in the helm history.
  3. Install the Confluent Platform:
    • $ helm install confluentinc/cp-helm-charts --name my-confluent-oss
    If you do not provide a custom name, a random name will be generated for you. Try helm list to confirm the release. Try kubectl get pods to see all the running Confluent pods. At minimum, you should see three kafka brokers and a connect server.

For more information on using the Helm Charts to install the Confluent Platform, see the Confluent Docs.

Deploy Random Long API to GKE

Here you’ll need a simple app that exposes a GET /random/long endpoint that returns a random long value. Refer to our repo for a sample Java Spring Boot app with this endpoint. Our repo also contains a Dockerfile that will allow you to build an image and push to your Google Container Registry (GCR):

  1. cd into the directory of your project that contains your Dockerfile.
  2. Build the docker image:
    • $ docker build . -t randomlong-api
  3. Tag the docker image in preparation for pushing it to GCR:
    • $ docker tag randomlong-api us.gcr.io/<your gke project name>/randomlong-api
  4. Make sure your docker cli is authenticated to push to GCR:
    • $ gcloud auth configure-docker
  5. Push the docker image to GCR:
    • $ docker push us.gcr.io/<your gke project name>/randomlong-api
  6. Run the container:
    • $ kubectl run randomlong-api --image=us.gcr.io/<your gke project name>/randomlong-api --port=8080
  7. Expose a service for the api:
    • $ kubectl expose deployment randomlong-api --type=ClusterIP --name=randomlong-api-service

Standalone vs Distributed Mode

Kafka Connect workers can run in standalone or distributed mode. A standalone worker is a single worker (you may think of it as a single process) that executes all connectors and tasks. The upside of running in standalone mode is that the configuration requirements are simpler than in distributed mode. And, of course, a single worker uses fewer resources than multiple workers. The downside, however, is that since you have only one process running all your connectors and tasks, you have zero fault-tolerance and poor scalability.

So, when would you want to run a worker in standalone mode? Well, standalone mode is fine for testing and development purposes. Standalone mode may also make sense if you know that you need only one agent and that fault-tolerance and scalability are not important. In almost all production cases, though, you will want to run your workers in distributed mode.

In distributed mode, multiple workers share a group.id, and connectors and tasks are balanced across all the workers. In addition to a shared group.id, workers in distributed mode make use of several kafka topics that store offset, configuration, and status information to support re-balancing of connectors and tasks across the remaining workers when a worker crashes, is added, or is removed.

Install and Run Connector in Standalone Mode

  1. Earlier we created a .properties file to configure a standalone worker and another file to configure our randomlong connector. We need to copy those files into the /etc/kafka directory in our connect server. Assuming your .properties files are in a directory named config, run these commands:
    • $ kubectl get pods // to see the name of your kafka connect pod 
    • $ kubectl cp config/randomlong-connect-standalone.properties <kafka-connect-pod-name>:/etc/kafka -c cp-kafka-connect-server
    • $ kubectl cp config/randomlong-connector.properties <kafka-connect-pod-name>:/etc/kafka -c cp-kafka-connect-server
  2. Build an uber-jar with
    • $ ./gradlew clean shadowJar 
  3. Copy the uber-jar from /connector/build/libs into the /usr/share/java/kafka-connect-randomlong directory in the kafka connect container:
    • $ kubectl cp connector/build/libs/connector-all.jar <kafka-connect-pod-name>:/usr/share/java/kafka-connect-randomlong/ -c cp-kafka-connect-server
  4. SSH into the Kafka Connect container and run /usr/bin/connect-standalone:
    • $ kubectl exec -ti <kafka-connect-pod-name> -c cp-kafka-connect-server bash 
    • $ /usr/bin/connect-standalone /etc/kafka/randomlong-connect-standalone.properties /etc/kafka/randomlong-connector.properties

Tips For When Things Go Wrong

  • If you see an address already in use error, you may be attempting to spin up a connector with the same rest.port value as an already running connector. Try changing the port in your randomlong-connect-standalone.properties file and try again.
  • If you see a Could not find class io.enfuse.kafka.connect.connector.RandomLongSourceConnector error, double check the plugin.path in randomlong-connect-standalone.properties. It needs to point to a directory that contains your connector uber-jar. Kafka Connect will look recursively in the directory you specified, so it should still work even if plugin.path points to a directory that is a parent of whichever directory houses your uber-jar.
  • If your connector attempts to start but then immediately shuts down, you might not have the correct addresses configured for your kafka brokers. Double check the bootstrap.servers value in randomlong-connect-standalone.properties.
  • If the Kafka topic you’ve provided in randomlong-connector.properties does not already exist, Kafka Connect will create the topic for you. However, if your connector fails due to the topic not existing, try manually creating the topic first and try again.

Install and Run Connector in Distributed Mode

  1. Earlier we created a .properties file to configure a distributed mode worker. Assuming your .properties files are in a directory named config, run these commands to copy the .properties file to the /etc/kafka directory of your connect server:
    • $ kubectl get pods // to see the name of your kafka connect pod 
    • $ kubectl cp config/randomlong-connect-distributed.properties <kafka-connect-pod-name>:/etc/kafka -c cp-kafka-connect-server
  2. As before, you will need your connector uber-jar in the /usr/share/java/kafka-connect-randomlong directory of the kafka connect container. See step #3 in the previous section.
  3. kubectl exec into the kafka connect container and run:
    • $ /usr/bin/connect-distributed /etc/kafka/randomlong-connect-distributed.properties
    • Note that /usr/bin/connect-distributed only takes the configuration properties for the connect workers. Configuration for your custom connector will be passed through the Kafka Connect REST API, which we’ll do in the next step.
  4. Set up port-forwarding to the rest port for your custom connector:
    • $ kubectl port-forward <kafka-connect-pod-name> 8085:8085
    • See the rest.port property in randomlong-connect-distributed.properties to see which port to use.
  5. Submit a POST request to the Kafka Connect REST API to create your new connector, passing in the required configuration properties through the request body:
    curl -X POST \
      http://localhost:8085/connectors \
      -H 'Accept: */*' \
      -H 'Content-Type: application/json' \
      -d '{
        "name": "randomlong_source_connector",
        "config": {
            "connector.class": "io.enfuse.kafka.connect.connector.RandomLongSourceConnector",
            "api.url": "<host>:8080",
            "topic": "randomlong_topic",
            "sleep.seconds": 5
        }
    }'

Don’t forget to modify the value for api.url in your request body!

Install and Run Connector with Docker Image

In the previous sections, we reviewed how to manually install a custom connector. Manual installations are relatively simple and sufficient while working on a POC or for learning purposes. However, for development or deployment of a production-grade connector, installation of your connector should be handled by an automated CI/CD pipeline. If your team uses Docker, you can build an image with your custom connector pre-installed to be run in your various environments. In the following example, we first build the uber-jar locally and then copy it into the /usr/share/java/kafka-connect-randomlong directory for the container, but you could instead pull your uber-jar from an artifact repository.

First create the following Dockerfile:

FROM confluentinc/cp-kafka-connect-base:5.2.1

ENV CONNECT_GROUP_ID connect-randomlong-cluster
ENV CONNECT_PLUGIN_PATH /usr/share/java/kafka-connect-randomlong
ENV CONNECT_BOOTSTRAP_SERVERS PLAINTEXT://enfuse-cp-kafka-headless:9092
ENV CONNECT_REST_ADVERTISED_HOST_NAME (v1:status.podIP)

ENV CONNECT_INTERNAL_KEY_CONVERTER org.apache.kafka.connect.json.JsonConverter
ENV CONNECT_INTERNAL_VALUE_CONVERTER org.apache.kafka.connect.json.JsonConverter
ENV CONNECT_KEY_CONVERTER org.apache.kafka.connect.storage.StringConverter
ENV CONNECT_VALUE_CONVERTER org.apache.kafka.connect.storage.StringConverter
ENV CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE true
ENV CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE true

ENV CONNECT_OFFSET_STORAGE_TOPIC connect-randomlong-offsets
ENV CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR 1
ENV CONNECT_OFFSET_FLUSH_INTERVAL_MS 10000
ENV CONNECT_CONFIG_STORAGE_TOPIC connect-randomlong-configs
ENV CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR 1
ENV CONNECT_STATUS_STORAGE_TOPIC connect-randomlong-status
ENV CONNECT_STATUS_STORAGE_REPLICATION_FACTOR 1

ENV KAFKA_JMX_PORT 5555

COPY build/libs/connector-all.jar /usr/share/java/kafka-connect-randomlong/connector-all.jar

  1. Modify the env value for CONNECT_BOOTSTRAP_SERVERS in the Dockerfile. You can use the IP address & port of your kafka brokers or the name of your headless kafka service. Note that most of the env variables are similar to ones we’ve provided during manual installation in distributed mode; however, we need to prefix those variables with CONNECT_ for them to actually be picked up by the Connect Worker.
  2. cd into the directory of your project that contains your Dockerfile.
  3. Build the docker image:
    • $ docker build . -t randomlong-connector
  4. Tag the docker image in preparation for pushing it to Google Container Registry:
    • $ docker tag randomlong-connector us.gcr.io/<your gke project name>/randomlong-connector
  5. Make sure your docker cli is authenticated to push to GCR:
    • $ gcloud auth configure-docker
  6. Push the docker image to GCR:
    • $ docker push us.gcr.io/<your gke project name>/randomlong-connector
  7. Run the container:
    • $ kubectl run randomlong-connector --image=us.gcr.io/<your gke project name>/randomlong-connector --port=8083
  8. Expose a service for the connector:
    • $ kubectl expose deployment randomlong-connector --type=ClusterIP --name=randomlong-connector-service
  9. Port-forward to the randomlong connector container:
    • $ kubectl get pods // to get the name of your randomlong-connector pod
    • $ kubectl port-forward <randomlong-connector-pod-name> 8083:8083
  10. Submit a POST request to the Kafka Connect REST API to create your new connector, passing in the required configuration properties through the request body:
    curl -X POST \
      http://localhost:8083/connectors \
      -H 'Accept: */*' \
      -H 'Content-Type: application/json' \
      -d '{
        "name": "randomlong_source_connector",
        "config": {
            "connector.class": "io.enfuse.kafka.connect.connector.RandomLongSourceConnector",
            "api.url": "<host>:8080",
            "topic": "randomlong_topic",
            "sleep.seconds": 5
        }
    }'

Don’t forget to modify the value for api.url in your request body!
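Once the POST succeeds, you can sanity-check the connector through the same REST API. Here's a minimal sketch, assuming the port-forward from step 9 is still open and your connector and topic names match the request above; the broker pod name is an assumption, so substitute one of your actual Kafka broker pods:

```shell
# Check that the connector and its task are in the RUNNING state
curl -s http://localhost:8083/connectors/randomlong_source_connector/status

# List all connectors registered with this Connect cluster
curl -s http://localhost:8083/connectors

# Tail the topic from inside a broker pod to confirm records are flowing
# (my-confluent-cp-kafka-0 is a placeholder pod name)
kubectl exec -it my-confluent-cp-kafka-0 -- kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic randomlong_topic \
  --from-beginning
```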

Install and Run Connector Using an External Volume

An alternative to building a docker image with the connector pre-installed is to place the connector jar in a volume. We’ll create a k8s pod with a container based on a base Kafka Connect image and provide configuration for distributed workers via environment variables. The pod will mount to the volume, and when the connect container is run, it will look in the mount path for the connector jar.

If you only want to install a couple of third-party or custom connectors, it is simpler to go with the previous approach and run custom docker images that have your connector jars pre-installed. However, if you wish to install many third-party jars, it may make sense to store them in a volume and share those jars across all the containers mounted to the volume.

Here’s a sample randomlong-connector-pod.yaml for a k8s pod:

apiVersion: v1
kind: Pod
metadata:
  name: randomlong-connector
spec:
  containers:
    - name: randomlong-connector
      image: confluentinc/cp-kafka-connect-base:5.2.1
      env:
      - name: CONNECT_GROUP_ID
        value: "connect-randomlong-cluster"
      - name: CONNECT_PLUGIN_PATH
        value: "/usr/share/java/kafka-connect-randomlong"
      - name: CONNECT_BOOTSTRAP_SERVERS
        value: "10.19.253.11:9092"
      - name: CONNECT_REST_ADVERTISED_HOST_NAME
        valueFrom:
          fieldRef:
            fieldPath: status.podIP
      - name: CONNECT_INTERNAL_KEY_CONVERTER
        value: "org.apache.kafka.connect.json.JsonConverter"
      - name: CONNECT_INTERNAL_VALUE_CONVERTER
        value: "org.apache.kafka.connect.json.JsonConverter"
      - name: CONNECT_KEY_CONVERTER
        value: "org.apache.kafka.connect.storage.StringConverter"
      - name: CONNECT_VALUE_CONVERTER
        value: "org.apache.kafka.connect.storage.StringConverter"
      - name: CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE
        value: "true"
      - name: CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE
        value: "true"
      - name: CONNECT_OFFSET_STORAGE_TOPIC
        value: "connect-randomlong-offsets"
      - name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
        value: "1"
      - name: CONNECT_OFFSET_FLUSH_INTERVAL_MS
        value: "10000"
      - name: CONNECT_CONFIG_STORAGE_TOPIC
        value: "connect-randomlong-configs"
      - name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
        value: "1"
      - name: CONNECT_STATUS_STORAGE_TOPIC
        value: "connect-randomlong-status"
      - name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
        value: "1"
      - name: KAFKA_JMX_PORT
        value: "5555"
      ports:
        - containerPort: 8083
      volumeMounts:
        - name: workdir
          mountPath: /usr/share/java/kafka-connect-randomlong
  initContainers:
    - name: install-randomlong-connector
      image: openjdk:8
      command: ["/bin/bash","-c"]
      args: ["apt-get update; apt-get --assume-yes install git; cd home; git clone https://github.com/enfuse/kafka-connect-demo.git; cd kafka-connect-demo; ./gradlew shadowJar; cp connector/build/libs/connector-all.jar /data"]
      volumeMounts:
        - name: workdir
          mountPath: "/data"
  dnsPolicy: Default
  volumes:
    - name: workdir
      emptyDir: {}

Note that we use the k8s initContainers feature to first run a temporary container that will build and copy our uber-jar into an ephemeral volume. This initContainer runs and completes before the other containers are fired up. For demo purposes, we do not have an artifact repository from which to pull our uber-jar, so we instead run several command arguments to clone our repo, build the uber-jar, and then copy the uber-jar into the mount path. This is not recommended! In practice, you'll want to pull a stable, versioned jar from an artifact repository or some other store like GCS (if in GCP). Better yet, if your custom jar becomes verified and offered on Confluent Hub, you can use the confluent-hub CLI to fetch your connector. After the install-randomlong-connector initContainer completes, our randomlong-connector container spins up, mounts the volume, and finds the connector uber-jar under /usr/share/java/kafka-connect-randomlong as it starts new Connect workers.
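For reference, fetching a published connector with the confluent-hub CLI looks like the following sketch. Since our randomlong connector is not on Confluent Hub, the datagen connector below is just a stand-in:

```shell
# Install a published connector from Confluent Hub into the plugin path.
# confluentinc/kafka-connect-datagen stands in for a hypothetical published
# version of our connector; --component-dir should match your plugin.path.
confluent-hub install --no-prompt \
  --component-dir /usr/share/java \
  confluentinc/kafka-connect-datagen:latest
```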

Another thing to note is that we are using the emptyDir Volume type. This is an ephemeral volume that is created when the pod is assigned to a node. The data in the volume survives any container crashes; however, if the pod is removed from the node, then you will lose all the data in the volume. I chose to use emptyDir as it is the simplest type of Volume to demo with. Depending on your cloud provider, you have many different Persistent Volume options. Refer to the K8s docs for more information about configuring your pod to use a persistent volume.
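As a sketch of the persistent alternative, the emptyDir volume could be swapped for a PersistentVolumeClaim like the one below. The claim name, storage size, and access mode are placeholders; the storage class that actually backs the claim depends on your cloud provider:

```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: connector-jars
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 1Gi
---
# In the pod spec, replace the emptyDir volume with the claim:
#   volumes:
#     - name: workdir
#       persistentVolumeClaim:
#         claimName: connector-jars
```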

  1. To deploy the pod:
    • $ kubectl apply -f connector/k8s/randomlong-connector-pod.yaml
  2. Port-forward to randomlong-connector:
    • $ kubectl port-forward randomlong-connector 8083:8083
  3. As before, submit a POST request to provide your custom connector configuration properties. (See previous section for example request.)

Closing Remarks

We’ve covered the basic concepts of Kafka Connectors and explored a number of different ways to install and run your own. Stay tuned for upcoming articles that take a deeper dive into Kafka Connector development with more advanced topics like validators, recommenders and transformers, oh my!

If you haven’t already, check out our companion github repo.

For more resources and help on Kafka Connect, check out the Apache Kafka Connect documentation and Confluent's Kafka Connect developer guide.

Lastly, special thanks to my colleague Dave Miller for his development contributions to the companion github repo and for his feedback on drafts of this post!