Java Client for publishing and consuming messages from Apache Kafka

I wanted to learn how to publish messages to and consume messages from Apache Kafka using the Java client, so I followed these steps.
  1. Download the Kafka binaries from the Kafka download page.
  2. Unzip the Kafka tar file by executing tar -xzf kafka_2.9.2-0.8.1.1.tgz. Then go to the Kafka directory by executing cd kafka_2.9.2-0.8.1.1
  3. Next, start the ZooKeeper server by executing the following command:
    
    bin/zookeeper-server-start.sh config/zookeeper.properties
    
  4. Start the Kafka server by executing the following command:
    
    bin/kafka-server-start.sh config/server.properties
    
  5. Now your ZooKeeper and Kafka servers are ready, and you can download the source code for the sample project from here
  6. This is what a Java client that publishes messages to Kafka looks like. Execute it a couple of times to publish a couple of messages. The first thing you have to do when developing a producer is connect to the Kafka server. To do that, set the metadata.broker.list property to the host and port on which the Kafka server is listening (you can find the host name and port in the server.properties file that you used in step 4). Once you have a Producer object, you can use it to publish messages by creating a kafka.producer.KeyedMessage object, passing the name of the topic and the message as arguments.
  7. This is what the Java client that consumes messages from Kafka looks like. Run it and it will start a thread that keeps listening for messages on the topic; every time there is a message, it prints it to the console. The HelloKafkaConsumer class extends the Thread class. In its constructor I first create a Properties object with the zookeeper.connect property set to the host and port on which the ZooKeeper server is listening. In the same constructor I also create a kafka.javaapi.consumer.ConsumerConnector object.
    Once the ConsumerConnector is ready, in the run() method I pass it the name of the topic I want to listen on (you can pass multiple topic names here). Every time there is a new message, I read it and print it to the console.
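
The configuration described in steps 6 and 7 can be sketched roughly as follows. This is a minimal sketch and not the sample project's code: the class name HelloKafkaConfig, the helper method names, the localhost:9092 and localhost:2181 addresses, and the group name "testgroup" are all assumptions for illustration. It only builds the java.util.Properties objects, so it runs without the Kafka jars. The serializer.class and group.id entries are not mentioned in the steps; they are included on the assumption that the Kafka 0.8 producer needs a string encoder to send String messages and that the ZooKeeper-based consumer expects a group id.

```java
import java.util.Properties;

// Hypothetical helper (not part of the sample project): builds the
// configuration that the producer and consumer in steps 6 and 7 rely on.
class HelloKafkaConfig {

    // Producer side: metadata.broker.list points at the Kafka broker's
    // host:port, taken from config/server.properties.
    static Properties producerProps(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        // Assumption: messages are plain Strings, so use the string encoder.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        return props;
    }

    // Consumer side: zookeeper.connect points at the ZooKeeper server's
    // host:port, taken from config/zookeeper.properties.
    static Properties consumerProps(String zookeeperConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeperConnect);
        // Assumption: the group name is arbitrary; consumers sharing a
        // group id share the messages of a topic between them.
        props.put("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        // Assumed local defaults; adjust to match your own properties files.
        Properties producer = producerProps("localhost:9092");
        Properties consumer = consumerProps("localhost:2181", "testgroup");

        System.out.println("broker list: " + producer.getProperty("metadata.broker.list"));
        System.out.println("zookeeper:   " + consumer.getProperty("zookeeper.connect"));
    }
}
```

In the real client, the first Properties object would be wrapped in a kafka.producer.ProducerConfig and the second in a kafka.consumer.ConsumerConfig before creating the Producer and the ConsumerConnector.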

18 comments:

Mujadid said...

Hi Sunil! Your blog is awesome and very informative. Instead of showing the message on the console, as you have done in the consumer thread's run method, how can we return the message to the caller method? Any idea?

Suresh Kesavan said...

Thanks for your post, it is very nice. How do I get unread messages using the Kafka consumer?

Saurabh Chhajed said...

Hi,

In your consumer you have specified the topic name as "pythontest"; it should be "javatest".

Nitesh Tailor said...

How do I read messages from the beginning every time I run this API?

Anonymous said...

Hi sunil, I need some help on retrieving kafka topic metadata. I sent email to you as well regarding this. I could not find any api on this.

Anonymous said...

which scala version did you use?

Barış Çapraz said...

We made java sample archive on Core Java Code Examples

Kanishka Chauhan said...

Hi Sunil,

You have used Scala API provided in "kafka_2.10-0.8.2.2.jar" while I can see similar API in Java provided in "kafka-clients-0.8.2.0.jar".
I am totally confused about what to use and how. Most of the examples provided on the internet use the former jar. Could you please provide some hints or pointers on implementing a consumer using the Java API (the latter jar)?

Thanks,
Kanishka Chauhan

Gazal Begum said...

Hey Sunil,

First of all, thank you so much for such an informative blog on Kafka.

Secondly, I had a doubt regarding how to write the messages received from Kafka Consumer to the file (text file).
I have a Kafka Producer code written in java that writes kafka messages. And a consumer code that receives these messages.

Is it possible to write these received messages by consumer to any text file in java?
Can you please help me out in this?

Thanks and Regards,
Gazal

Anonymous said...

Hi sunil
how to read input file from s3 location and how to give output location in kafka
any commands send me my mail id ; jnp.hadoop@gmail.com

Sudhish Kumar said...

Hi Sunil,

Is there any way that the producer can get acknowledgement from the consumer on receiving the messages. I tried with the callback() method in the producer.send() but it doesn't provide any details whether the message was consumed by the consumer

Regards,
Sudhish

Kanishka Chauhan said...

Yes, there is surely a way to get acknowledgement. You can easily find on internet. There are several examples available explaining the same.

Anonymous said...

Hi, I tried to run the code posted in the blog by making appropriate changes. The producer code is running successfully and publishing the message but the consumer is not able to consume the message. However, if I run the consumer on cmd then it's able to consume the message from the specified topic.
What could be the reason...???

David Kavanagh said...

THANK YOU!!!!!!

Anonymous said...

I am getting this error

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.

Process finished with exit code 0

sumanth reddy said...

Apache kafka(0.9.0.0) : How kafka client acts as both producer and consumer in java ?

Pavan Patil said...

Hello Sunil,

I am getting below error.

ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. Set system property 'log4j2.debug' to show Log4j2 internal initialization logging.
Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server 'localhost:2181' with timeout of 6000 ms
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1233)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:157)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:131)
at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:115)
at kafka.utils.ZkUtils$.apply(ZkUtils.scala:97)
at kafka.consumer.ZookeeperConsumerConnector.connectZk(ZookeeperConsumerConnector.scala:194)
at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:142)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:69)
at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:123)
at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)
at com.spnotes.kafka.HelloKafkaConsumer.<init>(HelloKafkaConsumer.java:41)
at com.spnotes.kafka.HelloKafkaConsumer.main(HelloKafkaConsumer.java:31)

Can you please tell me why?

Thanks,
Pavan Patil
patilpavan8@gmail.com