前言
本篇博客是一篇elasticsearch的使用案例,包括结合MybatisPlus使用ES,如何保证MySQL和es的数据一致性,另外使用了RabbitMQ进行解耦,自定义了发消息的方法。
其他相关的Elasticsearch的文章列表如下:
-
Elasticsearch的Docker版本的安装和参数设置 & 端口开放和浏览器访问
-
Elasticsearch的可视化Kibana工具安装 & IK分词器的安装和使用
-
Elasticsearch的springboot整合 & Kibana进行全查询和模糊查询
目录
- 前言
- 引出
- 结合MybatisPlus使用ES
- 1.引入依赖
- 2.进行配置
- 3.实体类上加入注解
- 4.创建操作的 Repository
- 5.初始化es中的数据
- 6.进行全查询以及分页
- 带条件分页查询
- es和mysql的数据一致性
- 延迟双删
- 加锁的方式
- 用rabbitmq进行解耦
- 配置yml文件
- rabbitmq的配置类
- callback回调方法
- 自定义发消息工具类
- 进行消息的发送
- 接收到消息,更新es
- 总结
引出
1.elasticsearch的使用案例,包括结合MybatisPlus使用ES;
2.如何保证MySQL和es的数据一致性;
3.使用了RabbitMQ进行解耦,自定义了发消息的方法。
结合MybatisPlus使用ES
1.引入依赖
org.springframework.boot spring-boot-starter-data-elasticsearch mysql mysql-connector-java runtime com.alibaba druid-spring-boot-starter com.baomidou mybatis-plus-boot-starter org.springframework.boot spring-boot-starter-data-redis 2.进行配置
package com.tianju.es.config; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.context.annotation.Configuration; import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.RestClients; import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration; /** * 你也可以不继承 AbstractElasticsearchConfiguration 类,而将 ESConfig 写成一般的配置类的型式。 * 不过继承 AbstractElasticsearchConfiguration 好处在于,它已经帮我们配置好了elasticsearchTemplate 直接使用。 */ @Configuration public class ESConfig extends AbstractElasticsearchConfiguration { @Override public RestHighLevelClient elasticsearchClient() { ClientConfiguration clientConfiguration = ClientConfiguration.builder() .connectedTo("192.168.111.130:9200") .build(); return RestClients.create(clientConfiguration).rest(); } }
3.实体类上加入注解
package com.tianju.es.entity; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.annotations.Field; import org.springframework.data.elasticsearch.annotations.FieldType; import java.math.BigDecimal; /** * 产品,包括库存,价格信息 */ @Data @NoArgsConstructor @AllArgsConstructor @TableName("finance_sku") @Document(indexName = "finance_sku") public class FinanceSkuES { @TableId(value = "ID",type = IdType.AUTO) private Long id; @TableField("finance_sku_describe") @Field(index = true,analyzer = "ik_smart", searchAnalyzer = "ik_smart",type = FieldType.Text) private String detail; // 详情 @TableField("finance_sku_price") private BigDecimal price; @TableField("finance_sku_stock") private Long stock; @TableField("finance_state") private Integer status; }
参数解释
@Document(indexName = "books", shards = 1, replicas = 0) @Data public class Book { @Id @Field(type = FieldType.Integer) private Integer id; @Field(type = FieldType.Keyword) private String title; @Field(type = FieldType.Text) private String press; @Field(type = FieldType.Keyword) private String author; @Field(type = FieldType.Keyword,index=false) private BigDecimal price; @Field(type = FieldType.Text) private String description; }
- @Document :注解会对实体中的所有属性建立索引;
indexName = “books” :表示创建一个名称为 “books” 的索引;
shards = 1 : 表示只使用一个分片;
replicas = 0 : 表示不使用复制备份;
index = false: 不能索引查询
- @Field(type = FieldType.Keyword) : 用以指定字段的数据类型。
4.创建操作的 Repository
从它的祖先们那里继承了大量的现成的方法,除此之外,它还可以按 spring data 的规则定义特定的方法。
package com.tianju.es.mapper; import com.tianju.es.entity.FinanceSkuES; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; import org.springframework.data.elasticsearch.repository.ElasticsearchRepository; import org.springframework.stereotype.Repository; /** * 操作es,类似于之前的mapper */ @Repository public interface SkuESMapper extends ElasticsearchRepository
{ /** * 根据关键字进行 分词 分页查询 sku数据 * @param detail 查询条件 * @param pageable 分页 * @return */ Page findFinanceSkuESByDetail(String detail, Pageable pageable); /** * 根据id进行删除 * @param id */ void removeFinanceSkuESById(Long id); } 5.初始化es中的数据
运行的后台信息
查看es页面的信息,index management
6.进行全查询以及分页
进行全查询
{ "content": [ { "id": 1, "detail": "HUAWEI MateBook X Pro 2023 微绒典藏版 13代酷睿i7 32GB 2TB 14.2英寸3.1K原色全面屏 墨蓝", "price": 13999.0, "stock": 50, "status": 1 }, { "id": 2, "detail": "HUAWEI Mate 60 Pro+ 16GB+1TB 宣白", "price": 9999.0, "stock": 60, "status": 1 }, { "id": 3, "detail": "iPhone 15 Pro Max 超视网膜 XDR 显示屏", "price": 9299.0, "stock": 46, "status": 1 }, { "id": 4, "detail": "MacBook Air Apple M2 芯片 8 核中央处理器 8 核图形处理器 8GB 统一内存 256GB 固态硬盘", "price": 8999.0, "stock": 60, "status": 1 } ], "pageable": { "sort": { "empty": true, "sorted": false, "unsorted": true }, "offset": 0, "pageSize": 4, "pageNumber": 0, "paged": true, "unpaged": false }, "totalElements": 4, "last": true, "totalPages": 1, "number": 0, "size": 4, "sort": { "empty": true, "sorted": false, "unsorted": true }, "numberOfElements": 4, "first": true, "empty": false }
带条件分页查询
注意分页查询的page从0开始,尝试发现需要输入分词器分词后最小单元,比如hu不是最小单元,而HUAWEI是
分词器进行分词的结果
es和mysql的数据一致性
延迟双删
@Override public FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES) { // 把es看做是缓存,如何保证es 和 mysql的 数据一致性? // 延迟双删的模式 // 1.先删除缓存 es skuESMapper.deleteAll(); // 2.更新数据库 mysql updateById(financeSkuES); // 3.延时操作 try { Thread.sleep(3000); } catch (InterruptedException e) { throw new RuntimeException(e); } // 4.再次删除缓存 es skuESMapper.deleteAll(); // 5.最后更新缓存 es skuESMapper.saveAll(list()); Optional
byId = skuESMapper.findById(financeSkuES.getId()); log.debug("byId: "+byId); return byId.get(); } 上面代码有不妥的地方,我这里是修改,结果一开始直接从es中全部删除,应该是根据id把修改的数据删除,然后把修改好的数据set进es里面
加锁的方式
感觉好像没什么用的样子,就是用了一下加锁
用rabbitmq进行解耦
配置yml文件
spring: main: allow-circular-references: true datasource: driver-class-name: com.mysql.cj.jdbc.Driver ### 本地的数据库 url: jdbc:mysql://127.0.0.1:3306/consumer_finance_product?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8&allowMultiQueries=true username: root password: 123 # redis的相关配置 redis: host: 119.3.162.127 port: 6379 database: 0 password: Pet3927 # rabbitmq相关 rabbitmq: host: 192.168.111.130 port: 5672 username: admin password: 123 virtual-host: /test # 生产者保证消息可靠性 publisher-returns: true publisher-confirm-type: correlated # 设置手动确认 listener: simple: acknowledge-mode: manual
rabbitmq的配置类
将Java对象转换成json字符串传输
package com.tianju.es.rabbit; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { public static final String ES_EXCHANGE = "es_exchange"; public static final String ES_QUEUE = "es_queue"; public static final String ES_KEY = "es_key"; @Bean public DirectExchange directExchange(){ return new DirectExchange(ES_EXCHANGE); } @Bean public Queue esQueue(){ return new Queue(ES_QUEUE); } @Bean public Binding esQueueToDirectExchange(){ return BindingBuilder.bind(esQueue()) .to(directExchange()) .with(ES_KEY); } /** * 将对象转换为json字符串 * @return */ @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(messageConverter());// 修改转换器 return rabbitTemplate; } }
callback回调方法
package com.tianju.es.rabbit; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; /** * 生产者消息可靠性 */ // RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback @Configuration @Slf4j public class CallbackConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; // 初始化 @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); rabbitTemplate.setMandatory(true); } /** * 不管成功或者失败都会执行 * @param correlationData correlation对象需要在 发送消息时候 给 * @param ack true表示成功,false表示发送失败 * @param cause 如果失败的话,会写失败原因;如果成功,返回为null */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.debug("ack是否成功:"+ack); log.debug("cause信息:"+cause); if (correlationData!=null){ JSONObject jsonObject = JSON.parseObject(correlationData.getReturnedMessage().getBody()); String exchange = correlationData.getReturnedMessage().getMessageProperties().getReceivedExchange(); String routingKey = correlationData.getReturnedMessage().getMessageProperties().getReceivedRoutingKey(); log.debug("消息体:"+jsonObject); log.debug("交换机:"+exchange); log.debug("路由key:"+routingKey); } if (ack){ return; } // 失败了 // 1、重试重试上限次数(默认值5)每重试一次时间间隔会增加 // 2、把消息、交换机名称、路由键等相关的消息保存到数据库,有一个程序定时扫描相关的消息,然后重新发送消息。 // 重发上限次数(默认值5)超过阈值会转人工处理 // 2、把消息体、交换机名称、路由键等相关的消息保存到数据库,有一个程序定时扫描相关的消息,然后重新发送消息。 // 重发上限次数(默认值5)超过阈值会转人工处理 // 2.1需要把相关的信息存放到数据中,表字段:消息体、交换机名称、路由键、状态、次数 // 2.2定时任务(单体:spring定时任务 分布式:XxL-job),发送消息 } /** * 只有失败了才会执行 * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { // 2、把消息、交换机名称、路由键等相关的消息保存到数据库,有一个程序定时扫描相关的消息,然后重新发送消息。 } }
自定义发消息工具类
package com.tianju.common.util; import com.alibaba.fastjson2.JSON; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.data.redis.core.StringRedisTemplate; @Slf4j public class RabbitUtil { /** * 延迟队列,发送消息,到达时间后进入死信队列中 * @param rabbitTemplate 调用的rabbitTemplate * @param redisTemplate 用来在redis里面存token * @param msg 发送的消息 * @param token 发送的token,用于保证幂等性 * @param ttl 如果是延迟消费,则消息的过期时间,到达改时间后进入死信交换机,到死信队列中 * @param exchange 交换机名字 * @param routingKey 路由键名字 * @param
发送消息的实体类 */ public static void sendMsg(RabbitTemplate rabbitTemplate, StringRedisTemplate redisTemplate, T msg,String token,Integer ttl, String exchange,String routingKey) { log.debug("给交换机[{}]通过路由键[{}]发送消息 {},token为{}",exchange,routingKey,msg,token); MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { redisTemplate.opsForValue().set(token, token,5*60000); message.getMessageProperties().setMessageId(token); if (ttl!=null){ message.getMessageProperties().setExpiration(ttl.toString()); } return message; } }; CorrelationData correlationData = new CorrelationData(); // 消息体 Message message = new Message(JSON.toJSONBytes(msg)); // 交换机名称 message.getMessageProperties().setReceivedExchange(exchange); // 路由键 message.getMessageProperties().setReceivedRoutingKey(routingKey); correlationData.setReturnedMessage(message); // 发送MQ消息 rabbitTemplate.convertAndSend(exchange, // 发给交换机 routingKey, // 根据这个routingKey就会给到TTL队列,到时间成死信,发给死信交换机,到死信队列 msg, messagePostProcessor, correlationData ); } } 进行消息的发送
接口
package com.tianju.es.service; import com.baomidou.mybatisplus.extension.service.IService; import com.tianju.es.entity.FinanceSkuES; public interface SkuService extends IService
{ /** * 延迟双删的方式,保证es 缓存 和 mysql数据库的数据一致性 * @param financeSkuES 修改的数据 * @return */ FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES); /** * 加锁的方式,不过感觉没啥用的样子 * @param financeSkuES * @return */ FinanceSkuES updateByIdRedisLock(FinanceSkuES financeSkuES); /** * 通过rabbitmq进行解耦 * @param financeSkuES * @return */ String updateByIdRabbitMQ(FinanceSkuES financeSkuES); } 实现类
package com.tianju.es.service.impl; import cn.hutool.core.util.IdUtil; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.tianju.common.util.RabbitUtil; import com.tianju.es.entity.FinanceSkuES; import com.tianju.es.mapper.SkuESMapper; import com.tianju.es.mapper.SkuMybatisPlusMapper; import com.tianju.es.rabbit.RabbitConfig; import com.tianju.es.service.SkuService; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import java.util.Collection; import java.util.Optional; import java.util.UUID; @Service public class SkuServiceImpl extends ServiceImpl
implements SkuService { @Autowired private SkuESMapper skuESMapper; @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private RabbitTemplate rabbitTemplate; @Override public FinanceSkuES updateByIddDoubleDelete(FinanceSkuES financeSkuES) { // 把es看做是缓存,如何保证es 和 mysql的 数据一致性? // 延迟双删的模式 // 1.先删除缓存 es skuESMapper.deleteAll(); // 2.更新数据库 mysql updateById(financeSkuES); // 3.延时操作 try { Thread.sleep(3000); } catch (InterruptedException e) { throw new RuntimeException(e); } // 4.再次删除缓存 es skuESMapper.deleteAll(); // 5.最后更新缓存 es skuESMapper.saveAll(list()); Optional byId = skuESMapper.findById(financeSkuES.getId()); log.debug("byId: "+byId); return byId.get(); } @Override public FinanceSkuES updateByIdRedisLock(FinanceSkuES financeSkuES) { // 第二种方式加锁 String uuid = UUID.randomUUID().toString(); // 相当于setnx指令 Boolean skuLock = stringRedisTemplate.opsForValue().setIfAbsent("skuLock", uuid); try { if (skuLock){ // 抢到了锁 skuESMapper.deleteAll(); updateById(financeSkuES); } }finally { if (uuid.equals(stringRedisTemplate.opsForValue().get("skuLock"))){ stringRedisTemplate.delete("skuLock"); } } skuESMapper.saveAll(list()); Optional byId = skuESMapper.findById(financeSkuES.getId()); log.debug("byId: "+byId); return byId.get(); } @Override public String updateByIdRabbitMQ(FinanceSkuES financeSkuES) { // 采用rabbitmq进行解耦 updateById(financeSkuES); FinanceSkuES skuES = getById(financeSkuES.getId()); String uuid = IdUtil.fastUUID(); RabbitUtil.sendMsg( rabbitTemplate,stringRedisTemplate,skuES,uuid,null, RabbitConfig.ES_EXCHANGE,RabbitConfig.ES_KEY ); return "已经发送消息:"+skuES; } } 接收到消息,更新es
接收到消息进行es的更新,把原来的删除,把最新的set进去
package com.tianju.es.rabbit; import com.rabbitmq.client.Channel; import com.tianju.es.entity.FinanceSkuES; import com.tianju.es.mapper.SkuESMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import java.io.IOException; @Slf4j @Component public class ESListener { @Autowired private StringRedisTemplate redisTemplate; @Autowired private SkuESMapper skuESMapper; @RabbitListener(queues = RabbitConfig.ES_QUEUE) public void esUpdate(FinanceSkuES financeSkuES, Message message, Channel channel) { String messageId = message.getMessageProperties().getMessageId(); log.debug("进行业务----> 监听到队列{}的消息,messageId为{}",financeSkuES,messageId); try { // 幂等性 if (redisTemplate.delete(messageId)){ // 根据id删除原有的 es 数据 // 然后把新的数据set进来 log.debug("处理es的业务,删除原有的,替换最新的"); skuESMapper.removeFinanceSkuESById(financeSkuES.getId()); skuESMapper.save(financeSkuES); } // 手动签收消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }catch (Exception e){ // 幂等性 redisTemplate.opsForValue().set(messageId,messageId,5*60000); // 1、重试重试上限次数(默认值5) 每重试一次时间间隔会增加 // 2、把消息、交换机名称、路由键等相关的消息保存到数据库,有一个程序定时扫描相关的消息,然后重新发送消息。 // 重发上限次数(默认值5)超过阈值会转人工处理 // 已知的消息,交换机,路由器,消息 message.getBody() 消息发送给的是监听的队列 try { channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException ex) { throw new RuntimeException(ex); } } } }
后台打印的日志
总结
1.elasticsearch的使用案例,包括结合MybatisPlus使用ES;
2.如何保证MySQL和es的数据一致性;
3.使用了RabbitMQ进行解耦,自定义了发消息的方法。
- @Document :注解会对实体中的所有属性建立索引;