gbus

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 18, 2019 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	//MaxRetryCount defines the maximum number of times a retry can run.
	//Default is 3 but it is configurable
	MaxRetryCount uint = 3
	//BaseRetryDuration defines the base duration that the retry algorithm uses
	//for a random retry time. Default is 10 milliseconds but it is configurable.
	BaseRetryDuration = 10 * time.Millisecond
	//RpcHeaderName is the name of the header used in grabbit for RPC
	RpcHeaderName = "x-grabbit-msg-rpc-id"
)

Functions

func GetFqn

func GetFqn(obj interface{}) string

GetFqn gets the "fully qualified name" of an interface, meaning the package path + type name.

func GetTypeFQN

func GetTypeFQN(t reflect.Type) string

GetTypeFQN gets the "fully qualified name" of a type, meaning the package path + type name.

Types

type AMQPOutbox

type AMQPOutbox struct {
	SvcName string
	// contains filtered or unexported fields
}

AMQPOutbox sends messages to the amqp transport

func (*AMQPOutbox) NotifyConfirm

func (out *AMQPOutbox) NotifyConfirm(ack, nack chan uint64)

NotifyConfirm send an amqp notification

func (*AMQPOutbox) Post

func (out *AMQPOutbox) Post(exchange, routingKey string, amqpMessage amqp.Publishing) (uint64, error)

Post implements Outbox.Send

func (*AMQPOutbox) Shutdown added in v1.1.0

func (out *AMQPOutbox) Shutdown()

Shutdown stops the outbox

type Builder

type Builder interface {
	//PurgeOnStartUp presumably makes the bus purge its queue when it starts - TODO confirm against the implementation
	PurgeOnStartUp() Builder
	//WithDeadlettering configures deadlettering using the given deadletter exchange
	WithDeadlettering(deadletterExchange string) Builder
	/*
		Txnl sets the bus to be transactional using a persisted saga store
		provider: mysql for mysql database
		connStr: connection string in the format of the passed in provider
	*/
	Txnl(provider, connStr string) Builder
	//WithSerializer provides the ability to plugin custom serializers
	WithSerializer(serializer Serializer) Builder
	/*
		WorkerNum sets the number of worker go routines consuming messages from the queue
		The default value if this option is not set is 1
		NOTE(review): prefetchCount is presumably the AMQP prefetch count - TODO confirm
	*/
	WorkerNum(workers uint, prefetchCount uint) Builder

	/*
		WithConfirms enables publisher confirms
	*/
	WithConfirms() Builder

	//WithPolicies defines the default policies that are applied for every outgoing amqp message
	WithPolicies(policies ...MessagePolicy) Builder

	//ConfigureHealthCheck defines the default timeout in seconds for the db ping check
	//NOTE(review): the parameter is typed time.Duration although it is named in seconds - confirm the expected unit
	ConfigureHealthCheck(timeoutInSeconds time.Duration) Builder

	//WithConfiguration applies the given BusConfiguration (MaxRetryCount, BaseRetryDuration) to the bus
	WithConfiguration(config BusConfiguration) Builder

	//Build the bus
	Build(svcName string) Bus

	//WithLogger sets a custom logger instance
	WithLogger(logger logrus.FieldLogger) Builder
}

Builder is the main interface that should be used to create an instance of a Bus

type Bus

Bus interface provides the majority of functionality to Send, Reply and Publish messages to the Bus

type BusConfiguration added in v1.0.2

type BusConfiguration struct {
	MaxRetryCount     uint
	BaseRetryDuration int
}

BusConfiguration provides configuration passed to the bus builder

type BusMessage

type BusMessage struct {
	ID                string
	CorrelationID     string
	SagaID            string
	SagaCorrelationID string
	Semantics         Semantics /*cmd or evt*/
	Payload           Message
	PayloadFQN        string
	RPCID             string
}

BusMessage the structure that gets sent to the underlying transport

func NewBusMessage

func NewBusMessage(payload Message) *BusMessage

NewBusMessage factory method for creating a BusMessage that wraps the given payload

func NewFromAMQPHeaders

func NewFromAMQPHeaders(headers amqp.Table) *BusMessage

NewFromAMQPHeaders creates a BusMessage from headers of an amqp message

func (*BusMessage) GetAMQPHeaders

func (bm *BusMessage) GetAMQPHeaders() (headers amqp.Table)

