Documentation ¶
Index ¶
- Constants
- Variables
- func Checksum(bo binary.ByteOrder, s0, s1 uint32, b []byte) (uint32, uint32)
- func FormatSnapshotPath(index int) string
- func FormatWALPath(index int) string
- func FormatWALSegmentPath(index int, offset int64) string
- func GenerationPath(root, generation string) (string, error)
- func GenerationsPath(root string) string
- func IsGenerationName(s string) bool
- func IsSnapshotPath(s string) bool
- func IsWALPath(s string) bool
- func ParseSnapshotPath(s string) (index int, err error)
- func ParseWALPath(s string) (index int, err error)
- func ParseWALSegmentPath(s string) (index int, offset int64, err error)
- func SnapshotPath(root, generation string, index int) (string, error)
- func SnapshotsPath(root, generation string) (string, error)
- func WALPath(root, generation string) (string, error)
- func WALSegmentPath(root, generation string, index int, offset int64) (string, error)
- type DB
- func (db *DB) BeginSnapshot()
- func (db *DB) CRC64(ctx context.Context) (uint64, Pos, error)
- func (db *DB) CalcRestoreTarget(ctx context.Context, opt RestoreOptions) (*Replica, string, error)
- func (db *DB) Checkpoint(ctx context.Context, mode string) (err error)
- func (db *DB) Close(ctx context.Context) (err error)
- func (db *DB) CurrentGeneration() (string, error)
- func (db *DB) CurrentShadowWALIndex(generation string) (index int, size int64, err error)
- func (db *DB) CurrentShadowWALPath(generation string) (string, error)
- func (db *DB) DirInfo() os.FileInfo
- func (db *DB) EndSnapshot()
- func (db *DB) FileInfo() os.FileInfo
- func (db *DB) GenerationNamePath() string
- func (db *DB) GenerationPath(generation string) string
- func (db *DB) MetaPath() string
- func (db *DB) Notify() <-chan struct{}
- func (db *DB) Open() (err error)
- func (db *DB) PageSize() int
- func (db *DB) Path() string
- func (db *DB) Pos() (Pos, error)
- func (db *DB) Replica(name string) *Replica
- func (db *DB) SQLDB() *sql.DB
- func (db *DB) SetDriverName(dn string)
- func (db *DB) SetMetaPath(mp string)
- func (db *DB) ShadowWALDir(generation string) string
- func (db *DB) ShadowWALPath(generation string, index int) string
- func (db *DB) ShadowWALReader(pos Pos) (r *ShadowWALReader, err error)
- func (db *DB) SoftClose(ctx context.Context) (err error)
- func (db *DB) Sync(ctx context.Context) (err error)
- func (db *DB) UpdatedAt() (time.Time, error)
- func (db *DB) WALPath() string
- type Pos
- type Replica
- func (r *Replica) CalcRestoreTarget(ctx context.Context, opt RestoreOptions) (generation string, updatedAt time.Time, err error)
- func (r *Replica) DB() *DB
- func (r *Replica) EnforceRetention(ctx context.Context) (err error)
- func (r *Replica) GenerationCreatedAt(ctx context.Context, generation string) (time.Time, error)
- func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) (createdAt, updatedAt time.Time, err error)
- func (r *Replica) Logger() *slog.Logger
- func (r *Replica) Name() string
- func (r *Replica) Pos() Pos
- func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error)
- func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error)
- func (r *Replica) SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error)
- func (r *Replica) SnapshotIndexByIndex(ctx context.Context, generation string, index int) (int, error)
- func (r *Replica) Snapshots(ctx context.Context) ([]SnapshotInfo, error)
- func (r *Replica) Start(ctx context.Context) error
- func (r *Replica) Stop(hard bool) (err error)
- func (r *Replica) Sync(ctx context.Context) (err error)
- func (r *Replica) Validate(ctx context.Context) error
- type ReplicaClient
- type RestoreOptions
- type ShadowWALReader
- type SnapshotInfo
- type SnapshotInfoSlice
- type SnapshotInfoSliceIterator
- type SnapshotIterator
- type WALInfo
- type WALInfoSlice
- type WALSegmentInfo
- type WALSegmentInfoSlice
- type WALSegmentInfoSliceIterator
- type WALSegmentIterator
Constants ¶
const ( DefaultMonitorInterval = 1 * time.Second DefaultCheckpointInterval = 1 * time.Minute DefaultMinCheckpointPageN = 1000 DefaultMaxCheckpointPageN = 10000 DefaultTruncatePageN = 500000 )
Default DB settings.
const ( WALHeaderChecksumOffset = 24 WALFrameHeaderChecksumOffset = 16 )
SQLite WAL constants
const ( MetaDirSuffix = "-litestream" WALDirName = "wal" WALExt = ".wal" WALSegmentExt = ".wal.lz4" SnapshotExt = ".snapshot.lz4" GenerationNameLen = 16 )
Naming constants.
const ( CheckpointModePassive = "PASSIVE" CheckpointModeFull = "FULL" CheckpointModeRestart = "RESTART" CheckpointModeTruncate = "TRUNCATE" )
SQLite checkpoint modes.
const ( // WALHeaderSize is the size of the WAL header, in bytes. WALHeaderSize = 32 // WALFrameHeaderSize is the size of the WAL frame header, in bytes. WALFrameHeaderSize = 24 )
const ( DefaultSyncInterval = 1 * time.Second DefaultRetention = 24 * time.Hour DefaultRetentionCheckInterval = 1 * time.Hour )
Default replica settings.
const BusyTimeout = 1 * time.Second
BusyTimeout is the timeout to wait for EBUSY from SQLite.
const DefaultRestoreParallelism = 8
DefaultRestoreParallelism is the default parallelism when downloading WAL files.
const MaxIndex = 0x7FFFFFFF
MaxIndex is the maximum possible WAL index. If this index is reached then a new generation will be started.
Variables ¶
var ( ErrNoGeneration = errors.New("no generation available") ErrNoSnapshots = errors.New("no snapshots available") ErrChecksumMismatch = errors.New("invalid replica, checksum mismatch") )
Litestream errors.
Functions ¶
func FormatSnapshotPath ¶
FormatSnapshotPath formats a snapshot filename with a given index.
func FormatWALPath ¶
FormatWALPath formats a WAL filename with a given index.
func FormatWALSegmentPath ¶
FormatWALSegmentPath formats a WAL segment filename with a given index & offset.
func GenerationPath ¶
GenerationPath returns the path to a generation's root directory.
func GenerationsPath ¶
GenerationsPath returns the path to a generation root directory.
func IsGenerationName ¶
IsGenerationName returns true if s is the correct length and is only lowercase hex characters.
func IsSnapshotPath ¶
IsSnapshotPath returns true if s is a path to a snapshot file.
func ParseSnapshotPath ¶
ParseSnapshotPath returns the index for the snapshot. Returns an error if the path is not a valid snapshot path.
func ParseWALPath ¶
ParseWALPath returns the index for the WAL file. Returns an error if the path is not a valid WAL path.
func ParseWALSegmentPath ¶
ParseWALSegmentPath returns the index & offset for the WAL segment file. Returns an error if the path is not a valid wal segment path.
func SnapshotPath ¶
SnapshotPath returns the path to an uncompressed snapshot file.
func SnapshotsPath ¶
SnapshotsPath returns the path to a generation's snapshot directory.
Types ¶
type DB ¶
type DB struct { // Minimum threshold of WAL size, in pages, before a passive checkpoint. // A passive checkpoint will attempt a checkpoint but fail if there are // active transactions occurring at the same time. MinCheckpointPageN int // Maximum threshold of WAL size, in pages, before a forced checkpoint. // A forced checkpoint will block new transactions and wait for existing // transactions to finish before issuing a checkpoint and resetting the WAL. // // If zero, no checkpoints are forced. This can cause the WAL to grow // unbounded if there are always read transactions occurring. MaxCheckpointPageN int // Threshold of WAL size, in pages, before a forced truncation checkpoint. // A forced truncation checkpoint will block new transactions and wait for // existing transactions to finish before issuing a checkpoint and // truncating the WAL. // // If zero, no truncates are forced. This can cause the WAL to grow // unbounded if there's a sudden spike of changes between other // checkpoints. TruncatePageN int // Time between automatic checkpoints in the WAL. This is done to allow // more fine-grained WAL files so that restores can be performed with // better precision. CheckpointInterval time.Duration // Frequency at which to perform db sync. MonitorInterval time.Duration // List of replicas for the database. // Must be set before calling Open(). Replicas []*Replica // Where to send log messages, defaults to global slog with databas epath. Logger *slog.Logger // contains filtered or unexported fields }
DB represents a managed instance of a SQLite database in the file system.
func (*DB) BeginSnapshot ¶
func (db *DB) BeginSnapshot()
BeginSnapshot takes an internal snapshot lock preventing checkpoints.
When calling this the caller must also call EndSnapshot() once the snapshot is finished.
func (*DB) CRC64 ¶
CRC64 returns a CRC-64 ISO checksum of the database and its current position.
This function obtains a read lock so it prevents syncs from occurring until the operation is complete. The database will still be usable but it will be unable to checkpoint during this time.
If dst is set, the database file is copied to that location before checksum.
func (*DB) CalcRestoreTarget ¶
CalcRestoreTarget returns a replica & generation to restore from based on opt criteria.
func (*DB) Checkpoint ¶
Checkpoint performs a checkpoint on the WAL file.
func (*DB) Close ¶
Close releases the read lock & closes the database. This method should only be called by tests as it causes the underlying database to be checkpointed. Takes a context for final sync.
func (*DB) CurrentGeneration ¶
CurrentGeneration returns the name of the generation saved to the "generation" file in the meta data directory. Returns empty string if none exists.
func (*DB) CurrentShadowWALIndex ¶
CurrentShadowWALIndex returns the current WAL index & total size.
func (*DB) CurrentShadowWALPath ¶
CurrentShadowWALPath returns the path to the last shadow WAL in a generation.
func (*DB) DirInfo ¶
DirInfo returns the cached file stats for the parent directory of the database file when it was initialized.
func (*DB) EndSnapshot ¶
func (db *DB) EndSnapshot()
EndSnapshot releases the internal snapshot lock that prevents checkpoints.
func (*DB) FileInfo ¶
FileInfo returns the cached file stats for the database file when it was initialized.
func (*DB) GenerationNamePath ¶
GenerationNamePath returns the path of the name of the current generation.
func (*DB) GenerationPath ¶
GenerationPath returns the path of a single generation. Panics if generation is blank.
func (*DB) Notify ¶
func (db *DB) Notify() <-chan struct{}
Notify returns a channel that closes when the shadow WAL changes.
func (*DB) PageSize ¶
PageSize returns the page size of the underlying database. Only valid after database exists & Init() has successfully run.
func (*DB) SetDriverName ¶
func (*DB) SetMetaPath ¶
SetMetaPath sets the path to database metadata.
func (*DB) ShadowWALDir ¶
ShadowWALDir returns the path of the shadow wal directory. Panics if generation is blank.
func (*DB) ShadowWALPath ¶
ShadowWALPath returns the path of a single shadow WAL file. Panics if generation is blank or index is negative.
func (*DB) ShadowWALReader ¶
func (db *DB) ShadowWALReader(pos Pos) (r *ShadowWALReader, err error)
ShadowWALReader opens a reader for a shadow WAL file at a given position. If the reader is at the end of the file, it attempts to return the next file.
The caller should check Pos() & Size() on the returned reader to check offset.
func (*DB) SoftClose ¶
SoftClose closes everything but the underlying db connection. This method is available because the binary needs to avoid closing the database on exit to prevent autocheckpointing.
type Pos ¶
type Pos struct { Generation string // generation name Index int // wal file index Offset int64 // offset within wal file }
Pos is a position in the WAL for a generation.
type Replica ¶
type Replica struct { // Client used to connect to the remote replica. Client ReplicaClient // Time between syncs with the shadow WAL. SyncInterval time.Duration // Frequency to create new snapshots. SnapshotInterval time.Duration // Time to keep snapshots and related WAL files. // Database is snapshotted after interval, if needed, and older WAL files are discarded. Retention time.Duration // Time between checks for retention. RetentionCheckInterval time.Duration // Time between validation checks. ValidationInterval time.Duration // If true, replica monitors database for changes automatically. // Set to false if replica is being used synchronously (such as in tests). MonitorEnabled bool // Encryption identities and recipients AgeIdentities []age.Identity AgeRecipients []age.Recipient // contains filtered or unexported fields }
Replica connects a database to a replication destination via a ReplicaClient. The replica manages periodic synchronization and maintaining the current replica position.
func NewReplica ¶
func (*Replica) CalcRestoreTarget ¶
func (r *Replica) CalcRestoreTarget(ctx context.Context, opt RestoreOptions) (generation string, updatedAt time.Time, err error)
CalcRestoreTarget returns a generation to restore from.
func (*Replica) EnforceRetention ¶
EnforceRetention forces a new snapshot once the retention interval has passed. Older snapshots and WAL files are then removed.
func (*Replica) GenerationCreatedAt ¶
GenerationCreatedAt returns the earliest creation time of any snapshot. Returns zero time if no snapshots exist.
func (*Replica) GenerationTimeBounds ¶
func (r *Replica) GenerationTimeBounds(ctx context.Context, generation string) (createdAt, updatedAt time.Time, err error)
GenerationTimeBounds returns the creation time & last updated time of a generation. Returns zero time if no snapshots or WAL segments exist.
func (*Replica) Pos ¶
Pos returns the current replicated position. Returns a zero value if the current position cannot be determined.
func (*Replica) Restore ¶
func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error)
Replica restores the database from a replica based on the options given. This method will restore into opt.OutputPath, if specified, or into the DB's original database path. It can optionally restore from a specific replica or generation or it will automatically choose the best one. Finally, a timestamp can be specified to restore the database to a specific point-in-time.
func (*Replica) Snapshot ¶
func (r *Replica) Snapshot(ctx context.Context) (info SnapshotInfo, err error)
Snapshot copies the entire database to the replica path.
func (*Replica) SnapshotIndexAt ¶
func (r *Replica) SnapshotIndexAt(ctx context.Context, generation string, timestamp time.Time) (int, error)
SnapshotIndexAt returns the highest index for a snapshot within a generation that occurs before timestamp. If timestamp is zero, returns the latest snapshot.
func (*Replica) SnapshotIndexByIndex ¶
func (r *Replica) SnapshotIndexByIndex(ctx context.Context, generation string, index int) (int, error)
SnapshotIndexbyIndex returns the highest index for a snapshot within a generation that occurs before a given index. If index is MaxInt32, returns the latest snapshot.
func (*Replica) Snapshots ¶
func (r *Replica) Snapshots(ctx context.Context) ([]SnapshotInfo, error)
Snapshots returns a list of all snapshots across all generations.
func (*Replica) Stop ¶
Stop cancels any outstanding replication and blocks until finished.
Performing a hard stop will close the DB file descriptor which could release locks on per-process locks. Hard stops should only be performed when stopping the entire process.
type ReplicaClient ¶
type ReplicaClient interface { // Returns the type of client. Type() string // Returns a list of available generations. Order is undefined. Generations(ctx context.Context) ([]string, error) // Deletes all snapshots & WAL segments within a generation. DeleteGeneration(ctx context.Context, generation string) error // Returns an iterator of all snapshots within a generation on the replica. Order is undefined. Snapshots(ctx context.Context, generation string) (SnapshotIterator, error) // Writes LZ4 compressed snapshot data to the replica at a given index // within a generation. Returns metadata for the snapshot. WriteSnapshot(ctx context.Context, generation string, index int, r io.Reader) (SnapshotInfo, error) // Deletes a snapshot with the given generation & index. DeleteSnapshot(ctx context.Context, generation string, index int) error // Returns a reader that contains LZ4 compressed snapshot data for a // given index within a generation. Returns an os.ErrNotFound error if // the snapshot does not exist. SnapshotReader(ctx context.Context, generation string, index int) (io.ReadCloser, error) // Returns an iterator of all WAL segments within a generation on the replica. Order is undefined. WALSegments(ctx context.Context, generation string) (WALSegmentIterator, error) // Writes an LZ4 compressed WAL segment at a given position. // Returns metadata for the written segment. WriteWALSegment(ctx context.Context, pos Pos, r io.Reader) (WALSegmentInfo, error) // Deletes one or more WAL segments at the given positions. DeleteWALSegments(ctx context.Context, a []Pos) error // Returns a reader that contains an LZ4 compressed WAL segment at a given // index/offset within a generation. Returns an os.ErrNotFound error if the // WAL segment does not exist. WALSegmentReader(ctx context.Context, pos Pos) (io.ReadCloser, error) }
ReplicaClient represents client to connect to a Replica.
type RestoreOptions ¶
type RestoreOptions struct { // Target path to restore into. // If blank, the original DB path is used. OutputPath string // Specific replica to restore from. // If blank, all replicas are considered. ReplicaName string // Specific generation to restore from. // If blank, all generations considered. Generation string // Specific index to restore from. // Set to math.MaxInt32 to ignore index. Index int // Point-in-time to restore database. // If zero, database restore to most recent state available. Timestamp time.Time // Specifies how many WAL files are downloaded in parallel during restore. Parallelism int DriverName string }
RestoreOptions represents options for DB.Restore().
func NewRestoreOptions ¶
func NewRestoreOptions() RestoreOptions
NewRestoreOptions returns a new instance of RestoreOptions with defaults.
type ShadowWALReader ¶
type ShadowWALReader struct {
// contains filtered or unexported fields
}
ShadowWALReader represents a reader for a shadow WAL file that tracks WAL position.
func (*ShadowWALReader) Close ¶
func (r *ShadowWALReader) Close() error
Close closes the underlying WAL file handle.
func (*ShadowWALReader) N ¶
func (r *ShadowWALReader) N() int64
N returns the remaining bytes in the reader.
func (*ShadowWALReader) Name ¶
func (r *ShadowWALReader) Name() string
Name returns the filename of the underlying file.
func (*ShadowWALReader) Pos ¶
func (r *ShadowWALReader) Pos() Pos
Pos returns the current WAL position.
type SnapshotInfo ¶
SnapshotInfo represents file information about a snapshot.
func FilterSnapshotsAfter ¶
func FilterSnapshotsAfter(a []SnapshotInfo, t time.Time) []SnapshotInfo
FilterSnapshotsAfter returns all snapshots that were created on or after t.
func FindMinSnapshotByGeneration ¶
func FindMinSnapshotByGeneration(a []SnapshotInfo, generation string) *SnapshotInfo
FindMinSnapshotByGeneration finds the snapshot with the lowest index in a generation.
func SliceSnapshotIterator ¶
func SliceSnapshotIterator(itr SnapshotIterator) ([]SnapshotInfo, error)
SliceSnapshotIterator returns all snapshots from an iterator as a slice.
func (*SnapshotInfo) Pos ¶
func (info *SnapshotInfo) Pos() Pos
Pos returns the WAL position when the snapshot was made.
type SnapshotInfoSlice ¶
type SnapshotInfoSlice []SnapshotInfo
SnapshotInfoSlice represents a slice of snapshot metadata.
func (SnapshotInfoSlice) Len ¶
func (a SnapshotInfoSlice) Len() int
func (SnapshotInfoSlice) Less ¶
func (a SnapshotInfoSlice) Less(i, j int) bool
func (SnapshotInfoSlice) Swap ¶
func (a SnapshotInfoSlice) Swap(i, j int)
type SnapshotInfoSliceIterator ¶
type SnapshotInfoSliceIterator struct {
// contains filtered or unexported fields
}
SnapshotInfoSliceIterator represents an iterator for iterating over a slice of snapshots.
func NewSnapshotInfoSliceIterator ¶
func NewSnapshotInfoSliceIterator(a []SnapshotInfo) *SnapshotInfoSliceIterator
NewSnapshotInfoSliceIterator returns a new instance of SnapshotInfoSliceIterator.
func (*SnapshotInfoSliceIterator) Close ¶
func (itr *SnapshotInfoSliceIterator) Close() error
Close always returns nil.
func (*SnapshotInfoSliceIterator) Err ¶
func (itr *SnapshotInfoSliceIterator) Err() error
Err always returns nil.
func (*SnapshotInfoSliceIterator) Next ¶
func (itr *SnapshotInfoSliceIterator) Next() bool
Next moves to the next snapshot. Returns true if another snapshot is available.
func (*SnapshotInfoSliceIterator) Snapshot ¶
func (itr *SnapshotInfoSliceIterator) Snapshot() SnapshotInfo
Snapshot returns the metadata from the currently positioned snapshot.
type SnapshotIterator ¶
type SnapshotIterator interface { io.Closer // Prepares the the next snapshot for reading with the Snapshot() method. // Returns true if another snapshot is available. Returns false if no more // snapshots are available or if an error occured. Next() bool // Returns an error that occurred during iteration. Err() error // Returns metadata for the currently positioned snapshot. Snapshot() SnapshotInfo }
SnapshotIterator represents an iterator over a collection of snapshot metadata.
type WALInfoSlice ¶
type WALInfoSlice []WALInfo
WALInfoSlice represents a slice of WAL metadata.
func (WALInfoSlice) Len ¶
func (a WALInfoSlice) Len() int
func (WALInfoSlice) Less ¶
func (a WALInfoSlice) Less(i, j int) bool
func (WALInfoSlice) Swap ¶
func (a WALInfoSlice) Swap(i, j int)
type WALSegmentInfo ¶
type WALSegmentInfo struct { Generation string Index int Offset int64 Size int64 CreatedAt time.Time }
WALSegmentInfo represents file information about a WAL segment file.
func SliceWALSegmentIterator ¶
func SliceWALSegmentIterator(itr WALSegmentIterator) ([]WALSegmentInfo, error)
SliceWALSegmentIterator returns all WAL segment files from an iterator as a slice.
func (*WALSegmentInfo) Pos ¶
func (info *WALSegmentInfo) Pos() Pos
Pos returns the WAL position when the segment was made.
type WALSegmentInfoSlice ¶
type WALSegmentInfoSlice []WALSegmentInfo
WALSegmentInfoSlice represents a slice of WAL segment metadata.
func (WALSegmentInfoSlice) Len ¶
func (a WALSegmentInfoSlice) Len() int
func (WALSegmentInfoSlice) Less ¶
func (a WALSegmentInfoSlice) Less(i, j int) bool
func (WALSegmentInfoSlice) Swap ¶
func (a WALSegmentInfoSlice) Swap(i, j int)
type WALSegmentInfoSliceIterator ¶
type WALSegmentInfoSliceIterator struct {
// contains filtered or unexported fields
}
WALSegmentInfoSliceIterator represents an iterator for iterating over a slice of wal segments.
func NewWALSegmentInfoSliceIterator ¶
func NewWALSegmentInfoSliceIterator(a []WALSegmentInfo) *WALSegmentInfoSliceIterator
NewWALSegmentInfoSliceIterator returns a new instance of WALSegmentInfoSliceIterator.
func (*WALSegmentInfoSliceIterator) Close ¶
func (itr *WALSegmentInfoSliceIterator) Close() error
Close always returns nil.
func (*WALSegmentInfoSliceIterator) Err ¶
func (itr *WALSegmentInfoSliceIterator) Err() error
Err always returns nil.
func (*WALSegmentInfoSliceIterator) Next ¶
func (itr *WALSegmentInfoSliceIterator) Next() bool
Next moves to the next wal segment. Returns true if another segment is available.
func (*WALSegmentInfoSliceIterator) WALSegment ¶
func (itr *WALSegmentInfoSliceIterator) WALSegment() WALSegmentInfo
WALSegment returns the metadata from the currently positioned wal segment.
type WALSegmentIterator ¶
type WALSegmentIterator interface { io.Closer // Prepares the the next WAL for reading with the WAL() method. // Returns true if another WAL is available. Returns false if no more // WAL files are available or if an error occured. Next() bool // Returns an error that occurred during iteration. Err() error // Returns metadata for the currently positioned WAL segment file. WALSegment() WALSegmentInfo }
WALSegmentIterator represents an iterator over a collection of WAL segments.