Documentation ¶
Index ¶
- Constants
- Variables
- func Initialize(ctx context.Context, appState *models.AppState, router models.TaskRouter)
- func NewMessageNERTask(appState *models.AppState) models.Task
- func NewMessageSummaryNERTask(appState *models.AppState) models.Task
- func NewRetryableHTTPClient(retryMax int, timeout time.Duration) *http.Client
- func NewSQLQueuePublisher(db *sql.DB, logger watermill.LoggerAdapter) (message.Publisher, error)
- func NewSQLQueueSubscriber(db *sql.DB, logger watermill.LoggerAdapter) (message.Subscriber, error)
- func RunTaskRouter(ctx context.Context, appState *models.AppState, db *sql.DB)
- func TaskHandler(task models.Task) message.NoPublishHandlerFunc
- type BaseTask
- type DocumentEmbedderTask
- type IntentPromptTemplateData
- type MessageEmbedderTask
- type MessageIntentTask
- type MessageNERTask
- type MessageSummaryEmbedderTask
- type MessageSummaryNERTask
- type MessageSummaryTask
- type MessageTokenCountTask
- type SQLSchema
- type SummaryPromptTemplateData
- type TaskPublisher
- type TaskRouter
Constants ¶
const MaxQueueRetries = 5
const MaxTokensFallback = 2048
const NerRetryMax = 3
const NerTimeout = 10 * time.Second
const SQLSubscriberPollInterval = 500 * time.Millisecond
const SummaryMaxOutputTokens = 1024
const TaskCountThrottle = 250 // messages per second
const TaskTimeout = 60 // seconds
Variables ¶
var IntentStringRegex = regexp.MustCompile(`(?i)^\s*intent\W+\s+`)
Functions ¶
func Initialize ¶
func NewRetryableHTTPClient ¶ added in v0.19.0
NewRetryableHTTPClient returns a new retryable HTTP client with the given retryMax and timeout. The retryable HTTP transport is wrapped in an OpenTelemetry transport.
func NewSQLQueuePublisher ¶
func NewSQLQueueSubscriber ¶
func NewSQLQueueSubscriber(db *sql.DB, logger watermill.LoggerAdapter) (message.Subscriber, error)
func TaskHandler ¶
func TaskHandler(task models.Task) message.NoPublishHandlerFunc
TaskHandler returns a message handler function for the given task. Handlers are NoPublishHandlerFuncs i.e. do not publish messages.
Types ¶
type BaseTask ¶
type BaseTask struct {
// contains filtered or unexported fields
}
func (*BaseTask) HandleError ¶
type DocumentEmbedderTask ¶
type DocumentEmbedderTask struct {
BaseTask
}
func NewDocumentEmbedderTask ¶
func NewDocumentEmbedderTask( appState *models.AppState, ) *DocumentEmbedderTask
func (*DocumentEmbedderTask) Process ¶
func (dt *DocumentEmbedderTask) Process( ctx context.Context, collectionName string, docTasks []models.DocEmbeddingTask, ) error
type IntentPromptTemplateData ¶
type IntentPromptTemplateData struct {
Input string
}
type MessageEmbedderTask ¶
type MessageEmbedderTask struct {
BaseTask
}
func NewMessageEmbedderTask ¶
func NewMessageEmbedderTask(appState *models.AppState) *MessageEmbedderTask
type MessageIntentTask ¶
type MessageIntentTask struct {
BaseTask
}
func NewMessageIntentTask ¶
func NewMessageIntentTask(appState *models.AppState) *MessageIntentTask
type MessageNERTask ¶
type MessageNERTask struct {
BaseTask
}
type MessageSummaryEmbedderTask ¶
type MessageSummaryEmbedderTask struct {
BaseTask
}
func NewMessageSummaryEmbedderTask ¶
func NewMessageSummaryEmbedderTask(appState *models.AppState) *MessageSummaryEmbedderTask
func (*MessageSummaryEmbedderTask) HandleError ¶
func (t *MessageSummaryEmbedderTask) HandleError(err error)
type MessageSummaryNERTask ¶
type MessageSummaryNERTask struct {
BaseTask
}
type MessageSummaryTask ¶
type MessageSummaryTask struct {
BaseTask
}
MessageSummaryTask gets a list of messages created since the last SummaryPoint, determines if the message count exceeds the configured message window, and if so: - determines the new SummaryPoint index, which will one message older than message_window / 2 - summarizes the messages from this new SummaryPoint to the oldest message not yet Summarized.
When summarizing, it adds context from these messages to an existing summary if there is one.
func NewMessageSummaryTask ¶
func NewMessageSummaryTask(appState *models.AppState) *MessageSummaryTask
func (*MessageSummaryTask) HandleError ¶
func (t *MessageSummaryTask) HandleError(err error)
type MessageTokenCountTask ¶
type MessageTokenCountTask struct {
BaseTask
}
func NewMessageTokenCountTask ¶
func NewMessageTokenCountTask(appState *models.AppState) *MessageTokenCountTask
func (*MessageTokenCountTask) HandleError ¶
func (mt *MessageTokenCountTask) HandleError(err error)
type SQLSchema ¶
type SQLSchema struct {
wsql.DefaultPostgreSQLSchema
}
func (SQLSchema) SubscribeIsolationLevel ¶
func (s SQLSchema) SubscribeIsolationLevel() sql.IsolationLevel
type TaskPublisher ¶
type TaskPublisher struct {
// contains filtered or unexported fields
}
func NewTaskPublisher ¶
func NewTaskPublisher(db *sql.DB) *TaskPublisher
func (*TaskPublisher) Close ¶
func (t *TaskPublisher) Close() error
func (*TaskPublisher) Publish ¶
func (t *TaskPublisher) Publish( taskType models.TaskTopic, metadata map[string]string, payload any, ) error
Publish publishes a message to the given topic. Payload must be a struct that can be marshalled to JSON.
func (*TaskPublisher) PublishMessage ¶
func (t *TaskPublisher) PublishMessage( metadata map[string]string, payload []models.MessageTask, ) error
PublishMessage publishes a slice of Messages to all Message topics.
type TaskRouter ¶
type TaskRouter struct { *message.Router Subscribers map[string]message.Subscriber // contains filtered or unexported fields }
TaskRouter is a wrapper around watermill's Router that adds some functionality for managing tasks and handlers. TaskRouter uses a SQLQueueSubscriber for all handlers.
func NewTaskRouter ¶
NewTaskRouter creates a new TaskRouter. Note that db should not be a bun.DB instance as bun runs at an isolation level that is incompatible with watermill's SQLQueueSubscriber.
func (*TaskRouter) AddTask ¶
func (tr *TaskRouter) AddTask( _ context.Context, name string, taskType models.TaskTopic, task models.Task, )
AddTask adds a task handler to the router.
func (*TaskRouter) Close ¶
func (tr *TaskRouter) Close() (err error)