Documentation ¶
Overview ¶
Package amq provides high-level message broker building blocks.
Index ¶
- Variables
- type Binding
- type Exchange
- type Headers
- type Matcher
- type Message
- type MessageConsumer
- type MessagePublisher
- type Queue
- func (self *Queue) Close()
- func (self *Queue) Consume(msg Message)
- func (self *Queue) ForceClose()
- func (self *Queue) Len() int
- func (self *Queue) Subscribe(consumer MessageConsumer) error
- func (self *Queue) Subscriptions() []MessageConsumer
- func (self *Queue) Unsubscribe(consumer MessageConsumer) error
- type QueueHandler
- type RoundRobinDispatcher
Constants ¶
This section is empty.
Variables ¶
var (
	ErrAlreadyBound    = errors.New("Exchange: Already bound to this binding")
	ErrBindingNotFound = errors.New("Exchange: Binding not found")
)
var (
	ErrConsumerAlreadySubscribed = errors.New("Round robin: Consumer already subscribed")
	ErrConsumerNotFound          = errors.New("Round robin: Consumer not found")
)
Functions ¶
This section is empty.
Types ¶
type Binding ¶
type Binding struct {
	Key      string
	Consumer MessageConsumer
}
Binding is a type representing a connection between a Message Exchange and either another Message Exchange or a Message Queue.
type Exchange ¶
type Exchange struct {
// contains filtered or unexported fields
}
Exchange is a base AMQ entity. It supports goroutine-safe concurrent access.
Exchange needs to be initialized by calling NewExchange() with a Matcher implementation.
Exchange delivers messages to its bindings based on the result of the Matcher.Matches() call. A consumer bound via multiple bindings will receive each message only once.
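The once-per-consumer delivery rule can be illustrated with a small standalone sketch. It does not use the package itself: the Matches signature, exactMatcher, deliverOnce, textMessage and counter are all hypothetical names invented for this illustration, since the documentation names Matcher.Matches() without showing its signature.

```go
package main

import "fmt"

// Message and MessageConsumer are trimmed to what this sketch needs.
type Message interface {
	RoutingKey() string
}

type MessageConsumer interface {
	Consume(Message)
}

// Binding mirrors the documented struct.
type Binding struct {
	Key      string
	Consumer MessageConsumer
}

// Matcher is an assumption: the real Matches signature is not shown in the docs.
type Matcher interface {
	Matches(bindingKey string, msg Message) bool
}

// exactMatcher (hypothetical) matches when the binding key equals the routing key.
type exactMatcher struct{}

func (exactMatcher) Matches(bindingKey string, msg Message) bool {
	return bindingKey == msg.RoutingKey()
}

// deliverOnce (hypothetical) passes msg to every consumer that has at least
// one matching binding, at most once per consumer. It returns the number of
// consumers that received the message.
func deliverOnce(m Matcher, bindings []Binding, msg Message) int {
	seen := make(map[MessageConsumer]bool)
	for _, b := range bindings {
		if !m.Matches(b.Key, msg) || seen[b.Consumer] {
			continue
		}
		seen[b.Consumer] = true
		b.Consumer.Consume(msg)
	}
	return len(seen)
}

// textMessage is a toy message whose routing key is the string itself.
type textMessage string

func (t textMessage) RoutingKey() string { return string(t) }

// counter counts how many messages it has consumed.
type counter struct{ n int }

func (c *counter) Consume(Message) { c.n++ }

func main() {
	c := &counter{}
	bindings := []Binding{
		{Key: "orders", Consumer: c},
		{Key: "orders", Consumer: c}, // same consumer bound twice
	}
	delivered := deliverOnce(exactMatcher{}, bindings, textMessage("orders"))
	fmt.Println(delivered, c.n) // prints "1 1"
}
```

Using the consumer as a map key relies on consumers being comparable, which is the same requirement the package places on MessageConsumer implementors.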
func NewExchange ¶
func (*Exchange) UnbindFrom ¶
type Headers ¶
type Headers map[string]interface{}
Headers type is a mapping of string header names to values.
type Matcher ¶
Matcher is an interface which is used by the Exchange implementation to decide whether to pass a message over a particular binding.
type Message ¶
type Message interface {
	Headers() Headers
	RoutingKey() string
	Priority() uint8
	Timestamp() time.Time
	Body() []byte
}
Message interface is the core of this implementation. The methods defined on this interface are used directly within this package.
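As a rough illustration of what satisfying Message involves, here is a minimal standalone sketch. The interface and Headers type are copied from this page so the example compiles on its own; simpleMessage and newSimpleMessage are hypothetical names, not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// Headers and Message are copied from the documentation above.
type Headers map[string]interface{}

type Message interface {
	Headers() Headers
	RoutingKey() string
	Priority() uint8
	Timestamp() time.Time
	Body() []byte
}

// simpleMessage is a hypothetical implementation backed by plain fields.
type simpleMessage struct {
	headers  Headers
	key      string
	priority uint8
	created  time.Time
	body     []byte
}

func (m *simpleMessage) Headers() Headers     { return m.headers }
func (m *simpleMessage) RoutingKey() string   { return m.key }
func (m *simpleMessage) Priority() uint8      { return m.priority }
func (m *simpleMessage) Timestamp() time.Time { return m.created }
func (m *simpleMessage) Body() []byte         { return m.body }

// Compile-time check that *simpleMessage satisfies Message.
var _ Message = (*simpleMessage)(nil)

// newSimpleMessage (hypothetical) builds a message with empty headers,
// priority 0 and the current time as its timestamp.
func newSimpleMessage(key string, body []byte) *simpleMessage {
	return &simpleMessage{
		headers: Headers{},
		key:     key,
		created: time.Now(),
		body:    body,
	}
}

func main() {
	var msg Message = newSimpleMessage("orders.created", []byte("hello"))
	fmt.Println(msg.RoutingKey(), string(msg.Body()), msg.Priority())
}
```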
type MessageConsumer ¶
type MessageConsumer interface {
Consume(Message)
}
MessageConsumer is an interface representing an entity capable of consuming, receiving or accumulating Messages.
Implementors of the MessageConsumer interface MUST support equality checks through the `==` operator.
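The equality requirement matters because comparing two interface values with `==` panics at run time when their shared dynamic type is not comparable (for example, a struct value containing a slice). The sketch below shows the difference; ptrConsumer, sliceConsumer and safeEqual are hypothetical names, and Message is reduced to an empty interface to keep the example short.

```go
package main

import "fmt"

// Message is trimmed to an empty interface for this sketch.
type Message interface{}

type MessageConsumer interface {
	Consume(Message)
}

// ptrConsumer is used through a pointer, so == compares pointer identity
// and never panics.
type ptrConsumer struct{ n int }

func (c *ptrConsumer) Consume(Message) { c.n++ }

// sliceConsumer is used by value and contains a slice, which makes the
// struct type non-comparable.
type sliceConsumer struct{ seen []string }

func (c sliceConsumer) Consume(Message) {}

// safeEqual (hypothetical) reports whether a == b. ok is false when the
// comparison itself panicked because the dynamic type is not comparable.
func safeEqual(a, b MessageConsumer) (equal, ok bool) {
	defer func() {
		if recover() != nil {
			equal, ok = false, false
		}
	}()
	return a == b, true
}

func main() {
	p := &ptrConsumer{}

	equal, ok := safeEqual(p, p)
	fmt.Println(equal, ok) // prints "true true"

	equal, ok = safeEqual(p, &ptrConsumer{})
	fmt.Println(equal, ok) // prints "false true"

	equal, ok = safeEqual(sliceConsumer{}, sliceConsumer{})
	fmt.Println(equal, ok) // prints "false false"
}
```

Pointer receivers are the simplest way to meet the requirement, since pointers are always comparable.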
type MessagePublisher ¶
MessagePublisher is an interface representing an entity capable of directing messages to a specified binding.
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue is a base AMQ entity. It supports goroutine-safe concurrent access.
Queue needs to be initialized by calling NewQueue().
Queue delivers messages to subscribed MessageConsumers in a round-robin fashion. Queue MUST be closed after use, either by calling Close(), which flushes all held messages to subscribed consumers (if any), or by calling ForceClose(), which drops held messages and exits immediately.
func (*Queue) Close ¶
func (self *Queue) Close()
Close gracefully flushes messages to all subscribed consumers (if any) and closes the Queue.
It is an error to use the queue after it has been closed.
func (*Queue) Consume ¶
Consume enqueues a message in the Queue. It is safe to call this method from multiple goroutines.
func (*Queue) ForceClose ¶
func (self *Queue) ForceClose()
ForceClose drops all messages and closes the Queue.
It is an error to use the queue after it has been force-closed.
func (*Queue) Len ¶
Len returns the number of messages currently held in the Queue. It is safe to call this method from multiple goroutines.
func (*Queue) Subscribe ¶
func (self *Queue) Subscribe(consumer MessageConsumer) error
Subscribe adds a new consumer to the round-robin ring. It is safe to call this method from multiple goroutines.
If the consumer is already subscribed, the returned error is ErrConsumerAlreadySubscribed.
func (*Queue) Subscriptions ¶
func (self *Queue) Subscriptions() []MessageConsumer
Subscriptions returns the list of currently subscribed consumers. It is safe to call this method from multiple goroutines.
func (*Queue) Unsubscribe ¶
func (self *Queue) Unsubscribe(consumer MessageConsumer) error
Unsubscribe removes a consumer from the round-robin ring. It is safe to call this method from multiple goroutines.
If the consumer is not subscribed, the returned error is ErrConsumerNotFound.
type QueueHandler ¶
QueueHandler is an interface used by the Queue implementation for internal queueing of messages. Implementors do not need to worry about concurrent access.
type RoundRobinDispatcher ¶
type RoundRobinDispatcher interface {
	Subscribe(MessageConsumer) error
	Unsubscribe(MessageConsumer) error
	Subscriptions() []MessageConsumer
}
RoundRobinDispatcher is an interface representing an entity capable of dispatching messages to a set of MessageConsumers in a round-robin fashion.
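One possible way to satisfy this interface is a mutex-guarded slice with a moving cursor. The sketch below is standalone and is not the package's own implementation: ring, dispatch and counter are hypothetical names, Message is reduced to an empty interface, and the error values are redeclared locally with the documented messages so the example compiles by itself.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type Message interface{}

type MessageConsumer interface{ Consume(Message) }

type RoundRobinDispatcher interface {
	Subscribe(MessageConsumer) error
	Unsubscribe(MessageConsumer) error
	Subscriptions() []MessageConsumer
}

// Local copies of the documented sentinel errors.
var (
	ErrConsumerAlreadySubscribed = errors.New("Round robin: Consumer already subscribed")
	ErrConsumerNotFound          = errors.New("Round robin: Consumer not found")
)

// ring is a hypothetical dispatcher: a slice of consumers plus a cursor.
type ring struct {
	mu        sync.Mutex
	consumers []MessageConsumer
	next      int
}

var _ RoundRobinDispatcher = (*ring)(nil)

func (r *ring) Subscribe(c MessageConsumer) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, existing := range r.consumers {
		if existing == c { // relies on consumers supporting ==
			return ErrConsumerAlreadySubscribed
		}
	}
	r.consumers = append(r.consumers, c)
	return nil
}

func (r *ring) Unsubscribe(c MessageConsumer) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	for i, existing := range r.consumers {
		if existing == c {
			r.consumers = append(r.consumers[:i], r.consumers[i+1:]...)
			if r.next > i {
				r.next-- // keep the cursor on the same consumer
			}
			return nil
		}
	}
	return ErrConsumerNotFound
}

// Subscriptions returns a copy so callers cannot mutate the ring.
func (r *ring) Subscriptions() []MessageConsumer {
	r.mu.Lock()
	defer r.mu.Unlock()
	return append([]MessageConsumer(nil), r.consumers...)
}

// dispatch (hypothetical, not part of the interface) hands msg to the next
// consumer in turn. It reports false when nobody is subscribed.
func (r *ring) dispatch(msg Message) bool {
	r.mu.Lock()
	if len(r.consumers) == 0 {
		r.mu.Unlock()
		return false
	}
	c := r.consumers[r.next%len(r.consumers)]
	r.next = (r.next + 1) % len(r.consumers)
	r.mu.Unlock() // release before calling out to the consumer
	c.Consume(msg)
	return true
}

type counter struct{ n int }

func (c *counter) Consume(Message) { c.n++ }

func main() {
	r := &ring{}
	a, b := &counter{}, &counter{}
	_ = r.Subscribe(a)
	_ = r.Subscribe(b)
	fmt.Println(errors.Is(r.Subscribe(a), ErrConsumerAlreadySubscribed)) // prints "true"
	for i := 0; i < 3; i++ {
		r.dispatch(i)
	}
	fmt.Println(a.n, b.n) // prints "2 1"
}
```

Callers can compare returned errors against the sentinel values with errors.Is, as main does for the duplicate subscription.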