builtin

package
v0.0.0-...-314114a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 6, 2024 License: MIT Imports: 19 Imported by: 2

Documentation

Index

Constants

View Source
const (
	TypeCall = "call"
)
View Source
const (
	TypeCode = "code"
)
View Source
const (
	TypeDecision = "decision"
)
View Source
const (
	TypeFunc = "func"
)
View Source
const (
	TypeHTTP = "http"
)
View Source
const (
	TypeLoop = "loop"
)
View Source
const (
	TypeParallel = "parallel"
)
View Source
const (
	TypeSerial = "serial"
)
View Source
const (
	TypeTerminate = "terminate"
)
View Source
const (
	TypeWait = "wait"
)

Variables

View Source
var LoaderRegistry = loaderRegistry{}
View Source
var TaskSchemas map[string]map[string]any

Schemas for built-in tasks.

Functions

func CallFlow

func CallFlow(ctx context.Context, loader, name string, input map[string]any) (orchestrator.Output, error)

CallFlow loads the given flow from the given loader, and then executes the flow with the given input.

Note that CallFlow is a helper for calling flows that use tasks registered in orchestrator.GlobalRegistry. If your flow uses tasks registered in a different registry, you need to write your own calling code: construct the call task yourself and specify the registry by using CallBuilder.Registry().

func MustCollectFiles

func MustCollectFiles(f fs.FS, suffix string, convert func(map[string]any) map[string]any) map[string]map[string]any

func MustRegisterCall

func MustRegisterCall(r *orchestrator.Registry)

func MustRegisterCode

func MustRegisterCode(r *orchestrator.Registry)

func MustRegisterDecision

func MustRegisterDecision(r *orchestrator.Registry)

func MustRegisterFunc

func MustRegisterFunc(r *orchestrator.Registry)

func MustRegisterHTTP

func MustRegisterHTTP(r *orchestrator.Registry)

func MustRegisterIterate

func MustRegisterIterate(r *orchestrator.Registry)

func MustRegisterLoop

func MustRegisterLoop(r *orchestrator.Registry)

func MustRegisterParallel

func MustRegisterParallel(r *orchestrator.Registry)

func MustRegisterSerial

func MustRegisterSerial(r *orchestrator.Registry)

func MustRegisterTerminate

func MustRegisterTerminate(r *orchestrator.Registry)

func MustRegisterWait

func MustRegisterWait(r *orchestrator.Registry)

func MustUnmarshalToMap

func MustUnmarshalToMap(data []byte) map[string]any

func TraceFlow

func TraceFlow(ctx context.Context, loader, name string, input map[string]any) (orchestrator.Event, error)

TraceFlow behaves like CallFlow but also enables tracing.

Types

type Call

// Call is a composite task that calls another task with the corresponding
// input.
//
// Input fields (JSON keys in parentheses):
//   - Loader ("loader"): name of the loader to use.
//     NOTE(review): presumably resolved through LoaderRegistry — confirm.
//   - Task ("task"): name of the task to call.
//   - Raw ("raw"): NOTE(review): semantics are not visible from this
//     declaration; see CallBuilder.Raw.
//   - Input ("input"): expression yielding the map handed to the called task.
type Call struct {
	orchestrator.TaskHeader

	Input struct {
		Loader string                            `json:"loader"`
		Task   string                            `json:"task"`
		Raw    bool                              `json:"raw"`
		Input  orchestrator.Expr[map[string]any] `json:"input"`
	} `json:"input"`
	// contains filtered or unexported fields
}

Call is a composite task that is used to call another task with corresponding input.

func (*Call) Execute

func (c *Call) Execute(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error)

func (*Call) Init

func (c *Call) Init(r *orchestrator.Registry) error

func (*Call) String

func (c *Call) String() string

type CallBuilder

type CallBuilder struct {
	// contains filtered or unexported fields
}

func NewCall

func NewCall(name string) *CallBuilder

func (*CallBuilder) Build

func (b *CallBuilder) Build() orchestrator.Task

func (*CallBuilder) BuildError

func (b *CallBuilder) BuildError() (*Call, error)

func (*CallBuilder) Input

func (b *CallBuilder) Input(m map[string]any) *CallBuilder

func (*CallBuilder) Loader

func (b *CallBuilder) Loader(name string) *CallBuilder

func (*CallBuilder) Raw

func (b *CallBuilder) Raw() *CallBuilder

func (*CallBuilder) Registry

func (b *CallBuilder) Registry(r *orchestrator.Registry) *CallBuilder

func (*CallBuilder) Task

func (b *CallBuilder) Task(name string) *CallBuilder

func (*CallBuilder) Timeout

func (b *CallBuilder) Timeout(timeout time.Duration) *CallBuilder

type Code

// Code is a leaf task that executes a snippet of Starlark code.
//
// Input.Code ("code") holds the Starlark source. The source must define a
// function with the signature `def _(env):`, where env contains the input and
// the outputs of all previously executed tasks.
type Code struct {
	orchestrator.TaskHeader

	Input struct {
		Code string `json:"code"`
	} `json:"input"`
}

Code is a leaf task that is used to execute a snippet of Starlark code.

Note that the signature of the function must be `def _(env):`, where `env` is the environment that contains the input and outputs of all the previously executed tasks.

Examples:

def _(env):
    return [x*2 for x in env.input.values]

func (*Code) Execute

func (c *Code) Execute(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error)

func (*Code) String

func (c *Code) String() string

type CodeBuilder

type CodeBuilder struct {
	// contains filtered or unexported fields
}

func NewCode

func NewCode(name string) *CodeBuilder

func (*CodeBuilder) Build

func (b *CodeBuilder) Build() orchestrator.Task

func (*CodeBuilder) Code

func (b *CodeBuilder) Code(s string) *CodeBuilder

type Codec

// Codec converts between Go values and an encoded byte stream.
// NOTE(review): implementations are presumably obtained via NewCodec by
// encoding name; JSON is the one implementation visible in this package.
type Codec interface {
	// Decode reads encoded data from in and stores the result in out.
	Decode(in io.Reader, out any) error
	// Encode encodes in and returns a reader over the encoded data.
	Encode(in any) (out io.Reader, err error)
}

func NewCodec

func NewCodec(encoding string) (Codec, error)

type Decision

// Decision is a composite task that is similar to the `switch` statement in
// Go.
//
// Input fields (JSON keys in parentheses):
//   - Expression ("expression"): the expression being switched on.
//   - Cases ("cases"): maps a candidate value to the task for that case.
//   - Default ("default"): NOTE(review): presumably the task executed when no
//     case matches — confirm against Execute.
type Decision struct {
	orchestrator.TaskHeader

	Input struct {
		Expression orchestrator.Expr[any]    `json:"expression"`
		Cases      map[any]orchestrator.Task `json:"cases"`
		Default    orchestrator.Task         `json:"default"`
	} `json:"input"`
}

Decision is a composite task that is similar to the `switch` statement in Go.

func (*Decision) Execute

func (d *Decision) Execute(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error)

func (*Decision) String

func (d *Decision) String() string

type DecisionBuilder

type DecisionBuilder struct {
	// contains filtered or unexported fields
}

func NewDecision

func NewDecision(name string) *DecisionBuilder

func (*DecisionBuilder) Build

func (b *DecisionBuilder) Build() orchestrator.Task

func (*DecisionBuilder) Case

func (*DecisionBuilder) Default

func (b *DecisionBuilder) Default(builder orchestrator.Builder) *DecisionBuilder

func (*DecisionBuilder) Expression

func (b *DecisionBuilder) Expression(s any) *DecisionBuilder

func (*DecisionBuilder) Timeout

func (b *DecisionBuilder) Timeout(timeout time.Duration) *DecisionBuilder

type Event

// Event holds all of the event source fields.
// NOTE(review): the field names mirror the server-sent events fields
// (event, data, id, retry); each is kept as raw bytes — confirm how
// EventStreamReader.ReadEvent populates them.
type Event struct {
	Event []byte
	Data  []byte
	ID    []byte
	Retry []byte
}

Event holds all of the event source fields.

type EventStreamReader

type EventStreamReader struct {
	EncodingBase64 bool
	// contains filtered or unexported fields
}

EventStreamReader scans an io.Reader looking for EventStream messages.

func NewEventStreamReader

func NewEventStreamReader(eventStream io.Reader, maxBufferSize int) *EventStreamReader

NewEventStreamReader creates an instance of EventStreamReader.

func (*EventStreamReader) ReadBytes

func (e *EventStreamReader) ReadBytes() ([]byte, error)

ReadBytes scans the EventStream for event bytes.

func (*EventStreamReader) ReadEvent

func (e *EventStreamReader) ReadEvent() (event *Event, err error)

ReadEvent scans the EventStream for an event.

type Func

// Func is a leaf task that executes the input function with the given
// arguments.
//
// Input.Func is the Go function to run; it receives the execution context and
// the task input and returns the task output or an error.
// NOTE(review): function values cannot be encoded by encoding/json, so the
// "func" tag is presumably only informational — confirm.
type Func struct {
	orchestrator.TaskHeader

	Input struct {
		Func func(context.Context, orchestrator.Input) (orchestrator.Output, error) `json:"func"`
	} `json:"input"` // tag value must be quoted, otherwise the json key is silently ignored
}

Func is a leaf task that is used to execute the input function with the given arguments.

func (*Func) Execute

func (f *Func) Execute(ctx context.Context, input orchestrator.Input) (output orchestrator.Output, err error)

func (*Func) String

func (f *Func) String() string

type FuncBuilder

type FuncBuilder struct {
	// contains filtered or unexported fields
}

func NewFunc

func NewFunc(name string) *FuncBuilder

func (*FuncBuilder) Build

func (b *FuncBuilder) Build() orchestrator.Task

func (*FuncBuilder) Func

type HTTP

// HTTP is a leaf task that makes calls to another service over HTTP.
//
// Input fields (JSON keys in parentheses):
//   - Encoding ("encoding"): name of the encoding to use.
//     NOTE(review): presumably one of the names accepted by NewCodec — confirm.
//   - Method ("method"), URI ("uri"), Query ("query"), Header ("header"),
//     Body ("body"): expressions for the corresponding parts of the request.
//   - SSEFilter ("sse_filter"): filter expression for extracting fields from
//     a server-sent event.
type HTTP struct {
	orchestrator.TaskHeader

	Input struct {
		Encoding string                                 `json:"encoding"`
		Method   orchestrator.Expr[string]              `json:"method"`
		URI      orchestrator.Expr[string]              `json:"uri"`
		Query    orchestrator.Expr[map[string]any]      `json:"query"`
		Header   orchestrator.Expr[map[string][]string] `json:"header"`
		Body     orchestrator.Expr[map[string]any]      `json:"body"`
		// A filter expression for extracting fields from a server-sent event.
		SSEFilter string `json:"sse_filter"`
	} `json:"input"`
	// contains filtered or unexported fields
}

HTTP is a leaf task that is used to make calls to another service over HTTP.

func (*HTTP) Encoding

func (h *HTTP) Encoding(encoding string) *HTTP

func (*HTTP) Execute

func (h *HTTP) Execute(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error)

func (*HTTP) Init

func (h *HTTP) Init(r *orchestrator.Registry) error

func (*HTTP) String

func (h *HTTP) String() string

type HTTPBuilder

type HTTPBuilder struct {
	// contains filtered or unexported fields
}

func NewHTTP

func NewHTTP(name string) *HTTPBuilder

func (*HTTPBuilder) Body

func (b *HTTPBuilder) Body(body map[string]any) *HTTPBuilder

func (*HTTPBuilder) Build

func (b *HTTPBuilder) Build() orchestrator.Task

func (*HTTPBuilder) Delete

func (b *HTTPBuilder) Delete(uri string) *HTTPBuilder

func (*HTTPBuilder) Get

func (b *HTTPBuilder) Get(uri string) *HTTPBuilder

func (*HTTPBuilder) Header

func (b *HTTPBuilder) Header(key string, values ...string) *HTTPBuilder

func (*HTTPBuilder) Patch

func (b *HTTPBuilder) Patch(uri string) *HTTPBuilder

func (*HTTPBuilder) Post

func (b *HTTPBuilder) Post(uri string) *HTTPBuilder

func (*HTTPBuilder) Put

func (b *HTTPBuilder) Put(uri string) *HTTPBuilder

func (*HTTPBuilder) Query

func (b *HTTPBuilder) Query(key string, value any) *HTTPBuilder

func (*HTTPBuilder) Request

func (b *HTTPBuilder) Request(method, uri string) *HTTPBuilder

func (*HTTPBuilder) Timeout

func (b *HTTPBuilder) Timeout(timeout time.Duration) *HTTPBuilder

type Iterate

// Iterate is a leaf task that makes an iterator from a slice, map or range.
// An Iterate task is always used along with a Loop task.
//
// Input.Type ("type") selects the kind of iteration (see IterateType), and
// Input.Value ("value") carries the value to iterate over.
// NOTE(review): the concrete shape expected in Value for each IterateType is
// not visible here — confirm against Execute.
type Iterate struct {
	orchestrator.TaskHeader

	Input struct {
		Type  IterateType `json:"type"`
		Value any         `json:"value"`
	} `json:"input"`
}

Iterate is a leaf task that is used to make an iterator from a slice/map/range. Note that an Iterate task is always used along with a Loop task.

func (*Iterate) Execute

func (i *Iterate) Execute(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error)

func (*Iterate) String

func (i *Iterate) String() string

type IterateBuilder

type IterateBuilder struct {
	// contains filtered or unexported fields
}

func NewIterate

func NewIterate(name string) *IterateBuilder

func (*IterateBuilder) Build

func (b *IterateBuilder) Build() orchestrator.Task

func (*IterateBuilder) List

func (b *IterateBuilder) List(v any) *IterateBuilder

func (*IterateBuilder) Map

func (b *IterateBuilder) Map(v any) *IterateBuilder

func (*IterateBuilder) Range

func (b *IterateBuilder) Range(v any) *IterateBuilder

type IterateType

// IterateType identifies what kind of value an Iterate task iterates over.
type IterateType string
const (
	// TypeIterate is an untyped string constant (not an IterateType).
	// NOTE(review): presumably the task type name of Iterate, like the other
	// Type* constants in this package — confirm.
	TypeIterate = "iterate"

	// The supported kinds of iteration: over a list, a map, or a range.
	IterateTypeList  IterateType = "list"
	IterateTypeMap   IterateType = "map"
	IterateTypeRange IterateType = "range"
)

type JSON

type JSON struct{}

func (JSON) Decode

func (j JSON) Decode(in io.Reader, out any) error

func (JSON) Encode

func (j JSON) Encode(in any) (io.Reader, error)

type Loader

// Loader loads a definition by name.
// NOTE(review): the returned map is presumably a task/flow definition that
// can be constructed via an orchestrator.Registry — confirm.
type Loader interface {
	// Load returns the definition registered under the given name as a
	// generic map, or an error.
	Load(string) (map[string]any, error)
}

type Loop

// Loop is a composite task that is similar to the `for` statement in Go.
//
// Input fields (JSON keys in parentheses):
//   - Iterator ("iterator"): the task that supplies the iterator (typically
//     an Iterate task).
//   - Body ("body"): the task forming the loop body.
//     NOTE(review): presumably executed once per iterated element — confirm.
type Loop struct {
	orchestrator.TaskHeader

	Input struct {
		Iterator orchestrator.Task `json:"iterator"`
		Body     orchestrator.Task `json:"body"`
	} `json:"input"`
}

Loop is a composite task that is similar to the `for` statement in Go.

func (*Loop) Execute

func (l *Loop) Execute(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error)

func (*Loop) String

func (l *Loop) String() string

type LoopBuilder

type LoopBuilder struct {
	// contains filtered or unexported fields
}

func NewLoop

func NewLoop(name string) *LoopBuilder

func (*LoopBuilder) Body

func (b *LoopBuilder) Body(builder orchestrator.Builder) *LoopBuilder

func (*LoopBuilder) Build

func (b *LoopBuilder) Build() orchestrator.Task

func (*LoopBuilder) Iterator

func (b *LoopBuilder) Iterator(builder orchestrator.Builder) *LoopBuilder

func (*LoopBuilder) Timeout

func (b *LoopBuilder) Timeout(timeout time.Duration) *LoopBuilder

type MapLoader

type MapLoader map[string]map[string]any

func (MapLoader) Load

func (l MapLoader) Load(name string) (map[string]any, error)

type Parallel

// Parallel is a composite task that executes its subtasks in parallel.
//
// Input.Tasks ("tasks") lists the subtasks.
// NOTE(review): how subtask outputs and errors are combined is not visible
// from this declaration — see Execute.
type Parallel struct {
	orchestrator.TaskHeader

	Input struct {
		Tasks []orchestrator.Task `json:"tasks"`
	} `json:"input"`
}

Parallel is a composite task that is used to execute its subtasks in parallel.

func (*Parallel) Execute

func (p *Parallel) Execute(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error)

func (*Parallel) String

func (p *Parallel) String() string

type ParallelBuilder

type ParallelBuilder struct {
	// contains filtered or unexported fields
}

func NewParallel

func NewParallel(name string) *ParallelBuilder

func (*ParallelBuilder) Build

func (b *ParallelBuilder) Build() orchestrator.Task

func (*ParallelBuilder) Tasks

func (b *ParallelBuilder) Tasks(builders ...orchestrator.Builder) *ParallelBuilder

func (*ParallelBuilder) Timeout

func (b *ParallelBuilder) Timeout(timeout time.Duration) *ParallelBuilder

type Serial

// Serial is a composite task that executes its subtasks serially.
//
// Input fields (JSON keys in parentheses):
//   - Async ("async"): marks the task as asynchronous; an asynchronous Serial
//     task is what the Wait documentation refers to as an actor.
//   - Schema ("schema"): optional schema for the series of subtasks.
//   - Tasks ("tasks"): the subtasks, in execution order.
type Serial struct {
	orchestrator.TaskHeader

	Input struct {
		Async bool `json:"async"`

		// The optional schema for the following series of subtasks.
		//
		// Typically, the schema is required for a standalone workflow.
		//
		// The tag value must be quoted; an unquoted tag is ignored by
		// encoding/json, which would drop both the "schema" key name and
		// the omitempty option.
		Schema orchestrator.Schema `json:"schema,omitempty"`

		Tasks []orchestrator.Task `json:"tasks"`
	} `json:"input"`
}

Serial is a composite task that is used to execute its subtasks serially.

func (*Serial) Execute

func (s *Serial) Execute(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error)

func (*Serial) String

func (s *Serial) String() string

type SerialBuilder

type SerialBuilder struct {
	// contains filtered or unexported fields
}

func NewSerial

func NewSerial(name string) *SerialBuilder

func (*SerialBuilder) Async

func (b *SerialBuilder) Async(async bool) *SerialBuilder

func (*SerialBuilder) Build

func (b *SerialBuilder) Build() orchestrator.Task

func (*SerialBuilder) Tasks

func (b *SerialBuilder) Tasks(builders ...orchestrator.Builder) *SerialBuilder

func (*SerialBuilder) Timeout

func (b *SerialBuilder) Timeout(timeout time.Duration) *SerialBuilder

type Terminate

// Terminate is a leaf task that terminates the execution of a flow and
// returns an output.
//
// Input fields (JSON keys in parentheses):
//   - Output ("output"): expression yielding the output to return.
//   - Error ("error"): expression yielding an error string.
//     NOTE(review): presumably a non-empty value makes the flow terminate
//     with an error instead of an output — confirm against Execute.
type Terminate struct {
	orchestrator.TaskHeader

	Input struct {
		Output orchestrator.Expr[orchestrator.Output] `json:"output"`
		Error  orchestrator.Expr[string]              `json:"error"`
	} `json:"input"`
}

Terminate is a leaf task that is used to terminate the execution of a flow and return an output.

func (*Terminate) Execute

func (t *Terminate) Execute(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error)

func (*Terminate) String

func (t *Terminate) String() string

type TerminateBuilder

type TerminateBuilder struct {
	// contains filtered or unexported fields
}

func NewTerminate

func NewTerminate(name string) *TerminateBuilder

func (*TerminateBuilder) Build

func (b *TerminateBuilder) Build() orchestrator.Task

func (*TerminateBuilder) Error

func (b *TerminateBuilder) Error(err any) *TerminateBuilder

func (*TerminateBuilder) Output

func (b *TerminateBuilder) Output(output any) *TerminateBuilder

type Wait

// Wait is a leaf task that waits for an external input, optionally sending an
// output externally before that. It must be used within an actor, i.e. an
// asynchronous Serial task.
//
// Input fields (JSON keys in parentheses):
//   - Output ("output"): expression yielding the output to send before
//     waiting.
//   - InputSchema ("input_schema"): NOTE(review): presumably a schema
//     describing the expected external input — confirm.
type Wait struct {
	orchestrator.TaskHeader

	Input struct {
		Output      orchestrator.Expr[map[string]any] `json:"output"`
		InputSchema map[string]any                    `json:"input_schema"`
	} `json:"input"`
}

Wait is a leaf task that is used to wait for an external input (and sometimes also to send an output externally before that).

Note that a Wait task must be used within an actor (i.e. asynchronous Serial task).

func (*Wait) Execute

func (w *Wait) Execute(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error)

func (*Wait) String

func (w *Wait) String() string

type WaitBuilder

type WaitBuilder struct {
	// contains filtered or unexported fields
}

func NewWait

func NewWait(name string) *WaitBuilder

func (*WaitBuilder) Build

func (b *WaitBuilder) Build() orchestrator.Task

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL