messageq

package
v0.4.5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 24, 2020 License: MIT Imports: 13 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var BrokerURL = os.Getenv(mqBrokerURLEnv)

BrokerURL is the URL at which we can reach the message queueing system

View Source
var InputQueue = os.Getenv(mqInputQueueEnv)

InputQueue is the queue from which we pull the remote fragment descriptions

View Source
var OutputQueue = os.Getenv(mqOutputQueueEnv)

OutputQueue is the queue into which we push the remote fragment descriptions

View Source
var PrefetchCount = parsePrefetchCount(mqPrefetchCountEnv)

PrefetchCount is the number of messages an MQConsumer will prefetch; it maps to RabbitMQ's QualityOfService.PrefetchCount. The variable will be -1 (<0) if the environment variable was not set.

Functions

func VerifyMessageQueueEnvs

func VerifyMessageQueueEnvs() error

VerifyMessageQueueEnvs checks whether each of the environment variables returned a non-empty value

Types

type Acknowledger

type Acknowledger struct {
	// contains filtered or unexported fields
}

Acknowledger is responsible for sending acknowledgement messages to the MQBroker

func NewAcknowledger

func NewAcknowledger(consumed chan amqp.Delivery, toAcknowledge chan transmit.Serializable) Acknowledger

NewAcknowledger instantiates a new Acknowledger struct

func (*Acknowledger) Start

func (ack *Acknowledger) Start(wg *sync.WaitGroup)

Start asynchronously calls StartBlocking via a goroutine

func (*Acknowledger) StartBlocking

func (ack *Acknowledger) StartBlocking()

StartBlocking listens on the two channels for new messages that were consumed but not acknowledged and for messages to acknowledge

type Consumer

type Consumer struct {
	Output  chan<- transmit.Serializable // *desc.RemoteFragmentDesc
	Unacked chan amqp.Delivery
	Exit    chan bool

	QueueName string
	// contains filtered or unexported fields
}

Consumer consumes messages from the messagequeue without acknowledging them

func NewConsumer

func NewConsumer(out chan transmit.Serializable, mqChannel *amqp.Channel, inputQueue string) (consumer Consumer)

NewConsumer creates a message consumer for a listener

func (*Consumer) Start

func (consumer *Consumer) Start(wg *sync.WaitGroup)

Start asynchronously calls StartBlocking via a goroutine

func (*Consumer) StartBlocking

func (consumer *Consumer) StartBlocking()

StartBlocking listens on the rabbitMQ messagequeue and redirects messages on the INPUT_QUEUE to a channel

func (*Consumer) Stop

func (consumer *Consumer) Stop()

Stop finishes up the consumer

type Listener

type Listener struct {
	BrokerURL     string
	PrefetchCount int
	TargetQueue   string
	CanExit       chan bool
	// contains filtered or unexported fields
}

Listener is the structure that listens to RabbitMQ and redirects messages to a channel

func NewListener

func NewListener(output, toAcknowledge chan transmit.Serializable, brokerURL, inputQueue string, prefetchCount int) (listener Listener, err error)

NewListener creates a new message queue listener

func (*Listener) Start

func (listener *Listener) Start(wg *sync.WaitGroup)

Start asynchronously calls StartBlocking via a goroutine

func (*Listener) StartBlocking

func (listener *Listener) StartBlocking()

StartBlocking listens on the rabbitMQ messagequeue and redirects messages on the INPUT_QUEUE to a channel

type MqFragmentDesc

type MqFragmentDesc struct {
	desc.RemoteFragmentDesc
}

MqFragmentDesc is a structure describing an iterum fragment to process coming from the MQ. For now it is a copy of RemoteFragmentDesc, but it is extensible for the future.

func (*MqFragmentDesc) Deserialize

func (mqfd *MqFragmentDesc) Deserialize(data []byte) (err error)

Deserialize tries to decode a json encoded byte array into `mqfd`. Errors on failure

func (*MqFragmentDesc) Serialize

func (mqfd *MqFragmentDesc) Serialize() (data []byte, err error)

Serialize tries to transform `mqfd` into a json encoded bytearray. Errors on failure

type QPublisher

type QPublisher struct {
	ToPublish   chan transmit.Serializable // data.RemoteFragmentDesc
	Channel     *amqp.Channel
	Queue       *amqp.Queue
	TargetQueue string
	// contains filtered or unexported fields
}

QPublisher is the structure that listens to a channel and publishes messages to rabbitMQ

func NewQPublisher

func NewQPublisher(toPublish chan transmit.Serializable, channel *amqp.Channel, targetQueue string) QPublisher

NewQPublisher creates a new qpublisher which receives messages from a channel and sends them on the message queue.

func (*QPublisher) DeclareQueue

func (qpublisher *QPublisher) DeclareQueue()

DeclareQueue defines the target queue

func (*QPublisher) Start

func (qpublisher *QPublisher) Start(wg *sync.WaitGroup)

Start asynchronously calls StartBlocking via a goroutine

func (*QPublisher) StartBlocking

func (qpublisher *QPublisher) StartBlocking()

StartBlocking listens to the channel and sends remoteFragments to the message queue on the OUTPUT_QUEUE queue.

func (*QPublisher) Stop

func (qpublisher *QPublisher) Stop()

Stop finishes up and notifies the user of its progress

type Sender

type Sender struct {
	ToLineate  chan<- transmit.Serializable // desc.RemoteFragmentDesc
	Publishers map[string]QPublisher

	TargetQueue string
	BrokerURL   string
	// contains filtered or unexported fields
}

Sender is the structure that listens to a channel and redirects messages to rabbitMQ

func NewSender

func NewSender(toSend, toLineate chan transmit.Serializable, brokerURL, targetQueue string) (sender Sender, err error)

NewSender creates a new sender which receives messages from a channel and sends them on the message queue.

func (Sender) Start

func (sender Sender) Start(wg *sync.WaitGroup)

Start asynchronously calls StartBlocking via a goroutine

func (*Sender) StartBlocking

func (sender *Sender) StartBlocking()

StartBlocking listens to the channel and sends remoteFragments to the message queue on the OUTPUT_QUEUE queue.

func (Sender) Stop

func (sender Sender) Stop()

Stop finishes up and notifies the user of its progress

type SimpleSender

type SimpleSender struct {
	ToSend      chan transmit.Serializable
	TargetQueue string
	BrokerURL   string
	// contains filtered or unexported fields
}

SimpleSender is the structure that listens to a channel and redirects messages to rabbitMQ

func NewSimpleSender

func NewSimpleSender(toSend chan transmit.Serializable, brokerURL, targetQueue string) (sender SimpleSender)

NewSimpleSender creates a new sender which receives messages from a channel and sends them on the message queue.

func (*SimpleSender) Start

func (sender *SimpleSender) Start(wg *sync.WaitGroup)

Start asynchronously calls StartBlocking via a goroutine

func (*SimpleSender) StartBlocking

func (sender *SimpleSender) StartBlocking()

StartBlocking listens to the channel and sends remoteFragments to the message queue on the OUTPUT_QUEUE queue.

func (*SimpleSender) Stop

func (sender *SimpleSender) Stop()

Stop finishes up and notifies the user of its progress

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL