Documentation ¶
Overview ¶
Package filestream communicates with the W&B backend filestream service.
Index ¶
- Constants
- func RetryPolicy(ctx context.Context, resp *http.Response, err error) (bool, error)
- type ChunkTypeEnum
- type CollectorState
- type CollectorStateUpdate
- type ExitUpdate
- type FileStream
- type FileStreamOffsetMap
- type FileStreamParams
- type FilesUploadedUpdate
- type FsTransmitData
- type FsTransmitFileData
- type HistoryUpdate
- type LogsUpdate
- type PreemptingUpdate
- type StatsUpdate
- type SummaryUpdate
- type Update
- type UpdateContext
Constants ¶
const (
	BufferSize      = 32
	EventsFileName  = "wandb-events.jsonl"
	HistoryFileName = "wandb-history.jsonl"
	SummaryFileName = "wandb-summary.json"
	OutputFileName  = "output.log"
)
const (
	// Retry filestream requests for 7 days before dropping chunk
	// retry_count = seconds_in_7_days / max_retry_time + num_retries_until_max_60_sec
	//             = 7 * 86400 / 60 + ceil(log2(60/2))
	//             = 10080 + 5
	DefaultRetryMax     = 10085
	DefaultRetryWaitMin = 2 * time.Second
	DefaultRetryWaitMax = 60 * time.Second
	// A 3-minute timeout for all filestream post requests
	DefaultNonRetryTimeout = 180 * time.Second
)
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ChunkTypeEnum ¶
type ChunkTypeEnum int8
const (
	NoneChunk ChunkTypeEnum = iota
	HistoryChunk
	OutputChunk
	EventsChunk
	SummaryChunk
)
type CollectorState ¶
type CollectorState struct {
	HistoryLineNum int      // Line number where to append run history.
	HistoryLines   []string // Lines to append to run history.

	EventsLineNum int      // Line number where to append run system metrics.
	EventsLines   []string // Lines to append to run system metrics.

	ConsoleLogLineNum int      // Line number where to append console output.
	ConsoleLogLines   []string // Lines to append to console output.

	SummaryLineNum int    // Line number where to write the run summary.
	LatestSummary  string // The run's updated summary, or the empty string.

	// UploadedFiles are files for which uploads have finished.
	UploadedFiles []string

	HasPreempting bool
	Preempting    bool

	// ExitCode is the run's script's exit code if any.
	//
	// This is sent with the final transmission.
	ExitCode *int32

	// Complete is the run's script's completion status if any.
	//
	// This is sent with the final transmission.
	Complete *bool
}
CollectorState is the filestream's buffered data.
func NewCollectorState ¶
func NewCollectorState(initialOffsets FileStreamOffsetMap) CollectorState
func (*CollectorState) MakeRequest ¶
func (s *CollectorState) MakeRequest(isDone bool) (*FsTransmitData, bool)
MakeRequest moves buffered data into an API request and returns it.
Returns a boolean that's true if the request is non-empty.
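The "move buffered data into a request" idea can be sketched with reduced stand-in types. miniState and miniRequest are hypothetical and track only history lines. Advancing the line offset after each drain is an assumption, not something the documentation above confirms, and the isDone parameter is left out.

```go
package main

import "fmt"

// miniState is a hypothetical, much-reduced stand-in for CollectorState.
type miniState struct {
	HistoryLineNum int
	HistoryLines   []string
}

// miniRequest is a hypothetical stand-in for FsTransmitData.
type miniRequest struct {
	Offset int
	Lines  []string
}

// makeRequest drains the buffered lines into a request. The boolean reports
// whether the request carries any data. Assumption: the offset advances by
// the number of drained lines so the next request continues from there.
func (s *miniState) makeRequest() (*miniRequest, bool) {
	req := &miniRequest{Offset: s.HistoryLineNum}
	if len(s.HistoryLines) == 0 {
		return req, false
	}
	req.Lines = s.HistoryLines
	s.HistoryLineNum += len(s.HistoryLines)
	s.HistoryLines = nil
	return req, true
}

func main() {
	s := &miniState{HistoryLines: []string{`{"loss":0.5}`, `{"loss":0.4}`}}

	req, ok := s.makeRequest()
	fmt.Println(ok, req.Offset, len(req.Lines), s.HistoryLineNum) // true 0 2 2

	_, ok = s.makeRequest()
	fmt.Println(ok) // false
}
```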
type CollectorStateUpdate ¶
type CollectorStateUpdate interface {
	// Apply modifies the collector state.
	Apply(*CollectorState)
}
CollectorStateUpdate is a mutation to a CollectorState.
type ExitUpdate ¶
type ExitUpdate struct {
Record *service.RunExitRecord
}
ExitUpdate contains the run's script's exit code.
func (*ExitUpdate) Apply ¶
func (u *ExitUpdate) Apply(ctx UpdateContext) error
type FileStream ¶
type FileStream interface {
	// Start asynchronously begins to upload to the backend.
	//
	// All operations are associated with the specified run (defined by
	// `entity`, `project` and `runID`). In case we are resuming a run,
	// the `offsetMap` specifies the initial file offsets for each file
	// type (history, output logs, events, summary).
	Start(
		entity string,
		project string,
		runID string,
		offsetMap FileStreamOffsetMap,
	)

	// Close waits for all work to be completed.
	Close()

	// StreamUpdate uploads information through the filestream API.
	StreamUpdate(update Update)
}
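The Start / StreamUpdate / Close lifecycle can be illustrated with an in-memory fake. This is a sketch under stated assumptions: fakeStream is hypothetical, Update is reduced to a string, the offsetMap argument is dropped, and the real implementation's internals are not shown in this documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// Update is reduced to a plain string in this sketch.
type Update string

// fakeStream is a hypothetical in-memory stand-in for a FileStream.
type fakeStream struct {
	updates chan Update
	wg      sync.WaitGroup
	sent    []Update
}

func newFakeStream() *fakeStream {
	return &fakeStream{updates: make(chan Update, 8)}
}

// Start launches a background worker and returns immediately.
func (fs *fakeStream) Start(entity, project, runID string) {
	fs.wg.Add(1)
	go func() {
		defer fs.wg.Done()
		for u := range fs.updates {
			// A real implementation would send data to the backend here.
			fs.sent = append(fs.sent, u)
		}
	}()
}

// StreamUpdate queues an update for the worker.
func (fs *fakeStream) StreamUpdate(u Update) { fs.updates <- u }

// Close stops accepting updates and waits for queued work to finish.
func (fs *fakeStream) Close() {
	close(fs.updates)
	fs.wg.Wait()
}

func main() {
	fs := newFakeStream()
	fs.Start("my-entity", "my-project", "run-123")
	fs.StreamUpdate("history line")
	fs.StreamUpdate("console line")
	fs.Close()
	fmt.Println(len(fs.sent)) // 2
}
```

Calling StreamUpdate after Close would panic in this sketch because the channel is closed; whether the real FileStream tolerates that is not stated above.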
func NewFileStream ¶
func NewFileStream(params FileStreamParams) FileStream
type FileStreamOffsetMap ¶
type FileStreamOffsetMap map[ChunkTypeEnum]int
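A resumed run might build its offset map as in the sketch below. The type and constant declarations are local copies of the ones documented here so that the example compiles on its own; resumeOffsets and the numbers passed to it are hypothetical.

```go
package main

import "fmt"

// Local copies of the documented declarations.
type ChunkTypeEnum int8

const (
	NoneChunk ChunkTypeEnum = iota
	HistoryChunk
	OutputChunk
	EventsChunk
	SummaryChunk
)

type FileStreamOffsetMap map[ChunkTypeEnum]int

// resumeOffsets is a hypothetical helper that records how many lines of each
// file the backend already holds for the run being resumed.
func resumeOffsets(history, output, events int) FileStreamOffsetMap {
	return FileStreamOffsetMap{
		HistoryChunk: history,
		OutputChunk:  output,
		EventsChunk:  events,
	}
}

func main() {
	offsets := resumeOffsets(120, 4500, 30)
	// A chunk type missing from the map reads as offset 0.
	fmt.Println(offsets[HistoryChunk], offsets[SummaryChunk]) // 120 0
}
```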
type FileStreamParams ¶
type FileStreamParams struct {
	Settings           *service.Settings
	Logger             *observability.CoreLogger
	Printer            *observability.Printer
	ApiClient          api.Client
	MaxItemsPerPush    int
	DelayProcess       waiting.Delay
	HeartbeatStopwatch waiting.Stopwatch
}
type FilesUploadedUpdate ¶
type FilesUploadedUpdate struct {
	// The path to the file, relative to the run's files directory.
	RelativePath string
}
FilesUploadedUpdate signals that a run file's contents were uploaded.
This is used in some deployments where the backend is not notified when files finish uploading.
func (*FilesUploadedUpdate) Apply ¶
func (u *FilesUploadedUpdate) Apply(ctx UpdateContext) error
type FsTransmitData ¶
type FsTransmitData struct {
	Files      map[string]FsTransmitFileData `json:"files,omitempty"`
	Complete   *bool                         `json:"complete,omitempty"`
	Exitcode   *int32                        `json:"exitcode,omitempty"`
	Preempting bool                          `json:"preempting,omitempty"`
	Dropped    int32                         `json:"dropped,omitempty"`
	Uploaded   []string                      `json:"uploaded,omitempty"`
}
FsTransmitData is serialized and sent to a W&B server.
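The struct tags determine which fields appear in the serialized payload. The sketch below uses a local copy of FsTransmitData and encodes it with encoding/json; FsTransmitFileData is left as an empty placeholder because its fields are not shown in this documentation, and encode is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// FsTransmitFileData is an empty placeholder; its real fields are not shown.
type FsTransmitFileData struct{}

// Local copy of the documented struct.
type FsTransmitData struct {
	Files      map[string]FsTransmitFileData `json:"files,omitempty"`
	Complete   *bool                         `json:"complete,omitempty"`
	Exitcode   *int32                        `json:"exitcode,omitempty"`
	Preempting bool                          `json:"preempting,omitempty"`
	Dropped    int32                         `json:"dropped,omitempty"`
	Uploaded   []string                      `json:"uploaded,omitempty"`
}

// encode is a hypothetical helper that returns the JSON form of d.
func encode(d FsTransmitData) string {
	b, err := json.Marshal(d)
	if err != nil {
		return "error: " + err.Error()
	}
	return string(b)
}

func main() {
	// Zero values and nil pointers are omitted entirely.
	fmt.Println(encode(FsTransmitData{})) // {}

	// A non-nil pointer is kept even when it points at a zero value, so an
	// exit code of 0 is still transmitted.
	code := int32(0)
	done := true
	fmt.Println(encode(FsTransmitData{Complete: &done, Exitcode: &code}))
	// {"complete":true,"exitcode":0}
}
```

The pointer fields are what let a payload distinguish "exit code 0" from "no exit code yet"; a plain int32 with omitempty would drop the zero.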
type FsTransmitFileData ¶
FsTransmitFileData (part of FsTransmitData) is serialized and sent to a W&B server.
type HistoryUpdate ¶
type HistoryUpdate struct {
Record *service.HistoryRecord
}
HistoryUpdate contains run metrics from `run.log()`.
func (*HistoryUpdate) Apply ¶
func (u *HistoryUpdate) Apply(ctx UpdateContext) error
type LogsUpdate ¶
type LogsUpdate struct {
Record *service.OutputRawRecord
}
LogsUpdate contains new lines from a run's console output.
func (*LogsUpdate) Apply ¶
func (u *LogsUpdate) Apply(ctx UpdateContext) error
type PreemptingUpdate ¶
type PreemptingUpdate struct {
Record *service.RunPreemptingRecord
}
PreemptingUpdate indicates a run's preemption status.
"Preemptible runs and sweeps" were added in https://github.com/wandb/wandb/pull/2142
This tells the backend that the run will be unable to send even heartbeats for some time, so the backend should be more lenient when deciding whether the run has crashed.
func (*PreemptingUpdate) Apply ¶
func (u *PreemptingUpdate) Apply(ctx UpdateContext) error
type StatsUpdate ¶
type StatsUpdate struct {
Record *service.StatsRecord
}
StatsUpdate contains system metrics during the run, e.g. memory usage.
func (*StatsUpdate) Apply ¶
func (u *StatsUpdate) Apply(ctx UpdateContext) error
type SummaryUpdate ¶
type SummaryUpdate struct {
Record *service.SummaryRecord
}
SummaryUpdate contains a run's most recent summary.
func (*SummaryUpdate) Apply ¶
func (u *SummaryUpdate) Apply(ctx UpdateContext) error
type Update ¶
type Update interface {
	// Apply processes data and modifies the next API request.
	Apply(UpdateContext) error
}
Update is a modification to the filestream's next API request.
type UpdateContext ¶
type UpdateContext struct {
	// ModifyRequest updates the next filestream API request.
	//
	// The state update runs in a separate goroutine.
	ModifyRequest func(CollectorStateUpdate)

	Settings *service.Settings
	Logger   *observability.CoreLogger
	Printer  *observability.Printer
}
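One way to picture how Update, UpdateContext, and CollectorStateUpdate fit together is sketched below with reduced local stand-ins. fileDone, uploadedFile, and applyTo are hypothetical, and the sketch applies the state change synchronously for simplicity, whereas the comment on ModifyRequest says the real state update runs in a separate goroutine.

```go
package main

import (
	"errors"
	"fmt"
)

// Reduced local stand-ins for the documented types.
type CollectorState struct {
	UploadedFiles []string
}

type CollectorStateUpdate interface {
	Apply(*CollectorState)
}

type UpdateContext struct {
	ModifyRequest func(CollectorStateUpdate)
}

type Update interface {
	Apply(UpdateContext) error
}

// uploadedFile is a hypothetical state mutation.
type uploadedFile struct{ path string }

func (u uploadedFile) Apply(s *CollectorState) {
	s.UploadedFiles = append(s.UploadedFiles, u.path)
}

// fileDone is a hypothetical Update that validates its input and then hands
// a state mutation to ModifyRequest.
type fileDone struct{ RelativePath string }

func (u *fileDone) Apply(ctx UpdateContext) error {
	if u.RelativePath == "" {
		return errors.New("empty relative path")
	}
	ctx.ModifyRequest(uploadedFile{path: u.RelativePath})
	return nil
}

// applyTo wires an Update to a state. Assumption: the mutation is applied
// immediately here; the real filestream does this on another goroutine.
func applyTo(state *CollectorState, u Update) error {
	ctx := UpdateContext{
		ModifyRequest: func(m CollectorStateUpdate) { m.Apply(state) },
	}
	return u.Apply(ctx)
}

func main() {
	state := &CollectorState{}
	if err := applyTo(state, &fileDone{RelativePath: "model.ckpt"}); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println(state.UploadedFiles) // [model.ckpt]
}
```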