• springboot -sse -flux 服务器推送消息


    先说BUG处理,遇到提示异步问题 Async support must be enabled on a servlet and for all filters involved in async request processing. This is done in Java code using the Servlet API or by adding "true" to servlet and filter declarations in web.xml.

    springboot在@WebFilter注解处,加入urlPatterns = { "/*" },asyncSupported = true

    springmvc在web.xml处理

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://java.sun.com/xml/ns/javaee"
    3.          xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
    4.          version="3.0">
    5. <filter-mapping>
    6.   <filter-name>shiroFilter</filter-name>
    7.   <url-pattern>/*</url-pattern>
    8.   <dispatcher>REQUEST</dispatcher>
    9.   <dispatcher>ASYNC</dispatcher>
    10. </filter-mapping>
    • demo1,服务器间隔一定时间推送内容
    1.     接口方法
    1. @GetMapping(path = "/sse/{userId}",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
    2. public Flux> sse(@PathVariable String userId) {
    3. // 每两秒推送一次
    4. return Flux.interval(Duration.ofSeconds(2)).map(seq->
    5. Tuples.of(seq, LocalDateTime.now())).log()//序号和时间
    6. .map(data-> ServerSentEvent.builder().id(userId).data(data.getT1().toString()).build());//推送内容
    7. }

    2.前端代码

    1. <!DOCTYPE html>
    2. <html xmlns:th="http://www.thymeleaf.org">
    3. <head>
    4. <meta charset="UTF-8"/>
    5. <title>服务器推送事件</title>
    6. </head>
    7. <body>
    8. <div>
    9. <div id="data"></div>
    10. <div id="result"></div><br/>
    11. </div>
    12. <script th:inline="javascript" >
    13. //服务器推送事件
    14. if (typeof (EventSource) !== "undefined") {
    15. var source1 = new EventSource("http://localhost:9000/api/admin/test/sse/1");
    16. //当抓取到消息时
    17. source1.onmessage = function (evt) {
    18. document.getElementById("data").innerHTML = document.getElementById("data").innerHTML+"股票行情:" + evt.data;
    19. };
    20. } else {
    21. //注意:ie浏览器不支持
    22. document.getElementById("result").innerHTML = "抱歉,你的浏览器不支持 server-sent 事件...";
    23. var xhr;
    24. var xhr2;
    25. if (window.XMLHttpRequest){
    26. //IE7+, Firefox, Chrome, Opera, Safari浏览器支持该方法
    27. xhr=new XMLHttpRequest();
    28. xhr2=new XMLHttpRequest();
    29. }else{
    30. //IE6, IE5 浏览器不支持,使用ActiveXObject方法代替
    31. xhr=new ActiveXObject("Microsoft.XMLHTTP");
    32. xhr2=new ActiveXObject("Microsoft.XMLHTTP");
    33. }
    34. console.log(xhr);
    35. console.log(xhr2);
    36. xhr.open('GET', '/sse/countDown');
    37. xhr.send(null);//发送请求
    38. xhr.onreadystatechange = function() {
    39. console.log("s响应状态:" + xhr.readyState);
    40. //2是空响应,3是响应一部分,4是响应完成
    41. if (xhr.readyState > 2) {
    42. //这儿可以使用response(对应json)与responseText(对应text)
    43. var newData = xhr.response.substr(xhr.seenBytes);
    44. newData = newData.replace(/\n/g, "#");
    45. newData = newData.substring(0, newData.length - 1);
    46. var data = newData.split("#");
    47. console.log("获取到的数据:" + data);
    48. document.getElementById("result").innerHTML = data;
    49. //长度重新赋值,下次截取时需要使用
    50. xhr.seenBytes = xhr.response.length;
    51. }
    52. }
    53. xhr2.open('GET', '/sse/retrieve');
    54. xhr2.send(null);//发送请求
    55. xhr2.onreadystatechange = function() {
    56. console.log("s响应状态:" + xhr2.readyState);
    57. //0: 请求未初始化,2 请求已接收,3 请求处理中,4 请求已完成,且响应已就绪
    58. if (xhr2.readyState > 2) {
    59. //这儿可以使用response(对应json)与responseText(对应text)
    60. var newData1 = xhr2.response.substr(xhr2.seenBytes);
    61. newData1 = newData1.replace(/\n/g, "#");
    62. newData1 = newData1.substring(0, newData1.length - 1);
    63. var data1 = newData1.split("#");
    64. console.log("获取到的数据:" + data1);
    65. document.getElementById("data").innerHTML = data1;
    66. //长度重新赋值,下次截取时需要使用
    67. xhr2.seenBytes = xhr2.response.length;
    68. }
    69. }
    70. }
    71. </script>
    72. </body>
    73. </html>
    • demo2 订阅服务器消息,服务器send推送消息完成后,关闭sse.close

    1.接口方法以及工具类

    1. @GetMapping(path = "/sse/sub",produces = MediaType.TEXT_EVENT_STREAM_VALUE )
    2. public SseEmitter subscribe(@RequestParam String questionId,HttpServletResponse response) {
    3. // 简单异步发消息 ====
    4. //questionId 订阅id,id对应了sse对象
    5. new Thread(() -> {
    6. try {
    7. Thread.sleep(1000);
    8. for (int i = 0; i < 10; i++) {
    9. Thread.sleep(500);
    10. SSEUtils.pubMsg(questionId, questionId + " - kingtao come " + i);
    11. }
    12. } catch (Exception e) {
    13. e.printStackTrace();
    14. } finally {
    15. // 消息发送完关闭订阅
    16. SSEUtils.closeSub(questionId);
    17. }
    18. }).start();
    19. // =================
    20. return SSEUtils.addSub(questionId);
    21. }

    工具类

    1. import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    2. import java.util.Map;
    3. import java.util.concurrent.ConcurrentHashMap;
    4. public class SSEUtils {
    5. // timeout
    6. private static Long DEFAULT_TIME_OUT = 2*60*1000L;
    7. // 订阅表
    8. private static Map<String, SseEmitter> subscribeMap = new ConcurrentHashMap<>();
    9. /** 添加订阅 */
    10. public static SseEmitter addSub(String questionId) {
    11. if (null == questionId || "".equals(questionId)) {
    12. return null;
    13. }
    14. SseEmitter emitter = subscribeMap.get(questionId);
    15. if (null == emitter) {
    16. emitter = new SseEmitter(DEFAULT_TIME_OUT);
    17. subscribeMap.put(questionId, emitter);
    18. }
    19. return emitter;
    20. }
    21. /** 发消息 */
    22. public static void pubMsg(String questionId, String msg) {
    23. SseEmitter emitter = subscribeMap.get(questionId);
    24. if (null != emitter) {
    25. try {
    26. // 更规范的消息结构看源码
    27. emitter.send(SseEmitter.event().data(msg));
    28. } catch (Exception e) {
    29. // e.printStackTrace();
    30. }
    31. }
    32. }
    33. /**
    34. * 关闭订阅
    35. * @param questionId
    36. */
    37. public static void closeSub(String questionId) {
    38. SseEmitter emitter = subscribeMap.get(questionId);
    39. if (null != emitter) {
    40. try {
    41. emitter.complete();
    42. subscribeMap.remove(questionId);
    43. } catch (Exception e) {
    44. e.printStackTrace();
    45. }
    46. }
    47. }
    48. }

    2.前端代码

    1. <!DOCTYPE html>
    2. <html lang="en">
    3. <head>
    4. <meta charset="UTF-8">
    5. <title>sse</title>
    6. </head>
    7. <body>
    8. <div>
    9. <label>问题id</label>
    10. <input type="text" id="questionId">
    11. <button onclick="subscribe()">订阅</button>
    12. <hr>
    13. <label>F12-console控制台查看消息</label>
    14. </div>
    15. <script>
    16. function subscribe() {
    17. let questionId = document.getElementById('questionId').value;
    18. let url = 'http://localhost:9000/api/admin/test/sse/sub?questionId=' + questionId;
    19. let eventSource = new EventSource(url);
    20. eventSource.onmessage = function (e) {
    21. console.log(e.data);
    22. };
    23. eventSource.onopen = function (e) {
    24. console.log(e,1);
    25. // todo
    26. };
    27. eventSource.onerror = function (e) {
    28. // todo
    29. console.log(e,2);
    30. eventSource.close()
    31. };
    32. }
    33. </script>
    34. </body>
    35. </html>

  • 相关阅读:
    Unity3D学习笔记12——渲染纹理
    LeetCode8-字符串转换整数(atoi)
    突破编程_C++_面试(内存管理)
    华为云云耀云服务器L实例评测 | 华为云云耀云服务器L实例使用教学
    ALSA pcm接口的概念解释
    网络延迟及故障分析与排查实战
    JavaScript游戏引擎列表
    git学习笔记——git pull篇
    中国高端水果元宇宙
    【环境搭建】linux docker-compose安装rocketmq
  • 原文地址:https://blog.csdn.net/qq_32784303/article/details/134511496