streamredis

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 3, 2023 License: MIT Imports: 18 Imported by: 2

Documentation

Index

Constants

View Source
const (
	DELAY_QUEUE_NAME = "glue:delayqueue:stream"
)
View Source
const (
	Proto = "streamredis"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer Consumer

func NewConsumer

func NewConsumer(config config.Config) (consumer *Consumer, err error)

NewConsumerByConfig 创建新的Consumer

func (*Consumer) Close

func (consumer *Consumer) Close()

Close 关闭当前连接

func (*Consumer) Connect

func (consumer *Consumer) Connect() (err error)

Connect 连接服务器

func (*Consumer) Consume

func (consumer *Consumer) Consume(task queue.TaskInfo, callback queue.ConsumeCallback) (err error)

Consume 注册消费信息

func (*Consumer) Start

func (consumer *Consumer) Start()

func (*Consumer) Unconsume

func (consumer *Consumer) Unconsume(queue string)

UnConsume 取消注册消费

type ConsumerOptions

type ConsumerOptions struct {
	Concurrency       int    `json:"concurrency" yaml:"concurrency"`               //并发的消费者数量
	BufferSize        int    `json:"buffer_size" yaml:"buffer_size"`               //队列长度
	BlockingTimeout   int    `json:"blocking_timeout" yaml:"blocking_timeout"`     //获取消息阻塞时间秒
	VisibilityTimeout int    `json:"visibility_timeout" yaml:"visibility_timeout"` //消息保留时间 秒
	ReclaimInterval   int    `json:"reclaim_interval" yaml:"reclaim_interval"`     //失败消息重试周期
	GroupName         string `json:"group_name" yaml:"group_name"`
}

type MsgBody

type MsgBody struct {
	HeaderMap xtypes.SMap `json:"header"`
	BodyMap   xtypes.XMap `json:"body"`
	// contains filtered or unexported fields
}

func (*MsgBody) Body

func (m *MsgBody) Body() map[string]interface{}

func (*MsgBody) Header

func (m *MsgBody) Header() map[string]string

func (*MsgBody) String

func (m *MsgBody) String() string

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer memcache配置文件

func NewProducer

func NewProducer(config config.Config) (m *Producer, err error)

NewProducerByConfig 根据配置文件创建一个redis连接

func (*Producer) Close

func (c *Producer) Close() error

Close 释放资源

func (*Producer) Count

func (c *Producer) Count(key string) (int64, error)

Count 获取列表中的元素个数

func (*Producer) DelayPush added in v0.1.19

func (c *Producer) DelayPush(key string, msg queue.Message, delaySeconds int64) error

Push 向存于 key 的列表的尾部插入所有指定的值

func (*Producer) Push

func (c *Producer) Push(key string, msg queue.Message) error

Push 向存于 key 的列表的尾部插入所有指定的值

type ProductOptions

type ProductOptions struct {
	DelayQueueName       string `json:"delay_queue_name" yaml:"delay_queue_name"`
	StreamMaxLength      int64  `json:"stream_max_len" yaml:"stream_max_len"`           //消息队列长度
	ApproximateMaxLength bool   `json:"approximate_max_len" yaml:"approximate_max_len"` //使用近似队列长度
	RangeSeconds         int    `json:"range_seconds" yaml:"range_seconds"`
	DelayInterval        int    `json:"delay_interval" yaml:"delay_interval"`
}

type QueueItem

type QueueItem struct {
	QueueName   string
	Concurrency int //等于0 ,代表不限制
	// contains filtered or unexported fields
}

func (QueueItem) GetConcurrency added in v0.2.0

func (q QueueItem) GetConcurrency() int

func (QueueItem) GetQueue added in v0.2.0

func (q QueueItem) GetQueue() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL