基于 Kafka+Redis ZSet 实现延时消息队列

Kafka 并没有提供延时队列,可以用 Redis 的 ZSet 来实现。

ZSet 是按 score 排序的,可以用消息实际消费的时间作为 score,然后取 [0, 当前时间] 区间内的消息即可实现延时队列。

基本接口

首先定义一个基本接口 DelayedMessageQueue

interface DelayedMessageQueue<V> {
    /**
     * Post a delayed message
     *
     * @param data Data
     * @param delayedSeconds Delayed Seconds
     * @return Message post successfully
     */
    fun post(data: V, delayedSeconds: Long): Boolean

    /**
     * Check and consume delayed message
     *
     */
    fun check()
}

Redis ZSet

先创建一个抽象类 RedisZSetDelayedQueue 实现上面的 DelayedMessageQueue 接口:

abstract class RedisZSetDelayedQueue<V: Any>(
    private val redisOperator: RedisOperator<V>,
    private val redisKey: String,
    private val sendMessage: (V) -> Boolean
) : DelayedMessageQueue<V> {

}

post方法:

override fun post(data: V, delayedSeconds: Long): Boolean {
    val executeTime = Instant.now().plusSeconds(delayedSeconds)
    return try {
        redisOperator.redisTemplate
            .opsForZSet()
            .addIfAbsent(redisKey, data, (executeTime.toEpochMilli() / 1000).toDouble())
        true
    } catch (e: Exception) {
        logger.warn("Message post failed", e)
        false
    }
}

check方法:

override fun check() {
    val currentTime = Instant.now().toEpochMilli() / 1000
    val messages = redisOperator.redisTemplate
        .opsForZSet()
        .rangeByScore(redisKey, 0.0, currentTime.toDouble())
    messages?.forEach {
        val sendResult = sendMessage.invoke(it)
        if (sendResult) {
            redisOperator.redisTemplate.opsForZSet().remove(redisKey, it)
        }
    }
}

Kafka 实现

abstract class KafkaRedisZSetDelayedQueue<K, V: Any>(
    redisOperator: RedisOperator<V>,
    redisKey: String,
    kafkaTemplate: KafkaTemplate<K, V>,
    topic: String,
    onFailed: ((V) -> Unit)? = null,
    onSuccess: ((V) -> Unit)? = null
) : RedisZSetDelayedQueue<V>(
    redisOperator,
    redisKey,
    sendMessage = {
        (kafkaTemplate.send(topic, it).get().recordMetadata != null).also { result ->
            if (result)
                onSuccess?.invoke(it)
            else
                onFailed?.invoke(it)
        }
    }
)

使用示例

注意 check() 方法需要定期执行,可以用 Cron Job 或者 @Schedule 注解,只需要继承 KafkaRedisZSetDelayedQueue 类即可。

@Component
class NotePublishMQ(
    redisOperator: RedisOperator<NotePublishDTO>,
    kafkaTemplate: KafkaTemplate<String, NotePublishDTO>,
) : KafkaRedisZSetDelayedQueue<String, NotePublishDTO>(
    redisOperator, "delayed-mq:note-publish", kafkaTemplate, "note-publish"
) {
    private val logger = logger()

    @Resource
    private lateinit var notePublishProcessService: NotePublishProcessService

    @KafkaListener(topics = ["note-publish"], groupId = "note-publisher")
    fun listen(dto: NotePublishDTO, acknowledgment: Acknowledgment) {
        try {
            notePublishProcessService.publishNote(dto)
        } catch (e: Exception) {
            // Failed, retry in 3 minutes
            super.post(dto, 180)
            logger.warn("Note ${dto.title} by ${dto.userId} publish failed, retry in 3 minutes")
        }
        acknowledgment.acknowledge()
    }

    @Scheduled(fixedDelay = 1000)
    override fun check() {
        super.check()
    }
}
未经允许禁止转载本站内容,经允许转载后请严格遵守CC-BY-NC-ND知识共享协议4.0,代码部分则采用GPL v3.0协议
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