rabbitmq

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 2, 2022 License: MIT Imports: 10 Imported by: 0

README

AMQP-Messaging

A highly efficient library for communicating with other microservices using RabbitMQ (AMQP protocol) in Go.

Installing

Install via go get:

$ go get github.com/farhadiis/amqp-messaging

Creating messaging variable

import rabbitmq "github.com/farhadiis/amqp-messaging"

messaging := rabbitmq.NewMessaging("RABBITMQ-ADDRESS");
//now you can work with messaging APIs

Work Queues

Use a work queue when you want to push a message to a queue and move on without waiting for a response. For example:

err := messaging.AddWorker("mySampleQueue", 
        func(message rabbitmq.Message) (interface{}, rabbitmq.Acknowledge) {
        log.Printf("Received a message: %s\n", message.Body)
        return nil, rabbitmq.None
    },
)
if err != nil {
	log.Fatal(err)
}

var data interface{} = "foo"
err = messaging.SendPush("mySampleQueue", &data)
if err != nil {
	log.Fatal(err)
}

The default QOSPrefetch in the worker options is 1. If you set it higher than 1, RabbitMQ will deliver more than one message at a time.

Publish/Subscribe

err := messaging.Subscribe("mySampleTopic", func(message rabbitmq.Message) {
	log.Printf("Received a message: %s\n", message.Body)
})
if err != nil {
	log.Fatal(err)
}

var data interface{} = "foo"
err = messaging.Publish("mySampleTopic", &data)
if err != nil {
	log.Fatal(err)
}

RPC

type User struct {
	Id   string
	Name string
}
	
messaging.RegisterType(User{})
	
err := messaging.AddWorker("findUser",
    func(message rabbitmq.Message) (interface{}, rabbitmq.Acknowledge) {
        var id = message.Body.(string)
        var result interface{} = User{id, "Farhad"}
        return &result, rabbitmq.None
    },
)
if err != nil {
    log.Fatal(err)
}

var id interface{} = "12"
err, result := messaging.RpcCall("findUser", &id)
if err != nil {
	log.Fatal(err)
}
user := result.(User)
log.Printf("user: %v\n", user)

Finish all jobs and stop all RabbitMQ workers

You can shut down gracefully with the call below. It cancels all workers.

err := messaging.CancelWorkers()
if err != nil {
	log.Fatal(err)
}

Develop

We're open to pull requests. To run the tests, run go test -v

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Acknowledge

type Acknowledge int

Acknowledge is the action taken after a message has been processed.

const (
	// Ack (the default) acknowledges the message after you have successfully processed it.
	Ack Acknowledge = iota
	// NackDiscard rejects the message; it will be dropped or delivered to a server-configured dead-letter queue.
	NackDiscard
	// NackRequeue requeues the message so that it is delivered to a different consumer.
	NackRequeue
	// None for auto ack option.
	None
)

type ChannelManager

type ChannelManager interface {
	// contains filtered or unexported methods
}

type Logger

type Logger interface {
	Fatal(string, ...interface{})
	Error(string, ...interface{})
	Warn(string, ...interface{})
	Info(string, ...interface{})
	Debug(string, ...interface{})
	Trace(string, ...interface{})
}

type Message

type Message struct {
	Body interface{}
}

Message captures the fields for a message received from queue.

type Messaging

type Messaging interface {
	AddWorker(queue string, handler WorkerHandler) error
	AddWorkerWithOptions(queue string, handler WorkerHandler, options *WorkerOptions) error
	SendPush(queue string, data interface{}) error
	SendPushWithOptions(queue string, data interface{}, options *PushOptions) error
	RpcCall(queue string, data interface{}) (error, interface{})
	Publish(queue string, data interface{}) error
	PublishWithOptions(queue string, data interface{}, options *PublishOptions) error
	Subscribe(queue string, handler SubscribeHandler) error
	SubscribeWithOptions(queue string, handler SubscribeHandler, options *SubscribeOptions) error
	CancelWorkers() error

	RegisterType(value interface{})
}

func NewMessaging

func NewMessaging(url string) (Messaging, error)

func NewMessagingWithOptions

func NewMessagingWithOptions(url string, options *MessagingOptions) (Messaging, error)

type MessagingOptions

type MessagingOptions struct {
	Logger            Logger
	ReconnectInterval time.Duration
}

MessagingOptions defines options of messaging.

type PublishOptions

type PublishOptions struct {
	Durable bool
}

PublishOptions defines options of publish.

type PushOptions

type PushOptions struct {
	Persistent bool
}

PushOptions defines options of push.

type SubscribeHandler

type SubscribeHandler func(message Message)

SubscribeHandler defines the subscribe function.

type SubscribeOptions

type SubscribeOptions struct {
	Durable     bool
	Concurrency int
}

SubscribeOptions defines options of subscribe.

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

type WorkerHandler

type WorkerHandler func(message Message) (interface{}, Acknowledge)

WorkerHandler defines the worker function, which receives a Message and returns a result together with an Acknowledge action.

type WorkerOptions

type WorkerOptions struct {
	AutoAck     bool
	Durable     bool
	QOSPrefetch int
	QOSGlobal   bool
	Concurrency int
}

WorkerOptions defines options of worker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL