Documentation ¶
Overview ¶
Package transfermanager provides an easy way to parallelize downloads in Google Cloud Storage.
More information about Google Cloud Storage is available at https://cloud.google.com/storage/docs.
See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts, connection pooling and similar aspects of this package.
NOTE: This package is in preview. It is not stable, and is likely to change.
Types ¶
type DownloadBuffer ¶ added in v1.43.0
type DownloadBuffer struct {
// contains filtered or unexported fields
}
DownloadBuffer satisfies the io.WriterAt interface, allowing you to use it as a buffer to download to when using Downloader. DownloadBuffer is thread-safe as long as the ranges being written to do not overlap.
func NewDownloadBuffer ¶ added in v1.43.0
func NewDownloadBuffer(buf []byte) *DownloadBuffer
NewDownloadBuffer initializes a DownloadBuffer using buf as the underlying buffer. This is the preferred way to create a DownloadBuffer, as the buffer does not need to grow if len(buf) is larger than or equal to the object length or range being downloaded.
func (*DownloadBuffer) Bytes ¶ added in v1.43.0
func (db *DownloadBuffer) Bytes() []byte
Bytes returns the slice of bytes written to DownloadBuffer. The slice aliases the buffer content at least until the next buffer modification, so immediate changes to the slice will affect the result of future reads.
func (*DownloadBuffer) WriteAt ¶ added in v1.43.0
func (db *DownloadBuffer) WriteAt(p []byte, off int64) (n int, err error)
WriteAt writes len(p) bytes from p to the underlying buffer at offset off, growing the buffer if needed. It returns the number of bytes written from p and any error encountered that caused the write to stop early. WriteAt is thread-safe as long as the ranges being written to do not overlap. The supplied slice p is not retained.
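To make the write-at-offset behavior concrete, here is a minimal sketch of a growable io.WriterAt. The sketchBuffer type and the writeParts helper are hypothetical names used only for illustration; this is not the library's DownloadBuffer, and the mutex is simply one safe way to handle growth in a sketch.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// sketchBuffer is a hypothetical stand-in for a growable io.WriterAt buffer.
// It is not the library's DownloadBuffer; a mutex keeps growth safe here.
type sketchBuffer struct {
	mu  sync.Mutex
	buf []byte
}

// WriteAt copies p into the buffer at off, extending the buffer when the
// write would run past its current length.
func (b *sketchBuffer) WriteAt(p []byte, off int64) (int, error) {
	if off < 0 {
		return 0, fmt.Errorf("negative offset %d", off)
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	if end := int(off) + len(p); end > len(b.buf) {
		grown := make([]byte, end)
		copy(grown, b.buf)
		b.buf = grown
	}
	return copy(b.buf[off:], p), nil
}

// Bytes returns the bytes written so far.
func (b *sketchBuffer) Bytes() []byte {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.buf
}

// Compile-time check that sketchBuffer satisfies io.WriterAt.
var _ io.WriterAt = (*sketchBuffer)(nil)

// writeParts writes each part at its offset from its own goroutine.
// The parts must not overlap.
func writeParts(w io.WriterAt, parts map[int64]string) {
	var wg sync.WaitGroup
	for off, s := range parts {
		wg.Add(1)
		go func(off int64, s string) {
			defer wg.Done()
			w.WriteAt([]byte(s), off)
		}(off, s)
	}
	wg.Wait()
}

func main() {
	b := &sketchBuffer{}
	writeParts(b, map[int64]string{0: "hello ", 6: "world"})
	fmt.Println(string(b.Bytes())) // prints "hello world"
}
```

The result is the same whichever goroutine runs first, because each part lands at its own offset.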
type DownloadDirectoryInput ¶ added in v1.43.0
type DownloadDirectoryInput struct {
	// Bucket is the bucket in GCS to download from. Required.
	Bucket string

	// LocalDirectory specifies the directory to download the matched objects
	// to. Relative paths are allowed. The directory structure and contents
	// must not be modified while the download is in progress.
	// The directory will be created if it does not already exist. Required.
	LocalDirectory string

	// StripPrefix is a prefix to be removed from the path of the local file on
	// download from GCS. For example, if you have an object in GCS called
	// "mydirectory/a/file", and StripPrefix is set to "mydirectory/", the file
	// will be downloaded to "{LocalDirectory}/a/file". Optional.
	StripPrefix string

	// Prefix is the prefix filter to download objects whose names begin with this.
	// Optional.
	Prefix string

	// StartOffset is used to filter results to objects whose names are
	// lexicographically equal to or after startOffset. If endOffset is also
	// set, the objects listed will have names between startOffset (inclusive)
	// and endOffset (exclusive). Optional.
	StartOffset string

	// EndOffset is used to filter results to objects whose names are
	// lexicographically before endOffset. If startOffset is also set, the
	// objects listed will have names between startOffset (inclusive) and
	// endOffset (exclusive). Optional.
	EndOffset string

	// MatchGlob is a glob pattern used to filter results (for example, foo*bar). See
	// https://cloud.google.com/storage/docs/json_api/v1/objects/list#list-object-glob
	// for syntax details. Optional.
	MatchGlob string

	// Callback will run after all the objects in the directory as selected by
	// the provided filters are finished downloading.
	// It must be set if and only if the [WithCallbacks] option is set.
	// WaitAndClose will wait for all callbacks to finish.
	Callback func([]DownloadOutput)

	// OnObjectDownload will run after every finished object download. Optional.
	OnObjectDownload func(*DownloadOutput)
}
DownloadDirectoryInput is the input for a directory to download.
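The Prefix, StartOffset, EndOffset and StripPrefix fields can be read as simple string operations on object names. The sketch below restates them with the standard library only; selected, relativeName and localPath are hypothetical helpers, and treating an empty filter as "not set" is an assumption of the sketch. The actual filtering is resolved by the library and the service.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// selected reports whether name passes the prefix and offset filters.
// Empty filter values are treated as "not set" (an assumption of this sketch).
func selected(name, prefix, startOffset, endOffset string) bool {
	if !strings.HasPrefix(name, prefix) {
		return false
	}
	if startOffset != "" && name < startOffset {
		return false // before the inclusive lower bound
	}
	if endOffset != "" && name >= endOffset {
		return false // at or past the exclusive upper bound
	}
	return true
}

// relativeName removes stripPrefix from the front of the object name.
func relativeName(object, stripPrefix string) string {
	return strings.TrimPrefix(object, stripPrefix)
}

// localPath joins the local directory with the stripped object name.
func localPath(localDir, object, stripPrefix string) string {
	return filepath.Join(localDir, filepath.FromSlash(relativeName(object, stripPrefix)))
}

func main() {
	names := []string{"mydirectory/a/file", "mydirectory/b/file", "other/file"}
	for _, name := range names {
		// Only "mydirectory/a/file" falls inside [mydirectory/a, mydirectory/b).
		if selected(name, "mydirectory/", "mydirectory/a", "mydirectory/b") {
			fmt.Println(name, "->", localPath("out", name, "mydirectory/"))
		}
	}
}
```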
type DownloadObjectInput ¶
type DownloadObjectInput struct {
	// Bucket is the bucket in GCS to download from. Required.
	Bucket string

	// Object is the object in GCS to download. Required.
	Object string

	// Destination is the WriterAt to which the Downloader will write the object
	// data, such as an [os.File] file handle or a [DownloadBuffer]. Required.
	Destination io.WriterAt

	// Generation, if specified, will request a specific generation of the object.
	// Optional. By default, the latest generation is downloaded.
	Generation *int64

	// Conditions constrains the download to act on a specific
	// generation/metageneration of the object.
	// Optional.
	Conditions *storage.Conditions

	// EncryptionKey will be used to decrypt the object's contents.
	// The encryption key must be a 32-byte AES-256 key.
	// See https://cloud.google.com/storage/docs/encryption for details.
	// Optional.
	EncryptionKey []byte

	// Range specifies the range to read of the object.
	// Optional. If not specified, the entire object will be read.
	Range *DownloadRange

	// Callback will be run once the object is finished downloading. It must be
	// set if and only if the [WithCallbacks] option is set; otherwise, it must
	// not be set.
	// A worker will be used to execute the callback; therefore, it should not
	// be a long-running function. WaitAndClose will wait for all callbacks to
	// finish.
	Callback func(*DownloadOutput)
	// contains filtered or unexported fields
}
DownloadObjectInput is the input for a single object to download.
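Since EncryptionKey must be exactly 32 bytes, a short placeholder string will not satisfy it. The following sketch shows one way to produce and check a key of the right size; newAES256Key and validKey are hypothetical helpers, and key storage and management are outside the scope of the sketch.

```go
package main

import (
	"crypto/rand"
	"fmt"
)

// newAES256Key returns 32 random bytes, the size of an AES-256 key.
func newAES256Key() ([]byte, error) {
	key := make([]byte, 32)
	if _, err := rand.Read(key); err != nil {
		return nil, err
	}
	return key, nil
}

// validKey reports whether key has the 32-byte length the field requires.
func validKey(key []byte) bool {
	return len(key) == 32
}

func main() {
	key, err := newAES256Key()
	if err != nil {
		panic(err)
	}
	fmt.Println(validKey(key))             // a freshly generated key has the right size
	fmt.Println(validKey([]byte("mykey"))) // a 5-byte placeholder does not
}
```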
type DownloadOutput ¶
type DownloadOutput struct {
	Bucket string
	Object string
	Range  *DownloadRange             // requested range, if it was specified
	Err    error                      // error occurring during download
	Attrs  *storage.ReaderObjectAttrs // attributes of downloaded object, if successful
	// contains filtered or unexported fields
}
DownloadOutput provides output for a single object download, including all errors received while downloading object parts. If the download was successful, Attrs will be populated.
type DownloadRange ¶
type DownloadRange struct {
	// Offset is the starting offset (inclusive) from which the object is read.
	// If offset is negative, the object is not sharded and is read by a single
	// worker abs(offset) bytes from the end, and length must also be negative
	// to indicate all remaining bytes will be read.
	Offset int64

	// Length is the number of bytes to read.
	// If length is negative or larger than the object size, the object is read
	// until the end.
	Length int64
}
DownloadRange specifies the object range. If the object's metadata property "Content-Encoding" is set to "gzip", or the object otherwise satisfies decompressive transcoding per https://cloud.google.com/storage/docs/transcoding, Google Cloud Storage serves the object back whole, regardless of the requested range.
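The Offset and Length rules can be worked through with a small calculation. The resolveRange helper below is hypothetical and only restates the field comments as arithmetic; the two clamping cases marked as assumptions are not described in the comments, and the check against the remaining bytes is this sketch's reading of "larger than the object size".

```go
package main

import "fmt"

// resolveRange turns (offset, length) into a concrete start and byte count
// for an object of the given size. Hypothetical helper, not library code.
func resolveRange(size, offset, length int64) (start, n int64) {
	if offset < 0 {
		// Negative offset: read the last abs(offset) bytes of the object.
		start = size + offset
		if start < 0 {
			start = 0 // assumption: clamp when abs(offset) exceeds the size
		}
		return start, size - start
	}
	start = offset
	if start > size {
		start = size // assumption: nothing to read past the end
	}
	if length < 0 || length > size-start {
		return start, size - start // read until the end of the object
	}
	return start, length
}

func main() {
	// Each pair is {offset, length} applied to a 100-byte object.
	for _, r := range [][2]int64{{10, 20}, {10, -1}, {-30, -1}, {50, 500}} {
		start, n := resolveRange(100, r[0], r[1])
		fmt.Printf("offset=%d length=%d -> bytes [%d, %d)\n", r[0], r[1], start, start+n)
	}
}
```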
type Downloader ¶
type Downloader struct {
// contains filtered or unexported fields
}
Downloader manages a set of parallelized downloads.
Example (Asynchronous) ¶
package main

import (
	"context"
	"log"
	"os"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/transfermanager"
)

func main() {
	ctx := context.Background()
	// Pass in any client opts or set retry policy here.
	client, err := storage.NewClient(ctx) // can also use NewGRPCClient
	if err != nil {
		// handle error
	}

	// Create Downloader with callbacks plus any desired options, including
	// number of workers, part size, per operation timeout, etc.
	d, err := transfermanager.NewDownloader(client, transfermanager.WithCallbacks())
	if err != nil {
		// handle error
	}
	defer func() {
		if _, err := d.WaitAndClose(); err != nil {
			// one or more of the downloads failed
		}
	}()

	// Create local file writer for output.
	f, err := os.Create("/path/to/localfile")
	if err != nil {
		// handle error
	}

	// Create callback function
	callback := func(out *transfermanager.DownloadOutput) {
		if out.Err != nil {
			log.Printf("download of %v failed with error %v", out.Object, out.Err)
		} else {
			log.Printf("download of %v succeeded", out.Object)
		}
	}

	// Create download input
	in := &transfermanager.DownloadObjectInput{
		Bucket:      "mybucket",
		Object:      "myblob",
		Destination: f,
		// Optionally specify params to apply to download.
		EncryptionKey: []byte("mykey"),
		// Specify the callback
		Callback: callback,
	}

	// Add to Downloader.
	if err := d.DownloadObject(ctx, in); err != nil {
		// handle error
	}

	// Repeat if desired.
}
Example (Synchronous) ¶
package main

import (
	"context"
	"log"
	"os"

	"cloud.google.com/go/storage"
	"cloud.google.com/go/storage/transfermanager"
)

func main() {
	ctx := context.Background()
	// Pass in any client opts or set retry policy here.
	client, err := storage.NewClient(ctx) // can also use NewGRPCClient
	if err != nil {
		// handle error
	}

	// Create Downloader with desired options, including number of workers,
	// part size, per operation timeout, etc.
	d, err := transfermanager.NewDownloader(client, transfermanager.WithWorkers(16))
	if err != nil {
		// handle error
	}

	// Create local file writer for output.
	f, err := os.Create("/path/to/localfile")
	if err != nil {
		// handle error
	}

	// Create download input
	in := &transfermanager.DownloadObjectInput{
		Bucket:      "mybucket",
		Object:      "myblob",
		Destination: f,
		// Optionally specify params to apply to download.
		EncryptionKey: []byte("mykey"),
	}

	// Add to Downloader.
	if err := d.DownloadObject(ctx, in); err != nil {
		// handle error
	}

	// Repeat if desired.

	// Wait for all downloads to complete.
	results, err := d.WaitAndClose()
	if err != nil {
		// handle error
	}

	// Iterate through completed downloads and process results.
	for _, out := range results {
		if out.Err != nil {
			log.Printf("download of %v failed with error %v", out.Object, out.Err)
		} else {
			log.Printf("download of %v succeeded", out.Object)
		}
	}
}
func NewDownloader ¶
func NewDownloader(c *storage.Client, opts ...Option) (*Downloader, error)
NewDownloader creates a new Downloader to add operations to. The choice of transport and similar settings is configured on the client that is passed in. The returned Downloader can be shared across goroutines to initiate downloads.
func (*Downloader) DownloadDirectory ¶ added in v1.43.0
func (d *Downloader) DownloadDirectory(ctx context.Context, input *DownloadDirectoryInput) error
DownloadDirectory queues the download of a set of objects to a local path. This will initiate the download but is non-blocking; call Downloader.WaitAndClose or use the callback to process the result. DownloadDirectory is thread-safe and can be called simultaneously from different goroutines. DownloadDirectory will resolve any filters on the input and create the needed directory structure locally. Do not modify this structure until the download has completed. DownloadDirectory will fail if any of the files it attempts to download already exist in the local directory.
func (*Downloader) DownloadObject ¶
func (d *Downloader) DownloadObject(ctx context.Context, input *DownloadObjectInput) error
DownloadObject queues the download of a single object. This will initiate the download but is non-blocking; call Downloader.WaitAndClose or use the callback to process the result. DownloadObject is thread-safe and can be called simultaneously from different goroutines. The download may not start immediately if all workers are busy, so a deadline set on the ctx may expire before the download even starts. To set a timeout that starts with the download, use the [WithPerOpTimeout] option.
func (*Downloader) WaitAndClose ¶
func (d *Downloader) WaitAndClose() ([]DownloadOutput, error)
WaitAndClose waits for all outstanding downloads to complete and closes the Downloader. Adding new downloads after this has been called will cause an error.
WaitAndClose returns all the results of the downloads and an error wrapping all errors that were encountered by the Downloader when downloading objects. These errors are also returned in the respective DownloadOutput for the failing download. The results are not guaranteed to be in any order. Results will be empty if using the WithCallbacks option. WaitAndClose will wait for all callbacks to finish.
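A single error that wraps several per-object errors can still be inspected with errors.Is. The sketch below uses errors.Join from the standard library (Go 1.20 or later) as one way to build such an error; whether the Downloader uses errors.Join internally is not stated here. The output type, errDenied, combine and isDenied are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// output is a hypothetical, trimmed-down stand-in for a per-object result.
type output struct {
	Object string
	Err    error
}

// errDenied is a sentinel error used only in this sketch.
var errDenied = errors.New("permission denied")

// combine wraps every per-object error into one error, or returns nil
// when every download succeeded.
func combine(outs []output) error {
	var errs []error
	for _, o := range outs {
		if o.Err != nil {
			errs = append(errs, fmt.Errorf("%s: %w", o.Object, o.Err))
		}
	}
	return errors.Join(errs...)
}

// isDenied reports whether err wraps errDenied anywhere in its tree.
func isDenied(err error) bool {
	return errors.Is(err, errDenied)
}

func main() {
	outs := []output{
		{Object: "a"},
		{Object: "b", Err: errDenied},
		{Object: "c", Err: errors.New("timeout")},
	}
	if err := combine(outs); err != nil {
		fmt.Println("some downloads failed; permission problem:", isDenied(err))
	}
	// The same errors are available per object.
	for _, o := range outs {
		if o.Err != nil {
			fmt.Printf("%s failed: %v\n", o.Object, o.Err)
		}
	}
}
```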
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
An Option is an option for a transfermanager Downloader or Uploader.
func SkipIfExists ¶ added in v1.44.0
func SkipIfExists() Option
SkipIfExists returns an Option that will not download files that already exist in the local directory.
By default, if a file already exists the operation will abort and return an error.
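The two behaviors (abort with an error versus skip) can be sketched with an exclusive file create from the standard library. openNew and demo are hypothetical helpers; this shows the general technique of refusing to overwrite an existing file, not the library's own implementation.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// openNew creates path for writing but refuses to overwrite an existing file.
// With skipIfExists set, an existing file is reported as skipped, not an error.
func openNew(path string, skipIfExists bool) (f *os.File, skipped bool, err error) {
	f, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o644)
	if errors.Is(err, fs.ErrExist) && skipIfExists {
		return nil, true, nil
	}
	return f, false, err
}

// demo creates the same file twice in a temporary directory and reports what
// happened on the second attempt.
func demo(skipIfExists bool) (skipped bool, err error) {
	dir, err := os.MkdirTemp("", "skipdemo")
	if err != nil {
		return false, err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "object")
	first, _, err := openNew(path, skipIfExists)
	if err != nil {
		return false, err
	}
	first.Close()

	second, skipped, err := openNew(path, skipIfExists)
	if second != nil {
		second.Close()
	}
	return skipped, err
}

func main() {
	skipped, err := demo(true)
	fmt.Println("skip enabled: skipped =", skipped, "err =", err)

	_, err = demo(false)
	fmt.Println("skip disabled: failed =", err != nil)
}
```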
func WithCallbacks ¶
func WithCallbacks() Option
WithCallbacks returns an Option that allows the use of callbacks to process the results. If this option is set, then results will not be returned by Downloader.WaitAndClose and must be processed through the callback.
func WithPartSize ¶ added in v1.43.0
func WithPartSize(partSize int64) Option
WithPartSize returns an Option that specifies the size in bytes of the shards to transfer; that is, if the object is larger than partSize, it will be uploaded or downloaded in concurrent pieces of size partSize.
The default is 32 MiB for downloads.
To turn off sharding, set partSize to 0.
Note that files that support decompressive transcoding will be downloaded in a single piece regardless of the partSize set here.
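The sharding rule amounts to splitting the object into consecutive byte ranges of at most partSize bytes. The split function and part type below are hypothetical and only illustrate the arithmetic; they do not reflect how the library schedules the pieces.

```go
package main

import "fmt"

// part is a half-open byte range [Start, End) of an object.
type part struct{ Start, End int64 }

// split divides size bytes into parts of at most partSize bytes.
// A partSize of 0, or an object no larger than partSize, yields one part.
func split(size, partSize int64) []part {
	if partSize <= 0 || size <= partSize {
		return []part{{0, size}}
	}
	var parts []part
	for start := int64(0); start < size; start += partSize {
		end := start + partSize
		if end > size {
			end = size // the last part may be shorter
		}
		parts = append(parts, part{start, end})
	}
	return parts
}

func main() {
	const MiB = 1 << 20
	fmt.Println(len(split(100*MiB, 32*MiB))) // 4 parts: 32 + 32 + 32 + 4 MiB
	fmt.Println(len(split(100*MiB, 0)))      // 1 part: sharding is off
}
```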
func WithPerOpTimeout ¶
func WithPerOpTimeout(timeout time.Duration) Option
WithPerOpTimeout returns an Option that sets a timeout on each operation that is performed to download or upload an object. The timeout is set when the operation begins processing, not when it is added. By default, no timeout is set other than an overall timeout as set on the provided context.
func WithWorkers ¶
func WithWorkers(numWorkers int) Option
WithWorkers returns an Option that specifies the maximum number of concurrent goroutines that will be used to download or upload objects. Defaults to runtime.NumCPU()/2.