时间:2023-03-13 12:12:23 | 栏目:JAVA代码 | 点击:次
首先kafka监听不得到数据,检查如下
如果出现监听不到数据的问题,那么就试试更改方法一二,如果不可以在去试试方法三,之前出现这个问题也是查过 一般查到都会说 “低版本的服务器接收不到高版本的生产者发送的消息”,但是净由测试使用 用1.0.5RELEASE 和 2.6.3反复测试,并没有任何的问题。
如果按照版本一致,那么根本就不现实,因为可能不同的项目,springboot版本不一致的话,可能有的springboot版本低,那么你还得要求自己维护项目版本升级?如果出现第四种情况就无话可说了。
重复数据的发送问题如下
目前我是使用的Redis进行的排重法,用的是Redis中的set,保证里面不存在重复,保证Redis里面不会存入太多的脏数据。并定期清理
粘贴一下我的排重(Redis排重法)
//kafka prefix
String cache = "kafka_cache";
//kafka suffix
Calendar c = Calendar.getInstance();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//0点,目前是为了设置为这一天的固定时间。这个完全可以去写个工具类自己弄,为了看的更清楚,麻烦了一点的写入
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd 00:00:00");
String gtimeStart = sdf2.format(c.getTime());
long time = sdf.parse(gtimeStart).getTime();
//此位置为了设置是否是新的一天,新的一天需要设置定时时间,保证redis中不会存储太多无用数据
Boolean flag = false;
//数据接收
Set<String> range = new HashSet<>();
//判断是否存在
if (redisTemplate.hasKey(cache + time)) {
//存在则取出这个set
range = redisTemplate.opsForSet().members(cache + time);
}else {
//不存在,则为下面过期时间的设置铺垫
flag = true;
}
//判断监听到的数据是否是重复
if (range.contains("测试需要")) {
//重复则排出,根据逻辑自己修改
continue;
} else {
//添加进去
redisTemplate.opsForSet().add(cache + time, i+"");
if (flag){
//设置为24小时,保证新一天使用,之前使用的存储会消失掉
redisTemplate.expire(cache + time,24,TimeUnit.HOURS);
//不会在进入这个里面,如果多次的存入过期时间,那么这个key的过期时间就永远是24小时,一直就不会过期
flag = false;
}
}
原因是因为在不同groupId之下,kafka接收到以后,会给监听他的每一个组发送一个他所收到的消息,但是两个消费端监听同一个租,那么就只有一个消费端可以消费到。
# 指定kafka 代理地址,可以多个,用逗号间隔 spring.kafka.bootstrap-servers= localhost:9092 # 指定默认消费者group id spring.kafka.consumer.group-id= test # 是否自动提交 spring.kafka.consumer.enable-auto-commit= true # 提交间隔的毫秒 spring.kafka.consumer.auto-commit-interval.ms=60000 # 最大轮询的次数 spring.kafka.consumer.max-poll-records=1 # 将偏移量重置为最新偏移量 spring.kafka.consumer.auto-offset-reset=earliest # 指定消息key和消息体的编解码方式 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer