amq

package
v0.0.0-...-1b4c788 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 6, 2015 License: Apache-2.0 Imports: 3 Imported by: 0

Documentation

Overview

Package amq provides high-level message broker building blocks.

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrAlreadyBound    = errors.New("Exchange: Already bound to this binding")
	ErrBindingNotFound = errors.New("Exchange: Binding not found")
)
View Source
var (
	ErrConsumerAlreadySubscribed = errors.New("Round robin: Consumer already subscribed")
	ErrConsumerNotFound          = errors.New("Round robin: Consumer not found")
)

Functions

This section is empty.

Types

type Binding

type Binding struct {
	Key      string
	Consumer MessageConsumer
}

Binding is an type representing connection between Message Exchange and either another Message Exchange or Message Queue.

type Exchange

type Exchange struct {
	// contains filtered or unexported fields
}

Exchange is a base AMQ entity, it supports goroutine-safe concurrent access.

Exchange needs to be initialized by calling NewExchange() with Matcher implementation.

Exchange delivers messages to bound bindings based on result of Matcher.Matches() call. Consumer bound via multiple bindings, will receive message only once.

func NewExchange

func NewExchange(matcher Matcher) *Exchange

func (*Exchange) BindTo

func (self *Exchange) BindTo(binding *Binding) error

func (*Exchange) Consume

func (self *Exchange) Consume(msg Message)

func (*Exchange) UnbindFrom

func (self *Exchange) UnbindFrom(binding *Binding) error

type Headers

type Headers map[string]interface{}

Headers type is a mapping of string header names to values.

type Matcher

type Matcher interface {
	Matches(Message, *Binding) bool
}

Matcher is an interface witch will be used by Exchange implementation to decide whatever to pass message over particular binding.

type Message

type Message interface {
	Headers() Headers
	RoutingKey() string
	Priority() uint8
	Timestamp() time.Time
	Body() []byte
}

Message interface is a core of this implementation, the methods defined on this interface are directly used within this package.

type MessageConsumer

type MessageConsumer interface {
	Consume(Message)
}

MessageConsumer is an interface representing entity capable of consuming, receiving or accumulating Messages.

MessageConsumer interface implementors MUST support equality check through `==` operator.

type MessagePublisher

type MessagePublisher interface {
	BindTo(*Binding) error
	UnbindFrom(*Binding) error
}

MessagePublisher is an interface representing entity capable of directing messages to specified binding.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue is a base AMQ entity, it supports goroutine-safe concurrent access.

Queue needs to be initialized by calling NewQueue()

Queue delivers messages to subscribed MessageConsumers in a round-robin fashion. Queue MUST be closed after use, either by calling Close() witch will flush all held messages to subscibed consumers (if any), or by calling ForceClose(), witch will drop messages and exit immediately.

func NewQueue

func NewQueue(handler QueueHandler) *Queue

NewQueue returns initialized Queue.

func (*Queue) Close

func (self *Queue) Close()

Close gracefully flushes messages to all subscribed consumers (if any), and closes Queue.

Is an error to use queue after it has been closed.

func (*Queue) Consume

func (self *Queue) Consume(msg Message)

Consume enqueues message in Queue, it's safe to call this method from multiple goroutines.

func (*Queue) ForceClose

func (self *Queue) ForceClose()

Close drops all messages and closes Queue.

Is an error to use queue after it has been force-closed.

func (*Queue) Len

func (self *Queue) Len() int

Len returns count of currently held messages in Queue, it's safe to call this method from multiple goroutines.

func (*Queue) Subscribe

func (self *Queue) Subscribe(consumer MessageConsumer) error

Subscribe new consumer in a round-robin ring, it's safe to call this method from multiple goroutines.

If consumer is already subscribed the returned error will be of type: ErrConsumerAlreadySubscribed

func (*Queue) Subscriptions

func (self *Queue) Subscriptions() []MessageConsumer

Subscriptions return list of currently subscribed consumers, it's safe to call this method from multiple goroutines.

func (*Queue) Unsubscribe

func (self *Queue) Unsubscribe(consumer MessageConsumer) error

Unsubscribe consumer from round-robin ring, it's safe to call this method from multiple goroutines.

If consumer isn't already subscribed the returned error will be of type: ErrConsumerNotFound

type QueueHandler

type QueueHandler interface {
	Add(Message)
	Peek() Message
	Remove()
	Len() int
}

QueueHandler is an interface used by Queue implementation for internal queueing of messages, implementors does not need to worry about concurrent access.

type RoundRobinDispatcher

type RoundRobinDispatcher interface {
	Subscribe(MessageConsumer) error
	Unsubscribe(MessageConsumer) error
	Subscriptions() []MessageConsumer
}

RoundRobinDispatcher is an interface representing entity capable of dispatching messages to set of MessageConsumers in a round-robin fashion.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL