filestream

Version: v0.0.0-...-99c2a2b

This package is not in the latest version of its module.
Published: Nov 23, 2023 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package filestream implements routines necessary for communicating with the W&B backend filestream service.

Internally there are three goroutines spun up:

process:  process records into an appropriate format to transmit
transmit: collect and transmit messages to the filestream service
feedback: process feedback from the filestream service

The following outline shows a common execution flow through this package:

{caller}:
 - filestream.go:    NewFileStream           - create service
 - filestream.go:    FileStream.Start        - spin up worker goroutines
 - filestream.go:    FileStream.StreamRecord - add a record to be processed and sent
 - loop_process.go:  FileStream.addProcess   - add to process channel
{goroutine process}:
 - loop_process.go:  FileStream.loopProcess  - loop acting on process channel
 - loop_transmit.go: FileStream.addTransmit  - add to transmit channel
{goroutine transmit}:
 - loop_transmit.go: FileStream.loopTransmit - loop acting on transmit channel
 - collector.go:     chunkCollector          - type that coordinates collecting work from transmit channel
 - collector.go:     chunkCollector.read     - read the first work item from transmit channel
 - collector.go:     chunkCollector.readMore - keep reading until we have enough or hit timeout
 - collector.go:     chunkCollector.dump     - create a blob to be serialized into json and sent
 - loop_transmit.go: FileStream.send         - send json to backend filestream service
 - loop_feedback.go: FileStream.addFeedback  - add to feedback channel
{goroutine feedback}:
 - loop_feedback.go: FileStream.loopFeedback - loop acting on feedback channel
{caller}:
 - filestream.go:    FileStream.Close        - graceful shutdown of worker goroutines

Constants

const (
	BufferSize      = 32
	EventsFileName  = "wandb-events.jsonl"
	HistoryFileName = "wandb-history.jsonl"
	SummaryFileName = "wandb-summary.json"
	OutputFileName  = "output.log"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ChunkTypeEnum

type ChunkTypeEnum int8
const (
	NoneChunk ChunkTypeEnum = iota
	HistoryChunk
	OutputChunk
	EventsChunk
	SummaryChunk
)

type FileStream

type FileStream struct {
	// contains filtered or unexported fields
}

FileStream is a stream of data to the server

func NewFileStream

func NewFileStream(opts ...FileStreamOption) *FileStream

func (*FileStream) Close

func (fs *FileStream) Close()

Close gracefully shuts down the goroutines created by Start

func (*FileStream) GetInputChan

func (fs *FileStream) GetInputChan() chan protoreflect.ProtoMessage

func (*FileStream) Start

func (fs *FileStream) Start()

Start creates process, transmit, and feedback goroutines

func (*FileStream) StreamRecord

func (fs *FileStream) StreamRecord(rec *service.Record)

StreamRecord is the main entry point for callers to add data to be sent

type FileStreamOffsetMap

type FileStreamOffsetMap map[ChunkTypeEnum]int
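As an illustration of how a map keyed by chunk type can track per-file positions, the sketch below redeclares the two types locally and adds a helper named advance. The helper is hypothetical, and the idea that the offsets count lines already sent for each chunk type is an assumption; this documentation does not describe how the package uses the map.

```go
package main

import "fmt"

// Local copies of the documented types so the example is self-contained.
type ChunkTypeEnum int8

const (
	NoneChunk ChunkTypeEnum = iota
	HistoryChunk
	OutputChunk
	EventsChunk
	SummaryChunk
)

type FileStreamOffsetMap map[ChunkTypeEnum]int

// advance is a hypothetical helper: it returns the current offset for the
// chunk type and moves that offset forward by n.
func advance(offsets FileStreamOffsetMap, t ChunkTypeEnum, n int) int {
	start := offsets[t] // a missing key reads as 0
	offsets[t] = start + n
	return start
}

func main() {
	offsets := FileStreamOffsetMap{HistoryChunk: 10}
	fmt.Println(advance(offsets, HistoryChunk, 3)) // prints 10
	fmt.Println(advance(offsets, HistoryChunk, 2)) // prints 13
	fmt.Println(advance(offsets, OutputChunk, 1))  // prints 0
}
```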

type FileStreamOption

type FileStreamOption func(fs *FileStream)

func WithDelayProcess

func WithDelayProcess(delayProcess time.Duration) FileStreamOption

func WithHeartbeatTime

func WithHeartbeatTime(heartbeatTime time.Duration) FileStreamOption

func WithHttpClient

func WithHttpClient(client *retryablehttp.Client) FileStreamOption

func WithLogger

func WithLogger(logger *observability.NexusLogger) FileStreamOption

func WithMaxItemsPerPush

func WithMaxItemsPerPush(maxItemsPerPush int) FileStreamOption

func WithOffsets

func WithOffsets(offsetMap FileStreamOffsetMap) FileStreamOption

func WithPath

func WithPath(path string) FileStreamOption

func WithSettings

func WithSettings(settings *service.Settings) FileStreamOption

type FsTransmitData

type FsTransmitData struct {
	Files      map[string]fsTransmitFileData `json:"files,omitempty"`
	Complete   *bool                         `json:"complete,omitempty"`
	Exitcode   *int32                        `json:"exitcode,omitempty"`
	Preempting bool                          `json:"preempting,omitempty"`
	Dropped    int32                         `json:"dropped,omitempty"`
	Uploaded   []string                      `json:"uploaded,omitempty"`
}

FsTransmitData is serialized and sent to a W&B server
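The struct tags determine what ends up in the JSON payload. The sketch below copies the documented field layout into a local type and marshals a few values with encoding/json. The fileData type stands in for the unexported fsTransmitFileData, and its fields are guesses made for the example. Note how the pointer fields let an explicit false or 0 be sent, while the plain bool and int32 fields are dropped when they hold their zero value.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// fileData is a hypothetical stand-in for the unexported fsTransmitFileData.
type fileData struct {
	Offset  int      `json:"offset"`
	Content []string `json:"content"`
}

// transmitData mirrors the documented FsTransmitData field layout.
type transmitData struct {
	Files      map[string]fileData `json:"files,omitempty"`
	Complete   *bool               `json:"complete,omitempty"`
	Exitcode   *int32              `json:"exitcode,omitempty"`
	Preempting bool                `json:"preempting,omitempty"`
	Dropped    int32               `json:"dropped,omitempty"`
	Uploaded   []string            `json:"uploaded,omitempty"`
}

// mustEncode marshals d and panics on error, which keeps the example short.
func mustEncode(d transmitData) string {
	b, err := json.Marshal(d)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	// Every field is empty, so omitempty drops them all.
	fmt.Println(mustEncode(transmitData{})) // prints {}

	// Non-nil pointers are kept even when they point at zero values.
	done, code := false, int32(0)
	fmt.Println(mustEncode(transmitData{Complete: &done, Exitcode: &code}))
	// prints {"complete":false,"exitcode":0}

	// A single file entry keyed by file name.
	files := map[string]fileData{"output.log": {Offset: 0, Content: []string{"hello"}}}
	fmt.Println(mustEncode(transmitData{Files: files}))
	// prints {"files":{"output.log":{"offset":0,"content":["hello"]}}}
}
```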
