Documentation ¶
Overview ¶
Package flow provides an interface for saga pattern microservice workflows
Index ¶
- Variables
- func NewContext(ctx context.Context, f Flow) context.Context
- func RegisterStep(step Step)
- type ExecuteOption
- func ExecuteAsync(b bool) ExecuteOption
- func ExecuteClient(c client.Client) ExecuteOption
- func ExecuteContext(ctx context.Context) ExecuteOption
- func ExecuteLogger(l logger.Logger) ExecuteOption
- func ExecuteMeter(m meter.Meter) ExecuteOption
- func ExecuteReverse(b bool) ExecuteOption
- func ExecuteTimeout(td time.Duration) ExecuteOption
- func ExecuteTracer(t tracer.Tracer) ExecuteOption
- type ExecuteOptions
- type Flow
- type Message
- type Option
- type Options
- type RawMessage
- type Status
- type Step
- type StepOption
- type StepOptions
- type Workflow
- type WorkflowOption
- type WorkflowOptions
Constants ¶
This section is empty.
Variables ¶
var ( ErrStepNotExists = errors.New("step not exists") ErrMissingClient = errors.New("client not set") )
var ( StatusString = map[Status]string{ StatusPending: "StatusPending", StatusRunning: "StatusRunning", StatusFailure: "StatusFailure", StatusSuccess: "StatusSuccess", StatusAborted: "StatusAborted", StatusSuspend: "StatusSuspend", } StringStatus = map[string]Status{ "StatusPending": StatusPending, "StatusRunning": StatusRunning, "StatusFailure": StatusFailure, "StatusSuccess": StatusSuccess, "StatusAborted": StatusAborted, "StatusSuspend": StatusSuspend, } )
Functions ¶
func NewContext ¶
NewContext stores the Flow in the context
func RegisterStep ¶
func RegisterStep(step Step)
Types ¶
type ExecuteOption ¶
type ExecuteOption func(*ExecuteOptions)
func ExecuteAsync ¶
func ExecuteAsync(b bool) ExecuteOption
func ExecuteClient ¶
func ExecuteClient(c client.Client) ExecuteOption
func ExecuteContext ¶
func ExecuteContext(ctx context.Context) ExecuteOption
func ExecuteLogger ¶
func ExecuteLogger(l logger.Logger) ExecuteOption
func ExecuteMeter ¶
func ExecuteMeter(m meter.Meter) ExecuteOption
func ExecuteReverse ¶
func ExecuteReverse(b bool) ExecuteOption
func ExecuteTimeout ¶
func ExecuteTimeout(td time.Duration) ExecuteOption
func ExecuteTracer ¶
func ExecuteTracer(t tracer.Tracer) ExecuteOption
type ExecuteOptions ¶
type ExecuteOptions struct { // Client holds the client.Client Client client.Client // Tracer holds the tracer Tracer tracer.Tracer // Logger holds the logger Logger logger.Logger // Meter holds the meter Meter meter.Meter // Context can be used to abort execution or pass additional opts Context context.Context // Start step Start string // Timeout for execution Timeout time.Duration // Reverse execution Reverse bool // Async enables async execution Async bool }
func NewExecuteOptions ¶
func NewExecuteOptions(opts ...ExecuteOption) ExecuteOptions
type Flow ¶
type Flow interface { // Options returns options Options() Options // Init initialize Init(...Option) error // WorkflowCreate creates new workflow with specific id and steps WorkflowCreate(ctx context.Context, id string, steps ...Step) (Workflow, error) // WorkflowSave saves workflow WorkflowSave(ctx context.Context, w Workflow) error // WorkflowLoad loads workflow with specific id WorkflowLoad(ctx context.Context, id string) (Workflow, error) // WorkflowList lists all workflows WorkflowList(ctx context.Context) ([]Workflow, error) }
Flow is the base interface for interacting with workflows
func FromContext ¶
FromContext returns the Flow from the context
type Message ¶
type Message struct { Header metadata.Metadata Body RawMessage }
type Option ¶
type Option func(*Options)
Option is a function that modifies Options
func Context ¶
Context specifies a context for the service. It can be used to signal shutdown of the flow and to pass extra option values.
type Options ¶
type Options struct { // Context holds the external options and can be used for flow shutdown Context context.Context // Client holds the client.Client Client client.Client // Tracer holds the tracer Tracer tracer.Tracer // Logger holds the logger Logger logger.Logger // Meter holds the meter Meter meter.Meter // Store used for intermediate results Store store.Store }
Options holds the flow options
func NewOptions ¶
NewOptions returns new options struct with default or passed values
type RawMessage ¶
type RawMessage []byte
RawMessage is a raw encoded JSON value. It implements Marshaler and Unmarshaler and can be used to delay decoding or precompute a JSON encoding.
func (*RawMessage) MarshalJSON ¶
func (m *RawMessage) MarshalJSON() ([]byte, error)
MarshalJSON returns m as the JSON encoding of m.
func (*RawMessage) UnmarshalJSON ¶
func (m *RawMessage) UnmarshalJSON(data []byte) error
UnmarshalJSON sets *m to a copy of data.
type Step ¶
type Step interface { // ID returns step id ID() string // Endpoint returns rpc endpoint service_name.service_method or broker topic Endpoint() string // Execute step run Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (*Message, error) // Requires returns dependent steps Requires() []string // Options returns step options Options() StepOptions // Require add required steps Require(steps ...Step) error // String String() string // GetStatus returns step status GetStatus() Status // SetStatus sets the step status SetStatus(Status) // Request returns step request message Request() *Message // Response returns step response message Response() *Message }
Step represents a dedicated workflow step
func NewCallStep ¶
func NewCallStep(service string, name string, method string, opts ...StepOption) Step
func NewPublishStep ¶
func NewPublishStep(topic string, opts ...StepOption) Step
type StepOption ¶
type StepOption func(*StepOptions)
func StepFallback ¶
func StepFallback(step string) StepOption
func StepID ¶
func StepID(id string) StepOption
func StepRequires ¶
func StepRequires(steps ...string) StepOption
type StepOptions ¶
func NewStepOptions ¶
func NewStepOptions(opts ...StepOption) StepOptions
type Workflow ¶
type Workflow interface { // ID returns id of the workflow ID() string // Execute workflow with args, return execution id and error Execute(ctx context.Context, req *Message, opts ...ExecuteOption) (string, error) // RemoveSteps remove steps from workflow RemoveSteps(steps ...Step) error // AppendSteps append steps to workflow AppendSteps(steps ...Step) error // Status returns workflow status Status() Status // Steps returns steps slice where parallel steps returned on the same level Steps() ([][]Step, error) // Suspend suspends execution Suspend(ctx context.Context, id string) error // Resume resumes execution Resume(ctx context.Context, id string) error // Abort abort execution Abort(ctx context.Context, id string) error }
Workflow contains all steps to execute
type WorkflowOptions ¶
WorkflowOptions holds workflow options