flow

package
v3.10.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 18, 2023 License: Apache-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package flow is an interface used for saga pattern microservice workflow

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrStepNotExists is returned when a step is not found
	ErrStepNotExists = errors.New("step not exists")
	// ErrMissingClient is returned when client.Client is missing
	ErrMissingClient = errors.New("client not set")
)
View Source
var (
	// StatusString maps each Status value to its string name
	StatusString = map[Status]string{
		StatusPending: "StatusPending",
		StatusRunning: "StatusRunning",
		StatusFailure: "StatusFailure",
		StatusSuccess: "StatusSuccess",
		StatusAborted: "StatusAborted",
		StatusSuspend: "StatusSuspend",
	}
	// StringStatus maps each status string name back to its Status value;
	// its entries are the inverse of the StatusString entries
	StringStatus = map[string]Status{
		"StatusPending": StatusPending,
		"StatusRunning": StatusRunning,
		"StatusFailure": StatusFailure,
		"StatusSuccess": StatusSuccess,
		"StatusAborted": StatusAborted,
		"StatusSuspend": StatusSuspend,
	}
)

Functions

func NewContext

func NewContext(ctx context.Context, f Flow) context.Context

NewContext stores Flow to context

func RegisterStep

func RegisterStep(step Step)

RegisterStep registers a user-defined step with the workflow

Types

type ExecuteOption

type ExecuteOption func(*ExecuteOptions)

ExecuteOption func signature

func ExecuteAsync

func ExecuteAsync(b bool) ExecuteOption

ExecuteAsync says that the caller does not wait for the execution to complete

func ExecuteClient

func ExecuteClient(c client.Client) ExecuteOption

ExecuteClient pass client.Client to ExecuteOption

func ExecuteContext

func ExecuteContext(ctx context.Context) ExecuteOption

ExecuteContext pass context.Context to ExecuteOption

func ExecuteLogger

func ExecuteLogger(l logger.Logger) ExecuteOption

ExecuteLogger pass logger.Logger to ExecuteOption

func ExecuteMeter

func ExecuteMeter(m meter.Meter) ExecuteOption

ExecuteMeter pass meter.Meter to ExecuteOption

func ExecuteReverse

func ExecuteReverse(b bool) ExecuteOption

ExecuteReverse says that dag must be run in reverse order

func ExecuteTimeout

func ExecuteTimeout(td time.Duration) ExecuteOption

ExecuteTimeout pass timeout time.Duration for execution

func ExecuteTracer

func ExecuteTracer(t tracer.Tracer) ExecuteOption

ExecuteTracer pass tracer.Tracer to ExecuteOption

type ExecuteOptions

type ExecuteOptions struct {
	// Client holds the client.Client
	Client client.Client
	// Tracer holds the tracer
	Tracer tracer.Tracer
	// Logger holds the logger
	Logger logger.Logger
	// Meter holds the meter
	Meter meter.Meter
	// Context can be used to abort execution or pass additional opts
	Context context.Context
	// Start holds the start step
	// NOTE(review): presumably a step id — confirm against the implementation
	Start string
	// Timeout holds the timeout for execution
	Timeout time.Duration
	// Reverse enables execution in reverse order
	Reverse bool
	// Async enables async execution
	Async bool
}

ExecuteOptions holds execute options

func NewExecuteOptions

func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions

NewExecuteOptions create new ExecuteOptions struct

type Flow

type Flow interface {
	// Options returns the flow options
	Options() Options
	// Init initializes the flow with the passed options
	Init(...Option) error
	// WorkflowCreate creates a new workflow with the specific id and steps
	WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error)
	// WorkflowSave saves the workflow
	WorkflowSave(ctx context.Context, w Workflow) error
	// WorkflowLoad loads the workflow with the specific id
	WorkflowLoad(ctx context.Context, id string) (Workflow, error)
	// WorkflowList lists all workflows
	WorkflowList(ctx context.Context) ([]Workflow, error)
}

Flow the base interface to interact with workflows

func FromContext

func FromContext(ctx context.Context) (Flow, bool)

FromContext returns Flow from context

func NewFlow

func NewFlow(opts ...Option) Flow

NewFlow create new flow

type Message

type Message struct {
	Header metadata.Metadata // Header holds the message metadata
	Body   RawMessage        // Body holds the raw encoded message payload
}

Message used to transfer data between steps

type Option

type Option func(*Options)

Option func signature

func Client

func Client(c client.Client) Option

Client to use for sync/async communication

func Context

func Context(ctx context.Context) Option

Context specifies a context for the service. Can be used to signal shutdown of the flow or can be used for extra option values.

func Logger

func Logger(l logger.Logger) Option

Logger sets the logger option

func Meter

func Meter(m meter.Meter) Option

Meter sets the meter option

func SetOption

func SetOption(k, v interface{}) Option

SetOption returns a function to setup a context with given value

func Store

func Store(s store.Store) Option

Store used for intermediate results

func Tracer

func Tracer(t tracer.Tracer) Option

Tracer mechanism for distributed tracing

type Options

type Options struct {
	// Context holds the external options and can be used for flow shutdown
	Context context.Context
	// Client holds the client.Client
	Client client.Client
	// Tracer holds the tracer
	Tracer tracer.Tracer
	// Logger holds the logger
	Logger logger.Logger
	// Meter holds the meter
	Meter meter.Meter
	// Store holds the store.Store used for intermediate results
	Store store.Store
}

Options holds the flow options

func NewOptions

func NewOptions(opts ...Option) Options

NewOptions returns new options struct with default or passed values

type RawMessage

type RawMessage []byte

RawMessage is a raw encoded JSON value. It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a JSON encoding.

func (*RawMessage) MarshalJSON

func (m *RawMessage) MarshalJSON() ([]byte, error)

MarshalJSON returns m as the JSON encoding of m.

func (*RawMessage) UnmarshalJSON

func (m *RawMessage) UnmarshalJSON(data []byte) error

UnmarshalJSON sets *m to a copy of data.

type Status

type Status int

Status contains step current status

const (
	// StatusPending step is waiting to start; it is the zero value of Status,
	// and the following constants increase by one via iota
	StatusPending Status = iota
	// StatusRunning step is running
	StatusRunning
	// StatusFailure step completed with error
	StatusFailure
	// StatusSuccess step completed without error
	StatusSuccess
	// StatusAborted step was aborted while it was running
	StatusAborted
	// StatusSuspend step is suspended
	StatusSuspend
)

func (Status) String

func (status Status) String() string

type Step

type Step interface {
	// ID returns the step id
	ID() string
	// Endpoint returns the rpc endpoint service_name.service_method or the broker topic
	Endpoint() string
	// Execute runs the step with the req message and returns a message and an error
	Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error)
	// Requires returns the steps required by this step
	// NOTE(review): the strings are presumably step ids — confirm
	Requires() []string
	// Options returns the step options
	Options() StepOptions
	// Require adds required steps
	Require(steps ...Step) error
	// String returns a string for the step; the exact format is not
	// visible from this interface
	String() string
	// GetStatus returns the step status
	GetStatus() Status
	// SetStatus sets the step status
	SetStatus(Status)
	// Request returns the step request message
	Request() *Message
	// Response returns the step response message
	Response() *Message
}

Step represents dedicated workflow step

func NewCallStep

func NewCallStep(service string, name string, method string, opts ...StepOption) Step

NewCallStep create new step with client.Call

func NewPublishStep

func NewPublishStep(topic string, opts ...StepOption) Step

NewPublishStep create new step with client.Publish

type StepOption

type StepOption func(*StepOptions)

StepOption func signature

func StepFallback

func StepFallback(step string) StepOption

StepFallback set the step to run on error

func StepID

func StepID(id string) StepOption

StepID sets the step id for dag

func StepRequires

func StepRequires(steps ...string) StepOption

StepRequires specifies required steps

type StepOptions

type StepOptions struct {
	Context  context.Context // Context holds a context for the step; NOTE(review): its use is not visible here — confirm
	Fallback string          // Fallback holds the step to run on error
	ID       string          // ID holds the step id for dag
	Requires []string        // Requires holds the required steps
}

StepOptions holds step options

func NewStepOptions

func NewStepOptions(opts ...StepOption) StepOptions

NewStepOptions create new StepOptions struct

type Workflow

type Workflow interface {
	// ID returns the id of the workflow
	ID() string
	// Execute runs the workflow with the req message and returns the execution id and an error
	Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error)
	// RemoveSteps removes steps from the workflow
	RemoveSteps(steps ...Step) error
	// AppendSteps appends steps to the workflow
	AppendSteps(steps ...Step) error
	// Status returns the workflow status
	Status() Status
	// Steps returns the steps slice where parallel steps are returned on the same level
	Steps() ([][]Step, error)
	// Suspend suspends execution
	// NOTE(review): id is presumably the execution id returned by Execute — confirm
	Suspend(ctx context.Context, id string) error
	// Resume resumes execution
	// NOTE(review): id is presumably the execution id returned by Execute — confirm
	Resume(ctx context.Context, id string) error
	// Abort aborts execution
	// NOTE(review): id is presumably the execution id returned by Execute — confirm
	Abort(ctx context.Context, id string) error
}

Workflow contains all steps to execute

type WorkflowOption

type WorkflowOption func(*WorkflowOptions)

WorkflowOption func signature

func WorkflowID

func WorkflowID(id string) WorkflowOption

WorkflowID set workflow id

type WorkflowOptions

type WorkflowOptions struct {
	Context context.Context // Context holds a context for the workflow; NOTE(review): its use is not visible here — confirm
	ID      string          // ID holds the workflow id
}

WorkflowOptions holds workflow options

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL