Requirement: for each configured terminal, export the previous day's data every day.
Data flow: terminal reports data -> parsing completes -> parsed data is written to a Kafka topic -> consumed and stored in OTS.
Two candidate approaches:
(1) Query OTS and export the data;
(2) Consume the parsed-data topic with a separate consumer group (this must not affect OTS ingestion performance, which serves real-time queries) and write the data out directly.
Difficulties:
(1) Reported data may be split into packets: because of terminal memory limits, a single record can be split into several reported messages;
(2) The data needs to be sorted;
(3) A single terminal's data file can exceed 350 MB, and exports must support 300+ vehicles;
(4) About 45 million messages are consumed per day;
(5) Messages arrive out of order and may be re-sent; re-sent (supplementary) data usually does not finish arriving until after 09:00 the next day;
(6) Users need to process the downloaded data themselves, so the download link must be available before 12:00 noon the next day.
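As a rough back-of-envelope check of these numbers (assuming an even distribution over the day): 45,000,000 messages / 86,400 s ≈ 520 messages per second on average, and 300 vehicles × 350 MB ≈ 100+ GB of raw text per day, so both consumption and the final organization pass have to be engineered for sustained throughput rather than a single end-of-day batch.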
Option 1, query OTS:
Pros: simple to implement (the data is already merged and can be sorted directly); a plain download is enough.
Cons: the download time is unpredictable. Downloads cannot start until after 09:00 while the data must be delivered before 12:00, leaving at most 3 hours of processing time; in practice the previous day's data might not even finish downloading within the day, so the customer's noon deadline would very likely be missed, and even with multi-threaded downloads memory and CPU become bottlenecks.
Option 2, consume with a new group:
Pros: as long as the code is solid and the consumption and write performance hold up, the data can be delivered within the deadline.
Cons: all of the problems above have to be handled in code, and the code must be designed for performance so that messages never pile up on a large scale.
Weighing the two, the only viable choice is consuming with a new group.
Solution:
(1) Out-of-order data, message merging and sorting are not handled during consumption; roughly half an hour is reserved at the end for a data-organization pass;
(2) Write plain TXT files during consumption and convert them to the CSV format users need during the organization pass, keeping per-message handling time as low as possible;
(3) Introduce a Spring cache for performance, plus a Redis cache so that cluster nodes do not create the same directory twice (see the sketch after this list);
(4) When a file grows too large or a directory holds too many files, messages back up badly because appends and file lookups become slow. Files are therefore rolled: once a file exceeds 35 MB it is no longer written to, and each new file is created under a fresh directory named after the server IP and the current millisecond, so no directory grows too large or holds too many files and consumption keeps up;
(5) Data organization: files are grouped by filename prefix and written in a single thread; an attempt at multi-threaded processing sent CPU and memory usage soaring, caused severe Full GCs and was very inefficient even on an adequately provisioned server. Reading 350 MB of data into memory, merging and sorting it does incur one Full GC, but in practice the data is processed in under 10 minutes.
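A minimal sketch of the directory-creation guard from point (3), assuming Spring Data Redis; the class name, key name and base path below are illustrative only, and the production version is existOrCreateDirectory in the listener code that follows.

import java.io.File;
import java.time.Duration;
import org.springframework.data.redis.core.StringRedisTemplate;

public class DayDirectoryGuard {
    private final StringRedisTemplate redisTemplate;
    private final String basePath;

    public DayDirectoryGuard(StringRedisTemplate redisTemplate, String basePath) {
        this.redisTemplate = redisTemplate;
        this.basePath = basePath;
    }

    /**
     * Returns the directory for the given day; across the cluster only the node that wins the
     * Redis key actually creates it, the others wait briefly and re-check.
     */
    public File ensureDayDirectory(String day) throws InterruptedException {
        File dir = new File(basePath, day);
        if (dir.isDirectory()) {
            return dir;
        }
        //SETNX-style guard: true only for the first node that sets the key
        Boolean won = redisTemplate.opsForValue()
                .setIfAbsent("export:dir:" + day, "owner", Duration.ofDays(2));
        if (Boolean.TRUE.equals(won)) {
            dir.mkdirs();
        } else {
            //Another node is creating it; give it a moment before returning the path
            Thread.sleep(1000);
        }
        return dir;
    }
}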
Code (1): data consumption
package com.suyun.file.service.listen;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;
import com.suyun.common.kafka.JsonSerializer;
import com.suyun.common.kafka.KafkaHelper;
import com.suyun.common.kafka.KafkaRecord;
import com.suyun.common.lang.Tuple2;
import com.suyun.file.service.cache.ExportVehicleDataCache;
import com.suyun.file.service.cache.FilePathCache;
import com.suyun.file.service.helper.OSSHelper;
import com.suyun.file.service.util.FileUtils;
import com.suyun.file.service.util.NetworkUtil;
import com.suyun.vehicle.Topics;
import com.suyun.vehicle.api.dto.CanExportConfigDto;
import com.suyun.vehicle.api.service.ICanExportConfigService;
import com.suyun.vehicle.model.VehicleDataMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Description:
*
* @Author: leo.xiong
* @CreateDate: 2020/11/11 10:18
* @Email: leo.xiong@suyun360.com
* @Since:
*/
@Slf4j
@Service
public class VehicleDataKafkaListener {
private TypeReference<KafkaRecord<VehicleDataMap>> dataType = new TypeReference<KafkaRecord<VehicleDataMap>>() {
};
private static final String GROUP_ID = "CID_alikafka_suyun_file_group";
public static final String OSS_BUCKET_NAME = "suyun-nevtsp-foreverfile";
public static final String DAY_EXPORT_DIR = "/vehicle/can_data/day_export/";
public static final String TXT = ".txt";
public static final String CSV = ".csv";
public static final String DATE_TIME = "dateTime";
public static final String VERSION_ID = "versionId";
public static final String DATE_FORMAT_YMD = "yyyyMMdd";
public static final String TIME_FORMAT_Y_M_D = "yyyy-MM-dd HH:mm:ss";
/**
 * Compression of the previous day's data starts at 09:00 the next day; supplementary data reported
 * after 07:30 is no longer processed, so the cutoff below is 7.5 hours past midnight despite the
 * constant's name.
 */
private static final long NINE_HOUR = (7 * 3600 + 1800) * 1000;
/**
 * Maximum size of a single export file: 35 MB
 */
private static final long MAX_FILE_LENGTH = 35 * 1024 * 1024;
@Autowired
private KafkaHelper kafkaHelper;
@Resource(name = "executorServices")
private ExecutorService[] executorServices;
@Autowired
private OSSHelper ossHelper;
@Autowired
private FilePathCache filePathCache;
@Autowired
private ExportVehicleDataCache exportVehicleDataCache;
@Autowired
private ICanExportConfigService canExportConfigService;
@Resource(name = "redisTemplate")
private ValueOperations<String, String> valueOperations;
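/**
 * Batch listener: groups each batch by report day -> vehicle ID -> timestamp, then hands each
 * vehicle's records to a fixed worker thread so writes for the same vehicle stay ordered.
 * Failed records are forwarded to the retry topic and the batch is acknowledged at the end.
 */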
@KafkaListener(id = "vehicleDataKafkaListener", groupId = GROUP_ID, topics = Topics.VEHICLE_DATA, errorHandler = "consumerAwareErrorHandler", containerFactory = "batchKafkaListenerContainerFactory")
public void vehicleDataUpdatedHandler(List<ConsumerRecord<String, byte[]>> consumerRecords, Acknowledgment ack) {
Long systemTime = System.currentTimeMillis();
String currentDay = formatDate(systemTime, DATE_FORMAT_YMD);
if (StringUtils.isEmpty(existOrCreateDirectory(currentDay, true))) {
return;
}
Map<String, Map<String, Map<Long, KafkaRecord<VehicleDataMap>>>> reportDayVehicleIdTimeStampKafkaRecordMap = new HashMap<>(2);
//If the current time is past 07:30, yesterday's files have already been generated and yesterday's data can no longer be processed
boolean isGenerateYesterdaySFile = (systemTime - NINE_HOUR) >= getTimestampByLocalDateTime(LocalDateTime.now().with(LocalTime.MIN));
buildDataMap(consumerRecords, isGenerateYesterdaySFile, currentDay, reportDayVehicleIdTimeStampKafkaRecordMap);
if (CollectionUtils.isEmpty(reportDayVehicleIdTimeStampKafkaRecordMap)) {
ack.acknowledge();
return;
}
for (Map.Entry<String, Map<String, Map<Long, KafkaRecord<VehicleDataMap>>>> reportDayVehicleTimestampDataEntry : reportDayVehicleIdTimeStampKafkaRecordMap.entrySet()) {
String reportDay = reportDayVehicleTimestampDataEntry.getKey();
String directory = existOrCreateDirectory(reportDay, false);
if (StringUtils.isEmpty(directory)) {
continue;
}
Map<String, Map<Long, KafkaRecord<VehicleDataMap>>> vehicleIdTimestampKafkaRecordMap = reportDayVehicleTimestampDataEntry.getValue();
vehicleIdTimestampKafkaRecordMap.forEach((vehicleId, timestampKafkaRecordMap) -> {
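//Pick a fixed worker thread per vehicle (consistent hash on the vehicle ID) so one vehicle's records are written sequentially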
int hash = Hashing.consistentHash(vehicleId.hashCode(), executorServices.length);
executorServices[hash].execute(() -> {
List<KafkaRecord<VehicleDataMap>> values = Lists.newArrayList(timestampKafkaRecordMap.values());
try {
process(reportDay, directory, vehicleId, timestampKafkaRecordMap, values);
} catch (Exception e) {
log.warn("写入失败 reportDay: {} directory: {} vehicleId: {}", reportDay, directory, vehicleId, e);
for (KafkaRecord<VehicleDataMap> kafkaRecord : values) {
kafkaHelper.forward2RetryTopic(kafkaRecord, GROUP_ID);
}
}
});
});
}
ack.acknowledge();
}
/**
 * Process one vehicle's records for one report day and append them to its TXT file
 *
 * @param reportDay
 * @param directory
 * @param vehicleId
 * @param timestampKafkaRecordMap
 * @param values
 */
private void process(String reportDay, String directory, String vehicleId, Map<Long, KafkaRecord<VehicleDataMap>> timestampKafkaRecordMap, List<KafkaRecord<VehicleDataMap>> values) {
CanExportConfigDto canExportConfigDto = null;
try {
canExportConfigDto = getCanExportConfigDto(values.get(0).getData(), reportDay);
} catch (Exception e) {
for (KafkaRecord<VehicleDataMap> kafkaRecord : values) {
kafkaHelper.forward2RetryTopic(kafkaRecord, GROUP_ID);
}
}
if (canExportConfigDto == null) {
return;
}
StringBuilder content = new StringBuilder();
for (KafkaRecord<VehicleDataMap> value : timestampKafkaRecordMap.values()) {
VehicleDataMap vehicleDataMap = value.getData();
Map<String, Object> codeValueMap = Maps.newHashMapWithExpectedSize(vehicleDataMap.getValues().size() + 2);
vehicleDataMap.getValues().forEach((code, valueInfo) -> {
Tuple2<Double, String> tuple2 = vehicleDataMap.getValue(code);
if (tuple2 == null) {
return;
}
codeValueMap.put(code, tuple2._1().toString());
});
if (CollectionUtils.isEmpty(codeValueMap)) {
log.warn("数据信息为空: vehicleDataMap: {}", vehicleDataMap);
continue;
}
//Record the parse-version ID; the export step needs it to look up signal names
if (!codeValueMap.containsKey(VERSION_ID)) {
codeValueMap.put(VERSION_ID, vehicleDataMap.getVersionId());
}
if (!codeValueMap.containsKey(DATE_TIME)) {
codeValueMap.put(DATE_TIME, String.valueOf(vehicleDataMap.getTimestamp()));
}
content.append(JSONObject.toJSONString(codeValueMap) + "\n");
}
if (content.length() == 0) {
return;
}
String fileName = canExportConfigDto.getPlateNo() + "(" + reportDay + "000000" + "~" + reportDay + "235959" + ")" + TXT;
writeTxt(content.toString(), vehicleId, reportDay, directory, fileName);
}
/**
 * Group the batch into report day -> vehicle ID -> timestamp -> record
 *
 * @param consumerRecords
 * @param isGenerateYesterdaySFile
 * @param currentDay
 * @param reportDayVehicleIdTimeStampKafkaRecordMap
 */
private void buildDataMap(List<ConsumerRecord<String, byte[]>> consumerRecords, boolean isGenerateYesterdaySFile, String currentDay, Map<String, Map<String, Map<Long, KafkaRecord<VehicleDataMap>>>> reportDayVehicleIdTimeStampKafkaRecordMap) {
consumerRecords.forEach(consumerRecord -> {
KafkaRecord<VehicleDataMap> kafkaRecord = JsonSerializer.deserialize(consumerRecord.value(), dataType);
//Useless record, skip it
if (kafkaRecord == null || kafkaRecord.getData() == null || CollectionUtils.isEmpty(kafkaRecord.getData().getValues())) {
return;
}
//The parse-version ID must not be empty, otherwise signal names cannot be resolved and the export is useless
if (StringUtils.isEmpty(kafkaRecord.getData().getVersionId())) {
return;
}
VehicleDataMap vehicleDataMap = kafkaRecord.getData();
String reportDay = formatDate(vehicleDataMap.getTimestamp(), DATE_FORMAT_YMD);
//If the report day is not today and yesterday's files have already been generated, this supplementary data is no longer useful
if (!currentDay.equals(reportDay) && isGenerateYesterdaySFile) {
log.info("补发数据 车辆ID: {} 上报时间: {} 接收时间: {}", vehicleDataMap.getVehicleId(), vehicleDataMap.getTimestamp(), vehicleDataMap.getReceiveTime());
return;
}
CanExportConfigDto canExportConfigDto = null;
try {
canExportConfigDto = getCanExportConfigDto(vehicleDataMap, reportDay);
} catch (Exception e) {
kafkaHelper.forward2RetryTopic(kafkaRecord, GROUP_ID);
}
if (canExportConfigDto == null) {
return;
}
Map<String, Map<Long, KafkaRecord<VehicleDataMap>>> vehicleIdTimestampKafkaRecordMap = reportDayVehicleIdTimeStampKafkaRecordMap.get(reportDay);
if (vehicleIdTimestampKafkaRecordMap == null) {
vehicleIdTimestampKafkaRecordMap = new HashMap<>(150);
reportDayVehicleIdTimeStampKafkaRecordMap.put(reportDay, vehicleIdTimestampKafkaRecordMap);
}
Map<Long, KafkaRecord<VehicleDataMap>> timestampKafkaRecordMap = vehicleIdTimestampKafkaRecordMap.get(vehicleDataMap.getVehicleId());
if (timestampKafkaRecordMap == null) {
timestampKafkaRecordMap = Maps.newHashMap();
vehicleIdTimestampKafkaRecordMap.put(vehicleDataMap.getVehicleId(), timestampKafkaRecordMap);
}
KafkaRecord<VehicleDataMap> oldKafkaRecord = timestampKafkaRecordMap.get(vehicleDataMap.getTimestamp());
if (oldKafkaRecord != null) {
//Split packet with the same timestamp: merge the new values into the record already in the map and keep it, otherwise the earlier values would be lost
oldKafkaRecord.getData().putAll(kafkaRecord.getData().getValues());
return;
}
timestampKafkaRecordMap.put(vehicleDataMap.getTimestamp(), kafkaRecord);
});
}
/**
 * Append the content to the vehicle's TXT file
 *
 * @param content
 * @param vehicleId
 * @param reportDay
 * @param directory
 * @param fileName
 */
private void writeTxt(String content, String vehicleId, String reportDay, String directory, String fileName) {
String filePath = existOrCreateFile(reportDay, vehicleId, directory, fileName);
if (StringUtils.isEmpty(filePath)) {
return;
}
FileUtils.writeToFile(filePath, content, StandardCharsets.UTF_8.name(), true);
}
/**
 * Look up the export configuration for the vehicle (cached per vehicle and day)
 *
 * @param vehicleDataMap
 * @param reportDay
 * @return the configuration, or null if this vehicle does not need to be exported
 * @throws Exception if the configuration lookup fails
 */
private CanExportConfigDto getCanExportConfigDto(VehicleDataMap vehicleDataMap, String reportDay) throws Exception {
CanExportConfigDto canExportConfigDto = exportVehicleDataCache.get(vehicleDataMap.getVehicleId() + ":" + reportDay);
//Whether this vehicle needs to be exported; the configuration is per day and the data is also exported per day
if (canExportConfigDto == null) {
try {
canExportConfigDto = canExportConfigService.queryActiveCanExportConfig(vehicleDataMap.getVehicleId(), vehicleDataMap.getTimestamp());
} catch (Exception e) {
//Rethrow as-is so the caller can forward the record to the retry topic; wrapping in a new Exception would lose the stack trace
throw e;
}
if (canExportConfigDto == null) {
exportVehicleDataCache.put(vehicleDataMap.getVehicleId() + ":" + reportDay, new CanExportConfigDto());
return null;
}
exportVehicleDataCache.put(vehicleDataMap.getVehicleId() + ":" + reportDay, canExportConfigDto);
} else if (StringUtils.isEmpty(canExportConfigDto.getVehicleId())) {
return null;
}
return canExportConfigDto;
}
/**
 * Return the day's export directory, creating it if necessary
 *
 * @param currentDay
 * @param isCreate whether to create the directory when it does not exist
 * @return the directory path, or an empty string if it does not exist and may not be created
 */
public String existOrCreateDirectory(String currentDay, boolean isCreate) {
String directoryPath = filePathCache.get(FilePathCache.DIRECTORY + currentDay);
if (StringUtils.isNotEmpty(directoryPath)) {
return directoryPath;
}
String directory = ossHelper.getBucketPathMap().get(OSS_BUCKET_NAME) + DAY_EXPORT_DIR + currentDay;
File file = new File(directory);
if (file.isDirectory()) {
filePathCache.put(FilePathCache.DIRECTORY + currentDay, directory);
return directory;
}
if (!isCreate) {
return "";
}
//Only the node that wins the Redis key creates the directory; the others wait briefly and re-check
Boolean createFlag = valueOperations.setIfAbsent(currentDay + FilePathCache.DIRECTORY, NetworkUtil.IP, 2, TimeUnit.DAYS);
if (Boolean.TRUE.equals(createFlag)) {
file.mkdir();
} else {
try {
Thread.sleep(1000);
return existOrCreateDirectory(currentDay, false);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
filePathCache.put(FilePathCache.DIRECTORY + currentDay, directory);
return directory;
}
/**
 * Return the vehicle's current TXT file, creating a new one when necessary.
 * An oversized file hurts write performance, so once a file exceeds 35 MB a new one is started.
 *
 * @param currentDay
 * @param vehicleId
 * @param directory
 * @param fileName
 * @return the file path, or null if the file could not be created
 */
private String existOrCreateFile(String currentDay, String vehicleId, String directory, String fileName) {
String filePath = filePathCache.get(FilePathCache.FILE + currentDay + ":" + vehicleId);
if (StringUtils.isNotEmpty(filePath)) {
File file = new File(filePath);
if (file.isFile() && file.length() <= MAX_FILE_LENGTH) {
return filePath;
}
}
String newDirectory = directory + "/" + NetworkUtil.IP_REPLACE_DOT + "/" + System.currentTimeMillis();
File directoryFile = new File(newDirectory);
directoryFile.mkdirs();
String fileNamePath = newDirectory + "/" + fileName;
File file = new File(fileNamePath);
try {
file.createNewFile();
filePathCache.put(FilePathCache.FILE + currentDay + ":" + vehicleId, fileNamePath);
} catch (IOException e) {
log.error("创建文件失败 fileNamePath: {}", fileNamePath, e);
return null;
}
return fileNamePath;
}
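/**
 * Format an epoch-millisecond timestamp with the given pattern in the server's default time zone
 */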
public static String formatDate(Long timestamp, String pattern) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
return getLocalDateTimeByLong(timestamp).format(formatter);
}
public static LocalDateTime getLocalDateTimeByLong(Long timestamp) {
long nanoOfSecond = (timestamp % 1000) * 1000000;
LocalDateTime localDateTime = LocalDateTime.ofEpochSecond(timestamp / 1000, (int) nanoOfSecond, getSystemZoneOffset());
return localDateTime;
}
public static long getTimestampByLocalDateTime(LocalDateTime localDateTime) {
return localDateTime.toEpochSecond(getSystemZoneOffset()) * 1000;
}
public static ZoneOffset getSystemZoneOffset() {
//Derive the offset from the raw millisecond offset; building a "+h" string breaks for negative or non-whole-hour zones
return ZoneOffset.ofTotalSeconds(TimeZone.getDefault().getRawOffset() / 1000);
}
}
Code (2): file organization
package com.suyun.file.service.task.service;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.suyun.file.service.cache.VersionCanSignalCache;
import com.suyun.file.service.helper.CsvHelper;
import com.suyun.file.service.helper.OSSHelper;
import com.suyun.file.service.listen.VehicleDataKafkaListener;
import com.suyun.file.service.util.FileUtils;
import com.suyun.vehicle.api.dto.BatchExportInfoDto;
import com.suyun.vehicle.api.dto.VersionCanSignalDto;
import com.suyun.vehicle.api.service.IBatchExportInfoService;
import com.suyun.vehicle.api.service.ICanProtocolService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
* Description:
 *
* @Author: leo.xiong
* @CreateDate: 2022/8/19 14:10
* @Email: leo.xiong@suyun360.com
* @Since:
*/
@Slf4j
@Service
public class ZipFileTaskService {
/**
 * Expected maximum number of protocol versions per vehicle per day (map capacity)
 */
private static final int VERSION_SIZE = 8;
/**
 * Expected number of columns (signals)
 */
private static final int NUMBER_OF_COLUMNS = 600;
/**
 * Expected number of signals that carry a unit
 */
private static final int NUMBER_OF_UNITS = 100;
/**
 * Expected number of rows
 */
private static final int NUMBER_OF_ROWS = 45000;
@Autowired
private VehicleDataKafkaListener vehicleDataKafkaListener;
@Resource(name = "executorZipService")
private ExecutorService executorZipService;
@Autowired
private VersionCanSignalCache versionCanSignalCache;
@Autowired
private ICanProtocolService canProtocolService;
@Autowired
private IBatchExportInfoService batchExportInfoService;
@Autowired
private CsvHelper csvHelper;
@Autowired
private OSSHelper ossHelper;
private static final int MAX_FAILED_TRY_TIME = 3;
/**
 * Error caused by the organization pass being interrupted partway: delete the CSV and re-organize
 */
private static final int COLLATE_DATA_ERROR = 2;
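/**
 * Organize the previous day's export directory: group the TXT files by filename prefix, merge each
 * group into one CSV per vehicle, retry up to MAX_FAILED_TRY_TIME times if TXT files remain, then
 * zip the CSVs and record the result.
 */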
public ProcessResult process(String preDay, AtomicInteger times) {
String directory = vehicleDataKafkaListener.existOrCreateDirectory(preDay, false);
if (StringUtils.isEmpty(directory)) {
log.warn("文件夹不存在 directory: {}", directory);
return new ProcessResult(false);
}
File directoryFile = new File(directory);
Collection<File> fileList = FileUtils.listFiles(directoryFile, new String[]{"txt", "csv"}, true);
if (CollectionUtils.isEmpty(fileList)) {
log.warn("文件不存在");
return new ProcessResult(false);
}
Map<String, List<File>> fileNameListMap = fileList.parallelStream().collect(Collectors.groupingBy(file -> {
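//Group files by the filename prefix "plateNo(yyyyMMdd000000~yyyyMMdd235959)", i.e. one group per vehicle per day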
String fileName = file.getName();
String fileNameValue = fileName.split(preDay)[0] + preDay + "000000" + "~" + preDay + "235959" + ")";
return fileNameValue;
}));
Map<String, String> fileFailedMsgMap = Maps.newHashMap();
Set<String> exportSuccessSet = new HashSet<>(500);
Set<String> exportFailedSet = Sets.newHashSet();
executorZipService.execute(() -> {
processTxt(directory, fileNameListMap, fileFailedMsgMap, exportSuccessSet, exportFailedSet);
Collection<File> txtFileList = FileUtils.listFiles(directoryFile, new String[]{"txt"}, true);
if (CollectionUtils.isEmpty(txtFileList)) {
zipFileList(preDay, directory, fileFailedMsgMap, exportSuccessSet, exportFailedSet);
return;
}
if (times.get() < MAX_FAILED_TRY_TIME) {
times.addAndGet(1);
process(preDay, times);
return;
}
for (File file : txtFileList) {
String plateNo = file.getName().substring(0, file.getName().length() - 35);
fileFailedMsgMap.put(plateNo, "txt文件解析失败");
exportFailedSet.add(plateNo);
}
zipFileList(preDay, directory, fileFailedMsgMap, exportSuccessSet, exportFailedSet);
});
return new ProcessResult(true);
}
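/**
 * For each filename prefix (one vehicle per day): delete empty files, keep an already-finished CSV,
 * delete partial CSVs when TXT files remain, merge the TXT files into a sorted CSV, and record
 * successes and failures by plate number.
 */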
private void processTxt(String directory, Map<String, List<File>> fileNameListMap, Map<String, String> fileFailedMsgMap, Set<String> exportSuccessSet, Set<String> exportFailedSet) {
fileNameListMap.forEach((fileNameValue, files) -> {
long beginTime = System.currentTimeMillis();
log.info("process file name: {}", fileNameValue);
String plateNo = fileNameValue.substring(0, fileNameValue.length() - 31);
try {
List<File> txtFileList = Lists.newArrayListWithExpectedSize(files.size());
if (files.size() == 1) {
File fileOne = files.get(0);
if (fileOne.length() == 0) {
fileOne.delete();
return;
}
if (fileOne.getName().endsWith(VehicleDataKafkaListener.CSV)) {
return;
}
txtFileList.add(fileOne);
} else {
for (File file : files) {
if (file.length() == 0) {
file.delete();
continue;
}
if (file.getName().endsWith(VehicleDataKafkaListener.CSV)) {
file.delete();
continue;
}
txtFileList.add(file);
}
}
if (CollectionUtils.isEmpty(txtFileList)) {
return;
}
organizeFile(txtFileList, directory, fileNameValue);
exportSuccessSet.add(plateNo);
} catch (Exception e) {
log.warn("数据处理失败 fileNameValue: {}", fileNameValue, e);
fileFailedMsgMap.put(fileNameValue, e.getMessage());
exportFailedSet.add(plateNo);
}
log.info("process file name: {} use time: {} ms", fileNameValue, System.currentTimeMillis() - beginTime);
});
}
/**
 * Merge and sort one vehicle's TXT files into a CSV, then delete the TXT files
 *
 * @param fileList
 * @param directory
 * @param fileNameValue
 */
private void organizeFile(List<File> fileList, String directory, String fileNameValue) {
try {
List<String> contentList = readLine(fileList);
//Mapping of version ID -> (code -> signal name) for the versions encountered
Map<String, Map<String, String>> versionIdCodeSignalNameMap = new HashMap<>(VERSION_SIZE);
//Signal names merged across versions, used as the header (a List keeps the column order fixed, so a Set is not used)
List<String> headerList = new ArrayList<>(NUMBER_OF_COLUMNS);
//Columns that actually contain a value
Set<String> validHeaderSet = new HashSet<>(NUMBER_OF_COLUMNS);
//Units to append to the header fields
Map<String, String> signalNameUnitMap = new HashMap<>(NUMBER_OF_UNITS);
//One row per timestamp
Map<Long, Map<String, String>> timestampValuesMap = new HashMap<>(NUMBER_OF_ROWS);
for (String content : contentList) {
if (StringUtils.isEmpty(content)) {
continue;
}
Map<String, String> codeValueMap = null;
try {
codeValueMap = JSONObject.parseObject(content, Map.class);
} catch (Exception e) {
log.warn("content parse error: {}", content);
continue;
}
String versionId = codeValueMap.get(VehicleDataKafkaListener.VERSION_ID);
if (StringUtils.isEmpty(versionId)) {
continue;
}
String dateTime = codeValueMap.get(VehicleDataKafkaListener.DATE_TIME);
if (StringUtils.isEmpty(dateTime)) {
continue;
}
List<VersionCanSignalDto> canSignalByVersionIdList = versionCanSignalCache.get(versionId);
if (canSignalByVersionIdList == null) {
canSignalByVersionIdList = canProtocolService.findCanSignalByVersionIdList(Lists.newArrayList(versionId));
if (CollectionUtils.isEmpty(canSignalByVersionIdList)) {
versionCanSignalCache.put(versionId, Collections.EMPTY_LIST);
continue;
}
versionCanSignalCache.put(versionId, canSignalByVersionIdList);
} else if (CollectionUtils.isEmpty(canSignalByVersionIdList)) {
continue;
}
Map<String, String> codeSignalNameMap = null;
if (!versionIdCodeSignalNameMap.containsKey(versionId)) {
codeSignalNameMap = new HashMap<>(NUMBER_OF_COLUMNS);
versionIdCodeSignalNameMap.put(versionId, codeSignalNameMap);
for (VersionCanSignalDto versionCanSignalDto : canSignalByVersionIdList) {
//Always record this version's code -> signal-name mapping; only the shared header list needs to stay free of duplicates
codeSignalNameMap.put(versionCanSignalDto.getCode(), versionCanSignalDto.getSignalName());
if (!headerList.contains(versionCanSignalDto.getSignalName())) {
headerList.add(versionCanSignalDto.getSignalName());
}
if (StringUtils.isEmpty(versionCanSignalDto.getUnit())) {
continue;
}
signalNameUnitMap.put(versionCanSignalDto.getSignalName(), versionCanSignalDto.getUnit());
}
} else {
codeSignalNameMap = versionIdCodeSignalNameMap.get(versionId);
}
Long timestamp = Long.valueOf(dateTime);
Map<String, String> signalNameValueMap = timestampValuesMap.get(timestamp);
if (signalNameValueMap == null) {
signalNameValueMap = new HashMap<>(NUMBER_OF_COLUMNS);
timestampValuesMap.put(timestamp, signalNameValueMap);
}
for (Map.Entry<String, String> codeValueEntry : codeSignalNameMap.entrySet()) {
String code = codeValueEntry.getKey();
String value = codeValueMap.get(code);
if (StringUtils.isEmpty(value)) {
continue;
}
String signalName = codeValueEntry.getValue();
validHeaderSet.add(signalName);
signalNameValueMap.put(signalName, value);
}
}
//If no column ever had a value there is nothing to export; delete the source files and return
if (CollectionUtils.isEmpty(validHeaderSet)) {
for (File file : fileList) {
file.delete();
}
return;
}
writeCsv(directory, fileNameValue, headerList, validHeaderSet, timestampValuesMap, signalNameUnitMap);
//Delete the TXT files now that the CSV has been written
for (File file : fileList) {
file.delete();
}
} catch (Exception e) {
log.warn("整理文件错误 filePath: {}", fileList.get(0).getAbsolutePath(), e);
}
}
private List<String> readLine(List<File> fileList) throws IOException {
List<String> contentList = new ArrayList<>(60000);
for (File file : fileList) {
List<String> readList = FileUtils.readLines(file, StandardCharsets.UTF_8);
if (CollectionUtils.isEmpty(readList)) {
continue;
}
contentList.addAll(readList);
}
return contentList;
}
/**
 * Write the merged data to a CSV file
 *
 * @param directory
 * @param fileNameValue
 * @param headerList
 * @param validHeaderSet
 * @param timestampValuesMap
 * @param signalNameUnitMap
 */
private void writeCsv(String directory, String fileNameValue, List<String> headerList, Set<String> validHeaderSet, Map<Long, Map<String, String>> timestampValuesMap, Map<String, String> signalNameUnitMap) {
headerList.removeIf(header -> !validHeaderSet.contains(header));
Map<Long, Map<String, String>> sortMap = Maps.newLinkedHashMapWithExpectedSize(timestampValuesMap.size());
//Sort the map by key (timestamp) in ascending order
timestampValuesMap.entrySet().stream().sorted(Map.Entry.comparingByKey()).forEachOrdered(x -> sortMap.put(x.getKey(), x.getValue()));
String[] headers = new String[headerList.size()];
//Header fields carry their unit as a [unit] suffix; the two extra leading columns hold the row index and the report time
String[] headerUnits = new String[headerList.size() + 2];
headerUnits[0] = "time";
headerUnits[1] = "上报时间";
for (int i = 0, len = headerList.size(); i < len; i++) {
headers[i] = headerList.get(i);
String unit = signalNameUnitMap.get(headers[i]);
if (StringUtils.isEmpty(unit)) {
headerUnits[i + 2] = headers[i];
} else {
headerUnits[i + 2] = headers[i] + "[" + unit + "]";
}
}
List<String[]> valueList = Lists.newArrayListWithExpectedSize(sortMap.size());
AtomicInteger atomicInteger = new AtomicInteger(-1);
sortMap.forEach((timestamp, signalValueMap) -> {
String[] values = new String[headerUnits.length];
boolean isValidFlag = false;
for (int i = 0; i < headers.length; i++) {
String value = signalValueMap.get(headers[i]);
if (StringUtils.isNotEmpty(value)) {
if (!isValidFlag) {
isValidFlag = true;
}
values[i + 2] = value;
} else {
values[i + 2] = "";
}
}
//If the row contains no valid value at all, skip it
if (!isValidFlag) {
return;
}
values[0] = String.valueOf(atomicInteger.addAndGet(1));
values[1] = VehicleDataKafkaListener.formatDate(timestamp, VehicleDataKafkaListener.TIME_FORMAT_Y_M_D) + "\t";
valueList.add(values);
});
csvHelper.writeToCsv(directory, fileNameValue + VehicleDataKafkaListener.CSV, headerUnits, valueList);
}
/**
 * Zip the CSV files and save the export record
 *
 * @param preDay
 * @param directory
 * @param fileFailedMsgMap
 * @param exportSuccessSet
 * @param exportFailedSet
 * @return
 */
private ProcessResult zipFileList(String preDay, String directory, Map<String, String> fileFailedMsgMap, Set<String> exportSuccessSet, Set<String> exportFailedSet) {
BatchExportInfoDto batchExportInfoDto = new BatchExportInfoDto();
batchExportInfoDto.setDataDate(preDay);
if (!CollectionUtils.isEmpty(exportSuccessSet)) {
batchExportInfoDto.setSuccessPlateNo(StringUtils.join(exportSuccessSet, ","));
}
if (!CollectionUtils.isEmpty(exportFailedSet)) {
batchExportInfoDto.setFailedPlateNo(StringUtils.join(exportFailedSet, ","));
}
if (CollectionUtils.isEmpty(fileFailedMsgMap)) {
File directoryFile = new File(directory);
Collection<File> fileList = FileUtils.listFiles(directoryFile, new String[]{"csv"}, true);
FileUtils.zipFiles(directoryFile.getParent() + "/" + preDay + ".zip", fileList);
FileUtils.deleteDirectory(directory);
batchExportInfoDto.setFileName(preDay + ".zip");
batchExportInfoDto.setUrl(ossHelper.getHttpOssPrefix(VehicleDataKafkaListener.OSS_BUCKET_NAME, VehicleDataKafkaListener.DAY_EXPORT_DIR) + batchExportInfoDto.getFileName());
batchExportInfoDto.setStatus("1");
batchExportInfoService.saveOrUpdate(batchExportInfoDto);
return new ProcessResult(true);
}
fileFailedMsgMap.forEach((fileName, errorMsg) -> {
log.warn("处理失败文件信息 fileName: {} errorMsg: {}", fileName, errorMsg);
});
batchExportInfoDto.setStatus("0");
batchExportInfoService.saveOrUpdate(batchExportInfoDto);
return new ProcessResult(true);
}
}