rabbitmq

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 21, 2025 License: MIT Imports: 9 Imported by: 0

README

rabbitmq

RabbitMQ as a backend for the Queue package. See the Go RabbitMQ Client Library maintained by the RabbitMQ core team; it was originally developed by Sean Treadway.

Exchanges and Exchange Types

See the Exchanges and Exchange Types section of AMQP 0-9-1 Model Explained.

Direct Exchange

direct-exchange

See the consumer code:

package main

import (
  "context"
  "encoding/json"
  "flag"
  "fmt"
  "log"
  "time"

  "github.com/golang-queue/queue"
  "github.com/golang-queue/queue/core"
  rabbitmq "github.com/golang-queue/rabbitmq"
)

// job is the task payload used by this example. It carries a single text
// message and is serialized as JSON.
type job struct {
  Message string
}

// Bytes returns the JSON encoding of the job. It panics if the job cannot
// be marshaled.
func (j *job) Bytes() []byte {
  encoded, marshalErr := json.Marshal(j)
  if marshalErr == nil {
    return encoded
  }
  panic(marshalErr)
}

// Command-line flags configuring the broker connection and AMQP topology
// used by the example.
var (
  // uri is the AMQP URI of the broker.
  uri = flag.String(
    "uri",
    "amqp://guest:guest@localhost:5672/",
    "AMQP URI",
  )
  // exchange is the name of the AMQP exchange.
  exchange = flag.String(
    "exchange",
    "test-exchange",
    "Durable, non-auto-deleted AMQP exchange name",
  )
  // exchangeType selects the exchange type.
  exchangeType = flag.String(
    "exchange-type",
    "direct",
    "Exchange type - direct|fanout|topic|x-custom",
  )
  // q is the name of the AMQP queue.
  q = flag.String(
    "queue",
    "test-queue",
    "Ephemeral AMQP queue name",
  )
  // bindingKey is the AMQP binding key.
  bindingKey = flag.String(
    "key",
    "test-key",
    "AMQP binding key",
  )
)

// main runs the example: it builds a RabbitMQ-backed worker from the
// command-line flags, starts a queue with five workers, enqueues taskN jobs
// concurrently, prints each handled message, and finally releases the queue.
func main() {
  // Parse flags in main rather than in init(): calling flag.Parse during
  // package initialization can break test binaries, whose own flags are
  // registered later.
  flag.Parse()

  taskN := 100
  // Buffered to taskN so the run func never blocks when reporting a result.
  rets := make(chan string, taskN)

  // define the worker
  w := rabbitmq.NewWorker(
    rabbitmq.WithAddr(*uri),
    rabbitmq.WithQueue(*q),
    rabbitmq.WithExchangeName(*exchange),
    // Honor the -exchange-type flag, which was previously declared but unused.
    rabbitmq.WithExchangeType(*exchangeType),
    rabbitmq.WithRoutingKey(*bindingKey),
    rabbitmq.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
      // Decode into a value, not a nil pointer, so that a JSON "null"
      // payload cannot cause a nil dereference below.
      var v job
      if err := json.Unmarshal(m.Payload(), &v); err != nil {
        return err
      }
      rets <- v.Message
      return nil
    }),
  )

  // define the queue; named jobQueue so it does not shadow the q flag
  jobQueue, err := queue.NewQueue(
    queue.WithWorkerCount(5),
    queue.WithWorker(w),
  )
  if err != nil {
    log.Fatal(err)
  }

  // start the five workers
  jobQueue.Start()

  // assign tasks in queue
  for i := 0; i < taskN; i++ {
    go func(i int) {
      if err := jobQueue.Queue(&job{
        Message: fmt.Sprintf("handle the job: %d", i+1),
      }); err != nil {
        // log.Fatal terminates the whole process on a publish failure.
        log.Fatal(err)
      }
    }(i)
  }

  // wait until all tasks done
  for i := 0; i < taskN; i++ {
    fmt.Println("message:", <-rets)
    time.Sleep(50 * time.Millisecond)
  }

  // shutdown the service and notify all the worker
  jobQueue.Release()
}

