加入收藏 | 设为首页 | 会员中心 | 我要投稿 源码网 (https://www.900php.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 服务器 > 搭建环境 > Unix > 正文

从未如此简单:10分钟带你逆袭Kafka!

发布时间:2020-03-23 21:36:39 所属栏目:Unix 来源:站长网
导读:副标题#e# 【51CTO.com原创稿件】Apache Kafka 是一个快速、可扩展的、高吞吐的、可容错的分布式发布-订阅消息系统, 使用 Scala 与 Java 语言编写,能够将消息从一个端点传递到另一个端点。 图片来自 Pexels 较之传统的消息中间件(例如 ActiveMQ、RabbitMQ

        properties.put("bootstrap.servers", "192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094"); 

        properties.put("group.id", "mygroup"); 

      // 这里要修改成手动提交 

        properties.put("enable.auto.commit", "false"); 

        // properties.put("auto.commit.interval.ms", "1000"); 

        properties.put("session.timeout.ms", "30000"); 

        properties.put("heartbeat.interval.ms", "10000"); 

        properties.put("auto.offset.reset", "earliest"); 

        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); 

        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 

        this.consumer = new KafkaConsumer<Integer, String>(properties); 

    } 

 

    @Override 

    public void doWork() { 

        consumer.subscribe(Arrays.asList("test2")); 

        ConsumerRecords<Integer, String>records = consumer.poll(1000); 

        for (ConsumerRecord record : records) { 

            System.out.println("topic = " + record.topic()); 

            System.out.println("partition = " + record.partition()); 

            System.out.println("key = " + record.key()); 

            System.out.println("value = " + record.value()); 

 

          //手动同步提交 

          // consumer.commitSync(); 

          //手动异步提交 

          // consumer.commitAsync(); 

          // 带回调公共的手动异步提交 

          consumer.commitAsync((offsets, e) -> { 

            if(e != null) { 

(编辑:源码网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读