orchestrator

package module
v0.0.0-...-314114a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 6, 2024 License: MIT Imports: 25 Imported by: 4

README

Orchestrator

A Go library for service orchestration, inspired by Conductor.

Core Concepts

Task

Tasks are the fundamental building blocks of Orchestrator. They are similar to primitive types or statements in a programming language.

Typically, a task accepts an input and returns an output. Each parameter in the input can be a literal value or an expression.

Built-in tasks:

Flow

A flow is used to define a piece of logic, which usually consists of one or more tasks. Flows are similar to functions or routines in a programming language.

In Orchestrator, a flow is essentially a composite task (i.e. a Serial task). Therefore, just like a task, a flow accepts an input and returns an output. Furthermore, a flow can be embedded into another flow by leveraging a Call task, thus serving as a sub-flow.

Expression

Expressions are used to extract values out of the flow input and other tasks in the flow.

For example, a flow is supplied with an input by the client/caller when a new execution is triggered. The flow input is available via an expression of the form ${input.<var>} (see dialects). Likewise, the output of a previously executed task can also be extracted using an expression (e.g. ${<task>.<output_var>}) for use in the input of a subsequent task.

Dialects

Currently supported expression dialects:

  • Starlark

    In addition to Starlark's built-in functions, there are some more pre-declared functions:

    • getenv(key): Retrieve the value of the environment variable named by the key.
    • isiterator(v): Whether the given value v is an Orchestrator Iterator.
    • jsonencode(v): Encode the given value v to a JSON string (go.starlark.net/lib/json).
    • jsondecode(v): Decode the given JSON string to a value (go.starlark.net/lib/json).

    Examples:

    ${input.value}  // Value from the input.
    ${tool.status == 200}  // Whether the status code (from an HTTP task `tool`) is 200
    ${len(tool.body.entities)}  // Length of the response entities (from an HTTP task `tool`)
    ${[s*2 for s in input.scores]}  // List comprehension
    ${{k: v.upper() for k, v in input.properties.items()}}  // Dictionary comprehension
    
  • Expr

    (- expand -)

    Examples:

    #{input.value}  // Value from the input.
    #{tool.status == 200}  // Whether the status code (from an HTTP task `tool`) is 200
    #{len(tool.body.entities)}  // Length of the response entities (from an HTTP task `tool`)
    //#{[s*2 for s in input.scores]}  // UNSUPPORTED
    //#{{k: v.upper() for k, v in input.properties.items()}}  // UNSUPPORTED
    
  • JSONPath (spec and implementation)

    (- expand -)

    Examples:

    @{input.value}  // Value from the input.
    //@{tool.status == 200}  // UNSUPPORTED
    //@{len(tool.body.entities)}  // UNSUPPORTED
    //@{[s*2 for s in input.scores]}  // UNSUPPORTED
    //@{{k: v.upper() for k, v in input.properties.items()}}  // UNSUPPORTED
    

Flow Builders

Orchestrator provides three ways to build a flow.

Go
flow := builtin.NewSerial("get_todo_user").Timeout(3*time.Second).Tasks(
	builtin.NewHTTP("get_todo").Timeout(2*time.Second).Get(
		"https://jsonplaceholder.typicode.com/todos/${input.todoId}",
	),
	builtin.NewHTTP("get_user").Timeout(2*time.Second).Get(
		"https://jsonplaceholder.typicode.com/users/${get_todo.body.userId}",
	),
).Build()

See Example for the complete example.

YAML
name: get_todo_user
type: serial
timeout: 3s
input:
  tasks:
  - name: get_todo
    type: http
    timeout: 2s
    input:
      method: GET
      uri: https://jsonplaceholder.typicode.com/todos/${input.todoId}
  - name: get_user
    type: http
    timeout: 2s
    input:
      method: GET
      uri: https://jsonplaceholder.typicode.com/users/${get_todo.body.userId}

See Example (Yaml) for the complete example.

JSON
{
  "name": "get_todo_user",
  "type": "serial",
  "timeout": "3s",
  "input": {
    "tasks": [
      {
        "name": "get_todo",
        "type": "http",
        "timeout": "2s",
        "input": {
          "method": "GET",
          "uri": "https://jsonplaceholder.typicode.com/todos/${input.todoId}"
        }
      },
      {
        "name": "get_user",
        "type": "http",
        "timeout": "2s",
        "input": {
          "method": "GET",
          "uri": "https://jsonplaceholder.typicode.com/users/${get_todo.body.userId}"
        }
      }
    ]
  }
}

See Example (Json) for the complete example.

Documentation

Checkout the Godoc.

License

MIT

Documentation

Overview

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/RussellLuo/orchestrator"
	"github.com/RussellLuo/orchestrator/builtin"
)

func main() {
	flow := builtin.NewSerial("get_todo_user").Timeout(3*time.Second).Tasks(
		builtin.NewHTTP("get_todo").Timeout(2*time.Second).Get(
			"https://jsonplaceholder.typicode.com/todos/${input.todoId}",
		),
		builtin.NewHTTP("get_user").Timeout(2*time.Second).Get(
			"https://jsonplaceholder.typicode.com/users/${get_todo.body.userId}",
		),
	).Build()

	input := orchestrator.NewInput(map[string]any{"todoId": 1})
	output, err := flow.Execute(context.Background(), input)
	if err != nil {
		fmt.Println(err)
		return
	}

	body := output["body"].(map[string]any)
	fmt.Println(body["name"])

}
Output:

Leanne Graham
Example (Actor)
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/RussellLuo/orchestrator"
	"github.com/RussellLuo/orchestrator/builtin"
)

func main() {
	flow := builtin.NewSerial("get_todo_user").Async(true).Tasks(
		builtin.NewHTTP("get_todo").Timeout(2*time.Second).Get(
			"https://jsonplaceholder.typicode.com/todos/${input.todoId}",
		),
		builtin.NewFunc("echo_once").Func(func(ctx context.Context, input orchestrator.Input) (orchestrator.Output, error) {
			behavior, ok := input.Get("actor")["behavior"].(*orchestrator.ActorBehavior)
			if !ok {
				return nil, fmt.Errorf("task %q must be used within an asynchronous flow", "echo_once")
			}

			// Send the data, received from the actor's inbox, to the actor's outbox.
			data := behavior.Receive()
			behavior.Send(data, nil)

			return orchestrator.Output{}, nil
		}),
		builtin.NewHTTP("get_user").Timeout(2*time.Second).Get(
			"https://jsonplaceholder.typicode.com/users/${get_todo.body.userId}",
		),
	).Build()

	input := orchestrator.NewInput(map[string]any{"todoId": 1})
	output, err := flow.Execute(context.Background(), input)
	if err != nil {
		fmt.Println(err)
		return
	}

	actor, ok := output.Actor()
	if !ok {
		fmt.Println("bad actor")
		return
	}

	// Perform a ping-pong action midway.
	actor.Inbox() <- map[string]any{"data": "Hello"}
	result := <-actor.Outbox()
	fmt.Println(result.Output["data"]) // Ignore error handling for simplicity.

	// Finally, get the flow result.
	result = <-actor.Outbox()
	if result.Err != nil {
		fmt.Println(result.Err)
		return
	}

	body := result.Output["body"].(map[string]any)
	fmt.Println(body["name"])

}
Output:

Hello
Leanne Graham
Example (Json)
package main

import (
	"context"
	"fmt"

	"github.com/RussellLuo/orchestrator"
	"github.com/RussellLuo/orchestrator/builtin"
)

func main() {
	r := orchestrator.NewRegistry()
	builtin.MustRegisterSerial(r)
	builtin.MustRegisterHTTP(r)

	data := []byte(`{
  "name": "get_todo_user",
  "type": "serial",
  "timeout": "3s",
  "input": {
    "tasks": [
      {
        "name": "get_todo",
        "type": "http",
        "timeout": "2s",
        "input": {
          "method": "GET",
          "uri": "https://jsonplaceholder.typicode.com/todos/${input.todoId}"
        }
      },
      {
        "name": "get_user",
        "type": "http",
        "timeout": "2s",
        "input": {
          "method": "GET",
          "uri": "https://jsonplaceholder.typicode.com/users/${get_todo.body.userId}"
        }
      }
    ]
  }
}`)

	flow, err := r.ConstructFromJSON(data)
	if err != nil {
		fmt.Println(err)
		return
	}

	input := orchestrator.NewInput(map[string]any{"todoId": 1})
	output, err := flow.Execute(context.Background(), input)
	if err != nil {
		fmt.Println(err)
		return
	}

	body := output["body"].(map[string]any)
	fmt.Println(body["name"])

}
Output:

