Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var BrokerURL = os.Getenv(mqBrokerURLEnv)
BrokerURL is the URL at which we can reach the message queueing system
var InputQueue = os.Getenv(mqInputQueueEnv)
InputQueue is the queue from which we pull the remote fragment descriptions
var OutputQueue = os.Getenv(mqOutputQueueEnv)
OutputQueue is the queue into which we push the remote fragment descriptions
var PrefetchCount = parsePrefetchCount(mqPrefetchCountEnv)
PrefetchCount is the number of messages an MQConsumer will prefetch; it maps to RabbitMQ's QualityOfService.PrefetchCount. The value will be -1 (<0) if the environment variable was not set.
Functions ¶
func VerifyMessageQueueEnvs ¶
func VerifyMessageQueueEnvs() error
VerifyMessageQueueEnvs checks whether each of the environment variables returned a non-empty value
Types ¶
type Acknowledger ¶
type Acknowledger struct {
// contains filtered or unexported fields
}
Acknowledger is responsible for sending acknowledgement messages to the MQBroker
func NewAcknowledger ¶
func NewAcknowledger(consumed chan amqp.Delivery, toAcknowledge chan transmit.Serializable) Acknowledger
NewAcknowledger instantiates a new Acknowledger struct
func (*Acknowledger) Start ¶
func (ack *Acknowledger) Start(wg *sync.WaitGroup)
Start asynchronously calls StartBlocking via a goroutine
func (*Acknowledger) StartBlocking ¶
func (ack *Acknowledger) StartBlocking()
StartBlocking listens on the two channels for new messages that were consumed but not acknowledged and for messages to acknowledge
type Consumer ¶
type Consumer struct { Output chan<- transmit.Serializable // *desc.RemoteFragmentDesc Unacked chan amqp.Delivery Exit chan bool QueueName string // contains filtered or unexported fields }
Consumer consumes messages from the message queue without acknowledging them
func NewConsumer ¶
func NewConsumer(out chan transmit.Serializable, mqChannel *amqp.Channel, inputQueue string) (consumer Consumer)
NewConsumer creates a message consumer for a listener
func (*Consumer) StartBlocking ¶
func (consumer *Consumer) StartBlocking()
StartBlocking listens on the RabbitMQ message queue and redirects messages on the INPUT_QUEUE to a channel
type Listener ¶
type Listener struct { BrokerURL string PrefetchCount int TargetQueue string CanExit chan bool // contains filtered or unexported fields }
Listener is the structure that listens to RabbitMQ and redirects messages to a channel
func NewListener ¶
func NewListener(output, toAcknowledge chan transmit.Serializable, brokerURL, inputQueue string, prefetchCount int) (listener Listener, err error)
NewListener creates a new message queue listener
func (*Listener) StartBlocking ¶
func (listener *Listener) StartBlocking()
StartBlocking listens on the RabbitMQ message queue and redirects messages on the INPUT_QUEUE to a channel
type MqFragmentDesc ¶
type MqFragmentDesc struct {
desc.RemoteFragmentDesc
}
MqFragmentDesc is a structure describing an iterum fragment to process coming from the MQ. For now it is a copy of RemoteFragmentDesc, but it is extensible for the future.
func (*MqFragmentDesc) Deserialize ¶
func (mqfd *MqFragmentDesc) Deserialize(data []byte) (err error)
Deserialize tries to decode a json encoded byte array into `mqfd`. Errors on failure
func (*MqFragmentDesc) Serialize ¶
func (mqfd *MqFragmentDesc) Serialize() (data []byte, err error)
Serialize tries to transform `mqfd` into a json encoded bytearray. Errors on failure
type QPublisher ¶
type QPublisher struct { ToPublish chan transmit.Serializable // data.RemoteFragmentDesc Channel *amqp.Channel Queue *amqp.Queue TargetQueue string // contains filtered or unexported fields }
QPublisher is the structure that listens to a channel and publishes messages to rabbitMQ
func NewQPublisher ¶
func NewQPublisher(toPublish chan transmit.Serializable, channel *amqp.Channel, targetQueue string) QPublisher
NewQPublisher creates a new qpublisher which receives messages from a channel and sends them on the message queue.
func (*QPublisher) DeclareQueue ¶
func (qpublisher *QPublisher) DeclareQueue()
DeclareQueue defines the target queue
func (*QPublisher) Start ¶
func (qpublisher *QPublisher) Start(wg *sync.WaitGroup)
Start asynchronously calls StartBlocking via a goroutine
func (*QPublisher) StartBlocking ¶
func (qpublisher *QPublisher) StartBlocking()
StartBlocking listens to the channel and sends remoteFragments to the message queue on the OUTPUT_QUEUE queue.
func (*QPublisher) Stop ¶
func (qpublisher *QPublisher) Stop()
Stop finishes up and notifies the user of its progress
type Sender ¶
type Sender struct { ToLineate chan<- transmit.Serializable // desc.RemoteFragmentDesc Publishers map[string]QPublisher TargetQueue string BrokerURL string // contains filtered or unexported fields }
Sender is the structure that listens to a channel and redirects messages to rabbitMQ
func NewSender ¶
func NewSender(toSend, toLineate chan transmit.Serializable, brokerURL, targetQueue string) (sender Sender, err error)
NewSender creates a new sender which receives messages from a channel and sends them on the message queue.
func (*Sender) StartBlocking ¶
func (sender *Sender) StartBlocking()
StartBlocking listens to the channel and sends remoteFragments to the message queue on the OUTPUT_QUEUE queue.
type SimpleSender ¶
type SimpleSender struct { ToSend chan transmit.Serializable TargetQueue string BrokerURL string // contains filtered or unexported fields }
SimpleSender is the structure that listens to a channel and redirects messages to rabbitMQ
func NewSimpleSender ¶
func NewSimpleSender(toSend chan transmit.Serializable, brokerURL, targetQueue string) (sender SimpleSender)
NewSimpleSender creates a new sender which receives messages from a channel and sends them on the message queue.
func (*SimpleSender) Start ¶
func (sender *SimpleSender) Start(wg *sync.WaitGroup)
Start asynchronously calls StartBlocking via a goroutine
func (*SimpleSender) StartBlocking ¶
func (sender *SimpleSender) StartBlocking()
StartBlocking listens to the channel and sends remoteFragments to the message queue on the OUTPUT_QUEUE queue.
func (*SimpleSender) Stop ¶
func (sender *SimpleSender) Stop()
Stop finishes up and notifies the user of its progress