前言
WebFlux 是 Spring Framework 5.0 中引入的一种新型反应式编程模型,支持非阻塞 I/O,适用于高并发、高吞吐量的应用程序。在 WebFlux 应用程序中使用事务需要注意以下几点。使用 Reactive R2DBC:WebFlux 支持使用 Reactive R2DBC 访问关系型数据库。R2DBC 是一个反应式的数据库连接规范,它允许开发人员以响应式方式访问关系型数据库。在 WebFlux 应用程序中使用 Reactive R2DBC 可以实现非阻塞式的数据库操作。使用 @Transactional 注解:在 WebFlux 应用程序中,可以使用 @Transactional 注解声明事务边界,将多个数据库操作绑定到同一事务中。需要注意的是,@Transactional 注解需要与 Reactive R2DBC 结合使用,确保事务管理器与 R2DBC 兼容。遵循响应式编程原则:在 WebFlux 应用程序中,需要遵循响应式编程的原则,使用 Mono 和 Flux 等响应式类型来处理数据流,而不是传统的阻塞式方法。这意味着在事务中涉及到的所有操作,都需要返回 Mono 或 Flux 对象,并且保证响应式链条的正确性。
本节内容以关系型数据库mysql为例,通过使用spring-boot-starter-data-r2dbc框架完成关系型数据库的调用,并通过具体的案例实现webflux应用下的数据库事务管理,包含最佳的使用实战案例,通过spring的AOP实现最终的事务控制。
正文
①引入必要的pom组件,其中spring-boot-starter-aop可以不引入,如果不使用aop管理r2dbc的数据库事务
org.springframework.boot spring-boot-starter-aoporg.projectlombok lombokorg.springdoc springdoc-openapi-starter-webflux-ui2.3.0 org.springframework.boot spring-boot-starter-data-r2dbcio.asyncer r2dbc-mysql1.0.6 org.springframework.boot spring-boot-starter-webfluxorg.springframework.boot spring-boot-starter-testtest io.projectreactor reactor-testtest
②创建控制层UserController请求接口测试方法:新增用户
@Operation(summary = "新增用户", description = "新增用户") @PostMapping(value = "saveUser") public Mono saveUser(@RequestBody User user) { MonouserMono = userService.saveUser(user); return userMono.map(ApiResponse::success); }
③创建UserService的业务接口层
/** * 新增用户 * * @param user * @return */ MonosaveUser(User user);
④创建UserServiceImpl的业务实现层
@Override public MonosaveUser(User user) { return userRepository.save(user); }
⑤创建UserRepository数据操作层
package com.yundi.atp.repository; import com.yundi.atp.entity.User; import org.springframework.data.r2dbc.repository.Query; import org.springframework.data.r2dbc.repository.R2dbcRepository; import org.springframework.data.repository.query.Param; import org.springframework.stereotype.Repository; import reactor.core.publisher.Mono; @Repository public interface UserRepository extends R2dbcRepository{ /** * 根据用户名获取用户信息 * * @param name * @return */ Mono findUsersByName(@Param("name") String name); /** * 更新用户信息 * * @param user * @return */ @Query("update user set name = :#{#user.name}, age = :#{#user.age} where id = :#{#user.id}") Mono updateUser(@Param("user") User user); /** * 动态更新用户信息 * * @param user * @return */ @Query("UPDATE User SET " + "name = CASE WHEN :#{#user.name} IS NULL THEN name ELSE :#{#user.name} END, " + "age = CASE WHEN :#{#user.age} IS NULL THEN age ELSE :#{#user.age} END " + "WHERE id = :#{#user.id}") Mono updateUserQuery(@Param("user") User user); }
⑥ 正常通过swagger工具访问用户新增接口,数据可以正常插入数据库
⑦在保存用户的业务方法中人为创建一个异常,验证是否能够正常保存数据
@Override public MonosaveUser(User user) { Mono monoUser = userRepository.save(user); //人为创建一个数学异常 System.out.println(1/0); return monoUser; }
⑧使用swagger工具访问用户新增接口,从打印的日志来看,数据并没有插入成功,并抛出了异常,但是这里并没有数据库的事务处理
PS:这里可能会误导初学者,认为webflux的r2dbc具备天然的事务处理机制,其实不然,这是因为webflux是响应式非阻塞式编程,所有操作都是异步执行,导致业务方法会跳过用户保存的操作而先去执行异常的部分代码,直接消费异常处理结果,真正的用户保存方法并没有执行。
⑨使用@Transactional,加入事务处理注解,查看是否会有事务执行
@Transactional(rollbackFor = Exception.class) @Override public MonosaveUser(User user) { Mono mono = userRepository.save(user); System.out.println(1/0); return mono; }
⑩使用swagger工具访问用户新增接口,从打印的日志来看,加入@Transactional确实切入了事务,但是由于异步执行,跳过了用户保存的方法,用户保存sql并未执行,这里的事务看起来并没有起什么作用
⑪使用Mono的flatMap方法将业务方法改为如下方式,先执行用户保存方法,在触发异常,查看打印结果
@Transactional(rollbackFor = Exception.class) @Override public MonosaveUser(User user) { return userRepository.save(user).flatMap(item -> { System.out.println("item:" + item); System.out.println(1 / 0); return Mono.just(item); }); }
⑫使用swagger工具访问用户新增接口,从打印的日志来看,数据持久化sql操作真实执行了,但并没有真实插入数据库,事务确定已经真实生效,数据进行了回滚
⑬利用事务的传播机制,测试内层事务出现异常,内外层事务都会回滚
@Transactional(rollbackFor = Exception.class) @Override public MonosaveUser(User user) { return userRepository.save(user).flatMap(item -> { System.out.println("item:" + item); return userRepository.findUsersByName(item.getName()).flatMap(it -> { System.out.println("it:" + it); System.out.println(1 / 0); return Mono.just(it); }); }); }
⑭使用swagger工具访问用户新增接口,从打印的日志来看,内外层的sql都执行了,由于内层异常,导致全局的异常回滚,说明异常的传播机制是shengx
⑮最佳实践:使用aop切面实现数据库事务的统一管理,创建全局事务处理配置类ReactiveTransactionConfig
package com.yundi.atp.config; import io.r2dbc.spi.ConnectionFactory; import jakarta.annotation.Resource; import org.springframework.aop.Advisor; import org.springframework.aop.aspectj.AspectJExpressionPointcut; import org.springframework.aop.support.DefaultPointcutAdvisor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.r2dbc.connection.R2dbcTransactionManager; import org.springframework.transaction.ReactiveTransactionManager; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.annotation.EnableTransactionManagement; import org.springframework.transaction.interceptor.DefaultTransactionAttribute; import org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource; import org.springframework.transaction.interceptor.TransactionInterceptor; @Configuration @EnableTransactionManagement public class ReactiveTransactionConfig { @Resource private ConnectionFactory connectionFactory; @Bean public ReactiveTransactionManager reactiveTransactionManager() { return new R2dbcTransactionManager(connectionFactory); } @Bean(name = "myTransactionInterceptor") public TransactionInterceptor myTransactionInterceptor() { //写事务控制 DefaultTransactionAttribute txAttr_REQUIRED = new DefaultTransactionAttribute(); txAttr_REQUIRED.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); txAttr_REQUIRED.setTimeout(2000); //读事务控制 DefaultTransactionAttribute txAttr_REQUIRED_READONLY = new DefaultTransactionAttribute(); txAttr_REQUIRED_READONLY.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); NameMatchTransactionAttributeSource source = new NameMatchTransactionAttributeSource(); source.addTransactionalMethod("page*", txAttr_REQUIRED_READONLY); source.addTransactionalMethod("find*", txAttr_REQUIRED_READONLY); source.addTransactionalMethod("query*", txAttr_REQUIRED_READONLY); source.addTransactionalMethod("list*", txAttr_REQUIRED_READONLY); source.addTransactionalMethod("get*", txAttr_REQUIRED_READONLY); source.addTransactionalMethod("save*", txAttr_REQUIRED); source.addTransactionalMethod("add*", txAttr_REQUIRED); source.addTransactionalMethod("update*", txAttr_REQUIRED); source.addTransactionalMethod("remove*", txAttr_REQUIRED); source.addTransactionalMethod("delete*", txAttr_REQUIRED); return new TransactionInterceptor(reactiveTransactionManager(), source); } /** * 切点 * * @return */ @Bean public Advisor advisor() { AspectJExpressionPointcut pointcut = new AspectJExpressionPointcut(); pointcut.setExpression("execution(* com.yundi.atp.service..*(..))"); return new DefaultPointcutAdvisor(pointcut, myTransactionInterceptor()); } }
⑯使用swagger工具访问用户新增接口,从打印的日志来看,不加@Transactional,依然可以实现事务处理
结语
至此,关于webflux项目数据库事务的内容到这里就结束了,我们下期见。。。。。。