Documentation
¶
Index ¶
- Constants
- Variables
- func NewStorageServer(args StorageServerArgs) rsstorage.StorageServer
- func WithCopierRequestOptions(opts ...request.Option) func(*Copier)
- type AwsOps
- type Copier
- type DefaultAwsOps
- func (a *DefaultAwsOps) BucketDirs(bucket, s3Prefix string) ([]string, error)
- func (a *DefaultAwsOps) BucketObjects(bucket, s3Prefix string, concurrency int, recursive bool, reg *regexp.Regexp) ([]string, error)
- func (a *DefaultAwsOps) BucketObjectsETagMap(bucket, s3Prefix string, concurrency int, recursive bool, reg *regexp.Regexp) (map[string]string, error)
- type S3Wrapper
- type StorageServer
- func (s *StorageServer) Base() rsstorage.StorageServer
- func (s *StorageServer) CalculateUsage() (types.Usage, error)
- func (s *StorageServer) Check(dir, address string) (bool, *types.ChunksInfo, int64, time.Time, error)
- func (s *StorageServer) Copy(dir, address string, server rsstorage.StorageServer) error
- func (s *StorageServer) Dir() string
- func (s *StorageServer) Enumerate() ([]types.StoredItem, error)
- func (s *StorageServer) Flush(dir, address string)
- func (s *StorageServer) Get(dir, address string) (io.ReadCloser, *types.ChunksInfo, int64, time.Time, bool, error)
- func (s *StorageServer) Locate(dir, address string) string
- func (s *StorageServer) Move(dir, address string, server rsstorage.StorageServer) error
- func (s *StorageServer) Put(resolve types.Resolver, dir, address string) (string, string, error)
- func (s *StorageServer) PutChunked(resolve types.Resolver, dir, address string, sz uint64) (string, string, error)
- func (s *StorageServer) Remove(dir, address string) error
- func (s *StorageServer) Type() types.StorageType
- func (s *StorageServer) Validate() error
- type StorageServerArgs
Examples ¶
Constants ¶
const AmzUnencryptedContentLengthHeader = "X-Amz-Unencrypted-Content-Length"
const DefaultCopyConcurrency = 10
DefaultCopyConcurrency is the default number of goroutines to spin up when using Copy().
const DefaultMultipartCopyThreshold = 10 * 1024 * 1024
DefaultMultipartCopyThreshold is the default object size threshold (in bytes) for using multipart instead of simple copy. The default is 10 MB.
const MaxUploadPartSize int64 = 1024 * 1024 * 1024 * 5
MaxUploadPartSize is the maximum allowed part size when uploading a part to Amazon S3.
const S3Concurrency = 20
Variables ¶
var BinaryReg = regexp.MustCompile(`(.+)(\.tar\.gz|\.zip)$`)
Functions ¶
func NewStorageServer ¶
func NewStorageServer(args StorageServerArgs) rsstorage.StorageServer
func WithCopierRequestOptions ¶
WithCopierRequestOptions appends to the Copier's API request options.
Types ¶
type AwsOps ¶
type AwsOps interface { BucketDirs(bucket, s3Prefix string) ([]string, error) BucketObjects(bucket, s3Prefix string, concurrency int, recursive bool, reg *regexp.Regexp) ([]string, error) BucketObjectsETagMap(bucket, s3Prefix string, concurrency int, recursive bool, reg *regexp.Regexp) (map[string]string, error) }
type Copier ¶
type Copier struct { // MaxPartSize is the maximum multipart chunk size to use (in bytes). It // must be at least 5 MB. The actual size of the chunks will vary, but // remain capped at MaxPartSize. If set to 0, the value of MaxUploadPartSize // is used. MaxPartSize int64 // MultipartCopyThreshold is the minimum size (in bytes) an object should // have before multipart copy is used instead of simple copy. The minimum // is 5 MB, and the maximum is 5 GB. If set to 0, the value of // DefaultMultipartCopyThreshold will be used. MultipartCopyThreshold int64 // Concurrency is the number of goroutines to use when executing a multipart // copy. If this value is set to 0, the value of DefaultCopyConcurrency // will be used. // // The concurrency pool is not shared between calls to Copy. Concurrency int // Setting this value to true will cause the SDK to avoid calling // AbortMultipartUpload on a failure, leaving all successfully copied // parts on S3 for manual recovery. // // Note that storing parts of an incomplete multipart upload counts towards // space usage on S3 and will add additional costs if not cleaned up. LeavePartsOnError bool // S3 is the client used for executing the multipart copy. S3 s3iface.S3API // RequestOptions as a list of request options that will be passed down to // individual API operation requests made by the uploader. RequestOptions []request.Option }
Copier is a structure for calling Copy(). It is safe to call Copy() on this structure for multiple objects and across concurrent goroutines. Mutating the Copier's properties is not safe to be done concurrently.
func NewCopier ¶
func NewCopier(c client.ConfigProvider, options ...func(*Copier)) *Copier
NewCopier creates a new Copier instance for copying objects between buckets and/or keys in S3. Customisations can be passed in options, or otherwise as part of the Copier's Copy method. The ConfigProvider (e.g. a *session.Session) will be used to instantiate a S3 service client.
See NewCopierWithClient for examples.
func NewCopierWithClient ¶
NewCopierWithClient creates a new Copier instance for copying objects between buckets and/or keys in S3. Customisations can be passed in options, or otherwise as part of the Copier's Copy method.
Example ¶
sess := session.Must(session.NewSession()) svc := s3.New(sess) copier := NewCopierWithClient( svc, func(copier *Copier) { copier.LeavePartsOnError = true }) _, _ = copier.CopyWithContext( aws.BackgroundContext(), &s3.CopyObjectInput{ Bucket: aws.String("dest-bucket"), Key: aws.String("lorem/ipsum.txt"), CopySource: aws.String(url.QueryEscape("src-bucket/lorem/ipsum.txt?versionId=1")), })
Output:
func (Copier) Copy ¶
func (c Copier) Copy(input *s3.CopyObjectInput, options ...func(*Copier)) (*s3.CopyObjectOutput, error)
Copy copies an object between buckets and/or keys in S3. See CopyWithContext for more information.
func (Copier) CopyWithContext ¶
func (c Copier) CopyWithContext(ctx aws.Context, input *s3.CopyObjectInput, options ...func(*Copier)) (*s3.CopyObjectOutput, error)
CopyWithContext copies an object between buckets and/or keys in S3. When the size of a source object is large enough, it will use concurrent multipart uploads to execute on the copy of an arbitrarily-sized object.
The Copier will use either the CopyObject or UploadPartCopy APIs, and no actual object data will pass through it. If you wish to transform the data, use a combination of s3manager.Downloader and s3manager.Uploader.
The Copier's AWS credentials must have s3:GetObject permission on the source object, and s3:PutObject permission on the destination. It is advisable that it also have s3:AbortMultipartUpload on the destination, as otherwise a failed copy would leave parts abandoned. To disable aborting in case of a failed copy, set LeavePartsOnError to true.
Additional functional options can be provided to configure the individual copies. These options are copies of the Copier instance Upload is called from. Modifying the options will not impact the original Copier instance.
Use the WithCopierRequestOptions helper function to pass in request options that will be applied to all API operations made with this uploader.
It is safe to call this method concurrently across goroutines.
Example ¶
var svc s3iface.S3API copier := NewCopierWithClient(svc) // Copy s3://src-bucket/lorem/ipsum.txt to s3://dest-bucket/lorem/ipsum.txt. // Version 1 of the source object will be copied. out, err := copier.Copy( &s3.CopyObjectInput{ Bucket: aws.String("dest-bucket"), Key: aws.String("lorem/ipsum.txt"), CopySource: aws.String(url.QueryEscape("src-bucket/lorem/ipsum.txt?versionId=1")), }, // Optional parameter for customization. func(c *Copier) { c.LeavePartsOnError = true }) if err != nil { panic(err) } log.Printf("The destination object's ETag is: %s", *out.CopyObjectResult.ETag)
Output:
type DefaultAwsOps ¶
type DefaultAwsOps struct {
// contains filtered or unexported fields
}
func NewAwsOps ¶
func NewAwsOps(sess *session.Session) *DefaultAwsOps
func (*DefaultAwsOps) BucketDirs ¶
func (a *DefaultAwsOps) BucketDirs(bucket, s3Prefix string) ([]string, error)
func (*DefaultAwsOps) BucketObjects ¶
type S3Wrapper ¶
type S3Wrapper interface { CreateBucket(input *s3.CreateBucketInput) (*s3.CreateBucketOutput, error) DeleteBucket(input *s3.DeleteBucketInput) (*s3.DeleteBucketOutput, error) HeadObject(input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error) GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error) DeleteObject(input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error) Upload(input *s3manager.UploadInput, ctx context.Context, options ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) CopyObject(bucket, key, newBucket, newKey string) (*s3.CopyObjectOutput, error) MoveObject(bucket, key, newBucket, newKey string) (*s3.CopyObjectOutput, error) ListObjects(bucket, prefix string) ([]string, error) KmsEncrypted() bool }
Encapsulates the S3 services we need
type StorageServer ¶
type StorageServer struct {
// contains filtered or unexported fields
}
func (*StorageServer) Base ¶
func (s *StorageServer) Base() rsstorage.StorageServer
func (*StorageServer) CalculateUsage ¶
func (s *StorageServer) CalculateUsage() (types.Usage, error)
func (*StorageServer) Check ¶
func (s *StorageServer) Check(dir, address string) (bool, *types.ChunksInfo, int64, time.Time, error)
func (*StorageServer) Copy ¶
func (s *StorageServer) Copy(dir, address string, server rsstorage.StorageServer) error
func (*StorageServer) Dir ¶
func (s *StorageServer) Dir() string
func (*StorageServer) Enumerate ¶
func (s *StorageServer) Enumerate() ([]types.StoredItem, error)
func (*StorageServer) Flush ¶
func (s *StorageServer) Flush(dir, address string)
func (*StorageServer) Get ¶
func (s *StorageServer) Get(dir, address string) (io.ReadCloser, *types.ChunksInfo, int64, time.Time, bool, error)
func (*StorageServer) Locate ¶
func (s *StorageServer) Locate(dir, address string) string
func (*StorageServer) Move ¶
func (s *StorageServer) Move(dir, address string, server rsstorage.StorageServer) error
func (*StorageServer) PutChunked ¶
func (*StorageServer) Remove ¶
func (s *StorageServer) Remove(dir, address string) error
func (*StorageServer) Type ¶
func (s *StorageServer) Type() types.StorageType
func (*StorageServer) Validate ¶
func (s *StorageServer) Validate() error
Validate performs S3 actions to ensure that the s3:GetObject, s3:PutObject, and s3:DeleteObject permissions are configured correctly. Note: This doesn't validate all the permissions (e.g. s3:AbortMultipartUpload), but it should be enough to confirm that the storage class is working.
type StorageServerArgs ¶
type StorageServerArgs struct { Bucket string Prefix string Svc S3Wrapper ChunkSize uint64 Waiter rsstorage.ChunkWaiter Notifier rsstorage.ChunkNotifier }