objutil

package
v6.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 12, 2024 License: Apache-2.0 Imports: 29 Imported by: 0

Documentation

Overview

Package objutil provide utility functions for object store clients which expose more complex/configurable behavior than using a base 'objcli.Client'.

Index

Constants

View Source
const (
	// MinPartSize is the minimum size allowed for 'PartSize', and is a hard limit enforced by AWS.
	MinPartSize = objaws.MinUploadSize

	// MPUThreshold is the threshold at which point we break the upload up into multiple requests which are executed
	// concurrently.
	//
	// NOTE: Multipart uploads generally require at least three requests, hence the choice of 'MinPartSize * 3'.
	MPUThreshold = MinPartSize * 3
)
View Source
const MaxUploadParts = objaws.MaxUploadParts

MaxUploadParts is the hard limit on the number of parts that can be uploaded by a 'MPUploader'.

Variables

View Source
var (
	// ErrMPUploaderExceededMaxPartCount is returned if the user attempts to upload more than 'MaxUploadParts' parts.
	ErrMPUploaderExceededMaxPartCount = errors.New("exceeded maximum number of upload parts")

	// ErrMPUploaderAlreadyStopped is returned if the upload is stopped multiple times.
	ErrMPUploaderAlreadyStopped = errors.New("upload has already been stopped")
)
View Source
var ErrCopyToSamePrefix = errors.New("copying to the same prefix within a bucket is not supported")

ErrCopyToSamePrefix is returned if the user provides a destination/source prefix which is the same, within the same bucket when using `CopyObjects`.

Functions

func CompressObjects

func CompressObjects(opts CompressObjectsOptions) ([]byte, error)

CompressObjects takes an object storage prefix and a destination. It will create a zip in destination and compress and upload every object with the given prefix there. Each object will be streamed from cloud storage, through a ZipWriter and back to cloud storage.

func CopyObject

func CopyObject(opts CopyObjectOptions) error

CopyObject copies an object from one place to another breaking the request into multiple parts where it's known that cloud provider limits will be hit.

NOTE: Client must have permissions to both the source/destination buckets.

func CopyObjects

func CopyObjects(opts CopyObjectsOptions) error

CopyObjects from one location to another using a worker pool.

NOTE: When copying within the same bucket, the source/destination prefix can't be the same.

func Download

func Download(opts DownloadOptions) error

Download an object from a remote cloud by breaking it up and downloading it in multiple chunks concurrently.

func PrefixExists

func PrefixExists(opts PrefixExistsOptions) (bool, error)

PrefixExists returns a boolean indicating whether any objects exist in the remote provider that have the given prefix.

func Sync

func Sync(opts SyncOptions) error

Sync copies a directory to/from cloud storage from/to a filepath.

Example:

err = Sync(SyncOptions {
  Client: objaws.NewClient(s3.New(session.New())),
  Source: "/tmp/data/directory",
  Destination: "s3://bucket-name/sub/path/",
})

NOTE: When the filepath has a trailing slash the contents of the directory are up/downloaded, whereas without it the directory itself is up/downloaded. As an example given a file test.txt in /tmp/data/ then running Sync with SyncOptions{Source: "/tmp/data/", Destination: "s3://bucket/foo/"} will result in s3://bucket/foo/test.txt, whereas running with SyncOptions{Source: "/tmp/data", Destination: "s3://bucket/foo/"} will result in s3://bucket/foo/data/test.txt

func Upload

func Upload(opts UploadOptions) error

Upload an object to a remote cloud breaking it down into a multipart upload if the body is over a given size.

Types

type ChunkReader

type ChunkReader struct {
	// contains filtered or unexported fields
}

ChunkReader allows data from an 'io.Reader' in chunks of a given size.

func NewChunkReader

func NewChunkReader(reader ioiface.ReadAtSeeker, size int64) ChunkReader

NewChunkReader creates a new chunk reader which will read chunks of the given size from the provided reader.

func (ChunkReader) ForEach

func (c ChunkReader) ForEach(fn func(chunk *io.SectionReader) error) error

ForEach breaks the 'reader' into chunks running the given function for each chunk created.

type CloudOrFileURL

type CloudOrFileURL struct {
	Provider objval.Provider
	Bucket   string
	Path     string
}

CloudOrFileURL represents a cloud storage url (eg s3://bucket/path/to/file.txt) or a local path.

func ParseCloudOrFileURL

func ParseCloudOrFileURL(argument string) (*CloudOrFileURL, error)

ParseCloudOrFileURL parses a URL which is either a file path or a cloud path. It will automatically convert a local path into an absolute path using the 'fsutil.ConvertToAbsolutePath' function.

func (*CloudOrFileURL) Join

func (u *CloudOrFileURL) Join(args ...string) *CloudOrFileURL

Join returns a new CloudOrFileURL with args appended to u.

func (*CloudOrFileURL) String

func (u *CloudOrFileURL) String() string

type CompressObjectsOptions

type CompressObjectsOptions struct {
	Options

	// Client is the objcli.Client to use to download and upload.
	//
	// NOTE: required
	Client objcli.Client

	// PartUploadWorkers is the number of parts to upload at once.
	PartUploadWorkers int

	// PartCompleteCallback is called once a part has been uploaded.
	PartCompleteCallback PartCompleteFunc

	// ProgressReportCallback is called to report how far into the CompressUpload process we are.
	//
	// NOTE: If provided then CompressUpload will calculate the size of all objects with the given prefix before
	// starting the download, which may take some time.
	ProgressReportCallback ProgressReportFunc

	// SourceBucket is the bucket to compress objects from.
	//
	// NOTE: required
	SourceBucket string

	// Prefix is the prefix of objects to compress.
	//
	// NOTE: required
	Prefix string

	// Delimiter is used when iterating through the objects that begin with Prefix.
	Delimiter string

	// Include allows you to include certain keys in the zip by a regular expression.
	Include []*regexp.Regexp

	// Exclude allows you to exclude certain keys from being in the zip by a regular expression.
	Exclude []*regexp.Regexp

	// DestinationBucket is the bucket to upload to.
	//
	// NOTE: required
	DestinationBucket string

	// Destination is the key to give the zip that is uploaded.
	//
	// NOTE: required
	Destination string

	// Logger is the log.Logger we should use for reporting information.
	Logger *slog.Logger

	// Checksum is a variable of type hash.Hash used to compute checksums for data validation.
	Checksum hash.Hash
}

CompressObjectsOptions specifies options which configure what and how objects are compressed and uploaded.

type CopyObjectOptions

type CopyObjectOptions struct {
	Options

	// Client is the client used to perform the operation.
	//
	// NOTE: This attribute is required.
	Client objcli.Client

	// DestinationBucket is the bucket which the copied object will be placed in.
	//
	// NOTE: This attribute is required.
	DestinationBucket string

	// DestinationKey is the key which will be used for the copied object.
	//
	// NOTE: This attribute is required.
	DestinationKey string

	// SourceBucket is the bucket in which the object being copied resides in.
	//
	// NOTE: This attribute is required.
	SourceBucket string

	// SourceKey is the key of the source object.
	//
	// NOTE: This attribute is required.
	SourceKey string
}

CopyObjectOptions encapsulates the available options which can be used when copying an object.

type CopyObjectsOptions

type CopyObjectsOptions struct {
	Options

	// Client is the client used to perform the operation.
	//
	// NOTE: This attribute is required.
	Client objcli.Client

	// DestinationBucket is the bucket which the copied object will be placed in.
	//
	// NOTE: This attribute is required.
	DestinationBucket string

	// DestinationPrefix is the prefix under which all the objects will be copied to.
	//
	// NOTE: This attribute is required.
	DestinationPrefix string

	// SourceBucket is the bucket in which the object being copied resides in.
	//
	// NOTE: This attribute is required.
	SourceBucket string

	// SourcePrefix is the prefix which will be copied.
	//
	// NOTE: This attribute is required.
	SourcePrefix string

	// SourceDelimiter is the delimiter used when listing, allowing listing/copying of only a single directory.
	SourceDelimiter string

	// SourceInclude allows selecting keys which only match any of the given expressions.
	SourceInclude []*regexp.Regexp

	// SourceExclude allows skipping keys which may any of the given expressions.
	SourceExclude []*regexp.Regexp

	// Logger is the logger that'll be used.
	Logger *slog.Logger
}

CopyObjectsOptions encapsulates the available options which can be used when copying objects from one prefix to another.

type DownloadOptions

type DownloadOptions struct {
	Options

	// Client is the client used to perform the operation.
	//
	// NOTE: This attribute is required.
	Client objcli.Client

	// Bucket is the bucket to download the object from.
	//
	// NOTE: This attribute is required.
	Bucket string

	// Key is the key for the object being downloaded.
	//
	// NOTE: This attribute is required.
	Key string

	// ByteRange to download from the object.
	//
	// NOTE: Download will not create sparse files, a non-zero start offset will be "shifted" prior to being written to
	// disk.
	ByteRange *objval.ByteRange

	// Writer is the destination for the object.
	//
	// NOTE: The given write must be thread safe.
	Writer io.WriterAt
}

DownloadOptions encapsulates the options available when using the 'Download' function to download data from a remote cloud.

type ErrInvalidCloudPath

type ErrInvalidCloudPath struct {
	// contains filtered or unexported fields
}

ErrInvalidCloudPath returns if the user has incorrectly used the cloud style scheme prefixed argument; the error message indicates/display the correct usage to the user.

func (*ErrInvalidCloudPath) Error

func (e *ErrInvalidCloudPath) Error() string

type MPDownloader

type MPDownloader struct {
	// contains filtered or unexported fields
}

MPDownloader is a multipart downloader which downloads an object from a remote cloud by performing multiple requests concurrently using a worker pool.

func NewMPDownloader

func NewMPDownloader(opts MPDownloaderOptions) *MPDownloader

NewMPDownloader creates a new multipart downloader using the given objects.

func (*MPDownloader) Download

func (m *MPDownloader) Download() error

Download executes the download.

NOTE: If no byte range is provided, the whole object will be downloaded.

type MPDownloaderOptions

type MPDownloaderOptions struct {
	Options

	// Client is the client used to perform the operation.
	//
	// NOTE: This attribute is required.
	Client objcli.Client

	// Bucket is the bucket to download the object from.
	//
	// NOTE: This attribute is required.
	Bucket string

	// Key is the key for the object being downloaded.
	//
	// NOTE: This attribute is required.
	Key string

	// ByteRange to download from the object.
	//
	// NOTE: Download will not create sparse files, a non-zero start offset will be "shifted" prior to being written to
	// disk.
	ByteRange *objval.ByteRange

	// Writer is the destination for the object.
	//
	// NOTE: The given write must be thread safe.
	Writer io.WriterAt
}

MPDownloaderOptions encapsulates the options available when creating a 'MPDownloader'.

type MPUploader

type MPUploader struct {
	// contains filtered or unexported fields
}

MPUploader is a multipart uploader which adds parts to a remote multipart upload whilst concurrently uploading data using a worker pool.

func NewMPUploader

func NewMPUploader(opts MPUploaderOptions) (*MPUploader, error)

NewMPUploader creates a new multipart uploader using the given options, this will create a new multipart upload if one hasn't already been provided.

NOTE: Either Commit or Abort should be called to avoid resource leaks.

func (*MPUploader) Abort

func (m *MPUploader) Abort() error

Abort the multipart upload and stop the worker pool.

func (*MPUploader) Commit

func (m *MPUploader) Commit() error

Commit the multipart upload and stop the worker pool.

func (*MPUploader) Stop

func (m *MPUploader) Stop() error

Stop the worker pool without committing/aborting the upload.

NOTE: Using the uploader after calling 'Stop' will lead to undefined behavior.

func (*MPUploader) Upload

func (m *MPUploader) Upload(body io.ReadSeeker) error

Upload the given body as a part for the multipart upload.

NOTE: This function is not thread safe.

func (*MPUploader) UploadID

func (m *MPUploader) UploadID() string

UploadID returns the upload id created by the multipart uploader.

NOTE: Depending on the underlying client, this upload id may be empty.

func (*MPUploader) UploadWithMeta

func (m *MPUploader) UploadWithMeta(metadata any, body io.ReadSeeker) error

UploadWithMeta uploads the given body as a part for the multipart upload. The provided metadata will be returned unmodified via the 'OnPartComplete' callback and may be used to pass metdata that may be persisted to disk at the same time as the completed part.

NOTE: This function is not thread safe.

type MPUploaderOptions

type MPUploaderOptions struct {
	Options

	// Client is the client used to perform the operation.
	//
	// NOTE: This attribute is required.
	Client objcli.Client

	// Bucket is the bucket to upload the object to.
	//
	// NOTE: This attribute is required.
	Bucket string

	// ID is the id for an in-progress multipart upload that should be "continued".
	//
	// NOTE: Here be dragons, no validation takes place to ensure an upload with the provided id exists.
	ID string

	// Key is the key for the object being uploaded.
	//
	// NOTE: This attribute is required.
	Key string

	// Parts is the list of parts for an in-progress multipart upload which is being continued. Should be supplied in
	// conjunction with 'ID'.
	//
	// NOTE: Here be dragons, no validation takes place to ensure these parts are still available.
	Parts []objval.Part

	// OnPartComplete is a callback which is run after successfully uploading each part.
	//
	// This function:
	// 1. Should not block, since it will block other parts from uploading
	// 2. Will not be called concurrently by multiple goroutines
	// 3. Will be called "out-of-order", parts may be completed in any arbitrary order
	//
	// This callback may be used to track parts and persist them to disk to allow robust multipart uploads.
	OnPartComplete OnPartCompleteFunc
}

MPUploaderOptions encapsulates the options available when creating a 'MPUploader'.

type OnPartCompleteFunc

type OnPartCompleteFunc func(metadata any, part objval.Part) error

OnPartCompleteFunc is a readability wrapper around a callback function which may be run after each part has been uploaded.

type Options

type Options struct {
	// Context is the 'context.Context' that can be used to cancel all requests.
	Context context.Context

	// ParseSize is the size in bytes of individual parts in multipart up/download.
	PartSize int64
}

Options contains common options for upload/download of objects.

func (Options) WithContext

func (o Options) WithContext(ctx context.Context) Options

WithContext returns a copy of the options using the given context.

type PartCompleteFunc

type PartCompleteFunc func(size int)

PartCompleteFunc is called once a part of the zip file has been uploaded. size is the size of the part uploaded.

type PrefixExistsOptions

type PrefixExistsOptions struct {
	// Context is the ctx.Context that can be used to cancel all requests.
	Context context.Context

	// Client is the client used to perform the operation.
	//
	// NOTE: This attribute is required.
	Client objcli.Client

	// Bucket is the bucket to upload the object to.
	//
	// NOTE: This attribute is required.
	Bucket string

	// Prefix is the prefix that is being checked.
	//
	// NOTE: This attribute is required.
	Prefix string
}

PrefixExistsOptions encapsulates the options available when running 'PrefixExists'.

type ProgressReportFunc

type ProgressReportFunc func(progress float64)

ProgressReportFunc is called every time a file has been fully downloaded during CompressUpload. progress is how far into downloading every object with the path prefix we are.

type SyncOptions

type SyncOptions struct {
	Options

	// Limiter will rate limit the reads/writes for upload/download.
	Limiter *rate.Limiter

	// Client is the client used to perform the operation. If not passed then a default client will be created using the
	// scheme
	//
	// NOTE: Required
	Client objcli.Client

	// Source is where to sync from
	//
	// NOTE: Required
	Source string

	// Destination is where to sync to
	//
	// NOTE: Required
	Destination string

	// MPUThreshold is a threshold at which point objects which broken down into multipart uploads.
	//
	// NOTE: Only used for upload.
	MPUThreshold int64

	// Logger is the passed Logger struct that impletments the Log method for logger the user wants to use.
	Logger *slog.Logger
}

SyncOptions encapsulates all the options available when syncing a directory/object to/from a remote cloud.

type Syncer

type Syncer struct {
	// contains filtered or unexported fields
}

Syncer exposes the ability to sync files and directories to/from a remote cloud provider.

func NewSyncer

func NewSyncer(opts SyncOptions) *Syncer

NewSyncer creates a new syncer using the given options.

func (*Syncer) Download

func (s *Syncer) Download(source, destination *CloudOrFileURL) error

Download all files under the prefix in opts.Source to the given destination. Assumes source is a cloud path and destination is a local path to a directory.

NOTE: If you specify a source such as "path/to/dir" then the directory "path/to/dir/" is considered under the source, so a "dir" directory will be created under your destination. To avoid this specify your source with a trailing slash.

func (*Syncer) Upload

func (s *Syncer) Upload(source, destination *CloudOrFileURL) error

Upload a directory from the local file system to cloud storage. Assumes source is a file:// URL and destination is a cloud-specific one.

type UploadOptions

type UploadOptions struct {
	Options

	// Client is the client used to perform the operation.
	//
	// NOTE: This attribute is required.
	Client objcli.Client

	// Bucket is the bucket to upload the object to.
	//
	// NOTE: This attribute is required.
	Bucket string

	// Key is the key for the object being uploaded.
	//
	// NOTE: This attribute is required.
	Key string

	// Body is the content which should be used for the body of the object.
	//
	// NOTE: This attribute is required.
	Body ioiface.ReadAtSeeker

	// MPUThreshold is a threshold at which point objects which broken down into multipart uploads.
	MPUThreshold int64
}

UploadOptions encapsulates the options available when using the 'Upload' function to upload data to a remote cloud.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL