业务验收阶段,遇到了一个由于成员变量导致的线程问题
有一个kafka切面,用来处理某些功能在调用前后的发送消息,资产类型type是成员变量定义;
资产1类型推送消息是以zichan1为节点;资产2类型推送消息是以zichan2为节点;
当多个线程调用切面类时,由于切面类中使用成员变量且为动态数据时,此时会出现根据资产类型推送消息错误;例如资产1在调用功能时,切面类的type字段为zichan1;同时有资产2调用功能时,此时的切面类的type字段为zichan2;导致资产1在调用功能前推送的是zichan1,在调用功能后推送的是zichan2的消息标识。
当多个线程同时调用时,成员变量则会只采用最后一次调用的值。
下面简单描述下情景:
切面类 KafkaProcessAspect
- import com.alibaba.fastjson.JSON;
- import com.example.demo.constant.StageCodeConstant;
- import com.example.demo.entity.KafkaSendMessageConstant;
- import com.example.demo.util.DateUtils;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang.ArrayUtils;
- import org.aspectj.lang.ProceedingJoinPoint;
- import org.aspectj.lang.annotation.Around;
- import org.aspectj.lang.annotation.Aspect;
- import org.aspectj.lang.reflect.MethodSignature;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Component;
-
- @Component
- @Aspect
- @Slf4j
- public class KafkaProcessAspect {
- // Aspect that wraps methods annotated with @KafkaProcess and logs a stage message after each call.
- private static final Logger log = LoggerFactory.getLogger(KafkaProcessAspect.class);
- // NOTE(review): both fields are reassigned on every advised call and read again after proceed(); if one aspect instance serves concurrent calls, a call can log values written by another.
- private String assetType = "";
- private String startTime = "";
- // Runs the target method and returns its result; afterwards logs the stage code with start and end time.
- @Around("@annotation(kafkaProcess)")
- public Object doAround(ProceedingJoinPoint joinPoint, KafkaProcess kafkaProcess) throws Throwable {
- Object object = null;
- assetType = (String) getParamValue(joinPoint, "assetType"); // value of the target's "assetType" argument, or null if it has none
- startTime = DateUtils.getTime();
- object = joinPoint.proceed();
- // Push the message
- String stageCode = StageCodeConstant.STAGE_CODE.get(assetType).get(kafkaProcess.functionName()); // looked up by asset type, then by the annotation's functionName
- KafkaSendMessageConstant messageConstant = new KafkaSendMessageConstant();
- messageConstant.setStageCode(stageCode);
- messageConstant.setStartTime(startTime);
- messageConstant.setEndTime(DateUtils.getTime());
- log.info("资产类型{}推送消息:{}", assetType, JSON.toJSONString(messageConstant));
- return object;
- }
- // Returns the argument whose declared parameter name equals paramName, or null when no parameter has that name.
- private Object getParamValue(ProceedingJoinPoint joinPoint, String paramName) {
- Object[] params = joinPoint.getArgs();
- String[] parameterNames = ((MethodSignature) joinPoint.getSignature()).getParameterNames();
-
- if (parameterNames == null || parameterNames.length == 0) { // no parameter names to match against
- return null;
- }
- for (String param : parameterNames) {
- if (param.equals(paramName)) {
- int index = ArrayUtils.indexOf(parameterNames, param); // position of the matching name; the argument is read at the same index
- return params[index];
- }
- }
- return null;
- }
-
- }
controller中的请求地址采用占位符定义,资产类型通过@PathVariable从请求路径中获取
Controller类
- import com.example.demo.aop.KafkaProcess;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- @RequestMapping("/test")
- public class TestController {
-
- private static final Logger logger = LoggerFactory.getLogger(TestController.class);
-
- @PostMapping("/{assetType}/cmpt")
- @KafkaProcess(functionName = "JS")
- public void cmpt(@PathVariable("assetType") String assetType) {
- logger.info("{}接口开始", assetType);
- long startTime = System.currentTimeMillis();
-
- for (int i = 0; i < 20000; i++) {
- for (int j = 0; j < 20000; j++) {
- int m = i * j;
- logger.debug("i*j={}",m);
- }
- }
-
- long endTime = System.currentTimeMillis();
- logger.info("{}接口结束,耗时{}", assetType, endTime - startTime);
- }
-
- }
资产类型枚举 AssetTypeEnum
/**
 * Asset types known to the demo, each carrying a code and a human-readable description.
 */
public enum AssetTypeEnum {

    ZICHAN_1("zichan1", "资产1"),
    ZICHAN_2("zichan2", "资产2");

    /** Code of the asset type. */
    private String code;

