queue

package
v1.1.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 18, 2023 License: MIT Imports: 3 Imported by: 2

Documentation

Index

Constants

View Source
// Default values applied by EmptySubscribeOptions.
const (
	// DefaultTopic is the topic name used when a subscriber does not pick one.
	DefaultTopic       = "default"
	// DefaultConcurrency is the default value of SubscribeOptions.Concurrency.
	DefaultConcurrency = 1
	// DefaultMaxRetry is the default value of SubscribeOptions.MaxRetry.
	DefaultMaxRetry    = 3
)

Variables

View Source
// EmptyPublishOptions builds a fresh PublishOptions whose embedded Context is
// context.Background(); every other field keeps its zero value.
var EmptyPublishOptions = func() *PublishOptions {
	opts := new(PublishOptions)
	opts.Context = context.Background()
	return opts
}
View Source
// EmptySubscribeOptions builds a fresh SubscribeOptions populated with the
// package defaults: DefaultTopic, DefaultConcurrency, DefaultMaxRetry and the
// currently registered idempotent implementor. Component and Product are left
// at their zero values.
var EmptySubscribeOptions = func() *SubscribeOptions {
	opts := new(SubscribeOptions)
	opts.Topic = DefaultTopic
	opts.Concurrency = DefaultConcurrency
	opts.MaxRetry = DefaultMaxRetry
	opts.Idempotent = IdempotentImplementor()
	return opts
}

Functions

func ContextWithQueueService

func ContextWithQueueService(ctx context.Context) context.Context

func RegisterBackendImplementor

func RegisterBackendImplementor(backend Backend)

RegisterBackendImplementor registers the queue backend service implementor.

func RegisterIdempotentImplementor

func RegisterIdempotentImplementor(idempotent Idempotent)

RegisterIdempotentImplementor registers the idempotent service implementor.

func RegisterImplementor

func RegisterImplementor(s Queue)

RegisterImplementor registers the queue service implementor.

func StreamServerInterceptor

func StreamServerInterceptor() grpc.StreamServerInterceptor

StreamServerInterceptor returns a new streaming server interceptor for message queue service.

func UnaryServerInterceptor

func UnaryServerInterceptor() grpc.UnaryServerInterceptor

UnaryServerInterceptor returns a new unary server interceptor for message queue service.

Types

type Backend

// Backend is the storage/transport contract behind the queue service.
// An implementation is installed with RegisterBackendImplementor and
// retrieved with BackendImplementor.
type Backend interface {
	// Type returns the backend type.
	Type() string
	// Ping connects to the backend server if not already connected.
	// It will be called before every Read/Write operation.
	Ping() error
	// MaxDelay returns the max delay duration supported by the backend.
	// A negative value means no limitation.
	// A zero value means delay operation is not supported.
	MaxDelay() time.Duration
	// GetQueues returns all queue names in backend storage.
	GetQueues() ([]string, error)
	// GetTopics returns all queues and their topics in backend storage.
	// NOTE(review): the map is presumably keyed by queue name with that
	// queue's topic names as the value — confirm against an implementation.
	GetTopics() (map[string][]string, error)
	// GetQueueLength returns the length of every topic of the specified
	// queue in backend storage.
	// NOTE(review): the map is presumably keyed by topic name — confirm.
	GetQueueLength(queue string) (map[string]int64, error)
	// GetTopicLength returns the specified queue/topic length in backend storage.
	GetTopicLength(queue, topic string) (int64, error)

	// Read subscribes to the messages of the specified queue and topic.
	// NOTE(review): messages are presumably delivered on ch, and ctx
	// presumably bounds the subscription — confirm against an implementation.
	Read(ctx context.Context, queue, topic string, ch chan<- MessageWrapper) error
	// Write publishes content data to the specified queue.
	// NOTE(review): delay presumably postpones delivery and is expected to
	// respect MaxDelay — confirm.
	Write(ctx context.Context, queue string, delay time.Duration, content []byte) error
}

Backend interface.

func BackendImplementor

func BackendImplementor() Backend

BackendImplementor returns the queue backend service implementor.

type Consumer

// Consumer is the message handler type accepted by Queue.Subscribe.
type Consumer interface {
	// Handle receives a context and a Message and reports the outcome as an
	// error.
	// NOTE(review): how a non-nil error is treated (retry vs. fail) is not
	// visible here — confirm against the Queue implementation.
	Handle(context.Context, Message) error
}

