executor

package
v0.1.3 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 15, 2025 License: MIT Imports: 40 Imported by: 0

Documentation

Overview

Package executor provides the core execution engine for AI agent operations. It runs agent commands with support for streaming, tool calls, and asynchronous result delivery through a Future/Promise pattern.

Design decisions:

  • Command pattern: Encapsulates execution parameters in RunCommand struct
  • Future/Promise: Async operations with type-safe result handling
  • Structured output: JSON Schema validation for responses
  • Context awareness: All operations respect context cancellation
  • Thread safety: Concurrent execution support with proper synchronization
  • Flexible unmarshaling: Support for different response types (JSON, string, gjson)

Key components:

  • Executor: Interface defining the core execution contract
    ├── Run: Executes agent commands with streaming support
    └── handleToolCalls: Manages tool invocations during execution

  • RunCommand: Configuration for execution
    ├── Agent: The AI agent to execute
    ├── Thread: Memory aggregator for context
    ├── Stream: Enable/disable streaming mode
    └── Hook: Event handler for execution lifecycle

  • Future/Promise pattern:
    ├── CompletableFuture: Combined interface for async operations
    ├── Promise: Write interface for results
    └── Future: Read interface for retrieving results

Example usage:

// Create and configure a run command
cmd, err := NewRunCommand(agent, thread, hook)
if err != nil {
    return err
}
cmd = cmd.WithStream(true).
    WithMaxTurns(5).
    WithContextVariables(vars)

// Create a future for the result
future := NewFuture(DefaultUnmarshal[MyResponse]())

// Execute the command
if err := executor.Run(ctx, cmd, future); err != nil {
    return err
}

// Get the result (blocks until complete)
result, err := future.Get()
if err != nil {
    return err
}
// result now holds the decoded MyResponse value

The package is designed to be internal, providing the execution engine while keeping implementation details private. It handles:

  • Command execution lifecycle
  • Asynchronous operation management
  • Tool call coordination
  • Response validation and unmarshaling
  • Event distribution through hooks
  • Context and cancellation management

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultUnmarshal added in v0.1.0

func DefaultUnmarshal[T any]() func([]byte) (T, error)
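
To illustrate what a function with this signature could do, here is a minimal sketch. It assumes string targets receive the raw text and all other targets are decoded as JSON; sketchUnmarshal is a hypothetical stand-in, and the behavior of the real DefaultUnmarshal may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sketchUnmarshal is a hypothetical stand-in with the same signature as
// DefaultUnmarshal. It is not the package's implementation.
func sketchUnmarshal[T any]() func([]byte) (T, error) {
	return func(data []byte) (T, error) {
		var out T
		// Assumption: a plain string target takes the raw text unchanged.
		if s, ok := any(&out).(*string); ok {
			*s = string(data)
			return out, nil
		}
		// Every other target type is decoded as JSON.
		err := json.Unmarshal(data, &out)
		return out, err
	}
}

// answer is an illustrative response type.
type answer struct {
	Text string `json:"text"`
}

func main() {
	a, err := sketchUnmarshal[answer]()([]byte(`{"text":"hi"}`))
	fmt.Println(a.Text, err) // hi <nil>

	s, err := sketchUnmarshal[string]()([]byte("plain text"))
	fmt.Println(s, err) // plain text <nil>
}
```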

func ToJSONSchema added in v0.1.0

func ToJSONSchema[T any]() *jsonschema.Schema

Types

type CompletableFuture added in v0.1.0

type CompletableFuture[T any] interface {
	Future[T]
	Promise
}

func NewFuture added in v0.1.0

func NewFuture[T any](unmarshal func([]byte) (T, error)) CompletableFuture[T]
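
The split between the write side (Promise) and the read side (Future) can be sketched as follows. The three interfaces are copied from this page; sketchFuture, newSketchFuture, reply, and decodeReply are hypothetical, and the package's own NewFuture may be implemented differently.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// Promise, Future, and CompletableFuture mirror the interfaces in the index.
type Promise interface {
	Complete(string)
	Error(error)
}

type Future[T any] interface {
	Get() (T, error)
}

type CompletableFuture[T any] interface {
	Future[T]
	Promise
}

// sketchFuture is a hypothetical implementation, not the package's own.
type sketchFuture[T any] struct {
	once      sync.Once
	done      chan struct{}
	raw       string
	err       error
	unmarshal func([]byte) (T, error)
}

func newSketchFuture[T any](unmarshal func([]byte) (T, error)) CompletableFuture[T] {
	return &sketchFuture[T]{done: make(chan struct{}), unmarshal: unmarshal}
}

// Complete stores the raw result. Only the first Complete or Error call wins.
func (f *sketchFuture[T]) Complete(raw string) {
	f.once.Do(func() { f.raw = raw; close(f.done) })
}

// Error resolves the future with a failure.
func (f *sketchFuture[T]) Error(err error) {
	f.once.Do(func() { f.err = err; close(f.done) })
}

// Get blocks until the future is resolved, then decodes the raw result.
func (f *sketchFuture[T]) Get() (T, error) {
	<-f.done
	if f.err != nil {
		var zero T
		return zero, f.err
	}
	return f.unmarshal([]byte(f.raw))
}

type reply struct {
	Answer string `json:"answer"`
}

func decodeReply(b []byte) (reply, error) {
	var r reply
	err := json.Unmarshal(b, &r)
	return r, err
}

func main() {
	future := newSketchFuture(decodeReply)
	go future.Complete(`{"answer":"42"}`) // the producer only needs the Promise methods
	r, err := future.Get()
	fmt.Println(r.Answer, err) // 42 <nil>
}
```

Passing the same value as a Promise to the producer and as a Future[T] to the consumer keeps the result type out of the executor's signature, which matches Run accepting a plain Promise.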

type Executor

type Executor interface {
	Run(context.Context, RunCommand, Promise) error
	// contains filtered or unexported methods
}

type Future added in v0.1.0

type Future[T any] interface {
	Get() (T, error)
}

type Local

type Local struct{}

func NewLocal

func NewLocal() *Local

func (*Local) Run

func (l *Local) Run(ctx context.Context, command RunCommand, promise Promise) error

type Promise added in v0.1.0

type Promise interface {
	Complete(string)
	Error(error)
}

type RemoteAgent added in v0.1.3

type RemoteAgent struct {
	Name              string `json:"name"`
	Model             string `json:"model"`
	Instructions      string `json:"instructions"`
	ParallelToolCalls bool   `json:"parallelToolCalls"`
}

func (*RemoteAgent) RenderInstructions added in v0.1.3

func (a *RemoteAgent) RenderInstructions(cv types.ContextVars) (string, error)

RenderInstructions renders the agent's instructions with the provided context variables.
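
This page does not say which template syntax the instructions use. As a rough sketch only, the example below assumes Go text/template syntax and a map-like context-variable type; contextVars and renderInstructions are hypothetical stand-ins for types.ContextVars and the real method.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// contextVars is a hypothetical stand-in for types.ContextVars.
type contextVars map[string]any

// renderInstructions is a hypothetical sketch, assuming text/template syntax.
func renderInstructions(instructions string, cv contextVars) (string, error) {
	tmpl, err := template.New("instructions").
		Option("missingkey=error"). // fail instead of printing "<no value>"
		Parse(instructions)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := tmpl.Execute(&sb, cv); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	out, err := renderInstructions(
		"You are assisting {{.user}} with {{.topic}}.",
		contextVars{"user": "Ada", "topic": "billing"},
	)
	fmt.Println(out, err) // You are assisting Ada with billing. <nil>
}
```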

type RemoteRunCommand added in v0.1.3

type RemoteRunCommand struct {
	ID               uuid.UUID                  `json:"id"`
	Agent            RemoteAgent                `json:"agent"`
	StructuredOutput *provider.StructuredOutput `json:"structured_output,omitempty"`
	Stream           bool                       `json:"stream"`
	MaxTurns         int                        `json:"max_turns"`
	ContextVariables types.ContextVars          `json:"context_variables,omitempty"`
	Checkpoint       shorttermmemory.Checkpoint `json:"checkpoint"`
}

func RemoteRunCommandFromRunCommand added in v0.1.3

func RemoteRunCommandFromRunCommand(cmd RunCommand) RemoteRunCommand

type RemoteRunResult added in v0.1.3

type RemoteRunResult struct {
	ID               uuid.UUID                  `json:"id"`
	Checkpoint       shorttermmemory.Checkpoint `json:"checkpoint"`
	Result           string                     `json:"result"`
	Type             RemoteRunResultType        `json:"type"`
	ToolCalls        *messages.ToolCallMessage  `json:"tool_calls,omitempty"`
	ContextVariables types.ContextVars          `json:"context_variables,omitempty"`
}

type RemoteRunResultType added in v0.1.3

type RemoteRunResultType uint8
const (
	RemoteRunResultTypeIncomplete RemoteRunResultType = iota
	RemoteRunResultTypeCompletion
	RemoteRunResultTypeToolCall
)
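
Since the constants start at iota, RemoteRunResultTypeIncomplete is the zero value, so a zero-valued RemoteRunResult reports an incomplete run. The sketch below copies the type and constants and shows how a caller might branch on them; nextStep and its return strings are hypothetical placeholders, not behavior documented by the package.

```go
package main

import "fmt"

// RemoteRunResultType and its constants are copied from the declaration above.
type RemoteRunResultType uint8

const (
	RemoteRunResultTypeIncomplete RemoteRunResultType = iota
	RemoteRunResultTypeCompletion
	RemoteRunResultTypeToolCall
)

// nextStep is a hypothetical helper; the strings are placeholders.
func nextStep(t RemoteRunResultType) string {
	switch t {
	case RemoteRunResultTypeCompletion:
		return "return the result"
	case RemoteRunResultTypeToolCall:
		return "execute the tool calls"
	case RemoteRunResultTypeIncomplete:
		return "run is not finished"
	default:
		return "unknown result type"
	}
}

func main() {
	var zero RemoteRunResultType
	fmt.Println(zero == RemoteRunResultTypeIncomplete) // true
	fmt.Println(uint8(RemoteRunResultTypeToolCall))    // 2
	fmt.Println(nextStep(RemoteRunResultTypeCompletion))
}
```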

type RunCommand

type RunCommand struct {
	Agent            api.Agent
	Thread           *shorttermmemory.Aggregator
	StructuredOutput *provider.StructuredOutput
	Stream           bool
	MaxTurns         int
	ContextVariables types.ContextVars
	Hook             events.Hook
	// contains filtered or unexported fields
}

func NewRunCommand

func NewRunCommand(agent api.Agent, thread *shorttermmemory.Aggregator, hook events.Hook) (RunCommand, error)

func (*RunCommand) ID

func (r *RunCommand) ID() uuid.UUID

func (*RunCommand) Validate added in v0.1.0

func (r *RunCommand) Validate() error

func (RunCommand) WithContextVariables

func (r RunCommand) WithContextVariables(contextVariables types.ContextVars) RunCommand

func (RunCommand) WithMaxTurns

func (r RunCommand) WithMaxTurns(maxTurns int) RunCommand

func (RunCommand) WithStream

func (r RunCommand) WithStream(stream bool) RunCommand

func (RunCommand) WithStructuredOutput added in v0.1.2

func (r RunCommand) WithStructuredOutput(output *provider.StructuredOutput) RunCommand
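
The With* methods have value receivers and return a RunCommand, so each call works on a copy and the caller must keep the returned value. The sketch below demonstrates this with runCommand, a hypothetical trimmed-down stand-in that carries only two of the fields.

```go
package main

import "fmt"

// runCommand is a hypothetical, trimmed-down stand-in for RunCommand.
type runCommand struct {
	Stream   bool
	MaxTurns int
}

// WithStream returns a modified copy; the receiver is passed by value.
func (r runCommand) WithStream(stream bool) runCommand {
	r.Stream = stream
	return r
}

// WithMaxTurns returns a modified copy; the receiver is passed by value.
func (r runCommand) WithMaxTurns(maxTurns int) runCommand {
	r.MaxTurns = maxTurns
	return r
}

func main() {
	base := runCommand{}
	base.WithStream(true) // result discarded, so base is unchanged

	configured := base.WithStream(true).WithMaxTurns(5)

	fmt.Println(base.Stream, base.MaxTurns)             // false 0
	fmt.Println(configured.Stream, configured.MaxTurns) // true 5
}
```

This is why the usage example in the overview reassigns the result (cmd = cmd.WithStream(true)...). ID and Validate, by contrast, are declared on *RunCommand.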

type Temporal

type Temporal struct {
	// contains filtered or unexported fields
}

func (*Temporal) CallTool added in v0.1.3

func (t *Temporal) CallTool(ctx context.Context, tc remoteToolCallParams) (remoteToolCallResult, error)

func (*Temporal) PublishError added in v0.1.3

func (t *Temporal) PublishError(ctx context.Context, params completionParams, errMsg string) error

PublishError is an activity that publishes error events.

func (*Temporal) Run added in v0.1.3

func (t *Temporal) Run(ctx workflow.Context, cmd RemoteRunCommand) (string, error)

func (*Temporal) RunChildWorkflow added in v0.1.3

func (t *Temporal) RunChildWorkflow(ctx workflow.Context, cmd RemoteRunCommand) (string, error)

func (*Temporal) RunCompletion added in v0.1.3

func (t *Temporal) RunCompletion(ctx context.Context, cmd completionParams) (RemoteRunResult, error)

type TemporalProxy added in v0.1.3

type TemporalProxy struct {
	// contains filtered or unexported fields
}

func (*TemporalProxy) Run added in v0.1.3

func (t *TemporalProxy) Run(ctx context.Context, cmd RunCommand, promise Promise) error
