ste

package
v10.12.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 21, 2021 License: MIT Imports: 39 Imported by: 0

Documentation

Index

Constants

View Source
const (
	CustomHeaderMaxBytes = 256
	MetadataMaxBytes     = 1000 // If > 65536, then jobPartPlanBlobData's MetadataLength field's type must change
	BlobTagsMaxByte      = 4000
)
View Source
const (
	ConcurrencyReasonNone          = ""
	ConcurrencyReasonTunerDisabled = "tuner disabled" // used as the final (non-finished) state for null tuner

)
View Source
const DataSchemaVersion common.Version = 16

dataSchemaVersion defines the data schema version of JobPart order files supported by current version of azcopy To be Incremented every time when we release azcopy with changed dataSchema

View Source
const DownloadMaxRetryDelay = time.Second * 60
View Source
const DownloadRetryDelay = time.Second * 1
View Source
const DownloadTryTimeout = time.Minute * 15

TODO: consider to unify the retry options.

View Source
const JobPartPlanFileNameFormat = "%v--%05d.steV%d"
View Source
const MaxRetryPerDownloadBody = 5

download related

View Source
const PacerTimeToWaitInMs = 50

pacer related

View Source
const TagsHeaderMaxLength = 2000
View Source
const UploadMaxRetryDelay = time.Second * 60
View Source
const UploadMaxTries = 20

upload related

View Source
const UploadRetryDelay = time.Second * 1

Variables

View Source
var ADLSFlushThreshold uint32 = 7500 // The # of blocks to flush at a time-- Implemented only for CI.
View Source
var DebugSkipFiles = make(map[string]bool)

debug knob

View Source
var DefaultServiceApiVersion = common.GetLifecycleMgr().GetEnvironmentVariable(common.EEnvironmentVariable.DefaultServiceApiVersion())

DefaultServiceApiVersion is the default value of service api version that is set as value to the ServiceAPIVersionOverride in every Job's context.

View Source
var EAdviceType = AdviceType{"", ""}
View Source
var EnvironmentMimeMap map[string]string
View Source
var ServiceAPIVersionOverride = serviceAPIVersionOverride{}

ServiceAPIVersionOverride is a global variable in package ste which is a key to Service Api Version Value set in the every Job's context.

View Source
var UploadTryTimeout = time.Minute * 15

Functions

func BlobTierAllowed

func BlobTierAllowed(destTier azblob.AccessTierType) bool

// TODO: Infer availability based upon blob size as well, for premium page blobs.

func DeleteBlob

func DeleteBlob(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer pacer)

func DeleteFile

func DeleteFile(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer pacer)

func NewAzcopyHTTPClient

func NewAzcopyHTTPClient(maxIdleConns int) *http.Client

NewAzcopyHTTPClient creates a new HTTP client. We must minimize use of this, and instead maximize re-use of the returned client object. Why? Because that makes our connection pooling more efficient, and prevents us exhausting the number of available network sockets on resource-constrained Linux systems. (E.g. when 'ulimit -Hn' is low).

func NewBFSXferRetryPolicyFactory

func NewBFSXferRetryPolicyFactory(o XferRetryOptions) pipeline.Factory

TODO fix the separate retry policies NewBFSXferRetryPolicyFactory creates a RetryPolicyFactory object configured using the specified options.

func NewBlobFSPipeline

func NewBlobFSPipeline(c azbfs.Credential, o azbfs.PipelineOptions, r XferRetryOptions, p pacer, client *http.Client, statsAcc *PipelineNetworkStats) pipeline.Pipeline

NewBlobFSPipeline creates a pipeline for transfers to and from BlobFS Service The blobFS operations currently in azcopy are supported by SharedKey Credentials

func NewBlobPipeline

func NewBlobPipeline(c azblob.Credential, o azblob.PipelineOptions, r XferRetryOptions, p pacer, client *http.Client, statsAcc *PipelineNetworkStats) pipeline.Pipeline

NewBlobPipeline creates a Pipeline using the specified credentials and options.

func NewBlobXferRetryPolicyFactory

func NewBlobXferRetryPolicyFactory(o XferRetryOptions) pipeline.Factory

TODO: Fix the separate retry policies, use Azure blob's retry policy after blob SDK with retry optimization get released. NewBlobXferRetryPolicyFactory creates a RetryPolicyFactory object configured using the specified options.

func NewFilePipeline

func NewFilePipeline(c azfile.Credential, o azfile.PipelineOptions, r azfile.RetryOptions, p pacer, client *http.Client, statsAcc *PipelineNetworkStats) pipeline.Pipeline

NewFilePipeline creates a Pipeline using the specified credentials and options.

func NewNullAutoPacer

func NewNullAutoPacer() *nullAutoPacer

func NewRequestLogPolicyFactory

func NewRequestLogPolicyFactory(o RequestLogOptions) pipeline.Factory

NewRequestLogPolicyFactory creates a RequestLogPolicyFactory object configured using the specified options.

func NewTokenBucketPacer

func NewTokenBucketPacer(bytesPerSecond int64, expectedBytesPerCoarseRequest int64) *tokenBucketPacer

func NewVersionPolicyFactory

func NewVersionPolicyFactory() pipeline.Factory

NewVersionPolicy creates a factory that can override the service version set in the request header. If the context has key overwrite-current-version set to false, then x-ms-version in request is not overwritten else it will set x-ms-version to 207-04-17

func ValidateTier

