Documentation ¶
Overview ¶
Package pulsar implements a Pulsar client.
It exposes a lower-level Client type, and higher-level Managed* types that take care of reconnects, pings, topic lookup etc.
Index ¶
- Constants
- Variables
- type Client
- func (c *Client) Close() error
- func (c *Client) Closed() <-chan struct{}
- func (c *Client) Connect(ctx context.Context, proxyBrokerURL string) (*api.CommandConnected, error)
- func (c *Client) ConnectTLS(ctx context.Context, proxyBrokerURL string) (*api.CommandConnected, error)
- func (c *Client) LookupTopic(ctx context.Context, topic string, authoritative bool) (*api.CommandLookupTopicResponse, error)
- func (c *Client) NewExclusiveConsumer(ctx context.Context, topic, subscriptionName string, earliest bool, ...) (*Consumer, error)
- func (c *Client) NewFailoverConsumer(ctx context.Context, topic, subscriptionName string, queue chan Message) (*Consumer, error)
- func (c *Client) NewProducer(ctx context.Context, topic, producerName string) (*Producer, error)
- func (c *Client) NewSharedConsumer(ctx context.Context, topic, subscriptionName string, queue chan Message) (*Consumer, error)
- func (c *Client) Ping(ctx context.Context) error
- type ClientConfig
- type Consumer
- func (c *Consumer) Ack(msg Message) error
- func (c *Consumer) Close(ctx context.Context) error
- func (c *Consumer) Closed() <-chan struct{}
- func (c *Consumer) ConnClosed() <-chan struct{}
- func (c *Consumer) Flow(permits uint32) error
- func (c *Consumer) Messages() <-chan Message
- func (c *Consumer) ReachedEndOfTopic() <-chan struct{}
- func (c *Consumer) RedeliverOverflow(ctx context.Context) (int, error)
- func (c *Consumer) RedeliverUnacknowledged(ctx context.Context) error
- func (c *Consumer) Unsubscribe(ctx context.Context) error
- type ErrUnexpectedMsg
- type ManagedClient
- type ManagedClientConfig
- type ManagedClientPool
- type ManagedConsumer
- func (m *ManagedConsumer) Ack(ctx context.Context, msg Message) error
- func (m *ManagedConsumer) Close(ctx context.Context) error
- func (m *ManagedConsumer) Monitor() func()
- func (m *ManagedConsumer) Receive(ctx context.Context) (Message, error)
- func (m *ManagedConsumer) ReceiveAsync(ctx context.Context, msgs chan<- Message) error
- func (m *ManagedConsumer) RedeliverOverflow(ctx context.Context) (int, error)
- func (m *ManagedConsumer) RedeliverUnacknowledged(ctx context.Context) error
- func (m *ManagedConsumer) Unsubscribe(ctx context.Context) error
- type ManagedConsumerConfig
- type ManagedProducer
- type ManagedProducerConfig
- type Message
- type Producer
Constants ¶
const (
	// ProtoVersion is the Pulsar protocol version
	// used by this client.
	ProtoVersion = int32(api.ProtocolVersion_v12)

	// ClientVersion is an opaque string sent
	// by the client to the server on connect, eg:
	// "Pulsar-Client-Java-v1.15.2"
	ClientVersion = "pulsar-client-go"
)
Variables ¶
var ErrClosedProducer = errors.New("producer is closed")
ErrClosedProducer is returned when attempting to send from a closed Producer.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a Pulsar client, capable of sending and receiving messages and managing the associated state.
func NewClient ¶
func NewClient(cfg ClientConfig) (*Client, error)
NewClient returns a Pulsar client for the given configuration options.
func (*Client) Close ¶
func (c *Client) Close() error
Close closes the connection. The channel returned from `Closed` will unblock. The client should no longer be used after calling Close.
func (*Client) Closed ¶
func (c *Client) Closed() <-chan struct{}
Closed returns a channel that unblocks when the client's connection has been closed and is no longer usable. Users should monitor this channel and recreate the Client if closed. TODO: Rename to Done
func (*Client) Connect ¶
func (c *Client) Connect(ctx context.Context, proxyBrokerURL string) (*api.CommandConnected, error)
Connect sends a Connect message to the Pulsar server, then waits for either a CONNECTED response or the context to time out. Connect should be called immediately after creating a client, before sending any other messages. The "auth method" is not set in the CONNECT message. See ConnectTLS for TLS auth method. The proxyBrokerURL may be blank, or it can be used to indicate that the client is connecting through a proxy server. See "Connection establishment" for more info: https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Connectionestablishment-6pslvw
func (*Client) ConnectTLS ¶
func (c *Client) ConnectTLS(ctx context.Context, proxyBrokerURL string) (*api.CommandConnected, error)
ConnectTLS sends a Connect message to the Pulsar server, then waits for either a CONNECTED response or the context to time out. ConnectTLS should be called immediately after creating a client, before sending any other messages. The "auth method" is set to tls in the CONNECT message. The proxyBrokerURL may be blank, or it can be used to indicate that the client is connecting through a proxy server. See "Connection establishment" for more info: https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Connectionestablishment-6pslvw
func (*Client) LookupTopic ¶
func (c *Client) LookupTopic(ctx context.Context, topic string, authoritative bool) (*api.CommandLookupTopicResponse, error)
LookupTopic returns metadata about the given topic. Topic lookup needs to be performed each time a client needs to create or reconnect a producer or a consumer. Lookup is used to discover which particular broker is serving the topic we are about to use.
The command has to be used in a connection that has already gone through the Connect / Connected initial handshake. See "Topic lookup" for more info: https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Topiclookup-rxds6i
func (*Client) NewExclusiveConsumer ¶
func (c *Client) NewExclusiveConsumer(ctx context.Context, topic, subscriptionName string, earliest bool, queue chan Message) (*Consumer, error)
NewExclusiveConsumer creates a new exclusive consumer capable of reading messages from the given topic. See "Subscription modes" for more information: https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl
func (*Client) NewFailoverConsumer ¶
func (c *Client) NewFailoverConsumer(ctx context.Context, topic, subscriptionName string, queue chan Message) (*Consumer, error)
NewFailoverConsumer creates a new failover consumer capable of reading messages from the given topic. See "Subscription modes" for more information: https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl
func (*Client) NewProducer ¶
func (c *Client) NewProducer(ctx context.Context, topic, producerName string) (*Producer, error)
NewProducer creates a new producer capable of sending messages to the given topic.
func (*Client) NewSharedConsumer ¶
func (c *Client) NewSharedConsumer(ctx context.Context, topic, subscriptionName string, queue chan Message) (*Consumer, error)
NewSharedConsumer creates a new shared consumer capable of reading messages from the given topic. See "Subscription modes" for more information: https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl
type ClientConfig ¶
type ClientConfig struct {
	Addr        string        // pulsar broker address. May start with pulsar://
	DialTimeout time.Duration // timeout to use when establishing TCP connection
	TLSConfig   *tls.Config   // TLS configuration. May be nil, in which case TLS will not be used
	Errs        chan<- error  // asynchronous errors will be sent here. May be nil
	// contains filtered or unexported fields
}
ClientConfig is used to configure a Pulsar client.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer handles all consumer related state.
func (*Consumer) Ack ¶
func (c *Consumer) Ack(msg Message) error
Ack is used to signal to the broker that a given message has been successfully processed by the application and can be discarded by the broker.
func (*Consumer) Close ¶
func (c *Consumer) Close(ctx context.Context) error
Close closes the consumer. The channel returned from the Closed method will then unblock upon successful closure.
func (*Consumer) Closed ¶
func (c *Consumer) Closed() <-chan struct{}
Closed returns a channel that will block _unless_ the consumer has been closed, in which case the channel will have been closed and unblocked.
func (*Consumer) ConnClosed ¶
func (c *Consumer) ConnClosed() <-chan struct{}
ConnClosed unblocks when the consumer's connection has been closed. Once that happens, it's necessary to first recreate the client and then the consumer.
func (*Consumer) Flow ¶
func (c *Consumer) Flow(permits uint32) error
Flow sends a Flow command, which gives the broker additional permits to send messages to the consumer. A typical consumer implementation will use a queue to accumulate these messages before the application is ready to consume them. After the consumer is ready, the client needs to give permission to the broker to push messages.
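One way to decide how many permits to request is to track free slots in the local queue. The sketch below is a standalone illustration, not the package's implementation: `permitTracker`, its fields, and the "refill at half" threshold are all assumptions. The value returned by `refill` is what a caller might pass to `Flow`.

```go
package main

import "fmt"

// permitTracker is a hypothetical helper (not part of the pulsar package)
// that decides when to grant the broker more permits.
type permitTracker struct {
	queueSize   uint32 // capacity of the local message queue
	outstanding uint32 // permits granted whose queue slot is not yet free again
}

// consumed records that the application took one message off the queue,
// freeing a slot.
func (p *permitTracker) consumed() {
	if p.outstanding > 0 {
		p.outstanding--
	}
}

// refill returns how many permits to request in the next Flow call.
// It tops up to queueSize once outstanding permits drop to half the
// queue size or less, and returns 0 otherwise.
func (p *permitTracker) refill() uint32 {
	if p.outstanding > p.queueSize/2 {
		return 0
	}
	n := p.queueSize - p.outstanding
	p.outstanding = p.queueSize
	return n
}

func main() {
	p := &permitTracker{queueSize: 4}
	fmt.Println("initial grant:", p.refill())
	p.consumed()
	fmt.Println("after one message:", p.refill())
	p.consumed()
	fmt.Println("after two messages:", p.refill())
}
```

Because permits are only topped up against freed slots, the broker can never be allowed to send more messages than the queue can hold.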
func (*Consumer) Messages ¶
func (c *Consumer) Messages() <-chan Message
Messages returns a read-only channel of messages received by the consumer. The channel will never be closed by the consumer.
func (*Consumer) ReachedEndOfTopic ¶
func (c *Consumer) ReachedEndOfTopic() <-chan struct{}
ReachedEndOfTopic unblocks whenever the topic has been "terminated" and all the messages on the subscription were acknowledged.
func (*Consumer) RedeliverOverflow ¶
func (c *Consumer) RedeliverOverflow(ctx context.Context) (int, error)
RedeliverOverflow sends a REDELIVER_UNACKNOWLEDGED_MESSAGES request for all messages that were dropped because the message buffer was full. Note that for all subscription types other than `shared`, _all_ unacknowledged messages will be redelivered. https://github.com/apache/incubator-pulsar/issues/2003
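The idea of dropping messages when a buffer is full, then asking for them again later, can be sketched as follows. This is a standalone illustration under stated assumptions: `buffer`, `push`, and `drainOverflow` are hypothetical names, and messages are represented by plain string IDs rather than the package's Message type.

```go
package main

import "fmt"

// buffer is a hypothetical bounded queue that drops messages when full
// and remembers which ones were dropped so they can be re-requested.
type buffer struct {
	queue    chan string
	overflow []string // IDs of dropped messages
}

// push enqueues id without blocking. If the queue is full, id is
// recorded as overflow and push returns false.
func (b *buffer) push(id string) bool {
	select {
	case b.queue <- id:
		return true
	default:
		b.overflow = append(b.overflow, id)
		return false
	}
}

// drainOverflow returns the dropped IDs and resets the list. In a real
// client these IDs would be placed in a redelivery request.
func (b *buffer) drainOverflow() []string {
	ids := b.overflow
	b.overflow = nil
	return ids
}

func main() {
	b := &buffer{queue: make(chan string, 2)}
	for _, id := range []string{"m1", "m2", "m3"} {
		b.push(id)
	}
	fmt.Println(len(b.drainOverflow()), "message(s) to redeliver")
}
```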
func (*Consumer) RedeliverUnacknowledged ¶
func (c *Consumer) RedeliverUnacknowledged(ctx context.Context) error
RedeliverUnacknowledged uses the protocol option REDELIVER_UNACKNOWLEDGED_MESSAGES to request redelivery of unacknowledged messages.
type ErrUnexpectedMsg ¶
type ErrUnexpectedMsg struct {
// contains filtered or unexported fields
}
ErrUnexpectedMsg is returned when an unexpected message is received.
func (*ErrUnexpectedMsg) Error ¶
func (e *ErrUnexpectedMsg) Error() string
Error satisfies the error interface.
type ManagedClient ¶
type ManagedClient struct {
// contains filtered or unexported fields
}
ManagedClient wraps a Client with re-connect and connection management logic.
func NewManagedClient ¶
func NewManagedClient(cfg ManagedClientConfig) *ManagedClient
NewManagedClient returns a ManagedClient for the given address. The Client will be created and monitored in the background.
func (*ManagedClient) Done ¶
func (m *ManagedClient) Done() <-chan struct{}
Done returns a channel that unblocks when the ManagedClient has been closed.
func (*ManagedClient) Get ¶
func (m *ManagedClient) Get(ctx context.Context) (*Client, error)
Get returns the managed Client in a thread-safe way. If the client is temporarily unavailable, Get will block until either it becomes available or the context expires.
There is no guarantee that the returned Client will be connected or stay connected.
func (*ManagedClient) Stop ¶
func (m *ManagedClient) Stop() error
Stop closes the Client if possible, and/or stops it from re-connecting. The ManagedClient shouldn't be used after calling Stop.
type ManagedClientConfig ¶
type ManagedClientConfig struct {
	ClientConfig

	PingFrequency         time.Duration // how often to PING server
	PingTimeout           time.Duration // how long to wait for PONG response
	ConnectTimeout        time.Duration // how long to wait for CONNECTED response
	InitialReconnectDelay time.Duration // how long to initially wait to reconnect Client
	MaxReconnectDelay     time.Duration // maximum time to wait to attempt to reconnect Client
}
ManagedClientConfig is used to configure a ManagedClient.
type ManagedClientPool ¶
type ManagedClientPool struct {
// contains filtered or unexported fields
}
ManagedClientPool provides a thread-safe store for ManagedClients, based on their address.
func NewManagedClientPool ¶
func NewManagedClientPool() *ManagedClientPool
NewManagedClientPool initializes a ManagedClientPool.
func (*ManagedClientPool) ForTopic ¶
func (m *ManagedClientPool) ForTopic(ctx context.Context, cfg ManagedClientConfig, topic string) (*ManagedClient, error)
ForTopic performs topic lookup for the given topic and returns the ManagedClient for the discovered topic information. See "Topic lookup": https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#Topiclookup-6g0lo
See also: incubator-pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
func (*ManagedClientPool) Get ¶
func (m *ManagedClientPool) Get(cfg ManagedClientConfig) *ManagedClient
Get returns the ManagedClient for the given client configuration. First the cache is checked for an existing client. If one doesn't exist, a new one is created and cached, then returned.
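The check-then-create caching described above can be sketched with a mutex-guarded map. This is an illustration only: `pool` and `entry` are hypothetical types, and keying by the address string alone is an assumption about how configurations are compared.

```go
package main

import (
	"fmt"
	"sync"
)

// entry is a hypothetical placeholder for a managed client.
type entry struct{ addr string }

// pool caches one entry per address. It is a sketch, not the
// package's ManagedClientPool.
type pool struct {
	mu      sync.Mutex
	entries map[string]*entry
}

func newPool() *pool {
	return &pool{entries: make(map[string]*entry)}
}

// get returns the cached entry for addr, creating and caching
// it on first use. It is safe for concurrent callers.
func (p *pool) get(addr string) *entry {
	p.mu.Lock()
	defer p.mu.Unlock()
	if e, ok := p.entries[addr]; ok {
		return e
	}
	e := &entry{addr: addr}
	p.entries[addr] = e
	return e
}

func main() {
	p := newPool()
	a := p.get("pulsar://localhost:6650")
	b := p.get("pulsar://localhost:6650")
	fmt.Println("same entry reused:", a == b)
}
```

Holding the lock across both the lookup and the insert ensures two concurrent callers for the same address never create two entries.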
type ManagedConsumer ¶
type ManagedConsumer struct {
// contains filtered or unexported fields
}
ManagedConsumer wraps a Consumer with reconnect logic.
func NewManagedConsumer ¶
func NewManagedConsumer(cp *ManagedClientPool, cfg ManagedConsumerConfig) *ManagedConsumer
NewManagedConsumer returns an initialized ManagedConsumer. It will create and recreate a Consumer for the given discovery address and topic on a background goroutine.
func (*ManagedConsumer) Ack ¶
func (m *ManagedConsumer) Ack(ctx context.Context, msg Message) error
Ack acquires a consumer and sends an ACK message for the given message.
func (*ManagedConsumer) Close ¶ added in v0.1.1
func (m *ManagedConsumer) Close(ctx context.Context) error
Close closes the consumer.
func (*ManagedConsumer) Monitor ¶ added in v0.1.1
func (m *ManagedConsumer) Monitor() func()
Monitor provides a scoped, deferrable lock.
func (*ManagedConsumer) Receive ¶
func (m *ManagedConsumer) Receive(ctx context.Context) (Message, error)
Receive returns a single Message, if available. A reasonable context should be provided that will be used to wait for an incoming message if none are available.
func (*ManagedConsumer) ReceiveAsync ¶
func (m *ManagedConsumer) ReceiveAsync(ctx context.Context, msgs chan<- Message) error
ReceiveAsync blocks until the context is done. It continuously reads messages from the consumer and sends them to the provided channel. It manages flow control internally based on the queue size.
func (*ManagedConsumer) RedeliverOverflow ¶ added in v0.1.1
func (m *ManagedConsumer) RedeliverOverflow(ctx context.Context) (int, error)
RedeliverOverflow sends a REDELIVER_UNACKNOWLEDGED_MESSAGES request for all messages that were dropped because the message buffer was full. Note that for all subscription types other than `shared`, _all_ unacknowledged messages will be redelivered. https://github.com/apache/incubator-pulsar/issues/2003
func (*ManagedConsumer) RedeliverUnacknowledged ¶ added in v0.1.1
func (m *ManagedConsumer) RedeliverUnacknowledged(ctx context.Context) error
RedeliverUnacknowledged sends a REDELIVER_UNACKNOWLEDGED_MESSAGES request for all messages that have not been acked.
func (*ManagedConsumer) Unsubscribe ¶ added in v0.1.1
func (m *ManagedConsumer) Unsubscribe(ctx context.Context) error
Unsubscribe unsubscribes the consumer from its topic.
type ManagedConsumerConfig ¶
type ManagedConsumerConfig struct {
	ManagedClientConfig

	Topic                 string
	Name                  string        // subscription name
	Exclusive             bool          // if false, subscription is shared
	Earliest              bool          // if true, subscription cursor set to beginning
	QueueSize             int           // number of messages to buffer before dropping messages
	NewConsumerTimeout    time.Duration // maximum duration to create Consumer, including topic lookup
	InitialReconnectDelay time.Duration // how long to initially wait to reconnect Consumer
	MaxReconnectDelay     time.Duration // maximum time to wait to attempt to reconnect Consumer
}
ManagedConsumerConfig is used to configure a ManagedConsumer.
type ManagedProducer ¶
type ManagedProducer struct {
// contains filtered or unexported fields
}
ManagedProducer wraps a Producer with re-connect logic.
func NewManagedProducer ¶
func NewManagedProducer(cp *ManagedClientPool, cfg ManagedProducerConfig) *ManagedProducer
NewManagedProducer returns an initialized ManagedProducer. It will create and re-create a Producer for the given discovery address and topic on a background goroutine.
func (*ManagedProducer) Close ¶ added in v0.1.1
func (m *ManagedProducer) Close(ctx context.Context) error
Close closes the producer.
func (*ManagedProducer) Monitor ¶ added in v0.1.1
func (m *ManagedProducer) Monitor() func()
Monitor provides a scoped, deferrable lock.
func (*ManagedProducer) Send ¶
func (m *ManagedProducer) Send(ctx context.Context, payload []byte) (*api.CommandSendReceipt, error)
Send attempts to use the Producer's Send method if available. If not available, an error is returned.
type ManagedProducerConfig ¶
type ManagedProducerConfig struct {
	ManagedClientConfig

	Topic                 string
	Name                  string
	NewProducerTimeout    time.Duration // maximum duration to create Producer, including topic lookup
	InitialReconnectDelay time.Duration // how long to initially wait to reconnect Producer
	MaxReconnectDelay     time.Duration // maximum time to wait to attempt to reconnect Producer
}
ManagedProducerConfig is used to configure a ManagedProducer.
type Message ¶
type Message struct {
	Topic   string
	Msg     *api.CommandMessage
	Meta    *api.MessageMetadata
	Payload []byte
	// contains filtered or unexported fields
}
Message represents a received MESSAGE from the Pulsar server.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer is responsible for creating a subscription producer and managing its state.
func (*Producer) Close ¶
Close closes the producer. When receiving a CloseProducer command, the broker will stop accepting any more messages for the producer, wait until all pending messages are persisted and then reply Success to the client. https://pulsar.incubator.apache.org/docs/latest/project/BinaryProtocol/#command-closeproducer
func (*Producer) Closed ¶
func (p *Producer) Closed() <-chan struct{}
Closed returns a channel that will block _unless_ the producer has been closed, in which case the channel will have been closed. TODO: Rename Done
func (*Producer) ConnClosed ¶
func (p *Producer) ConnClosed() <-chan struct{}
ConnClosed unblocks when the producer's connection has been closed. Once that happens, it's necessary to first recreate the client and then the producer.
Directories ¶
Path | Synopsis |
---|---|
| Package api provides the protocol buffer messages that Pulsar uses for the client/broker wire protocol. |
| This program offers a simple CLI utility for interacting with a Pulsar server using the `pulsar` package. |
| Package frame provides the ability to encode and decode to and from Pulsar's custom binary protocol. |
fuzz | |