Documentation

Overview

Example (Direct_exchange)

Direct Exchange

// Message published four times below. mockMessage is not defined in this
// snippet.
m := mockMessage{
	Message: "foo",
}
// First consumer: reads from queue "direct_queue" on the direct exchange
// "direct_exchange" with routing key "direct_exchange". Its run func prints
// the payload and then sleeps 100ms.
w1 := NewWorker(
	WithQueue("direct_queue"),
	WithExchangeName("direct_exchange"),
	WithExchangeType("direct"),
	WithRoutingKey("direct_exchange"),
	WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
		fmt.Println("worker01 get data:", string(m.Payload()))
		time.Sleep(100 * time.Millisecond)
		return nil
	}),
)

q1, err := queue.NewQueue(
	queue.WithWorker(w1),
)
if err != nil {
	w1.opts.logger.Fatal(err)
}
q1.Start()

// Second consumer: same queue, exchange, exchange type and routing key as
// the first one; only the printed prefix differs.
w2 := NewWorker(
	WithQueue("direct_queue"),
	WithExchangeName("direct_exchange"),
	WithExchangeType("direct"),
	WithRoutingKey("direct_exchange"),
	WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
		fmt.Println("worker02 get data:", string(m.Payload()))
		time.Sleep(100 * time.Millisecond)
		return nil
	}),
)

q2, err := queue.NewQueue(
	queue.WithWorker(w2),
)
if err != nil {
	w2.opts.logger.Fatal(err)
}
q2.Start()

// Publishing side: configured with the exchange, exchange type and routing
// key only (no queue name and no run func).
w := NewWorker(
	WithExchangeName("direct_exchange"),
	WithExchangeType("direct"),
	WithRoutingKey("direct_exchange"),
)

// Note that Start is never called on q; it is only used to enqueue messages.
q, err := queue.NewQueue(
	queue.WithWorker(w),
)
if err != nil {
	w.opts.logger.Fatal(err)
}

// NOTE(review): presumably this pause lets both consumers finish connecting
// before anything is published — confirm.
time.Sleep(200 * time.Millisecond)
// Publish the same message four times.
if err := q.Queue(m); err != nil {
	w.opts.logger.Fatal(err)
}
if err := q.Queue(m); err != nil {
	w.opts.logger.Fatal(err)
}
if err := q.Queue(m); err != nil {
	w.opts.logger.Fatal(err)
}
if err := q.Queue(m); err != nil {
	w.opts.logger.Fatal(err)
}
// NOTE(review): presumably gives the consumers time to handle the messages
// before the queues are released — confirm.
time.Sleep(200 * time.Millisecond)
// Release the publisher first, then both consumers.
q.Release()
q1.Release()
q2.Release()
Output:

worker01 get data: foo
worker02 get data: foo
worker01 get data: foo
worker02 get data: foo
Example (Fanout_exchange)

Fanout Exchange

// Message published once below. mockMessage is not defined in this snippet.
m := mockMessage{
	Message: "foo",
}
// First consumer: reads from its own queue "fanout_queue_1" on the fanout
// exchange "fanout_exchange". No routing key is configured.
w1 := NewWorker(
	WithQueue("fanout_queue_1"),
	WithExchangeName("fanout_exchange"),
	WithExchangeType("fanout"),
	WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
		fmt.Println("worker01 get data:", string(m.Payload()))
		return nil
	}),
)

q1, err := queue.NewQueue(
	queue.WithWorker(w1),
)
if err != nil {
	w1.opts.logger.Fatal(err)
}
q1.Start()

// Second consumer: same exchange, but a separate queue "fanout_queue_2".
w2 := NewWorker(
	WithQueue("fanout_queue_2"),
	WithExchangeName("fanout_exchange"),
	WithExchangeType("fanout"),
	WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
		fmt.Println("worker02 get data:", string(m.Payload()))
		return nil
	}),
)

