Documentation ¶
Index ¶
- Constants
- Variables
- func BlobTierAllowed(destTier azblob.AccessTierType) bool
- func DeleteBlob(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer pacer)
- func DeleteFile(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer pacer)
- func NewAzcopyHTTPClient(maxIdleConns int) *http.Client
- func NewBFSXferRetryPolicyFactory(o XferRetryOptions) pipeline.Factory
- func NewBlobFSPipeline(c azbfs.Credential, o azbfs.PipelineOptions, r XferRetryOptions, p pacer, ...) pipeline.Pipeline
- func NewBlobPipeline(c azblob.Credential, o azblob.PipelineOptions, r XferRetryOptions, p pacer, ...) pipeline.Pipeline
- func NewBlobXferRetryPolicyFactory(o XferRetryOptions) pipeline.Factory
- func NewFilePipeline(c azfile.Credential, o azfile.PipelineOptions, r azfile.RetryOptions, p pacer, ...) pipeline.Pipeline
- func NewNullAutoPacer() *nullAutoPacer
- func NewRequestLogPolicyFactory(o RequestLogOptions) pipeline.Factory
- func NewTokenBucketPacer(bytesPerSecond int64, expectedBytesPerCoarseRequest int64) *tokenBucketPacer
- func NewVersionPolicyFactory() pipeline.Factory
- func ValidateTier(jptm IJobPartTransferMgr, blobTier azblob.AccessTierType, ...) (isValid bool)
- type AdviceType
- func (AdviceType) AccountIOPS() AdviceType
- func (AdviceType) AccountThroughput() AdviceType
- func (AdviceType) ConcurrencyHighCpu() AdviceType
- func (AdviceType) ConcurrencyHitUpperLimit() AdviceType
- func (AdviceType) ConcurrencyNotEnoughTime() AdviceType
- func (AdviceType) ConcurrencyNotTuned() AdviceType
- func (AdviceType) FileShareOrNetwork() AdviceType
- func (AdviceType) MbpsCapped() AdviceType
- func (AdviceType) NetworkErrors() AdviceType
- func (AdviceType) NetworkIsBottleneck() AdviceType
- func (AdviceType) NetworkNotBottleneck() AdviceType
- func (AdviceType) ServerBusy() AdviceType
- func (AdviceType) SmallFilesOrNetwork() AdviceType
- func (AdviceType) VMSize() AdviceType
- type AzureFileParentDirCreator
- type ConcurrencySettings
- type ConcurrencyTuner
- type ConfiguredBool
- type ConfiguredInt
- type CoordinatorChannels
- type ErrorEx
- type FolderCreationTracker
- type IBlobSourceInfoProvider
- type ICustomLocalOpener
- type IJobMgr
- type IJobPartMgr
- type IJobPartTransferMgr
- type ILocalSourceInfoProvider
- type IRemoteSourceInfoProvider
- type ISMBPropertyBearingSourceInfoProvider
- type ISourceInfoProvider
- type InMemoryTransitJobState
- type JPPTCompatibleFolderCreationTracker
- type JobLogLCMWrapper
- type JobPartCreatedMsg
- type JobPartPlanDstBlob
- type JobPartPlanDstLocal
- type JobPartPlanFileName
- func (jpfn JobPartPlanFileName) Create(order common.CopyJobPartOrderRequest)
- func (jpfn JobPartPlanFileName) Delete() error
- func (jppfn *JobPartPlanFileName) GetJobPartPlanPath() string
- func (jpfn JobPartPlanFileName) Map() *JobPartPlanMMF
- func (jpfn JobPartPlanFileName) Parse() (jobID common.JobID, partNumber common.PartNumber, err error)
- type JobPartPlanHeader
- func (jpph *JobPartPlanHeader) CommandString() string
- func (jpph *JobPartPlanHeader) JobStatus() common.JobStatus
- func (jpph *JobPartPlanHeader) SetJobStatus(newJobStatus common.JobStatus)
- func (jpph *JobPartPlanHeader) Transfer(transferIndex uint32) *JobPartPlanTransfer
- func (jpph *JobPartPlanHeader) TransferSrcDstRelatives(transferIndex uint32) (relSource, relDest string)
- func (jpph *JobPartPlanHeader) TransferSrcDstStrings(transferIndex uint32) (source, destination string, isFolder bool)
- func (jpph *JobPartPlanHeader) TransferSrcPropertiesAndMetadata(transferIndex uint32) (h common.ResourceHTTPHeaders, metadata common.Metadata, ...)
- type JobPartPlanMMF
- type JobPartPlanTransfer
- func (jppt *JobPartPlanTransfer) ErrorCode() int32
- func (jppt *JobPartPlanTransfer) ErrorMessage() string
- func (jppt *JobPartPlanTransfer) SetErrorCode(errorCode int32, overwrite bool)
- func (jppt *JobPartPlanTransfer) SetErrorMessage(errorMessage string, overwrite bool)
- func (jppt *JobPartPlanTransfer) SetTransferStatus(status common.TransferStatus, overWrite bool)
- func (jppt *JobPartPlanTransfer) TransferStatus() common.TransferStatus
- type NullConcurrencyTuner
- func (n *NullConcurrencyTuner) GetFinalState() (finalReason string, finalRecommendedConcurrency int)
- func (n *NullConcurrencyTuner) GetRecommendedConcurrency(currentMbps int, highCpuUsage bool) (newConcurrency int, reason string)
- func (n *NullConcurrencyTuner) RequestCallbackWhenStable(callback func()) (callbackAccepted bool)
- type PacerAdmin
- type PartNumber
- type PerformanceAdvisor
- type PipelineNetworkStats
- func (s *PipelineNetworkStats) AverageE2EMilliseconds() int
- func (s *PipelineNetworkStats) GetTotalRetries() int64
- func (s *PipelineNetworkStats) IOPSServerBusyPercentage() float32
- func (s *PipelineNetworkStats) IsStarted() bool
- func (s *PipelineNetworkStats) NetworkErrorPercentage() float32
- func (s *PipelineNetworkStats) OperationsPerSecond() int
- func (s *PipelineNetworkStats) OtherServerBusyPercentage() float32
- func (s *PipelineNetworkStats) ThroughputServerBusyPercentage() float32
- func (s *PipelineNetworkStats) TotalServerBusyPercentage() float32
- type RequestLogOptions
- type SrcProperties
- type TransferInfo
- type TypedSMBPropertyHolder
- type URLHolder
- type XferChannels
- type XferRetryOptions
- type XferRetryPolicy
Constants ¶
const ( CustomHeaderMaxBytes = 256 MetadataMaxBytes = 1000 // If > 65536, then jobPartPlanBlobData's MetadataLength field's type must change BlobTagsMaxByte = 4000 MaxErrorMessageLength = 1000 )
const ( ConcurrencyReasonNone = "" ConcurrencyReasonTunerDisabled = "tuner disabled" // used as the final (non-finished) state for null tuner )
const DataSchemaVersion common.Version = 16
DataSchemaVersion defines the data schema version of the JobPart order files supported by the current version of azcopy. It must be incremented every time we release azcopy with a changed data schema.
const DownloadMaxRetryDelay = time.Second * 60
const DownloadRetryDelay = time.Second * 1
const DownloadTryTimeout = time.Minute * 15
TODO: consider unifying the retry options.
const JobPartPlanFileNameFormat = "%v--%05d.steV%d"
const MaxRetryPerDownloadBody = 5
download related
const PacerTimeToWaitInMs = 50
pacer related
const TagsHeaderMaxLength = 2000
const UploadMaxRetryDelay = time.Second * 60
const UploadMaxTries = 20
upload related
const UploadRetryDelay = time.Second * 1
Variables ¶
var ADLSFlushThreshold uint32 = 7500 // The # of blocks to flush at a time-- Implemented only for CI.
var DebugSkipFiles = make(map[string]bool)
debug knob
var DefaultServiceApiVersion = common.GetLifecycleMgr().GetEnvironmentVariable(common.EEnvironmentVariable.DefaultServiceApiVersion())
DefaultServiceApiVersion is the default service API version; it is set as the value for ServiceAPIVersionOverride in every Job's context.
var EAdviceType = AdviceType{"", ""}
var EnvironmentMimeMap map[string]string
var ServiceAPIVersionOverride = serviceAPIVersionOverride{}
ServiceAPIVersionOverride is a global variable in package ste that serves as the key for the service API version value set in every Job's context.
var UploadTryTimeout = time.Minute * 15
Functions ¶
func BlobTierAllowed ¶
func BlobTierAllowed(destTier azblob.AccessTierType) bool
// TODO: Infer availability based upon blob size as well, for premium page blobs.
func DeleteBlob ¶
func DeleteBlob(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer pacer)
func DeleteFile ¶
func DeleteFile(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer pacer)
func NewAzcopyHTTPClient ¶
NewAzcopyHTTPClient creates a new HTTP client. We must minimize use of this, and instead maximize re-use of the returned client object. Why? Because that makes our connection pooling more efficient, and prevents us exhausting the number of available network sockets on resource-constrained Linux systems. (E.g. when 'ulimit -Hn' is low).
func NewBFSXferRetryPolicyFactory ¶
func NewBFSXferRetryPolicyFactory(o XferRetryOptions) pipeline.Factory
TODO: fix the separate retry policies. NewBFSXferRetryPolicyFactory creates a RetryPolicyFactory object configured using the specified options.
func NewBlobFSPipeline ¶
func NewBlobFSPipeline(c azbfs.Credential, o azbfs.PipelineOptions, r XferRetryOptions, p pacer, client *http.Client, statsAcc *PipelineNetworkStats) pipeline.Pipeline
NewBlobFSPipeline creates a pipeline for transfers to and from the BlobFS service. The BlobFS operations currently in azcopy are supported by SharedKey credentials.
func NewBlobPipeline ¶
func NewBlobPipeline(c azblob.Credential, o azblob.PipelineOptions, r XferRetryOptions, p pacer, client *http.Client, statsAcc *PipelineNetworkStats) pipeline.Pipeline
NewBlobPipeline creates a Pipeline using the specified credentials and options.
func NewBlobXferRetryPolicyFactory ¶
func NewBlobXferRetryPolicyFactory(o XferRetryOptions) pipeline.Factory
TODO: Fix the separate retry policies; use Azure blob's retry policy once a blob SDK with the retry optimization is released. NewBlobXferRetryPolicyFactory creates a RetryPolicyFactory object configured using the specified options.
func NewFilePipeline ¶
func NewFilePipeline(c azfile.Credential, o azfile.PipelineOptions, r azfile.RetryOptions, p pacer, client *http.Client, statsAcc *PipelineNetworkStats) pipeline.Pipeline
NewFilePipeline creates a Pipeline using the specified credentials and options.
func NewNullAutoPacer ¶
func NewNullAutoPacer() *nullAutoPacer
func NewRequestLogPolicyFactory ¶
func NewRequestLogPolicyFactory(o RequestLogOptions) pipeline.Factory
NewRequestLogPolicyFactory creates a RequestLogPolicyFactory object configured using the specified options.
func NewTokenBucketPacer ¶
func NewVersionPolicyFactory ¶
NewVersionPolicyFactory creates a factory that can override the service version set in the request header. If the context has the key overwrite-current-version set to false, then x-ms-version in the request is not overwritten; otherwise it is set to 2017-04-17.
func ValidateTier ¶
func ValidateTier(jptm IJobPartTransferMgr, blobTier azblob.AccessTierType, blobURL azblob.BlobURL, ctx context.Context) (isValid bool)
Types ¶
type AdviceType ¶
type AdviceType struct {
// contains filtered or unexported fields
}
func (AdviceType) AccountIOPS ¶
func (AdviceType) AccountIOPS() AdviceType
func (AdviceType) AccountThroughput ¶
func (AdviceType) AccountThroughput() AdviceType
func (AdviceType) ConcurrencyHighCpu ¶
func (AdviceType) ConcurrencyHighCpu() AdviceType
func (AdviceType) ConcurrencyHitUpperLimit ¶
func (AdviceType) ConcurrencyHitUpperLimit() AdviceType
func (AdviceType) ConcurrencyNotEnoughTime ¶
func (AdviceType) ConcurrencyNotEnoughTime() AdviceType
func (AdviceType) ConcurrencyNotTuned ¶
func (AdviceType) ConcurrencyNotTuned() AdviceType
func (AdviceType) FileShareOrNetwork ¶
func (AdviceType) FileShareOrNetwork() AdviceType
func (AdviceType) MbpsCapped ¶
func (AdviceType) MbpsCapped() AdviceType
func (AdviceType) NetworkErrors ¶
func (AdviceType) NetworkErrors() AdviceType
func (AdviceType) NetworkIsBottleneck ¶
func (AdviceType) NetworkIsBottleneck() AdviceType
func (AdviceType) NetworkNotBottleneck ¶
func (AdviceType) NetworkNotBottleneck() AdviceType
func (AdviceType) ServerBusy ¶
func (AdviceType) ServerBusy() AdviceType
func (AdviceType) SmallFilesOrNetwork ¶
func (AdviceType) SmallFilesOrNetwork() AdviceType
func (AdviceType) VMSize ¶
func (AdviceType) VMSize() AdviceType
type AzureFileParentDirCreator ¶
type AzureFileParentDirCreator struct{}
AzureFileParentDirCreator is a namespace for functions related to creating parent directories in Azure Files, to avoid free-floating global funcs.
func (AzureFileParentDirCreator) CreateDirToRoot ¶
func (d AzureFileParentDirCreator) CreateDirToRoot(ctx context.Context, dirURL azfile.DirectoryURL, p pipeline.Pipeline, t FolderCreationTracker) error
CreateDirToRoot creates the dir (and parents as necessary) if it does not exist.
func (AzureFileParentDirCreator) CreateParentDirToRoot ¶
func (d AzureFileParentDirCreator) CreateParentDirToRoot(ctx context.Context, fileURL azfile.FileURL, p pipeline.Pipeline, t FolderCreationTracker) error
CreateParentDirToRoot creates the parent directories of the Azure file if the file's parent directory doesn't exist.
type ConcurrencySettings ¶
type ConcurrencySettings struct { // InitialMainPoolSize is the initial size of the main goroutine pool that transfers the data // (i.e. executes chunkfuncs) InitialMainPoolSize int // MaxMainPoolSize is a number >= InitialMainPoolSize, representing max size we will grow the main pool to MaxMainPoolSize *ConfiguredInt // TransferInitiationPoolSize is the size of the auxiliary goroutine pool that initiates transfers // (i.e. creates chunkfuncs) TransferInitiationPoolSize *ConfiguredInt // EnumerationPoolSize is size of auxiliary goroutine pool used in enumerators (only some of which are in fact parallelized) EnumerationPoolSize *ConfiguredInt // ParallelStatFiles says whether file.Stat calls should be parallelized during enumeration. May help enumeration performance // on Linux, but is not necessary and should not be activate on Windows. ParallelStatFiles *ConfiguredBool // MaxIdleConnections is the max number of idle TCP connections to keep open MaxIdleConnections int // MaxOpenFiles is the max number of file handles that we should have open at any time // Currently (July 2019) this is only used for downloads, which is where we wouldn't // otherwise have strict control of the number of open files. // For uploads, the number of open files is effectively controlled by // TransferInitiationPoolSize, since all the file IO (except retries) happens in // transfer initiation. MaxOpenDownloadFiles int // CheckCpuWhenTuning determines whether CPU usage should be taken into account when auto-tuning CheckCpuWhenTuning *ConfiguredBool }
ConcurrencySettings stores the set of related numbers that govern concurrency levels in the STE
func NewConcurrencySettings ¶
func NewConcurrencySettings(maxFileAndSocketHandles int, requestAutoTuneGRs bool) ConcurrencySettings
NewConcurrencySettings gets concurrency settings by referring to the environment variable AZCOPY_CONCURRENCY_VALUE (if set) and to properties of the machine where we are running
func (ConcurrencySettings) AutoTuneMainPool ¶
func (c ConcurrencySettings) AutoTuneMainPool() bool
AutoTuneMainPool says whether the main pool size should be dynamically tuned.
type ConcurrencyTuner ¶
type ConcurrencyTuner interface { // GetRecommendedConcurrency is called repeatedly, at intervals decided by the caller, // to compute recommended concurrency levels GetRecommendedConcurrency(currentMbps int, highCpuUsage bool) (newConcurrency int, reason string) // RequestCallbackWhenStable lets interested parties ask the concurrency tuner to call them back when the tuner has reached a stable level RequestCallbackWhenStable(callback func()) (callbackAccepted bool) // GetFinalState returns the final state of the tuner GetFinalState() (finalReason string, finalRecommendedConcurrency int) // contains filtered or unexported methods }
func NewAutoConcurrencyTuner ¶
func NewAutoConcurrencyTuner(initial, max int, isBenchmarking bool) ConcurrencyTuner
type ConfiguredBool ¶
type ConfiguredBool struct { Value bool IsUserSpecified bool EnvVarName string DefaultSourceDesc string }
ConfiguredBool is a boolean which may be optionally configured by user through an environment variable
func GetParallelStatFiles ¶
func GetParallelStatFiles() *ConfiguredBool
func (*ConfiguredBool) GetDescription ¶
func (b *ConfiguredBool) GetDescription() string
type ConfiguredInt ¶
type ConfiguredInt struct { Value int IsUserSpecified bool EnvVarName string DefaultSourceDesc string }
ConfiguredInt is an integer which may be optionally configured by user through an environment variable
func GetEnumerationPoolSize ¶
func GetEnumerationPoolSize() *ConfiguredInt
func (*ConfiguredInt) GetDescription ¶
func (i *ConfiguredInt) GetDescription() string
type CoordinatorChannels ¶
type CoordinatorChannels struct {
// contains filtered or unexported fields
}
type ErrorEx ¶
type ErrorEx struct {
// contains filtered or unexported fields
}
func (ErrorEx) ErrorCodeAndString ¶
TODO: consider rolling MSRequestID into this, so that all places that use this can pick up, and log, the request ID too
func (ErrorEx) MSRequestID ¶
MSRequestID gets the request ID guid associated with the failed request. Returns "" if there isn't one (either no request, or there is a request but it doesn't have the header)
type FolderCreationTracker ¶
type FolderCreationTracker common.FolderCreationTracker
func NewFolderCreationTracker ¶
func NewFolderCreationTracker(fpo common.FolderPropertyOption, plan *JobPartPlanHeader) FolderCreationTracker
type IBlobSourceInfoProvider ¶
type IBlobSourceInfoProvider interface { IRemoteSourceInfoProvider // BlobTier returns source's blob tier. BlobTier() azblob.AccessTierType // BlobType returns source's blob type. BlobType() azblob.BlobType }
IBlobSourceInfoProvider is the abstraction of the methods needed to prepare blob copy source.
type ICustomLocalOpener ¶
type ICustomLocalOpener interface { ISourceInfoProvider Open(path string) (*os.File, error) }
type IJobMgr ¶
type IJobMgr interface { JobID() common.JobID JobPartMgr(partNum PartNumber) (IJobPartMgr, bool) //Throughput() XferThroughput // If existingPlanMMF is nil, a new MMF is opened. AddJobPart(partNum PartNumber, planFile JobPartPlanFileName, existingPlanMMF *JobPartPlanMMF, sourceSAS string, destinationSAS string, scheduleTransfers bool) IJobPartMgr SetIncludeExclude(map[string]int, map[string]int) IncludeExclude() (map[string]int, map[string]int) ResumeTransfers(appCtx context.Context) AllTransfersScheduled() bool ConfirmAllTransfersScheduled() ResetAllTransfersScheduled() PipelineLogInfo() pipeline.LogOptions ReportJobPartDone(jobPartProgressInfo) Context() context.Context Cancel() // TODO: added for debugging purpose. remove later OccupyAConnection() // TODO: added for debugging purpose. remove later ReleaseAConnection() // TODO: added for debugging purpose. remove later ActiveConnections() int64 GetPerfInfo() (displayStrings []string, constraint common.PerfConstraint) SetInMemoryTransitJobState(state InMemoryTransitJobState) // set in memory transit job state saved in this job. 
ChunkStatusLogger() common.ChunkStatusLogger HttpClient() *http.Client PipelineNetworkStats() *PipelineNetworkStats common.ILoggerCloser /* Status related functions */ SendJobPartCreatedMsg(msg JobPartCreatedMsg) SendXferDoneMsg(msg xferDoneMsg) ListJobSummary() common.ListJobSummaryResponse ResurrectSummary(js common.ListJobSummaryResponse) DrainXferDoneMessages() bool /* Ported from jobsAdmin() */ ScheduleTransfer(priority common.JobPriority, jptm IJobPartTransferMgr) ScheduleChunk(priority common.JobPriority, chunkFunc chunkFunc) /* Some comment */ IterateJobParts(readonly bool, f func(k common.PartNumber, v IJobPartMgr)) TransferDirection() common.TransferDirection AddSuccessfulBytesInActiveFiles(n int64) SuccessfulBytesInActiveFiles() uint64 CancelPauseJobOrder(desiredJobStatus common.JobStatus) common.CancelPauseResumeResponse ChangeLogLevel(pipeline.LogLevel) // Cleanup Functions DeferredCleanupJobMgr() CleanupJobStatusMgr() // contains filtered or unexported methods }
func NewJobMgr ¶
func NewJobMgr(concurrency ConcurrencySettings, jobID common.JobID, appCtx context.Context, cpuMon common.CPUMonitor, level common.LogLevel, commandString string, logFileFolder string, tuner ConcurrencyTuner, pacer PacerAdmin, slicePool common.ByteSlicePooler, cacheLimiter common.CacheLimiter, fileCountLimiter common.CacheLimiter) IJobMgr
type IJobPartMgr ¶
type IJobPartMgr interface { Plan() *JobPartPlanHeader ScheduleTransfers(jobCtx context.Context) StartJobXfer(jptm IJobPartTransferMgr) ReportTransferDone(status common.TransferStatus) uint32 GetOverwriteOption() common.OverwriteOption GetForceIfReadOnly() bool AutoDecompress() bool ScheduleChunks(chunkFunc chunkFunc) RescheduleTransfer(jptm IJobPartTransferMgr) BlobTypeOverride() common.BlobType BlobTiers() (blockBlobTier common.BlockBlobTier, pageBlobTier common.PageBlobTier) ShouldPutMd5() bool SAS() (string, string) //CancelJob() Close() // TODO: added for debugging purpose. remove later OccupyAConnection() // TODO: added for debugging purpose. remove later ReleaseAConnection() SlicePool() common.ByteSlicePooler CacheLimiter() common.CacheLimiter FileCountLimiter() common.CacheLimiter ExclusiveDestinationMap() *common.ExclusiveStringMap ChunkStatusLogger() common.ChunkStatusLogger common.ILogger SourceProviderPipeline() pipeline.Pipeline SecurityInfoPersistenceManager() *securityInfoPersistenceManager FolderDeletionManager() common.FolderDeletionManager CpkInfo() common.CpkInfo CpkScopeInfo() common.CpkScopeInfo IsSourceEncrypted() bool /* Status Manager Updates */ SendXferDoneMsg(msg xferDoneMsg) // contains filtered or unexported methods }
type IJobPartTransferMgr ¶
type IJobPartTransferMgr interface { FromTo() common.FromTo Info() TransferInfo ResourceDstData(dataFileToXfer []byte) (headers common.ResourceHTTPHeaders, metadata common.Metadata, blobTags common.BlobTags, cpkOptions common.CpkOptions) LastModifiedTime() time.Time PreserveLastModifiedTime() (time.Time, bool) ShouldPutMd5() bool MD5ValidationOption() common.HashValidationOption BlobTypeOverride() common.BlobType BlobTiers() (blockBlobTier common.BlockBlobTier, pageBlobTier common.PageBlobTier) JobHasLowFileCount() bool //ScheduleChunk(chunkFunc chunkFunc) Context() context.Context SlicePool() common.ByteSlicePooler CacheLimiter() common.CacheLimiter WaitUntilLockDestination(ctx context.Context) error EnsureDestinationUnlocked() HoldsDestinationLock() bool StartJobXfer() GetOverwriteOption() common.OverwriteOption GetForceIfReadOnly() bool ShouldDecompress() bool GetSourceCompressionType() (common.CompressionType, error) ReportChunkDone(id common.ChunkID) (lastChunk bool, chunksDone uint32) TransferStatusIgnoringCancellation() common.TransferStatus SetStatus(status common.TransferStatus) SetErrorCode(errorCode int32) SetErrorMessage(errorMessage string) SetNumberOfChunks(numChunks uint32) SetActionAfterLastChunk(f func()) ReportTransferDone() uint32 RescheduleTransfer() ScheduleChunks(chunkFunc chunkFunc) SetDestinationIsModified() Cancel() WasCanceled() bool IsLive() bool IsDeadBeforeStart() bool IsDeadInflight() bool // TODO: added for debugging purpose. remove later OccupyAConnection() // TODO: added for debugging purpose. 
remove later ReleaseAConnection() SourceProviderPipeline() pipeline.Pipeline FailActiveUpload(where string, err error) FailActiveDownload(where string, err error) FailActiveUploadWithStatus(where string, err error, failureStatus common.TransferStatus) FailActiveDownloadWithStatus(where string, err error, failureStatus common.TransferStatus) FailActiveS2SCopy(where string, err error) FailActiveS2SCopyWithStatus(where string, err error, failureStatus common.TransferStatus) // TODO: Cleanup FailActiveUpload/FailActiveUploadWithStatus & FailActiveS2SCopy/FailActiveS2SCopyWithStatus FailActiveSend(where string, err error) FailActiveSendWithStatus(where string, err error, failureStatus common.TransferStatus) LogUploadError(source, destination, errorMsg string, status int) LogDownloadError(source, destination, errorMsg string, status int) LogS2SCopyError(source, destination, errorMsg string, status int) LogSendError(source, destination, errorMsg string, status int) LogError(resource, context string, err error) LogTransferInfo(level pipeline.LogLevel, source, destination, msg string) LogTransferStart(source, destination, description string) LogChunkStatus(id common.ChunkID, reason common.WaitReason) ChunkStatusLogger() common.ChunkStatusLogger LogAtLevelForCurrentTransfer(level pipeline.LogLevel, msg string) GetOverwritePrompter() *overwritePrompter GetFolderCreationTracker() FolderCreationTracker common.ILogger DeleteSnapshotsOption() common.DeleteSnapshotsOption SecurityInfoPersistenceManager() *securityInfoPersistenceManager FolderDeletionManager() common.FolderDeletionManager GetDestinationRoot() string ShouldInferContentType() bool CpkInfo() common.CpkInfo CpkScopeInfo() common.CpkScopeInfo IsSourceEncrypted() bool }
type ILocalSourceInfoProvider ¶
type ILocalSourceInfoProvider interface { ISourceInfoProvider OpenSourceFile() (common.CloseableReaderAt, error) }
type IRemoteSourceInfoProvider ¶
type IRemoteSourceInfoProvider interface { ISourceInfoProvider // SourceURL returns source's URL. PreSignedSourceURL() (*url.URL, error) // SourceSize returns size of source SourceSize() int64 // RawSource returns raw source RawSource() string }
IRemoteSourceInfoProvider is the abstraction of the methods needed to prepare remote copy source.
type ISMBPropertyBearingSourceInfoProvider ¶
type ISMBPropertyBearingSourceInfoProvider interface { ISourceInfoProvider GetSDDL() (string, error) GetSMBProperties() (TypedSMBPropertyHolder, error) }
type ISourceInfoProvider ¶
type ISourceInfoProvider interface { // Properties returns source's properties. Properties() (*SrcProperties, error) // GetLastModifiedTime returns the source's latest last modified time. Not used when // EntityType() == Folder GetFreshFileLastModifiedTime() (time.Time, error) IsLocal() bool EntityType() common.EntityType }
ISourceInfoProvider is the abstraction of generic source info provider which provides source's properties.
type InMemoryTransitJobState ¶
type InMemoryTransitJobState struct {
CredentialInfo common.CredentialInfo
}
InMemoryTransitJobState defines the job state that transits in memory, and not in the JobPartPlan file. Note: InMemoryTransitJobState should only be set when a request comes from the cmd (FE) module to the STE module. The in-memory CredentialInfo is currently maintained per job in STE, as FE could have a many-to-one relationship with STE, i.e. different jobs could have different OAuth tokens requested from FE, and these jobs can run at the same time in STE. This can be optimized if FE is no longer a separate module from the STE module.
type JPPTCompatibleFolderCreationTracker ¶
type JPPTCompatibleFolderCreationTracker interface { FolderCreationTracker RegisterPropertiesTransfer(folder string, transferIndex uint32) }
type JobLogLCMWrapper ¶
type JobLogLCMWrapper struct { JobManager IJobMgr common.LifecycleMgr }
func (JobLogLCMWrapper) Progress ¶
func (j JobLogLCMWrapper) Progress(builder common.OutputBuilder)
type JobPartCreatedMsg ¶
type JobPartPlanDstBlob ¶
type JobPartPlanDstBlob struct { BlobType common.BlobType // represents user decision to interpret the content-encoding from source file NoGuessMimeType bool // Specifies the length of MIME content type of the blob ContentTypeLength uint16 // Specifies the MIME content type of the blob. The default type is application/octet-stream ContentType [CustomHeaderMaxBytes]byte // Specifies length of content encoding which have been applied to the blob. ContentEncodingLength uint16 // Specifies which content encodings have been applied to the blob. ContentEncoding [CustomHeaderMaxBytes]byte // Specifies length of content language which has been applied to the blob. ContentLanguageLength uint16 // Specifies which content language has been applied to the blob. ContentLanguage [CustomHeaderMaxBytes]byte // Specifies length of content disposition which has been applied to the blob. ContentDispositionLength uint16 // Specifies the content disposition of the blob ContentDisposition [CustomHeaderMaxBytes]byte // Specifies the length of the cache control which has been applied to the blob. CacheControlLength uint16 // Specifies the cache control of the blob CacheControl [CustomHeaderMaxBytes]byte // Specifies the tier if this is a block or page blob BlockBlobTier common.BlockBlobTier PageBlobTier common.PageBlobTier // Controls uploading of MD5 hashes PutMd5 bool MetadataLength uint16 Metadata [MetadataMaxBytes]byte BlobTagsLength uint16 BlobTags [BlobTagsMaxByte]byte CpkInfo bool IsSourceEncrypted bool CpkScopeInfo [CustomHeaderMaxBytes]byte CpkScopeInfoLength uint16 // Specifies the maximum size of block which determines the number of chunks and chunk size of a transfer BlockSize int64 }
JobPartPlanDstBlob holds additional settings required when the destination is a blob
type JobPartPlanDstLocal ¶
type JobPartPlanDstLocal struct { // Specifies whether the timestamp of destination file has to be set to the modified time of source file PreserveLastModifiedTime bool // says how MD5 verification failures should be actioned MD5VerificationOption common.HashValidationOption }
JobPartPlanDstLocal holds additional settings required when the destination is a local file.
type JobPartPlanFileName ¶
type JobPartPlanFileName string
func (JobPartPlanFileName) Create ¶
func (jpfn JobPartPlanFileName) Create(order common.CopyJobPartOrderRequest)
Create creates the memory-mapped JobPartPlanHeader using the given JobPartOrder and JobPartPlanBlobData.
func (JobPartPlanFileName) Delete ¶
func (jpfn JobPartPlanFileName) Delete() error
func (*JobPartPlanFileName) GetJobPartPlanPath ¶
func (jppfn *JobPartPlanFileName) GetJobPartPlanPath() string
func (JobPartPlanFileName) Map ¶
func (jpfn JobPartPlanFileName) Map() *JobPartPlanMMF
func (JobPartPlanFileName) Parse ¶
func (jpfn JobPartPlanFileName) Parse() (jobID common.JobID, partNumber common.PartNumber, err error)
TODO: This needs testing
type JobPartPlanHeader ¶
type JobPartPlanHeader struct { // Once set, the following fields are constants; they should never be modified Version common.Version // The version of data schema format of header; see the dataSchemaVersion constant StartTime int64 // The start time of this part JobID common.JobID // Job Part's JobID PartNum common.PartNumber // Job Part's part number (0+) SourceRootLength uint16 // The length of the source root path SourceRoot [1000]byte // The root directory of the source SourceExtraQueryLength uint16 SourceExtraQuery [1000]byte // Extra query params applicable to the source DestinationRootLength uint16 // The length of the destination root path DestinationRoot [1000]byte // The root directory of the destination DestExtraQueryLength uint16 DestExtraQuery [1000]byte // Extra query params applicable to the dest IsFinalPart bool // True if this is the Job's last part; else false ForceWrite common.OverwriteOption // True if the existing blobs needs to be overwritten. ForceIfReadOnly bool // Supplements ForceWrite with an additional setting for Azure Files. 
If true, the read-only attribute will be cleared before we overwrite AutoDecompress bool // if true, source data with encodings that represent compression are automatically decompressed when downloading Priority common.JobPriority // The Job Part's priority TTLAfterCompletion uint32 // Time to live after completion is used to persists the file on disk of specified time after the completion of JobPartOrder FromTo common.FromTo // The location of the transfer's source & destination Fpo common.FolderPropertyOption // option specifying how folders will be handled CommandStringLength uint32 NumTransfers uint32 // The number of transfers in the Job part LogLevel common.LogLevel // This Job Part's minimal log level DstBlobData JobPartPlanDstBlob // Additional data for blob destinations DstLocalData JobPartPlanDstLocal // Additional data for local destinations PreservePermissions common.PreservePermissionsOption PreserveSMBInfo bool // S2SGetPropertiesInBackend represents whether to enable get S3 objects' or Azure files' properties during s2s copy in backend. S2SGetPropertiesInBackend bool // S2SSourceChangeValidation represents whether user wants to check if source has changed after enumerating. S2SSourceChangeValidation bool // DestLengthValidation represents whether the user wants to check if the destination has a different content-length DestLengthValidation bool // S2SInvalidMetadataHandleOption represents how user wants to handle invalid metadata. S2SInvalidMetadataHandleOption common.InvalidMetadataHandleOption // For delete operation specify what to do with snapshots DeleteSnapshotsOption common.DeleteSnapshotsOption // contains filtered or unexported fields }
JobPartPlanHeader represents the header of Job Part's memory-mapped file
func (*JobPartPlanHeader) CommandString ¶
func (jpph *JobPartPlanHeader) CommandString() string
CommandString returns the command string given by user when job was created
func (*JobPartPlanHeader) JobStatus ¶
func (jpph *JobPartPlanHeader) JobStatus() common.JobStatus
JobStatus returns the job status stored in JobPartPlanHeader in a thread-safe manner.
func (*JobPartPlanHeader) SetJobStatus ¶
func (jpph *JobPartPlanHeader) SetJobStatus(newJobStatus common.JobStatus)
SetJobStatus sets the job status in JobPartPlanHeader in a thread-safe manner.
func (*JobPartPlanHeader) Transfer ¶
func (jpph *JobPartPlanHeader) Transfer(transferIndex uint32) *JobPartPlanTransfer
Transfer returns the memory-mapped JobPartPlanTransfer header for the given transfer index.
func (*JobPartPlanHeader) TransferSrcDstRelatives ¶
func (jpph *JobPartPlanHeader) TransferSrcDstRelatives(transferIndex uint32) (relSource, relDest string)
func (*JobPartPlanHeader) TransferSrcDstStrings ¶
func (jpph *JobPartPlanHeader) TransferSrcDstStrings(transferIndex uint32) (source, destination string, isFolder bool)
TransferSrcDstStrings returns the source and destination strings for the transfer at the given transferIndex in JobPartOrder. It also returns an indication of the entity type (isFolder), since that is often necessary to avoid ambiguity about what the source and destination are.
func (*JobPartPlanHeader) TransferSrcPropertiesAndMetadata ¶
func (jpph *JobPartPlanHeader) TransferSrcPropertiesAndMetadata(transferIndex uint32) (h common.ResourceHTTPHeaders, metadata common.Metadata, blobType azblob.BlobType, blobTier azblob.AccessTierType, s2sGetPropertiesInBackend bool, DestLengthValidation bool, s2sSourceChangeValidation bool, s2sInvalidMetadataHandleOption common.InvalidMetadataHandleOption, entityType common.EntityType, blobVersionID string, blobTags common.BlobTags)
TransferSrcPropertiesAndMetadata returns the SrcHTTPHeaders, properties and metadata for the transfer at the given transferIndex in JobPartOrder. TODO: Refactor return type to an object.
type JobPartPlanMMF ¶
func (*JobPartPlanMMF) Plan ¶
func (mmf *JobPartPlanMMF) Plan() *JobPartPlanHeader
func (*JobPartPlanMMF) Unmap ¶
func (mmf *JobPartPlanMMF) Unmap()
type JobPartPlanTransfer ¶
type JobPartPlanTransfer struct { // SrcOffset represents the actual start offset transfer header written in JobPartOrder file SrcOffset int64 // SrcLength represents the actual length of source string for specific transfer SrcLength int16 // DstLength represents the actual length of destination string for specific transfer DstLength int16 // ChunkCount represents the num of chunks a transfer is split into //ChunkCount uint16 // TODO: Remove this, we need to determine it at runtime // EntityType indicates whether this is a file or a folder // We use a dedicated field for this because the alternative (of doing something fancy the names) was too complex and error-prone EntityType common.EntityType // ModifiedTime represents the last time at which source was modified before start of transfer stored as nanoseconds. ModifiedTime int64 // SourceSize represents the actual size of the source on disk SourceSize int64 // CompletionTime represents the time at which transfer was completed CompletionTime uint64 // For S2S copy, per Transfer source's properties // TODO: ensure the length is enough SrcContentTypeLength int16 SrcContentEncodingLength int16 SrcContentLanguageLength int16 SrcContentDispositionLength int16 SrcCacheControlLength int16 SrcContentMD5Length int16 SrcMetadataLength int16 SrcBlobTypeLength int16 SrcBlobTierLength int16 SrcBlobVersionIDLength int16 SrcBlobTagsLength int16 // contains filtered or unexported fields }
JobPartPlanTransfer represents the header of a Job Part's Transfer in the memory-mapped file.
func (*JobPartPlanTransfer) ErrorCode ¶
func (jppt *JobPartPlanTransfer) ErrorCode() int32
ErrorCode returns the transfer's errorCode.
func (*JobPartPlanTransfer) ErrorMessage ¶ added in v10.12.5
func (jppt *JobPartPlanTransfer) ErrorMessage() string
ErrorMessage returns the transfer's error message.
func (*JobPartPlanTransfer) SetErrorCode ¶
func (jppt *JobPartPlanTransfer) SetErrorCode(errorCode int32, overwrite bool)
SetErrorCode sets the transfer's error code if the transfer failed. If overwrite is true, the existing atomicErrorCode is overwritten. If overwrite is false, the existing error code is not overwritten.
func (*JobPartPlanTransfer) SetErrorMessage ¶ added in v10.12.5
func (jppt *JobPartPlanTransfer) SetErrorMessage(errorMessage string, overwrite bool)
SetErrorMessage sets the transfer's error message if the transfer failed. If overwrite is true, the existing error message is overwritten. If overwrite is false, the existing error message is not overwritten.
func (*JobPartPlanTransfer) SetTransferStatus ¶
func (jppt *JobPartPlanTransfer) SetTransferStatus(status common.TransferStatus, overWrite bool)
SetTransferStatus sets the transfer's status. If overWrite is true, an existing failed status is overwritten. If overWrite is false, a transfer whose status is already set to failed is not overwritten. The overWrite flag is used when resuming failed transfers, where the error codes are reset to the default, i.e. 0.
func (*JobPartPlanTransfer) TransferStatus ¶
func (jppt *JobPartPlanTransfer) TransferStatus() common.TransferStatus
TransferStatus returns the transfer's status
type NullConcurrencyTuner ¶
type NullConcurrencyTuner struct {
FixedValue int
}
func (*NullConcurrencyTuner) GetFinalState ¶
func (n *NullConcurrencyTuner) GetFinalState() (finalReason string, finalRecommendedConcurrency int)
func (*NullConcurrencyTuner) GetRecommendedConcurrency ¶
func (n *NullConcurrencyTuner) GetRecommendedConcurrency(currentMbps int, highCpuUsage bool) (newConcurrency int, reason string)
func (*NullConcurrencyTuner) RequestCallbackWhenStable ¶
func (n *NullConcurrencyTuner) RequestCallbackWhenStable(callback func()) (callbackAccepted bool)
type PacerAdmin ¶
type PacerAdmin interface { // GetTotalTraffic returns the cumulative count of all traffic that has been processed GetTotalTraffic() int64 // contains filtered or unexported methods }
type PartNumber ¶
type PartNumber = common.PartNumber
type PerformanceAdvisor ¶
type PerformanceAdvisor struct {
// contains filtered or unexported fields
}
func NewPerformanceAdvisor ¶
func NewPerformanceAdvisor(stats *PipelineNetworkStats, commandLineMbpsCap float64, mbps int64, finalReason string, finalConcurrency int, dir common.TransferDirection, avgBytesPerFile int64, isToAzureFiles bool) *PerformanceAdvisor
func (*PerformanceAdvisor) GetAdvice ¶
func (p *PerformanceAdvisor) GetAdvice() []common.PerformanceAdvice
GetAdvice returns one or many performance advice objects, in priority order, with the highest priority advice first.
type PipelineNetworkStats ¶
type PipelineNetworkStats struct {
// contains filtered or unexported fields
}
func (*PipelineNetworkStats) AverageE2EMilliseconds ¶
func (s *PipelineNetworkStats) AverageE2EMilliseconds() int
func (*PipelineNetworkStats) GetTotalRetries ¶
func (s *PipelineNetworkStats) GetTotalRetries() int64
func (*PipelineNetworkStats) IOPSServerBusyPercentage ¶
func (s *PipelineNetworkStats) IOPSServerBusyPercentage() float32
func (*PipelineNetworkStats) IsStarted ¶
func (s *PipelineNetworkStats) IsStarted() bool
func (*PipelineNetworkStats) NetworkErrorPercentage ¶
func (s *PipelineNetworkStats) NetworkErrorPercentage() float32
func (*PipelineNetworkStats) OperationsPerSecond ¶
func (s *PipelineNetworkStats) OperationsPerSecond() int
func (*PipelineNetworkStats) OtherServerBusyPercentage ¶
func (s *PipelineNetworkStats) OtherServerBusyPercentage() float32
func (*PipelineNetworkStats) ThroughputServerBusyPercentage ¶
func (s *PipelineNetworkStats) ThroughputServerBusyPercentage() float32
func (*PipelineNetworkStats) TotalServerBusyPercentage ¶
func (s *PipelineNetworkStats) TotalServerBusyPercentage() float32
type RequestLogOptions ¶
type RequestLogOptions struct { // LogWarningIfTryOverThreshold logs a warning if a tried operation takes longer than the specified // duration (-1=no logging; 0=default threshold). LogWarningIfTryOverThreshold time.Duration // SyslogDisabled is a flag to check if logging to Syslog/Windows-Event-Logger is enabled or not // We by default print to Syslog/Windows-Event-Logger. // If SyslogDisabled is not provided explicitly, the default value will be false. SyslogDisabled bool }
RequestLogOptions configures the request log policy's behavior.
type SrcProperties ¶
type TransferInfo ¶
type TransferInfo struct { JobID common.JobID BlockSize int64 Source string SourceSize int64 Destination string EntityType common.EntityType PreserveSMBPermissions common.PreservePermissionsOption PreserveSMBInfo bool // Transfer info for S2S copy SrcProperties S2SGetPropertiesInBackend bool S2SSourceChangeValidation bool DestLengthValidation bool S2SInvalidMetadataHandleOption common.InvalidMetadataHandleOption // Blob SrcBlobType azblob.BlobType // used for both S2S and for downloads to local from blob S2SSrcBlobTier azblob.AccessTierType // AccessTierType (string) is used to accommodate service-side support matrix change. // NumChunks is the number of chunks in which transfer will be split into while uploading the transfer. // NumChunks is not used in case of AppendBlob transfer. NumChunks uint16 }
func (TransferInfo) IsFolderPropertiesTransfer ¶
func (i TransferInfo) IsFolderPropertiesTransfer() bool
func (TransferInfo) ShouldTransferLastWriteTime ¶
func (i TransferInfo) ShouldTransferLastWriteTime() bool
We don't preserve LMTs on folders. The main reason is that preserving folder LMTs at download time is very difficult, because it requires us to keep track of when the last file has been saved in each folder OR just do all the folders at the very end. This is because if we modify the contents of a folder after setting its LMT, then the LMT will change because Windows and Linux (and presumably MacOS) automatically update the folder LMT when the contents are changed. The possible solutions to this problem may become difficult on very large jobs (e.g. 10s or hundreds of millions of files, with millions of directories). The secondary reason is that folder LMT's don't actually tell the user anything particularly useful. Specifically, they do NOT tell you when the folder contents (recursively) were last updated: in Azure Files they are never updated when folder contents change; and in NTFS they are only updated when immediate children are changed (not grandchildren).
type TypedSMBPropertyHolder ¶
type XferChannels ¶
type XferChannels struct {
// contains filtered or unexported fields
}
type XferRetryOptions ¶
type XferRetryOptions struct { // Policy tells the pipeline what kind of retry policy to use. See the XferRetryPolicy* constants.\ // A value of zero means that you accept our default policy. Policy XferRetryPolicy // MaxTries specifies the maximum number of attempts an operation will be tried before producing an error (0=default). // A value of zero means that you accept our default policy. A value of 1 means 1 try and no retries. MaxTries int32 // TryTimeout indicates the maximum time allowed for any single try of an HTTP request. // A value of zero means that you accept our default timeout. NOTE: When transferring large amounts // of data, the default TryTimeout will probably not be sufficient. You should override this value // based on the bandwidth available to the host machine and proximity to the Storage service. A good // starting point may be something like (60 seconds per MB of anticipated-payload-size). TryTimeout time.Duration // RetryDelay specifies the amount of delay to use before retrying an operation (0=default). // The delay increases (exponentially or linearly) with each retry up to a maximum specified by // MaxRetryDelay. If you specify 0, then you must also specify 0 for MaxRetryDelay. RetryDelay time.Duration // MaxRetryDelay specifies the maximum delay allowed before retrying an operation (0=default). // If you specify 0, then you must also specify 0 for RetryDelay. MaxRetryDelay time.Duration // RetryReadsFromSecondaryHost specifies whether the retry policy should retry a read operation against another host. // If RetryReadsFromSecondaryHost is "" (the default) then operations are not retried against another host. // NOTE: Before setting this field, make sure you understand the issues around reading stale & potentially-inconsistent // data at this webpage: https://docs.microsoft.com/en-us/azure/storage/common/storage-designing-ha-apps-with-ragrs RetryReadsFromSecondaryHost string // Comment this our for non-Blob SDKs }
XferRetryOptions configures the retry policy's behavior.
type XferRetryPolicy ¶
type XferRetryPolicy int32
XferRetryPolicy tells the pipeline what kind of retry policy to use. See the XferRetryPolicy* constants. This is a new retry policy, added instead of reusing the existing policy in azblob.zc_retry_policy.go, because the retry behavior differs: it retries on all types of network errors instead of retrying only on temporary or timeout errors.
const ( // RetryPolicyExponential tells the pipeline to use an exponential back-off retry policy RetryPolicyExponential XferRetryPolicy = 0 // RetryPolicyFixed tells the pipeline to use a fixed back-off retry policy RetryPolicyFixed XferRetryPolicy = 1 )
Source Files ¶
- ErrorExt.go
- JobPartPlan.go
- JobPartPlanFileName.go
- concurrency.go
- concurrencyTuner.go
- downloader-azureFiles.go
- downloader-blob.go
- downloader-blobFS.go
- downloader.go
- folderCreationTracker.go
- jobStatusManager.go
- joblog_lcm_wrapper.go
- md5Comparer.go
- mgr-JobMgr.go
- mgr-JobPartMgr.go
- mgr-JobPartTransferMgr.go
- overwritePrompter.go
- pacedReadSeeker.go
- pacer-autoPacer.go
- pacer-nullAutoPacer.go
- pacer-tokenBucketPacer.go
- performanceAdvisor.go
- putListNeed.go
- remoteObjectExists.go
- s2sCopier-URLToBlob.go
- securityInfoPersistenceManager.go
- sender-appendBlob.go
- sender-appendBlobFromLocal.go
- sender-appendBlobFromURL.go
- sender-azureFile.go
- sender-azureFileFromLocal.go
- sender-azureFileFromURL.go
- sender-blobFS.go
- sender-blobFSFromLocal.go
- sender-blockBlob.go
- sender-blockBlobFromLocal.go
- sender-blockBlobFromURL.go
- sender-pageBlob.go
- sender-pageBlobFromLocal.go
- sender-pageBlobFromURL.go
- sender.go
- sourceInfoProvider-Benchmark.go
- sourceInfoProvider-Blob.go
- sourceInfoProvider-File.go
- sourceInfoProvider-GCP.go
- sourceInfoProvider-Local.go
- sourceInfoProvider-S3.go
- sourceInfoProvider.go
- xfer-anyToRemote-file.go
- xfer-anyToRemote-folder.go
- xfer-deleteBlob.go
- xfer-deleteFile.go
- xfer-remoteToLocal-file.go
- xfer-remoteToLocal-folder.go
- xfer.go
- xferLogPolicy.go
- xferRetryNotificationPolicy.go
- xferRetrypolicy.go
- xferStatsPolicy.go