Documentation ¶
Overview ¶
Example ¶
package main import ( "context" "fmt" "time" "github.com/RussellLuo/orchestrator" "github.com/RussellLuo/orchestrator/builtin" ) func main() { flow := builtin.NewSerial("get_todo_user").Timeout(3*time.Second).Tasks( builtin.NewHTTP("get_todo").Timeout(2*time.Second).Get( "https://jsonplaceholder.typicode.com/todos/${input.todoId}", ), builtin.NewHTTP("get_user").Timeout(2*time.Second).Get( "https://jsonplaceholder.typicode.com/users/${get_todo.body.userId}", ), ).Build() input := orchestrator.NewInput(map[string]any{"todoId": 1}) output, err := flow.Execute(context.Background(), input) if err != nil { fmt.Println(err) return } body := output["body"].(map[string]any) fmt.Println(body["name"]) }
Output: Leanne Graham
Example (Actor) ¶
package main import ( "context" "fmt" "time" "github.com/RussellLuo/orchestrator" "github.com/RussellLuo/orchestrator/builtin" ) func main() { flow := builtin.NewSerial("get_todo_user").Async(true).Tasks( builtin.NewHTTP("get_todo").Timeout(2*time.Second).Get( "https://jsonplaceholder.typicode.com/todos/${input.todoId}", ), builtin.NewFunc("echo_once").Func(func(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error) { behavior, ok := input.Get("actor")["behavior"].(*orchestrator.ActorBehavior) if !ok { return nil, fmt.Errorf("task %q must be used within an asynchronous flow", "echo_once") } // Send the data, received from the actor's inbox, to the actor's outbox. data := behavior.Receive() behavior.Send(data, nil) return orchestrator.Output{}, nil }), builtin.NewHTTP("get_user").Timeout(2*time.Second).Get( "https://jsonplaceholder.typicode.com/users/${get_todo.body.userId}", ), ).Build() input := orchestrator.NewInput(map[string]any{"todoId": 1}) output, err := flow.Execute(context.Background(), input) if err != nil { fmt.Println(err) return } actor, ok := output.Actor() if !ok { fmt.Println("bad actor") return } // Perform a ping-pong action midway. actor.Inbox() <- map[string]any{"data": "Hello"} result := <-actor.Outbox() fmt.Println(result.Output["data"]) // Ignore error handling for simplicity. // Finally, get the flow result. result = <-actor.Outbox() if result.Err != nil { fmt.Println(result.Err) return } body := result.Output["body"].(map[string]any) fmt.Println(body["name"]) }
Output: Hello Leanne Graham
Example (Json) ¶
package main import ( "context" "fmt" "github.com/RussellLuo/orchestrator" "github.com/RussellLuo/orchestrator/builtin" ) func main() { r := orchestrator.NewRegistry() builtin.MustRegisterSerial(r) builtin.MustRegisterHTTP(r) data := []byte(`{ "name": "get_todo_user", "type": "serial", "timeout": "3s", "input": { "tasks": [ { "name": "get_todo", "type": "http", "timeout": "2s", "input": { "method": "GET", "uri": "https://jsonplaceholder.typicode.com/todos/${input.todoId}" } }, { "name": "get_user", "type": "http", "timeout": "2s", "input": { "method": "GET", "uri": "https://jsonplaceholder.typicode.com/users/${get_todo.body.userId}" } } ] } }`) flow, err := r.ConstructFromJSON(data) if err != nil { fmt.Println(err) return } input := orchestrator.NewInput(map[string]any{"todoId": 1}) output, err := flow.Execute(context.Background(), input) if err != nil { fmt.Println(err) return } body := output["body"].(map[string]any) fmt.Println(body["name"]) }
Output: Leanne Graham
Example (Trace) ¶
package main import ( "context" "fmt" "time" "github.com/RussellLuo/orchestrator" "github.com/RussellLuo/orchestrator/builtin" ) func main() { flow := builtin.NewSerial("get_todo_user").Timeout(3*time.Second).Tasks( builtin.NewHTTP("get_todo").Timeout(2*time.Second).Get( "https://jsonplaceholder.typicode.com/todos/${input.todoId}", ), builtin.NewHTTP("get_user").Timeout(2*time.Second).Get( "https://jsonplaceholder.typicode.com/users/${get_todo.body.userId}", ), ).Build() input := orchestrator.NewInput(map[string]any{"todoId": 1}) event := orchestrator.TraceTask(context.Background(), flow, input) // Note that for the stability of the test, we just show the output. // You may be interested in other properties of the tracing event. body := event.Output["body"].(map[string]any) fmt.Println(body["name"]) }
Output: Leanne Graham
Example (Yaml) ¶
package main import ( "context" "fmt" "github.com/RussellLuo/orchestrator" "github.com/RussellLuo/orchestrator/builtin" ) func main() { r := orchestrator.NewRegistry() builtin.MustRegisterSerial(r) builtin.MustRegisterHTTP(r) data := []byte(` name: get_todo_user type: serial timeout: 3s input: tasks: - name: get_todo type: http timeout: 2s input: method: GET uri: https://jsonplaceholder.typicode.com/todos/${input.todoId} - name: get_user type: http timeout: 2s input: method: GET uri: https://jsonplaceholder.typicode.com/users/${get_todo.body.userId} `) flow, err := r.ConstructFromYAML(data) if err != nil { fmt.Println(err) return } input := orchestrator.NewInput(map[string]any{"todoId": 1}) output, err := flow.Execute(context.Background(), input) if err != nil { fmt.Println(err) return } body := output["body"].(map[string]any) fmt.Println(body["name"]) }
Output: Leanne Graham
Index ¶
- Variables
- func ContextWithTrace(ctx context.Context, tr Trace) context.Context
- func Evaluate(v any, f func(string) (any, error)) (any, error)
- func MustRegister(factory *TaskFactory)
- func StarlarkCallFunc(s string, env map[string]any) (any, error)
- func StarlarkEvalExpr(s string, env map[string]any) (any, error)
- type Actor
- type ActorBehavior
- type Builder
- type Evaluator
- type Event
- type Expr
- type Initializer
- type Input
- type Iterator
- type IteratorSender
- type MyDict
- type Output
- type Registry
- type Result
- type Schema
- type Task
- type TaskFactory
- type TaskHeader
- type Trace
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( DefaultCodec = structool.New().TagName("json").DecodeHook( structool.DecodeStringToTime(time.RFC3339), structool.DecodeStringToDuration, structool.DecodeStringToError, ).EncodeHook( structool.EncodeTimeToString(time.RFC3339), structool.EncodeDurationToString, structool.EncodeErrorToString, ) )
var ErrStarlarkConversion = errors.New("failed to convert Starlark data type")
var GlobalRegistry = NewRegistry()
var Module = &starlarkstruct.Module{ Name: "json", Members: starlark.StringDict{ "encode": starlark.NewBuiltin("json.encode", encode), "decode": starlark.NewBuiltin("json.decode", decode), "indent": starlark.NewBuiltin("json.indent", indent), }, }
Module json is a Starlark module of JSON-related functions.
json = module( encode, decode, indent, )
def encode(x):
The encode function accepts one required positional argument, which it converts to JSON by cases:
- A Starlark value that implements Go's standard json.Marshaler interface defines its own JSON encoding.
- None, True, and False are converted to null, true, and false, respectively.
- Starlark int values, no matter how large, are encoded as decimal integers. Some decoders may not be able to decode very large integers.
- Starlark float values are encoded using decimal point notation, even if the value is an integer. It is an error to encode a non-finite floating-point value.
- Starlark strings are encoded as JSON strings, using UTF-16 escapes.
- A Starlark IterableMapping (e.g. dict) is encoded as a JSON object. It is an error if any key is not a string.
- Any other Starlark Iterable (e.g. list, tuple) is encoded as a JSON array.
- A Starlark HasAttrs (e.g. struct) is encoded as a JSON object.
If an application-defined type matches more than one of the cases described above (e.g. it implements both Iterable and HasAttrs), the first case takes precedence. Encoding any other value yields an error.
def decode(x[, default]):
The decode function has one required positional parameter, a JSON string. It returns the Starlark value that the string denotes.
- Numbers are parsed as int or float, depending on whether they contain a decimal point.
- JSON objects are parsed as new unfrozen Starlark dicts.
- JSON arrays are parsed as new unfrozen Starlark lists.
If x is not a valid JSON string, the behavior depends on the "default" parameter: if present, Decode returns its value; otherwise, Decode fails.
def indent(str, *, prefix="", indent="\t"):
The indent function pretty-prints a valid JSON encoding, and returns a string containing the indented form. It accepts one required positional parameter, the JSON string, and two optional keyword-only string parameters, prefix and indent, that specify a prefix of each new line, and the unit of indentation.
Functions ¶
func Evaluate ¶
Evaluate will return a copy of v in which all expressions have been replaced by the return value of function f.
To achieve this, it traverses the value v and recursively evaluates every possible expression (of type string).
func MustRegister ¶
func MustRegister(factory *TaskFactory)
Types ¶
type Actor ¶
type Actor struct {
// contains filtered or unexported fields
}
Actor represents a long-running flow that is capable of interacting with the outside world through its inbox and outbox.
func (*Actor) MarshalJSON ¶
type ActorBehavior ¶
type ActorBehavior struct {
// contains filtered or unexported fields
}
ActorBehavior is a helper for sending and receiving data to/from the outside world on behalf of an actor (i.e. within the context of task execution).
func (*ActorBehavior) Receive ¶
func (ab *ActorBehavior) Receive() map[string]any
Receive receives data from the outside world through the inbox. If the internal context is done (cancelled or timed out), it will return nil immediately.
func (*ActorBehavior) Send ¶
func (ab *ActorBehavior) Send(output Output, err error)
Send sends data to the outside world through the outbox. If the internal context is done (cancelled or timed out), it will return immediately.
type Evaluator ¶
type Evaluator struct {
// contains filtered or unexported fields
}
func NewEvaluator ¶
func NewEvaluator() *Evaluator
func NewEvaluatorWithData ¶
TODO: Merge NewEvaluator and NewEvaluatorWithData.
type Event ¶
type Event struct { When time.Time `json:"when"` // Since the previous event in the trace. Elapsed time.Duration `json:"elapsed"` Name string `json:"name"` Output map[string]any `json:"output,omitempty"` Error error `json:"error,omitempty"` // Events hold the events of the child trace, if any. Events []Event `json:"events,omitempty"` }
Event is the individual component of a trace. It represents a single task that is being traced.
func TraceTask ¶
TraceTask traces the execution of the task with the given input, and reports the tracing result.
func (Event) MarshalJSON ¶
type Expr ¶
Expr represents an expression whose value is of type T.
func (*Expr[T]) DecodeMapStructure ¶
type Initializer ¶
type Iterator ¶
type Iterator struct {
// contains filtered or unexported fields
}
Iterator represents an iterable object that is capable of returning its values one at a time, permitting it to be iterated over in a for-loop.
func NewIterator ¶
func NewIterator(ctx context.Context, f func(sender *IteratorSender)) *Iterator
func (*Iterator) MarshalJSON ¶
type IteratorSender ¶
type IteratorSender struct {
// contains filtered or unexported fields
}
IteratorSender is a helper for sending data to an iterator.
func (*IteratorSender) End ¶
func (s *IteratorSender) End()
End ends the iteration by closing the internal channel.
type MyDict ¶
type Output ¶
func (Output) ClearTerminated ¶
func (o Output) ClearTerminated()
func (Output) IsTerminated ¶
func (Output) SetTerminated ¶
func (o Output) SetTerminated()
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
func NewRegistry ¶
func NewRegistry() *Registry
func (*Registry) ConstructFromJSON ¶
func (*Registry) ConstructFromYAML ¶
func (*Registry) MustRegister ¶
func (r *Registry) MustRegister(factory *TaskFactory)
MustRegister is like Register but panics if there is an error.
func (*Registry) Register ¶
func (r *Registry) Register(factory *TaskFactory) error
type Task ¶
type Task interface { // Header returns the header fields of the task. Header() TaskHeader // String returns a string representation of the task. String() string // Execute executes the task with the given input. Execute(context.Context, Input) (Output, error) }
func ConstructFromJSON ¶
func ConstructFromYAML ¶
type TaskFactory ¶
type TaskHeader ¶
type TaskHeader struct { Name string `json:"name"` Type string `json:"type"` Description string `json:"description"` //Schema Schema `json:"schema"` Timeout time.Duration `json:"timeout"` }
func (TaskHeader) Header ¶
func (h TaskHeader) Header() TaskHeader
type Trace ¶
type Trace interface { // New creates a child trace of the current trace. A child trace // provides tracing for the execution of a composite sub-task. New(name string) Trace // Wrap wraps a task to return a new task, which will automatically // add the execution result as an event to the trace. Wrap(task Task) Task // AddEvent adds an event to the trace. AddEvent(name string, output map[string]any, err error) // Events return the events stored in the trace. Events() []Event }
Trace provides tracing for the execution of a composite task.