queue

package
v2.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 28, 2022 License: MIT Imports: 5 Imported by: 3

Documentation

Index

Constants

View Source
const (
	DefaultGroup       = "default"
	DefaultConcurrency = 1
	DefaultMaxRetry    = 3
)

Variables

View Source
var EmptyPublishOptions = func() *PublishOptions {
	key, _ := snowflake.NextID()
	return &PublishOptions{
		Context:    context.Background(),
		Sequence:   key,
		Key:        strconv.FormatUint(key, 10),
		Properties: map[string]string{},
	}
}
View Source
var EmptySubscribeOptions = func() *SubscribeOptions {
	return &SubscribeOptions{
		Context:     context.Background(),
		Group:       DefaultGroup,
		Concurrency: DefaultConcurrency,
		MaxRetry:    DefaultMaxRetry,
		Idempotent:  IdempotentImplementor(),
	}
}

Functions

func ContextWithQueueService

func ContextWithQueueService(ctx context.Context) context.Context

func RegisterBackendImplementor

func RegisterBackendImplementor(backend Backend)

RegisterBackendImplementor registers the queue backend service implementor.

func RegisterIdempotentImplementor

func RegisterIdempotentImplementor(idempotent Idempotent)

RegisterIdempotentImplementor registers the idempotent service implementor.

func RegisterImplementor

func RegisterImplementor(s Queue)

RegisterImplementor registers the queue service implementor.

func StreamServerInterceptor

func StreamServerInterceptor() grpc.StreamServerInterceptor

StreamServerInterceptor returns a new streaming server interceptor for message queue service.

func UnaryServerInterceptor

func UnaryServerInterceptor() grpc.UnaryServerInterceptor

UnaryServerInterceptor returns a new unary server interceptor for message queue service.

Types

type Backend

type Backend interface {
	// Type returns backend type.
	Type() string
	// Ping connects the backend server if not connected.
	// Will be called before every Read/Write operation.
	Ping() error

	// Read subscribes the message of the specified topic.
	Read(topic string, ch chan<- MessageWrapper, opts *SubscribeOptions) error
	// Write publishes content data to the specified queue.
	Write(topic string, content []byte, opts *PublishOptions) error
}

Backend interface.

func BackendImplementor

func BackendImplementor() Backend

BackendImplementor returns the queue backend service implementor.

type ConsumeOffset

type ConsumeOffset int
const (
	ConsumeFromLatest ConsumeOffset = iota
	ConsumeFromEarliest
)

type Consumer

type Consumer interface {
	Handle(context.Context, Message) error
}

Consumer handler interface.

type ConsumerFunc

type ConsumerFunc func(context.Context, Message) error

func (ConsumerFunc) Handle

func (fn ConsumerFunc) Handle(ctx context.Context, m Message) error

type Idempotent

type Idempotent interface {
	// BeforeProcess should be invoked before process message.
	// Returns true to continue the message processing.
	// Returns false to invoke Cancel for the message.
	BeforeProcess(Message) bool
	// AfterProcess should be invoked after processing.
	AfterProcess(Message, ProcessStatus)
}

Idempotent interface.

func IdempotentImplementor

func IdempotentImplementor() Idempotent

IdempotentImplementor returns the idempotent service implementor.

type Message

type Message interface {
	// Topic name of this message.
	Topic() string
	// Group name of this message.
	Group() string

	// Key returns the unique key ID of this message.
	Key() string
	// Content returns the message body content.
	Content() []byte
	// Properties returns the properties of this message.
	Properties() map[string]string

	// Timestamp indicates the creation time of the message.
	Timestamp() time.Time
	// NotBefore indicates the message should not be processed before this timestamp.
	NotBefore() time.Time

	// Retry times.
	Retry() int
	// IsPing returns true for a ping message.
	IsPing() bool
}

Message interface.

type MessageOperation

