Documentation ¶
Index ¶
- Constants
- Variables
- func ContextWithQueueService(ctx context.Context) context.Context
- func RegisterBackendImplementor(backend Backend)
- func RegisterIdempotentImplementor(idempotent Idempotent)
- func RegisterImplementor(s Queue)
- func StreamServerInterceptor() grpc.StreamServerInterceptor
- func UnaryServerInterceptor() grpc.UnaryServerInterceptor
- type Backend
- type ConsumeOffset
- type Consumer
- type ConsumerFunc
- type Idempotent
- type Message
- type MessageOperation
- type MessageWrapper
- type PingMessage
- func (m *PingMessage) Begin()
- func (m *PingMessage) Cancel()
- func (m *PingMessage) Content() []byte
- func (m *PingMessage) End()
- func (m *PingMessage) Fail()
- func (m *PingMessage) Group() string
- func (m *PingMessage) IsPing() bool
- func (m *PingMessage) Key() string
- func (m *PingMessage) NotBefore() time.Time
- func (m *PingMessage) Properties() map[string]string
- func (m *PingMessage) Requeue()
- func (m *PingMessage) Retry() int
- func (m *PingMessage) Timestamp() time.Time
- func (m *PingMessage) Topic() string
- type ProcessStatus
- type PublishOption
- type PublishOptions
- type Queue
- type SubscribeOption
- func WithConsumeComponent(component string) SubscribeOption
- func WithConsumeConcurrency(concurrency int) SubscribeOption
- func WithConsumeContext(ctx context.Context) SubscribeOption
- func WithConsumeGroup(name string) SubscribeOption
- func WithConsumeIdempotent(impl Idempotent) SubscribeOption
- func WithConsumeRetry(retry int) SubscribeOption
- func WithInitOffset(offset ConsumeOffset) SubscribeOption
- type SubscribeOptions
Constants ¶
const ( DefaultGroup = "default" DefaultConcurrency = 1 DefaultMaxRetry = 3 )
Variables ¶
var EmptyPublishOptions = func() *PublishOptions { key, _ := snowflake.NextID() return &PublishOptions{ Context: context.Background(), Sequence: key, Key: strconv.FormatUint(key, 10), Properties: map[string]string{}, } }
var EmptySubscribeOptions = func() *SubscribeOptions { return &SubscribeOptions{ Context: context.Background(), Group: DefaultGroup, Concurrency: DefaultConcurrency, MaxRetry: DefaultMaxRetry, Idempotent: IdempotentImplementor(), } }
Functions ¶
func RegisterBackendImplementor ¶
func RegisterBackendImplementor(backend Backend)
RegisterBackendImplementor registers the queue backend service implementor.
func RegisterIdempotentImplementor ¶
func RegisterIdempotentImplementor(idempotent Idempotent)
RegisterIdempotentImplementor registers the idempotent service implementor.
func RegisterImplementor ¶
func RegisterImplementor(s Queue)
RegisterImplementor registers the queue service implementor.
func StreamServerInterceptor ¶
func StreamServerInterceptor() grpc.StreamServerInterceptor
StreamServerInterceptor returns a new streaming server interceptor for message queue service.
func UnaryServerInterceptor ¶
func UnaryServerInterceptor() grpc.UnaryServerInterceptor
UnaryServerInterceptor returns a new unary server interceptor for message queue service.
Types ¶
type Backend ¶
type Backend interface { // Type returns backend type. Type() string // Ping connects the backend server if not connected. // Will be called before every Read/Write operation. Ping() error // Read subscribes the message of the specified topic. Read(topic string, ch chan<- MessageWrapper, opts *SubscribeOptions) error // Write publishes content data to the specified queue. Write(topic string, content []byte, opts *PublishOptions) error }
Backend interface.
func BackendImplementor ¶
func BackendImplementor() Backend
BackendImplementor returns the queue backend service implementor.
type ConsumeOffset ¶
type ConsumeOffset int
const ( ConsumeFromLatest ConsumeOffset = iota ConsumeFromEarliest )
type ConsumerFunc ¶
type Idempotent ¶
type Idempotent interface { // BeforeProcess should be invoked before process message. // Returns true to continue the message processing. // Returns false to invoke Cancel for the message. BeforeProcess(Message) bool // AfterProcess should be invoked after processing. AfterProcess(Message, ProcessStatus) }
Idempotent interface.
func IdempotentImplementor ¶
func IdempotentImplementor() Idempotent
IdempotentImplementor returns the idempotent service implementor.
type Message ¶
type Message interface { // Topic name of this message. Topic() string // Group name of this message. Group() string // Key returns the unique key ID of this message. Key() string // Content returns the message body content. Content() []byte // Properties returns the properties of this message. Properties() map[string]string // Timestamp indicates the creation time of the message. Timestamp() time.Time // NotBefore indicates the message should not be processed before this timestamp. NotBefore() time.Time // Retry times. Retry() int // IsPing returns true for a ping message. IsPing() bool }
Message interface.
type MessageOperation ¶
type MessageOperation interface { // Begin to process the message. Begin() // Cancel indicates the message should be ignored. Cancel() // End indicates a successful process. End() // Requeue indicates the message should be retried. Requeue() // Fail indicates a failed process. Fail() }
MessageOperation interface.
type MessageWrapper ¶
type MessageWrapper interface { Message MessageOperation }
MessageWrapper interface.
type PingMessage ¶
type PingMessage struct{}
func (*PingMessage) Cancel ¶
func (m *PingMessage) Cancel()
Cancel indicates the message should be ignored.
func (*PingMessage) Content ¶
func (m *PingMessage) Content() []byte
Content returns the message body content.
func (*PingMessage) IsPing ¶
func (m *PingMessage) IsPing() bool
IsPing returns true for a ping message.
func (*PingMessage) Key ¶
func (m *PingMessage) Key() string
Key returns the unique key ID of this message.
func (*PingMessage) NotBefore ¶
func (m *PingMessage) NotBefore() time.Time
NotBefore indicates the message should not be processed before this timestamp.
func (*PingMessage) Properties ¶
func (m *PingMessage) Properties() map[string]string
Properties returns the properties of this message.
func (*PingMessage) Requeue ¶
func (m *PingMessage) Requeue()
Requeue indicates the message should be retried.
func (*PingMessage) Timestamp ¶
func (m *PingMessage) Timestamp() time.Time
Timestamp indicates the creation time of the message.
type ProcessStatus ¶
type ProcessStatus int
ProcessStatus type.
const ( Created ProcessStatus = iota Processing Canceled Succeeded Failed Requeued )
type PublishOption ¶
type PublishOption func(*PublishOptions)
func WithProperty ¶
func WithProperty(key, value string) PublishOption
func WithPublishContext ¶
func WithPublishContext(ctx context.Context) PublishOption
func WithPublishDelay ¶
func WithPublishDelay(delay time.Duration) PublishOption
func WithPublishSequence ¶
func WithPublishSequence(seq uint64) PublishOption
func WithPublishUniqueKey ¶
func WithPublishUniqueKey(key string) PublishOption
type PublishOptions ¶
type Queue ¶
type Queue interface { // Publish writes a message body to the specified topic. Publish(topic string, content []byte, opts ...PublishOption) error // Subscribe consumes the messages of the specified topic. Subscribe(topic string, handler Consumer, opts ...SubscribeOption) error }
Queue interface.
func ContextQueueService ¶
type SubscribeOption ¶
type SubscribeOption func(*SubscribeOptions)
func WithConsumeComponent ¶
func WithConsumeComponent(component string) SubscribeOption
func WithConsumeConcurrency ¶
func WithConsumeConcurrency(concurrency int) SubscribeOption
func WithConsumeContext ¶
func WithConsumeContext(ctx context.Context) SubscribeOption
func WithConsumeGroup ¶
func WithConsumeGroup(name string) SubscribeOption
func WithConsumeIdempotent ¶
func WithConsumeIdempotent(impl Idempotent) SubscribeOption
func WithConsumeRetry ¶
func WithConsumeRetry(retry int) SubscribeOption
func WithInitOffset ¶
func WithInitOffset(offset ConsumeOffset) SubscribeOption
type SubscribeOptions ¶
type SubscribeOptions struct { context.Context Component string Group string Concurrency int MaxRetry int InitOffset ConsumeOffset Idempotent Idempotent }