Documentation ¶
Overview ¶
Package objutil provides utility functions for object store clients which expose more complex/configurable behavior than the base 'objcli.Client'.
Index ¶
- Constants
- Variables
- func CompressObjects(opts CompressObjectsOptions) ([]byte, error)
- func CopyObject(opts CopyObjectOptions) error
- func CopyObjects(opts CopyObjectsOptions) error
- func Download(opts DownloadOptions) error
- func PrefixExists(opts PrefixExistsOptions) (bool, error)
- func Sync(opts SyncOptions) error
- func Upload(opts UploadOptions) error
- type ChunkReader
- type CloudOrFileURL
- type CompressObjectsOptions
- type CopyObjectOptions
- type CopyObjectsOptions
- type DownloadOptions
- type ErrInvalidCloudPath
- type MPDownloader
- type MPDownloaderOptions
- type MPUploader
- type MPUploaderOptions
- type OnPartCompleteFunc
- type Options
- type PartCompleteFunc
- type PrefixExistsOptions
- type ProgressReportFunc
- type SyncOptions
- type Syncer
- type UploadOptions
Constants ¶
const (
	// MinPartSize is the minimum size allowed for 'PartSize', and is a hard limit enforced by AWS.
	MinPartSize = objaws.MinUploadSize

	// MPUThreshold is the threshold at which point we break the upload up into multiple requests which are executed
	// concurrently.
	//
	// NOTE: Multipart uploads generally require at least three requests, hence the choice of 'MinPartSize * 3'.
	MPUThreshold = MinPartSize * 3
)
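For a concrete sense of scale, assuming 'objaws.MinUploadSize' is the 5 MiB minimum part size enforced by AWS (an assumption; the value isn't stated here), the threshold works out as below. These are hypothetical mirrors of the constants above, not the package's own definitions:

```go
package main

import "fmt"

// Hypothetical stand-ins for the constants above, assuming a 5 MiB AWS minimum part size.
const (
	minPartSize  = 5 * 1024 * 1024
	mpuThreshold = minPartSize * 3
)

func main() {
	// Bodies at or over the threshold are broken into at least three parts.
	fmt.Println(mpuThreshold) // 15728640 bytes, i.e. 15 MiB
}
```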
const MaxUploadParts = objaws.MaxUploadParts
MaxUploadParts is the hard limit on the number of parts that can be uploaded by a 'MPUploader'.
Variables ¶
var (
	// ErrMPUploaderExceededMaxPartCount is returned if the user attempts to upload more than 'MaxUploadParts' parts.
	ErrMPUploaderExceededMaxPartCount = errors.New("exceeded maximum number of upload parts")

	// ErrMPUploaderAlreadyStopped is returned if the upload is stopped multiple times.
	ErrMPUploaderAlreadyStopped = errors.New("upload has already been stopped")
)
var ErrCopyToSamePrefix = errors.New("copying to the same prefix within a bucket is not supported")
ErrCopyToSamePrefix is returned if the user provides source and destination prefixes which are the same, within the same bucket, when using 'CopyObjects'.
Functions ¶
func CompressObjects ¶
func CompressObjects(opts CompressObjectsOptions) ([]byte, error)
CompressObjects takes an object storage prefix and a destination. It creates a zip at the destination, compressing and uploading every object with the given prefix. Each object is streamed from cloud storage, through a 'ZipWriter', and back to cloud storage.
func CopyObject ¶
func CopyObject(opts CopyObjectOptions) error
CopyObject copies an object from one place to another breaking the request into multiple parts where it's known that cloud provider limits will be hit.
NOTE: Client must have permissions to both the source/destination buckets.
func CopyObjects ¶
func CopyObjects(opts CopyObjectsOptions) error
CopyObjects from one location to another using a worker pool.
NOTE: When copying within the same bucket, the source/destination prefix can't be the same.
func Download ¶
func Download(opts DownloadOptions) error
Download an object from a remote cloud by breaking it up and downloading it in multiple chunks concurrently.
func PrefixExists ¶
func PrefixExists(opts PrefixExistsOptions) (bool, error)
PrefixExists returns a boolean indicating whether any objects exist in the remote provider that have the given prefix.
func Sync ¶
func Sync(opts SyncOptions) error
Sync copies a directory to/from cloud storage from/to a filepath.
Example:
err = Sync(SyncOptions{
	Client:      objaws.NewClient(s3.New(session.New())),
	Source:      "/tmp/data/directory",
	Destination: "s3://bucket-name/sub/path/",
})
NOTE: When the filepath has a trailing slash the contents of the directory are up/downloaded, whereas without it the directory itself is up/downloaded. As an example, given a file test.txt in /tmp/data/, running Sync with SyncOptions{Source: "/tmp/data/", Destination: "s3://bucket/foo/"} will result in s3://bucket/foo/test.txt, whereas running with SyncOptions{Source: "/tmp/data", Destination: "s3://bucket/foo/"} will result in s3://bucket/foo/data/test.txt.
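The trailing-slash rule above can be sketched with a small stdlib-only helper ('destKey' is hypothetical, purely for illustration, and not part of this package):

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// destKey mirrors the documented trailing-slash rule: with a trailing slash
// only the directory's contents are synced, without one the directory itself
// is created under the destination prefix.
func destKey(source, prefix, file string) string {
	if strings.HasSuffix(source, "/") {
		return prefix + file
	}

	return prefix + path.Base(source) + "/" + file
}

func main() {
	fmt.Println(destKey("/tmp/data/", "foo/", "test.txt")) // foo/test.txt
	fmt.Println(destKey("/tmp/data", "foo/", "test.txt"))  // foo/data/test.txt
}
```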
func Upload ¶
func Upload(opts UploadOptions) error
Upload an object to a remote cloud breaking it down into a multipart upload if the body is over a given size.
Types ¶
type ChunkReader ¶
type ChunkReader struct {
// contains filtered or unexported fields
}
ChunkReader allows reading data from an 'io.Reader' in chunks of a given size.
func NewChunkReader ¶
func NewChunkReader(reader ioiface.ReadAtSeeker, size int64) ChunkReader
NewChunkReader creates a new chunk reader which will read chunks of the given size from the provided reader.
func (ChunkReader) ForEach ¶
func (c ChunkReader) ForEach(fn func(chunk *io.SectionReader) error) error
ForEach breaks the 'reader' into chunks running the given function for each chunk created.
type CloudOrFileURL ¶
CloudOrFileURL represents a cloud storage url (eg s3://bucket/path/to/file.txt) or a local path.
func ParseCloudOrFileURL ¶
func ParseCloudOrFileURL(argument string) (*CloudOrFileURL, error)
ParseCloudOrFileURL parses a URL which is either a file path or a cloud path. It will automatically convert a local path into an absolute path using the 'fsutil.ConvertToAbsolutePath' function.
func (*CloudOrFileURL) Join ¶
func (u *CloudOrFileURL) Join(args ...string) *CloudOrFileURL
Join returns a new CloudOrFileURL with args appended to u.
func (*CloudOrFileURL) String ¶
func (u *CloudOrFileURL) String() string
type CompressObjectsOptions ¶
type CompressObjectsOptions struct {
	Options

	// Client is the objcli.Client to use to download and upload.
	//
	// NOTE: required
	Client objcli.Client

	// PartUploadWorkers is the number of parts to upload at once.
	PartUploadWorkers int

	// PartCompleteCallback is called once a part has been uploaded.
	PartCompleteCallback PartCompleteFunc

	// ProgressReportCallback is called to report how far into the CompressObjects process we are.
	//
	// NOTE: If provided then CompressObjects will calculate the size of all objects with the given prefix before
	// starting the download, which may take some time.
	ProgressReportCallback ProgressReportFunc

	// SourceBucket is the bucket to compress objects from.
	//
	// NOTE: required
	SourceBucket string

	// Prefix is the prefix of objects to compress.
	//
	// NOTE: required
	Prefix string

	// Delimiter is used when iterating through the objects that begin with Prefix.
	Delimiter string

	// Include allows you to include certain keys in the zip via regular expressions.
	Include []*regexp.Regexp

	// Exclude allows you to exclude certain keys from the zip via regular expressions.
	Exclude []*regexp.Regexp

	// DestinationBucket is the bucket to upload to.
	//
	// NOTE: required
	DestinationBucket string

	// Destination is the key to give the zip that is uploaded.
	//
	// NOTE: required
	Destination string

	// Logger is the slog.Logger we should use for reporting information.
	Logger *slog.Logger

	// Checksum is a hash.Hash used to compute checksums for data validation.
	Checksum hash.Hash
}
CompressObjectsOptions specifies options which configure what and how objects are compressed and uploaded.
type CopyObjectOptions ¶
type CopyObjectOptions struct {
	Options

	// Client is the client used to perform the operation.
	//
	// NOTE: This attribute is required.
	Client objcli.Client

	// DestinationBucket is the bucket in which the copied object will be placed.
	//
	// NOTE: This attribute is required.
	DestinationBucket string

	// DestinationKey is the key which will be used for the copied object.
	//
	// NOTE: This attribute is required.
	DestinationKey string

	// SourceBucket is the bucket in which the object being copied resides.
	//
	// NOTE: This attribute is required.
	SourceBucket string

	// SourceKey is the key of the source object.
	//
	// NOTE: This attribute is required.
	SourceKey string
}
CopyObjectOptions encapsulates the available options which can be used when copying an object.
type CopyObjectsOptions ¶
type CopyObjectsOptions struct {
	Options

	// Client is the client used to perform the operation.
	//
	// NOTE: This attribute is required.
	Client objcli.Client

	// DestinationBucket is the bucket in which the copied objects will be placed.
	//
	// NOTE: This attribute is required.
	DestinationBucket string

	// DestinationPrefix is the prefix under which all the objects will be copied.
	//
	// NOTE: This attribute is required.
	DestinationPrefix string

	// SourceBucket is the bucket in which the objects being copied reside.
	//
	// NOTE: This attribute is required.
	SourceBucket string

	// SourcePrefix is the prefix which will be copied.
	//
	// NOTE: This attribute is required.
	SourcePrefix string

	// SourceDelimiter is the delimiter used when listing, allowing listing/copying of only a single directory.
	SourceDelimiter string

	// SourceInclude allows selecting only keys which match any of the given expressions.
	SourceInclude []*regexp.Regexp

	// SourceExclude allows skipping keys which match any of the given expressions.
	SourceExclude []*regexp.Regexp

	// Logger is the logger that'll be used.
	Logger *slog.Logger
}
CopyObjectsOptions encapsulates the available options which can be used when copying objects from one prefix to another.
type DownloadOptions ¶
type DownloadOptions struct {
	Options

	// Client is the client used to perform the operation.
	//
	// NOTE: This attribute is required.
	Client objcli.Client

	// Bucket is the bucket to download the object from.
	//
	// NOTE: This attribute is required.
	Bucket string

	// Key is the key for the object being downloaded.
	//
	// NOTE: This attribute is required.
	Key string

	// ByteRange to download from the object.
	//
	// NOTE: Download will not create sparse files, a non-zero start offset will be "shifted" prior to being written to
	// disk.
	ByteRange *objval.ByteRange

	// Writer is the destination for the object.
	//
	// NOTE: The given writer must be thread safe.
	Writer io.WriterAt
}
DownloadOptions encapsulates the options available when using the 'Download' function to download data from a remote cloud.
type ErrInvalidCloudPath ¶
type ErrInvalidCloudPath struct {
// contains filtered or unexported fields
}
ErrInvalidCloudPath is returned if the user has incorrectly used the cloud-style scheme-prefixed argument; the error message displays the correct usage to the user.
func (*ErrInvalidCloudPath) Error ¶
func (e *ErrInvalidCloudPath) Error() string
type MPDownloader ¶
type MPDownloader struct {
// contains filtered or unexported fields
}
MPDownloader is a multipart downloader which downloads an object from a remote cloud by performing multiple requests concurrently using a worker pool.
func NewMPDownloader ¶
func NewMPDownloader(opts MPDownloaderOptions) *MPDownloader
NewMPDownloader creates a new multipart downloader using the given options.
func (*MPDownloader) Download ¶
func (m *MPDownloader) Download() error
Download executes the download.
NOTE: If no byte range is provided, the whole object will be downloaded.
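The part-splitting a multipart downloader performs can be sketched stdlib-only as below; 'byteRange' and 'splitRange' are hypothetical simplifications (the real package uses 'objval.ByteRange'), shown purely to illustrate how a range breaks into concurrent requests:

```go
package main

import "fmt"

// byteRange is a simplified, hypothetical stand-in for objval.ByteRange.
type byteRange struct{ start, end int64 }

// splitRange breaks an inclusive byte range into 'partSize'-byte sub-ranges,
// each of which could then be downloaded by a separate worker.
func splitRange(br byteRange, partSize int64) []byteRange {
	var parts []byteRange

	for start := br.start; start <= br.end; start += partSize {
		end := start + partSize - 1

		// The final part may be shorter than 'partSize'.
		if end > br.end {
			end = br.end
		}

		parts = append(parts, byteRange{start, end})
	}

	return parts
}

func main() {
	// A 10-byte object downloaded in 4-byte parts.
	fmt.Println(splitRange(byteRange{0, 9}, 4)) // [{0 3} {4 7} {8 9}]
}
```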
type MPDownloaderOptions ¶
type MPDownloaderOptions struct {
	Options

	// Client is the client used to perform the operation.
	//
	// NOTE: This attribute is required.
	Client objcli.Client

	// Bucket is the bucket to download the object from.
	//
	// NOTE: This attribute is required.
	Bucket string

	// Key is the key for the object being downloaded.
	//
	// NOTE: This attribute is required.
	Key string

	// ByteRange to download from the object.
	//
	// NOTE: Download will not create sparse files, a non-zero start offset will be "shifted" prior to being written to
	// disk.
	ByteRange *objval.ByteRange

	// Writer is the destination for the object.
	//
	// NOTE: The given writer must be thread safe.
	Writer io.WriterAt
}
MPDownloaderOptions encapsulates the options available when creating a 'MPDownloader'.
type MPUploader ¶
type MPUploader struct {
// contains filtered or unexported fields
}
MPUploader is a multipart uploader which adds parts to a remote multipart upload whilst concurrently uploading data using a worker pool.
func NewMPUploader ¶
func NewMPUploader(opts MPUploaderOptions) (*MPUploader, error)
NewMPUploader creates a new multipart uploader using the given options; this will create a new multipart upload if one hasn't already been provided.
NOTE: Either Commit or Abort should be called to avoid resource leaks.
func (*MPUploader) Abort ¶
func (m *MPUploader) Abort() error
Abort the multipart upload and stop the worker pool.
func (*MPUploader) Commit ¶
func (m *MPUploader) Commit() error
Commit the multipart upload and stop the worker pool.
func (*MPUploader) Stop ¶
func (m *MPUploader) Stop() error
Stop the worker pool without committing/aborting the upload.
NOTE: Using the uploader after calling 'Stop' will lead to undefined behavior.
func (*MPUploader) Upload ¶
func (m *MPUploader) Upload(body io.ReadSeeker) error
Upload the given body as a part for the multipart upload.
NOTE: This function is not thread safe.
func (*MPUploader) UploadID ¶
func (m *MPUploader) UploadID() string
UploadID returns the upload id created by the multipart uploader.
NOTE: Depending on the underlying client, this upload id may be empty.
func (*MPUploader) UploadWithMeta ¶
func (m *MPUploader) UploadWithMeta(metadata any, body io.ReadSeeker) error
UploadWithMeta uploads the given body as a part for the multipart upload. The provided metadata will be returned unmodified via the 'OnPartComplete' callback and may be used to pass metadata that may be persisted to disk at the same time as the completed part.
NOTE: This function is not thread safe.
type MPUploaderOptions ¶
type MPUploaderOptions struct {
	Options

	// Client is the client used to perform the operation.
	//
	// NOTE: This attribute is required.
	Client objcli.Client

	// Bucket is the bucket to upload the object to.
	//
	// NOTE: This attribute is required.
	Bucket string

	// ID is the id for an in-progress multipart upload that should be "continued".
	//
	// NOTE: Here be dragons, no validation takes place to ensure an upload with the provided id exists.
	ID string

	// Key is the key for the object being uploaded.
	//
	// NOTE: This attribute is required.
	Key string

	// Parts is the list of parts for an in-progress multipart upload which is being continued. Should be supplied in
	// conjunction with 'ID'.
	//
	// NOTE: Here be dragons, no validation takes place to ensure these parts are still available.
	Parts []objval.Part

	// OnPartComplete is a callback which is run after successfully uploading each part.
	//
	// This function:
	// 1. Should not block, since it will block other parts from uploading
	// 2. Will not be called concurrently by multiple goroutines
	// 3. Will be called "out-of-order", parts may be completed in any arbitrary order
	//
	// This callback may be used to track parts and persist them to disk to allow robust multipart uploads.
	OnPartComplete OnPartCompleteFunc
}
MPUploaderOptions encapsulates the options available when creating a 'MPUploader'.
type OnPartCompleteFunc ¶
OnPartCompleteFunc is a readability wrapper around a callback function which may be run after each part has been uploaded.
type Options ¶
type Options struct {
	// Context is the 'context.Context' that can be used to cancel all requests.
	Context context.Context

	// PartSize is the size in bytes of individual parts in multipart up/download.
	PartSize int64
}
Options contains common options for upload/download of objects.
type PartCompleteFunc ¶
type PartCompleteFunc func(size int)
PartCompleteFunc is called once a part of the zip file has been uploaded. size is the size of the part uploaded.
type PrefixExistsOptions ¶
type PrefixExistsOptions struct {
	// Context is the 'context.Context' that can be used to cancel all requests.
	Context context.Context

	// Client is the client used to perform the operation.
	//
	// NOTE: This attribute is required.
	Client objcli.Client

	// Bucket is the bucket to check.
	//
	// NOTE: This attribute is required.
	Bucket string

	// Prefix is the prefix that is being checked.
	//
	// NOTE: This attribute is required.
	Prefix string
}
PrefixExistsOptions encapsulates the options available when running 'PrefixExists'.
type ProgressReportFunc ¶
type ProgressReportFunc func(progress float64)
ProgressReportFunc is called every time a file has been fully downloaded during CompressObjects. progress indicates how far through downloading every object with the given prefix we are.
type SyncOptions ¶
type SyncOptions struct {
	Options

	// Limiter will rate limit the reads/writes for upload/download.
	Limiter *rate.Limiter

	// Client is the client used to perform the operation. If not passed then a default client will be created using
	// the scheme.
	//
	// NOTE: Required
	Client objcli.Client

	// Source is where to sync from.
	//
	// NOTE: Required
	Source string

	// Destination is where to sync to.
	//
	// NOTE: Required
	Destination string

	// MPUThreshold is the threshold at which point objects are broken up into multipart uploads.
	//
	// NOTE: Only used for upload.
	MPUThreshold int64

	// Logger is the logger to use for reporting information.
	Logger *slog.Logger
}
SyncOptions encapsulates all the options available when syncing a directory/object to/from a remote cloud.
type Syncer ¶
type Syncer struct {
// contains filtered or unexported fields
}
Syncer exposes the ability to sync files and directories to/from a remote cloud provider.
func NewSyncer ¶
func NewSyncer(opts SyncOptions) *Syncer
NewSyncer creates a new syncer using the given options.
func (*Syncer) Download ¶
func (s *Syncer) Download(source, destination *CloudOrFileURL) error
Download all files under the prefix in opts.Source to the given destination. Assumes source is a cloud path and destination is a local path to a directory.
NOTE: If you specify a source such as "path/to/dir" then the directory "path/to/dir/" is considered under the source, so a "dir" directory will be created under your destination. To avoid this specify your source with a trailing slash.
func (*Syncer) Upload ¶
func (s *Syncer) Upload(source, destination *CloudOrFileURL) error
Upload a directory from the local file system to cloud storage. Assumes source is a file:// URL and destination is a cloud-specific one.
type UploadOptions ¶
type UploadOptions struct {
	Options

	// Client is the client used to perform the operation.
	//
	// NOTE: This attribute is required.
	Client objcli.Client

	// Bucket is the bucket to upload the object to.
	//
	// NOTE: This attribute is required.
	Bucket string

	// Key is the key for the object being uploaded.
	//
	// NOTE: This attribute is required.
	Key string

	// Body is the content which should be used for the body of the object.
	//
	// NOTE: This attribute is required.
	Body ioiface.ReadAtSeeker

	// MPUThreshold is the threshold at which point objects are broken up into multipart uploads.
	MPUThreshold int64
}
UploadOptions encapsulates the options available when using the 'Upload' function to upload data to a remote cloud.