walg

package module v0.1.13

Published: Oct 27, 2018 License: Apache-2.0 Imports: 42 Imported by: 0

README

WAL-G


WAL-G is an archival restoration tool for Postgres.

WAL-G is the successor of WAL-E with a number of key differences. WAL-G uses LZ4 or LZMA compression, multiple processors, and non-exclusive base backups for Postgres. More information on the design and implementation of WAL-G can be found in the Citus Data blog post "Introducing WAL-G by Citus: Faster Disaster Recovery for Postgres".

Installation

A precompiled binary of the latest version of WAL-G for Linux AMD64 can be obtained under the Releases tab.

To decompress the binary, use:

tar -zxvf wal-g.linux-amd64.tar.gz

For systems without a compatible precompiled binary, please consult the Development section for information on building WAL-G from source.

Configuration

Required

To connect to Amazon S3, WAL-G requires that these variables be set:

  • WALE_S3_PREFIX (e.g. s3://bucket/path/to/folder)

WAL-G determines AWS credentials the same way other AWS tools do. You can set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY (optionally with AWS_SECURITY_TOKEN), use ~/.aws/credentials (optionally with AWS_PROFILE), or set nothing and let WAL-G fetch credentials automatically from the EC2 metadata service.
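For example, to supply credentials via environment variables (placeholder values):

export AWS_ACCESS_KEY_ID="<access-key>"
export AWS_SECRET_ACCESS_KEY="<secret-key>"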

WAL-G uses the usual PostgreSQL environment variables to configure its connection, including PGHOST, PGPORT, PGUSER, and PGPASSWORD/PGPASSFILE/~/.pgpass.

PGHOST can point at a UNIX socket; this mode is preferred for localhost connections. Set PGHOST=/var/run/postgresql to use it. WAL-G will connect over TCP if PGHOST is an IP address.

Optional

WAL-G can automatically determine the S3 bucket's region using s3:GetBucketLocation, but if you wish to avoid this API call or forbid it from the applicable IAM policy, specify:

  • AWS_REGION (e.g. us-west-2)

Concurrency values can be configured using:

  • WALG_DOWNLOAD_CONCURRENCY

To configure how many goroutines to use during backup-fetch and wal-fetch, use WALG_DOWNLOAD_CONCURRENCY. By default, WAL-G uses the minimum of the number of files to extract and 10.

  • WALG_UPLOAD_CONCURRENCY

To configure how many concurrent streams to use during backup uploading, use WALG_UPLOAD_CONCURRENCY. By default, WAL-G uses 10 streams.

  • WALG_UPLOAD_DISK_CONCURRENCY

To configure how many concurrent streams read from disk during backup-push, use WALG_UPLOAD_DISK_CONCURRENCY. By default, WAL-G uses 1 stream.
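For example, to raise upload parallelism while keeping a single disk reader (illustrative values):

export WALG_DOWNLOAD_CONCURRENCY=10
export WALG_UPLOAD_CONCURRENCY=16
export WALG_UPLOAD_DISK_CONCURRENCY=1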

  • WALG_SENTINEL_USER_DATA

This setting allows backup automation tools to add extra information to the JSON sentinel file during backup-push. It can be used, for example, to give user-defined names to backups.
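For instance, assuming the value is parsed as JSON, a hypothetical user-defined label might look like:

export WALG_SENTINEL_USER_DATA='{"label": "nightly", "ticket": "OPS-123"}'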

  • WALG_PREVENT_WAL_OVERWRITE

If this setting is specified, WAL-G will check during wal-push whether the WAL file already exists before uploading it. If a different file is already archived under the same name, WAL-G returns a non-zero exit code to prevent PostgreSQL from removing the WAL.

  • AWS_ENDPOINT

Overrides the default hostname to connect to an S3-compatible service, e.g. http://s3-like-service:9000.

  • AWS_S3_FORCE_PATH_STYLE

To enable path-style addressing (i.e., http://s3.amazonaws.com/BUCKET/KEY) when connecting to an S3-compatible service that lacks support for sub-domain-style bucket URLs (i.e., http://BUCKET.s3.amazonaws.com/KEY). Defaults to false.

Example: Using Minio.io S3-compatible storage

AWS_ACCESS_KEY_ID: "<minio-key>"
AWS_SECRET_ACCESS_KEY: "<minio-secret>"
WALE_S3_PREFIX: "s3://my-minio-bucket/sub-dir"
AWS_ENDPOINT: "http://minio:9000"
AWS_S3_FORCE_PATH_STYLE: "true"
AWS_REGION: us-east-1

  • WALG_S3_STORAGE_CLASS

To configure the S3 storage class used for backup files, use WALG_S3_STORAGE_CLASS. By default, WAL-G uses the "STANDARD" storage class. Other supported values include "STANDARD_IA" for Infrequent Access and "REDUCED_REDUNDANCY" for Reduced Redundancy.

  • WALG_S3_SSE

To enable S3 server-side encryption, set this to the algorithm to use when storing objects in S3 (e.g., AES256, aws:kms).

  • WALG_S3_SSE_KMS_ID

If using S3 server-side encryption with aws:kms, this is the KMS Key ID to use for object encryption.
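For example, to store objects encrypted with a customer-managed KMS key (placeholder key ID):

export WALG_S3_SSE=aws:kms
export WALG_S3_SSE_KMS_ID=<kms-key-id>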

  • WALE_GPG_KEY_ID

To configure the GPG key used for encryption and decryption. By default, no encryption is used. The public keyring is cached in the file "/.walg_key_cache".

  • WALG_DELTA_MAX_STEPS

A delta backup stores the difference between the previously taken backup and the present state. WALG_DELTA_MAX_STEPS determines how many delta backups may sit between full backups. Defaults to 0. The restoration process automatically fetches all necessary deltas plus the base backup and composes a valid restored backup (you still need the WALs written after the start of the last backup to restore a consistent cluster). Delta computation is based on file-system ModTime and the LSN of pages in data files.

  • WALG_DELTA_ORIGIN

To configure the base for the next delta backup (only if WALG_DELTA_MAX_STEPS is not exceeded). WALG_DELTA_ORIGIN can be LATEST (chaining increments) or LATEST_FULL (for databases where the volatile part is compact and chaining has no benefit: deltas overwrite each other). Defaults to LATEST.
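For example, a configuration allowing up to six deltas between full backups, each based on the latest backup (illustrative values):

export WALG_DELTA_MAX_STEPS=6
export WALG_DELTA_ORIGIN=LATEST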

  • WALG_COMPRESSION_METHOD

To configure the compression method used for backups. Possible options are lz4 and lzma. The default is lz4. LZ4 is the fastest method, but its compression ratio is poor. LZMA is much slower, but compresses backups about 6 times better than LZ4. Zstd would be a good trade-off between speed and compression ratio, about 3 times better than LZ4, but it is temporarily disabled due to corruption reports.

  • WALG_DISK_RATE_LIMIT

To configure the disk read rate limit during backup-push, in bytes per second.

  • WALG_NETWORK_RATE_LIMIT

To configure the network upload rate limit during backup-push, in bytes per second.
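For example, to cap disk reads at 100 MiB/s and uploads at 10 MiB/s (illustrative values, expressed in bytes per second):

export WALG_DISK_RATE_LIMIT=104857600
export WALG_NETWORK_RATE_LIMIT=10485760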

Usage

WAL-G currently supports these commands:

  • backup-fetch

When fetching base backups, the user should pass in the name of the backup and a path to a directory to extract to. If this directory does not exist, WAL-G will create it and any dependent subdirectories.

wal-g backup-fetch ~/extract/to/here example-backup

WAL-G can also fetch the latest backup using:

wal-g backup-fetch ~/extract/to/here LATEST

  • backup-push

When uploading backups to S3, the user should pass in the path containing the backup started by Postgres as in:

wal-g backup-push /backup/directory/path

If the backup is pushed from a replication standby, WAL-G will monitor the server's timeline. In case of promotion to master or a timeline switch, the backup will be uploaded but not finalized, and WAL-G will exit with an error. In that case the logs will contain the information necessary to finalize the backup. You may use such backup data only if you clearly understand the risks involved.

  • wal-fetch

When fetching WAL archives from S3, the user should pass in the archive name and the name of the file to download to. This file should not exist as WAL-G will create it for you.

WAL-G will also prefetch WAL files ahead of the requested WAL file. These files are cached in the ./.wal-g/prefetch directory. Cached files older than the most recently requested WAL file are deleted to prevent cache bloat. If a file is requested with wal-fetch, it is removed from the cache, and the cache is refilled with new files.

wal-g wal-fetch example-archive new-file-name
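During archive recovery, wal-fetch is typically wired into recovery.conf as the restore_command, for example:

restore_command = 'wal-g wal-fetch "%f" "%p"'
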
  • wal-push

When uploading WAL archives to S3, the user should pass in the absolute path to where the archive is located.

wal-g wal-push /path/to/archive
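wal-push is typically invoked by PostgreSQL itself via archive_command in postgresql.conf, for example:

archive_mode = on
archive_command = 'wal-g wal-push %p'
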
  • backup-list

Lists names and creation time of available backups.

  • delete

Used to delete backups and the WALs before them. By default, delete performs a dry run. To execute the deletion, add the --confirm flag at the end of the command.

delete can operate in two modes: retain and before.

retain [FULL|FIND_FULL] %number%

if FULL is specified, WAL-G keeps the given number of full backups and everything in between

before [FIND_FULL] %name%

if FIND_FULL is specified, WAL-G will calculate the minimum backup needed to keep all deltas alive. If FIND_FULL is not specified and the call would produce orphaned deltas, the call will fail and list them.

retain 5 will fail if the 5th backup is a delta

retain FULL 5 will keep 5 full backups and all of their deltas

retain FIND_FULL 5 will find the necessary full backup for the 5th and keep everything after it

before base_000010000123123123 will fail if base_000010000123123123 is a delta

before FIND_FULL base_000010000123123123 will keep everything after base of base_000010000123123123
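For example, a complete invocation that actually deletes (rather than dry-runs) everything older than the five most recent full backups:

wal-g delete retain FULL 5 --confirm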

Development

Installing

To compile and build the binary:

go get github.com/wal-g/wal-g
make all

Users can also install WAL-G by using make install. Setting the GOBIN environment variable before installing allows the user to choose the installation location. By default, make install puts the compiled binary in go/bin.

export GOBIN=/usr/local/bin
make install

Testing

WAL-G relies heavily on unit tests. These tests do not require S3 configuration as the upload/download parts are tested using mocked objects. For more information on testing, please consult test_tools.

WAL-G will perform a round-trip compression/decompression test that generates a directory for data (e.g. data...), compressed files (e.g. compressed), and extracted files (e.g. extracted). These directories are only cleaned up if the files in the original data directory match the files in the extracted one.

Test coverage can be obtained using:

go test -v -coverprofile=coverage.out
go tool cover -html=coverage.out

Authors

See also the list of contributors who participated in this project.

License

This project is licensed under the Apache License, Version 2.0, but the lzo support is licensed under GPL 3.0+. Please refer to the LICENSE.md file for more details.

Acknowledgements

WAL-G would not have happened without the support of Citus Data.

WAL-G came into existence as a result of the collaboration between a summer engineering intern at Citus, Katie Li, and Daniel Farina, the original author of WAL-E who currently serves as a principal engineer on the Citus Cloud team. Citus Data also has an open source extension to Postgres that distributes database queries horizontally to deliver scale and performance.

Chat

We have a Slack group to discuss WAL-G usage and development. To join the PostgreSQL Slack, use the invite app.

Documentation

Index

Constants

const (
	Lz4AlgorithmName  = "lz4"
	LzmaAlgorithmName = "lzma"
	ZstdAlgorithmName = "zstd"

	Lz4FileExtension  = "lz4"
	LzmaFileExtension = "lzma"
	ZstdFileExtension = "zst"
	LzoFileExtension  = "lzo"
)
const (
	VersionStr      = "005"
	BaseBackupsPath = "/basebackups_" + VersionStr + "/"
	WalPath         = "/wal_" + VersionStr + "/"

	// SentinelSuffix is a suffix of backup finish sentinel file
	SentinelSuffix         = "_backup_stop_sentinel.json"
	CompressedBlockMaxSize = 20 << 20
	NotFoundAWSErrorCode   = "NotFound"
)
const (
	// BlockSize is the PostgreSQL page size
	BlockSize uint16 = 8192
)
const DeleteUsageText = "delete requires at least 2 parameters" + `
		retain 5                      keep 5 backups
		retain FULL 5                 keep 5 full backups and all deltas of them
		retain FIND_FULL 5            find necessary full for 5th and keep everything after it
		before base_0123              keep everything after base_0123 including itself
		before FIND_FULL base_0123    keep everything after the base of base_0123`
const LzopBlockSize = 256 * 1024
const (
	// WalSegmentSize is the size of one WAL file
	WalSegmentSize = uint64(16 * 1024 * 1024) // xlog.c line 113

)

Variables

var ErrCrypterUseMischief = errors.New("Crypter is not checked before use")

ErrCrypterUseMischief happens when crypter is used before initialization

var ErrInvalidBlock = errors.New("Block is not valid")

ErrInvalidBlock indicates that a file contains an invalid page and cannot be archived incrementally

var ErrLatestNotFound = errors.New("No backups found")

ErrLatestNotFound happens when the user asks for backup-fetch LATEST but there are no backups

var ExcludedFilenames = make(map[string]Empty)

ExcludedFilenames is the set of members excluded from the bundled backup.

var MaxRetries = 15

MaxRetries limits upload and download retries during interaction with S3

Functions

func ApplyFileIncrement added in v0.1.3

func ApplyFileIncrement(fileName string, increment io.Reader) error

ApplyFileIncrement changes pages according to supplied change map file

func Configure

func Configure() (*TarUploader, *S3Prefix, error)

Configure connects to S3 and creates an uploader. It makes sure that a valid session has started; if invalid, returns AWS error and `<nil>` values.

Requires these environment variables to be set: WALE_S3_PREFIX

Able to configure the upload part size in the S3 uploader.

func Connect

func Connect() (*pgx.Conn, error)

Connect establishes a connection to postgres using a UNIX socket. Must export PGHOST and run with `sudo -E -u postgres`. If PGHOST is not set or if the connection fails, an error is returned and the connection is `<nil>`.

Example: PGHOST=/var/run/postgresql or PGHOST=10.0.0.1
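A minimal sketch of wiring Configure and Connect together, assuming the package is imported as walg (error handling follows the descriptions above):

package main

import (
	"log"

	walg "github.com/wal-g/wal-g"
)

func main() {
	// Configure reads WALE_S3_PREFIX and AWS credentials from the environment.
	tarUploader, s3Prefix, err := walg.Configure()
	if err != nil {
		log.Fatalf("configure: %v", err)
	}

	// Connect uses the usual PG* environment variables (e.g. PGHOST).
	conn, err := walg.Connect()
	if err != nil {
		log.Fatalf("connect: %v", err)
	}
	defer conn.Close()

	_, _ = tarUploader, s3Prefix
}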

func CreateUploader

func CreateUploader(svc s3iface.S3API, partsize, concurrency int) s3manageriface.UploaderAPI

CreateUploader returns an uploader with customizable concurrency and partsize.

func DownloadAndDecompressWALFile added in v0.1.11

func DownloadAndDecompressWALFile(pre *S3Prefix, walFileName string, dstLocation string) error

DownloadAndDecompressWALFile downloads a file and writes it to local file

func ExtractAll

func ExtractAll(tarInterpreter TarInterpreter, files []ReaderMaker) error

ExtractAll handles all files passed in. Supports `.lzo`, `.lz4`, `.lzma`, and `.tar`. File type `.nop` is used for testing purposes. Each file is extracted in its own goroutine, and ExtractAll waits for all goroutines to finish. Returns the first error encountered.

func FormatName

func FormatName(s string) (string, error)

FormatName grabs the name of the WAL file and returns it in the form of `base_...`. If no match is found, returns an empty string and a `NoMatchAvailableError`.

func GetBackupPath added in v0.1.4

func GetBackupPath(prefix *S3Prefix) *string

GetBackupPath gets path for basebackup in a bucket

func GetFileExtension added in v0.1.11

func GetFileExtension(path string) string

func GetKeyRingId added in v0.1.3

func GetKeyRingId() string

GetKeyRingId extracts name of a key to use from env variable

func GetSentinelUserData added in v0.1.8

func GetSentinelUserData() interface{}

GetSentinelUserData tries to parse WALG_SENTINEL_USER_DATA env variable

func HandleBackupFetch added in v0.1.3

func HandleBackupFetch(backupName string, pre *S3Prefix, dirArc string, mem bool) (lsn *uint64)

HandleBackupFetch is invoked to perform wal-g backup-fetch

func HandleBackupList added in v0.1.3

func HandleBackupList(pre *S3Prefix)

HandleBackupList is invoked to perform wal-g backup-list

func HandleBackupPush added in v0.1.3

func HandleBackupPush(dirArc string, tu *TarUploader, pre *S3Prefix)

HandleBackupPush is invoked to perform wal-g backup-push

func HandleDelete added in v0.1.3

func HandleDelete(pre *S3Prefix, args []string)

HandleDelete is invoked to perform wal-g delete

func HandleTar

func HandleTar(bundle TarBundle, path string, info os.FileInfo, crypter Crypter) error

HandleTar creates underlying tar writer and handles one given file. Does not follow symlinks. If file is in ExcludedFilenames, will not be included in the final tarball. EXCLUDED directories are created but their contents are not written to local disk.

func HandleWALFetch added in v0.1.3

func HandleWALFetch(pre *S3Prefix, walFileName string, location string, triggerPrefetch bool)

HandleWALFetch is invoked to perform wal-g wal-fetch

func HandleWALPrefetch added in v0.1.3

func HandleWALPrefetch(pre *S3Prefix, walFileName string, location string)

HandleWALPrefetch is invoked by wal-fetch command to speed up database restoration

func HandleWALPush added in v0.1.3

func HandleWALPush(tarUploader *TarUploader, dirArc string, pre *S3Prefix, verify bool)

HandleWALPush is invoked to perform wal-g wal-push

func IsPagedFile added in v0.1.3

func IsPagedFile(info os.FileInfo, fileName string) bool

IsPagedFile checks basic expectations for a paged file

func MoveFileAndCreateDirs added in v0.1.3

func MoveFileAndCreateDirs(incrementalPath string, targetPath string, fileName string) (err error)

MoveFileAndCreateDirs moves a file from the incremental folder to the target folder, creating the necessary folder structure

func NewDiskLimitReader added in v0.1.11

func NewDiskLimitReader(r io.ReadCloser) io.ReadCloser

NewDiskLimitReader returns a reader that is rate limited by disk limiter

func NewLzoReader added in v0.1.11

func NewLzoReader(r io.Reader) (io.ReadCloser, error)

func NewLzoWriter added in v0.1.11

func NewLzoWriter(w io.Writer) io.WriteCloser

func NewNetworkLimitReader added in v0.1.11

func NewNetworkLimitReader(r io.ReadCloser) io.ReadCloser

NewNetworkLimitReader returns a reader that is rate limited by network limiter

func NextWALFileName added in v0.1.3

func NextWALFileName(name string) (nextname string, err error)

NextWALFileName computes name of next WAL segment

func ParseLsn added in v0.1.3

func ParseLsn(lsnStr string) (lsn uint64, err error)

ParseLsn converts PostgreSQL string representation of LSN to uint64
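For instance, a short sketch parsing the textual "X/Y" LSN form, assuming the conventional (X << 32) | Y encoding:

package main

import (
	"fmt"
	"log"

	walg "github.com/wal-g/wal-g"
)

func main() {
	// "0/2000028" is the form PostgreSQL reports, e.g. from pg_current_wal_lsn().
	lsn, err := walg.ParseLsn("0/2000028")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%X\n", lsn) // prints 2000028
}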

func ParsePageHeader added in v0.1.3

func ParsePageHeader(data []byte) (lsn uint64, valid bool)

ParsePageHeader reads information from PostgreSQL page header. Exported for test reasons.

func ParseWALFileName added in v0.1.3

func ParseWALFileName(name string) (timelineId uint32, logSegNo uint64, err error)

ParseWALFileName extracts numeric parts from WAL file name

func ReadDatabaseFile added in v0.1.3

func ReadDatabaseFile(fileName string, lsn *uint64, isNew bool) (io.ReadCloser, bool, int64, error)

ReadDatabaseFile tries to read a file as an incremental data file if possible, otherwise just opens the file

func ResolveSymlink

func ResolveSymlink(path string) string

ResolveSymlink converts a path to its physical form if it is a symlink

func UploadWALFile added in v0.1.5

func UploadWALFile(tarUploader *TarUploader, dirArc string, pre *S3Prefix, verify bool, bkgUpload bool)

UploadWALFile from FS to the cloud

func WALFileName added in v0.1.3

func WALFileName(lsn uint64, conn *pgx.Conn) (string, uint32, error)

WALFileName formats WAL file name using PostgreSQL connection. Essentially reads timeline of the server.

Types

type Archive

type Archive struct {
	Prefix  *S3Prefix
	Archive *string
}

Archive contains information associated with a WAL archive.

func (*Archive) CheckExistence

func (archive *Archive) CheckExistence() (bool, error)

CheckExistence checks that the specified WAL file exists.

func (*Archive) GetArchive

func (archive *Archive) GetArchive() (io.ReadCloser, error)

GetArchive downloads the specified archive from S3.

func (*Archive) GetETag added in v0.1.8

func (archive *Archive) GetETag() (*string, error)

GetETag acquires the ETag of the object from S3

type Backup

type Backup struct {
	Prefix *S3Prefix
	Path   *string
	Name   *string
	Js     *string
}

Backup contains information about a valid backup generated and uploaded by WAL-G.

func (*Backup) CheckExistence

func (backup *Backup) CheckExistence() (bool, error)

CheckExistence checks that the specified backup exists.

func (*Backup) GetBackups added in v0.1.3

func (backup *Backup) GetBackups() ([]BackupTime, error)

GetBackups receives backup descriptions and sorts them by time

func (*Backup) GetKeys

func (backup *Backup) GetKeys() ([]string, error)

GetKeys returns all the keys for the Files in the specified backup.

func (*Backup) GetLatest

func (backup *Backup) GetLatest() (string, error)

GetLatest sorts the backups by last modified time and returns the latest backup key.

func (*Backup) GetWals added in v0.1.3

func (backup *Backup) GetWals(before string) ([]*s3.ObjectIdentifier, error)

GetWals returns all WAL file keys less than the key provided

type BackupFileDescription added in v0.1.3

type BackupFileDescription struct {
	IsIncremented bool // should never be both incremented and Skipped
	IsSkipped     bool
	MTime         time.Time
}

type BackupFileList added in v0.1.3

type BackupFileList map[string]BackupFileDescription

type BackupTime

type BackupTime struct {
	Name        string
	Time        time.Time
	WalFileName string
}

BackupTime is used to sort backups by latest modified time.

func GetBackupTimeSlices added in v0.1.3

func GetBackupTimeSlices(backups []*s3.Object) []BackupTime

GetBackupTimeSlices converts S3 objects to backup description

type BgUploader added in v0.1.5

type BgUploader struct {
	// contains filtered or unexported fields
}

BgUploader represents the state of concurrent WAL upload

func (*BgUploader) Start added in v0.1.5

func (uploader *BgUploader) Start(walFilePath string, maxParallelWorkers int32, tu *TarUploader, pre *S3Prefix, verify bool)

Start starts the uploader by checking what's inside archive_status

func (*BgUploader) Stop added in v0.1.5

func (uploader *BgUploader) Stop()

Stop pipeline

func (*BgUploader) Upload added in v0.1.5

func (uploader *BgUploader) Upload(info os.FileInfo)

Upload one WAL file

type Bundle

type Bundle struct {
	MinSize            int64
	Sentinel           *Sentinel
	TarBall            TarBall
	TarBallMaker       TarBallMaker
	Crypter            OpenPGPCrypter
	Timeline           uint32
	Replica            bool
	IncrementFromLsn   *uint64
	IncrementFromFiles BackupFileList

	Files *sync.Map
	// contains filtered or unexported fields
}

A Bundle represents the directory to be walked. Contains at least one TarBall if walk has started. Each TarBall will be at least MinSize bytes. The Sentinel is used to ensure complete uploaded backups; in this case, pg_control is used as the sentinel.

func (*Bundle) CheckSizeAndEnqueueBack added in v0.1.8

func (bundle *Bundle) CheckSizeAndEnqueueBack(tarBall TarBall) error

func (*Bundle) CheckTimelineChanged added in v0.1.3

func (bundle *Bundle) CheckTimelineChanged(conn *pgx.Conn) bool

CheckTimelineChanged compares timelines of pg_backup_start() and pg_backup_stop()

func (*Bundle) Deque added in v0.1.8

func (bundle *Bundle) Deque() TarBall

func (*Bundle) EnqueueBack added in v0.1.8

func (bundle *Bundle) EnqueueBack(tarBall TarBall, parallelOpInProgress *bool)

func (*Bundle) FinishQueue added in v0.1.8

func (bundle *Bundle) FinishQueue() error

func (*Bundle) GetFiles added in v0.1.8

func (bundle *Bundle) GetFiles() *sync.Map

func (*Bundle) GetIncrementBaseFiles added in v0.1.3

func (bundle *Bundle) GetIncrementBaseFiles() BackupFileList

GetIncrementBaseFiles returns list of Files from previous backup

func (*Bundle) GetIncrementBaseLsn added in v0.1.3

func (bundle *Bundle) GetIncrementBaseLsn() *uint64

GetIncrementBaseLsn returns LSN of previous backup

func (*Bundle) HandleLabelFiles

func (bundle *Bundle) HandleLabelFiles(conn *pgx.Conn) (uint64, error)

HandleLabelFiles creates the `backup_label` and `tablespace_map` files and uploads them to S3 by stopping the backup. Returns an error upon failure.

func (*Bundle) HandleSentinel

func (bundle *Bundle) HandleSentinel() error

HandleSentinel uploads the compressed tar file of `pg_control`. Will only be called after the rest of the backup is successfully uploaded to S3. Returns an error upon failure.

func (*Bundle) NewTarBall

func (bundle *Bundle) NewTarBall(dedicatedUploader bool)

NewTarBall starts writing new tarball

func (*Bundle) StartBackup added in v0.1.3

func (bundle *Bundle) StartBackup(conn *pgx.Conn, backup string) (backupName string, lsn uint64, version int, err error)

StartBackup starts a non-exclusive base backup immediately. When finishing the backup, `backup_label` and `tablespace_map` contents are not immediately written to a file but returned instead. Returns empty string and an error if backup fails.

func (*Bundle) StartQueue added in v0.1.8

func (bundle *Bundle) StartQueue()

func (*Bundle) TarWalk added in v0.1.11

func (bundle *Bundle) TarWalk(path string, info os.FileInfo, err error) error

TarWalk walks the files in the given directory and creates compressed tar members labeled as `part_00i.tar.lzo`.

To see which files and directories are Skipped, please consult ExcludedFilenames. Excluded directories will be created but their contents will not be included in the tar bundle.

type CachedKey added in v0.1.3

type CachedKey struct {
	KeyId string `json:"keyId"`
	Body  []byte `json:"body"`
}

CachedKey is the data transfer object describing format of key ring cache

type CascadeWriteCloser added in v0.1.11

type CascadeWriteCloser struct {
	io.WriteCloser
	Underlying io.Closer
}

CascadeWriteCloser bundles multiple closers into one. Calling Close() will close both the main and the underlying writers.

func (*CascadeWriteCloser) Close added in v0.1.11

func (cascadeCloser *CascadeWriteCloser) Close() error

Close returns the first encountered error from closing main or underlying writer.

type Cleaner added in v0.1.3

type Cleaner interface {
	GetFiles(directory string) ([]string, error)
	Remove(file string)
}

Cleaner interface serves to separate file system logic from prefetch clean logic to make it testable

type CompressingPipeWriter added in v0.1.11

type CompressingPipeWriter struct {
	Input                io.Reader
	Output               io.Reader
	NewCompressingWriter func(io.Writer) ReaderFromWriteCloser
}

CompressingPipeWriter allows for flexibility of using compressed output. Input is read and compressed to a pipe reader.

func NewLz4CompressingPipeWriter added in v0.1.11

func NewLz4CompressingPipeWriter(input io.Reader) *CompressingPipeWriter

func (*CompressingPipeWriter) Compress added in v0.1.11

func (pipeWriter *CompressingPipeWriter) Compress(crypter Crypter)

Compress compresses input to a pipe reader. Output must be used or pipe will block.
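A minimal usage sketch, assuming WALE_GPG_KEY_ID is unset so the zero-value OpenPGPCrypter performs no encryption:

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"strings"

	walg "github.com/wal-g/wal-g"
)

func main() {
	pipeWriter := walg.NewLz4CompressingPipeWriter(strings.NewReader("payload"))
	pipeWriter.Compress(&walg.OpenPGPCrypter{})

	// Output must be drained, or the pipe will block (see the note above).
	compressed, err := ioutil.ReadAll(pipeWriter.Output)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(len(compressed), "bytes of LZ4 output")
}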

type CompressingPipeWriterError added in v0.1.11

type CompressingPipeWriterError struct {
	// contains filtered or unexported fields
}

CompressingPipeWriterError is used to catch specific errors from CompressingPipeWriter when uploading to S3. Will not retry upload if this error occurs.

func (CompressingPipeWriterError) Error added in v0.1.11

func (err CompressingPipeWriterError) Error() string

type Compressor added in v0.1.11

type Compressor interface {
	NewWriter(writer io.Writer) ReaderFromWriteCloser
	FileExtension() string
}

type Crypter added in v0.1.3

type Crypter interface {
	IsUsed() bool
	Encrypt(writer io.WriteCloser) (io.WriteCloser, error)
	Decrypt(reader io.ReadCloser) (io.Reader, error)
}

Crypter is responsible for making cryptographical pipeline parts when needed

type Decompressor added in v0.1.11

type Decompressor interface {
	Decompress(dst io.Writer, src io.Reader) error
	FileExtension() string
}

type DelayWriteCloser added in v0.1.3

type DelayWriteCloser struct {
	// contains filtered or unexported fields
}

DelayWriteCloser delays the first write. Encryption starts writing its header immediately, but in many places the writer is instantiated long before the pipe is ready. This special writer therefore delays encryption initialization until the first actual write. If no write occurs, initialization is still performed, so zero-byte files are handled correctly.

func (*DelayWriteCloser) Close added in v0.1.3

func (delayWriteCloser *DelayWriteCloser) Close() error

Close DelayWriteCloser

func (*DelayWriteCloser) Write added in v0.1.3

func (delayWriteCloser *DelayWriteCloser) Write(p []byte) (n int, err error)

type DeleteCommandArguments added in v0.1.3

type DeleteCommandArguments struct {
	// contains filtered or unexported fields
}

DeleteCommandArguments encapsulates the arguments of the delete command

func ParseDeleteArguments added in v0.1.3

func ParseDeleteArguments(args []string, fallBackFunc func()) (result DeleteCommandArguments)

ParseDeleteArguments interprets arguments for delete command. TODO: use flags or cobra

type Empty

type Empty struct{}

Empty is used for channel signaling.

type EmptyWriteIgnorer

type EmptyWriteIgnorer struct {
	io.WriteCloser
}

EmptyWriteIgnorer handles 0 byte write in LZ4 package to stop pipe reader/writer from blocking.

func (EmptyWriteIgnorer) Write

func (e EmptyWriteIgnorer) Write(p []byte) (int, error)

type FileSystemCleaner added in v0.1.3

type FileSystemCleaner struct{}

FileSystemCleaner actually performs its functions on the file system

func (FileSystemCleaner) GetFiles added in v0.1.3

func (c FileSystemCleaner) GetFiles(directory string) (files []string, err error)

GetFiles of a directory

func (FileSystemCleaner) Remove added in v0.1.3

func (c FileSystemCleaner) Remove(file string)

Remove file

type FileTarInterpreter

type FileTarInterpreter struct {
	NewDir             string
	Sentinel           S3TarBallSentinelDto
	IncrementalBaseDir string
}

FileTarInterpreter extracts input to disk.

func (*FileTarInterpreter) Interpret

func (tarInterpreter *FileTarInterpreter) Interpret(tr io.Reader, cur *tar.Header) error

Interpret extracts a tar file to disk and creates needed directories. Returns the first error encountered. Calls fsync after each file is written successfully.

type IncrementalPageReader added in v0.1.3

type IncrementalPageReader struct {
	// contains filtered or unexported fields
}

IncrementalPageReader constructs a difference map during initialization and then re-reads the file. The diff map can hold up to 1GB/PostgresBlockSize elements == 512KB.

func (*IncrementalPageReader) Close added in v0.1.3

func (pageReader *IncrementalPageReader) Close() error

Close IncrementalPageReader

func (*IncrementalPageReader) Read added in v0.1.3

func (pageReader *IncrementalPageReader) Read(p []byte) (n int, err error)

Read from IncrementalPageReader

type LimitedReader added in v0.1.11

type LimitedReader struct {
	// contains filtered or unexported fields
}

func (*LimitedReader) Close added in v0.1.11

func (r *LimitedReader) Close() error

func (*LimitedReader) Read added in v0.1.11

func (r *LimitedReader) Read(buf []byte) (int, error)

type Lz4Compressor added in v0.1.11

type Lz4Compressor struct{}

func (Lz4Compressor) FileExtension added in v0.1.11

func (compressor Lz4Compressor) FileExtension() string

func (Lz4Compressor) NewWriter added in v0.1.11

func (compressor Lz4Compressor) NewWriter(writer io.Writer) ReaderFromWriteCloser

type Lz4Decompressor added in v0.1.11

type Lz4Decompressor struct{}

func (Lz4Decompressor) Decompress added in v0.1.11

func (decompressor Lz4Decompressor) Decompress(dst io.Writer, src io.Reader) error

func (Lz4Decompressor) FileExtension added in v0.1.11

func (decompressor Lz4Decompressor) FileExtension() string

type LzmaCompressor added in v0.1.11

type LzmaCompressor struct{}

func (LzmaCompressor) FileExtension added in v0.1.11

func (compressor LzmaCompressor) FileExtension() string

func (LzmaCompressor) NewWriter added in v0.1.11

func (compressor LzmaCompressor) NewWriter(writer io.Writer) ReaderFromWriteCloser

type LzmaDecompressor added in v0.1.11

type LzmaDecompressor struct{}

func (LzmaDecompressor) Decompress added in v0.1.11

func (decompressor LzmaDecompressor) Decompress(dst io.Writer, src io.Reader) error

func (LzmaDecompressor) FileExtension added in v0.1.11

func (decompressor LzmaDecompressor) FileExtension() string

type LzmaReaderFromWriter added in v0.1.11

type LzmaReaderFromWriter struct {
	lzma.Writer
}

func NewLzmaReaderFromWriter added in v0.1.11

func NewLzmaReaderFromWriter(dst io.Writer) (*LzmaReaderFromWriter, error)

func (*LzmaReaderFromWriter) ReadFrom added in v0.1.11

func (writer *LzmaReaderFromWriter) ReadFrom(reader io.Reader) (n int64, err error)

type LzoDecompressor added in v0.1.11

type LzoDecompressor struct{}

func (LzoDecompressor) Decompress added in v0.1.11

func (decompressor LzoDecompressor) Decompress(dst io.Writer, src io.Reader) error

func (LzoDecompressor) FileExtension added in v0.1.11

func (decompressor LzoDecompressor) FileExtension() string

type MD5Reader added in v0.1.11

type MD5Reader struct {
	// contains filtered or unexported fields
}

func (*MD5Reader) Read added in v0.1.11

func (reader *MD5Reader) Read(p []byte) (n int, err error)

func (*MD5Reader) Sum added in v0.1.11

func (reader *MD5Reader) Sum() string

type NilWriter added in v0.1.3

type NilWriter struct{}

NilWriter to /dev/null

func (*NilWriter) Write added in v0.1.3

func (nw *NilWriter) Write(p []byte) (n int, err error)

Write to /dev/null

type NoMatchAvailableError

type NoMatchAvailableError struct {
	// contains filtered or unexported fields
}

NoMatchAvailableError is used to signal no match found in string.

func (NoMatchAvailableError) Error

func (e NoMatchAvailableError) Error() string

type OpenPGPCrypter added in v0.1.3

type OpenPGPCrypter struct {
	// contains filtered or unexported fields
}

OpenPGPCrypter encapsulates the specifics of the cipher method, including keys, infrastructure information, etc. If more encryption methods are ever used, it is worth extracting an interface.

func (*OpenPGPCrypter) ConfigureGPGCrypter added in v0.1.3

func (crypter *OpenPGPCrypter) ConfigureGPGCrypter()

ConfigureGPGCrypter performs OpenPGPCrypter's internal initialization

func (*OpenPGPCrypter) Decrypt added in v0.1.3

func (crypter *OpenPGPCrypter) Decrypt(reader io.ReadCloser) (io.Reader, error)

Decrypt creates decrypted reader from ordinary reader

func (*OpenPGPCrypter) Encrypt added in v0.1.3

func (crypter *OpenPGPCrypter) Encrypt(writer io.WriteCloser) (io.WriteCloser, error)

Encrypt creates encryption writer from ordinary writer

func (*OpenPGPCrypter) IsUsed added in v0.1.3

func (crypter *OpenPGPCrypter) IsUsed() bool

IsUsed checks whether the Crypter is needed. Must be called prior to any other crypter call.

type PgQueryRunner added in v0.1.8

type PgQueryRunner struct {
	Version int
	// contains filtered or unexported fields
}

PgQueryRunner is an implementation for controlling PostgreSQL 9.0+

func NewPgQueryRunner added in v0.1.8

func NewPgQueryRunner(conn *pgx.Conn) (*PgQueryRunner, error)

NewPgQueryRunner builds QueryRunner from available connection

func (*PgQueryRunner) BuildGetVersion added in v0.1.8

func (queryRunner *PgQueryRunner) BuildGetVersion() string

BuildGetVersion formats a query to retrieve PostgreSQL numeric version

func (*PgQueryRunner) BuildStartBackup added in v0.1.8

func (queryRunner *PgQueryRunner) BuildStartBackup() (string, error)

BuildStartBackup formats a query that starts backup according to server features and version

func (*PgQueryRunner) BuildStopBackup added in v0.1.8

func (queryRunner *PgQueryRunner) BuildStopBackup() (string, error)

BuildStopBackup formats a query that stops backup according to server features and version

func (*PgQueryRunner) StartBackup added in v0.1.8

func (queryRunner *PgQueryRunner) StartBackup(backup string) (backupName string, lsnString string, inRecovery bool, err error)

StartBackup informs the database that we are starting a copy of the cluster's contents

func (*PgQueryRunner) StopBackup added in v0.1.8

func (queryRunner *PgQueryRunner) StopBackup() (label string, offsetMap string, lsnStr string, err error)

StopBackup informs the database that the copy is over

type QueryRunner added in v0.1.8

type QueryRunner interface {
	// This call should inform the database that we are going to copy cluster's contents
	// Should fail if backup is currently impossible
	StartBackup(backup string) (string, string, bool, error)
	// Inform database that contents are copied, get information on backup
	StopBackup() (string, string, string, error)
}

QueryRunner is the interface for controlling the database during backup

type ReadCascadeCloser added in v0.1.11

type ReadCascadeCloser struct {
	io.Reader
	io.Closer
}

ReadCascadeCloser composes io.ReadCloser from two parts

type ReaderFromWriteCloser added in v0.1.11

type ReaderFromWriteCloser interface {
	io.ReaderFrom
	io.WriteCloser
}

type ReaderMaker

type ReaderMaker interface {
	Reader() (io.ReadCloser, error)
	Format() string
	Path() string
}

ReaderMaker is the generic interface used by extract. It allows for ease of handling different file formats.

type S3Prefix added in v0.1.11

type S3Prefix struct {
	Svc                 s3iface.S3API
	Bucket              *string
	Server              *string
	PreventWalOverwrite bool
}

S3Prefix contains the S3 service client, bucket, and server prefix.

type S3ReaderMaker

type S3ReaderMaker struct {
	Backup     *Backup
	Key        *string
	FileFormat string
}

S3ReaderMaker handles cases where backups need to be downloaded from S3.

func (*S3ReaderMaker) Format

func (readerMaker *S3ReaderMaker) Format() string

func (*S3ReaderMaker) Path

func (readerMaker *S3ReaderMaker) Path() string

Path to file in bucket

func (*S3ReaderMaker) Reader

func (readerMaker *S3ReaderMaker) Reader() (io.ReadCloser, error)

Reader creates a new S3 reader for each S3 object.

type S3TarBall

type S3TarBall struct {
	Lsn              *uint64
	IncrementFromLsn *uint64
	IncrementFrom    string
	Files            BackupFileList
	// contains filtered or unexported fields
}

S3TarBall represents a tar file that is going to be uploaded to S3.

func (*S3TarBall) AddSize added in v0.1.8

func (tarBall *S3TarBall) AddSize(i int64)

AddSize to total Size

func (*S3TarBall) AwaitUploads added in v0.1.8

func (tarBall *S3TarBall) AwaitUploads()

func (*S3TarBall) CloseTar

func (tarBall *S3TarBall) CloseTar() error

CloseTar closes the tar writer, flushing any unwritten data to the underlying writer before also closing the underlying writer.

func (*S3TarBall) FileExtension added in v0.1.11

func (tarBall *S3TarBall) FileExtension() string

func (*S3TarBall) Finish

func (tarBall *S3TarBall) Finish(sentinelDto *S3TarBallSentinelDto) error

Finish writes a .json file description and uploads it with the backup name. Finish will wait until all tar file parts have been uploaded. The JSON file is only uploaded if all other parts of the backup are present in S3; otherwise an alert is given with the corresponding error.

func (*S3TarBall) SetUp

func (tarBall *S3TarBall) SetUp(crypter Crypter, names ...string)

SetUp creates a new tar writer and starts upload to S3. Upload will block until the tar file is finished writing. If a name for the file is not given, the default name is of the form `part_....tar.[Compressor file extension]`.

func (*S3TarBall) Size

func (tarBall *S3TarBall) Size() int64

Size accumulated in this tarball

func (*S3TarBall) StartUpload

func (tarBall *S3TarBall) StartUpload(name string, crypter Crypter) io.WriteCloser

StartUpload creates a compressing writer and runs upload in the background once a compressed tar member is finished writing.

func (*S3TarBall) TarWriter added in v0.1.11

func (tarBall *S3TarBall) TarWriter() *tar.Writer

func (*S3TarBall) Trim

func (tarBall *S3TarBall) Trim() string

Trim suffix

type S3TarBallMaker

type S3TarBallMaker struct {
	Trim             string
	BkupName         string
	TarUploader      *TarUploader
	Lsn              *uint64
	IncrementFromLsn *uint64
	IncrementFrom    string
	// contains filtered or unexported fields
}

S3TarBallMaker creates tarballs that are uploaded to S3.

func (*S3TarBallMaker) Make

func (tarBallMaker *S3TarBallMaker) Make(dedicatedUploader bool) TarBall

Make returns a tarball with required S3 fields.

type S3TarBallSentinelDto added in v0.1.3

type S3TarBallSentinelDto struct {
	LSN               *uint64
	IncrementFromLSN  *uint64 `json:"DeltaFromLSN,omitempty"`
	IncrementFrom     *string `json:"DeltaFrom,omitempty"`
	IncrementFullName *string `json:"DeltaFullName,omitempty"`
	IncrementCount    *int    `json:"DeltaCount,omitempty"`

	Files BackupFileList

	PgVersion int
	FinishLSN *uint64

	UserData interface{} `json:"UserData,omitempty"`
}

S3TarBallSentinelDto describes the file structure of the JSON sentinel

func (*S3TarBallSentinelDto) IsIncremental added in v0.1.3

func (dto *S3TarBallSentinelDto) IsIncremental() bool

IsIncremental checks whether the sentinel represents a delta backup

func (*S3TarBallSentinelDto) SetFiles added in v0.1.8

func (dto *S3TarBallSentinelDto) SetFiles(p *sync.Map)

type Sentinel

type Sentinel struct {
	Info os.FileInfo
	// contains filtered or unexported fields
}

Sentinel is used to signal completion of a walked directory.

type TarBall

type TarBall interface {
	SetUp(crypter Crypter, args ...string)
	CloseTar() error
	Finish(sentinelDto *S3TarBallSentinelDto) error
	Trim() string
	Size() int64
	AddSize(int64)
	TarWriter() *tar.Writer
	FileExtension() string
	AwaitUploads()
}

A TarBall represents one tar file.

type TarBallMaker

type TarBallMaker interface {
	Make(dedicatedUploader bool) TarBall
}

TarBallMaker is used to allow for flexible creation of different TarBalls.

type TarBundle

type TarBundle interface {
	NewTarBall(dedicatedUploader bool)
	GetIncrementBaseLsn() *uint64
	GetIncrementBaseFiles() BackupFileList

	StartQueue()
	Deque() TarBall
	EnqueueBack(tarBall TarBall, parallelOpInProgress *bool)
	CheckSizeAndEnqueueBack(tarBall TarBall) error
	FinishQueue() error
	GetFiles() *sync.Map
}

TarBundle represents one completed directory.

type TarInterpreter

type TarInterpreter interface {
	Interpret(r io.Reader, hdr *tar.Header) error
}

TarInterpreter behaves differently for different file types.

type TarUploader

type TarUploader struct {
	UploaderApi          s3manageriface.UploaderAPI
	ServerSideEncryption string
	SSEKMSKeyId          string
	StorageClass         string
	Success              bool
	// contains filtered or unexported fields
}

TarUploader contains fields associated with uploading tarballs. Multiple tarballs can share one uploader. Must call CreateUploader() in 'upload.go'.

func NewLz4MockTarUploader added in v0.1.11

func NewLz4MockTarUploader() *TarUploader

func NewTarUploader

func NewTarUploader(bucket, server, compressionMethod string) *TarUploader

NewTarUploader creates a new tar uploader without the actual S3 uploader. CreateUploader() is used to configure byte size and concurrency streams for the uploader.

func (*TarUploader) Clone added in v0.1.7

func (tarUploader *TarUploader) Clone() *TarUploader

Clone creates a similar TarUploader with a new WaitGroup

func (*TarUploader) Finish

func (tarUploader *TarUploader) Finish()

Finish waits for all waiting parts to be uploaded. If an error occurs, prints alert to stderr.

func (*TarUploader) UploadWal

func (tarUploader *TarUploader) UploadWal(path string, pre *S3Prefix, verify bool) (string, error)

UploadWal compresses a WAL file and uploads to S3. Returns the first error encountered and an empty string upon failure.

type TimeSlice

type TimeSlice []BackupTime

TimeSlice is a slice of backups sortable by last modified time.

func (TimeSlice) Len

func (timeSlice TimeSlice) Len() int

func (TimeSlice) Less

func (timeSlice TimeSlice) Less(i, j int) bool

func (TimeSlice) Swap

func (timeSlice TimeSlice) Swap(i, j int)
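Since TimeSlice implements sort.Interface, a slice of BackupTime can be sorted directly; a minimal sketch (the ordering direction is whatever Less defines; backup names are hypothetical):

package main

import (
	"fmt"
	"sort"
	"time"

	walg "github.com/wal-g/wal-g"
)

func main() {
	backups := []walg.BackupTime{
		{Name: "base_0000000100000000000000C4", Time: time.Now()},
		{Name: "base_0000000100000000000000C8", Time: time.Now().Add(-time.Hour)},
	}
	sort.Sort(walg.TimeSlice(backups))
	for _, backup := range backups {
		fmt.Println(backup.Name, backup.Time.Format(time.RFC3339))
	}
}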

type UnknownCompressionMethodError added in v0.1.11

type UnknownCompressionMethodError struct{}

func (UnknownCompressionMethodError) Error added in v0.1.11

type UnsetEnvVarError

type UnsetEnvVarError struct {
	// contains filtered or unexported fields
}

UnsetEnvVarError is used to indicate that required environment variables for WAL-G are unset.

func (UnsetEnvVarError) Error

func (e UnsetEnvVarError) Error() string

type UnsupportedFileTypeError

type UnsupportedFileTypeError struct {
	Path       string
	FileFormat string
}

UnsupportedFileTypeError is used to signal file types that are unsupported by WAL-G.

func (UnsupportedFileTypeError) Error

func (e UnsupportedFileTypeError) Error() string

type UntilEOFReader added in v0.1.11

type UntilEOFReader struct {
	// contains filtered or unexported fields
}

func NewUntilEofReader added in v0.1.11

func NewUntilEofReader(underlying io.Reader) *UntilEOFReader

func (*UntilEOFReader) Read added in v0.1.11

func (reader *UntilEOFReader) Read(p []byte) (n int, err error)

type ZeroReader

type ZeroReader struct{}

ZeroReader generates a slice of zeroes. Used to pad tar in cases where length of file changes.

func (*ZeroReader) Read

func (z *ZeroReader) Read(p []byte) (int, error)

type ZstdCompressor added in v0.1.11

type ZstdCompressor struct{}

func (ZstdCompressor) FileExtension added in v0.1.11

func (compressor ZstdCompressor) FileExtension() string

func (ZstdCompressor) NewWriter added in v0.1.11

func (compressor ZstdCompressor) NewWriter(writer io.Writer) ReaderFromWriteCloser

type ZstdDecompressor added in v0.1.11

type ZstdDecompressor struct{}

func (ZstdDecompressor) Decompress added in v0.1.11

func (decompressor ZstdDecompressor) Decompress(dst io.Writer, src io.Reader) error

func (ZstdDecompressor) FileExtension added in v0.1.11

func (decompressor ZstdDecompressor) FileExtension() string

type ZstdReaderFromWriter added in v0.1.11

type ZstdReaderFromWriter struct {
	zstd.Writer
}

func NewZstdReaderFromWriter added in v0.1.11

func NewZstdReaderFromWriter(dst io.Writer) *ZstdReaderFromWriter

func (*ZstdReaderFromWriter) ReadFrom added in v0.1.11

func (writer *ZstdReaderFromWriter) ReadFrom(reader io.Reader) (n int64, err error)

Directories

Path Synopsis
cmd
