Apache Kafka Simple Producer And Consumer

Introduction

Hello! Now that you know how to use Apache Kafka buzzwords in a technical discussion, it is also worth knowing how to write a simple system and understand how it works. Before that, a little theory: why do we even need Apache Kafka when our system is highly available, runs on perfect hardware across multiple regions, is hosted on the best cloud service provider and is written in the best software technology available at this time? Imagine you have written a request-response system, which belongs to the messaging patterns under the umbrella of network-oriented architectural patterns. Between a request and its response sits a process whose computation time depends entirely on the request parameters, and it usually runs on the receiver's machine. If this is not taken into account, your system ends up with processes that were started but never reached a conclusion, or that are still waiting for some action to happen. This pattern is (usually, though not always) synchronous and tightly coupled. A publish-subscribe messaging system, on the other hand, is (again, not always) asynchronous and loosely coupled. Neither the consumer nor the publisher needs to know about the other. Publishers dump messages into a queue, consumers read them and process them further until they reach their destination, which can be a database, the input to some analysis, a user's terminal, or wherever else you can think of.

I would use the Request-Response pattern mostly in front-end-facing applications, and the Publish-Subscribe pattern for background processing or one-way requests.

How to create a topic

kafka-topics.sh --bootstrap-server localhost:9092 --topic java_topic --create --partitions 3 --replication-factor 1
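If you would rather create the topic from Java instead of the CLI, the kafka-clients dependency used later in this post also ships an AdminClient. A minimal sketch with the same broker address, partition count and replication factor as the command above:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        final Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // try-with-resources closes the admin client when we are done
        try (AdminClient admin = AdminClient.create(properties)) {
            // java_topic with 3 partitions and replication factor 1
            final NewTopic topic = new NewTopic("java_topic", 3, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}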

Produce message to a topic

kafka-console-producer.sh --broker-list localhost:9092 --topic java_topic
> type in your message to `java_topic`

You should be presented with a secondary prompt if everything went well with the setup and topic creation.

Consume a message from a topic

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic java_topic --from-beginning

--from-beginning tells the console consumer to read messages from the start of the topic (offset 0) every time we run this command.
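The Java consumer shown later in this post has no exact equivalent of --from-beginning, but two related knobs exist. Both lines below are a sketch meant to slot into the Consumer code further down (the properties and consumer variables are the ones defined there):

// Start from the earliest offset when the consumer group has no committed offset yet
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// Or explicitly rewind the currently assigned partitions (only meaningful after assignment)
consumer.seekToBeginning(consumer.assignment());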

Java Implementation

We will create a simple entry program that spawns two threads:

  • a Producer for producing messages
  • a Consumer to read the messages produced above

We will monitor completion by adding a shutdown hook to the runtime and passing a CountDownLatch to these two threads for a smooth termination of our test application.

Dependencies are listed below; please check for the correct versions if you face any compilation or runtime errors:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.0</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-jdk14</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>

Main.java - Entry Program

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class Main {
    public static void main(String[] args) {
        final String javaTopic = "java_topic";
        List<String> topics = Collections.singletonList(javaTopic);
        final CountDownLatch latch = new CountDownLatch(2);

        final Producer producer = new Producer(javaTopic, latch);
        producer.onInit();

        final Consumer consumer = new Consumer(topics, latch);
        consumer.onInit();

        final Thread producerThread = new Thread(producer);
        producerThread.setName("APACHE-KAFKA - PRODUCER");

        final Thread consumerThread = new Thread(consumer);
        consumerThread.setName("APACHE-KAFKA - CONSUMER");

        producerThread.start();
        consumerThread.start();

        // Clean up both clients when the JVM is asked to terminate (e.g. Ctrl+C)
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                System.out.println("Shut down command received");
                producer.onDestroy();
                consumer.onDestroy();
                System.out.println("Shut down completed");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }));

        try {
            // Block until both threads have counted the latch down to zero
            latch.await();
            System.out.println("Execution Completed");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("*******************************************************");
    }
}

LifeCycle.java - Common lifecycle interface for Producer & Consumer

public interface LifeCycle {

    void onInit();

    void onDestroy();

}

Producer.java - Kafka Producer

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class Producer implements Runnable, LifeCycle {
    private KafkaProducer<String, String> producer;
    private String javaTopic;
    private CountDownLatch latch;
    private AtomicBoolean producing = new AtomicBoolean(false);

    public Producer(String javaTopic, CountDownLatch latch) {
        this.javaTopic = javaTopic;
        this.latch = latch;
    }

    public void onInit() {
        final Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<String, String>(properties);
        producing.set(true);
    }

    public void onDestroy() {
        producing.set(false);
        producer.close();
        latch.countDown();
    }

    @Override
    public void run() {
        try {
            // Send one message every second until onDestroy() flips the flag
            while (producing.get()) {
                Thread.sleep(1000);
                final String msg = "Message #" + System.currentTimeMillis();
                System.out.println("Producing: " + msg);
                final ProducerRecord<String, String> record = new ProducerRecord<String, String>(javaTopic, msg);
                producer.send(record);
                producer.flush();
            }
        } catch (InterruptedException ie) {
            ie.printStackTrace();
        } finally {
            System.out.println("Producer shut down");
            latch.countDown();
        }
    }

}
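The send() call above is fire and forget. If you want to know whether a record actually reached the broker, send() also accepts a completion callback. A minimal sketch, reusing the record variable from the loop above:

producer.send(record, (metadata, exception) -> {
    // exception is null on success; metadata then carries partition and offset
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.println("Sent to partition " + metadata.partition() + " at offset " + metadata.offset());
    }
});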

Consumer.java - Kafka Consumer

import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Consumer implements Runnable, LifeCycle {

    private KafkaConsumer<String, String> consumer;
    private List<String> topics;
    private CountDownLatch latch;

    public Consumer(List<String> topics, CountDownLatch latch) {
        this.topics = topics;
        this.latch = latch;
    }

    public void onInit() {
        final Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "g_id_java_topic");
        consumer = new KafkaConsumer<String, String>(properties);
    }

    public void onDestroy() {
        // wakeup() makes a blocked poll() throw WakeupException so the loop can end
        consumer.wakeup();
        consumer.close();
        latch.countDown();
    }

    @Override
    public void run() {
        consumer.subscribe(topics);

        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
                latch.countDown();
            }
            // Fetch whatever has arrived, waiting up to 3 seconds
            ConsumerRecords<String, String> record = consumer.poll(Duration.ofSeconds(3));
            Iterator<ConsumerRecord<String, String>> iterator = record.iterator();
            System.out.println("Total message : {}" + record.count());
            while (iterator.hasNext()) {
                final ConsumerRecord<String, String> nextRecord = iterator.next();
                System.out.println("Consumed topic: " + nextRecord.topic() + "=> " + nextRecord.value());
            }
        }
    }
}
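One caveat: KafkaConsumer is not thread safe, and onDestroy() above calls close() from the shutdown-hook thread while run() may still be inside poll(). A more defensive variant (just a sketch, not a drop-in replacement for the code above) is to call only wakeup() from the other thread and let the polling thread catch the resulting WakeupException and close the consumer itself:

// In this variant onDestroy() would only call consumer.wakeup();
// close() and the latch count-down move into the polling thread.
@Override
public void run() {
    try {
        consumer.subscribe(topics);
        while (true) {
            // poll() throws WakeupException once consumer.wakeup() has been called
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
            for (ConsumerRecord<String, String> nextRecord : records) {
                System.out.println("Consumed topic: " + nextRecord.topic() + "=> " + nextRecord.value());
            }
        }
    } catch (org.apache.kafka.common.errors.WakeupException we) {
        System.out.println("Consumer woken up for shutdown");
    } finally {
        consumer.close();   // close on the polling thread, not in the shutdown hook
        latch.countDown();
    }
}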

Our main method spawns the producer and the consumer as threads; since both implement Runnable, they can each be handed to a Thread.

  • The Producer periodically produces a message, built from a string plus System.currentTimeMillis(), to a particular topic
  • The Consumer has subscribed to this topic and polls it every 3 seconds, reading all available messages in bulk
  • Finally, when we terminate the application, a WakeupException is thrown and the CountDownLatch reaches 0, since both the Producer and the Consumer decrement it by 1, and we should see exit code 0

 

Successful execution of this program should yield the following result:

...
INFO: [Consumer clientId=consumer-1, groupId=g_id_java_topic] Setting offset for partition java_topic-2 to the committed offset FetchPosition{offset=84788, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=ashish:9092 (id: 0 rack: null), epoch=0}}
Oct 16, 2019 7:59:54 PM org.apache.kafka.clients.consumer.internals.ConsumerCoordinator refreshCommittedOffsetsIfNeeded
INFO: [Consumer clientId=consumer-1, groupId=g_id_java_topic] Setting offset for partition java_topic-0 to the committed offset FetchPosition{offset=84782, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=ashish:9092 (id: 0 rack: null), epoch=0}}
Oct 16, 2019 7:59:54 PM org.apache.kafka.clients.consumer.internals.ConsumerCoordinator refreshCommittedOffsetsIfNeeded
INFO: [Consumer clientId=consumer-1, groupId=g_id_java_topic] Setting offset for partition java_topic-1 to the committed offset FetchPosition{offset=84786, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=ashish:9092 (id: 0 rack: null), epoch=0}}
Total message : {}1
Consumed topic: java_topic=> Message #1571236194542
Producing: Message #1571236195575
Total message : {}1
Consumed topic: java_topic=> Message #1571236195575
Producing: Message #1571236196577
Total message : {}1
Consumed topic: java_topic=> Message #1571236196577
Shut down command received
Shut down completed
Execution Completed
*******************************************************

 

Usage

Cool, now I know what Apache Kafka is and how to write a simple producer and consumer. But wait, where can I use it? Here are some scenarios where I would use Apache Kafka:

  • Say I have created a video-processing server: I take an input file and convert it into video chunks; here I could use Kafka together with Kafka Streams.
  • A user dashboard application where data is continuously fed to the front end.
  • Background transaction processing and fraud detection.
  • Real-time matching of drivers and customers in an Uber-like app.
  • And many other places where the client is not expecting an immediate response from the server, or tells the server, "hey, this is your data: read, process, update, and let me know when you are done doing things in the background so a notification can be shown to the user."