Documentation ¶
Index ¶
- Constants
- Variables
- func CallFlow(ctx context.Context, loader, name string, input map[string]any) (orchestrator.Output, error)
- func MustCollectFiles(f fs.FS, suffix string, convert func(map[string]any) map[string]any) map[string]map[string]any
- func MustRegisterCall(r *orchestrator.Registry)
- func MustRegisterCode(r *orchestrator.Registry)
- func MustRegisterDecision(r *orchestrator.Registry)
- func MustRegisterFunc(r *orchestrator.Registry)
- func MustRegisterHTTP(r *orchestrator.Registry)
- func MustRegisterIterate(r *orchestrator.Registry)
- func MustRegisterLoop(r *orchestrator.Registry)
- func MustRegisterParallel(r *orchestrator.Registry)
- func MustRegisterSerial(r *orchestrator.Registry)
- func MustRegisterTerminate(r *orchestrator.Registry)
- func MustRegisterWait(r *orchestrator.Registry)
- func MustUnmarshalToMap(data []byte) map[string]any
- func TraceFlow(ctx context.Context, loader, name string, input map[string]any) (orchestrator.Event, error)
- type Call
- type CallBuilder
- func (b *CallBuilder) Build() orchestrator.Task
- func (b *CallBuilder) BuildError() (*Call, error)
- func (b *CallBuilder) Input(m map[string]any) *CallBuilder
- func (b *CallBuilder) Loader(name string) *CallBuilder
- func (b *CallBuilder) Raw() *CallBuilder
- func (b *CallBuilder) Registry(r *orchestrator.Registry) *CallBuilder
- func (b *CallBuilder) Task(name string) *CallBuilder
- func (b *CallBuilder) Timeout(timeout time.Duration) *CallBuilder
- type Code
- type CodeBuilder
- type Codec
- type Decision
- type DecisionBuilder
- func (b *DecisionBuilder) Build() orchestrator.Task
- func (b *DecisionBuilder) Case(c any, builder orchestrator.Builder) *DecisionBuilder
- func (b *DecisionBuilder) Default(builder orchestrator.Builder) *DecisionBuilder
- func (b *DecisionBuilder) Expression(s any) *DecisionBuilder
- func (b *DecisionBuilder) Timeout(timeout time.Duration) *DecisionBuilder
- type Event
- type EventStreamReader
- type Func
- type FuncBuilder
- type HTTP
- type HTTPBuilder
- func (b *HTTPBuilder) Body(body map[string]any) *HTTPBuilder
- func (b *HTTPBuilder) Build() orchestrator.Task
- func (b *HTTPBuilder) Delete(uri string) *HTTPBuilder
- func (b *HTTPBuilder) Get(uri string) *HTTPBuilder
- func (b *HTTPBuilder) Header(key string, values ...string) *HTTPBuilder
- func (b *HTTPBuilder) Patch(uri string) *HTTPBuilder
- func (b *HTTPBuilder) Post(uri string) *HTTPBuilder
- func (b *HTTPBuilder) Put(uri string) *HTTPBuilder
- func (b *HTTPBuilder) Query(key string, value any) *HTTPBuilder
- func (b *HTTPBuilder) Request(method, uri string) *HTTPBuilder
- func (b *HTTPBuilder) Timeout(timeout time.Duration) *HTTPBuilder
- type Iterate
- type IterateBuilder
- type IterateType
- type JSON
- type Loader
- type Loop
- type LoopBuilder
- type MapLoader
- type Parallel
- type ParallelBuilder
- type Serial
- type SerialBuilder
- type Terminate
- type TerminateBuilder
- type Wait
- type WaitBuilder
Constants ¶
const (
TypeCall = "call"
)
const (
TypeCode = "code"
)
const (
TypeDecision = "decision"
)
const (
TypeFunc = "func"
)
const (
TypeHTTP = "http"
)
const (
TypeLoop = "loop"
)
const (
TypeParallel = "parallel"
)
const (
TypeSerial = "serial"
)
const (
TypeTerminate = "terminate"
)
const (
TypeWait = "wait"
)
Variables ¶
var LoaderRegistry = loaderRegistry{}
var TaskSchemas map[string]map[string]any
Schemas for built-in tasks.
Functions ¶
func CallFlow ¶
func CallFlow(ctx context.Context, loader, name string, input map[string]any) (orchestrator.Output, error)
CallFlow loads the given flow from the given loader, and then executes the flow with the given input.
Note that CallFlow is a helper for calling flows whose tasks are registered in orchestrator.GlobalRegistry. If your flow uses tasks registered in a different registry, you need to write your own calling code: construct the call task yourself and specify the registry with CallBuilder.Registry().
func MustCollectFiles ¶
func MustRegisterCall ¶
func MustRegisterCall(r *orchestrator.Registry)
func MustRegisterCode ¶
func MustRegisterCode(r *orchestrator.Registry)
func MustRegisterDecision ¶
func MustRegisterDecision(r *orchestrator.Registry)
func MustRegisterFunc ¶
func MustRegisterFunc(r *orchestrator.Registry)
func MustRegisterHTTP ¶
func MustRegisterHTTP(r *orchestrator.Registry)
func MustRegisterIterate ¶
func MustRegisterIterate(r *orchestrator.Registry)
func MustRegisterLoop ¶
func MustRegisterLoop(r *orchestrator.Registry)
func MustRegisterParallel ¶
func MustRegisterParallel(r *orchestrator.Registry)
func MustRegisterSerial ¶
func MustRegisterSerial(r *orchestrator.Registry)
func MustRegisterTerminate ¶
func MustRegisterTerminate(r *orchestrator.Registry)
func MustRegisterWait ¶
func MustRegisterWait(r *orchestrator.Registry)
func MustUnmarshalToMap ¶
Types ¶
type Call ¶
type Call struct { orchestrator.TaskHeader Input struct { Loader string `json:"loader"` Task string `json:"task"` Raw bool `json:"raw"` Input orchestrator.Expr[map[string]any] `json:"input"` } `json:"input"` // contains filtered or unexported fields }
Call is a composite task that is used to call another task with corresponding input.
func (*Call) Execute ¶
func (c *Call) Execute(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error)
type CallBuilder ¶
type CallBuilder struct {
// contains filtered or unexported fields
}
func NewCall ¶
func NewCall(name string) *CallBuilder
func (*CallBuilder) Build ¶
func (b *CallBuilder) Build() orchestrator.Task
func (*CallBuilder) BuildError ¶
func (b *CallBuilder) BuildError() (*Call, error)
func (*CallBuilder) Input ¶
func (b *CallBuilder) Input(m map[string]any) *CallBuilder
func (*CallBuilder) Loader ¶
func (b *CallBuilder) Loader(name string) *CallBuilder
func (*CallBuilder) Raw ¶
func (b *CallBuilder) Raw() *CallBuilder
func (*CallBuilder) Registry ¶
func (b *CallBuilder) Registry(r *orchestrator.Registry) *CallBuilder
func (*CallBuilder) Task ¶
func (b *CallBuilder) Task(name string) *CallBuilder
func (*CallBuilder) Timeout ¶
func (b *CallBuilder) Timeout(timeout time.Duration) *CallBuilder
type Code ¶
type Code struct { orchestrator.TaskHeader Input struct { Code string `json:"code"` } `json:"input"` }
Code is a leaf task that is used to execute a snippet of Starlark code.
Note that the signature of the function must be `def _(env):`, where `env` is the environment that contains the input and outputs of all the previously executed tasks.
Examples:
def _(env): return [x*2 for x in env.input.values]
func (*Code) Execute ¶
func (c *Code) Execute(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error)
type CodeBuilder ¶
type CodeBuilder struct {
// contains filtered or unexported fields
}
func NewCode ¶
func NewCode(name string) *CodeBuilder
func (*CodeBuilder) Build ¶
func (b *CodeBuilder) Build() orchestrator.Task
func (*CodeBuilder) Code ¶
func (b *CodeBuilder) Code(s string) *CodeBuilder
type Codec ¶
type Decision ¶
type Decision struct { orchestrator.TaskHeader Input struct { Expression orchestrator.Expr[any] `json:"expression"` Cases map[any]orchestrator.Task `json:"cases"` Default orchestrator.Task `json:"default"` } `json:"input"` }
Decision is a composite task that is similar to the `switch` statement in Go.
func (*Decision) Execute ¶
func (d *Decision) Execute(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error)
type DecisionBuilder ¶
type DecisionBuilder struct {
// contains filtered or unexported fields
}
func NewDecision ¶
func NewDecision(name string) *DecisionBuilder
func (*DecisionBuilder) Build ¶
func (b *DecisionBuilder) Build() orchestrator.Task
func (*DecisionBuilder) Case ¶
func (b *DecisionBuilder) Case(c any, builder orchestrator.Builder) *DecisionBuilder
func (*DecisionBuilder) Default ¶
func (b *DecisionBuilder) Default(builder orchestrator.Builder) *DecisionBuilder
func (*DecisionBuilder) Expression ¶
func (b *DecisionBuilder) Expression(s any) *DecisionBuilder
func (*DecisionBuilder) Timeout ¶
func (b *DecisionBuilder) Timeout(timeout time.Duration) *DecisionBuilder
type EventStreamReader ¶
type EventStreamReader struct { EncodingBase64 bool // contains filtered or unexported fields }
EventStreamReader scans an io.Reader looking for EventStream messages.
func NewEventStreamReader ¶
func NewEventStreamReader(eventStream io.Reader, maxBufferSize int) *EventStreamReader
NewEventStreamReader creates an instance of EventStreamReader.
func (*EventStreamReader) ReadBytes ¶
func (e *EventStreamReader) ReadBytes() ([]byte, error)
ReadBytes scans the EventStream for event bytes.
func (*EventStreamReader) ReadEvent ¶
func (e *EventStreamReader) ReadEvent() (event *Event, err error)
ReadEvent scans the EventStream for an event.
type Func ¶
type Func struct { orchestrator.TaskHeader Input struct { Func func(context.Context, orchestrator.Input) (orchestrator.Output, error) `json:"func"` } `json:"input"` }
Func is a leaf task that is used to execute the input function with the given arguments.
func (*Func) Execute ¶
func (f *Func) Execute(ctx context.Context, input orchestrator.Input) (output orchestrator.Output, err error)
type FuncBuilder ¶
type FuncBuilder struct {
// contains filtered or unexported fields
}
func NewFunc ¶
func NewFunc(name string) *FuncBuilder
func (*FuncBuilder) Build ¶
func (b *FuncBuilder) Build() orchestrator.Task
func (*FuncBuilder) Func ¶
func (b *FuncBuilder) Func(ef func(context.Context, orchestrator.Input) (orchestrator.Output, error)) *FuncBuilder
type HTTP ¶
type HTTP struct { orchestrator.TaskHeader Input struct { Encoding string `json:"encoding"` Method orchestrator.Expr[string] `json:"method"` URI orchestrator.Expr[string] `json:"uri"` Query orchestrator.Expr[map[string]any] `json:"query"` Header orchestrator.Expr[map[string][]string] `json:"header"` Body orchestrator.Expr[map[string]any] `json:"body"` // A filter expression for extracting fields from a server-sent event. SSEFilter string `json:"sse_filter"` } `json:"input"` // contains filtered or unexported fields }
HTTP is a leaf task that is used to make calls to another service over HTTP.
func (*HTTP) Execute ¶
func (h *HTTP) Execute(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error)
type HTTPBuilder ¶
type HTTPBuilder struct {
// contains filtered or unexported fields
}
func NewHTTP ¶
func NewHTTP(name string) *HTTPBuilder
func (*HTTPBuilder) Body ¶
func (b *HTTPBuilder) Body(body map[string]any) *HTTPBuilder
func (*HTTPBuilder) Build ¶
func (b *HTTPBuilder) Build() orchestrator.Task
func (*HTTPBuilder) Delete ¶
func (b *HTTPBuilder) Delete(uri string) *HTTPBuilder
func (*HTTPBuilder) Get ¶
func (b *HTTPBuilder) Get(uri string) *HTTPBuilder
func (*HTTPBuilder) Header ¶
func (b *HTTPBuilder) Header(key string, values ...string) *HTTPBuilder
func (*HTTPBuilder) Patch ¶
func (b *HTTPBuilder) Patch(uri string) *HTTPBuilder
func (*HTTPBuilder) Post ¶
func (b *HTTPBuilder) Post(uri string) *HTTPBuilder
func (*HTTPBuilder) Put ¶
func (b *HTTPBuilder) Put(uri string) *HTTPBuilder
func (*HTTPBuilder) Query ¶
func (b *HTTPBuilder) Query(key string, value any) *HTTPBuilder
func (*HTTPBuilder) Request ¶
func (b *HTTPBuilder) Request(method, uri string) *HTTPBuilder
func (*HTTPBuilder) Timeout ¶
func (b *HTTPBuilder) Timeout(timeout time.Duration) *HTTPBuilder
type Iterate ¶
type Iterate struct { orchestrator.TaskHeader Input struct { Type IterateType `json:"type"` Value any `json:"value"` } `json:"input"` }
Iterate is a leaf task that is used to make an iterator from a slice/map/range. Note that an Iterate task is always used along with a Loop task.
func (*Iterate) Execute ¶
func (i *Iterate) Execute(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error)
type IterateBuilder ¶
type IterateBuilder struct {
// contains filtered or unexported fields
}
func NewIterate ¶
func NewIterate(name string) *IterateBuilder
func (*IterateBuilder) Build ¶
func (b *IterateBuilder) Build() orchestrator.Task
func (*IterateBuilder) List ¶
func (b *IterateBuilder) List(v any) *IterateBuilder
func (*IterateBuilder) Map ¶
func (b *IterateBuilder) Map(v any) *IterateBuilder
func (*IterateBuilder) Range ¶
func (b *IterateBuilder) Range(v any) *IterateBuilder
type IterateType ¶
type IterateType string
const ( TypeIterate = "iterate" IterateTypeList IterateType = "list" IterateTypeMap IterateType = "map" IterateTypeRange IterateType = "range" )
type Loop ¶
type Loop struct { orchestrator.TaskHeader Input struct { Iterator orchestrator.Task `json:"iterator"` Body orchestrator.Task `json:"body"` } `json:"input"` }
Loop is a composite task that is similar to the `for` statement in Go.
func (*Loop) Execute ¶
func (l *Loop) Execute(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error)
type LoopBuilder ¶
type LoopBuilder struct {
// contains filtered or unexported fields
}
func NewLoop ¶
func NewLoop(name string) *LoopBuilder
func (*LoopBuilder) Body ¶
func (b *LoopBuilder) Body(builder orchestrator.Builder) *LoopBuilder
func (*LoopBuilder) Build ¶
func (b *LoopBuilder) Build() orchestrator.Task
func (*LoopBuilder) Iterator ¶
func (b *LoopBuilder) Iterator(builder orchestrator.Builder) *LoopBuilder
func (*LoopBuilder) Timeout ¶
func (b *LoopBuilder) Timeout(timeout time.Duration) *LoopBuilder
type Parallel ¶
type Parallel struct { orchestrator.TaskHeader Input struct { Tasks []orchestrator.Task `json:"tasks"` } `json:"input"` }
Parallel is a composite task that is used to execute its subtasks in parallel.
func (*Parallel) Execute ¶
func (p *Parallel) Execute(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error)
type ParallelBuilder ¶
type ParallelBuilder struct {
// contains filtered or unexported fields
}
func NewParallel ¶
func NewParallel(name string) *ParallelBuilder
func (*ParallelBuilder) Build ¶
func (b *ParallelBuilder) Build() orchestrator.Task
func (*ParallelBuilder) Tasks ¶
func (b *ParallelBuilder) Tasks(builders ...orchestrator.Builder) *ParallelBuilder
func (*ParallelBuilder) Timeout ¶
func (b *ParallelBuilder) Timeout(timeout time.Duration) *ParallelBuilder
type Serial ¶
type Serial struct { orchestrator.TaskHeader Input struct { Async bool `json:"async"` // The optional schema for the following series of subtasks. // // Typically, the schema is required for a standalone workflow. Schema orchestrator.Schema `json:"schema,omitempty"` Tasks []orchestrator.Task `json:"tasks"` } `json:"input"` }
Serial is a composite task that is used to execute its subtasks serially.
func (*Serial) Execute ¶
func (s *Serial) Execute(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error)
type SerialBuilder ¶
type SerialBuilder struct {
// contains filtered or unexported fields
}
func NewSerial ¶
func NewSerial(name string) *SerialBuilder
func (*SerialBuilder) Async ¶
func (b *SerialBuilder) Async(async bool) *SerialBuilder
func (*SerialBuilder) Build ¶
func (b *SerialBuilder) Build() orchestrator.Task
func (*SerialBuilder) Tasks ¶
func (b *SerialBuilder) Tasks(builders ...orchestrator.Builder) *SerialBuilder
func (*SerialBuilder) Timeout ¶
func (b *SerialBuilder) Timeout(timeout time.Duration) *SerialBuilder
type Terminate ¶
type Terminate struct { orchestrator.TaskHeader Input struct { Output orchestrator.Expr[orchestrator.Output] `json:"output"` Error orchestrator.Expr[string] `json:"error"` } `json:"input"` }
Terminate is a leaf task that is used to terminate the execution of a flow and return an output.
func (*Terminate) Execute ¶
func (t *Terminate) Execute(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error)
type TerminateBuilder ¶
type TerminateBuilder struct {
// contains filtered or unexported fields
}
func NewTerminate ¶
func NewTerminate(name string) *TerminateBuilder
func (*TerminateBuilder) Build ¶
func (b *TerminateBuilder) Build() orchestrator.Task
func (*TerminateBuilder) Error ¶
func (b *TerminateBuilder) Error(err any) *TerminateBuilder
func (*TerminateBuilder) Output ¶
func (b *TerminateBuilder) Output(output any) *TerminateBuilder
type Wait ¶
type Wait struct { orchestrator.TaskHeader Input struct { Output orchestrator.Expr[map[string]any] `json:"output"` InputSchema map[string]any `json:"input_schema"` } `json:"input"` }
Wait is a leaf task that is used to wait for an external input (and sometimes also to send an output externally before that).
Note that a Wait task must be used within an actor (i.e. an asynchronous Serial task).
func (*Wait) Execute ¶
func (w *Wait) Execute(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error)
type WaitBuilder ¶
type WaitBuilder struct {
// contains filtered or unexported fields
}
func NewWait ¶
func NewWait(name string) *WaitBuilder
func (*WaitBuilder) Build ¶
func (b *WaitBuilder) Build() orchestrator.Task