sync

package
v0.44.0-rc0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 30, 2024 License: AGPL-3.0 Imports: 40 Imported by: 0

Documentation

Overview

Package sync implements datasync for the builtin datamanger

Index

Constants

View Source
const (
	// FailedDir is a subdirectory of the capture directory that holds any files that could not be synced.
	FailedDir = "failed"
)

Variables

View Source
var (
	// InitialWaitTimeMillis defines the time to wait on the first retried upload attempt.
	// public for tests.
	InitialWaitTimeMillis = 200
	// RetryExponentialFactor defines the factor by which the retry wait time increases.
	// public for tests.
	RetryExponentialFactor = 2
)
View Source
var (
	// FSThresholdToTriggerDeletion temporarily public for tests.
	FSThresholdToTriggerDeletion = .90
	// CaptureDirToFSUsageRatio temporarily public for tests.
	CaptureDirToFSUsageRatio = .5
)
View Source
var CheckDeleteExcessFilesInterval = 30 * time.Second

CheckDeleteExcessFilesInterval temporarily public for tests.

View Source
var MaxUnaryFileSize = int64(units.MB)

MaxUnaryFileSize is the max number of bytes to send using the unary DataCaptureUpload, as opposed to the StreamingDataCaptureUpload.

View Source
var (
	UploadChunkSize = 64 * 1024
)

UploadChunkSize defines the size of the data included in each message of a FileUpload stream.

Functions

func NoOpCloudClientConstructor added in v0.44.0

func NoOpCloudClientConstructor(grpc.ClientConnInterface) v1.DataSyncServiceClient

NoOpCloudClientConstructor returns a v1.DataSyncServiceClient that does nothing & always returns successes.

Types

type ClientStreamingMock added in v0.44.0

type ClientStreamingMock[Req any, Res any] struct {
	NoOpClientStream
	T                *testing.T
	SendFunc         func(Req) error
	CloseAndRecvFunc func() (Res, error)
}

ClientStreamingMock is a generic mock for a client streaming GRPC client.

func (*ClientStreamingMock[Req, Res]) CloseAndRecv added in v0.44.0

func (m *ClientStreamingMock[Req, Res]) CloseAndRecv() (Res, error)

CloseAndRecv implemnents CloseAndRecv for a generic mock for a client streaming GRPC client.

func (*ClientStreamingMock[Req, Res]) Send added in v0.44.0

func (m *ClientStreamingMock[Req, Res]) Send(in Req) error

Send implemnents Send for a generic mock for a client streaming GRPC client.

type Config

type Config struct {
	// AdditionalSyncPaths defines the file system paths
	// that should be synced in addition to the CaptureDir.
	// Generally 3rd party programs will write arbitrary
	// files to these directories which are intended to be
	// synced to the cloud by data manager.
	AdditionalSyncPaths []string
	// CaptureDir defines the file system path
	// that data capture will write to and
	// which data sync should sync from.
	// defaults to filepath.Join(os.Getenv("HOME"), ".viam", "capture")
	CaptureDir string
	// CaptureDisabled, when true disables data capture.
	// sync needs to know if capture is disabled b/c if it
	// is not disabled sync will need to check if
	// the disk has filled up enough that data sync needs to delete
	// capture files without syncing them.
	// See DeleteEveryNthWhenDiskFull for more info.
	CaptureDisabled bool
	// DeleteEveryNthWhenDiskFull defines the `n` in
	// the psudocode:
	// “`go
	// captureEnabled := !config.CaptureDisabled
	// if captureEnabled {
	//     for {
	//         time.Sleep(time.Second*30)
	//         if diskFull() {
	//             for i, file in range dataCaptureDirFiles {
	//                 if fileIndex % n == 0 {
	//                     delete file
	//                 }
	//             }
	//         }
	//     }
	// }
	// “`
	//
	// which in english reads:
	//
	// If datacapture is enabled then every 30 seconds
	// if the disk is full (which is defined as the disk
	// is 90% full and 50% is contributed by the CaptureDir)
	// delete every Nth file in the CaptureDir & child
	// directories.
	//
	// The intent is to prevent data capture from filling up the
	// disk if the robot is unable to sync data for a long period
	// of time. Defaults to 5.
	DeleteEveryNthWhenDiskFull int
	// FileLastModifiedMillis defines the number of milliseconds that
	// we should wait for an arbitrary file (aka a file that doesn't end in
	// either the .prog nor the .capture file extension) before we consider
	// it as being ready to sync.
	// defaults to 10000, aka 10 seconds
	FileLastModifiedMillis int
	// MaximumNumSyncThreads defines the maximum number of goroutines which
	// data sync should create to sync data to the cloud
	// defaults to runtime.NumCpu() / 2
	MaximumNumSyncThreads int
	// ScheduledSyncDisabled, when true disables data capture syncing every SyncIntervalMins
	ScheduledSyncDisabled bool
	// SelectiveSyncerName defines the name of the selective sync sensor. Ignored when empty string
	SelectiveSyncerName string
	// SyncIntervalMins defines interval in minutes that scheduled sync should run. Ignored if
	// ScheduledSyncDisabled is true
	SyncIntervalMins float64
	// Tags defines the tags which should be applied to arbitrary files at sync time
	Tags []string
	// SelectiveSyncSensorEnabled when set to true, indicates that SelectiveSyncerName was non empty string
	// (meaning that the user configured a sync sensor). This will cause data sync to NOT sync if
	// SelectiveSyncSensor is nil (which will happen if resource graph doesn't have a resource with that name)
	SelectiveSyncSensorEnabled bool
	// SelectiveSyncSensor the selective sync sensor, which if non nil, will cause scheduled sync to NOT sync
	// unil the Readings method of the SelectiveSyncSensor (when called on the SyncIntervalMins interval) returns
	// the a key of datamanager.ShouldSyncKey and a value of `true`
	SelectiveSyncSensor sensor.Sensor
}

Config is the sync config from builtin.

func (Config) Equal

func (c Config) Equal(o Config) bool

Equal returns true when two Configs are semantically equivalent.

func (Config) SyncPaths added in v0.44.0

func (c Config) SyncPaths() []string

SyncPaths returns the capture directory and additional sync paths as a slice.

type ConnectivityState added in v0.44.0

type ConnectivityState interface {
	GetState() connectivity.State
}

ConnectivityState allows callers to check the connectivity state of the connection see https://github.com/grpc/grpc-go/blob/master/clientconn.go#L648

func ConnToConnectivityState added in v0.44.0

func ConnToConnectivityState(conn rpc.ClientConn) ConnectivityState

ConnToConnectivityState takes an rpc.ClientConn and returns an object that implents GetStatus so that the state of the connection (whether it is healthy or not) can be monitored. NOTE: Currently, as a temporary measure, this is done by trying to make a new tcp connection with app.viam.com, but once https://viam.atlassian.net/browse/DATA-3009 is unblocked & implemented, we will either remove this function or have it call the appropriate methods on conn.

type MockDataSyncServiceClient added in v0.44.0

type MockDataSyncServiceClient struct {
	T                     *testing.T
	DataCaptureUploadFunc func(
		ctx context.Context,
		in *v1.DataCaptureUploadRequest,
		opts ...grpc.CallOption,
	) (*v1.DataCaptureUploadResponse, error)
	FileUploadFunc func(
		ctx context.Context,
		opts ...grpc.CallOption,
	) (v1.DataSyncService_FileUploadClient, error)
	StreamingDataCaptureUploadFunc func(
		ctx context.Context,
		opts ...grpc.CallOption,
	) (v1.DataSyncService_StreamingDataCaptureUploadClient, error)
}

MockDataSyncServiceClient is a mock for a v1.DataSyncServiceClient.

func (MockDataSyncServiceClient) DataCaptureUpload added in v0.44.0

DataCaptureUpload is needed to satisfy the interface.

func (MockDataSyncServiceClient) FileUpload added in v0.44.0

FileUpload is needed to satisfy the interface.

func (MockDataSyncServiceClient) StreamingDataCaptureUpload added in v0.44.0

StreamingDataCaptureUpload is needed to satisfy the interface.

type NoOpClientStream added in v0.44.0

type NoOpClientStream struct{}

NoOpClientStream is a mock for a GRPC ClientStream that does nothing.

func (*NoOpClientStream) CloseSend added in v0.44.0

func (m *NoOpClientStream) CloseSend() error

CloseSend satisfies the interface.

func (*NoOpClientStream) Context added in v0.44.0

func (m *NoOpClientStream) Context() context.Context

Context satisfies the interface.

func (*NoOpClientStream) Header added in v0.44.0

func (m *NoOpClientStream) Header() (metadata.MD, error)

Header satisfies the interface.

func (*NoOpClientStream) RecvMsg added in v0.44.0

func (m *NoOpClientStream) RecvMsg(any) error

RecvMsg satisfies the interface.

func (*NoOpClientStream) SendMsg added in v0.44.0

func (m *NoOpClientStream) SendMsg(any) error

SendMsg satisfies the interface.

func (*NoOpClientStream) Trailer added in v0.44.0

func (m *NoOpClientStream) Trailer() metadata.MD

Trailer satisfies the interface.

type Sync added in v0.44.0

type Sync struct {
	// ScheduledTicker only exists for tests
	ScheduledTicker *clock.Ticker

	Scheduler *goutils.StoppableWorkers

	// FileDeletingWorkers is only public for tests
	FileDeletingWorkers *goutils.StoppableWorkers

	// MaxSyncThreads only exists for tests
	MaxSyncThreads int
	// contains filtered or unexported fields
}

Sync manages uploading files (both written by data capture and by 3rd party applications) to the cloud & deleting the upload files. It also manages deleting files if capture is enabled and the disk is about to fill up. There must be only one Sync per DataManager. The lifecycle of a Sync is:

- New - Reconfigure (any number of times) - Close (once).

func New added in v0.44.0

func New(
	clientConstructor func(cc grpc.ClientConnInterface) v1.DataSyncServiceClient,
	connToConnectivityState func(conn rpc.ClientConn) ConnectivityState,
	flushCollectors func(),
	clock clock.Clock,
	logger logging.Logger,
) *Sync

New creates a new Sync.

func (*Sync) Close added in v0.44.0

func (s *Sync) Close()

Close releases all resources managed by data sync.

func (*Sync) CloudConnReady added in v0.44.0

func (s *Sync) CloudConnReady() chan struct{}

CloudConnReady is public for builtin tests.

func (*Sync) Reconfigure added in v0.44.0

func (s *Sync) Reconfigure(_ context.Context, config Config, cloudConnSvc cloud.ConnectionService)

Reconfigure reconfigures Sync and is only called by the builtin data manager it assumes that it is only called by one goroutine at a time. Reconfigure: 1. stops all workers which use the config 2. sets the cloud.ConnectionService if it hans't been set yet (only needs to be set once) and starts the cloud connection manager if it hasn't been started yet so it can make a cloud connection 3. starts up the appropriate workers which use the new config.

func (*Sync) Sync added in v0.44.0

func (s *Sync) Sync(ctx context.Context, _ map[string]interface{}) error

Sync performs a non-scheduled sync of the data in the capture directory. If automated sync is also enabled, calling Sync will upload the files, regardless of whether or not is the scheduled time.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL