runstate

package
v0.0.18 Latest
Warning

This package is not in the latest version of its module.

Published: Jan 1, 2025 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package runstate provides functionality for managing the runtime state of message processing, including message aggregation, forking, and joining of message streams, as well as usage tracking.

Constants

This section is empty.

Variables

This section is empty.

Functions

func AddMessage

func AddMessage[T messages.ModelMessage](a *Aggregator, m messages.Message[T])

AddMessage adds a message of any type that implements ModelMessage to the aggregator. Because it is generic, it accepts every valid message type in the system. For common message types, prefer the specific Add methods (AddUserPrompt, AddAssistantMessage, AddToolCall, AddToolResponse), which make the kind of message explicit at the call site.

Example:

agg := NewAggregator()
msg := messages.New().UserPrompt("hello")
AddMessage(agg, msg)
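
The sketch below illustrates how a generic function with this shape can accept differently typed messages and store them in one collection. It is not the package's implementation: ModelMessage, Message, UserMessage, AssistantMessage, aggregator, and addMessage are all stand-ins invented for the illustration, and the real messages package may look quite different.

```go
package main

import "fmt"

// ModelMessage is a stand-in constraint: any payload that can report its role.
type ModelMessage interface {
	Role() string
}

// UserMessage and AssistantMessage are hypothetical payload types.
type UserMessage struct{ Text string }

func (UserMessage) Role() string { return "user" }

type AssistantMessage struct{ Text string }

func (AssistantMessage) Role() string { return "assistant" }

// Message wraps a typed payload, loosely mirroring messages.Message[T].
type Message[T ModelMessage] struct {
	Payload T
}

// aggregator stores every message with its payload widened to the interface.
type aggregator struct {
	msgs []Message[ModelMessage]
}

// addMessage accepts any Message[T] and widens the payload before appending.
func addMessage[T ModelMessage](a *aggregator, m Message[T]) {
	a.msgs = append(a.msgs, Message[ModelMessage]{Payload: m.Payload})
}

func main() {
	agg := &aggregator{}
	addMessage(agg, Message[UserMessage]{Payload: UserMessage{Text: "hello"}})
	addMessage(agg, Message[AssistantMessage]{Payload: AssistantMessage{Text: "hi"}})
	for _, m := range agg.msgs {
		fmt.Println(m.Payload.Role())
	}
}
```

The type parameter is inferred from the argument, so callers never spell out T. The cost is that the call site no longer says which kind of message is being added, which is why the dedicated Add methods are recommended for the common cases.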

Types

type AggregatedMessages

type AggregatedMessages []messages.Message[messages.ModelMessage]

AggregatedMessages represents a collection of model messages that can be processed together. It provides a type-safe way to handle multiple messages while maintaining their order.

func (AggregatedMessages) Len

func (a AggregatedMessages) Len() int

Len returns the number of messages in the collection.

type Aggregator

type Aggregator struct {
	// contains filtered or unexported fields
}

Aggregator manages a collection of messages and their associated usage statistics. It supports fork-join operations to allow parallel processing of message streams while maintaining message order and proper usage tracking.

func NewAggregator added in v0.0.10

func NewAggregator() *Aggregator

NewAggregator creates and initializes a new Aggregator instance. It sets up:

  • A new unique identifier
  • An empty message collection
  • Zero-initialized usage statistics

Example:

agg := NewAggregator()
// agg is ready to accept messages and track usage

func (*Aggregator) AddAssistantMessage

func (a *Aggregator) AddAssistantMessage(m messages.Message[messages.AssistantMessage])

AddAssistantMessage adds an assistant's response message to the aggregator. This is used for messages that represent responses or outputs from the assistant.

Example:

msg := messages.New().AssistantMessage("The weather is sunny.")
agg.AddAssistantMessage(msg)

func (*Aggregator) AddToolCall

func (a *Aggregator) AddToolCall(m messages.Message[messages.ToolCallMessage])

AddToolCall adds a tool call message to the aggregator. This is used when the assistant needs to invoke an external tool or service.

Example:

toolCall := messages.New().ToolCall("weather-api", []ToolCallData{...})
agg.AddToolCall(toolCall)

func (*Aggregator) AddToolResponse

func (a *Aggregator) AddToolResponse(m messages.Message[messages.ToolResponse])

AddToolResponse adds a tool's response message to the aggregator. This is used to store the results returned from external tool invocations.

Example:

response := messages.New().ToolResponse("call-id", "weather-api", "Temperature: 72°F")
agg.AddToolResponse(response)

func (*Aggregator) AddUserPrompt

func (a *Aggregator) AddUserPrompt(m messages.Message[messages.UserMessage])

AddUserPrompt adds a user message to the aggregator. This is typically used for adding messages that represent user input or queries.

Example:

msg := messages.New().UserPrompt("What's the weather?")
agg.AddUserPrompt(msg)

func (*Aggregator) Checkpoint added in v0.0.10

func (a *Aggregator) Checkpoint() Checkpoint

Checkpoint creates a snapshot of the current aggregator state. This allows saving the current state of messages and usage statistics for later reference or restoration. The checkpoint includes:

  • The aggregator's unique ID
  • A deep copy of all current messages
  • The current usage statistics

Example:

agg := NewAggregator()
checkpoint := agg.Checkpoint()  // Save current state
// ... make changes to agg ...
// checkpoint still holds the original state
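
To make the "deep copy" guarantee concrete, here is a minimal sketch of a snapshot that stays stable while its source keeps changing. The aggregator and checkpoint types and the snapshot method are stand-ins made up for this illustration; messages are plain strings and usage statistics are left out, so this is not the package's actual code.

```go
package main

import "fmt"

// aggregator and checkpoint are stand-ins for the package's unexported state.
type aggregator struct {
	id   string
	msgs []string
}

type checkpoint struct {
	id   string
	msgs []string
}

// snapshot copies the message slice into fresh backing storage so later
// appends or in-place edits on the aggregator cannot reach the checkpoint.
func (a *aggregator) snapshot() checkpoint {
	copied := make([]string, len(a.msgs))
	copy(copied, a.msgs)
	return checkpoint{id: a.id, msgs: copied}
}

func main() {
	agg := &aggregator{id: "agg-1", msgs: []string{"m1", "m2"}}
	cp := agg.snapshot()

	// Mutate the aggregator after the snapshot was taken.
	agg.msgs[0] = "edited"
	agg.msgs = append(agg.msgs, "m3")

	fmt.Println(len(cp.msgs), cp.msgs[0], len(agg.msgs)) // prints "2 m1 3"
}
```

Copying the slice is sufficient here only because strings are immutable. A message type that holds slices, maps, or pointers would need a per-element copy to give the same isolation.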

func (*Aggregator) Fork

func (a *Aggregator) Fork() *Aggregator

Fork creates a new aggregator that starts with a copy of the current messages. The new aggregator gets:

  • A new unique ID
  • A copy of all current messages
  • An initLen set to the current message count

This allows for parallel processing of message streams that can be joined later.
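
One way to picture the fork bookkeeping is the sketch below. Everything in it is an assumption made for illustration: the aggregator struct, the integer IDs (the real package uses uuid.UUID), the string messages, and the fork and turnLen helpers, where turnLen models the documented TurnLen as the current length minus initLen.

```go
package main

import "fmt"

// aggregator is a stand-in; initLen records the message count at fork time.
type aggregator struct {
	id      int
	msgs    []string
	initLen int
}

// fork copies the messages into a new aggregator with its own ID and
// remembers how many messages it started with.
func (a *aggregator) fork(newID int) *aggregator {
	msgs := make([]string, len(a.msgs))
	copy(msgs, a.msgs)
	return &aggregator{id: newID, msgs: msgs, initLen: len(msgs)}
}

// turnLen reports how many messages were added since the fork.
func (a *aggregator) turnLen() int {
	return len(a.msgs) - a.initLen
}

func main() {
	parent := &aggregator{id: 1, msgs: []string{"m1", "m2"}}
	child := parent.fork(2)
	child.msgs = append(child.msgs, "m3")

	fmt.Println(len(parent.msgs), len(child.msgs), child.turnLen()) // prints "2 3 1"
}
```

Because the child owns a separate slice, appending to it never disturbs the parent, which is what makes it safe to hand the fork to another goroutine in this model.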

func (*Aggregator) ID

func (a *Aggregator) ID() uuid.UUID

ID returns the unique identifier of this aggregator. This ID is generated when the aggregator is created or forked.

func (*Aggregator) Join

func (a *Aggregator) Join(b *Aggregator)

Join combines messages from a forked aggregator back into this one. It:

  • Appends only the messages that were added to the forked aggregator after it was forked (determined using b.initLen)
  • Combines usage statistics from both aggregators

The join operation maintains message order by:

  1. Keeping all original messages
  2. Keeping any messages added to this aggregator after the fork
  3. Appending only new messages from the forked aggregator (those after b.initLen)

Example:

original := &Aggregator{...}  // Has messages [1,2]
forked := original.Fork()     // forked has [1,2] and initLen=2
AddMessage(original, msg3)    // original now has [1,2,3]
AddMessage(forked, msg4)      // forked now has [1,2,4]
original.Join(forked)         // original ends with [1,2,3,4]
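
The ordering rules above can be modeled in a few lines. This is a sketch under stated assumptions, not the package's code: aggregator, usage, fork, and join are hypothetical stand-ins, messages are strings, usage is a single counter, and a fork is assumed to start with zero usage so that joining does not double count.

```go
package main

import "fmt"

// usage is a stand-in for the package's Usage struct.
type usage struct{ total int64 }

type aggregator struct {
	msgs    []string
	initLen int
	usage   usage
}

// fork copies the messages and records the starting length.
// Assumption: the fork begins with zero usage.
func (a *aggregator) fork() *aggregator {
	msgs := make([]string, len(a.msgs))
	copy(msgs, a.msgs)
	return &aggregator{msgs: msgs, initLen: len(msgs)}
}

// join appends only the messages b gained after its fork point and
// adds b's usage to the receiver's.
func (a *aggregator) join(b *aggregator) {
	a.msgs = append(a.msgs, b.msgs[b.initLen:]...)
	a.usage.total += b.usage.total
}

func main() {
	original := &aggregator{msgs: []string{"m1", "m2"}, usage: usage{total: 5}}
	forked := original.fork()

	original.msgs = append(original.msgs, "m3")
	forked.msgs = append(forked.msgs, "m4")
	forked.usage.total = 7

	original.join(forked)
	fmt.Println(original.msgs, original.usage.total) // prints "[m1 m2 m3 m4] 12"
}
```

Slicing at b.initLen is what keeps the shared prefix from being duplicated: the forked copy of m1 and m2 is skipped, and only m4 crosses over.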

func (*Aggregator) Len

func (a *Aggregator) Len() int

Len returns the total number of messages currently held by the aggregator.

func (*Aggregator) Messages

func (a *Aggregator) Messages() AggregatedMessages

Messages returns a copy of all messages in the aggregator. The returned slice is a deep copy, so modifications to it won't affect the original messages in the aggregator.

func (*Aggregator) MessagesIter

func (a *Aggregator) MessagesIter() iter.Seq[messages.Message[messages.ModelMessage]]

MessagesIter returns an iterator over all messages in the aggregator. This provides a memory-efficient way to process messages sequentially without creating a copy of the entire message slice.

func (*Aggregator) TurnLen added in v0.0.10

func (a *Aggregator) TurnLen() int

TurnLen returns the number of messages added to the aggregator since it was forked.

func (*Aggregator) Usage

func (a *Aggregator) Usage() Usage

Usage returns the current usage statistics for this aggregator. This includes token counts for prompts and completions, as well as detailed breakdowns of token usage by category.

type Checkpoint added in v0.0.10

type Checkpoint struct {
	// contains filtered or unexported fields
}

Checkpoint represents a snapshot of an aggregator's state at a specific point in time. It contains an immutable copy of the aggregator's state, including:

  • The unique identifier of the source aggregator
  • A snapshot of all messages at checkpoint time
  • The usage statistics at checkpoint time

Checkpoints are useful for:

  • Creating save points in long-running operations
  • Comparing states at different points in time
  • Rolling back to previous states if needed

func (*Checkpoint) ID added in v0.0.10

func (c *Checkpoint) ID() uuid.UUID

ID returns the unique identifier of the aggregator that created this checkpoint. This ID matches the source aggregator's ID at the time the checkpoint was created.

func (Checkpoint) MarshalJSON added in v0.0.10

func (c Checkpoint) MarshalJSON() ([]byte, error)

func (*Checkpoint) MergeInto added in v0.0.10

func (c *Checkpoint) MergeInto(other *Aggregator)

MergeInto merges the checkpoint's state into another aggregator. This operation:

  • Appends messages from the checkpoint that were added after its fork point
  • Combines the checkpoint's usage statistics with the target aggregator's

This is useful when you want to apply a saved state to a different or new aggregator instance.

Example:

checkpoint := sourceAgg.Checkpoint()
targetAgg := NewAggregator()
checkpoint.MergeInto(targetAgg)  // targetAgg now contains checkpoint's state

func (*Checkpoint) Messages added in v0.0.10

func (c *Checkpoint) Messages() AggregatedMessages

Messages returns a copy of all messages that were present in the aggregator at the time this checkpoint was created. The returned slice is a deep copy, so modifications won't affect the checkpoint's stored messages.

func (*Checkpoint) UnmarshalJSON added in v0.0.10

func (c *Checkpoint) UnmarshalJSON(data []byte) error

func (*Checkpoint) Usage added in v0.0.10

func (c *Checkpoint) Usage() Usage

Usage returns the usage statistics that were recorded in the aggregator at the time this checkpoint was created. This includes all token counts and usage metrics up to the checkpoint time.

type CompletionTokensDetails

type CompletionTokensDetails struct {
	// When using Predicted Outputs, the number of tokens in the prediction that
	// appeared in the completion.
	AcceptedPredictionTokens int64 `json:"accepted_prediction_tokens"`
	// Audio input tokens generated by the model.
	AudioTokens int64 `json:"audio_tokens"`
	// Tokens generated by the model for reasoning.
	ReasoningTokens int64 `json:"reasoning_tokens"`
	// When using Predicted Outputs, the number of tokens in the prediction that did
	// not appear in the completion. However, like reasoning tokens, these tokens are
	// still counted in the total completion tokens for purposes of billing, output,
	// and context window limits.
	RejectedPredictionTokens int64 `json:"rejected_prediction_tokens"`
}

func (*CompletionTokensDetails) AddUsage

func (c *CompletionTokensDetails) AddUsage(details *CompletionTokensDetails)

type PromptTokensDetails

type PromptTokensDetails struct {
	// Audio input tokens present in the prompt.
	AudioTokens int64 `json:"audio_tokens"`
	// Cached tokens present in the prompt.
	CachedTokens int64 `json:"cached_tokens"`
}

func (*PromptTokensDetails) AddUsage

func (p *PromptTokensDetails) AddUsage(details *PromptTokensDetails)

type Usage

type Usage struct {
	// Number of tokens in the generated completion.
	CompletionTokens int64 `json:"completion_tokens"`
	// Number of tokens in the prompt.
	PromptTokens int64 `json:"prompt_tokens"`
	// Total number of tokens used in the request (prompt + completion).
	TotalTokens int64 `json:"total_tokens"`
	// Breakdown of tokens used in a completion.
	CompletionTokensDetails CompletionTokensDetails `json:"completion_tokens_details"`
	// Breakdown of tokens used in the prompt.
	PromptTokensDetails PromptTokensDetails `json:"prompt_tokens_details"`
}

func (*Usage) AddUsage

func (u *Usage) AddUsage(usage *Usage)
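
The three AddUsage methods carry no description. A plausible reading, sketched below, is field-by-field accumulation of the argument into the receiver. The struct fields are copied from the definitions above with JSON tags omitted for brevity; the method bodies and the nil-argument handling are assumptions, not the package's confirmed behavior.

```go
package main

import "fmt"

type CompletionTokensDetails struct {
	AcceptedPredictionTokens int64
	AudioTokens              int64
	ReasoningTokens          int64
	RejectedPredictionTokens int64
}

// AddUsage adds each counter from details; a nil argument is ignored (assumed).
func (c *CompletionTokensDetails) AddUsage(details *CompletionTokensDetails) {
	if details == nil {
		return
	}
	c.AcceptedPredictionTokens += details.AcceptedPredictionTokens
	c.AudioTokens += details.AudioTokens
	c.ReasoningTokens += details.ReasoningTokens
	c.RejectedPredictionTokens += details.RejectedPredictionTokens
}

type PromptTokensDetails struct {
	AudioTokens  int64
	CachedTokens int64
}

func (p *PromptTokensDetails) AddUsage(details *PromptTokensDetails) {
	if details == nil {
		return
	}
	p.AudioTokens += details.AudioTokens
	p.CachedTokens += details.CachedTokens
}

type Usage struct {
	CompletionTokens        int64
	PromptTokens            int64
	TotalTokens             int64
	CompletionTokensDetails CompletionTokensDetails
	PromptTokensDetails     PromptTokensDetails
}

// AddUsage sums the top-level counters and delegates the nested breakdowns.
func (u *Usage) AddUsage(usage *Usage) {
	if usage == nil {
		return
	}
	u.CompletionTokens += usage.CompletionTokens
	u.PromptTokens += usage.PromptTokens
	u.TotalTokens += usage.TotalTokens
	u.CompletionTokensDetails.AddUsage(&usage.CompletionTokensDetails)
	u.PromptTokensDetails.AddUsage(&usage.PromptTokensDetails)
}

func main() {
	total := Usage{PromptTokens: 10, CompletionTokens: 5, TotalTokens: 15}
	total.AddUsage(&Usage{
		PromptTokens:        3,
		CompletionTokens:    2,
		TotalTokens:         5,
		PromptTokensDetails: PromptTokensDetails{CachedTokens: 1},
	})
	fmt.Println(total.TotalTokens, total.PromptTokensDetails.CachedTokens) // prints "20 1"
}
```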