func ValidateTier(jptm IJobPartTransferMgr, blobTier azblob.AccessTierType, blobURL azblob.BlobURL, ctx context.Context) (isValid bool)

Types

type AdviceType

type AdviceType struct {
	// contains filtered or unexported fields
}

func (AdviceType) AccountIOPS

func (AdviceType) AccountIOPS() AdviceType

func (AdviceType) AccountThroughput

func (AdviceType) AccountThroughput() AdviceType

func (AdviceType) ConcurrencyHighCpu

func (AdviceType) ConcurrencyHighCpu() AdviceType

func (AdviceType) ConcurrencyHitUpperLimit

func (AdviceType) ConcurrencyHitUpperLimit() AdviceType

func (AdviceType) ConcurrencyNotEnoughTime

func (AdviceType) ConcurrencyNotEnoughTime() AdviceType

func (AdviceType) ConcurrencyNotTuned

func (AdviceType) ConcurrencyNotTuned() AdviceType

func (AdviceType) FileShareOrNetwork

func (AdviceType) FileShareOrNetwork() AdviceType

func (AdviceType) MbpsCapped

func (AdviceType) MbpsCapped() AdviceType

func (AdviceType) NetworkErrors

func (AdviceType) NetworkErrors() AdviceType

func (AdviceType) NetworkIsBottleneck

func (AdviceType) NetworkIsBottleneck() AdviceType

func (AdviceType) NetworkNotBottleneck

func (AdviceType) NetworkNotBottleneck() AdviceType

func (AdviceType) ServerBusy

func (AdviceType) ServerBusy() AdviceType

func (AdviceType) SmallFilesOrNetwork

func (AdviceType) SmallFilesOrNetwork() AdviceType

func (AdviceType) VMSize

func (AdviceType) VMSize() AdviceType

type AzureFileParentDirCreator

type AzureFileParentDirCreator struct{}

namespace for functions related to creating parent directories in Azure File to avoid free floating global funcs

func (AzureFileParentDirCreator) CreateDirToRoot

CreateDirToRoot Creates the dir (and parents as necessary) if it does not exist

func (AzureFileParentDirCreator) CreateParentDirToRoot

CreateParentDirToRoot creates parent directories of the Azure file if file's parent directory doesn't exist.

type ConcurrencySettings

type ConcurrencySettings struct {

	// InitialMainPoolSize is the initial size of the main goroutine pool that transfers the data
	// (i.e. executes chunkfuncs)
	InitialMainPoolSize int

	// MaxMainPoolSize is a number >= InitialMainPoolSize, representing max size we will grow the main pool to
	MaxMainPoolSize *ConfiguredInt

	// TransferInitiationPoolSize is the size of the auxiliary goroutine pool that initiates transfers
	// (i.e. creates chunkfuncs)
	TransferInitiationPoolSize *ConfiguredInt

	// EnumerationPoolSize is size of auxiliary goroutine pool used in enumerators (only some of which are in fact parallelized)
	EnumerationPoolSize *ConfiguredInt

	// ParallelStatFiles says whether file.Stat calls should be parallelized during enumeration. May help enumeration performance
	// on Linux, but is not necessary and should not be activate on Windows.
	ParallelStatFiles *ConfiguredBool

	// MaxIdleConnections is the max number of idle TCP connections to keep open
	MaxIdleConnections int

	// MaxOpenFiles is the max number of file handles that we should have open at any time
	// Currently (July 2019) this is only used for downloads, which is where we wouldn't
	// otherwise have strict control of the number of open files.
	// For uploads, the number of open files is effectively controlled by
	// TransferInitiationPoolSize, since all the file IO (except retries) happens in
	// transfer initiation.
	MaxOpenDownloadFiles int

	// CheckCpuWhenTuning determines whether CPU usage should be taken into account when auto-tuning
	CheckCpuWhenTuning *ConfiguredBool
}

ConcurrencySettings stores the set of related numbers that govern concurrency levels in the STE

func NewConcurrencySettings

func NewConcurrencySettings(maxFileAndSocketHandles int, requestAutoTuneGRs bool) ConcurrencySettings

NewConcurrencySettings gets concurrency settings by referring to the environment variable AZCOPY_CONCURRENCY_VALUE (if set) and to properties of the machine where we are running

func (ConcurrencySettings) AutoTuneMainPool

func (c ConcurrencySettings) AutoTuneMainPool() bool

AutoTuneMainPool says whether the main pool size should by dynamically tuned

type ConcurrencyTuner

type ConcurrencyTuner interface {
	// GetRecommendedConcurrency is called repeatedly, at intervals decided by the caller,
	// to compute recommended concurrency levels
	GetRecommendedConcurrency(currentMbps int, highCpuUsage bool) (newConcurrency int, reason string)

	// RequestCallbackWhenStable lets interested parties ask the concurrency tuner to call them back when the tuner has reached a stable level
	RequestCallbackWhenStable(callback func()) (callbackAccepted bool)

	// GetFinalState returns the final state of the tuner
	GetFinalState() (finalReason string, finalRecommendedConcurrency int)
	// contains filtered or unexported methods
}

func NewAutoConcurrencyTuner

func NewAutoConcurrencyTuner(initial, max int, isBenchmarking bool) ConcurrencyTuner

type ConfiguredBool

type ConfiguredBool struct {
	Value             bool
	IsUserSpecified   bool
	EnvVarName        string
	DefaultSourceDesc string
}

ConfiguredBool is a boolean which may be optionally configured by user through an environment variable

func GetParallelStatFiles

func GetParallelStatFiles() *ConfiguredBool

func (*ConfiguredBool) GetDescription

func (b *ConfiguredBool) GetDescription() string

type ConfiguredInt

type ConfiguredInt struct {
	Value             int
	IsUserSpecified   bool
	EnvVarName        string
	DefaultSourceDesc string
}

ConfiguredInt is an integer which may be optionally configured by user through an environment variable

func GetEnumerationPoolSize

func GetEnumerationPoolSize() *ConfiguredInt

func (*ConfiguredInt) GetDescription

func (i *ConfiguredInt) GetDescription() string

type CoordinatorChannels

type CoordinatorChannels struct {
	// contains filtered or unexported fields
}

type ErrorEx

type ErrorEx struct {
	// contains filtered or unexported fields
}

func (ErrorEx) ErrorCodeAndString

func (errex ErrorEx) ErrorCodeAndString() (string, int, string)

TODO: consider rolling MSRequestID into this, so that all places that use this can pick up, and log, the request ID too

func (ErrorEx) MSRequestID

func (errex ErrorEx) MSRequestID() string

MSRequestID gets the request ID guid associated with the failed request. Returns "" if there isn't one (either no request, or there is a request but it doesn't have the header)

type IBlobSourceInfoProvider

type IBlobSourceInfoProvider interface {
	IRemoteSourceInfoProvider

	// BlobTier returns source's blob tier.
	BlobTier() azblob.AccessTierType

	// BlobType returns source's blob type.
	BlobType() azblob.BlobType
}

IBlobSourceInfoProvider is the abstraction of the methods needed to prepare blob copy source.

type ICustomLocalOpener

type ICustomLocalOpener interface {
	ISourceInfoProvider
	Open(path string) (*os.File, error)
}

type IJobMgr

type IJobMgr interface {
	JobID() common.JobID
	JobPartMgr(partNum PartNumber) (IJobPartMgr, bool)
	//Throughput() XferThroughput
	// If existingPlanMMF is nil, a new MMF is opened.
	AddJobPart(partNum PartNumber, planFile JobPartPlanFileName, existingPlanMMF *JobPartPlanMMF, sourceSAS string,
		destinationSAS string, scheduleTransfers bool) IJobPartMgr
	SetIncludeExclude(map[string]int, map[string]int)
	IncludeExclude() (map[string]int, map[string]int)
	ResumeTransfers(appCtx context.Context)
	AllTransfersScheduled() bool
	ConfirmAllTransfersScheduled()
	ResetAllTransfersScheduled()
	PipelineLogInfo() pipeline.LogOptions
	ReportJobPartDone(jobPartProgressInfo)
	Context() context.Context
	Cancel()
	// TODO: added for debugging purpose. remove later
	OccupyAConnection()
	// TODO: added for debugging purpose. remove later
	ReleaseAConnection()
	// TODO: added for debugging purpose. remove later
	ActiveConnections() int64
	GetPerfInfo() (displayStrings []string, constraint common.PerfConstraint)

	SetInMemoryTransitJobState(state InMemoryTransitJobState) // set in memory transit job state saved in this job.
	ChunkStatusLogger() common.ChunkStatusLogger
	HttpClient() *http.Client
	PipelineNetworkStats() *PipelineNetworkStats

	common.ILoggerCloser

	/* Status related functions */
	SendJobPartCreatedMsg(msg JobPartCreatedMsg)
	SendXferDoneMsg(msg xferDoneMsg)
	ListJobSummary() common.ListJobSummaryResponse
	ResurrectSummary(js common.ListJobSummaryResponse)

	/* Ported from jobsAdmin() */
	ScheduleTransfer(priority common.JobPriority, jptm IJobPartTransferMgr)
	ScheduleChunk(priority common.JobPriority, chunkFunc chunkFunc)

	/* Some comment */
	IterateJobParts(readonly bool, f func(k common.PartNumber, v IJobPartMgr))
	TransferDirection() common.TransferDirection
	AddSuccessfulBytesInActiveFiles(n int64)
	SuccessfulBytesInActiveFiles() uint64
	CancelPauseJobOrder(desiredJobStatus common.JobStatus) common.CancelPauseResumeResponse
	// contains filtered or unexported methods
}

func NewJobMgr

func NewJobMgr(concurrency ConcurrencySettings, jobID common.JobID, appCtx context.Context, cpuMon common.CPUMonitor, level common.LogLevel,
	commandString string, logFileFolder string, tuner ConcurrencyTuner,
	pacer PacerAdmin, slicePool common.ByteSlicePooler, cacheLimiter common.CacheLimiter, fileCountLimiter common.CacheLimiter,
	jobLogger common.ILoggerResetable) IJobMgr

type IJobPartMgr

type IJobPartMgr interface {
	Plan() *JobPartPlanHeader
	ScheduleTransfers(jobCtx context.Context)
	StartJobXfer(jptm IJobPartTransferMgr)
	ReportTransferDone(status common.TransferStatus) uint32
	GetOverwriteOption() common.OverwriteOption
	GetForceIfReadOnly() bool
	AutoDecompress() bool
	ScheduleChunks(chunkFunc chunkFunc)
	RescheduleTransfer(jptm IJobPartTransferMgr)
	BlobTypeOverride() common.BlobType
	BlobTiers() (blockBlobTier common.BlockBlobTier, pageBlobTier common.PageBlobTier)
	ShouldPutMd5() bool
	SAS() (string, string)
	//CancelJob()
	Close()
	// TODO: added for debugging purpose. remove later
	OccupyAConnection()
	// TODO: added for debugging purpose. remove later
	ReleaseAConnection()
	SlicePool() common.ByteSlicePooler
	CacheLimiter() common.CacheLimiter
	FileCountLimiter() common.CacheLimiter
	ExclusiveDestinationMap() *common.ExclusiveStringMap
	ChunkStatusLogger() common.ChunkStatusLogger
	common.ILogger
	SourceProviderPipeline() pipeline.Pipeline

	SecurityInfoPersistenceManager() *securityInfoPersistenceManager
	FolderDeletionManager() common.FolderDeletionManager
	CpkInfo() common.CpkInfo
	CpkScopeInfo() common.CpkScopeInfo
	IsSourceEncrypted() bool
	/* Status Manager Updates */
	SendXferDoneMsg(msg xferDoneMsg)
	// contains filtered or unexported methods
}

type IJobPartTransferMgr

type IJobPartTransferMgr interface {
	FromTo() common.FromTo
	Info() TransferInfo
	ResourceDstData(dataFileToXfer []byte) (headers common.ResourceHTTPHeaders, metadata common.Metadata, blobTags common.BlobTags, cpkOptions common.CpkOptions)
	LastModifiedTime() time.Time
	PreserveLastModifiedTime() (time.Time, bool)
	ShouldPutMd5() bool
	MD5ValidationOption() common.HashValidationOption
	BlobTypeOverride() common.BlobType
	BlobTiers() (blockBlobTier common.BlockBlobTier, pageBlobTier common.PageBlobTier)
	JobHasLowFileCount() bool
	//ScheduleChunk(chunkFunc chunkFunc)
	Context() context.Context
	SlicePool() common.ByteSlicePooler
	CacheLimiter() common.CacheLimiter
	WaitUntilLockDestination(ctx context.Context) error
	EnsureDestinationUnlocked()
	HoldsDestinationLock() bool
	StartJobXfer()
	GetOverwriteOption() common.OverwriteOption
	GetForceIfReadOnly() bool
	ShouldDecompress() bool
	GetSourceCompressionType() (common.CompressionType, error)
	ReportChunkDone(id common.ChunkID) (lastChunk bool, chunksDone uint32)
	TransferStatusIgnoringCancellation() common.TransferStatus
	SetStatus(status common.TransferStatus)
	SetErrorCode(errorCode int32)
	SetNumberOfChunks(numChunks uint32)
	SetActionAfterLastChunk(f func())
	ReportTransferDone() uint32
	RescheduleTransfer()
	ScheduleChunks(chunkFunc chunkFunc)
	SetDestinationIsModified()
	Cancel()
	WasCanceled() bool
	IsLive() bool
	IsDeadBeforeStart() bool
	IsDeadInflight() bool
	// TODO: added for debugging purpose. remove later
	OccupyAConnection()
	// TODO: added for debugging purpose. remove later
	ReleaseAConnection()
	SourceProviderPipeline() pipeline.Pipeline
	FailActiveUpload(where string, err error)
	FailActiveDownload(where string, err error)
	FailActiveUploadWithStatus(where string, err error, failureStatus common.TransferStatus)
	FailActiveDownloadWithStatus(where string, err error, failureStatus common.TransferStatus)
	FailActiveS2SCopy(where string, err error)
	FailActiveS2SCopyWithStatus(where string, err error, failureStatus common.TransferStatus)
	// TODO: Cleanup FailActiveUpload/FailActiveUploadWithStatus & FailActiveS2SCopy/FailActiveS2SCopyWithStatus
	FailActiveSend(where string, err error)
	FailActiveSendWithStatus(where string, err error, failureStatus common.TransferStatus)
	LogUploadError(source, destination, errorMsg string, status int)
	LogDownloadError(source, destination, errorMsg string, status int)
	LogS2SCopyError(source, destination, errorMsg string, status int)
	LogSendError(source, destination, errorMsg string, status int)
	LogError(resource, context string, err error)
	LogTransferInfo(level pipeline.LogLevel, source, destination, msg string)
	LogTransferStart(source, destination, description string)
	LogChunkStatus(id common.ChunkID, reason common.WaitReason)
	ChunkStatusLogger() common.ChunkStatusLogger
	LogAtLevelForCurrentTransfer(level pipeline.LogLevel, msg string)
	GetOverwritePrompter() *overwritePrompter
	GetFolderCreationTracker() FolderCreationTracker
	common.ILogger
	DeleteSnapshotsOption() common.DeleteSnapshotsOption
	SecurityInfoPersistenceManager() *securityInfoPersistenceManager
	FolderDeletionManager() common.FolderDeletionManager
	GetDestinationRoot() string
	ShouldInferContentType() bool
	CpkInfo() common.CpkInfo
	CpkScopeInfo() common.CpkScopeInfo
	IsSourceEncrypted() bool
}

type ILocalSourceInfoProvider

type ILocalSourceInfoProvider interface {
	ISourceInfoProvider
	OpenSourceFile() (common.CloseableReaderAt, error)
}

type IRemoteSourceInfoProvider

type IRemoteSourceInfoProvider interface {
	ISourceInfoProvider

	// SourceURL returns source's URL.
	PreSignedSourceURL() (*url.URL, error)

	// SourceSize returns size of source
	SourceSize() int64

	// RawSource returns raw source
	RawSource() string
}

