storage

package
v1.1.0-beta.0...-cc83417
Published: Dec 16, 2024 License: Apache-2.0 Imports: 85 Imported by: 0

Documentation

Constants

const (
	// AccessBuckets represents bucket access permission.
	// It replaces the original skip-check-path.
	AccessBuckets Permission = "AccessBucket"

	// ListObjects represents listObjects permission
	ListObjects Permission = "ListObjects"
	// GetObject represents GetObject permission
	GetObject Permission = "GetObject"
	// PutObject represents PutObject permission
	PutObject Permission = "PutObject"
	// PutAndDeleteObject represents PutAndDeleteObject permission
	// we cannot check DeleteObject permission alone, so we use PutAndDeleteObject instead.
	PutAndDeleteObject Permission = "PutAndDeleteObject"

	DefaultRequestConcurrency uint  = 128
	TombstoneSize             int64 = -1
)

const (
	// LocalURIPrefix represents the local storage prefix.
	LocalURIPrefix = "file://"
)

Variables

var WriteBufferSize = 5 * 1024 * 1024

WriteBufferSize is the size of the buffer used for writing. (64K may be a better choice)

Functions

func CloneDefaultHttpTransport

func CloneDefaultHttpTransport() (*http.Transport, bool)

func DefineFlags

func DefineFlags(flags *pflag.FlagSet)

DefineFlags adds flags to the flag set corresponding to all backend options.

func ExtractQueryParameters

func ExtractQueryParameters(u *url.URL, options any)

ExtractQueryParameters moves the query parameters of the URL into the options using reflection.

The options must be a pointer to a struct which contains only string or bool fields (more types will be supported in the future), and tagged for JSON serialization.

All of the URL's query parameters will be removed after calling this method.

func FormatBackendURL

func FormatBackendURL(backend *backuppb.StorageBackend) (u url.URL)

FormatBackendURL obtains the raw URL which can be used to reconstruct the backend. The returned URL does not contain options for further configuring the backend. This is to avoid exposing secret tokens.

func GetActiveUploadWorkerCount

func GetActiveUploadWorkerCount() int64

GetActiveUploadWorkerCount returns the active upload worker count.

func GetDefaultHttpClient

func GetDefaultHttpClient(concurrency uint) *http.Client

GetDefaultHttpClient returns an HTTP client that, unlike one using `http.DefaultTransport`, sets `MaxIdleConns` and `MaxIdleConnsPerHost` to the actual request concurrency, so that TCP connections are reused as much as possible.

func HiddenFlagsForStream

func HiddenFlagsForStream(flags *pflag.FlagSet)

HiddenFlagsForStream hides flags for the stream command.

func IsLocal

func IsLocal(u *url.URL) bool

IsLocal returns true if the URL is a local file path.

func IsLocalPath

func IsLocalPath(p string) (bool, error)

IsLocalPath returns true if the path is a local file path.

func JSONEffects

func JSONEffects(es []Effect, output io.Writer) error

JSONEffects converts a slice of effects into JSON. The JSON will be a tagged union: `{"type": $go_type_name, "effect": $effect}`

func ParseBackend

func ParseBackend(rawURL string, options *BackendOptions) (*backuppb.StorageBackend, error)

ParseBackend constructs a structured backend description from the storage URL.

func ParseBackendFromURL

func ParseBackendFromURL(u *url.URL, options *BackendOptions) (*backuppb.StorageBackend, error)

ParseBackendFromURL constructs a structured backend description from the *url.URL.

func ParseRawURL

func ParseRawURL(rawURL string) (*url.URL, error)

ParseRawURL parses a raw URL string into a URL object.

func PutAndDeleteObjectCheck

func PutAndDeleteObjectCheck(ctx context.Context, svc s3iface.S3API, options *backuppb.S3) (err error)

PutAndDeleteObjectCheck checks the permission of putObject. The S3 API doesn't provide a way to check the permission directly, so we have to put an object to check it. Exported for testing.

func ReadDataInRange

func ReadDataInRange(
	ctx context.Context,
	storage ExternalStorage,
	name string,
	start int64,
	p []byte,
) (n int, err error)

ReadDataInRange reads data from storage in range [start, start+len(p)).

func SaveJSONEffectsToTmp

func SaveJSONEffectsToTmp(es []Effect) (string, error)

func UnmarshalDir

func UnmarshalDir[T any](ctx context.Context, walkOpt *WalkOption, s ExternalStorage, unmarshal func(target *T, name string, content []byte) error) iter.TryNextor[*T]

UnmarshalDir iterates over a prefix and "unmarshals" the content of each file it meets with the unmarshal function, returning an iterator that yields the unmarshaled content. The "unmarshal" function should put the result of unmarshalling into the `target` argument.

func ValidateCloudStorageURI

func ValidateCloudStorageURI(ctx context.Context, uri string) error

ValidateCloudStorageURI validates the value of tidb_cloud_storage_uri.

Types

type AzblobBackendOptions

type AzblobBackendOptions struct {
	Endpoint        string `json:"endpoint" toml:"endpoint"`
	AccountName     string `json:"account-name" toml:"account-name"`
	AccountKey      string `json:"account-key" toml:"account-key"`
	AccessTier      string `json:"access-tier" toml:"access-tier"`
	SASToken        string `json:"sas-token" toml:"sas-token"`
	EncryptionScope string `json:"encryption-scope" toml:"encryption-scope"`
	EncryptionKey   string `json:"encryption-key" toml:"encryption-key"`
}

AzblobBackendOptions is the options for Azure Blob storage.

type AzureBlobStorage

type AzureBlobStorage struct {
	// contains filtered or unexported fields
}

AzureBlobStorage is a storage engine that stores data in Azure Blob Storage.

func (*AzureBlobStorage) Close

func (*AzureBlobStorage) Close()

Close implements the ExternalStorage interface.

func (*AzureBlobStorage) Create

Create implements the StorageWriter interface.

func (*AzureBlobStorage) DeleteFile

func (s *AzureBlobStorage) DeleteFile(ctx context.Context, name string) error

DeleteFile deletes the file with the given name.

func (*AzureBlobStorage) DeleteFiles

func (s *AzureBlobStorage) DeleteFiles(ctx context.Context, names []string) error

DeleteFiles deletes the files with the given names.

func (*AzureBlobStorage) FileExists

func (s *AzureBlobStorage) FileExists(ctx context.Context, name string) (bool, error)

FileExists checks if a file exists in Azure Blob Storage.

func (*AzureBlobStorage) MarkStrongConsistency

func (*AzureBlobStorage) MarkStrongConsistency()

func (*AzureBlobStorage) Open

Open implements the StorageReader interface.

func (*AzureBlobStorage) ReadFile

func (s *AzureBlobStorage) ReadFile(ctx context.Context, name string) ([]byte, error)

ReadFile reads a file from Azure Blob Storage.

func (*AzureBlobStorage) Rename

func (s *AzureBlobStorage) Rename(ctx context.Context, oldFileName, newFileName string) error

Rename implements the StorageWriter interface.

func (*AzureBlobStorage) URI

func (s *AzureBlobStorage) URI() string

URI implements the StorageReader interface.

func (*AzureBlobStorage) WalkDir

func (s *AzureBlobStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(path string, size int64) error) error

WalkDir implements the StorageReader interface.

func (*AzureBlobStorage) WriteFile

func (s *AzureBlobStorage) WriteFile(ctx context.Context, name string, data []byte) error

WriteFile writes a file to Azure Blob Storage.

type BackendOptions

type BackendOptions struct {
	S3     S3BackendOptions     `json:"s3" toml:"s3"`
	GCS    GCSBackendOptions    `json:"gcs" toml:"gcs"`
	Azblob AzblobBackendOptions `json:"azblob" toml:"azblob"`
}

BackendOptions further configures the storage backend not expressed by the storage URL.

func (*BackendOptions) ParseFromFlags

func (options *BackendOptions) ParseFromFlags(flags *pflag.FlagSet) error

ParseFromFlags obtains the backend options from the flag set.

type Batched

type Batched struct {
	ExternalStorage
	// contains filtered or unexported fields
}

Batched is a wrapper of an external storage that suspends all write operations ("effects"). If `Close()` is called without calling `Commit()`, nothing will happen in the underlying external storage. In that case, we have done a "dry run".

You may use `ReadOnlyEffects()` to get the history of the effects. But don't modify the returned slice!

You may use `Commit()` to execute all suspended effects.
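The record-then-commit pattern can be sketched with a map standing in for the underlying storage. All names below (`store`, `effect`, `batched`) are hypothetical; the sketch only mirrors the idea of suspending writes until a commit, not the package's actual types.

```go
package main

import "fmt"

// store is a hypothetical minimal backend used only for this sketch.
type store map[string][]byte

// effect is a recorded write operation: a description plus the deferred action.
type effect struct {
	desc  string
	apply func(store)
}

// batched suspends writes to the wrapped store until commit is called.
type batched struct {
	inner   store
	effects []effect
}

// writeFile records a put instead of performing it.
func (b *batched) writeFile(name string, data []byte) {
	b.effects = append(b.effects, effect{
		desc:  "put " + name,
		apply: func(s store) { s[name] = data },
	})
}

// deleteFile records a delete instead of performing it.
func (b *batched) deleteFile(name string) {
	b.effects = append(b.effects, effect{
		desc:  "delete " + name,
		apply: func(s store) { delete(s, name) },
	})
}

// commit replays the recorded effects in order, then clears them.
func (b *batched) commit() {
	for _, e := range b.effects {
		e.apply(b.inner)
	}
	b.effects = nil
}

func main() {
	s := store{"old": []byte("x")}
	b := &batched{inner: s}
	b.writeFile("new", []byte("y"))
	b.deleteFile("old")
	for _, e := range b.effects {
		fmt.Println("pending:", e.desc)
	}
	_, hasNew := s["new"]
	fmt.Println("before commit, new exists:", hasNew)
	b.commit()
	_, hasNew = s["new"]
	fmt.Println("after commit, new exists:", hasNew)
}
```

Dropping the `batched` value without calling `commit` leaves the store untouched, which is the "dry run" described above.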

func Batch

func Batch(s ExternalStorage) *Batched

Batch wraps an external storage instance to a batched version.

func (*Batched) CleanEffects

func (d *Batched) CleanEffects()

CleanEffects cleans all suspended effects.

func (*Batched) Commit

func (d *Batched) Commit(ctx context.Context) error

Commit performs all effects recorded so far in the REAL external storage. This will clean up all of the suspended effects.

func (*Batched) Create

func (d *Batched) Create(ctx context.Context, path string, option *WriterOption) (ExternalFileWriter, error)

func (*Batched) DeleteFile

func (d *Batched) DeleteFile(ctx context.Context, name string) error

func (*Batched) DeleteFiles

func (d *Batched) DeleteFiles(ctx context.Context, names []string) error

func (*Batched) ReadOnlyEffects

func (d *Batched) ReadOnlyEffects() []Effect

ReadOnlyEffects fetches all effects from the batched storage.

**The returned slice should not be modified.**

func (*Batched) Rename

func (d *Batched) Rename(ctx context.Context, oldName, newName string) error

func (*Batched) WriteFile

func (d *Batched) WriteFile(ctx context.Context, name string, data []byte) error

type BytesWriter

type BytesWriter struct {
	// contains filtered or unexported fields
}

BytesWriter is a Writer implementation on top of bytes.Buffer that is useful for testing.

func NewBufferWriter

func NewBufferWriter() *BytesWriter

NewBufferWriter creates a Writer that simply writes to a buffer (useful for testing).

func (*BytesWriter) Bytes

func (u *BytesWriter) Bytes() []byte

Bytes delegates to bytes.Buffer.

func (*BytesWriter) Close

func (*BytesWriter) Close(_ context.Context) error

Close delegates to bytes.Buffer.

func (*BytesWriter) Reset

func (u *BytesWriter) Reset()

Reset delegates to bytes.Buffer.

func (*BytesWriter) String

func (u *BytesWriter) String() string

String delegates to bytes.Buffer.

func (*BytesWriter) Write

func (u *BytesWriter) Write(_ context.Context, p []byte) (int, error)

Write delegates to bytes.Buffer.

type ClientBuilder

type ClientBuilder interface {
	// Example of serviceURL: https://<your_storage_account>.blob.core.windows.net
	GetServiceClient() (*azblob.Client, error)
	GetAccountName() string
}

ClientBuilder provides common methods to build a service client.

type CompleteMultipartUpload

type CompleteMultipartUpload struct {
	XMLName xml.Name `xml:"CompleteMultipartUpload"`
	Text    string   `xml:",chardata"`
	Parts   []Part   `xml:"Part"`
}

type CompressType

type CompressType uint8

CompressType represents the type of compression.

const (
	// NoCompression won't compress given bytes.
	NoCompression CompressType = iota
	// Gzip will compress given bytes in gzip format.
	Gzip
	// Snappy will compress given bytes in snappy format.
	Snappy
	// Zstd will compress given bytes in zstd format.
	Zstd
)

type DecompressConfig

type DecompressConfig struct {
	// ZStdDecodeConcurrency is only used for ZStd decompression, see WithDecoderConcurrency.
	// If it is not 1, ZStd will decode the file asynchronously.
	ZStdDecodeConcurrency int
}

DecompressConfig is the config used for decompression.

type EffDeleteFile

type EffDeleteFile string

EffDeleteFile is the side effect of a call to `DeleteFile`.

type EffDeleteFiles

type EffDeleteFiles struct {
	Files []string `json:"files"`
}

EffDeleteFiles is the side effect of a call to `DeleteFiles`.

type EffPut

type EffPut struct {
	File    string `json:"file"`
	Content []byte `json:"content"`
}

EffPut is the side effect of a call to `WriteFile`.

type EffRename

type EffRename struct {
	From string `json:"from"`
	To   string `json:"to"`
}

EffRename is the side effect of a call to `Rename`.

type Effect

type Effect any

Effect is a side effect that happens in the batch storage.

type ErrLocked

type ErrLocked struct {
	Meta LockMeta
}

ErrLocked is the error returned when the lock is held by others.

func (ErrLocked) Error

func (e ErrLocked) Error() string

type ExternalFileReader

type ExternalFileReader interface {
	io.ReadSeekCloser
	// GetFileSize returns the file size.
	GetFileSize() (int64, error)
}

ExternalFileReader represents the streaming external file reader.

func InterceptDecompressReader

func InterceptDecompressReader(
	fileReader ExternalFileReader,
	compressType CompressType,
	cfg DecompressConfig,
) (ExternalFileReader, error)

InterceptDecompressReader intercepts the reader and wraps it with a decompress reader on the given ExternalFileReader. Note that the returned ExternalFileReader does not have the property that Seek(0, io.SeekCurrent) equals total bytes Read() if the decompress reader is used.

func NewLimitedInterceptReader

func NewLimitedInterceptReader(
	fileReader ExternalFileReader,
	compressType CompressType,
	cfg DecompressConfig,
	n int64,
) (ExternalFileReader, error)

type ExternalFileWriter

type ExternalFileWriter interface {
	// Write writes to buffer and if chunk is filled will upload it
	Write(ctx context.Context, p []byte) (int, error)
	// Close writes final chunk and completes the upload
	Close(ctx context.Context) error
}

ExternalFileWriter represents the streaming external file writer.

func NewUploaderWriter

func NewUploaderWriter(writer ExternalFileWriter, chunkSize int, compressType CompressType) ExternalFileWriter

NewUploaderWriter wraps the Writer interface over an uploader.

type ExternalStorage

type ExternalStorage interface {
	// WriteFile writes a complete file to storage, similar to os.WriteFile, but WriteFile should be atomic
	WriteFile(ctx context.Context, name string, data []byte) error
	// ReadFile reads a complete file from storage, similar to os.ReadFile
	ReadFile(ctx context.Context, name string) ([]byte, error)
	// FileExists returns true if the file exists
	FileExists(ctx context.Context, name string) (bool, error)
	// DeleteFile deletes the file in storage
	DeleteFile(ctx context.Context, name string) error
	// Open opens a Reader by file path. path is relative to the storage base path.
	// Some implementations will use the given ctx as the inner context of the reader.
	Open(ctx context.Context, path string, option *ReaderOption) (ExternalFileReader, error)
	// DeleteFiles deletes the files in storage
	DeleteFiles(ctx context.Context, names []string) error
	// WalkDir traverses all the files in a dir.
	//
	// fn is the function called for each regular file visited by WalkDir.
	// The argument `path` is the file path that can be used in the `Open`
	// function; the argument `size` is the size in bytes of the file determined
	// by path.
	WalkDir(ctx context.Context, opt *WalkOption, fn func(path string, size int64) error) error

	// URI returns the base path as a URI
	URI() string

	// Create opens a file writer by path. path is relative to the storage base
	// path. The old file under the same path will be overwritten. Currently only
	// the S3 backend implements WriterOption.
	Create(ctx context.Context, path string, option *WriterOption) (ExternalFileWriter, error)
	// Rename renames a file from oldFileName to newFileName
	Rename(ctx context.Context, oldFileName, newFileName string) error
	// Close releases the resources of the storage.
	Close()
}

ExternalStorage represents a kind of file system storage.

func Create

func Create(ctx context.Context, backend *backuppb.StorageBackend, sendCreds bool) (ExternalStorage, error)

Create creates ExternalStorage.

Please consider using `New` in the future.

func New

New creates an ExternalStorage with options.

func NewFromURL

func NewFromURL(ctx context.Context, uri string) (ExternalStorage, error)

NewFromURL creates an ExternalStorage from URL.

func NewWithDefaultOpt

func NewWithDefaultOpt(ctx context.Context, backend *backuppb.StorageBackend) (ExternalStorage, error)

NewWithDefaultOpt creates ExternalStorage with default options.

func WithCompression

func WithCompression(inner ExternalStorage, compressionType CompressType, cfg DecompressConfig) ExternalStorage

WithCompression returns an ExternalStorage with the given compression option.

type ExternalStorageOptions

type ExternalStorageOptions struct {
	// SendCredentials marks whether to send credentials downstream.
	//
	// This field should be set to false if the credentials are provided to
	// downstream via external key managers, e.g. on K8s or cloud provider.
	SendCredentials bool

	// NoCredentials means that no cloud credentials are supplied to BR
	NoCredentials bool

	// HTTPClient to use. The created storage may ignore this field if it is not
	// directly using HTTP (e.g. the local storage).
	// NOTICE: the HTTPClient is only used by s3/azure/gcs.
	// For GCS, we will use this as base client to init a client with credentials.
	HTTPClient *http.Client

	// CheckPermissions lists the permissions to check in the New() function,
	// to make sure we can access the storage correctly before executing tasks.
	CheckPermissions []Permission

	// S3Retryer is the retryer for create s3 storage, if it is nil,
	// defaultS3Retryer() will be used.
	S3Retryer request.Retryer

	// CheckS3ObjectLockOptions checks whether the S3 bucket has enabled ObjectLock.
	// If enabled, it will send the options to TiKV.
	CheckS3ObjectLockOptions bool
}

ExternalStorageOptions are backend-independent options provided to New.

type GCSBackendOptions

type GCSBackendOptions struct {
	Endpoint        string `json:"endpoint" toml:"endpoint"`
	StorageClass    string `json:"storage-class" toml:"storage-class"`
	PredefinedACL   string `json:"predefined-acl" toml:"predefined-acl"`
	CredentialsFile string `json:"credentials-file" toml:"credentials-file"`
}

GCSBackendOptions are options for configuring the GCS storage.

type GCSStorage

type GCSStorage struct {
	// contains filtered or unexported fields
}

GCSStorage defines some standard operations for BR/Lightning on the GCS storage. It implements the `ExternalStorage` interface.

func NewGCSStorage

func NewGCSStorage(ctx context.Context, gcs *backuppb.GCS, opts *ExternalStorageOptions) (*GCSStorage, error)

NewGCSStorage creates a GCS external storage implementation.

func (*GCSStorage) Close

func (s *GCSStorage) Close()

Close implements ExternalStorage interface.

func (*GCSStorage) Create

func (s *GCSStorage) Create(ctx context.Context, name string, wo *WriterOption) (ExternalFileWriter, error)

Create implements ExternalStorage interface.

func (*GCSStorage) DeleteFile

func (s *GCSStorage) DeleteFile(ctx context.Context, name string) error

DeleteFile deletes the file in storage.

func (*GCSStorage) DeleteFiles

func (s *GCSStorage) DeleteFiles(ctx context.Context, names []string) error

DeleteFiles deletes the files in storage.

func (*GCSStorage) FileExists

func (s *GCSStorage) FileExists(ctx context.Context, name string) (bool, error)

FileExists returns true if the file exists.

func (*GCSStorage) GetBucketHandle

func (s *GCSStorage) GetBucketHandle() *storage.BucketHandle

GetBucketHandle gets the handle to the GCS API on the bucket.

func (*GCSStorage) GetOptions

func (s *GCSStorage) GetOptions() *backuppb.GCS

GetOptions gets the external storage options for GCS.

func (*GCSStorage) MarkStrongConsistency

func (s *GCSStorage) MarkStrongConsistency()

func (*GCSStorage) Open

Open a Reader by file path.

func (*GCSStorage) ReadFile

func (s *GCSStorage) ReadFile(ctx context.Context, name string) ([]byte, error)

ReadFile reads the file from the storage and returns the contents.

func (*GCSStorage) Rename

func (s *GCSStorage) Rename(ctx context.Context, oldFileName, newFileName string) error

Rename file name from oldFileName to newFileName.

func (*GCSStorage) Reset

func (s *GCSStorage) Reset(ctx context.Context) error

Reset resets the GCS storage.

func (*GCSStorage) URI

func (s *GCSStorage) URI() string

func (*GCSStorage) WalkDir

func (s *GCSStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error

WalkDir traverses all the files in a dir.

fn is the function called for each regular file visited by WalkDir. The first argument is the file path that can be used in the `Open` function; the second argument is the size in bytes of the file determined by path.

func (*GCSStorage) WriteFile

func (s *GCSStorage) WriteFile(ctx context.Context, name string, data []byte) error

WriteFile writes data to a file to storage.

type GCSWriter

type GCSWriter struct {
	// contains filtered or unexported fields
}

GCSWriter uses the XML multipart upload API to upload a single file (https://cloud.google.com/storage/docs/multipart-uploads). GCSWriter will attempt to cancel uploads that fail due to an exception. If the upload fails in a way that precludes cancellation, such as a hardware failure, process termination, or power outage, then the incomplete upload may persist indefinitely. To mitigate this, set the `AbortIncompleteMultipartUpload` with a nonzero `Age` in bucket lifecycle rules, or refer to the XML API documentation linked above to learn more about how to list and delete individual uploads.

func NewGCSWriter

func NewGCSWriter(
	ctx context.Context,
	cli *storage.Client,
	uri string,
	partSize int64,
	parallelCnt int,
	bucketName string,
) (*GCSWriter, error)

NewGCSWriter returns a GCSWriter which uses the GCS multipart upload API behind the scenes.

func (*GCSWriter) Close

func (w *GCSWriter) Close() error

Close finishes the upload.

func (*GCSWriter) Write

func (w *GCSWriter) Write(p []byte) (n int, err error)

Write uploads the given bytes as a part to Google Cloud Storage. Write is not safe for concurrent use.

type HDFSStorage

type HDFSStorage struct {
	// contains filtered or unexported fields
}

HDFSStorage represents HDFS storage.

func NewHDFSStorage

func NewHDFSStorage(remote string) *HDFSStorage

NewHDFSStorage creates a new HDFS storage.

func (*HDFSStorage) Close

func (*HDFSStorage) Close()

Close implements ExternalStorage interface.

func (*HDFSStorage) Create

Create opens a file writer by path. path is relative to the storage base path.

func (*HDFSStorage) DeleteFile

func (*HDFSStorage) DeleteFile(_ context.Context, _ string) error

DeleteFile deletes the file in storage.

func (*HDFSStorage) DeleteFiles

func (*HDFSStorage) DeleteFiles(_ context.Context, _ []string) error

DeleteFiles deletes files in storage

func (*HDFSStorage) FileExists

func (s *HDFSStorage) FileExists(_ context.Context, name string) (bool, error)

FileExists returns true if the file exists.

func (*HDFSStorage) Open

Open opens a Reader by file path. path is relative to the storage base path.

func (*HDFSStorage) ReadFile

func (*HDFSStorage) ReadFile(_ context.Context, _ string) ([]byte, error)

ReadFile reads a complete file from storage, similar to os.ReadFile

func (*HDFSStorage) Rename

func (*HDFSStorage) Rename(_ context.Context, _, _ string) error

Rename a file name from oldFileName to newFileName.

func (*HDFSStorage) URI

func (s *HDFSStorage) URI() string

URI returns the base path as a URI

func (*HDFSStorage) WalkDir

func (*HDFSStorage) WalkDir(_ context.Context, _ *WalkOption, _ func(path string, size int64) error) error

WalkDir traverses all the files in a dir.

fn is the function called for each regular file visited by WalkDir. The argument `path` is the file path that can be used in the `Open` function; the argument `size` is the size in bytes of the file determined by path.

func (*HDFSStorage) WriteFile

func (s *HDFSStorage) WriteFile(_ context.Context, name string, data []byte) error

WriteFile writes a complete file to storage, similar to os.WriteFile

type InitiateMultipartUploadResult

type InitiateMultipartUploadResult struct {
	XMLName  xml.Name `xml:"InitiateMultipartUploadResult"`
	Text     string   `xml:",chardata"`
	Xmlns    string   `xml:"xmlns,attr"`
	Bucket   string   `xml:"Bucket"`
	Key      string   `xml:"Key"`
	UploadId string   `xml:"UploadId"`
}

type KS3Storage

type KS3Storage struct {
	// contains filtered or unexported fields
}

KS3Storage acts almost the same as S3Storage, except that it is used for Kingsoft S3.

func NewKS3Storage

func NewKS3Storage(
	ctx context.Context,
	backend *backuppb.S3,
	opts *ExternalStorageOptions,
) (obj *KS3Storage, errRet error)

NewKS3Storage initializes a new s3 storage for metadata.

func (*KS3Storage) Close

func (*KS3Storage) Close()

Close implements ExternalStorage interface.

func (*KS3Storage) Create

func (rs *KS3Storage) Create(ctx context.Context, name string, option *WriterOption) (ExternalFileWriter, error)

Create creates a multipart upload request.

func (*KS3Storage) DeleteFile

func (rs *KS3Storage) DeleteFile(ctx context.Context, file string) error

DeleteFile deletes the file in s3 storage.

func (*KS3Storage) DeleteFiles

func (rs *KS3Storage) DeleteFiles(ctx context.Context, files []string) error

DeleteFiles deletes the files in batch in s3 storage.

func (*KS3Storage) FileExists

func (rs *KS3Storage) FileExists(ctx context.Context, file string) (bool, error)

FileExists checks if the file exists on s3 storage.

func (*KS3Storage) Open

Open a Reader by file path.

func (*KS3Storage) ReadFile

func (rs *KS3Storage) ReadFile(ctx context.Context, file string) ([]byte, error)

ReadFile reads the file from the storage and returns the contents.

func (*KS3Storage) Rename

func (rs *KS3Storage) Rename(ctx context.Context, oldFileName, newFileName string) error

Rename implements ExternalStorage interface.

func (*KS3Storage) URI

func (rs *KS3Storage) URI() string

URI returns ks3://<base>/<prefix>.

func (*KS3Storage) WalkDir

func (rs *KS3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error

WalkDir traverses all the files in a dir.

fn is the function called for each regular file visited by WalkDir. The first argument is the file path that can be used in the `Open` function; the second argument is the size in bytes of the file determined by path.

func (*KS3Storage) WriteFile

func (rs *KS3Storage) WriteFile(ctx context.Context, file string, data []byte) error

WriteFile writes data to a file to storage.

type KS3Uploader

type KS3Uploader struct {
	// contains filtered or unexported fields
}

KS3Uploader does multipart upload to s3.

func (*KS3Uploader) Close

func (u *KS3Uploader) Close(ctx context.Context) error

Close completes the multipart upload request.

func (*KS3Uploader) Write

func (u *KS3Uploader) Write(ctx context.Context, data []byte) (int, error)

Write uploads partial data to s3 as one part of a multipart upload. CreateMultipartUpload must be called to start the upload, and CompleteMultipartUpload must be called to finish it.

type LocalStorage

type LocalStorage struct {

	// Whether to ignore ENOENT while deleting.
	// Not failing when deleting a nonexistent file is closer to how
	// a normal ExternalStorage implementation behaves.
	IgnoreEnoentForDelete bool
	// contains filtered or unexported fields
}

LocalStorage represents local file system storage.

Exported for use in tests.

func NewLocalStorage

func NewLocalStorage(base string) (*LocalStorage, error)

NewLocalStorage returns a LocalStorage at directory `base`.

Exported for tests.

func (*LocalStorage) Base

func (l *LocalStorage) Base() string

Base returns the base dir used by this local storage.

func (*LocalStorage) Close

func (*LocalStorage) Close()

Close implements ExternalStorage interface.

func (*LocalStorage) Create

Create implements ExternalStorage interface.

func (*LocalStorage) DeleteFile

func (l *LocalStorage) DeleteFile(_ context.Context, name string) error

DeleteFile deletes the file.

func (*LocalStorage) DeleteFiles

func (l *LocalStorage) DeleteFiles(ctx context.Context, names []string) error

DeleteFiles deletes the files.

func (*LocalStorage) FileExists

func (l *LocalStorage) FileExists(_ context.Context, name string) (bool, error)

FileExists implements ExternalStorage.FileExists.

func (*LocalStorage) Open

Open a Reader by file path, path is a relative path to base path.

func (*LocalStorage) ReadFile

func (l *LocalStorage) ReadFile(_ context.Context, name string) ([]byte, error)

ReadFile reads the file from the storage and returns the contents.

func (*LocalStorage) Rename

func (l *LocalStorage) Rename(_ context.Context, oldFileName, newFileName string) error

Rename implements ExternalStorage interface.

func (*LocalStorage) URI

func (l *LocalStorage) URI() string

URI returns the base path as a URI with a file:/// prefix.

func (*LocalStorage) WalkDir

func (l *LocalStorage) WalkDir(_ context.Context, opt *WalkOption, fn func(string, int64) error) error

WalkDir traverses all the files in a dir.

fn is the function called for each regular file visited by WalkDir. The first argument is the file path that can be used in the `Open` function; the second argument is the size in bytes of the file determined by path.

func (*LocalStorage) WriteFile

func (l *LocalStorage) WriteFile(_ context.Context, name string, data []byte) error

WriteFile writes data to a file to storage.

type LockMeta

type LockMeta struct {
	LockedAt   time.Time `json:"locked_at"`
	LockerHost string    `json:"locker_host"`
	LockerPID  int       `json:"locker_pid"`
	TxnID      []byte    `json:"txn_id"`
	Hint       string    `json:"hint"`
}

LockMeta is the meta information of a lock.

func MakeLockMeta

func MakeLockMeta(hint string) LockMeta

MakeLockMeta creates a LockMeta from the current node's metadata, including the current time, hostname, etc.

func (LockMeta) String

func (l LockMeta) String() string

type MemStorage

type MemStorage struct {
	// contains filtered or unexported fields
}

MemStorage represents an in-memory storage.

func NewMemStorage

func NewMemStorage() *MemStorage

NewMemStorage creates a new in-memory storage.

func (*MemStorage) Close

func (s *MemStorage) Close()

Close implements ExternalStorage interface.

func (*MemStorage) Create

Create creates a file and returns a writer to write data into. When the writer is closed, the data is stored in the file. It implements the `ExternalStorage` interface.

func (*MemStorage) DeleteFile

func (s *MemStorage) DeleteFile(ctx context.Context, name string) error

DeleteFile deletes the file in storage. It implements the `ExternalStorage` interface.

func (*MemStorage) DeleteFiles

func (s *MemStorage) DeleteFiles(ctx context.Context, names []string) error

DeleteFiles deletes the files in storage. It implements the `ExternalStorage` interface.

func (*MemStorage) FileExists

func (s *MemStorage) FileExists(ctx context.Context, name string) (bool, error)

FileExists returns true if the file exists. It implements the `ExternalStorage` interface.

func (*MemStorage) Open

func (s *MemStorage) Open(ctx context.Context, filePath string, o *ReaderOption) (ExternalFileReader, error)

Open opens a Reader by file path. It implements the `ExternalStorage` interface

func (*MemStorage) ReadFile

func (s *MemStorage) ReadFile(ctx context.Context, name string) ([]byte, error)

ReadFile reads the storage file. It implements the `ExternalStorage` interface

func (*MemStorage) Rename

func (s *MemStorage) Rename(ctx context.Context, oldFileName, newFileName string) error

Rename renames a file name to another file name. It implements the `ExternalStorage` interface

func (*MemStorage) URI

func (*MemStorage) URI() string

URI returns the URI of the storage.

func (*MemStorage) WalkDir

func (s *MemStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error

WalkDir traverses all the files in a dir. It implements the `ExternalStorage` interface.

func (*MemStorage) WriteFile

func (s *MemStorage) WriteFile(ctx context.Context, name string, data []byte) error

WriteFile writes a file to storage. It implements the `ExternalStorage` interface.

type NoopWriter

type NoopWriter struct{}

NoopWriter is a writer that does nothing.

func (NoopWriter) Close

func (NoopWriter) Close(_ context.Context) error

func (NoopWriter) Write

func (NoopWriter) Write(_ context.Context, p []byte) (int, error)

type Part

type Part struct {
	Text       string `xml:",chardata"`
	PartNumber int    `xml:"PartNumber"`
	ETag       string `xml:"ETag"`
}

type Permission

type Permission string

Permission represents a permission that needs to be checked when creating a storage.

type RangeInfo

type RangeInfo struct {
	// Start is the absolute position of the first byte of the byte range,
	// starting from 0.
	Start int64
	// End is the absolute position of the last byte of the byte range. This end
	// offset is inclusive, e.g. if the Size is 1000, the maximum value of End
	// would be 999.
	End int64
	// Size is the total size of the original file.
	Size int64
}

RangeInfo represents an HTTP Content-Range header value of the form `bytes [Start]-[End]/[Size]`.

func ParseRangeInfo

func ParseRangeInfo(info *string) (ri RangeInfo, err error)

ParseRangeInfo parses the Content-Range header and returns the offsets.
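As a rough illustration of the `bytes [Start]-[End]/[Size]` form, the sketch below parses such a value with the standard library only. The names `rangeInfo` and `parseContentRange` are hypothetical stand-ins and are not the package's own implementation of `ParseRangeInfo`.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// rangeInfo mirrors the fields of RangeInfo for illustration only.
type rangeInfo struct {
	Start, End, Size int64
}

// parseContentRange is a hypothetical helper, not the package's ParseRangeInfo.
// It parses a value of the form "bytes <Start>-<End>/<Size>".
func parseContentRange(v string) (rangeInfo, error) {
	const prefix = "bytes "
	if !strings.HasPrefix(v, prefix) {
		return rangeInfo{}, fmt.Errorf("unsupported unit in %q", v)
	}
	span, size, ok := strings.Cut(strings.TrimPrefix(v, prefix), "/")
	if !ok {
		return rangeInfo{}, fmt.Errorf("missing size in %q", v)
	}
	first, last, ok := strings.Cut(span, "-")
	if !ok {
		return rangeInfo{}, fmt.Errorf("missing range in %q", v)
	}
	var ri rangeInfo
	var err error
	if ri.Start, err = strconv.ParseInt(first, 10, 64); err != nil {
		return rangeInfo{}, err
	}
	if ri.End, err = strconv.ParseInt(last, 10, 64); err != nil {
		return rangeInfo{}, err
	}
	if ri.Size, err = strconv.ParseInt(size, 10, 64); err != nil {
		return rangeInfo{}, err
	}
	return ri, nil
}

func main() {
	ri, err := parseContentRange("bytes 0-499/1000")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// End is inclusive, so the range covers End-Start+1 bytes.
	fmt.Println(ri.Start, ri.End, ri.Size, ri.End-ri.Start+1)
}
```

Because `End` is inclusive, the number of bytes in the range is `End - Start + 1`.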

type ReadSeekCloser

type ReadSeekCloser interface {
	io.Reader
	io.Seeker
	io.Closer
}

ReadSeekCloser is the interface that groups the basic Read, Seek and Close methods.

type ReaderOption

type ReaderOption struct {
	// StartOffset is inclusive. And it's incompatible with Seek.
	StartOffset *int64
	// EndOffset is exclusive. And it's incompatible with Seek.
	EndOffset *int64
	// PrefetchSize will switch to NewPrefetchReader if value is positive.
	PrefetchSize int
}
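The offset conventions above (inclusive start, exclusive end) differ from an HTTP `Range` header, whose last-byte position is inclusive. The following sketch shows one way the conversion could look. `readerOption` and `httpRange` are hypothetical names for illustration, and the code assumes that `EndOffset`, when set, is greater than the start.

```go
package main

import "fmt"

// readerOption mirrors the offset fields of ReaderOption for illustration.
type readerOption struct {
	StartOffset *int64 // inclusive
	EndOffset   *int64 // exclusive
}

// httpRange is a hypothetical helper that renders the offsets as an HTTP
// Range header value. It assumes EndOffset, when set, exceeds the start.
func httpRange(o readerOption) string {
	switch {
	case o.StartOffset != nil && o.EndOffset != nil:
		// Convert the exclusive end into an inclusive last-byte position.
		return fmt.Sprintf("bytes=%d-%d", *o.StartOffset, *o.EndOffset-1)
	case o.StartOffset != nil:
		return fmt.Sprintf("bytes=%d-", *o.StartOffset)
	case o.EndOffset != nil:
		return fmt.Sprintf("bytes=0-%d", *o.EndOffset-1)
	default:
		return "" // no range: read the whole object
	}
}

func main() {
	start, end := int64(100), int64(200)
	fmt.Println(httpRange(readerOption{StartOffset: &start, EndOffset: &end}))
	fmt.Println(httpRange(readerOption{StartOffset: &start}))
}
```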

type RemoteLock

type RemoteLock struct {
	// contains filtered or unexported fields
}

func TryLockRemote

func TryLockRemote(ctx context.Context, storage ExternalStorage, path, hint string) (lock RemoteLock, err error)

TryLockRemote tries to create a "lock file" at the external storage. On success, a file is created at the provided path, so other processes may not access the file. It returns `ErrLocked` if another process has already created the lock file. This is not a strict lock like flock on Linux: the lock can be forcibly removed by manually deleting the "lock file" in external storage.

func TryLockRemoteRead

func TryLockRemoteRead(ctx context.Context, storage ExternalStorage, path, hint string) (lock RemoteLock, err error)

func TryLockRemoteWrite

func TryLockRemoteWrite(ctx context.Context, storage ExternalStorage, path, hint string) (lock RemoteLock, err error)

func (RemoteLock) Unlock

func (l RemoteLock) Unlock(ctx context.Context) error

Unlock removes the lock file at the specified path. Removing that file will release the lock.

type S3BackendOptions

type S3BackendOptions struct {
	Endpoint              string `json:"endpoint" toml:"endpoint"`
	Region                string `json:"region" toml:"region"`
	StorageClass          string `json:"storage-class" toml:"storage-class"`
	Sse                   string `json:"sse" toml:"sse"`
	SseKmsKeyID           string `json:"sse-kms-key-id" toml:"sse-kms-key-id"`
	ACL                   string `json:"acl" toml:"acl"`
	AccessKey             string `json:"access-key" toml:"access-key"`
	SecretAccessKey       string `json:"secret-access-key" toml:"secret-access-key"`
	SessionToken          string `json:"session-token" toml:"session-token"`
	Provider              string `json:"provider" toml:"provider"`
	ForcePathStyle        bool   `json:"force-path-style" toml:"force-path-style"`
	UseAccelerateEndpoint bool   `json:"use-accelerate-endpoint" toml:"use-accelerate-endpoint"`
	RoleARN               string `json:"role-arn" toml:"role-arn"`
	ExternalID            string `json:"external-id" toml:"external-id"`
	ObjectLockEnabled     bool   `json:"object-lock-enabled" toml:"object-lock-enabled"`
}

S3BackendOptions contains options for s3 storage.

func (*S3BackendOptions) Apply

func (options *S3BackendOptions) Apply(s3 *backuppb.S3) error

Apply applies the S3 options to backuppb.S3.

type S3Storage

type S3Storage struct {
	// contains filtered or unexported fields
}

S3Storage defines some standard operations for BR/Lightning on the S3 storage. It implements the `ExternalStorage` interface.

func NewS3Storage

func NewS3Storage(ctx context.Context, backend *backuppb.S3, opts *ExternalStorageOptions) (obj *S3Storage, errRet error)

NewS3Storage initializes a new S3 storage for metadata.

func NewS3StorageForTest

func NewS3StorageForTest(svc s3iface.S3API, options *backuppb.S3) *S3Storage

NewS3StorageForTest creates a new S3Storage for testing only.

func (*S3Storage) Close

func (*S3Storage) Close()

Close implements ExternalStorage interface.

func (*S3Storage) Create

func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOption) (ExternalFileWriter, error)

Create creates a multipart upload request.

func (*S3Storage) DeleteFile

func (rs *S3Storage) DeleteFile(ctx context.Context, file string) error

DeleteFile deletes the file in S3 storage.

func (*S3Storage) DeleteFiles

func (rs *S3Storage) DeleteFiles(ctx context.Context, files []string) error

DeleteFiles deletes the files in batch in S3 storage.

func (*S3Storage) FileExists

func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error)

FileExists checks whether the file exists on S3 storage.

func (*S3Storage) GetOptions

func (rs *S3Storage) GetOptions() *backuppb.S3

GetOptions gets the external storage options for S3.

func (*S3Storage) GetS3APIHandle

func (rs *S3Storage) GetS3APIHandle() s3iface.S3API

GetS3APIHandle gets the handle to the S3 API.

func (*S3Storage) IsObjectLockEnabled

func (rs *S3Storage) IsObjectLockEnabled() bool

func (*S3Storage) MarkStrongConsistency

func (*S3Storage) MarkStrongConsistency()

func (*S3Storage) Open

Open opens a Reader by file path.

func (*S3Storage) ReadFile

func (rs *S3Storage) ReadFile(ctx context.Context, file string) ([]byte, error)

ReadFile reads the file from the storage and returns the contents.

func (*S3Storage) Rename

func (rs *S3Storage) Rename(ctx context.Context, oldFileName, newFileName string) error

Rename implements ExternalStorage interface.

func (*S3Storage) URI

func (rs *S3Storage) URI() string

URI returns s3://<base>/<prefix>.

func (*S3Storage) WalkDir

func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error

WalkDir traverses all the files in a dir.

fn is the function called for each regular file visited by WalkDir. The first argument is the file path, which can be passed to the `Open` function; the second argument is the size in bytes of the file at that path.

func (*S3Storage) WriteFile

func (rs *S3Storage) WriteFile(ctx context.Context, file string, data []byte) error

WriteFile writes data to a file in storage.

type S3Uploader

type S3Uploader struct {
	// contains filtered or unexported fields
}

S3Uploader does multi-part upload to s3.

func (*S3Uploader) Close

func (u *S3Uploader) Close(ctx context.Context) error

Close completes the multipart upload request.

func (*S3Uploader) Write

func (u *S3Uploader) Write(ctx context.Context, data []byte) (int, error)

Write uploads partial data to S3. CreateMultipartUpload should be called to start the upload, and CompleteMultipartUpload should be called to finish it.

type StrongConsisency

type StrongConsisency interface {
	MarkStrongConsistency()
}

StrongConsistency is a marker interface that indicates the storage is strongly consistent over its `Read`, `Write` and `WalkDir` APIs.
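A marker interface like this is usually detected with a type assertion. The sketch below illustrates the pattern with hypothetical types (`strongStore`, `eventualStore`) and a locally defined copy of the interface; it makes no claim about which real backends carry the marker.

```go
package main

import "fmt"

// strongConsistency mirrors the marker interface for illustration.
type strongConsistency interface {
	MarkStrongConsistency()
}

// eventualStore and strongStore are hypothetical backends.
type eventualStore struct{}
type strongStore struct{}

// MarkStrongConsistency has an empty body: it exists only to tag the type.
func (strongStore) MarkStrongConsistency() {}

// isStronglyConsistent reports whether the value carries the marker.
func isStronglyConsistent(s any) bool {
	_, ok := s.(strongConsistency)
	return ok
}

func main() {
	fmt.Println(isStronglyConsistent(strongStore{}))   // true
	fmt.Println(isStronglyConsistent(eventualStore{})) // false
}
```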

type Uploader

type Uploader interface {
	// UploadPart uploads part of the file data to storage
	UploadPart(ctx context.Context, data []byte) error
	// CompleteUpload assembles the uploaded data into a complete file
	CompleteUpload(ctx context.Context) error
}

Uploader uploads a file in chunks.

type VerifyWriteContext

type VerifyWriteContext struct {
	context.Context
	Target  string
	Storage ExternalStorage
	TxnID   uuid.UUID
}

func (*VerifyWriteContext) IntentFileName

func (cx *VerifyWriteContext) IntentFileName() string

type WalkOption

type WalkOption struct {
	// walk on SubDir of base directory, i.e. if the base dir is '/path/to/base'
	// then we're walking '/path/to/base/<SubDir>'
	SubDir string
	// whether subdirectory under the walk dir is skipped, only works for LOCAL storage now.
	// default is false, i.e. we walk recursively.
	SkipSubDir bool
	// ObjPrefix is used for prefix search in storage. Note that only some
	// storage backends support it.
	// It can save a lot of time when looking for objects with a specific prefix.
	// For example, given 10000 <Hash>.sst files and 10 backupmeta.(\d+) files,
	// setting ObjPrefix = "backupmeta" retrieves all meta files quickly.
	ObjPrefix string
	// ListCount is the number of entries per page.
	//
	// In cloud storages such as S3 and GCS, the files are listed and sent in pages.
	// Typically a page contains 1000 files, and if a folder has 3000 descendant
	// files, one would need 3 requests to retrieve all of them. This parameter
	// controls this size. Note that both S3 and GCS limit the maximum to 1000.
	//
	// Typically you want to leave this field unassigned (zero) to use the
	// default value (1000) to minimize the number of requests, unless you want
	// to reduce the possibility of timeout on an extremely slow connection, or
	// perform testing.
	ListCount int64
	// IncludeTombstone allows `Walk` to emit removed files during walking.
	//
	// In most cases, `Walk` runs over a snapshot; if a file in the snapshot
	// is deleted during walking, the file is ignored. Setting this to `true`
	// sends such files to the callback as well.
	//
	// The size of a deleted file should be `TombstoneSize`.
	IncludeTombstone bool
}

WalkOption is the option of storage.WalkDir.
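The paging arithmetic described for `ListCount` can be checked with a small calculation. `pagesNeeded` is a hypothetical helper; treating values above 1000 as 1000 is an assumption made here for illustration, based on the stated S3 and GCS limit.

```go
package main

import "fmt"

// pagesNeeded is a hypothetical helper that estimates how many list
// requests are needed to enumerate the given number of files.
func pagesNeeded(files, listCount int64) int64 {
	const maxPage = 1000 // per-page cap stated for S3 and GCS
	// Zero means "use the default"; clamping larger values is an assumption.
	if listCount <= 0 || listCount > maxPage {
		listCount = maxPage
	}
	// Ceiling division: a partially filled last page still costs a request.
	return (files + listCount - 1) / listCount
}

func main() {
	fmt.Println(pagesNeeded(3000, 0))   // default page size
	fmt.Println(pagesNeeded(3000, 500)) // smaller pages, more requests
}
```

With the default page size, 3000 files take 3 requests; halving the page size to 500 doubles that to 6.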

type Writer

type Writer interface {
	// Write writes to buffer and if chunk is filled will upload it
	Write(ctx context.Context, p []byte) (int, error)
	// Close writes final chunk and completes the upload
	Close(ctx context.Context) error
}

Writer is like io.Writer but with a Context. To create a new writer on top of an Uploader, use NewUploaderWriter.

type WriterOption

type WriterOption struct {
	Concurrency int
	PartSize    int64
}
