Documentation ¶
Index ¶
- Constants
- Variables
- func Checksum(bo binary.ByteOrder, s0, s1 uint32, b []byte) (uint32, uint32)
- func ComparePos(a, b Pos) (int, error)
- func FormatIndex(index int) string
- func FormatOffset(offset int64) string
- func IsGenerationName(s string) bool
- func ParseIndex(s string) (int, error)
- func ParseOffset(s string) (int64, error)
- type DB
- func (db *DB) CRC64(ctx context.Context) (uint64, Pos, error)
- func (db *DB) CalcRestoreTarget(ctx context.Context, opt RestoreOptions) (*Replica, string, error)
- func (db *DB) Checkpoint(ctx context.Context, mode string) (err error)
- func (db *DB) Close() (err error)
- func (db *DB) CurrentGeneration() (string, error)
- func (db *DB) DeleteWALSegments(ctx context.Context, a []Pos) error
- func (db *DB) DirInfo() os.FileInfo
- func (db *DB) FileInfo() os.FileInfo
- func (db *DB) GenerationNamePath() string
- func (db *DB) GenerationPath(generation string) string
- func (db *DB) MetaPath() string
- func (db *DB) Notify() <-chan struct{}
- func (db *DB) Open() (err error)
- func (db *DB) PageSize() int
- func (db *DB) Path() string
- func (db *DB) Pos() Pos
- func (db *DB) Replica(name string) *Replica
- func (db *DB) SQLDB() *sql.DB
- func (db *DB) ShadowWALDir(generation string) string
- func (db *DB) SoftClose() (err error)
- func (db *DB) Sync(ctx context.Context) (err error)
- func (db *DB) UpdatedAt() (time.Time, error)
- func (db *DB) WALPath() string
- func (db *DB) WALReader(ctx context.Context, generation string, index int) (_ io.ReadCloser, err error)
- func (db *DB) WALSegmentReader(ctx context.Context, pos Pos) (io.ReadCloser, error)
- func (db *DB) WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error)
- type Pos
- type Replica
- func (r *Replica) CalcRestoreTarget(ctx context.Context, opt RestoreOptions) (generation string, updatedAt time.Time, err error)
- func (r *Replica) DB() *DB
- func (r *Replica) EnforceRetention(ctx context.Context) (err error)
- func (r *Replica) GenerationCreatedAt(ctx context.Context, generation string) (time.Time, error)
- func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) (createdAt, updatedAt time.Time, err error)
- func (r *Replica) Name() string
- func (r *Replica) Pos() Pos
- func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error)
- func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error)
- func (r *Replica) SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error)
- func (r *Replica) SnapshotIndexByIndex(ctx context.Context, generation string, index int) (int, error)
- func (r *Replica) Snapshots(ctx context.Context) ([]SnapshotInfo, error)
- func (r *Replica) Start(ctx context.Context) error
- func (r *Replica) Stop(hard bool) (err error)
- func (r *Replica) Sync(ctx context.Context) (err error)
- func (r *Replica) Validate(ctx context.Context) error
- type ReplicaClient
- type RestoreOptions
- type SnapshotInfo
- type SnapshotInfoSlice
- type SnapshotInfoSliceIterator
- type SnapshotIterator
- type WALInfo
- type WALInfoSlice
- type WALSegmentInfo
- type WALSegmentInfoSlice
- type WALSegmentInfoSliceIterator
- type WALSegmentIterator
Constants ¶
const ( DefaultMonitorInterval = 1 * time.Second DefaultCheckpointInterval = 1 * time.Minute DefaultMinCheckpointPageN = 1000 DefaultMaxCheckpointPageN = 10000 )
Default DB settings.
const ( WALHeaderChecksumOffset = 24 WALFrameHeaderChecksumOffset = 16 )
SQLite WAL constants
const ( MetaDirSuffix = "-litestream" WALDirName = "wal" WALExt = ".wal" WALSegmentExt = ".wal.lz4" SnapshotExt = ".snapshot.lz4" GenerationNameLen = 16 )
Naming constants.
const ( CheckpointModePassive = "PASSIVE" CheckpointModeFull = "FULL" CheckpointModeRestart = "RESTART" CheckpointModeTruncate = "TRUNCATE" )
SQLite checkpoint modes.
const ( // WALHeaderSize is the size of the WAL header, in bytes. WALHeaderSize = 32 // WALFrameHeaderSize is the size of the WAL frame header, in bytes. WALFrameHeaderSize = 24 )
const ( DefaultSyncInterval = 1 * time.Second DefaultRetention = 24 * time.Hour DefaultRetentionCheckInterval = 1 * time.Hour )
Default replica settings.
const BusyTimeout = 1 * time.Second
BusyTimeout is the timeout to wait for EBUSY from SQLite.
const DefaultRestoreParallelism = 8
DefaultRestoreParallelism is the default parallelism when downloading WAL files.
const MaxIndex = 0x7FFFFFFF
MaxIndex is the maximum possible WAL index. If this index is reached then a new generation will be started.
Variables ¶
var ( ErrNoGeneration = errors.New("no generation available") ErrNoSnapshots = errors.New("no snapshots available") ErrChecksumMismatch = errors.New("invalid replica, checksum mismatch") )
Litestream errors.
var ( // LogWriter is the destination writer for all logging. LogWriter = os.Stderr // LogFlags are the flags passed to log.New(). LogFlags = 0 )
var Tracef = func(format string, a ...interface{}) {}
Tracef is used for low-level tracing.
Functions ¶
func ComparePos ¶
ComparePos returns -1 if a is less than b, 1 if a is greater than b, and returns 0 if a and b are equal. Only index & offset are compared. Returns an error if generations are not equal.
func FormatIndex ¶
FormatIndex formats an index as an 8-character hex value.
func FormatOffset ¶
FormatOffset formats an offset as an 8-character hex value.
func IsGenerationName ¶
IsGenerationName returns true if s is the correct length and is only lowercase hex characters.
func ParseIndex ¶
ParseIndex parses a hex-formatted index into an integer.
func ParseOffset ¶
ParseOffset parses a hex-formatted offset into an integer.
Types ¶
type DB ¶
type DB struct { // Minimum threshold of WAL size, in pages, before a passive checkpoint. // A passive checkpoint will attempt a checkpoint but fail if there are // active transactions occurring at the same time. MinCheckpointPageN int // Maximum threshold of WAL size, in pages, before a forced checkpoint. // A forced checkpoint will block new transactions and wait for existing // transactions to finish before issuing a checkpoint and resetting the WAL. // // If zero, no checkpoints are forced. This can cause the WAL to grow // unbounded if there are always read transactions occurring. MaxCheckpointPageN int // Time between automatic checkpoints in the WAL. This is done to allow // more fine-grained WAL files so that restores can be performed with // better precision. CheckpointInterval time.Duration // Frequency at which to perform db sync. MonitorInterval time.Duration // List of replicas for the database. // Must be set before calling Open(). Replicas []*Replica Logger *log.Logger // contains filtered or unexported fields }
DB represents a managed instance of a SQLite database in the file system.
func (*DB) CRC64 ¶ added in v0.3.0
CRC64 returns a CRC-64 ISO checksum of the database and its current position.
This function obtains a read lock so it prevents syncs from occurring until the operation is complete. The database will still be usable but it will be unable to checkpoint during this time.
If dst is set, the database file is copied to that location before checksum.
func (*DB) CalcRestoreTarget ¶ added in v0.3.1
CalcRestoreTarget returns a replica & generation to restore from based on opt criteria.
func (*DB) Checkpoint ¶ added in v0.3.0
Checkpoint performs a checkpoint on the WAL file.
func (*DB) Close ¶
Close releases the read lock & closes the database. This method should only be called by tests as it causes the underlying database to be checkpointed.
func (*DB) CurrentGeneration ¶
CurrentGeneration returns the name of the generation saved to the "generation" file in the meta data directory. Returns empty string if none exists.
func (*DB) DeleteWALSegments ¶
DeleteWALSegments deletes WAL segments at the given positions.
func (*DB) DirInfo ¶ added in v0.3.5
DirInfo returns the cached file stats for the parent directory of the database file when it was initialized.
func (*DB) FileInfo ¶ added in v0.3.5
FileInfo returns the cached file stats for the database file when it was initialized.
func (*DB) GenerationNamePath ¶
GenerationNamePath returns the path of the name of the current generation.
func (*DB) GenerationPath ¶
GenerationPath returns the path of a single generation. Panics if generation is blank.
func (*DB) Notify ¶
func (db *DB) Notify() <-chan struct{}
Notify returns a channel that closes when the shadow WAL changes.
func (*DB) PageSize ¶
PageSize returns the page size of the underlying database. Only valid after database exists & Init() has successfully run.
func (*DB) Pos ¶ added in v0.2.0
Pos returns the cached position of the database. Returns a zero position if no position has been calculated or if there is no generation.
func (*DB) ShadowWALDir ¶ added in v0.2.0
ShadowWALDir returns the path of the shadow wal directory. Panics if generation is blank.
func (*DB) SoftClose ¶
SoftClose closes everything but the underlying db connection. This method is available because the binary needs to avoid closing the database on exit to prevent autocheckpointing.
func (*DB) WALReader ¶
func (db *DB) WALReader(ctx context.Context, generation string, index int) (_ io.ReadCloser, err error)
WALReader returns the entire uncompressed WAL file for a given index.
func (*DB) WALSegmentReader ¶
WALSegmentReader returns a reader for a section of WAL data at the given position. Returns os.ErrNotExist if no matching index/offset is found.
func (*DB) WALSegments ¶
WALSegments returns an iterator over all available WAL files for a generation.
type Pos ¶
type Pos struct { Generation string // generation name Index int // wal file index Offset int64 // offset within wal file }
Pos is a position in the WAL for a generation.
type Replica ¶
type Replica struct { // Client used to connect to the remote replica. Client ReplicaClient // Time between syncs with the shadow WAL. SyncInterval time.Duration // Frequency to create new snapshots. SnapshotInterval time.Duration // Time to keep snapshots and related WAL files. // Database is snapshotted after interval, if needed, and older WAL files are discarded. Retention time.Duration // Time between checks for retention. RetentionCheckInterval time.Duration // Time between validation checks. ValidationInterval time.Duration // If true, replica monitors database for changes automatically. // Set to false if replica is being used synchronously (such as in tests). MonitorEnabled bool Logger *log.Logger // contains filtered or unexported fields }
Replica connects a database to a replication destination via a ReplicaClient. The replica manages periodic synchronization and maintaining the current replica position.
func NewReplica ¶ added in v0.3.5
func (*Replica) CalcRestoreTarget ¶ added in v0.3.5
func (r *Replica) CalcRestoreTarget(ctx context.Context, opt RestoreOptions) (generation string, updatedAt time.Time, err error)
CalcRestoreTarget returns a generation to restore from.
func (*Replica) DB ¶ added in v0.3.0
DB returns a reference to the database the replica is attached to, if any.
func (*Replica) EnforceRetention ¶ added in v0.3.5
EnforceRetention forces a new snapshot once the retention interval has passed. Older snapshots and WAL files are then removed.
func (*Replica) GenerationCreatedAt ¶ added in v0.3.5
GenerationCreatedAt returns the earliest creation time of any snapshot. Returns zero time if no snapshots exist.
func (*Replica) GenerationTimeBounds ¶ added in v0.3.5
func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) (createdAt, updatedAt time.Time, err error)
GenerationTimeBounds returns the creation time & last updated time of a generation. Returns zero time if no snapshots or WAL segments exist.
func (*Replica) Pos ¶ added in v0.3.5
Pos returns the current replicated position. Returns a zero value if the current position cannot be determined.
func (*Replica) Restore ¶ added in v0.3.5
func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error)
Restore restores the database from a replica based on the options given. This method will restore into opt.OutputPath, if specified, or into the DB's original database path. It can optionally restore from a specific replica or generation or it will automatically choose the best one. Finally, a timestamp can be specified to restore the database to a specific point-in-time.
func (*Replica) Snapshot ¶ added in v0.3.5
func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error)
Snapshot copies the entire database to the replica path.
func (*Replica) SnapshotIndexAt ¶
func (r *Replica) SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error)
SnapshotIndexAt returns the highest index for a snapshot within a generation that occurs before timestamp. If timestamp is zero, returns the latest snapshot.
func (*Replica) SnapshotIndexByIndex ¶ added in v0.3.5
func (r *Replica) SnapshotIndexByIndex(ctx context.Context, generation string, index int) (int, error)
SnapshotIndexByIndex returns the highest index for a snapshot within a generation that occurs before a given index. If index is MaxInt32, returns the latest snapshot.
func (*Replica) Snapshots ¶ added in v0.2.0
func (r *Replica) Snapshots(ctx context.Context) ([]SnapshotInfo, error)
Snapshots returns a list of all snapshots across all generations.
func (*Replica) Stop ¶
Stop cancels any outstanding replication and blocks until finished.
Performing a hard stop will close the DB file descriptor, which could release per-process locks. Hard stops should only be performed when stopping the entire process.
type ReplicaClient ¶ added in v0.3.5
type ReplicaClient interface { // Returns the type of client. Type() string // Returns a list of available generations. Generations(ctx context.Context) ([]string, error) // Deletes all snapshots & WAL segments within a generation. DeleteGeneration(ctx context.Context, generation string) error // Returns an iterator of all snapshots within a generation on the replica. Snapshots(ctx context.Context, generation string) (SnapshotIterator, error) // Writes LZ4 compressed snapshot data to the replica at a given index // within a generation. Returns metadata for the snapshot. WriteSnapshot(ctx context.Context, generation string, index int, r io.Reader) (SnapshotInfo, error) // Deletes a snapshot with the given generation & index. DeleteSnapshot(ctx context.Context, generation string, index int) error // Returns a reader that contains LZ4 compressed snapshot data for a // given index within a generation. Returns an os.ErrNotExist error if // the snapshot does not exist. SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) // Returns an iterator of all WAL segments within a generation on the replica. WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error) // Writes an LZ4 compressed WAL segment at a given position. // Returns metadata for the written segment. WriteWALSegment(ctx context.Context, pos Pos, r io.Reader) (WALSegmentInfo, error) // Deletes one or more WAL segments at the given positions. DeleteWALSegments(ctx context.Context, a []Pos) error // Returns a reader that contains an LZ4 compressed WAL segment at a given // index/offset within a generation. Returns an os.ErrNotExist error if the // WAL segment does not exist. WALSegmentReader(ctx context.Context, pos Pos) (io.ReadCloser, error) }
ReplicaClient represents a client used to connect to a Replica.
type RestoreOptions ¶
type RestoreOptions struct { // Target path to restore into. // If blank, the original DB path is used. OutputPath string // Specific replica to restore from. // If blank, all replicas are considered. ReplicaName string // Specific generation to restore from. // If blank, all generations considered. Generation string // Specific index to restore from. // Set to math.MaxInt32 to ignore index. Index int // Point-in-time to restore database. // If zero, the database is restored to the most recent state available. Timestamp time.Time // Specifies how many WAL files are downloaded in parallel during restore. Parallelism int // Logging settings. Logger *log.Logger Verbose bool }
RestoreOptions represents options for Replica.Restore().
func NewRestoreOptions ¶ added in v0.2.0
func NewRestoreOptions() RestoreOptions
NewRestoreOptions returns a new instance of RestoreOptions with defaults.
type SnapshotInfo ¶ added in v0.2.0
SnapshotInfo represents file information about a snapshot.
func FilterSnapshotsAfter ¶ added in v0.3.0
func FilterSnapshotsAfter(a []SnapshotInfo, t time.Time) []SnapshotInfo
FilterSnapshotsAfter returns all snapshots that were created on or after t.
func FindMinSnapshotByGeneration ¶ added in v0.3.0
func FindMinSnapshotByGeneration(a []SnapshotInfo, generation string) *SnapshotInfo
FindMinSnapshotByGeneration finds the snapshot with the lowest index in a generation.
func SliceSnapshotIterator ¶ added in v0.3.5
func SliceSnapshotIterator(itr SnapshotIterator) ([]SnapshotInfo, error)
SliceSnapshotIterator returns all snapshots from an iterator as a slice.
func (*SnapshotInfo) Pos ¶ added in v0.3.5
func (info *SnapshotInfo) Pos() Pos
Pos returns the WAL position when the snapshot was made.
type SnapshotInfoSlice ¶ added in v0.3.5
type SnapshotInfoSlice []SnapshotInfo
SnapshotInfoSlice represents a slice of snapshot metadata.
func (SnapshotInfoSlice) Len ¶ added in v0.3.5
func (a SnapshotInfoSlice) Len() int
func (SnapshotInfoSlice) Less ¶ added in v0.3.5
func (a SnapshotInfoSlice) Less(i, j int) bool
func (SnapshotInfoSlice) Swap ¶ added in v0.3.5
func (a SnapshotInfoSlice) Swap(i, j int)
type SnapshotInfoSliceIterator ¶ added in v0.3.5
type SnapshotInfoSliceIterator struct {
// contains filtered or unexported fields
}
SnapshotInfoSliceIterator represents an iterator for iterating over a slice of snapshots.
func NewSnapshotInfoSliceIterator ¶ added in v0.3.5
func NewSnapshotInfoSliceIterator(a []SnapshotInfo) *SnapshotInfoSliceIterator
NewSnapshotInfoSliceIterator returns a new instance of SnapshotInfoSliceIterator.
func (*SnapshotInfoSliceIterator) Close ¶ added in v0.3.5
func (itr *SnapshotInfoSliceIterator) Close() error
Close always returns nil.
func (*SnapshotInfoSliceIterator) Err ¶ added in v0.3.5
func (itr *SnapshotInfoSliceIterator) Err() error
Err always returns nil.
func (*SnapshotInfoSliceIterator) Next ¶ added in v0.3.5
func (itr *SnapshotInfoSliceIterator) Next() bool
Next moves to the next snapshot. Returns true if another snapshot is available.
func (*SnapshotInfoSliceIterator) Snapshot ¶ added in v0.3.5
func (itr *SnapshotInfoSliceIterator) Snapshot() SnapshotInfo
Snapshot returns the metadata from the currently positioned snapshot.
type SnapshotIterator ¶ added in v0.3.5
type SnapshotIterator interface { io.Closer // Prepares the next snapshot for reading with the Snapshot() method. // Returns true if another snapshot is available. Returns false if no more // snapshots are available or if an error occurred. Next() bool // Returns an error that occurred during iteration. Err() error // Returns metadata for the currently positioned snapshot. Snapshot() SnapshotInfo }
SnapshotIterator represents an iterator over a collection of snapshot metadata.
type WALInfoSlice ¶ added in v0.3.5
type WALInfoSlice []WALInfo
WALInfoSlice represents a slice of WAL metadata.
func (WALInfoSlice) Len ¶ added in v0.3.5
func (a WALInfoSlice) Len() int
func (WALInfoSlice) Less ¶ added in v0.3.5
func (a WALInfoSlice) Less(i, j int) bool
func (WALInfoSlice) Swap ¶ added in v0.3.5
func (a WALInfoSlice) Swap(i, j int)
type WALSegmentInfo ¶ added in v0.3.5
type WALSegmentInfo struct { Generation string Index int Offset int64 Size int64 CreatedAt time.Time }
WALSegmentInfo represents file information about a WAL segment file.
func SliceWALSegmentIterator ¶ added in v0.3.5
func SliceWALSegmentIterator(itr WALSegmentIterator) ([]WALSegmentInfo, error)
SliceWALSegmentIterator returns all WAL segment files from an iterator as a slice.
func (*WALSegmentInfo) Pos ¶ added in v0.3.5
func (info *WALSegmentInfo) Pos() Pos
Pos returns the WAL position when the segment was made.
type WALSegmentInfoSlice ¶ added in v0.3.5
type WALSegmentInfoSlice []WALSegmentInfo
WALSegmentInfoSlice represents a slice of WAL segment metadata.
func (WALSegmentInfoSlice) Len ¶ added in v0.3.5
func (a WALSegmentInfoSlice) Len() int
func (WALSegmentInfoSlice) Less ¶ added in v0.3.5
func (a WALSegmentInfoSlice) Less(i, j int) bool
func (WALSegmentInfoSlice) Swap ¶ added in v0.3.5
func (a WALSegmentInfoSlice) Swap(i, j int)
type WALSegmentInfoSliceIterator ¶ added in v0.3.5
type WALSegmentInfoSliceIterator struct {
// contains filtered or unexported fields
}
WALSegmentInfoSliceIterator represents an iterator for iterating over a slice of wal segments.
func NewWALSegmentInfoSliceIterator ¶ added in v0.3.5
func NewWALSegmentInfoSliceIterator(a []WALSegmentInfo) *WALSegmentInfoSliceIterator
NewWALSegmentInfoSliceIterator returns a new instance of WALSegmentInfoSliceIterator.
func (*WALSegmentInfoSliceIterator) Close ¶ added in v0.3.5
func (itr *WALSegmentInfoSliceIterator) Close() error
Close always returns nil.
func (*WALSegmentInfoSliceIterator) Err ¶ added in v0.3.5
func (itr *WALSegmentInfoSliceIterator) Err() error
Err always returns nil.
func (*WALSegmentInfoSliceIterator) Next ¶ added in v0.3.5
func (itr *WALSegmentInfoSliceIterator) Next() bool
Next moves to the next wal segment. Returns true if another segment is available.
func (*WALSegmentInfoSliceIterator) WALSegment ¶ added in v0.3.5
func (itr *WALSegmentInfoSliceIterator) WALSegment() WALSegmentInfo
WALSegment returns the metadata from the currently positioned wal segment.
type WALSegmentIterator ¶ added in v0.3.5
type WALSegmentIterator interface { io.Closer // Prepares the next WAL segment for reading with the WALSegment() method. // Returns true if another WAL segment is available. Returns false if no more // WAL segments are available or if an error occurred. Next() bool // Returns an error that occurred during iteration. Err() error // Returns metadata for the currently positioned WAL segment file. WALSegment() WALSegmentInfo }
WALSegmentIterator represents an iterator over a collection of WAL segments.