Kafka Connect Basics
Hi, this article is about Kafka Connect!
Introduction
Kafka Connect is a tool for streaming data between Kafka and other systems. It is distributed and scalable by
default, and because it is a standardized framework, many connectors are already available.
Connectors connect Kafka to another system, or vice versa. There are two types of connectors:
Source: Source connectors grab data from an existing system, e.g. MariaDB, PostgreSQL, S3, or Jira, and stream the data into one or more Kafka topics.
Sink: Sink connectors grab data from the topics and ingest it into another system, e.g. MongoDB, Snowflake, or S3.
If you want to stream change data capture (CDC) events from your databases, Debezium provides
connectors that let you do just that. CDC produces an append-only log of changes in a database; using a CDC stream
you can replicate or reconstruct a database, and you can also react to changes by processing the events in an external system.
Kafka Connect can be deployed in standalone mode or distributed as a cluster of workers.
It features a RESTful interface for interacting with it:
configuring connectors
starting, stopping, and pausing connectors
viewing connector status
resetting connector offsets
It also allows you to apply various transformations on a message.
Apache Kafka has an amazing documentation section
on Kafka Connect.
Here’s a diagram of a system built with Kafka Connect: it replicates data from PostgreSQL and MariaDB into
Elasticsearch, which offers powerful tools for searching through the data quickly and accurately.
REST API
For reference, I’ve copied all the operations from
the REST API
documentation and put them into a table.
GET /connectors: return a list of active connectors.
POST /connectors: create a new connector.
GET /connectors/{name}: get information about a specific connector.
DELETE /connectors/{name}: delete a connector.
GET /connectors/{name}/config: get the configuration parameters for a specific connector.
PUT /connectors/{name}/config: update the configuration parameters for a specific connector.
PATCH /connectors/{name}/config: patch the configuration parameters for a specific connector.
GET /connectors/{name}/status: get the current status of the connector.
GET /connectors/{name}/tasks: get a list of tasks currently running for a connector.
GET /connectors/{name}/tasks/{taskId}/status: get the current status of the task.
PUT /connectors/{name}/pause: pause the connector and its tasks, which stops message processing until the connector is resumed.
PUT /connectors/{name}/stop: stop the connector and shut down its tasks.
PUT /connectors/{name}/resume: resume a paused or stopped connector.
POST /connectors/{name}/restart: restart a connector and its task instances.
POST /connectors/{name}/tasks/{taskId}/restart: restart an individual task.
PUT /connectors/{name}/topics/reset: send a request to empty the set of active topics of a connector.
GET /connectors/{name}/offsets: get the current offsets for a connector.
DELETE /connectors/{name}/offsets: reset the offsets for a connector.
GET /connector-plugins: return a list of connector plugins installed in the Kafka Connect cluster.
GET /connector-plugins/{plugin-type}/config: get the configuration definition for the specified plugin.
PUT /connector-plugins/{connector-type}/config/validate: validate the provided configuration values against the configuration definition.
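For example, checking a connector's status is a simple GET request. Here is a minimal Python sketch using only the standard library; the worker address and connector name are assumptions, so adapt them to your deployment:

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumed address of a local Connect worker

def status_url(name: str, base: str = CONNECT_URL) -> str:
    # Build the REST path for GET /connectors/{name}/status
    return f"{base}/connectors/{name}/status"

def connector_status(name: str) -> dict:
    # Returns the connector and task state as reported by the worker
    with urllib.request.urlopen(status_url(name)) as resp:
        return json.load(resp)

# Usage (requires a running worker):
# connector_status("my-jdbc-source")["connector"]["state"]  # e.g. "RUNNING"
```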
To start a new connector instance, you would usually POST to /connectors with a config body:
{
  "name": "my-jdbc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "myuser",
    "connection.password": "mypassword",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "table.whitelist": "users",
    "poll.interval.ms": "5000",
    "topic.prefix": "pg.",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "transforms": "maskSensitive",
    "transforms.maskSensitive.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.maskSensitive.fields": "email,phone",
    "transforms.maskSensitive.replacement": "****"
  }
}
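As a sketch, registering a connector like this from Python with just the standard library could look as follows (the worker URL is an assumption; adapt it to your deployment):

```python
import json
import urllib.request

def build_register_request(base_url: str, name: str, config: dict) -> urllib.request.Request:
    # POST /connectors expects a JSON body with "name" and "config" keys
    payload = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def register_connector(base_url: str, name: str, config: dict) -> dict:
    # Sends the request and returns the worker's JSON response
    with urllib.request.urlopen(build_register_request(base_url, name, config)) as resp:
        return json.load(resp)

# Usage (requires a running worker on localhost:8083):
# register_connector("http://localhost:8083", "my-jdbc-source", {"connector.class": "...", "tasks.max": "1"})
```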
Converters
Converters are used by Connect to convert values from one type to another. Converters apply to both the Kafka message
key and the Kafka message value. For example, if you have the following JSON message:
{"data": 1}
A string converter will write that message as a plain string to the Kafka topic, whereas a JSON converter will keep it as JSON.
There are also binary format converters, such as Avro and Protobuf,
which help reduce message size by packing the message into a compact format. A downside of these formats is that you
need the message schema in order to deserialize the message.
You can also write your own converter and load it into Kafka Connect.
To set the converters you use the following keys:
key.converter: Sets the converter for the message key.
value.converter: Sets the converter for the message value.
Here are some common converter classes:
org.apache.kafka.connect.storage.StringConverter
org.apache.kafka.connect.json.JsonConverter
org.apache.kafka.connect.converters.ByteArrayConverter
io.confluent.connect.json.JsonSchemaConverter (Requires schema registry)
io.confluent.connect.protobuf.ProtobufConverter (Requires schema registry)
io.confluent.connect.avro.AvroConverter (Requires schema registry)
And you usually set a converter with:
{
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "key.converter.schemas.enable": "true"
}
By also setting value.converter.schemas.enable to true, you will receive the schema of the JSON message along
with the payload.
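With schemas enabled, the JsonConverter wraps each message in an envelope with schema and payload keys. Here is a small Python sketch of roughly what that envelope looks like for the {"data": 1} example above (the int32 field type is an assumption):

```python
import json

def with_schema_envelope(payload: dict, fields: list[tuple[str, str]]) -> str:
    # Mimics the JsonConverter envelope: "schema" describes the payload's
    # struct fields, and "payload" carries the actual values.
    schema = {
        "type": "struct",
        "fields": [{"field": name, "type": ftype, "optional": False}
                   for name, ftype in fields],
        "optional": False,
    }
    return json.dumps({"schema": schema, "payload": payload})

print(with_schema_envelope({"data": 1}, [("data", "int32")]))
```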
Schema Registry
The schema registry is another component; it acts as a central store and cache for message schemas.
Binary formats like Avro or Protobuf cannot be decoded by their receiver without the message’s schema, and
sending the schema with each message increases the message size.
The purpose of the schema registry is to keep all schemas together in a database and let producers and consumers
request a schema only when needed, so that messages can be produced to a Kafka topic without including the schema.
This component is optional, and it’s only required when using binary formats like Avro or Protobuf.
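As an illustration of how this saves space, Confluent's serializers replace the inline schema with a small schema ID reference in each message: a magic byte, a 4-byte big-endian schema ID, then the serialized payload. A sketch of that framing:

```python
import struct

MAGIC_BYTE = 0  # identifies the Confluent wire format

def frame_message(schema_id: int, serialized: bytes) -> bytes:
    # Prefix: 1 magic byte + 4-byte big-endian schema ID, then the payload.
    # A consumer reads the ID and fetches the full schema from the registry,
    # so the schema itself never travels with the message.
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + serialized

framed = frame_message(42, b"\x02\x08test")  # payload bytes are a placeholder
print(framed.hex())
```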
Transforms
You can apply various transformations on messages that are processed by the connector.
Common transforms include masking fields, dropping fields, replacing values, renaming fields and more.
Cast - Cast fields or the entire key or value to a specific type
DropHeaders - Remove headers by name
ExtractField - Extract a specific field from Struct and Map and include only this field in results
Filter - Removes messages from all further processing. This is used with a predicate to selectively filter certain messages
Flatten - Flatten a nested data structure
HeaderFrom - Copy or move fields in the key or value to the record headers
HoistField - Wrap the entire event as a single field inside a Struct or a Map
InsertField - Add a field using either static data or record metadata
InsertHeader - Add a header using static data
MaskField - Replace field with valid null value for the type (0, empty string, etc) or custom replacement (non-empty string or numeric value only)
RegexRouter - modify the topic of a record based on original topic, replacement string and a regular expression
ReplaceField - Filter or rename fields
SetSchemaMetadata - modify the schema name or version
TimestampConverter - Convert timestamps between different formats
TimestampRouter - Modify the topic of a record based on original topic and timestamp. Useful when using a sink that needs to write to different tables or indexes based on timestamps
ValueToKey - Replace the record key with a new key formed from a subset of fields in the record value
Source: https://kafka.apache.org/41/kafka-connect/user-guide/#transformations
To apply transforms, you include them in the connector config:
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "transforms": "maskSensitive",
  "transforms.maskSensitive.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskSensitive.fields": "sensitiveField",
  "transforms.maskSensitive.replacement": "****"
}
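To illustrate what MaskField$Value does to a record, here is a rough Python sketch of the masking behavior (the record shape and field names are assumptions; the real SMT also handles schemas and type-aware null defaults):

```python
def mask_fields(value: dict, fields: list, replacement: str) -> dict:
    # Replace each listed field with the replacement, leaving other fields intact.
    return {k: (replacement if k in fields else v) for k, v in value.items()}

record = {"id": 7, "email": "jane@example.com", "phone": "555-0100"}
masked = mask_fields(record, ["email", "phone"], "****")
print(masked)  # {'id': 7, 'email': '****', 'phone': '****'}
```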
Docker Compose
You can start a pre-configured Kafka Connect instance along with a Kafka cluster for development or experimentation
using this docker-compose file.
services:
  broker:
    image: confluentinc/cp-kafka:8.0.0
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  # schema-registry:
  #   image: confluentinc/cp-schema-registry:8.0.0
  #   hostname: schema-registry
  #   container_name: schema-registry
  #   depends_on:
  #     - broker
  #   ports:
  #     - "8081:8081"
  #   environment:
  #     SCHEMA_REGISTRY_HOST_NAME: schema-registry
  #     SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
  #     SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: confluentinc/cp-kafka-connect:8.1.2
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      # - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      # CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
Additionally, you can download the Kafka binary archive and run
Connect in standalone mode with bin/connect-standalone.sh config/connect-standalone.properties.
That's all. I hope this article gave you a rough idea of Kafka Connect and its capabilities.