Documentation
¶
Index ¶
- type Consumer
- func (c *Consumer) Ack(msg msg.Message) error
- func (c *Consumer) Close(ctx context.Context) error
- func (c *Consumer) Closed() <-chan struct{}
- func (c *Consumer) ConnClosed() <-chan struct{}
- func (c *Consumer) Flow(permits uint32) error
- func (c *Consumer) HandleCloseConsumer(f frame.Frame) error
- func (c *Consumer) HandleMessage(f frame.Frame) error
- func (c *Consumer) HandleReachedEndOfTopic(f frame.Frame) error
- func (c *Consumer) Messages() <-chan msg.Message
- func (c *Consumer) ReachedEndOfTopic() <-chan struct{}
- func (c *Consumer) RedeliverOverflow(ctx context.Context) (int, error)
- func (c *Consumer) RedeliverUnacknowledged(ctx context.Context) error
- func (c *Consumer) Unsubscribe(ctx context.Context) error
- type Pubsub
- type Subscriptions
- func (s *Subscriptions) AddConsumer(c *Consumer)
- func (s *Subscriptions) AddProducer(p *pub.Producer)
- func (s *Subscriptions) DelConsumer(c *Consumer)
- func (s *Subscriptions) DelProducer(p *pub.Producer)
- func (s *Subscriptions) HandleActiveConsumerChange(consumerID uint64, f frame.Frame) error
- func (s *Subscriptions) HandleCloseConsumer(consumerID uint64, f frame.Frame) error
- func (s *Subscriptions) HandleCloseProducer(producerID uint64, f frame.Frame) error
- func (s *Subscriptions) HandleMessage(consumerID uint64, f frame.Frame) error
- func (s *Subscriptions) HandleReachedEndOfTopic(consumerID uint64, f frame.Frame) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct { S frame.CmdSender Topic string ConsumerID uint64 ReqID *msg.MonotonicID Dispatcher *frame.Dispatcher // handles request/response state Queue chan msg.Message Omu sync.Mutex // protects following Overflow []*api.MessageIdData // IDs of messages that were dropped because of full buffer OverflowSignal chan struct{} Mu sync.Mutex // protects following IsClosed bool Closedc chan struct{} IsEndOfTopic bool EndOfTopicc chan struct{} Unactive bool // Unactive will change when you receive a msg of ActiveConsumerChange }
Consumer handles all consumer related state.
func (*Consumer) Ack ¶
Ack is used to signal to the broker that a given message has been successfully processed by the application and can be discarded by the broker.
func (*Consumer) Close ¶
Close closes the consumer. The channel returned from the Closed method will then unblock upon successful closure.
func (*Consumer) Closed ¶
func (c *Consumer) Closed() <-chan struct{}
Closed returns a channel that will block _unless_ the consumer has been closed, in which case the channel will have been closed and unblocked.
func (*Consumer) ConnClosed ¶
func (c *Consumer) ConnClosed() <-chan struct{}
ConnClosed unblocks when the consumer's connection has been closed. Once that happens, it's necessary to first recreate the client and then the consumer.
func (*Consumer) Flow ¶
Flow command gives additional permits to send messages to the consumer. A typical consumer implementation will use a queue to accuMulate these messages before the application is ready to consume them. After the consumer is ready, the client needs to give permission to the broker to push messages.
func (*Consumer) HandleCloseConsumer ¶
HandleCloseConsumer should be called when a CLOSE_CONSUMER message is received associated with this consumer.
func (*Consumer) HandleMessage ¶
HandleMessage should be called for all MESSAGE messages received for this consumer.
func (*Consumer) HandleReachedEndOfTopic ¶
HandleReachedEndOfTopic should be called for all received REACHED_END_OF_TOPIC messages associated with this consumer.
func (*Consumer) Messages ¶
Messages returns a read-only channel of messages received by the consumer. The channel will never be closed by the consumer.
func (*Consumer) ReachedEndOfTopic ¶
func (c *Consumer) ReachedEndOfTopic() <-chan struct{}
ReachedEndOfTopic unblocks whenever the topic has been "terminated" and all the messages on the subscription were acknowledged.
func (*Consumer) RedeliverOverflow ¶
RedeliverOverflow sends of REDELIVER_UNACKNOWLEDGED_MESSAGES request for all messages that were dropped because of full message buffer. Note that for all subscription types other than `shared`, _all_ unacknowledged messages will be redelivered. https://github.com/apache/incubator-pulsar/issues/2003
func (*Consumer) RedeliverUnacknowledged ¶
RedeliverUnacknowledged uses the protocol option REDELIVER_UNACKNOWLEDGED_MESSAGES to re-retrieve unacked messages.
type Pubsub ¶
type Pubsub struct { S frame.CmdSender ReqID *msg.MonotonicID ProducerID *msg.MonotonicID ConsumerID *msg.MonotonicID Dispatcher *frame.Dispatcher // handles request response state Subscriptions *Subscriptions }
Pubsub is responsible for creating producers and consumers on a give topic.
func NewPubsub ¶
func NewPubsub(s frame.CmdSender, dispatcher *frame.Dispatcher, subscriptions *Subscriptions, reqID *msg.MonotonicID) *Pubsub
NewPubsub returns a ready-to-use pubsub.
func (*Pubsub) Subscribe ¶
func (t *Pubsub) Subscribe(ctx context.Context, topic, sub string, subType api.CommandSubscribe_SubType, initialPosition api.CommandSubscribe_InitialPosition, queue chan msg.Message) (*Consumer, error)
Subscribe subscribes to the given topic. The queueSize determines the buffer size of the Consumer.Messages() channel.
type Subscriptions ¶
type Subscriptions struct { Cmu sync.RWMutex // protects following Consumers map[uint64]*Consumer Pmu sync.Mutex // protects following Producers map[uint64]*pub.Producer }
Subscriptions is responsible for storing producers and consumers based on their IDs.
func NewSubscriptions ¶
func NewSubscriptions() *Subscriptions
NewSubscriptions returns a ready-to-use subscriptions.
func (*Subscriptions) AddConsumer ¶
func (s *Subscriptions) AddConsumer(c *Consumer)
func (*Subscriptions) AddProducer ¶
func (s *Subscriptions) AddProducer(p *pub.Producer)
func (*Subscriptions) DelConsumer ¶
func (s *Subscriptions) DelConsumer(c *Consumer)
func (*Subscriptions) DelProducer ¶
func (s *Subscriptions) DelProducer(p *pub.Producer)
func (*Subscriptions) HandleActiveConsumerChange ¶
func (s *Subscriptions) HandleActiveConsumerChange(consumerID uint64, f frame.Frame) error
func (*Subscriptions) HandleCloseConsumer ¶
func (s *Subscriptions) HandleCloseConsumer(consumerID uint64, f frame.Frame) error
func (*Subscriptions) HandleCloseProducer ¶
func (s *Subscriptions) HandleCloseProducer(producerID uint64, f frame.Frame) error
func (*Subscriptions) HandleMessage ¶
func (s *Subscriptions) HandleMessage(consumerID uint64, f frame.Frame) error
func (*Subscriptions) HandleReachedEndOfTopic ¶
func (s *Subscriptions) HandleReachedEndOfTopic(consumerID uint64, f frame.Frame) error