Documentation
¶
Index ¶
- Constants
- Variables
- func GetState[S any](ctx context.Context) (S, error)
- func InitGraphCompileCallbacks(cbs []GraphCompileCallback)
- func RegisterStreamChunkConcatFunc[T any](fn func([]T) (T, error))
- type AnyGraph
- type Chain
- func (c *Chain[I, O]) AppendBranch(b *ChainBranch) *Chain[I, O]
- func (c *Chain[I, O]) AppendChatModel(node model.ChatModel, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendChatTemplate(node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendDocumentTransformer(node document.Transformer, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendEmbedding(node embedding.Embedder, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendGraph(node AnyGraph, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendIndexer(node indexer.Indexer, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendLambda(node *Lambda, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendLoader(node document.Loader, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendLoaderSplitter(node document.LoaderSplitter, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendParallel(p *Parallel) *Chain[I, O]
- func (c *Chain[I, O]) AppendPassthrough(opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendRetriever(node retriever.Retriever, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) AppendToolsNode(node *ToolsNode, opts ...GraphAddNodeOpt) *Chain[I, O]
- func (c *Chain[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)
- type ChainBranch
- func (cb *ChainBranch) AddChatModel(key string, node model.ChatModel, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddChatTemplate(key string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddDocumentTransformer(key string, node document.Transformer, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddEmbedding(key string, node embedding.Embedder, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddGraph(key string, node AnyGraph, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddIndexer(key string, node indexer.Indexer, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddLambda(key string, node *Lambda, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddLoader(key string, node document.Loader, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddLoaderSplitter(key string, node document.LoaderSplitter, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddPassthrough(key string, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddRetriever(key string, node retriever.Retriever, opts ...GraphAddNodeOpt) *ChainBranch
- func (cb *ChainBranch) AddToolsNode(key string, node *ToolsNode, opts ...GraphAddNodeOpt) *ChainBranch
- type Collect
- type CollectWOOpt
- type GenLocalState
- type Graph
- func (g Graph) AddBranch(startNode string, branch *GraphBranch) (err error)
- func (g Graph) AddChatModelNode(key string, node model.ChatModel, opts ...GraphAddNodeOpt) error
- func (g Graph) AddChatTemplateNode(key string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) error
- func (g Graph) AddDocumentTransformerNode(key string, node document.Transformer, opts ...GraphAddNodeOpt) error
- func (g Graph) AddEdge(startNode, endNode string) (err error)
- func (g Graph) AddEmbeddingNode(key string, node embedding.Embedder, opts ...GraphAddNodeOpt) error
- func (g Graph) AddGraphNode(key string, node AnyGraph, opts ...GraphAddNodeOpt) error
- func (g Graph) AddIndexerNode(key string, node indexer.Indexer, opts ...GraphAddNodeOpt) error
- func (g Graph) AddLambdaNode(key string, node *Lambda, opts ...GraphAddNodeOpt) error
- func (g Graph) AddLoaderNode(key string, node document.Loader, opts ...GraphAddNodeOpt) error
- func (g Graph) AddLoaderSplitterNode(key string, node document.LoaderSplitter, opts ...GraphAddNodeOpt) error (deprecated)
- func (g Graph) AddPassthroughNode(key string, opts ...GraphAddNodeOpt) error
- func (g Graph) AddRetrieverNode(key string, node retriever.Retriever, opts ...GraphAddNodeOpt) error
- func (g Graph) AddToolsNode(key string, node *ToolsNode, opts ...GraphAddNodeOpt) error
- func (g *Graph[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)
- func (g Graph) GetType() string
- type GraphAddNodeOpt
- func WithGraphCompileOptions(opts ...GraphCompileOption) GraphAddNodeOpt
- func WithInputKey(k string) GraphAddNodeOpt
- func WithNodeKey(key string) GraphAddNodeOpt
- func WithNodeName(n string) GraphAddNodeOpt
- func WithOutputKey(k string) GraphAddNodeOpt
- func WithStatePostHandler[O, S any](post StatePostHandler[O, S]) GraphAddNodeOpt
- func WithStatePreHandler[I, S any](pre StatePreHandler[I, S]) GraphAddNodeOpt
- func WithStreamStatePostHandler[O, S any](post StreamStatePostHandler[O, S]) GraphAddNodeOpt
- func WithStreamStatePreHandler[I, S any](pre StreamStatePreHandler[I, S]) GraphAddNodeOpt
- type GraphBranch
- type GraphBranchCondition
- type GraphCompileCallback
- type GraphCompileOption
- type GraphInfo
- type GraphNodeInfo
- type Invoke
- type InvokeWOOpt
- type Lambda
- func AnyLambda[I, O, TOption any](i Invoke[I, O, TOption], s Stream[I, O, TOption], c Collect[I, O, TOption], ...) (*Lambda, error)
- func CollectableLambda[I, O any](c CollectWOOpt[I, O], opts ...LambdaOpt) *Lambda
- func CollectableLambdaWithOption[I, O, TOption any](c Collect[I, O, TOption], opts ...LambdaOpt) *Lambda
- func InvokableLambda[I, O any](i InvokeWOOpt[I, O], opts ...LambdaOpt) *Lambda
- func InvokableLambdaWithOption[I, O, TOption any](i Invoke[I, O, TOption], opts ...LambdaOpt) *Lambda
- func MessageParser[T any](p schema.MessageParser[T], opts ...LambdaOpt) *Lambda
- func StreamableLambda[I, O any](s StreamWOOpt[I, O], opts ...LambdaOpt) *Lambda
- func StreamableLambdaWithOption[I, O, TOption any](s Stream[I, O, TOption], opts ...LambdaOpt) *Lambda
- func ToList[I any](opts ...LambdaOpt) *Lambda
- func TransformableLambda[I, O any](t TransformWOOpts[I, O], opts ...LambdaOpt) *Lambda
- func TransformableLambdaWithOption[I, O, TOption any](t Transform[I, O, TOption], opts ...LambdaOpt) *Lambda
- type LambdaOpt
- type NewGraphOption
- type NodePath
- type NodeTriggerMode
- type Option
- func WithCallbacks(cbs ...callbacks.Handler) Option
- func WithChatModelOption(opts ...model.Option) Option
- func WithChatTemplateOption(opts ...prompt.Option) Option
- func WithDocumentTransformerOption(opts ...document.TransformerOption) Option
- func WithEmbeddingOption(opts ...embedding.Option) Option
- func WithIndexerOption(opts ...indexer.Option) Option
- func WithLambdaOption(opts ...any) Option
- func WithLoaderOption(opts ...document.LoaderOption) Option
- func WithLoaderSplitterOption(opts ...document.LoaderSplitterOption) Option (deprecated)
- func WithRetrieverOption(opts ...retriever.Option) Option
- func WithRuntimeMaxSteps(maxSteps int) Option
- func WithToolsNodeOption(opts ...ToolsNodeOption) Option
- type Parallel
- func (p *Parallel) AddChatModel(outputKey string, node model.ChatModel, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddChatTemplate(outputKey string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddDocumentTransformer(outputKey string, node document.Transformer, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddEmbedding(outputKey string, node embedding.Embedder, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddGraph(outputKey string, node AnyGraph, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddIndexer(outputKey string, node indexer.Indexer, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddLambda(outputKey string, node *Lambda, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddLoader(outputKey string, node document.Loader, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddLoaderSplitter(outputKey string, node document.LoaderSplitter, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddPassthrough(outputKey string, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddRetriever(outputKey string, node retriever.Retriever, opts ...GraphAddNodeOpt) *Parallel
- func (p *Parallel) AddToolsNode(outputKey string, node *ToolsNode, opts ...GraphAddNodeOpt) *Parallel
- type Runnable
- type StatePostHandler
- type StatePreHandler
- type Stream
- type StreamGraphBranchCondition
- type StreamStatePostHandler
- type StreamStatePreHandler
- type StreamWOOpt
- type ToolsNode
- func (tn *ToolsNode) GetType() string
- func (tn *ToolsNode) Invoke(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) ([]*schema.Message, error)
- func (tn *ToolsNode) Stream(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) (*schema.StreamReader[[]*schema.Message], error)
- type ToolsNodeConfig
- type ToolsNodeOption
- type Transform
- type TransformWOOpts
Constants ¶
const ( ComponentOfUnknown component = "Unknown" ComponentOfGraph component = "Graph" ComponentOfChain component = "Chain" ComponentOfPassthrough component = "Passthrough" ComponentOfToolsNode component = "ToolsNode" ComponentOfLambda component = "Lambda" )
Built-in component types for graph nodes. Each represents the type of the most primitive executable object provided by the user.
const END = "end"
END is the end node of the graph. You can add your last edge with END.
const START = "start"
START is the start node of the graph. You can add your first edge with START.
Variables ¶
var ErrChainCompiled = errors.New("chain has been compiled, cannot be modified")
ErrChainCompiled is returned when attempting to modify a chain after it has been compiled
var ErrExceedMaxSteps = errors.New("exceeds max steps")
ErrExceedMaxSteps is returned by the graph when the number of steps exceeds the maximum number of steps.
var ErrGraphCompiled = errors.New("graph has been compiled, cannot be modified")
ErrGraphCompiled is returned when attempting to modify a graph after it has been compiled
Functions ¶
func GetState ¶
GetState gets the state from the context. When using this method to read or write state in custom nodes, it may lead to data race because other nodes may concurrently access the state. You need to be aware of and resolve this situation, typically by adding a mutex. It's recommended to only READ the returned state. If you want to WRITE to state, consider using StatePreHandler / StatePostHandler because they are concurrency safe out of the box. eg.
lambdaFunc := func(ctx context.Context, in string, opts ...any) (string, error) { state, err := compose.GetState[*testState](ctx) if err != nil { return "", err } // do something with state return in, nil } stateGraph := compose.NewStateGraph[string, string, testState](genStateFunc) stateGraph.AddNode("node1", lambdaFunc)
func InitGraphCompileCallbacks ¶
func InitGraphCompileCallbacks(cbs []GraphCompileCallback)
InitGraphCompileCallbacks sets the global graph compile callbacks, which are added ONLY to the compile options of the top-level graph.
func RegisterStreamChunkConcatFunc ¶
RegisterStreamChunkConcatFunc registers a function to concat stream chunks. It's required when you want to concat stream chunks of a specific type, for example when you call Invoke() but the node only implements Stream(). Call it at process init; it is not thread safe. e.g.
type testStruct struct { field1 string field2 int } compose.RegisterStreamChunkConcatFunc(func(items []testStruct) (testStruct, error) { return testStruct{ field1: items[1].field1, // may implement inplace logic by your scenario field2: items[0].field2 + items[1].field2, }, nil })
Types ¶
type AnyGraph ¶
type AnyGraph interface {
// contains filtered or unexported methods
}
AnyGraph is the identifier for a composable and compilable Graph[I, O] or Chain[I, O] in Eino.
type Chain ¶
type Chain[I, O any] struct { // contains filtered or unexported fields }
Chain is a chain of components. Chain nodes can be parallel / branch / sequence components. Chain is designed to be used in a builder pattern (should Compile() before use). And the interface is `Chain style`, you can use it like: `chain.AppendXX(...).AppendXX(...)`
Normal usage:
1. create a chain with input/output type: `chain := NewChain[inputType, outputType]()`
2. add components to chainable list: 2.1 add components: `chain.AppendChatTemplate(...).AppendChatModel(...).AppendToolsNode(...)` 2.2 add parallel or branch node if needed: `chain.AppendParallel()`, `chain.AppendBranch()`
3. compile: `r, err := c.Compile()`
4. run: 4.1 `one input & one output` use `r.Invoke(ctx, input)` 4.2 `one input & multi output chunk` use `r.Stream(ctx, input)` 4.3 `multi input chunk & one output` use `r.Collect(ctx, inputReader)` 4.4 `multi input chunk & multi output chunk` use `r.Transform(ctx, inputReader)`
Using in graph or other chain: chain1 := NewChain[inputType, outputType]() graph := NewGraph[inputType, outputType]() graph.AddGraphNode("key", chain1) // chain is an AnyGraph implementation
// or in another chain: chain2 := NewChain[inputType, outputType]() chain2.AppendGraph(chain1)
func NewChain ¶
func NewChain[I, O any](opts ...NewGraphOption) *Chain[I, O]
NewChain create a chain with input/output type.
func (*Chain[I, O]) AppendBranch ¶
func (c *Chain[I, O]) AppendBranch(b *ChainBranch) *Chain[I, O]
AppendBranch add a conditional branch to chain. Each branch within the ChainBranch can be an AnyGraph. All branches should either lead to END, or converge to another node within the Chain. e.g.
cb := compose.NewChainBranch(conditionFunc) cb.AddChatTemplate("chat_template_key_01", chatTemplate) cb.AddChatTemplate("chat_template_key_02", chatTemplate2) chain.AppendBranch(cb)
func (*Chain[I, O]) AppendChatModel ¶
func (c *Chain[I, O]) AppendChatModel(node model.ChatModel, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendChatModel add a ChatModel node to the chain. e.g.
model, err := openai.NewChatModel(ctx, config) if err != nil {...} chain.AppendChatModel(model)
func (*Chain[I, O]) AppendChatTemplate ¶
func (c *Chain[I, O]) AppendChatTemplate(node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendChatTemplate add a ChatTemplate node to the chain. eg.
chatTemplate, err := prompt.FromMessages(schema.FString, &schema.Message{ Role: schema.System, Content: "You are acting as a {role}.", }) chain.AppendChatTemplate(chatTemplate)
func (*Chain[I, O]) AppendDocumentTransformer ¶
func (c *Chain[I, O]) AppendDocumentTransformer(node document.Transformer, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendDocumentTransformer add a DocumentTransformer node to the chain. e.g.
markdownSplitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderSplitterConfig{}) chain.AppendDocumentTransformer(markdownSplitter)
func (*Chain[I, O]) AppendEmbedding ¶
func (c *Chain[I, O]) AppendEmbedding(node embedding.Embedder, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendEmbedding adds an Embedding node to the chain. e.g.
embedder, err := openai.NewEmbedder(ctx, config) if err != nil {...} chain.AppendEmbedding(embedder)
func (*Chain[I, O]) AppendGraph ¶
func (c *Chain[I, O]) AppendGraph(node AnyGraph, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendGraph adds an AnyGraph node to the chain. AnyGraph can be a chain or a graph. e.g.
graph := compose.NewGraph[string, string]() chain.AppendGraph(graph)
func (*Chain[I, O]) AppendIndexer ¶
func (c *Chain[I, O]) AppendIndexer(node indexer.Indexer, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendIndexer add an Indexer node to the chain. Indexer is a node that can store documents. e.g.
vectorStoreImpl, err := vikingdb.NewVectorStorer(ctx, vikingdbConfig) // in components/vectorstore/vikingdb/vectorstore.go if err != nil {...} config := vectorstore.IndexerConfig{VectorStore: vectorStoreImpl} indexer, err := vectorstore.NewIndexer(ctx, config) if err != nil {...} chain.AppendIndexer(indexer)
func (*Chain[I, O]) AppendLambda ¶
func (c *Chain[I, O]) AppendLambda(node *Lambda, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendLambda add a Lambda node to the chain. Lambda is a node that can be used to implement custom logic. e.g.
lambdaNode := compose.InvokableLambda(func(ctx context.Context, docs []*schema.Document) (string, error) {...}) chain.AppendLambda(lambdaNode)
Note: to create a Lambda node, you need to use `compose.AnyLambda`, `compose.InvokableLambda`, `compose.StreamableLambda` or `compose.TransformableLambda`. If you want this node to produce real stream output, you need to use, for example, `compose.StreamableLambda` or `compose.TransformableLambda`.
func (*Chain[I, O]) AppendLoader ¶
func (c *Chain[I, O]) AppendLoader(node document.Loader, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendLoader adds a Loader node to the chain. e.g.
loader, err := file.NewFileLoader(ctx, &file.FileLoaderConfig{}) if err != nil {...} chain.AppendLoader(loader)
func (*Chain[I, O]) AppendLoaderSplitter ¶
func (c *Chain[I, O]) AppendLoaderSplitter(node document.LoaderSplitter, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendLoaderSplitter add a LoaderSplitter node to the chain. Deprecated: use AppendLoader instead.
func (*Chain[I, O]) AppendParallel ¶
AppendParallel add a Parallel structure (multiple concurrent nodes) to the chain. e.g.
parallel := compose.NewParallel() parallel.AddChatModel("openai", model1) // => "openai": *schema.Message{} parallel.AddChatModel("maas", model2) // => "maas": *schema.Message{} chain.AppendParallel(parallel) // => multiple concurrent nodes are added to the Chain The next node in the chain is either an END, or a node which accepts a map[string]any, where keys are `openai` `maas` as specified above.
func (*Chain[I, O]) AppendPassthrough ¶
func (c *Chain[I, O]) AppendPassthrough(opts ...GraphAddNodeOpt) *Chain[I, O]
AppendPassthrough add a Passthrough node to the chain. Could be used to connect multiple ChainBranch or Parallel. e.g.
chain.AppendPassthrough()
func (*Chain[I, O]) AppendRetriever ¶
func (c *Chain[I, O]) AppendRetriever(node retriever.Retriever, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendRetriever add a Retriever node to the chain. e.g.
retriever, err := vectorstore.NewRetriever(ctx, config) if err != nil {...} chain.AppendRetriever(retriever) or using fornax knowledge as retriever: config := fornaxknowledge.Config{...} retriever, err := fornaxknowledge.NewKnowledgeRetriever(ctx, config) if err != nil {...} chain.AppendRetriever(retriever)
func (*Chain[I, O]) AppendToolsNode ¶
func (c *Chain[I, O]) AppendToolsNode(node *ToolsNode, opts ...GraphAddNodeOpt) *Chain[I, O]
AppendToolsNode add a ToolsNode node to the chain. e.g.
toolsNode, err := tools.NewToolNode(ctx, &tools.ToolsNodeConfig{ Tools: []tools.Tool{...}, }) chain.AppendToolsNode(toolsNode)
func (*Chain[I, O]) Compile ¶
func (c *Chain[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)
Compile to a Runnable. Runnable can be used directly. e.g.
chain := NewChain[string, string]() r, err := chain.Compile() if err != nil {} r.Invoke(ctx, input) // ping => pong r.Stream(ctx, input) // ping => stream out r.Collect(ctx, inputReader) // stream in => pong r.Transform(ctx, inputReader) // stream in => stream out
type ChainBranch ¶
type ChainBranch struct {
// contains filtered or unexported fields
}
ChainBranch represents a conditional branch in a chain of operations. It allows for dynamic routing of execution based on a condition. All branches within ChainBranch are expected to either end the Chain, or converge to another node in the Chain.
func NewChainBranch ¶
func NewChainBranch[T any](cond GraphBranchCondition[T]) *ChainBranch
NewChainBranch creates a new ChainBranch instance based on a given condition. It takes a generic type T and a GraphBranchCondition function for that type. The returned ChainBranch will have an empty key2BranchNode map and a condition function that wraps the provided cond to handle type assertions and error checking. eg.
condition := func(ctx context.Context, in string, opts ...any) (endNode string, err error) { // logic to determine the next node return "some_next_node_key", nil } cb := NewChainBranch[string](condition) cb.AddPassthrough("next_node_key_01", xxx) // node in branch, represent one path of branch cb.AddPassthrough("next_node_key_02", xxx) // node in branch
func NewStreamChainBranch ¶
func NewStreamChainBranch[T any](cond StreamGraphBranchCondition[T]) *ChainBranch
NewStreamChainBranch creates a new ChainBranch instance based on a given stream condition. It takes a generic type T and a StreamGraphBranchCondition function for that type. The returned ChainBranch will have an empty key2BranchNode map and a condition function that wraps the provided cond to handle type assertions and error checking. eg.
condition := func(ctx context.Context, in *schema.StreamReader[string], opts ...any) (endNode string, err error) { // logic to determine the next node, you can read the stream and make a decision. // to save time, usually read the first chunk of stream, then make a decision which path to go. return "some_next_node_key", nil } cb := NewStreamChainBranch[string](condition)
func (*ChainBranch) AddChatModel ¶
func (cb *ChainBranch) AddChatModel(key string, node model.ChatModel, opts ...GraphAddNodeOpt) *ChainBranch
AddChatModel adds a ChatModel node to the branch. eg.
chatModel01, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{ Model: "gpt-4o", }) chatModel02, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{ Model: "gpt-4o-mini", }) cb.AddChatModel("chat_model_key_01", chatModel01) cb.AddChatModel("chat_model_key_02", chatModel02)
func (*ChainBranch) AddChatTemplate ¶
func (cb *ChainBranch) AddChatTemplate(key string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *ChainBranch
AddChatTemplate adds a ChatTemplate node to the branch. eg.
chatTemplate, err := prompt.FromMessages(schema.FString, &schema.Message{ Role: schema.System, Content: "You are acting as a {role}.", }) cb.AddChatTemplate("chat_template_key_01", chatTemplate) chatTemplate2, err := prompt.FromMessages(schema.FString, &schema.Message{ Role: schema.System, Content: "You are acting as a {role}, you are not allowed to chat in other topics.", }) cb.AddChatTemplate("chat_template_key_02", chatTemplate2)
func (*ChainBranch) AddDocumentTransformer ¶
func (cb *ChainBranch) AddDocumentTransformer(key string, node document.Transformer, opts ...GraphAddNodeOpt) *ChainBranch
AddDocumentTransformer adds a DocumentTransformer node to the branch. eg.
markdownSplitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderSplitterConfig{}) cb.AddDocumentTransformer("document_transformer_node_key", markdownSplitter)
func (*ChainBranch) AddEmbedding ¶
func (cb *ChainBranch) AddEmbedding(key string, node embedding.Embedder, opts ...GraphAddNodeOpt) *ChainBranch
AddEmbedding adds an Embedding node to the branch. eg.
embeddingNode, err := openai.NewEmbedder(ctx, &openai.EmbeddingConfig{ Model: "text-embedding-3-small", }) cb.AddEmbedding("embedding_node_key", embeddingNode)
func (*ChainBranch) AddGraph ¶
func (cb *ChainBranch) AddGraph(key string, node AnyGraph, opts ...GraphAddNodeOpt) *ChainBranch
AddGraph adds a generic Graph node to the branch. eg.
graph := compose.NewGraph[string, string]() cb.AddGraph("graph_node_key", graph)
func (*ChainBranch) AddIndexer ¶
func (cb *ChainBranch) AddIndexer(key string, node indexer.Indexer, opts ...GraphAddNodeOpt) *ChainBranch
AddIndexer adds an Indexer node to the branch. eg.
indexer, err := volc_vikingdb.NewIndexer(ctx, &volc_vikingdb.IndexerConfig{ Collection: "my_collection", }) cb.AddIndexer("indexer_node_key", indexer)
func (*ChainBranch) AddLambda ¶
func (cb *ChainBranch) AddLambda(key string, node *Lambda, opts ...GraphAddNodeOpt) *ChainBranch
AddLambda adds a Lambda node to the branch. eg.
lambdaFunc := func(ctx context.Context, in string, opts ...any) (out string, err error) { // logic to process the input return "processed_output", nil } cb.AddLambda("lambda_node_key", compose.InvokableLambdaWithOption(lambdaFunc))
func (*ChainBranch) AddLoader ¶
func (cb *ChainBranch) AddLoader(key string, node document.Loader, opts ...GraphAddNodeOpt) *ChainBranch
AddLoader adds a Loader node to the branch. eg.
pdfParser, err := pdf.NewPDFParser() loader, err := file.NewFileLoader(ctx, &file.FileLoaderConfig{ Parser: pdfParser, }) cb.AddLoader("loader_node_key", loader)
func (*ChainBranch) AddLoaderSplitter ¶
func (cb *ChainBranch) AddLoaderSplitter(key string, node document.LoaderSplitter, opts ...GraphAddNodeOpt) *ChainBranch
AddLoaderSplitter adds a LoaderSplitter node to the branch. Deprecated: use AddLoader instead.
func (*ChainBranch) AddPassthrough ¶
func (cb *ChainBranch) AddPassthrough(key string, opts ...GraphAddNodeOpt) *ChainBranch
AddPassthrough adds a Passthrough node to the branch. eg.
cb.AddPassthrough("passthrough_node_key")
func (*ChainBranch) AddRetriever ¶
func (cb *ChainBranch) AddRetriever(key string, node retriever.Retriever, opts ...GraphAddNodeOpt) *ChainBranch
AddRetriever adds a Retriever node to the branch. eg.
retriever, err := volc_vikingdb.NewRetriever(ctx, &volc_vikingdb.RetrieverConfig{ Collection: "my_collection", }) cb.AddRetriever("retriever_node_key", retriever)
func (*ChainBranch) AddToolsNode ¶
func (cb *ChainBranch) AddToolsNode(key string, node *ToolsNode, opts ...GraphAddNodeOpt) *ChainBranch
AddToolsNode adds a ToolsNode to the branch. eg.
toolsNode, err := tools.NewToolNode(ctx, &tools.ToolsNodeConfig{ Tools: []tools.Tool{...}, }) cb.AddToolsNode("tools_node_key", toolsNode)
type Collect ¶
type Collect[I, O, TOption any] func(ctx context.Context, input *schema.StreamReader[I], opts ...TOption) (output O, err error)
Collect is the type of the collectable lambda function.
type CollectWOOpt ¶
type CollectWOOpt[I, O any] func(ctx context.Context, input *schema.StreamReader[I]) (output O, err error)
CollectWOOpt is the type of the collectable lambda function without options.
type GenLocalState ¶
GenLocalState is a function that generates the state.
type Graph ¶
type Graph[I, O any] struct { // contains filtered or unexported fields }
Graph is a generic graph that can be used to compose components. I: the input type of graph compiled product O: the output type of graph compiled product
func NewGraph ¶
func NewGraph[I, O any](opts ...NewGraphOption) *Graph[I, O]
NewGraph creates a directed graph that can compose components, lambdas, chains, parallels, etc., while providing flexible and multi-granular aspect governance capabilities. I: the input type of graph compiled product. O: the output type of graph compiled product.
To share state between nodes, use WithGenLocalState option:
type testState struct { UserInfo *UserInfo KVs map[string]any } genStateFunc := func(ctx context.Context) *testState { return &testState{} } graph := compose.NewGraph[string, string](WithGenLocalState(genStateFunc)) // you can use WithPreHandler and WithPostHandler to do something with state graph.AddNode("node1", someNode, compose.WithPreHandler(func(ctx context.Context, in string, state *testState) (string, error) { // do something with state return in, nil }), compose.WithPostHandler(func(ctx context.Context, out string, state *testState) (string, error) { // do something with state return out, nil }))
func (Graph) AddBranch ¶
func (g Graph) AddBranch(startNode string, branch *GraphBranch) (err error)
AddBranch adds a branch to the graph. e.g.
condition := func(ctx context.Context, in string) (string, error) { return "next_node_key", nil } endNodes := map[string]bool{"path01": true, "path02": true} branch := compose.NewGraphBranch(condition, endNodes) graph.AddBranch("start_node_key", branch)
func (Graph) AddChatModelNode ¶
func (g Graph) AddChatModelNode(key string, node model.ChatModel, opts ...GraphAddNodeOpt) error
AddChatModelNode add node that implements model.ChatModel. e.g.
chatModel, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{ Model: "gpt-4o", }) graph.AddChatModelNode("chat_model_node_key", chatModel)
func (Graph) AddChatTemplateNode ¶
func (g Graph) AddChatTemplateNode(key string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) error
AddChatTemplateNode add node that implements prompt.ChatTemplate. e.g.
chatTemplate, err := prompt.FromMessages(schema.FString, &schema.Message{ Role: schema.System, Content: "You are acting as a {role}.", }) graph.AddChatTemplateNode("chat_template_node_key", chatTemplate)
func (Graph) AddDocumentTransformerNode ¶
func (g Graph) AddDocumentTransformerNode(key string, node document.Transformer, opts ...GraphAddNodeOpt) error
AddDocumentTransformerNode adds a node that implements document.Transformer. e.g.
markdownSplitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderSplitterConfig{}) graph.AddDocumentTransformerNode("document_transformer_node_key", markdownSplitter)
func (Graph) AddEdge ¶
AddEdge adds an edge to the graph; an edge means a data flow from startNode to endNode. The previous node's output type must match the next node's input type. NOTE: startNode and endNode must have been added to the graph before adding the edge. e.g.
graph.AddNode("start_node_key", compose.NewPassthroughNode()) graph.AddNode("end_node_key", compose.NewPassthroughNode()) err := graph.AddEdge("start_node_key", "end_node_key")
func (Graph) AddEmbeddingNode ¶
func (g Graph) AddEmbeddingNode(key string, node embedding.Embedder, opts ...GraphAddNodeOpt) error
AddEmbeddingNode adds a node that implements embedding.Embedder. e.g.
embeddingNode, err := openai.NewEmbedder(ctx, &openai.EmbeddingConfig{ Model: "text-embedding-3-small", }) graph.AddEmbeddingNode("embedding_node_key", embeddingNode)
func (Graph) AddGraphNode ¶
func (g Graph) AddGraphNode(key string, node AnyGraph, opts ...GraphAddNodeOpt) error
AddGraphNode adds one of Graph[I, O], Chain[I, O] or StateChain[I, O, S] as a node. A Graph[I, O] comes from NewGraph[I, O](); a Chain[I, O] comes from NewChain[I, O]().
func (Graph) AddIndexerNode ¶
func (g Graph) AddIndexerNode(key string, node indexer.Indexer, opts ...GraphAddNodeOpt) error
AddIndexerNode adds a node that implements indexer.Indexer. e.g.
indexer, err := vikingdb.NewIndexer(ctx, &vikingdb.IndexerConfig{}) graph.AddIndexerNode("indexer_node_key", indexer)
func (Graph) AddLambdaNode ¶
func (g Graph) AddLambdaNode(key string, node *Lambda, opts ...GraphAddNodeOpt) error
AddLambdaNode adds a node that implements at least one of Invoke[I, O], Stream[I, O], Collect[I, O], Transform[I, O]. Because Go does not support generic methods, function generics are used to generate a Lambda that runs as Runnable[I, O]: for Invoke[I, O], use compose.InvokableLambda(); for Stream[I, O], use compose.StreamableLambda(); for Collect[I, O], use compose.CollectableLambda(); for Transform[I, O], use compose.TransformableLambda(); for arbitrary combinations of the 4 kinds of lambda, use compose.AnyLambda().
func (Graph) AddLoaderNode ¶
func (g Graph) AddLoaderNode(key string, node document.Loader, opts ...GraphAddNodeOpt) error
AddLoaderNode adds a node that implements document.Loader. e.g.
loader, err := file.NewLoader(ctx, &file.LoaderConfig{}) graph.AddLoaderNode("loader_node_key", loader)
func (Graph) AddLoaderSplitterNode
deprecated
func (g Graph) AddLoaderSplitterNode(key string, node document.LoaderSplitter, opts ...GraphAddNodeOpt) error
Deprecated: use AddLoaderNode instead.
func (Graph) AddPassthroughNode ¶
func (g Graph) AddPassthroughNode(key string, opts ...GraphAddNodeOpt) error
AddPassthroughNode adds a passthrough node to the graph. mostly used in pregel mode of graph. e.g.
graph.AddPassthroughNode("passthrough_node_key")
func (Graph) AddRetrieverNode ¶
func (g Graph) AddRetrieverNode(key string, node retriever.Retriever, opts ...GraphAddNodeOpt) error
AddRetrieverNode adds a node that implements retriever.Retriever. e.g.
retriever, err := vikingdb.NewRetriever(ctx, &vikingdb.RetrieverConfig{}) graph.AddRetrieverNode("retriever_node_key", retriever)
func (Graph) AddToolsNode ¶
func (g Graph) AddToolsNode(key string, node *ToolsNode, opts ...GraphAddNodeOpt) error
AddToolsNode adds a node that implements tools.ToolsNode. e.g.
toolsNode, err := compose.NewToolNode(ctx, &compose.ToolsNodeConfig{}) graph.AddToolsNode("tools_node_key", toolsNode)
func (*Graph[I, O]) Compile ¶
func (g *Graph[I, O]) Compile(ctx context.Context, opts ...GraphCompileOption) (Runnable[I, O], error)
Compile takes the raw graph and compiles it into a form ready to be run. e.g.
graph, err := compose.NewGraph[string, string]() if err != nil {...} runnable, err := graph.Compile(ctx, compose.WithGraphName("my_graph")) if err != nil {...} runnable.Invoke(ctx, "input") // invoke runnable.Stream(ctx, "input") // stream runnable.Collect(ctx, inputReader) // collect runnable.Transform(ctx, inputReader) // transform
type GraphAddNodeOpt ¶
type GraphAddNodeOpt func(o *graphAddNodeOpts)
GraphAddNodeOpt is a functional option type for adding a node to a graph. e.g.
graph.AddNode("node_name", node, compose.WithInputKey("input_key"), compose.WithOutputKey("output_key"))
func WithGraphCompileOptions ¶
func WithGraphCompileOptions(opts ...GraphCompileOption) GraphAddNodeOpt
WithGraphCompileOptions when the node is an AnyGraph, use this option to set compile option for the node. e.g.
graph.AddNode("node_name", node, compose.WithGraphCompileOptions(compose.WithGraphName("my_sub_graph")))
func WithInputKey ¶
func WithInputKey(k string) GraphAddNodeOpt
WithInputKey sets the input key of the node. this will change the input value of the node, for example, if the pre node's output is map[string]any{"key01": "value01"}, and the current node's input key is "key01", then the current node's input value will be "value01".
func WithNodeKey ¶
func WithNodeKey(key string) GraphAddNodeOpt
WithNodeKey sets the node key, which is used to identify the node in the chain. It is only for use in Chain/StateChain.
func WithNodeName ¶
func WithNodeName(n string) GraphAddNodeOpt
WithNodeName sets the name of the node.
func WithOutputKey ¶
func WithOutputKey(k string) GraphAddNodeOpt
WithOutputKey sets the output key of the node. this will change the output value of the node, for example, if the current node's output key is "key01", then the node's output value will be map[string]any{"key01": value}.
func WithStatePostHandler ¶
func WithStatePostHandler[O, S any](post StatePostHandler[O, S]) GraphAddNodeOpt
WithStatePostHandler modifies the node's output of type O according to state S and the output, or stores output information into the state; it is thread-safe. Notice: this option requires the Graph to be created with the WithGenLocalState option. O: output type of the node, like ChatModel, Lambda, Retriever etc. S: state type defined in WithGenLocalState.
func WithStatePreHandler ¶
func WithStatePreHandler[I, S any](pre StatePreHandler[I, S]) GraphAddNodeOpt
WithStatePreHandler modifies the node's input of type I according to state S and the input, or stores input information into the state; it is thread-safe. Notice: this option requires the Graph to be created with the WithGenLocalState option. I: input type of the node, like ChatModel, Lambda, Retriever etc. S: state type defined in WithGenLocalState.
func WithStreamStatePostHandler ¶
func WithStreamStatePostHandler[O, S any](post StreamStatePostHandler[O, S]) GraphAddNodeOpt
WithStreamStatePostHandler modifies the node's streaming output of type O according to state S and the output, or stores output information into the state; it is thread-safe. Notice: this option requires the Graph to be created with the WithGenLocalState option. When to use: when the current node's output is an actual stream, and you want the downstream node's input to remain an actual stream after the state post handler. Caution: while StreamStatePostHandler is thread-safe, modifying state within your own goroutine is NOT. O: output type of the node, like ChatModel, Lambda, Retriever etc. S: state type defined in WithGenLocalState.
func WithStreamStatePreHandler ¶
func WithStreamStatePreHandler[I, S any](pre StreamStatePreHandler[I, S]) GraphAddNodeOpt
WithStreamStatePreHandler modifies the node's streaming input of type I according to state S and the input, or stores input information into the state; it is thread-safe. Notice: this option requires the Graph to be created with the WithGenLocalState option. When to use: when the upstream node's output is an actual stream, and you want the current node's input to remain an actual stream after the state pre handler. Caution: while StreamStatePreHandler is thread-safe, modifying state within your own goroutine is NOT. I: input type of the node, like ChatModel, Lambda, Retriever etc. S: state type defined in WithGenLocalState.
type GraphBranch ¶
type GraphBranch struct {
// contains filtered or unexported fields
}
GraphBranch is the branch type for the graph. It is used to determine the next node based on the condition.
func NewGraphBranch ¶
func NewGraphBranch[T any](condition GraphBranchCondition[T], endNodes map[string]bool) *GraphBranch
NewGraphBranch creates a new graph branch. It is used to determine the next node based on the condition. e.g.
condition := func(ctx context.Context, in string) (string, error) { // logic to determine the next node return "next_node_key", nil } endNodes := map[string]bool{"path01": true, "path02": true} branch := compose.NewGraphBranch(condition, endNodes) graph.AddBranch("key_of_node_before_branch", branch)
func NewStreamGraphBranch ¶
func NewStreamGraphBranch[T any](condition StreamGraphBranchCondition[T], endNodes map[string]bool) *GraphBranch
NewStreamGraphBranch creates a new stream graph branch. It is used to determine the next node based on the condition of stream input. e.g.
condition := func(ctx context.Context, in *schema.StreamReader[T]) (string, error) { // logic to determine the next node. // to use the feature of stream, you can use the first chunk to determine the next node. return "next_node_key", nil } endNodes := map[string]bool{"path01": true, "path02": true} branch := compose.NewStreamGraphBranch(condition, endNodes) graph.AddBranch("key_of_node_before_branch", branch)
func (*GraphBranch) GetEndNode ¶
func (gb *GraphBranch) GetEndNode() map[string]bool
GetEndNode returns all the end nodes of the branch.
type GraphBranchCondition ¶
GraphBranchCondition is the condition type for the branch.
type GraphCompileCallback ¶
GraphCompileCallback is the callback which will be called when graph compilation finishes.
type GraphCompileOption ¶
type GraphCompileOption func(*graphCompileOptions)
GraphCompileOption options for compiling AnyGraph.
func WithGraphCompileCallbacks ¶
func WithGraphCompileCallbacks(cbs ...GraphCompileCallback) GraphCompileOption
WithGraphCompileCallbacks sets callbacks for graph compilation.
func WithGraphName ¶
func WithGraphName(graphName string) GraphCompileOption
func WithMaxRunSteps ¶
func WithMaxRunSteps(maxSteps int) GraphCompileOption
func WithNodeTriggerMode ¶
func WithNodeTriggerMode(triggerMode NodeTriggerMode) GraphCompileOption
WithNodeTriggerMode sets node trigger mode for the graph. Different node trigger mode will affect graph execution order and result for specific graphs, such as those with parallel branches having different length of nodes.
type GraphInfo ¶
type GraphInfo struct { CompileOptions []GraphCompileOption Nodes map[string]GraphNodeInfo // node key -> node info Edges map[string][]string // edge start node key -> edge end node key Branches map[string][]GraphBranch // branch start node key -> branch InputType, OutputType reflect.Type Name string GenStateFn func(context.Context) any }
GraphInfo is the info that end users pass in when compiling a graph. It is used in compile callbacks so that users can get the node info and instances. You may need all the detailed info of the graph for observation.
type GraphNodeInfo ¶
type GraphNodeInfo struct { Component components.Component Instance any GraphAddNodeOpts []GraphAddNodeOpt InputType, OutputType reflect.Type // mainly for lambda, whose input and output types cannot be inferred by component type Name string InputKey, OutputKey string GraphInfo *GraphInfo }
GraphNodeInfo is the info that end users pass in when adding nodes to a graph.
type Invoke ¶
type Invoke[I, O, TOption any] func(ctx context.Context, input I, opts ...TOption) (output O, err error)
Invoke is the type of the invokable lambda function.
type InvokeWOOpt ¶
InvokeWOOpt is the type of the invokable lambda function without options.
type Lambda ¶
type Lambda struct {
// contains filtered or unexported fields
}
Lambda is the node that wraps the user-provided lambda function. It can be used as a node in Graph or Chain (including Parallel and Branch). Create a Lambda by using AnyLambda/InvokableLambda/StreamableLambda/CollectableLambda/TransformableLambda. e.g.
lambda := compose.InvokableLambda(func(ctx context.Context, input string) (output string, err error) { return input, nil })
func AnyLambda ¶
func AnyLambda[I, O, TOption any](i Invoke[I, O, TOption], s Stream[I, O, TOption], c Collect[I, O, TOption], t Transform[I, O, TOption], opts ...LambdaOpt) (*Lambda, error)
AnyLambda creates a Lambda from any combination of lambda functions. You may implement only one or more of the four lambda functions and pass nil for the rest. e.g.
invokeFunc := func(ctx context.Context, input string, opts ...myOption) (output string, err error) { // ... } streamFunc := func(ctx context.Context, input string, opts ...myOption) (output *schema.StreamReader[string], err error) { // ... }
lambda, err := compose.AnyLambda(invokeFunc, streamFunc, nil, nil)
func CollectableLambda ¶
func CollectableLambda[I, O any](c CollectWOOpt[I, O], opts ...LambdaOpt) *Lambda
CollectableLambda creates a Lambda with collectable lambda function without options.
func CollectableLambdaWithOption ¶
func CollectableLambdaWithOption[I, O, TOption any](c Collect[I, O, TOption], opts ...LambdaOpt) *Lambda
CollectableLambdaWithOption creates a Lambda with collectable lambda function and options.
func InvokableLambda ¶
func InvokableLambda[I, O any](i InvokeWOOpt[I, O], opts ...LambdaOpt) *Lambda
InvokableLambda creates a Lambda with invokable lambda function without options.
func InvokableLambdaWithOption ¶
func InvokableLambdaWithOption[I, O, TOption any](i Invoke[I, O, TOption], opts ...LambdaOpt) *Lambda
InvokableLambdaWithOption creates a Lambda with invokable lambda function and options.
func MessageParser ¶
func MessageParser[T any](p schema.MessageParser[T], opts ...LambdaOpt) *Lambda
MessageParser creates a lambda that parses a message into an object T, usually used after a chatmodel. usage:
parser := schema.NewMessageJSONParser[MyStruct](&schema.MessageJSONParseConfig{ ParseFrom: schema.MessageParseFromContent, }) parserLambda := MessageParser(parser) chain := NewChain[*schema.Message, MyStruct]() chain.AppendChatModel(chatModel) chain.AppendLambda(parserLambda) r, err := chain.Compile(context.Background()) // parsed is a MyStruct object parsed, err := r.Invoke(context.Background(), &schema.Message{ Role: schema.MessageRoleUser, Content: "return a json string for my struct", })
func StreamableLambda ¶
func StreamableLambda[I, O any](s StreamWOOpt[I, O], opts ...LambdaOpt) *Lambda
StreamableLambda creates a Lambda with streamable lambda function without options.
func StreamableLambdaWithOption ¶
func StreamableLambdaWithOption[I, O, TOption any](s Stream[I, O, TOption], opts ...LambdaOpt) *Lambda
StreamableLambdaWithOption creates a Lambda with streamable lambda function and options.
func ToList ¶
ToList creates a Lambda that converts input I to a []I. It's useful when you want to convert a single input to a list of inputs. eg.
lambda := compose.ToList[*schema.Message]() chain := compose.NewChain[[]*schema.Message, []*schema.Message]() chain.AppendChatModel(chatModel) // chatModel returns *schema.Message, but we need []*schema.Message chain.AppendLambda(lambda) // convert *schema.Message to []*schema.Message
func TransformableLambda ¶
func TransformableLambda[I, O any](t TransformWOOpts[I, O], opts ...LambdaOpt) *Lambda
TransformableLambda creates a Lambda with transformable lambda function without options.
type LambdaOpt ¶
type LambdaOpt func(o *lambdaOpts)
LambdaOpt is the option for creating a Lambda.
func WithLambdaCallbackEnable ¶
WithLambdaCallbackEnable enables the callback aspect of the lambda function.
func WithLambdaType ¶
WithLambdaType sets the type of the lambda function.
type NewGraphOption ¶ added in v0.3.1
type NewGraphOption func(ngo *newGraphOptions)
func WithGenLocalState ¶ added in v0.3.1
func WithGenLocalState[S any](gls GenLocalState[S]) NewGraphOption
type NodePath ¶ added in v0.3.4
type NodePath struct {
// contains filtered or unexported fields
}
func NewNodePath ¶ added in v0.3.4
type NodeTriggerMode ¶
type NodeTriggerMode string
NodeTriggerMode controls the triggering mode of graph nodes.
const ( // AnyPredecessor means that the current node will be triggered as long as any of its predecessor nodes has finished running. // Note that actual implementation organizes node execution in batches. // In this context, 'any predecessor finishes' would mean the other nodes of the same batch need to be finished too. AnyPredecessor NodeTriggerMode = "any_predecessor" // AllPredecessor means that the current node will only be triggered when all of its predecessor nodes have finished running. AllPredecessor NodeTriggerMode = "all_predecessor" )
type Option ¶
type Option struct {
// contains filtered or unexported fields
}
Option is a functional option type for calling a graph.
func WithCallbacks ¶
WithCallbacks set callback handlers for all components in a single call. e.g.
runnable.Invoke(ctx, "input", compose.WithCallbacks(&myCallbacks{}))
func WithChatModelOption ¶
WithChatModelOption is a functional option type for chat model component. e.g.
chatModelOption := compose.WithChatModelOption(model.WithTemperature(0.7)) runnable.Invoke(ctx, "input", chatModelOption)
func WithChatTemplateOption ¶
WithChatTemplateOption is a functional option type for chat template component.
func WithDocumentTransformerOption ¶
func WithDocumentTransformerOption(opts ...document.TransformerOption) Option
WithDocumentTransformerOption is a functional option type for document transformer component.
func WithEmbeddingOption ¶
WithEmbeddingOption is a functional option type for embedding component. e.g.
embeddingOption := compose.WithEmbeddingOption(embedding.WithModel("text-embedding-3-small")) runnable.Invoke(ctx, "input", embeddingOption)
func WithIndexerOption ¶
WithIndexerOption is a functional option type for indexer component. e.g.
indexerOption := compose.WithIndexerOption(indexer.WithSubIndexes([]string{"my_sub_index"})) runnable.Invoke(ctx, "input", indexerOption)
func WithLambdaOption ¶
WithLambdaOption is a functional option type for lambda component.
func WithLoaderOption ¶
func WithLoaderOption(opts ...document.LoaderOption) Option
WithLoaderOption is a functional option type for loader component. e.g.
loaderOption := compose.WithLoaderOption(document.WithCollection("my_collection")) runnable.Invoke(ctx, "input", loaderOption)
func WithLoaderSplitterOption
deprecated
func WithLoaderSplitterOption(opts ...document.LoaderSplitterOption) Option
Deprecated: use WithLoaderOption instead.
func WithRetrieverOption ¶
WithRetrieverOption is a functional option type for retriever component. e.g.
retrieverOption := compose.WithRetrieverOption(retriever.WithIndex("my_index")) runnable.Invoke(ctx, "input", retrieverOption)
func WithRuntimeMaxSteps ¶
WithRuntimeMaxSteps sets the maximum number of steps for the graph runtime. e.g.
runnable.Invoke(ctx, "input", compose.WithRuntimeMaxSteps(20))
func WithToolsNodeOption ¶
func WithToolsNodeOption(opts ...ToolsNodeOption) Option
WithToolsNodeOption is a functional option type for tools node component.
func (Option) DesignateNode ¶
DesignateNode sets the key of the node to which the option will be applied. Notice: only effective at the top graph. e.g.
embeddingOption := compose.WithEmbeddingOption(embedding.WithModel("text-embedding-3-small")) runnable.Invoke(ctx, "input", embeddingOption.DesignateNode("my_embedding_node"))
func (Option) DesignateNodeWithPath ¶ added in v0.3.4
DesignateNodeWithPath sets the path of the node(s) to which the option will be applied. You can make the option take effect in the subgraph by specifying the key of the subgraph. e.g. DesignateNodeWithPath({"sub graph node key", "node key within sub graph"})
type Parallel ¶
type Parallel struct {
// contains filtered or unexported fields
}
Parallel runs multiple nodes in parallel.
Use `NewParallel()` to create a new Parallel. Example:
parallel := NewParallel() parallel.AddChatModel("output_key01", chat01) parallel.AddChatModel("output_key02", chat02) chain := NewChain[any,any]() chain.AppendParallel(parallel)
func NewParallel ¶
func NewParallel() *Parallel
NewParallel creates a new parallel type. it is useful when you want to run multiple nodes in parallel in a chain.
func (*Parallel) AddChatModel ¶
func (p *Parallel) AddChatModel(outputKey string, node model.ChatModel, opts ...GraphAddNodeOpt) *Parallel
AddChatModel adds a chat model to the parallel. eg.
chatModel01, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{ Model: "gpt-4o", }) chatModel02, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{ Model: "gpt-4o", }) p.AddChatModel("output_key01", chatModel01) p.AddChatModel("output_key02", chatModel02)
func (*Parallel) AddChatTemplate ¶
func (p *Parallel) AddChatTemplate(outputKey string, node prompt.ChatTemplate, opts ...GraphAddNodeOpt) *Parallel
AddChatTemplate adds a chat template to the parallel. eg.
chatTemplate01, err := prompt.FromMessages(schema.FString, &schema.Message{ Role: schema.System, Content: "You are acting as a {role}.", }) p.AddChatTemplate("output_key01", chatTemplate01)
func (*Parallel) AddDocumentTransformer ¶
func (p *Parallel) AddDocumentTransformer(outputKey string, node document.Transformer, opts ...GraphAddNodeOpt) *Parallel
AddDocumentTransformer adds a document transformer node to the parallel. eg.
markdownSplitter, err := markdown.NewHeaderSplitter(ctx, &markdown.HeaderSplitterConfig{}) p.AddDocumentTransformer("output_key01", markdownSplitter)
func (*Parallel) AddEmbedding ¶
func (p *Parallel) AddEmbedding(outputKey string, node embedding.Embedder, opts ...GraphAddNodeOpt) *Parallel
AddEmbedding adds an embedding node to the parallel. eg.
embeddingNode, err := openai.NewEmbedder(ctx, &openai.EmbeddingConfig{ Model: "text-embedding-3-small", }) p.AddEmbedding("output_key01", embeddingNode)
func (*Parallel) AddGraph ¶
func (p *Parallel) AddGraph(outputKey string, node AnyGraph, opts ...GraphAddNodeOpt) *Parallel
AddGraph adds a graph node to the parallel. It is useful when you want to use a graph or a chain as a node in the parallel. eg.
graph := compose.NewChain[any,any]() p.AddGraph("output_key01", graph)
func (*Parallel) AddIndexer ¶
func (p *Parallel) AddIndexer(outputKey string, node indexer.Indexer, opts ...GraphAddNodeOpt) *Parallel
AddIndexer adds an indexer node to the parallel. eg.
indexer, err := volc_vikingdb.NewIndexer(ctx, &volc_vikingdb.IndexerConfig{ Collection: "my_collection", }) p.AddIndexer("output_key01", indexer)
func (*Parallel) AddLambda ¶
func (p *Parallel) AddLambda(outputKey string, node *Lambda, opts ...GraphAddNodeOpt) *Parallel
AddLambda adds a lambda node to the parallel. eg.
lambdaFunc := func(ctx context.Context, input *schema.Message) ([]*schema.Message, error) { return []*schema.Message{input}, nil } p.AddLambda("output_key01", compose.InvokableLambda(lambdaFunc))
func (*Parallel) AddLoader ¶
func (p *Parallel) AddLoader(outputKey string, node document.Loader, opts ...GraphAddNodeOpt) *Parallel
AddLoader adds a loader node to the parallel. eg.
loader, err := file.NewLoader(ctx, &file.LoaderConfig{}) p.AddLoader("output_key01", loader)
func (*Parallel) AddLoaderSplitter ¶
func (p *Parallel) AddLoaderSplitter(outputKey string, node document.LoaderSplitter, opts ...GraphAddNodeOpt) *Parallel
AddLoaderSplitter adds a loader splitter node to the parallel. Deprecated: use AddLoader instead.
func (*Parallel) AddPassthrough ¶
func (p *Parallel) AddPassthrough(outputKey string, opts ...GraphAddNodeOpt) *Parallel
AddPassthrough adds a passthrough node to the parallel. eg.
p.AddPassthrough("output_key01")
func (*Parallel) AddRetriever ¶
func (p *Parallel) AddRetriever(outputKey string, node retriever.Retriever, opts ...GraphAddNodeOpt) *Parallel
AddRetriever adds a retriever node to the parallel. eg.
retriever, err := vikingdb.NewRetriever(ctx, &vikingdb.RetrieverConfig{})
p.AddRetriever("output_key01", retriever)
func (*Parallel) AddToolsNode ¶
func (p *Parallel) AddToolsNode(outputKey string, node *ToolsNode, opts ...GraphAddNodeOpt) *Parallel
AddToolsNode adds a tools node to the parallel. eg.
toolsNode, err := compose.NewToolNode(ctx, &compose.ToolsNodeConfig{ Tools: []tool.BaseTool{...}, }) p.AddToolsNode("output_key01", toolsNode)
type Runnable ¶
type Runnable[I, O any] interface { Invoke(ctx context.Context, input I, opts ...Option) (output O, err error) Stream(ctx context.Context, input I, opts ...Option) (output *schema.StreamReader[O], err error) Collect(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output O, err error) Transform(ctx context.Context, input *schema.StreamReader[I], opts ...Option) (output *schema.StreamReader[O], err error) }
Runnable is the interface for an executable object. Graph and Chain can be compiled into a Runnable. Runnable is the core concept of eino: it provides fallback compatibility across the four data flow patterns and can automatically connect components that implement only one or more of the methods. e.g., if a component only implements the Stream() method, you can still call Invoke(), which converts the stream output to invoke output.
type StatePostHandler ¶
StatePostHandler is a function that is called after the node is executed. Notice: if user called Stream but with StatePostHandler, the StatePostHandler will read all stream chunks and merge them into a single object.
type StatePreHandler ¶
StatePreHandler is a function that is called before the node is executed. Notice: if user called Stream but with StatePreHandler, the StatePreHandler will read all stream chunks and merge them into a single object.
type Stream ¶
type Stream[I, O, TOption any] func(ctx context.Context, input I, opts ...TOption) (output *schema.StreamReader[O], err error)
Stream is the type of the streamable lambda function.
type StreamGraphBranchCondition ¶
type StreamGraphBranchCondition[T any] func(ctx context.Context, in *schema.StreamReader[T]) (endNode string, err error)
StreamGraphBranchCondition is the condition type for the stream branch.
type StreamStatePostHandler ¶
type StreamStatePostHandler[O, S any] func(ctx context.Context, out *schema.StreamReader[O], state S) (*schema.StreamReader[O], error)
StreamStatePostHandler is a function that is called after the node is executed with stream input and output.
type StreamStatePreHandler ¶
type StreamStatePreHandler[I, S any] func(ctx context.Context, in *schema.StreamReader[I], state S) (*schema.StreamReader[I], error)
StreamStatePreHandler is a function that is called before the node is executed with stream input and output.
type StreamWOOpt ¶
type StreamWOOpt[I, O any] func(ctx context.Context, input I) (output *schema.StreamReader[O], err error)
StreamWOOpt is the type of the streamable lambda function without options.
type ToolsNode ¶
type ToolsNode struct {
// contains filtered or unexported fields
}
ToolsNode is a node that can run tools in a graph. Its interface as a graph node is as follows:
Invoke(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) ([]*schema.Message, error) Stream(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) (*schema.StreamReader[[]*schema.Message], error)
func NewToolNode ¶
func NewToolNode(ctx context.Context, conf *ToolsNodeConfig) (*ToolsNode, error)
NewToolNode creates a new ToolsNode. eg.
conf := &ToolsNodeConfig{ Tools: []tool.BaseTool{invokableTool1, streamableTool2}, } toolsNode, err := NewToolNode(ctx, conf)
func (*ToolsNode) Invoke ¶
func (tn *ToolsNode) Invoke(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) ([]*schema.Message, error)
Invoke calls the tools and collects the results of invokable tools. it's parallel if there are multiple tool calls in the input message.
func (*ToolsNode) Stream ¶
func (tn *ToolsNode) Stream(ctx context.Context, input *schema.Message, opts ...ToolsNodeOption) (*schema.StreamReader[[]*schema.Message], error)
Stream calls the tools and collects the results of stream readers. it's parallel if there are multiple tool calls in the input message.
type ToolsNodeConfig ¶
ToolsNodeConfig is the config for ToolsNode. It requires a list of tools. Tools are BaseTool but must implement InvokableTool or StreamableTool.
type ToolsNodeOption ¶
type ToolsNodeOption func(o *toolsNodeOptions)
ToolsNodeOption is the option func type for ToolsNode.
func WithToolOption ¶
func WithToolOption(opts ...tool.Option) ToolsNodeOption
WithToolOption adds tool options to the ToolsNode.
type Transform ¶
type Transform[I, O, TOption any] func(ctx context.Context, input *schema.StreamReader[I], opts ...TOption) (output *schema.StreamReader[O], err error)
Transform is the type of the transformable lambda function.
type TransformWOOpts ¶
type TransformWOOpts[I, O any] func(ctx context.Context, input *schema.StreamReader[I]) (output *schema.StreamReader[O], err error)
TransformWOOpts is the type of the transformable lambda function without options.
Source Files
¶
- chain.go
- chain_branch.go
- chain_parallel.go
- component_to_graph_node.go
- dag.go
- doc.go
- error.go
- generic_graph.go
- graph.go
- graph_add_node_options.go
- graph_call_options.go
- graph_compile_options.go
- graph_node.go
- graph_run.go
- introspect.go
- pregel.go
- runnable.go
- state.go
- stream_concat.go
- stream_reader.go
- tool_node.go
- types.go
- types_composable.go
- types_lambda.go
- utils.go