Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ErrDuplicateMessage = errors.New("duplicate message")
ErrDuplicateMessage 重复消息
Functions ¶
This section is empty.
Types ¶
type DelayServiceConsumer ¶
type DelayServiceConsumer struct {
// contains filtered or unexported fields
}
DelayServiceConsumer 延迟服务消费者
func NewDelayServiceConsumer ¶
func NewDelayServiceConsumer(producer sarama.SyncProducer, delay time.Duration, realTopic string) *DelayServiceConsumer
func (*DelayServiceConsumer) Cleanup ¶
func (c *DelayServiceConsumer) Cleanup(sarama.ConsumerGroupSession) error
func (*DelayServiceConsumer) ConsumeClaim ¶
func (c *DelayServiceConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
func (*DelayServiceConsumer) Setup ¶
func (c *DelayServiceConsumer) Setup(sarama.ConsumerGroupSession) error
type KafkaDelayQueueProducer ¶
type KafkaDelayQueueProducer struct {
// contains filtered or unexported fields
}
func NewKafkaDelayQueueProducer ¶
func NewKafkaDelayQueueProducer(producer sarama.SyncProducer, delayServiceConsumerGroup sarama.ConsumerGroup, delayTime time.Duration, delayTopic, realTopic string) *KafkaDelayQueueProducer
NewKafkaDelayQueueProducer 延迟队列,包含了生产者和延迟服务。参数:producer 生产者;delayServiceConsumerGroup 延迟服务消费者;delayTime 延迟时间;delayTopic 延迟服务主题;realTopic 真实队列主题。
func (*KafkaDelayQueueProducer) SendMessage ¶
func (q *KafkaDelayQueueProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error)
SendMessage 发送消息
type Msg ¶
type Msg struct {
	Topic     string        // 消息的主题
	Key       string        // 消息的Key
	Body      []byte        // 消息的Body
	Partition int           // 分区号
	Delay     time.Duration // 延迟时间(秒)
	ReadyTime time.Time     // 消息准备好执行的时间(now + delay)
}
Msg 消息
type PartitionRedisDelayQueue ¶
type PartitionRedisDelayQueue struct {
// contains filtered or unexported fields
}
func NewPartitionRedisDelayQueue ¶
func NewPartitionRedisDelayQueue(client *redis.Client) *PartitionRedisDelayQueue
type SimpleRedisDelayQueue ¶
type SimpleRedisDelayQueue struct {
// contains filtered or unexported fields
}
func NewSimpleRedisDelayQueue ¶
func NewSimpleRedisDelayQueue(client *redis.Client) *SimpleRedisDelayQueue
Source Files ¶
Click to show internal directories.
Click to hide internal directories.