Elasticsearch使用——结合MybatisPlus使用ES & es和MySQL数据一致性 & 结合RabbitMQ实现解耦

前言

本篇博客是一篇elasticsearch的使用案例,包括结合MybatisPlus使用ES,如何保证MySQL和es的数据一致性,另外使用了RabbitMQ进行解耦,自定义了发消息的方法。

其他相关的Elasticsearch的文章列表如下:

  • Elasticsearch的Docker版本的安装和参数设置 & 端口开放和浏览器访问

  • Elasticsearch的可视化Kibana工具安装 & IK分词器的安装和使用

  • Elasticsearch的springboot整合 & Kibana进行全查询和模糊查询

    目录

    • 前言
    • 引出
    • 结合MybatisPlus使用ES
      • 1.引入依赖
      • 2.进行配置
      • 3.实体类上加入注解
      • 4.创建操作的 Repository
      • 5.初始化es中的数据
      • 6.进行全查询以及分页
        • 带条件分页查询
        • es和mysql的数据一致性
          • 延迟双删
          • 加锁的方式
          • 用rabbitmq进行解耦
            • 配置yml文件
            • rabbitmq的配置类
            • callback回调方法
            • 自定义发消息工具类
            • 进行消息的发送
            • 接收到消息,更新es
            • 总结

              引出


              1.elasticsearch的使用案例,包括结合MybatisPlus使用ES;

              2.如何保证MySQL和es的数据一致性;

              3.使用了RabbitMQ进行解耦,自定义了发消息的方法。

              结合MybatisPlus使用ES

              1.引入依赖

                org.springframework.boot spring-boot-starter-data-elasticsearch    mysql mysql-connector-java runtime    com.alibaba druid-spring-boot-starter    com.baomidou mybatis-plus-boot-starter   org.springframework.boot spring-boot-starter-data-redis 

              2.进行配置

              package com.tianju.es.config;
              import org.elasticsearch.client.RestHighLevelClient;
              import org.springframework.context.annotation.Configuration;
              import org.springframework.data.elasticsearch.client.ClientConfiguration;
              import org.springframework.data.elasticsearch.client.RestClients;
              import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
              /**
               * 你也可以不继承 AbstractElasticsearchConfiguration 类,而将 ESConfig 写成一般的配置类的型式。
               * 不过继承 AbstractElasticsearchConfiguration 好处在于,它已经帮我们配置好了elasticsearchTemplate 直接使用。
               */
              @Configuration
              public class ESConfig extends AbstractElasticsearchConfiguration { @Override
                  public RestHighLevelClient elasticsearchClient() { ClientConfiguration clientConfiguration =
                              ClientConfiguration.builder()
                                      .connectedTo("192.168.111.130:9200")
                                      .build();
                      return RestClients.create(clientConfiguration).rest();
                  }
              }
              

              3.实体类上加入注解

              package com.tianju.es.entity;
              import com.baomidou.mybatisplus.annotation.IdType;
              import com.baomidou.mybatisplus.annotation.TableField;
              import com.baomidou.mybatisplus.annotation.TableId;
              import com.baomidou.mybatisplus.annotation.TableName;
              import lombok.AllArgsConstructor;
              import lombok.Data;
              import lombok.NoArgsConstructor;
              import org.springframework.data.elasticsearch.annotations.Document;
              import org.springframework.data.elasticsearch.annotations.Field;
              import org.springframework.data.elasticsearch.annotations.FieldType;
              import java.math.BigDecimal;
              /**
               * 产品,包括库存,价格信息
               */
              @Data
              @NoArgsConstructor
              @AllArgsConstructor
              @TableName("finance_sku")
              @Document(indexName = "finance_sku")
              public class FinanceSkuES { @TableId(value = "ID",type = IdType.AUTO)
                  private Long id;
                  @TableField("finance_sku_describe")
                  @Field(index = true,analyzer = "ik_smart",
                          searchAnalyzer = "ik_smart",type = FieldType.Text)
                  private String detail; // 详情
                  @TableField("finance_sku_price")
                  private BigDecimal price;
                  @TableField("finance_sku_stock")
                  private Long stock;
                  @TableField("finance_state")
                  private Integer status;
              }
              

              参数解释

              @Document(indexName = "books", shards = 1, replicas = 0)
              @Data
              public class Book { @Id
                  @Field(type = FieldType.Integer)
                  private Integer id;
                  
                  @Field(type = FieldType.Keyword)
                  private String title;
                  
                  @Field(type = FieldType.Text)
                  private String press;
                  
                  @Field(type = FieldType.Keyword)
                  private String author;
                  
                  @Field(type = FieldType.Keyword,index=false)
                  private BigDecimal price;
                  
                  @Field(type = FieldType.Text)
                  private String description;
              }
              
              • @Document :注解会对实体中的所有属性建立索引;

                indexName = “books” :表示创建一个名称为 “books” 的索引;

                shards = 1 : 表示只使用一个分片;

                replicas = 0 : 表示不使用复制备份;

                index = false: 不能索引查询

              • @Field(type = FieldType.Keyword) : 用以指定字段的数据类型。

                4.创建操作的 Repository

                从它的祖先们那里继承了大量的现成的方法,除此之外,它还可以按 spring data 的规则定义特定的方法。

                package com.tianju.es.mapper;
                import com.tianju.es.entity.FinanceSkuES;
                import org.springframework.data.domain.Page;
                import org.springframework.data.domain.Pageable;
                import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
                import org.springframework.stereotype.Repository;
                /**
                 * 操作es,类似于之前的mapper
                 */
                @Repository
                public interface SkuESMapper extends ElasticsearchRepository { /**
                     * 根据关键字进行 分词 分页查询 sku数据
                     * @param detail 查询条件
                     * @param pageable 分页
                     * @return
                     */
                    Page findFinanceSkuESByDetail(String detail, Pageable pageable);
                    /**
                     * 根据id进行删除
                     * @param id
                     */
                    void removeFinanceSkuESById(Long id);
                }
                

                5.初始化es中的数据

                运行的后台信息

                查看es页面的信息,index management

                6.进行全查询以及分页

                进行全查询

                { "content": [
                    { "id": 1,
                      "detail": "HUAWEI MateBook X Pro 2023 微绒典藏版 13代酷睿i7 32GB 2TB 14.2英寸3.1K原色全面屏 墨蓝",
                      "price": 13999.0,
                      "stock": 50,
                      "status": 1
                    },
                    { "id": 2,
                      "detail": "HUAWEI Mate 60 Pro+ 16GB+1TB 宣白",
                      "price": 9999.0,
                      "stock": 60,
                      "status": 1
                    },
                    { "id": 3,
                      "detail": "iPhone 15 Pro Max 超视网膜 XDR 显示屏",
                      "price": 9299.0,
                      "stock": 46,
                      "status": 1
                    },
                    { "id": 4,
                      "detail": "MacBook Air Apple M2 芯片 8 核中央处理器 8 核图形处理器 8GB 统一内存 256GB 固态硬盘",
                      "price": 8999.0,
                      "stock": 60,
                      "status": 1
                    }
                  ],
                  "pageable": { "sort": { "empty": true,
                      "sorted": false,
                      "unsorted": true
                    },
                    "offset": 0,
                    "pageSize": 4,
                    "pageNumber": 0,
                    "paged": true,
                    "unpaged": false
                  },
                  "totalElements": 4,
                  "last": true,
                  "totalPages": 1,
                  "number": 0,
                  "size": 4,
                  "sort": { "empty": true,
                    "sorted": false,
                    "unsorted": true
                  },
                  "numberOfElements": 4,
                  "first": true,
                  "empty": false
                }
                

                带条件分页查询

                注意分页查询的page从0开始,尝试发现需要输入分词器分词后最小单元,比如hu不是最小单元,而HUAWEI是

                分词器进行分词的结果

                es和mysql的数据一致性

                延迟双删

                 @Override
                    public FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES) { // 把es看做是缓存,如何保证es 和 mysql的 数据一致性?
                        // 延迟双删的模式
                        // 1.先删除缓存 es
                        skuESMapper.deleteAll();
                        // 2.更新数据库 mysql
                        updateById(financeSkuES);
                        // 3.延时操作
                        try { Thread.sleep(3000);
                        } catch (InterruptedException e) { throw new RuntimeException(e);
                        }
                        // 4.再次删除缓存 es
                        skuESMapper.deleteAll();
                        // 5.最后更新缓存 es
                        skuESMapper.saveAll(list());
                        Optional byId = skuESMapper.findById(financeSkuES.getId());
                        log.debug("byId: "+byId);
                        return byId.get();
                    }
                

                上面代码有不妥的地方,我这里是修改,结果一开始直接从es中全部删除,应该是根据id把修改的数据删除,然后把修改好的数据set进es里面

                加锁的方式

                感觉好像没什么用的样子,就是用了一下加锁

                用rabbitmq进行解耦

                配置yml文件

                spring:
                  main:
                    allow-circular-references: true
                  datasource:
                    driver-class-name: com.mysql.cj.jdbc.Driver
                    ### 本地的数据库
                    url: jdbc:mysql://127.0.0.1:3306/consumer_finance_product?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&allowMultiQueries=true
                    username: root
                    password: 123
                  # redis的相关配置
                  redis:
                    host: 119.3.162.127
                    port: 6379
                    database: 0
                    password: Pet3927
                  # rabbitmq相关
                  rabbitmq:
                    host: 192.168.111.130
                    port: 5672
                    username: admin
                    password: 123
                    virtual-host: /test
                    # 生产者保证消息可靠性
                    publisher-returns: true
                    publisher-confirm-type: correlated
                    # 设置手动确认
                    listener:
                      simple:
                        acknowledge-mode: manual
                

                rabbitmq的配置类

                将Java对象转换成json字符串传输

                package com.tianju.es.rabbit;
                import org.springframework.amqp.core.Binding;
                import org.springframework.amqp.core.BindingBuilder;
                import org.springframework.amqp.core.DirectExchange;
                import org.springframework.amqp.core.Queue;
                import org.springframework.amqp.rabbit.connection.ConnectionFactory;
                import org.springframework.amqp.rabbit.core.RabbitTemplate;
                import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
                import org.springframework.amqp.support.converter.MessageConverter;
                import org.springframework.context.annotation.Bean;
                import org.springframework.context.annotation.Configuration;
                @Configuration
                public class RabbitConfig { public static final String ES_EXCHANGE = "es_exchange";
                    public static final String ES_QUEUE = "es_queue";
                    public static final String ES_KEY = "es_key";
                    @Bean
                    public DirectExchange directExchange(){ return new DirectExchange(ES_EXCHANGE);
                    }
                    @Bean
                    public Queue esQueue(){ return new Queue(ES_QUEUE);
                    }
                    @Bean
                    public Binding esQueueToDirectExchange(){ return BindingBuilder.bind(esQueue())
                                .to(directExchange())
                                .with(ES_KEY);
                    }
                    /**
                     * 将对象转换为json字符串
                     * @return
                     */
                    @Bean
                    public MessageConverter messageConverter(){ return  new Jackson2JsonMessageConverter();
                    }
                    @Bean
                    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
                        rabbitTemplate.setMessageConverter(messageConverter());// 修改转换器
                        return rabbitTemplate;
                    }
                }
                

                callback回调方法

                package com.tianju.es.rabbit;
                import com.alibaba.fastjson2.JSON;
                import com.alibaba.fastjson2.JSONObject;
                import lombok.extern.slf4j.Slf4j;
                import org.springframework.amqp.core.Message;
                import org.springframework.amqp.rabbit.connection.CorrelationData;
                import org.springframework.amqp.rabbit.core.RabbitTemplate;
                import org.springframework.beans.factory.annotation.Autowired;
                import org.springframework.context.annotation.Configuration;
                import javax.annotation.PostConstruct;
                /**
                 * 生产者消息可靠性
                 */
                // RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback
                @Configuration
                @Slf4j
                public class CallbackConfig
                        implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback { @Autowired
                    private RabbitTemplate rabbitTemplate;
                    // 初始化
                    @PostConstruct
                    public void init(){ rabbitTemplate.setConfirmCallback(this);
                        rabbitTemplate.setReturnCallback(this);
                        rabbitTemplate.setMandatory(true);
                    }
                    /**
                     * 不管成功或者失败都会执行
                     * @param correlationData correlation对象需要在 发送消息时候 给
                     * @param ack true表示成功,false表示发送失败
                     * @param cause 如果失败的话,会写失败原因;如果成功,返回为null
                     */
                    @Override
                    public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.debug("ack是否成功:"+ack);
                        log.debug("cause信息:"+cause);
                        if (correlationData!=null){ JSONObject jsonObject = JSON.parseObject(correlationData.getReturnedMessage().getBody());
                            String exchange = correlationData.getReturnedMessage().getMessageProperties().getReceivedExchange();
                            String routingKey = correlationData.getReturnedMessage().getMessageProperties().getReceivedRoutingKey();
                            log.debug("消息体:"+jsonObject);
                            log.debug("交换机:"+exchange);
                            log.debug("路由key:"+routingKey);
                        }
                        if (ack){ return;
                        }
                        // 失败了
                        // 1、重试重试上限次数(默认值5)每重试一次时间间隔会增加
                        // 2、把消息、交换机名称、路由键等相关的消息保存到数据库,有一个程序定时扫描相关的消息,然后重新发送消息。
                        // 重发上限次数(默认值5)超过阈值会转人工处理
                        // 2、把消息体、交换机名称、路由键等相关的消息保存到数据库,有一个程序定时扫描相关的消息,然后重新发送消息。
                        // 重发上限次数(默认值5)超过阈值会转人工处理
                        // 2.1需要把相关的信息存放到数据中,表字段:消息体、交换机名称、路由键、状态、次数
                        // 2.2定时任务(单体:spring定时任务  分布式:XxL-job),发送消息
                
                    }
                    /**
                     * 只有失败了才会执行
                     * @param message
                     * @param replyCode
                     * @param replyText
                     * @param exchange
                     * @param routingKey
                     */
                    @Override
                    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { // 2、把消息、交换机名称、路由键等相关的消息保存到数据库,有一个程序定时扫描相关的消息,然后重新发送消息。
                    }
                }
                

                自定义发消息工具类

                package com.tianju.common.util;
                import com.alibaba.fastjson2.JSON;
                import lombok.extern.slf4j.Slf4j;
                import org.springframework.amqp.AmqpException;
                import org.springframework.amqp.core.Message;
                import org.springframework.amqp.core.MessagePostProcessor;
                import org.springframework.amqp.rabbit.connection.CorrelationData;
                import org.springframework.amqp.rabbit.core.RabbitTemplate;
                import org.springframework.data.redis.core.StringRedisTemplate;
                @Slf4j
                public class RabbitUtil { /**
                     * 延迟队列,发送消息,到达时间后进入死信队列中
                     * @param rabbitTemplate 调用的rabbitTemplate
                     * @param redisTemplate 用来在redis里面存token
                     * @param msg 发送的消息
                     * @param token 发送的token,用于保证幂等性
                     * @param ttl 如果是延迟消费,则消息的过期时间,到达改时间后进入死信交换机,到死信队列中
                     * @param exchange 交换机名字
                     * @param routingKey 路由键名字
                     * @param  发送消息的实体类
                     */
                    public static  void sendMsg(RabbitTemplate rabbitTemplate,
                                                    StringRedisTemplate redisTemplate,
                                                    T msg,String token,Integer ttl,
                                                    String exchange,String routingKey) { log.debug("给交换机[{}]通过路由键[{}]发送消息 {},token为{}",exchange,routingKey,msg,token);
                        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override
                            public Message postProcessMessage(Message message) throws AmqpException { redisTemplate.opsForValue().set(token, token,5*60000);
                                message.getMessageProperties().setMessageId(token);
                                if (ttl!=null){ message.getMessageProperties().setExpiration(ttl.toString());
                                }
                                return message;
                            }
                        };
                        CorrelationData correlationData = new CorrelationData();
                        // 消息体
                        Message message = new Message(JSON.toJSONBytes(msg));
                        // 交换机名称
                        message.getMessageProperties().setReceivedExchange(exchange);
                        // 路由键
                        message.getMessageProperties().setReceivedRoutingKey(routingKey);
                        correlationData.setReturnedMessage(message);
                        // 发送MQ消息
                        rabbitTemplate.convertAndSend(exchange, // 发给交换机
                                routingKey, // 根据这个routingKey就会给到TTL队列,到时间成死信,发给死信交换机,到死信队列
                                msg,
                                messagePostProcessor,
                                correlationData
                        );
                    }
                }
                

                进行消息的发送

                接口

                package com.tianju.es.service;
                import com.baomidou.mybatisplus.extension.service.IService;
                import com.tianju.es.entity.FinanceSkuES;
                public interface SkuService extends IService { /**
                     * 延迟双删的方式,保证es 缓存 和 mysql数据库的数据一致性
                     * @param financeSkuES 修改的数据
                     * @return
                     */
                    FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES);
                    /**
                     * 加锁的方式,不过感觉没啥用的样子
                     * @param financeSkuES
                     * @return
                     */
                    FinanceSkuES updateByIdRedisLock(FinanceSkuES financeSkuES);
                    /**
                     * 通过rabbitmq进行解耦
                     * @param financeSkuES
                     * @return
                     */
                    String updateByIdRabbitMQ(FinanceSkuES financeSkuES);
                }
                

                实现类

                package com.tianju.es.service.impl;
                import cn.hutool.core.util.IdUtil;
                import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
                import com.tianju.common.util.RabbitUtil;
                import com.tianju.es.entity.FinanceSkuES;
                import com.tianju.es.mapper.SkuESMapper;
                import com.tianju.es.mapper.SkuMybatisPlusMapper;
                import com.tianju.es.rabbit.RabbitConfig;
                import com.tianju.es.service.SkuService;
                import org.springframework.amqp.rabbit.core.RabbitTemplate;
                import org.springframework.beans.factory.annotation.Autowired;
                import org.springframework.data.redis.core.StringRedisTemplate;
                import org.springframework.stereotype.Service;
                import java.util.Collection;
                import java.util.Optional;
                import java.util.UUID;
                @Service
                public class SkuServiceImpl extends ServiceImpl implements SkuService { @Autowired
                    private SkuESMapper skuESMapper;
                    @Autowired
                    private StringRedisTemplate stringRedisTemplate;
                    @Autowired
                    private RabbitTemplate rabbitTemplate;
                    @Override
                    public FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES) { // 把es看做是缓存,如何保证es 和 mysql的 数据一致性?
                        // 延迟双删的模式
                        // 1.先删除缓存 es
                        skuESMapper.deleteAll();
                        // 2.更新数据库 mysql
                        updateById(financeSkuES);
                        // 3.延时操作
                        try { Thread.sleep(3000);
                        } catch (InterruptedException e) { throw new RuntimeException(e);
                        }
                        // 4.再次删除缓存 es
                        skuESMapper.deleteAll();
                        // 5.最后更新缓存 es
                        skuESMapper.saveAll(list());
                        Optional byId = skuESMapper.findById(financeSkuES.getId());
                        log.debug("byId: "+byId);
                        return byId.get();
                    }
                    @Override
                    public FinanceSkuES updateByIdRedisLock(FinanceSkuES financeSkuES) { // 第二种方式加锁
                        String uuid = UUID.randomUUID().toString();
                        // 相当于setnx指令
                        Boolean skuLock = stringRedisTemplate.opsForValue().setIfAbsent("skuLock", uuid);
                        try { if (skuLock){ // 抢到了锁
                                skuESMapper.deleteAll();
                                updateById(financeSkuES);
                            }
                        }finally { if (uuid.equals(stringRedisTemplate.opsForValue().get("skuLock"))){ stringRedisTemplate.delete("skuLock");
                            }
                        }
                        skuESMapper.saveAll(list());
                        Optional byId = skuESMapper.findById(financeSkuES.getId());
                        log.debug("byId: "+byId);
                        return byId.get();
                    }
                    @Override
                    public String updateByIdRabbitMQ(FinanceSkuES financeSkuES) { // 采用rabbitmq进行解耦
                        updateById(financeSkuES);
                        FinanceSkuES skuES = getById(financeSkuES.getId());
                        String uuid = IdUtil.fastUUID();
                        RabbitUtil.sendMsg(
                                rabbitTemplate,stringRedisTemplate,skuES,uuid,null,
                                RabbitConfig.ES_EXCHANGE,RabbitConfig.ES_KEY
                        );
                        return "已经发送消息:"+skuES;
                    }
                }
                

                接收到消息,更新es

                接收到消息进行es的更新,把原来的删除,把最新的set进去

                package com.tianju.es.rabbit;
                import com.rabbitmq.client.Channel;
                import com.tianju.es.entity.FinanceSkuES;
                import com.tianju.es.mapper.SkuESMapper;
                import lombok.extern.slf4j.Slf4j;
                import org.springframework.amqp.core.Message;
                import org.springframework.amqp.rabbit.annotation.RabbitListener;
                import org.springframework.beans.factory.annotation.Autowired;
                import org.springframework.data.redis.core.StringRedisTemplate;
                import org.springframework.stereotype.Component;
                import java.io.IOException;
                @Slf4j
                @Component
                public class ESListener { @Autowired
                    private StringRedisTemplate redisTemplate;
                    @Autowired
                    private SkuESMapper skuESMapper;
                    @RabbitListener(queues = RabbitConfig.ES_QUEUE)
                    public void esUpdate(FinanceSkuES financeSkuES, Message message, Channel channel) { String messageId = message.getMessageProperties().getMessageId();
                        log.debug("进行业务----> 监听到队列{}的消息,messageId为{}",financeSkuES,messageId);
                        try { // 幂等性
                            if (redisTemplate.delete(messageId)){ // 根据id删除原有的 es 数据
                                // 然后把新的数据set进来
                                log.debug("处理es的业务,删除原有的,替换最新的");
                                skuESMapper.removeFinanceSkuESById(financeSkuES.getId());
                                skuESMapper.save(financeSkuES);
                            }
                            // 手动签收消息
                            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                        }catch (Exception e){ // 幂等性
                            redisTemplate.opsForValue().set(messageId,messageId,5*60000);
                            // 1、重试重试上限次数(默认值5) 每重试一次时间间隔会增加
                            // 2、把消息、交换机名称、路由键等相关的消息保存到数据库,有一个程序定时扫描相关的消息,然后重新发送消息。
                            // 重发上限次数(默认值5)超过阈值会转人工处理
                            // 已知的消息,交换机,路由器,消息 message.getBody()  消息发送给的是监听的队列
                            try { channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                            } catch (IOException ex) { throw new RuntimeException(ex);
                            }
                        }
                    }
                }
                

                后台打印的日志


                总结

                1.elasticsearch的使用案例,包括结合MybatisPlus使用ES;

                2.如何保证MySQL和es的数据一致性;

                3.使用了RabbitMQ进行解耦,自定义了发消息的方法。