Simple Apache Kafka Connector
Overview
Apache Kafka Connect can be used to ingest data into Kafka from external data sources such as:
- JDBC connections
- NoSQL stores
- MongoDB
- Streaming services, e.g. Twitter
- etc.
A typical pipeline places Kafka Connect at the ingestion edge: a source connector reads from the external system and publishes records to a Kafka topic for downstream consumers.
Solution
There is a standard approach to implementing a Kafka Connect source connector, built around three classes:
SourceConnector
An implementation of this class receives the connector properties as a config map and overrides a few lifecycle callbacks.
SourceTask
An implementation of this class does the actual work: connecting to the external data source, fetching data, and transforming it before handing it to Kafka.
AbstractConfig
Straight from the doc: "A convenient base class for configurations to extend."
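To see how these classes fit together, here is a simplified, illustrative sketch of the call sequence a Connect worker drives; the real framework adds context initialization, threading, error handling and offset commits:

import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.util.List;
import java.util.Map;

// Illustrative only: a rough model of what the Connect worker does with
// the classes we are about to implement.
public class LifecycleSketch {
    public static void drive(SourceConnector connector,
                             Map<String, String> props,
                             int maxTasks) throws Exception {
        connector.start(props);
        for (Map<String, String> taskConfig : connector.taskConfigs(maxTasks)) {
            SourceTask task = (SourceTask) connector.taskClass()
                    .getDeclaredConstructor().newInstance();
            task.start(taskConfig);
            List<SourceRecord> records = task.poll(); // called in a loop in reality
            // ...the framework writes records to Kafka and commits offsets...
            task.stop();
        }
        connector.stop();
    }
}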
Let's jump into the implementation of a Kafka connector that creates a stream of messages and publishes them to a topic on Kafka.
package in.silentsudo.kafka.connector.source;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class StreamConnector extends SourceConnector {

    private StreamSourceConfig config;

    @Override
    public void start(Map<String, String> props) {
        config = new StreamSourceConfig(props);
    }

    @Override
    public Class<? extends Task> taskClass() {
        return StreamSourceTask.class;
    }

    /**
     * On startup, the Kafka Connect framework will pass each configuration map
     * contained in the list returned by taskConfigs to a task.
     */
    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>(1);
        configs.add(config.originalsStrings());
        return configs;
    }

    @Override
    public void stop() {
        // Nothing to clean up for this connector
    }

    @Override
    public ConfigDef config() {
        // Reuse the ConfigDef declared by StreamSourceConfig
        return StreamSourceConfig.conf();
    }

    @Override
    public String version() {
        // Returning null here makes Connect log the version as unknown,
        // so report a real version string instead
        return "1.0";
    }
}
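Because this connector produces a single in-memory stream, taskConfigs ignores maxTasks and always returns one configuration. For a source that can be partitioned, a hypothetical variant could fan out up to maxTasks configs; the task.shard key below is made up for illustration, and java.util.HashMap would need to be imported:

// Hypothetical taskConfigs for a partitionable source: each task gets a
// copy of the connector config plus its own shard id to work on.
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    List<Map<String, String>> configs = new ArrayList<>(maxTasks);
    for (int i = 0; i < maxTasks; i++) {
        Map<String, String> taskProps = new HashMap<>(config.originalsStrings());
        taskProps.put("task.shard", String.valueOf(i)); // hypothetical key
        configs.add(taskProps);
    }
    return configs;
}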
package in.silentsudo.kafka.connector.source;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;

public class StreamSourceConfig extends AbstractConfig {
    public static final String BATCH_SIZE_CONFIG = "batch.size";
    private static final String BATCH_SIZE_DOC =
            "Number of data points to retrieve at a time. Defaults to 100 (max value)";

    public StreamSourceConfig(ConfigDef config, Map<String, String> parsedConfig) {
        super(config, parsedConfig);
    }

    public StreamSourceConfig(Map<String, String> parsedConfig) {
        this(conf(), parsedConfig);
    }

    public static ConfigDef conf() {
        // Declare batch.size so it is validated and given its documented default
        return new ConfigDef()
                .define(BATCH_SIZE_CONFIG,
                        ConfigDef.Type.INT,
                        100,
                        ConfigDef.Importance.MEDIUM,
                        BATCH_SIZE_DOC);
    }
}
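With batch.size declared in the ConfigDef, any code that holds a StreamSourceConfig can read the parsed value through the typed getters AbstractConfig provides. For example (illustrative; this connector does not actually consume the value):

// e.g. inside a task's start(Map<String, String> props):
StreamSourceConfig config = new StreamSourceConfig(props);
int batchSize = config.getInt(StreamSourceConfig.BATCH_SIZE_CONFIG); // 100 unless overridden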
package in.silentsudo.kafka.connector.source;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamSourceTask extends SourceTask {

    private static final String MESSAGE = "Message #%d";

    // Key and value schemas for the records we emit
    private static final Schema SCHEMA_KEY = SchemaBuilder.struct()
            .name("STRING_SCHEMA")
            .version(1)
            .field("key", Schema.STRING_SCHEMA)
            .build();

    private static final Schema SCHEMA_KEY_VALUE = SchemaBuilder.struct()
            .name("STRING_SCHEMA_VALUE")
            .version(1)
            .field("content", Schema.STRING_SCHEMA)
            .build();

    private int offset = 0;

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> map) {
        // Nothing to set up for this in-memory stream
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        offset++;
        // Throttle to roughly one record per second
        Thread.sleep(1000);
        return Collections.singletonList(getRecordForString(String.format(MESSAGE, offset)));
    }

    @Override
    public void stop() {
        System.out.println("Stop called");
    }

    private SourceRecord getRecordForString(String content) {
        // The topic is hardcoded here; it could instead be read from the config
        return new SourceRecord(
                sourcePartition(),
                sourceOffset(),
                "stream-messages-topic",
                null,
                SCHEMA_KEY,
                buildKeySchemaValue(),
                SCHEMA_KEY_VALUE,
                buildKeySchemaContentValue(content)
        );
    }

    private Struct buildKeySchemaValue() {
        return new Struct(SCHEMA_KEY)
                .put("key", "stream-messages-key-schema");
    }

    private Struct buildKeySchemaContentValue(String message) {
        return new Struct(SCHEMA_KEY_VALUE)
                .put("content", message);
    }

    private Map<String, String> sourcePartition() {
        Map<String, String> map = new HashMap<>(1);
        map.put("partition", "stream-partition");
        return map;
    }

    private Map<String, String> sourceOffset() {
        // Placeholder: a real connector would store the numeric offset here
        // so the task can resume where it left off after a restart
        Map<String, String> map = new HashMap<>(1);
        map.put("offset", "stream-partition-offset");
        return map;
    }
}
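The sourcePartition and sourceOffset maps are what Connect persists so a task can resume after a restart. Our example stores only a placeholder string, so nothing useful is recovered; as a sketch, if sourceOffset() stored the running counter (e.g. map.put("offset", String.valueOf(offset))), start() could restore it through the task context:

// Sketch only: assumes sourceOffset() stores the running counter as a string.
// context is the SourceTaskContext field inherited from SourceTask.
@Override
public void start(Map<String, String> props) {
    Map<String, Object> lastOffset =
            context.offsetStorageReader().offset(sourcePartition());
    if (lastOffset != null && lastOffset.get("offset") != null) {
        offset = Integer.parseInt(lastOffset.get("offset").toString());
    }
}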
One more important file before we start building: the properties file we will pass to Kafka Connect.
stream-connector.properties
name=SourceConnector
tasks.max=1
connector.class=in.silentsudo.kafka.connector.source.StreamConnector
topic=stream-messages-topic
Finally, the pom.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>in.silentsudo.kafka.connector</groupId>
    <artifactId>stream-connector</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>2.4.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.2.0</version>
            </plugin>
        </plugins>
    </build>
</project>
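Build the project with Maven; going by the artifactId and version above, the jar should land at target/stream-connector-1.0-SNAPSHOT.jar:
mvn clean package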
Steps to Execute the Kafka Connector
Start Zookeeper
zookeeper-server-start.sh /home/ashish/installed_apps/kafka_2.12-2.3.0/config/zookeeper.properties
Start Kafka Broker
kafka-server-start.sh /home/ashish/installed_apps/kafka_2.12-2.3.0/config/server.properties
Create Kafka Topic defined in stream-connector.properties
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic stream-messages-topic
Start a console consumer for the topic
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stream-messages-topic
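Once the connector is started (last step below), this consumer should print one message per second. Assuming the stock connect-standalone.properties, which uses JsonConverter with schemas enabled, each record should look roughly like:
{"schema":{"type":"struct","fields":[{"type":"string","optional":false,"field":"content"}],"optional":false,"name":"STRING_SCHEMA_VALUE","version":1},"payload":{"content":"Message #1"}}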
Set the application jar on the CLASSPATH variable:
export CLASSPATH=path-to-project.jar
Start Kafka Connect
connect-standalone.sh /home/ashish/installed_apps/kafka_2.12-2.3.0/config/connect-standalone.properties stream-connector.properties
All Kafka commands above are expected to be on the system PATH.