GetAMQPHeaders convert to AMQP headers Table everything but a payload

func (*BusMessage) GetTraceLog

func (bm *BusMessage) GetTraceLog() (fields []log.Field)

GetTraceLog returns an array of log entries containing all of the message properties

func (*BusMessage) SetFromAMQPHeaders

func (bm *BusMessage) SetFromAMQPHeaders(headers amqp.Table)

SetFromAMQPHeaders convert from AMQP headers Table everything but a payload

func (*BusMessage) SetPayload

func (bm *BusMessage) SetPayload(payload Message)

SetPayload sets the payload and makes sure that Name is saved

type BusSwitch

type BusSwitch interface {
	/*
		Start starts the bus, once the bus is started messages get consumed from the queue
		and handlers get invoked.
		Register all handlers prior to calling GBus.Start()
	*/
	Start() error
	/*
		Shutdown shuts the bus down and closes the connection to the underlying broker
	*/
	Shutdown() error
}

BusSwitch starts and shuts down the bus

type DeadLetterMessageHandler added in v1.1.0

type DeadLetterMessageHandler func(tx *sql.Tx, poison amqp.Delivery) error

DeadLetterMessageHandler signature for dead letter handler

func (DeadLetterMessageHandler) Name added in v1.1.0

func (dlmg DeadLetterMessageHandler) Name() string

Name is a helper function returning the runtime name of the function bound to an instance of the DeadLetterMessageHandler type

type Deadlettering added in v1.1.0

type Deadlettering interface {
	HandleDeadletter(handler DeadLetterMessageHandler)
	ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error
}

Deadlettering provides the ability to handle messages that were rejected as poison and arrive at the deadletter queue

type DefaultBus

type DefaultBus struct {
	*Safety
	*Glogged
	Outbox        TxOutbox
	PrefetchCount uint
	AmqpConnStr   string

	SvcName string

	Registrations []*Registration

	RPCHandlers map[string]MessageHandler

	HandlersLock         *sync.Mutex
	RPCLock              *sync.Mutex
	SenderLock           *sync.Mutex
	ConsumerLock         *sync.Mutex
	RegisteredSchemas    map[string]bool
	DelayedSubscriptions [][]string
	PurgeOnStartup       bool

	Glue       SagaGlue
	TxProvider TxProvider

	WorkerNum       uint
	Serializer      Serializer
	DLX             string
	DefaultPolicies []MessagePolicy
	Confirm         bool

	DbPingTimeout time.Duration
	// contains filtered or unexported fields
}

DefaultBus implements the Bus interface

func (*DefaultBus) GetHealth

func (b *DefaultBus) GetHealth() HealthCard

GetHealth implements Health.GetHealth

func (*DefaultBus) HandleDeadletter

func (b *DefaultBus) HandleDeadletter(handler DeadLetterMessageHandler)

HandleDeadletter implements GBus.HandleDeadletter

func (*DefaultBus) HandleEvent

func (b *DefaultBus) HandleEvent(exchange, topic string, event Message, handler MessageHandler) error

HandleEvent implements GBus.HandleEvent

func (*DefaultBus) HandleMessage

func (b *DefaultBus) HandleMessage(message Message, handler MessageHandler) error

HandleMessage implements GBus.HandleMessage

func (*DefaultBus) Log added in v1.0.3

func (b *DefaultBus) Log() logrus.FieldLogger

Log returns the default logrus.FieldLogger for the bus via the Glogged helper

func (*DefaultBus) NotifyHealth

func (b *DefaultBus) NotifyHealth(health chan error)

NotifyHealth implements Health.NotifyHealth

func (*DefaultBus) Publish

func (b *DefaultBus) Publish(ctx context.Context, exchange, topic string, message *BusMessage, policies ...MessagePolicy) error

Publish implements GBus.Publish(topic, message)

func (*DefaultBus) RPC

func (b *DefaultBus) RPC(ctx context.Context, service string, request, reply *BusMessage, timeout time.Duration) (*BusMessage, error)

RPC implements GBus.RPC

func (*DefaultBus) RegisterSaga

func (b *DefaultBus) RegisterSaga(saga Saga, conf ...SagaConfFn) error

RegisterSaga implements GBus.RegisterSaga

func (*DefaultBus) ReturnDeadToQueue added in v1.1.0

