Documentation
¶
Index ¶
- Variables
- func CloseSubscribers()
- func CreateQueue(channel *amqp.Channel, subscriber *Subscriber) error
- func DeleteQueue(s Subscriber) error
- func ExposeSubscriberConnectionForTests() *amqp.Connection
- func InitPop()
- func IsDevelopmentEnv() bool
- func Pop(subscriber *Subscriber) (string, error)
- func Register(s Subscriber, handler func(amqp.Delivery) bool)
- func ResetPopConnection()
- func StartSubscribers() error
- type AssuredPublisher
- func (p *AssuredPublisher) DisableRepublishing()
- func (p *AssuredPublisher) Publish(message string, subscriber *Subscriber) bool
- func (p *AssuredPublisher) PublishBytes(message []byte, subscriber *Subscriber) bool
- func (p *AssuredPublisher) PublishBytesWithArg(message []byte, subscriber *Subscriber, arg interface{}) bool
- func (p *AssuredPublisher) PublishWithArg(message string, subscriber *Subscriber, arg interface{}) bool
- func (p *AssuredPublisher) ReceiveAllConfirmations() bool
- func (p *AssuredPublisher) SetConfirmationHandler(confirmationHandler func(amqp.Confirmation, interface{}))
- func (p *AssuredPublisher) SetExplicitWaiting()
- func (p *AssuredPublisher) WaitForAllConfirmations() bool
- type Connection
- type Publisher
- func (p *Publisher) Close()
- func (p *Publisher) Confirm(wait bool) error
- func (p *Publisher) GetChannel() *amqp.Channel
- func (p *Publisher) NotifyPublish(size int) chan amqp.Confirmation
- func (p *Publisher) Publish(message string, subscriber *Subscriber) error
- func (p *Publisher) PublishBytes(message []byte, subscriber *Subscriber) error
- type Subscriber
Constants ¶
This section is empty.
Variables ¶
var ( // Subscribers is a map of all of the registered Subscribers Subscribers map[string]Subscriber // Handlers is a map of all of the registered Subscriber Handlers Handlers map[string]func(amqp.Delivery) bool )
Functions ¶
func CloseSubscribers ¶
func CloseSubscribers()
CloseSubscribers removes all subscribers, handlers, and closes the amqp connection
func CreateQueue ¶
func CreateQueue(channel *amqp.Channel, subscriber *Subscriber) error
CreateQueue creates a queue and binds it
func DeleteQueue ¶
func DeleteQueue(s Subscriber) error
DeleteQueue does what it says, deletes a queue in rabbit
func ExposeSubscriberConnectionForTests ¶
func ExposeSubscriberConnectionForTests() *amqp.Connection
ExposeSubscriberConnectionForTests returns the subscriber connection for testing purposes
func InitPop ¶
func InitPop()
InitPop intializes the RabbitMQ Connection and Channel for popping messages off of a queue.
func IsDevelopmentEnv ¶
func IsDevelopmentEnv() bool
IsDevelopmentEnv tells you if you are currently running in a development environment
func Pop ¶
func Pop(subscriber *Subscriber) (string, error)
Pop returns a single item from a RabbitMQ queue. It uses the Subscriber to know which queue to pop the item off. This is currently a helper function for the tests so you can pop a message off the queue and test it.
func Register ¶
func Register(s Subscriber, handler func(amqp.Delivery) bool)
Register adds a subscriber and handler to the subscribers pool
func ResetPopConnection ¶
func ResetPopConnection()
ResetPopConnection sets popConnection=nil and popChannel=nil for testing purposes
func StartSubscribers ¶
func StartSubscribers() error
StartSubscribers spins up all of the registered Subscribers and consumes messages on their respective queues.
Types ¶
type AssuredPublisher ¶
type AssuredPublisher struct { Publisher // contains filtered or unexported fields }
AssuredPublisher allows you to publish events to RabbitMQ with implicit delivery confirmation
func NewAssuredPublisher ¶
func NewAssuredPublisher(cancelChannel <-chan bool) *AssuredPublisher
NewAssuredPublisher constructs a new AssuredPublisher instance
func NewAssuredPublisherWithConnection ¶
func NewAssuredPublisherWithConnection(connection *Connection, cancelChannel <-chan bool) *AssuredPublisher
NewAssuredPublisherWithConnection constructs a new AssuredPublisher instance
func (*AssuredPublisher) DisableRepublishing ¶
func (p *AssuredPublisher) DisableRepublishing()
DisableRepublishing disables messages republishing
func (*AssuredPublisher) Publish ¶
func (p *AssuredPublisher) Publish(message string, subscriber *Subscriber) bool
Publish pushes items on to a RabbitMQ Queue. For AssuredPublisher it waits for delivery confirmaiton and retries on failures
func (*AssuredPublisher) PublishBytes ¶
func (p *AssuredPublisher) PublishBytes(message []byte, subscriber *Subscriber) bool
PublishBytes is the same as Publish but accepts a []byte instead of a string. For AssuredPublisher it waits for delivery confirmaiton and retries on failures
func (*AssuredPublisher) PublishBytesWithArg ¶
func (p *AssuredPublisher) PublishBytesWithArg(message []byte, subscriber *Subscriber, arg interface{}) bool
PublishBytesWithArg is the same as Publish but accepts a []byte instead of a string. The argument will be stored for passing into the confirmation handler. For AssuredPublisher it waits for delivery confirmaiton and retries on failures
func (*AssuredPublisher) PublishWithArg ¶
func (p *AssuredPublisher) PublishWithArg(message string, subscriber *Subscriber, arg interface{}) bool
PublishWithArg pushes items on to a RabbitMQ Queue. The argument will be stored for passing into the confirmation handler. For AssuredPublisher it waits for delivery confirmaiton and retries on failures
func (*AssuredPublisher) ReceiveAllConfirmations ¶
func (p *AssuredPublisher) ReceiveAllConfirmations() bool
ReceiveAllConfirmations explicitly receives all awaiting confirmations
func (*AssuredPublisher) SetConfirmationHandler ¶
func (p *AssuredPublisher) SetConfirmationHandler(confirmationHandler func(amqp.Confirmation, interface{}))
SetConfirmationHandler sets the handler which is called for every confirmation received
func (*AssuredPublisher) SetExplicitWaiting ¶
func (p *AssuredPublisher) SetExplicitWaiting()
SetExplicitWaiting disables implicit waiting for a confirmation after each publishing
func (*AssuredPublisher) WaitForAllConfirmations ¶
func (p *AssuredPublisher) WaitForAllConfirmations() bool
WaitForAllConfirmations waits for all confirmations and retries publishing if needed. Returns false only if is cancelled.
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection represents an autorecovering connection
func NewConnectionWithURL ¶
func NewConnectionWithURL(url string) *Connection
NewConnectionWithURL creates a new connection with a custom RabbitMQ URL
func (*Connection) GetConnection ¶
func (connection *Connection) GetConnection() *amqp.Connection
GetConnection returns an amqp.Connection stored in Connection. It establishes a new connection if needed.
func (*Connection) ReplaceConnection ¶
func (connection *Connection) ReplaceConnection(newConnection *amqp.Connection)
ReplaceConnection replaces the internal connection with a given one. For testing purposes only
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
Publisher allows you to publish events to RabbitMQ
func NewPublisher ¶
NewPublisher constructs a new Publisher instance.
func NewPublisherWithConnection ¶
func NewPublisherWithConnection(connection *Connection, cancelChannel <-chan bool) *Publisher
NewPublisherWithConnection constructs a new Publisher instance with a custom connection
func (*Publisher) Close ¶
func (p *Publisher) Close()
Close will close the connection and channel for the Publisher
func (*Publisher) GetChannel ¶
GetChannel returns a publisher's channel. It opens a new channel if needed.
func (*Publisher) NotifyPublish ¶
func (p *Publisher) NotifyPublish(size int) chan amqp.Confirmation
NotifyPublish registers a listener for reliable publishing.
func (*Publisher) Publish ¶
func (p *Publisher) Publish(message string, subscriber *Subscriber) error
Publish pushes items on to a RabbitMQ Queue.
func (*Publisher) PublishBytes ¶
func (p *Publisher) PublishBytes(message []byte, subscriber *Subscriber) error
PublishBytes is the same as Publish but accepts a []byte instead of a string
type Subscriber ¶
type Subscriber struct { Concurrency int Durable bool Exchange string Queue string RoutingKey string PrefetchCount int AutoDelete bool ManualAck bool }
Subscriber contains all of the necessary data for Publishing and Subscriber to RabbitMQ Topics
func (*Subscriber) AutoDeleteInDev ¶
func (subscriber *Subscriber) AutoDeleteInDev()
AutoDeleteInDev will set the Subscribers AutoDelete setting to true as long as you are in a development environement. Non production environements have a APP_ENV value that isn't ("production", "prod", "staging", "stage"). This is used for running a worker in your local environment but connecting to a stage or prodution rabbit server. You want to ensure the Subscriber gets AutoDeleted on the remote server.
func (*Subscriber) PrefixQueueInDev ¶
func (subscriber *Subscriber) PrefixQueueInDev()
PrefixQueueInDev will prefix the queue name with the name of your current user if of the APP_ENV variable is set to a non production value ("production", "prod", "staging", "stage"). This is used for running a worker in your local environment but connecting to a stage or prodution rabbit server.