type MessageOperation interface {
	// Begin to process the message.
	Begin()
	// Cancel indicates the message should be ignored.
	Cancel()
	// End indicates a successful process.
	End()
	// Requeue indicates the message should be retried.
	Requeue()
	// Fail indicates a failed process.
	Fail()
}

MessageOperation interface.

type MessageWrapper

type MessageWrapper interface {
	Message
	MessageOperation
}

MessageWrapper interface.

type PingMessage

type PingMessage struct{}

func (*PingMessage) Begin

func (m *PingMessage) Begin()

Begin to process the message.

func (*PingMessage) Cancel

func (m *PingMessage) Cancel()

Cancel indicates the message should be ignored.

func (*PingMessage) Content

func (m *PingMessage) Content() []byte

Content returns the message body content.

func (*PingMessage) End

func (m *PingMessage) End()

End indicates a successful process.

func (*PingMessage) Fail

func (m *PingMessage) Fail()

Fail indicates a failed process.

func (*PingMessage) Group

func (m *PingMessage) Group() string

Group name of this message.

func (*PingMessage) IsPing

func (m *PingMessage) IsPing() bool

IsPing returns true for a ping message.

func (*PingMessage) Key

func (m *PingMessage) Key() string

Key returns the unique key ID of this message.

func (*PingMessage) NotBefore

func (m *PingMessage) NotBefore() time.Time

NotBefore indicates the message should not be processed before this timestamp.

func (*PingMessage) Properties

func (m *PingMessage) Properties() map[string]string

Properties returns the properties of this message.

func (*PingMessage) Requeue

func (m *PingMessage) Requeue()

Requeue indicates the message should be retried.

func (*PingMessage) Retry

func (m *PingMessage) Retry() int

Retry times.

func (*PingMessage) Timestamp

func (m *PingMessage) Timestamp() time.Time

Timestamp indicates the creation time of the message.

func (*PingMessage) Topic

func (m *PingMessage) Topic() string

Topic name of this message.

type ProcessStatus

type ProcessStatus int

ProcessStatus type.

const (
	Created ProcessStatus = iota
	Processing
	Canceled
	Succeeded
	Failed
	Requeued
)

type PublishOption

type PublishOption func(*PublishOptions)

func WithProperty

func WithProperty(key, value string) PublishOption

func WithPublishContext

func WithPublishContext(ctx context.Context) PublishOption

func WithPublishDelay

func WithPublishDelay(delay time.Duration) PublishOption

func WithPublishSequence

func WithPublishSequence(seq uint64) PublishOption

func WithPublishUniqueKey

func WithPublishUniqueKey(key string) PublishOption

type PublishOptions

type PublishOptions struct {
	context.Context
	Sequence   uint64
	Key        string
	Delay      time.Duration
	Properties map[string]string
}

type Queue

type Queue interface {
	// Publish writes a message body to the specified topic.
	Publish(topic string, content []byte, opts ...PublishOption) error
	// Subscribe consumes the messages of the specified topic.
	Subscribe(topic string, handler Consumer, opts ...SubscribeOption) error
}

Queue interface.

func ContextQueueService

func ContextQueueService(ctx context.Context) Queue

func Implementor

func Implementor() Queue

Implementor returns the queue service implementor.

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

func WithConsumeComponent

func WithConsumeComponent(component string) SubscribeOption

func WithConsumeConcurrency

func WithConsumeConcurrency(concurrency int) SubscribeOption

func WithConsumeContext

func WithConsumeContext(ctx context.Context) SubscribeOption

func WithConsumeGroup

func WithConsumeGroup(name string) SubscribeOption

func WithConsumeIdempotent

func WithConsumeIdempotent(impl Idempotent) SubscribeOption

func WithConsumeRetry

func WithConsumeRetry(retry int) SubscribeOption

func WithInitOffset

func WithInitOffset(offset ConsumeOffset) SubscribeOption

type SubscribeOptions

type SubscribeOptions struct {
	context.Context
	Component   string
	Group       string
	Concurrency int
	MaxRetry    int
	InitOffset  ConsumeOffset
	Idempotent  Idempotent
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL