Documentation ¶
Overview ¶
Rabbit_hole creates a simple interface to a RabbitMQ exchange allowing setup via one function call to Mk_mqreader() for reading and Mk_mqwriter() for sending. Rabbit_hole also provides for a channel listening interface which can be paused by the user.
The user programme can create a listener via Mk_mqreader() and then can either listen directly on the lister.Port for amqp.Delivery messages, or can invoke listener.Eat() and supply a channel where Eat() will write received messages (allowing a central user function to process all messages from multiple listeners).
User programme creates a sender via Mk_mqwriter() which returns a struct that is used to start the driver. Once the driver is started, the user can pass messages on the struct.Port and the driver will push it out to the message exchange.
Index ¶
Constants ¶
const ( DURABLE bool = true // bloody amqp parms are true false w/o constants to make code readable AUTO_DEL bool = true INTERNAL bool = true WAIT bool = false EXCLUSIVE bool = true AUTO_ACK bool = true // automatically ack messages so we/user does not have to LOCAL bool = true MANDITORY bool = true IMMED bool = true )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Mq_reader ¶
type Mq_reader struct { Port <-chan amqp.Delivery // this is exposed so that the user can listen directly // contains filtered or unexported fields }
Manages a reader connection.
func Mk_mqreader ¶
func Mk_mqreader(host string, port string, user string, pw string, ex string, ex_type string, key *string) (rdr *Mq_reader, err error)
Create a reader for a given user/pw host/port exchange 5-tuple. Creates a connection and channel.
The exchange type has the following syntax:
[type][+eop+eop..][>[qname][+qop+qop...]
Type is fanout etc. Eop is one of the following and can be !eop to negate:
ad == autodel du == durable in == internal
qname is the name of the queue, if >+qop is coded, then a random name is gnerated. The qops are:
ad == auto delete du == durable ex == exclusive
Any attribute may be prefixed with ! to negate it (e.g. !ad) and the order of either type of op is NOT important.
The queue type must match the exchange type (durable or !durable) and setting the type for the exchage will also set the type for the queue, and thus setting du or !du in the queue is optional.
The key is a string of one or more comma separated keys which will be asscoiated with the queue when bound to the exchange. For a direct exchange, this allows the same queue to be used to receive multiple message types based on key.
func (*Mq_reader) Close ¶
func (rdr *Mq_reader) Close()
----------------reader things ----------------------------------------------------------
Ensure that everything is closed down before we go.
func (*Mq_reader) Pause ¶
Pause will set the pause flag in the reader based on the on/off state that is passed in. When paused is true, then messages which are receivd from the Rabbit exchange are dropped. When paused, and quiet is off, periodic dropped counts are written to the standard error device.
func (*Mq_reader) Start_eating ¶
Start_eating causes the package to beging processin messages from the rabbit channel and passing them on the user channel, or dropping them if paused.
type Mq_writer ¶
type Mq_writer struct { Port chan interface{} // anything user writes to port is sent to the exhange (accepts string, *string and []byte blobs) Notify chan string // user can send request to us (quiet, stop, etc.) // contains filtered or unexported fields }
Struct which manages a connection to an exchange for writing.
func Mk_mqwriter ¶
func Mk_mqwriter(host string, port string, user string, pw string, ex string, ex_type string, key *string) (wrtr *Mq_writer, err error)
Create a new writer which connects to the RMQ server and binds to the named exchange.
func (*Mq_writer) Close ¶
func (wrtr *Mq_writer) Close()
Ensure that everything is closed down before we go.
func (*Mq_writer) Delete ¶
Delete the queue and exchange associated with the writer. If force is not true, then deletion only happens if there are no consumers on the queue.
func (*Mq_writer) Driver ¶
func (wrtr *Mq_writer) Driver()
Listens on the channel for some kind of 'blob' and then sends it off to the exchange. The blob can be string, *string, []byte, or Mq_msg. Mq_msg is the only way of using a key other than the default given when the driver was started.
If the write channel encounters an error, an attempt to reconnct will be made, and writing will resume when a connection is established, however any messages received on our inbound channel will be dropped. The user can test the state after each burst (or individual message) using the Is_alive() function which will return false if the connection is down. This does not make any attempt to preserve messages during an outage.
func (*Mq_writer) Is_alive ¶
Is alive checks the known state of the connection and returns true if it is thought to be alive.
func (*Mq_writer) Start_writer ¶
Starts the writer driver for the exchange using the given key as the default.