• 成员变量为动态数据时不可轻易使用


    问题描述

    业务验收阶段,遇到了一个由于成员变量导致的线程问题

    有一个kafka切面,用来处理某些功能在调用前后的发送消息,资产类型type是成员变量定义;

    资产1类型推送消息是以zichan1为节点;资产2类型推送消息是以zichan2为节点;

    当多个线程同时调用切面类时,由于切面类用成员变量保存动态数据,会出现按资产类型推送消息出错的情况。例如:资产1调用功能时,切面类的type字段被赋值为zichan1;此时资产2也调用该功能,type字段又被改写为zichan2;结果资产1在调用功能前推送的是zichan1的消息标识,而在调用功能后推送的却是zichan2的消息标识。

    原因

    切面类由Spring默认以单例方式管理,所有请求线程共用同一个切面实例,成员变量也就被所有线程共享。当多个线程同时调用时,后一个线程的赋值会覆盖前一个线程的值,因此先进入的线程在方法执行结束后读到的是最后一次调用写入的值。

    下面简单描述下情景:

    KafkaProcessAspect类

    1. import com.alibaba.fastjson.JSON;
    2. import com.example.demo.constant.StageCodeConstant;
    3. import com.example.demo.entity.KafkaSendMessageConstant;
    4. import com.example.demo.util.DateUtils;
    5. import lombok.extern.slf4j.Slf4j;
    6. import org.apache.commons.lang.ArrayUtils;
    7. import org.aspectj.lang.ProceedingJoinPoint;
    8. import org.aspectj.lang.annotation.Around;
    9. import org.aspectj.lang.annotation.Aspect;
    10. import org.aspectj.lang.reflect.MethodSignature;
    11. import org.slf4j.Logger;
    12. import org.slf4j.LoggerFactory;
    13. import org.springframework.stereotype.Component;
    14. @Component
    15. @Aspect
    16. @Slf4j
    17. public class KafkaProcessAspect {
    18. private static final Logger log = LoggerFactory.getLogger(KafkaProcessAspect.class);
    19. private String assetType = "";
    20. private String startTime = "";
    21. @Around("@annotation(kafkaProcess)")
    22. public Object doAround(ProceedingJoinPoint joinPoint, KafkaProcess kafkaProcess) throws Throwable {
    23. Object object = null;
    24. assetType = (String) getParamValue(joinPoint, "assetType");
    25. startTime = DateUtils.getTime();
    26. object = joinPoint.proceed();
    27. //推送消息
    28. String stageCode = StageCodeConstant.STAGE_CODE.get(assetType).get(kafkaProcess.functionName());
    29. KafkaSendMessageConstant messageConstant = new KafkaSendMessageConstant();
    30. messageConstant.setStageCode(stageCode);
    31. messageConstant.setStartTime(startTime);
    32. messageConstant.setEndTime(DateUtils.getTime());
    33. log.info("资产类型{}推送消息:{}", assetType, JSON.toJSONString(messageConstant));
    34. return object;
    35. }
    36. private Object getParamValue(ProceedingJoinPoint joinPoint, String paramName) {
    37. Object[] params = joinPoint.getArgs();
    38. String[] parameterNames = ((MethodSignature) joinPoint.getSignature()).getParameterNames();
    39. if (parameterNames == null || parameterNames.length == 0) {
    40. return null;
    41. }
    42. for (String param : parameterNames) {
    43. if (param.equals(paramName)) {
    44. int index = ArrayUtils.indexOf(parameterNames, param);
    45. return params[index];
    46. }
    47. }
    48. return null;
    49. }
    50. }

     由于controller中的请求地址采用占位符定义,资产类型可以通过@PathVariable获取

    Controller类

    1. import com.example.demo.aop.KafkaProcess;
    2. import org.slf4j.Logger;
    3. import org.slf4j.LoggerFactory;
    4. import org.springframework.web.bind.annotation.PathVariable;
    5. import org.springframework.web.bind.annotation.PostMapping;
    6. import org.springframework.web.bind.annotation.RequestMapping;
    7. import org.springframework.web.bind.annotation.RestController;
    8. @RestController
    9. @RequestMapping("/test")
    10. public class TestController {
    11. private static final Logger logger = LoggerFactory.getLogger(TestController.class);
    12. @PostMapping("/{assetType}/cmpt")
    13. @KafkaProcess(functionName = "JS")
    14. public void cmpt(@PathVariable("assetType") String assetType) {
    15. logger.info("{}接口开始", assetType);
    16. long startTime = System.currentTimeMillis();
    17. for (int i = 0; i < 20000; i++) {
    18. for (int j = 0; j < 20000; j++) {
    19. int m = i * j;
    20. logger.debug("i*j={}",m);
    21. }
    22. }
    23. long endTime = System.currentTimeMillis();
    24. logger.info("{}接口结束,耗时{}", assetType, endTime - startTime);
    25. }
    26. }

     资产类型枚举 AssetTypeEnum

    /**
     * Asset types known to the demo; each constant carries a code and a description.
     */
    public enum AssetTypeEnum {
        ZICHAN_1("zichan1", "资产1"),
        ZICHAN_2("zichan2", "资产2");

        // Member variables. They are not final: the setters below overwrite them on the
        // enum constant itself, so a change is seen by every user of that constant.
        private String code;
        private String desc;

        /**
         * Creates a constant with the given code and description.
         */
        AssetTypeEnum(String code, String desc) {
            this.code = code;
            this.desc = desc;
        }

        /** Returns the code of this asset type. */
        public String getCode() {
            return this.code;
        }

        /** Replaces the code stored on this constant. */
        public void setCode(String code) {
            this.code = code;
        }

        /** Returns the description of this asset type. */
        public String getDesc() {
            return this.desc;
        }

        /** Replaces the description stored on this constant. */
        public void setDesc(String desc) {
            this.desc = desc;
        }
    }

    推送消息 KafkaSendMessageConstant

    /**
     * Payload of a pushed stage message: the stage code plus the start and end time
     * of the intercepted call. All fields are {@code null} until set.
     */
    public class KafkaSendMessageConstant {
        private String stageCode;
        private String startTime;
        private String endTime;

        /** Returns the stage code, or {@code null} if none was set. */
        public String getStageCode() {
            return this.stageCode;
        }

        /** Sets the stage code. */
        public void setStageCode(String stageCode) {
            this.stageCode = stageCode;
        }

        /** Returns the start time, or {@code null} if none was set. */
        public String getStartTime() {
            return this.startTime;
        }

        /** Sets the start time. */
        public void setStartTime(String startTime) {
            this.startTime = startTime;
        }

        /** Returns the end time, or {@code null} if none was set. */
        public String getEndTime() {
            return this.endTime;
        }

        /** Sets the end time. */
        public void setEndTime(String endTime) {
            this.endTime = endTime;
        }
    }

    节点常量类 StageCodeConstant ;根据不同资产类型赋值不同功能的推送标识

    1. import java.util.HashMap;
    2. import java.util.Map;
    3. public class StageCodeConstant {
    4. public static final Map> STAGE_CODE = new HashMap<>();
    5. static {
    6. //资产1
    7. STAGE_CODE.put(AssetTypeEnum.ZICHAN_1.getCode(),
    8. new HashMap() {{
    9. put("JS", "ZICHAN1-JS");
    10. }});
    11. //资产2
    12. STAGE_CODE.put(AssetTypeEnum.ZICHAN_2.getCode(),
    13. new HashMap() {{
    14. put("JS", "ZICHAN2-JS");
    15. }});
    16. }
    17. }

    用postman调用资产2接口

    后立即调用资产1接口

    此时出现这种结果:

    会发现,调用资产2的时候发送消息还是资产1的信息;然后资产1发送的消息也是资产1的信息

    解决

    此时有两个解决办法:一个是把调用前后的处理都放在doAround()一个方法里,并将成员变量调整为局部变量;另一个是把这些数据封装成一个对象,通过ThreadLocal与当前线程绑定,保证doAround()和doBefore()在同一线程内获取到的是同一个对象的数据

    解决方法一

    1. import com.alibaba.fastjson.JSON;
    2. import com.example.demo.constant.StageCodeConstant;
    3. import com.example.demo.entity.KafkaSendMessageConstant;
    4. import com.example.demo.util.DateUtils;
    5. import lombok.extern.slf4j.Slf4j;
    6. import org.apache.commons.lang.ArrayUtils;
    7. import org.aspectj.lang.ProceedingJoinPoint;
    8. import org.aspectj.lang.annotation.Around;
    9. import org.aspectj.lang.annotation.Aspect;
    10. import org.aspectj.lang.reflect.MethodSignature;
    11. import org.slf4j.Logger;
    12. import org.slf4j.LoggerFactory;
    13. import org.springframework.stereotype.Component;
    14. @Component
    15. @Aspect
    16. @Slf4j
    17. public class KafkaProcessAspect {
    18. private static final Logger log = LoggerFactory.getLogger(KafkaProcessAspect.class);
    19. @Around("@annotation(kafkaProcess)")
    20. public Object doAround(ProceedingJoinPoint joinPoint, KafkaProcess kafkaProcess) throws Throwable {
    21. Object object = null;
    22. String assetType = (String) getParamValue(joinPoint, "assetType");
    23. String startTime = DateUtils.getTime();
    24. object = joinPoint.proceed();
    25. //推送消息
    26. String stageCode = StageCodeConstant.STAGE_CODE.get(assetType).get(kafkaProcess.functionName());
    27. KafkaSendMessageConstant messageConstant = new KafkaSendMessageConstant();
    28. messageConstant.setStageCode(stageCode);
    29. messageConstant.setStartTime(startTime);
    30. messageConstant.setEndTime(DateUtils.getTime());
    31. log.info("资产类型{}推送消息:{}", assetType, JSON.toJSONString(messageConstant));
    32. return object;
    33. }
    34. private Object getParamValue(ProceedingJoinPoint joinPoint, String paramName) {
    35. Object[] params = joinPoint.getArgs();
    36. String[] parameterNames = ((MethodSignature) joinPoint.getSignature()).getParameterNames();
    37. if (parameterNames == null || parameterNames.length == 0) {
    38. return null;
    39. }
    40. for (String param : parameterNames) {
    41. if (param.equals(paramName)) {
    42. int index = ArrayUtils.indexOf(parameterNames, param);
    43. return params[index];
    44. }
    45. }
    46. return null;
    47. }
    48. }

    解决方法二

    成员变量KafkaSingleDTO

    /**
     * Holder for the data of one intercepted call: the asset type and a date string.
     * Both values start out as the empty string.
     */
    public class KafkaSingleDTO {
        private String assetType = "";
        private String date = "";

        /** Returns the asset type; the empty string until one is set. */
        public String getAssetType() {
            return this.assetType;
        }

        /** Sets the asset type. */
        public void setAssetType(String assetType) {
            this.assetType = assetType;
        }

        /** Returns the date string; the empty string until one is set. */
        public String getDate() {
            return this.date;
        }

        /** Sets the date string. */
        public void setDate(String date) {
            this.date = date;
        }
    }

    对象单实例获取 KafkaSingleUtil

    1. import com.example.demo.entity.KafkaSingleDTO;
    2. public class KafkaSingleUtil {
    3. private static ThreadLocal START = new ThreadLocal<>();
    4. public static KafkaSingleDTO getObject() {
    5. KafkaSingleDTO singleDTO = START.get();
    6. if (singleDTO == null) {
    7. singleDTO = new KafkaSingleDTO();
    8. }
    9. return singleDTO;
    10. }
    11. }

    KafkaProcessAspect切面类

    1. import com.alibaba.fastjson.JSON;
    2. import com.example.demo.constant.StageCodeConstant;
    3. import com.example.demo.entity.KafkaSendMessageConstant;
    4. import com.example.demo.entity.KafkaSingleDTO;
    5. import com.example.demo.util.DateUtils;
    6. import lombok.extern.slf4j.Slf4j;
    7. import org.apache.commons.lang.ArrayUtils;
    8. import org.aspectj.lang.ProceedingJoinPoint;
    9. import org.aspectj.lang.annotation.Around;
    10. import org.aspectj.lang.annotation.Aspect;
    11. import org.aspectj.lang.annotation.Before;
    12. import org.aspectj.lang.reflect.MethodSignature;
    13. import org.slf4j.Logger;
    14. import org.slf4j.LoggerFactory;
    15. import org.springframework.stereotype.Component;
    16. @Component
    17. @Aspect
    18. @Slf4j
    19. public class KafkaProcessAspect {
    20. private static final Logger log = LoggerFactory.getLogger(KafkaProcessAspect.class);
    21. private static String assetType="";
    22. private static String startTime="";
    23. @Around("@annotation(kafkaProcess)")
    24. public Object doAround(ProceedingJoinPoint joinPoint, KafkaProcess kafkaProcess) throws Throwable {
    25. Object object = null;
    26. assetType = (String) getParamValue(joinPoint, "assetType");
    27. startTime = DateUtils.getTime();
    28. KafkaSingleDTO singleDTO = KafkaSingleUtil.getObject();
    29. singleDTO.setAssetType(assetType);
    30. singleDTO.setDate(startTime);
    31. object = joinPoint.proceed();
    32. //推送消息--方法调用后
    33. String stageCode = StageCodeConstant.STAGE_CODE.get(singleDTO.getAssetType()).get(kafkaProcess.functionName());
    34. KafkaSendMessageConstant messageConstant = new KafkaSendMessageConstant();
    35. messageConstant.setStageCode(stageCode);
    36. messageConstant.setStartTime(singleDTO.getDate());
    37. messageConstant.setEndTime(DateUtils.getTime());
    38. log.info("资产类型{}方法后推送消息:{}", singleDTO.getAssetType(), JSON.toJSONString(messageConstant));
    39. return object;
    40. }
    41. @Before("@annotation(kafkaProcess)")
    42. public void doBefore(KafkaProcess kafkaProcess){
    43. //推送消息--方法调用前
    44. KafkaSingleDTO singleDTO = KafkaSingleUtil.getObject();
    45. singleDTO.setAssetType(assetType);
    46. String stageCode = StageCodeConstant.STAGE_CODE.get(singleDTO.getAssetType()).get(kafkaProcess.functionName());
    47. KafkaSendMessageConstant messageConstant = new KafkaSendMessageConstant();
    48. messageConstant.setStageCode(stageCode);
    49. //...消息实体类 可自补充
    50. log.info("资产类型{}方法前推送消息:{}", singleDTO.getAssetType(), JSON.toJSONString(messageConstant));
    51. }
    52. private Object getParamValue(ProceedingJoinPoint joinPoint, String paramName) {
    53. Object[] params = joinPoint.getArgs();
    54. String[] parameterNames = ((MethodSignature) joinPoint.getSignature()).getParameterNames();
    55. if (parameterNames == null || parameterNames.length == 0) {
    56. return null;
    57. }
    58. for (String param : parameterNames) {
    59. if (param.equals(paramName)) {
    60. int index = ArrayUtils.indexOf(parameterNames, param);
    61. return params[index];
    62. }
    63. }
    64. return null;
    65. }
    66. }

    最终结果:

    到此结束!

  • 相关阅读:
    嵌入式分享合集83
    软件测试/测试开发丨明确的编码规范,避免冗余和混乱
    Tomcat最大并发数及在线用户数
    ASP.NET Core Web API 幂等性
    第九章 字符串处理函数
    Shiro框架 02(之认证)
    C++常见面试题汇总
    java计算机毕业设计西安市城市绿地管理系统源码+系统+数据库+lw文档
    supervisor进程管理 ——k8s从入门到高并发系列教程(十一)
    java面向对象-----再谈方法
  • 原文地址:https://blog.csdn.net/qq_37342720/article/details/133936538