streamredis

package
v0.5.24 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2024 License: MIT Imports: 16 Imported by: 2

Documentation

Index

Constants

View Source
const (
	DELAY_QUEUE_NAME = "glue:delayqueue:stream"
)
View Source
const (
	Proto = "streamredis"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	EnableDeadLetter bool //开启死信队列
	DeadLetterQueue  string
	// contains filtered or unexported fields
}

Consumer 基于 Redis Stream 的消息消费者

func NewConsumer

func NewConsumer(configName string, config config.Config) (consumer *Consumer, err error)

NewConsumer 创建新的Consumer

func (*Consumer) Close

func (consumer *Consumer) Close() error

Close 关闭当前连接

func (*Consumer) Connect

func (consumer *Consumer) Connect() (err error)

Connect 连接服务器

func (*Consumer) Consume

func (consumer *Consumer) Consume(task queue.TaskInfo, callback queue.ConsumeCallback) (err error)

Consume 注册消费信息

func (*Consumer) Start

func (consumer *Consumer) Start() error

func (*Consumer) Unconsume

func (consumer *Consumer) Unconsume(queue string)

Unconsume 取消注册消费

type ConsumerOptions

type ConsumerOptions struct {
	DeadLetterQueue   string `json:"deadletter_queue"  yaml:"deadletter_queue"`    //死信队列名称
	Concurrency       int    `json:"concurrency" yaml:"concurrency"`               //并发的消费者数量
	BufferSize        int    `json:"buffer_size" yaml:"buffer_size"`               //队列长度
	BlockingTimeout   int    `json:"blocking_timeout" yaml:"blocking_timeout"`     //获取消息阻塞时间秒
	VisibilityTimeout int    `json:"visibility_timeout" yaml:"visibility_timeout"` //消息保留时间 秒
	ReclaimInterval   int    `json:"reclaim_interval" yaml:"reclaim_interval"`     //失败消息重试周期
	GroupName         string `json:"group_name" yaml:"group_name"`
}

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer 基于 Redis Stream 的消息生产者

func NewProducer

func NewProducer(config config.Config, opts ...queue.Option) (m *Producer, err error)

NewProducer 根据配置文件创建一个redis连接

func (*Producer) BatchPush added in v0.4.6

func (p *Producer) BatchPush(key string, msgList ...queue.Message) error

func (*Producer) Close

func (c *Producer) Close() error

Close 释放资源

func (*Producer) Count

func (c *Producer) Count(key string) (int64, error)

Count 获取列表中的元素个数

func (*Producer) DelayPush added in v0.1.19

func (c *Producer) DelayPush(key string, msg queue.Message, delaySeconds int64) error

DelayPush 延迟 delaySeconds 秒后向 key 对应的队列推送消息

func (*Producer) Push

func (c *Producer) Push(key string, msg queue.Message) error

Push 向存于 key 的列表的尾部插入所有指定的值

type ProductOptions

type ProductOptions struct {
	StreamMaxLength      int64 `json:"stream_max_len" yaml:"stream_max_len"`           //消息队列长度
	ApproximateMaxLength bool  `json:"approximate_max_len" yaml:"approximate_max_len"` //使用近似队列长度
	DelayInterval        int   `json:"delay_interval" yaml:"delay_interval"`
}

type QueueItem

type QueueItem struct {
	QueueName         string
	Concurrency       int
	BufferSize        int
	VisibilityTimeout time.Duration
	// contains filtered or unexported fields
}

func (QueueItem) GetBufferSize added in v0.5.6

func (s QueueItem) GetBufferSize() int

func (QueueItem) GetConcurrency added in v0.2.0

func (q QueueItem) GetConcurrency() int

func (QueueItem) GetQueue added in v0.2.0

func (q QueueItem) GetQueue() string

func (QueueItem) GetVisibilityTimeout added in v0.5.6

func (s QueueItem) GetVisibilityTimeout() time.Duration

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL