Documentation ¶
Index ¶
- func GetOffsetFromEvent(event *ce.Event) int64
- type Broker
- func (b *Broker) CreateNewQueueIfAbsent(topicName string) (err error)
- func (b *Broker) ExistTopic(topicName string) (exist bool)
- func (b *Broker) PutMessage(topicName string, event *ce.Event) (message *Message, err error)
- func (b *Broker) TakeMessage(topicName string) (*Message, error)
- func (b *Broker) TakeMessageByOffset(topicName string, offset int64) (*Message, error)
- type Consumer
- func (c *Consumer) InitConsumer(properties map[string]string) error
- func (c *Consumer) IsClosed() bool
- func (c *Consumer) IsStarted() bool
- func (c *Consumer) RegisterEventListener(listener *connector.EventListener)
- func (c *Consumer) Shutdown() error
- func (c *Consumer) Start() error
- func (c *Consumer) Subscribe(topicName string) error
- func (c *Consumer) Unsubscribe(topicName string) error
- func (c *Consumer) UpdateOffset(ctx context.Context, events []*ce.Event) error
- type Factory
- type Message
- type MessageExpireWorker
- type MessageQueue
- type Producer
- func (p *Producer) CheckTopicExist(topicName string) (exist bool, err error)
- func (p *Producer) InitProducer(properties map[string]string) error
- func (p *Producer) IsClosed() bool
- func (p *Producer) IsStarted() bool
- func (p *Producer) Publish(ctx context.Context, event *ce.Event, callback *connector.SendCallback) (err error)
- func (p *Producer) Reply(ctx context.Context, event *ce.Event, callback *connector.SendCallback) error
- func (p *Producer) Request(ctx context.Context, event *ce.Event, callback *connector.RequestReplyCallback, ...) error
- func (p *Producer) SendOneway(ctx context.Context, event *ce.Event) (err error)
- func (p *Producer) SetExtFields() error
- func (p *Producer) Shutdown() error
- func (p *Producer) Start() error
- type Resource
- type SubscribeWorker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetOffsetFromEvent ¶
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker a manager of different topic message queue
func (*Broker) CreateNewQueueIfAbsent ¶
func (*Broker) ExistTopic ¶
func (*Broker) PutMessage ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumer() *Consumer
func (*Consumer) RegisterEventListener ¶
func (c *Consumer) RegisterEventListener(listener *connector.EventListener)
func (*Consumer) Unsubscribe ¶
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
func (*Message) GetTopicName ¶
type MessageExpireWorker ¶
type MessageExpireWorker struct {
// contains filtered or unexported fields
}
MessageExpireWorker periodically evict expire message
type MessageQueue ¶
type MessageQueue struct {
// contains filtered or unexported fields
}
MessageQueue message storage of standalone broker
func NewMessageQueue ¶
func NewMessageQueue() (*MessageQueue, error)
func NewMessageQueueWithCapacity ¶
func NewMessageQueueWithCapacity(capacity int) (*MessageQueue, error)
func (*MessageQueue) Get ¶
func (q *MessageQueue) Get() (*Message, error)
Get fetch message in top of queue, block if message queue is empty
func (*MessageQueue) GetByOffset ¶
func (q *MessageQueue) GetByOffset(offset int64) (*Message, error)
GetByOffset fetch message of curtain offset return error if message has been deleted, block if message of that offset is not available currently
func (*MessageQueue) GetIfNotEmpty ¶
func (q *MessageQueue) GetIfNotEmpty() *Message
GetIfNotEmpty fetch message in top of queue, return nil if message queue is empty
func (*MessageQueue) PopMessage ¶
func (q *MessageQueue) PopMessage() *Message
PopMessage remove message in the top, return nil if message queue is empty
func (*MessageQueue) Put ¶
func (q *MessageQueue) Put(message *Message)
func (*MessageQueue) Size ¶
func (q *MessageQueue) Size() int
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer standalone producer
func NewProducer ¶
func NewProducer() *Producer
func (*Producer) CheckTopicExist ¶
func (*Producer) SendOneway ¶
func (*Producer) SetExtFields ¶
type SubscribeWorker ¶
type SubscribeWorker struct {
// contains filtered or unexported fields
}
SubscribeWorker pollMessage from topic and manage consume offset
func (*SubscribeWorker) Stop ¶
func (w *SubscribeWorker) Stop()
Click to show internal directories.
Click to hide internal directories.