Documentation ¶
Index ¶
- Variables
- type Client
- func (c *Client) Close() error
- func (c *Client) Closed() <-chan struct{}
- func (c *Client) Connect(ctx context.Context, proxyBrokerURL string) (*api.CommandConnected, error)
- func (c *Client) ConnectTLS(ctx context.Context, proxyBrokerURL string) (*api.CommandConnected, error)
- func (c *Client) LookupTopic(ctx context.Context, topic string, authoritative bool) (*api.CommandLookupTopicResponse, error)
- func (c *Client) NewExclusiveConsumer(ctx context.Context, topic, subscriptionName string, earliest bool, ...) (*sub.Consumer, error)
- func (c *Client) NewFailoverConsumer(ctx context.Context, topic, subscriptionName string, queue chan msg.Message) (*sub.Consumer, error)
- func (c *Client) NewProducer(ctx context.Context, topic, producerName string) (*pub.Producer, error)
- func (c *Client) NewSharedConsumer(ctx context.Context, topic, subscriptionName string, queue chan msg.Message) (*sub.Consumer, error)
- func (c *Client) Ping(ctx context.Context) error
- type ClientConfig
- type ClientPool
- func (m *ClientPool) ForTopic(ctx context.Context, cfg ClientConfig, topic string) (*ManagedClient, error)
- func (m *ClientPool) Get(cfg ClientConfig) *ManagedClient
- func (m *ClientPool) Partitions(ctx context.Context, cfg ClientConfig, topic string) (*api.CommandPartitionedTopicMetadataResponse, error)
- type ConsumerConfig
- type ManagedClient
- type ManagedConsumer
- func (m *ManagedConsumer) Ack(ctx context.Context, msg msg.Message) error
- func (m *ManagedConsumer) Close(ctx context.Context) error
- func (m *ManagedConsumer) Consumer(ctx context.Context) *sub.Consumer
- func (m *ManagedConsumer) ConsumerID(ctx context.Context) uint64
- func (m *ManagedConsumer) Monitor() func()
- func (m *ManagedConsumer) Receive(ctx context.Context) (msg.Message, error)
- func (m *ManagedConsumer) ReceiveAsync(ctx context.Context, msgs chan<- msg.Message) error
- func (m *ManagedConsumer) RedeliverOverflow(ctx context.Context) (int, error)
- func (m *ManagedConsumer) RedeliverUnacknowledged(ctx context.Context) error
- func (m *ManagedConsumer) Unactive() bool
- func (m *ManagedConsumer) Unsubscribe(ctx context.Context) error
- type ManagedProducer
- func (m *ManagedProducer) Close(ctx context.Context) error
- func (m *ManagedProducer) Monitor() func()
- func (m *ManagedProducer) NewProducer(ctx context.Context) (*pub.Producer, error)
- func (m *ManagedProducer) Reconnect(initial bool) *pub.Producer
- func (m *ManagedProducer) Send(ctx context.Context, payload []byte) (*api.CommandSendReceipt, error)
- func (m *ManagedProducer) Set(p *pub.Producer)
- func (m *ManagedProducer) Unset()
- type ProducerConfig
- type SubscriptionMode
Constants ¶
This section is empty.
Variables ¶
var ErrorInvalidSubMode = errors.New("invalid subscription mode")
ErrorInvalidSubMode is returned when SubscriptionMode is not one of SubscriptionModeExclusive, SubscriptionModeShard, or SubscriptionModeFailover.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
	C             *conn.Conn
	AsyncErrs     utils.AsyncErrors
	Dispatcher    *frame.Dispatcher
	Subscriptions *sub.Subscriptions
	Connector     *conn.Connector
	Pinger        *srv.Pinger
	Discoverer    *srv.Discoverer
	Pubsub        *sub.Pubsub
}
Client is a Pulsar client, capable of sending and receiving messages and managing the associated state.
func NewClient ¶
func NewClient(cfg ClientConfig) (*Client, error)
NewClient returns a Pulsar client for the given configuration options.
func (*Client) Close ¶
func (c *Client) Close() error
Close closes the connection. The channel returned from `Closed` will unblock. The client should no longer be used after calling Close.
func (*Client) Closed ¶
func (c *Client) Closed() <-chan struct{}
Closed returns a channel that unblocks when the client's connection has been closed and is no longer usable. Users should monitor this channel and recreate the Client if closed. TODO: Rename to Done
func (*Client) Connect ¶
func (c *Client) Connect(ctx context.Context, proxyBrokerURL string) (*api.CommandConnected, error)
Connect sends a CONNECT message to the Pulsar server, then waits for either a CONNECTED response or the context to time out. Connect should be called immediately after creating a client, before sending any other messages. The "auth method" is not set in the CONNECT message. See ConnectTLS for the TLS auth method. The proxyBrokerURL may be blank, or it can be used to indicate that the client is connecting through a proxy server. See "Connection establishment" for more info: https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Connectionestablishment-6pslvw
func (*Client) ConnectTLS ¶
func (c *Client) ConnectTLS(ctx context.Context, proxyBrokerURL string) (*api.CommandConnected, error)
ConnectTLS sends a CONNECT message to the Pulsar server, then waits for either a CONNECTED response or the context to time out. ConnectTLS should be called immediately after creating a client, before sending any other messages. The "auth method" is set to tls in the CONNECT message. The proxyBrokerURL may be blank, or it can be used to indicate that the client is connecting through a proxy server. See "Connection establishment" for more info: https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Connectionestablishment-6pslvw
func (*Client) LookupTopic ¶
func (c *Client) LookupTopic(ctx context.Context, topic string, authoritative bool) (*api.CommandLookupTopicResponse, error)
LookupTopic returns metadata about the given topic. Topic lookup needs to be performed each time a client needs to create or reconnect a producer or a consumer. Lookup is used to discover which particular broker is serving the topic we are about to use.
The command has to be used in a connection that has already gone through the Connect / Connected initial handshake. See "Topic lookup" for more info: https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Topiclookup-rxds6i
func (*Client) NewExclusiveConsumer ¶
func (c *Client) NewExclusiveConsumer(ctx context.Context, topic, subscriptionName string, earliest bool, queue chan msg.Message) (*sub.Consumer, error)
NewExclusiveConsumer creates a new exclusive consumer capable of reading messages from the given topic. See "Subscription modes" for more information: https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl
func (*Client) NewFailoverConsumer ¶
func (c *Client) NewFailoverConsumer(ctx context.Context, topic, subscriptionName string, queue chan msg.Message) (*sub.Consumer, error)
NewFailoverConsumer creates a new failover consumer capable of reading messages from the given topic. See "Subscription modes" for more information: https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl
func (*Client) NewProducer ¶
func (c *Client) NewProducer(ctx context.Context, topic, producerName string) (*pub.Producer, error)
NewProducer creates a new producer capable of sending messages to the given topic.
func (*Client) NewSharedConsumer ¶
func (c *Client) NewSharedConsumer(ctx context.Context, topic, subscriptionName string, queue chan msg.Message) (*sub.Consumer, error)
NewSharedConsumer creates a new shared consumer capable of reading messages from the given topic. See "Subscription modes" for more information: https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl
type ClientConfig ¶
type ClientConfig struct {
	Addr                  string        // pulsar broker address. May start with pulsar://
	DialTimeout           time.Duration // timeout to use when establishing TCP connection
	TLSConfig             *tls.Config   // TLS configuration. May be nil, in which case TLS will not be used
	Errs                  chan<- error  // asynchronous errors will be sent here. May be nil
	PingFrequency         time.Duration // how often to PING server
	PingTimeout           time.Duration // how long to wait for PONG response
	ConnectTimeout        time.Duration // how long to wait for CONNECTED response
	InitialReconnectDelay time.Duration // how long to initially wait to reconnect Client
	MaxReconnectDelay     time.Duration // maximum time to wait to attempt to reconnect Client
	AuthMethod            string
	AuthData              []byte
	// contains filtered or unexported fields
}
ClientConfig is used to configure a Pulsar client.
func (ClientConfig) ConnAddr ¶
func (c ClientConfig) ConnAddr() string
ConnAddr returns the address that should be used for the TCP connection. It returns phyAddr if set, otherwise Addr. This supports proxying through a broker, as determined during topic lookup.
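The fallback described for ConnAddr can be sketched as below. The `config` type and `connAddr` method are hypothetical stand-ins; `phyAddr` mirrors the unexported field named in the description, and the addresses are made up for the example.

```go
package main

import "fmt"

// config is a hypothetical stand-in for ClientConfig.
type config struct {
	Addr    string // logical broker address
	phyAddr string // physical address to dial, set when proxying
}

// connAddr returns phyAddr when it is set, otherwise Addr.
func (c config) connAddr() string {
	if c.phyAddr != "" {
		return c.phyAddr
	}
	return c.Addr
}

func main() {
	direct := config{Addr: "pulsar://broker-a:6650"}
	proxied := config{Addr: "pulsar://broker-a:6650", phyAddr: "pulsar://proxy:6650"}

	fmt.Println(direct.connAddr())
	fmt.Println(proxied.connAddr())
}
```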
func (ClientConfig) SetDefaults ¶
func (c ClientConfig) SetDefaults() ClientConfig
SetDefaults returns a modified config with appropriate zero values set to defaults.
type ClientPool ¶
type ClientPool struct {
// contains filtered or unexported fields
}
ClientPool provides a thread-safe store for ManagedClients, based on their address.
func (*ClientPool) ForTopic ¶
func (m *ClientPool) ForTopic(ctx context.Context, cfg ClientConfig, topic string) (*ManagedClient, error)
ForTopic performs topic lookup for the given topic and returns the ManagedClient for the discovered topic information. See "Topic lookup" for more info: https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Topiclookup-6g0lo
See also the Java client's lookup service: incubator-pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
func (*ClientPool) Get ¶
func (m *ClientPool) Get(cfg ClientConfig) *ManagedClient
Get returns the ManagedClient for the given client configuration. First the cache is checked for an existing client. If one doesn't exist, a new one is created and cached, then returned.
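The check-then-create behaviour described for Get can be sketched with a mutex-guarded map. This is an illustration under assumptions: `pool`, `managed`, and `get` are hypothetical names, and keying the cache by address string follows the ClientPool description rather than the package's source.

```go
package main

import (
	"fmt"
	"sync"
)

// managed is a hypothetical stand-in for ManagedClient.
type managed struct{ addr string }

// pool is a hypothetical stand-in for ClientPool.
type pool struct {
	mu      sync.Mutex
	clients map[string]*managed // keyed by address
}

// get returns the cached client for addr, creating and caching one if needed.
func (p *pool) get(addr string) *managed {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.clients == nil {
		p.clients = make(map[string]*managed)
	}
	if mc, ok := p.clients[addr]; ok {
		return mc
	}
	mc := &managed{addr: addr}
	p.clients[addr] = mc
	return mc
}

func main() {
	var p pool
	a := p.get("pulsar://broker-a:6650")
	b := p.get("pulsar://broker-a:6650")
	c := p.get("pulsar://broker-b:6650")

	// Same address yields the same instance; a different address yields a new one.
	fmt.Println(a == b, a == c)
}
```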
func (*ClientPool) Partitions ¶
func (m *ClientPool) Partitions(ctx context.Context, cfg ClientConfig, topic string) (*api.CommandPartitionedTopicMetadataResponse, error)
type ConsumerConfig ¶
type ConsumerConfig struct {
	ClientConfig
	Topic                 string
	Name                  string           // subscription name
	SubMode               SubscriptionMode // SubscriptionMode
	Earliest              bool             // if true, subscription cursor set to beginning
	QueueSize             int              // number of messages to buffer before dropping messages
	NewConsumerTimeout    time.Duration    // maximum duration to create Consumer, including topic lookup
	InitialReconnectDelay time.Duration    // how long to initially wait to reconnect Producer
	MaxReconnectDelay     time.Duration    // maximum time to wait to attempt to reconnect Producer
}
ConsumerConfig is used to configure a ManagedConsumer.
func (ConsumerConfig) SetDefaults ¶
func (m ConsumerConfig) SetDefaults() ConsumerConfig
SetDefaults returns a modified config with appropriate zero values set to defaults.
type ManagedClient ¶
type ManagedClient struct {
// contains filtered or unexported fields
}
ManagedClient wraps a Client with re-connect and connection management logic.
func NewManagedClient ¶
func NewManagedClient(cfg ClientConfig) *ManagedClient
NewManagedClient returns a ManagedClient for the given address. The Client will be created and monitored in the background.
func (*ManagedClient) Done ¶
func (m *ManagedClient) Done() <-chan struct{}
Done returns a channel that unblocks when the ManagedClient has been closed.
func (*ManagedClient) Get ¶
func (m *ManagedClient) Get(ctx context.Context) (*Client, error)
Get returns the managed Client in a thread-safe way. If the client is temporarily unavailable, Get will block until either it becomes available or the context expires.
There is no guarantee that the returned Client will be connected or stay connected.
func (*ManagedClient) Stop ¶
func (m *ManagedClient) Stop() error
Stop closes the Client if possible, and/or stops it from re-connecting. The ManagedClient shouldn't be used after calling Stop.
type ManagedConsumer ¶
type ManagedConsumer struct {
// contains filtered or unexported fields
}
ManagedConsumer wraps a Consumer with reconnect logic.
func NewManagedConsumer ¶
func NewManagedConsumer(cp *ClientPool, cfg ConsumerConfig) *ManagedConsumer
NewManagedConsumer returns an initialized ManagedConsumer. It will create and recreate a Consumer for the given discovery address and topic on a background goroutine.
func (*ManagedConsumer) Ack ¶
func (m *ManagedConsumer) Ack(ctx context.Context, msg msg.Message) error
Ack acquires a consumer and sends an ACK message for the given message.
func (*ManagedConsumer) Close ¶
func (m *ManagedConsumer) Close(ctx context.Context) error
Close closes the consumer.
func (*ManagedConsumer) Consumer ¶
func (m *ManagedConsumer) Consumer(ctx context.Context) *sub.Consumer
func (*ManagedConsumer) ConsumerID ¶
func (m *ManagedConsumer) ConsumerID(ctx context.Context) uint64
ConsumerID returns the current consumer's ID.
func (*ManagedConsumer) Monitor ¶
func (m *ManagedConsumer) Monitor() func()
Monitor provides a scoped, deferrable lock. The returned function is intended to be called, typically via defer, to release it.
func (*ManagedConsumer) Receive ¶
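func (m *ManagedConsumer) Receive(ctx context.Context) (msg.Message, error)
Receive returns a single Message, if available. If none is available, it waits for an incoming message until the provided context is done, so callers should supply a context with a reasonable deadline.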
func (*ManagedConsumer) ReceiveAsync ¶
func (m *ManagedConsumer) ReceiveAsync(ctx context.Context, msgs chan<- msg.Message) error
ReceiveAsync blocks until the context is done. It continuously reads messages from the consumer and sends them to the provided channel. It manages flow control internally based on the queue size.
func (*ManagedConsumer) RedeliverOverflow ¶
func (m *ManagedConsumer) RedeliverOverflow(ctx context.Context) (int, error)
RedeliverOverflow sends a REDELIVER_UNACKNOWLEDGED_MESSAGES request for all messages that were dropped because the message buffer was full. Note that for all subscription types other than `shared`, _all_ unacknowledged messages will be redelivered. See https://github.com/apache/incubator-pulsar/issues/2003
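Messages "dropped because of full message buffer" imply a bounded queue that discards on overflow rather than blocking. A minimal sketch of that behaviour follows. The `queue` type, its `offer` method, and the overflow counter are hypothetical; how the package actually tracks dropped messages is not described on this page.

```go
package main

import "fmt"

// queue is a hypothetical bounded buffer that counts dropped messages.
type queue struct {
	ch       chan string
	overflow int
}

func newQueue(size int) *queue {
	return &queue{ch: make(chan string, size)}
}

// offer enqueues m without blocking. If the buffer is full, the message
// is dropped and the overflow counter is incremented.
func (q *queue) offer(m string) bool {
	select {
	case q.ch <- m:
		return true
	default:
		q.overflow++
		return false
	}
}

func main() {
	q := newQueue(2) // plays the role of QueueSize
	for _, m := range []string{"a", "b", "c"} {
		if !q.offer(m) {
			fmt.Println("dropped:", m)
		}
	}
	// A non-zero count is the situation in which a redelivery request makes sense.
	fmt.Println("overflow count:", q.overflow)
}
```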
func (*ManagedConsumer) RedeliverUnacknowledged ¶
func (m *ManagedConsumer) RedeliverUnacknowledged(ctx context.Context) error
RedeliverUnacknowledged sends a REDELIVER_UNACKNOWLEDGED_MESSAGES request for all messages that have not been acked.
func (*ManagedConsumer) Unactive ¶
func (m *ManagedConsumer) Unactive() bool
Unactive returns the consumer's unactive flag.
func (*ManagedConsumer) Unsubscribe ¶
func (m *ManagedConsumer) Unsubscribe(ctx context.Context) error
Unsubscribe unsubscribes the consumer from its topic.
type ManagedProducer ¶
type ManagedProducer struct {
	ClientPool *ClientPool
	Cfg        ProducerConfig
	AsyncErrs  utils.AsyncErrors
	Mu         sync.RWMutex  // protects following
	Producer   *pub.Producer // either producer is nil and wait isn't or vice versa
	Waitc      chan struct{} // if producer is nil, this will unblock when it's been re-set
}
ManagedProducer wraps a Producer with re-connect logic.
func NewManagedProducer ¶
func NewManagedProducer(cp *ClientPool, cfg ProducerConfig) *ManagedProducer
NewManagedProducer returns an initialized ManagedProducer. It will create and re-create a Producer for the given discovery address and topic on a background goroutine.
func (*ManagedProducer) Close ¶
func (m *ManagedProducer) Close(ctx context.Context) error
Close closes the producer.
func (*ManagedProducer) Monitor ¶
func (m *ManagedProducer) Monitor() func()
Monitor provides a scoped, deferrable lock. The returned function is intended to be called, typically via defer, to release it.
func (*ManagedProducer) NewProducer ¶
func (m *ManagedProducer) NewProducer(ctx context.Context) (*pub.Producer, error)
NewProducer attempts to create a Producer.
func (*ManagedProducer) Reconnect ¶
func (m *ManagedProducer) Reconnect(initial bool) *pub.Producer
Reconnect blocks while a new Producer is created.
func (*ManagedProducer) Send ¶
func (m *ManagedProducer) Send(ctx context.Context, payload []byte) (*api.CommandSendReceipt, error)
Send attempts to use the Producer's Send method if available. If not available, an error is returned.
func (*ManagedProducer) Set ¶
func (m *ManagedProducer) Set(p *pub.Producer)
Set unblocks the "wait" channel (if not nil), and sets the producer under lock.
func (*ManagedProducer) Unset ¶
func (m *ManagedProducer) Unset()
Unset creates the "wait" channel (if nil), and sets the producer to nil under lock.
type ProducerConfig ¶
type ProducerConfig struct {
	ClientConfig
	Topic                 string
	Name                  string
	NewProducerTimeout    time.Duration // maximum duration to create Producer, including topic lookup
	InitialReconnectDelay time.Duration // how long to initially wait to reconnect Producer
	MaxReconnectDelay     time.Duration // maximum time to wait to attempt to reconnect Producer
}
ProducerConfig is used to configure a ManagedProducer.
type SubscriptionMode ¶
type SubscriptionMode int
SubscriptionMode represents Pulsar's three subscription modes.
const (
	// SubscriptionModeExclusive, only one consumer can be bound to a subscription.
	// If more than one consumer attempts to subscribe to the topic in the same way,
	// the consumer will receive an error.
	SubscriptionModeExclusive SubscriptionMode = iota + 1 // 1

	// SubscriptionModeShard In shared or round robin mode,
	// multiple consumers can be bound to the same subscription.
	// Messages are distributed to different consumers via the round robin polling mechanism,
	// and each message is only distributed to one consumer.
	// When the consumer disconnects, all messages sent to him
	// but not confirmed will be rescheduled and distributed to other surviving consumers.
	SubscriptionModeShard // 2

	// SubscriptionModeFailover multiple consumers can be bound to the same subscription.
	// Consumers will be sorted in lexicographic order,
	// and the first consumer is initialized to the only consumer who accepts the message.
	// This consumer is called the master consumer.
	// When the master consumer is disconnected,
	// all messages (unconfirmed and subsequently entered) will be distributed to the next consumer in the queue.
	SubscriptionModeFailover // 3
)