Documentation ¶
Overview ¶
Package callbacks provides callback mechanisms for component execution in Eino.
This package allows you to inject callback handlers at different stages of component execution, such as start, end, and error handling. It's particularly useful for implementing governance capabilities like logging, monitoring, and metrics collection.
The package provides two ways to create callback handlers:
1. Create a callback handler using HandlerBuilder:
    handler := callbacks.NewHandlerBuilder().
        OnStartFn(func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context {
            // Handle component start
            return ctx
        }).
        OnEndFn(func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context {
            // Handle component end
            return ctx
        }).
        OnErrorFn(func(ctx context.Context, info *RunInfo, err error) context.Context {
            // Handle component error
            return ctx
        }).
        OnStartWithStreamInputFn(func(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context {
            // Handle component start with stream input
            return ctx
        }).
        OnEndWithStreamOutputFn(func(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context {
            // Handle component end with stream output
            return ctx
        }).
        Build()

With this approach, you must convert the callback input types yourself and implement the logic for all component types in a single handler.
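To make the "one handler for every component type" point concrete, here is a minimal, self-contained sketch. It does not import Eino: `RunInfo`, `ModelInput`, `PromptInput`, and `describeInput` are hypothetical stand-ins, and the real callback input types are defined by each component package.

```go
package main

import "fmt"

// RunInfo is a hypothetical stand-in; the real type lives in Eino.
type RunInfo struct {
	Name string // assumed field: the name of the running node
}

// CallbackInput mirrors the idea of an untyped callback input.
type CallbackInput = any

// ModelInput and PromptInput are hypothetical component-specific inputs.
type ModelInput struct{ Messages []string }
type PromptInput struct{ Variables map[string]any }

// describeInput is the kind of body a single generic OnStart handler needs:
// it inspects the dynamic type of the input and branches per component.
func describeInput(info *RunInfo, in CallbackInput) string {
	switch v := in.(type) {
	case *ModelInput:
		return fmt.Sprintf("%s: model input with %d message(s)", info.Name, len(v.Messages))
	case *PromptInput:
		return fmt.Sprintf("%s: prompt input with %d variable(s)", info.Name, len(v.Variables))
	default:
		return fmt.Sprintf("%s: unhandled input type %T", info.Name, in)
	}
}

func main() {
	fmt.Println(describeInput(&RunInfo{Name: "chat"}, &ModelInput{Messages: []string{"hi"}}))
	fmt.Println(describeInput(&RunInfo{Name: "tmpl"}, &PromptInput{}))
	fmt.Println(describeInput(&RunInfo{Name: "other"}, 42))
}
```

Every new component type adds another case to the switch, which is the maintenance cost this approach carries.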
2. Use [template.HandlerHelper] to create a handler:
Package utils/callbacks provides [HandlerHelper] as a convenient way to build callback handlers for different component types. It lets you set a specific handler for each component type, for example:

    // Create handlers for specific components
    modelHandler := &model.CallbackHandler{
        OnStart: func(ctx context.Context, info *RunInfo, input *model.CallbackInput) context.Context {
            log.Printf("Model execution started: %s", info.ComponentName)
            return ctx
        },
    }

    promptHandler := &prompt.CallbackHandler{
        OnEnd: func(ctx context.Context, info *RunInfo, output *prompt.CallbackOutput) context.Context {
            log.Printf("Prompt execution completed: %s", output.Result)
            return ctx
        },
    }

    // Build the handler using HandlerHelper.
    // fallbackHandler is assumed to be defined elsewhere.
    handler := callbacks.NewHandlerHelper().
        ChatModel(modelHandler).
        Prompt(promptHandler).
        Fallback(fallbackHandler).
        Handler()
[HandlerHelper] supports handlers for various component types including:
- Prompt components (via prompt.CallbackHandler)
- Chat model components (via model.CallbackHandler)
- Embedding components (via embedding.CallbackHandler)
- Indexer components (via indexer.CallbackHandler)
- Retriever components (via retriever.CallbackHandler)
- Document loader components (via loader.CallbackHandler)
- Document transformer components (via transformer.CallbackHandler)
- Tool components (via tool.CallbackHandler)
- Graph (via Handler)
- Chain (via Handler)
- Tools node (via Handler)
- Lambda (via Handler)
Use the handler with a component:
runnable.Invoke(ctx, input, compose.WithCallbacks(handler))
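The per-component routing that a helper of this kind performs can be pictured with the sketch below. It is an illustration only, not Eino's implementation: `helper`, `on`, `withFallback`, and `dispatch` are hypothetical names, and the assumption that a fallback runs only when no component-specific handler matches should be checked against the HandlerHelper documentation.

```go
package main

import "fmt"

// handlerFunc is a hypothetical, simplified handler signature.
type handlerFunc func(node string) string

// helper keeps one handler per component kind plus an optional fallback.
type helper struct {
	byComponent map[string]handlerFunc
	fallback    handlerFunc
}

func newHelper() *helper {
	return &helper{byComponent: map[string]handlerFunc{}}
}

// on registers a handler for one component kind and returns the helper
// so that calls can be chained.
func (h *helper) on(component string, fn handlerFunc) *helper {
	h.byComponent[component] = fn
	return h
}

// withFallback registers the handler used when no specific one matches.
func (h *helper) withFallback(fn handlerFunc) *helper {
	h.fallback = fn
	return h
}

// dispatch routes an event to the handler registered for the component kind.
// The bool result reports whether any handler ran.
func (h *helper) dispatch(component, node string) (string, bool) {
	if fn, ok := h.byComponent[component]; ok {
		return fn(node), true
	}
	if h.fallback != nil {
		return h.fallback(node), true
	}
	return "", false
}

func main() {
	h := newHelper().
		on("ChatModel", func(node string) string { return "model handler saw " + node }).
		on("Prompt", func(node string) string { return "prompt handler saw " + node }).
		withFallback(func(node string) string { return "fallback saw " + node })

	for _, component := range []string{"ChatModel", "Prompt", "Retriever"} {
		msg, _ := h.dispatch(component, "node-1")
		fmt.Println(msg)
	}
}
```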
Index ¶
- func InitCallbackHandlers(handlers []Handler)
- func InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) context.Context
- func OnEnd[T any](ctx context.Context, output T) context.Context
- func OnEndWithStreamOutput[T any](ctx context.Context, output *schema.StreamReader[T]) (nextCtx context.Context, newStreamReader *schema.StreamReader[T])
- func OnError(ctx context.Context, err error) context.Context
- func OnStart[T any](ctx context.Context, input T) context.Context
- func OnStartWithStreamInput[T any](ctx context.Context, input *schema.StreamReader[T]) (nextCtx context.Context, newStreamReader *schema.StreamReader[T])
- func ReuseHandlers(ctx context.Context, info *RunInfo) context.Context
- type CallbackInput
- type CallbackOutput
- type CallbackTiming
- type Handler
- type HandlerBuilder
- func (hb *HandlerBuilder) Build() Handler
- func (hb *HandlerBuilder) OnEndFn(...) *HandlerBuilder
- func (hb *HandlerBuilder) OnEndWithStreamOutputFn(fn func(ctx context.Context, info *RunInfo, ...) context.Context) *HandlerBuilder
- func (hb *HandlerBuilder) OnErrorFn(fn func(ctx context.Context, info *RunInfo, err error) context.Context) *HandlerBuilder
- func (hb *HandlerBuilder) OnStartFn(...) *HandlerBuilder
- func (hb *HandlerBuilder) OnStartWithStreamInputFn(...) *HandlerBuilder
- type RunInfo
- type TimingChecker
Functions ¶
func InitCallbackHandlers ¶
func InitCallbackHandlers(handlers []Handler)
InitCallbackHandlers sets the global callback handlers. It should be called before any user-provided callback handler is used. It is useful when you want to inject basic callbacks into all nodes.
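The idea of global handlers can be sketched without Eino as follows. The names `initGlobalHandlers` and `effectiveHandlers` are hypothetical, handler names stand in for real Handler values, and the ordering of global handlers relative to per-run handlers is an assumption made only for this illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// globalHandlers holds handler names; names stand in for real Handler values.
var globalHandlers []string

// initGlobalHandlers replaces the global list. It is meant to run once at
// startup, before any per-run handler is attached.
func initGlobalHandlers(handlers []string) {
	globalHandlers = append([]string(nil), handlers...)
}

// effectiveHandlers lists what a single run would see: the global handlers
// first (an assumed ordering), then the handlers passed for that run.
func effectiveHandlers(perRun ...string) string {
	all := append(append([]string(nil), globalHandlers...), perRun...)
	return strings.Join(all, ",")
}

func main() {
	initGlobalHandlers([]string{"logging", "metrics"})
	fmt.Println(effectiveHandlers("tracing")) // a run with its own handler
	fmt.Println(effectiveHandlers())          // a run with only global handlers
}
```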
func InitCallbacks ¶
func InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) context.Context
InitCallbacks initializes a new context with the provided RunInfo and handlers. Any RunInfo and handlers previously set on this ctx are overwritten.
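The overwrite semantics described here resemble storing a fresh value under a context key. The sketch below models that with the standard library only; `state`, `initCallbacks`, `describe`, and `demo` are hypothetical names, and handler names stand in for real Handler values.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// RunInfo is a hypothetical stand-in with a single assumed field.
type RunInfo struct{ Name string }

type ctxKey struct{}

// state bundles what the context carries for callbacks.
type state struct {
	info     *RunInfo
	handlers []string
}

// initCallbacks stores a fresh state in the returned context. Whatever an
// earlier call stored is replaced, not merged.
func initCallbacks(ctx context.Context, info *RunInfo, handlers ...string) context.Context {
	return context.WithValue(ctx, ctxKey{}, &state{info: info, handlers: handlers})
}

// describe reports the RunInfo name and handlers visible from ctx.
func describe(ctx context.Context) string {
	s, ok := ctx.Value(ctxKey{}).(*state)
	if !ok {
		return "no callbacks"
	}
	return s.info.Name + ": " + strings.Join(s.handlers, ",")
}

// demo initializes callbacks twice and returns what each context sees.
func demo() (parent, child string) {
	base := initCallbacks(context.Background(), &RunInfo{Name: "graph"}, "logging", "metrics")
	derived := initCallbacks(base, &RunInfo{Name: "node"}, "tracing")
	return describe(base), describe(derived)
}

func main() {
	parent, child := demo()
	fmt.Println(parent)
	fmt.Println(child)
}
```

In this model the derived context sees only the handlers from the second call, while the original context value is left untouched.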
func OnEnd ¶
func OnEnd[T any](ctx context.Context, output T) context.Context
OnEnd invokes the OnEnd logic of the particular context, allowing for proper cleanup and finalization when a process ends. Handlers are executed in the order in which they were added.
func OnEndWithStreamOutput ¶
func OnEndWithStreamOutput[T any](ctx context.Context, output *schema.StreamReader[T]) (nextCtx context.Context, newStreamReader *schema.StreamReader[T])
OnEndWithStreamOutput invokes the OnEndWithStreamOutput logic of the particular context. Each handler must properly close the stream it receives. Handlers are executed in the order in which they were added.
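The requirement that a handler close its stream can be illustrated with a toy reader. This sketch assumes each handler receives its own independent copy of the stream while the caller continues with another one. `stream`, `copyStream`, and `countingHandler` are hypothetical and far simpler than schema.StreamReader.

```go
package main

import "fmt"

// stream is a hypothetical, minimal stand-in for a stream reader.
type stream struct {
	items  []string
	pos    int
	closed bool
}

// recv returns the next item, or false when the stream is exhausted or closed.
func (s *stream) recv() (string, bool) {
	if s.closed || s.pos >= len(s.items) {
		return "", false
	}
	item := s.items[s.pos]
	s.pos++
	return item, true
}

func (s *stream) close() { s.closed = true }

// copyStream returns n independent readers over the same items.
func copyStream(items []string, n int) []*stream {
	out := make([]*stream, n)
	for i := range out {
		out[i] = &stream{items: items}
	}
	return out
}

// countingHandler drains its own copy and always closes it, even if it
// were to return early.
func countingHandler(s *stream) int {
	defer s.close()
	n := 0
	for {
		if _, ok := s.recv(); !ok {
			return n
		}
		n++
	}
}

func main() {
	copies := copyStream([]string{"a", "b", "c"}, 2)
	handlerCopy, callerCopy := copies[0], copies[1]

	count := countingHandler(handlerCopy)
	fmt.Println("handler read", count, "items; closed:", handlerCopy.closed)

	first, _ := callerCopy.recv()
	fmt.Println("caller still reads:", first)
}
```

Using `defer` for the close keeps the handler correct on every return path, which is the habit the documentation asks for.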
func OnError ¶
func OnError(ctx context.Context, err error) context.Context
OnError invokes the OnError logic of the particular context. Note that errors occurring inside a stream are not reported here. Handlers are executed in the order in which they were added.
func OnStart ¶
func OnStart[T any](ctx context.Context, input T) context.Context
OnStart invokes the OnStart logic of the particular context when a process begins. All registered handlers are executed in reverse order of registration, so the handler added last runs first.
func OnStartWithStreamInput ¶
func OnStartWithStreamInput[T any](ctx context.Context, input *schema.StreamReader[T]) (nextCtx context.Context, newStreamReader *schema.StreamReader[T])
OnStartWithStreamInput invokes the OnStartWithStreamInput logic of the particular context. Each handler must properly close the input stream it receives. Handlers are executed in reverse order of registration.
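The ordering rules stated for the start and end callbacks can be traced with a small sketch. It only models the order of calls. `runStart`, `runEnd`, and `trace` are hypothetical helpers, and handler names stand in for real Handler values.

```go
package main

import (
	"fmt"
	"strings"
)

// runStart records start callbacks in reverse registration order.
func runStart(handlers []string) []string {
	var calls []string
	for i := len(handlers) - 1; i >= 0; i-- {
		calls = append(calls, handlers[i]+".OnStart")
	}
	return calls
}

// runEnd records end callbacks in registration order.
func runEnd(handlers []string) []string {
	var calls []string
	for _, h := range handlers {
		calls = append(calls, h+".OnEnd")
	}
	return calls
}

// trace joins the start and end phases of one component run.
func trace(handlers []string) string {
	return strings.Join(append(runStart(handlers), runEnd(handlers)...), " -> ")
}

func main() {
	// Handlers registered in the order A, B, C.
	fmt.Println(trace([]string{"A", "B", "C"}))
}
```

With handlers added as A, B, C, the start phase runs C, B, A and the end phase runs A, B, C, so the handler added first sits closest to the component, much like nested middleware.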
Types ¶
type CallbackInput ¶
type CallbackInput = callbacks.CallbackInput
CallbackInput is the input of the callback. Its concrete type is defined by the component. Use a type assertion or a conversion function to convert the input to the type you need. For example, CallbackInput in components/model/interface.go is:

    type CallbackInput struct {
        Messages []*schema.Message
        Config   *Config
        Extra    map[string]any
    }

The model package provides model.ConvCallbackInput() to convert a CallbackInput to *model.CallbackInput. In a callback handler, you can obtain the typed input as follows:

    modelCallbackInput := model.ConvCallbackInput(in)
    if modelCallbackInput == nil {
        // Not a model callback input; ignore it.
        return
    }
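The conversion-function pattern described here, returning a typed value or nil, can be sketched in isolation. `ModelCallbackInput`, `convModelInput`, and `onStart` are hypothetical stand-ins, and accepting a bare message slice is an assumption added for illustration rather than a statement about model.ConvCallbackInput.

```go
package main

import "fmt"

// CallbackInput mirrors the idea of an untyped callback input.
type CallbackInput = any

// ModelCallbackInput is a hypothetical, trimmed-down model input.
type ModelCallbackInput struct {
	Messages []string
	Extra    map[string]any
}

// convModelInput returns the typed input, or nil when the value belongs
// to some other component.
func convModelInput(in CallbackInput) *ModelCallbackInput {
	switch v := in.(type) {
	case *ModelCallbackInput:
		return v
	case []string:
		// Assumption: a bare message list is wrapped for convenience.
		return &ModelCallbackInput{Messages: v}
	default:
		return nil
	}
}

// onStart shows the nil check a handler performs before using the input.
func onStart(in CallbackInput) string {
	m := convModelInput(in)
	if m == nil {
		return "ignored"
	}
	return fmt.Sprintf("model call with %d message(s)", len(m.Messages))
}

func main() {
	fmt.Println(onStart(&ModelCallbackInput{Messages: []string{"hello", "world"}}))
	fmt.Println(onStart([]string{"hello"}))
	fmt.Println(onStart(3.14))
}
```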
type CallbackOutput ¶
type CallbackOutput = callbacks.CallbackOutput
type CallbackTiming ¶
type CallbackTiming = callbacks.CallbackTiming
CallbackTiming enumerates all the timings of callback aspects.

    const (
        TimingOnStart CallbackTiming = iota
        TimingOnEnd
        TimingOnError
        TimingOnStartWithStreamInput
        TimingOnEndWithStreamOutput
    )
type HandlerBuilder ¶
type HandlerBuilder struct {
// contains filtered or unexported fields
}
func NewHandlerBuilder ¶
func NewHandlerBuilder() *HandlerBuilder
NewHandlerBuilder creates and returns a new HandlerBuilder instance. HandlerBuilder is used to construct a Handler with custom callback functions.
func (*HandlerBuilder) Build ¶ added in v0.3.3
func (hb *HandlerBuilder) Build() Handler
Build returns a Handler with the functions set in the builder.
func (*HandlerBuilder) OnEndFn ¶
func (hb *HandlerBuilder) OnEndFn(fn func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context) *HandlerBuilder
func (*HandlerBuilder) OnEndWithStreamOutputFn ¶
func (hb *HandlerBuilder) OnEndWithStreamOutputFn(fn func(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context) *HandlerBuilder
OnEndWithStreamOutputFn sets the callback function to be called when a component ends with stream output.
func (*HandlerBuilder) OnErrorFn ¶
func (hb *HandlerBuilder) OnErrorFn(fn func(ctx context.Context, info *RunInfo, err error) context.Context) *HandlerBuilder
func (*HandlerBuilder) OnStartFn ¶
func (hb *HandlerBuilder) OnStartFn(fn func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context) *HandlerBuilder
func (*HandlerBuilder) OnStartWithStreamInputFn ¶
func (hb *HandlerBuilder) OnStartWithStreamInputFn(fn func(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context) *HandlerBuilder
OnStartWithStreamInputFn sets the callback function to be called when a component starts with stream input.
type TimingChecker ¶
type TimingChecker = callbacks.TimingChecker
TimingChecker checks whether a handler is needed for a given callback timing. Implementing this interface is recommended for callback handlers but not mandatory. Handlers created with callbacks.HandlerHelper or HandlerBuilder implement it automatically. Eino's callback mechanism uses this interface, when available, to determine whether a handler is needed for a given timing; handlers that are not needed for that timing are skipped.
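The skipping behavior can be modeled with a small sketch. Everything here is a simplification under stated assumptions: the `timingChecker` interface uses a reduced, hypothetical method signature, `builtHandler` imitates a builder-made handler that needs only the timings for which a function was set, and `selectFor` is an invented helper.

```go
package main

import (
	"fmt"
	"strings"
)

type CallbackTiming int

const (
	TimingOnStart CallbackTiming = iota
	TimingOnEnd
	TimingOnError
	TimingOnStartWithStreamInput
	TimingOnEndWithStreamOutput
)

// handler is a hypothetical minimal handler interface.
type handler interface{ Name() string }

// timingChecker is a simplified version of the TimingChecker idea; the
// real interface's method signature may differ.
type timingChecker interface {
	Needed(t CallbackTiming) bool
}

// builtHandler needs only the timings for which a function was set.
type builtHandler struct {
	name    string
	onStart func()
	onEnd   func()
}

func (h *builtHandler) Name() string { return h.name }

func (h *builtHandler) Needed(t CallbackTiming) bool {
	switch t {
	case TimingOnStart:
		return h.onStart != nil
	case TimingOnEnd:
		return h.onEnd != nil
	default:
		return false
	}
}

// plainHandler does not implement timingChecker, so it is never skipped.
type plainHandler struct{ name string }

func (h *plainHandler) Name() string { return h.name }

// selectFor returns the names of the handlers that would run at timing t.
func selectFor(t CallbackTiming, hs ...handler) string {
	var names []string
	for _, h := range hs {
		if c, ok := h.(timingChecker); ok && !c.Needed(t) {
			continue // the handler declared that it does not need this timing
		}
		names = append(names, h.Name())
	}
	return strings.Join(names, ",")
}

func main() {
	startOnly := &builtHandler{name: "startOnly", onStart: func() {}}
	plain := &plainHandler{name: "plain"}
	fmt.Println(selectFor(TimingOnStart, startOnly, plain))
	fmt.Println(selectFor(TimingOnEnd, startOnly, plain))
}
```

Skipping unneeded handlers before a callback fires avoids doing work, such as preparing a stream for a handler, that the handler would never use.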