Simple Apache Kafka Connector


Overview

Apache Kafka Connect can be used to pull data from external data sources into an application, for example:

  • JDBC connections
  • NoSQL stores
  • MongoDB
  • Streaming services, e.g. Twitter
  • etc.

The following diagram shows the simplest useful place for Kafka Connect in a data pipeline:

[Diagram: Kafka Connect in a data pipeline]

Solution

The standard approach to implementing Kafka Connect in an application is built around three classes, described below:

SourceConnector

The implementer of this class has access to the connector's basic properties and configuration map, plus a few lifecycle callback methods.

SourceTask

The implementer of this class holds the actual logic: connecting to the external data source, fetching data, and transforming it before pushing it into Kafka.

AbstractConfig

Straight from the docs: "A convenient base class for configurations to extend."

Let's jump into the implementation of the Kafka connector, where we create a stream of messages and publish them to a topic on Kafka.

package in.silentsudo.kafka.connector.source;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class StreamConnector extends SourceConnector {

    private StreamSourceConfig config;

    @Override
    public void start(Map<String, String> props) {
        config = new StreamSourceConfig(props);
    }

    @Override
    public Class<? extends Task> taskClass() {
        return StreamSourceTask.class;
    }

    /**
     * On startup, the Kafka Connect framework passes each configuration map
     * contained in the list returned by taskConfigs to a task. We run a
     * single task, so we return a single-element list.
     */
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>(1);
        configs.add(config.originalsStrings());
        return configs;
    }

    @Override
    public void stop() {
        // Nothing to clean up; this connector holds no external resources.
    }

    @Override
    public ConfigDef config() {
        return StreamSourceConfig.conf();
    }

    @Override
    public String version() {
        // Connect expects a non-null version string.
        return "1.0";
    }
}
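
Since tasks.max is pinned to 1 here, taskConfigs simply hands the connector's own configuration to the one task. For a source that can be parallelized, a common pattern is to return one map per task, tagging each with its share of the work. A minimal sketch of that idea follows; the task.id key is hypothetical (not part of the Connect API), and a java.util.HashMap import would be needed:

// Hypothetical sketch: fan the connector config out to maxTasks tasks.
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    List<Map<String, String>> configs = new ArrayList<>(maxTasks);
    for (int i = 0; i < maxTasks; i++) {
        Map<String, String> taskConfig = new HashMap<>(config.originalsStrings());
        // "task.id" is a hypothetical key each task would read back in start()
        taskConfig.put("task.id", String.valueOf(i));
        configs.add(taskConfig);
    }
    return configs;
}

Next, the configuration class: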
package in.silentsudo.kafka.connector.source;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;

public class StreamSourceConfig extends AbstractConfig {
    public static final String BATCH_SIZE_CONFIG = "batch.size";
    private static final String BATCH_SIZE_DOC = "Number of data points to retrieve at a time. Defaults to 100 (max value)";
    private static final int BATCH_SIZE_DEFAULT = 100;

    public StreamSourceConfig(ConfigDef config, Map<String, String> parsedConfig) {
        super(config, parsedConfig);
    }

    public StreamSourceConfig(Map<String, String> parsedConfig) {
        this(conf(), parsedConfig);
    }

    public static ConfigDef conf() {
        // Register batch.size so AbstractConfig can parse and validate it.
        return new ConfigDef()
                .define(BATCH_SIZE_CONFIG, ConfigDef.Type.INT, BATCH_SIZE_DEFAULT,
                        ConfigDef.Importance.MEDIUM, BATCH_SIZE_DOC);
    }
}
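
Because StreamSourceConfig extends AbstractConfig, parsed values are available through typed getters. For example, a task could read the batch size like this (a sketch; the demo task below does not actually use it):

// Sketch: reading a typed config value via AbstractConfig.
StreamSourceConfig config = new StreamSourceConfig(props);
int batchSize = config.getInt(StreamSourceConfig.BATCH_SIZE_CONFIG); // 100 unless overridden

And now the task itself: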
package in.silentsudo.kafka.connector.source;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamSourceTask extends SourceTask {

    private static final String MESSAGE = "Message #%d";

    private static final Schema SCHEMA_KEY = SchemaBuilder.struct()
            .name("STRING_SCHEMA")
            .version(1)
            .field("key", Schema.STRING_SCHEMA)
            .build();

    private static final Schema SCHEMA_KEY_VALUE = SchemaBuilder.struct()
            .name("STRING_SCHEMA_VALUE")
            .version(1)
            .field("content", Schema.STRING_SCHEMA)
            .build();

    private int offset = 0;

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> map) {
        // Nothing to initialize; a real task would open its data-source connection here.
    }

    /**
     * Called repeatedly by the framework; we emit one synthetic message per second.
     */
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        offset++;
        Thread.sleep(1000);
        return Collections.singletonList(getRecordForString(String.format(MESSAGE, offset)));
    }

    @Override
    public void stop() {
        System.out.println("Stop called");
    }

    private SourceRecord getRecordForString(String content) {
        return new SourceRecord(
                sourcePartition(),
                sourceOffset(),
                "stream-messages-topic",
                null,                        // let Kafka choose the topic partition
                SCHEMA_KEY,
                buildKeySchemaValue(),
                SCHEMA_KEY_VALUE,
                buildKeySchemaContentValue(content)
        );
    }

    private Struct buildKeySchemaValue() {
        return new Struct(SCHEMA_KEY)
                .put("key", "stream-messages-key-schema");
    }

    private Struct buildKeySchemaContentValue(String message) {
        return new Struct(SCHEMA_KEY_VALUE)
                .put("content", message);
    }

    // Identifies this source; Connect stores committed offsets keyed by this map.
    private Map<String, String> sourcePartition() {
        Map<String, String> map = new HashMap<>(1);
        map.put("partition", "stream-partition");
        return map;
    }

    private Map<String, String> sourceOffset() {
        Map<String, String> map = new HashMap<>(1);
        map.put("offset", "stream-partition-offset");
        return map;
    }
}
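
The sourcePartition and sourceOffset maps are how Connect tracks progress: the framework periodically commits the offset map, keyed by the partition map. Our demo stores a constant offset value, so there is nothing meaningful to restore, but a real task would put its counter into the offset map and look the last committed value up in start(), roughly like this (a sketch, assuming the partition map used above):

// Sketch: inside start(), recover the last committed offset for our partition.
Map<String, Object> lastOffset = context.offsetStorageReader()
        .offset(Collections.singletonMap("partition", "stream-partition"));
if (lastOffset != null) {
    // e.g. restore the counter from lastOffset.get("offset")
}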

One more important file before we start building: the connector configuration, stream-connector.properties, which we pass to Kafka Connect. (Note that our task hardcodes the target topic, so the topic entry below is informational; nothing in this connector reads it.)

name=SourceConnector
tasks.max=1
connector.class=in.silentsudo.kafka.connector.source.StreamConnector
topic=stream-messages-topic

Finally, the pom.xml file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>in.silentsudo.kafka.connector</groupId>
    <artifactId>stream-connector</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>2.4.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.2.0</version>
            </plugin>
        </plugins>
    </build>
</project>

Steps to Execute the Kafka Connector

Start Zookeeper

zookeeper-server-start.sh /home/ashish/installed_apps/kafka_2.12-2.3.0/config/zookeeper.properties

Start Kafka Broker

kafka-server-start.sh /home/ashish/installed_apps/kafka_2.12-2.3.0/config/server.properties

Create the Kafka topic defined in stream-connector.properties

kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic stream-messages-topic
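
To confirm the topic was created, it can be described with the same tool:

kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic stream-messages-topic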

Start a console consumer for the topic

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stream-messages-topic

Add the application jar to the CLASSPATH variable

export CLASSPATH=path-to-project.jar
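
Assuming the project is built with Maven, the jar lands in target/ named after the artifactId and version in the pom, so the two steps would look roughly like:

mvn clean package
export CLASSPATH=/path/to/stream-connector/target/stream-connector-1.0-SNAPSHOT.jar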

Start Kafka Connect

connect-standalone.sh /home/ashish/installed_apps/kafka_2.12-2.3.0/config/connect-standalone.properties stream-connector.properties
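
If the worker runs with the stock connect-standalone.properties (JsonConverter with schemas enabled), the console consumer should print each record value wrapped in the converter's schema/payload envelope, roughly like this (illustrative, abbreviated):

{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"content"}],"optional":false,"name":"STRING_SCHEMA_VALUE","version":1},"payload":{"content":"Message #1"}}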

All Kafka commands above are expected to be on the system PATH.
