Documentation ¶
Index ¶
- type Config
- type DefaultMarshaler
- type PubSub
- func (p *PubSub) Close() error
- func (p *PubSub) IsConnected() bool
- func (p *PubSub) Publish(topic string, messages ...*message.Message) error
- func (p *PubSub) PublishWithOpts(topic string, msg *message.Message, opts ...spi.Option) error
- func (p *PubSub) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)
- func (p *PubSub) SubscribeWithOpts(ctx context.Context, topic string, opts ...spi.Option) (<-chan *message.Message, error)
Types ¶
type Config ¶
type Config struct {
	URI                       string
	MaxConnectRetries         int
	MaxConnectionChannels     int
	MaxRedeliveryAttempts     int
	RedeliveryMultiplier      float64
	RedeliveryInitialInterval time.Duration
	MaxRedeliveryInterval     time.Duration
	PublisherChannelPoolSize  int
	PublisherConfirmDelivery  bool
}
Config holds the configuration for the publisher/subscriber.
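The interaction of the four redelivery fields is not described here. The following is a minimal sketch, assuming they define an exponential backoff: the wait starts at RedeliveryInitialInterval, grows by RedeliveryMultiplier after each attempt, and is capped at MaxRedeliveryInterval. The names retryConfig and backoffSchedule are hypothetical and exist only for this illustration; the package's actual retry logic may differ.

```go
package main

import (
	"fmt"
	"time"
)

// retryConfig mirrors only the redelivery fields of Config. It is a local
// stand-in so the example compiles without importing the package.
type retryConfig struct {
	MaxRedeliveryAttempts     int
	RedeliveryMultiplier      float64
	RedeliveryInitialInterval time.Duration
	MaxRedeliveryInterval     time.Duration
}

// backoffSchedule returns the assumed wait before each redelivery attempt.
// Assumption: exponential growth capped at MaxRedeliveryInterval.
func backoffSchedule(cfg retryConfig) []time.Duration {
	schedule := make([]time.Duration, 0, cfg.MaxRedeliveryAttempts)
	interval := cfg.RedeliveryInitialInterval
	for i := 0; i < cfg.MaxRedeliveryAttempts; i++ {
		if interval > cfg.MaxRedeliveryInterval {
			interval = cfg.MaxRedeliveryInterval
		}
		schedule = append(schedule, interval)
		// Grow the interval for the next attempt.
		interval = time.Duration(float64(interval) * cfg.RedeliveryMultiplier)
	}
	return schedule
}

func main() {
	cfg := retryConfig{
		MaxRedeliveryAttempts:     5,
		RedeliveryMultiplier:      2,
		RedeliveryInitialInterval: 500 * time.Millisecond,
		MaxRedeliveryInterval:     3 * time.Second,
	}
	for i, d := range backoffSchedule(cfg) {
		fmt.Printf("attempt %d: wait %v\n", i+1, d)
	}
}
```

With these example values the sketch yields waits of 500ms, 1s, 2s, 3s and 3s: the fourth and fifth attempts hit the cap.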
type DefaultMarshaler ¶ added in v1.0.0
type DefaultMarshaler struct {
	// PostprocessPublishing can be used to make some extra processing with amqp.Publishing,
	// for example add CorrelationId and ContentType:
	//
	//	amqp.DefaultMarshaler{
	//		PostprocessPublishing: func(publishing stdAmqp.Publishing) stdAmqp.Publishing {
	//			publishing.CorrelationId = "correlation"
	//			publishing.ContentType = "application/json"
	//
	//			return publishing
	//		},
	//	}
	PostprocessPublishing func(amqp.Publishing) amqp.Publishing

	// When true, DeliveryMode will not be set to Persistent.
	//
	// DeliveryMode Transient means higher throughput, but messages will not be
	// restored on broker restart. The delivery mode of publishings is unrelated
	// to the durability of the queues they reside on. Transient messages will
	// not be restored to durable queues, persistent messages will be restored to
	// durable queues and lost on non-durable queues during server restart.
	NotPersistentDeliveryMode bool

	// Header used to store and read message UUID.
	//
	// If value is empty, defaultMessageUUIDHeaderKey value is used.
	// If header doesn't exist, empty value is passed as message UUID.
	MessageUUIDHeaderKey string
}
DefaultMarshaler is a modified version of the marshaler in watermill-amqp. It adds support for dead-letter queue header values and allows a message's expiration to be set through a header.
func (DefaultMarshaler) Marshal ¶ added in v1.0.0
func (d DefaultMarshaler) Marshal(msg *message.Message) (amqp.Publishing, error)
Marshal converts a message.Message into an amqp.Publishing.
type PubSub ¶
PubSub implements a publisher/subscriber that connects to an AMQP-compatible message queue.
func (*PubSub) IsConnected ¶ added in v1.0.0
func (p *PubSub) IsConnected() bool
IsConnected returns true if connected to the AMQP server.
func (*PubSub) PublishWithOpts ¶ added in v1.0.0
func (p *PubSub) PublishWithOpts(topic string, msg *message.Message, opts ...spi.Option) error
PublishWithOpts publishes a message to a topic using the supplied options.
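The definition of spi.Option is not shown on this page. Variadic option parameters of this shape are commonly implemented with the functional-options pattern, so the sketch below shows that pattern in general terms. The callOptions struct, the option type, withPoolSize and resolve are hypothetical names and do not correspond to any real option in the spi package.

```go
package main

import "fmt"

// callOptions is a hypothetical settings struct that options would fill in.
type callOptions struct {
	poolSize int
}

// option mutates callOptions; a stand-in for the general shape of spi.Option.
type option func(*callOptions)

// withPoolSize is an invented option used only for this illustration.
func withPoolSize(n int) option {
	return func(o *callOptions) { o.poolSize = n }
}

// resolve starts from defaults and applies each option in order,
// so later options override earlier ones.
func resolve(opts ...option) callOptions {
	o := callOptions{poolSize: 1}
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	fmt.Println(resolve().poolSize)                // default value
	fmt.Println(resolve(withPoolSize(4)).poolSize) // overridden value
}
```

The benefit of this design is that a method such as PublishWithOpts can gain new settings later without changing its signature.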
func (*PubSub) Subscribe ¶
func (p *PubSub) Subscribe(ctx context.Context, topic string) (<-chan *message.Message, error)
Subscribe subscribes to a topic and returns the Go channel over which messages are sent. The returned channel will be closed when Close() is called on this struct.
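Because the returned channel is closed on Close(), a consumer can simply read until the channel reports that it is closed. The sketch below shows one way to write such a loop. It uses a local msg type and a fakeSubscribe helper in place of the real PubSub; both are hypothetical, as are consume and demo, and the snippet does not cover any acknowledgement the real message type may require.

```go
package main

import (
	"context"
	"fmt"
)

// msg is a local stand-in for *message.Message.
type msg struct{ Payload []byte }

// fakeSubscribe imitates the documented contract: it returns a receive-only
// channel that is closed once the producer side shuts down.
func fakeSubscribe(ctx context.Context, payloads ...string) <-chan *msg {
	out := make(chan *msg)
	go func() {
		defer close(out) // stands in for Close() being called on the PubSub
		for _, p := range payloads {
			select {
			case out <- &msg{Payload: []byte(p)}:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// consume reads until the channel is closed or the context is cancelled
// and returns the payloads it saw.
func consume(ctx context.Context, ch <-chan *msg) []string {
	var seen []string
	for {
		select {
		case m, ok := <-ch:
			if !ok {
				return seen // channel closed: the subscription has ended
			}
			seen = append(seen, string(m.Payload))
		case <-ctx.Done():
			return seen
		}
	}
}

// demo wires the fake subscription to the consumer loop.
func demo() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	return consume(ctx, fakeSubscribe(ctx, "a", "b", "c"))
}

func main() {
	for _, p := range demo() {
		fmt.Println("received:", p)
	}
}
```

Checking the second value of the receive (ok) is what distinguishes a closed channel from a real message, so the loop exits cleanly instead of spinning on nil values.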
func (*PubSub) SubscribeWithOpts ¶
func (p *PubSub) SubscribeWithOpts(ctx context.Context, topic string, opts ...spi.Option) (<-chan *message.Message, error)
SubscribeWithOpts subscribes to a topic using the given options, and returns the Go channel over which messages are sent. The returned channel will be closed when Close() is called on this struct.