filestream

package
v0.0.0-...-4d70e44 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 14, 2024 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package filestream communicates with the W&B backend filestream service.

Index

Constants

View Source
const (
	BufferSize      = 32
	EventsFileName  = "wandb-events.jsonl"
	HistoryFileName = "wandb-history.jsonl"
	SummaryFileName = "wandb-summary.json"
	OutputFileName  = "output.log"
)
View Source
const (
	// Retry filestream requests for 7 days before dropping chunk
	// retry_count = seconds_in_7_days / max_retry_time + num_retries_until_max_60_sec
	//             = 7 * 86400 / 60 + ceil(log2(60/2))
	//             = 10080 + 5
	DefaultRetryMax     = 10085
	DefaultRetryWaitMin = 2 * time.Second
	DefaultRetryWaitMax = 60 * time.Second
	// A 3-minute timeout for all filestream post requests
	DefaultNonRetryTimeout = 180 * time.Second
)

Variables

This section is empty.

Functions

func RetryPolicy

func RetryPolicy(
	ctx context.Context,
	resp *http.Response,
	err error,
) (bool, error)

RetryPolicy is the retry policy to be used for file stream operations.

Types

type ChunkTypeEnum

type ChunkTypeEnum int8
const (
	NoneChunk ChunkTypeEnum = iota
	HistoryChunk
	OutputChunk
	EventsChunk
	SummaryChunk
)

type CollectorState

type CollectorState struct {
	HistoryLineNum int      // Line number where to append run history.
	HistoryLines   []string // Lines to append to run history.

	EventsLineNum int      // Line number where to append run system metrics.
	EventsLines   []string // Lines to append to run system metrics.

	ConsoleLogLineNum int      // Line number where to append console output.
	ConsoleLogLines   []string // Lines to append to console output.

	SummaryLineNum int    // Line number where to write the run summary.
	LatestSummary  string // The run's updated summary, or the empty string.

	// UploadedFiles are files for which uploads have finished.
	UploadedFiles []string

	HasPreempting bool
	Preempting    bool

	// ExitCode is the run's script's exit code if any.
	//
	// This is sent with the final transmission.
	ExitCode *int32

	// Complete is the run's script's completion status if any.
	//
	// This is sent with the final transmission.
	Complete *bool
}

CollectorState is the filestream's buffered data.

func NewCollectorState

func NewCollectorState(initialOffsets FileStreamOffsetMap) CollectorState

func (*CollectorState) MakeRequest

func (s *CollectorState) MakeRequest(isDone bool) (*FsTransmitData, bool)

MakeRequest moves buffered data into an API request and returns it.

Returns a boolean that's true if the request is non-empty.

type CollectorStateUpdate

type CollectorStateUpdate interface {
	// Apply modifies the collector state.
	Apply(*CollectorState)
}

CollectorStateUpdate is a mutation to a CollectorState.

type ExitUpdate

type ExitUpdate struct {
	Record *service.RunExitRecord
}

ExitUpdate contains the run's script's exit code.

func (*ExitUpdate) Apply

func (u *ExitUpdate) Apply(ctx UpdateContext) error

type FileStream

type FileStream interface {
	// Start asynchronously begins to upload to the backend.
	//
	// All operations are associated with the specified run (defined by
	// `entity`, `project` and `runID`). In case we are resuming a run,
	// the `offsetMap` specifies the initial file offsets for each file
	// type (history, output logs, events, summary).
	Start(
		entity string,
		project string,
		runID string,
		offsetMap FileStreamOffsetMap,
	)

	// Close waits for all work to be completed.
	Close()

	// StreamUpdate uploads information through the filestream API.
	StreamUpdate(update Update)
}

func NewFileStream

func NewFileStream(params FileStreamParams) FileStream

type FileStreamOffsetMap

type FileStreamOffsetMap map[ChunkTypeEnum]int

type FileStreamParams

type FileStreamParams struct {
	Settings           *service.Settings
	Logger             *observability.CoreLogger
	Printer            *observability.Printer
	ApiClient          api.Client
	MaxItemsPerPush    int
	DelayProcess       waiting.Delay
	HeartbeatStopwatch waiting.Stopwatch
}

type FilesUploadedUpdate

type FilesUploadedUpdate struct {
	// The path to the file, relative to the run's files directory.
	RelativePath string
}

FilesUploadedUpdate signals that a run file's contents were uploaded.

This is used in some deployments where the backend is not notified when files finish uploading.

func (*FilesUploadedUpdate) Apply

func (u *FilesUploadedUpdate) Apply(ctx UpdateContext) error

type FsTransmitData

type FsTransmitData struct {
	Files      map[string]FsTransmitFileData `json:"files,omitempty"`
	Complete   *bool                         `json:"complete,omitempty"`
	Exitcode   *int32                        `json:"exitcode,omitempty"`
	Preempting bool                          `json:"preempting,omitempty"`
	Dropped    int32                         `json:"dropped,omitempty"`
	Uploaded   []string                      `json:"uploaded,omitempty"`
}

FsTransmitData is serialized and sent to a W&B server.

type FsTransmitFileData

type FsTransmitFileData struct {
	Offset  int      `json:"offset"`
	Content []string `json:"content"`
}

FsTransmitFileData (part of FsTransmitData) is serialized and sent to a W&B server.

type HistoryUpdate

type HistoryUpdate struct {
	Record *service.HistoryRecord
}

HistoryUpdate contains run metrics from `run.log()`.

func (*HistoryUpdate) Apply

func (u *HistoryUpdate) Apply(ctx UpdateContext) error

type LogsUpdate

type LogsUpdate struct {
	Record *service.OutputRawRecord
}

LogsUpdate contains new lines of a run's console output.

func (*LogsUpdate) Apply

func (u *LogsUpdate) Apply(ctx UpdateContext) error

type PreemptingUpdate

type PreemptingUpdate struct {
	Record *service.RunPreemptingRecord
}

PreemptingUpdate indicates a run's preemption status.

"Preemptible runs and sweeps" were added in https://github.com/wandb/wandb/pull/2142

This is a mechanism to tell the backend that the run may be unable to send even heartbeats for some time, so that the backend is more lenient when deciding whether the run crashed.

func (*PreemptingUpdate) Apply

func (u *PreemptingUpdate) Apply(ctx UpdateContext) error

type StatsUpdate

type StatsUpdate struct {
	Record *service.StatsRecord
}

StatsUpdate contains system metrics during the run, e.g. memory usage.

func (*StatsUpdate) Apply

func (u *StatsUpdate) Apply(ctx UpdateContext) error

type SummaryUpdate

type SummaryUpdate struct {
	Record *service.SummaryRecord
}

SummaryUpdate contains a run's most recent summary.

func (*SummaryUpdate) Apply

func (u *SummaryUpdate) Apply(ctx UpdateContext) error

type Update

type Update interface {
	// Apply processes data and modifies the next API request.
	Apply(UpdateContext) error
}

Update is a modification to the filestream's next API request.

type UpdateContext

type UpdateContext struct {
	// ModifyRequest updates the next filestream API request.
	//
	// The state update runs in a separate goroutine.
	ModifyRequest func(CollectorStateUpdate)

	Settings *service.Settings

	Logger  *observability.CoreLogger
	Printer *observability.Printer
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL