Step 1: install Kafka. My previous article covers the detailed Docker installation steps:
https://blog.csdn.net/wuyongde_0922/article/details/126192435
Create the project directly with IDEA's Spring Initializr wizard.
Add the pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/>
    </parent>
    <groupId>com.wyd</groupId>
    <artifactId>springboot-kafka</artifactId>
    <version>1.0.1</version>
    <name>springboot-kafka</name>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Add the application.yml:
server:
  port: 8005
  servlet:
    context-path: /kafka
spring:
  kafka:
    bootstrap-servers: 47.106.177.244:9092 # Kafka broker address; separate multiple addresses with commas
    consumer:
      group-id: consumer-group-wyd
      properties:
        session.timeout.ms: 15000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: latest
      enable-auto-commit: true
      auto-commit-interval: 100ms
    producer:
      acks: 1
      retries: 3 # if greater than 0, the client re-sends records whose send failed
      batch-size: 16384 # when several records go to the same partition, the producer batches them into fewer requests, improving client and broker throughput; this is the default batch size in bytes (16384 is the Kafka default)
      buffer-memory: 33554432 # total bytes of memory the producer uses to buffer records waiting to be sent (33554432 is the default)
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # serializer class for keys
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # serializer class for values
kafka:
  topic:
    order: order
Create the topic-initialization bean class:
package com.wyd.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaConfig {
    @Value("${kafka.topic.order}")
    private String orderTopic;

    /**
     * Create the topic. Exposing the NewTopic as a bean lets Spring Boot's
     * auto-configured KafkaAdmin create it on the broker at startup
     * (2 partitions, replication factor 1).
     */
    @Bean
    public NewTopic createOrderTopic() {
        log.info("Kafka creating topic: {}", orderTopic);
        return new NewTopic(orderTopic, 2, (short) 1);
    }
}
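spring-kafka (2.3 and later, which is what Spring Boot 2.3.4 pulls in) also offers a TopicBuilder DSL for declaring the same topic bean. The sketch below is an optional alternative, not part of the original project; the OrderTopicConfig class name is only illustrative:

package com.wyd.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class OrderTopicConfig {

    // Same effect as new NewTopic("order", 2, (short) 1): the auto-configured
    // KafkaAdmin creates the topic on the broker at startup if it is missing.
    @Bean
    public NewTopic orderTopic() {
        return TopicBuilder.name("order")
                .partitions(2)
                .replicas(1)
                .build();
    }
}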
Producer utility class for sending messages:
package com.wyd.config;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.wyd.entity.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.Date;

@Component
@Slf4j
public class KafkaSender {
    private final KafkaTemplate<String, String> kafkaTemplate;

    // constructor injection of kafkaTemplate
    public KafkaSender(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    private final Gson gson = new GsonBuilder().create();

    public void send(String topicName, String msg) {
        try {
            // build the message
            Message message = new Message();
            message.setId(System.currentTimeMillis());
            message.setMsg(msg);
            message.setSendTime(new Date());
            log.info("Sending message >>> message: {}", gson.toJson(message));
            ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topicName, gson.toJson(message));
            listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onSuccess(SendResult<String, String> result) {
                    log.info("Send-success callback: {}", result.getProducerRecord().value());
                }

                @Override
                public void onFailure(Throwable ex) {
                    log.error("Send-failure callback", ex);
                }
            });
        } catch (Exception e) {
            log.error("Exception while sending", e);
        }
    }
}
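If per-key ordering matters, KafkaTemplate also has a send(topic, key, value) overload: Kafka hashes the key to choose a partition, so records with the same key stay on the same partition and keep their order. A minimal sketch of an extra method that could live inside the KafkaSender above, reusing its kafkaTemplate and gson fields (the sendWithKey name and orderId parameter are illustrative, not from the original project):

// Hypothetical variant of send(): routes by key so that all messages for
// the same orderId land on the same partition and stay ordered.
public void sendWithKey(String topicName, String orderId, String msg) {
    Message message = new Message();
    message.setId(System.currentTimeMillis());
    message.setMsg(msg);
    message.setSendTime(new Date());
    // send(topic, key, value): the key is hashed to pick the partition
    kafkaTemplate.send(topicName, orderId, gson.toJson(message));
}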
The Message entity class:
package com.wyd.entity;

import lombok.Data;

import java.util.Date;

/**
 * @author johnny
 * @create 2020-09-23 9:21 AM
 **/
@Data
public class Message {
    private Long id;
    private String msg;
    private Date sendTime;
}
Consumer class:
package com.wyd.hander;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class KafkaConsumer {
    @KafkaListener(topics = {"${kafka.topic.order}"}, groupId = "${spring.kafka.consumer.group-id}")
    public void receiveOrder(ConsumerRecord<String, String> record) {
        String topic = record.topic();
        String msg = record.value();
        log.info("Consumer 1 received message: topic --> {}, msg --> {}", topic, msg);
    }
}
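Because the producer serializes Message to JSON with Gson, a listener can also deserialize the payload back into the entity before processing it. A minimal sketch, assuming the same default Gson setup as the producer; the OrderMessageConsumer class name is illustrative, and it is meant as an alternative to the listener above rather than a second listener in the same consumer group:

package com.wyd.hander;

import com.google.gson.Gson;
import com.wyd.entity.Message;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class OrderMessageConsumer {
    private final Gson gson = new Gson();

    @KafkaListener(topics = {"${kafka.topic.order}"}, groupId = "${spring.kafka.consumer.group-id}")
    public void receiveOrder(ConsumerRecord<String, String> record) {
        // turn the JSON string produced by KafkaSender back into the entity
        Message message = gson.fromJson(record.value(), Message.class);
        log.info("Received order message: id={}, msg={}, sendTime={}",
                message.getId(), message.getMsg(), message.getSendTime());
    }
}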
Test controller:
package com.wyd.controller;

import com.wyd.config.KafkaSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
public class TestController {
    @Autowired
    private KafkaSender kafkaSender;

    @GetMapping("/sendMessage/{msg}")
    public String sendMessage(@PathVariable("msg") String msg) {
        kafkaSender.send("order", msg);
        return "Kafka message sent";
    }
}
Startup class:
package com.wyd;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringbootKafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringbootKafkaApplication.class, args);
    }
}
Request URL:
http://localhost:8005/kafka/sendMessage/haha
Check the console logs:
The message is sent and consumed normally.
Project repository:
https://gitee.com/wuyongde/springboot-kafka.git