Documentation ¶
Overview ¶
Package sagas implements the orchestration-based saga pattern. See https://microservices.io/patterns/data/saga.html
Introduction ¶
A saga is a sequence of local transactions. Each local transaction updates the database and publishes a message or event to trigger the next local transaction in the saga. If a local transaction fails because it violates a business rule then the saga executes a series of compensating transactions that undo the changes that were made by the preceding local transactions.
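The flow described above can be sketched without any library. This is a minimal illustration of the pattern, not the implementation used by package sagas; the `step` type, `runSaga`, and `demo` are hypothetical names introduced only for this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// step pairs a local transaction with the compensating transaction that
// undoes it. The type and its fields are hypothetical, not part of sagas.
type step struct {
	name       string
	do         func() error
	compensate func()
}

// runSaga executes the steps in order. If one fails, it compensates the steps
// that already succeeded, newest first, and returns the wrapped error.
func runSaga(steps []step) error {
	var done []step
	for _, s := range steps {
		if err := s.do(); err != nil {
			for i := len(done) - 1; i >= 0; i-- {
				done[i].compensate()
			}
			return fmt.Errorf("step %q failed: %w", s.name, err)
		}
		done = append(done, s)
	}
	return nil
}

// demo runs a three-step saga whose last step violates a business rule and
// returns a trace of what happened.
func demo() []string {
	var trace []string
	record := func(msg string) { trace = append(trace, msg) }
	steps := []step{
		{"order", func() error { record("do order"); return nil }, func() { record("undo order") }},
		{"payment", func() error { record("do payment"); return nil }, func() { record("undo payment") }},
		{"shipping", func() error { return errors.New("out of stock") }, func() { record("undo shipping") }},
	}
	if err := runSaga(steps); err != nil {
		record(err.Error())
	}
	return trace
}

func main() {
	for _, line := range demo() {
		fmt.Println(line)
	}
}
```

The failed step itself is not compensated in this sketch, only the steps that completed before it; whether a real coordinator should also compensate a partially applied step depends on the step's semantics.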
Usage ¶
The saga is managed by sagas.Registry. Each saga step has a forward operation and a rollback counterpart. They must be registered beforehand by calling Registry.AddStep, which returns a new endpoint to the caller. Use the returned endpoint to perform the transactional operation.
    store := sagas.NewInProcessStore()
    registry := sagas.NewRegistry(store)
    addOrder := registry.AddStep(&sagas.Step{
        Name: "Add Order",
        Do: func(ctx context.Context, request interface{}) (response interface{}, err error) {
            resp, err := orderEndpoint(ctx, request.(OrderRequest))
            if err != nil {
                return nil, err
            }
            return resp, nil
        },
        // Undo returns only an error, matching the Step.Undo field.
        Undo: func(ctx context.Context, req interface{}) (err error) {
            return orderCancelEndpoint(ctx, req)
        },
    })
    makePayment := registry.AddStep(&sagas.Step{
        Name: "Make Payment",
        Do: func(ctx context.Context, request interface{}) (response interface{}, err error) {
            resp, err := paymentEndpoint(ctx, request.(PaymentRequest))
            if err != nil {
                return nil, err
            }
            return resp, nil
        },
        Undo: func(ctx context.Context, req interface{}) (err error) {
            return paymentCancelEndpoint(ctx)
        },
    })
Initiate the transaction by calling registry.StartTX. Pass the returned context to the transaction branches. You can roll back or commit at will. If TX.Rollback is called, the previously registered rollback operations are applied automatically, provided that the corresponding forward operation was actually executed within the transaction.
    tx, ctx := registry.StartTX(context.Background())
    resp, err := addOrder(ctx, OrderRequest{Sku: "1"})
    if err != nil {
        tx.Rollback(ctx)
        return // do not continue or commit after a rollback
    }
    resp, err = makePayment(ctx, PaymentRequest{})
    if err != nil {
        tx.Rollback(ctx)
        return
    }
    tx.Commit(ctx)
Integration ¶
Package sagas exports configuration in this format:
    saga:
      sagaTimeoutSecond: 600
      recoverIntervalSecond: 60
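Both keys shown above are expressed in whole seconds, as their names suggest. The sketch below shows one way such values might be turned into time.Duration values; the `sagaConfig` struct and its methods are hypothetical and are not types exported by package sagas.

```go
package main

import (
	"fmt"
	"time"
)

// sagaConfig mirrors the two configuration keys. The struct is a hypothetical
// illustration, not a type exported by package sagas.
type sagaConfig struct {
	SagaTimeoutSecond     int
	RecoverIntervalSecond int
}

// timeout converts sagaTimeoutSecond to a time.Duration.
func (c sagaConfig) timeout() time.Duration {
	return time.Duration(c.SagaTimeoutSecond) * time.Second
}

// recoverInterval converts recoverIntervalSecond to a time.Duration.
func (c sagaConfig) recoverInterval() time.Duration {
	return time.Duration(c.RecoverIntervalSecond) * time.Second
}

func main() {
	c := sagaConfig{SagaTimeoutSecond: 600, RecoverIntervalSecond: 60}
	fmt.Println(c.timeout(), c.recoverInterval()) // prints "10m0s 1m0s"
}
```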
To use package sagas with package core:
    var c *core.C = core.Default()
    // Providers is a function (see the index), so it must be called.
    c.Provide(sagas.Providers())
    c.Invoke(func(registry *sagas.Registry) {
        tx, ctx := registry.StartTX(context.Background())
        resp, err := addOrder(ctx, OrderRequest{Sku: "1"})
        if err != nil {
            tx.Rollback(ctx)
            return // do not continue or commit after a rollback
        }
        resp, err = makePayment(ctx, PaymentRequest{})
        if err != nil {
            tx.Rollback(ctx)
            return
        }
        tx.Commit(ctx)
    })
Example ¶
    package main

    import (
        "context"
        "fmt"

        "github.com/DoNewsCode/core/dtx"
        "github.com/DoNewsCode/core/dtx/sagas"
    )

    var orderTable = make(map[string]interface{})
    var paymentTable = make(map[string]interface{})

    type OrderRequest struct {
        Sku string
    }

    type OrderResponse struct {
        OrderID string
        Sku     string
        Cost    float64
    }

    type PaymentRequest struct {
        OrderID string
        Sku     string
        Cost    float64
    }

    type PaymentResponse struct {
        Success bool
    }

    func orderEndpoint(ctx context.Context, request interface{}) (response interface{}, err error) {
        correlationID := ctx.Value(dtx.CorrelationID).(string)
        orderTable[correlationID] = request
        return OrderResponse{
            OrderID: "1",
            Sku:     "1",
            Cost:    10.0,
        }, nil
    }

    func orderCancelEndpoint(ctx context.Context, request interface{}) (err error) {
        correlationID := ctx.Value(dtx.CorrelationID).(string)
        delete(orderTable, correlationID)
        return nil
    }

    func paymentEndpoint(ctx context.Context, request interface{}) (response interface{}, err error) {
        correlationID := ctx.Value(dtx.CorrelationID).(string)
        paymentTable[correlationID] = request
        if request.(PaymentRequest).Cost < 20 {
            return PaymentResponse{
                Success: true,
            }, nil
        }
        return PaymentResponse{
            Success: false,
        }, nil
    }

    func paymentCancelEndpoint(ctx context.Context) (err error) {
        correlationID := ctx.Value(dtx.CorrelationID).(string)
        delete(paymentTable, correlationID)
        return nil
    }

    func main() {
        store := sagas.NewInProcessStore()
        registry := sagas.NewRegistry(store)
        addOrder := registry.AddStep(&sagas.Step{
            Name: "Add Order",
            Do: func(ctx context.Context, request interface{}) (response interface{}, err error) {
                resp, err := orderEndpoint(ctx, request.(OrderRequest))
                if err != nil {
                    return nil, err
                }
                // Convert the response to next request
                return resp, nil
            },
            Undo: func(ctx context.Context, req interface{}) (err error) {
                return orderCancelEndpoint(ctx, req)
            },
        })
        makePayment := registry.AddStep(&sagas.Step{
            Name: "Make Payment",
            Do: func(ctx context.Context, request interface{}) (response interface{}, err error) {
                resp, err := paymentEndpoint(ctx, request.(PaymentRequest))
                if err != nil {
                    return nil, err
                }
                return resp, nil
            },
            Undo: func(ctx context.Context, req interface{}) (err error) {
                return paymentCancelEndpoint(ctx)
            },
        })

        tx, ctx := registry.StartTX(context.Background())
        resp, err := addOrder(ctx, OrderRequest{Sku: "1"})
        if err != nil {
            tx.Rollback(ctx)
            return // resp is nil here, so the type assertions below would panic
        }
        resp, err = makePayment(ctx, PaymentRequest{
            OrderID: resp.(OrderResponse).OrderID,
            Sku:     resp.(OrderResponse).Sku,
            Cost:    resp.(OrderResponse).Cost,
        })
        if err != nil {
            tx.Rollback(ctx)
            return
        }
        tx.Commit(ctx)
        fmt.Println(resp.(PaymentResponse).Success)
    }
Output: true
Index ¶
- Constants
- func Providers() di.Deps
- type InProcessStore
- func (i *InProcessStore) Ack(ctx context.Context, logID string, err error) error
- func (i *InProcessStore) Log(ctx context.Context, log Log) error
- func (i *InProcessStore) UnacknowledgedSteps(ctx context.Context, correlationID string) ([]Log, error)
- func (i *InProcessStore) UncommittedSagas(ctx context.Context) ([]Log, error)
- type Log
- type LogType
- type Option
- type Registry
- type SagaEndpoints
- type Step
- type Store
- type TX
Constants ¶
const TxContextKey contextKey = "coordinator"
TxContextKey is the context key for TX.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type InProcessStore ¶
type InProcessStore struct {
// contains filtered or unexported fields
}
InProcessStore is an in-process storage that implements Store.
func NewInProcessStore ¶
func NewInProcessStore() *InProcessStore
NewInProcessStore creates an InProcessStore.
func (*InProcessStore) Ack ¶
func (i *InProcessStore) Ack(ctx context.Context, logID string, err error) error
Ack marks the log entry as acknowledged, either with an error or not. It is safe to call Ack on the same log entry more than once.
func (*InProcessStore) Log ¶
func (i *InProcessStore) Log(ctx context.Context, log Log) error
Log appends a new unacknowledged log entry to the store.
func (*InProcessStore) UnacknowledgedSteps ¶
func (i *InProcessStore) UnacknowledgedSteps(ctx context.Context, correlationID string) ([]Log, error)
UnacknowledgedSteps searches the InProcessStore for unacknowledged steps under the given correlationID.
func (*InProcessStore) UncommittedSagas ¶
func (i *InProcessStore) UncommittedSagas(ctx context.Context) ([]Log, error)
UncommittedSagas searches the store for all uncommitted sagas and returns the log entries under the matching sagas.
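The log, acknowledge, and query semantics described for InProcessStore can be illustrated with a toy store. This is a rough sketch under simplifying assumptions, not the InProcessStore source: the `entry` and `memStore` types are hypothetical, there is no locking, and keeping the first recorded outcome on a repeated acknowledgement is a choice made for this sketch.

```go
package main

import "fmt"

// entry is a simplified stand-in for sagas.Log; its fields are hypothetical.
type entry struct {
	id            string
	correlationID string
	acked         bool
	err           error
}

// memStore keeps entries in insertion order. It is not safe for concurrent use.
type memStore struct {
	entries []*entry
}

// log appends a new unacknowledged entry.
func (m *memStore) log(id, correlationID string) {
	m.entries = append(m.entries, &entry{id: id, correlationID: correlationID})
}

// ack marks the entry as acknowledged. Calling it again for the same id is a
// no-op that keeps the first recorded outcome.
func (m *memStore) ack(id string, err error) {
	for _, e := range m.entries {
		if e.id == id && !e.acked {
			e.acked, e.err = true, err
		}
	}
}

// unacknowledged returns the ids of entries under correlationID that have not
// been acknowledged yet.
func (m *memStore) unacknowledged(correlationID string) []string {
	var ids []string
	for _, e := range m.entries {
		if e.correlationID == correlationID && !e.acked {
			ids = append(ids, e.id)
		}
	}
	return ids
}

func main() {
	s := &memStore{}
	s.log("1", "tx-a")
	s.log("2", "tx-a")
	s.ack("1", nil)
	s.ack("1", nil) // repeated acknowledgement is harmless
	fmt.Println(s.unacknowledged("tx-a")) // prints "[2]"
}
```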
type Log ¶
type Log struct {
    ID            string
    CorrelationID string
    StartedAt     time.Time
    FinishedAt    time.Time
    LogType       LogType
    StepParam     interface{}
    StepName      string
    StepError     error
}
Log is the structural Log type of the distributed saga.
type Option ¶
type Option func(registry *Registry)
Option is the functional option for NewRegistry.
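For readers unfamiliar with the functional option idiom, here is a small self-contained sketch of how such options are typically applied by a constructor. The `registry`, `option`, `withTimeout`, and `newRegistry` names are hypothetical stand-ins, and the ten-minute default is an assumption made for this example rather than the package's documented default.

```go
package main

import (
	"fmt"
	"time"
)

// registry is a hypothetical stand-in for sagas.Registry.
type registry struct {
	timeout time.Duration
}

// option mutates a registry during construction.
type option func(*registry)

// withTimeout returns an option that overrides the default timeout.
func withTimeout(d time.Duration) option {
	return func(r *registry) { r.timeout = d }
}

// newRegistry applies the defaults first, then each option in order, so a
// later option wins over an earlier one.
func newRegistry(opts ...option) *registry {
	r := &registry{timeout: 10 * time.Minute} // assumed default for this sketch
	for _, opt := range opts {
		opt(r)
	}
	return r
}

func main() {
	fmt.Println(newRegistry().timeout)                            // prints "10m0s"
	fmt.Println(newRegistry(withTimeout(30 * time.Second)).timeout) // prints "30s"
}
```

The benefit of the idiom is that new settings can be added later without changing the constructor's signature.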
func WithLogger ¶
WithLogger is an option that adds a logger to the registry.
func WithTimeout ¶
WithTimeout is an option that configures when the unacknowledged steps should be marked as stale and become candidates for rollback.
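A staleness check of the kind described above might look like the following. This is only a sketch of the idea: the `pendingStep` type and `staleSteps` function are hypothetical, and the strict "older than the timeout" comparison is an assumption, not the rule the package is known to use.

```go
package main

import (
	"fmt"
	"time"
)

// pendingStep is a hypothetical, trimmed-down view of an unacknowledged log entry.
type pendingStep struct {
	name      string
	startedAt time.Time
}

// staleSteps returns the names of the steps that started more than timeout
// before now.
func staleSteps(steps []pendingStep, now time.Time, timeout time.Duration) []string {
	var stale []string
	for _, s := range steps {
		if now.Sub(s.startedAt) > timeout {
			stale = append(stale, s.name)
		}
	}
	return stale
}

// demo evaluates two pending steps against a ten-minute timeout.
func demo() []string {
	now := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	steps := []pendingStep{
		{"Add Order", now.Add(-15 * time.Minute)},
		{"Make Payment", now.Add(-1 * time.Minute)},
	}
	return staleSteps(steps, now, 10*time.Minute)
}

func main() {
	fmt.Println(demo()) // prints "[Add Order]"
}
```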
type Registry ¶
type Registry struct {
    Store Store
    // contains filtered or unexported fields
}
Registry holds all transaction sagas in this process. It should be populated during the initialization of the application.
func NewRegistry ¶
NewRegistry creates a new Registry.
func (*Registry) AddStep ¶
AddStep registers a saga step in the registry. The registration should be done during the bootstrapping of the application. The returned closure should be used in place of the original forward action. Logging is taken care of in the returned closure. Once the transaction rolls back, the compensating action takes place automatically.
    addOrder := registry.AddStep(&sagas.Step{
        Name: "Add Order",
        Do: func(ctx context.Context, request interface{}) (response interface{}, err error) {
            resp, err := orderEndpoint(ctx, request.(OrderRequest))
            if err != nil {
                return nil, err
            }
            return resp, nil
        },
        // Undo returns only an error, matching the Step.Undo field.
        Undo: func(ctx context.Context, req interface{}) (err error) {
            return orderCancelEndpoint(ctx, req)
        },
    })

    // just call the returned closure to move the transaction forward.
    addOrder(ctx, request)
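To make the idea of a returned closure more concrete, the sketch below wraps a forward action so that each successful call inside a transaction records its compensating action. It is an illustration only and does not reproduce the package's internals: `endpoint`, `tx`, `txKey`, `startTX`, and `addStep` are hypothetical, nothing is persisted to a store, and recording the undo only after a successful call is an assumption of this sketch.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// endpoint has the same shape as the Do function of a step.
type endpoint func(ctx context.Context, req interface{}) (interface{}, error)

// tx is a hypothetical coordinator that remembers which undo actions to run.
type tx struct {
	undos []func(ctx context.Context) error
}

// txKey is the private context key under which the tx is stored.
type txKey struct{}

// startTX returns a new tx and a context that carries it.
func startTX(ctx context.Context) (*tx, context.Context) {
	t := &tx{}
	return t, context.WithValue(ctx, txKey{}, t)
}

// rollback runs every recorded undo, newest first, and returns the first error.
func (t *tx) rollback(ctx context.Context) error {
	var first error
	for i := len(t.undos) - 1; i >= 0; i-- {
		if err := t.undos[i](ctx); err != nil && first == nil {
			first = err
		}
	}
	t.undos = nil
	return first
}

// addStep wraps do so that every successful call made with a context carrying
// a tx records the matching undo, bound to the same request.
func addStep(do endpoint, undo func(ctx context.Context, req interface{}) error) endpoint {
	return func(ctx context.Context, req interface{}) (interface{}, error) {
		resp, err := do(ctx, req)
		if err != nil {
			return nil, err
		}
		if t, ok := ctx.Value(txKey{}).(*tx); ok {
			t.undos = append(t.undos, func(ctx context.Context) error { return undo(ctx, req) })
		}
		return resp, nil
	}
}

// demo places an order, fails the payment, rolls back, and returns the trace.
func demo() []string {
	var trace []string
	addOrder := addStep(
		func(ctx context.Context, req interface{}) (interface{}, error) {
			trace = append(trace, fmt.Sprintf("order %v", req))
			return "ok", nil
		},
		func(ctx context.Context, req interface{}) error {
			trace = append(trace, fmt.Sprintf("cancel order %v", req))
			return nil
		},
	)
	pay := addStep(
		func(ctx context.Context, req interface{}) (interface{}, error) {
			return nil, errors.New("card declined")
		},
		func(ctx context.Context, req interface{}) error {
			trace = append(trace, "refund")
			return nil
		},
	)
	t, ctx := startTX(context.Background())
	if _, err := addOrder(ctx, "sku-1"); err != nil {
		_ = t.rollback(ctx)
		return trace
	}
	if _, err := pay(ctx, 10); err != nil {
		_ = t.rollback(ctx)
	}
	return trace
}

func main() {
	fmt.Println(demo()) // prints "[order sku-1 cancel order sku-1]"
}
```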
type SagaEndpoints ¶
SagaEndpoints is a collection of all registered endpoints in the saga registry.
type Step ¶
type Step struct {
    // Name is the name of the step. Useful in logs.
    Name string
    // Do is the forward action that should be taken when proceeding with the
    // transaction.
    Do func(context.Context, interface{}) (interface{}, error)
    // Undo is the backward action that should be taken when the transaction rolls
    // back.
    Undo func(ctx context.Context, req interface{}) error
    // EncodeParam is a function that encodes the request parameter to bytes. Useful
    // when persisting the request parameter in logs.
    EncodeParam func(interface{}) ([]byte, error)
    // DecodeParam is a function that decodes the bytes to request parameter. Useful
    // when reconstructing the request parameter from logs.
    DecodeParam func([]byte) (interface{}, error)
}
Step is a step in the saga. Steps should be registered during the bootstrap phase of the application by calling Registry.AddStep.
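As an illustration of the EncodeParam and DecodeParam fields, the following sketch provides a JSON-based pair for the OrderRequest type used in the package example. The function signatures match the Step fields; the choice of JSON and the names `encodeOrder` and `decodeOrder` are assumptions made for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// OrderRequest matches the request type used in the package example.
type OrderRequest struct {
	Sku string
}

// encodeOrder has the signature of Step.EncodeParam. It rejects parameters
// that are not an OrderRequest.
func encodeOrder(param interface{}) ([]byte, error) {
	req, ok := param.(OrderRequest)
	if !ok {
		return nil, fmt.Errorf("unexpected parameter type %T", param)
	}
	return json.Marshal(req)
}

// decodeOrder has the signature of Step.DecodeParam. It returns an
// OrderRequest value (not a pointer) so that a type assertion such as
// request.(OrderRequest) in Do or Undo still succeeds.
func decodeOrder(data []byte) (interface{}, error) {
	var req OrderRequest
	if err := json.Unmarshal(data, &req); err != nil {
		return nil, err
	}
	return req, nil
}

func main() {
	data, err := encodeOrder(OrderRequest{Sku: "1"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data)) // prints {"Sku":"1"}

	param, err := decodeOrder(data)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", param) // prints {Sku:1}
}
```

These two functions would then be assigned to the EncodeParam and DecodeParam fields of the corresponding Step.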
type Store ¶
type Store interface {
    Log(ctx context.Context, log Log) error
    Ack(ctx context.Context, id string, err error) error
    UnacknowledgedSteps(ctx context.Context, correlationID string) ([]Log, error)
    UncommittedSagas(ctx context.Context) ([]Log, error)
}
Store is the interface to persist logs of transactions.
type TX ¶
type TX struct {
// contains filtered or unexported fields
}
TX is a distributed transaction coordinator. Its fields are unexported, so obtain one by calling Registry.StartTX, or retrieve the current one from a context with TxFromContext.
func TxFromContext ¶
TxFromContext returns the TX instance stored in the context.
Directories ¶
Path | Synopsis |
---|---|
mysqlstore | Package mysqlstore provides a MySQL store implementation for sagas. |