    /** Description of the asset type. */
    private String desc;

    /** Binds a code and a description to the constant. */
    AssetTypeEnum(String code, String desc) {
        this.desc = desc;
        this.code = code;
    }

    /** @return the code currently held by this constant */
    public String getCode() {
        return this.code;
    }

    /** @return the description currently held by this constant */
    public String getDesc() {
        return this.desc;
    }

    /** Replaces the code held by this constant. */
    public void setCode(String code) {
        this.code = code;
    }

    /** Replaces the description held by this constant. */
    public void setDesc(String desc) {
        this.desc = desc;
    }

}
推送消息 KafkaSendMessageConstant
/**
 * Payload of a pushed message: a stage code plus the start and end time of the call.
 * All three properties are plain strings and are {@code null} until set.
 */
public class KafkaSendMessageConstant {

    /** Stage code identifying the asset type and function. */
    private String stageCode;

    /** Time at which the call started. */
    private String startTime;

    /** Time at which the call ended. */
    private String endTime;

    /** @return the stage code, or {@code null} if none was set */
    public String getStageCode() {
        return this.stageCode;
    }

    /** @return the start time, or {@code null} if none was set */
    public String getStartTime() {
        return this.startTime;
    }

    /** @return the end time, or {@code null} if none was set */
    public String getEndTime() {
        return this.endTime;
    }

    /** Stores the stage code. */
    public void setStageCode(String stageCode) {
        this.stageCode = stageCode;
    }

    /** Stores the start time. */
    public void setStartTime(String startTime) {
        this.startTime = startTime;
    }

    /** Stores the end time. */
    public void setEndTime(String endTime) {
        this.endTime = endTime;
    }
}
节点常量类 StageCodeConstant ;根据不同资产类型赋值不同功能的推送标识
- import java.util.HashMap;
- import java.util.Map;
-
- public class StageCodeConstant {
-
- public static final Map
> STAGE_CODE = new HashMap<>(); -
- static {
- //资产1
- STAGE_CODE.put(AssetTypeEnum.ZICHAN_1.getCode(),
- new HashMap
() {{ - put("JS", "ZICHAN1-JS");
- }});
- //资产2
- STAGE_CODE.put(AssetTypeEnum.ZICHAN_2.getCode(),
- new HashMap
() {{ - put("JS", "ZICHAN2-JS");
- }});
- }
- }
用postman调用资产2接口

后立即调用资产1接口

此时出现这种结果:

会发现,调用资产2的时候发送消息还是资产1的信息;然后资产1发送的消息也是资产1的信息
此时有两个解决办法:一个是将doAround()和其他方法合并为一个方法,将成员变量调整为局部变量;另一个是将这些数据封装到一个对象中,并通过ThreadLocal与当前线程绑定,保证doAround()和doBefore()在同一个线程内获取的是同一个对象的数据
- import com.alibaba.fastjson.JSON;
- import com.example.demo.constant.StageCodeConstant;
- import com.example.demo.entity.KafkaSendMessageConstant;
- import com.example.demo.util.DateUtils;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang.ArrayUtils;
- import org.aspectj.lang.ProceedingJoinPoint;
- import org.aspectj.lang.annotation.Around;
- import org.aspectj.lang.annotation.Aspect;
- import org.aspectj.lang.reflect.MethodSignature;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Component;
-
- @Component
- @Aspect
- @Slf4j
- public class KafkaProcessAspect {
-
- private static final Logger log = LoggerFactory.getLogger(KafkaProcessAspect.class);
-
- @Around("@annotation(kafkaProcess)")
- public Object doAround(ProceedingJoinPoint joinPoint, KafkaProcess kafkaProcess) throws Throwable {
- Object object = null;
- String assetType = (String) getParamValue(joinPoint, "assetType");
- String startTime = DateUtils.getTime();
- object = joinPoint.proceed();
- //推送消息
- String stageCode = StageCodeConstant.STAGE_CODE.get(assetType).get(kafkaProcess.functionName());
- KafkaSendMessageConstant messageConstant = new KafkaSendMessageConstant();
- messageConstant.setStageCode(stageCode);
- messageConstant.setStartTime(startTime);
- messageConstant.setEndTime(DateUtils.getTime());
- log.info("资产类型{}推送消息:{}", assetType, JSON.toJSONString(messageConstant));
- return object;
- }
-
- private Object getParamValue(ProceedingJoinPoint joinPoint, String paramName) {
- Object[] params = joinPoint.getArgs();
- String[] parameterNames = ((MethodSignature) joinPoint.getSignature()).getParameterNames();
-
- if (parameterNames == null || parameterNames.length == 0) {
- return null;
- }
- for (String param : parameterNames) {
- if (param.equals(paramName)) {
- int index = ArrayUtils.indexOf(parameterNames, param);
- return params[index];
- }
- }
- return null;
- }
-
- }
成员变量KafkaSingleDTO
/**
 * Holder for the data of one advised call: the asset type and a date string.
 * Both properties start out as the empty string.
 */
public class KafkaSingleDTO {

    /** Asset type of the call; empty until set. */
    private String assetType = "";

    /** Date string of the call; empty until set. */
    private String date = "";

    /** @return the stored asset type */
    public String getAssetType() {
        return this.assetType;
    }

    /** @return the stored date string */
    public String getDate() {
        return this.date;
    }

    /** Stores the asset type. */
    public void setAssetType(String assetType) {
        this.assetType = assetType;
    }

    /** Stores the date string. */
    public void setDate(String date) {
        this.date = date;
    }
}
对象单实例获取 KafkaSingleUtil
- import com.example.demo.entity.KafkaSingleDTO;
-
- public class KafkaSingleUtil {
-
- private static ThreadLocal
START = new ThreadLocal<>(); -
- public static KafkaSingleDTO getObject() {
- KafkaSingleDTO singleDTO = START.get();
- if (singleDTO == null) {
- singleDTO = new KafkaSingleDTO();
- }
- return singleDTO;
- }
-
- }
切面类 KafkaProcessAspect
- import com.alibaba.fastjson.JSON;
- import com.example.demo.constant.StageCodeConstant;
- import com.example.demo.entity.KafkaSendMessageConstant;
- import com.example.demo.entity.KafkaSingleDTO;
- import com.example.demo.util.DateUtils;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang.ArrayUtils;
- import org.aspectj.lang.JoinPoint;
- import org.aspectj.lang.ProceedingJoinPoint;
- import org.aspectj.lang.annotation.Around;
- import org.aspectj.lang.annotation.Aspect;
- import org.aspectj.lang.annotation.Before;
- import org.aspectj.lang.reflect.MethodSignature;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Component;
-
- @Component
- @Aspect
- @Slf4j
- public class KafkaProcessAspect {
-
- private static final Logger log = LoggerFactory.getLogger(KafkaProcessAspect.class);
-
- private static String assetType="";
- private static String startTime="";
-
- @Around("@annotation(kafkaProcess)")
- public Object doAround(ProceedingJoinPoint joinPoint, KafkaProcess kafkaProcess) throws Throwable {
- Object object = null;
- assetType = (String) getParamValue(joinPoint, "assetType");
- startTime = DateUtils.getTime();
- KafkaSingleDTO singleDTO = KafkaSingleUtil.getObject();
- singleDTO.setAssetType(assetType);
- singleDTO.setDate(startTime);
-
- object = joinPoint.proceed();
- //推送消息--方法调用后
- String stageCode = StageCodeConstant.STAGE_CODE.get(singleDTO.getAssetType()).get(kafkaProcess.functionName());
- KafkaSendMessageConstant messageConstant = new KafkaSendMessageConstant();
- messageConstant.setStageCode(stageCode);
- messageConstant.setStartTime(singleDTO.getDate());
- messageConstant.setEndTime(DateUtils.getTime());
- log.info("资产类型{}方法后推送消息:{}", singleDTO.getAssetType(), JSON.toJSONString(messageConstant));
- return object;
- }
-
- @Before("@annotation(kafkaProcess)")
- public void doBefore(KafkaProcess kafkaProcess){
- //推送消息--方法调用前
- KafkaSingleDTO singleDTO = KafkaSingleUtil.getObject();
- singleDTO.setAssetType(assetType);
- String stageCode = StageCodeConstant.STAGE_CODE.get(singleDTO.getAssetType()).get(kafkaProcess.functionName());
- KafkaSendMessageConstant messageConstant = new KafkaSendMessageConstant();
- messageConstant.setStageCode(stageCode);
- //...消息实体类 可自补充
- log.info("资产类型{}方法前推送消息:{}", singleDTO.getAssetType(), JSON.toJSONString(messageConstant));
- }
-
- private Object getParamValue(ProceedingJoinPoint joinPoint, String paramName) {
- Object[] params = joinPoint.getArgs();
- String[] parameterNames = ((MethodSignature) joinPoint.getSignature()).getParameterNames();
-
- if (parameterNames == null || parameterNames.length == 0) {
- return null;
- }
- for (String param : parameterNames) {
- if (param.equals(paramName)) {
- int index = ArrayUtils.indexOf(parameterNames, param);
- return params[index];
- }
- }
- return null;
- }
- }
最终结果:

到此结束!