前言
要实现这个功能,只需要代理 ConnectionFactory 即可,代码非常简单:
class DelegatedR2dbcConnectionFactory(private val delegate: ConnectionFactory) : ConnectionFactory by delegate {
override fun create(): Publisher<out Connection> {
return when (val created = delegate.create()) {
is Mono<out Connection> -> {
created.map { DelegatedR2dbcConnection(it) }
}
is Flux<out Connection> -> {
created.map { DelegatedR2dbcConnection(it) }
}
else -> {
throw IllegalStateException("Unsupported publisher type ${created::class.qualifiedName} when creating connection")
}
}
}
}
然后直接在 DelegatedR2dbcConnection 重写 createStatement 方法就行了。
但是实测下来,这种方法虽然可行,但在 WebFlux 的环境下会导致事务失效。事务失效的体现是 BEGIN 这条 SQL 在连接 A 中执行,其余的业务 SQL 在连接B 中执行,这样就导致异常回滚只回滚连接 A 的 SQL 语句,造成事务失效。
目前还没有定位到具体的原因,考虑到时间有限,于是决定用另一种办法,直接代理 DatabaseClient 从最底层修改 SQL 语句。
实现思路
注意到 org.springframework.r2dbc.core.DatabaseClient 下有下面两个接口:
- GenericExecuteSpec sql(String sql);
- GenericExecuteSpec sql(Supplier<String> sqlSupplier);
可以通过重写这两个方法来实现 SQL 拦截修改:
fun databaseClient(connectionFactory: ConnectionPool): DatabaseClient {
val databaseClient = DatabaseClient.builder()
.connectionFactory(connectionFactory)
.executeFunction { statement ->
statement.execute()
}
.build()
return object : DatabaseClient by databaseClient {
override fun sql(sql: String): DatabaseClient.GenericExecuteSpec {
return databaseClient.sql(CrystalFrameworkSQLModifier.processSql(sql))
}
override fun sql(sqlSupplier: Supplier<String>): DatabaseClient.GenericExecuteSpec {
return databaseClient.sql {
CrystalFrameworkSQLModifier.processSql(sqlSupplier.get())
}
}
}
}
看起来没什么问题,但实际运行起来会抛出异常提示找不到参数。
通过堆栈跟踪发现,这里的 Supplier<String> 并没有想像中的那么简单,实际上是 org.springframework.data.r2dbc.core.DefaultStatementMapper.DefaultPreparedOperation 这个对象。
当我正准备继续代理这个对象的时候发现,这个 DefaultStatementMapper 居然是 package-private 级别的,因此对外根本就不可见。
幸运的是,它实现的 PreparedOperation 接口是公开的,内部有 String toQuery(); 方法可以用于获取最终的 SQL 语句。
通过堆栈跟踪发现,其底层确实是通过这个方法来拿到最终 SQL 的。
因此实现思路变成:
- 代理 DatabaseClient 对象
- 重写 DatabaseClient$sql() 函数
- 代理 PreparedOperation 对象
- 修改 SQL 语句并返回
具体实现
@Bean
fun databaseClient(connectionFactory: ConnectionPool): DatabaseClient {
val databaseClient = DatabaseClient.builder()
.connectionFactory(connectionFactory)
.executeFunction { statement ->
statement.execute()
}
.build()
return object : DatabaseClient by databaseClient {
override fun sql(sql: String): DatabaseClient.GenericExecuteSpec {
return databaseClient.sql(CrystalFrameworkSQLModifier.processSql(sql))
}
override fun sql(sqlSupplier: Supplier<String>): DatabaseClient.GenericExecuteSpec {
val delegate = sqlSupplier as PreparedOperation<*>
return databaseClient.sql(object : PreparedOperation<Any> {
override fun getSource(): Any = delegate.source
override fun bindTo(target: BindTarget) {
delegate.bindTo(target)
}
override fun toQuery(): String {
return CrystalFrameworkSQLModifier.processSql(delegate.toQuery())
}
})
}
}
}
结束语
研究这件事情的起因是想用 @Transactional 注解实现事务,但无论我怎么尝试事务虽然正常打印出回滚日志却实际上没有回滚。
首先排除了常见的 @Transactional 注解失效的原因,又验证了数据库确实可以回滚事务。
然后将 kotlin suspend 函数改为 Publisher 函数,依然不起任何作用。
直到我将函数精简到只有一行 repository 操作并对比日志才发现,开启事务执行的 BEGIN 语句和这条业务 SQL 根本不在同一个连接中,于是想到之前是通过拦截 ConnectionFactory 实现的 SQL 拦截。
虽然我不了解在 reactive 环境下 springboot 是怎么管理事务连接的,但经过调试发现,只要使用默认的 ConnectionPool 而不是自己创建的代理对象,事务就可以正常被回滚。
因此想到可以直接在 DatabaseClient 层拦截修改最终的 SQL 语句。