func (b *DefaultBus) ReturnDeadToQueue(ctx context.Context, publishing *amqp.Publishing) error

ReturnDeadToQueue returns a message to its original destination

func (*DefaultBus) Send

func (b *DefaultBus) Send(ctx context.Context, toService string, message *BusMessage, policies ...MessagePolicy) error

Send implements GBus.Send(destination string, message interface{})

func (*DefaultBus) Shutdown

func (b *DefaultBus) Shutdown() (shutdwonErr error)

Shutdown implements GBus.Shutdown()

func (*DefaultBus) Start

func (b *DefaultBus) Start() error

Start implements GBus.Start()

type DeliveryInfo added in v1.1.0

type DeliveryInfo struct {
	Attempt       uint
	MaxRetryCount uint
}

DeliveryInfo provides information about the attempted delivery of the invocation

type Glogged added in v1.0.3

type Glogged struct {
	// contains filtered or unexported fields
}

Glogged provides an easy way for structs within the grabbit package to participate in the general logging schema of the bus

func (*Glogged) Log added in v1.0.3

func (gl *Glogged) Log() logrus.FieldLogger

Log returns the set default log or a new instance of a logrus.FieldLogger

func (*Glogged) SetLogger added in v1.0.3

func (gl *Glogged) SetLogger(entry logrus.FieldLogger)

SetLogger sets the default logrus.FieldLogger that should be used when logging a new message

type HandlerRegister

type HandlerRegister interface {
	/*
		HandleMessage registers a handler to a specific message type
		Use this method to register handlers for commands and reply messages
		Use the HandleEvent method to subscribe on events and register a handler
	*/
	HandleMessage(message Message, handler MessageHandler) error
	/*
		HandleEvent registers a handler for a specific message type published
		to an exchange with a specific topic
	*/
	HandleEvent(exchange, topic string, event Message, handler MessageHandler) error
}

HandlerRegister registers message handlers to specific messages and events

type Health

type Health interface {
	NotifyHealth(health chan error)
	GetHealth() HealthCard
}

Health reports on health issues in which the bus needs to be restarted

type HealthCard

type HealthCard struct {
	DbConnected        bool
	RabbitConnected    bool
	RabbitBackPressure bool
}

HealthCard that holds the health values of the bus

type Invocation

type Invocation interface {
	Logged
	Reply(ctx context.Context, message *BusMessage) error
	Bus() Messaging
	Tx() *sql.Tx
	Ctx() context.Context
	Routing() (exchange, routingKey string)
	DeliveryInfo() DeliveryInfo
}

Invocation context for a specific processed message

type Logged added in v1.0.3

type Logged interface {
	SetLogger(entry logrus.FieldLogger)
	Log() logrus.FieldLogger
}

Logged represents a grabbit component that can be logged

type Message

type Message interface {
	SchemaName() string
}

Message a common interface that passes to the serializers to allow decoding and encoding of content

type MessageFilter

type MessageFilter struct {
	Exchange   string
	RoutingKey string
	MsgName    string
}

MessageFilter matches rabbitmq topic patterns

func NewMessageFilter

func NewMessageFilter(exchange, routingKey string, message Message) *MessageFilter

NewMessageFilter creates a new MessageFilter

func (*MessageFilter) Matches

func (filter *MessageFilter) Matches(exchange, routingKey, msgName string) bool

Matches the passed in exchange, routingKey, msgName with the defined filter

type MessageHandler

type MessageHandler func(invocation Invocation, message *BusMessage) error

MessageHandler signature for all command handlers

func (MessageHandler) Name added in v1.1.0

func (mg MessageHandler) Name() string

Name is a helper function returning the runtime name of the function bound to an instance of the MessageHandler type

type MessagePolicy

type MessagePolicy interface {
	Apply(publishing *amqp.Publishing)
}

MessagePolicy defines a user policy for outgoing amqp messages. User policies can control message ttl, durability, etc.

type Messaging

type Messaging interface {
	/*
		Send a command or a command response to a specific service
		one-to-one semantics
	*/
	Send(ctx context.Context, toService string, command *BusMessage, policies ...MessagePolicy) error

	/*
		Publish an event, one-to-many semantics
	*/
	Publish(ctx context.Context, exchange, topic string, event *BusMessage, policies ...MessagePolicy) error

	/*
		RPC calls the service passing it the request BusMessage and blocks until a reply is
		received or the timeout expires.
	*/
	RPC(ctx context.Context, service string, request, reply *BusMessage, timeout time.Duration) (*BusMessage, error)
}

