加入收藏 | 设为首页 | 会员中心 | 我要投稿 源码网 (https://www.900php.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 服务器 > 搭建环境 > Unix > 正文

从未如此简单:10分钟带你逆袭Kafka!

发布时间:2020-03-23 21:36:39 所属栏目:Unix 来源:站长网
导读:副标题#e# 【51CTO.com原创稿件】Apache Kafka 是一个快速、可扩展的、高吞吐的、可容错的分布式发布-订阅消息系统, 使用 Scala 与 Java 语言编写,能够将消息从一个端点传递到另一个端点。 图片来自 Pexels 较之传统的消息中间件(例如 ActiveMQ、RabbitMQ

              System.out.println("提交次数, offsets = " + offsets); 

              System.out.println("exception = " + e); 

            } 

          }); 

        } 

    } 

Spring Boot 使用 Kafka

现在大家的开发过程中,很多都用的是 Spring Boot 的项目,直接启动了,如果还是用原生的 API,就是有点 Low 了啊,那 Kafka 是如何和 Spring Boot 进行联合的呢?

maven 配置:

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> 

   <dependency> 

     <groupId>org.apache.kafka</groupId> 

     <artifactId>kafka-clients</artifactId> 

     <version>2.1.1</version> 

   </dependency> 

添加配置文件,在 application.properties 中加入如下配置信息:

Kafka 连接地址:

spring.kafka.bootstrap-servers = 192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 

生产者:

spring.kafka.producer.acks = 0 

spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer 

spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer 

spring.kafka.producer.retries = 3 

spring.kafka.producer.batch-size = 4096 

spring.kafka.producer.buffer-memory = 33554432 

spring.kafka.producer.compression-type = gzip 

消费者:

spring.kafka.consumer.group-id = mygroup 

spring.kafka.consumer.auto-commit-interval = 5000 

spring.kafka.consumer.heartbeat-interval = 3000 

spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer 

spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer 

spring.kafka.consumer.auto-offset-reset = earliest 

spring.kafka.consumer.enable-auto-commit = true 

# listenner, 标识消费者监听的个数 

spring.kafka.listener.concurrency = 8 

# topic的名字 

kafka.topic1 = topic1 

生产者:

import lombok.extern.slf4j.Slf4j; 

import org.springframework.beans.factory.annotation.Value; 

import org.springframework.kafka.core.KafkaTemplate; 

 

@Service 

@Slf4j 

public class MyKafkaProducerServiceImpl implements MyKafkaProducerService { 

        @Resource 

    private KafkaTemplate<String, String> kafkaTemplate; 

        // 读取配置文件 

    @Value("${kafka.topic1}") 

    private String topic; 

 

    @Override 

    public void sendKafka() { 

      kafkaTemplate.send(topic, "hell world"); 

    } 

消费者:

@Component 

@Slf4j 

public class MyKafkaConsumer { 

  @KafkaListener(topics = "${kafka.topic1}") 

    public void listen(ConsumerRecord<?, ?> record) { 

        Optional<?> kafkaMessage = Optional.ofNullable(record.value()); 

        if (kafkaMessage.isPresent()) { 

            log.info("----------------- record =" + record); 

            log.info("------------------ message =" + kafkaMessage.get()); 

(编辑:源码网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读