q2, err := queue.NewQueue(
	queue.WithWorker(w2),
)
if err != nil {
	w2.opts.logger.Fatal(err)
}
q2.Start()

// Publishing side: configured with the exchange name and type only (no
// queue name and no run func).
w := NewWorker(
	WithExchangeName("fanout_exchange"),
	WithExchangeType("fanout"),
)

// Note that Start is never called on q; it is only used to enqueue messages.
q, err := queue.NewQueue(
	queue.WithWorker(w),
)
if err != nil {
	w.opts.logger.Fatal(err)
}

// NOTE(review): presumably this pause lets both consumers finish connecting
// before anything is published — confirm.
time.Sleep(200 * time.Millisecond)
// Publish a single message; the expected output listed after this example
// shows both workers receiving it.
if err := q.Queue(m); err != nil {
	w.opts.logger.Fatal(err)
}
// NOTE(review): presumably gives the consumers time to handle the message
// before the queues are released — confirm.
time.Sleep(200 * time.Millisecond)
// Release the publisher first, then both consumers.
q.Release()
q1.Release()
q2.Release()
Output:

worker01 get data: foo
worker02 get data: foo

Index

Examples

Constants

View Source
// Exchange type names accepted by WithExchangeType.
const (
	// ExchangeDirect is the "direct" exchange type.
	ExchangeDirect  = "direct"
	// ExchangeFanout is the "fanout" exchange type.
	ExchangeFanout  = "fanout"
	// ExchangeTopic is the "topic" exchange type.
	ExchangeTopic   = "topic"
	// ExchangeHeaders is the "headers" exchange type.
	ExchangeHeaders = "headers"
)

Exchange type names, as defined in the RabbitMQ client package.

Variables

This section is empty.

Functions

This section is empty.

Types

type Option

type Option func(*options)

Option for queue system

func WithAddr

func WithAddr(addr string) Option

WithAddr setup the URI

func WithAutoAck

func WithAutoAck(val bool) Option

WithAutoAck enable message auto-ack

func WithExchangeName

func WithExchangeName(val string) Option

WithExchangeName sets up the exchange name. Exchanges are AMQP 0-9-1 entities where messages are sent to. Exchanges take a message and route it into zero or more queues.

func WithExchangeType

func WithExchangeType(val string) Option

WithExchangeType sets up the exchange type. The routing algorithm used depends on the exchange type and rules called bindings. AMQP 0-9-1 brokers provide four exchange types, listed here with their default pre-declared names: Direct exchange ((empty string) and amq.direct), Fanout exchange (amq.fanout), Topic exchange (amq.topic), and Headers exchange (amq.match, and amq.headers in RabbitMQ).

func WithLogger

func WithLogger(l queue.Logger) Option

WithLogger set custom logger

func WithQueue

func WithQueue(val string) Option

WithQueue setup the queue name

func WithRoutingKey

func WithRoutingKey(val string) Option

WithRoutingKey setup AMQP routing key

func WithRunFunc

func WithRunFunc(fn func(context.Context, core.TaskMessage) error) Option

WithRunFunc setup the run func of queue

func WithTag

func WithTag(val string) Option

WithTag sets up the tag

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

Worker for RabbitMQ

func NewWorker

func NewWorker(opts ...Option) *Worker

NewWorker creates a new Worker configured with the given options

func (*Worker) Queue

func (w *Worker) Queue(job core.TaskMessage) error

Queue sends the job to the queue

func (*Worker) Request

func (w *Worker) Request() (core.TaskMessage, error)

Request a new task

func (*Worker) Run

func (w *Worker) Run(ctx context.Context, task core.TaskMessage) error

Run starts the worker

func (*Worker) Shutdown

func (w *Worker) Shutdown() (err error)

Shutdown worker

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL