Documentation ¶
Index ¶
- Constants
- Variables
- func CloneDefaultHttpTransport() (*http.Transport, bool)
- func DefineFlags(flags *pflag.FlagSet)
- func ExtractQueryParameters(u *url.URL, options any)
- func FormatBackendURL(backend *backuppb.StorageBackend) (u url.URL)
- func GetActiveUploadWorkerCount() int64
- func GetDefaultHttpClient(concurrency uint) *http.Client
- func HiddenFlagsForStream(flags *pflag.FlagSet)
- func IsLocal(u *url.URL) bool
- func IsLocalPath(p string) (bool, error)
- func JSONEffects(es []Effect, output io.Writer) error
- func ParseBackend(rawURL string, options *BackendOptions) (*backuppb.StorageBackend, error)
- func ParseBackendFromURL(u *url.URL, options *BackendOptions) (*backuppb.StorageBackend, error)
- func ParseRawURL(rawURL string) (*url.URL, error)
- func PutAndDeleteObjectCheck(ctx context.Context, svc s3iface.S3API, options *backuppb.S3) (err error)
- func ReadDataInRange(ctx context.Context, storage ExternalStorage, name string, start int64, ...) (n int, err error)
- func SaveJSONEffectsToTmp(es []Effect) (string, error)
- func UnmarshalDir[T any](ctx context.Context, walkOpt *WalkOption, s ExternalStorage, ...) iter.TryNextor[*T]
- func ValidateCloudStorageURI(ctx context.Context, uri string) error
- type AzblobBackendOptions
- type AzureBlobStorage
- func (*AzureBlobStorage) Close()
- func (s *AzureBlobStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error)
- func (s *AzureBlobStorage) DeleteFile(ctx context.Context, name string) error
- func (s *AzureBlobStorage) DeleteFiles(ctx context.Context, names []string) error
- func (s *AzureBlobStorage) FileExists(ctx context.Context, name string) (bool, error)
- func (*AzureBlobStorage) MarkStrongConsistency()
- func (s *AzureBlobStorage) Open(ctx context.Context, name string, o *ReaderOption) (ExternalFileReader, error)
- func (s *AzureBlobStorage) ReadFile(ctx context.Context, name string) ([]byte, error)
- func (s *AzureBlobStorage) Rename(ctx context.Context, oldFileName, newFileName string) error
- func (s *AzureBlobStorage) URI() string
- func (s *AzureBlobStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(path string, size int64) error) error
- func (s *AzureBlobStorage) WriteFile(ctx context.Context, name string, data []byte) error
- type BackendOptions
- type Batched
- func (d *Batched) CleanEffects()
- func (d *Batched) Commit(ctx context.Context) error
- func (d *Batched) Create(ctx context.Context, path string, option *WriterOption) (ExternalFileWriter, error)
- func (d *Batched) DeleteFile(ctx context.Context, name string) error
- func (d *Batched) DeleteFiles(ctx context.Context, names []string) error
- func (d *Batched) ReadOnlyEffects() []Effect
- func (d *Batched) Rename(ctx context.Context, oldName, newName string) error
- func (d *Batched) WriteFile(ctx context.Context, name string, data []byte) error
- type BytesWriter
- type ClientBuilder
- type CompleteMultipartUpload
- type CompressType
- type Copier
- type CopySpec
- type DecompressConfig
- type EffDeleteFile
- type EffDeleteFiles
- type EffPut
- type EffRename
- type Effect
- type ErrLocked
- type ExternalFileReader
- type ExternalFileWriter
- type ExternalStorage
- func Create(ctx context.Context, backend *backuppb.StorageBackend, sendCreds bool) (ExternalStorage, error)
- func New(ctx context.Context, backend *backuppb.StorageBackend, ...) (ExternalStorage, error)
- func NewFromURL(ctx context.Context, uri string) (ExternalStorage, error)
- func NewWithDefaultOpt(ctx context.Context, backend *backuppb.StorageBackend) (ExternalStorage, error)
- func WithCompression(inner ExternalStorage, compressionType CompressType, cfg DecompressConfig) ExternalStorage
- type ExternalStorageOptions
- type GCSBackendOptions
- type GCSStorage
- func (s *GCSStorage) Close()
- func (s *GCSStorage) Create(ctx context.Context, name string, wo *WriterOption) (ExternalFileWriter, error)
- func (s *GCSStorage) DeleteFile(ctx context.Context, name string) error
- func (s *GCSStorage) DeleteFiles(ctx context.Context, names []string) error
- func (s *GCSStorage) FileExists(ctx context.Context, name string) (bool, error)
- func (s *GCSStorage) GetBucketHandle() *storage.BucketHandle
- func (s *GCSStorage) GetOptions() *backuppb.GCS
- func (s *GCSStorage) MarkStrongConsistency()
- func (s *GCSStorage) Open(ctx context.Context, path string, o *ReaderOption) (ExternalFileReader, error)
- func (s *GCSStorage) ReadFile(ctx context.Context, name string) ([]byte, error)
- func (s *GCSStorage) Rename(ctx context.Context, oldFileName, newFileName string) error
- func (s *GCSStorage) Reset(ctx context.Context) error
- func (s *GCSStorage) URI() string
- func (s *GCSStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error
- func (s *GCSStorage) WriteFile(ctx context.Context, name string, data []byte) error
- type GCSWriter
- type HDFSStorage
- func (*HDFSStorage) Close()
- func (*HDFSStorage) Create(_ context.Context, _ string, _ *WriterOption) (ExternalFileWriter, error)
- func (*HDFSStorage) DeleteFile(_ context.Context, _ string) error
- func (*HDFSStorage) DeleteFiles(_ context.Context, _ []string) error
- func (s *HDFSStorage) FileExists(_ context.Context, name string) (bool, error)
- func (*HDFSStorage) Open(_ context.Context, _ string, _ *ReaderOption) (ExternalFileReader, error)
- func (*HDFSStorage) ReadFile(_ context.Context, _ string) ([]byte, error)
- func (*HDFSStorage) Rename(_ context.Context, _, _ string) error
- func (s *HDFSStorage) URI() string
- func (*HDFSStorage) WalkDir(_ context.Context, _ *WalkOption, _ func(path string, size int64) error) error
- func (s *HDFSStorage) WriteFile(_ context.Context, name string, data []byte) error
- type InitiateMultipartUploadResult
- type KS3Storage
- func (*KS3Storage) Close()
- func (rs *KS3Storage) CopyFrom(ctx context.Context, e ExternalStorage, spec CopySpec) error
- func (rs *KS3Storage) Create(ctx context.Context, name string, option *WriterOption) (ExternalFileWriter, error)
- func (rs *KS3Storage) DeleteFile(ctx context.Context, file string) error
- func (rs *KS3Storage) DeleteFiles(ctx context.Context, files []string) error
- func (rs *KS3Storage) FileExists(ctx context.Context, file string) (bool, error)
- func (rs *KS3Storage) Open(ctx context.Context, path string, o *ReaderOption) (ExternalFileReader, error)
- func (rs *KS3Storage) ReadFile(ctx context.Context, file string) ([]byte, error)
- func (rs *KS3Storage) Rename(ctx context.Context, oldFileName, newFileName string) error
- func (rs *KS3Storage) URI() string
- func (rs *KS3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error
- func (rs *KS3Storage) WriteFile(ctx context.Context, file string, data []byte) error
- type KS3Uploader
- type LocalStorage
- func (l *LocalStorage) Base() string
- func (*LocalStorage) Close()
- func (l *LocalStorage) CopyFrom(ctx context.Context, e ExternalStorage, spec CopySpec) error
- func (l *LocalStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error)
- func (l *LocalStorage) DeleteFile(_ context.Context, name string) error
- func (l *LocalStorage) DeleteFiles(ctx context.Context, names []string) error
- func (l *LocalStorage) FileExists(_ context.Context, name string) (bool, error)
- func (l *LocalStorage) Open(_ context.Context, path string, o *ReaderOption) (ExternalFileReader, error)
- func (l *LocalStorage) ReadFile(_ context.Context, name string) ([]byte, error)
- func (l *LocalStorage) Rename(_ context.Context, oldFileName, newFileName string) error
- func (l *LocalStorage) URI() string
- func (l *LocalStorage) WalkDir(_ context.Context, opt *WalkOption, fn func(string, int64) error) error
- func (l *LocalStorage) WriteFile(_ context.Context, name string, data []byte) error
- type LockMeta
- type Locker
- type MemStorage
- func (s *MemStorage) Close()
- func (s *MemStorage) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error)
- func (s *MemStorage) DeleteFile(ctx context.Context, name string) error
- func (s *MemStorage) DeleteFiles(ctx context.Context, names []string) error
- func (s *MemStorage) FileExists(ctx context.Context, name string) (bool, error)
- func (s *MemStorage) Open(ctx context.Context, filePath string, o *ReaderOption) (ExternalFileReader, error)
- func (s *MemStorage) ReadFile(ctx context.Context, name string) ([]byte, error)
- func (s *MemStorage) Rename(ctx context.Context, oldFileName, newFileName string) error
- func (*MemStorage) URI() string
- func (s *MemStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error
- func (s *MemStorage) WriteFile(ctx context.Context, name string, data []byte) error
- type NoopWriter
- type Part
- type Permission
- type RangeInfo
- type ReadSeekCloser
- type ReaderOption
- type RemoteLock
- func LockWith(ctx context.Context, locker Locker, storage ExternalStorage, path, hint string) (lock RemoteLock, err error)
- func TryLockRemote(ctx context.Context, storage ExternalStorage, path, hint string) (lock RemoteLock, err error)
- func TryLockRemoteRead(ctx context.Context, storage ExternalStorage, path, hint string) (lock RemoteLock, err error)
- func TryLockRemoteWrite(ctx context.Context, storage ExternalStorage, path, hint string) (lock RemoteLock, err error)
- type S3BackendOptions
- type S3Storage
- func (*S3Storage) Close()
- func (rs *S3Storage) CopyFrom(ctx context.Context, e ExternalStorage, spec CopySpec) error
- func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOption) (ExternalFileWriter, error)
- func (rs *S3Storage) DeleteFile(ctx context.Context, file string) error
- func (rs *S3Storage) DeleteFiles(ctx context.Context, files []string) error
- func (rs *S3Storage) FileExists(ctx context.Context, file string) (bool, error)
- func (rs *S3Storage) GetOptions() *backuppb.S3
- func (rs *S3Storage) GetS3APIHandle() s3iface.S3API
- func (rs *S3Storage) IsObjectLockEnabled() bool
- func (*S3Storage) MarkStrongConsistency()
- func (rs *S3Storage) Open(ctx context.Context, path string, o *ReaderOption) (ExternalFileReader, error)
- func (rs *S3Storage) ReadFile(ctx context.Context, file string) ([]byte, error)
- func (rs *S3Storage) Rename(ctx context.Context, oldFileName, newFileName string) error
- func (rs *S3Storage) URI() string
- func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error
- func (rs *S3Storage) WriteFile(ctx context.Context, file string, data []byte) error
- type S3Uploader
- type StrongConsisency
- type Uploader
- type VerifyWriteContext
- type WalkOption
- type Writer
- type WriterOption
Constants ¶
const ( // AccessBuckets represents bucket access permission // it replace the origin skip-check-path. AccessBuckets Permission = "AccessBucket" // ListObjects represents listObjects permission ListObjects Permission = "ListObjects" // GetObject represents GetObject permission GetObject Permission = "GetObject" // PutObject represents PutObject permission PutObject Permission = "PutObject" // PutAndDeleteObject represents PutAndDeleteObject permission // we cannot check DeleteObject permission alone, so we use PutAndDeleteObject instead. PutAndDeleteObject Permission = "PutAndDeleteObject" DefaultRequestConcurrency uint = 128 TombstoneSize int64 = -1 )
const (
// LocalURIPrefix represents the local storage prefix.
LocalURIPrefix = "file://"
)
Variables ¶
var WriteBufferSize = 5 * 1024 * 1024
WriteBufferSize is the size of the buffer used for writing. (64K may be a better choice)
Functions ¶
func DefineFlags ¶
DefineFlags adds flags to the flag set corresponding to all backend options.
func ExtractQueryParameters ¶
ExtractQueryParameters moves the query parameters of the URL into the options using reflection.
The options must be a pointer to a struct which contains only string or bool fields (more types will be supported in the future), and tagged for JSON serialization.
All of the URL's query parameters will be removed after calling this method.
func FormatBackendURL ¶
func FormatBackendURL(backend *backuppb.StorageBackend) (u url.URL)
FormatBackendURL obtains the raw URL which can be used to reconstruct the backend. The returned URL does not contain options for further configuring the backend. This is to avoid exposing secret tokens.
func GetActiveUploadWorkerCount ¶
func GetActiveUploadWorkerCount() int64
GetActiveUploadWorkerCount returns the active upload worker count.
func GetDefaultHttpClient ¶
Different from `http.DefaultTransport`, set the `MaxIdleConns` and `MaxIdleConnsPerHost` to the actual request concurrency to reuse tcp connection as much as possible.
func HiddenFlagsForStream ¶
HiddenFlagsForStream hides flags for the stream cmd.
func IsLocalPath ¶
IsLocalPath returns true if the path is a local file path.
func JSONEffects ¶
JSONEffects converts a slice of effects into JSON. The JSON will be a tagged union: `{"type": $go_type_name, "effect": $effect}`
func ParseBackend ¶
func ParseBackend(rawURL string, options *BackendOptions) (*backuppb.StorageBackend, error)
ParseBackend constructs a structured backend description from the storage URL.
func ParseBackendFromURL ¶
func ParseBackendFromURL(u *url.URL, options *BackendOptions) (*backuppb.StorageBackend, error)
ParseBackendFromURL constructs a structured backend description from the *url.URL.
func ParseRawURL ¶
ParseRawURL parses the raw URL string into a URL object.
func PutAndDeleteObjectCheck ¶
func PutAndDeleteObjectCheck(ctx context.Context, svc s3iface.S3API, options *backuppb.S3) (err error)
PutAndDeleteObjectCheck checks the permission of putObject. The S3 API doesn't provide a way to check the permission, so we have to put an object to check the permission. Exported for testing.
func ReadDataInRange ¶
func ReadDataInRange( ctx context.Context, storage ExternalStorage, name string, start int64, p []byte, ) (n int, err error)
ReadDataInRange reads data from storage in range [start, start+len(p)).
func SaveJSONEffectsToTmp ¶
func UnmarshalDir ¶
func UnmarshalDir[T any](ctx context.Context, walkOpt *WalkOption, s ExternalStorage, unmarshal func(target *T, name string, content []byte) error) iter.TryNextor[*T]
UnmarshalDir iterates over a prefix and "unmarshals" the content of each file it meets with the unmarshal function, returning an iterator that yields the unmarshaled content. The "unmarshal" function should put the result of unmarshalling into the `target` argument.
Types ¶
type AzblobBackendOptions ¶
type AzblobBackendOptions struct { Endpoint string `json:"endpoint" toml:"endpoint"` AccountName string `json:"account-name" toml:"account-name"` AccountKey string `json:"account-key" toml:"account-key"` AccessTier string `json:"access-tier" toml:"access-tier"` SASToken string `json:"sas-token" toml:"sas-token"` EncryptionScope string `json:"encryption-scope" toml:"encryption-scope"` EncryptionKey string `json:"encryption-key" toml:"encryption-key"` }
AzblobBackendOptions is the options for Azure Blob storage.
type AzureBlobStorage ¶
type AzureBlobStorage struct {
// contains filtered or unexported fields
}
AzureBlobStorage is a storage engine that stores data in Azure Blob Storage.
func (*AzureBlobStorage) Close ¶
func (*AzureBlobStorage) Close()
Close implements the ExternalStorage interface.
func (*AzureBlobStorage) Create ¶
func (s *AzureBlobStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error)
Create implements the StorageWriter interface.
func (*AzureBlobStorage) DeleteFile ¶
func (s *AzureBlobStorage) DeleteFile(ctx context.Context, name string) error
DeleteFile deletes the file with the given name.
func (*AzureBlobStorage) DeleteFiles ¶
func (s *AzureBlobStorage) DeleteFiles(ctx context.Context, names []string) error
DeleteFiles deletes the files with the given names.
func (*AzureBlobStorage) FileExists ¶
FileExists checks if a file exists in Azure Blob Storage.
func (*AzureBlobStorage) MarkStrongConsistency ¶
func (*AzureBlobStorage) MarkStrongConsistency()
func (*AzureBlobStorage) Open ¶
func (s *AzureBlobStorage) Open(ctx context.Context, name string, o *ReaderOption) (ExternalFileReader, error)
Open implements the StorageReader interface.
func (*AzureBlobStorage) Rename ¶
func (s *AzureBlobStorage) Rename(ctx context.Context, oldFileName, newFileName string) error
Rename implements the StorageWriter interface.
func (*AzureBlobStorage) URI ¶
func (s *AzureBlobStorage) URI() string
URI implements the StorageReader interface.
func (*AzureBlobStorage) WalkDir ¶
func (s *AzureBlobStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(path string, size int64) error) error
WalkDir implements the StorageReader interface.
type BackendOptions ¶
type BackendOptions struct { S3 S3BackendOptions `json:"s3" toml:"s3"` GCS GCSBackendOptions `json:"gcs" toml:"gcs"` Azblob AzblobBackendOptions `json:"azblob" toml:"azblob"` }
BackendOptions further configures the storage backend not expressed by the storage URL.
func (*BackendOptions) ParseFromFlags ¶
func (options *BackendOptions) ParseFromFlags(flags *pflag.FlagSet) error
ParseFromFlags obtains the backend options from the flag set.
type Batched ¶
type Batched struct { ExternalStorage // contains filtered or unexported fields }
Batched is a wrapper of an external storage that suspends all write operations ("effects"). If `Close()` is called without calling `Commit()`, nothing will happen in the underlying external storage. In that case, we have done a "dry run".
You may use `ReadOnlyEffects()` to get the history of the effects. But don't modify the returned slice!
You may use `Commit()` to execute all suspended effects.
func Batch ¶
func Batch(s ExternalStorage) *Batched
Batch wraps an external storage instance to a batched version.
func (*Batched) CleanEffects ¶
func (d *Batched) CleanEffects()
CleanEffects cleans all suspended effects.
func (*Batched) Commit ¶
Commit performs all effects recorded so far in the REAL external storage. This will clean up all of the suspended effects.
func (*Batched) Create ¶
func (d *Batched) Create(ctx context.Context, path string, option *WriterOption) (ExternalFileWriter, error)
func (*Batched) DeleteFiles ¶
func (*Batched) ReadOnlyEffects ¶
ReadOnlyEffects fetches all effects from the batched storage.
**The returned slice should not be modified.**
type BytesWriter ¶
type BytesWriter struct {
// contains filtered or unexported fields
}
BytesWriter is a Writer implementation on top of bytes.Buffer that is useful for testing.
func NewBufferWriter ¶
func NewBufferWriter() *BytesWriter
NewBufferWriter creates a Writer that simply writes to a buffer (useful for testing).
func (*BytesWriter) Close ¶
func (*BytesWriter) Close(_ context.Context) error
Close delegates to bytes.Buffer.
func (*BytesWriter) String ¶
func (u *BytesWriter) String() string
String delegates to bytes.Buffer.
type ClientBuilder ¶
type ClientBuilder interface { // Example of serviceURL: https://<your_storage_account>.blob.core.windows.net GetServiceClient() (*azblob.Client, error) GetAccountName() string }
ClientBuilder provides common method to build a service client.
type CompleteMultipartUpload ¶
type CompressType ¶
type CompressType uint8
CompressType represents the type of compression.
const ( // NoCompression won't compress given bytes. NoCompression CompressType = iota // Gzip will compress given bytes in gzip format. Gzip // Snappy will compress given bytes in snappy format. Snappy // Zstd will compress given bytes in zstd format. Zstd )
type Copier ¶
type Copier interface { // CopyFrom copies a object to the current external storage by the specification. CopyFrom(ctx context.Context, e ExternalStorage, spec CopySpec) error }
type DecompressConfig ¶
type DecompressConfig struct { // ZStdDecodeConcurrency only used for ZStd decompress, see WithDecoderConcurrency. // if not 1, ZStd will decode file asynchronously. ZStdDecodeConcurrency int }
DecompressConfig is the config used for decompression.
type EffDeleteFile ¶
type EffDeleteFile string
EffDeleteFile is the side effect of a call to `DeleteFile`.
type EffDeleteFiles ¶
type EffDeleteFiles struct {
Files []string `json:"files"`
}
EffDeleteFiles is the side effect of a call to `DeleteFiles`.
type ErrLocked ¶
type ErrLocked struct {
Meta LockMeta
}
ErrLocked is the error returned when the lock is held by others.
type ExternalFileReader ¶
type ExternalFileReader interface { io.ReadSeekCloser // GetFileSize returns the file size. GetFileSize() (int64, error) }
ExternalFileReader represents the streaming external file reader.
func InterceptDecompressReader ¶
func InterceptDecompressReader( fileReader ExternalFileReader, compressType CompressType, cfg DecompressConfig, ) (ExternalFileReader, error)
InterceptDecompressReader intercepts the reader and wraps it with a decompress reader on the given ExternalFileReader. Note that the returned ExternalFileReader does not have the property that Seek(0, io.SeekCurrent) equals total bytes Read() if the decompress reader is used.
func NewLimitedInterceptReader ¶
func NewLimitedInterceptReader( fileReader ExternalFileReader, compressType CompressType, cfg DecompressConfig, n int64, ) (ExternalFileReader, error)
type ExternalFileWriter ¶
type ExternalFileWriter interface { // Write writes to buffer and if chunk is filled will upload it Write(ctx context.Context, p []byte) (int, error) // Close writes final chunk and completes the upload Close(ctx context.Context) error }
ExternalFileWriter represents the streaming external file writer.
func NewUploaderWriter ¶
func NewUploaderWriter(writer ExternalFileWriter, chunkSize int, compressType CompressType) ExternalFileWriter
NewUploaderWriter wraps the Writer interface over an uploader.
type ExternalStorage ¶
type ExternalStorage interface { // WriteFile writes a complete file to storage, similar to os.WriteFile, but WriteFile should be atomic WriteFile(ctx context.Context, name string, data []byte) error // ReadFile reads a complete file from storage, similar to os.ReadFile ReadFile(ctx context.Context, name string) ([]byte, error) // FileExists return true if file exists FileExists(ctx context.Context, name string) (bool, error) // DeleteFile delete the file in storage DeleteFile(ctx context.Context, name string) error // Open a Reader by file path. path is relative path to storage base path. // Some implementation will use the given ctx as the inner context of the reader. Open(ctx context.Context, path string, option *ReaderOption) (ExternalFileReader, error) // DeleteFiles delete the files in storage DeleteFiles(ctx context.Context, names []string) error // WalkDir traverse all the files in a dir. // // fn is the function called for each regular file visited by WalkDir. // The argument `path` is the file path that can be used in `Open` // function; the argument `size` is the size in byte of the file determined // by path. WalkDir(ctx context.Context, opt *WalkOption, fn func(path string, size int64) error) error // URI returns the base path as a URI URI() string // Create opens a file writer by path. path is relative path to storage base // path. The old file under same path will be overwritten. Currently only s3 // implemented WriterOption. Create(ctx context.Context, path string, option *WriterOption) (ExternalFileWriter, error) // Rename file name from oldFileName to newFileName Rename(ctx context.Context, oldFileName, newFileName string) error // Close release the resources of the storage. Close() }
ExternalStorage represents a kind of file system storage.
func Create ¶
func Create(ctx context.Context, backend *backuppb.StorageBackend, sendCreds bool) (ExternalStorage, error)
Create creates ExternalStorage.
Please consider using `New` in the future.
func New ¶
func New(ctx context.Context, backend *backuppb.StorageBackend, opts *ExternalStorageOptions) (ExternalStorage, error)
New creates an ExternalStorage with options.
func NewFromURL ¶
func NewFromURL(ctx context.Context, uri string) (ExternalStorage, error)
NewFromURL creates an ExternalStorage from URL.
func NewWithDefaultOpt ¶
func NewWithDefaultOpt(ctx context.Context, backend *backuppb.StorageBackend) (ExternalStorage, error)
NewWithDefaultOpt creates ExternalStorage with default options.
func WithCompression ¶
func WithCompression(inner ExternalStorage, compressionType CompressType, cfg DecompressConfig) ExternalStorage
WithCompression returns an ExternalStorage with compress option
type ExternalStorageOptions ¶
type ExternalStorageOptions struct { // SendCredentials marks whether to send credentials downstream. // // This field should be set to false if the credentials are provided to // downstream via external key managers, e.g. on K8s or cloud provider. SendCredentials bool // NoCredentials means that no cloud credentials are supplied to BR NoCredentials bool // HTTPClient to use. The created storage may ignore this field if it is not // directly using HTTP (e.g. the local storage). // NOTICE: the HTTPClient is only used by s3/azure/gcs. // For GCS, we will use this as base client to init a client with credentials. HTTPClient *http.Client // CheckPermissions check the given permission in New() function. // make sure we can access the storage correctly before execute tasks. CheckPermissions []Permission // S3Retryer is the retryer for create s3 storage, if it is nil, // defaultS3Retryer() will be used. S3Retryer request.Retryer // CheckObjectLockOptions check the s3 bucket has enabled the ObjectLock. // if enabled. it will send the options to tikv. CheckS3ObjectLockOptions bool }
ExternalStorageOptions are backend-independent options provided to New.
type GCSBackendOptions ¶
type GCSBackendOptions struct { Endpoint string `json:"endpoint" toml:"endpoint"` StorageClass string `json:"storage-class" toml:"storage-class"` PredefinedACL string `json:"predefined-acl" toml:"predefined-acl"` CredentialsFile string `json:"credentials-file" toml:"credentials-file"` }
GCSBackendOptions are options for configuring the GCS storage.
type GCSStorage ¶
type GCSStorage struct {
// contains filtered or unexported fields
}
GCSStorage defines some standard operations for BR/Lightning on the GCS storage. It implements the `ExternalStorage` interface.
func NewGCSStorage ¶
func NewGCSStorage(ctx context.Context, gcs *backuppb.GCS, opts *ExternalStorageOptions) (*GCSStorage, error)
NewGCSStorage creates a GCS external storage implementation.
func (*GCSStorage) Create ¶
func (s *GCSStorage) Create(ctx context.Context, name string, wo *WriterOption) (ExternalFileWriter, error)
Create implements ExternalStorage interface.
func (*GCSStorage) DeleteFile ¶
func (s *GCSStorage) DeleteFile(ctx context.Context, name string) error
DeleteFile deletes the file in storage.
func (*GCSStorage) DeleteFiles ¶
func (s *GCSStorage) DeleteFiles(ctx context.Context, names []string) error
DeleteFiles deletes the files in storage.
func (*GCSStorage) FileExists ¶
FileExists returns true if the file exists.
func (*GCSStorage) GetBucketHandle ¶
func (s *GCSStorage) GetBucketHandle() *storage.BucketHandle
GetBucketHandle gets the handle to the GCS API on the bucket.
func (*GCSStorage) GetOptions ¶
func (s *GCSStorage) GetOptions() *backuppb.GCS
GetOptions gets the external storage operations for the GCS.
func (*GCSStorage) MarkStrongConsistency ¶
func (s *GCSStorage) MarkStrongConsistency()
func (*GCSStorage) Open ¶
func (s *GCSStorage) Open(ctx context.Context, path string, o *ReaderOption) (ExternalFileReader, error)
Open a Reader by file path.
func (*GCSStorage) Rename ¶
func (s *GCSStorage) Rename(ctx context.Context, oldFileName, newFileName string) error
Rename file name from oldFileName to newFileName.
func (*GCSStorage) Reset ¶
func (s *GCSStorage) Reset(ctx context.Context) error
Reset resets the GCS storage.
func (*GCSStorage) URI ¶
func (s *GCSStorage) URI() string
func (*GCSStorage) WalkDir ¶
func (s *GCSStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error
WalkDir traverses all the files in a dir.
fn is the function called for each regular file visited by WalkDir. The first argument is the file path that can be used in `Open` function; the second argument is the size in byte of the file determined by path.
type GCSWriter ¶
type GCSWriter struct {
// contains filtered or unexported fields
}
GCSWriter uses XML multipart upload API to upload a single file. https://cloud.google.com/storage/docs/multipart-uploads. GCSWriter will attempt to cancel uploads that fail due to an exception. If the upload fails in a way that precludes cancellation, such as a hardware failure, process termination, or power outage, then the incomplete upload may persist indefinitely. To mitigate this, set the `AbortIncompleteMultipartUpload` with a nonzero `Age` in bucket lifecycle rules, or refer to the XML API documentation linked above to learn more about how to list and delete individual downloads.
type HDFSStorage ¶
type HDFSStorage struct {
// contains filtered or unexported fields
}
HDFSStorage represents HDFS storage.
func NewHDFSStorage ¶
func NewHDFSStorage(remote string) *HDFSStorage
NewHDFSStorage creates a new HDFS storage.
func (*HDFSStorage) Create ¶
func (*HDFSStorage) Create(_ context.Context, _ string, _ *WriterOption) (ExternalFileWriter, error)
Create opens a file writer by path. path is relative path to storage base path
func (*HDFSStorage) DeleteFile ¶
func (*HDFSStorage) DeleteFile(_ context.Context, _ string) error
DeleteFile delete the file in storage
func (*HDFSStorage) DeleteFiles ¶
func (*HDFSStorage) DeleteFiles(_ context.Context, _ []string) error
DeleteFiles deletes files in storage
func (*HDFSStorage) FileExists ¶
FileExists return true if file exists
func (*HDFSStorage) Open ¶
func (*HDFSStorage) Open(_ context.Context, _ string, _ *ReaderOption) (ExternalFileReader, error)
Open a Reader by file path. path is relative path to storage base path
func (*HDFSStorage) Rename ¶
func (*HDFSStorage) Rename(_ context.Context, _, _ string) error
Rename a file name from oldFileName to newFileName.
func (*HDFSStorage) WalkDir ¶
func (*HDFSStorage) WalkDir(_ context.Context, _ *WalkOption, _ func(path string, size int64) error) error
WalkDir traverse all the files in a dir.
fn is the function called for each regular file visited by WalkDir. The argument `path` is the file path that can be used in `Open` function; the argument `size` is the size in byte of the file determined by path.
type KS3Storage ¶
type KS3Storage struct {
// contains filtered or unexported fields
}
KS3Storage acts almost the same as S3Storage except it's used for Kingsoft S3.
func NewKS3Storage ¶
func NewKS3Storage( ctx context.Context, backend *backuppb.S3, opts *ExternalStorageOptions, ) (obj *KS3Storage, errRet error)
NewKS3Storage initializes a new S3 storage for metadata.
func (*KS3Storage) CopyFrom ¶
func (rs *KS3Storage) CopyFrom(ctx context.Context, e ExternalStorage, spec CopySpec) error
CopyFrom implements Copier.
func (*KS3Storage) Create ¶
func (rs *KS3Storage) Create(ctx context.Context, name string, option *WriterOption) (ExternalFileWriter, error)
Create creates multi upload request.
func (*KS3Storage) DeleteFile ¶
func (rs *KS3Storage) DeleteFile(ctx context.Context, file string) error
DeleteFile delete the file in s3 storage
func (*KS3Storage) DeleteFiles ¶
func (rs *KS3Storage) DeleteFiles(ctx context.Context, files []string) error
DeleteFiles delete the files in batch in s3 storage.
func (*KS3Storage) FileExists ¶
FileExists check if file exists on s3 storage.
func (*KS3Storage) Open ¶
func (rs *KS3Storage) Open(ctx context.Context, path string, o *ReaderOption) (ExternalFileReader, error)
Open a Reader by file path.
func (*KS3Storage) Rename ¶
func (rs *KS3Storage) Rename(ctx context.Context, oldFileName, newFileName string) error
Rename implements ExternalStorage interface.
func (*KS3Storage) WalkDir ¶
func (rs *KS3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error
WalkDir traverses all the files in a dir.
fn is the function called for each regular file visited by WalkDir. The first argument is the file path that can be used in the `Open` function; the second argument is the size in bytes of the file determined by path.
type KS3Uploader ¶
type KS3Uploader struct {
// contains filtered or unexported fields
}
KS3Uploader does multi-part upload to s3.
type LocalStorage ¶
type LocalStorage struct { // Whether to ignore ENOENT while deleting. // Not failing when deleting a nonexistent file is more like what // a normal ExternalStorage implementation does. IgnoreEnoentForDelete bool // contains filtered or unexported fields }
LocalStorage represents local file system storage.
Exported for use in tests.
func NewLocalStorage ¶
func NewLocalStorage(base string) (*LocalStorage, error)
NewLocalStorage returns a LocalStorage at directory `base`.
Exported for tests.
func (*LocalStorage) Base ¶
func (l *LocalStorage) Base() string
Base returns the base dir used by this local storage.
func (*LocalStorage) Close ¶
func (*LocalStorage) Close()
Close implements ExternalStorage interface.
func (*LocalStorage) CopyFrom ¶
func (l *LocalStorage) CopyFrom(ctx context.Context, e ExternalStorage, spec CopySpec) error
func (*LocalStorage) Create ¶
func (l *LocalStorage) Create(_ context.Context, name string, _ *WriterOption) (ExternalFileWriter, error)
Create implements ExternalStorage interface.
func (*LocalStorage) DeleteFile ¶
func (l *LocalStorage) DeleteFile(_ context.Context, name string) error
DeleteFile deletes the file.
func (*LocalStorage) DeleteFiles ¶
func (l *LocalStorage) DeleteFiles(ctx context.Context, names []string) error
DeleteFiles deletes the files.
func (*LocalStorage) FileExists ¶
FileExists implements ExternalStorage.FileExists.
func (*LocalStorage) Open ¶
func (l *LocalStorage) Open(_ context.Context, path string, o *ReaderOption) (ExternalFileReader, error)
Open a Reader by file path. path is a relative path to the base path.
func (*LocalStorage) Rename ¶
func (l *LocalStorage) Rename(_ context.Context, oldFileName, newFileName string) error
Rename implements ExternalStorage interface.
func (*LocalStorage) URI ¶
func (l *LocalStorage) URI() string
URI returns the base path as a URI with a file:/// prefix.
func (*LocalStorage) WalkDir ¶
func (l *LocalStorage) WalkDir(_ context.Context, opt *WalkOption, fn func(string, int64) error) error
WalkDir traverses all the files in a dir.
fn is the function called for each regular file visited by WalkDir. The first argument is the file path that can be used in the `Open` function; the second argument is the size in bytes of the file determined by path.
type LockMeta ¶
type LockMeta struct { LockedAt time.Time `json:"locked_at"` LockerHost string `json:"locker_host"` LockerPID int `json:"locker_pid"` TxnID []byte `json:"txn_id"` Hint string `json:"hint"` }
LockMeta is the meta information of a lock.
func MakeLockMeta ¶
MakeLockMeta creates a LockMeta from the current node's metadata, including the current time, hostname, etc.
type Locker ¶
type Locker = func(ctx context.Context, storage ExternalStorage, path, hint string) (lock RemoteLock, err error)
type MemStorage ¶
type MemStorage struct {
// contains filtered or unexported fields
}
MemStorage represents an in-memory storage.
func NewMemStorage ¶
func NewMemStorage() *MemStorage
NewMemStorage creates a new in-memory storage.
func (*MemStorage) Create ¶
func (s *MemStorage) Create(ctx context.Context, name string, _ *WriterOption) (ExternalFileWriter, error)
Create creates a file and returns a writer to write data into. When the writer is closed, the data is stored in the file. It implements the `ExternalStorage` interface.
func (*MemStorage) DeleteFile ¶
func (s *MemStorage) DeleteFile(ctx context.Context, name string) error
DeleteFile deletes the file in storage. It implements the `ExternalStorage` interface.
func (*MemStorage) DeleteFiles ¶
func (s *MemStorage) DeleteFiles(ctx context.Context, names []string) error
DeleteFiles deletes the files in storage. It implements the `ExternalStorage` interface.
func (*MemStorage) FileExists ¶
FileExists returns true if the file exists. It implements the `ExternalStorage` interface.
func (*MemStorage) Open ¶
func (s *MemStorage) Open(ctx context.Context, filePath string, o *ReaderOption) (ExternalFileReader, error)
Open opens a Reader by file path. It implements the `ExternalStorage` interface
func (*MemStorage) ReadFile ¶
ReadFile reads the storage file. It implements the `ExternalStorage` interface
func (*MemStorage) Rename ¶
func (s *MemStorage) Rename(ctx context.Context, oldFileName, newFileName string) error
Rename renames a file name to another file name. It implements the `ExternalStorage` interface
func (*MemStorage) WalkDir ¶
func (s *MemStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error
WalkDir traverses all the files in a dir. It implements the `ExternalStorage` interface.
type Permission ¶
type Permission string
Permission represents the permission we need to check when creating a storage.
type RangeInfo ¶
type RangeInfo struct { // Start is the absolute position of the first byte of the byte range, // starting from 0. Start int64 // End is the absolute position of the last byte of the byte range. This end // offset is inclusive, e.g. if the Size is 1000, the maximum value of End // would be 999. End int64 // Size is the total size of the original file. Size int64 }
RangeInfo represents an HTTP Content-Range header value of the form `bytes [Start]-[End]/[Size]`.
func ParseRangeInfo ¶
ParseRangeInfo parses the Content-Range header and returns the offsets.
type ReadSeekCloser ¶
ReadSeekCloser is the interface that groups the basic Read, Seek and Close methods.
type ReaderOption ¶
type RemoteLock ¶
type RemoteLock struct {
// contains filtered or unexported fields
}
func LockWith ¶
func LockWith(ctx context.Context, locker Locker, storage ExternalStorage, path, hint string) (lock RemoteLock, err error)
func TryLockRemote ¶
func TryLockRemote(ctx context.Context, storage ExternalStorage, path, hint string) (lock RemoteLock, err error)
TryLockRemote tries to create a "lock file" at the external storage. On success, a file is created at the path provided, so that others may not access the file. It returns an `ErrLocked` if another process has already created the lock file. This isn't a strict lock like flock in Linux: the lock can be forcibly removed by manually deleting the "lock file" in external storage.
func TryLockRemoteRead ¶
func TryLockRemoteRead(ctx context.Context, storage ExternalStorage, path, hint string) (lock RemoteLock, err error)
func TryLockRemoteWrite ¶
func TryLockRemoteWrite(ctx context.Context, storage ExternalStorage, path, hint string) (lock RemoteLock, err error)
func (*RemoteLock) String ¶
func (l *RemoteLock) String() string
func (RemoteLock) Unlock ¶
func (l RemoteLock) Unlock(ctx context.Context) error
Unlock removes the lock file at the specified path. Removing that file will release the lock.
func (RemoteLock) UnlockOnCleanUp ¶
func (l RemoteLock) UnlockOnCleanUp(ctx context.Context)
type S3BackendOptions ¶
type S3BackendOptions struct { Endpoint string `json:"endpoint" toml:"endpoint"` Region string `json:"region" toml:"region"` StorageClass string `json:"storage-class" toml:"storage-class"` Sse string `json:"sse" toml:"sse"` SseKmsKeyID string `json:"sse-kms-key-id" toml:"sse-kms-key-id"` ACL string `json:"acl" toml:"acl"` AccessKey string `json:"access-key" toml:"access-key"` SecretAccessKey string `json:"secret-access-key" toml:"secret-access-key"` SessionToken string `json:"session-token" toml:"session-token"` Provider string `json:"provider" toml:"provider"` ForcePathStyle bool `json:"force-path-style" toml:"force-path-style"` UseAccelerateEndpoint bool `json:"use-accelerate-endpoint" toml:"use-accelerate-endpoint"` RoleARN string `json:"role-arn" toml:"role-arn"` ExternalID string `json:"external-id" toml:"external-id"` ObjectLockEnabled bool `json:"object-lock-enabled" toml:"object-lock-enabled"` }
S3BackendOptions contains options for s3 storage.
type S3Storage ¶
type S3Storage struct {
// contains filtered or unexported fields
}
S3Storage defines some standard operations for BR/Lightning on the S3 storage. It implements the `ExternalStorage` interface.
func NewS3Storage ¶
func NewS3Storage(ctx context.Context, backend *backuppb.S3, opts *ExternalStorageOptions) (obj *S3Storage, errRet error)
NewS3Storage initializes a new S3 storage for metadata.
func NewS3StorageForTest ¶
NewS3StorageForTest creates a new S3Storage for testing only.
func (*S3Storage) Create ¶
func (rs *S3Storage) Create(ctx context.Context, name string, option *WriterOption) (ExternalFileWriter, error)
Create creates a multipart upload request.
func (*S3Storage) DeleteFile ¶
DeleteFile deletes the file in S3 storage.
func (*S3Storage) DeleteFiles ¶
DeleteFiles deletes the files in batch in S3 storage.
func (*S3Storage) FileExists ¶
FileExists checks if the file exists on S3 storage.
func (*S3Storage) GetOptions ¶
GetOptions gets the external storage options for the S3.
func (*S3Storage) GetS3APIHandle ¶
GetS3APIHandle gets the handle to the S3 API.
func (*S3Storage) IsObjectLockEnabled ¶
func (*S3Storage) MarkStrongConsistency ¶
func (*S3Storage) MarkStrongConsistency()
func (*S3Storage) Open ¶
func (rs *S3Storage) Open(ctx context.Context, path string, o *ReaderOption) (ExternalFileReader, error)
Open a Reader by file path.
func (*S3Storage) WalkDir ¶
func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error
WalkDir traverses all the files in a dir.
fn is the function called for each regular file visited by WalkDir. The first argument is the file path that can be used in the `Open` function; the second argument is the size in bytes of the file determined by path.
type S3Uploader ¶
type S3Uploader struct {
// contains filtered or unexported fields
}
S3Uploader does multi-part upload to s3.
type StrongConsisency ¶
type StrongConsisency interface {
MarkStrongConsistency()
}
StrongConsistency is a marker interface that indicates the storage is strongly consistent over its `Read`, `Write` and `WalkDir` APIs.
type Uploader ¶
type Uploader interface { // UploadPart uploads a part of the file data to storage UploadPart(ctx context.Context, data []byte) error // CompleteUpload assembles the uploaded data into a complete file CompleteUpload(ctx context.Context) error }
Uploader uploads a file in chunks.
type VerifyWriteContext ¶
type VerifyWriteContext struct { context.Context Target string Storage ExternalStorage TxnID uuid.UUID }
func (*VerifyWriteContext) IntentFileName ¶
func (cx *VerifyWriteContext) IntentFileName() string
type WalkOption ¶
type WalkOption struct { // walk on SubDir of base directory, i.e. if the base dir is '/path/to/base' // then we're walking '/path/to/base/<SubDir>' SubDir string // whether subdirectory under the walk dir is skipped, only works for LOCAL storage now. // default is false, i.e. we walk recursively. SkipSubDir bool // ObjPrefix is used for prefix search in storage. Note that only some storages // support it. // It can save lots of time when we want to find objects with a specific prefix in storage. // For example, if we have 10000 <Hash>.sst files and 10 backupmeta.(\d+) files, // we can use ObjPrefix = "backupmeta" to retrieve all meta files quickly. ObjPrefix string // ListCount is the number of entries per page. // // In cloud storages such as S3 and GCS, the files are listed and sent in pages. // Typically a page contains 1000 files, and if a folder has 3000 descendant // files, one would need 3 requests to retrieve all of them. This parameter // controls this size. Note that both S3 and GCS limit the maximum to 1000. // // Typically you want to leave this field unassigned (zero) to use the // default value (1000) to minimize the number of requests, unless you want // to reduce the possibility of timeout on an extremely slow connection, or // perform testing. ListCount int64 // IncludeTombstone will allow `Walk` to emit removed files during walking. // // In most cases, `Walk` runs over a snapshot, if a file in the snapshot // was deleted during walking, the file will be ignored. Setting this to `true` // will make them be sent to the callback. // // The size of a deleted file should be `TombstoneSize`. IncludeTombstone bool }
WalkOption is the option of storage.WalkDir.
type Writer ¶
type Writer interface { // Write writes to buffer and if chunk is filled will upload it Write(ctx context.Context, p []byte) (int, error) // Close writes final chunk and completes the upload Close(ctx context.Context) error }
Writer is like io.Writer but with a Context. Create a new writer on top of an Uploader with NewUploaderWriter.