Documentation
¶
Overview ¶
Example (Direct_exchange) ¶
Direct Exchange
m := mockMessage{ Message: "foo", } w1 := NewWorker( WithQueue("direct_queue"), WithExchangeName("direct_exchange"), WithExchangeType("direct"), WithRoutingKey("direct_exchange"), WithRunFunc(func(ctx context.Context, m core.TaskMessage) error { fmt.Println("worker01 get data:", string(m.Payload())) time.Sleep(100 * time.Millisecond) return nil }), ) q1, err := queue.NewQueue( queue.WithWorker(w1), ) if err != nil { w1.opts.logger.Fatal(err) } q1.Start() w2 := NewWorker( WithQueue("direct_queue"), WithExchangeName("direct_exchange"), WithExchangeType("direct"), WithRoutingKey("direct_exchange"), WithRunFunc(func(ctx context.Context, m core.TaskMessage) error { fmt.Println("worker02 get data:", string(m.Payload())) time.Sleep(100 * time.Millisecond) return nil }), ) q2, err := queue.NewQueue( queue.WithWorker(w2), ) if err != nil { w2.opts.logger.Fatal(err) } q2.Start() w := NewWorker( WithExchangeName("direct_exchange"), WithExchangeType("direct"), WithRoutingKey("direct_exchange"), ) q, err := queue.NewQueue( queue.WithWorker(w), ) if err != nil { w.opts.logger.Fatal(err) } time.Sleep(200 * time.Millisecond) if err := q.Queue(m); err != nil { w.opts.logger.Fatal(err) } if err := q.Queue(m); err != nil { w.opts.logger.Fatal(err) } if err := q.Queue(m); err != nil { w.opts.logger.Fatal(err) } if err := q.Queue(m); err != nil { w.opts.logger.Fatal(err) } time.Sleep(200 * time.Millisecond) q.Release() q1.Release() q2.Release()
Output: worker01 get data: foo worker02 get data: foo worker01 get data: foo worker02 get data: foo
Example (Fanout_exchange) ¶
Fanout Exchange
m := mockMessage{ Message: "foo", } w1 := NewWorker( WithQueue("fanout_queue_1"), WithExchangeName("fanout_exchange"), WithExchangeType("fanout"), WithRunFunc(func(ctx context.Context, m core.TaskMessage) error { fmt.Println("worker01 get data:", string(m.Payload())) return nil }), ) q1, err := queue.NewQueue( queue.WithWorker(w1), ) if err != nil { w1.opts.logger.Fatal(err) } q1.Start() w2 := NewWorker( WithQueue("fanout_queue_2"), WithExchangeName("fanout_exchange"), WithExchangeType("fanout"), WithRunFunc(func(ctx context.Context, m core.TaskMessage) error { fmt.Println("worker02 get data:", string(m.Payload())) return nil }), ) q2, err := queue.NewQueue( queue.WithWorker(w2), ) if err != nil { w2.opts.logger.Fatal(err) } q2.Start() w := NewWorker( WithExchangeName("fanout_exchange"), WithExchangeType("fanout"), ) q, err := queue.NewQueue( queue.WithWorker(w), ) if err != nil { w.opts.logger.Fatal(err) } time.Sleep(200 * time.Millisecond) if err := q.Queue(m); err != nil { w.opts.logger.Fatal(err) } time.Sleep(200 * time.Millisecond) q.Release() q1.Release() q2.Release()
Output: worker01 get data: foo worker02 get data: foo
Index ¶
- Constants
- type Option
- func WithAddr(addr string) Option
- func WithAutoAck(val bool) Option
- func WithExchangeName(val string) Option
- func WithExchangeType(val string) Option
- func WithLogger(l queue.Logger) Option
- func WithQueue(val string) Option
- func WithRoutingKey(val string) Option
- func WithRunFunc(fn func(context.Context, core.TaskMessage) error) Option
- func WithTag(val string) Option
- type Worker
Examples ¶
Constants ¶
View Source
const ( ExchangeDirect = "direct" ExchangeFanout = "fanout" ExchangeTopic = "topic" ExchangeHeaders = "headers" )
defined in rabbitmq client package.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Option ¶
type Option func(*options)
Option for queue system
func WithExchangeName ¶
WithExchangeName setup the Exchange name Exchanges are AMQP 0-9-1 entities where messages are sent to. Exchanges take a message and route it into zero or more queues.
func WithExchangeType ¶
WithExchangeType setup the Exchange type The routing algorithm used depends on the exchange type and rules called bindings. AMQP 0-9-1 brokers provide four exchange types: Direct exchange (Empty string) and amq.direct Fanout exchange amq.fanout Topic exchange amq.topic Headers exchange amq.match (and amq.headers in RabbitMQ)
func WithRunFunc ¶
WithRunFunc setup the run func of queue
Click to show internal directories.
Click to hide internal directories.