Documentation ¶
Overview ¶
Package runstate provides functionality for managing the runtime state of message processing, including message aggregation, forking, and joining of message streams, as well as usage tracking.
Index ¶
- func AddMessage[T messages.ModelMessage](a *Aggregator, m messages.Message[T])
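- type AggregatedMessages
- func (a AggregatedMessages) Len() int
- type Aggregator
- func NewAggregator() *Aggregator
- func (a *Aggregator) AddAssistantMessage(m messages.Message[messages.AssistantMessage])
- func (a *Aggregator) AddToolCall(m messages.Message[messages.ToolCallMessage])
- func (a *Aggregator) AddToolResponse(m messages.Message[messages.ToolResponse])
- func (a *Aggregator) AddUserPrompt(m messages.Message[messages.UserMessage])
- func (a *Aggregator) Checkpoint() Checkpoint
- func (a *Aggregator) Fork() *Aggregator
- func (a *Aggregator) ID() uuid.UUID
- func (a *Aggregator) Join(b *Aggregator)
- func (a *Aggregator) Len() int
- func (a *Aggregator) Messages() AggregatedMessages
- func (a *Aggregator) MessagesIter() iter.Seq[messages.Message[messages.ModelMessage]]
- func (a *Aggregator) TurnLen() int
- func (a *Aggregator) Usage() Usage
- type Checkpoint
- func (c *Checkpoint) ID() uuid.UUID
- func (c Checkpoint) MarshalJSON() ([]byte, error)
- func (c *Checkpoint) MergeInto(other *Aggregator)
- func (c *Checkpoint) Messages() AggregatedMessages
- func (c *Checkpoint) UnmarshalJSON(data []byte) error
- func (c *Checkpoint) Usage() Usage
- type CompletionTokensDetails
- func (c *CompletionTokensDetails) AddUsage(details *CompletionTokensDetails)
- type PromptTokensDetails
- func (p *PromptTokensDetails) AddUsage(details *PromptTokensDetails)
- type Usage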
Functions ¶
func AddMessage ¶
func AddMessage[T messages.ModelMessage](a *Aggregator, m messages.Message[T])
AddMessage adds any message type that implements ModelMessage to the aggregator. This is a generic function that can handle any valid message type in the system. For common message types, prefer using the specific Add methods (AddUserPrompt, AddAssistantMessage, etc.) as they provide better type safety and clarity.
Example:
agg := &Aggregator{...}
msg := messages.New().UserPrompt("hello")
AddMessage(agg, msg)
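The generic signature can be illustrated with a self-contained stand-in. The sketch below does not use the real messages package: ModelMessage, Message, UserMessage, AssistantMessage, aggregator, and addMessage are all hypothetical simplifications, chosen only to show how one generic function can accept any concrete message type and store it under the common interface type.

```go
package main

import "fmt"

// ModelMessage is a hypothetical stand-in for messages.ModelMessage.
type ModelMessage interface{ Role() string }

// UserMessage and AssistantMessage are hypothetical concrete message types.
type UserMessage struct{ Text string }

func (UserMessage) Role() string { return "user" }

type AssistantMessage struct{ Text string }

func (AssistantMessage) Role() string { return "assistant" }

// Message is a hypothetical stand-in for messages.Message[T].
type Message[T ModelMessage] struct{ Payload T }

// aggregator stores every message under the common interface type.
type aggregator struct{ msgs []Message[ModelMessage] }

// addMessage mirrors the shape of AddMessage: it accepts a message of any
// concrete type T and widens it to Message[ModelMessage] before storing it.
func addMessage[T ModelMessage](a *aggregator, m Message[T]) {
	a.msgs = append(a.msgs, Message[ModelMessage]{Payload: m.Payload})
}

func main() {
	agg := &aggregator{}
	addMessage(agg, Message[UserMessage]{Payload: UserMessage{Text: "hello"}})
	addMessage(agg, Message[AssistantMessage]{Payload: AssistantMessage{Text: "hi"}})
	for _, m := range agg.msgs {
		fmt.Println(m.Payload.Role())
	}
}
```

The type argument is inferred from the message passed in, so callers do not need to spell it out.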
Types ¶
type AggregatedMessages ¶
type AggregatedMessages []messages.Message[messages.ModelMessage]
AggregatedMessages represents a collection of model messages that can be processed together. It provides a type-safe way to handle multiple messages while maintaining their order.
func (AggregatedMessages) Len ¶
func (a AggregatedMessages) Len() int
Len returns the number of messages in the collection.
type Aggregator ¶
type Aggregator struct {
// contains filtered or unexported fields
}
Aggregator manages a collection of messages and their associated usage statistics. It supports fork-join operations to allow parallel processing of message streams while maintaining message order and proper usage tracking.
func NewAggregator ¶ added in v0.0.10
func NewAggregator() *Aggregator
NewAggregator creates and initializes a new Aggregator instance. It sets up:
- A new unique identifier
- An empty message collection
- Zero-initialized usage statistics
Example:
agg := NewAggregator()
// agg is ready to accept messages and track usage
func (*Aggregator) AddAssistantMessage ¶
func (a *Aggregator) AddAssistantMessage(m messages.Message[messages.AssistantMessage])
AddAssistantMessage adds an assistant's response message to the aggregator. This is used for messages that represent responses or outputs from the assistant.
Example:
msg := messages.New().AssistantMessage("The weather is sunny.")
agg.AddAssistantMessage(msg)
func (*Aggregator) AddToolCall ¶
func (a *Aggregator) AddToolCall(m messages.Message[messages.ToolCallMessage])
AddToolCall adds a tool call message to the aggregator. This is used when the assistant needs to invoke an external tool or service.
Example:
toolCall := messages.New().ToolCall("weather-api", []ToolCallData{...})
agg.AddToolCall(toolCall)
func (*Aggregator) AddToolResponse ¶
func (a *Aggregator) AddToolResponse(m messages.Message[messages.ToolResponse])
AddToolResponse adds a tool's response message to the aggregator. This is used to store the results returned from external tool invocations.
Example:
response := messages.New().ToolResponse("call-id", "weather-api", "Temperature: 72°F")
agg.AddToolResponse(response)
func (*Aggregator) AddUserPrompt ¶
func (a *Aggregator) AddUserPrompt(m messages.Message[messages.UserMessage])
AddUserPrompt adds a user message to the aggregator. This is typically used for adding messages that represent user input or queries.
Example:
msg := messages.New().UserPrompt("What's the weather?")
agg.AddUserPrompt(msg)
func (*Aggregator) Checkpoint ¶ added in v0.0.10
func (a *Aggregator) Checkpoint() Checkpoint
Checkpoint creates a snapshot of the current aggregator state. This allows saving the current state of messages and usage statistics for later reference or restoration. The checkpoint includes:
- The aggregator's unique ID
- A deep copy of all current messages
- The current usage statistics
Example:
agg := &Aggregator{...}
checkpoint := agg.Checkpoint() // Save current state
// ... make changes to agg ...
// checkpoint still holds the original state
func (*Aggregator) Fork ¶
func (a *Aggregator) Fork() *Aggregator
Fork creates a new aggregator that starts with a copy of the current messages. The new aggregator gets:
- A new unique ID
- A copy of all current messages
- An initLen set to the current message count
This allows for parallel processing of message streams that can be joined later.
func (*Aggregator) ID ¶
func (a *Aggregator) ID() uuid.UUID
ID returns the unique identifier of this aggregator. This ID is generated when the aggregator is created or forked.
func (*Aggregator) Join ¶
func (a *Aggregator) Join(b *Aggregator)
Join combines messages from a forked aggregator back into this one. It:
- Appends only the messages that were added to the forked aggregator after it was forked (determined using b.initLen)
- Combines usage statistics from both aggregators
The join operation maintains message order by:
1. Keeping all original messages
2. Keeping any messages added to this aggregator after the fork
3. Appending only new messages from the forked aggregator (those after b.initLen)
Example:
original := &Aggregator{...} // Has messages [1,2]
forked := original.Fork()    // forked has [1,2] and initLen=2
AddMessage(original, msg3)   // original now has [1,2,3]
AddMessage(forked, msg4)     // forked now has [1,2,4]
original.Join(forked)        // original ends with [1,2,3,4]
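The ordering rules above can be checked against a small model. This is a sketch under stated assumptions, not the package's implementation: the agg type, its string messages, and the single tokens counter are hypothetical stand-ins, and the fork is assumed to start with zero usage so that joining does not count the parent's usage twice.

```go
package main

import "fmt"

// agg is a hypothetical, simplified model of an aggregator.
type agg struct {
	msgs    []string
	initLen int   // number of messages inherited at fork time
	tokens  int64 // stand-in for the usage statistics
}

// add appends a message and records the tokens it consumed.
func (a *agg) add(m string, tokens int64) {
	a.msgs = append(a.msgs, m)
	a.tokens += tokens
}

// fork copies the current messages and remembers how many were inherited.
// Assumption: the fork starts with zero usage.
func (a *agg) fork() *agg {
	return &agg{msgs: append([]string(nil), a.msgs...), initLen: len(a.msgs)}
}

// join appends only the messages b gained after the fork and sums usage.
func (a *agg) join(b *agg) {
	a.msgs = append(a.msgs, b.msgs[b.initLen:]...)
	a.tokens += b.tokens
}

func main() {
	original := &agg{}
	original.add("m1", 10)
	original.add("m2", 20)

	forked := original.fork() // forked has [m1 m2], initLen == 2
	original.add("m3", 5)     // original has [m1 m2 m3]
	forked.add("m4", 7)       // forked has [m1 m2 m4]

	original.join(forked)
	fmt.Println(original.msgs, original.tokens) // [m1 m2 m3 m4] 42
}
```

Slicing at b.initLen is what keeps the inherited messages from being duplicated in the joined result.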
func (*Aggregator) Len ¶
func (a *Aggregator) Len() int
Len returns the total number of messages currently held by the aggregator.
func (*Aggregator) Messages ¶
func (a *Aggregator) Messages() AggregatedMessages
Messages returns a copy of all messages in the aggregator. The returned slice is a deep copy, so modifications to it won't affect the original messages in the aggregator.
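To make the deep-copy guarantee concrete, the sketch below contrasts a shallow slice copy with a deep one. The message type with a nested Parts slice and the deepCopy helper are hypothetical, introduced only for illustration; the real message types and copy routine may differ.

```go
package main

import "fmt"

// message is a hypothetical message with nested, mutable content.
type message struct{ Parts []string }

// deepCopy clones the outer slice and every nested Parts slice, so the
// result shares no memory with the input.
func deepCopy(in []message) []message {
	out := make([]message, len(in))
	for i, m := range in {
		out[i] = message{Parts: append([]string(nil), m.Parts...)}
	}
	return out
}

func main() {
	stored := []message{{Parts: []string{"hello"}}}

	// Mutating a deep copy leaves the stored messages untouched.
	deep := deepCopy(stored)
	deep[0].Parts[0] = "changed"
	fmt.Println(stored[0].Parts[0]) // hello

	// A shallow copy still shares the nested slice with the original.
	shallow := append([]message(nil), stored...)
	shallow[0].Parts[0] = "changed"
	fmt.Println(stored[0].Parts[0]) // changed
}
```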
func (*Aggregator) MessagesIter ¶
func (a *Aggregator) MessagesIter() iter.Seq[messages.Message[messages.ModelMessage]]
MessagesIter returns an iterator over all messages in the aggregator. This provides a memory-efficient way to process messages sequentially without creating a copy of the entire message slice.
func (*Aggregator) TurnLen ¶ added in v0.0.10
func (a *Aggregator) TurnLen() int
TurnLen returns the number of messages added to the aggregator since it was forked.
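The difference between Len and TurnLen can be sketched with a toy model. The agg type and its methods below are hypothetical, and the sketch assumes TurnLen is the total length minus the number of messages inherited at fork time, which would make the two values equal for an aggregator that was never forked.

```go
package main

import "fmt"

// agg is a hypothetical model tracking how many messages were inherited.
type agg struct {
	msgs    []string
	initLen int
}

// fork copies the messages and records the inherited count.
func (a *agg) fork() *agg {
	return &agg{msgs: append([]string(nil), a.msgs...), initLen: len(a.msgs)}
}

// length counts every message held, inherited or not.
func (a *agg) length() int { return len(a.msgs) }

// turnLength counts only messages added since the fork (assumed definition).
func (a *agg) turnLength() int { return len(a.msgs) - a.initLen }

func main() {
	parent := &agg{msgs: []string{"m1", "m2"}}
	child := parent.fork()
	child.msgs = append(child.msgs, "m3")
	fmt.Println(child.length(), child.turnLength()) // 3 1
}
```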
func (*Aggregator) Usage ¶
func (a *Aggregator) Usage() Usage
Usage returns the current usage statistics for this aggregator. This includes token counts for prompts and completions, as well as detailed breakdowns of token usage by category.
type Checkpoint ¶ added in v0.0.10
type Checkpoint struct {
// contains filtered or unexported fields
}
Checkpoint represents a snapshot of an aggregator's state at a specific point in time. It contains an immutable copy of the aggregator's state, including:
- The unique identifier of the source aggregator
- A snapshot of all messages at checkpoint time
- The usage statistics at checkpoint time
Checkpoints are useful for:
- Creating save points in long-running operations
- Comparing states at different points in time
- Rolling back to previous states if needed
func (*Checkpoint) ID ¶ added in v0.0.10
func (c *Checkpoint) ID() uuid.UUID
ID returns the unique identifier of the aggregator that created this checkpoint. This ID matches the source aggregator's ID at the time the checkpoint was created.
func (Checkpoint) MarshalJSON ¶ added in v0.0.10
func (c Checkpoint) MarshalJSON() ([]byte, error)
func (*Checkpoint) MergeInto ¶ added in v0.0.10
func (c *Checkpoint) MergeInto(other *Aggregator)
MergeInto merges the checkpoint's state into another aggregator. This operation:
- Appends messages from the checkpoint that were added after its fork point
- Combines the checkpoint's usage statistics with the target aggregator's
This is useful when you want to apply a saved state to a different or new aggregator instance.
Example:
checkpoint := sourceAgg.Checkpoint()
targetAgg := NewAggregator()
checkpoint.MergeInto(targetAgg)
// targetAgg now contains checkpoint's state
func (*Checkpoint) Messages ¶ added in v0.0.10
func (c *Checkpoint) Messages() AggregatedMessages
Messages returns a copy of all messages that were present in the aggregator at the time this checkpoint was created. The returned slice is a deep copy, so modifications won't affect the checkpoint's stored messages.
func (*Checkpoint) UnmarshalJSON ¶ added in v0.0.10
func (c *Checkpoint) UnmarshalJSON(data []byte) error
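Checkpoint has only unexported fields, which encoding/json skips by default, so custom MarshalJSON and UnmarshalJSON methods are the usual way to make such a type serializable. The sketch below shows that pattern on a hypothetical snapshot type; the wire struct, its field names, and the roundTrip helper are assumptions for illustration and say nothing about the JSON layout Checkpoint actually produces.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// snapshot is a hypothetical stand-in for Checkpoint: all fields unexported.
type snapshot struct {
	id     string
	msgs   []string
	tokens int64
}

// wire is an exported mirror used only for encoding and decoding.
type wire struct {
	ID     string   `json:"id"`
	Msgs   []string `json:"messages"`
	Tokens int64    `json:"total_tokens"`
}

// MarshalJSON copies the private state into the exported mirror.
func (s snapshot) MarshalJSON() ([]byte, error) {
	return json.Marshal(wire{ID: s.id, Msgs: s.msgs, Tokens: s.tokens})
}

// UnmarshalJSON decodes into the mirror and restores the private state.
func (s *snapshot) UnmarshalJSON(data []byte) error {
	var w wire
	if err := json.Unmarshal(data, &w); err != nil {
		return err
	}
	s.id, s.msgs, s.tokens = w.ID, w.Msgs, w.Tokens
	return nil
}

// roundTrip encodes a snapshot and decodes it into a fresh value.
func roundTrip(in snapshot) (snapshot, error) {
	var out snapshot
	data, err := json.Marshal(in)
	if err != nil {
		return out, err
	}
	err = json.Unmarshal(data, &out)
	return out, err
}

func main() {
	out, err := roundTrip(snapshot{id: "abc", msgs: []string{"hi"}, tokens: 3})
	if err != nil {
		panic(err)
	}
	fmt.Println(out.id, out.msgs, out.tokens) // abc [hi] 3
}
```

Marshaling uses a value receiver and unmarshaling a pointer receiver, matching the receiver kinds in the two Checkpoint signatures.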
func (*Checkpoint) Usage ¶ added in v0.0.10
func (c *Checkpoint) Usage() Usage
Usage returns the usage statistics that were recorded in the aggregator at the time this checkpoint was created. This includes all token counts and usage metrics up to the checkpoint time.
type CompletionTokensDetails ¶
type CompletionTokensDetails struct {
	// When using Predicted Outputs, the number of tokens in the prediction that
	// appeared in the completion.
	AcceptedPredictionTokens int64 `json:"accepted_prediction_tokens"`
	// Audio input tokens generated by the model.
	AudioTokens int64 `json:"audio_tokens"`
	// Tokens generated by the model for reasoning.
	ReasoningTokens int64 `json:"reasoning_tokens"`
	// When using Predicted Outputs, the number of tokens in the prediction that did
	// not appear in the completion. However, like reasoning tokens, these tokens are
	// still counted in the total completion tokens for purposes of billing, output,
	// and context window limits.
	RejectedPredictionTokens int64 `json:"rejected_prediction_tokens"`
}
func (*CompletionTokensDetails) AddUsage ¶
func (c *CompletionTokensDetails) AddUsage(details *CompletionTokensDetails)
type PromptTokensDetails ¶
type PromptTokensDetails struct {
	// Audio input tokens present in the prompt.
	AudioTokens int64 `json:"audio_tokens"`
	// Cached tokens present in the prompt.
	CachedTokens int64 `json:"cached_tokens"`
}
func (*PromptTokensDetails) AddUsage ¶
func (p *PromptTokensDetails) AddUsage(details *PromptTokensDetails)
type Usage ¶
type Usage struct {
	// Number of tokens in the generated completion.
	CompletionTokens int64 `json:"completion_tokens"`
	// Number of tokens in the prompt.
	PromptTokens int64 `json:"prompt_tokens"`
	// Total number of tokens used in the request (prompt + completion).
	TotalTokens int64 `json:"total_tokens"`
	// Breakdown of tokens used in a completion.
	CompletionTokensDetails CompletionTokensDetails `json:"completion_tokens_details"`
	// Breakdown of tokens used in the prompt.
	PromptTokensDetails PromptTokensDetails `json:"prompt_tokens_details"`
}
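The AddUsage methods are undocumented here, so the following is only a sketch of how usage from two aggregators might be combined, assuming field-wise addition. The types are trimmed local copies of the structs above, the AddUsage body is a guess at the behavior, and combine is a hypothetical helper that does not exist in the package.

```go
package main

import "fmt"

// PromptTokensDetails is a trimmed local copy of the documented struct.
type PromptTokensDetails struct {
	AudioTokens  int64
	CachedTokens int64
}

// AddUsage is assumed to add the other value's counters field by field.
func (p *PromptTokensDetails) AddUsage(details *PromptTokensDetails) {
	if details == nil {
		return
	}
	p.AudioTokens += details.AudioTokens
	p.CachedTokens += details.CachedTokens
}

// Usage is a trimmed local copy of the documented struct.
type Usage struct {
	CompletionTokens    int64
	PromptTokens        int64
	TotalTokens         int64
	PromptTokensDetails PromptTokensDetails
}

// combine is a hypothetical helper that folds src into dst.
func combine(dst *Usage, src Usage) {
	dst.CompletionTokens += src.CompletionTokens
	dst.PromptTokens += src.PromptTokens
	dst.TotalTokens += src.TotalTokens
	dst.PromptTokensDetails.AddUsage(&src.PromptTokensDetails)
}

func main() {
	a := Usage{
		PromptTokens:        10,
		CompletionTokens:    5,
		TotalTokens:         15,
		PromptTokensDetails: PromptTokensDetails{CachedTokens: 4},
	}
	b := Usage{PromptTokens: 20, CompletionTokens: 7, TotalTokens: 27}

	combine(&a, b)
	fmt.Println(a.PromptTokens, a.CompletionTokens, a.TotalTokens) // 30 12 42
}
```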