IRemoteSourceInfoProvider is the abstraction of the methods needed to prepare remote copy source.

type ISMBPropertyBearingSourceInfoProvider

type ISMBPropertyBearingSourceInfoProvider interface {
	ISourceInfoProvider

	GetSDDL() (string, error)
	GetSMBProperties() (TypedSMBPropertyHolder, error)
}

type ISourceInfoProvider

type ISourceInfoProvider interface {
	// Properties returns source's properties.
	Properties() (*SrcProperties, error)

	// GetLastModifiedTime returns the source's latest last modified time.  Not used when
	// EntityType() == Folder
	GetFreshFileLastModifiedTime() (time.Time, error)

	IsLocal() bool

	EntityType() common.EntityType
}

ISourceInfoProvider is the abstraction of generic source info provider which provides source's properties.

type InMemoryTransitJobState

type InMemoryTransitJobState struct {
	CredentialInfo common.CredentialInfo
}

InMemoryTransitJobState defines job state transit in memory, and not in JobPartPlan file. Note: InMemoryTransitJobState should only be set when request come from cmd(FE) module to STE module. In memory CredentialInfo is currently maintained per job in STE, as FE could have many-to-one relationship with STE, i.e. different jobs could have different OAuth tokens requested from FE, and these jobs can run at same time in STE. This can be optimized if FE would no more be another module vs STE module.

type JPPTCompatibleFolderCreationTracker

type JPPTCompatibleFolderCreationTracker interface {
	FolderCreationTracker
	RegisterPropertiesTransfer(folder string, transferIndex uint32)
}

type JobLogLCMWrapper

type JobLogLCMWrapper struct {
	JobManager IJobMgr
	common.LifecycleMgr
}

func (JobLogLCMWrapper) Progress

func (j JobLogLCMWrapper) Progress(builder common.OutputBuilder)

type JobPartCreatedMsg

type JobPartCreatedMsg struct {
	TotalTransfers       uint32
	IsFinalPart          bool
	TotalBytesEnumerated uint64
	FileTransfers        uint32
	FolderTransfer       uint32
}

type JobPartPlanDstBlob

type JobPartPlanDstBlob struct {
	BlobType common.BlobType
	// represents user decision to interpret the content-encoding from source file
	NoGuessMimeType bool

	// Specifies the length of MIME content type of the blob
	ContentTypeLength uint16

	// Specifies the MIME content type of the blob. The default type is application/octet-stream
	ContentType [CustomHeaderMaxBytes]byte

	// Specifies length of content encoding which have been applied to the blob.
	ContentEncodingLength uint16

	// Specifies which content encodings have been applied to the blob.
	ContentEncoding [CustomHeaderMaxBytes]byte

	// Specifies length of content language which has been applied to the blob.
	ContentLanguageLength uint16

	// Specifies which content language has been applied to the blob.
	ContentLanguage [CustomHeaderMaxBytes]byte

	// Specifies length of content disposition which has been applied to the blob.
	ContentDispositionLength uint16

	// Specifies the content disposition of the blob
	ContentDisposition [CustomHeaderMaxBytes]byte

	// Specifies the length of the cache control which has been applied to the blob.
	CacheControlLength uint16

	// Specifies the cache control of the blob
	CacheControl [CustomHeaderMaxBytes]byte

	// Specifies the tier if this is a block or page blob
	BlockBlobTier common.BlockBlobTier
	PageBlobTier  common.PageBlobTier

	// Controls uploading of MD5 hashes
	PutMd5 bool

	MetadataLength uint16
	Metadata       [MetadataMaxBytes]byte

	BlobTagsLength uint16
	BlobTags       [BlobTagsMaxByte]byte

	CpkInfo            bool
	IsSourceEncrypted  bool
	CpkScopeInfo       [CustomHeaderMaxBytes]byte
	CpkScopeInfoLength uint16

	// Specifies the maximum size of block which determines the number of chunks and chunk size of a transfer
	BlockSize int64
}

JobPartPlanDstBlob holds additional settings required when the destination is a blob

type JobPartPlanDstLocal

type JobPartPlanDstLocal struct {

	// Specifies whether the timestamp of destination file has to be set to the modified time of source file
	PreserveLastModifiedTime bool

	// says how MD5 verification failures should be actioned
	MD5VerificationOption common.HashValidationOption
}

jobPartPlanDstLocal holds additional settings required when the destination is a local file

type JobPartPlanFileName

type JobPartPlanFileName string

func (JobPartPlanFileName) Create

createJobPartPlanFile creates the memory map JobPartPlanHeader using the given JobPartOrder and JobPartPlanBlobData

func (JobPartPlanFileName) Delete

func (jpfn JobPartPlanFileName) Delete() error

func (*JobPartPlanFileName) GetJobPartPlanPath

func (jppfn *JobPartPlanFileName) GetJobPartPlanPath() string

func (JobPartPlanFileName) Map

func (jpfn JobPartPlanFileName) Map() *JobPartPlanMMF

func (JobPartPlanFileName) Parse

func (jpfn JobPartPlanFileName) Parse() (jobID common.JobID, partNumber common.PartNumber, err error)

TODO: This needs testing

type JobPartPlanHeader

type JobPartPlanHeader struct {
	// Once set, the following fields are constants; they should never be modified
	Version                common.Version    // The version of data schema format of header; see the dataSchemaVersion constant
	StartTime              int64             // The start time of this part
	JobID                  common.JobID      // Job Part's JobID
	PartNum                common.PartNumber // Job Part's part number (0+)
	SourceRootLength       uint16            // The length of the source root path
	SourceRoot             [1000]byte        // The root directory of the source
	SourceExtraQueryLength uint16
	SourceExtraQuery       [1000]byte // Extra query params applicable to the source
	DestinationRootLength  uint16     // The length of the destination root path
	DestinationRoot        [1000]byte // The root directory of the destination
	DestExtraQueryLength   uint16
	DestExtraQuery         [1000]byte                  // Extra query params applicable to the dest
	IsFinalPart            bool                        // True if this is the Job's last part; else false
	ForceWrite             common.OverwriteOption      // True if the existing blobs needs to be overwritten.
	ForceIfReadOnly        bool                        // Supplements ForceWrite with an additional setting for Azure Files. If true, the read-only attribute will be cleared before we overwrite
	AutoDecompress         bool                        // if true, source data with encodings that represent compression are automatically decompressed when downloading
	Priority               common.JobPriority          // The Job Part's priority
	TTLAfterCompletion     uint32                      // Time to live after completion is used to persists the file on disk of specified time after the completion of JobPartOrder
	FromTo                 common.FromTo               // The location of the transfer's source & destination
	Fpo                    common.FolderPropertyOption // option specifying how folders will be handled
	CommandStringLength    uint32
	NumTransfers           uint32              // The number of transfers in the Job part
	LogLevel               common.LogLevel     // This Job Part's minimal log level
	DstBlobData            JobPartPlanDstBlob  // Additional data for blob destinations
	DstLocalData           JobPartPlanDstLocal // Additional data for local destinations

	PreservePermissions common.PreservePermissionsOption
	PreserveSMBInfo     bool
	// S2SGetPropertiesInBackend represents whether to enable get S3 objects' or Azure files' properties during s2s copy in backend.
	S2SGetPropertiesInBackend bool
	// S2SSourceChangeValidation represents whether user wants to check if source has changed after enumerating.
	S2SSourceChangeValidation bool
	// DestLengthValidation represents whether the user wants to check if the destination has a different content-length
	DestLengthValidation bool
	// S2SInvalidMetadataHandleOption represents how user wants to handle invalid metadata.
	S2SInvalidMetadataHandleOption common.InvalidMetadataHandleOption

	// For delete operation specify what to do with snapshots
	DeleteSnapshotsOption common.DeleteSnapshotsOption
	// contains filtered or unexported fields
}

JobPartPlanHeader represents the header of Job Part's memory-mapped file

func (*JobPartPlanHeader) CommandString

func (jpph *JobPartPlanHeader) CommandString() string

CommandString returns the command string given by user when job was created

func (*JobPartPlanHeader) JobStatus

func (jpph *JobPartPlanHeader) JobStatus() common.JobStatus

Status returns the job status stored in JobPartPlanHeader in thread-safe manner

func (*JobPartPlanHeader) SetJobStatus

func (jpph *JobPartPlanHeader) SetJobStatus(newJobStatus common.JobStatus)

SetJobStatus sets the job status in JobPartPlanHeader in thread-safe manner

func (*JobPartPlanHeader) Transfer

func (jpph *JobPartPlanHeader) Transfer(transferIndex uint32) *JobPartPlanTransfer

Transfer api gives memory map JobPartPlanTransfer header for given index

func (*JobPartPlanHeader) TransferSrcDstRelatives

func (jpph *JobPartPlanHeader) TransferSrcDstRelatives(transferIndex uint32) (relSource, relDest string)

func (*JobPartPlanHeader) TransferSrcDstStrings

func (jpph *JobPartPlanHeader) TransferSrcDstStrings(transferIndex uint32) (source, destination string, isFolder bool)

TransferSrcDstDetail returns the source and destination string for a transfer at given transferIndex in JobPartOrder Also indication of entity type since that's often necessary to avoid ambiguity about what the source and dest are

func (*JobPartPlanHeader) TransferSrcPropertiesAndMetadata

func (jpph *JobPartPlanHeader) TransferSrcPropertiesAndMetadata(transferIndex uint32) (h common.ResourceHTTPHeaders, metadata common.Metadata, blobType azblob.BlobType, blobTier azblob.AccessTierType,
	s2sGetPropertiesInBackend bool, DestLengthValidation bool, s2sSourceChangeValidation bool, s2sInvalidMetadataHandleOption common.InvalidMetadataHandleOption, entityType common.EntityType, blobVersionID string, blobTags common.BlobTags)

TransferSrcPropertiesAndMetadata returns the SrcHTTPHeaders, properties and metadata for a transfer at given transferIndex in JobPartOrder TODO: Refactor return type to an object

type JobPartPlanMMF

type JobPartPlanMMF common.MMF

func (*JobPartPlanMMF) Plan

func (mmf *JobPartPlanMMF) Plan() *JobPartPlanHeader

func (*JobPartPlanMMF) Unmap

func (mmf *JobPartPlanMMF) Unmap()

type JobPartPlanTransfer

