flow

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 8, 2024 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Aggregator

type Aggregator func(map[string][]byte) ([]byte, error)

Aggregator definition for the data aggregator of nodes

type Definitor

type Definitor func(ctx context.Context, f *Flow) error

type ExecResult

type ExecResult struct {
	ID   string `json:"id"`
	Resp []byte `json:"resp"`
	Err  string `json:"err"`
}

type ExecutionOptions

type ExecutionOptions struct {
	// contains filtered or unexported fields
}

type Executor

type Executor struct {
	// id format: "flow_name:node_name:random_id"
	ID   string `json:"id"`
	Body []byte `json:"body"`
	// contains filtered or unexported fields
}

func (*Executor) Execute

func (e *Executor) Execute(ctx context.Context) error

type Flow

type Flow struct {
	Name string
	DAG  *dag.DAG
	// contains filtered or unexported fields
}

func New

func New(
	name string, stor store.Store, cli *asynq.Client,
	logger *slog.Logger, cfg *types.Config, insp *asynq.Inspector,
) *Flow

func (*Flow) Edge

func (f *Flow) Edge(src, dst string) error

func (*Flow) GetResult

func (f *Flow) GetResult(sessID string) (map[string]*ExecResult, error)

func (*Flow) Node

func (f *Flow) Node(name string, fn NodeFunc, opts ...Option) error

func (*Flow) Register

func (f *Flow) Register(mux *asynq.ServeMux)

func (*Flow) Submit

func (f *Flow) Submit(body []byte) (string, error)

func (*Flow) SwitchNode

func (f *Flow) SwitchNode(
	name string, condFn SwitchCondFunc,
	cases map[string]NodeFunc, opts ...Option,
) error

type ForEach

type ForEach func([]byte) map[string][]byte

ForEach definition for the foreach function

type Forwarder

type Forwarder func([]byte) []byte

Forwarder definition for the data forwarder of nodes

type FuncErrorHandler

type FuncErrorHandler func(error) error

type NodeFunc

type NodeFunc func([]byte, map[string][]string) ([]byte, error)

type Option

type Option func(*ExecutionOptions)

func WithAggregator

func WithAggregator(agg Aggregator) Option

func WithFailureHandler

func WithFailureHandler(fn FuncErrorHandler) Option

type SwitchCondFunc

type SwitchCondFunc func([]byte) string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL