callbacks

package
v0.3.15
Published: Mar 12, 2025 | License: Apache-2.0 | Imports: 3 | Imported by: 39

Documentation

Overview

Package callbacks provides callback mechanisms for component execution in Eino.

This package allows you to inject callback handlers at different stages of component execution, such as start, end, and error handling. It's particularly useful for implementing governance capabilities like logging, monitoring, and metrics collection.

The package provides two ways to create callback handlers:

1. Create a callback handler using HandlerBuilder:

handler := callbacks.NewHandlerBuilder().
	OnStartFn(func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context {
		// Handle component start
		return ctx
	}).
	OnEndFn(func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context {
		// Handle component end
		return ctx
	}).
	OnErrorFn(func(ctx context.Context, info *RunInfo, err error) context.Context {
		// Handle component error
		return ctx
	}).
	OnStartWithStreamInputFn(func(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context {
		// Handle component start with stream input
		return ctx
	}).
	OnEndWithStreamOutputFn(func(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context {
		// Handle component end with stream output
		return ctx
	}).
	Build()

With this approach, you must convert the callback input and output types yourself and implement the logic for every component type in a single handler.

2. Use [HandlerHelper] to create a handler:

Package utils/callbacks provides [HandlerHelper] as a convenient way to build callback handlers for different component types. It lets you set a specific handler for each component type, for example:

// Create handlers for specific components
modelHandler := &model.CallbackHandler{
	OnStart: func(ctx context.Context, info *RunInfo, input *model.CallbackInput) context.Context {
		log.Printf("Model execution started: %s", info.ComponentName)
		return ctx
	},
}

promptHandler := &prompt.CallbackHandler{
	OnEnd: func(ctx context.Context, info *RunInfo, output *prompt.CallbackOutput) context.Context {
		log.Printf("Prompt execution completed: %s", output.Result)
		return ctx
	},
}

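// Build the handler using HandlerHelper.
// Here callbacks refers to package utils/callbacks, and fallbackHandler
// is assumed to be a Handler defined elsewhere.
handler := callbacks.NewHandlerHelper().
	ChatModel(modelHandler).
	Prompt(promptHandler).
	Fallback(fallbackHandler).
	Handler()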

[HandlerHelper] supports handlers for various component types including:

  • Prompt components (via prompt.CallbackHandler)
  • Chat model components (via model.CallbackHandler)
  • Embedding components (via embedding.CallbackHandler)
  • Indexer components (via indexer.CallbackHandler)
  • Retriever components (via retriever.CallbackHandler)
  • Document loader components (via loader.CallbackHandler)
  • Document transformer components (via transformer.CallbackHandler)
  • Tool components (via tool.CallbackHandler)
  • Graph (via Handler)
  • Chain (via Handler)
  • Tools node (via Handler)
  • Lambda (via Handler)

Use the handler with a component:

runnable.Invoke(ctx, input, compose.WithCallbacks(handler))

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitCallbackHandlers

func InitCallbackHandlers(handlers []Handler)

InitCallbackHandlers sets the global callback handlers. It should be called before any user-defined callback handler is registered or run. It is useful when you want to inject basic callbacks into all nodes.

func InitCallbacks

func InitCallbacks(ctx context.Context, info *RunInfo, handlers ...Handler) context.Context

InitCallbacks initializes a new context with the provided RunInfo and handlers. Any previously set RunInfo and Handlers for this ctx will be overwritten.

func OnEnd

func OnEnd[T any](ctx context.Context, output T) context.Context

OnEnd invokes the OnEnd logic of the particular context, allowing for proper cleanup and finalization when a process ends. Handlers are executed in normal order (the order in which they were added).

func OnEndWithStreamOutput

func OnEndWithStreamOutput[T any](ctx context.Context, output *schema.StreamReader[T]) (
	nextCtx context.Context, newStreamReader *schema.StreamReader[T])

OnEndWithStreamOutput invokes the OnEndWithStreamOutput logic of the particular context. Every stream passed to a handler should be closed properly in that handler. Handlers are executed in normal order (the order in which they were added).

func OnError

func OnError(ctx context.Context, err error) context.Context

OnError invokes the OnError logic of the particular context. Note that errors occurring inside a stream are not reported here. Handlers are executed in normal order (the order in which they were added).

func OnStart

func OnStart[T any](ctx context.Context, input T) context.Context

OnStart invokes the OnStart logic for the particular context, ensuring that all registered handlers are executed in reverse order (compared to add order) when a process begins.
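The ordering rules stated for OnStart (reverse of add order) and OnEnd (add order) can be illustrated with a small self-contained model. This is only a sketch: the `handler` type and the `startOrder` and `endOrder` functions are hypothetical stand-ins written for this example and are not part of the package.

```go
package main

import (
	"fmt"
	"strings"
)

// handler is a hypothetical stand-in for a callback handler; it only carries a name.
type handler struct{ name string }

// startOrder returns handler names in the order a start callback would visit
// them under the documented rule: reverse of the order they were added.
func startOrder(added []handler) []string {
	order := make([]string, 0, len(added))
	for i := len(added) - 1; i >= 0; i-- {
		order = append(order, added[i].name)
	}
	return order
}

// endOrder returns handler names in the order an end callback would visit
// them under the documented rule: the same order they were added.
func endOrder(added []handler) []string {
	order := make([]string, 0, len(added))
	for _, h := range added {
		order = append(order, h.name)
	}
	return order
}

func main() {
	added := []handler{{"logging"}, {"metrics"}, {"tracing"}}
	fmt.Println("start:", strings.Join(startOrder(added), " -> "))
	fmt.Println("end:  ", strings.Join(endOrder(added), " -> "))
}
```

Under these rules, the handler added last sees the start of a component first, while end callbacks run in the order the handlers were added.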

func OnStartWithStreamInput

func OnStartWithStreamInput[T any](ctx context.Context, input *schema.StreamReader[T]) (
	nextCtx context.Context, newStreamReader *schema.StreamReader[T])

OnStartWithStreamInput invokes the OnStartWithStreamInput logic of the particular context. Every input stream passed to a handler should be closed properly in that handler. Handlers are executed in reverse order (compared to the order in which they were added).

func ReuseHandlers added in v0.3.4

func ReuseHandlers(ctx context.Context, info *RunInfo) context.Context

ReuseHandlers initializes a new context with the provided RunInfo while reusing the handlers that already exist in the context.
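The difference between InitCallbacks (overwrites both RunInfo and handlers) and ReuseHandlers (replaces RunInfo, keeps handlers) can be sketched with a toy model built on context values. Everything below is an assumption made for illustration: `runInfo`, `state`, `initCallbacks`, `reuseHandlers`, and `summary` are hypothetical names, handlers are reduced to strings, and the package's real storage mechanism is not described in this document.

```go
package main

import (
	"context"
	"fmt"
)

// runInfo is a hypothetical stand-in for RunInfo.
type runInfo struct{ Name string }

// state is the hypothetical per-context record: one run info plus handler names.
type state struct {
	info     *runInfo
	handlers []string
}

type stateKey struct{}

// initCallbacks models "overwrite everything": both info and handlers are replaced.
func initCallbacks(ctx context.Context, info *runInfo, handlers ...string) context.Context {
	return context.WithValue(ctx, stateKey{}, &state{info: info, handlers: handlers})
}

// reuseHandlers models "new info, same handlers": handlers are carried over.
func reuseHandlers(ctx context.Context, info *runInfo) context.Context {
	var kept []string
	if prev, ok := ctx.Value(stateKey{}).(*state); ok {
		kept = prev.handlers
	}
	return context.WithValue(ctx, stateKey{}, &state{info: info, handlers: kept})
}

// summary renders the state stored in ctx for inspection.
func summary(ctx context.Context) string {
	s, ok := ctx.Value(stateKey{}).(*state)
	if !ok {
		return "no callbacks"
	}
	name := "<nil>"
	if s.info != nil {
		name = s.info.Name
	}
	return fmt.Sprintf("%s %v", name, s.handlers)
}

// demo walks through init, reuse, and a second init on the same context chain.
func demo() (first, reused, reset string) {
	ctx := initCallbacks(context.Background(), &runInfo{Name: "model"}, "logging", "metrics")
	ctx2 := reuseHandlers(ctx, &runInfo{Name: "retriever"})
	ctx3 := initCallbacks(ctx2, &runInfo{Name: "tool"}, "tracing")
	return summary(ctx), summary(ctx2), summary(ctx3)
}

func main() {
	first, reused, reset := demo()
	fmt.Println(first)
	fmt.Println(reused)
	fmt.Println(reset)
}
```

In this model, the reused context reports the new run info with the original handler list, while the second `initCallbacks` call discards both.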

Types

type CallbackInput

type CallbackInput = callbacks.CallbackInput

CallbackInput is the input of the callback. Its concrete type is defined by the component. Use a type assertion or a conversion function to convert the input to the type you need. For example, CallbackInput in components/model/interface.go is:

	type CallbackInput struct {
		Messages []*schema.Message
		Config   *Config
		Extra    map[string]any
	}

The model package provides model.ConvCallbackInput() to convert a CallbackInput to *model.CallbackInput. In a callback handler, you can use the following code to get the input:

	modelCallbackInput := model.ConvCallbackInput(in)
	if modelCallbackInput == nil {
		// Not a model callback input; ignore it.
		return ctx
	}
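The convert-or-nil pattern described above can be sketched without the library. In this minimal model, `callbackInput`, `modelInput`, `retrieverInput`, `convModelInput`, and `describe` are all hypothetical names invented for the example; only the idea (return nil when the input belongs to another component type, and let the handler ignore it) comes from the text.

```go
package main

import "fmt"

// callbackInput models the untyped input a generic handler receives (hypothetical).
type callbackInput any

// modelInput and retrieverInput are hypothetical component-specific input types.
type modelInput struct{ Messages []string }
type retrieverInput struct{ Query string }

// convModelInput returns the input as *modelInput, or nil when the input
// was produced by a different component type.
func convModelInput(in callbackInput) *modelInput {
	switch v := in.(type) {
	case *modelInput:
		return v
	default:
		return nil
	}
}

// describe shows what the body of a generic start handler might do:
// convert first, then ignore inputs that are not model inputs.
func describe(in callbackInput) string {
	m := convModelInput(in)
	if m == nil {
		return "ignored"
	}
	return fmt.Sprintf("model input with %d message(s)", len(m.Messages))
}

func main() {
	fmt.Println(describe(&modelInput{Messages: []string{"hi"}}))
	fmt.Println(describe(&retrieverInput{Query: "eino"}))
}
```

Returning nil instead of an error keeps a single handler usable across component types: inputs it does not understand are skipped.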

type CallbackOutput

type CallbackOutput = callbacks.CallbackOutput

type CallbackTiming

type CallbackTiming = callbacks.CallbackTiming

CallbackTiming enumerates all the timings of callback aspects.

const (
	TimingOnStart CallbackTiming = iota
	TimingOnEnd
	TimingOnError
	TimingOnStartWithStreamInput
	TimingOnEndWithStreamOutput
)

type Handler

type Handler = callbacks.Handler

type HandlerBuilder

type HandlerBuilder struct {
	// contains filtered or unexported fields
}

func NewHandlerBuilder

func NewHandlerBuilder() *HandlerBuilder

NewHandlerBuilder creates and returns a new HandlerBuilder instance. HandlerBuilder is used to construct a Handler with custom callback functions.

func (*HandlerBuilder) Build added in v0.3.3

func (hb *HandlerBuilder) Build() Handler

Build returns a Handler with the functions set in the builder.

func (*HandlerBuilder) OnEndFn

func (hb *HandlerBuilder) OnEndFn(
	fn func(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context) *HandlerBuilder

func (*HandlerBuilder) OnEndWithStreamOutputFn

func (hb *HandlerBuilder) OnEndWithStreamOutputFn(
	fn func(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context) *HandlerBuilder

OnEndWithStreamOutputFn sets the callback function to be called when a component ends with stream output.

func (*HandlerBuilder) OnErrorFn

func (hb *HandlerBuilder) OnErrorFn(
	fn func(ctx context.Context, info *RunInfo, err error) context.Context) *HandlerBuilder

func (*HandlerBuilder) OnStartFn

func (hb *HandlerBuilder) OnStartFn(
	fn func(ctx context.Context, info *RunInfo, input CallbackInput) context.Context) *HandlerBuilder

func (*HandlerBuilder) OnStartWithStreamInputFn

func (hb *HandlerBuilder) OnStartWithStreamInputFn(
	fn func(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context) *HandlerBuilder

OnStartWithStreamInputFn sets the callback function to be called when a component starts with stream input.

type RunInfo

type RunInfo = callbacks.RunInfo

RunInfo contains information about the running component.

type TimingChecker

type TimingChecker = callbacks.TimingChecker

TimingChecker checks whether the handler is needed for a given callback timing. Implementing this interface is recommended for callback handlers but not mandatory. A callback handler created with HandlerHelper or HandlerBuilder implements this interface automatically. Eino's callback mechanism uses this interface to determine whether any handlers are needed for a given timing, and skips handlers that are not needed for that timing.
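The skipping behavior can be sketched with a small model. The interface shape used here (`Needed(t timing) bool`) is an assumption for illustration, since the real TimingChecker method set is not shown in this document; `timing`, `handler`, `startOnly`, `unchecked`, and `selectHandlers` are likewise hypothetical.

```go
package main

import "fmt"

// timing is a hypothetical local copy of the five callback timings.
type timing int

const (
	timingOnStart timing = iota
	timingOnEnd
	timingOnError
	timingOnStartWithStreamInput
	timingOnEndWithStreamOutput
)

// handler is a hypothetical minimal handler.
type handler interface{ Name() string }

// timingChecker is an ASSUMED shape for the optional check.
type timingChecker interface{ Needed(t timing) bool }

// startOnly implements the check and only wants start callbacks.
type startOnly struct{}

func (startOnly) Name() string         { return "startOnly" }
func (startOnly) Needed(t timing) bool { return t == timingOnStart }

// unchecked does not implement the check, so it is never skipped.
type unchecked struct{}

func (unchecked) Name() string { return "unchecked" }

// selectHandlers keeps handlers that either lack the check or report
// that they are needed for timing t.
func selectHandlers(hs []handler, t timing) []string {
	var names []string
	for _, h := range hs {
		if c, ok := h.(timingChecker); ok && !c.Needed(t) {
			continue // skipped: handler declared it is not needed here
		}
		names = append(names, h.Name())
	}
	return names
}

func main() {
	hs := []handler{startOnly{}, unchecked{}}
	fmt.Println(selectHandlers(hs, timingOnStart))
	fmt.Println(selectHandlers(hs, timingOnEnd))
}
```

Because the check is optional, a handler that does not implement it is treated as needed for every timing, which matches the "recommended but not mandatory" wording above.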
