saga

package module
v0.1.51 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 2, 2025 License: MIT Imports: 11 Imported by: 0

README

saga

Go Report Card Go Reference Sourcegraph Release GitHub Workflow Status GitHub GitHub commit activity GitHub last commit

saga is a Go library designed to streamline communication between microservices using RabbitMQ. It enables easy implementation of event-driven architectures and saga patterns, while ensuring reliable message delivery.

Features

Core Communication:

  • Publish/Subscribe Messaging: Exchange messages between microservices using a publish-subscribe pattern.
  • Headers-Based Routing: Leverage the power of RabbitMQ's headers exchange for flexible and dynamic routing of messages based on custom headers.
  • Durable Exchanges and Queues: Ensure message persistence and reliability with durable RabbitMQ components.

Saga Management:

legendaryum
  • Saga Orchestration: Coordinate complex, multi-step transactions across multiple microservices with saga orchestration.
  • Saga Step Handlers: Implement step-by-step saga logic in your microservices using callbacks.
  • Compensation Logic: Define compensating actions for saga steps to handle failures gracefully and maintain data consistency.

Contributors

Thanks to all contributors!

Author

Jorge Clavijo https://github.com/jym272

License

Distributed under the MIT License. See LICENSE for more information.

Documentation

Index

Constants

View Source
const (
	NACKING_DELAY_MS = 5000 // 5 seconds
	MAX_NACK_RETRIES = 3
	// MAX_OCCURRENCE
	/**
	 * Define the maximum occurrence in a fail saga step of the nack delay with fibonacci strategy
	 * | Occurrence | Delay in the next nack |
	 * |------------|------------------------|
	 * | 17         | 0.44 hours  |
	 * | 18         | 0.72 hours  |
	 * | 19         | 1.18 hours  |
	 * | 20         | 1.88 hours  |
	 * | 21         | 3.04 hours  |
	 * | 22         | 4.92 hours  |
	 * | 23         | 7.96 hours  |
	 * | 24         | 12.87 hours |
	 * | 25         | 20.84 hours |
	 */
	MAX_OCCURRENCE = 19
)

Variables

View Source
var RabbitUri string

RabbitUri is used for send channel connection.

Functions

func CommenceSaga

func CommenceSaga(payload CommencePayload) error

func ParsePayload added in v0.1.10

func ParsePayload[T any](handlerPayload map[string]interface{}, data *T) *T

func PublishEvent

func PublishEvent(payload event.PayloadEvent) error

Types

type CommandHandler

type CommandHandler struct {
	Channel *MicroserviceConsumeChannel `json:"channel"`
	Payload map[string]interface{}      `json:"payload"`
	SagaID  int                         `json:"sagaId"`
}

type CommencePayload

type CommencePayload interface {
	Type() SagaTitle
}

type CompletedCryptoRanking added in v0.1.46

type CompletedCryptoRanking struct {
	WalletAddress string                 `json:"walletAddress"`
	Winners       []CryptoRankingWinners `json:"winners"`
}

type ConsumeChannel

type ConsumeChannel struct {
	// contains filtered or unexported fields
}

func (*ConsumeChannel) NackWithDelay

func (c *ConsumeChannel) NackWithDelay(delay time.Duration, maxRetries int32) (int32, time.Duration, error)

math.MaxInt8.

func (*ConsumeChannel) NackWithFibonacciStrategy

func (c *ConsumeChannel) NackWithFibonacciStrategy(maxOccurrence, maxRetries int32) (int32, time.Duration, int32, error)

NackWithFibonacciStrategy is a function that handles the nack of a message with a delay that increases with the fibonacci sequence. The delay is calculated as the fibonacci sequence of the occurrence of the message. The occurrence is the number of times the message has been nacked. The function returns the number of retries, the delay and the occurrence of the message.

type CryptoRankingWinners added in v0.1.46

type CryptoRankingWinners struct {
	UserID string `json:"userId"`
	Reward string `json:"reward"`
}

type Emitter

type Emitter[T any, U comparable] struct {
	// contains filtered or unexported fields
}

func (*Emitter[T, U]) Emit

func (e *Emitter[T, U]) Emit(event U, data T)

func (*Emitter[T, U]) On

func (e *Emitter[T, U]) On(event U, handler func(T))

type EventHandler

type EventHandler struct {
	Channel *EventsConsumeChannel  `json:"channel"`
	Payload map[string]interface{} `json:"payload"`
}

func (*EventHandler) ParseEventPayload

func (e *EventHandler) ParseEventPayload(data any)

ParseEventPayload also works, but you need to pass a reference to the variable and is not type safe to assure that, as the type is: any Works: var eventPayload1 saga.SocialNewUserPayload // or a pointer *saga.SocialNewUserPayload ------------------------->key, pass the reference<-----------------// handler.ParsePayload(&eventPayload1)

It does not work: handler.ParsePayload(eventPayload1).

type EventsConsumeChannel

type EventsConsumeChannel struct {
	*ConsumeChannel
}

func (*EventsConsumeChannel) AckMessage

func (m *EventsConsumeChannel) AckMessage()

type Exchange

type Exchange string
const (
	RequeueExchange         Exchange = "requeue_exchange"
	CommandsExchange        Exchange = "commands_exchange"
	MatchingExchange        Exchange = "matching_exchange"
	MatchingRequeueExchange Exchange = "matching_requeue_exchange"
)

type MicroserviceConsumeChannel

type MicroserviceConsumeChannel struct {
	*ConsumeChannel
	// contains filtered or unexported fields
}

func (*MicroserviceConsumeChannel) AckMessage

func (m *MicroserviceConsumeChannel) AckMessage(payloadForNextStep NextStepPayload)

type NextStepPayload added in v0.1.11

type NextStepPayload = map[string]interface{}

type Occurrence

type Occurrence int

type Opts added in v0.1.12

type Opts struct {
	RabbitUri    string                       `validate:"required,url"`
	Microservice micro.AvailableMicroservices `validate:"required,microservice"`
	Events       []event.MicroserviceEvent    `validate:"-"`
}

type PurchaseResourceFlowPayload

type PurchaseResourceFlowPayload struct {
	UserId     string `json:"userId"`
	ResourceId string `json:"resourceId"`
	Price      int    `json:"price"`
	Quantity   int    `json:"quantity"`
}

PurchaseResourceFlowPayload is the payload for the purchase_resource_flow event.

func (PurchaseResourceFlowPayload) Type

type Queue

type Queue string
const (
	CommenceSagaQueue Queue = "commence_saga"
)
const (
	ReplyToSagaQ Queue = "reply_to_saga"
)

type QueueConsumerProps

type QueueConsumerProps struct {
	QueueName string
	Exchange  Exchange
}

type RankingsUsersRewardPayload added in v0.1.35

type RankingsUsersRewardPayload struct {
	Rewards []UserReward `json:"rewards"`
}

RankingsUsersRewardPayload is the payload for the rankings_users_reward event.

func (RankingsUsersRewardPayload) Type added in v0.1.35

type SagaStep

type SagaStep struct {
	Microservice    micro.AvailableMicroservices `json:"microservice"`
	Command         string                       `json:"command"`
	Status          Status                       `json:"status"`
	SagaID          int                          `json:"sagaId"`
	Payload         map[string]interface{}       `json:"payload"`
	PreviousPayload map[string]interface{}       `json:"previousPayload"`
	IsCurrentStep   bool                         `json:"isCurrentStep"`
}

type SagaTitle

type SagaTitle string
const (
	PurchaseResourceFlow                 SagaTitle = "purchase_resource_flow"
	RankingsUsersReward                  SagaTitle = "rankings_users_reward"
	TransferCryptoRewardToRankingWinners SagaTitle = "transfer_crypto_reward_to_ranking_winners"
)

type Status

type Status string
const (
	Success Status = "success"
	Failure Status = "failure"
	Sent    Status = "sent"
	Pending Status = "pending"
)

type StepHashId

type StepHashId string

type Transactional

type Transactional struct {
	Microservice micro.AvailableMicroservices
	Events       []event.MicroserviceEvent
	// contains filtered or unexported fields
}

func Config added in v0.1.12

func Config(opts *Opts) *Transactional

func (*Transactional) ConnectToEvents

func (t *Transactional) ConnectToEvents() *Emitter[EventHandler, event.MicroserviceEvent]

ConnectToEvents connects to the events exchange and returns an emitter.

func (*Transactional) ConnectToSagaCommandEmitter

func (t *Transactional) ConnectToSagaCommandEmitter() *Emitter[CommandHandler, micro.StepCommand]

ConnectToSagaCommandEmitter connects to the saga commands exchange and returns an emitter.

func (*Transactional) HealthCheck added in v0.1.30

func (t *Transactional) HealthCheck() error

HealthCheck checks if the rabbitmq connection is alive and the queue exists. The queue to check is the microservice related to the saga commands or events.

func (*Transactional) StopRabbitMQ added in v0.1.30

func (t *Transactional) StopRabbitMQ() error

StopRabbitMQ closes the rabbitmq connection and channels.

type TransferCryptoRewardToRankingWinnersPayload added in v0.1.46

type TransferCryptoRewardToRankingWinnersPayload struct {
	CompletedCryptoRankings []CompletedCryptoRanking `json:"completedCryptoRankings"`
}

TransferCryptoRewardToRankingWinnersPayload is the payload for the transfer_crypto_reward_to_ranking_winners event.

func (TransferCryptoRewardToRankingWinnersPayload) Type added in v0.1.46

type UserReward added in v0.1.35

type UserReward struct {
	UserId string `json:"userId"`
	Reward string `json:"coins"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL