queue

package
v0.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 28, 2023 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

View Source
const (
	QueueKey string = "qk"
)
View Source
const (
	TypeNode = "queues"
)

Variables

View Source
var DefaultMaxQueueLen = 100

默认最大队列长度 100

View Source
var (
	MaxRetrtCount int64 = 3 //最大重试三次
)
View Source
var Nil error = errors.New("Queue Nil")

Functions

func DeregisterProducer

func DeregisterProducer(name string)

Deregister 清理配置适配器

func DeregisterrConsumer

func DeregisterrConsumer(name string)

Deregister 清理配置适配器

func NewBuilder

func NewBuilder() container.StandardBuilder

func RegisterConsumer

func RegisterConsumer(resolver MqcResover)

RegisterConsumer 注册消息消费

func RegisterProducer

func RegisterProducer(resolver MqpResover)

RegisterProducer 注册配置文件适配器

Types

type ConsumeCallback

type ConsumeCallback func(IMQCMessage)

type IComponentQueue

type IComponentQueue interface {
	GetQueue(name string) (q IQueue)
}

IComponentQueue Component Queue

type IMQC

type IMQC interface {
	Connect() error
	Consume(task TaskInfo, callback ConsumeCallback) (err error)
	Unconsume(queue string)
	Start()
	Close()
}

IMQC consumer接口

func NewMQC

func NewMQC(proto, configName string, setting config.Config) (IMQC, error)

NewMQC 根据适配器名称及参数返回配置处理器

type IMQCMessage

type IMQCMessage interface {
	RetryCount() int64
	Ack() error
	Nack(error) error
	Original() string
	GetMessage() Message
}

IMQCMessage 队列消息

type IMQP

type IMQP interface {
	Push(key string, value Message) error
	DelayPush(key string, value Message, delaySeconds int64) error
	Count(key string) (int64, error)
	Close() error
}

IMQP 消息生产

func NewMQP

func NewMQP(proto string, setting config.Config, opts ...Option) (IMQP, error)

NewMQP 根据适配器名称及参数返回配置处理器

type IQueue

type IQueue interface {
	Send(ctx context.Context, key string, value interface{}) error
	DelaySend(ctx context.Context, key string, value interface{}, delaySeconds int64) error
	Count(key string) (int64, error)
}

IQueue 消息队列

type Message

type Message interface {
	Header() map[string]string
	Body() map[string]interface{}
	String() string
}

func NewMsg added in v0.3.0

func NewMsg(obj interface{}, opts ...MsgOption) (msg Message, err error)

type MqcResover added in v0.4.4

type MqcResover interface {
	Name() string
	Resolve(configName string, setting config.Config) (IMQC, error)
}

mqcResover 定义消息消费解析器

type MqpResover added in v0.4.4

type MqpResover interface {
	Name() string
	Resolve(setting config.Config, opts ...Option) (IMQP, error)
}

MqpResover 定义配置文件转换方法

type MsgItem

type MsgItem struct {
	HeaderMap xtypes.SMap `json:"header"`
	BodyMap   xtypes.XMap `json:"body"`
	// contains filtered or unexported fields
}

func (*MsgItem) Body

func (w *MsgItem) Body() map[string]interface{}

func (*MsgItem) Header

func (w *MsgItem) Header() map[string]string

func (*MsgItem) String

func (w *MsgItem) String() string

type MsgOption added in v0.4.3

type MsgOption func(m *MsgWrap)

func WithHeader added in v0.3.0

func WithHeader(key, val string) MsgOption

func WithXRequestID added in v0.3.0

func WithXRequestID(reqId string) MsgOption

type MsgWrap added in v0.3.0

type MsgWrap struct {
	HeaderMap xtypes.SMap `json:"header,omitempty"`
	BodyMap   xtypes.XMap `json:"body"`
	// contains filtered or unexported fields
}

func (*MsgWrap) Body added in v0.3.0

func (w *MsgWrap) Body() map[string]interface{}

func (*MsgWrap) Header added in v0.3.0

func (w *MsgWrap) Header() map[string]string

func (*MsgWrap) String added in v0.3.0

func (w *MsgWrap) String() string

type Option added in v0.3.0

type Option func(*Options)

Option 配置选项

func WithDBIndex added in v0.4.3

func WithDBIndex(idx int) Option

func WithOption added in v0.4.3

func WithOption(key string, val any) Option

func WithPoolSize added in v0.4.3

func WithPoolSize(size int) Option

type Options added in v0.4.3

type Options struct {
	CfgData map[string]any
}

type StandardQueue

type StandardQueue interface {
	GetQueue(name string, opts ...Option) (q IQueue)
}

func NewStandardQueue

func NewStandardQueue(c container.Container) StandardQueue

NewStandardQueue 创建queue

type TaskInfo added in v0.2.0

type TaskInfo interface {
	GetQueue() string
	GetConcurrency() int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL