Kafka简单客户端编程实例
时间:2021-02-24 09:38:37|栏目:JAVA代码|点击: 次
今天,我们给大家带来一篇如何利用Kafka的API进行客户端编程的文章,这篇文章很简单,就是利用Kafka的API创建一个生产者和消费者,生产者不断向Kafka写入消息,消费者则不断消费Kafka的消息。下面是具体的实例代码。
一、创建配置类Config
这个类很简单,只是存放了两个常量,一个是话题TOPIC,一个是线程数THREADS
package com.lya.kafka;
/**
* 配置项
* @author liuyazhuang
*
*/
public class Config {
/**
* 话题
*/
public static final String TOPIC = "wordcount";
/**
* 线程数
*/
public static final Integer THREADS = 1;
}
二、编程生产者类ProducerDemo
这个类的主要作用就是向Kafka写入相应的消息,并且将消息写入wordcount话题。
package com.lya.kafka;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* 生产者实例
* @author liuyazhuang
*
*/
public class ProducerDemo {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("zk.connect", "192.168.209.121:2181");
props.put("metadata.broker.list","192.168.209.121:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("zk.connectiontimeout.ms", "15000");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
// 发送业务消息
// 读取文件 读取内存数据库 读socket端口
for (int i = 1; i <= 100; i++) {
Thread.sleep(500);
producer.send(new KeyedMessage<String, String>(Config.TOPIC,
"this number ===>>> " + i));
}
}
}
三、编写消息者类ConsumerDemo
这个类的主要作用就是消费Kafka中wordcount话题的消息。
package com.lya.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
/**
* 消费者实例
* @author liuyazhuang
*
*/
public class ConsumerDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("zookeeper.connect", "192.168.209.121:2181");
props.put("group.id", "1111");
props.put("auto.offset.reset", "smallest");
props.put("zk.connectiontimeout.ms", "15000");
ConsumerConfig config = new ConsumerConfig(props);
ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(Config.TOPIC, Config.THREADS);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(Config.TOPIC);
for(final KafkaStream<byte[], byte[]> kafkaStream : streams){
new Thread(new Runnable() {
@Override
public void run() {
for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){
String msg = new String(mm.message());
System.out.println(msg);
}
}
}).start();
}
}
}
四、运行实例
首先,运行消费者类ConsumerDemo
运行结果如下:

没有打印任何信息。
此时,我们运行生产者类ProducerDemo
我们再次打开消费者的控制台查看如下:

打印出了生产者生产的消息。
至此,Kafka简单客户端编程实例结束。
栏 目:JAVA代码
下一篇:Spring实战之缓存使用condition操作示例
本文标题:Kafka简单客户端编程实例
本文地址:http://www.codeinn.net/misctech/69053.html


阅读排行
- 1Java Swing组件BoxLayout布局用法示例
- 2java中-jar 与nohup的对比
- 3Java邮件发送程序(可以同时发给多个地址、可以带附件)
- 4Caused by: java.lang.ClassNotFoundException: org.objectweb.asm.Type异常
- 5Java中自定义异常详解及实例代码
- 6深入理解Java中的克隆
- 7java读取excel文件的两种方法
- 8解析SpringSecurity+JWT认证流程实现
- 9spring boot里增加表单验证hibernate-validator并在freemarker模板里显示错误信息(推荐)
- 10深入解析java虚拟机




