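Kafka does not provide a delayed queue out of the box, but one can be built on top of a Redis sorted set (ZSet).
A ZSet keeps its members ordered by score, so we can use the time at which a message should actually be consumed as its score. Fetching the members whose score falls in the range [0, current time] then yields exactly the messages that are due, which gives us a delayed queue.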
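To make the idea concrete, here is a minimal in-memory sketch of the score-range lookup. It does not touch Redis: a plain list of scored entries stands in for the ZSet, and the names `ScoredMessage` and `dueMessages` are hypothetical, introduced only for illustration.

```kotlin
// In-memory illustration only: a list of scored entries stands in for a Redis ZSet.
// ScoredMessage and dueMessages are hypothetical names, not part of any library.
data class ScoredMessage(val payload: String, val score: Double)

// Returns the payloads whose score lies in [0, now], ordered by score,
// mirroring what a ZSet range-by-score query would return.
fun dueMessages(entries: List<ScoredMessage>, nowEpochSeconds: Long): List<String> =
    entries
        .filter { it.score >= 0.0 && it.score <= nowEpochSeconds.toDouble() }
        .sortedBy { it.score }
        .map { it.payload }

// Three messages posted at t = 1000 with delays of 5, 60 and 30 seconds.
val postedAt = 1_000L
val entries = listOf(
    ScoredMessage("a", (postedAt + 5).toDouble()),
    ScoredMessage("b", (postedAt + 60).toDouble()),
    ScoredMessage("c", (postedAt + 30).toDouble()),
)
```

Calling `dueMessages(entries, 1_010L)` returns only "a"; at t = 1040 it returns "a" and "c", while "b" stays queued until t = 1060.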
Basic interface
First, define a basic interface, DelayedMessageQueue:
interface DelayedMessageQueue<V> {
    /**
     * Post a delayed message.
     *
     * @param data the message payload
     * @param delayedSeconds delay, in seconds, before the message becomes due
     * @return true if the message was posted successfully
     */
    fun post(data: V, delayedSeconds: Long): Boolean

    /**
     * Check for due messages and consume them.
     */
    fun check()
}
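Because the interface hides the storage, it can be exercised without Redis or Kafka. The following is a rough sketch of an in-memory implementation meant only for unit tests; `InMemoryDelayedQueue`, its `clock` and `deliver` parameters, and the `size()` helper are assumptions made for this example and do not appear in the original code.

```kotlin
// The interface from the article, repeated so the sketch is self-contained.
interface DelayedMessageQueue<V> {
    fun post(data: V, delayedSeconds: Long): Boolean
    fun check()
}

// Hypothetical in-memory implementation, intended only for tests.
// `clock` returns the current time in epoch seconds;
// `deliver` reports whether a message was delivered successfully.
class InMemoryDelayedQueue<V>(
    private val clock: () -> Long,
    private val deliver: (V) -> Boolean,
) : DelayedMessageQueue<V> {
    // message -> due time in epoch seconds
    private val dueTimes = LinkedHashMap<V, Long>()

    override fun post(data: V, delayedSeconds: Long): Boolean {
        // An already queued message keeps its original due time.
        dueTimes.putIfAbsent(data, clock() + delayedSeconds)
        return true
    }

    override fun check() {
        val now = clock()
        // Snapshot the due messages first so removal does not disturb iteration.
        val due = dueTimes.filterValues { it <= now }.keys.toList()
        for (message in due) {
            // Remove only after a successful delivery; failures are retried on the next check().
            if (deliver(message)) dueTimes.remove(message)
        }
    }

    // Number of messages still waiting, useful for assertions in tests.
    fun size(): Int = dueTimes.size
}
```

Injecting the clock as a function lets a test move time forward explicitly instead of sleeping.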
Redis ZSet
Next, create an abstract class, RedisZSetDelayedQueue, that implements the DelayedMessageQueue interface above:
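abstract class RedisZSetDelayedQueue<V : Any>(
    private val redisOperator: RedisOperator<V>,
    private val redisKey: String,
    private val sendMessage: (V) -> Boolean
) : DelayedMessageQueue<V> {
    // Same project-specific logger() helper as in the usage example; post() logs through it
    private val logger = logger()

    // post() and check() are shown below
}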
The post method:
override fun post(data: V, delayedSeconds: Long): Boolean {
    // The score is the epoch second at which the message becomes due
    val executeTime = Instant.now().plusSeconds(delayedSeconds)
    return try {
        // addIfAbsent leaves the score of an already queued message unchanged
        redisOperator.redisTemplate
            .opsForZSet()
            .addIfAbsent(redisKey, data, executeTime.epochSecond.toDouble())
        true
    } catch (e: Exception) {
        logger.warn("Message post failed", e)
        false
    }
}
The check method:
override fun check() {
    val currentTime = Instant.now().epochSecond
    // Fetch every message whose due time (score) lies in [0, now]
    val messages = redisOperator.redisTemplate
        .opsForZSet()
        .rangeByScore(redisKey, 0.0, currentTime.toDouble())
    messages?.forEach {
        val sendResult = sendMessage.invoke(it)
        // Remove only after a successful send; a failed message stays queued for the next check
        if (sendResult) {
            redisOperator.redisTemplate.opsForZSet().remove(redisKey, it)
        }
    }
}
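One thing to watch with the read-send-remove order above: if several application instances run check() at the same time, each may read the same due message and send it, producing duplicates. A possible variation, sketched below under stated assumptions, is to remove first and let only the instance whose removal succeeded send the message, putting it back on failure. `SortedStore`, `InMemorySortedStore` and `checkWithClaim` are hypothetical names for this sketch; with Spring Data Redis the three store calls would correspond to `rangeByScore`, `remove` (which returns the number of removed members) and `add` on `ZSetOperations`.

```kotlin
// Hypothetical minimal view of a sorted set, so the sketch runs without Redis.
interface SortedStore<V> {
    fun rangeByScore(min: Double, max: Double): List<V>
    fun remove(value: V): Long          // number of members actually removed
    fun add(value: V, score: Double)
}

// In-memory stand-in for the Redis ZSet (not thread-safe, illustration only).
class InMemorySortedStore<V> : SortedStore<V> {
    private val scores = HashMap<V, Double>()

    override fun rangeByScore(min: Double, max: Double): List<V> =
        scores.entries
            .filter { it.value >= min && it.value <= max }
            .sortedBy { it.value }
            .map { it.key }

    override fun remove(value: V): Long =
        if (scores.remove(value) != null) 1L else 0L

    override fun add(value: V, score: Double) {
        scores[value] = score
    }
}

// Claim-then-send: only the caller whose remove() returned 1 sends the message.
// Returns the number of messages sent successfully.
fun <V> checkWithClaim(
    store: SortedStore<V>,
    now: Long,
    retryDelaySeconds: Long,
    send: (V) -> Boolean,
): Int {
    var sent = 0
    for (message in store.rangeByScore(0.0, now.toDouble())) {
        // Another instance already claimed this message: skip it.
        if (store.remove(message) == 0L) continue
        if (send(message)) {
            sent++
        } else {
            // Put the message back so a later check retries it.
            store.add(message, (now + retryDelaySeconds).toDouble())
        }
    }
    return sent
}
```

The trade-off is that a crash between the removal and the send loses the message, whereas the original order can only duplicate it. Which behavior is preferable depends on whether the consumer is idempotent.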
Kafka implementation
abstract class KafkaRedisZSetDelayedQueue<K, V : Any>(
    redisOperator: RedisOperator<V>,
    redisKey: String,
    kafkaTemplate: KafkaTemplate<K, V>,
    topic: String,
    onFailed: ((V) -> Unit)? = null,
    onSuccess: ((V) -> Unit)? = null
) : RedisZSetDelayedQueue<V>(
    redisOperator,
    redisKey,
    sendMessage = { message ->
        // get() blocks until the broker responds and throws if the send fails,
        // so treat any exception as a failed send instead of aborting check()
        val sent = try {
            kafkaTemplate.send(topic, message).get().recordMetadata != null
        } catch (e: Exception) {
            false
        }
        if (sent) onSuccess?.invoke(message) else onFailed?.invoke(message)
        sent
    }
)
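The blocking `get()` above waits indefinitely if the broker never answers, which would stall the scheduled check. A bounded wait is one way around this. The sketch below uses a plain `CompletableFuture` as a stand-in for the future returned by `KafkaTemplate.send` (Spring Kafka 3.x returns a `CompletableFuture`); `awaitSend` is a hypothetical helper and the timeout value is up to the caller.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit

// Waits for a send future for at most `timeoutMillis` and maps every outcome to a Boolean.
// `awaitSend` is a hypothetical helper; the future stands in for the one
// returned by KafkaTemplate.send.
fun <T> awaitSend(future: CompletableFuture<T>, timeoutMillis: Long): Boolean =
    try {
        future.get(timeoutMillis, TimeUnit.MILLISECONDS) != null
    } catch (e: InterruptedException) {
        // Preserve the interrupt flag for the caller.
        Thread.currentThread().interrupt()
        false
    } catch (e: Exception) {
        // ExecutionException (the send failed) or TimeoutException (no answer in time).
        false
    }
```

Inside `sendMessage` this could replace the bare `get()`, for example `awaitSend(kafkaTemplate.send(topic, message), 5_000)`, so that a slow broker counts as a failed send and the message stays in the ZSet for the next check.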
Usage example
Note that check() must be run periodically, for example from a cron job or with the @Scheduled annotation. To use the queue, simply extend the KafkaRedisZSetDelayedQueue class.
@Component
class NotePublishMQ(
    redisOperator: RedisOperator<NotePublishDTO>,
    kafkaTemplate: KafkaTemplate<String, NotePublishDTO>,
) : KafkaRedisZSetDelayedQueue<String, NotePublishDTO>(
    redisOperator, "delayed-mq:note-publish", kafkaTemplate, "note-publish"
) {
    private val logger = logger()

    @Resource
    private lateinit var notePublishProcessService: NotePublishProcessService

    @KafkaListener(topics = ["note-publish"], groupId = "note-publisher")
    fun listen(dto: NotePublishDTO, acknowledgment: Acknowledgment) {
        try {
            notePublishProcessService.publishNote(dto)
        } catch (e: Exception) {
            // Failed: put the message back into the delayed queue and retry in 3 minutes
            super.post(dto, 180)
            logger.warn("Note ${dto.title} by ${dto.userId} publish failed, retry in 3 minutes", e)
        }
        // Acknowledge in both cases; a failed message now lives in the delayed queue
        acknowledgment.acknowledge()
    }

    // Poll Redis for due messages one second after the previous run finishes
    // (scheduling must be enabled, e.g. with @EnableScheduling)
    @Scheduled(fixedDelay = 1000)
    override fun check() {
        super.check()
    }
}