• Reactor 之 手把手教你 Spring Boot 整合 Reactor


    Reactor 是一个完全非阻塞的 JVM 响应式编程基础,有着高效的需求管理(背压的形式)。它直接整合 Java8 的函数式 API,尤其是 CompletableFutureStream,还有 Duration 。提供了可组合的异步化序列 API — Flux (对于 [N] 个元素) and Mono (对于 [0|1] 元素) — 并广泛实现 响应式Stream 规范。

    这次带大家从零开始,使用 Spring Boot 框架建立一个 Reactor 响应式项目。

    1 创建项目

    使用 https://start.spring.io/ 创建项目。添加依赖项:H2、Lombok、Spring Web、JPA、JDBC

    然后导入 Reactor

    <dependency>
        <groupId>io.projectreactorgroupId>
        <artifactId>reactor-coreartifactId>
    dependency>
    <dependency>
        <groupId>io.projectreactorgroupId>
        <artifactId>reactor-testartifactId>
        <scope>testscope>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2 集成 H2 数据库

    application.properties 文件中添加 H2 数据连接信息。此外,端口使用 8081(随意,本地未被使用的端口即可)。

    server.port=8081
    ################ H2 数据库 基础配置 ##############
    spring.datasource.driverClassName=org.h2.Driver
    spring.datasource.url=jdbc:h2:~/user
    spring.datasource.username=sa
    spring.datasource.password=
    
    spring.jpa.database=h2
    spring.jpa.hibernate.ddl-auto=update
    spring.h2.console.path=/h2-console
    spring.h2.console.enable=true
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3 创建测试类

    3.1 user 实体

    建立简单数据操作实体 User。

    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    import javax.persistence.*;
    
    /**
     * @Author: prepared
     * @Date: 2022/8/29 21:40
     */
    @Data
    @NoArgsConstructor
    @Table(name = "t_user")
    @Entity
    public class User {
    
    	@Id
    	@GeneratedValue(strategy = GenerationType.AUTO)
    	private Long id;
    
    	private String userName;
    
    	private int age;
    
    	private String sex;
    
    	public User(String userName, int age, String sex) {
    		this.userName = userName;
    		this.age = age;
    		this.sex = sex;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    3.2 UserRepository

    数据模型层使用 JPA 框架。

    import com.prepared.user.domain.User;
    import org.springframework.data.jpa.repository.JpaRepository;
    import org.springframework.stereotype.Repository;
    
    /**
     * @Author: prepared
     * @Date: 2022/8/29 21:45
     */
    @Repository
    public interface UserRepository extends JpaRepository<User, Long> {
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3.3 UserService

    service 增加两个方法,add 方法,用来添加数据;list 方法,用来查询所有数据。所有接口返回 Mono/Flux 对象。

    最佳实践:所有的第三方接口、IO 耗时比较长的操作都可以放在 Mono 对象中。

    doOnError 监控异常情况;

    doFinally 监控整体执行情况,如:耗时、调用量监控等。

    import com.prepared.user.dao.UserRepository;
    import com.prepared.user.domain.User;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Service;
    import reactor.core.publisher.Mono;
    
    import javax.annotation.Resource;
    import java.util.List;
    
    /**
     * @Author: prepared
     * @Date: 2022/8/29 21:45
     */
    @Service
    public class UserService {
    
    	private Logger logger = LoggerFactory.getLogger(UserService.class);
    
    	@Resource
    	private UserRepository userRepository;
    
    	public Mono<Boolean> save(User user) {
    		long startTime = System.currentTimeMillis();
    		return Mono.fromSupplier(() -> {
    					return userRepository.save(user) != null;
    				})
    				.doOnError(e -> {
    					// 打印异常日志&增加监控(自行处理)
    					logger.error("save.user.error, user={}, e", user, e);
    				})
    				.doFinally(e -> {
    					// 耗时 & 整体健康
    					logger.info("save.user.time={}, user={}", user, System.currentTimeMillis() - startTime);
    				});
    	}
    
    	public Mono<User> findById(Long id) {
    		long startTime = System.currentTimeMillis();
    		return Mono.fromSupplier(() -> {
    					return userRepository.getReferenceById(id);
    				}).doOnError(e -> {
    					// 打印异常日志&增加监控(自行处理)
    					logger.error("findById.user.error, id={}, e", id, e);
    				})
    				.doFinally(e -> {
    					// 耗时 & 整体健康
    					logger.info("findById.user.time={}, id={}", id, System.currentTimeMillis() - startTime);
    				});
    	}
    
    	public Mono<List<User>> list() {
    		long startTime = System.currentTimeMillis();
    		return Mono.fromSupplier(() -> {
    					return userRepository.findAll();
    				}).doOnError(e -> {
    					// 打印异常日志&增加监控(自行处理)
    					logger.error("list.user.error, e", e);
    				})
    				.doFinally(e -> {
    					// 耗时 & 整体健康
    					logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime);
    				});
    	}
      
      public Flux<User> listFlux() {
    		long startTime = System.currentTimeMillis();
    		return Flux.fromIterable(userRepository.findAll())
    				.doOnError(e -> {
    					// 打印异常日志&增加监控(自行处理)
    					logger.error("list.user.error, e", e);
    				})
    				.doFinally(e -> {
    					// 耗时 & 整体健康
    					logger.info("list.user.time={}, ", System.currentTimeMillis() - startTime);
    				});
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79

    3.4 UserController

    controller 增加两个方法,add 方法,用来添加数据;list 方法,用来查询所有数据。

    list 方法还有另外一种写法,这就涉及到 Mono 和 Flux 的不同了。

    返回List可以使用Mono> ,也可以使用 Flux

    • Mono 是一个特定的 Publisher,最多可以发出一个元素
    • Flux 是一个标准的 Publisher,表示为发出 0 到 N 个元素的异步序列
    import com.prepared.user.domain.User;
    import com.prepared.user.service.UserService;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import reactor.core.publisher.Mono;
    
    import javax.annotation.Resource;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @Author: prepared
     * @Date: 2022/8/29 21:47
     */
    @RestController
    public class UserController {
    
    	@Resource
    	private UserService userService;
    
    	@RequestMapping("/add")
    	public Mono<Boolean> add() {
    		User user = new User("xiaoming", 10, "F");
    		return userService.save(user) ;
    
    	}
    
    	@RequestMapping("/list")
    	public Mono<List<User>> list() {
    		return userService.list();
    	}
    }
    
    	@RequestMapping("/listFlux")
    	public Flux<User> listFlux() {
    		return userService.listFlux();
    	}
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    3.5 SpringReactorApplication 添加注解支持

    Application 启动类添加注解 @EnableJpaRepositories

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
    
    /**
     * Hello world!
     */
    @SpringBootApplication
    @EnableJpaRepositories
    public class SpringReactorApplication {
    	public static void main(String[] args) {
    		SpringApplication.run(SpringReactorApplication.class, args);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    测试

    启动项目,访问 localhost:8081/add,正常返回 true。

    查询所有数据,访问localhost:8081/list,可以看到插入的数据,已经查询出来了。PS:我这里执行了多次 add,所以有多条记录。

    后台日志:

    2022-09-05 20:13:17.385  INFO 15696 --- [nio-8082-exec-2] com.prepared.user.service.UserService    : list.user.time=181, 
    
    • 1

    执行了 UserService list() 方法的 doFinnally 代码块,打印耗时日志。

    总结

    响应式编程的优势是不会阻塞。那么正常我们的代码中有哪些阻塞的操作呢?

    1. Futureget() 方法;
    2. Reactor 中的 block() 方法,subcribe() 方法,所以在使用 Reactor 的时候,除非编写测试代码,否则不要直接调用以上两个方法;
    3. 同步方法调用,所以高并发情况下,会使用异步调用(如Future)来提升响应速度。

    下一篇,讲解如何将熔断、限流框架 resilience4j 整合到项目中,敬请期待。

  • 相关阅读:
    .NET 8 Release Candidate 1 (RC1)现已发布,包括许多针对ASP.NET Core的重要改进!
    accelerate 的一个tip:early stopping 处可能存在的bug
    Linux使用操作(一)
    2022“杭电杯”中国大学生算法设计超级联赛(7)
    玩转gRPC—深入概念与原理
    微信小程序订阅消息前后端示例
    第一个SpringBoot项目的创建
    【数据结构】线性表(三)循环链表的各种操作(创建、插入、查找、删除、修改、遍历打印、释放内存空间)
    linux: 切换并配置root
    揭秘古代手术工具与技术:从中国起源的医疗奇迹
  • 原文地址:https://blog.csdn.net/Prepared/article/details/126733273