type JobPartPlanTransfer struct {

	// SrcOffset represents the actual start offset transfer header written in JobPartOrder file
	SrcOffset int64
	// SrcLength represents the actual length of source string for specific transfer
	SrcLength int16
	// DstLength represents the actual length of destination string for specific transfer
	DstLength int16
	// ChunkCount represents the num of chunks a transfer is split into
	//ChunkCount uint16	// TODO: Remove this, we need to determine it at runtime
	// EntityType indicates whether this is a file or a folder
	// We use a dedicated field for this because the alternative (of doing something fancy the names) was too complex and error-prone
	EntityType common.EntityType
	// ModifiedTime represents the last time at which source was modified before start of transfer stored as nanoseconds.
	ModifiedTime int64
	// SourceSize represents the actual size of the source on disk
	SourceSize int64
	// CompletionTime represents the time at which transfer was completed
	CompletionTime uint64

	// For S2S copy, per Transfer source's properties
	// TODO: ensure the length is enough
	SrcContentTypeLength        int16
	SrcContentEncodingLength    int16
	SrcContentLanguageLength    int16
	SrcContentDispositionLength int16
	SrcCacheControlLength       int16
	SrcContentMD5Length         int16
	SrcMetadataLength           int16
	SrcBlobTypeLength           int16
	SrcBlobTierLength           int16
	SrcBlobVersionIDLength      int16
	SrcBlobTagsLength           int16
	// contains filtered or unexported fields
}

JobPartPlanTransfer represent the header of Job Part's Transfer in Memory Map File

func (*JobPartPlanTransfer) ErrorCode

func (jppt *JobPartPlanTransfer) ErrorCode() int32

ErrorCode returns the transfer's errorCode.

func (*JobPartPlanTransfer) SetErrorCode

func (jppt *JobPartPlanTransfer) SetErrorCode(errorCode int32, overwrite bool)

SetErrorCode sets the error code of the error if transfer failed. overWrite flags if set to true overWrites the atomicErrorCode. If overWrite flag is set to false, then errorCode won't be overwritten.

func (*JobPartPlanTransfer) SetTransferStatus

func (jppt *JobPartPlanTransfer) SetTransferStatus(status common.TransferStatus, overWrite bool)

SetTransferStatus sets the transfer's status overWrite flags if set to true overWrites the failed status. If overWrite flag is set to false, then status of transfer is set to failed won't be overWritten. overWrite flag is used while resuming the failed transfers where the errorCode are set to default i.e 0

func (*JobPartPlanTransfer) TransferStatus

func (jppt *JobPartPlanTransfer) TransferStatus() common.TransferStatus

TransferStatus returns the transfer's status

type NullConcurrencyTuner

type NullConcurrencyTuner struct {
	FixedValue int
}

func (*NullConcurrencyTuner) GetFinalState

func (n *NullConcurrencyTuner) GetFinalState() (finalReason string, finalRecommendedConcurrency int)

func (*NullConcurrencyTuner) GetRecommendedConcurrency

func (n *NullConcurrencyTuner) GetRecommendedConcurrency(currentMbps int, highCpuUsage bool) (newConcurrency int, reason string)

func (*NullConcurrencyTuner) RequestCallbackWhenStable

func (n *NullConcurrencyTuner) RequestCallbackWhenStable(callback func()) (callbackAccepted bool)

type PacerAdmin

type PacerAdmin interface {

	// GetTotalTraffic returns the cumulative count of all traffic that has been processed
	GetTotalTraffic() int64
	// contains filtered or unexported methods
}

type PartNumber

type PartNumber = common.PartNumber

type PerformanceAdvisor

type PerformanceAdvisor struct {
	// contains filtered or unexported fields
}

func NewPerformanceAdvisor

func NewPerformanceAdvisor(stats *PipelineNetworkStats, commandLineMbpsCap float64, mbps int64, finalReason string, finalConcurrency int, dir common.TransferDirection, avgBytesPerFile int64, isToAzureFiles bool) *PerformanceAdvisor

func (*PerformanceAdvisor) GetAdvice

func (p *PerformanceAdvisor) GetAdvice() []common.PerformanceAdvice

GetPerfAdvice returns one or many performance advice objects, in priority order, with the highest priority advice first

type PipelineNetworkStats

type PipelineNetworkStats struct {
	// contains filtered or unexported fields
}

func (*PipelineNetworkStats) AverageE2EMilliseconds

func (s *PipelineNetworkStats) AverageE2EMilliseconds() int

func (*PipelineNetworkStats) GetTotalRetries

func (s *PipelineNetworkStats) GetTotalRetries() int64

func (*PipelineNetworkStats) IOPSServerBusyPercentage

func (s *PipelineNetworkStats) IOPSServerBusyPercentage() float32

func (*PipelineNetworkStats) IsStarted

func (s *PipelineNetworkStats) IsStarted() bool

func (*PipelineNetworkStats) NetworkErrorPercentage

func (s *PipelineNetworkStats) NetworkErrorPercentage() float32

func (*PipelineNetworkStats) OperationsPerSecond

func (s *PipelineNetworkStats) OperationsPerSecond() int

func (*PipelineNetworkStats) OtherServerBusyPercentage

func (s *PipelineNetworkStats) OtherServerBusyPercentage() float32

func (*PipelineNetworkStats) ThroughputServerBusyPercentage

func (s *PipelineNetworkStats) ThroughputServerBusyPercentage() float32

func (*PipelineNetworkStats) TotalServerBusyPercentage

func (s *PipelineNetworkStats) TotalServerBusyPercentage() float32

type RequestLogOptions

type RequestLogOptions struct {
	// LogWarningIfTryOverThreshold logs a warning if a tried operation takes longer than the specified
	// duration (-1=no logging; 0=default threshold).
	LogWarningIfTryOverThreshold time.Duration

	// SyslogDisabled is a flag to check if logging to Syslog/Windows-Event-Logger is enabled or not
	// We by default print to Syslog/Windows-Event-Logger.
	// If SyslogDisabled is not provided explicitly, the default value will be false.
	SyslogDisabled bool
}

RequestLogOptions configures the retry policy's behavior.

type SrcProperties

type SrcProperties struct {
	SrcHTTPHeaders common.ResourceHTTPHeaders // User for S2S copy, where per transfer's src properties need be set in destination.
	SrcMetadata    common.Metadata
	SrcBlobTags    common.BlobTags
}

type TransferInfo

type TransferInfo struct {
	JobID                  common.JobID
	BlockSize              int64
	Source                 string
	SourceSize             int64
	Destination            string
	EntityType             common.EntityType
	PreserveSMBPermissions common.PreservePermissionsOption
	PreserveSMBInfo        bool

	// Transfer info for S2S copy
	SrcProperties
	S2SGetPropertiesInBackend      bool
	S2SSourceChangeValidation      bool
	DestLengthValidation           bool
	S2SInvalidMetadataHandleOption common.InvalidMetadataHandleOption

	// Blob
	SrcBlobType    azblob.BlobType       // used for both S2S and for downloads to local from blob
	S2SSrcBlobTier azblob.AccessTierType // AccessTierType (string) is used to accommodate service-side support matrix change.

	// NumChunks is the number of chunks in which transfer will be split into while uploading the transfer.
	// NumChunks is not used in case of AppendBlob transfer.
	NumChunks uint16
}

func (TransferInfo) IsFolderPropertiesTransfer

func (i TransferInfo) IsFolderPropertiesTransfer() bool

func (TransferInfo) ShouldTransferLastWriteTime

func (i TransferInfo) ShouldTransferLastWriteTime() bool

We don't preserve LMTs on folders. The main reason is that preserving folder LMTs at download time is very difficult, because it requires us to keep track of when the last file has been saved in each folder OR just do all the folders at the very end. This is because if we modify the contents of a folder after setting its LMT, then the LMT will change because Windows and Linux (and presumably MacOS) automatically update the folder LMT when the contents are changed. The possible solutions to this problem may become difficult on very large jobs (e.g. 10s or hundreds of millions of files, with millions of directories). The secondary reason is that folder LMT's don't actually tell the user anything particularly useful. Specifically, they do NOT tell you when the folder contents (recursively) were last updated: in Azure Files they are never updated when folder contents change; and in NTFS they are only updated when immediate children are changed (not grandchildren).

type TypedSMBPropertyHolder

type TypedSMBPropertyHolder interface {
	FileCreationTime() time.Time
	FileLastWriteTime() time.Time
	FileAttributes() azfile.FileAttributeFlags
}

type URLHolder

type URLHolder interface {
	URL() url.URL
	String() string
}

type XferChannels

type XferChannels struct {
	// contains filtered or unexported fields
}

type XferRetryOptions

type XferRetryOptions struct {
	// Policy tells the pipeline what kind of retry policy to use. See the XferRetryPolicy* constants.\
	// A value of zero means that you accept our default policy.
	Policy XferRetryPolicy

	// MaxTries specifies the maximum number of attempts an operation will be tried before producing an error (0=default).
	// A value of zero means that you accept our default policy. A value of 1 means 1 try and no retries.
	MaxTries int32

	// TryTimeout indicates the maximum time allowed for any single try of an HTTP request.
	// A value of zero means that you accept our default timeout. NOTE: When transferring large amounts
	// of data, the default TryTimeout will probably not be sufficient. You should override this value
	// based on the bandwidth available to the host machine and proximity to the Storage service. A good
	// starting point may be something like (60 seconds per MB of anticipated-payload-size).
	TryTimeout time.Duration

	// RetryDelay specifies the amount of delay to use before retrying an operation (0=default).
	// The delay increases (exponentially or linearly) with each retry up to a maximum specified by
	// MaxRetryDelay. If you specify 0, then you must also specify 0 for MaxRetryDelay.
	RetryDelay time.Duration

	// MaxRetryDelay specifies the maximum delay allowed before retrying an operation (0=default).
	// If you specify 0, then you must also specify 0 for RetryDelay.
	MaxRetryDelay time.Duration

	// RetryReadsFromSecondaryHost specifies whether the retry policy should retry a read operation against another host.
	// If RetryReadsFromSecondaryHost is "" (the default) then operations are not retried against another host.
	// NOTE: Before setting this field, make sure you understand the issues around reading stale & potentially-inconsistent
	// data at this webpage: https://docs.microsoft.com/en-us/azure/storage/common/storage-designing-ha-apps-with-ragrs
	RetryReadsFromSecondaryHost string // Comment this our for non-Blob SDKs
}

XferRetryOptions configures the retry policy's behavior.

type XferRetryPolicy

type XferRetryPolicy int32

XferRetryPolicy tells the pipeline what kind of retry policy to use. See the XferRetryPolicy* constants. Added a new retry policy and not using the existing policy azblob.zc_retry_policy.go since there are some changes in the retry policy. Retry on all the type of network errors instead of retrying only in case of temporary or timeout errors.

const (
	// RetryPolicyExponential tells the pipeline to use an exponential back-off retry policy
	RetryPolicyExponential XferRetryPolicy = 0

	// RetryPolicyFixed tells the pipeline to use a fixed back-off retry policy
	RetryPolicyFixed XferRetryPolicy = 1
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL