compose

package
v0.3.4
Warning

This package is not in the latest version of its module.

Published: Jan 6, 2025 License: Apache-2.0 Imports: 23 Imported by: 33

Documentation

Constants

const (
	ComponentOfUnknown     component = "Unknown"
	ComponentOfGraph       component = "Graph"
	ComponentOfChain       component = "Chain"
	ComponentOfPassthrough component = "Passthrough"
	ComponentOfToolsNode   component = "ToolsNode"
	ComponentOfLambda      component = "Lambda"
)

Built-in component types for graph nodes. Each represents the type of the most primitive executable object provided by the user.

const END = "end"

END is the end node of the graph. You can add your last edge with END.

const START = "start"

START is the start node of the graph. You can add your first edge with START.

Variables

var ErrChainCompiled = errors.New("chain has been compiled, cannot be modified")

ErrChainCompiled is returned when attempting to modify a chain after it has been compiled.

var ErrExceedMaxSteps = errors.New("exceeds max steps")

ErrExceedMaxSteps is returned by a graph when the number of executed steps exceeds the maximum number of steps.

var ErrGraphCompiled = errors.New("graph has been compiled, cannot be modified")

ErrGraphCompiled is returned when attempting to modify a graph after it has been compiled.

Functions

func GetState

func GetState[S any](ctx context.Context) (S, error)

GetState gets the state from the context. Reading or writing the state in custom nodes through this function may cause data races, because other nodes may access the state concurrently. You need to be aware of this and guard against it, typically with a mutex. It's recommended to only READ the returned state. If you want to WRITE to the state, consider using StatePreHandler / StatePostHandler, because they are concurrency safe out of the box. e.g.

lambdaFunc := func(ctx context.Context, in string, opts ...any) (string, error) {
	state, err := compose.GetState[*testState](ctx)
	if err != nil {
		return "", err
	}
	// do something with state
	return in, nil
}

stateGraph := compose.NewStateGraph[string, string, testState](genStateFunc)
stateGraph.AddNode("node1", lambdaFunc)
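The advice to add a mutex can be sketched without the compose package. In this illustration `safeState` and `runConcurrently` are hypothetical names; the goroutines stand in for nodes that touch the shared state at the same time.

```go
package main

import (
	"fmt"
	"sync"
)

// safeState is a hypothetical state type that carries its own mutex,
// so every access goes through the same lock.
type safeState struct {
	mu      sync.Mutex
	counter int
}

// incr updates the state under the lock.
func (s *safeState) incr() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.counter++
}

// value reads the state under the same lock.
func (s *safeState) value() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.counter
}

// runConcurrently simulates n nodes writing to the shared state at once.
func runConcurrently(s *safeState, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.incr()
		}()
	}
	wg.Wait()
}

func main() {
	s := &safeState{}
	runConcurrently(s, 100)
	fmt.Println(s.value()) // 100
}
```

Keeping the mutex inside the state type means callers cannot forget to take the lock.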

func InitGraphCompileCallbacks

func InitGraphCompileCallbacks(cbs []GraphCompileCallback)

InitGraphCompileCallbacks sets global graph compile callbacks, which are ONLY added to the compile options of the top-level graph.

func RegisterStreamChunkConcatFunc

func RegisterStreamChunkConcatFunc[T any](fn func([]T) (T, error))

RegisterStreamChunkConcatFunc registers a function that concatenates stream chunks. It is required when stream chunks of a specific type need to be concatenated, for example when you call Invoke() but the node only implements Stream(). Call it at process initialization; it is not thread safe. e.g.

type testStruct struct {
	field1 string
	field2 int
}
compose.RegisterStreamChunkConcatFunc(func(items []testStruct) (testStruct, error) {
	return testStruct{
		field1: items[1].field1, // may implement inplace logic by your scenario
		field2: items[0].field2 + items[1].field2,
	}, nil
})
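The example above indexes items[0] and items[1] directly, which assumes at least two chunks. A concat function of the same `func([]T) (T, error)` shape can handle any number of chunks, as in this sketch. It is not registered with compose here; `chunk` and `concatChunks` are hypothetical names, and joining the text while summing the counts is just one possible merge rule.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// chunk is a hypothetical stream chunk type.
type chunk struct {
	text  string
	count int
}

// concatChunks merges any number of chunks into one value:
// text fields are joined in order and counts are summed.
func concatChunks(items []chunk) (chunk, error) {
	if len(items) == 0 {
		return chunk{}, errors.New("no chunks to concat")
	}
	var text strings.Builder
	total := 0
	for _, it := range items {
		text.WriteString(it.text)
		total += it.count
	}
	return chunk{text: text.String(), count: total}, nil
}

func main() {
	merged, err := concatChunks([]chunk{{"Hel", 1}, {"lo", 2}})
	if err != nil {
		panic(err)
	}
	fmt.Println(merged.text, merged.count) // Hello 3
}
```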

Types

type AnyGraph

type AnyGraph interface {
	// contains filtered or unexported methods
}

AnyGraph is the identifier for composable and compilable types such as Graph[I, O] and Chain[I, O] in Eino.

type Chain

type Chain[I, O any] struct {
	// contains filtered or unexported fields
}

Chain is a chain of components. Chain nodes can be parallel / branch / sequence components. Chain is designed to be used in a builder pattern (call Compile() before use). The interface is `Chain style`; you can use it like: `chain.AppendXX(...).AppendXX(...)`

Normal usage:

  1. create a chain with input/output type: `chain := NewChain[inputType, outputType]()`
  2. add components to chainable list:
     2.1 add components: `chain.AppendChatTemplate(...).AppendChatModel(...).AppendToolsNode(...)`
     2.2 add parallel or branch node if needed: `chain.AppendParallel()`, `chain.AppendBranch()`
  3. compile: `r, err := chain.Compile(ctx)`
  4. run:
     4.1 `one input & one output` use `r.Invoke(ctx, input)`
     4.2 `one input & multi output chunk` use `r.Stream(ctx, input)`
     4.3 `multi input chunk & one output` use `r.Collect(ctx, inputReader)`
     4.4 `multi input chunk & multi output chunk` use `r.Transform(ctx, inputReader)`

Using in graph or other chain:

chain1 := NewChain[inputType, outputType]()
graph := NewGraph[inputType, outputType]()
graph.AddGraphNode("key", chain1) // chain is an AnyGraph implementation

// or in another chain:
chain2 := NewChain[inputType, outputType]()
chain2.AppendGraph(chain1)

func NewChain

func NewChain[I, O any](opts ...NewGraphOption) *Chain[I, O]

NewChain creates a chain with input/output type.

func (*Chain[I, O]) AppendBranch

func (c *Chain[I, O]) AppendBranch(b *ChainBranch) *Chain[I, O]

AppendBranch adds a conditional branch to the chain. Each branch within the ChainBranch can be an AnyGraph. All branches should either lead to END, or converge to another node within the Chain. e.g.

cb := compose.NewChainBranch(conditionFunc)
cb.AddChatTemplate("chat_template_key_01", chatTemplate)
cb.AddChatTemplate("chat_template_key_02", chatTemplate2)
chain.AppendBranch(cb)

func (*Chain[I, O]) AppendChatModel

func (c *Chain[I, O]) AppendChatModel(node model.ChatModel, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendChatModel adds a ChatModel node to the chain. e.g.

model, err := openai.NewChatModel(ctx, config)
if err != nil {...}
chain.AppendChatModel(model)

func (*Chain[I, O]) AppendChatTemplate

func (c *Chain[I, O]) AppendChatTemplate(node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendChatTemplate adds a ChatTemplate node to the chain. e.g.

chatTemplate, err := prompt.FromMessages(schema.FString, &schema.Message{
	Role:    schema.System,
	Content: "You are acting as a {role}.",
})

chain.AppendChatTemplate(chatTemplate)

func (*Chain[I, O]) AppendDocumentTransformer

func (c *Chain[I, O]) AppendDocumentTransformer(node document.Transformer, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendDocumentTransformer adds a DocumentTransformer node to the chain. e.g.

markdownSplitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderSplitterConfig{})

chain.AppendDocumentTransformer(markdownSplitter)

func (*Chain[I, O]) AppendEmbedding

func (c *Chain[I, O]) AppendEmbedding(node embedding.Embedder, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendEmbedding adds an Embedding node to the chain. e.g.

embedder, err := openai.NewEmbedder(ctx, config)
if err != nil {...}
chain.AppendEmbedding(embedder)

func (*Chain[I, O]) AppendGraph

func (c *Chain[I, O]) AppendGraph(node AnyGraph, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendGraph adds an AnyGraph node to the chain. AnyGraph can be a chain or a graph. e.g.

graph := compose.NewGraph[string, string]()
chain.AppendGraph(graph)

func (*Chain[I, O]) AppendIndexer

func (c *Chain[I, O]) AppendIndexer(node indexer.Indexer, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendIndexer add an Indexer node to the chain. Indexer is a node that can store documents. e.g.

vectorStoreImpl, err := vikingdb.NewVectorStorer(ctx, vikingdbConfig) // in components/vectorstore/vikingdb/vectorstore.go
if err != nil {...}

config := vectorstore.IndexerConfig{VectorStore: vectorStoreImpl}
indexer, err := vectorstore.NewIndexer(ctx, config)
if err != nil {...}

chain.AppendIndexer(indexer)

func (*Chain[I, O]) AppendLambda

func (c *Chain[I, O]) AppendLambda(node *Lambda, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendLambda adds a Lambda node to the chain. Lambda is a node that can be used to implement custom logic. e.g.

lambdaNode := compose.InvokableLambda(func(ctx context.Context, docs []*schema.Document) (string, error) {...})
chain.AppendLambda(lambdaNode)

Note: to create a Lambda node, use `compose.AnyLambda`, `compose.InvokableLambda`, `compose.StreamableLambda` or `compose.TransformableLambda`. If you want this node to produce real stream output, use `compose.StreamableLambda` or `compose.TransformableLambda`.

func (*Chain[I, O]) AppendLoader

func (c *Chain[I, O]) AppendLoader(node document.Loader, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendLoader adds a Loader node to the chain. e.g.

loader, err := file.NewFileLoader(ctx, &file.FileLoaderConfig{})
if err != nil {...}
chain.AppendLoader(loader)

func (*Chain[I, O]) AppendLoaderSplitter

func (c *Chain[I, O]) AppendLoaderSplitter(node document.LoaderSplitter, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendLoaderSplitter adds a LoaderSplitter node to the chain. Deprecated: use AppendLoader instead.

func (*Chain[I, O]) AppendParallel

func (c *Chain[I, O]) AppendParallel(p *Parallel) *Chain[I, O]

AppendParallel adds a Parallel structure (multiple concurrent nodes) to the chain. e.g.

parallel := compose.NewParallel()
parallel.AddChatModel("openai", model1) // => "openai": *schema.Message{}
parallel.AddChatModel("maas", model2) // => "maas": *schema.Message{}

chain.AppendParallel(parallel) // => multiple concurrent nodes are added to the Chain

The next node in the chain is either END, or a node that accepts a map[string]any whose keys are `openai` and `maas` as specified above.

func (*Chain[I, O]) AppendPassthrough

func (c *Chain[I, O]) AppendPassthrough(opts ...GraphAddNodeOpt) *Chain[I, O]

AppendPassthrough adds a Passthrough node to the chain. It can be used to connect multiple ChainBranch or Parallel structures. e.g.

chain.AppendPassthrough()

func (*Chain[I, O]) AppendRetriever

func (c *Chain[I, O]) AppendRetriever(node retriever.Retriever, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendRetriever adds a Retriever node to the chain. e.g.

retriever, err := vectorstore.NewRetriever(ctx, config)
if err != nil {...}
chain.AppendRetriever(retriever)

or using fornax knowledge as retriever:

config := fornaxknowledge.Config{...}
retriever, err := fornaxknowledge.NewKnowledgeRetriever(ctx, config)
if err != nil {...}
chain.AppendRetriever(retriever)

func (*Chain[I, O]) AppendToolsNode

func (c *Chain[I, O]) AppendToolsNode(node *ToolsNode, opts ...GraphAddNodeOpt) *Chain[I, O]

AppendToolsNode adds a ToolsNode node to the chain. e.g.

toolsNode, err := tools.NewToolNode(ctx, &tools.ToolsNodeConfig{
	Tools: []tools.Tool{...},
})

chain.AppendToolsNode(toolsNode)

func (*Chain[I, O]) Compile

func (c *Chain[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)

Compile compiles the chain to a Runnable, which can be used directly. e.g.

chain := NewChain[string, string]()
r, err := chain.Compile(ctx)
if err != nil {...}

r.Invoke(ctx, input) // ping => pong
r.Stream(ctx, input) // ping => stream out
r.Collect(ctx, inputReader) // stream in => pong
r.Transform(ctx, inputReader) // stream in => stream out

type ChainBranch

type ChainBranch struct {
	// contains filtered or unexported fields
}

ChainBranch represents a conditional branch in a chain of operations. It allows for dynamic routing of execution based on a condition. All branches within ChainBranch are expected to either end the Chain, or converge to another node in the Chain.

func NewChainBranch

func NewChainBranch[T any](cond GraphBranchCondition[T]) *ChainBranch

NewChainBranch creates a new ChainBranch instance based on a given condition. It takes a generic type T and a GraphBranchCondition function for that type. The returned ChainBranch will have an empty key2BranchNode map and a condition function that wraps the provided cond to handle type assertions and error checking. e.g.

condition := func(ctx context.Context, in string) (endNode string, err error) {
	// logic to determine the next node
	return "next_node_key_01", nil
}

cb := NewChainBranch[string](condition)
cb.AddPassthrough("next_node_key_01") // node in branch, represents one path of the branch
cb.AddPassthrough("next_node_key_02") // node in branch

func NewStreamChainBranch

func NewStreamChainBranch[T any](cond StreamGraphBranchCondition[T]) *ChainBranch

NewStreamChainBranch creates a new ChainBranch instance based on a given stream condition. It takes a generic type T and a StreamGraphBranchCondition function for that type. The returned ChainBranch will have an empty key2BranchNode map and a condition function that wraps the provided cond to handle type assertions and error checking. e.g.

condition := func(ctx context.Context, in *schema.StreamReader[string]) (endNode string, err error) {
	// logic to determine the next node: read the stream and make a decision.
	// to save time, usually read only the first chunk of the stream, then decide which path to take.
	return "some_next_node_key", nil
}

cb := NewStreamChainBranch[string](condition)

func (*ChainBranch) AddChatModel

func (cb *ChainBranch) AddChatModel(key string, node model.ChatModel, opts ...GraphAddNodeOpt) *ChainBranch

AddChatModel adds a ChatModel node to the branch. e.g.

chatModel01, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
	Model: "gpt-4o",
})
chatModel02, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
	Model: "gpt-4o-mini",
})
cb.AddChatModel("chat_model_key_01", chatModel01)
cb.AddChatModel("chat_model_key_02", chatModel02)

func (*ChainBranch) AddChatTemplate

func (cb *ChainBranch) AddChatTemplate(key string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *ChainBranch

AddChatTemplate adds a ChatTemplate node to the branch. e.g.

chatTemplate, err := prompt.FromMessages(schema.FString, &schema.Message{
	Role:    schema.System,
	Content: "You are acting as a {role}.",
})

cb.AddChatTemplate("chat_template_key_01", chatTemplate)

chatTemplate2, err := prompt.FromMessages(schema.FString, &schema.Message{
	Role:    schema.System,
	Content: "You are acting as a {role}, you are not allowed to chat in other topics.",
})

cb.AddChatTemplate("chat_template_key_02", chatTemplate2)

func (*ChainBranch) AddDocumentTransformer

func (cb *ChainBranch) AddDocumentTransformer(key string, node document.Transformer, opts ...GraphAddNodeOpt) *ChainBranch

AddDocumentTransformer adds a DocumentTransformer node to the branch. e.g.

markdownSplitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderSplitterConfig{})

cb.AddDocumentTransformer("document_transformer_node_key", markdownSplitter)

func (*ChainBranch) AddEmbedding

func (cb *ChainBranch) AddEmbedding(key string, node embedding.Embedder, opts ...GraphAddNodeOpt) *ChainBranch

AddEmbedding adds an Embedding node to the branch. e.g.

embeddingNode, err := openai.NewEmbedder(ctx, &openai.EmbeddingConfig{
	Model: "text-embedding-3-small",
})

cb.AddEmbedding("embedding_node_key", embeddingNode)

func (*ChainBranch) AddGraph

func (cb *ChainBranch) AddGraph(key string, node AnyGraph, opts ...GraphAddNodeOpt) *ChainBranch

AddGraph adds a generic Graph node to the branch. e.g.

graph := compose.NewGraph[string, string]()

cb.AddGraph("graph_node_key", graph)

func (*ChainBranch) AddIndexer

func (cb *ChainBranch) AddIndexer(key string, node indexer.Indexer, opts ...GraphAddNodeOpt) *ChainBranch

AddIndexer adds an Indexer node to the branch. e.g.

indexer, err := volc_vikingdb.NewIndexer(ctx, &volc_vikingdb.IndexerConfig{
	Collection: "my_collection",
})

cb.AddIndexer("indexer_node_key", indexer)

func (*ChainBranch) AddLambda

func (cb *ChainBranch) AddLambda(key string, node *Lambda, opts ...GraphAddNodeOpt) *ChainBranch

AddLambda adds a Lambda node to the branch. e.g.

lambdaFunc := func(ctx context.Context, in string) (out string, err error) {
	// logic to process the input
	return "processed_output", nil
}

cb.AddLambda("lambda_node_key", compose.InvokableLambda(lambdaFunc))

func (*ChainBranch) AddLoader

func (cb *ChainBranch) AddLoader(key string, node document.Loader, opts ...GraphAddNodeOpt) *ChainBranch

AddLoader adds a Loader node to the branch. e.g.

pdfParser, err := pdf.NewPDFParser()
loader, err := file.NewFileLoader(ctx, &file.FileLoaderConfig{
	Parser: pdfParser,
})

cb.AddLoader("loader_node_key", loader)

func (*ChainBranch) AddLoaderSplitter

func (cb *ChainBranch) AddLoaderSplitter(key string, node document.LoaderSplitter, opts ...GraphAddNodeOpt) *ChainBranch

AddLoaderSplitter adds a LoaderSplitter node to the branch. Deprecated: use AddLoader instead.

func (*ChainBranch) AddPassthrough

func (cb *ChainBranch) AddPassthrough(key string, opts ...GraphAddNodeOpt) *ChainBranch

AddPassthrough adds a Passthrough node to the branch. e.g.

cb.AddPassthrough("passthrough_node_key")

func (*ChainBranch) AddRetriever

func (cb *ChainBranch) AddRetriever(key string, node retriever.Retriever, opts ...GraphAddNodeOpt) *ChainBranch

AddRetriever adds a Retriever node to the branch. e.g.

retriever, err := volc_vikingdb.NewRetriever(ctx, &volc_vikingdb.RetrieverConfig{
	Collection: "my_collection",
})

cb.AddRetriever("retriever_node_key", retriever)

func (*ChainBranch) AddToolsNode

func (cb *ChainBranch) AddToolsNode(key string, node *ToolsNode, opts ...GraphAddNodeOpt) *ChainBranch

AddToolsNode adds a ToolsNode to the branch. e.g.

toolsNode, err := tools.NewToolNode(ctx, &tools.ToolsNodeConfig{
	Tools: []tools.Tool{...},
})

cb.AddToolsNode("tools_node_key", toolsNode)

type Collect

type Collect[I, O, TOption any] func(ctx context.Context,
	input *schema.StreamReader[I], opts ...TOption) (output O, err error)

Collect is the type of the collectable lambda function.

type CollectWOOpt

type CollectWOOpt[I, O any] func(ctx context.Context,
	input *schema.StreamReader[I]) (output O, err error)

CollectWOOpt is the type of the collectable lambda function without options.

type GenLocalState

type GenLocalState[S any] func(ctx context.Context) (state S)

GenLocalState is a function that generates the state.

type Graph

type Graph[I, O any] struct {
	// contains filtered or unexported fields
}

Graph is a generic graph that can be used to compose components.

  - I: the input type of graph compiled product
  - O: the output type of graph compiled product

func NewGraph

func NewGraph[I, O any](opts ...NewGraphOption) *Graph[I, O]

NewGraph creates a directed graph that can compose components, lambdas, chains, parallels etc., while providing flexible and multi-granular aspect governance capabilities.

  - I: the input type of graph compiled product
  - O: the output type of graph compiled product

To share state between nodes, use WithGenLocalState option:

type testState struct {
	UserInfo *UserInfo
	KVs      map[string]any
}

genStateFunc := func(ctx context.Context) *testState {
	return &testState{}
}

graph := compose.NewGraph[string, string](compose.WithGenLocalState(genStateFunc))

// you can use WithStatePreHandler and WithStatePostHandler to do something with state
graph.AddLambdaNode("node1", someLambda, compose.WithStatePreHandler(func(ctx context.Context, in string, state *testState) (string, error) {
	// do something with state
	return in, nil
}), compose.WithStatePostHandler(func(ctx context.Context, out string, state *testState) (string, error) {
	// do something with state
	return out, nil
}))

func (Graph) AddBranch

func (g Graph) AddBranch(startNode string, branch *GraphBranch) (err error)

AddBranch adds a branch to the graph. e.g.

condition := func(ctx context.Context, in string) (string, error) {
	return "next_node_key", nil
}
endNodes := map[string]bool{"path01": true, "path02": true}
branch := compose.NewGraphBranch(condition, endNodes)

graph.AddBranch("start_node_key", branch)

func (Graph) AddChatModelNode

func (g Graph) AddChatModelNode(key string, node model.ChatModel, opts ...GraphAddNodeOpt) error

AddChatModelNode adds a node that implements model.ChatModel. e.g.

chatModel, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
	Model: "gpt-4o",
})

graph.AddChatModelNode("chat_model_node_key", chatModel)

func (Graph) AddChatTemplateNode

func (g Graph) AddChatTemplateNode(key string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) error

AddChatTemplateNode adds a node that implements prompt.ChatTemplate. e.g.

chatTemplate, err := prompt.FromMessages(schema.FString, &schema.Message{
	Role:    schema.System,
	Content: "You are acting as a {role}.",
})

graph.AddChatTemplateNode("chat_template_node_key", chatTemplate)

func (Graph) AddDocumentTransformerNode

func (g Graph) AddDocumentTransformerNode(key string, node document.Transformer, opts ...GraphAddNodeOpt) error

AddDocumentTransformerNode adds a node that implements document.Transformer. e.g.

markdownSplitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderSplitterConfig{})

graph.AddDocumentTransformerNode("document_transformer_node_key", markdownSplitter)

func (Graph) AddEdge

func (g Graph) AddEdge(startNode, endNode string) (err error)

AddEdge adds an edge to the graph. An edge means a data flow from startNode to endNode. The previous node's output type must match the next node's input type. NOTE: startNode and endNode must have been added to the graph before adding the edge. e.g.

graph.AddPassthroughNode("start_node_key")
graph.AddPassthroughNode("end_node_key")

err := graph.AddEdge("start_node_key", "end_node_key")

func (Graph) AddEmbeddingNode

func (g Graph) AddEmbeddingNode(key string, node embedding.Embedder, opts ...GraphAddNodeOpt) error

AddEmbeddingNode adds a node that implements embedding.Embedder. e.g.

embeddingNode, err := openai.NewEmbedder(ctx, &openai.EmbeddingConfig{
	Model: "text-embedding-3-small",
})

graph.AddEmbeddingNode("embedding_node_key", embeddingNode)

func (Graph) AddGraphNode

func (g Graph) AddGraphNode(key string, node AnyGraph, opts ...GraphAddNodeOpt) error

AddGraphNode adds a Graph[I, O], Chain[I, O] or StateChain[I, O, S] as a node.

  - a Graph[I, O] comes from NewGraph[I, O]()
  - a Chain[I, O] comes from NewChain[I, O]()

func (Graph) AddIndexerNode

func (g Graph) AddIndexerNode(key string, node indexer.Indexer, opts ...GraphAddNodeOpt) error

AddIndexerNode adds a node that implements indexer.Indexer. e.g.

indexer, err := vikingdb.NewIndexer(ctx, &vikingdb.IndexerConfig{})

graph.AddIndexerNode("indexer_node_key", indexer)

func (Graph) AddLambdaNode

func (g Graph) AddLambdaNode(key string, node *Lambda, opts ...GraphAddNodeOpt) error

AddLambdaNode adds a node that implements at least one of Invoke[I, O], Stream[I, O], Collect[I, O], Transform[I, O]. Because Go does not support generic methods, generic functions are used to build a Lambda that runs as a Runnable[I, O]:

  - for Invoke[I, O], use compose.InvokableLambda()
  - for Stream[I, O], use compose.StreamableLambda()
  - for Collect[I, O], use compose.CollectableLambda()
  - for Transform[I, O], use compose.TransformableLambda()
  - for arbitrary combinations of the 4 kinds of lambda, use compose.AnyLambda()

func (Graph) AddLoaderNode

func (g Graph) AddLoaderNode(key string, node document.Loader, opts ...GraphAddNodeOpt) error

AddLoaderNode adds a node that implements document.Loader. e.g.

loader, err := file.NewFileLoader(ctx, &file.FileLoaderConfig{})

graph.AddLoaderNode("loader_node_key", loader)

func (Graph) AddLoaderSplitterNode deprecated

func (g Graph) AddLoaderSplitterNode(key string, node document.LoaderSplitter, opts ...GraphAddNodeOpt) error

Deprecated: use AddLoaderNode instead.

func (Graph) AddPassthroughNode

func (g Graph) AddPassthroughNode(key string, opts ...GraphAddNodeOpt) error

AddPassthroughNode adds a passthrough node to the graph. It is mostly used in the Pregel mode of a graph. e.g.

graph.AddPassthroughNode("passthrough_node_key")

func (Graph) AddRetrieverNode

func (g Graph) AddRetrieverNode(key string, node retriever.Retriever, opts ...GraphAddNodeOpt) error

AddRetrieverNode adds a node that implements retriever.Retriever. e.g.

retriever, err := vikingdb.NewRetriever(ctx, &vikingdb.RetrieverConfig{})

graph.AddRetrieverNode("retriever_node_key", retriever)

func (Graph) AddToolsNode

func (g Graph) AddToolsNode(key string, node *ToolsNode, opts ...GraphAddNodeOpt) error

AddToolsNode adds a node that implements tools.ToolsNode. e.g.

toolsNode, err := tools.NewToolNode(ctx, &tools.ToolsNodeConfig{})

graph.AddToolsNode("tools_node_key", toolsNode)

func (*Graph[I, O]) Compile

func (g *Graph[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)

Compile takes the raw graph and compiles it into a form ready to be run. e.g.

graph := compose.NewGraph[string, string]()

runnable, err := graph.Compile(ctx, compose.WithGraphName("my_graph"))
if err != nil {...}

runnable.Invoke(ctx, "input") // invoke
runnable.Stream(ctx, "input") // stream
runnable.Collect(ctx, inputReader) // collect
runnable.Transform(ctx, inputReader) // transform

func (Graph) GetType

func (g Graph) GetType() string

type GraphAddNodeOpt

type GraphAddNodeOpt func(o *graphAddNodeOpts)

GraphAddNodeOpt is a functional option type for adding a node to a graph. e.g.

graph.AddLambdaNode("node_name", lambda, compose.WithInputKey("input_key"), compose.WithOutputKey("output_key"))
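The functional option pattern behind this type can be sketched in isolation. The names below are hypothetical: `nodeOpts` stands in for the unexported graphAddNodeOpts, whose real fields are not shown in this documentation, and the default name is invented for the example.

```go
package main

import "fmt"

// nodeOpts is a hypothetical options struct; the fields are assumptions.
type nodeOpts struct {
	name      string
	inputKey  string
	outputKey string
}

// nodeOpt has the same shape as GraphAddNodeOpt: a function that mutates options.
type nodeOpt func(o *nodeOpts)

func withName(n string) nodeOpt      { return func(o *nodeOpts) { o.name = n } }
func withInputKey(k string) nodeOpt  { return func(o *nodeOpts) { o.inputKey = k } }
func withOutputKey(k string) nodeOpt { return func(o *nodeOpts) { o.outputKey = k } }

// applyOpts starts from defaults and applies each option in order,
// so a later option overrides an earlier one that sets the same field.
func applyOpts(opts ...nodeOpt) nodeOpts {
	o := nodeOpts{name: "unnamed"}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := applyOpts(withName("node_name"), withInputKey("input_key"))
	fmt.Printf("%+v\n", o)
}
```

Because options are plain functions, new ones can be added without changing the signature of the method that accepts them.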

func WithGraphCompileOptions

func WithGraphCompileOptions(opts ...GraphCompileOption) GraphAddNodeOpt

WithGraphCompileOptions sets compile options for a node that is an AnyGraph. e.g.

graph.AddGraphNode("node_name", node, compose.WithGraphCompileOptions(compose.WithGraphName("my_sub_graph")))

func WithInputKey

func WithInputKey(k string) GraphAddNodeOpt

WithInputKey sets the input key of the node. This changes the input value of the node: for example, if the previous node's output is map[string]any{"key01": "value01"} and the current node's input key is "key01", then the current node's input value will be "value01".

func WithNodeKey

func WithNodeKey(key string) GraphAddNodeOpt

WithNodeKey sets the node key, which is used to identify the node in the chain. Only for use in Chain/StateChain.

func WithNodeName

func WithNodeName(n string) GraphAddNodeOpt

WithNodeName sets the name of the node.

func WithOutputKey

func WithOutputKey(k string) GraphAddNodeOpt

WithOutputKey sets the output key of the node. This changes the output value of the node: for example, if the current node's output key is "key01", then the node's output value will be map[string]any{"key01": value}.
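The value mapping described for WithInputKey and WithOutputKey can be modeled with two small helpers. This sketch is independent of the compose package: `pickInput` and `wrapOutput` are hypothetical, and returning an error for a missing input key is an assumption.

```go
package main

import "fmt"

// pickInput models the input key: it extracts one entry from the
// upstream node's map output and hands it to the current node.
func pickInput(upstream map[string]any, key string) (any, error) {
	v, ok := upstream[key]
	if !ok {
		return nil, fmt.Errorf("input key %q not found in upstream output", key)
	}
	return v, nil
}

// wrapOutput models the output key: it wraps the node's output value
// in a map under the given key.
func wrapOutput(value any, key string) map[string]any {
	return map[string]any{key: value}
}

func main() {
	upstream := map[string]any{"key01": "value01"}

	in, err := pickInput(upstream, "key01")
	if err != nil {
		panic(err)
	}
	fmt.Println(in) // value01

	fmt.Println(wrapOutput(in, "key01")) // map[key01:value01]
}
```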

func WithStatePostHandler

func WithStatePostHandler[O, S any](post StatePostHandler[O, S]) GraphAddNodeOpt

WithStatePostHandler modifies the node's output of type O according to state S and the output, or stores output information into the state. It is thread-safe. Notice: this option requires the Graph to be created with the WithGenLocalState option.

  - O: output type of the Node, like ChatModel, Lambda, Retriever etc.
  - S: state type defined in WithGenLocalState

func WithStatePreHandler

func WithStatePreHandler[I, S any](pre StatePreHandler[I, S]) GraphAddNodeOpt

WithStatePreHandler modifies the node's input of type I according to state S and the input, or stores input information into the state. It is thread-safe. Notice: this option requires the Graph to be created with the WithGenLocalState option.

  - I: input type of the Node, like ChatModel, Lambda, Retriever etc.
  - S: state type defined in WithGenLocalState

func WithStreamStatePostHandler

func WithStreamStatePostHandler[O, S any](post StreamStatePostHandler[O, S]) GraphAddNodeOpt

WithStreamStatePostHandler modifies the node's streaming output of type O according to state S and the output, or stores output information into the state. It is thread-safe. Notice: this option requires the Graph to be created with the WithGenLocalState option.

When to use: when the current node's output is an actual stream, and you want the downstream node's input to remain an actual stream after the state post handler.

Caution: while StreamStatePostHandler is thread safe, modifying state within your own goroutine is NOT.

  - O: output type of the Node, like ChatModel, Lambda, Retriever etc.
  - S: state type defined in WithGenLocalState

func WithStreamStatePreHandler

func WithStreamStatePreHandler[I, S any](pre StreamStatePreHandler[I, S]) GraphAddNodeOpt

WithStreamStatePreHandler modifies the node's streaming input of type I according to state S and the input, or stores input information into the state. It is thread-safe. Notice: this option requires the Graph to be created with the WithGenLocalState option.

When to use: when the upstream node's output is an actual stream, and you want the current node's input to remain an actual stream after the state pre handler.

Caution: while StreamStatePreHandler is thread safe, modifying state within your own goroutine is NOT.

  - I: input type of the Node, like ChatModel, Lambda, Retriever etc.
  - S: state type defined in WithGenLocalState

type GraphBranch

type GraphBranch struct {
	// contains filtered or unexported fields
}

GraphBranch is the branch type for the graph. It is used to determine the next node based on the condition.

func NewGraphBranch

func NewGraphBranch[T any](condition GraphBranchCondition[T], endNodes map[string]bool) *GraphBranch

NewGraphBranch creates a new graph branch. It is used to determine the next node based on the condition. e.g.

condition := func(ctx context.Context, in string) (string, error) {
	// logic to determine the next node
	return "next_node_key", nil
}
endNodes := map[string]bool{"path01": true, "path02": true}
branch := compose.NewGraphBranch(condition, endNodes)

graph.AddBranch("key_of_node_before_branch", branch)

func NewStreamGraphBranch

func NewStreamGraphBranch[T any](condition StreamGraphBranchCondition[T],
	endNodes map[string]bool) *GraphBranch

NewStreamGraphBranch creates a new stream graph branch. It is used to determine the next node based on the condition of stream input. e.g.

condition := func(ctx context.Context, in *schema.StreamReader[string]) (string, error) {
	// logic to determine the next node.
	// to use the feature of stream, you can use the first chunk to determine the next node.
	return "next_node_key", nil
}
endNodes := map[string]bool{"path01": true, "path02": true}
branch := compose.NewStreamGraphBranch(condition, endNodes)

graph.AddBranch("key_of_node_before_branch", branch)

func (*GraphBranch) GetEndNode

func (gb *GraphBranch) GetEndNode() map[string]bool

GetEndNode returns all end nodes of the branch.

type GraphBranchCondition

type GraphBranchCondition[T any] func(ctx context.Context, in T) (endNode string, err error)

GraphBranchCondition is the condition type for the branch.

type GraphCompileCallback

type GraphCompileCallback interface {
	OnFinish(ctx context.Context, info *GraphInfo)
}

GraphCompileCallback is the callback which will be called when graph compilation finishes.

type GraphCompileOption

type GraphCompileOption func(*graphCompileOptions)

GraphCompileOption is an option for compiling an AnyGraph.

func WithGraphCompileCallbacks

func WithGraphCompileCallbacks(cbs ...GraphCompileCallback) GraphCompileOption

WithGraphCompileCallbacks sets callbacks for graph compilation.

func WithGraphName

func WithGraphName(graphName string) GraphCompileOption

func WithMaxRunSteps

func WithMaxRunSteps(maxSteps int) GraphCompileOption

func WithNodeTriggerMode

func WithNodeTriggerMode(triggerMode NodeTriggerMode) GraphCompileOption

WithNodeTriggerMode sets the node trigger mode for the graph. The node trigger mode affects the execution order and result of certain graphs, such as those whose parallel branches contain different numbers of nodes.

type GraphInfo

type GraphInfo struct {
	CompileOptions        []GraphCompileOption
	Nodes                 map[string]GraphNodeInfo // node key -> node info
	Edges                 map[string][]string      // edge start node key -> edge end node key
	Branches              map[string][]GraphBranch // branch start node key -> branch
	InputType, OutputType reflect.Type
	Name                  string

	GenStateFn func(context.Context) any
}

GraphInfo is the information that end users pass in when compiling a graph. It is passed to the compile callback so that users can get the node info and instances, for example when all details of the graph are needed for observation.

type GraphNodeInfo

type GraphNodeInfo struct {
	Component             components.Component
	Instance              any
	GraphAddNodeOpts      []GraphAddNodeOpt
	InputType, OutputType reflect.Type // mainly for lambda, whose input and output types cannot be inferred by component type
	Name                  string
	InputKey, OutputKey   string
	GraphInfo             *GraphInfo
}

GraphNodeInfo holds the information that end users pass in when adding a node to a graph.

type Invoke

type Invoke[I, O, TOption any] func(ctx context.Context, input I, opts ...TOption) (output O, err error)

Invoke is the type of the invokable lambda function.

type InvokeWOOpt

type InvokeWOOpt[I, O any] func(ctx context.Context, input I) (output O, err error)

InvokeWOOpt is the type of the invokable lambda function without options.

type Lambda

type Lambda struct {
	// contains filtered or unexported fields
}

Lambda is the node that wraps a user-provided lambda function. It can be used as a node in a Graph or a Chain (including Parallel and Branch). Create a Lambda with AnyLambda/InvokableLambda/StreamableLambda/CollectableLambda/TransformableLambda. eg.

lambda := compose.InvokableLambda(func(ctx context.Context, input string) (output string, err error) {
	return input, nil
})

func AnyLambda

func AnyLambda[I, O, TOption any](i Invoke[I, O, TOption], s Stream[I, O, TOption],
	c Collect[I, O, TOption], t Transform[I, O, TOption], opts ...LambdaOpt) (*Lambda, error)

AnyLambda creates a Lambda from any combination of the four lambda functions. Implement one or more of them and pass nil for the rest. eg.

invokeFunc := func(ctx context.Context, input string, opts ...myOption) (output string, err error) {
	// ...
}
streamFunc := func(ctx context.Context, input string, opts ...myOption) (output *schema.StreamReader[string], err error) {
	// ...
}

lambda, err := compose.AnyLambda(invokeFunc, streamFunc, nil, nil)

func CollectableLambda

func CollectableLambda[I, O any](c CollectWOOpt[I, O], opts ...LambdaOpt) *Lambda

CollectableLambda creates a Lambda with collectable lambda function without options.

func CollectableLambdaWithOption

func CollectableLambdaWithOption[I, O, TOption any](c Collect[I, O, TOption], opts ...LambdaOpt) *Lambda

CollectableLambdaWithOption creates a Lambda with collectable lambda function and options.

func InvokableLambda

func InvokableLambda[I, O any](i InvokeWOOpt[I, O], opts ...LambdaOpt) *Lambda

InvokableLambda creates a Lambda with invokable lambda function without options.

func InvokableLambdaWithOption

func InvokableLambdaWithOption[I, O, TOption any](i Invoke[I, O, TOption], opts ...LambdaOpt) *Lambda

InvokableLambdaWithOption creates a Lambda with invokable lambda function and options.

func MessageParser

func MessageParser[T any](p schema.MessageParser[T], opts ...LambdaOpt) *Lambda

MessageParser creates a lambda that parses a message into an object T, usually used after a chatmodel. usage:

parser := schema.NewMessageJSONParser[MyStruct](&schema.MessageJSONParseConfig{
	ParseFrom: schema.MessageParseFromContent,
})
parserLambda := MessageParser(parser)

chain := NewChain[*schema.Message, MyStruct]()
chain.AppendChatModel(chatModel)
chain.AppendLambda(parserLambda)

r, err := chain.Compile(context.Background())

// parsed is a MyStruct object
parsed, err := r.Invoke(context.Background(), &schema.Message{
	Role:    schema.User,
	Content: "return a json string for my struct",
})

func StreamableLambda

func StreamableLambda[I, O any](s StreamWOOpt[I, O], opts ...LambdaOpt) *Lambda

StreamableLambda creates a Lambda with streamable lambda function without options.

func StreamableLambdaWithOption

func StreamableLambdaWithOption[I, O, TOption any](s Stream[I, O, TOption], opts ...LambdaOpt) *Lambda

StreamableLambdaWithOption creates a Lambda with streamable lambda function and options.

func ToList

func ToList[I any](opts ...LambdaOpt) *Lambda

ToList creates a Lambda that converts input I to a []I. It's useful when you want to convert a single input to a list of inputs. eg.

lambda := compose.ToList[*schema.Message]()
chain := compose.NewChain[[]*schema.Message, []*schema.Message]()

chain.AppendChatModel(chatModel) // chatModel returns *schema.Message, but we need []*schema.Message
chain.AppendLambda(lambda)       // convert *schema.Message to []*schema.Message
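
Conceptually, the conversion amounts to wrapping the value in a one-element slice. The toList function below is a hypothetical stand-in for illustration, not the package's implementation.

```go
package main

import "fmt"

// toList is a hypothetical stand-in for the conversion that ToList wraps in a
// Lambda: a single value becomes a slice with exactly one element.
func toList[I any](in I) []I {
	return []I{in}
}

func main() {
	fmt.Println(toList("hello")) // [hello]
	fmt.Println(len(toList(42))) // 1
}
```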

func TransformableLambda

func TransformableLambda[I, O any](t TransformWOOpts[I, O], opts ...LambdaOpt) *Lambda

TransformableLambda creates a Lambda with transformable lambda function without options.

func TransformableLambdaWithOption

func TransformableLambdaWithOption[I, O, TOption any](t Transform[I, O, TOption], opts ...LambdaOpt) *Lambda

TransformableLambdaWithOption creates a Lambda with transformable lambda function and options.

type LambdaOpt

type LambdaOpt func(o *lambdaOpts)

LambdaOpt is the option for creating a Lambda.

func WithLambdaCallbackEnable

func WithLambdaCallbackEnable(y bool) LambdaOpt

WithLambdaCallbackEnable enables the callback aspect of the lambda function.

func WithLambdaType

func WithLambdaType(t string) LambdaOpt

WithLambdaType sets the type of the lambda function.
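
LambdaOpt follows the functional options pattern. The sketch below shows the mechanics with a made-up lambdaOpts struct. The real struct is unexported and its fields are not shown on this page, so the field names and the applyOpts helper are assumptions.

```go
package main

import "fmt"

// lambdaOpts is a hypothetical stand-in for the unexported options struct.
type lambdaOpts struct {
	enableCallback bool
	lambdaType     string
}

// LambdaOpt mirrors the documented option type.
type LambdaOpt func(o *lambdaOpts)

// WithLambdaCallbackEnable and WithLambdaType are local re-creations that
// only set fields on the stand-in struct.
func WithLambdaCallbackEnable(y bool) LambdaOpt {
	return func(o *lambdaOpts) { o.enableCallback = y }
}

func WithLambdaType(t string) LambdaOpt {
	return func(o *lambdaOpts) { o.lambdaType = t }
}

// applyOpts is a hypothetical helper that folds options into a struct, in order.
func applyOpts(opts ...LambdaOpt) lambdaOpts {
	var o lambdaOpts
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := applyOpts(WithLambdaType("Tokenizer"), WithLambdaCallbackEnable(true))
	fmt.Printf("%+v\n", o) // {enableCallback:true lambdaType:Tokenizer}
}
```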

type NewGraphOption added in v0.3.1

type NewGraphOption func(ngo *newGraphOptions)

func WithGenLocalState added in v0.3.1

func WithGenLocalState[S any](gls GenLocalState[S]) NewGraphOption

type NodePath added in v0.3.4

type NodePath struct {
	// contains filtered or unexported fields
}

func NewNodePath added in v0.3.4

func NewNodePath(path ...string) *NodePath

type NodeTriggerMode

type NodeTriggerMode string

NodeTriggerMode controls the triggering mode of graph nodes.

const (
	// AnyPredecessor means that the current node will be triggered as long as any of its predecessor nodes has finished running.
	// Note that the actual implementation organizes node execution in batches.
	// In this context, 'any predecessor finishes' means that the other nodes of the same batch need to finish too.
	AnyPredecessor NodeTriggerMode = "any_predecessor"
	// AllPredecessor means that the current node will only be triggered when all of its predecessor nodes have finished running.
	AllPredecessor NodeTriggerMode = "all_predecessor"
)
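
The difference between the two modes can be pictured with a simplified readiness check. This is only a sketch: shouldTrigger is a hypothetical helper, and it deliberately ignores the batching detail described in the AnyPredecessor comment.

```go
package main

import "fmt"

// Local copies of the documented type and constants.
type NodeTriggerMode string

const (
	AnyPredecessor NodeTriggerMode = "any_predecessor"
	AllPredecessor NodeTriggerMode = "all_predecessor"
)

// shouldTrigger is a simplified, hypothetical readiness check for one node,
// given its predecessor keys and the set of nodes that have finished.
func shouldTrigger(mode NodeTriggerMode, preds []string, finished map[string]bool) bool {
	done := 0
	for _, p := range preds {
		if finished[p] {
			done++
		}
	}
	switch mode {
	case AllPredecessor:
		return len(preds) > 0 && done == len(preds)
	default: // AnyPredecessor
		return done > 0
	}
}

func main() {
	preds := []string{"a", "b"}
	finished := map[string]bool{"a": true}
	fmt.Println(shouldTrigger(AnyPredecessor, preds, finished)) // true
	fmt.Println(shouldTrigger(AllPredecessor, preds, finished)) // false
}
```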

type Option

type Option struct {
	// contains filtered or unexported fields
}

Option is a functional option type for calling a graph.

func WithCallbacks

func WithCallbacks(cbs ...callbacks.Handler) Option

WithCallbacks sets callback handlers for all components in a single call. e.g.

runnable.Invoke(ctx, "input", compose.WithCallbacks(&myCallbacks{}))

func WithChatModelOption

func WithChatModelOption(opts ...model.Option) Option

WithChatModelOption is a functional option type for chat model component. e.g.

chatModelOption := compose.WithChatModelOption(model.WithTemperature(0.7))
runnable.Invoke(ctx, "input", chatModelOption)

func WithChatTemplateOption

func WithChatTemplateOption(opts ...prompt.Option) Option

WithChatTemplateOption is a functional option type for chat template component.

func WithDocumentTransformerOption

func WithDocumentTransformerOption(opts ...document.TransformerOption) Option

WithDocumentTransformerOption is a functional option type for document transformer component.

func WithEmbeddingOption

func WithEmbeddingOption(opts ...embedding.Option) Option

WithEmbeddingOption is a functional option type for embedding component. e.g.

embeddingOption := compose.WithEmbeddingOption(embedding.WithModel("text-embedding-3-small"))
runnable.Invoke(ctx, "input", embeddingOption)

func WithIndexerOption

func WithIndexerOption(opts ...indexer.Option) Option

WithIndexerOption is a functional option type for indexer component. e.g.

indexerOption := compose.WithIndexerOption(indexer.WithSubIndexes([]string{"my_sub_index"}))
runnable.Invoke(ctx, "input", indexerOption)

func WithLambdaOption

func WithLambdaOption(opts ...any) Option

WithLambdaOption is a functional option type for lambda component.

func WithLoaderOption

func WithLoaderOption(opts ...document.LoaderOption) Option

WithLoaderOption is a functional option type for loader component. e.g.

loaderOption := compose.WithLoaderOption(document.WithCollection("my_collection"))
runnable.Invoke(ctx, "input", loaderOption)

func WithLoaderSplitterOption deprecated

func WithLoaderSplitterOption(opts ...document.LoaderSplitterOption) Option

Deprecated: use WithLoaderOption instead.

func WithRetrieverOption

func WithRetrieverOption(opts ...retriever.Option) Option

WithRetrieverOption is a functional option type for retriever component. e.g.

retrieverOption := compose.WithRetrieverOption(retriever.WithIndex("my_index"))
runnable.Invoke(ctx, "input", retrieverOption)

func WithRuntimeMaxSteps

func WithRuntimeMaxSteps(maxSteps int) Option

WithRuntimeMaxSteps sets the maximum number of steps for the graph runtime. e.g.

runnable.Invoke(ctx, "input", compose.WithRuntimeMaxSteps(20))

func WithToolsNodeOption

func WithToolsNodeOption(opts ...ToolsNodeOption) Option

WithToolsNodeOption is a functional option type for tools node component.

func (Option) DesignateNode

func (o Option) DesignateNode(key ...string) Option

DesignateNode sets the key of the node(s) to which the option will be applied. Note: it is only effective at the top-level graph. e.g.

embeddingOption := compose.WithEmbeddingOption(embedding.WithModel("text-embedding-3-small"))
runnable.Invoke(ctx, "input", embeddingOption.DesignateNode("my_embedding_node"))

func (Option) DesignateNodeWithPath added in v0.3.4

func (o Option) DesignateNodeWithPath(path ...*NodePath) Option

DesignateNodeWithPath sets the path of the node(s) to which the option will be applied. You can make the option take effect in a subgraph by specifying the key of the subgraph node first. e.g. DesignateNodeWithPath(NewNodePath("sub graph node key", "node key within sub graph"))

type Parallel

type Parallel struct {
	// contains filtered or unexported fields
}

Parallel runs multiple nodes in parallel.

Use `NewParallel()` to create a new Parallel. Example:

parallel := NewParallel()
parallel.AddChatModel("output_key01", chat01)
parallel.AddChatModel("output_key02", chat02)

chain := NewChain[any,any]()
chain.AppendParallel(parallel)

func NewParallel

func NewParallel() *Parallel

NewParallel creates a new Parallel. It is useful when you want to run multiple nodes in parallel in a chain.

func (*Parallel) AddChatModel

func (p *Parallel) AddChatModel(outputKey string, node model.ChatModel, opts ...GraphAddNodeOpt) *Parallel

AddChatModel adds a chat model to the parallel. eg.

chatModel01, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
	Model: "gpt-4o",
})

chatModel02, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
	Model: "gpt-4o",
})

p.AddChatModel("output_key01", chatModel01)
p.AddChatModel("output_key02", chatModel02)

func (*Parallel) AddChatTemplate

func (p *Parallel) AddChatTemplate(outputKey string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *Parallel

AddChatTemplate adds a chat template to the parallel. eg.

chatTemplate01, err := prompt.FromMessages(schema.FString, &schema.Message{
	Role:    schema.System,
	Content: "You are acting as a {role}.",
})

p.AddChatTemplate("output_key01", chatTemplate01)

func (*Parallel) AddDocumentTransformer

func (p *Parallel) AddDocumentTransformer(outputKey string, node document.Transformer, opts ...GraphAddNodeOpt) *Parallel

AddDocumentTransformer adds a document transformer node to the parallel. eg.

markdownSplitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderSplitterConfig{})

p.AddDocumentTransformer("output_key01", markdownSplitter)

func (*Parallel) AddEmbedding

func (p *Parallel) AddEmbedding(outputKey string, node embedding.Embedder, opts ...GraphAddNodeOpt) *Parallel

AddEmbedding adds an embedding node to the parallel. eg.

embeddingNode, err := openai.NewEmbedder(ctx, &openai.EmbeddingConfig{
	Model: "text-embedding-3-small",
})

p.AddEmbedding("output_key01", embeddingNode)

func (*Parallel) AddGraph

func (p *Parallel) AddGraph(outputKey string, node AnyGraph, opts ...GraphAddNodeOpt) *Parallel

AddGraph adds a graph node to the parallel. It is useful when you want to use a graph or a chain as a node in the parallel. eg.

graph := compose.NewChain[any, any]()

p.AddGraph("output_key01", graph)

func (*Parallel) AddIndexer

func (p *Parallel) AddIndexer(outputKey string, node indexer.Indexer, opts ...GraphAddNodeOpt) *Parallel

AddIndexer adds an indexer node to the parallel. eg.

indexer, err := volc_vikingdb.NewIndexer(ctx, &volc_vikingdb.IndexerConfig{
	Collection: "my_collection",
})

p.AddIndexer("output_key01", indexer)

func (*Parallel) AddLambda

func (p *Parallel) AddLambda(outputKey string, node *Lambda, opts ...GraphAddNodeOpt) *Parallel

AddLambda adds a lambda node to the parallel. eg.

lambdaFunc := func(ctx context.Context, input *schema.Message) ([]*schema.Message, error) {
	return []*schema.Message{input}, nil
}

p.AddLambda("output_key01", compose.InvokableLambda(lambdaFunc))

func (*Parallel) AddLoader

func (p *Parallel) AddLoader(outputKey string, node document.Loader, opts ...GraphAddNodeOpt) *Parallel

AddLoader adds a loader node to the parallel. eg.

loader, err := file.NewLoader(ctx, &file.LoaderConfig{})

p.AddLoader("output_key01", loader)

func (*Parallel) AddLoaderSplitter

func (p *Parallel) AddLoaderSplitter(outputKey string, node document.LoaderSplitter, opts ...GraphAddNodeOpt) *Parallel

AddLoaderSplitter adds a loader splitter node to the parallel. Deprecated: use AddLoader instead.

func (*Parallel) AddPassthrough

func (p *Parallel) AddPassthrough(outputKey string, opts ...GraphAddNodeOpt) *Parallel

AddPassthrough adds a passthrough node to the parallel. eg.

p.AddPassthrough("output_key01")

func (*Parallel) AddRetriever

func (p *Parallel) AddRetriever(outputKey string, node retriever.Retriever, opts ...GraphAddNodeOpt) *Parallel

AddRetriever adds a retriever node to the parallel. eg.

retriever, err := vikingdb.NewRetriever(ctx, &vikingdb.RetrieverConfig{})

p.AddRetriever("output_key01", retriever)

func (*Parallel) AddToolsNode

func (p *Parallel) AddToolsNode(outputKey string, node *ToolsNode, opts ...GraphAddNodeOpt) *Parallel

AddToolsNode adds a tools node to the parallel. eg.

toolsNode, err := compose.NewToolNode(ctx, &compose.ToolsNodeConfig{
	Tools: []tool.BaseTool{...},
})

p.AddToolsNode("output_key01", toolsNode)

type Runnable

type Runnable[I, O any] interface {
	Invoke(ctx context.Context, input I, opts ...Option) (output O, err error)
	Stream(ctx context.Context, input I, opts ...Option) (output *schema.StreamReader[O], err error)
	Collect(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output O, err error)
	Transform(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output *schema.StreamReader[O], err error)
}

Runnable is the interface for an executable object. A Graph or Chain can be compiled into a Runnable. Runnable is the core concept of eino: it provides fallback compatibility across the four data flow patterns and can automatically connect components that implement only some of the methods. eg, if a component only implements the Stream() method, you can still call Invoke(), and the stream output is converted into an invoke output.

type StatePostHandler

type StatePostHandler[O, S any] func(ctx context.Context, out O, state S) (O, error)

StatePostHandler is a function that is called after the node is executed. Notice: if the caller uses Stream together with a StatePostHandler, the StatePostHandler reads all stream chunks and merges them into a single object.

type StatePreHandler

type StatePreHandler[I, S any] func(ctx context.Context, in I, state S) (I, error)

StatePreHandler is a function that is called before the node is executed. Notice: if the caller uses Stream together with a StatePreHandler, the StatePreHandler reads all stream chunks and merges them into a single object.

type Stream

type Stream[I, O, TOption any] func(ctx context.Context,
	input I, opts ...TOption) (output *schema.StreamReader[O], err error)

Stream is the type of the streamable lambda function.

type StreamGraphBranchCondition

type StreamGraphBranchCondition[T any] func(ctx context.Context, in *schema.StreamReader[T]) (endNode string, err error)

StreamGraphBranchCondition is the condition type for the stream branch.

type StreamStatePostHandler

type StreamStatePostHandler[O, S any] func(ctx context.Context, out *schema.StreamReader[O], state S) (*schema.StreamReader[O], error)

StreamStatePostHandler is a function that is called after the node is executed with stream input and output.

type StreamStatePreHandler

type StreamStatePreHandler[I, S any] func(ctx context.Context, in *schema.StreamReader[I], state S) (*schema.StreamReader[I], error)

StreamStatePreHandler is a function that is called before the node is executed with stream input and output.

type StreamWOOpt

type StreamWOOpt[I, O any] func(ctx context.Context,
	input I) (output *schema.StreamReader[O], err error)

StreamWOOpt is the type of the streamable lambda function without options.

type ToolsNode

type ToolsNode struct {
	// contains filtered or unexported fields
}

ToolsNode is a node that can run tools in a graph. Its interface as a graph node is as below:

Invoke(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) ([]*schema.Message, error)
Stream(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) (*schema.StreamReader[[]*schema.Message], error)

func NewToolNode

func NewToolNode(ctx context.Context, conf *ToolsNodeConfig) (*ToolsNode, error)

NewToolNode creates a new ToolsNode. eg.

conf := &ToolsNodeConfig{
	Tools: []tool.BaseTool{invokableTool1, streamableTool2},
}
toolsNode, err := NewToolNode(ctx, conf)

func (*ToolsNode) GetType

func (tn *ToolsNode) GetType() string

func (*ToolsNode) Invoke

func (tn *ToolsNode) Invoke(ctx context.Context, input *schema.Message,
	opts ...ToolsNodeOption) ([]*schema.Message, error)

Invoke calls the tools and collects the results of invokable tools. The tools run in parallel if there are multiple tool calls in the input message.

func (*ToolsNode) Stream

func (tn *ToolsNode) Stream(ctx context.Context, input *schema.Message,
	opts ...ToolsNodeOption) (*schema.StreamReader[[]*schema.Message], error)

Stream calls the tools and collects the results of stream readers. The tools run in parallel if there are multiple tool calls in the input message.

type ToolsNodeConfig

type ToolsNodeConfig struct {
	Tools []tool.BaseTool
}

ToolsNodeConfig is the config for ToolsNode. It requires a list of tools. Tools are BaseTool but must implement InvokableTool or StreamableTool.

type ToolsNodeOption

type ToolsNodeOption func(o *toolsNodeOptions)

ToolsNodeOption is the option func type for ToolsNode.

func WithToolOption

func WithToolOption(opts ...tool.Option) ToolsNodeOption

WithToolOption adds tool options to the ToolsNode.

type Transform

type Transform[I, O, TOption any] func(ctx context.Context,
	input *schema.StreamReader[I], opts ...TOption) (output *schema.StreamReader[O], err error)

Transform is the type of the transformable lambda function.

type TransformWOOpts

type TransformWOOpts[I, O any] func(ctx context.Context,
	input *schema.StreamReader[I]) (output *schema.StreamReader[O], err error)

TransformWOOpts is the type of the transformable lambda function without options.
