queue

package
v0.2.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 24, 2023 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const (
	TypeNode = "queues"
)

Variables

View Source
var DefaultMaxQueueLen = 100

DefaultMaxQueueLen 默认最大队列长度 100

View Source
var Nil error = errors.New("Queue Nil")

Functions

func DeregisterProducer

func DeregisterProducer(name string)

DeregisterProducer 清理指定名称的消息生产者适配器

func DeregisterrConsumer

func DeregisterrConsumer(name string)

DeregisterrConsumer 清理指定名称的消息消费者适配器

func NewBuilder

func NewBuilder() container.StandardBuilder

func RegisterConsumer

func RegisterConsumer(resolver mqcResover)

RegisterConsumer 注册消息消费

func RegisterProducer

func RegisterProducer(resolver imqpResover)

RegisterProducer 注册消息生产者适配器

Types

type ConsumeCallback

type ConsumeCallback func(IMQCMessage)

type IComponentQueue

type IComponentQueue interface {
	GetQueue(name string) (q IQueue)
}

IComponentQueue Component Queue

type IMQC

type IMQC interface {
	Connect() error
	Consume(task TaskInfo, callback ConsumeCallback) (err error)
	Unconsume(queue string)
	Start()
	Close()
}

IMQC consumer接口

func NewMQC

func NewMQC(proto string, setting config.Config) (IMQC, error)

NewMQC 根据适配器名称及参数返回消息消费者（IMQC）

type IMQCMessage

type IMQCMessage interface {
	RetryCount() int64
	Ack() error
	Nack(error) error
	Original() string
	GetMessage() Message
}

IMQCMessage 队列消息

type IMQP

type IMQP interface {
	Push(key string, value Message) error
	DelayPush(key string, value Message, delaySeconds int64) error
	Count(key string) (int64, error)
	Close() error
}

IMQP 消息生产

func NewMQP

func NewMQP(proto string, setting config.Config) (IMQP, error)

NewMQP 根据适配器名称及参数返回消息生产者（IMQP）

type IQueue

type IQueue interface {
	Send(ctx context.Context, key string, value interface{}) error
	DelaySend(ctx context.Context, key string, value interface{}, delaySeconds int64) error
	Count(key string) (int64, error)
}

IQueue 消息队列

type Message

type Message interface {
	Header() map[string]string
	Body() map[string]interface{}
	String() string
}

type MsgItem

type MsgItem struct {
	HeaderMap xtypes.SMap `json:"header"`
	BodyMap   xtypes.XMap `json:"body"`
	// contains filtered or unexported fields
}

func (*MsgItem) Body

func (w *MsgItem) Body() map[string]interface{}

func (*MsgItem) Header

func (w *MsgItem) Header() map[string]string

func (*MsgItem) String

func (w *MsgItem) String() string

type StandardQueue

type StandardQueue interface {
	GetQueue(name ...string) (q IQueue)
}

func NewStandardQueue

func NewStandardQueue(c container.Container) StandardQueue

NewStandardQueue 创建queue

type TaskInfo added in v0.2.0

type TaskInfo interface {
	GetQueue() string
	GetConcurrency() int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL