Documentation ¶
Overview ¶
Package sync implements datasync for the builtin datamanger
Index ¶
- Constants
- Variables
- func NoOpCloudClientConstructor(grpc.ClientConnInterface) v1.DataSyncServiceClient
- type ClientStreamingMock
- type Config
- type ConnectivityState
- type MockDataSyncServiceClient
- func (c MockDataSyncServiceClient) DataCaptureUpload(ctx context.Context, in *v1.DataCaptureUploadRequest, opts ...grpc.CallOption) (*v1.DataCaptureUploadResponse, error)
- func (c MockDataSyncServiceClient) FileUpload(ctx context.Context, opts ...grpc.CallOption) (v1.DataSyncService_FileUploadClient, error)
- func (c MockDataSyncServiceClient) StreamingDataCaptureUpload(ctx context.Context, opts ...grpc.CallOption) (v1.DataSyncService_StreamingDataCaptureUploadClient, error)
- type NoOpClientStream
- type Sync
Constants ¶
const (
// FailedDir is a subdirectory of the capture directory that holds any files that could not be synced.
FailedDir = "failed"
)
Variables ¶
var ( // InitialWaitTimeMillis defines the time to wait on the first retried upload attempt. // public for tests. InitialWaitTimeMillis = 200 // RetryExponentialFactor defines the factor by which the retry wait time increases. // public for tests. RetryExponentialFactor = 2 )
var ( // FSThresholdToTriggerDeletion temporarily public for tests. FSThresholdToTriggerDeletion = .90 // CaptureDirToFSUsageRatio temporarily public for tests. CaptureDirToFSUsageRatio = .5 )
var CheckDeleteExcessFilesInterval = 30 * time.Second
CheckDeleteExcessFilesInterval temporarily public for tests.
var MaxUnaryFileSize = int64(units.MB)
MaxUnaryFileSize is the max number of bytes to send using the unary DataCaptureUpload, as opposed to the StreamingDataCaptureUpload.
var (
UploadChunkSize = 64 * 1024
)
UploadChunkSize defines the size of the data included in each message of a FileUpload stream.
Functions ¶
func NoOpCloudClientConstructor ¶ added in v0.44.0
func NoOpCloudClientConstructor(grpc.ClientConnInterface) v1.DataSyncServiceClient
NoOpCloudClientConstructor returns a v1.DataSyncServiceClient that does nothing & always returns successes.
Types ¶
type ClientStreamingMock ¶ added in v0.44.0
type ClientStreamingMock[Req any, Res any] struct { NoOpClientStream T *testing.T SendFunc func(Req) error CloseAndRecvFunc func() (Res, error) }
ClientStreamingMock is a generic mock for a client streaming GRPC client.
func (*ClientStreamingMock[Req, Res]) CloseAndRecv ¶ added in v0.44.0
func (m *ClientStreamingMock[Req, Res]) CloseAndRecv() (Res, error)
CloseAndRecv implemnents CloseAndRecv for a generic mock for a client streaming GRPC client.
func (*ClientStreamingMock[Req, Res]) Send ¶ added in v0.44.0
func (m *ClientStreamingMock[Req, Res]) Send(in Req) error
Send implemnents Send for a generic mock for a client streaming GRPC client.
type Config ¶
type Config struct { // AdditionalSyncPaths defines the file system paths // that should be synced in addition to the CaptureDir. // Generally 3rd party programs will write arbitrary // files to these directories which are intended to be // synced to the cloud by data manager. AdditionalSyncPaths []string // CaptureDir defines the file system path // that data capture will write to and // which data sync should sync from. // defaults to filepath.Join(os.Getenv("HOME"), ".viam", "capture") CaptureDir string // CaptureDisabled, when true disables data capture. // sync needs to know if capture is disabled b/c if it // is not disabled sync will need to check if // the disk has filled up enough that data sync needs to delete // capture files without syncing them. // See DeleteEveryNthWhenDiskFull for more info. CaptureDisabled bool // DeleteEveryNthWhenDiskFull defines the `n` in // the psudocode: // “`go // captureEnabled := !config.CaptureDisabled // if captureEnabled { // for { // time.Sleep(time.Second*30) // if diskFull() { // for i, file in range dataCaptureDirFiles { // if fileIndex % n == 0 { // delete file // } // } // } // } // } // “` // // which in english reads: // // If datacapture is enabled then every 30 seconds // if the disk is full (which is defined as the disk // is 90% full and 50% is contributed by the CaptureDir) // delete every Nth file in the CaptureDir & child // directories. // // The intent is to prevent data capture from filling up the // disk if the robot is unable to sync data for a long period // of time. Defaults to 5. DeleteEveryNthWhenDiskFull int // FileLastModifiedMillis defines the number of milliseconds that // we should wait for an arbitrary file (aka a file that doesn't end in // either the .prog nor the .capture file extension) before we consider // it as being ready to sync. // defaults to 10000, aka 10 seconds FileLastModifiedMillis int // MaximumNumSyncThreads defines the maximum number of goroutines which // data sync should create to sync data to the cloud // defaults to runtime.NumCpu() / 2 MaximumNumSyncThreads int // ScheduledSyncDisabled, when true disables data capture syncing every SyncIntervalMins ScheduledSyncDisabled bool // SelectiveSyncerName defines the name of the selective sync sensor. Ignored when empty string SelectiveSyncerName string // SyncIntervalMins defines interval in minutes that scheduled sync should run. Ignored if // ScheduledSyncDisabled is true SyncIntervalMins float64 // Tags defines the tags which should be applied to arbitrary files at sync time Tags []string // SelectiveSyncSensorEnabled when set to true, indicates that SelectiveSyncerName was non empty string // (meaning that the user configured a sync sensor). This will cause data sync to NOT sync if // SelectiveSyncSensor is nil (which will happen if resource graph doesn't have a resource with that name) SelectiveSyncSensorEnabled bool // SelectiveSyncSensor the selective sync sensor, which if non nil, will cause scheduled sync to NOT sync // unil the Readings method of the SelectiveSyncSensor (when called on the SyncIntervalMins interval) returns // the a key of datamanager.ShouldSyncKey and a value of `true` SelectiveSyncSensor sensor.Sensor }
Config is the sync config from builtin.
type ConnectivityState ¶ added in v0.44.0
type ConnectivityState interface {
GetState() connectivity.State
}
ConnectivityState allows callers to check the connectivity state of the connection see https://github.com/grpc/grpc-go/blob/master/clientconn.go#L648
func ConnToConnectivityState ¶ added in v0.44.0
func ConnToConnectivityState(conn rpc.ClientConn) ConnectivityState
ConnToConnectivityState takes an rpc.ClientConn and returns an object that implents GetStatus so that the state of the connection (whether it is healthy or not) can be monitored. NOTE: Currently, as a temporary measure, this is done by trying to make a new tcp connection with app.viam.com, but once https://viam.atlassian.net/browse/DATA-3009 is unblocked & implemented, we will either remove this function or have it call the appropriate methods on conn.
type MockDataSyncServiceClient ¶ added in v0.44.0
type MockDataSyncServiceClient struct { T *testing.T DataCaptureUploadFunc func( ctx context.Context, in *v1.DataCaptureUploadRequest, opts ...grpc.CallOption, ) (*v1.DataCaptureUploadResponse, error) FileUploadFunc func( ctx context.Context, opts ...grpc.CallOption, ) (v1.DataSyncService_FileUploadClient, error) StreamingDataCaptureUploadFunc func( ctx context.Context, opts ...grpc.CallOption, ) (v1.DataSyncService_StreamingDataCaptureUploadClient, error) }
MockDataSyncServiceClient is a mock for a v1.DataSyncServiceClient.
func (MockDataSyncServiceClient) DataCaptureUpload ¶ added in v0.44.0
func (c MockDataSyncServiceClient) DataCaptureUpload( ctx context.Context, in *v1.DataCaptureUploadRequest, opts ...grpc.CallOption, ) (*v1.DataCaptureUploadResponse, error)
DataCaptureUpload is needed to satisfy the interface.
func (MockDataSyncServiceClient) FileUpload ¶ added in v0.44.0
func (c MockDataSyncServiceClient) FileUpload( ctx context.Context, opts ...grpc.CallOption, ) (v1.DataSyncService_FileUploadClient, error)
FileUpload is needed to satisfy the interface.
func (MockDataSyncServiceClient) StreamingDataCaptureUpload ¶ added in v0.44.0
func (c MockDataSyncServiceClient) StreamingDataCaptureUpload( ctx context.Context, opts ...grpc.CallOption, ) (v1.DataSyncService_StreamingDataCaptureUploadClient, error)
StreamingDataCaptureUpload is needed to satisfy the interface.
type NoOpClientStream ¶ added in v0.44.0
type NoOpClientStream struct{}
NoOpClientStream is a mock for a GRPC ClientStream that does nothing.
func (*NoOpClientStream) CloseSend ¶ added in v0.44.0
func (m *NoOpClientStream) CloseSend() error
CloseSend satisfies the interface.
func (*NoOpClientStream) Context ¶ added in v0.44.0
func (m *NoOpClientStream) Context() context.Context
Context satisfies the interface.
func (*NoOpClientStream) Header ¶ added in v0.44.0
func (m *NoOpClientStream) Header() (metadata.MD, error)
Header satisfies the interface.
func (*NoOpClientStream) RecvMsg ¶ added in v0.44.0
func (m *NoOpClientStream) RecvMsg(any) error
RecvMsg satisfies the interface.
func (*NoOpClientStream) SendMsg ¶ added in v0.44.0
func (m *NoOpClientStream) SendMsg(any) error
SendMsg satisfies the interface.
func (*NoOpClientStream) Trailer ¶ added in v0.44.0
func (m *NoOpClientStream) Trailer() metadata.MD
Trailer satisfies the interface.
type Sync ¶ added in v0.44.0
type Sync struct { // ScheduledTicker only exists for tests ScheduledTicker *clock.Ticker Scheduler *goutils.StoppableWorkers // FileDeletingWorkers is only public for tests FileDeletingWorkers *goutils.StoppableWorkers // MaxSyncThreads only exists for tests MaxSyncThreads int // contains filtered or unexported fields }
Sync manages uploading files (both written by data capture and by 3rd party applications) to the cloud & deleting the upload files. It also manages deleting files if capture is enabled and the disk is about to fill up. There must be only one Sync per DataManager. The lifecycle of a Sync is:
- New - Reconfigure (any number of times) - Close (once).
func New ¶ added in v0.44.0
func New( clientConstructor func(cc grpc.ClientConnInterface) v1.DataSyncServiceClient, connToConnectivityState func(conn rpc.ClientConn) ConnectivityState, flushCollectors func(), clock clock.Clock, logger logging.Logger, ) *Sync
New creates a new Sync.
func (*Sync) Close ¶ added in v0.44.0
func (s *Sync) Close()
Close releases all resources managed by data sync.
func (*Sync) CloudConnReady ¶ added in v0.44.0
func (s *Sync) CloudConnReady() chan struct{}
CloudConnReady is public for builtin tests.
func (*Sync) Reconfigure ¶ added in v0.44.0
Reconfigure reconfigures Sync and is only called by the builtin data manager it assumes that it is only called by one goroutine at a time. Reconfigure: 1. stops all workers which use the config 2. sets the cloud.ConnectionService if it hans't been set yet (only needs to be set once) and starts the cloud connection manager if it hasn't been started yet so it can make a cloud connection 3. starts up the appropriate workers which use the new config.