Leanne Graham
Example (Trace)
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/RussellLuo/orchestrator"
	"github.com/RussellLuo/orchestrator/builtin"
)

func main() {
	flow := builtin.NewSerial("get_todo_user").Timeout(3*time.Second).Tasks(
		builtin.NewHTTP("get_todo").Timeout(2*time.Second).Get(
			"https://jsonplaceholder.typicode.com/todos/${input.todoId}",
		),
		builtin.NewHTTP("get_user").Timeout(2*time.Second).Get(
			"https://jsonplaceholder.typicode.com/users/${get_todo.body.userId}",
		),
	).Build()

	input := orchestrator.NewInput(map[string]any{"todoId": 1})
	event := orchestrator.TraceTask(context.Background(), flow, input)

	// Note that for the stability of the test, we just show the output.
	// You may be interested in other properties of the tracing event.
	body := event.Output["body"].(map[string]any)
	fmt.Println(body["name"])

}
Output:

Leanne Graham
Example (Yaml)
package main

import (
	"context"
	"fmt"

	"github.com/RussellLuo/orchestrator"
	"github.com/RussellLuo/orchestrator/builtin"
)

func main() {
	r := orchestrator.NewRegistry()
	builtin.MustRegisterSerial(r)
	builtin.MustRegisterHTTP(r)

	data := []byte(`
name: get_todo_user
type: serial
timeout: 3s
input:
  tasks:
  - name: get_todo
    type: http
    timeout: 2s
    input:
      method: GET
      uri: https://jsonplaceholder.typicode.com/todos/${input.todoId}
  - name: get_user
    type: http
    timeout: 2s
    input:
      method: GET
      uri: https://jsonplaceholder.typicode.com/users/${get_todo.body.userId}
`)

	flow, err := r.ConstructFromYAML(data)
	if err != nil {
		fmt.Println(err)
		return
	}

	input := orchestrator.NewInput(map[string]any{"todoId": 1})
	output, err := flow.Execute(context.Background(), input)
	if err != nil {
		fmt.Println(err)
		return
	}

	body := output["body"].(map[string]any)
	fmt.Println(body["name"])

}
Output:

Leanne Graham

Index

Examples

Constants

This section is empty.

Variables

View Source
var ErrStarlarkConversion = errors.New("failed to convert Starlark data type")
View Source
var GlobalRegistry = NewRegistry()
View Source
var Module = &starlarkstruct.Module{
	Name: "json",
	Members: starlark.StringDict{
		"encode": starlark.NewBuiltin("json.encode", encode),
		"decode": starlark.NewBuiltin("json.decode", decode),
		"indent": starlark.NewBuiltin("json.indent", indent),
	},
}

Module json is a Starlark module of JSON-related functions.

json = module(
   encode,
   decode,
   indent,
)

def encode(x):

The encode function accepts one required positional argument, which it converts to JSON by cases:

  • A Starlark value that implements Go's standard json.Marshal interface defines its own JSON encoding.
  • None, True, and False are converted to null, true, and false, respectively.
  • Starlark int values, no matter how large, are encoded as decimal integers. Some decoders may not be able to decode very large integers.
  • Starlark float values are encoded using decimal point notation, even if the value is an integer. It is an error to encode a non-finite floating-point value.
  • Starlark strings are encoded as JSON strings, using UTF-16 escapes.
  • a Starlark IterableMapping (e.g. dict) is encoded as a JSON object. It is an error if any key is not a string.
  • any other Starlark Iterable (e.g. list, tuple) is encoded as a JSON array.
  • a Starlark HasAttrs (e.g. struct) is encoded as a JSON object.

It an application-defined type matches more than one the cases describe above, (e.g. it implements both Iterable and HasFields), the first case takes precedence. Encoding any other value yields an error.

def decode(x[, default]):

The decode function has one required positional parameter, a JSON string. It returns the Starlark value that the string denotes.

  • Numbers are parsed as int or float, depending on whether they contain a decimal point.
  • JSON objects are parsed as new unfrozen Starlark dicts.
  • JSON arrays are parsed as new unfrozen Starlark lists.

If x is not a valid JSON string, the behavior depends on the "default" parameter: if present, Decode returns its value; otherwise, Decode fails.

def indent(str, *, prefix="", indent="\t"):

The indent function pretty-prints a valid JSON encoding, and returns a string containing the indented form. It accepts one required positional parameter, the JSON string, and two optional keyword-only string parameters, prefix and indent, that specify a prefix of each new line, and the unit of indentation.

Functions

func ContextWithTrace

func ContextWithTrace(ctx context.Context, tr Trace) context.Context

func Evaluate

func Evaluate(v any, f func(string) (any, error)) (any, error)

Evaluate will return a copy of v in which all expressions have been replaced by the return value of function f.

To achieve this, it traverses the value v and recursively evaluate every possible expression (of type string).

func MustRegister

func MustRegister(factory *TaskFactory)

func StarlarkCallFunc

func StarlarkCallFunc(s string, env map[string]any) (any, error)

func StarlarkEvalExpr

func StarlarkEvalExpr(s string, env map[string]any) (any, error)

Types

type Actor

type Actor struct {
	// contains filtered or unexported fields
}

Actor represents a long-running flow that is capable of interacting with the outside world through its inbox and outbox.

func NewActor

func NewActor(f func(ctx context.Context, ab *ActorBehavior)) *Actor

func (*Actor) Inbox

func (a *Actor) Inbox() chan<- map[string]any

func (*Actor) MarshalJSON

func (a *Actor) MarshalJSON() ([]byte, error)

func (*Actor) Outbox

func (a *Actor) Outbox() <-chan Result

func (*Actor) Stop

func (a *Actor) Stop()

func (*Actor) String

func (a *Actor) String() string

type ActorBehavior

type ActorBehavior struct {
	// contains filtered or unexported fields
}

ActorBehavior is a helper for sending and receiving data to/from the outside world on behalf of an actor (i.e. within the context of task execution).

func (*ActorBehavior) Receive

func (ab *ActorBehavior) Receive() map[string]any

Receive receives data from the outside world through the inbox. If the internal context is done (cancelled or timed out), it will return nil immediately.

func (*ActorBehavior) Send

func (ab *ActorBehavior) Send(output Output, err error)

Send sends data to the outside world through the outbox. If the internal context is done (cancelled or timed out), it will return immediately.

type Builder

type Builder interface {
	Build() Task
}

type Evaluator

type Evaluator struct {
	// contains filtered or unexported fields
}

func NewEvaluator

func NewEvaluator() *Evaluator

func NewEvaluatorWithData

func NewEvaluatorWithData(data map[string]any) *Evaluator

TODO: Merge NewEvaluator and NewEvaluatorWithData.

func (*Evaluator) Add

func (e *Evaluator) Add(taskName string, value map[string]any)

func (*Evaluator) Env

func (e *Evaluator) Env() map[string]any

func (*Evaluator) Evaluate

func (e *Evaluator) Evaluate(s string) (any, error)

Evaluate evaluates the expression s.

func (*Evaluator) Get

func (e *Evaluator) Get(taskName string) map[string]any

type Event

type Event struct {
	When time.Time `json:"when"`
	// Since the previous event in the trace.
	Elapsed time.Duration `json:"elapsed"`

	Name   string         `json:"name"`
	Output map[string]any `json:"output,omitempty"`
	Error  error          `json:"error,omitempty"`

	// Events hold the events of the child trace, if any.
	Events []Event `json:"events,omitempty"`
}

Event is the individual component of a trace. It represents a single task that is being traced.

func TraceTask

func TraceTask(ctx context.Context, task Task, input Input) Event

TraceTask traces the execution of the task with the given input, and reports the tracing result.

func (Event) Map

func (e Event) Map() (map[string]any, error)

Map converts an event to a map.

func (Event) MarshalJSON

func (e Event) MarshalJSON() ([]byte, error)

type Expr

type Expr[T any] struct {
	Expr  any
	Value T
}

Expr represents an expression whose value is of type T.

func (*Expr[T]) DecodeMapStructure

func (e *Expr[T]) DecodeMapStructure(value any) error

func (*Expr[T]) Evaluate

func (e *Expr[T]) Evaluate(input Input) error

func (*Expr[T]) EvaluateX

func (e *Expr[T]) EvaluateX(input Input) (T, error)

EvaluateX evaluates the internal expression based on the given input environment.

type Initializer

type Initializer interface {
	// Init initializes a task with the given registry r.
	// It will return an error if it fails.
	Init(r *Registry) error
}

type Input

type Input struct {
	*Evaluator
}

func NewInput

func NewInput(input map[string]any) Input

type Iterator

type Iterator struct {
	// contains filtered or unexported fields
}

Iterator represents a iterable object that is capable of returning its values one at a time, permitting it to be iterated over in a for-loop.

func NewIterator

func NewIterator(ctx context.Context, f func(sender *IteratorSender)) *Iterator

func (*Iterator) Break

func (i *Iterator) Break()

func (*Iterator) Equal

func (i *Iterator) Equal(o *Iterator) bool

Equal implements the custom equality method of github.com/google/go-cmp.

func (*Iterator) MarshalJSON

func (i *Iterator) MarshalJSON() ([]byte, error)

func (*Iterator) Next

func (i *Iterator) Next() <-chan Result

func (*Iterator) String

func (i *Iterator) String() string

type IteratorSender

type IteratorSender struct {
	// contains filtered or unexported fields
}

IteratorSender is a helper for sending data to an iterator.

func (*IteratorSender) End

func (s *IteratorSender) End()

End ends the iteration by closing the internal channel.

func (*IteratorSender) Send

func (s *IteratorSender) Send(output Output, err error) (continue_ bool)

Send sends data to the internal channel. If the internal context is done (cancelled or timed out), it will mark the continue flag (whether to continue sending) as false.

type MyDict

type MyDict struct {
	*starlark.Dict
}

func NewMyDict

func NewMyDict(size int) *MyDict

func (*MyDict) Attr

func (md *MyDict) Attr(name string) (starlark.Value, error)

Attr make MyDict keys can be read by a dot expression (y = x.f).

func (*MyDict) SetField

func (md *MyDict) SetField(name string, val starlark.Value) error

SetField make MyDict keys can be written by a dot expression (x.f = y).

type Output

type Output map[string]any

func (Output) Actor

func (o Output) Actor() (actor *Actor, ok bool)

func (Output) ClearTerminated

func (o Output) ClearTerminated()

func (Output) IsTerminated

func (o Output) IsTerminated() bool

func (Output) Iterator

func (o Output) Iterator() (iterator *Iterator, ok bool)

func (Output) SetTerminated

func (o Output) SetTerminated()

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

func NewRegistry

func NewRegistry() *Registry

func (*Registry) Construct

func (r *Registry) Construct(m map[string]any) (Task, error)

func (*Registry) ConstructFromJSON

func (r *Registry) ConstructFromJSON(data []byte) (Task, error)

func (*Registry) ConstructFromYAML

func (r *Registry) ConstructFromYAML(data []byte) (Task, error)

func (*Registry) MustRegister

func (r *Registry) MustRegister(factory *TaskFactory)

MustRegister is like Register but panics if there is an error.

func (*Registry) Register

func (r *Registry) Register(factory *TaskFactory) error

type Result

type Result struct {
	Name   string
	Output Output
	Err    error
}

type Schema

type Schema struct {
	Input  map[string]any `json:"input"`
	Output map[string]any `json:"output"`
}

func (Schema) Validate

func (s Schema) Validate(input map[string]any) error

type Task

type Task interface {
	// Header returns the header fields of the task.
	Header() TaskHeader

	// String returns a string representation of the task.
	String() string

	// Execute executes the task with the given input.
	Execute(context.Context, Input) (Output, error)
}

func Construct

func Construct(m map[string]any) (Task, error)

func ConstructFromJSON

func ConstructFromJSON(data []byte) (Task, error)

func ConstructFromYAML

func ConstructFromYAML(data []byte) (Task, error)

type TaskFactory

type TaskFactory struct {
	Type string
	New  func() Task
}

type TaskHeader

type TaskHeader struct {
	Name        string `json:"name"`
	Type        string `json:"type"`
	Description string `json:"description"`
	//Schema        Schema        `json:"schema"`
	Timeout time.Duration `json:"timeout"`
}

func (TaskHeader) Header

func (h TaskHeader) Header() TaskHeader

type Trace

type Trace interface {
	// New creates a child trace of the current trace. A child trace
	// provides tracing for the execution of a composite sub-task.
	New(name string) Trace

	// Wrap wraps a task to return a new task, which will automatically
	// add the execution result as an event to the trace.
	Wrap(task Task) Task

	// AddEvent adds an event to the trace.
	AddEvent(name string, output map[string]any, err error)

	// Events return the events stored in the trace.
	Events() []Event
}

Trace provides tracing for the execution of a composite task.

func NewTrace

func NewTrace(name string) Trace

func TraceFromContext

func TraceFromContext(ctx context.Context) Trace

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL