Documentation ¶
Index ¶
- Variables
- type Connection
- type InvalidConfigError
- type Listener
- type Marshaler
- type Publisher
- type PublisherConfig
- type Subscriber
- func (s *Subscriber) Addr() string
- func (s *Subscriber) Close() error
- func (s *Subscriber) Connect(l Listener) error
- func (s *Subscriber) GetConnection() (Connection, error)
- func (s *Subscriber) SetConnection(c Connection)
- func (s *Subscriber) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)
- type SubscriberConfig
- type Unmarshaler
Constants ¶
This section is empty.
Variables ¶
var ( ErrPublisherClosed = errors.New("publisher closed") ErrSubscriberClosed = errors.New("subscriber closed") ErrSubscriberNotStarted = errors.New("subscriber not started") ErrNacked = errors.New("remote side sent nack for message") ErrIOTimeout = os.ErrDeadlineExceeded ErrConnectionNotSet = errors.New("connection not set") )
Functions ¶
This section is empty.
Types ¶
type Connection ¶
type InvalidConfigError ¶
func (*InvalidConfigError) Error ¶
func (ic *InvalidConfigError) Error() string
type Listener ¶
type Listener interface { // Accept waits for and returns the next connection to the listener. Accept() (Connection, error) // Close closes the listener. // Any blocked Accept operations will be unblocked and return errors. Close() error // Addr returns the listener's network address. Addr() net.Addr }
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
func NewPublisher ¶
func NewPublisher(config PublisherConfig, waitAck bool) (*Publisher, error)
NewPublisher create new publisher. ATTENTION! Set connection immediately after creation.
func (*Publisher) GetConnection ¶ added in v0.1.3
func (p *Publisher) GetConnection() (Connection, error)
GetConnection get publisher connection.
func (*Publisher) Publish ¶
Publish publishes provided messages to given topic. Publish can be synchronous or asynchronous - it depends on the implementation.
Most publishers implementations don't support atomic publishing of messages. This means that if publishing one of the messages fails, the next messages will not be published.
Publish must be thread safe.
func (*Publisher) SetConnection ¶ added in v0.1.3
func (p *Publisher) SetConnection(c Connection)
type PublisherConfig ¶
type PublisherConfig struct { Marshaler Marshaler Unmarshaler Unmarshaler Logger watermill.LoggerAdapter }
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
func NewSubscriber ¶
func NewSubscriber(config SubscriberConfig) (*Subscriber, error)
NewSubscriber create new subscriber. ATTENTION! Set connection immediately after creation.
func (*Subscriber) Addr ¶
func (s *Subscriber) Addr() string
func (*Subscriber) Close ¶
func (s *Subscriber) Close() error
Close closes all subscriptions with their output channels and flush offsets etc. when needed.
func (*Subscriber) Connect ¶
func (s *Subscriber) Connect(l Listener) error
Connect establish connection. If connection was set via SetConnection, we should pass nil to l. If listener was set, subscriber will be waiting until the client reconnects when the connection is lost.
func (*Subscriber) GetConnection ¶ added in v0.1.3
func (s *Subscriber) GetConnection() (Connection, error)
GetConnection get publisher connection.
func (*Subscriber) SetConnection ¶ added in v0.1.3
func (s *Subscriber) SetConnection(c Connection)
func (*Subscriber) Subscribe ¶
Subscribe returns output channel with messages from provided topic. Channel is closed, when Close() was called on the subscriber.
To receive the next message, `Ack()` must be called on the received message. If message processing failed and message should be redelivered `Nack()` should be called.
When provided ctx is cancelled, subscriber will close subscribe and close output channel. Provided ctx is set to all produced messages. When Nack or Ack is called on the message, context of the message is canceled. will wait for reconnects and will not exit the read loop when the connection is lost. Since it is impossible to understand whether the remote side will reconnect, this is a mandatory mechanism.
type SubscriberConfig ¶
type SubscriberConfig struct { Marshaler Marshaler Unmarshaler Unmarshaler Logger watermill.LoggerAdapter }