Messaging interface to send and publish messages to the bus

type Registration

type Registration struct {
	Handler MessageHandler
	// contains filtered or unexported fields
}

Registration represents a message handler's registration for a given exchange, topic and msg combination

func NewRegistration

func NewRegistration(exchange, routingKey string, message Message, handler MessageHandler) *Registration

NewRegistration creates a new registration

func (*Registration) Matches

func (sub *Registration) Matches(exchange, routingKey, msgName string) bool

Matches the registration with the given exchange, routingKey, msgName

type RequestSagaTimeout

type RequestSagaTimeout interface {
	TimeoutDuration() time.Duration
	Timeout(tx *sql.Tx, bus Messaging) error
}

RequestSagaTimeout is the interface a saga needs to implement to get timeout services

type Safety

type Safety struct{}

Safety provides utility methods to safely invoke methods

func (*Safety) SafeWithRetries

func (s *Safety) SafeWithRetries(funk func() error, retries uint) error

SafeWithRetries safely invokes the function with the given number of retries

type Saga

type Saga interface {
	//StartedBy returns the messages that when received should create a new saga instance
	StartedBy() []Message
	/*
		RegisterAllHandlers passes in the HandlerRegister so that the saga can register
		the messages that it handles
	*/
	RegisterAllHandlers(register HandlerRegister)

	//IsComplete returns whether the saga is complete and can be discarded
	IsComplete() bool

	//New is a factory method used by the bus to create new instances of a saga
	New() Saga
}

Saga is the base interface for all Sagas.

type SagaConfFn

type SagaConfFn func(Saga) Saga

SagaConfFn is a function to allow configuration of a saga in the context of the gbus

type SagaGlue added in v1.1.0

type SagaGlue interface {
	SagaRegister
	Logged
	Start() error
	Stop() error
}

SagaGlue glues together all the parts needed in order to orchestrate saga instances

type SagaRegister

type SagaRegister interface {
	RegisterSaga(saga Saga, conf ...SagaConfFn) error
}

SagaRegister registers sagas to the bus

type SagaTimeoutMessage

type SagaTimeoutMessage struct {
	SagaID string
}

SagaTimeoutMessage is the timeout message for Sagas

func (SagaTimeoutMessage) SchemaName

func (SagaTimeoutMessage) SchemaName() string

SchemaName implements gbus.Message

type Semantics

type Semantics string

Semantics represents the semantics of a grabbit message

const (
	//CMD represents a message with command semantics in grabbit
	CMD Semantics = "cmd"
	//EVT represents a message with event semantics in grabbit
	EVT Semantics = "evt"
)

type Serializer

type Serializer interface {
	Name() string
	Encode(message Message) ([]byte, error)
	Decode(buffer []byte, schemaName string) (Message, error)
	Register(obj Message)
}

Serializer is the base interface for all message serializers

type TimeoutManager added in v1.1.0

type TimeoutManager interface {
	//RegisterTimeout requests the TimeoutManager to register a timeout for a specific saga instance
	RegisterTimeout(tx *sql.Tx, sagaID string, duration time.Duration) error
	//ClearTimeout clears a timeout for a specific saga
	ClearTimeout(tx *sql.Tx, sagaID string) error
	//SetTimeoutFunction accepts the function that the TimeoutManager should invoke once a timeout expires
	SetTimeoutFunction(func(tx *sql.Tx, sagaID string) error)
	//Start starts the timeout manager
	Start() error
	//Stop shuts the timeout manager down
	Stop() error
}

TimeoutManager abstracts the implementation of determining when a saga should be timed out

type TxOutbox

type TxOutbox interface {
	Save(tx *sql.Tx, exchange, routingKey string, amqpMessage amqp.Publishing) error
	Start(amqpOut *AMQPOutbox) error
	Stop() error
}

TxOutbox abstracts the transactional outgoing channel type

type TxProvider

type TxProvider interface {
	New() (*sql.Tx, error)
	Dispose()
	Ping(timeoutInSeconds time.Duration) bool
}

TxProvider provides a new Tx from the configured driver to the bus

Directories

Path Synopsis
tx

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL