微服务技术栈SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式(五):分布式搜索 ES-下

文章目录

  • 一、数据聚合
    • 1.1 聚合种类
    • 1.2 DSL实现聚合
    • 1.3 RestAPI实现聚合
    • 1.4 演示:多条件聚合
    • 二、自动补全
      • 2.1 拼音分词器
      • 2.2 自定义分词器
      • 2.3 DSL自动补全查询
      • 2.5 实现酒店搜索框自动补全
        • 2.5.1 修改酒店索引库数据结构
        • 2.5.2 RestAPI实现自动补全查询
        • 2.5.3 实战
        • 三、数据同步
          • 3.1 实现数据同步的方法
          • 3.2 使用消息队列MQ实现数据同步
            • 3.2.1 导入hotel-admin
            • 3.2.2 声明交换机、队列、routingkey
            • 四、集群
              • 4.1 搭建ES集群
              • 4.2 集群职责和脑裂问题
              • 4.3 集群故障转移
              • 4.4 集群分布式存储与查询

                一、数据聚合

                1.1 聚合种类

                聚合(aggregations)可以实现对文档数据的统计、分析、运算。聚合常见的有三类:

                1. 桶(Bucket)聚合:用来对文档做分组

                  TermAggregation:按照文档字段值分组

                  Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组

                2. 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等

                  Avg:求平均值

                  Max:求最大值

                  Min:求最小值

                  Stats:同时求max、min、avg、sum等

                3. 管道(pipeline)聚合:其它聚合的结果为基础做聚合

                注意:参与聚合的字段类型必须是:keyword、数值、日期、布尔,一定不能是可分词的类型。

                1.2 DSL实现聚合

                # 使用DSL实现聚合
                # 1.bucket桶聚合 + 限定聚合范围
                # 例:根据酒店品牌名做聚合(并且限定价格不高于200的),并按照结果的升序排序,显示前5个品牌
                GET /hotel/_search
                { "query": { "range": { "price": { "lte": 200
                      }
                    }
                  },
                  "size": 0, //设置size为0,结果中不包含文档,只包含聚合结果
                  "aggs": { // 定义聚合
                    "brandAgg": { // 定义聚合名
                      "terms": { // 聚合类型,按照品牌名聚合,所以选择term
                        "field": "brand", // 参与聚合字段
                        "order": { "_count": "asc"  //指定排序规则 升序
                        }, 
                        "size": 20 //希望获得聚合结果数
                      }
                    }
                  }
                }
                # 2.Metrics聚合
                # 例:获得每个品牌的用户评分的min、max、avg,并且按照avg排序(降序)
                GET /hotel/_search
                { "size": 0,
                  "aggs": { "brandAgg": { "terms": { "field": "brand",
                        "size": 20,
                        "order": { "score_stats.avg": "desc"
                        }
                      },
                      "aggs": { //子聚合
                        "score_stats": { //子聚合名
                          "stats": { //聚合类型,stats可以计算min、max、avg等
                            "field": "score"  //聚合字段
                          }
                        }
                      }
                    }
                  }
                }
                

                1.3 RestAPI实现聚合

                 /**
                     * 桶bucket聚合
                     */
                    @Test
                    void testAgg() throws IOException { // 1.准备请求
                        SearchRequest request = new SearchRequest("hotel");
                        // 2.请求参数
                        // 2.1.size
                        request.source().size(0);
                        // 2.2.聚合
                        request.source().aggregation(
                                AggregationBuilders.terms("brandAgg").field("brand").size(20));
                        // 3.发出请求
                        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
                        
                        // 4.解析结果
                        Aggregations aggregations = response.getAggregations();
                        // 4.1.根据聚合名称,获取聚合结果
                        Terms brandAgg = aggregations.get("brandAgg");
                        // 4.2.获取buckets
                        List buckets = brandAgg.getBuckets();
                        // 4.3.遍历
                        for (Terms.Bucket bucket : buckets) { String brandName = bucket.getKeyAsString();
                            System.out.println("brandName = " + brandName);
                            long docCount = bucket.getDocCount();
                            System.out.println("docCount = " + docCount);
                        }
                    }
                

                1.4 演示:多条件聚合

                @Slf4j
                @Service
                public class HotelService extends ServiceImpl implements IHotelService { @Autowired
                    private RestHighLevelClient restHighLevelClient;
                    @Override
                    public Map> filters() { try { // 1.准备请求
                            SearchRequest request = new SearchRequest("hotel");
                            // 2.请求参数
                            // 2.1.size
                            request.source().size(0);
                            // 2.2.聚合
                            buildAggregation(request);
                            // 3.发出请求
                            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
                            // 4.解析结果
                            Map> result = new HashMap<>();
                            Aggregations aggregations = response.getAggregations();
                            // 4.1.根据品牌名称,获取聚合结果
                            List brandList = getAggByName(aggregations, "brandAgg");
                            // 放入map
                            result.put("品牌",brandList);
                            // 4.2.根据城市名称,获取聚合结果
                            List cityList = getAggByName(aggregations, "cityAgg");
                            // 放入map
                            result.put("城市",cityList);
                            // 4.3.根据星级名称,获取聚合结果
                            List starList = getAggByName(aggregations, "starAgg");
                            // 放入map
                            result.put("星级",starList);
                            return result;
                        } catch (IOException e) { throw new RuntimeException(e);
                        }
                    }
                    public List getAggByName(Aggregations aggregations, String aggName) { // 4.1.根据聚合名称,获取聚合结果
                        Terms brandAgg = aggregations.get(aggName);
                        // 4.2.获取buckets
                        List buckets = brandAgg.getBuckets();
                        // 4.3.遍历
                        List brandList = new ArrayList<>();
                        for (Terms.Bucket bucket : buckets) { String key = bucket.getKeyAsString();
                            brandList.add(key);
                        }
                        return brandList;
                    }
                    public void buildAggregation(SearchRequest request) { request.source().aggregation(AggregationBuilders
                                .terms("brandAgg")
                                        .field("brand")
                                        .size(100));
                        request.source().aggregation(AggregationBuilders
                                .terms("cityAgg")
                                .field("city")
                                .size(100));
                        request.source().aggregation(AggregationBuilders
                                .terms("starAgg")
                                .field("starName")
                                .size(100));
                    }
                }
                

                测试

                @SpringBootTest
                public class HotelDemoApplicationTest { @Autowired
                    private IHotelService hotelService;
                    @Test
                    void contextLoads(){ Map> filters = hotelService.filters();
                        System.out.println(filters);
                    }
                }
                

                结果:

                二、自动补全

                自动补全如下图所示:

                2.1 拼音分词器

                要实现根据字母做补全,就必须对文档按照拼音分词。在GitHub上恰好有elasticsearch的拼音分词插件。地址:https://github.com/medcl/elasticsearch-analysis-pinyin

                安装方式与IK分词器一样,分三步:

                1. 解压
                2. 上传到虚拟机中,elasticsearch的plugin目录
                3. 重启elasticsearch
                4. 测试

                2.2 自定义分词器

                演示:

                # 自定义拼音分词器
                PUT /test
                { "settings": { "analysis": { "analyzer": { "my_analyzer": { "tokenizer": "ik_max_word",
                          "filter": "py"
                        }
                      },
                      "filter": { "py": { "type": "pinyin",
                          "keep_full_pinyin": false,
                          "keep_joined_full_pinyin": true,
                          "keep_original": true,
                          "limit_first_letter_length": 16,
                          "remove_duplicated_term": true,
                          "none_chinese_pinyin_tokenize": false
                        }
                      }
                    }
                  },
                  "mappings": { "properties": { "name": { "type": "text",
                        "analyzer": "my_analyzer",
                        "search_analyzer": "ik_smart"
                      }
                    }
                  }
                }
                POST /test/_doc/1
                { "id": 1,
                  "name": "狮子"
                }
                POST /test/_doc/2
                { "id": 2,
                  "name": "虱子"
                }
                GET /test/_search
                { "query": { "match": { "name": "掉入狮子笼咋办"
                    }
                  }
                }
                

                注意:拼音分词器通常在创建索引库时使用,搜索时使用普通分词器即可

                2.3 DSL自动补全查询

                查询语法如下

                // 自动补全查询
                POST /test/_search
                { "suggest": { "title_suggest": { // 自定义补全查询名称
                      "text": "s", // 关键字
                      "completion": { "field": "title", // 补全字段
                        "skip_duplicates": true, // 跳过重复的
                        "size": 10 // 获取前10条结果
                      }
                    }
                  }
                }
                

                演示:

                # 2.自动补全
                # 2.1 创建一个 自动补全的索引库 属性有title
                DELETE /test
                PUT test
                { "mappings": { "properties": { "title":{ "type": "completion"
                      }
                    }
                  }
                }
                # 2.2 插入示例数据
                POST test/_doc
                { "title": ["Sony", "WH-1000XM3"]
                }
                POST test/_doc
                { "title": ["SK-II", "PITERA"]
                }
                POST test/_doc
                { "title": ["Nintendo", "switch"]
                }
                # 2.3 自动补全查询
                # 例:输入一个关键字s,看自动补全的结果
                # 结果:"SK-II"、"Sony"和"switch"
                POST /test/_search
                { "suggest": { "title_suggest": { "text": "s", 
                      "completion": { "field": "title",
                        "skip_duplicates": true, 
                        "size": 10 
                      }
                    }
                  }
                }
                

                结果:

                2.5 实现酒店搜索框自动补全

                2.5.1 修改酒店索引库数据结构

                1.修改索引库结构

                # 酒店数据索引库
                GET /hotel/_mapping
                DELETE /hotel
                PUT /hotel
                { "settings": { "analysis": { "analyzer": { "text_anlyzer": { "tokenizer": "ik_max_word",
                          "filter": "py"
                        },
                        "completion_analyzer": { "tokenizer": "keyword",
                          "filter": "py"
                        }
                      },
                      "filter": { "py": { "type": "pinyin",
                          "keep_full_pinyin": false,
                          "keep_joined_full_pinyin": true,
                          "keep_original": true,
                          "limit_first_letter_length": 16,
                          "remove_duplicated_term": true,
                          "none_chinese_pinyin_tokenize": false
                        }
                      }
                    }
                  },
                  "mappings": { "properties": { "id":{ "type": "keyword"
                      },
                      "name":{ "type": "text",
                        "analyzer": "text_anlyzer",
                        "search_analyzer": "ik_smart",
                        "copy_to": "all"
                      },
                      "address":{ "type": "keyword",
                        "index": false
                      },
                      "price":{ "type": "integer"
                      },
                      "score":{ "type": "integer"
                      },
                      "brand":{ "type": "keyword",
                        "copy_to": "all"
                      },
                      "city":{ "type": "keyword"
                      },
                      "starName":{ "type": "keyword"
                      },
                      "business":{ "type": "keyword",
                        "copy_to": "all"
                      },
                      "location":{ "type": "geo_point"
                      },
                      "pic":{ "type": "keyword",
                        "index": false
                      },
                      "all":{ "type": "text",
                        "analyzer": "text_anlyzer",
                        "search_analyzer": "ik_smart"
                      },
                      "suggestion":{ "type": "completion",
                          "analyzer": "completion_analyzer"
                      }
                    }
                  }
                }
                # 自动补全查询
                GET /hotel/_search
                { "suggest": { "mySuggestion": { "text": "shang",
                      "completion": { "field": "suggestion",
                        "skip_duplicates": true, 
                        "size": 10 
                      }
                    }
                  }
                }
                

                2.修改HotelDoc

                @Data
                @NoArgsConstructor
                public class HotelDoc { private Long id;
                    private String name;
                    private String address;
                    private Integer price;
                    private Integer score;
                    private String brand;
                    private String city;
                    private String starName;
                    private String business;
                    private String location;
                    private String pic;
                    private Object distance; //新加加字段"距离":酒店距你选择位置的距离
                    private Boolean isAD; //新加加字段"标记":给你置顶的酒店添加一个标记
                    private List suggestion;//新加该字段用于自动补全
                    public HotelDoc(Hotel hotel) { this.id = hotel.getId();
                        this.name = hotel.getName();
                        this.address = hotel.getAddress();
                        this.price = hotel.getPrice();
                        this.score = hotel.getScore();
                        this.brand = hotel.getBrand();
                        this.city = hotel.getCity();
                        this.starName = hotel.getStarName();
                        this.business = hotel.getBusiness();
                        this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
                        this.pic = hotel.getPic();
                        // 自动补全字段的处理
                        this.suggestion = new ArrayList<>();
                        // 添加品牌、城市
                        this.suggestion.add(this.brand);
                        this.suggestion.add(this.city);
                        // 判断商圈是否包含/
                        if (this.business.contains("/")) { // business有多个值,需要切割
                            String[] arr = this.business.split("/");
                            // business的每个值都要加入到suggestion中
                            Collections.addAll(this.suggestion, arr);
                        }else{ this.suggestion.add(this.business);
                        }
                    }
                }
                

                3.【重新导入数据,不演示,参见之前的批量导入文档功能】查询结果

                2.5.2 RestAPI实现自动补全查询

                 /**
                     * 自动补全查询
                     */
                    @Test
                    void testSuggest() throws IOException { // 1.准备请求
                        SearchRequest request = new SearchRequest("hotel");
                        // 2.请求参数
                        request.source().suggest(new SuggestBuilder().addSuggestion(
                                        "hotelSuggest",
                                        SuggestBuilders
                                                .completionSuggestion("suggestion")
                                                .size(10)
                                                .skipDuplicates(true)
                                                .prefix("s")
                                ));
                        // 3.发出请求
                        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
                        // 4.解析结果
                        Suggest suggest = response.getSuggest();
                        // 4.1.根据补全查询名称,获取补全结果
                        CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest");
                        // 4.2.获取options
                        for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) { // 4.3.获取补全的结果
                            String str = option.getText().toString();
                            System.out.println(str);
                        }
                    }
                

                2.5.3 实战

                Mapper层

                @RestController
                @RequestMapping("hotel")
                public class HotelController { @Autowired
                    private IHotelService hotelService;
                    @PostMapping("list")
                    public PageResult search(@RequestBody RequestParams params) { return hotelService.search(params);
                    }
                    @PostMapping("filters")
                    public Map> getFilters(@RequestBody RequestParams params) { return hotelService.filters(params);
                    }
                    @GetMapping("suggestion")
                    public List getSuggestion(@RequestParam("key") String key) { return hotelService.getSuggestion(key);
                    }
                }
                

                Service层

                @Slf4j
                @Service
                public class HotelService extends ServiceImpl implements IHotelService { @Autowired
                    private RestHighLevelClient restHighLevelClient;
                    /**
                     * 自动补全查询
                     */
                    @Override
                    public List getSuggestion(String key)  { try { // 1.准备请求
                            SearchRequest request = new SearchRequest("hotel");
                            // 2.请求参数
                            request.source().suggest(new SuggestBuilder().addSuggestion(
                                    "hotelSuggest",
                                    SuggestBuilders
                                            .completionSuggestion("suggestion")
                                            .size(10)
                                            .skipDuplicates(true)
                                            .prefix(key)
                            ));
                            // 3.发出请求
                            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
                            // 4.解析结果
                            Suggest suggest = response.getSuggest();
                            // 4.1.根据补全查询名称,获取补全结果
                            CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest");
                            // 4.2.获取options
                            List result = new ArrayList<>();
                            for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) { // 4.3.获取补全的结果
                                String str = option.getText().toString();
                                result.add(str);
                            }
                            return result;
                        } catch (IOException e) { throw new RuntimeException(e);
                        }
                    }
                }
                

                结果演示

                三、数据同步

                3.1 实现数据同步的方法

                3.2 使用消息队列MQ实现数据同步

                3.2.1 导入hotel-admin

                3.2.2 声明交换机、队列、routingkey

                由于增和改都相当于插入,所以共用一个队列;删除占用一个队列。

                一、对消费者hotel-demo的操作

                1. 引入amqp依赖和配置rabbitmq的yml文件
                  org.springframework.boot spring-boot-starter-amqp 
                server:
                  port: 8089
                spring:
                  datasource:
                    url: jdbc:mysql://mysql:3306/heima?useSSL=false
                    username: root
                    password: 123
                    driver-class-name: com.mysql.jdbc.Driver
                  rabbitmq:
                    host: 192.168.150.101
                    port: 5672
                    username: itcast
                    password: 123321
                    virtual-host: /
                logging:
                  level:
                    cn.itcast: debug
                  pattern:
                    dateformat: HH:mm:ss:SSS
                mybatis-plus:
                  configuration:
                    map-underscore-to-camel-case: true
                  type-aliases-package: cn.itcast.hotel.pojo
                
                1. 定义mq的一些常量
                public class HotelMqConstants { // 交换机名称
                    public static final String EXCHANGE_NAME = "hotel.topic";
                    // 新增修改队列
                    public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";
                    // 删除队列
                    public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";
                    // 新增修改的RoutingKey
                    public static final String INSERT_KEY = "hotel.insert";
                    // 删除的RoutingKey
                    public static final String DELETE_KEY = "hotel.delete";
                }
                
                1. 声明交换机和队列,并监听MQ消息【注解方式】
                @Component
                public class HotelListener { @Autowired
                    private IHotelService hotelService;
                    @RabbitListener(bindings = @QueueBinding(
                            value = @Queue(name = HotelMqConstants.INSERT_QUEUE_NAME),
                            exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
                            key = HotelMqConstants.INSERT_KEY
                    ))
                    public void listenHotelInsert(Long hotelId){ // 新增
                        hotelService.saveById(hotelId);
                    }
                    @RabbitListener(bindings = @QueueBinding(
                            value = @Queue(name = HotelMqConstants.DELETE_QUEUE_NAME),
                            exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
                            key = HotelMqConstants.DELETE_KEY
                    ))
                    public void listenHotelDelete(Long hotelId){ // 删除
                        hotelService.deleteById(hotelId);
                    }
                }
                

                【bean方式】

                @Configuration
                public class MqConfig { @Bean
                    public TopicExchange topicExchange(){ return new TopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false);
                    }
                    @Bean
                    public Queue insertQueue(){ return new Queue(HotelMqConstants.INSERT_QUEUE_NAME,true);
                    }
                    @Bean
                    public Queue deleteQueue(){ return new Queue(HotelMqConstants.DELETE_QUEUE_NAME,true);
                    }
                    @Bean
                    public Binding insertQueueBinding(){ return BindingBuilder
                                .bind(insertQueue())
                                .to(topicExchange())
                                .with(HotelMqConstants.INSERT_KEY);
                    }
                    @Bean
                    public Binding deleteQueueBinding(){ return BindingBuilder
                                .bind(deleteQueue())
                                .to(topicExchange())
                                .with(HotelMqConstants.DELETE_KEY);
                    }
                }
                
                1. RestAPI实现删改
                @Slf4j
                @Service
                public class HotelService extends ServiceImpl implements IHotelService { @Autowired
                    private RestHighLevelClient restHighLevelClient;
                    /**
                     * 搜索框查询
                     */
                    @Override
                    public PageResult search(RequestParams params) { try { // 1.准备Request
                            SearchRequest request = new SearchRequest("hotel");
                            // 2.准备请求参数
                            // 2.1.多条件查询和过滤
                            buildBasicQuery(params, request);
                            // 2.2.分页
                            int page = params.getPage();
                            int size = params.getSize();
                            request.source().from((page - 1) * size).size(size);
                            /**
                             * 2.3.距离排序
                             */
                            String location = params.getLocation();
                            if (StringUtils.isNotBlank(location)) {// 不为空则查询
                                request.source().sort(SortBuilders
                                        .geoDistanceSort("location", new GeoPoint(location))
                                        .order(SortOrder.ASC)
                                        .unit(DistanceUnit.KILOMETERS)
                                );
                            }
                            // 3.发送请求
                            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
                            // 4.解析响应
                            return handleResponse(response);
                        } catch (IOException e) { throw new RuntimeException("搜索数据失败", e);
                        }
                    }
                    /**
                     * 复合查询
                     */
                    private void buildBasicQuery(RequestParams params, SearchRequest request) { // 1.准备Boolean复合查询
                        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
                        /**
                         * 1.查询关键字
                         * must参与 算分
                         */
                        // 1.1.关键字搜索,match查询,放到must中
                        String key = params.getKey();
                        if (StringUtils.isNotBlank(key)) { // 不为空,根据关键字查询
                            boolQuery.must(QueryBuilders.matchQuery("all", key));
                        } else { // 为空,查询所有
                            boolQuery.must(QueryBuilders.matchAllQuery());
                        }
                        /**
                         * 2.条件过滤:多条件复合查询
                         * 根据 “品牌 城市 星级 价格范围” 过滤数据
                         * filter不参与 算分
                         */
                        // 1.2.品牌
                        String brand = params.getBrand();
                        if (StringUtils.isNotBlank(brand)) { // 不为空则查询
                            boolQuery.filter(QueryBuilders.termQuery("brand", brand));
                        }
                        // 1.3.城市
                        String city = params.getCity();
                        if (StringUtils.isNotBlank(city)) {// 不为空则查询
                            boolQuery.filter(QueryBuilders.termQuery("city", city));
                        }
                        // 1.4.星级
                        String starName = params.getStarName();
                        if (StringUtils.isNotBlank(starName)) {// 不为空则查询
                            boolQuery.filter(QueryBuilders.termQuery("starName", starName));
                        }
                        // 1.5.价格范围
                        Integer minPrice = params.getMinPrice();
                        Integer maxPrice = params.getMaxPrice();
                        if (minPrice != null && maxPrice != null) {// 不为空则查询
                            maxPrice = maxPrice == 0 ? Integer.MAX_VALUE : maxPrice;
                            boolQuery.filter(QueryBuilders.rangeQuery("price").gte(minPrice).lte(maxPrice));
                        }
                        /**
                         * 3.算分函数查询
                         * 置顶功能:给你置顶的酒店添加一个标记,并按其算分
                         */
                        FunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery(
                                boolQuery, // 原始查询,boolQuery
                                new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{ // function数组
                                        new FunctionScoreQueryBuilder.FilterFunctionBuilder(
                                                QueryBuilders.termQuery("isAD", true), // 过滤条件
                                                ScoreFunctionBuilders.weightFactorFunction(10) // 算分函数
                                        )
                                }
                        );
                        /**
                         * 4.设置查询条件
                          */
                        request.source().query(functionScoreQuery);
                    }
                    /**
                     * 结果解析
                     */
                    private PageResult handleResponse(SearchResponse response) { SearchHits searchHits = response.getHits();
                        // 4.1.总条数
                        long total = searchHits.getTotalHits().value;
                        // 4.2.获取文档数组
                        SearchHit[] hits = searchHits.getHits();
                        // 4.3.遍历
                        List hotels = new ArrayList<>(hits.length);
                        for (SearchHit hit : hits) { // 4.4.获取source
                            String json = hit.getSourceAsString();
                            // 4.5.反序列化,非高亮的
                            HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
                            // 4.6.处理高亮结果
                            // 1)获取高亮map
                            Map map = hit.getHighlightFields();
                            if (map != null && !map.isEmpty()) { // 2)根据字段名,获取高亮结果
                                HighlightField highlightField = map.get("name");
                                if (highlightField != null) { // 3)获取高亮结果字符串数组中的第1个元素
                                    String hName = highlightField.getFragments()[0].toString();
                                    // 4)把高亮结果放到HotelDoc中
                                    hotelDoc.setName(hName);
                                }
                            }
                            // 4.8.排序信息
                            Object[] sortValues = hit.getSortValues(); // 获取排序结果
                            if (sortValues.length > 0) { /**
                                 * 由于该程序是根据距离[酒店距你选择位置的距离]进行排序,所以排序结果为距离
                                 */
                                hotelDoc.setDistance(sortValues[0]);
                            }
                            // 4.9.放入集合
                            hotels.add(hotelDoc);
                        }
                        return new PageResult(total, hotels);
                    }
                    /**
                     * 多条件聚合
                     */
                    @Override
                    public Map> filters(RequestParams params) { try { // 1.准备请求
                            SearchRequest request = new SearchRequest("hotel");
                            // 2.请求参数
                            // 2.1.query查询信息
                            buildBasicQuery(params, request);
                            // 2.2.size
                            request.source().size(0);
                            // 2.3.聚合
                            buildAggregation(request);
                            // 3.发出请求
                            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
                            // 4.解析结果
                            Map> result = new HashMap<>();
                            Aggregations aggregations = response.getAggregations();
                            // 4.1.根据品牌名称,获取聚合结果
                            List brandList = getAggByName(aggregations, "brandAgg");
                            // 放入map
                            result.put("品牌",brandList);
                            // 4.2.根据城市名称,获取聚合结果
                            List cityList = getAggByName(aggregations, "cityAgg");
                            // 放入map
                            result.put("城市",cityList);
                            // 4.3.根据星级名称,获取聚合结果
                            List starList = getAggByName(aggregations, "starAgg");
                            // 放入map
                            result.put("星级",starList);
                            return result;
                        } catch (IOException e) { throw new RuntimeException(e);
                        }
                    }
                    public List getAggByName(Aggregations aggregations, String aggName) { // 4.1.根据聚合名称,获取聚合结果
                        Terms brandAgg = aggregations.get(aggName);
                        // 4.2.获取buckets
                        List buckets = brandAgg.getBuckets();
                        // 4.3.遍历
                        List brandList = new ArrayList<>();
                        for (Terms.Bucket bucket : buckets) { String key = bucket.getKeyAsString();
                            brandList.add(key);
                        }
                        return brandList;
                    }
                    public void buildAggregation(SearchRequest request) { request.source().aggregation(AggregationBuilders
                                .terms("brandAgg")
                                        .field("brand")
                                        .size(100));
                        request.source().aggregation(AggregationBuilders
                                .terms("cityAgg")
                                .field("city")
                                .size(100));
                        request.source().aggregation(AggregationBuilders
                                .terms("starAgg")
                                .field("starName")
                                .size(100));
                    }
                    /**
                     * 自动补全查询
                     */
                    @Override
                    public List getSuggestion(String key)  { try { // 1.准备请求
                            SearchRequest request = new SearchRequest("hotel");
                            // 2.请求参数
                            request.source().suggest(new SuggestBuilder().addSuggestion(
                                    "hotelSuggest",
                                    SuggestBuilders
                                            .completionSuggestion("suggestion")
                                            .size(10)
                                            .skipDuplicates(true)
                                            .prefix(key)
                            ));
                            // 3.发出请求
                            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
                            // 4.解析结果
                            Suggest suggest = response.getSuggest();
                            // 4.1.根据补全查询名称,获取补全结果
                            CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest");
                            // 4.2.获取options
                            List result = new ArrayList<>();
                            for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) { // 4.3.获取补全的结果
                                String str = option.getText().toString();
                                result.add(str);
                            }
                            return result;
                        } catch (IOException e) { throw new RuntimeException(e);
                        }
                    }
                    @Override
                    public void deleteById(Long hotelId) { try { // 1.创建request
                            DeleteRequest request = new DeleteRequest("hotel", hotelId.toString());
                            // 2.发送请求
                            restHighLevelClient.delete(request, RequestOptions.DEFAULT);
                        } catch (IOException e) { throw new RuntimeException("删除酒店数据失败", e);
                        }
                    }
                    @Override
                    public void saveById(Long hotelId) { try { // 查询酒店数据,应该基于Feign远程调用hotel-admin,根据id查询酒店数据(现在直接去数据库查)
                            Hotel hotel = getById(hotelId);
                            // 转换
                            HotelDoc hotelDoc = new HotelDoc(hotel);
                            // 1.创建Request
                            IndexRequest request = new IndexRequest("hotel").id(hotelId.toString());
                            // 2.准备参数
                            request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
                            // 3.发送请求
                            restHighLevelClient.index(request, RequestOptions.DEFAULT);
                        } catch (IOException e) { throw new RuntimeException("新增酒店数据失败", e);
                        }
                    }
                }
                

                二、对发送者hotel-admin的操作

                1. 引入amqp依赖和配置rabbitmq的yml文件【同上】
                2. 定义mq的一些常量【同上】
                3. 当发送者对mysql数据库改动时,发送消息给MQ
                @RestController
                @RequestMapping("hotel")
                public class HotelController { @Autowired
                    private IHotelService hotelService;
                    // 注入发送消息的api
                    @Autowired
                    private RabbitTemplate rabbitTemplate;
                    /**
                     * 根据id查询
                     */
                    @GetMapping("/{id}")
                    public Hotel queryById(@PathVariable("id") Long id){ return hotelService.getById(id);
                    }
                    /**
                     * 查询当前页内容
                     */
                    @GetMapping("/list")
                    public PageResult hotelList(
                            @RequestParam(value = "page", defaultValue = "1") Integer page,
                            @RequestParam(value = "size", defaultValue = "1") Integer size
                    ){ Page result = hotelService.page(new Page<>(page, size));
                        return new PageResult(result.getTotal(), result.getRecords());
                    }
                    /**
                     * 新增,并发送给mq消息
                     */
                    @PostMapping
                    public void saveHotel(@RequestBody Hotel hotel){ // 新增酒店
                        hotelService.save(hotel);
                        // 发送MQ消息
                        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());
                    }
                    /**
                     * 修改,并发送给mq消息
                     */
                    @PutMapping()
                    public void updateById(@RequestBody Hotel hotel){ if (hotel.getId() == null) { throw new InvalidParameterException("id不能为空");
                        }
                        hotelService.updateById(hotel);
                        // 发送MQ消息
                        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());
                    }
                    /**
                     * 删除,并发送给mq消息
                     */
                    @DeleteMapping("/{id}")
                    public void deleteById(@PathVariable("id") Long id) { hotelService.removeById(id);
                        // 发送MQ消息
                        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.DELETE_KEY, id);
                    }
                }
                

                四、集群

                单机的elasticsearch做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题。

                >> 海量数据存储问题:将索引库从逻辑上拆分为N个分片(shard),存储到多个节点

                >> 单点故障问题:将分片数据在不同节点备份(replica )

                4.1 搭建ES集群

                我们会在单机上利用docker容器运行多个es实例来模拟es集群。不过生产环境推荐大家每一台服务节点仅部署一个es的实例。

                部署es集群可以直接使用docker-compose来完成,但这要求你的Linux虚拟机至少有4G的内存空间

                1. 创建es集群

                  首先编写一个docker-compose文件,内容如下:

                version: '2.2'
                services:
                  es01:
                    image: elasticsearch:7.12.1
                    container_name: es01
                    environment:
                      - node.name=es01
                      - cluster.name=es-docker-cluster
                      - discovery.seed_hosts=es02,es03
                      - cluster.initial_master_nodes=es01,es02,es03
                      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
                    volumes:
                      - data01:/usr/share/elasticsearch/data
                    ports:
                      - 9200:9200
                    networks:
                      - elastic
                  es02:
                    image: elasticsearch:7.12.1
                    container_name: es02
                    environment:
                      - node.name=es02
                      - cluster.name=es-docker-cluster
                      - discovery.seed_hosts=es01,es03
                      - cluster.initial_master_nodes=es01,es02,es03
                      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
                    volumes:
                      - data02:/usr/share/elasticsearch/data
                    ports:
                      - 9201:9200
                    networks:
                      - elastic
                  es03:
                    image: elasticsearch:7.12.1
                    container_name: es03
                    environment:
                      - node.name=es03
                      - cluster.name=es-docker-cluster
                      - discovery.seed_hosts=es01,es02
                      - cluster.initial_master_nodes=es01,es02,es03
                      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
                    volumes:
                      - data03:/usr/share/elasticsearch/data
                    networks:
                      - elastic
                    ports:
                      - 9202:9200
                volumes:
                  data01:
                    driver: local
                  data02:
                    driver: local
                  data03:
                    driver: local
                networks:
                  elastic:
                    driver: bridge
                

                es运行需要修改一些linux系统权限,修改/etc/sysctl.conf文件

                vi /etc/sysctl.conf
                

                添加下面的内容:

                vm.max_map_count=262144
                

                然后执行命令,让配置生效:

                sysctl -p
                

                通过docker-compose启动集群:

                docker-compose up -d
                
                1. 集群状态监控

                kibana可以监控es集群,不过新版本需要依赖es的x-pack 功能,配置比较复杂。

                这里推荐使用cerebro来监控es集群状态,官方网址:https://github.com/lmenezes/cerebro

                课前资料已经提供了安装包:

                解压即可使用,非常方便。

                解压好的目录如下:

                进入对应的bin目录:

                双击其中的cerebro.bat文件即可启动服务。

                访问http://localhost:9000 即可进入管理界面:

                输入你的elasticsearch的任意节点的地址和端口,点击connect即可:

                绿色的条,代表集群处于绿色(健康状态)。

                1. 创建索引库

                1)利用kibana的DevTools创建索引库

                在DevTools中输入指令:

                PUT /itcast
                { "settings": { "number_of_shards": 3, // 分片数量
                    "number_of_replicas": 1 // 副本数量
                  },
                  "mappings": { "properties": { // mapping映射定义 ...
                    }
                  }
                }
                

                2)利用cerebro创建索引库

                利用cerebro还可以创建索引库:

                填写索引库信息:

                点击右下角的create按钮:

                1. 查看分片效果

                回到首页,即可查看索引库分片效果:

                4.2 集群职责和脑裂问题

                4.3 集群故障转移

                4.4 集群分布式存储与查询