• Springboot整合HBase


    Springboot整合HBase数据库

    1、添加依赖
    
    <dependency>
        <groupId>com.spring4all</groupId>
        <artifactId>spring-boot-starter-hbase</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-hadoop-hbase</artifactId>
        <version>2.5.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-hadoop</artifactId>
        <version>2.5.0.RELEASE</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    2、添加配置
    通过Yaml方式配置
    spring:
      hbase:
         zookeeper:
          quorum: hbase1.xxx.org,hbase2.xxx.org,hbase3.xxx.org
          property:
             clientPort: 2181
      data:
        hbase:
          quorum: XXX
          rootDir: XXX
          nodeParent: XXX
    
    zookeeper:
      znode:
        parent: /hbase
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    3、添加配置类
    /**
     * Spring configuration that publishes the {@link HBaseService} helper as a bean.
     */
    @Configuration
    public class HBaseConfig {

        /**
         * Creates the service on top of the configuration returned by
         * {@code HBaseConfiguration.create()}.
         *
         * @return a new {@link HBaseService}
         */
        @Bean
        public HBaseService getHbaseService() {
            // On Windows a temporary hadoop.home.dir can be set so that winutils.exe is
            // looked up in its "bin" sub-directory when connecting to Hadoop:
            // System.setProperty("hadoop.home.dir", "D:\\Program Files\\Hadoop");

            // Per the original author's note, create() looks in the resources directory
            // for configuration files such as hbase-site.xml.
            return new HBaseService(HBaseConfiguration.create());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    4、工具类的方式实现HBASE操作
    @Service
    public class HBaseService {
    
        private Admin admin = null;
        private Connection connection = null;
    
        public HBaseService(Configuration conf) {
            connection = ConnectionFactory.createConnection(conf);
                admin = connection.getAdmin();
        }
    
        //创建表 create , {NAME => , VERSIONS => }
        public boolean creatTable(String tableName, List<String> columnFamily) {
            //列族column family
            List<ColumnFamilyDescriptor> cfDesc = new ArrayList<>(columnFamily.size());
            columnFamily.forEach(cf -> {
                cfDesc.add(ColumnFamilyDescriptorBuilder.newBuilder(
                    Bytes.toBytes(cf)).build());
            });
            //表 table
            TableDescriptor tableDesc = TableDescriptorBuilder
                .newBuilder(TableName.valueOf(tableName))
                .setColumnFamilies(cfDesc).build();
            if (admin.tableExists(TableName.valueOf(tableName))) {
                log.debug("table Exists!");
            } else {
                admin.createTable(tableDesc);
                log.debug("create table Success!");
            }
            close(admin, null, null);
            return true;
        }
    
        public List<String> getAllTableNames() {
            List<String> result = new ArrayList<>();
            TableName[] tableNames = admin.listTableNames();
            for (TableName tableName : tableNames) {
                result.add(tableName.getNameAsString());
            }
            close(admin, null, null);
            return result;
        }
    
        public Map<String, Map<String, String>> getResultScanner(String tableName) {
            Scan scan = new Scan();
            return this.queryData(tableName, scan);
        }
    
        private Map<String, Map<String, String>> queryData(String tableName, Scan scan) {
            // 
            Map<String, Map<String, String>> result = new HashMap<>();
            ResultScanner rs = null;
            //获取表
            Table table = null;
            table = getTable(tableName);
            rs = table.getScanner(scan);
            for (Result r : rs) {
                // 每一行数据
                Map<String, String> columnMap = new HashMap<>();
                String rowKey = null;
                // 行键,列族和列限定符一起确定一个单元(Cell)
                for (Cell cell : r.listCells()) {
                    if (rowKey == null) {
                        rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                    }
                    columnMap.put(
                        //列限定符
                        Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()),
                        //列族
                        Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                }
                if (rowKey != null) {
                    result.put(rowKey, columnMap);
                }
            }
            close(null, rs, table);
    
            return result;
        }
    
        public void putData(String tableName, String rowKey, String familyName, String[] columns, String[] values) {
            Table table = null;
            table = getTable(tableName);
            putData(table, rowKey, tableName, familyName, columns, values);
            close(null, null, table);
    
        }
    
        private void putData(Table table, String rowKey, String tableName, 
                             String familyName, String[] columns, String[] values) {
            //设置rowkey
            Put put = new Put(Bytes.toBytes(rowKey));
            if (columns != null && values != null && columns.length == values.length) {
                for (int i = 0; i < columns.length; i++) {
                    if (columns[i] != null && values[i] != null) {
                        put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
                    } else {
                        throw new NullPointerException(MessageFormat.format(
                            "列名和列数据都不能为空,column:{0},value:{1}", columns[i], values[i]));
                    }
                }
            }
            table.put(put);
            log.debug("putData add or update data Success,rowKey:" + rowKey);
            table.close();
    
        }
        private Table getTable(String tableName) throws IOException {
            return connection.getTable(TableName.valueOf(tableName));
        }
    
        private void close(Admin admin, ResultScanner rs, Table table) {
            if (admin != null) {
                try {
                    admin.close();
                } catch (IOException e) {
                    log.error("关闭Admin失败", e);
                }
    
                if (rs != null) {
                    rs.close();
                }
    
                if (table != null) {
                    rs.close();
                }
    
                if (table != null) {
                    try {
                        table.close();
                    } catch (IOException e) {
                        log.error("关闭Table失败", e);
                    }
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    测试类
    /**
     * Integration tests for {@link HBaseService}: create a table, write three rows,
     * then scan the table and print it.
     *
     * <p>The class is declared {@code public} because the JUnit 4 runner named in
     * {@code @RunWith} requires a public test class with a public constructor.
     */
    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringBootTest
    public class HBaseApplicationTests {

        /** Table used by all tests. */
        private static final String TABLE = "test_base";

        /** Column qualifiers written for every sample row. */
        private static final String[] COLUMNS = {
                "project_id", "varName", "coefs", "pvalues", "tvalues", "create_time"};

        @Resource
        private HBaseService hbaseService;

        /** Creates the test table with column families "a" and "back". */
        @Test
        public void testCreateTable() {
            hbaseService.creatTable(TABLE, Arrays.asList("a", "back"));
        }

        /** Writes three sample rows into column family "a". */
        @Test
        public void testPutData() {
            hbaseService.putData(TABLE, "000001", "a", COLUMNS, new String[]{
                    "40866", "mob_3", "0.9416", "0.0000", "12.2293", "null"});
            hbaseService.putData(TABLE, "000002", "a", COLUMNS, new String[]{
                    "40866", "idno_prov", "0.9317", "0.0000", "9.8679", "null"});
            hbaseService.putData(TABLE, "000003", "a", COLUMNS, new String[]{
                    "40866", "education", "0.8984", "0.0000", "25.5649", "null"});
        }

        /** Scans the whole table and prints every row. */
        @Test
        public void testGetResultScanner() {
            Map<String, Map<String, String>> rows = hbaseService.getResultScanner(TABLE);
            System.out.println("-----遍历查询全表内容-----");
            rows.forEach((rowKey, columns) -> System.out.println(rowKey + "--->" + columns));
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    三、使用spring-data-hadoop-hbase

    3、配置类
    /**
     * Spring configuration that builds an {@link HbaseTemplate} from three injected
     * ZooKeeper-related properties.
     */
    @Configuration
    public class HBaseConfiguration {

        /** Value of the {@code hbase.zookeeper.quorum} property. */
        @Value("${hbase.zookeeper.quorum}")
        private String zookeeperQuorum;

        /** Value of the {@code hbase.zookeeper.property.clientPort} property. */
        @Value("${hbase.zookeeper.property.clientPort}")
        private String clientPort;

        /** Value of the {@code zookeeper.znode.parent} property. */
        @Value("${zookeeper.znode.parent}")
        private String znodeParent;

        /**
         * Copies the injected properties into a fresh Hadoop configuration and wraps it
         * in a template.
         *
         * @return the configured {@link HbaseTemplate}
         */
        @Bean
        public HbaseTemplate hbaseTemplate() {
            org.apache.hadoop.conf.Configuration hadoopConf =
                    new org.apache.hadoop.conf.Configuration();
            hadoopConf.set("hbase.zookeeper.quorum", zookeeperQuorum);
            hadoopConf.set("hbase.zookeeper.property.clientPort", clientPort);
            hadoopConf.set("zookeeper.znode.parent", znodeParent);

            HbaseTemplate template = new HbaseTemplate(hadoopConf);
            return template;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    4、业务类中使用HbaseTemplate

    这个是作为工具类

    /**
     * Query helper built on {@link HbaseTemplate}: range scans with optional
     * family/qualifier filters, and multi-row lookups by row key.
     */
    @Service
    @Slf4j
    public class HBaseService {

        @Autowired
        private HbaseTemplate hbaseTemplate;

        /**
         * Scans a row-key range, optionally restricted to one column family and/or qualifier.
         *
         * @param tableName   table to scan
         * @param startRowkey first row key of the range; {@code null} leaves the start unbounded
         * @param stopRowkey  row key at which the scan stops; {@code null} leaves the end unbounded
         * @param column      column family to match exactly; ignored when blank
         * @param qualifier   column qualifier to match exactly; ignored when blank
         * @return the raw {@link Result} of every matching row
         */
        public List<Result> getRowKeyAndColumn(String tableName, String startRowkey,
                                               String stopRowkey, String column, String qualifier) {
            FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
            if (StringUtils.isNotBlank(column)) {
                log.debug("{}", column);
                filterList.addFilter(new FamilyFilter(CompareFilter.CompareOp.EQUAL,
                           new BinaryComparator(Bytes.toBytes(column))));
            }
            if (StringUtils.isNotBlank(qualifier)) {
                log.debug("{}", qualifier);
                filterList.addFilter(new QualifierFilter(CompareFilter.CompareOp.EQUAL,
                           new BinaryComparator(Bytes.toBytes(qualifier))));
            }

            Scan scan = new Scan();
            if (!filterList.getFilters().isEmpty()) {
                scan.setFilter(filterList);
            }
            // A null key used to fail inside Bytes.toBytes; skip the bound instead so the
            // scan is simply open-ended on that side.
            if (startRowkey != null) {
                scan.setStartRow(Bytes.toBytes(startRowkey));
            }
            if (stopRowkey != null) {
                scan.setStopRow(Bytes.toBytes(stopRowkey));
            }

            // The mapper returns each Result unchanged.
            return hbaseTemplate.find(tableName, scan, (result, rowNum) -> result);
        }

        /**
         * Fetches several rows by key, one {@code get} per key.
         *
         * @param tableName    table to read
         * @param rowKeys      row keys to fetch
         * @param familyColumn column family to restrict to; when blank the whole row is read
         * @param column       qualifier to restrict to; only used when {@code familyColumn} is set
         * @return one {@link Result} per requested key, in the order of {@code rowKeys}
         */
        public List<Result> getListRowkeyData(String tableName, List<String> rowKeys,
                                              String familyColumn, String column) {
            boolean hasFamily = StringUtils.isNotBlank(familyColumn);
            boolean hasQualifier = hasFamily && StringUtils.isNotBlank(column);

            return rowKeys.stream().map(rk -> {
                if (hasQualifier) {
                    return hbaseTemplate.get(tableName, rk, familyColumn,
                                column, (result, rowNum) -> result);
                }
                if (hasFamily) {
                    return hbaseTemplate.get(tableName, rk, familyColumn,
                                (result, rowNum) -> result);
                }
                return hbaseTemplate.get(tableName, rk, (result, rowNum) -> result);
            }).collect(Collectors.toList());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    四、使用spring-boot-starter-hbase

    参考:https://blog.csdn.net/cpongo1/article/details/89550486

    ## 下载spring-boot-starter-hbase代码
    git clone https://github.com/SpringForAll/spring-boot-starter-hbase.git
    ## 安装
    cd spring-boot-starter-hbase
    mvn clean install
    
    • 1
    • 2
    • 3
    • 4
    • 5
    2、添加配置项
    • spring.data.hbase.quorum 指定 HBase 的 zk 地址
    • spring.data.hbase.rootDir 指定 HBase 在 HDFS 上存储的路径
    • spring.data.hbase.nodeParent 指定 ZK 中 HBase 的根 ZNode
    3、定义好DTO
    /**
     * Data-transfer object for one city row. Accessors are generated by Lombok's
     * {@code @Data}.
     */
    @Data
    public class City {
        /** Identifier of the city. */
        private Long id;
        /** Age value; boxed, so it may be {@code null}. */
        private Integer age;
        /** Display name of the city. */
        private String cityName;  
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    4、创建对应rowMapper
    /**
     * Maps one HBase {@link Result} to a {@link City}, reading the {@code name} and
     * {@code age} qualifiers of column family {@code f}.
     */
    public class CityRowMapper implements RowMapper<City> {

        // Bytes.toBytes encodes as UTF-8, unlike String.getBytes() which depends on the
        // platform default charset.
        private static final byte[] COLUMN_FAMILY = Bytes.toBytes("f");
        private static final byte[] NAME = Bytes.toBytes("name");
        private static final byte[] AGE = Bytes.toBytes("age");

        /**
         * Converts a row into a {@link City}.
         *
         * @param result the HBase row
         * @param rowNum index of the row in the result set (unused)
         * @return a City whose name and age are set from the row; a value stays
         *         {@code null} when the corresponding cell is absent
         */
        @Override
        public City mapRow(Result result, int rowNum) throws Exception {
            City dto = new City();
            dto.setCityName(Bytes.toString(result.getValue(COLUMN_FAMILY, NAME)));

            // getValue returns null for a missing cell and Bytes.toInt cannot decode that,
            // so only convert when the cell is present.
            byte[] ageBytes = result.getValue(COLUMN_FAMILY, AGE);
            if (ageBytes != null) {
                dto.setAge(Bytes.toInt(ageBytes));
            }
            return dto;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    5、操作实现增改查
    • HbaseTemplate.find 返回 HBase 映射的 City 列表
    • HbaseTemplate.get 返回 row 对应的 City 信息
    • HbaseTemplate.saveOrUpdates 保存或者更新
      如果 HbaseTemplate 操作不满足需求,完全可以使用 hbaseTemplate 的getConnection() 方法,获取连接。进而类似 HbaseTemplate 实现的逻辑,实现更复杂的需求查询等功能
    /**
     * {@link CityService} implementation that reads and writes the {@code people_table}
     * table through {@link HbaseTemplate}.
     */
    @Service
    public class CityServiceImpl implements CityService {

        @Autowired private HbaseTemplate hbaseTemplate;

        /**
         * Range query: maps every row between the two keys to a {@link City}.
         *
         * @param startRow first row key of the scan
         * @param stopRow  row key at which the scan stops
         * @return the mapped rows
         */
        public List<City> query(String startRow, String stopRow) {
            byte[] from = Bytes.toBytes(startRow);
            byte[] to = Bytes.toBytes(stopRow);
            Scan rangeScan = new Scan(from, to);
            rangeScan.setCaching(5000);
            return this.hbaseTemplate.find("people_table", rangeScan, new CityRowMapper());
        }

        /**
         * Point query: maps the row with the given key to a {@link City}.
         *
         * @param row row key to fetch
         * @return the mapped row
         */
        public City query(String row) {
            return this.hbaseTemplate.get("people_table", row, new CityRowMapper());
        }

        /**
         * Inserts or updates one hard-coded sample row: sets {@code people:name} to
         * {@code "test"} on row key {@code 135xxxxxx}.
         */
        public void saveOrUpdate() {
            Put samplePut = new Put(Bytes.toBytes("135xxxxxx"));
            samplePut.addColumn(Bytes.toBytes("people"), Bytes.toBytes("name"), Bytes.toBytes("test"));

            List<Mutation> mutations = new ArrayList<>();
            mutations.add(samplePut);
            this.hbaseTemplate.saveOrUpdates("people_table", mutations);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    Springboot整合Influxdb

    中文文档:https://jasper-zhang1.gitbooks.io/influxdb/content/Introduction/installation.html

    注意,项目建立在spring-boot-web基础上

    1、添加依赖
    <dependency>
        <groupId>org.influxdb</groupId>
        <artifactId>influxdb-java</artifactId>
        <version>2.15</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    2、添加配置
    spring:
      influx:
        database: my_sensor1
        password: admin
        url: http://127.0.0.1:6086
        user: admin
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    3、编写配置类
    /**
     * Spring configuration that creates the shared {@link InfluxDB} client from the
     * {@code spring.influx.*} properties.
     */
    @Configuration
    public class InfluxdbConfig {

        @Value("${spring.influx.url}")
        private String influxDBUrl;

        @Value("${spring.influx.user}")
        private String userName;

        @Value("${spring.influx.password}")
        private String password;

        @Value("${spring.influx.database}")
        private String database;

        /**
         * Connects to InfluxDB, selects the configured database, enables batched writes,
         * sets the retention policy and the log level.
         *
         * @return the configured client, registered under the bean name {@code influxDB}
         */
        @Bean("influxDB")
        public InfluxDB influxdb() {
            // Asynchronous batching: a write request is sent once either limit is reached,
            // i.e. after 100 points or after 60 * 1000 milliseconds, whichever comes first.
            final int batchActions = 100;
            final int flushIntervalMillis = 1000 * 60;

            InfluxDB client = InfluxDBFactory.connect(influxDBUrl, userName, password);
            try {
                client.setDatabase(database)
                      .enableBatch(batchActions, flushIntervalMillis, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Default retention policy; applied even when the step above failed.
                client.setRetentionPolicy("sensor_retention");
            }
            // Log output level.
            client.setLogLevel(InfluxDB.LogLevel.BASIC);
            return client;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    4、InfluxDB原生API实现
    /**
     * Integration tests that exercise the native InfluxDB client: two ways of batch
     * inserting sample points and one paged query.
     */
    @SpringBootTest(classes = {MainApplication.class})
    @RunWith(SpringJUnit4ClassRunner.class)
    public class InfluxdbDBTest {

        /** Number of sample points written by each insert test. */
        private static final int SAMPLE_COUNT = 50;

        @Autowired
        private InfluxDB influxDB;

        // measurement
        private final String measurement = "sensor";

        @Value("${spring.influx.database}")
        private String database;

        /**
         * Batch insert, variant 1: convert every point to line protocol and write the lines.
         */
        @Test
        public void insert() {
            List<String> lines = new ArrayList<>(SAMPLE_COUNT);
            for (int i = 0; i < SAMPLE_COUNT; i++) {
                lines.add(samplePoint(i).lineProtocol());
            }
            // write
            influxDB.write(lines);
        }

        /**
         * Batch insert, variant 2: collect the points in a {@link BatchPoints} and write it.
         */
        @Test
        public void batchInsert() {
            BatchPoints batchPoints = BatchPoints
                    .database(database)
                    .consistency(InfluxDB.ConsistencyLevel.ALL)
                    .build();
            for (int i = 0; i < SAMPLE_COUNT; i++) {
                // add each single point to the batch
                batchPoints.point(samplePoint(i));
            }
            // batch write
            influxDB.write(batchPoints);
        }

        /**
         * Queries one page of the measurement, newest first, and prints the result.
         *
         * <p>JUnit 4 test methods cannot take parameters, so the page number that used to
         * be a {@code @RequestParam} argument is a local constant here.
         */
        @Test
        public void datas() {
            int page = 1;
            int pageSize = 10;
            // InfluxDB supports paging, so build a LIMIT/OFFSET clause.
            String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize;

            String queryCondition = "";  // no filter condition for now
            String queryCmd = "SELECT * FROM "
                // To read from a specific retention policy, prefix the measurement with
                // "<policyName>."; the default policy needs no prefix.
                + measurement
                // Filter condition (prefer tag values; filtering on field values is
                // considerably slower).
                + queryCondition
                // Results are ordered by time.
                + " ORDER BY time DESC"
                // Paging clause.
                + pageQuery;

            QueryResult queryResult = influxDB.query(new Query(queryCmd, database));
            System.out.println("query result => " + queryResult);
        }

        /** Builds the i-th sample point; tags can only hold String values. */
        private Point samplePoint(int i) {
            return Point.measurement(measurement)
                    .tag("deviceId", "sensor" + i)
                    .addField("temp", 3)
                    .addField("voltage", 145 + i)
                    .addField("A1", "4i")
                    .addField("A2", "4i")
                    .build();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    5、采用封装工具类
    1、创建实体类
    /**
     * Entity for the {@code sensor} measurement. Each field carries a {@code @Column}
     * annotation giving its InfluxDB column name; accessors are generated by Lombok's
     * {@code @Data}.
     */
    @Data
    @Measurement(name = "sensor")
    public class Sensor {
    
        /** Device identifier; the only column marked as a tag. */
        @Column(name="deviceId",tag=true)
        private String deviceId;
        
        /** Value of the {@code temp} column. */
        @Column(name="temp")
        private float temp;
        
        /** Value of the {@code voltage} column. */
        @Column(name="voltage")
        private float voltage;
        
        /** Value of the {@code A1} column. */
        @Column(name="A1")
        private float A1;
        
        /** Value of the {@code A2} column. */
        @Column(name="A2")
        private float A2;
        
        /** Value of the {@code time} column, kept as a string. */
        @Column(name="time")
        private String time;    
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    2、创建工具类
    @Component
    public class InfluxdbUtils {
    
        @Autowired
        private InfluxDB influxDB;
        
        @Value("${spring.influx.database}")
        private String database;    
        
        /**
         * 新增单条记录,利用java的反射机制进行新增操作
         */
        @SneakyThrows
        public void insertOne(Object obj){
            //获取度量
            Class<?> clasz = obj.getClass();
            Measurement measurement = clasz.getAnnotation(Measurement.class);
            //构建
            Point.Builder builder = Point.measurement(measurement.name());
            // 获取对象属性
            Field[] fieldArray = clasz.getDeclaredFields();
            Column column = null;
            for(Field field : fieldArray){
                    column = field.getAnnotation(Column.class);
                    //设置属性可操作
                    field.setAccessible(true); 
                    if(column.tag()){
                        //tag属性只能存储String类型
                        builder.tag(column.name(), field.get(obj).toString());
                    }else{
                        //设置field
                        if(field.get(obj) != null){
                            builder.addField(column.name(), field.get(obj).toString());
                        }
                    }
            }
            influxDB.write(builder.build());
        }
        
        /**
         * 批量新增,方法一
         */
        @SneakyThrows
        public void insertBatchByRecords(List<?> records){
            List<String> lines = new ArrayList<String>();   
            records.forEach(record->{
                Class<?> clasz = record.getClass();
                //获取度量
                Measurement measurement = clasz.getAnnotation(Measurement.class);
                //构建
                Point.Builder builder = Point.measurement(measurement.name());
                Field[] fieldArray = clasz.getDeclaredFields();
                Column column = null;
                for(Field field : fieldArray){
                        column = field.getAnnotation(Column.class);
                        //设置属性可操作
                        field.setAccessible(true); 
                        if(column.tag()){
                            //tag属性只能存储String类型
                            builder.tag(column.name(), field.get(record).toString());
                        }else{
                            //设置field
                            if(field.get(record) != null){
                                builder.addField(column.name(), field.get(record).toString());
                            }
                        }
                }
                lines.add(builder.build().lineProtocol());
            });
            influxDB.write(lines);
        }
        
        /**
         * 批量新增,方法二
         */
        @SneakyThrows
        public void insertBatchByPoints(List<?> records){
            BatchPoints batchPoints = BatchPoints.database(database)
                    .consistency(InfluxDB.ConsistencyLevel.ALL)
                    .build();
            records.forEach(record->{
                Class<?> clasz = record.getClass();
                //获取度量
                Measurement measurement = clasz.getAnnotation(Measurement.class);
                //构建
                Point.Builder builder = Point.measurement(measurement.name());
                Field[] fieldArray = clasz.getDeclaredFields();
                Column column = null;
                for(Field field : fieldArray){
                        column = field.getAnnotation(Column.class);
                        //设置属性可操作
                        field.setAccessible(true); 
                        if(column.tag()){
                            //tag属性只能存储String类型
                            builder.tag(column.name(), field.get(record).toString());
                        }else{
                            //设置field
                            if(field.get(record) != null){
                                builder.addField(column.name(), field.get(record).toString());
                            }
                        }
                }
                batchPoints.point(builder.build());
            });
            influxDB.write(batchPoints);
        }
        
        /**
         * 查询,返回Map集合
         * @param query 完整的查询语句
         */
        public List<Object> fetchRecords(String query){
            List<Object> results = new ArrayList<Object>();
            QueryResult queryResult = influxDB.query(new Query(query, database));
            queryResult.getResults().forEach(result->{
                result.getSeries().forEach(serial->{
                    List<String> columns = serial.getColumns();
                    int fieldSize = columns.size();
                    serial.getValues().forEach(value->{     
                        Map<String,Object> obj = new HashMap<String,Object>();
                        for(int i=0;i<fieldSize;i++){   
                            obj.put(columns.get(i), value.get(i));
                        }
                        results.add(obj);
                    });
                });
            });
            return results;
        }
        
        /**
         * 查询,返回map集合
         * @param fieldKeys 查询的字段,不可为空;不可为单独的tag
         * @param measurement 度量,不可为空;
         */
        public List<Object> fetchRecords(String fieldKeys, String measurement){
            StringBuilder query = new StringBuilder();
            query.append("select ").append(fieldKeys).append(" from ").append(measurement);     
            return this.fetchRecords(query.toString());
        }
        
        /**
         * 查询,返回map集合
         * @param fieldKeys 查询的字段,不可为空;不可为单独的tag
         * @param measurement 度量,不可为空;
         */
        public List<Object> fetchRecords(String fieldKeys, String measurement, String order){
            StringBuilder query = new StringBuilder();
            query.append("select ").append(fieldKeys).append(" from ").append(measurement);
            query.append(" order by ").append(order);       
            return this.fetchRecords(query.toString());
        }
        
        /**
         * 查询,返回map集合
         * @param fieldKeys 查询的字段,不可为空;不可为单独的tag
         * @param measurement 度量,不可为空;
         */
        public List<Object> fetchRecords(String fieldKeys, String measurement, String order, String limit){
            StringBuilder query = new StringBuilder();
            query.append("select ").append(fieldKeys).append(" from ").append(measurement);
            query.append(" order by ").append(order);
            query.append(limit);
            return this.fetchRecords(query.toString());
        }
        
        /**
         * 查询,返回对象的list集合
         */
        @SneakyThrows
        public <T> List<T> fetchResults(String query, Class<?> clasz){
            List results = new ArrayList<>();
            QueryResult queryResult = influxDB.query(new Query(query, database));
            queryResult.getResults().forEach(result->{
                result.getSeries().forEach(serial->{
                    List<String> columns = serial.getColumns();
                    int fieldSize = columns.size();     
                    serial.getValues().forEach(value->{ 
                        Object obj = null;
                            obj = clasz.newInstance();
                            for(int i=0;i<fieldSize;i++){   
                                String fieldName = columns.get(i);
                                Field field = clasz.getDeclaredField(fieldName);
                                field.setAccessible(true);
                                Class<?> type = field.getType();
                                if(type == float.class){
                                    field.set(obj, Float.valueOf(value.get(i).toString()));
                                }else{
                                    field.set(obj, value.get(i));
                                }                           
                            }
                        results.add(obj);
                    });
                });
            });
            return results;
        }
        
        /**
         * Selects the given field keys from a measurement and maps each row to {@code clasz}.
         *
         * @param fieldKeys   select list, inserted verbatim after {@code select}
         * @param measurement measurement name, inserted verbatim after {@code from}
         * @param clasz       class each row is mapped onto
         * @return the mapped rows
         */
        public <T> List<T> fetchResults(String fieldKeys, String measurement, Class<?> clasz){
            String statement = "select " + fieldKeys + " from " + measurement;
            return this.fetchResults(statement, clasz);
        }
        
        /**
         * Selects the given field keys from a measurement in the requested order and maps each
         * row to {@code clasz}.
         *
         * @param fieldKeys   select list, inserted verbatim after {@code select}
         * @param measurement measurement name, inserted verbatim after {@code from}
         * @param order       ordering expression, inserted verbatim after {@code order by}
         * @param clasz       class each row is mapped onto
         * @return the mapped rows
         */
        public <T> List<T> fetchResults(String fieldKeys, String measurement, String order, Class<?> clasz){
            String statement = "select " + fieldKeys
                    + " from " + measurement
                    + " order by " + order;
            return this.fetchResults(statement, clasz);
        }
        
        /**
         * Selects the given field keys from a measurement in the requested order, applies a limit
         * clause, and maps each row to {@code clasz}.
         *
         * @param fieldKeys   select list, inserted verbatim after {@code select}
         * @param measurement measurement name, inserted verbatim after {@code from}
         * @param order       ordering expression, inserted verbatim after {@code order by}
         * @param limit       trailing clause appended as-is with no separator added, so it must
         *                    carry its own leading whitespace
         * @param clasz       class each row is mapped onto
         * @return the mapped rows
         */
        public <T> List<T> fetchResults(String fieldKeys, String measurement, String order, String limit, Class<?> clasz){
            String statement = "select " + fieldKeys
                    + " from " + measurement
                    + " order by " + order
                    + limit;
            return this.fetchResults(statement, clasz);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    3、使用工具类的测试代码
    /**
     * Integration tests that exercise {@code InfluxdbUtils} inside the Spring context.
     *
     * <p>Every scenario is a no-arg JUnit {@code @Test} method so the runner actually executes it;
     * web annotations such as {@code @GetMapping} have no effect in a test class.
     */
    @SpringBootTest(classes = {MainApplication.class})
    @RunWith(SpringJUnit4ClassRunner.class)
    public class InfluxdbUtilTest {

        /** Number of rows requested per page by the query tests. */
        private static final int PAGE_SIZE = 10;

        /** Page (1-based) requested by the query tests. */
        private static final int PAGE = 1;

        /** Number of sensors written by each batch-insert test. */
        private static final int BATCH_SIZE = 50;

        @Autowired
        private InfluxdbUtils influxdbUtils;

        /**
         * Inserts a single record.
         */
        @Test
        public void insert(){
          Sensor sensor = new Sensor();
          sensor.setA1(10);
          sensor.setA2(10);
          sensor.setDeviceId("0002");
          sensor.setTemp(10L);
          sensor.setTime("2021-01-19");
          sensor.setVoltage(10);
          influxdbUtils.insertOne(sensor);
        }

        /**
         * Batch insert, first variant: {@code insertBatchByRecords}.
         */
        @Test
        public void batchInsert(){
            influxdbUtils.insertBatchByRecords(buildSensors(BATCH_SIZE));
        }

        /**
         * Batch insert, second variant: {@code insertBatchByPoints}.
         */
        @Test
        public void batchInsert1(){
            influxdbUtils.insertBatchByPoints(buildSensors(BATCH_SIZE));
        }

        /**
         * Queries one page of sensor data as raw records and prints it.
         */
        @Test
        public void datas(){
            List<Object> sensorList = influxdbUtils.fetchRecords(buildPagedSensorQuery(PAGE));
            System.out.println("query result => " + sensorList);
        }

        /**
         * Queries one page of sensor data mapped to {@code Sensor} objects and prints each of them.
         * The same rows could also be fetched with {@code fetchResults("*", "sensor", Sensor.class)}.
         */
        @Test
        public void datas1(){
            List<Sensor> sensorList = influxdbUtils.fetchResults(buildPagedSensorQuery(PAGE), Sensor.class);
            // Print the current element, not the whole list on every iteration.
            sensorList.forEach(sensor -> System.out.println("query result => " + sensor));
        }

        /**
         * Builds the sensors shared by the batch-insert tests.
         *
         * @param count number of sensors to create
         * @return sensors with identical readings and device ids {@code sensor4545-0 .. count-1}
         */
        private static List<Sensor> buildSensors(int count) {
            List<Sensor> sensorList = new ArrayList<Sensor>();
            for (int i = 0; i < count; i++) {
                Sensor sensor = new Sensor();
                sensor.setA1(2);
                sensor.setA2(12);
                sensor.setTemp(9);
                sensor.setVoltage(12);
                sensor.setDeviceId("sensor4545-" + i);
                sensorList.add(sensor);
            }
            return sensorList;
        }

        /**
         * Builds the paged query used by the query tests.
         *
         * @param page 1-based page number
         * @return an InfluxQL statement selecting one page of the {@code sensor} measurement
         */
        private static String buildPagedSensorQuery(int page) {
            // InfluxDB supports paging through LIMIT/OFFSET.
            String pageQuery = " LIMIT " + PAGE_SIZE + " OFFSET " + (page - 1) * PAGE_SIZE;

            // No filter yet. When adding one, filter on tag values: filtering on field values
            // slows the query down considerably.
            String queryCondition = "";

            // To read from a non-default retention policy, qualify the measurement as
            // <retentionPolicyName>.<measurement>; the default policy can be omitted.
            return "SELECT * FROM sensor"
                + queryCondition
                // Results are ordered by time, newest first.
                + " ORDER BY time DESC"
                + pageQuery;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    6、采用封装数据模型的方式
    1、在Influxdb库中创建存储策略
    -- Creates retention policy "rp_order_payment" on database "db_order": data is kept for 30 days
    -- with replication factor 1, and the policy becomes the default one for that database.
    CREATE RETENTION POLICY "rp_order_payment" ON "db_order" DURATION 30d REPLICATION 1 DEFAULT
    
    • 1
    2、创建数据模型
    /**
     * Data model for the {@code m_order_payment} measurement in database {@code db_order},
     * stored under retention policy {@code rp_order_payment}.
     *
     * <p>NOTE(review): every column except {@code bu_name} is declared as a tag, including the
     * count and money values. Confirm this is intended, since tag values are stored as strings.
     */
    @Data
    @Measurement(name = "m_order_payment", database = "db_order", retentionPolicy = "rp_order_payment")
    public class OrderPayment implements Serializable  {

        /** Statistics batch identifier (tag). */
        @Column(name = "batch_id", tag = true)
        private String batchId;

        /** Identifier of the business unit (tag). */
        @Column(name = "bu_id", tag = true)
        private String buId;

        /** Name of the business unit (field). */
        @Column(name = "bu_name")
        private String buName;

        /** Total count (tag). */
        @Column(name = "total_count", tag = true)
        private String totalCount;

        /** Paid count (tag). */
        @Column(name = "pay_count", tag = true)
        private String payCount;

        /** Total amount of money (tag). */
        @Column(name = "total_money", tag = true)
        private String totalMoney;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    3、创建Mapper
    /**
     * Project-specific mapper type that extends {@code InfluxDBMapper} without adding behavior.
     */
    public class InfluxMapper extends InfluxDBMapper {

        /**
         * Creates a mapper bound to the given client.
         *
         * @param influxDB client passed straight to the {@code InfluxDBMapper} constructor
         */
        public InfluxMapper(final InfluxDB influxDB) {
            super(influxDB);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    4、配置Mapper
    /**
     * Spring configuration that exposes an {@code InfluxMapper} bean.
     */
    @Log4j2
    @Configuration
    public class InfluxAutoConfiguration {

        /**
         * Builds the mapper bean from the injected client.
         *
         * @param influxDB the {@code InfluxDB} client supplied by the container
         * @return a new {@code InfluxMapper} wrapping {@code influxDB}
         */
        @Bean
        public InfluxMapper influxMapper(InfluxDB influxDB) {
            return new InfluxMapper(influxDB);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    5、测试CRUD
    /**
     * Integration tests for the CRUD operations offered by {@code InfluxMapper}.
     *
     * <p>JUnit 4 only accepts public, void, no-arg {@code @Test} methods, so each test builds its
     * own input instead of declaring parameters.
     */
    @SpringBootTest(classes = {MainApplication.class})
    @RunWith(SpringJUnit4ClassRunner.class)
    public class InfluxdbMapperTest {

        /** Business-unit id written by {@link #save()} and looked up by {@link #queryByBu()}. */
        private static final String SAMPLE_BU_ID = "bu-001";

        @Autowired
        private InfluxMapper influxMapper;

        /**
         * Saves one sample {@code OrderPayment} point.
         */
        @Test
        public void save() {
            OrderPayment product = new OrderPayment();
            product.setBatchId("batch-001");
            product.setBuId(SAMPLE_BU_ID);
            product.setBuName("sample-bu");
            product.setTotalCount("100");
            product.setPayCount("80");
            product.setTotalMoney("1000");
            influxMapper.save(product);
        }

        /**
         * Loads every {@code OrderPayment} point and prints the result.
         */
        @Test
        public void queryAll() {
            List<OrderPayment> products = influxMapper.query(OrderPayment.class);
            System.out.println(products);
        }

        /**
         * Loads the {@code OrderPayment} points of one business unit and prints the result.
         */
        @Test
        public void queryByBu() {
            // The id is a test constant; never build the statement this way from untrusted input.
            String sql = String.format("%s'%s'", "select * from m_order_payment where bu_id = ", SAMPLE_BU_ID);
            Query query = new Query(sql, "db_order");
            List<OrderPayment> products = influxMapper.query(query, OrderPayment.class);
            System.out.println(products);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    参考:https://blog.csdn.net/cpongo1/article/details/89550486

    https://github.com/SpringForAll/spring-boot-starter-hbase

    https://github.com/JeffLi1993/springboot-learning-example

  • 相关阅读:
    (附源码)node.js宠物医生预约平台 毕业设计 030945
    3D视觉应用案例:引导板件定位抓取
    【医学影像】LIDC-IDRI数据集的无痛制作
    物联网浏览器(IoTBrowser)-简单介绍
    Mobpush厂商通道回执配置指南
    Oracle DBlink使用方法
    力扣第1047题 删除字符串中的所有相邻重复项 c++string stack巧解
    【Python】进阶(学习笔记)
    GC8837国产驱动芯片,可以替代TI的DRV8837C,具有 PWM(IN/IN)输入接口, 与行业标准器件兼容,并具有过温保护功能。
    大数据之Stream流
  • 原文地址:https://blog.csdn.net/QingChunBuSanChang/article/details/132596960