Documentation ¶
Index ¶
- Constants
- Variables
- func ContextWithQueueService(ctx context.Context) context.Context
- func RegisterBackendImplementor(backend Backend)
- func RegisterIdempotentImplementor(idempotent Idempotent)
- func RegisterImplementor(s Queue)
- func StreamServerInterceptor() grpc.StreamServerInterceptor
- func UnaryServerInterceptor() grpc.UnaryServerInterceptor
- type Backend
- type Consumer
- type ConsumerFunc
- type Idempotent
- type Message
- type MessageOperation
- type MessageWrapper
- type PingMessage
- func (m *PingMessage) Begin()
- func (m *PingMessage) Cancel()
- func (m *PingMessage) Content() []byte
- func (m *PingMessage) End()
- func (m *PingMessage) Fail()
- func (m *PingMessage) IsPing() bool
- func (m *PingMessage) NotBefore() time.Time
- func (m *PingMessage) Queue() string
- func (m *PingMessage) Requeue()
- func (m *PingMessage) Retry() int
- func (m *PingMessage) Timestamp() time.Time
- func (m *PingMessage) Topic() string
- func (m *PingMessage) UniqueID() string
- type ProcessStatus
- type PublishOption
- type PublishOptions
- type Queue
- type SubscribeOption
- func WithConsumeComponent(component string) SubscribeOption
- func WithConsumeConcurrency(concurrency int) SubscribeOption
- func WithConsumeIdempotent(impl Idempotent) SubscribeOption
- func WithConsumeProduct(product string) SubscribeOption
- func WithConsumeRetry(retry int) SubscribeOption
- func WithConsumeTopic(topic string) SubscribeOption
- type SubscribeOptions
Constants ¶
const ( DefaultTopic = "default" DefaultConcurrency = 1 DefaultMaxRetry = 3 )
Variables ¶
var EmptyPublishOptions = func() *PublishOptions { return &PublishOptions{ Context: context.Background(), } }
var EmptySubscribeOptions = func() *SubscribeOptions { return &SubscribeOptions{ Topic: DefaultTopic, Concurrency: DefaultConcurrency, MaxRetry: DefaultMaxRetry, Idempotent: IdempotentImplementor(), } }
Functions ¶
func RegisterBackendImplementor ¶
func RegisterBackendImplementor(backend Backend)
RegisterBackendImplementor registers the queue backend service implementor.
func RegisterIdempotentImplementor ¶
func RegisterIdempotentImplementor(idempotent Idempotent)
RegisterIdempotentImplementor registers the idempotent service implementor.
func RegisterImplementor ¶
func RegisterImplementor(s Queue)
RegisterImplementor registers the queue service implementor.
func StreamServerInterceptor ¶
func StreamServerInterceptor() grpc.StreamServerInterceptor
StreamServerInterceptor returns a new streaming server interceptor for message queue service.
func UnaryServerInterceptor ¶
func UnaryServerInterceptor() grpc.UnaryServerInterceptor
UnaryServerInterceptor returns a new unary server interceptor for message queue service.
Types ¶
type Backend ¶
type Backend interface { // Type returns backend type. Type() string // Ping connects the backend server if not connected. // Will be called before every Read/Write operation. Ping() error // MaxDelay returns the max delay duration supported by the backend. // A negative value means no limitation. // A zero value means delay operation is not supported. MaxDelay() time.Duration // GetQueues returns all queue names in backend storage. GetQueues() ([]string, error) // GetTopics returns all queue/topics in backend storage. GetTopics() (map[string][]string, error) // GetQueueLength returns all topic length of specified queue in backend storage. GetQueueLength(queue string) (map[string]int64, error) // GetTopicLength returns the specified queue/topic length in backend storage. GetTopicLength(queue, topic string) (int64, error) // Read subscribes the message of the specified queue and topic. Read(ctx context.Context, queue, topic string, ch chan<- MessageWrapper) error // Write publishes content data to the specified queue. Write(ctx context.Context, queue string, delay time.Duration, content []byte) error }
Backend interface.
func BackendImplementor ¶
func BackendImplementor() Backend
BackendImplementor returns the queue backend service implementor.
type ConsumerFunc ¶
type Idempotent ¶
type Idempotent interface { // BeforeProcess should be invoked before process message. // Returns true to continue the message processing. // Returns false to invoke Cancel for the message. BeforeProcess(Message) bool // AfterProcess should be invoked after processing. AfterProcess(Message, ProcessStatus) }
Idempotent interface.
func IdempotentImplementor ¶
func IdempotentImplementor() Idempotent
IdempotentImplementor returns the idempotent service implementor.
type Message ¶
type Message interface { // Queue name of this message. Queue() string // Topic name of this message. Topic() string // UniqueID returns the unique ID of this message. UniqueID() string // Content returns the message body content. Content() []byte // Timestamp indicates the creation time of the message. Timestamp() time.Time // NotBefore indicates the message should not be processed before this timestamp. NotBefore() time.Time // Retry times. Retry() int // IsPing returns true for a ping message. IsPing() bool }
Message interface.
type MessageOperation ¶
type MessageOperation interface { // Begin to process the message. Begin() // Cancel indicates the message should be ignored. Cancel() // End indicates a successful process. End() // Requeue indicates the message should be retried. Requeue() // Fail indicates a failed process. Fail() }
MessageOperation interface.
type MessageWrapper ¶
type MessageWrapper interface { Message MessageOperation }
MessageWrapper interface.
type PingMessage ¶
type PingMessage struct{}
func (*PingMessage) Cancel ¶
func (m *PingMessage) Cancel()
Cancel indicates the message should be ignored.
func (*PingMessage) Content ¶
func (m *PingMessage) Content() []byte
Content returns the message body content.
func (*PingMessage) IsPing ¶
func (m *PingMessage) IsPing() bool
IsPing returns true for a ping message.
func (*PingMessage) NotBefore ¶
func (m *PingMessage) NotBefore() time.Time
NotBefore indicates the message should not be processed before this timestamp.
func (*PingMessage) Requeue ¶
func (m *PingMessage) Requeue()
Requeue indicates the message should be retried.
func (*PingMessage) Timestamp ¶
func (m *PingMessage) Timestamp() time.Time
Timestamp indicates the creation time of the message.
func (*PingMessage) UniqueID ¶
func (m *PingMessage) UniqueID() string
UniqueID returns the unique ID of this message.
type ProcessStatus ¶
type ProcessStatus int
ProcessStatus type.
const ( Created ProcessStatus = iota Processing Canceled Succeeded Failed Requeued )
type PublishOption ¶
type PublishOption func(*PublishOptions)
func WithPublishContext ¶
func WithPublishContext(ctx context.Context) PublishOption
func WithPublishDelay ¶
func WithPublishDelay(delay time.Duration) PublishOption
type Queue ¶
type Queue interface { // Publish writes a message body to the specified queue. Publish(queue string, content []byte, opts ...PublishOption) error // Subscribe consumes the messages of the specified queue. Subscribe(queue string, handler Consumer, opts ...SubscribeOption) error }
func ContextQueueService ¶
type SubscribeOption ¶
type SubscribeOption func(*SubscribeOptions)
func WithConsumeComponent ¶
func WithConsumeComponent(component string) SubscribeOption
func WithConsumeConcurrency ¶
func WithConsumeConcurrency(concurrency int) SubscribeOption
func WithConsumeIdempotent ¶
func WithConsumeIdempotent(impl Idempotent) SubscribeOption
func WithConsumeProduct ¶
func WithConsumeProduct(product string) SubscribeOption
func WithConsumeRetry ¶
func WithConsumeRetry(retry int) SubscribeOption
func WithConsumeTopic ¶
func WithConsumeTopic(topic string) SubscribeOption