Documentation ¶
Overview ¶
Framework for AMQP.
It's built on top of https://github.com/streadway/amqp
Example (Common) ¶
This example shows a common use case of the library.
type Comment struct {
    Id      string
    Message string
}

ch := make(chan []interface{})
// Listen for errors and write them to stdout.
go func() {
    for l := range ch {
        fmt.Println(l...)
    }
}()
lg := logger.NewChanLogger(ch)
client, err := NewClient(
    conn.DefaultConnector(
        "amqp://localhost:5672",
        conn.WithLogger(lg), // We want to know connection status and errors.
    ),
    TemporaryExchange("example-exchange"),
)
if err != nil {
    panic(err)
}
subscriber := client.Subscriber()
eventChan := subscriber.SubscribeToExchange(context.Background(), "example-exchange", Comment{}, Consumer{})
go func() {
    for event := range eventChan {
        fmt.Println(event.Data) // do something with events
    }
}()
publisher := client.Publisher()
for i := 0; i < 10; i++ {
    // Prepare your data before publishing.
    comment := Comment{
        Id:      strconv.Itoa(i),
        Message: "message " + strconv.Itoa(i),
    }
    err := publisher.Publish(context.Background(), "example-exchange", comment, Publish{})
    if err != nil {
        panic(err)
    }
    time.Sleep(time.Millisecond * 500)
}
time.Sleep(time.Second * 5) // wait for all events to be delivered
Index ¶
- Constants
- Variables
- func CommonMessageIdBuilder() string
- func CommonTyper(v interface{}) string
- func WrapError(errs ...interface{}) error
- type Binding
- type Channel
- type Client
- type Consumer
- type ContentTyper
- type Declaration
- type DeliveryBefore
- type ErrorBefore
- type Event
- type Exchange
- type Exchanges
- type ObserverOption
- type Publish
- type Publisher
- type PublisherOption
- func PublisherBefore(before ...PublishingBefore) PublisherOption
- func PublisherHandlersAmount(n int) PublisherOption
- func PublisherLogger(lg logger.Logger) PublisherOption
- func PublisherWaitConnection(should bool, timeout time.Duration) PublisherOption
- func PublisherWithObserverOptions(opts ...ObserverOption) PublisherOption
- type PublishingBefore
- type Queue
- type Queues
- type Subscriber
- func (s Subscriber) Subscribe(ctx context.Context, exchangeName, queueName string, dataType interface{}, ...) <-chan Event
- func (s Subscriber) SubscribeToExchange(ctx context.Context, exchangeName string, dataType interface{}, cfg Consumer) <-chan Event
- func (s Subscriber) SubscribeToQueue(ctx context.Context, queueName string, dataType interface{}, cfg Consumer) <-chan Event
- type SubscriberOption
- func SubHandlersAmount(n int) SubscriberOption
- func SubProcessAllDeliveries(v bool) SubscriberOption
- func SubSetDefaultContentType(t string) SubscriberOption
- func SubWithObserverOptions(opts ...ObserverOption) SubscriberOption
- func SubscriberAllowedPriority(from, to uint8) SubscriberOption
- func SubscriberBufferSize(a int) SubscriberOption
- func SubscriberDeliverBefore(before ...DeliveryBefore) SubscriberOption
- func SubscriberLogger(lg logger.Logger) SubscriberOption
- func SubscriberWaitConnection(should bool, timeout time.Duration) SubscriberOption
- type SyncedStringSlice
Constants ¶
const (
    MaxMessagePriority = 9
    MinMessagePriority = 0
)
Variables ¶
var (
    // NotAllowedPriority occurs when a message was delivered, but its priority is too low or too high.
    NotAllowedPriority = errors.New("not allowed priority")
    // DeliveryChannelWasClosedError is an informational error that is logged to the info logger when the delivery channel is closed.
    DeliveryChannelWasClosedError = errors.New("delivery channel was closed")
    // Durable or non-auto-delete queues with empty names survive after all consumers have finished using them, but no one can connect to them again.
    QueueDeclareWarning = errors.New("declaring durable or non-auto-delete queue with empty name")
)
var CodecNotFound = errors.New("codec not found")
Functions ¶
func CommonMessageIdBuilder ¶
func CommonMessageIdBuilder() string
CommonMessageIdBuilder builds a new UUID to use as the message ID.
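The documentation does not say how the UUID is generated. As a rough illustration only, a random (version 4) UUID can be built from the standard library; `newMessageID` is a hypothetical name, and the library may well rely on a dedicated UUID package instead.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// newMessageID returns a random (version 4) UUID string.
// Hypothetical helper for illustration; not the library's implementation.
func newMessageID() string {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		panic(err) // no sensible fallback without a random source
	}
	b[6] = (b[6] & 0x0f) | 0x40 // set the version field to 4
	b[8] = (b[8] & 0x3f) | 0x80 // set the RFC 4122 variant bits
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16])
}

func main() {
	fmt.Println(newMessageID())
}
```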
func CommonTyper ¶
func CommonTyper(v interface{}) string
CommonTyper returns the Go-style type name of a value.
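One plausible way to obtain a Go-style type name is the `%T` verb of `fmt`. The sketch below assumes that behavior; `typeName` is a hypothetical stand-in, and the actual CommonTyper may format types differently.

```go
package main

import "fmt"

// Comment is the payload type from the package example.
type Comment struct {
	Id      string
	Message string
}

// typeName returns the Go-syntax type of v using the %T verb.
// Hypothetical stand-in for CommonTyper.
func typeName(v interface{}) string {
	return fmt.Sprintf("%T", v)
}

func main() {
	fmt.Println(typeName(42))
	fmt.Println(typeName([]string{"a"}))
	fmt.Println(typeName(Comment{}))  // package-qualified struct name
	fmt.Println(typeName(&Comment{})) // pointer types keep the leading *
}
```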
Types ¶
type Channel ¶
type Channel struct {
// contains filtered or unexported fields
}
Channel is a wrapper around *amqp.Channel.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func NewClient ¶
func NewClient(connector conn.Connector, decls ...Declaration) (cl Client, err error)
func (*Client) Publisher ¶
func (c *Client) Publisher(opts ...PublisherOption) *Publisher
func (*Client) Subscriber ¶
func (c *Client) Subscriber(opts ...SubscriberOption) *Subscriber
type ContentTyper ¶
type ContentTyper interface {
ContentType() string
}
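The documentation does not describe how ContentTyper is consumed. A plausible reading is that a payload type can report its own content type, with a default used otherwise. The sketch below follows that assumption; `contentTypeOf` and the `"application/json"` value are illustrative, not part of the documented API.

```go
package main

import "fmt"

// ContentTyper mirrors the interface shown above.
type ContentTyper interface {
	ContentType() string
}

// Comment is the payload type from the package example.
type Comment struct {
	Id      string
	Message string
}

// ContentType makes Comment satisfy ContentTyper.
func (Comment) ContentType() string { return "application/json" }

// contentTypeOf is a hypothetical helper: it returns v's own content type
// when v implements ContentTyper, and def otherwise.
func contentTypeOf(v interface{}, def string) string {
	if ct, ok := v.(ContentTyper); ok {
		return ct.ContentType()
	}
	return def
}

func main() {
	fmt.Println(contentTypeOf(Comment{}, "text/plain"))
	fmt.Println(contentTypeOf(42, "text/plain"))
}
```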
type Declaration ¶
type Declaration interface {
// contains filtered or unexported methods
}
type DeliveryBefore ¶
DeliveryBefore is a function that modifies a message before it is delivered.
type ErrorBefore ¶
ErrorBefore is a function that modifies an error caused by incorrect handling. Common use case: debugging.
type Event ¶
type Event struct {
    // Converted and ready-to-use pointer to an entity of the reply type.
    Data interface{}
    // Event's context.
    // Contains context.Background by default and is set up with the DeliveryBefore option.
    Context context.Context
    amqp.Delivery
}
Event represents an amqp.Delivery with attached context and data.
type Exchange ¶
type Exchange struct {
    Name       string
    Kind       string
    Durable    bool
    AutoDelete bool
    Internal   bool
    NoWait     bool
    Args       amqp.Table
}
func LongExchange ¶
LongExchange is a common way to declare an exchange with the given name.
func TemporaryExchange ¶
TemporaryExchange is a common way to create a temporary exchange with the given name.
type Exchanges ¶ added in v1.1.0
type Exchanges []Exchange
func PersistentExchanges ¶ added in v1.1.0
PersistentExchanges allows you to declare a set of exchanges with the given names.
type ObserverOption ¶
type ObserverOption func(opts *observerOpts)
func Lifetime ¶
func Lifetime(dur time.Duration) ObserverOption
Lifetime sets the interval between the observer's checks for idle channels. Somewhere between dur and 2*dur, the observer closes channels that have not been used for at least dur. The default value is 15 seconds.
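The "between dur and 2*dur" window follows from periodic checking: a channel that went idle just after one check has not yet been idle long enough at the next check, so it is closed one check later. The sketch below models this with integer seconds, assuming checks happen at fixed multiples of dur; `closedAt` is a hypothetical name and the model is a simplification of whatever the observer actually does.

```go
package main

import "fmt"

// closedAt returns the time of the first periodic check (at dur, 2*dur, ...)
// at which a channel last used at lastUsed has been idle for at least dur.
// Both arguments are in seconds; lastUsed >= 0 and dur > 0 are assumed.
func closedAt(lastUsed, dur int) int {
	for tick := dur; ; tick += dur {
		if tick-lastUsed >= dur {
			return tick
		}
	}
}

func main() {
	const dur = 15 // the documented default lifetime, in seconds
	for _, lastUsed := range []int{0, 1, 14} {
		at := closedAt(lastUsed, dur)
		fmt.Printf("last used at %2ds: closed at %ds after %ds idle\n",
			lastUsed, at, at-lastUsed)
	}
}
```

In this model the idle time before closing is always at least dur and strictly less than 2*dur.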
func LimitCount ¶
func LimitCount(count int) ObserverOption
LimitCount limits the number of messages per channel by calling Qos after each reconnection. Pass zero for unlimited messages.
func LimitSize ¶
func LimitSize(size int) ObserverOption
LimitSize limits the size of messages in bytes per channel by calling Qos after each reconnection. Pass zero for unlimited size.
func Max ¶
func Max(max int) ObserverOption
Max sets the maximum number of channels that can be open at the same time.
func Min ¶
func Min(min int) ObserverOption
Min sets the minimum number of channels that should be open at the same time. Min does not open new channels, but it prevents the observer from closing existing ones.
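ObserverOption has the shape of a functional option: each constructor returns a function that mutates an options struct. The sketch below re-creates that pattern with a stand-in `observerOpts`; its field names, `newObserverOpts`, and `closable` are assumptions for illustration, since the real struct is unexported. `closable` shows one reading of the Min rule: idle channels are closed only while the open count stays at or above the minimum.

```go
package main

import (
	"fmt"
	"time"
)

// observerOpts is a stand-in for the unexported options struct;
// the field names are assumptions.
type observerOpts struct {
	lifetime time.Duration
	min, max int
}

type ObserverOption func(opts *observerOpts)

func Lifetime(dur time.Duration) ObserverOption {
	return func(o *observerOpts) { o.lifetime = dur }
}

func Min(n int) ObserverOption { return func(o *observerOpts) { o.min = n } }

func Max(n int) ObserverOption { return func(o *observerOpts) { o.max = n } }

// newObserverOpts starts from the documented 15-second default lifetime
// and applies the options in order.
func newObserverOpts(opts ...ObserverOption) observerOpts {
	o := observerOpts{lifetime: 15 * time.Second}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

// closable reports how many idle channels may be closed without letting the
// number of open channels drop below min.
func closable(open, idle, min int) int {
	n := open - min
	if n > idle {
		n = idle
	}
	if n < 0 {
		n = 0
	}
	return n
}

func main() {
	o := newObserverOpts(Min(2), Max(10))
	fmt.Println(o.lifetime, o.min, o.max)
	fmt.Println(closable(5, 4, o.min))
}
```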
type Publisher ¶
type Publisher struct {
// contains filtered or unexported fields
}
type PublisherOption ¶
type PublisherOption func(*Publisher)
func PublisherBefore ¶
func PublisherBefore(before ...PublishingBefore) PublisherOption
PublisherBefore adds functions that are called before a message is published to the broker.
func PublisherHandlersAmount ¶
func PublisherHandlersAmount(n int) PublisherOption
PublisherHandlersAmount sets the number of handler processes that receive deliveries from one channel. For n > 1 the client does not guarantee the order of events.
func PublisherLogger ¶
func PublisherLogger(lg logger.Logger) PublisherOption
PublisherLogger sets the logger used for warning messages.
func PublisherWaitConnection ¶
func PublisherWaitConnection(should bool, timeout time.Duration) PublisherOption
PublisherWaitConnection tells the client to wait for a connection before executing a subscription or publish.
func PublisherWithObserverOptions ¶
func PublisherWithObserverOptions(opts ...ObserverOption) PublisherOption
type PublishingBefore ¶
type PublishingBefore func(context.Context, *amqp.Publishing)
PublishingBefore is a function that modifies a message before it is published.
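A hook of this shape typically copies request-scoped values from the context into the outgoing message. The sketch below uses a local `publishing` struct as a stand-in for amqp.Publishing so that it runs with the standard library alone; the hook names, the `trace-id` header, and `applyBefore` are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// publishing is a stand-in for amqp.Publishing with only the fields used here.
type publishing struct {
	MessageId string
	Headers   map[string]interface{}
}

// publishingBefore mirrors the PublishingBefore signature for the stand-in type.
type publishingBefore func(context.Context, *publishing)

type ctxKey string

const traceKey ctxKey = "trace-id"

// withTraceHeader copies a trace ID from the context into the headers.
func withTraceHeader(ctx context.Context, p *publishing) {
	if id, ok := ctx.Value(traceKey).(string); ok {
		if p.Headers == nil {
			p.Headers = map[string]interface{}{}
		}
		p.Headers["trace-id"] = id
	}
}

// withMessageID fills in a message ID when none is set.
func withMessageID(_ context.Context, p *publishing) {
	if p.MessageId == "" {
		p.MessageId = "generated-id"
	}
}

// applyBefore runs the hooks in order, as a publisher might do before sending.
func applyBefore(ctx context.Context, p *publishing, before ...publishingBefore) {
	for _, fn := range before {
		fn(ctx, p)
	}
}

// demoPublishing builds a message and runs both hooks over it.
func demoPublishing(traceID string) publishing {
	ctx := context.WithValue(context.Background(), traceKey, traceID)
	var p publishing
	applyBefore(ctx, &p, withTraceHeader, withMessageID)
	return p
}

func main() {
	p := demoPublishing("abc-123")
	fmt.Println(p.MessageId, p.Headers["trace-id"])
}
```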
type Queue ¶
type Queues ¶ added in v1.1.0
type Queues []Queue
func PersistentQueues ¶ added in v1.1.0
PersistentQueues allows you to declare a set of queues with the given names.
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
func (Subscriber) SubscribeToExchange ¶
func (Subscriber) SubscribeToQueue ¶
type SubscriberOption ¶
type SubscriberOption func(*Subscriber)
func SubHandlersAmount ¶
func SubHandlersAmount(n int) SubscriberOption
SubHandlersAmount sets the number of handler processes that receive deliveries from one channel. For n > 1 the client does not guarantee the order of events.
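The ordering caveat is a general property of fanning one channel out to several goroutines. The sketch below is not the library's code: `handle` is a hypothetical function that feeds items to n handlers and records the order in which they are processed. With one handler the input order is kept; with several it may not be.

```go
package main

import (
	"fmt"
	"sync"
)

// handle fans items out to n handler goroutines reading from one channel and
// returns the items in the order the handlers recorded them.
func handle(items []int, n int) []int {
	in := make(chan int)
	var (
		mu  sync.Mutex
		out []int
		wg  sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				mu.Lock()
				out = append(out, v)
				mu.Unlock()
			}
		}()
	}
	for _, v := range items {
		in <- v
	}
	close(in)
	wg.Wait()
	return out
}

func main() {
	items := []int{1, 2, 3, 4, 5}
	fmt.Println(handle(items, 1)) // one handler: input order is preserved
	fmt.Println(handle(items, 4)) // several handlers: order may vary between runs
}
```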
func SubProcessAllDeliveries ¶
func SubProcessAllDeliveries(v bool) SubscriberOption
SubProcessAllDeliveries, when set to true, lets you handle all deliveries from the current channel, even if Done was sent.
func SubSetDefaultContentType ¶
func SubSetDefaultContentType(t string) SubscriberOption
SubSetDefaultContentType sets the content type whose codec is used when the ContentType field of a message is empty.
func SubWithObserverOptions ¶
func SubWithObserverOptions(opts ...ObserverOption) SubscriberOption
func SubscriberAllowedPriority ¶
func SubscriberAllowedPriority(from, to uint8) SubscriberOption
SubscriberAllowedPriority rejects messages whose priority is not in the given range.
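A range check of this kind can be sketched as follows. The constants and the NotAllowedPriority error are taken from this page; `checkPriority` is a hypothetical helper, and treating both bounds as inclusive is an assumption the documentation does not confirm.

```go
package main

import (
	"errors"
	"fmt"
)

const (
	MaxMessagePriority = 9
	MinMessagePriority = 0
)

// NotAllowedPriority mirrors the package-level error documented above.
var NotAllowedPriority = errors.New("not allowed priority")

// checkPriority returns NotAllowedPriority when p lies outside [from, to].
// Inclusive bounds are an assumption made for this sketch.
func checkPriority(p, from, to uint8) error {
	if p < from || p > to {
		return NotAllowedPriority
	}
	return nil
}

func main() {
	for _, p := range []uint8{2, 5, 9} {
		fmt.Println(p, checkPriority(p, 3, 7))
	}
}
```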
func SubscriberBufferSize ¶
func SubscriberBufferSize(a int) SubscriberOption
SubscriberBufferSize sets the buffer size of the event channel for subscription methods.
func SubscriberDeliverBefore ¶
func SubscriberDeliverBefore(before ...DeliveryBefore) SubscriberOption
SubscriberDeliverBefore adds functions that are called before an Event is sent to the channel.
func SubscriberLogger ¶
func SubscriberLogger(lg logger.Logger) SubscriberOption
SubscriberLogger sets the logger used for error messages.
func SubscriberWaitConnection ¶
func SubscriberWaitConnection(should bool, timeout time.Duration) SubscriberOption
SubscriberWaitConnection tells the client to wait for a connection before executing a subscription or publish.
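Waiting for a connection with a timeout is commonly written as a `select` over a readiness signal and a timer. The sketch below assumes such a signal exists; `waitConnection`, the `ready` channel, and the behavior when `should` is false are all hypothetical, since the documentation does not describe the mechanism.

```go
package main

import (
	"fmt"
	"time"
)

// waitConnection blocks until ready is closed or timeout elapses and reports
// whether the caller may proceed. When should is false it returns at once
// (an assumption about what "not waiting" means).
func waitConnection(should bool, ready <-chan struct{}, timeout time.Duration) bool {
	if !should {
		return true
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case <-ready:
		return true
	case <-timer.C:
		return false
	}
}

// demoWait exercises both outcomes: an already established connection and
// one that never becomes ready.
func demoWait() (connected, timedOut bool) {
	ready := make(chan struct{})
	close(ready) // connection is already up
	connected = waitConnection(true, ready, time.Second)

	never := make(chan struct{}) // connection never comes up
	timedOut = !waitConnection(true, never, 10*time.Millisecond)
	return connected, timedOut
}

func main() {
	fmt.Println(demoWait())
}
```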
type SyncedStringSlice ¶
type SyncedStringSlice struct {
// contains filtered or unexported fields
}
func (*SyncedStringSlice) Append ¶
func (s *SyncedStringSlice) Append(strs ...string)
func (*SyncedStringSlice) Drop ¶
func (s *SyncedStringSlice) Drop()
func (*SyncedStringSlice) Find ¶
func (s *SyncedStringSlice) Find(str string) int
func (*SyncedStringSlice) Get ¶
func (s *SyncedStringSlice) Get() []string
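The methods of SyncedStringSlice are listed without descriptions. Judging by the name, it is a string slice guarded for concurrent use. The sketch below shows one way such a type could look; `syncedStrings` is a hypothetical re-creation, and the semantics of Drop (clear everything) and Find (index, or -1 when absent) are assumptions.

```go
package main

import (
	"fmt"
	"sync"
)

// syncedStrings is a hypothetical mutex-guarded string slice.
type syncedStrings struct {
	mu    sync.RWMutex
	items []string
}

// Append adds strs to the end of the slice.
func (s *syncedStrings) Append(strs ...string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items = append(s.items, strs...)
}

// Drop removes all elements (assumed semantics).
func (s *syncedStrings) Drop() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items = nil
}

// Find returns the index of str, or -1 if it is absent (assumed semantics).
func (s *syncedStrings) Find(str string) int {
	s.mu.RLock()
	defer s.mu.RUnlock()
	for i, v := range s.items {
		if v == str {
			return i
		}
	}
	return -1
}

// Get returns a copy so that callers cannot modify the guarded slice.
func (s *syncedStrings) Get() []string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return append([]string(nil), s.items...)
}

func main() {
	var s syncedStrings
	s.Append("orders", "comments")
	fmt.Println(s.Find("comments"), s.Find("missing"), s.Get())
	s.Drop()
	fmt.Println(len(s.Get()))
}
```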
Directories ¶
Path | Synopsis |
---|---|
conn | Package conn adds to https://github.com/streadway/amqp Connection ability to reconnect and some optional parameters. |
logger | Represents logger interface and common loggers. |