sub

package
v0.0.0-...-941bdb3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2020 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {
	S frame.CmdSender

	Topic      string
	ConsumerID uint64

	ReqID      *msg.MonotonicID
	Dispatcher *frame.Dispatcher // handles request/response state

	Queue chan msg.Message

	Omu            sync.Mutex           // protects following
	Overflow       []*api.MessageIdData // IDs of messages that were dropped because of full buffer
	OverflowSignal chan struct{}

	Mu           sync.Mutex // protects following
	IsClosed     bool
	Closedc      chan struct{}
	IsEndOfTopic bool
	EndOfTopicc  chan struct{}

	Unactive bool // Unactive will change when you receive a msg of ActiveConsumerChange
}

Consumer handles all consumer related state.

func (*Consumer) Ack

func (c *Consumer) Ack(msg msg.Message) error

Ack is used to signal to the broker that a given message has been successfully processed by the application and can be discarded by the broker.

func (*Consumer) Close

func (c *Consumer) Close(ctx context.Context) error

Close closes the consumer. The channel returned from the Closed method will then unblock upon successful closure.

func (*Consumer) Closed

func (c *Consumer) Closed() <-chan struct{}

Closed returns a channel that will block _unless_ the consumer has been closed, in which case the channel will have been closed and unblocked.

func (*Consumer) ConnClosed

func (c *Consumer) ConnClosed() <-chan struct{}

ConnClosed unblocks when the consumer's connection has been closed. Once that happens, it's necessary to first recreate the client and then the consumer.

func (*Consumer) Flow

func (c *Consumer) Flow(permits uint32) error

Flow command gives additional permits to send messages to the consumer. A typical consumer implementation will use a queue to accuMulate these messages before the application is ready to consume them. After the consumer is ready, the client needs to give permission to the broker to push messages.

func (*Consumer) HandleCloseConsumer

func (c *Consumer) HandleCloseConsumer(f frame.Frame) error

HandleCloseConsumer should be called when a CLOSE_CONSUMER message is received associated with this consumer.

func (*Consumer) HandleMessage

func (c *Consumer) HandleMessage(f frame.Frame) error

HandleMessage should be called for all MESSAGE messages received for this consumer.

func (*Consumer) HandleReachedEndOfTopic

func (c *Consumer) HandleReachedEndOfTopic(f frame.Frame) error

HandleReachedEndOfTopic should be called for all received REACHED_END_OF_TOPIC messages associated with this consumer.

func (*Consumer) Messages

func (c *Consumer) Messages() <-chan msg.Message

Messages returns a read-only channel of messages received by the consumer. The channel will never be closed by the consumer.

func (*Consumer) ReachedEndOfTopic

func (c *Consumer) ReachedEndOfTopic() <-chan struct{}

ReachedEndOfTopic unblocks whenever the topic has been "terminated" and all the messages on the subscription were acknowledged.

func (*Consumer) RedeliverOverflow

func (c *Consumer) RedeliverOverflow(ctx context.Context) (int, error)

RedeliverOverflow sends of REDELIVER_UNACKNOWLEDGED_MESSAGES request for all messages that were dropped because of full message buffer. Note that for all subscription types other than `shared`, _all_ unacknowledged messages will be redelivered. https://github.com/apache/incubator-pulsar/issues/2003

func (*Consumer) RedeliverUnacknowledged

func (c *Consumer) RedeliverUnacknowledged(ctx context.Context) error

RedeliverUnacknowledged uses the protocol option REDELIVER_UNACKNOWLEDGED_MESSAGES to re-retrieve unacked messages.

func (*Consumer) Unsubscribe

func (c *Consumer) Unsubscribe(ctx context.Context) error

Unsubscribe the consumer from its topic.

type Pubsub

type Pubsub struct {
	S          frame.CmdSender
	ReqID      *msg.MonotonicID
	ProducerID *msg.MonotonicID
	ConsumerID *msg.MonotonicID

	Dispatcher    *frame.Dispatcher // handles request response state
	Subscriptions *Subscriptions
}

Pubsub is responsible for creating producers and consumers on a give topic.

func NewPubsub

func NewPubsub(s frame.CmdSender, dispatcher *frame.Dispatcher, subscriptions *Subscriptions, reqID *msg.MonotonicID) *Pubsub

NewPubsub returns a ready-to-use pubsub.

func (*Pubsub) Producer

func (t *Pubsub) Producer(ctx context.Context, topic, producerName string) (*pub.Producer, error)

Producer creates a new producer for the given topic and producerName.

func (*Pubsub) Subscribe

func (t *Pubsub) Subscribe(ctx context.Context, topic, sub string, subType api.CommandSubscribe_SubType,
	initialPosition api.CommandSubscribe_InitialPosition, queue chan msg.Message) (*Consumer, error)

Subscribe subscribes to the given topic. The queueSize determines the buffer size of the Consumer.Messages() channel.

type Subscriptions

type Subscriptions struct {
	Cmu       sync.RWMutex // protects following
	Consumers map[uint64]*Consumer

	Pmu       sync.Mutex // protects following
	Producers map[uint64]*pub.Producer
}

Subscriptions is responsible for storing producers and consumers based on their IDs.

func NewSubscriptions

func NewSubscriptions() *Subscriptions

NewSubscriptions returns a ready-to-use subscriptions.

func (*Subscriptions) AddConsumer

func (s *Subscriptions) AddConsumer(c *Consumer)

func (*Subscriptions) AddProducer

func (s *Subscriptions) AddProducer(p *pub.Producer)

func (*Subscriptions) DelConsumer

func (s *Subscriptions) DelConsumer(c *Consumer)

func (*Subscriptions) DelProducer

func (s *Subscriptions) DelProducer(p *pub.Producer)

func (*Subscriptions) HandleActiveConsumerChange

func (s *Subscriptions) HandleActiveConsumerChange(consumerID uint64, f frame.Frame) error

func (*Subscriptions) HandleCloseConsumer

func (s *Subscriptions) HandleCloseConsumer(consumerID uint64, f frame.Frame) error

func (*Subscriptions) HandleCloseProducer

func (s *Subscriptions) HandleCloseProducer(producerID uint64, f frame.Frame) error

func (*Subscriptions) HandleMessage

func (s *Subscriptions) HandleMessage(consumerID uint64, f frame.Frame) error

func (*Subscriptions) HandleReachedEndOfTopic

func (s *Subscriptions) HandleReachedEndOfTopic(consumerID uint64, f frame.Frame) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL