flow

package
v3.8.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 28, 2021 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Overview

Package flow provides an interface for building saga-pattern microservice workflows

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by the package.
var (
	// ErrStepNotExists carries the message "step not exists".
	// NOTE(review): presumably returned when a referenced step cannot be found — confirm against callers.
	ErrStepNotExists = errors.New("step not exists")
	// ErrMissingClient carries the message "client not set".
	// NOTE(review): presumably returned when no client.Client has been configured — confirm against callers.
	ErrMissingClient = errors.New("client not set")
)
View Source
// Lookup tables for converting between Status values and their textual names.
// The two maps hold the same six pairs and are inverses of each other.
var (
	// StatusString maps each Status constant to its identifier spelled as a string
	StatusString = map[Status]string{
		StatusPending: "StatusPending",
		StatusRunning: "StatusRunning",
		StatusFailure: "StatusFailure",
		StatusSuccess: "StatusSuccess",
		StatusAborted: "StatusAborted",
		StatusSuspend: "StatusSuspend",
	}
	// StringStatus maps a status name back to its Status constant;
	// a name that is not listed yields the zero value (StatusPending) on plain index lookup
	StringStatus = map[string]Status{
		"StatusPending": StatusPending,
		"StatusRunning": StatusRunning,
		"StatusFailure": StatusFailure,
		"StatusSuccess": StatusSuccess,
		"StatusAborted": StatusAborted,
		"StatusSuspend": StatusSuspend,
	}
)

Functions

func NewContext

func NewContext(ctx context.Context, f Flow) context.Context

NewContext stores Flow to context

func RegisterStep

func RegisterStep(step Step)

Types

type ExecuteOption

type ExecuteOption func(*ExecuteOptions)

func ExecuteAsync

func ExecuteAsync(b bool) ExecuteOption

func ExecuteClient

func ExecuteClient(c client.Client) ExecuteOption

func ExecuteContext

func ExecuteContext(ctx context.Context) ExecuteOption

func ExecuteLogger

func ExecuteLogger(l logger.Logger) ExecuteOption

func ExecuteMeter

func ExecuteMeter(m meter.Meter) ExecuteOption

func ExecuteReverse

func ExecuteReverse(b bool) ExecuteOption

func ExecuteTimeout

func ExecuteTimeout(td time.Duration) ExecuteOption

func ExecuteTracer

func ExecuteTracer(t tracer.Tracer) ExecuteOption

type ExecuteOptions

// ExecuteOptions holds the per-execution settings that ExecuteOption functions modify
type ExecuteOptions struct {
	// Client holds the client.Client
	Client client.Client
	// Tracer holds the tracer
	Tracer tracer.Tracer
	// Logger holds the logger
	Logger logger.Logger
	// Meter holds the meter
	Meter meter.Meter
	// Context can be used to abort execution or pass additional opts
	Context context.Context
	// Start holds a step name
	// NOTE(review): presumably the step from which execution begins — confirm against the implementation
	Start string
	// Timeout for execution
	Timeout time.Duration
	// Reverse execution
	Reverse bool
	// Async enables async execution
	Async bool
}

func NewExecuteOptions

func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions

type Flow

// Flow is the base interface used to create, save, load and list workflows
type Flow interface {
	// Options returns the flow options
	Options() Options
	// Init initializes the flow, accepting optional Option values, and returns an error on failure
	Init(...Option) error
	// WorkflowCreate creates new workflow with specific id and steps
	WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error)
	// WorkflowSave saves workflow
	WorkflowSave(ctx context.Context, w Workflow) error
	// WorkflowLoad loads workflow with specific id
	WorkflowLoad(ctx context.Context, id string) (Workflow, error)
	// WorkflowList lists all workflows
	WorkflowList(ctx context.Context) ([]Workflow, error)
}

Flow is the base interface for interacting with workflows

func FromContext

func FromContext(ctx context.Context) (Flow, bool)

FromContext returns Flow from context

func NewFlow

func NewFlow(opts ...Option) Flow

type Message

// Message is the request and response type used by Step.Execute and Workflow.Execute
type Message struct {
	// Header holds the message metadata
	Header metadata.Metadata
	// Body holds the payload as raw bytes (RawMessage is a []byte)
	Body   RawMessage
}

type Option

type Option func(*Options)

Option is a function that modifies Options

func Client

func Client(c client.Client) Option

Client to use for sync/async communication

func Context

func Context(ctx context.Context) Option

Context specifies a context for the service. It can be used to signal shutdown of the flow and to carry extra option values.

func Logger

func Logger(l logger.Logger) Option

Logger sets the logger option

func Meter

func Meter(m meter.Meter) Option

Meter sets the meter option

func SetOption

func SetOption(k, v interface{}) Option

SetOption returns a function to setup a context with given value

func Store

func Store(s store.Store) Option

Store used for intermediate results

func Tracer

func Tracer(t tracer.Tracer) Option

Tracer sets the mechanism used for distributed tracing

type Options

// Options holds the flow-level settings that Option functions modify
type Options struct {
	// Context holds the external options and can be used for flow shutdown
	Context context.Context
	// Client holds the client.Client
	Client client.Client
	// Tracer holds the tracer
	Tracer tracer.Tracer
	// Logger holds the logger
	Logger logger.Logger
	// Meter holds the meter
	Meter meter.Meter
	// Store used for intermediate results
	Store store.Store
}

Options holds the flow options

func NewOptions

func NewOptions(opts ...Option) Options

NewOptions returns new options struct with default or passed values

type RawMessage

type RawMessage []byte

RawMessage is a raw encoded JSON value. It implements Marshaler and Unmarshaler and can be used to delay JSON decoding or precompute a JSON encoding.

func (*RawMessage) MarshalJSON

func (m *RawMessage) MarshalJSON() ([]byte, error)

MarshalJSON returns m as the JSON encoding of m.

func (*RawMessage) UnmarshalJSON

func (m *RawMessage) UnmarshalJSON(data []byte) error

UnmarshalJSON sets *m to a copy of data.

type Status

// Status is an integer-coded state reported by Step.GetStatus and Workflow.Status
type Status int

// Status values are assigned sequentially by iota, so the declaration order
// below fixes their numeric values and must not be changed.
const (
	// StatusPending is 0 and therefore the zero value of Status
	StatusPending Status = iota
	// StatusRunning is 1
	StatusRunning
	// StatusFailure is 2
	StatusFailure
	// StatusSuccess is 3
	StatusSuccess
	// StatusAborted is 4
	StatusAborted
	// StatusSuspend is 5
	StatusSuspend
)

func (Status) String

func (status Status) String() string

type Step

// Step describes a single unit of work inside a workflow
type Step interface {
	// ID returns step id
	ID() string
	// Endpoint returns rpc endpoint service_name.service_method or broker topic
	Endpoint() string
	// Execute runs the step with the given request message and execute options,
	// returning the resulting message or an error
	Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error)
	// Requires returns dependent steps
	Requires() []string
	// Options returns step options
	Options() StepOptions
	// Require add required steps
	Require(steps ...Step) error
	// String returns a string form of the step
	String() string
	// GetStatus returns step status
	GetStatus() Status
	// SetStatus sets the step status
	SetStatus(Status)
	// Request returns step request message
	Request() *Message
	// Response returns step response message
	Response() *Message
}

Step represents dedicated workflow step

func NewCallStep

func NewCallStep(service string, name string, method string, opts ...StepOption) Step

func NewPublishStep

func NewPublishStep(topic string, opts ...StepOption) Step

type StepOption

type StepOption func(*StepOptions)

func StepFallback

func StepFallback(step string) StepOption

func StepID

func StepID(id string) StepOption

func StepRequires

func StepRequires(steps ...string) StepOption

type StepOptions

// StepOptions holds the per-step settings that StepOption functions modify
type StepOptions struct {
	// Context associated with the step options
	// NOTE(review): no setter is visible here; presumably carries extra option values — confirm
	Context  context.Context
	// Fallback holds a step name
	// NOTE(review): presumably set by StepFallback and run when this step fails — confirm
	Fallback string
	// ID holds the step id
	// NOTE(review): presumably set by StepID — confirm
	ID       string
	// Requires holds names of steps
	// NOTE(review): presumably the steps this one depends on, set by StepRequires — confirm
	Requires []string
}

func NewStepOptions

func NewStepOptions(opts ...StepOption) StepOptions

type Workflow

// Workflow groups steps and controls their execution
type Workflow interface {
	// ID returns id of the workflow
	ID() string
	// Execute workflow with args, return execution id and error
	Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error)
	// RemoveSteps removes steps from the workflow
	RemoveSteps(steps ...Step) error
	// AppendSteps appends steps to the workflow
	AppendSteps(steps ...Step) error
	// Status returns workflow status
	Status() Status
	// Steps returns steps slice where parallel steps returned on the same level
	Steps() ([][]Step, error)
	// Suspend suspends execution
	// NOTE(review): id is presumably the execution id returned by Execute — confirm
	Suspend(ctx context.Context, id string) error
	// Resume resumes execution
	// NOTE(review): id is presumably the execution id returned by Execute — confirm
	Resume(ctx context.Context, id string) error
	// Abort aborts execution
	// NOTE(review): id is presumably the execution id returned by Execute — confirm
	Abort(ctx context.Context, id string) error
}

Workflow contains all steps to execute

type WorkflowOption

type WorkflowOption func(*WorkflowOptions)

WorkflowOption signature

func WorkflowID

func WorkflowID(id string) WorkflowOption

WorkflowID set workflow id

type WorkflowOptions

// WorkflowOptions holds the settings that WorkflowOption functions modify
type WorkflowOptions struct {
	// Context associated with the workflow options
	// NOTE(review): no setter is visible here; presumably carries extra option values — confirm
	Context context.Context
	// ID holds the workflow id
	// NOTE(review): presumably set by WorkflowID — confirm
	ID      string
}

WorkflowOptions holds workflow options

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL