s3server

package
Published: Jul 10, 2024 License: MIT Imports: 30 Imported by: 0

README

/pkg/rsstorage/servers/s3server

Description

A storage server implementation for Amazon AWS S3 storage. Includes an S3Wrapper interface with a default implementation for easier use. Also includes s3_copier.go to provide support for moving/copying files within S3 without transferring the bytes through the client.

Documentation

Index

Examples

Constants

const AmzUnencryptedContentLengthHeader = "X-Amz-Unencrypted-Content-Length"
const DefaultCopyConcurrency = 10

DefaultCopyConcurrency is the default number of goroutines to spin up when using Copy().

const DefaultMultipartCopyThreshold = 10 * 1024 * 1024

DefaultMultipartCopyThreshold is the default object size threshold (in bytes) for using multipart instead of simple copy. The default is 10 MB.
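The interplay between the copy threshold and the part-size cap can be sketched as follows. The `partsFor` helper is illustrative only, not part of this package; it assumes the documented defaults of a 10 MB threshold and a 5 GB maximum part size.

```go
package main

import "fmt"

const (
	defaultMultipartCopyThreshold = 10 * 1024 * 1024       // 10 MB
	maxUploadPartSize             = 5 * 1024 * 1024 * 1024 // 5 GB
)

// partsFor reports whether an object of the given size would use multipart
// copy rather than simple copy, and if so, how many parts a copy with the
// given part size needs. Illustrative only; the Copier computes this
// internally.
func partsFor(objectSize, partSize int64) (multipart bool, parts int64) {
	if objectSize < defaultMultipartCopyThreshold {
		return false, 1
	}
	parts = (objectSize + partSize - 1) / partSize // ceiling division
	return true, parts
}

func main() {
	m, n := partsFor(12*1024*1024, maxUploadPartSize)
	fmt.Println(m, n) // prints "true 1": a 12 MB object crosses the threshold but fits in one part
}
```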

const MaxUploadPartSize int64 = 1024 * 1024 * 1024 * 5

MaxUploadPartSize is the maximum allowed part size when uploading a part to Amazon S3.

const S3Concurrency = 20

Variables

var BinaryReg = regexp.MustCompile(`(.+)(\.tar\.gz|\.zip)$`)
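BinaryReg captures the base name and the archive extension separately. A quick sketch of what it matches, using the same pattern compiled locally:

```go
package main

import (
	"fmt"
	"regexp"
)

// Same pattern as BinaryReg: group 1 is the base name, group 2 the extension.
var binaryReg = regexp.MustCompile(`(.+)(\.tar\.gz|\.zip)$`)

func main() {
	m := binaryReg.FindStringSubmatch("mypkg-1.2.3.tar.gz")
	fmt.Println(m[1], m[2]) // prints "mypkg-1.2.3 .tar.gz"
}
```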

Functions

func NewStorageServer

func NewStorageServer(args StorageServerArgs) rsstorage.StorageServer

func WithCopierRequestOptions

func WithCopierRequestOptions(opts ...request.Option) func(*Copier)

WithCopierRequestOptions appends to the Copier's API request options.

Types

type AwsOps

type AwsOps interface {
	BucketDirs(bucket, s3Prefix string) ([]string, error)
	BucketObjects(bucket, s3Prefix string, concurrency int, recursive bool, reg *regexp.Regexp) ([]string, error)
	BucketObjectsETagMap(bucket, s3Prefix string, concurrency int, recursive bool, reg *regexp.Regexp) (map[string]string, error)
}

type Copier

type Copier struct {
	// MaxPartSize is the maximum multipart chunk size to use (in bytes). It
	// must be at least 5 MB. The actual size of the chunks will vary, but
	// remain capped at MaxPartSize. If set to 0, the value of MaxUploadPartSize
	// is used.
	MaxPartSize int64

	// MultipartCopyThreshold is the minimum size (in bytes) an object should
	// have before multipart copy is used instead of simple copy. The minimum
	// is 5 MB, and the maximum is 5 GB. If set to 0, the value of
	// DefaultMultipartCopyThreshold will be used.
	MultipartCopyThreshold int64

	// Concurrency is the number of goroutines to use when executing a multipart
	// copy. If this value is set to 0, the value of DefaultCopyConcurrency
	// will be used.
	//
	// The concurrency pool is not shared between calls to Copy.
	Concurrency int

	// Setting this value to true will cause the SDK to avoid calling
	// AbortMultipartUpload on a failure, leaving all successfully copied
	// parts on S3 for manual recovery.
	//
	// Note that storing parts of an incomplete multipart upload counts towards
	// space usage on S3 and will add additional costs if not cleaned up.
	LeavePartsOnError bool

	// S3 is the client used for executing the multipart copy.
	S3 s3iface.S3API

	// RequestOptions as a list of request options that will be passed down to
	// individual API operation requests made by the uploader.
	RequestOptions []request.Option
}

Copier is a structure for calling Copy(). It is safe to call Copy() on this structure for multiple objects and across concurrent goroutines. It is not safe to mutate the Copier's properties concurrently.
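For objects above MultipartCopyThreshold, a multipart copy splits the source into byte ranges of the form `bytes=first-last`, which is the format the S3 UploadPartCopy API's CopySourceRange parameter expects. The `copySourceRanges` helper below is an illustrative sketch of that splitting, not part of this package:

```go
package main

import "fmt"

// copySourceRanges splits an object of the given size into "bytes=first-last"
// ranges suitable for UploadPartCopy's CopySourceRange parameter.
// Illustrative helper only; the Copier computes ranges internally.
func copySourceRanges(objectSize, partSize int64) []string {
	var ranges []string
	for start := int64(0); start < objectSize; start += partSize {
		end := start + partSize - 1
		if end > objectSize-1 {
			end = objectSize - 1 // the final part may be short
		}
		ranges = append(ranges, fmt.Sprintf("bytes=%d-%d", start, end))
	}
	return ranges
}

func main() {
	for _, r := range copySourceRanges(25, 10) {
		fmt.Println(r)
	}
	// prints:
	// bytes=0-9
	// bytes=10-19
	// bytes=20-24
}
```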

func NewCopier

func NewCopier(c client.ConfigProvider, options ...func(*Copier)) *Copier

NewCopier creates a new Copier instance for copying objects between buckets and/or keys in S3. Customisations can be passed in options, or otherwise as part of the Copier's Copy method. The ConfigProvider (e.g. a *session.Session) will be used to instantiate an S3 service client.

See NewCopierWithClient for examples.

func NewCopierWithClient

func NewCopierWithClient(svc s3iface.S3API, options ...func(*Copier)) *Copier

NewCopierWithClient creates a new Copier instance for copying objects between buckets and/or keys in S3. Customisations can be passed in options, or otherwise as part of the Copier's Copy method.

Example
sess := session.Must(session.NewSession())
svc := s3.New(sess)
copier := NewCopierWithClient(
	svc, func(copier *Copier) {
		copier.LeavePartsOnError = true
	})
_, _ = copier.CopyWithContext(
	aws.BackgroundContext(), &s3.CopyObjectInput{
		Bucket:     aws.String("dest-bucket"),
		Key:        aws.String("lorem/ipsum.txt"),
		CopySource: aws.String(url.QueryEscape("src-bucket/lorem/ipsum.txt?versionId=1")),
	})
Output:

func (Copier) Copy

func (c Copier) Copy(input *s3.CopyObjectInput, options ...func(*Copier)) (*s3.CopyObjectOutput, error)

Copy copies an object between buckets and/or keys in S3. See CopyWithContext for more information.

func (Copier) CopyWithContext

func (c Copier) CopyWithContext(ctx aws.Context, input *s3.CopyObjectInput, options ...func(*Copier)) (*s3.CopyObjectOutput, error)

CopyWithContext copies an object between buckets and/or keys in S3. When the source object is large enough, it uses concurrent multipart copies, so objects of arbitrary size can be copied.

The Copier will use either the CopyObject or UploadPartCopy APIs, and no actual object data will pass through it. If you wish to transform the data, use a combination of s3manager.Downloader and s3manager.Uploader.

The Copier's AWS credentials must have s3:GetObject permission on the source object, and s3:PutObject permission on the destination. It is advisable that it also have s3:AbortMultipartUpload on the destination, as otherwise a failed copy would leave parts abandoned. To disable aborting in case of a failed copy, set LeavePartsOnError to true.
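The permissions described above can be granted with a policy along these lines. This is a minimal sketch; the bucket names are placeholders, and your deployment may need additional statements (e.g. s3:ListBucket):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "CopySource",
      "Effect": "Allow",
      "Action": ["s3:GetObject"],
      "Resource": "arn:aws:s3:::src-bucket/*"
    },
    {
      "Sid": "CopyDestination",
      "Effect": "Allow",
      "Action": ["s3:PutObject", "s3:AbortMultipartUpload"],
      "Resource": "arn:aws:s3:::dest-bucket/*"
    }
  ]
}
```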

Additional functional options can be provided to configure the individual copies. These options operate on a copy of the Copier instance on which Copy is called, so modifying them will not impact the original Copier instance.

Use the WithCopierRequestOptions helper function to pass in request options that will be applied to all API operations made with this Copier.

It is safe to call this method concurrently across goroutines.

Example
var svc s3iface.S3API
copier := NewCopierWithClient(svc)

// Copy s3://src-bucket/lorem/ipsum.txt to s3://dest-bucket/lorem/ipsum.txt.
// Version 1 of the source object will be copied.
out, err := copier.Copy(
	&s3.CopyObjectInput{
		Bucket:     aws.String("dest-bucket"),
		Key:        aws.String("lorem/ipsum.txt"),
		CopySource: aws.String(url.QueryEscape("src-bucket/lorem/ipsum.txt?versionId=1")),
	},
	// Optional parameter for customization.
	func(c *Copier) {
		c.LeavePartsOnError = true
	})

if err != nil {
	panic(err)
}

log.Printf("The destination object's ETag is: %s", *out.CopyObjectResult.ETag)
Output:
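Note that both examples pass CopySource through url.QueryEscape, since S3 expects the source bucket/key (and any versionId query) to be URL-encoded. A quick check of what that produces:

```go
package main

import (
	"fmt"
	"net/url"
)

// escapeCopySource URL-encodes a CopySource value the way the examples do.
func escapeCopySource(s string) string {
	return url.QueryEscape(s)
}

func main() {
	fmt.Println(escapeCopySource("src-bucket/lorem/ipsum.txt?versionId=1"))
	// prints "src-bucket%2Florem%2Fipsum.txt%3FversionId%3D1"
}
```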

type DefaultAwsOps

type DefaultAwsOps struct {
	// contains filtered or unexported fields
}

func NewAwsOps

func NewAwsOps(sess *session.Session) *DefaultAwsOps

func (*DefaultAwsOps) BucketDirs

func (a *DefaultAwsOps) BucketDirs(bucket, s3Prefix string) ([]string, error)

func (*DefaultAwsOps) BucketObjects

func (a *DefaultAwsOps) BucketObjects(bucket, s3Prefix string, concurrency int, recursive bool, reg *regexp.Regexp) ([]string, error)

func (*DefaultAwsOps) BucketObjectsETagMap

func (a *DefaultAwsOps) BucketObjectsETagMap(bucket, s3Prefix string, concurrency int, recursive bool, reg *regexp.Regexp) (map[string]string, error)

type S3Wrapper

type S3Wrapper interface {
	CreateBucket(input *s3.CreateBucketInput) (*s3.CreateBucketOutput, error)
	DeleteBucket(input *s3.DeleteBucketInput) (*s3.DeleteBucketOutput, error)
	HeadObject(input *s3.HeadObjectInput) (*s3.HeadObjectOutput, error)
	GetObject(input *s3.GetObjectInput) (*s3.GetObjectOutput, error)
	DeleteObject(input *s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error)
	Upload(input *s3manager.UploadInput, ctx context.Context, options ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error)
	CopyObject(bucket, key, newBucket, newKey string) (*s3.CopyObjectOutput, error)
	MoveObject(bucket, key, newBucket, newKey string) (*s3.CopyObjectOutput, error)
	ListObjects(bucket, prefix string) ([]string, error)
	KmsEncrypted() bool
}

S3Wrapper encapsulates the S3 services we need.
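MoveObject and CopyObject share a signature because a move in S3 is conventionally a copy followed by a delete of the source (S3 has no native rename). An in-memory sketch of that pattern; `memStore` and its methods are hypothetical stand-ins, not this package's implementation:

```go
package main

import "fmt"

// memStore is a hypothetical in-memory stand-in, used only to illustrate
// the copy-then-delete pattern a MoveObject implementation typically uses.
// Keys are "bucket/key" strings.
type memStore map[string][]byte

func (m memStore) copyObject(bucket, key, newBucket, newKey string) {
	m[newBucket+"/"+newKey] = m[bucket+"/"+key]
}

func (m memStore) moveObject(bucket, key, newBucket, newKey string) {
	m.copyObject(bucket, key, newBucket, newKey)
	delete(m, bucket+"/"+key) // a move is a copy plus a source delete
}

func main() {
	s := memStore{"a/x.txt": []byte("hello")}
	s.moveObject("a", "x.txt", "b", "y.txt")
	fmt.Println(len(s), string(s["b/y.txt"])) // prints "1 hello"
}
```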

func NewS3Wrapper

func NewS3Wrapper(configInput *rsstorage.ConfigS3, keyID string) (S3Wrapper, error)

type StorageServer

type StorageServer struct {
	// contains filtered or unexported fields
}

func (*StorageServer) Base

func (*StorageServer) CalculateUsage

func (s *StorageServer) CalculateUsage() (types.Usage, error)

func (*StorageServer) Check

func (s *StorageServer) Check(dir, address string) (bool, *types.ChunksInfo, int64, time.Time, error)

func (*StorageServer) Copy

func (s *StorageServer) Copy(dir, address string, server rsstorage.StorageServer) error

func (*StorageServer) Dir

func (s *StorageServer) Dir() string

func (*StorageServer) Enumerate

func (s *StorageServer) Enumerate() ([]types.StoredItem, error)

func (*StorageServer) Flush

func (s *StorageServer) Flush(dir, address string)

func (*StorageServer) Get

func (s *StorageServer) Get(dir, address string) (io.ReadCloser, *types.ChunksInfo, int64, time.Time, bool, error)

func (*StorageServer) Locate

func (s *StorageServer) Locate(dir, address string) string

func (*StorageServer) Move

func (s *StorageServer) Move(dir, address string, server rsstorage.StorageServer) error

func (*StorageServer) Put

func (s *StorageServer) Put(resolve types.Resolver, dir, address string) (string, string, error)

func (*StorageServer) PutChunked

func (s *StorageServer) PutChunked(resolve types.Resolver, dir, address string, sz uint64) (string, string, error)

func (*StorageServer) Remove

func (s *StorageServer) Remove(dir, address string) error

func (*StorageServer) Type

func (s *StorageServer) Type() types.StorageType

func (*StorageServer) Validate

func (s *StorageServer) Validate() error

Validate performs S3 actions to ensure that the s3:GetObject, s3:PutObject, and s3:DeleteObject permissions are configured correctly. Note: This doesn't validate all the permissions (e.g. s3:AbortMultipartUpload), but it should be enough to confirm that the storage class is working.
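The shape of such a check is a put/get/delete round trip with a probe object. The sketch below uses a hypothetical `objectStore` interface and in-memory implementation to illustrate the pattern; it is not this package's Validate:

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// objectStore is a hypothetical, minimal subset of an object store API,
// used only to illustrate the round-trip validation pattern.
type objectStore interface {
	Put(key string, data []byte) error
	Get(key string) ([]byte, error)
	Delete(key string) error
}

// validate writes a probe object, reads it back, and deletes it, surfacing
// each failure with a hint at the permission that is likely missing.
func validate(s objectStore) error {
	const key = "validate-probe"
	want := []byte("ok")
	if err := s.Put(key, want); err != nil {
		return fmt.Errorf("put failed (missing s3:PutObject?): %w", err)
	}
	got, err := s.Get(key)
	if err != nil {
		return fmt.Errorf("get failed (missing s3:GetObject?): %w", err)
	}
	if !bytes.Equal(got, want) {
		return errors.New("round trip returned wrong data")
	}
	if err := s.Delete(key); err != nil {
		return fmt.Errorf("delete failed (missing s3:DeleteObject?): %w", err)
	}
	return nil
}

// memObjects is an in-memory objectStore for demonstration.
type memObjects map[string][]byte

func (m memObjects) Put(k string, d []byte) error { m[k] = d; return nil }
func (m memObjects) Get(k string) ([]byte, error) {
	d, ok := m[k]
	if !ok {
		return nil, errors.New("not found")
	}
	return d, nil
}
func (m memObjects) Delete(k string) error { delete(m, k); return nil }

func main() {
	fmt.Println(validate(memObjects{})) // prints "<nil>"
}
```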

type StorageServerArgs

type StorageServerArgs struct {
	Bucket    string
	Prefix    string
	Svc       S3Wrapper
	ChunkSize uint64
	Waiter    rsstorage.ChunkWaiter
	Notifier  rsstorage.ChunkNotifier
}
