gormq

Version: v1.0.3 (not the latest version of its module)
Published: Nov 7, 2024 License: GPL-3.0 Imports: 10 Imported by: 0

README

Go RMQ

A simple RabbitMQ pub/sub client based on https://github.com/wagslane/go-rabbitmq.

Flow

flowchart TB
    Assumptions@{ shape: brace-r, label: "
    -- Connection and channel management are not included in this diagram
    -- On a disconnect or channel-close event, a reconnect process must be executed
    -- On reconnect, consumers must be re-run
    -- Prefetch count should be configurable through env or app config
    -- DLQ creation and publishing to the DLQ must be done using the same flow" }

    RMQServer["RabbitMQ Server"] <--Pub/Sub--> App
    App ---> StartRetryRoutine["Start thread/routine
    to retry failed to
    publish messages"]

    

    subgraph RetryBlock [Retry Failed Messages]

        StartRetryRoutine ---> IntervalCheck{"Has interval
        elapsed?"}

        IntervalCheck --Yes--> LockQ1ToRetry["Lock Q1"] 
        LockQ1ToRetry ---> Retry["Retry messages 
        from failed to 
        publish queue"]
        Retry ---> PushReFailed["Push failed to 
        publish messages 
        in new queue (Q2)"]
        PushReFailed ---> ReplaceFailedQueue["Replace Q1 with Q2"]
        ReplaceFailedQueue ---> UnlockQ1AfterRetry["Unlock Q1"]
        UnlockQ1AfterRetry ---> DoneRetry([Done])


        IntervalCheck --"No"--> NoOp["No action"]
        NoOp ---> DoneNoOp([Done])
    end

    App ---> Publish

    subgraph Pub [Publish]

        Publish ---> TryPublish["Try to publish 
        immediately"]
        TryPublish ---> PubFailedCheck{"Failed to 
        publish?"}
        PubFailedCheck --Yes--> LockQ1ToPush["Lock Q1"]
        LockQ1ToPush ---> AddToQueue["Add failed message
        to queue (Q1)"]
        AddToQueue ---> UnlockQ1AfterPush["Unlock Q1"]
        UnlockQ1AfterPush ---> DoneTryPub([Done])
        PubFailedCheck --"No"--> DonePub([Done])
    end

    App ---> AddConsumer[Add consumer]

    subgraph Consume [Consume]
        AddConsumer ---> DlqCheck{"Create DLQ?"}

        DlqCheck --Yes--> CreateDlq["Create DLQ 
        with name in this format
        ***service-name:route:dlq***"]
        CreateDlq ---> BindDlq["Bind DLQ to a 
        DLQ route with name in 
        this format
        ***service-name:dlq-route***"]
        DlqCheck --"No"--> SetupPrefetchCnt["Setup a non-zero
        prefetch count per consumer"]
        BindDlq ---> SetupPrefetchCnt

        SetupPrefetchCnt ---> SetupConsumerQueue["Create persistent
        consumer queue with
        name in this format
        ***service-name:route:queue***"]
        SetupConsumerQueue ---> StartConsumer["Start consumer"]
        StartConsumer ---> NewMsg["New message"]

        NewMsg ---> ProcessMsg["Process message"]
        ProcessMsg ---> ProcessFailedCheck{"Processing failed?"}
        ProcessFailedCheck --Yes--> HasDlqCheck{"Has associated
        DLQ?"}
        ProcessFailedCheck --"No"--> Ack["Acknowledge
        delivery"]
        Ack ---> DoneConsuming([Done])
        HasDlqCheck --Yes--> PublishToDlq["Publish message to 
        associated DLQ route"]
        PublishToDlq ---> Ack
        HasDlqCheck --"No"--> Ack
        DoneConsuming ---> NewMsg
    end

Rationale

This package was built for a specific project. We run a RabbitMQ server older than version 4 and have limited options for customizing it, so we implemented a client that provides the pub/sub interface our application needs. It is not intended for generic use cases.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Init

func Init(opt ConnectionOptions)

Init initializes the RabbitMQ client. On success, it sets DefaultClient to the established client. On failure, it exits the application.

Types

type Client

type Client interface {
	// Start runs retry routine.
	// This does not block execution.
	// This should be called only once.
	Start()

	// Stop stops the retry routine and consumers.
	// It also cleans up publishers and finally closes
	// the connection.
	// This should be called only once. After calling Stop,
	// a client must not be used anymore; create a new client
	// if you need one.
	Stop()

	// Conn returns underlying rabbitmq.Conn
	Conn() *rabbitmq.Conn

	// AddConsumer adds a consumer and starts consuming.
	AddConsumer(ConsumerOption) error

	// Publish publishes a message; on failure, it enqueues the message for retry.
	Publish(Message)
}

Client is the interface for a RabbitMQ pub/sub client.
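
Since Client is an interface, application code can depend on only the methods it needs, which makes unit testing possible without a broker. The sketch below is an illustration, not part of the package: `Publisher`, `recordingPublisher`, and `notifySignup` are hypothetical names, and `Message` is redeclared locally with the documented fields so that the example compiles on its own. Any value that implements Client would also satisfy the narrower `Publisher` interface.

```go
package main

import (
	"fmt"
	"sync"
)

// Message mirrors the documented struct so this sketch is self-contained.
type Message struct {
	Exchange   string
	RoutingKey string
	Message    []byte
}

// Publisher is a hypothetical narrow interface holding only the
// Publish method from the documented Client interface.
type Publisher interface {
	Publish(Message)
}

// recordingPublisher is a test double that stores published messages
// in memory instead of sending them to a broker.
type recordingPublisher struct {
	mu   sync.Mutex
	sent []Message
}

// Publish records the message; it is safe for concurrent use.
func (p *recordingPublisher) Publish(m Message) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.sent = append(p.sent, m)
}

// notifySignup is hypothetical application code that only needs to publish.
func notifySignup(p Publisher, userID string) {
	p.Publish(Message{
		Exchange:   "events",
		RoutingKey: "user.signup",
		Message:    []byte(userID),
	})
}

func main() {
	rec := &recordingPublisher{}
	notifySignup(rec, "u-1")
	fmt.Println(len(rec.sent), rec.sent[0].RoutingKey, string(rec.sent[0].Message))
}
```

In production code the same `notifySignup` function could receive the client returned by GetClient, assuming Init has succeeded.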

var DefaultClient Client

func GetClient

func GetClient() Client

GetClient returns DefaultClient. This should only be used after a successful Init call.

type ConnectionOptions

type ConnectionOptions struct {
	URL                        string
	ReconnectInterval          time.Duration
	FailedMessageRetryInterval time.Duration
}

type Consumer

type Consumer func([]byte) error

Consumer defines the consumer function signature.
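
A function with this signature receives the raw message body and reports success or failure through its return value. The sketch below shows what such a handler might look like, assuming JSON payloads; the `orderCreated` type and `handleOrderCreated` function are hypothetical, and `Consumer` is redeclared locally so the example compiles on its own. Based on the flow diagram, a non-nil error presumably marks processing as failed, which routes the message to the associated DLQ when one is configured.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// Consumer mirrors the documented signature so this sketch is self-contained.
type Consumer func([]byte) error

// orderCreated is a hypothetical JSON payload.
type orderCreated struct {
	ID    string `json:"id"`
	Total int    `json:"total"`
}

// handleOrderCreated decodes and validates the body. Returning an error
// signals that processing failed.
func handleOrderCreated(body []byte) error {
	var evt orderCreated
	if err := json.Unmarshal(body, &evt); err != nil {
		return fmt.Errorf("decode order: %w", err)
	}
	if evt.ID == "" {
		return errors.New("order has no id")
	}
	fmt.Printf("processing order %s (total %d)\n", evt.ID, evt.Total)
	return nil
}

func main() {
	var c Consumer = handleOrderCreated

	if err := c([]byte(`{"id":"o-1","total":42}`)); err != nil {
		fmt.Println("unexpected error:", err)
	}
	if err := c([]byte(`not json`)); err != nil {
		fmt.Println("failed as expected:", err)
	}
}
```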

type ConsumerOption

type ConsumerOption struct {
	Exchange   string
	RoutingKey string
	Queue      string
	Consumer   Consumer

	PrefetchCount int

	Dlq           bool
	DlqRoutingKey string
	DlqName       string
}

ConsumerOption holds the options used to run a new consumer.
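
The flow diagram names queues and DLQs in the formats `service-name:route:queue`, `service-name:route:dlq`, and `service-name:dlq-route`. The sketch below fills a ConsumerOption following that convention. It rests on assumptions: `consumerOptionFor` is a hypothetical helper, the types are redeclared locally, the prefetch count of 10 is an arbitrary non-zero value, and `dlq-route` is read here as a caller-supplied route name. Whether the package derives these names itself or expects the caller to supply them is not stated in this documentation.

```go
package main

import "fmt"

// Consumer and ConsumerOption mirror the documented types so this
// sketch is self-contained.
type Consumer func([]byte) error

type ConsumerOption struct {
	Exchange   string
	RoutingKey string
	Queue      string
	Consumer   Consumer

	PrefetchCount int

	Dlq           bool
	DlqRoutingKey string
	DlqName       string
}

// consumerOptionFor is a hypothetical helper that applies the naming
// formats from the flow diagram.
func consumerOptionFor(service, exchange, route, dlqRoute string, c Consumer) ConsumerOption {
	return ConsumerOption{
		Exchange:      exchange,
		RoutingKey:    route,
		Queue:         service + ":" + route + ":queue",
		Consumer:      c,
		PrefetchCount: 10, // arbitrary non-zero value for this sketch
		Dlq:           true,
		DlqRoutingKey: service + ":" + dlqRoute,
		DlqName:       service + ":" + route + ":dlq",
	}
}

func main() {
	opt := consumerOptionFor("billing", "events", "order.created", "order.failed",
		func([]byte) error { return nil })
	fmt.Println(opt.Queue)
	fmt.Println(opt.DlqName)
	fmt.Println(opt.DlqRoutingKey)
}
```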

type Logger

type Logger struct{}

func (*Logger) Debugf

func (l *Logger) Debugf(msg string, args ...interface{})

Debugf implements rabbitmq.Logger.

func (*Logger) Errorf

func (l *Logger) Errorf(msg string, args ...interface{})

Errorf implements rabbitmq.Logger.

func (*Logger) Fatalf

func (l *Logger) Fatalf(msg string, args ...interface{})

Fatalf implements rabbitmq.Logger.

func (*Logger) Infof

func (l *Logger) Infof(msg string, args ...interface{})

Infof implements rabbitmq.Logger.

func (*Logger) Warnf

func (l *Logger) Warnf(msg string, args ...interface{})

Warnf implements rabbitmq.Logger.

type Message

type Message struct {
	Exchange   string
	RoutingKey string
	Message    []byte
}

Message is the structure used to publish a new message.

func NewMessage

func NewMessage(exchange, routingKey string, data interface{}) Message

NewMessage is a utility function that creates a new Message instance.

Directories

Path Synopsis
examples
