• Notes on frequently writing large files in Java


    Requirement:

    For every configured terminal, export the previous day's data each day.

    Current pipeline:

    Terminal reports data -> parsing completes -> parsed data is published to a Topic -> a consumer writes it into OTS.

    Candidate solutions:

    (1) Query OTS and export the data.
    (2) Consume the parsing Topic with a separate consumer Group (this must not affect OTS ingestion performance, which serves real-time queries), then write the data directly to files.

    Issues to consider:

    (1) Reported data may be split into packets: because of terminal memory limits, one record can arrive as several uploads (see the sketch after this list).
    (2) The data must be sorted.
    (3) A single terminal's data file can exceed 350 MB, and exports for 300+ vehicles must be supported.
    (4) Roughly 45 million messages are consumed per day.
    (5) Messages can arrive out of order and may be retransmitted; retransmission usually finishes only after 09:00 the next day.
    (6) Users need to work with the downloaded data, so the download link must be available before 12:00 noon the next day.
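
    To make issue (1) concrete: fragments of one record share a timestamp, so rows can be keyed by timestamp and each fragment's values merged into the same map. A minimal, self-contained sketch of that merge (all names and values here are illustrative, not from the project):

    import java.util.*;

    public class SplitPacketMergeSketch {
        public static void main(String[] args) {
            // each entry: timestamp -> partial signal values carried by one uploaded fragment
            List<Map.Entry<Long, Map<String, String>>> fragments = List.of(
                    Map.entry(1000L, Map.of("speed", "55.0")),
                    Map.entry(1000L, Map.of("soc", "81.5")), // second fragment of the same record
                    Map.entry(2000L, Map.of("speed", "56.0")));
            Map<Long, Map<String, String>> rows = new HashMap<>();
            for (Map.Entry<Long, Map<String, String>> fragment : fragments) {
                // fragments with the same timestamp are merged into one row
                rows.computeIfAbsent(fragment.getKey(), t -> new HashMap<>())
                        .putAll(fragment.getValue());
            }
            // two rows remain: 1000 with both signals merged, 2000 with one
            System.out.println(rows);
        }
    }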

    Chosen solution:

    Query OTS:
    Pros: simple to implement (the data is already merged and can be sorted directly); a plain download is enough.
    Cons: the download time is unpredictable. Downloading can only start after 09:00 and the data is due before 12:00, which leaves at most three hours, yet downloading yesterday's data might not even finish within the day. This very likely fails the customer's noon deadline, and even with multi-threaded downloads, memory and CPU become bottlenecks.
    Consume with a new Group:
    Pros: as long as the code is solid and consumption and write throughput hold up, the data can be delivered on time.
    Cons: all of the issues above must be handled in code, and the code must be written with performance in mind so that messages never pile up on a large scale.
    Weighing the above, consuming with a new Group is the only viable choice.

    Technical implementation:

    (1) Out-of-order data, message merging, and sorting are not handled during consumption; a final half-hour consolidation pass takes care of them.
    (2) Write plain TXT files during consumption and convert to the user-facing CSV during consolidation, keeping consumption time as low as possible.
    (3) Introduce a Spring cache for performance, and a Redis cache to stop cluster nodes from creating the same directory twice.
    (4) When files grow too large or a directory holds too many files, messages back up badly because writes and file lookups get slow. So files are split: once a file exceeds 35 MB, nothing more is written to it, and every new file goes into a fresh directory named after the current millisecond under the server's IP. Each directory stays small and holds few files, and consumption keeps up (see the sketch after this list).
    (5) Consolidation groups files by file-name prefix; files are written on a single thread while processing is multi-threaded. CPU and memory usage spikes and Full GCs are heavy, so efficiency is poor (if your servers are generously provisioned you can skip this concern). Reading 350 MB of data into memory, merging, and sorting incurs one Full GC, yet the actual processing still completes within 10 minutes.
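
    A minimal sketch of the rotation rule in item (4), assuming the 35 MB cap and the <server-ip>/<current-millis> directory naming described above (the names here are illustrative; the real logic lives in existOrCreateFile below):

    import java.io.File;
    import java.io.IOException;

    public class FileRotationSketch {
        private static final long MAX_FILE_LENGTH = 35L * 1024 * 1024;

        //Returns the file to append to, rotating to a new millisecond-named directory once the cap is hit
        static File currentOrNewFile(File baseDir, String serverIp, String fileName, File current) throws IOException {
            if (current != null && current.isFile() && current.length() <= MAX_FILE_LENGTH) {
                return current; // still under the 35 MB cap: keep appending
            }
            // rotate: a fresh millisecond-named directory keeps every directory small
            File dir = new File(baseDir, serverIp + "/" + System.currentTimeMillis());
            dir.mkdirs();
            File next = new File(dir, fileName);
            next.createNewFile();
            return next;
        }
    }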

    Core implementation:

    (1) Data consumption

    package com.suyun.file.service.listen;
    
    import com.alibaba.fastjson.JSONObject;
    import com.fasterxml.jackson.core.type.TypeReference;
    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;
    import com.google.common.hash.Hashing;
    import com.suyun.common.kafka.JsonSerializer;
    import com.suyun.common.kafka.KafkaHelper;
    import com.suyun.common.kafka.KafkaRecord;
    import com.suyun.common.lang.Tuple2;
    import com.suyun.file.service.cache.ExportVehicleDataCache;
    import com.suyun.file.service.cache.FilePathCache;
    import com.suyun.file.service.helper.OSSHelper;
    import com.suyun.file.service.util.FileUtils;
    import com.suyun.file.service.util.NetworkUtil;
    import com.suyun.vehicle.Topics;
    import com.suyun.vehicle.api.dto.CanExportConfigDto;
    import com.suyun.vehicle.api.service.ICanExportConfigService;
    import com.suyun.vehicle.model.VehicleDataMap;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.ValueOperations;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Service;
    import org.springframework.util.CollectionUtils;
    
    import javax.annotation.Resource;
    import java.io.File;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.time.LocalDateTime;
    import java.time.LocalTime;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;
    import java.util.*;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Description:
     *
     * @Author: leo.xiong
     * @CreateDate: 2020/11/11 10:18
     * @Email: leo.xiong@suyun360.com
     * @Since:
     */
    @Slf4j
    @Service
    public class VehicleDataKafkaListener {
        private TypeReference<KafkaRecord<VehicleDataMap>> dataType = new TypeReference<KafkaRecord<VehicleDataMap>>() {
        };
    
        private static final String GROUP_ID = "CID_alikafka_suyun_file_group";
    
        public static final String OSS_BUCKET_NAME = "suyun-nevtsp-foreverfile";
    
        public static final String DAY_EXPORT_DIR = "/vehicle/can_data/day_export/";
    
        public static final String TXT = ".txt";
    
        public static final String CSV = ".csv";
    
        public static final String DATE_TIME = "dateTime";
    
        public static final String VERSION_ID = "versionId";
    
        public static final String DATE_FORMAT_YMD = "yyyyMMdd";
    
        public static final String TIME_FORMAT_Y_M_D = "yyyy-MM-dd HH:mm:ss";
    
        /**
         * Compression of the previous day's data starts at 09:00 the next day; retransmissions
         * reported after 07:30 need no processing. Despite the name, the value is 7.5 hours in ms.
         */
        private static final long NINE_HOUR = (7 * 3600 + 1800) * 1000;
        /**
         * Maximum file size: 35 MB
         */
        private static final long MAX_FILE_LENGTH = 35 * 1024 * 1024;
    
        @Autowired
        private KafkaHelper kafkaHelper;
    
        @Resource(name = "executorServices")
        private ExecutorService[] executorServices;
    
        @Autowired
        private OSSHelper ossHelper;
    
        @Autowired
        private FilePathCache filePathCache;
    
        @Autowired
        private ExportVehicleDataCache exportVehicleDataCache;
    
        @Autowired
        private ICanExportConfigService canExportConfigService;
    
        @Resource(name = "redisTemplate")
        private ValueOperations<String, String> valueOperations;
    
        @KafkaListener(id = "vehicleDataKafkaListener", groupId = GROUP_ID, topics = Topics.VEHICLE_DATA, errorHandler = "consumerAwareErrorHandler", containerFactory = "batchKafkaListenerContainerFactory")
        public void vehicleDataUpdatedHandler(List<ConsumerRecord<String, byte[]>> consumerRecords, Acknowledgment ack) {
            Long systemTime = System.currentTimeMillis();
            String currentDay = formatDate(systemTime, DATE_FORMAT_YMD);
            if (StringUtils.isEmpty(existOrCreateDirectory(currentDay, true))) {
                return;
            }
            Map<String, Map<String, Map<Long, KafkaRecord<VehicleDataMap>>>> reportDayVehicleIdTimeStampKafkaRecordMap = new HashMap<>(2);
            //If the current time is past 07:30, yesterday's files have already been generated and yesterday's data can no longer be processed
            boolean isGenerateYesterdaySFile = (systemTime - NINE_HOUR) >= getTimestampByLocalDateTime(LocalDateTime.now().with(LocalTime.MIN));
            buildDataMap(consumerRecords, isGenerateYesterdaySFile, currentDay, reportDayVehicleIdTimeStampKafkaRecordMap);
            if (CollectionUtils.isEmpty(reportDayVehicleIdTimeStampKafkaRecordMap)) {
                ack.acknowledge();
                return;
            }
            for (Map.Entry<String, Map<String, Map<Long, KafkaRecord<VehicleDataMap>>>> reportDayVehicleTimestampDataEntry : reportDayVehicleIdTimeStampKafkaRecordMap.entrySet()) {
                String reportDay = reportDayVehicleTimestampDataEntry.getKey();
                String directory = existOrCreateDirectory(reportDay, false);
                if (StringUtils.isEmpty(directory)) {
                    continue;
                }
                Map<String, Map<Long, KafkaRecord<VehicleDataMap>>> vehicleIdTimestampKafkaRecordMap = reportDayVehicleTimestampDataEntry.getValue();
                vehicleIdTimestampKafkaRecordMap.forEach((vehicleId, timestampKafkaRecordMap) -> {
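                    //Consistent hashing pins each vehicleId to one executor, so a vehicle's file is only ever appended to by the same worker (assuming single-threaded executors)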
                    int hash = Hashing.consistentHash(vehicleId.hashCode(), executorServices.length);
                    executorServices[hash].execute(() -> {
                        List<KafkaRecord<VehicleDataMap>> values = Lists.newArrayList(timestampKafkaRecordMap.values());
                        try {
                            process(reportDay, directory, vehicleId, timestampKafkaRecordMap, values);
                        } catch (Exception e) {
                            log.warn("写入失败 reportDay: {} directory: {} vehicleId: {}", reportDay, directory, vehicleId, e);
                            for (KafkaRecord<VehicleDataMap> kafkaRecord : values) {
                                kafkaHelper.forward2RetryTopic(kafkaRecord, GROUP_ID);
                            }
                        }
                    });
                });
            }
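            //Acknowledge the whole batch; async write failures are routed to the retry topic rather than blocking the ack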
            ack.acknowledge();
        }
    
        /**
         * Process the buffered records for one vehicle and append them to its file
         *
         * @param reportDay
         * @param directory
         * @param vehicleId
         * @param timestampKafkaRecordMap
         * @param values
         */
        private void process(String reportDay, String directory, String vehicleId, Map<Long, KafkaRecord<VehicleDataMap>> timestampKafkaRecordMap, List<KafkaRecord<VehicleDataMap>> values) {
            CanExportConfigDto canExportConfigDto = null;
            try {
                canExportConfigDto = getCanExportConfigDto(values.get(0).getData(), reportDay);
            } catch (Exception e) {
                for (KafkaRecord<VehicleDataMap> kafkaRecord : values) {
                    kafkaHelper.forward2RetryTopic(kafkaRecord, GROUP_ID);
                }
            }
            if (canExportConfigDto == null) {
                return;
            }
            StringBuilder content = new StringBuilder();
            for (KafkaRecord<VehicleDataMap> value : timestampKafkaRecordMap.values()) {
                VehicleDataMap vehicleDataMap = value.getData();
                Map<String, Object> codeValueMap = Maps.newHashMapWithExpectedSize(vehicleDataMap.getValues().size() + 2);
                vehicleDataMap.getValues().forEach((code, valueInfo) -> {
                    Tuple2<Double, String> tuple2 = vehicleDataMap.getValue(code);
                    if (tuple2 == null) {
                        return;
                    }
                    codeValueMap.put(code, tuple2._1().toString());
                });
                if (CollectionUtils.isEmpty(codeValueMap)) {
                    log.warn("empty data values: vehicleDataMap: {}", vehicleDataMap);
                    continue;
                }
                //Persist the parse version; the export step needs it to resolve signal names
                if (!codeValueMap.containsKey(VERSION_ID)) {
                    codeValueMap.put(VERSION_ID, vehicleDataMap.getVersionId());
                }
                if (!codeValueMap.containsKey(DATE_TIME)) {
                    codeValueMap.put(DATE_TIME, String.valueOf(vehicleDataMap.getTimestamp()));
                }
                content.append(JSONObject.toJSONString(codeValueMap)).append('\n');
            }
            if (content.length() == 0) {
                return;
            }
            String fileName = canExportConfigDto.getPlateNo() + "(" + reportDay + "000000" + "~" + reportDay + "235959" + ")" + TXT;
            writeTxt(content.toString(), vehicleId, reportDay, directory, fileName);
        }
    
        /**
         * Assemble the per-day, per-vehicle data map
         *
         * @param consumerRecords
         * @param isGenerateYesterdaySFile
         * @param currentDay
         * @param reportDayVehicleIdTimeStampKafkaRecordMap
         */
        private void buildDataMap(List<ConsumerRecord<String, byte[]>> consumerRecords, boolean isGenerateYesterdaySFile, String currentDay, Map<String, Map<String, Map<Long, KafkaRecord<VehicleDataMap>>>> reportDayVehicleIdTimeStampKafkaRecordMap) {
            consumerRecords.forEach(consumerRecord -> {
                KafkaRecord<VehicleDataMap> kafkaRecord = JsonSerializer.deserialize(consumerRecord.value(), dataType);
                //Useless data
                if (kafkaRecord == null || kafkaRecord.getData() == null || CollectionUtils.isEmpty(kafkaRecord.getData().getValues())) {
                    return;
                }
                //The parse-version ID must not be empty; without it signal names cannot be resolved and the export is useless
                if (StringUtils.isEmpty(kafkaRecord.getData().getVersionId())) {
                    return;
                }
                VehicleDataMap vehicleDataMap = kafkaRecord.getData();
                String reportDay = formatDate(vehicleDataMap.getTimestamp(), DATE_FORMAT_YMD);
                //If the report day is not today and yesterday's file has already been generated, this retransmission can no longer be used
                if (!currentDay.equals(reportDay) && isGenerateYesterdaySFile) {
                    log.info("retransmitted data vehicleId: {} reportTime: {} receiveTime: {}", vehicleDataMap.getVehicleId(), vehicleDataMap.getTimestamp(), vehicleDataMap.getReceiveTime());
                    return;
                }
    
                CanExportConfigDto canExportConfigDto = null;
                try {
                    canExportConfigDto = getCanExportConfigDto(vehicleDataMap, reportDay);
                } catch (Exception e) {
                    kafkaHelper.forward2RetryTopic(kafkaRecord, GROUP_ID);
                }
                if (canExportConfigDto == null) {
                    return;
                }
                Map<String, Map<Long, KafkaRecord<VehicleDataMap>>> vehicleIdTimestampKafkaRecordMap = reportDayVehicleIdTimeStampKafkaRecordMap.computeIfAbsent(reportDay, day -> new HashMap<>(150));
                Map<Long, KafkaRecord<VehicleDataMap>> timestampKafkaRecordMap = vehicleIdTimestampKafkaRecordMap.computeIfAbsent(vehicleDataMap.getVehicleId(), id -> Maps.newHashMap());
                KafkaRecord<VehicleDataMap> oldKafkaRecord = timestampKafkaRecordMap.get(vehicleDataMap.getTimestamp());
                if (oldKafkaRecord != null) {
                    //Split packet: merge the new fragment's values into the record already buffered for this timestamp.
                    //(The original code overwrote the merged record with the new fragment afterwards, losing the merge.)
                    oldKafkaRecord.getData().putAll(kafkaRecord.getData().getValues());
                    return;
                }
                timestampKafkaRecordMap.put(vehicleDataMap.getTimestamp(), kafkaRecord);
            });
        }
    
        /**
         * Write the content to the vehicle's file
         *
         * @param content
         * @param vehicleId
         * @param reportDay
         * @param directory
         * @param fileName
         */
        private void writeTxt(String content, String vehicleId, String reportDay, String directory, String fileName) {
            String filePath = existOrCreateFile(reportDay, vehicleId, directory, fileName);
            if (StringUtils.isEmpty(filePath)) {
                return;
            }
            FileUtils.writeToFile(filePath, content, StandardCharsets.UTF_8.name(), true);
        }
    
        /**
         * Fetch the export configuration
         *
         * @param vehicleDataMap
         * @param reportDay
         * @return
         */
        private CanExportConfigDto getCanExportConfigDto(VehicleDataMap vehicleDataMap, String reportDay) throws Exception {
            CanExportConfigDto canExportConfigDto = exportVehicleDataCache.get(vehicleDataMap.getVehicleId() + ":" + reportDay);
            //Does this vehicle need exporting? The configuration granularity is one day, and data is exported per day
            if (canExportConfigDto == null) {
                //The method declares "throws Exception", so a query failure propagates with its original stack trace
                canExportConfigDto = canExportConfigService.queryActiveCanExportConfig(vehicleDataMap.getVehicleId(), vehicleDataMap.getTimestamp());
                if (canExportConfigDto == null) {
                    //Cache an empty marker so vehicles without a configuration are not queried again
                    exportVehicleDataCache.put(vehicleDataMap.getVehicleId() + ":" + reportDay, new CanExportConfigDto());
                    return null;
                }
                exportVehicleDataCache.put(vehicleDataMap.getVehicleId() + ":" + reportDay, canExportConfigDto);
            } else if (StringUtils.isEmpty(canExportConfigDto.getVehicleId())) {
                return null;
            }
            return canExportConfigDto;
        }
    
        /**
         * Return the directory path, creating the directory if it does not exist
         *
         * @param currentDay
         * @param isCreate   whether to create
         * @return the directory path
         */
        public String existOrCreateDirectory(String currentDay, boolean isCreate) {
            String directoryPath = filePathCache.get(FilePathCache.DIRECTORY + currentDay);
            if (StringUtils.isNotEmpty(directoryPath)) {
                return directoryPath;
            }
            String directory = ossHelper.getBucketPathMap().get(OSS_BUCKET_NAME) + DAY_EXPORT_DIR + currentDay;
            File file = new File(directory);
            if (file.isDirectory()) {
                filePathCache.put(FilePathCache.DIRECTORY + currentDay, directory);
                return directory;
            }
            if (!isCreate) {
                return "";
            }
            Boolean createFlag = valueOperations.setIfAbsent(currentDay + FilePathCache.DIRECTORY, NetworkUtil.IP, 2, TimeUnit.DAYS);
            if (Boolean.TRUE.equals(createFlag)) {
                file.mkdir();
            } else {
                //Another node won the Redis lock; give it a moment to create the directory (the path is deterministic)
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            filePathCache.put(FilePathCache.DIRECTORY + currentDay, directory);
            return directory;
        }
    
        /**
         * Return the file path, creating the file if it does not exist.
         * One oversized file hurts write performance, so a new file is started once the current one exceeds 35 MB.
         *
         * @param currentDay
         * @param vehicleId
         * @param directory
         * @param fileName
         * @return the file path
         */
        private String existOrCreateFile(String currentDay, String vehicleId, String directory, String fileName) {
            String filePath = filePathCache.get(FilePathCache.FILE + currentDay + ":" + vehicleId);
            if (StringUtils.isNotEmpty(filePath)) {
                File file = new File(filePath);
                if (file.isFile() && file.length() <= MAX_FILE_LENGTH) {
                    return filePath;
                }
            }
            String newDirectory = directory + "/" + NetworkUtil.IP_REPLACE_DOT + "/" + System.currentTimeMillis();
            File directoryFile = new File(newDirectory);
            directoryFile.mkdirs();
            String fileNamePath = newDirectory + "/" + fileName;
            File file = new File(fileNamePath);
            try {
                file.createNewFile();
                filePathCache.put(FilePathCache.FILE + currentDay + ":" + vehicleId, fileNamePath);
            } catch (IOException e) {
                log.error("failed to create file fileNamePath: {}", fileNamePath, e);
                return null;
            }
            return fileNamePath;
        }
    
        public static String formatDate(Long timestamp, String pattern) {
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
            return getLocalDateTimeByLong(timestamp).format(formatter);
        }
    
        public static LocalDateTime getLocalDateTimeByLong(Long timestamp) {
            long nanoOfSecond = (timestamp % 1000) * 1000000;
            LocalDateTime localDateTime = LocalDateTime.ofEpochSecond(timestamp / 1000, (int) nanoOfSecond, getSystemZoneOffset());
            return localDateTime;
        }
    
        public static long getTimestampByLocalDateTime(LocalDateTime localDateTime) {
            return localDateTime.toEpochSecond(getSystemZoneOffset()) * 1000;
        }
    
        public static ZoneOffset getSystemZoneOffset() {
            //ofTotalSeconds also handles negative offsets, unlike concatenating a "+h" string
            return ZoneOffset.ofTotalSeconds(TimeZone.getDefault().getRawOffset() / 1000);
        }
    }
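
    The listener relies on a project helper, FileUtils.writeToFile(path, content, charset, append), that the post does not show. A plausible minimal sketch, assuming the boolean flag selects append mode (the real helper may differ in error handling or buffering):

    import java.io.BufferedWriter;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import java.io.UncheckedIOException;

    public final class FileUtilsSketch {
        //Append-mode, buffered writes keep the frequent small appends cheap
        public static void writeToFile(String filePath, String content, String charsetName, boolean append) {
            try (BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(new FileOutputStream(filePath, append), charsetName))) {
                writer.write(content);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }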
    
    

    (2) File consolidation

    package com.suyun.file.service.task.service;
    
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.schedulerx.worker.processor.ProcessResult;
    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;
    import com.google.common.collect.Sets;
    import com.suyun.file.service.cache.VersionCanSignalCache;
    import com.suyun.file.service.helper.CsvHelper;
    import com.suyun.file.service.helper.OSSHelper;
    import com.suyun.file.service.listen.VehicleDataKafkaListener;
    import com.suyun.file.service.util.FileUtils;
    import com.suyun.vehicle.api.dto.BatchExportInfoDto;
    import com.suyun.vehicle.api.dto.VersionCanSignalDto;
    import com.suyun.vehicle.api.service.IBatchExportInfoService;
    import com.suyun.vehicle.api.service.ICanProtocolService;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.springframework.util.CollectionUtils;
    
    import javax.annotation.Resource;
    import java.io.File;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.*;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.stream.Collectors;
    
    /**
     * Description:
     *
     * @Author: leo.xiong
     * @CreateDate: 2022/8/19 14:10
     * @Email: leo.xiong@suyun360.com
     * @Since:
     */
    @Slf4j
    @Service
    public class ZipFileTaskService {
        /**
         * Maximum number of parse versions expected
         */
        private static final int VERSION_SIZE = 8;
        /**
         * Expected number of columns
         */
        private static final int NUMBER_OF_COLUMNS = 600;
        /**
         * Expected number of units
         */
        private static final int NUMBER_OF_UNITS = 100;
        /**
         * Expected number of rows
         */
        private static final int NUMBER_OF_ROWS = 45000;
    
        @Autowired
        private VehicleDataKafkaListener vehicleDataKafkaListener;
    
        @Resource(name = "executorZipService")
        private ExecutorService executorZipService;
    
        @Autowired
        private VersionCanSignalCache versionCanSignalCache;
    
        @Autowired
        private ICanProtocolService canProtocolService;
    
        @Autowired
        private IBatchExportInfoService batchExportInfoService;
    
        @Autowired
        private CsvHelper csvHelper;
    
        @Autowired
        private OSSHelper ossHelper;
    
        private static final int MAX_FAILED_TRY_TIME = 3;
    
        /**
         * If consolidation is interrupted midway, delete the CSV and re-consolidate
         */
        private static final int COLLATE_DATA_ERROR = 2;
    
        public ProcessResult process(String preDay, AtomicInteger times) {
            String directory = vehicleDataKafkaListener.existOrCreateDirectory(preDay, false);
            if (StringUtils.isEmpty(directory)) {
                log.warn("directory does not exist: {}", directory);
                return new ProcessResult(false);
            }
            File directoryFile = new File(directory);
            Collection<File> fileList = FileUtils.listFiles(directoryFile, new String[]{"txt", "csv"}, true);
            if (CollectionUtils.isEmpty(fileList)) {
                log.warn("no files found");
                return new ProcessResult(false);
            }
            Map<String, List<File>> fileNameListMap = fileList.parallelStream().collect(Collectors.groupingBy(file -> {
                String fileName = file.getName();
                String fileNameValue = fileName.split(preDay)[0] + preDay + "000000" + "~" + preDay + "235959" + ")";
                return fileNameValue;
            }));
            Map<String, String> fileFailedMsgMap = Maps.newHashMap();
            Set<String> exportSuccessSet = new HashSet<>(500);
            Set<String> exportFailedSet = Sets.newHashSet();
            executorZipService.execute(() -> {
                processTxt(directory, fileNameListMap, fileFailedMsgMap, exportSuccessSet, exportFailedSet);
                Collection<File> txtFileList = FileUtils.listFiles(directoryFile, new String[]{"txt"}, true);
                if (CollectionUtils.isEmpty(txtFileList)) {
                    zipFileList(preDay, directory, fileFailedMsgMap, exportSuccessSet, exportFailedSet);
                    return;
                }
                if (times.get() < MAX_FAILED_TRY_TIME) {
                    times.addAndGet(1);
                    process(preDay, times);
                    return;
                }
                for (File file : txtFileList) {
                    String plateNo = file.getName().substring(0, file.getName().length() - 35);
                    fileFailedMsgMap.put(plateNo, "failed to parse txt file");
                    exportFailedSet.add(plateNo);
                }
                zipFileList(preDay, directory, fileFailedMsgMap, exportSuccessSet, exportFailedSet);
            });
            return new ProcessResult(true);
        }
    
        private void processTxt(String directory, Map<String, List<File>> fileNameListMap, Map<String, String> fileFailedMsgMap, Set<String> exportSuccessSet, Set<String> exportFailedSet) {
            fileNameListMap.forEach((fileNameValue, files) -> {
                long beginTime = System.currentTimeMillis();
                log.info("process file name: {}", fileNameValue);
                String plateNo = fileNameValue.substring(0, fileNameValue.length() - 31);
                try {
                    List<File> txtFileList = Lists.newArrayListWithExpectedSize(files.size());
                    if (files.size() == 1) {
                        File fileOne = files.get(0);
                        if (fileOne.length() == 0) {
                            fileOne.delete();
                            return;
                        }
                        if (fileOne.getName().endsWith(VehicleDataKafkaListener.CSV)) {
                            return;
                        }
                        txtFileList.add(fileOne);
                    } else {
                        for (File file : files) {
                            if (file.length() == 0) {
                                file.delete();
                                continue;
                            }
                            if (file.getName().endsWith(VehicleDataKafkaListener.CSV)) {
                                file.delete();
                                continue;
                            }
                            txtFileList.add(file);
                        }
                    }
                    if (CollectionUtils.isEmpty(txtFileList)) {
                        return;
                    }
                    organizeFile(txtFileList, directory, fileNameValue);
                    exportSuccessSet.add(plateNo);
                } catch (Exception e) {
                    log.warn("failed to process data fileNameValue: {}", fileNameValue, e);
                    fileFailedMsgMap.put(fileNameValue, e.getMessage());
                    exportFailedSet.add(plateNo);
                }
                log.info("process file name: {} use time: {} ms", fileNameValue, System.currentTimeMillis() - beginTime);
            });
        }
    
        /**
         * Consolidate the data; on success, delete the source files
         *
         * @param fileList
         * @param directory
         * @param fileNameValue
         */
        private void organizeFile(List<File> fileList, String directory, String fileNameValue) {
            try {
                List<String> contentList = readLine(fileList);
                //versionId -> code -> signalName mapping for the versions in use
                Map<String, Map<String, String>> versionIdCodeSignalNameMap = new HashMap<>(VERSION_SIZE);
                //Signals merged across versions, used as the header (column order must be fixed, hence a List rather than a Set)
                List<String> headerList = new ArrayList<>(NUMBER_OF_COLUMNS);
                //Columns that actually have values
                Set<String> validHeaderSet = new HashSet<>(NUMBER_OF_COLUMNS);
                //Units to append to header names
                Map<String, String> signalNameUnitMap = new HashMap<>(NUMBER_OF_UNITS);
                //Rows, keyed by timestamp
                Map<Long, Map<String, String>> timestampValuesMap = new HashMap<>(NUMBER_OF_ROWS);
                for (String content : contentList) {
                    if (StringUtils.isEmpty(content)) {
                        continue;
                    }
                    Map<String, String> codeValueMap = null;
                    try {
                        codeValueMap = JSONObject.parseObject(content, Map.class);
                    } catch (Exception e) {
                        log.warn("content parse error: {}", content);
                        continue;
                    }
                    String versionId = codeValueMap.get(VehicleDataKafkaListener.VERSION_ID);
                    if (StringUtils.isEmpty(versionId)) {
                        continue;
                    }
                    String dateTime = codeValueMap.get(VehicleDataKafkaListener.DATE_TIME);
                    if (StringUtils.isEmpty(dateTime)) {
                        continue;
                    }
                    List<VersionCanSignalDto> canSignalByVersionIdList = versionCanSignalCache.get(versionId);
                    if (canSignalByVersionIdList == null) {
                        canSignalByVersionIdList = canProtocolService.findCanSignalByVersionIdList(Lists.newArrayList(versionId));
                        if (CollectionUtils.isEmpty(canSignalByVersionIdList)) {
                            versionCanSignalCache.put(versionId, Collections.EMPTY_LIST);
                            continue;
                        }
                        versionCanSignalCache.put(versionId, canSignalByVersionIdList);
                    } else if (CollectionUtils.isEmpty(canSignalByVersionIdList)) {
                        continue;
                    }
                    Map<String, String> codeSignalNameMap = null;
                    if (!versionIdCodeSignalNameMap.containsKey(versionId)) {
                        codeSignalNameMap = new HashMap<>(NUMBER_OF_COLUMNS);
                        versionIdCodeSignalNameMap.put(versionId, codeSignalNameMap);
                        for (VersionCanSignalDto versionCanSignalDto : canSignalByVersionIdList) {
                            if (headerList.contains(versionCanSignalDto.getSignalName())) {
                                continue;
                            }
                            headerList.add(versionCanSignalDto.getSignalName());
                            codeSignalNameMap.put(versionCanSignalDto.getCode(), versionCanSignalDto.getSignalName());
                            if (StringUtils.isEmpty(versionCanSignalDto.getUnit())) {
                                continue;
                            }
                            signalNameUnitMap.put(versionCanSignalDto.getSignalName(), versionCanSignalDto.getUnit());
                        }
                    } else {
                        codeSignalNameMap = versionIdCodeSignalNameMap.get(versionId);
                    }
                    Long timestamp = Long.valueOf(dateTime);
                    Map<String, String> signalNameValueMap = timestampValuesMap.get(timestamp);
                    if (signalNameValueMap == null) {
                        signalNameValueMap = new HashMap<>(NUMBER_OF_COLUMNS);
                        timestampValuesMap.put(timestamp, signalNameValueMap);
                    }
                    for (Map.Entry<String, String> codeValueEntry : codeSignalNameMap.entrySet()) {
                        String code = codeValueEntry.getKey();
                        String value = codeValueMap.get(code);
                        if (StringUtils.isEmpty(value)) {
                            continue;
                        }
                        String signalName = codeValueEntry.getValue();
                        validHeaderSet.add(signalName);
                        signalNameValueMap.put(signalName, value);
                    }
                }
                //If no signal has a value, delete the source files and return
                if (CollectionUtils.isEmpty(validHeaderSet)) {
                    for (File file : fileList) {
                        file.delete();
                    }
                    return;
                }
                writeCsv(directory, fileNameValue, headerList, validHeaderSet, timestampValuesMap, signalNameUnitMap);
                //Delete the TXT source files
                for (File file : fileList) {
                    file.delete();
                }
            } catch (Exception e) {
                log.warn("failed to consolidate files filePath: {}", fileList.get(0).getAbsolutePath(), e);
            }
        }
    
        private List<String> readLine(List<File> fileList) throws IOException {
            List<String> contentList = new ArrayList<>(60000);
            for (File file : fileList) {
                List<String> readList = FileUtils.readLines(file, StandardCharsets.UTF_8);
                if (CollectionUtils.isEmpty(readList)) {
                    continue;
                }
                contentList.addAll(readList);
            }
            return contentList;
        }
    
        /**
         * Write the data to a CSV file
         *
         * @param directory
         * @param fileNameValue
         * @param headerList
         * @param validHeaderSet
         * @param timestampValuesMap
         * @param signalNameUnitMap
         */
        private void writeCsv(String directory, String fileNameValue, List<String> headerList, Set<String> validHeaderSet, Map<Long, Map<String, String>> timestampValuesMap, Map<String, String> signalNameUnitMap) {
            headerList.removeIf(header -> !validHeaderSet.contains(header));
            Map<Long, Map<String, String>> sortMap = Maps.newLinkedHashMapWithExpectedSize(timestampValuesMap.size());
            //Sort the map by key in ascending order
            timestampValuesMap.entrySet().stream().sorted(Map.Entry.comparingByKey()).forEachOrdered(x -> sortMap.put(x.getKey(), x.getValue()));
            String[] headers = new String[headerList.size()];
            //Header fields carry the unit as a suffix: name[unit]
            String[] headerUnits = new String[headerList.size() + 2];
            headerUnits[0] = "time";
            headerUnits[1] = "report time";
            for (int i = 0, len = headerList.size(); i < len; i++) {
                headers[i] = headerList.get(i);
                String unit = signalNameUnitMap.get(headers[i]);
                if (StringUtils.isEmpty(unit)) {
                    headerUnits[i + 2] = headers[i];
                } else {
                    headerUnits[i + 2] = headers[i] + "[" + unit + "]";
                }
            }
            List<String[]> valueList = Lists.newArrayListWithExpectedSize(sortMap.size());
            AtomicInteger atomicInteger = new AtomicInteger(-1);
            sortMap.forEach((timestamp, signalValueMap) -> {
                String[] values = new String[headerUnits.length];
                boolean isValidFlag = false;
                for (int i = 0; i < headers.length; i++) {
                    String value = signalValueMap.get(headers[i]);
                    if (StringUtils.isNotEmpty(value)) {
                        if (!isValidFlag) {
                            isValidFlag = true;
                        }
                        values[i + 2] = value;
                    } else {
                        values[i + 2] = "";
                    }
                }
                //If the row has no valid value at all, skip it
                if (!isValidFlag) {
                    return;
                }
                values[0] = String.valueOf(atomicInteger.addAndGet(1));
                //The trailing tab keeps spreadsheet tools from reinterpreting the datetime
                values[1] = VehicleDataKafkaListener.formatDate(timestamp, VehicleDataKafkaListener.TIME_FORMAT_Y_M_D) + "\t";
                valueList.add(values);
            });
            csvHelper.writeToCsv(directory, fileNameValue + VehicleDataKafkaListener.CSV, headerUnits, valueList);
        }
    
        /**
         * Zip the files and persist the export record
         *
         * @param preDay
         * @param directory
         * @param fileFailedMsgMap
         * @param exportSuccessSet
         * @param exportFailedSet
         * @return
         */
        private ProcessResult zipFileList(String preDay, String directory, Map<String, String> fileFailedMsgMap, Set<String> exportSuccessSet, Set<String> exportFailedSet) {
            BatchExportInfoDto batchExportInfoDto = new BatchExportInfoDto();
            batchExportInfoDto.setDataDate(preDay);
            if (!CollectionUtils.isEmpty(exportSuccessSet)) {
                batchExportInfoDto.setSuccessPlateNo(StringUtils.join(exportSuccessSet, ","));
            }
            if (!CollectionUtils.isEmpty(exportFailedSet)) {
                batchExportInfoDto.setFailedPlateNo(StringUtils.join(exportFailedSet, ","));
            }
            if (CollectionUtils.isEmpty(fileFailedMsgMap)) {
                File directoryFile = new File(directory);
                Collection<File> fileList = FileUtils.listFiles(directoryFile, new String[]{"csv"}, true);
                FileUtils.zipFiles(directoryFile.getParent() + "/" + preDay + ".zip", fileList);
                FileUtils.deleteDirectory(directory);
                batchExportInfoDto.setFileName(preDay + ".zip");
                batchExportInfoDto.setUrl(ossHelper.getHttpOssPrefix(VehicleDataKafkaListener.OSS_BUCKET_NAME, VehicleDataKafkaListener.DAY_EXPORT_DIR) + batchExportInfoDto.getFileName());
                batchExportInfoDto.setStatus("1");
                batchExportInfoService.saveOrUpdate(batchExportInfoDto);
                return new ProcessResult(true);
            }
            fileFailedMsgMap.forEach((fileName, errorMsg) -> {
                log.warn("failed file fileName: {} errorMsg: {}", fileName, errorMsg);
            });
            batchExportInfoDto.setStatus("0");
            batchExportInfoService.saveOrUpdate(batchExportInfoDto);
            return new ProcessResult(true);
        }
    }
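
    CsvHelper.writeToCsv is likewise a project helper the post does not show. A plausible minimal sketch, assuming RFC 4180-style quoting and a UTF-8 BOM so Excel detects the encoding (both are assumptions, not confirmed by the source):

    import java.io.IOException;
    import java.io.PrintWriter;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.List;

    public class CsvHelperSketch {
        public void writeToCsv(String directory, String fileName, String[] header, List<String[]> rows) {
            try (PrintWriter out = new PrintWriter(
                    Files.newBufferedWriter(Paths.get(directory, fileName), StandardCharsets.UTF_8))) {
                out.write('\ufeff'); // BOM helps Excel pick up UTF-8
                out.println(toLine(header));
                for (String[] row : rows) {
                    out.println(toLine(row));
                }
            } catch (IOException e) {
                throw new RuntimeException("failed to write csv: " + fileName, e);
            }
        }

        private static String toLine(String[] fields) {
            StringBuilder line = new StringBuilder();
            for (int i = 0; i < fields.length; i++) {
                if (i > 0) {
                    line.append(',');
                }
                String field = fields[i] == null ? "" : fields[i];
                // RFC 4180 style: quote fields containing commas, quotes, or newlines
                if (field.contains(",") || field.contains("\"") || field.contains("\n")) {
                    line.append('"').append(field.replace("\"", "\"\"")).append('"');
                } else {
                    line.append(field);
                }
            }
            return line.toString();
        }
    }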
  • Original article: https://blog.csdn.net/xionglangs/article/details/126616967