sagas

package
v0.7.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 19, 2021 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package sagas implements the orchestration based saga pattern. See https://microservices.io/patterns/data/saga.html

Introduction

A saga is a sequence of local transactions. Each local transaction updates the database and publishes a message or event to trigger the next local transaction in the saga. If a local transaction fails because it violates a business rule then the saga executes a series of compensating transactions that undo the changes that were made by the preceding local transactions.

Usage

The saga is managed by sagas.Registry. Each saga step has an forward operation and a rollback counterpart. They must be registered beforehand by calling Registry.AddStep. A new endpoint will be returned to the caller. Use the returned endpoint to perform transactional operation.

store := sagas.NewInProcessStore()
registry := sagas.NewRegistry(store)
addOrder := registry.AddStep(&sagas.Step{
	Name: "Add Order",
	Do: func(ctx context.Context, request interface{}) (response interface{}, err error) {
		resp, err := orderEndpoint(ctx, request.(OrderRequest))
		if err != nil {
			return nil, err
		}
		return resp, nil
	},
	Undo: func(ctx context.Context, req interface{}) (response interface{}, err error) {
		return orderCancelEndpoint(ctx, req)
	},
})
makePayment := registry.AddStep(&sagas.Step{
	Name: "Make Payment",
	Do: func(ctx context.Context, request interface{}) (response interface{}, err error) {
		resp, err := paymentEndpoint(ctx, request.(PaymentRequest))
		if err != nil {
			return nil, err
		}
		return resp, nil
	},
	Undo: func(ctx context.Context, req interface{}) (response interface{}, err error) {
		return paymentCancelEndpoint(ctx)
	},
})

Initiate the transaction by calling registry.StartTX. Pass the context returned to the transaction branches. You can rollback or commit at your will. If the TX.Rollback is called, the previously registered rollback operations will be applied automatically, on condition that the forward operation is indeed executed within the transaction.

tx, ctx := registry.StartTX(context.Background())
resp, err := addOrder(ctx, OrderRequest{Sku: "1"})
if err != nil {
	tx.Rollback(ctx)
}
resp, err = makePayment(ctx, PaymentRequest{})
if err != nil {
	tx.Rollback(ctx)
}
tx.Commit(ctx)

Integration

The package leader exports configuration in this format:

saga:
	sagaTimeoutSecond: 600
	recoverIntervalSecond: 60

To use package sagas with package core:

var c *core.C = core.Default()
c.Provide(sagas.Providers)
c.Invoke(func(registry *sagas.Registry) {
	tx, ctx := registry.StartTX(context.Background())
	resp, err := addOrder(ctx, OrderRequest{Sku: "1"})
	if err != nil {
		tx.Rollback(ctx)
	}
	resp, err = makePayment(ctx, PaymentRequest{})
	if err != nil {
		tx.Rollback(ctx)
	}
	tx.Commit(ctx)
})
Example
package main

import (
	"context"
	"fmt"

	"github.com/DoNewsCode/core/dtx"
	"github.com/DoNewsCode/core/dtx/sagas"
)

var orderTable = make(map[string]interface{})
var paymentTable = make(map[string]interface{})

type OrderRequest struct {
	Sku string
}

type OrderResponse struct {
	OrderID string
	Sku     string
	Cost    float64
}

type PaymentRequest struct {
	OrderID string
	Sku     string
	Cost    float64
}

type PaymentResponse struct {
	Success bool
}

func orderEndpoint(ctx context.Context, request interface{}) (response interface{}, err error) {
	correlationID := ctx.Value(dtx.CorrelationID).(string)
	orderTable[correlationID] = request
	return OrderResponse{
		OrderID: "1",
		Sku:     "1",
		Cost:    10.0,
	}, nil
}

func orderCancelEndpoint(ctx context.Context, request interface{}) (err error) {
	correlationID := ctx.Value(dtx.CorrelationID).(string)
	delete(orderTable, correlationID)
	return nil
}