Consumer handler interface.

type ConsumerFunc

// ConsumerFunc is a function type with the same signature as
// Consumer.Handle. Its Handle method lets a plain function be used wherever
// a Consumer is expected (presumably by calling the function itself).
type ConsumerFunc func(context.Context, Message) error

func (ConsumerFunc) Handle

func (fn ConsumerFunc) Handle(ctx context.Context, m Message) error

type Idempotent

// Idempotent is the hook pair called around message processing. An
// implementation is installed with RegisterIdempotentImplementor or supplied
// per subscription through WithConsumeIdempotent.
type Idempotent interface {
	// BeforeProcess should be invoked before processing a message.
	// Returns true to continue the message processing.
	// Returns false to invoke Cancel for the message.
	BeforeProcess(Message) bool
	// AfterProcess should be invoked after processing. It receives the
	// message and a ProcessStatus (presumably the outcome of processing).
	AfterProcess(Message, ProcessStatus)
}

Idempotent interface.

func IdempotentImplementor

func IdempotentImplementor() Idempotent

IdempotentImplementor returns the idempotent service implementor.

type Message

// Message is the read-only view of a queued message that is handed to
// Consumer.Handle and to the Idempotent hooks.
type Message interface {
	// Queue returns the queue name of this message.
	Queue() string
	// Topic returns the topic name of this message.
	Topic() string

	// UniqueID returns the unique ID of this message.
	UniqueID() string
	// Content returns the message body content.
	Content() []byte
	// Timestamp indicates the creation time of the message.
	Timestamp() time.Time
	// NotBefore indicates the message should not be processed before this timestamp.
	NotBefore() time.Time
	// Retry returns the retry times of this message.
	// NOTE(review): presumably the number of retries so far — confirm.
	Retry() int
	// IsPing returns true for a ping message.
	IsPing() bool
}

Message interface.

type MessageOperation

// MessageOperation groups the processing-state callbacks of a message.
// NOTE(review): the method names line up with the ProcessStatus values
// (Processing, Canceled, Succeeded, Requeued, Failed); presumably each call
// records the matching status — confirm against an implementation.
type MessageOperation interface {
	// Begin marks the start of processing the message.
	Begin()
	// Cancel indicates the message should be ignored.
	Cancel()
	// End indicates a successful process.
	End()
	// Requeue indicates the message should be retried.
	Requeue()
	// Fail indicates a failed process.
	Fail()
}

MessageOperation interface.

type MessageWrapper

// MessageWrapper combines the Message accessors with the MessageOperation
// callbacks. It is the element type of the channel passed to Backend.Read.
type MessageWrapper interface {
	// Message provides the read-only message accessors.
	Message
	// MessageOperation provides the processing-state callbacks.
	MessageOperation
}

MessageWrapper interface.

type PingMessage

// PingMessage is a field-less message type. The methods declared on
// *PingMessage cover every method of Message and MessageOperation, so a
// *PingMessage can be used as a MessageWrapper.
type PingMessage struct{}

func (*PingMessage) Begin

func (m *PingMessage) Begin()

Begin to process the message.

func (*PingMessage) Cancel

func (m *PingMessage) Cancel()

Cancel indicates the message should be ignored.

func (*PingMessage) Content

func (m *PingMessage) Content() []byte

Content returns the message body content.

func (*PingMessage) End

func (m *PingMessage) End()

End indicates a successful process.

func (*PingMessage) Fail

func (m *PingMessage) Fail()

Fail indicates a failed process.

func (*PingMessage) IsPing

func (m *PingMessage) IsPing() bool

IsPing returns true for a ping message.

func (*PingMessage) NotBefore

func (m *PingMessage) NotBefore() time.Time

NotBefore indicates the message should not be processed before this timestamp.

func (*PingMessage) Queue

func (m *PingMessage) Queue() string

Queue name of this message.

func (*PingMessage) Requeue

func (m *PingMessage) Requeue()

Requeue indicates the message should be retried.

func (*PingMessage) Retry

func (m *PingMessage) Retry() int

Retry times.

func (*PingMessage) Timestamp

func (m *PingMessage) Timestamp() time.Time

Timestamp indicates the creation time of the message.

func (*PingMessage) Topic

func (m *PingMessage) Topic() string

Topic name of this message.

func (*PingMessage) UniqueID

func (m *PingMessage) UniqueID() string

UniqueID returns the unique ID of this message.

type ProcessStatus

// ProcessStatus is an integer enumeration of message processing states; it
// is the second argument of Idempotent.AfterProcess.
type ProcessStatus int

ProcessStatus type.

// ProcessStatus values, numbered from 0 via iota in declaration order.
// NOTE(review): the per-state meanings below are read off the names — confirm
// against the code that sets them.
const (
	// Created is the zero value (0) of ProcessStatus.
	Created ProcessStatus = iota
	// Processing (1): presumably the message is currently being handled.
	Processing
	// Canceled (2): presumably the message was ignored.
	Canceled
	// Succeeded (3): presumably processing finished successfully.
	Succeeded
	// Failed (4): presumably processing failed.
	Failed
	// Requeued (5): presumably the message was put back for retry.
	Requeued
)

type PublishOption

// PublishOption is a functional option that mutates a PublishOptions value.
// Options are passed variadically to Queue.Publish; WithPublishContext and
// WithPublishDelay construct them.
type PublishOption func(*PublishOptions)

func WithPublishContext

func WithPublishContext(ctx context.Context) PublishOption

func WithPublishDelay

func WithPublishDelay(delay time.Duration) PublishOption

type PublishOptions

// PublishOptions holds the settings that PublishOption functions modify.
type PublishOptions struct {
	// Context is embedded, so its methods are promoted onto PublishOptions.
	// EmptyPublishOptions initializes it to context.Background().
	context.Context
	// Delay is a duration attached to the publish call.
	// NOTE(review): presumably set by WithPublishDelay and forwarded to
	// Backend.Write as its delay argument — confirm.
	Delay time.Duration
}

type Queue

// Queue is the message queue service interface. An implementation is
// installed with RegisterImplementor and retrieved with Implementor or
// ContextQueueService.
type Queue interface {
	// Publish writes a message body to the specified queue.
	// Optional PublishOption values adjust the publish call.
	Publish(queue string, content []byte, opts ...PublishOption) error
	// Subscribe consumes the messages of the specified queue, passing them to
	// handler. Optional SubscribeOption values adjust the subscription.
	Subscribe(queue string, handler Consumer, opts ...SubscribeOption) error
}

func ContextQueueService

func ContextQueueService(ctx context.Context) Queue

func Implementor

func Implementor() Queue

Implementor returns the queue service implementor.

type SubscribeOption

// SubscribeOption is a functional option that mutates a SubscribeOptions
// value. Options are passed variadically to Queue.Subscribe; the WithConsume*
// functions construct them.
type SubscribeOption func(*SubscribeOptions)

func WithConsumeComponent

func WithConsumeComponent(component string) SubscribeOption

func WithConsumeConcurrency

func WithConsumeConcurrency(concurrency int) SubscribeOption

func WithConsumeIdempotent

func WithConsumeIdempotent(impl Idempotent) SubscribeOption

func WithConsumeProduct

func WithConsumeProduct(product string) SubscribeOption

func WithConsumeRetry

func WithConsumeRetry(retry int) SubscribeOption

func WithConsumeTopic

func WithConsumeTopic(topic string) SubscribeOption

type SubscribeOptions

// SubscribeOptions holds the settings that SubscribeOption functions modify.
// EmptySubscribeOptions returns a value with the package defaults filled in.
type SubscribeOptions struct {
	// Component is a free-form string; EmptySubscribeOptions leaves it empty.
	// NOTE(review): presumably set by WithConsumeComponent — confirm.
	Component   string
	// Product is a free-form string; EmptySubscribeOptions leaves it empty.
	// NOTE(review): presumably set by WithConsumeProduct — confirm.
	Product     string
	// Topic defaults to DefaultTopic in EmptySubscribeOptions.
	Topic       string
	// Concurrency defaults to DefaultConcurrency in EmptySubscribeOptions.
	Concurrency int
	// MaxRetry defaults to DefaultMaxRetry in EmptySubscribeOptions.
	MaxRetry    int
	// Idempotent defaults to IdempotentImplementor() in EmptySubscribeOptions.
	Idempotent  Idempotent
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL