• 14.4、SpringWebFlux-1


    1、前置知识

    SpringMVCSpringBootMavenJava8 新特性

    2、基本介绍

    官方文档

    Web on Reactive Stack (spring.io)


    • Spring5 添加新的模块,用于 web 开发的,功能 SpringMVC 类似的,WebFlux 使用当前一种比较流程响应的编程出现的框架

    • 使用传统 web 框架,比如 SpringMVC,这些基于 Servlet 容器, WebFlux 是一种异步非阻塞的框架,异步非阻塞的框架在 Servlet3.1 以后才支持。核心是基于 Reactor 的相关 API 实现的

    • 什么是异步非阻塞

      • 异步和同步
        • 针对调用者,调用者发请求,
        • 若等着对方回应之后才去做其他事情就是同步,
        • 若发送请求之后才不等着对方回应就去做其他事情就是异步
      • 非阻塞与阻塞
        • 针对被调用者
        • 被调用者收到请求之后,做完了请求任务之后才给出反馈就是阻塞(阻塞了调用者,等待被调用者处理完成)
        • 收到请求之后马上给出反馈然后再去做事情就是非阻塞
      • 例如:
        • 面试完在家等着面试结构就是同步,面试完之后去另一家公司面试就是异步。
        • 面试完之后让回家等候通知就是阻塞,面试完之后直接通知你结果就是非阻塞
    • 特点:

      • 非阻塞式:在有限的资源下,提高系统的吞吐量和伸缩性,以 Reactor 为基础实现响应式编程
      • 函数式编程:Spring5 框架基于 java8WebFlux 使用 Java8 函数式编程方式实现路由请求

    比较 SpringMVC

    image-20221117204114765

    • 第一:两个框架都可以使用注解方式,都运行在 Tomcat 容器中
    • 第二:SpringMVC 采用命令式编程,WebFlux 采用异步响应式

    3、响应式编程

    3.1、什么是响应式编程

    [响应式编程](响应式编程_百度百科 (baidu.com))是一种面向数据流和变化传播的编程范式。

    • 这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。

    • 电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化。

    image-20221117210049351


    3.2、Java8 及其之前版本

    • 提供的观察者模式的两个类 ObserverObservable
    import java.util.Observable;
    public class ObserverDemo extends Observable {
        public static void main(String[] args) {
            ObserverDemo data = new ObserverDemo();
            //添加观察者
            data.addObserver((o, a) -> {
                System.out.println("发生了变化:" + a);
            });
            //添加观察者
            data.addObserver((o, a) -> {
                System.out.println("收到被观察者通知,准备改变:" + a);
            });
            data.setChanged();//数据变化
            data.notifyObservers("hello");//通知
            data.clearChanged();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    测试结果

    image-20221117212919240

    3.3、Java9 及其之后

    发布订阅

    image-20221117215259805

    Publisher

    Publisher函数式接口,负责发布数据。Publisher中函数subscribe负责绑定Subscriber

        //Publisher是函数式接口,负责发布数据。
        //发布者
        @FunctionalInterface
        public static interface Publisher<T> {
    
            //Publisher中函数subscribe负责绑定Subscriber
            public void subscribe(Subscriber<? super T> subscriber);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    Subscriber

    Subscriber是静态内部接口,负责订阅消费数据。Subscriber中定义四个方法,分别是onSubscribeonNextonErroronComplete,这四个方法会在相应情况下触发

    • onSubscribe;订阅成功后触发,并且表明可以开始接收订阅数据了
    • onNext:获取接受数据的下一项
    • onError:在发布者或订阅遇到错误时触发
    • onComplete:接受完所有的数据后触发
       //订阅者
        public static interface Subscriber<T> {
            public void onSubscribe(Subscription subscription);
    
            public void onNext(T item);
    
           
            public void onError(Throwable throwable);
    
           
            public void onComplete();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    Subscription

    Subscription关联PublisherSubscriber,表示PublisherSubscriber的订阅关系。Subscription使用requestcancel方法管理PublisherSubscriber的消费

    • request:请求获取数据的次数
    • cancel:取消订阅,订阅者不在接受数据
    	public static interface Subscription {
            
            public void request(long n);
    
           
            public void cancel();
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    Processor

    Processor继承了PublisherSubscriber接口,表示既是生产者,又是订阅者的特殊对象。Processor一般作为数据的中转处理站, 将数据处理之后发给下个订阅者

    public final class Flow {
    
        private Flow() {} // 不可实例化
    
    
    
    
    
    
        
        public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
        }
    
        static final int DEFAULT_BUFFER_SIZE = 256;
    
        public static int defaultBufferSize() {
            return DEFAULT_BUFFER_SIZE;
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    3.4、Reactor实现

    • 响应式编程操作中,Reactor 满足 Reactive 规范
    • Reactor 有两个核心类,MonoFlux,这两个类都实现接口 Publisher,提供了丰富操作符。
      • Flux 对象实现发布者,返回 N 个元素;
      • Mono 实现发布者,返回 0 0 0 或者 1 1 1 个元素
    • FluxMono 都是数据流的发布者,使用 FluxMono 都可以发出三种数据信号
      • 元素值、错误信号、完成信号
      • 错误信号和完成信号都代表终止信号。
        • 终止信号用于告诉订阅者数据流结束了
        • 错误信号终止数据流,同时把错误信息传递给订阅者

    image-20221117222927105


    引入响相应的依赖

    <dependency>
        <groupId>io.projectreactorgroupId>
        <artifactId>reactor-coreartifactId>
        <version>3.1.5.RELEASEversion>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    编写代码

    public class TestReactor {
        public static void main(String[] args) {
            //just方法直接声明(就是发布者发布了一些数据)
            Flux.just(1, 2, 3, 4);
            Mono.just(1);
    
            //其他的方法
            Integer[] arr = new Integer[]{1, 2, 3, 4};
            Flux.fromArray(arr);
    
            //声明集合
            List<Integer> list = Arrays.asList(arr);
            Flux.fromIterable(list);
    
            //声明流
            Flux.fromStream(list.stream());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    没有订阅者,是不会输出的。

    • 调用 just 或者其他方法只是声明数据流,数据流并没有发出,只有进行订阅之后才会触发数据流。

    1.声明订阅者

    • public class TestReactor {
          public static void main(String[] args) {
              //just方法直接声明(就是发布者发布了一些数据),接收者进行消费
              Flux.just(1, 2, 3, 4).subscribe(System.out::println);
              Mono.just(1).subscribe(System.out::println);
      
      • 1
      • 2
      • 3
      • 4
      • 5

    2.输出结果

    • image-20221117224706133

    3.5、三种信号特点

    • 错误信号和完成信号都是终止信号,不能共存的
    • 若没有发送任何元素值,而是直接发送错误或者完成信号,表示是空数据流
    • 若没有错误信号,没有完成信号,表示是无限数据流

    例如:

    public class TestReactor {
        public static void main(String[] args) {
            //错误信号
            Flux.error(new RuntimeException());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    3.6、操作符

    对数据流进行一道道操作,称为操作符,不如工厂的流水线

    就和 stream 流一样

    第一 map

    • 元素映射为新元素

    image-20221117230433825

    Stream<Stream<Object>> streamStream = new ArrayList<>().stream().map(Stream::of);
    
    • 1

    第二 flatMap

    • 元素映射为流

    • 把每个元素先变成一个流,然后把所有流连接成一个流。

    • 例如:stream 中的 flatmap

      Integer[] arr = new Integer[]{1, 2, 3, 4};
      //声明集合
      List<Integer> list = Arrays.asList(arr);
      Stream<Integer> integerStream = list.stream().flatMap((map) -> {
          return Stream.of(map + 1);
      });
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
  • 相关阅读:
    JUC中的锁、信号量、并发集合
    程序环境和预处理
    Klocwork 2023.2 windows
    SMB:使用 Ansible 自动化配置 samba 客户端服务端
    C语言数组和指针笔试题(五)(一定要看)
    029-从零搭建微服务-消息队列(一)
    CF1477D Nezzar and Hidden Permutations 题解
    Microsoft SQL Server manual
    2023-09-07力扣每日一题
    2024牛客寒假算法基础集训营6
  • 原文地址:https://blog.csdn.net/qq_67720621/article/details/127914182