func paymentEndpoint(ctx context.Context, request interface{}) (response interface{}, err error) {
	correlationID := ctx.Value(dtx.CorrelationID).(string)
	paymentTable[correlationID] = request
	if request.(PaymentRequest).Cost < 20 {
		return PaymentResponse{
			Success: true,
		}, nil
	}
	return PaymentResponse{
		Success: false,
	}, nil
}

func paymentCancelEndpoint(ctx context.Context) (err error) {
	correlationID := ctx.Value(dtx.CorrelationID).(string)
	delete(paymentTable, correlationID)
	return nil
}

func main() {
	store := sagas.NewInProcessStore()
	registry := sagas.NewRegistry(store)
	addOrder := registry.AddStep(&sagas.Step{
		Name: "Add Order",
		Do: func(ctx context.Context, request interface{}) (response interface{}, err error) {
			resp, err := orderEndpoint(ctx, request.(OrderRequest))
			if err != nil {
				return nil, err
			}
			// Convert the response to next request
			return resp, nil
		},
		Undo: func(ctx context.Context, req interface{}) (err error) {
			return orderCancelEndpoint(ctx, req)
		},
	})
	makePayment := registry.AddStep(&sagas.Step{
		Name: "Make Payment",
		Do: func(ctx context.Context, request interface{}) (response interface{}, err error) {
			resp, err := paymentEndpoint(ctx, request.(PaymentRequest))
			if err != nil {
				return nil, err
			}
			return resp, nil
		},
		Undo: func(ctx context.Context, req interface{}) (err error) {
			return paymentCancelEndpoint(ctx)
		},
	})

	tx, ctx := registry.StartTX(context.Background())
	resp, err := addOrder(ctx, OrderRequest{Sku: "1"})
	if err != nil {
		tx.Rollback(ctx)
	}
	resp, err = makePayment(ctx, PaymentRequest{
		OrderID: resp.(OrderResponse).OrderID,
		Sku:     resp.(OrderResponse).Sku,
		Cost:    resp.(OrderResponse).Cost,
	})
	if err != nil {
		tx.Rollback(ctx)
	}
	tx.Commit(ctx)
	fmt.Println(resp.(PaymentResponse).Success)

}
Output:

true

Index

Examples

Constants

View Source
const TxContextKey contextKey = "coordinator"

TxContextKey is the context key for TX.

Variables

This section is empty.

Functions

func Providers

func Providers() di.Deps

Providers returns a set of dependency providers.

Depends On:
	contract.ConfigAccessor
	log.Logger
	Store   `optional:"true"`
	[]*Step `group:"saga"`
Provide:
	*Registry
	SagaEndpoints

Types

type InProcessStore

type InProcessStore struct {
	// contains filtered or unexported fields
}

InProcessStore creates an in process storage that implements Store.

func NewInProcessStore

func NewInProcessStore() *InProcessStore

NewInProcessStore creates a InProcessStore.

func (*InProcessStore) Ack

func (i *InProcessStore) Ack(ctx context.Context, logID string, err error) error

Ack marks the log entry as acknowledged, either with an error or not. It is safe to call ack to the same log entry more than once.

func (*InProcessStore) Log

func (i *InProcessStore) Log(ctx context.Context, log Log) error

Log appends a new unacknowledged log entry to the store.

func (*InProcessStore) UnacknowledgedSteps

func (i *InProcessStore) UnacknowledgedSteps(ctx context.Context, correlationID string) ([]Log, error)

UnacknowledgedSteps searches the InProcessStore for unacknowledged steps under the given correlationID.

func (*InProcessStore) UncommittedSagas

func (i *InProcessStore) UncommittedSagas(ctx context.Context) ([]Log, error)

UncommittedSagas searches the store for all uncommitted sagas, and return log entries under the matching sagas.

type Log

type Log struct {
	ID            string
	CorrelationID string
	StartedAt     time.Time
	FinishedAt    time.Time
	LogType       LogType
	StepParam     interface{}
	StepName      string
	StepError     error
}

Log is the structural Log type of the distributed saga.

type LogType

type LogType uint

LogType is a type enum that describes the types of Log.

const (
	// Session type logs the occurrence of a new distributed transaction.
	Session LogType = iota
	// Do type logs an incremental action in the distributed saga step.
	Do
	// Undo type logs a compensation action in the distributed saga step.
	Undo
)

type Option

type Option func(registry *Registry)

Option is the functional option for NewRegistry.

func WithLogger

func WithLogger(logger log.Logger) Option

WithLogger is an option that adds a logger to the registry.

func WithTimeout

func WithTimeout(duration time.Duration) Option

WithTimeout is an option that configures when the unacknowledged steps should be marked as stale and become candidates for rollback.

type Registry

type Registry struct {
	Store Store
	// contains filtered or unexported fields
}

Registry holds all transaction sagas in this process. It should be populated during the initialization of the application.

func NewRegistry

func NewRegistry(store Store, opts ...Option) *Registry

NewRegistry creates a new Registry.

func (*Registry) AddStep

func (r *Registry) AddStep(step *Step) func(context.Context, interface{}) (interface{}, error)

AddStep registers the saga steps in the registry. The registration should be done during the bootstrapping of application. Then returned closure should be used in place of the original forward action. Logging has been taken care of in the returned closure. Once the transaction rollbacks, the compensating action will take place automatically.

	addOrder := registry.AddStep(&sagas.Step{
		Name: "Add Order",
		Do: func(ctx context.Context, request interface{}) (response interface{}, err error) {
			resp, err := orderEndpoint(ctx, request.(OrderRequest))
			if err != nil {
				return nil, err
			}
			return resp, nil
		},
		Undo: func(ctx context.Context, req interface{}) (response interface{}, err error) {
			return orderCancelEndpoint(ctx, req)
		},
	})
 // just call the returned closure to move the transaction forward.
 addOrder(ctx, request)

func (*Registry) Recover

func (r *Registry) Recover(ctx context.Context)

Recover rollbacks all uncommitted sagas by retrieving them in the store.

func (*Registry) StartTX

func (r *Registry) StartTX(ctx context.Context) (*TX, context.Context)

StartTX starts a transaction using saga pattern.

type SagaEndpoints

type SagaEndpoints map[string]endpoint.Endpoint

SagaEndpoints is a collection of all registered endpoint in the saga registry

type Step

type Step struct {
	// Name is the name of the step. Useful in logs.
	Name string
	// Do is the forward action that should be take when proceeding the transaction.
	Do func(context.Context, interface{}) (interface{}, error)
	// Undo is the backward action that should be take when the transaction rolls
	// back
	Undo func(ctx context.Context, req interface{}) error
	// EncodeParam is a function that encodes the request parameter to bytes. Useful
	// when persisting the request parameter in logs.
	EncodeParam func(interface{}) ([]byte, error)
	// DecodeParam is a function that decodes the bytes to request parameter. Useful
	// when reconstructing the request parameter from logs.
	DecodeParam func([]byte) (interface{}, error)
}

Step is a step in the Saga. Steps should be registered during the bootstrap phase of app, by calling Registry.AddStep.

type Store

type Store interface {
	Log(ctx context.Context, log Log) error
	Ack(ctx context.Context, id string, err error) error
	UnacknowledgedSteps(ctx context.Context, correlationID string) ([]Log, error)
	UncommittedSagas(ctx context.Context) ([]Log, error)
}

Store is the interface to persist logs of transactions.

type TX

type TX struct {
	// contains filtered or unexported fields
}

TX is a distributed transaction coordinator. It should be initialized by directly assigning its public members.

func TxFromContext

func TxFromContext(ctx context.Context) *TX

TxFromContext returns the tx instance from context.

func (*TX) Commit

func (tx *TX) Commit(ctx context.Context) error

Commit commits the current transaction.

func (*TX) Rollback

func (tx *TX) Rollback(ctx context.Context) error

Rollback rollbacks the current transaction.

Directories

Path Synopsis
Package mysqlstore provides a mysql store implementation for sagas.
Package mysqlstore provides a mysql store implementation for sagas.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL