• ElasticSearch DSL与java API示例


    以elasticsearch-rest-client 6.8、spring-data-elasticsearch 3.2举例,展示各种复杂查询逻辑的表达方式

    一、按时间范围、关键字查询

    A && B格式的查询语句:

    {
    	"query": {
    		"bool": {
    			"filter": [{
    				"range": {
    					"@timestamp": {
    						"from": 1661875200000,
    						"to": 1661961600000,
    						"include_lower": true,
    						"include_upper": false
    					}
    				}
    			},
    			{
    				"term": {
    					"span_type": {
    						"value": "svc_link_rest_response"
    					}
    				}
    			}]
    		}
    	},
    	"sort": {
    		"@timestamp": {
    			"order": "desc"
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    把rangeQuery、termQuery添加到root层级的boolQuery的filter列表里即可实现:

    // Time window on TIMESTAMP: lower bound inclusive (gte), upper bound exclusive (lt).
    RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery(TIMESTAMP);
    rangeQuery.gte(from);
    rangeQuery.lt(to);
    // Term query on span_type (the original line was missing its terminating semicolon).
    TermQueryBuilder termQuery = QueryBuilders.termQuery("span_type", "svc_link_rest_response");

    // A && B: both conditions go into the filter list of the root bool query.
    BoolQueryBuilder rootBoolQuery = QueryBuilders.boolQuery();
    rootBoolQuery.filter(rangeQuery);
    rootBoolQuery.filter(termQuery);

    // Run the search: sorted by TIMESTAMP descending, first page of 10 hits.
    NativeSearchQueryBuilder nsqb = new NativeSearchQueryBuilder().withIndices(aryIndex)
            .withIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN).withQuery(rootBoolQuery)
            .withSort(new FieldSortBuilder(TIMESTAMP).order(SortOrder.DESC))
            .withPageable(PageRequest.of(0, 10));
    // The extractor hands back the raw hits of the search response.
    SearchHits searchHits = esTemplate.query(nsqb.build(),
            new ResultsExtractor<SearchHits>() {
                @Override
                public SearchHits extract(SearchResponse response) {
                    return response.getHits();
                }
            });
    // Collect the source of every hit as a map.
    List<Map<String, Object>> result = new ArrayList<>();
    for (SearchHit searchHit : searchHits.getHits()) {
        result.add(searchHit.getSourceAsMap());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    二、构造“或”元组

    (一)(A && B) && (C || D || E)

    查询语句:

    {
    	"query": {
    		"bool": {
    			"filter": [{
    				"range": {
    					"@timestamp": {
    						"from": 1661875200000,
    						"to": 1661961600000,
    						"include_lower": true,
    						"include_upper": false
    					}
    				}
    			},
    			{
    				"term": {
    					"span_type": {
    						"value": "svc_link_rest_response"
    					}
    				}
    			}],
    			"should": [{
    				"wildcard": {
    					"url": {
    						"wildcard": "*CCC*"
    					}
    				}
    			},
    			{
    				"wildcard": {
    					"url": {
    						"wildcard": "*DDD*"
    					}
    				}
    			},
    			{
    				"wildcard": {
    					"url": {
    						"wildcard": "*EEE*"
    					}
    				}
    			}],
    			"minimum_should_match": "1"
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    把wildcardQuery添加到root层级的boolQuery的should列表里即可实现:

    rootBoolQuery.should(QueryBuilders.wildcardQuery("url", "*CCC*"));
    rootBoolQuery.should(QueryBuilders.wildcardQuery("url", "*DDD*"));
    rootBoolQuery.should(QueryBuilders.wildcardQuery("url", "*EEE*"));
    rootBoolQuery.minimumShouldMatch(1);
    
    • 1
    • 2
    • 3
    • 4

    (二)A && B && (C || D) && (E || F)

    查询语句:

    {
    	"query": {
    		"bool": {
    			"filter": [{
    				"range": {
    					"@timestamp": {
    						"from": 1661875200000,
    						"to": 1661961600000,
    						"include_lower": true,
    						"include_upper": false
    					}
    				}
    			},
    			{
    				"term": {
    					"span_type": {
    						"value": "svc_link_rest_response"
    					}
    				}
    			},
    			{
    				"bool": {
    					"should": [{
    						"wildcard": {
    							"url": {
    								"wildcard": "*CCC*"
    							}
    						}
    					},
    					{
    						"wildcard": {
    							"url": {
    								"wildcard": "*DDD*"
    							}
    						}
    					}],
    					"minimum_should_match": "1"
    				}
    			},
    			{
    				"bool": {
    					"should": [{
    						"wildcard": {
    							"status_code": {
    								"wildcard": "40*"
    							}
    						}
    					},
    					{
    						"wildcard": {
    							"status_code": {
    								"wildcard": "50*"
    							}
    						}
    					}],
    					"minimum_should_match": "1"
    				}
    			}]
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    1、构建两个次级boolQuery,分别构造为一个“或”元组
    2、把次级boolQuery添加到root层级的boolQuery的filter列表里

    // (C || D): nested bool query, at least one url wildcard must match.
    BoolQueryBuilder urlAnyOf = QueryBuilders.boolQuery()
            .should(QueryBuilders.wildcardQuery("url", "*CCC*"))
            .should(QueryBuilders.wildcardQuery("url", "*DDD*"))
            .minimumShouldMatch(1);

    // (E || F): nested bool query, at least one status_code wildcard must match.
    BoolQueryBuilder statusAnyOf = QueryBuilders.boolQuery()
            .should(QueryBuilders.wildcardQuery("status_code", "40*"))
            .should(QueryBuilders.wildcardQuery("status_code", "50*"))
            .minimumShouldMatch(1);

    // Both OR groups are added to the filter list of the root bool query.
    rootBoolQuery.filter(urlAnyOf).filter(statusAnyOf);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    (三)A && B && (C || D) && !(E || F)

    查询语句:

    {
    	"query": {
    		"bool": {
    			"filter": [{
    				"range": {
    					"@timestamp": {
    						"from": 1661875200000,
    						"to": 1661961600000,
    						"include_lower": true,
    						"include_upper": false
    					}
    				}
    			},
    			{
    				"term": {
    					"span_type": {
    						"value": "svc_link_rest_response"
    					}
    				}
    			},
    			{
    				"bool": {
    					"should": [{
    						"wildcard": {
    							"url": {
    								"wildcard": "*CCC*"
    							}
    						}
    					},
    					{
    						"wildcard": {
    							"url": {
    								"wildcard": "*DDD*"
    							}
    						}
    					}],
    					"minimum_should_match": "1"
    				}
    			},
    			{
    				"bool": {
    					"must_not": [{
    						"wildcard": {
    							"status_code": {
    								"wildcard": "20*"
    							}
    						}
    					},
    					{
    						"wildcard": {
    							"status_code": {
    								"wildcard": "30*"
    							}
    						}
    					}]
    				}
    			}]
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    1、构建一个次级boolQuery,构造为一个需包含的“或”元组
    2、构建另一个次级boolQuery,构造为一个需排除的“或”元组
    3、把次级boolQuery添加到root层级的boolQuery的filter列表里

    // (C || D): nested bool query, at least one url wildcard must match.
    BoolQueryBuilder urlAnyOf = QueryBuilders.boolQuery()
            .should(QueryBuilders.wildcardQuery("url", "*CCC*"))
            .should(QueryBuilders.wildcardQuery("url", "*DDD*"))
            .minimumShouldMatch(1);

    // !(E || F): nested bool query whose must_not list holds both status_code wildcards.
    BoolQueryBuilder statusNoneOf = QueryBuilders.boolQuery()
            .mustNot(QueryBuilders.wildcardQuery("status_code", "20*"))
            .mustNot(QueryBuilders.wildcardQuery("status_code", "30*"));

    // Both nested queries are added to the filter list of the root bool query.
    rootBoolQuery.filter(urlAnyOf).filter(statusNoneOf);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    PS:其实这个例子,用一个boolQuery的filter+should+mustNot就能实现(should需要配合minimumShouldMatch(1),否则在存在filter时should条件不是必须命中的),不需要定义次级boolQuery

    三、全文检索

    A && B && C格式的查询语句,其中A、B是精确匹配,C是全文检索:

    {
    	"query": {
    		"bool": {
    			"filter": [{
    				"range": {
    					"@timestamp": {
    						"from": 1661875200000,
    						"to": 1661961600000,
    						"include_lower": true,
    						"include_upper": false
    					}
    				}
    			},
    			{
    				"term": {
    					"span_type": {
    						"value": "svc_link_rest_response"
    					}
    				}
    			},
    			{
    				"simple_query_string": {
    					"query": "-\"排除的词组\" +\"必须包含的词组\"",
    					"flags": -1,
    					"default_operator": "or",
    					"analyze_wildcard": false,
    					"minimum_should_match": "-30%",
    					"auto_generate_synonyms_phrase_query": true,
    					"fuzzy_prefix_length": 0,
    					"fuzzy_max_expansions": 50,
    					"fuzzy_transpositions": true
    				}
    			}]
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    把simpleQueryStringBuilder添加到root层级的boolQuery的filter列表里即可实现:

    // Full-text condition C, built from a simple_query_string expression.
    String fullTextExpr = "-\"排除的词组\" +\"必须包含的词组\"";
    SimpleQueryStringBuilder simpleQueryStringBuilder = QueryBuilders.simpleQueryStringQuery(fullTextExpr)
            .minimumShouldMatch("-30%");

    // The full-text condition joins the filter list of the root bool query.
    rootBoolQuery.filter(simpleQueryStringBuilder);
    
    • 1
    • 2
    • 3
    • 4

    simpleQueryStringBuilder直接对文档对象所有text字段合集进行全文检索,如果需要对指定text字段进行检索,可使用matchQuery。

    四、分组统计

    (一)按时间段聚合

    查询一小时的数据,把每一分钟的数据汇聚到一个bucket

    {
    	"size": 0,
    	"query": {
    		"bool": {
    			"filter": [{
    				"range": {
    					"@timestamp": {
    						"from": 1685583000000,
    						"to": 1685586600000,
    						"include_lower": true,
    						"include_upper": false
    					}
    				}
    			},
    			{
    				"term": {
    					"span_type": {
    						"value": "svc_link_rest_response"
    					}
    				}
    			}]
    		}
    	},
    	"aggregations": {
            // 聚合名称,需与Java代码中dateHistogram("histogram")的名称一致
            "histogram": {
                "date_histogram": {
                    // 为了让查询结果里包含所有空的bucket,需要设置extended_bounds
                    "extended_bounds": {
                        "min": 1685583000000,
                        // 最后一个bucket的起始值,1685586600000 - 60*1000
                        "max": 1685586540000
                    },
                    "field": "@timestamp",
                    "fixed_interval": "1m",
                    "min_doc_count": 0
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

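    注意精确裁剪查询范围,并控制返回的bucket数量(本节的DSL和代码用到了fixed_interval、LongBounds、SearchHits<T>、IndexCoordinates,需要Elasticsearch 7.x与spring-data-elasticsearch 4.x,不适用于文章开头所说的6.8/3.2版本):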

    // Align both ends of the time range down to a whole minute.
    final long minuteMillis = 60000L;
    long lowerBound = start - start % minuteMillis;
    long upperBound = end - end % minuteMillis;

    // Root bool query: term filter on span_type first, then the [lowerBound, upperBound) range.
    BoolQueryBuilder rootBoolQuery = QueryBuilders.boolQuery()
            .filter(QueryBuilders.termQuery("span_type", "svc_link_rest_response"))
            .filter(QueryBuilders.rangeQuery(TIMESTAMP).gte(lowerBound).lt(upperBound));
    log.info("query: " + rootBoolQuery.toString());

    // Only the aggregation result is needed, so no hits are requested.
    NativeSearchQueryBuilder nsqb = new NativeSearchQueryBuilder()
            .withMaxResults(0)
            .withQuery(rootBoolQuery);

    // One bucket per minute; minDocCount(0) together with the extended bounds keeps
    // empty buckets. The upper bound is the start of the last bucket.
    LongBounds bucketBounds = new LongBounds(lowerBound, upperBound - minuteMillis);
    DateHistogramAggregationBuilder agg = AggregationBuilders.dateHistogram("histogram")
            .field(TIMESTAMP)
            .fixedInterval(DateHistogramInterval.minutes(1))
            .minDocCount(0)
            .extendedBounds(bucketBounds);
    log.info("agg: " + agg.toString());

    // Run the search.
    nsqb.addAggregation(agg);
    SearchHits<?> searchHits = esTemplate.search(nsqb.build(), Void.class, IndexCoordinates.of(aryIndex));

    // Read the buckets: key is the bucket start in epoch millis, value is the doc count.
    Histogram histogram = searchHits.getAggregations().get("histogram");
    Map<Long, Long> result = new HashMap<>();
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    int seq = 0;
    for (Histogram.Bucket bucket : histogram.getBuckets()) {
        seq += 1;
        ZonedDateTime bucketStart = (ZonedDateTime) bucket.getKey();
        // Shift to GMT+8 for the log line; the instant itself is unchanged.
        ZonedDateTime localStart = bucketStart.withZoneSameInstant(ZoneId.of("GMT+8"));
        result.put(localStart.toInstant().toEpochMilli(), bucket.getDocCount());
        log.info("{} - {}: {}", seq, formatter.format(localStart), bucket.getDocCount());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    spring-data-elasticsearch 5、elasticsearch-java 8的写法如下:

    // Align both ends of the time range down to a whole minute.
    long lowerBound = start - start % 60000;
    long upperBound = end - end % 60000;
    // Term filter on span_type.
    Query filterApp = Query
            .of((Query.Builder b) -> b.term((TermQuery.Builder tqb) -> tqb.field("span_type").value("svc_link_rest_response")));
    // Range filter on TIMESTAMP: [lowerBound, upperBound).
    Query filterTime = Query.of(b -> b
            .range((RangeQuery.Builder rqb) -> rqb.field(TIMESTAMP).gte(JsonData.of(lowerBound)).lt(JsonData.of(upperBound))));
    // Root bool query with both conditions in its filter list.
    Query rootBoolQuery = Query
            .of(b -> b.bool((BoolQuery.Builder bqb) -> bqb.filter(filterApp, filterTime)));
    log.info("query: {}", rootBoolQuery);
    // NativeQueryBuilder (not NativeSearchQueryBuilder) is the builder that takes an
    // elasticsearch-java Query and offers withAggregation(name, Aggregation).
    // No hits are requested because only the aggregation result is needed.
    NativeQueryBuilder nsqb = new NativeQueryBuilder().withMaxResults(0).withQuery(rootBoolQuery);
    // One bucket per minute; minDocCount(0) plus extended bounds keeps empty buckets.
    // The upper bound is the start of the last bucket.
    Aggregation histogramAgg = Aggregation
            .of(b -> b.dateHistogram(dhab -> dhab.field(TIMESTAMP).fixedInterval(tb -> tb.time("1m")).minDocCount(0)
                    .extendedBounds(ebb -> ebb.min(FieldDateMath.of(fdmb -> fdmb.value((double) lowerBound)))
                            .max(FieldDateMath.of(fdmb -> fdmb.value((double) upperBound - 60000))))));

    log.info("agg: {}", histogramAgg);
    // Run the search.
    nsqb.withAggregation("histogram", histogramAgg);
    log.info("index: {}", Arrays.toString(aryIndex));
    SearchHits<?> searchHits = esTemplate.search(nsqb.build(), Void.class, IndexCoordinates.of(aryIndex));
    // Read the buckets of the "histogram" aggregation.
    ElasticsearchAggregations aggs = (ElasticsearchAggregations) searchHits.getAggregations();
    Buckets<DateHistogramBucket> buckets = aggs.get("histogram").aggregation().getAggregate().dateHistogram()
            .buckets();
    // Map of bucket key to doc count.
    Map<Long, Long> result = new HashMap<>();
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    int i = 0;
    for (DateHistogramBucket bucket : buckets.array()) {
        i++;
        result.put(bucket.key(), bucket.docCount());
        log.info("{} - {} - {}: {}", i, bucket.key(), sdf.format(new Date(bucket.key())), bucket.docCount());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    (二)多级聚合

    DSL:

    {
    	"size": 0,
    	"query": {
    		"bool": {
    			"filter": [{
    				"range": {
    					"@timestamp": {
    						"from": 1661875200000,
    						"to": 1661961600000,
    						"include_lower": true,
    						"include_upper": false
    					}
    				}
    			},
    			{
    				"term": {
    					"span_type": {
    						"value": "svc_link_rest_response"
    					}
    				}
    			}]
    		}
    	},
    	"aggregations": {
    	    // 一条查询语句可以同时进行多维统计,按url统计的结果就放在名为“urls”的结果集里
    		"urls": {
    		    // terms方法把过滤后的数据,按照url字段进行分组(bucket)
    		    // 并且在每个url bucket的_count字段存储了bucket内的记录条数,在key字段存储了url的值
    			"terms": {
    				"field": "url",
    				"size": 10000
    			},
    			"aggregations": {
    			    // avg方法不继续生成子bucket,只统计当前url bucket内的平均执行时长,结果放在span_avg字段
    				"span_avg": {
    					"avg": {
    						"field": "span"
    					}
    				},
    				// bucket_selector方法舍弃调用记录不足100的url bucket
    				"select_doc_count": {
    					"bucket_selector": {
    						"buckets_path": {
    							"the_doc_count": "_count"
    						},
    						"script": {
    							"source": "params.the_doc_count >= 100"
    						}
    					}
    				},
    				// bucket_sort方法按span_avg字段的值对url bucket进行倒排
    				"sort_span_avg": {
    					"bucket_sort": {
    						"sort": [{
    							"span_avg": {
    								"order": "desc"
    							}
    						}],
    						"size": 100
    					}
    				}
    			}
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    查询时,会同时返回hits和aggregations,由于我们只要统计结果,设置withMaxResults为0,不返回rootBoolQuery的查询结果:

    // Only the aggregation result is needed, so no hits are requested.
    NativeSearchQueryBuilder nsqb = new NativeSearchQueryBuilder()
            .withMaxResults(0)
            .withQuery(rootBoolQuery);

    // Second level, part 1: average of the span field inside each url bucket ("span_avg").
    AvgAggregationBuilder spanAvgAgg = AggregationBuilders.avg("span_avg").field("span");

    // Second level, part 2: keep only url buckets whose doc count is at least 100.
    Map<String, String> selectorPaths = new HashMap<>();
    selectorPaths.put("the_doc_count", "_count");
    BucketSelectorPipelineAggregationBuilder minCountSelector = PipelineAggregatorBuilders
            .bucketSelector("select_doc_count", selectorPaths, new Script("params.the_doc_count >= 100"));

    // Second level, part 3: order url buckets by span_avg descending, keep 100 of them.
    FieldSortBuilder bySpanAvgDesc = SortBuilders.fieldSort("span_avg").order(SortOrder.DESC);
    BucketSortPipelineAggregationBuilder topBySpanAvg = PipelineAggregatorBuilders
            .bucketSort("sort_span_avg", Arrays.asList(bySpanAvgDesc))
            .size(100);

    // First level: group by url (terms buckets) and attach the second-level aggregations.
    TermsAggregationBuilder urlsAgg = AggregationBuilders.terms("urls").field(URL).size(10000)
            .subAggregation(spanAvgAgg)
            .subAggregation(minCountSelector)
            .subAggregation(topBySpanAvg);

    // Run the search and pull the aggregations out of the response.
    log.debug("aggs: " + urlsAgg.toString());
    nsqb.addAggregation(urlsAgg);
    Aggregations aggregations = esTemplate.query(nsqb.build(), response -> response.getAggregations());

    // Map of url to average span.
    Terms terms = aggregations.get("urls");
    Map<String, Double> result = new HashMap<>();
    for (Terms.Bucket bucket : terms.getBuckets()) {
        Avg spanAvg = bucket.getAggregations().get("span_avg");
        result.put(bucket.getKeyAsString(), spanAvg.getValue());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    spring-data-elasticsearch 5、elasticsearch-java 8的写法如下:

    // Second level, part 1: average of SPAN inside each url bucket.
    Aggregation spanAvgAgg = Aggregation.of(b -> b.avg(ab -> ab.field(SPAN)));

    // Second level, part 2: keep only url buckets whose doc count is at least 100.
    Map<String, String> selectorPaths = new HashMap<>(2);
    selectorPaths.put("the_doc_count", "_count");
    Aggregation minCountSelector = Aggregation
            .of(b -> b.bucketSelector(bsb -> bsb
                    .bucketsPath(bpb -> bpb.dict(selectorPaths))
                    .script(sb -> sb.inline(isb -> isb.lang("painless").source("params.the_doc_count >= 100")))));

    // Second level, part 3: order url buckets by span_avg descending, keep 100 of them.
    Aggregation topBySpanAvg = Aggregation
            .of((Aggregation.Builder b) -> b.bucketSort((BucketSortAggregation.Builder bsb) -> bsb
                    .sort((SortOptions.Builder sob) -> sob
                            .field((FieldSort.Builder fsb) -> fsb.field("span_avg").order(SortOrder.Desc)))
                    .size(100)));

    // First level: group by URL and attach the three second-level aggregations by name.
    Aggregation urlsAgg = Aggregation.of(b -> b.terms(tqb -> tqb.field(URL).size(10000))
            .aggregations("span_avg", spanAvgAgg)
            .aggregations("select_doc_count", minCountSelector)
            .aggregations("sort_span_avg", topBySpanAvg));
    log.info("agg: " + urlsAgg.toString());

    // Run the search; no hits are requested because only the aggregation result is needed.
    NativeQueryBuilder queryBuilder = new NativeQueryBuilder()
            .withMaxResults(0)
            .withQuery(rootBoolQuery);
    queryBuilder.withAggregation("urls", urlsAgg);
    log.info("index: {}", Arrays.toString(aryIndex));
    SearchHits<?> searchHits = esTemplate.search(queryBuilder.build(), Void.class, IndexCoordinates.of(aryIndex));

    // Read the string-terms buckets of "urls" into a map of url to average span.
    ElasticsearchAggregations aggs = (ElasticsearchAggregations) searchHits.getAggregations();
    Buckets<StringTermsBucket> urlBuckets = aggs.get("urls").aggregation().getAggregate().sterms().buckets();
    Map<String, Double> result = new HashMap<>();
    for (StringTermsBucket urlBucket : urlBuckets.array()) {
        String url = urlBucket.key().stringValue();
        double avgSpan = urlBucket.aggregations().get("span_avg").avg().value();
        result.put(url, avgSpan);
        log.info("{}, avg:{}, count:{}", url, avgSpan, urlBucket.docCount());
    }
    return result;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
  • 相关阅读:
    【Matplotlib绘制图像大全】(十七):散点图
    python 逻辑控制语句、循环语句
    【学习笔记】【DOA子空间算法】7 MUSIC-like 算法
    量化程序化交易如何去使用以及执行?
    强化学习问题(六)--- 无法安装gym 0.21.0
    vite和webpack的区别
    Unity PackageManager 在离线环境下使用
    如何使用Semaphore和CompletableFuture搭配实现控制线程并发数量并等待所有线程执行完成之后在执行其它操作
    《三国志》游戏的数据表设计与优化
    Linux中Locate命令查找不全
  • 原文地址:https://blog.csdn.net/wzp1986/article/details/126638252