Documentation ¶
Overview ¶
Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information.
Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information. Package cachewarmer implements the structures, methods, and functions used by the cache warmer
Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information.
Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information.
Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information.
Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information.
Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information.
Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information.
Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information.
Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information.
Copyright (C) Microsoft Corporation. All rights reserved. Licensed under the MIT License. See LICENSE-CODE in the project root for license information.
Index ¶
- Constants
- func AlreadyMounted(mountPath string) bool
- func BashCommand(cmdstr string) (bytes.Buffer, bytes.Buffer, error)
- func CreateVmss(ctx context.Context, azureClients *AzureClients, ...) (vmss compute.VirtualMachineScaleSet, err error)
- func DeleteVmss(ctx context.Context, azureClients *AzureClients, name string) error
- func EnsureWarmPath(jobMountAddress string, jobExportPath string, jobBasePath string) (string, error)
- func FileMatches(inclusionList []string, exclusionList []string, maxFileSizeBytes int64, ...) bool
- func GetLocalMountPath(jobMountAddress string, jobExportPath string) string
- func GetPrimaryStorageKey(ctx context.Context, resourceGroup string, accountName string) (string, error)
- func GetResourceName(id string) string
- func GetSubnetId(ctx context.Context, azureClients *AzureClients) (string, error)
- func GetSubscriptionID() (string, error)
- func IsDirectory(path string) (bool, error)
- func MountPath(address string, exportPath string, localPath string) error
- func SwapResourceName(id string, resourceName string) string
- func VmssExists(ctx context.Context, azureClients *AzureClients, name string) (bool, error)
- type AzureClients
- type CacheWarmerCloudInit
- type CacheWarmerQueues
- func (q *CacheWarmerQueues) DeleteWarmPathJob(warmPathJob *WarmPathJob) error
- func (q *CacheWarmerQueues) DeleteWorkerJob(workerJob *WorkerJob) error
- func (q *CacheWarmerQueues) GetWarmPathJob() (*WarmPathJob, error)
- func (q *CacheWarmerQueues) GetWorkerJob() (*WorkerJob, error)
- func (q *CacheWarmerQueues) IsJobQueueEmpty() (bool, error)
- func (q *CacheWarmerQueues) IsWorkQueueEmpty() (bool, error)
- func (q *CacheWarmerQueues) PeekWorkerJob() (*WorkerJob, error)
- func (q *CacheWarmerQueues) StillProcessingWarmPathJob(warmPathJob *WarmPathJob) error
- func (q *CacheWarmerQueues) StillProcessingWorkerJob(workerJob *WorkerJob) error
- func (q *CacheWarmerQueues) WriteWarmPathJob(job WarmPathJob) error
- func (q *CacheWarmerQueues) WriteWorkerJob(workerjob *WorkerJob) error
- type ComputeMetadata
- type FileToWarm
- type WarmPathJob
- func (j *WarmPathJob) FileMatches(filename string, filesize int64) bool
- func (j *WarmPathJob) GetQueueMessageInfo() (azqueue.MessageID, azqueue.PopReceipt)
- func (j *WarmPathJob) GetWarmPathJobFileContents() (string, error)
- func (j *WarmPathJob) SetQueueMessageInfo(id azqueue.MessageID, popReceipt azqueue.PopReceipt)
- type WarmPathManager
- func (m *WarmPathManager) EnsureVmssDeleted(ctx context.Context)
- func (m *WarmPathManager) EnsureVmssRunning(ctx context.Context)
- func (m *WarmPathManager) RunJobGenerator(ctx context.Context, syncWaitGroup *sync.WaitGroup)
- func (m *WarmPathManager) RunVMSSManager(ctx context.Context, syncWaitGroup *sync.WaitGroup)
- type WorkQueue
- type Worker
- type WorkerJob
- func InitializeWorkerJob(warmTargetMountAddresses []string, warmTargetExportPath string, ...) *WorkerJob
- func InitializeWorkerJobForLargeFile(warmTargetMountAddresses []string, warmTargetExportPath string, ...) *WorkerJob
- func InitializeWorkerJobFromString(workerJobContents string) (*WorkerJob, error)
- func InitializeWorkerJobWithFilter(warmTargetMountAddresses []string, warmTargetExportPath string, ...) *WorkerJob
- func (j *WorkerJob) FilterFiles(dirEntries []os.FileInfo) []string
- func (j *WorkerJob) GetQueueMessageInfo() (azqueue.MessageID, azqueue.PopReceipt)
- func (j *WorkerJob) GetWorkerJobFileContents() (string, error)
- func (j *WorkerJob) SetQueueMessageInfo(id azqueue.MessageID, popReceipt azqueue.PopReceipt)
Constants ¶
const ( B = 1 << (10 * iota) KB MB GB )
sizes
const ( MinimumSingleFileSize = int64(100 * MB) MaximumJobSize = int64(1500 * MB) MaximumFilesToRead = 200 // golang uses an 8192 buffer passed to getdents64 so we'll choose 128 because we get these on the first call anyway MinimumJobsOnDirRead = 128 WarmPathJobQueueSuffix = "job" WorkQueueSuffix = "work" // the base mount path DefaultCacheWarmerMountPath = "/mnt/cachewarmer" NumberOfMessagesToDequeue = 1 CacheWarmerVisibilityTimeout = time.Duration(60) * time.Second // 1 minute visibility timeout // retry mounting for 10 minutes MountRetryCount = 60 MountRetrySleepSeconds = 10 // this size is the most common, and will stand up the fastest VMSSNodeSize = "Standard_D2s_v3" VmssName = "cwvmss" // the controller will work in an airgapped environment MarketPlacePublisher = "microsoft-avere" MarketPlaceOffer = "vfxt" MarketPlaceSku = "avere-vfxt-controller" PlanName = "avere-vfxt-controller" PlanPublisherName = "microsoft-avere" PlanProductName = "vfxt" // file read settings ReadPageSize = 10 * MB WorkerMultiplier = 2 MinimumJobsBeforeRefill = 100 SubscriptionIdEnvVar = "AZURE_SUBSCRIPTION_ID" )
Variables ¶
This section is empty.
Functions ¶
func AlreadyMounted ¶
func CreateVmss ¶
func CreateVmss(ctx context.Context, azureClients *AzureClients, vmssModel compute.VirtualMachineScaleSet) (vmss compute.VirtualMachineScaleSet, err error)
func DeleteVmss ¶
func DeleteVmss(ctx context.Context, azureClients *AzureClients, name string) error
func EnsureWarmPath ¶
func EnsureWarmPath(jobMountAddress string, jobExportPath string, jobBasePath string) (string, error)
EnsureWarmPath ensures that the path is mounted and exists
func FileMatches ¶
func GetLocalMountPath ¶
func GetPrimaryStorageKey ¶ added in v1.3.0
func GetResourceName ¶
func GetSubnetId ¶
func GetSubnetId(ctx context.Context, azureClients *AzureClients) (string, error)
GetSubnetId returns the subnet of the current VM
func GetSubscriptionID ¶ added in v1.3.0
func IsDirectory ¶
func SwapResourceName ¶
func VmssExists ¶
Types ¶
type AzureClients ¶
type AzureClients struct { VMClient compute.VirtualMachinesClient VMSSClient compute.VirtualMachineScaleSetsClient NICClient network.InterfacesClient LocalMetadata ComputeMetadata }
func InitializeAzureClients ¶
func InitializeAzureClients() (*AzureClients, error)
type CacheWarmerCloudInit ¶
type CacheWarmerCloudInit struct { LocalMountPath string BootstrapAddress string BootstrapExportPath string BootstrapScriptPath string EnvVars string }
func InitializeCloutInit ¶
func (*CacheWarmerCloudInit) GetCacheWarmerCloudInit ¶
func (c *CacheWarmerCloudInit) GetCacheWarmerCloudInit() (string, error)
type CacheWarmerQueues ¶
type CacheWarmerQueues struct {
// contains filtered or unexported fields
}
func (*CacheWarmerQueues) DeleteWarmPathJob ¶
func (q *CacheWarmerQueues) DeleteWarmPathJob(warmPathJob *WarmPathJob) error
func (*CacheWarmerQueues) DeleteWorkerJob ¶
func (q *CacheWarmerQueues) DeleteWorkerJob(workerJob *WorkerJob) error
func (*CacheWarmerQueues) GetWarmPathJob ¶
func (q *CacheWarmerQueues) GetWarmPathJob() (*WarmPathJob, error)
func (*CacheWarmerQueues) GetWorkerJob ¶
func (q *CacheWarmerQueues) GetWorkerJob() (*WorkerJob, error)
func (*CacheWarmerQueues) IsJobQueueEmpty ¶
func (q *CacheWarmerQueues) IsJobQueueEmpty() (bool, error)
IsJobQueueEmpty returns true if there are one or more visible or invisible items in the queue
func (*CacheWarmerQueues) IsWorkQueueEmpty ¶
func (q *CacheWarmerQueues) IsWorkQueueEmpty() (bool, error)
func (*CacheWarmerQueues) PeekWorkerJob ¶
func (q *CacheWarmerQueues) PeekWorkerJob() (*WorkerJob, error)
func (*CacheWarmerQueues) StillProcessingWarmPathJob ¶
func (q *CacheWarmerQueues) StillProcessingWarmPathJob(warmPathJob *WarmPathJob) error
func (*CacheWarmerQueues) StillProcessingWorkerJob ¶
func (q *CacheWarmerQueues) StillProcessingWorkerJob(workerJob *WorkerJob) error
func (*CacheWarmerQueues) WriteWarmPathJob ¶
func (q *CacheWarmerQueues) WriteWarmPathJob(job WarmPathJob) error
func (*CacheWarmerQueues) WriteWorkerJob ¶
func (q *CacheWarmerQueues) WriteWorkerJob(workerjob *WorkerJob) error
type ComputeMetadata ¶
type ComputeMetadata struct { SubscriptionId string `json:"subscriptionId"` ResourceGroup string `json:"resourceGroupName"` Location string `json:"location"` Name string `json:"name"` }
func GetComputeMetadata ¶
func GetComputeMetadata() (*ComputeMetadata, error)
type FileToWarm ¶
func InitializeFileToWarm ¶
func InitializeFileToWarm(warmFilePath string, startByte int64, stopByte int64) FileToWarm
type WarmPathJob ¶
type WarmPathJob struct { WarmTargetMountAddresses []string WarmTargetExportPath string WarmTargetPath string InclusionList []string ExclusionList []string MaxFileSizeBytes int64 // contains filtered or unexported fields }
WarmPathJob contains the information for a new job item
func InitializeWarmPathJob ¶
func InitializeWarmPathJob( warmTargetMountAddresses string, warmTargetExportPath string, warmTargetPath string, inclusionCsv string, exclusionCsv string, maxFileSizeBytes int64) *WarmPathJob
InitializeWarmPathJob initializes the job submitter structure
func InitializeWarmPathJobFromString ¶
func InitializeWarmPathJobFromString(warmPathJobContents string) (*WarmPathJob, error)
InitializeWarmPathJobFromString reads warmPathJobContents
func (*WarmPathJob) FileMatches ¶
func (j *WarmPathJob) FileMatches(filename string, filesize int64) bool
func (*WarmPathJob) GetQueueMessageInfo ¶
func (j *WarmPathJob) GetQueueMessageInfo() (azqueue.MessageID, azqueue.PopReceipt)
func (*WarmPathJob) GetWarmPathJobFileContents ¶
func (j *WarmPathJob) GetWarmPathJobFileContents() (string, error)
GetWarmPathJobFileContents returns the contents of the file
func (*WarmPathJob) SetQueueMessageInfo ¶
func (j *WarmPathJob) SetQueueMessageInfo(id azqueue.MessageID, popReceipt azqueue.PopReceipt)
type WarmPathManager ¶
type WarmPathManager struct { AzureClients *AzureClients WorkerCount int64 Queues *CacheWarmerQueues // contains filtered or unexported fields }
WarmPathManager contains the information for the manager
func InitializeWarmPathManager ¶
func InitializeWarmPathManager( azureClients *AzureClients, workerCount int64, queues *CacheWarmerQueues, bootstrapMountAddress string, bootstrapExportPath string, bootstrapScriptPath string, vmssUserName string, vmssPassword string, vmssSshPublicKey string, vmssSubnet string, storageAccount string, storageKey string, queueNamePrefix string) *WarmPathManager
InitializeWarmPathManager initializes the job submitter structure
func (*WarmPathManager) EnsureVmssDeleted ¶
func (m *WarmPathManager) EnsureVmssDeleted(ctx context.Context)
func (*WarmPathManager) EnsureVmssRunning ¶
func (m *WarmPathManager) EnsureVmssRunning(ctx context.Context)
func (*WarmPathManager) RunJobGenerator ¶
func (m *WarmPathManager) RunJobGenerator(ctx context.Context, syncWaitGroup *sync.WaitGroup)
func (*WarmPathManager) RunVMSSManager ¶
func (m *WarmPathManager) RunVMSSManager(ctx context.Context, syncWaitGroup *sync.WaitGroup)
type WorkQueue ¶
type WorkQueue struct {
// contains filtered or unexported fields
}
RoundRobinPathManager round robins among the available paths
func InitializeWorkQueue ¶
func InitializeWorkQueue() *WorkQueue
func (*WorkQueue) AddWorkItem ¶
func (q *WorkQueue) AddWorkItem(fileToWarm FileToWarm)
func (*WorkQueue) GetNextWorkItem ¶
func (q *WorkQueue) GetNextWorkItem() (FileToWarm, bool)
GetNextWorkItem retrieves the next workItem
func (*WorkQueue) WorkItemCount ¶
type Worker ¶
type Worker struct { Queues *CacheWarmerQueues // contains filtered or unexported fields }
Worker contains the information for the worker
func InitializeWorker ¶
func InitializeWorker(queues *CacheWarmerQueues) *Worker
InitializeWorker initializes the job submitter structure
type WorkerJob ¶
type WorkerJob struct { WarmTargetMountAddresses []string WarmTargetExportPath string WarmTargetPath string StartByte int64 StopByte int64 ApplyFilter bool StartFileFilter string EndFileFilter string InclusionList []string ExclusionList []string MaxFileSizeBytes int64 // contains filtered or unexported fields }
WorkerJob contains the information for a worker job item
func InitializeWorkerJob ¶
func InitializeWorkerJob( warmTargetMountAddresses []string, warmTargetExportPath string, warmTargetPath string, inclusionList []string, exclusionList []string, maxFileSizeBytes int64) *WorkerJob
InitializeWorkerJob initializes the worker job structure
func InitializeWorkerJobForLargeFile ¶
func InitializeWorkerJobForLargeFile( warmTargetMountAddresses []string, warmTargetExportPath string, warmTargetPath string, startByte int64, stopByte int64, inclusionList []string, exclusionList []string, maxFileSizeBytes int64) *WorkerJob
InitializeWorkerJob initializes the worker job structure
func InitializeWorkerJobFromString ¶
InitializeWorkerJobFromString reads warmPathJobContents
func InitializeWorkerJobWithFilter ¶
func InitializeWorkerJobWithFilter( warmTargetMountAddresses []string, warmTargetExportPath string, warmTargetPath string, startFileFilter string, endFileFilter string, inclusionList []string, exclusionList []string, maxFileSizeBytes int64) *WorkerJob
InitializeWorkerJobWithFilter initializes the worker job structure
func (*WorkerJob) GetQueueMessageInfo ¶
func (j *WorkerJob) GetQueueMessageInfo() (azqueue.MessageID, azqueue.PopReceipt)
func (*WorkerJob) GetWorkerJobFileContents ¶
GetWorkerJobFileContents returns the contents of the file
func (*WorkerJob) SetQueueMessageInfo ¶
func (j *WorkerJob) SetQueueMessageInfo(id azqueue.MessageID, popReceipt azqueue.PopReceipt)