Documentation ¶
Index ¶
- Constants
- Variables
- func ContainsLockType(a []LockType, typ LockType) bool
- func FormatNodeID(id uint64) string
- func GenerateClusterID() string
- func JournalChecksum(data []byte, initial uint32) uint32
- func ParseNodeID(s string) (uint64, error)
- func TrimName(name string) string
- func ValidateClusterID(id string) error
- func WALChecksum(bo binary.ByteOrder, s0, s1 uint32, b []byte) (uint32, uint32)
- func WriteStreamFrame(w io.Writer, f StreamFrame) error
- type BackupClient
- type ChangeSetSubscriber
- func (s *ChangeSetSubscriber) Close() error
- func (s *ChangeSetSubscriber) DirtySet() map[string]struct{}
- func (s *ChangeSetSubscriber) HandoffCh() chan string
- func (s *ChangeSetSubscriber) MarkDirty(name string)
- func (s *ChangeSetSubscriber) NodeID() uint64
- func (s *ChangeSetSubscriber) NotifyCh() <-chan struct{}
- type Client
- type DB
- func (db *DB) AcquireHaltLock(ctx context.Context, lockID int64) (_ *HaltLock, retErr error)
- func (db *DB) AcquireRemoteHaltLock(ctx context.Context, lockID int64) (_ *HaltLock, retErr error)
- func (db *DB) AcquireWriteLock(ctx context.Context, fn func() error) (_ *GuardSet, err error)
- func (db *DB) ApplyLTXNoLock(path string, fatalOnError bool) (retErr error)
- func (db *DB) CanLock(ctx context.Context, owner uint64, lockTypes []LockType) (bool, RWMutexState)
- func (db *DB) CanRLock(ctx context.Context, owner uint64, lockTypes []LockType) bool
- func (db *DB) Checkpoint(ctx context.Context) (err error)
- func (db *DB) CheckpointNoLock(ctx context.Context) (err error)
- func (db *DB) CloseDatabase(ctx context.Context, f *os.File, owner uint64) error
- func (db *DB) CloseJournal(ctx context.Context, f *os.File, owner uint64) error
- func (db *DB) CloseSHM(ctx context.Context, f *os.File, owner uint64) error
- func (db *DB) CloseWAL(ctx context.Context, f *os.File, owner uint64) error
- func (db *DB) CommitJournal(ctx context.Context, mode JournalMode) (err error)
- func (db *DB) CommitWAL(ctx context.Context) (err error)
- func (db *DB) CreateGuardSetIfNotExists(owner uint64) *GuardSet
- func (db *DB) CreateJournal() (*os.File, error)
- func (db *DB) CreateSHM() (*os.File, error)
- func (db *DB) CreateWAL() (*os.File, error)
- func (db *DB) DatabasePath() string
- func (db *DB) Drop(ctx context.Context) (err error)
- func (db *DB) EnforceHaltLockExpiration(ctx context.Context)
- func (db *DB) EnforceRetention(ctx context.Context, minTime time.Time) error
- func (db *DB) Export(ctx context.Context, dst io.Writer) (ltx.Pos, error)
- func (db *DB) GuardSet(owner uint64) *GuardSet
- func (db *DB) HWM() ltx.TXID
- func (db *DB) HasRemoteHaltLock() bool
- func (db *DB) Import(ctx context.Context, r io.Reader) error
- func (db *DB) InWriteTx() bool
- func (db *DB) JournalPath() string
- func (db *DB) LTXDir() string
- func (db *DB) LTXPath(minTXID, maxTXID ltx.TXID) string
- func (db *DB) Mode() DBMode
- func (db *DB) Name() string
- func (db *DB) Open() error
- func (db *DB) OpenDatabase(ctx context.Context) (*os.File, error)
- func (db *DB) OpenJournal(ctx context.Context) (*os.File, error)
- func (db *DB) OpenLTXFile(txID ltx.TXID) (*os.File, error)
- func (db *DB) OpenSHM(ctx context.Context) (*os.File, error)
- func (db *DB) OpenWAL(ctx context.Context) (*os.File, error)
- func (db *DB) PageN() uint32
- func (db *DB) Path() string
- func (db *DB) Pos() ltx.Pos
- func (db *DB) ReadDatabaseAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) (int, error)
- func (db *DB) ReadJournalAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) (int, error)
- func (db *DB) ReadLTXDir() ([]fs.DirEntry, error)
- func (db *DB) ReadSHMAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) (int, error)
- func (db *DB) ReadWALAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) (int, error)
- func (db *DB) Recover(ctx context.Context) error
- func (db *DB) ReleaseHaltLock(ctx context.Context, id int64)
- func (db *DB) ReleaseRemoteHaltLock(ctx context.Context, lockID int64) (retErr error)
- func (db *DB) RemoteHaltLock() *HaltLock
- func (db *DB) RemoveJournal(ctx context.Context) error
- func (db *DB) RemoveSHM(ctx context.Context) error
- func (db *DB) RemoveWAL(ctx context.Context) (err error)
- func (db *DB) SHMPath() string
- func (db *DB) SetHWM(txID ltx.TXID)
- func (db *DB) Store() *Store
- func (db *DB) SyncDatabase(ctx context.Context) (err error)
- func (db *DB) SyncJournal(ctx context.Context) (err error)
- func (db *DB) SyncSHM(ctx context.Context) (err error)
- func (db *DB) SyncWAL(ctx context.Context) (err error)
- func (db *DB) TXID() ltx.TXID
- func (db *DB) Timestamp() time.Time
- func (db *DB) TruncateDatabase(ctx context.Context, size int64) (err error)
- func (db *DB) TruncateJournal(ctx context.Context) error
- func (db *DB) TruncateSHM(ctx context.Context, size int64) error
- func (db *DB) TruncateWAL(ctx context.Context, size int64) (err error)
- func (db *DB) TryAcquireWriteLock() (ret *GuardSet)
- func (db *DB) TryLocks(ctx context.Context, owner uint64, lockTypes []LockType) (bool, error)
- func (db *DB) TryRLocks(ctx context.Context, owner uint64, lockTypes []LockType) bool
- func (db *DB) Unlock(ctx context.Context, owner uint64, lockTypes []LockType) error
- func (db *DB) UnlockDatabase(ctx context.Context, owner uint64)
- func (db *DB) UnlockSHM(ctx context.Context, owner uint64)
- func (db *DB) UnsetRemoteHaltLock(ctx context.Context, lockID int64) (retErr error)
- func (db *DB) WALPath() string
- func (db *DB) WaitPosExact(ctx context.Context, target ltx.Pos) error
- func (db *DB) WriteDatabaseAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) error
- func (db *DB) WriteJournalAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) (err error)
- func (db *DB) WriteLTXFileAt(ctx context.Context, r io.Reader) (string, error)
- func (db *DB) WriteSHMAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) (int, error)
- func (db *DB) WriteSnapshotTo(ctx context.Context, dst io.Writer) (header ltx.Header, trailer ltx.Trailer, err error)
- func (db *DB) WriteWALAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) (err error)
- func (db *DB) Writeable() bool
- type DBMode
- type DropDBStreamFrame
- type EndStreamFrame
- type Environment
- type Event
- type EventSubscriber
- type FileBackupClient
- func (c *FileBackupClient) FetchSnapshot(ctx context.Context, name string) (_ io.ReadCloser, retErr error)
- func (c *FileBackupClient) Open() (err error)
- func (c *FileBackupClient) PosMap(ctx context.Context) (map[string]ltx.Pos, error)
- func (c *FileBackupClient) URL() string
- func (c *FileBackupClient) WriteTx(ctx context.Context, name string, r io.Reader) (hwm ltx.TXID, err error)
- type FileType
- type GuardSet
- func (s *GuardSet) Ckpt() *RWMutexGuard
- func (s *GuardSet) DMS() *RWMutexGuard
- func (s *GuardSet) Guard(lockType LockType) *RWMutexGuard
- func (s *GuardSet) Pending() *RWMutexGuard
- func (s *GuardSet) Read0() *RWMutexGuard
- func (s *GuardSet) Read1() *RWMutexGuard
- func (s *GuardSet) Read2() *RWMutexGuard
- func (s *GuardSet) Read3() *RWMutexGuard
- func (s *GuardSet) Read4() *RWMutexGuard
- func (s *GuardSet) Recover() *RWMutexGuard
- func (s *GuardSet) Reserved() *RWMutexGuard
- func (s *GuardSet) Shared() *RWMutexGuard
- func (s *GuardSet) Unlock()
- func (s *GuardSet) UnlockDatabase()
- func (s *GuardSet) UnlockSHM()
- func (s *GuardSet) Write() *RWMutexGuard
- type HWMStreamFrame
- type HaltLock
- type HandoffStreamFrame
- type HeartbeatStreamFrame
- type InitEventData
- type Invalidator
- type JournalMode
- type JournalReader
- type LTXStreamFrame
- type Lease
- type Leaser
- type LockType
- type NodeInfo
- type OS
- type PrimaryChangeEventData
- type PrimaryInfo
- type RWMutex
- type RWMutexGuard
- func (g *RWMutexGuard) CanLock() (canLock bool, mutexState RWMutexState)
- func (g *RWMutexGuard) CanRLock() bool
- func (g *RWMutexGuard) Lock(ctx context.Context) error
- func (g *RWMutexGuard) RLock(ctx context.Context) error
- func (g *RWMutexGuard) State() RWMutexState
- func (g *RWMutexGuard) TryLock() bool
- func (g *RWMutexGuard) TryRLock() bool
- func (g *RWMutexGuard) Unlock()
- type RWMutexState
- type ReadyStreamFrame
- type StaticLease
- func (l *StaticLease) Close() error
- func (l *StaticLease) Handoff(ctx context.Context, nodeID uint64) error
- func (l *StaticLease) HandoffCh() <-chan uint64
- func (l *StaticLease) ID() string
- func (l *StaticLease) Renew(ctx context.Context) error
- func (l *StaticLease) RenewedAt() time.Time
- func (l *StaticLease) TTL() time.Duration
- type StaticLeaser
- func (l *StaticLeaser) Acquire(ctx context.Context) (Lease, error)
- func (l *StaticLeaser) AcquireExisting(ctx context.Context, leaseID string) (Lease, error)
- func (l *StaticLeaser) AdvertiseURL() string
- func (l *StaticLeaser) Close() (err error)
- func (l *StaticLeaser) ClusterID(ctx context.Context) (string, error)
- func (l *StaticLeaser) Hostname() string
- func (l *StaticLeaser) IsPrimary() bool
- func (l *StaticLeaser) PrimaryInfo(ctx context.Context) (PrimaryInfo, error)
- func (l *StaticLeaser) SetClusterID(ctx context.Context, clusterID string) error
- func (l *StaticLeaser) Type() string
- type Store
- func (s *Store) Candidate() bool
- func (s *Store) Close() (retErr error)
- func (s *Store) ClusterID() string
- func (s *Store) ClusterIDPath() string
- func (s *Store) CreateDB(name string) (db *DB, f *os.File, err error)
- func (s *Store) CreateDBIfNotExists(name string) (*DB, error)
- func (s *Store) DB(name string) *DB
- func (s *Store) DBDir() string
- func (s *Store) DBPath(name string) string
- func (s *Store) DBs() []*DB
- func (s *Store) Demote()
- func (s *Store) EnforceHaltLockExpiration(ctx context.Context)
- func (s *Store) EnforceRetention(ctx context.Context) (err error)
- func (s *Store) Expvar() expvar.Var
- func (s *Store) Handoff(ctx context.Context, nodeID uint64) error
- func (s *Store) ID() uint64
- func (s *Store) IsPrimary() bool
- func (s *Store) Lag() time.Duration
- func (s *Store) MarkDirty(name string)
- func (s *Store) NotifyEvent(event Event)
- func (s *Store) Open() error
- func (s *Store) Path() string
- func (s *Store) PosMap() map[string]ltx.Pos
- func (s *Store) PrimaryCtx(ctx context.Context) context.Context
- func (s *Store) PrimaryInfo() (isPrimary bool, info *PrimaryInfo)
- func (s *Store) PrimaryInfoWithContext(ctx context.Context) (isPrimary bool, info *PrimaryInfo)
- func (s *Store) PrimaryTimestamp() int64
- func (s *Store) ReadyCh() chan struct{}
- func (s *Store) Recover(ctx context.Context) (err error)
- func (s *Store) SubscribeChangeSet(nodeID uint64) *ChangeSetSubscriber
- func (s *Store) SubscribeEvents() *EventSubscriber
- func (s *Store) SubscriberByNodeID(nodeID uint64) *ChangeSetSubscriber
- func (s *Store) SyncBackup(ctx context.Context) error
- func (s *Store) UnsubscribeChangeSet(sub *ChangeSetSubscriber)
- func (s *Store) UnsubscribeEvents(sub *EventSubscriber)
- type StoreVar
- type Stream
- type StreamFrame
- type StreamFrameType
- type TxEventData
- type WALReader
Constants ¶
const ( StreamFrameTypeLTX = StreamFrameType(1) StreamFrameTypeReady = StreamFrameType(2) StreamFrameTypeEnd = StreamFrameType(3) StreamFrameTypeDropDB = StreamFrameType(4) StreamFrameTypeHandoff = StreamFrameType(5) StreamFrameTypeHWM = StreamFrameType(6) StreamFrameTypeHeartbeat = StreamFrameType(7) )
const ( // ClusterIDLen is the length of a cluster ID. ClusterIDLen = 20 // ClusterIDPrefix is the prefix for every cluster ID. ClusterIDPrefix = "LFSC" )
const ( WALHeaderSize = 32 WALFrameHeaderSize = 24 WALIndexHeaderSize = 136 WALIndexBlockSize = 32768 )
SQLite constants
const ( PENDING_BYTE = 0x40000000 RESERVED_BYTE = (PENDING_BYTE + 1) SHARED_FIRST = (PENDING_BYTE + 2) SHARED_SIZE = 510 )
SQLite rollback journal lock constants.
const ( WAL_WRITE_LOCK = 120 WAL_CKPT_LOCK = 121 WAL_RECOVER_LOCK = 122 WAL_READ_LOCK0 = 123 WAL_READ_LOCK1 = 124 WAL_READ_LOCK2 = 125 WAL_READ_LOCK3 = 126 WAL_READ_LOCK4 = 127 )
SQLite WAL lock constants.
const ( JournalModeDelete = "DELETE" JournalModeTruncate = "TRUNCATE" JournalModePersist = "PERSIST" JournalModeWAL = "WAL" )
const ( FileTypeNone = FileType(iota) FileTypeDatabase FileTypeJournal FileTypeWAL FileTypeSHM FileTypePos FileTypeLock )
Database file types.
const ( SQLITE_DATABASE_HEADER_STRING = "SQLite format 3\x00" // Location of the database size, in pages, in the main database file. SQLITE_DATABASE_SIZE_OFFSET = 28 /// Magic header string that identifies a SQLite journal header. /// https://www.sqlite.org/fileformat.html#the_rollback_journal SQLITE_JOURNAL_HEADER_STRING = "\xd9\xd5\x05\xf9\x20\xa1\x63\xd7" // Size of the journal header, in bytes. SQLITE_JOURNAL_HEADER_SIZE = 28 )
const ( // Database file locks LockTypeHalt = LockType(72) // LiteFS-specific lock byte LockTypePending = LockType(0x40000000) // 1073741824 LockTypeReserved = LockType(0x40000001) // 1073741825 // SHM file locks LockTypeWrite = LockType(120) LockTypeCkpt = LockType(121) LockTypeRecover = LockType(122) LockTypeRead0 = LockType(123) LockTypeRead1 = LockType(124) LockTypeRead2 = LockType(125) LockTypeRead3 = LockType(126) LockTypeRead4 = LockType(127) LockTypeDMS = LockType(128) )
const ( DBModeRollback = DBMode(0) DBModeWAL = DBMode(1) )
Database journal modes.
const ( RWMutexStateUnlocked = RWMutexState(iota) RWMutexStateExclusive )
const ( DefaultReconnectDelay = 1 * time.Second DefaultDemoteDelay = 10 * time.Second DefaultRetention = 10 * time.Minute DefaultRetentionMonitorInterval = 1 * time.Minute DefaultHaltAcquireTimeout = 10 * time.Second DefaultHaltLockTTL = 30 * time.Second DefaultHaltLockMonitorInterval = 5 * time.Second DefaultBackupDelay = 1 * time.Second DefaultBackupFullSyncInterval = 10 * time.Second )
Default store settings.
const ( // MaxBackupLTXFileN is the number of LTX files that can be compacted // together at a time when sending data to the backup service. MaxBackupLTXFileN = 256 MetricsMonitorInterval = 1 * time.Second )
const ( EventTypeInit = "init" EventTypeTx = "tx" EventTypePrimaryChange = "primaryChange" )
const ChecksumBlockSize = 256
ChecksumBlockSize is the number of pages that are grouped into a single checksum block.
const EventChannelBufferSize = 1024
const RWMutexInterval = 10 * time.Microsecond
RWMutexInterval is the time between reattempting lock acquisition.
const TraceLogFlags = log.LstdFlags | log.Lmicroseconds | log.LUTC
TraceLogFlags are the flags to be used with TraceLog.
const WaitInterval = 100 * time.Microsecond
WaitInterval is the time between checking if the DB has reached a position in DB.WaitPosExact().
Variables ¶
var ( ErrDatabaseNotFound = fmt.Errorf("database not found") ErrDatabaseExists = fmt.Errorf("database already exists") ErrNoPrimary = errors.New("no primary") ErrPrimaryExists = errors.New("primary exists") ErrNotEligible = errors.New("not eligible to become primary") ErrLeaseExpired = errors.New("lease expired") ErrNoHaltPrimary = errors.New("no remote halt needed on primary node") ErrReadOnlyReplica = fmt.Errorf("read only replica") ErrDuplicateLTXFile = fmt.Errorf("duplicate ltx file") )
LiteFS errors
var ErrInvalidClusterID = errors.New("invalid cluster id")
ErrInvalidClusterID is returned when a cluster ID is invalid.
var ErrInvalidNodeID = errors.New("invalid node id")
var ErrStoreClosed = fmt.Errorf("store closed")
var GlobalStore atomic.Value
GlobalStore represents a single store used for metrics collection.
var LogLevel struct { sync.Mutex slog.LevelVar }
Global log level. Can be adjusted dynamically.
var NativeEndian = binary.LittleEndian
NativeEndian is always set to little endian as that is the only endianness used by supported platforms for LiteFS. This may be expanded in the future.
var TraceLog = log.New(io.Discard, "", TraceLogFlags)
TraceLog is a log for low-level tracing.
Functions ¶
func ContainsLockType ¶ added in v0.3.0
ContainsLockType returns true if a contains typ.
func FormatNodeID ¶ added in v0.4.0
FormatNodeID formats a node identifier as a 16-character uppercase hex string.
func GenerateClusterID ¶ added in v0.5.0
func GenerateClusterID() string
GenerateClusterID returns a new, randomly-generated cluster ID.
func JournalChecksum ¶ added in v0.3.0
JournalChecksum returns the checksum used by the journal format.
func ParseNodeID ¶ added in v0.4.0
ParseNodeID parses a 16-character hex string into a node identifier.
func ValidateClusterID ¶ added in v0.5.0
ValidateClusterID returns nil if id is a valid cluster ID.
func WALChecksum ¶ added in v0.3.0
WALChecksum computes a running SQLite WAL checksum over a byte slice.
func WriteStreamFrame ¶
func WriteStreamFrame(w io.Writer, f StreamFrame) error
WriteStreamFrame writes the stream type & frame to the writer.
Types ¶
type BackupClient ¶ added in v0.5.0
type BackupClient interface { // URL of the backup service. URL() string // PosMap returns the replication position for all databases on the backup service. PosMap(ctx context.Context) (map[string]ltx.Pos, error) // WriteTx writes an LTX file to the backup service. The file must be // contiguous with the latest LTX file on the backup service or else it // will return an ltx.PosMismatchError. // // Returns the high-water mark that indicates it is safe to remove LTX files // before that transaction ID. WriteTx(ctx context.Context, name string, r io.Reader) (hwm ltx.TXID, err error) // FetchSnapshot requests a full snapshot of the database as it exists on // the backup service. This should be used if the LiteFS node has become // out of sync with the backup service. FetchSnapshot(ctx context.Context, name string) (io.ReadCloser, error) }
type ChangeSetSubscriber ¶ added in v0.5.5
type ChangeSetSubscriber struct {
// contains filtered or unexported fields
}
ChangeSetSubscriber subscribes to changes to databases in the store.
It implements a set of "dirty" databases instead of a channel of all events, because clients can be slow and we don't want to cause channels to back up. It is the responsibility of the caller to determine what state has changed, which usually just means checking the position of the client against the store's database.
func (*ChangeSetSubscriber) Close ¶ added in v0.5.5
func (s *ChangeSetSubscriber) Close() error
Close removes the subscriber from the store.
func (*ChangeSetSubscriber) DirtySet ¶ added in v0.5.5
func (s *ChangeSetSubscriber) DirtySet() map[string]struct{}
DirtySet returns a set of database IDs that have changed since the last call to DirtySet(). This call clears the set.
func (*ChangeSetSubscriber) HandoffCh ¶ added in v0.5.5
func (s *ChangeSetSubscriber) HandoffCh() chan string
HandoffCh returns a channel that returns a lease ID on handoff.
func (*ChangeSetSubscriber) MarkDirty ¶ added in v0.5.5
func (s *ChangeSetSubscriber) MarkDirty(name string)
MarkDirty marks a database ID as dirty.
func (*ChangeSetSubscriber) NodeID ¶ added in v0.5.5
func (s *ChangeSetSubscriber) NodeID() uint64
NodeID returns the ID of the subscribed node.
func (*ChangeSetSubscriber) NotifyCh ¶ added in v0.5.5
func (s *ChangeSetSubscriber) NotifyCh() <-chan struct{}
NotifyCh returns a channel that receives a value when the dirty set has changed.
type Client ¶
type Client interface { // AcquireHaltLock attempts to acquire a remote halt lock on the primary node. AcquireHaltLock(ctx context.Context, primaryURL string, nodeID uint64, name string, lockID int64) (*HaltLock, error) // ReleaseHaltLock releases a previously held remote halt lock on the primary node. ReleaseHaltLock(ctx context.Context, primaryURL string, nodeID uint64, name string, lockID int64) error // Commit sends an LTX file to the primary to be committed. // Must be holding the halt lock to be successful. Commit(ctx context.Context, primaryURL string, nodeID uint64, name string, lockID int64, r io.Reader) error // Stream starts a long-running connection to stream changes from another node. // If filter is specified, only those databases will be replicated. Stream(ctx context.Context, primaryURL string, nodeID uint64, posMap map[string]ltx.Pos, filter []string) (Stream, error) }
Client represents a client for connecting to other LiteFS nodes.
type DB ¶
type DB struct { // Returns the current time. Used for mocking time in tests. Now func() time.Time // contains filtered or unexported fields }
DB represents a SQLite database.
func (*DB) AcquireHaltLock ¶ added in v0.4.0
AcquireHaltLock acquires the halt lock locally. This implicitly acquires locks required for locking & performs a checkpoint.
func (*DB) AcquireRemoteHaltLock ¶ added in v0.4.0
AcquireRemoteHaltLock acquires the remote lock and syncs the database to its position before returning to the caller. Caller should provide a random lock identifier so that the primary can deduplicate retry requests.
func (*DB) AcquireWriteLock ¶ added in v0.3.0
AcquireWriteLock acquires the appropriate locks for a write depending on if the database uses a rollback journal or WAL.
func (*DB) ApplyLTXNoLock ¶ added in v0.4.0
ApplyLTXNoLock applies an LTX file to the database.
func (*DB) CanLock ¶ added in v0.3.0
CanLock returns true if all locks can acquire a write lock. If false, also returns the mutex state of the blocking lock.
func (*DB) Checkpoint ¶ added in v0.4.0
Checkpoint acquires locks and copies pages from the WAL into the database and truncates the WAL.
func (*DB) CheckpointNoLock ¶ added in v0.4.0
CheckpointNoLock copies pages from the WAL into the database and truncates the WAL. Appropriate locks must be held by the caller.
func (*DB) CloseDatabase ¶ added in v0.3.0
CloseDatabase closes a handle associated with the database file.
func (*DB) CloseJournal ¶ added in v0.3.0
CloseJournal closes a handle associated with the journal file.
func (*DB) CommitJournal ¶
func (db *DB) CommitJournal(ctx context.Context, mode JournalMode) (err error)
CommitJournal deletes the journal file which commits or rolls back the transaction.
func (*DB) CommitWAL ¶ added in v0.3.0
CommitWAL is called when the client releases the WAL_WRITE_LOCK(120). The transaction data is copied from the WAL into an LTX file and committed.
func (*DB) CreateGuardSetIfNotExists ¶ added in v0.3.0
CreateGuardSetIfNotExists returns a guard set for the given owner. Creates a new guard set if one is not associated with the owner.
func (*DB) CreateJournal ¶
CreateJournal creates a new journal file on disk.
func (*DB) DatabasePath ¶ added in v0.1.1
DatabasePath returns the path to the underlying database file.
func (*DB) Drop ¶ added in v0.5.5
Drop writes a zero "commit" value to indicate that the database has been deleted.
func (*DB) EnforceHaltLockExpiration ¶ added in v0.4.0
EnforceHaltLockExpiration unsets the HALT lock if it has expired.
func (*DB) EnforceRetention ¶ added in v0.2.0
EnforceRetention removes all LTX files created before minTime.
func (*DB) Export ¶ added in v0.4.0
Export writes the contents of the database to dst. Returns the current replication position.
func (*DB) GuardSet ¶ added in v0.2.0
GuardSet returns a guard set for the given owner, if it exists.
func (*DB) HasRemoteHaltLock ¶ added in v0.4.0
HasRemoteHaltLock returns true if the node currently has the remote lock acquired.
func (*DB) Import ¶ added in v0.3.0
Import replaces the contents of the database with the contents from r. NOTE: LiteFS does not validate the integrity of the imported database!
func (*DB) JournalPath ¶ added in v0.1.1
JournalPath returns the path to the underlying journal file.
func (*DB) Mode ¶ added in v0.4.0
Mode returns the journaling mode for the database (DBModeWAL or DBModeRollback).
func (*DB) OpenDatabase ¶ added in v0.3.0
OpenDatabase returns a handle for the database file.
func (*DB) OpenJournal ¶ added in v0.3.0
OpenJournal returns a handle for the journal file.
func (*DB) OpenLTXFile ¶
OpenLTXFile returns a file handle to an LTX file that contains the given TXID.
func (*DB) ReadDatabaseAt ¶ added in v0.3.0
func (db *DB) ReadDatabaseAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) (int, error)
ReadDatabaseAt reads from the database at the specified offset.
func (*DB) ReadJournalAt ¶ added in v0.3.0
func (db *DB) ReadJournalAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) (int, error)
ReadJournalAt reads from the journal at the specified offset.
func (*DB) ReadLTXDir ¶ added in v0.2.0
ReadLTXDir returns DirEntry for every LTX file.
func (*DB) ReadSHMAt ¶ added in v0.3.0
func (db *DB) ReadSHMAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) (int, error)
ReadSHMAt reads from the shared memory at the specified offset.
func (*DB) ReadWALAt ¶ added in v0.3.0
func (db *DB) ReadWALAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) (int, error)
ReadWALAt reads from the WAL at the specified offset.
func (*DB) ReleaseHaltLock ¶ added in v0.4.0
ReleaseHaltLock releases a halt lock by identifier. If the current halt lock does not match the identifier then it has already been released.
func (*DB) ReleaseRemoteHaltLock ¶ added in v0.4.0
ReleaseRemoteHaltLock releases the current remote lock from the primary.
func (*DB) RemoteHaltLock ¶ added in v0.4.0
RemoteHaltLock returns a copy of the current remote lock, if any.
func (*DB) RemoveJournal ¶ added in v0.3.0
RemoveJournal deletes the journal file from disk.
func (*DB) SyncDatabase ¶ added in v0.3.0
SyncDatabase fsync's the database file.
func (*DB) SyncJournal ¶ added in v0.3.0
SyncJournal fsync's the journal file.
func (*DB) TruncateDatabase ¶ added in v0.3.0
TruncateDatabase sets the size of the database file.
func (*DB) TruncateJournal ¶ added in v0.3.0
TruncateJournal sets the size of the journal file.
func (*DB) TruncateSHM ¶ added in v0.3.0
TruncateSHM sets the size of the SHM file.
func (*DB) TruncateWAL ¶ added in v0.3.0
TruncateWAL sets the size of the WAL file.
func (*DB) TryAcquireWriteLock ¶ added in v0.4.0
TryAcquireWriteLock acquires the appropriate locks for a write. If any locks fail then the action is aborted.
func (*DB) TryLocks ¶ added in v0.3.0
TryLocks attempts to lock one or more locks on the database for a given owner. Returns an error if no locks are supplied.
func (*DB) TryRLocks ¶ added in v0.3.0
TryRLocks attempts to read lock one or more locks on the database for a given owner. Returns an error if no locks are supplied.
func (*DB) Unlock ¶ added in v0.3.0
Unlock unlocks one or more locks on the database for a given owner.
func (*DB) UnlockDatabase ¶ added in v0.3.0
UnlockDatabase unlocks all locks from the database file.
func (*DB) UnsetRemoteHaltLock ¶ added in v0.4.0
UnsetRemoteHaltLock releases the current remote lock because of expiration. This only removes the reference locally as it's assumed it has already been removed on the primary.
func (*DB) WaitPosExact ¶ added in v0.4.0
WaitPosExact returns once db has reached the target position. Returns an error if ctx is done, TXID is exceeded, or on checksum mismatch.
func (*DB) WriteDatabaseAt ¶ added in v0.3.0
func (db *DB) WriteDatabaseAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) error
WriteDatabaseAt writes data to the main database file at the given offset.
func (*DB) WriteJournalAt ¶ added in v0.3.0
func (db *DB) WriteJournalAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) (err error)
WriteJournalAt writes data to the rollback journal file.
func (*DB) WriteLTXFileAt ¶ added in v0.4.0
WriteLTXFileAt atomically writes r to the database's LTX directory but does not apply the file. That should be done after the file is written.
If file is a snapshot, then all other LTX files are removed.
Returns the path of the new LTX file on success.
func (*DB) WriteSHMAt ¶ added in v0.3.0
func (db *DB) WriteSHMAt(ctx context.Context, f *os.File, data []byte, offset int64, owner uint64) (int, error)
WriteSHMAt writes data to the SHM file.
func (*DB) WriteSnapshotTo ¶ added in v0.2.0
func (db *DB) WriteSnapshotTo(ctx context.Context, dst io.Writer) (header ltx.Header, trailer ltx.Trailer, err error)
WriteSnapshotTo writes an LTX snapshot to dst.
type DBMode ¶ added in v0.3.0
type DBMode int
DBMode represents either a rollback journal or WAL mode.
type DropDBStreamFrame ¶ added in v0.4.0
type DropDBStreamFrame struct {
Name string // database name
}
DropDBStreamFrame notifies replicas that a database has been deleted. DEPRECATED: LTX files with a zero "commit" field now represent deletions.
func (*DropDBStreamFrame) ReadFrom ¶ added in v0.4.0
func (f *DropDBStreamFrame) ReadFrom(r io.Reader) (int64, error)
func (*DropDBStreamFrame) Type ¶ added in v0.4.0
func (*DropDBStreamFrame) Type() StreamFrameType
Type returns the type of stream frame.
type EndStreamFrame ¶ added in v0.3.0
type EndStreamFrame struct{}
func (*EndStreamFrame) ReadFrom ¶ added in v0.3.0
func (f *EndStreamFrame) ReadFrom(r io.Reader) (int64, error)
func (*EndStreamFrame) Type ¶ added in v0.3.0
func (f *EndStreamFrame) Type() StreamFrameType
type Environment ¶ added in v0.5.0
type Environment interface { // Type returns the name of the environment type. Type() string // SetPrimaryStatus marks the current node as the primary or not. SetPrimaryStatus(ctx context.Context, isPrimary bool) }
Environment represents an interface for interacting with the host environment.
type Event ¶ added in v0.5.5
type Event struct { Type string `json:"type"` DB string `json:"db,omitempty"` Data any `json:"data,omitempty"` }
Event represents a generic event.
func (*Event) UnmarshalJSON ¶ added in v0.5.5
type EventSubscriber ¶ added in v0.5.5
type EventSubscriber struct {
// contains filtered or unexported fields
}
EventSubscriber subscribes to generic store events.
func (*EventSubscriber) C ¶ added in v0.5.5
func (s *EventSubscriber) C() <-chan Event
C returns a channel that receives event notifications. If caller cannot read events fast enough then channel will be closed.
func (*EventSubscriber) Stop ¶ added in v0.5.5
func (s *EventSubscriber) Stop()
Stop closes the subscriber and removes it from the store.
type FileBackupClient ¶ added in v0.5.0
type FileBackupClient struct {
// contains filtered or unexported fields
}
FileBackupClient is a reference implementation for BackupClient. This implementation is typically only used for testing.
func NewFileBackupClient ¶ added in v0.5.0
func NewFileBackupClient(path string) *FileBackupClient
NewFileBackupClient returns a new instance of FileBackupClient.
func (*FileBackupClient) FetchSnapshot ¶ added in v0.5.0
func (c *FileBackupClient) FetchSnapshot(ctx context.Context, name string) (_ io.ReadCloser, retErr error)
FetchSnapshot requests a full snapshot of the database as it exists on the backup service. This should be used if the LiteFS node has become out of sync with the backup service.
func (*FileBackupClient) Open ¶ added in v0.5.0
func (c *FileBackupClient) Open() (err error)
Open validates & creates the path the client was initialized with.
func (*FileBackupClient) PosMap ¶ added in v0.5.0
PosMap returns the replication position for all databases on the backup service.
func (*FileBackupClient) URL ¶ added in v0.5.0
func (c *FileBackupClient) URL() string
URL of the backup service.
func (*FileBackupClient) WriteTx ¶ added in v0.5.0
func (c *FileBackupClient) WriteTx(ctx context.Context, name string, r io.Reader) (hwm ltx.TXID, err error)
WriteTx writes an LTX file to the backup service. The file must be contiguous with the latest LTX file on the backup service or else it will return an ltx.PosMismatchError.
type GuardSet ¶ added in v0.2.0
type GuardSet struct {
// contains filtered or unexported fields
}
GuardSet represents a set of mutex guards by a single owner.
func (*GuardSet) Ckpt ¶ added in v0.3.0
func (s *GuardSet) Ckpt() *RWMutexGuard
Ckpt returns a reference to the CKPT mutex guard.
func (*GuardSet) DMS ¶ added in v0.3.0
func (s *GuardSet) DMS() *RWMutexGuard
DMS returns a reference to the DMS mutex guard.
func (*GuardSet) Guard ¶ added in v0.2.0
func (s *GuardSet) Guard(lockType LockType) *RWMutexGuard
Guard returns a guard by lock type. Panic on invalid lock type.
func (*GuardSet) Pending ¶ added in v0.3.0
func (s *GuardSet) Pending() *RWMutexGuard
Pending returns a reference to the PENDING mutex guard.
func (*GuardSet) Read0 ¶ added in v0.3.0
func (s *GuardSet) Read0() *RWMutexGuard
Read0 returns a reference to the READ0 mutex guard.
func (*GuardSet) Read1 ¶ added in v0.3.0
func (s *GuardSet) Read1() *RWMutexGuard
Read1 returns a reference to the READ1 mutex guard.
func (*GuardSet) Read2 ¶ added in v0.3.0
func (s *GuardSet) Read2() *RWMutexGuard
Read2 returns a reference to the READ2 mutex guard.
func (*GuardSet) Read3 ¶ added in v0.3.0
func (s *GuardSet) Read3() *RWMutexGuard
Read3 returns a reference to the READ3 mutex guard.
func (*GuardSet) Read4 ¶ added in v0.3.0
func (s *GuardSet) Read4() *RWMutexGuard
Read4 returns a reference to the READ4 mutex guard.
func (*GuardSet) Recover ¶ added in v0.3.0
func (s *GuardSet) Recover() *RWMutexGuard
Recover returns a reference to the RECOVER mutex guard.
func (*GuardSet) Reserved ¶ added in v0.3.0
func (s *GuardSet) Reserved() *RWMutexGuard
Reserved returns a reference to the RESERVED mutex guard.
func (*GuardSet) Shared ¶ added in v0.3.0
func (s *GuardSet) Shared() *RWMutexGuard
Shared returns a reference to the SHARED mutex guard.
func (*GuardSet) Unlock ¶ added in v0.2.0
func (s *GuardSet) Unlock()
Unlock unlocks all the guards in the reverse of the order in which they are acquired by SQLite.
func (*GuardSet) UnlockDatabase ¶ added in v0.3.0
func (s *GuardSet) UnlockDatabase()
UnlockDatabase unlocks all the database file guards.
func (*GuardSet) UnlockSHM ¶ added in v0.3.0
func (s *GuardSet) UnlockSHM()
UnlockSHM unlocks all the SHM file guards.
func (*GuardSet) Write ¶ added in v0.3.0
func (s *GuardSet) Write() *RWMutexGuard
Write returns a reference to the WRITE mutex guard.
type HWMStreamFrame ¶ added in v0.5.0
HWMStreamFrame propagates the high-water mark to replica nodes.
func (*HWMStreamFrame) ReadFrom ¶ added in v0.5.0
func (f *HWMStreamFrame) ReadFrom(r io.Reader) (int64, error)
func (*HWMStreamFrame) Type ¶ added in v0.5.0
func (*HWMStreamFrame) Type() StreamFrameType
Type returns the type of stream frame.
type HaltLock ¶ added in v0.4.0
type HaltLock struct { // Unique identifier for the lock. ID int64 `json:"id"` // Position of the primary when this lock was acquired. Pos ltx.Pos `json:"pos"` // Time that the halt lock expires at. Expires *time.Time `json:"expires"` }
HaltLock represents a lock remotely held on the primary. This allows the local node to perform writes and send them to the primary while the lock is held.
type HandoffStreamFrame ¶ added in v0.4.0
type HandoffStreamFrame struct {
LeaseID string
}
func (*HandoffStreamFrame) ReadFrom ¶ added in v0.4.0
func (f *HandoffStreamFrame) ReadFrom(r io.Reader) (int64, error)
func (*HandoffStreamFrame) Type ¶ added in v0.4.0
func (*HandoffStreamFrame) Type() StreamFrameType
Type returns the type of stream frame.
type HeartbeatStreamFrame ¶ added in v0.5.0
type HeartbeatStreamFrame struct {
Timestamp int64 // ms since unix epoch
}
HeartbeatStreamFrame informs replicas that there have been no recent transactions.
func (*HeartbeatStreamFrame) ReadFrom ¶ added in v0.5.0
func (f *HeartbeatStreamFrame) ReadFrom(r io.Reader) (int64, error)
func (*HeartbeatStreamFrame) Type ¶ added in v0.5.0
func (f *HeartbeatStreamFrame) Type() StreamFrameType
Type returns the type of stream frame.
type InitEventData ¶ added in v0.5.5
type Invalidator ¶ added in v0.1.1
type Invalidator interface { InvalidateDB(db *DB) error InvalidateDBRange(db *DB, offset, size int64) error InvalidateSHM(db *DB) error InvalidatePos(db *DB) error InvalidateEntry(name string) error InvalidateLag() error }
Invalidator is a callback for the store to use to invalidate the kernel page cache.
type JournalReader ¶ added in v0.3.0
type JournalReader struct {
// contains filtered or unexported fields
}
JournalReader represents a reader of the SQLite journal file format.
func NewJournalReader ¶ added in v0.3.0
func NewJournalReader(f *os.File, pageSize uint32) *JournalReader
NewJournalReader returns a new instance of JournalReader.
func (*JournalReader) DatabaseSize ¶ added in v0.3.0
func (r *JournalReader) DatabaseSize() int64
DatabaseSize returns the size of the database before the journal transaction, in bytes.
func (*JournalReader) IsValid ¶ added in v0.3.0
func (r *JournalReader) IsValid() bool
IsValid returns true if at least one journal header was read.
func (*JournalReader) Next ¶ added in v0.3.0
func (r *JournalReader) Next() (err error)
Next reads the next segment of the journal. Returns io.EOF if no more segments exist.
type LTXStreamFrame ¶
func (*LTXStreamFrame) Type ¶
func (*LTXStreamFrame) Type() StreamFrameType
Type returns the type of stream frame.
type Lease ¶
type Lease interface { ID() string RenewedAt() time.Time TTL() time.Duration // Renew attempts to reset the TTL on the lease. // Returns ErrLeaseExpired if the lease has expired or was deleted. Renew(ctx context.Context) error // Marks the lease as handed-off to another node. // This should send the nodeID to the channel returned by HandoffCh(). Handoff(ctx context.Context, nodeID uint64) error HandoffCh() <-chan uint64 // Close attempts to remove the lease from the server. Close() error }
Lease represents an acquired lease from a Leaser.
type Leaser ¶
type Leaser interface { io.Closer // Type returns the name of the leaser. Type() string Hostname() string AdvertiseURL() string // Acquire attempts to acquire the lease to become the primary. Acquire(ctx context.Context) (Lease, error) // AcquireExisting returns a lease from an existing lease ID. // This occurs when the primary is handed off to a replica node. AcquireExisting(ctx context.Context, leaseID string) (Lease, error) // PrimaryInfo attempts to read the current primary data. // Returns ErrNoPrimary if no primary currently has the lease. PrimaryInfo(ctx context.Context) (PrimaryInfo, error) // ClusterID returns the cluster ID set on the leaser. // This is used to ensure two clusters do not accidentally overlap. ClusterID(ctx context.Context) (string, error) // SetClusterID sets the cluster ID on the leaser. SetClusterID(ctx context.Context, clusterID string) error }
Leaser represents an API for obtaining a lease for leader election.
type LockType ¶
type LockType int
LockType represents a SQLite lock type.
func ParseDatabaseLockRange ¶ added in v0.3.0
ParseDatabaseLockRange returns a list of SQLite database locks that are within a range.
This does not include the HALT lock as that is specific to LiteFS and we don't want to accidentally include it when locking/unlocking the whole file.
func ParseSHMLockRange ¶ added in v0.3.0
ParseSHMLockRange returns a list of SQLite WAL locks that are within a range.
type NodeInfo ¶ added in v0.4.0
type NodeInfo struct { ClusterID string `json:"clusterID,omitempty"` // cluster ID IsPrimary bool `json:"isPrimary"` // if true, node is currently primary Candidate bool `json:"candidate"` // if true, node is eligible to be primary Path string `json:"path"` // data directory Primary struct { Hostname string `json:"hostname"` } `json:"primary"` }
NodeInfo represents basic info about a node.
type OS ¶ added in v0.5.6
type OS interface { Create(op, name string) (*os.File, error) Mkdir(op, path string, perm os.FileMode) error MkdirAll(op, path string, perm os.FileMode) error Open(op, name string) (*os.File, error) OpenFile(op, opname string, flag int, perm os.FileMode) (*os.File, error) ReadDir(op, opname string) ([]os.DirEntry, error) ReadFile(op, name string) ([]byte, error) Remove(op, name string) error RemoveAll(op, name string) error Rename(op, oldpath, newpath string) error Stat(op, name string) (os.FileInfo, error) Truncate(op, name string, size int64) error WriteFile(op, name string, data []byte, perm os.FileMode) error }
OS represents an interface for os package calls so they can be mocked for testing.
type PrimaryChangeEventData ¶ added in v0.5.5
type PrimaryInfo ¶ added in v0.2.0
type PrimaryInfo struct { Hostname string `json:"hostname"` AdvertiseURL string `json:"advertise-url"` }
PrimaryInfo is the JSON object stored in the Consul lease value.
func (*PrimaryInfo) Clone ¶ added in v0.2.0
func (info *PrimaryInfo) Clone() *PrimaryInfo
Clone returns a copy of info.
type RWMutex ¶
type RWMutex struct { // If set, this function is called when the state transitions. // Must be set before use of the mutex or its guards. OnLockStateChange func(prevState, newState RWMutexState) // contains filtered or unexported fields }
RWMutex is a reader/writer mutual exclusion lock. It wraps the sync package to provide additional capabilities such as lock upgrades & downgrades. It only supports TryLock() & TryRLock() as that is what's supported by our FUSE file system.
func (*RWMutex) Guard ¶ added in v0.2.0
func (rw *RWMutex) Guard() RWMutexGuard
Guard returns an unlocked guard for the mutex.
func (*RWMutex) State ¶
func (rw *RWMutex) State() RWMutexState
State returns whether the mutex has an exclusive lock, one or more shared locks, or if the mutex is unlocked.
type RWMutexGuard ¶
type RWMutexGuard struct {
// contains filtered or unexported fields
}
RWMutexGuard is a reference to a mutex. Locking, unlocking, upgrading, & downgrading operations are all performed via the guard instead of directly on the RWMutex itself as this works similarly to how POSIX locks work.
func (*RWMutexGuard) CanLock ¶
func (g *RWMutexGuard) CanLock() (canLock bool, mutexState RWMutexState)
CanLock returns true if the guard can become an exclusive lock. Also returns the current state of the underlying mutex to determine if the lock is blocked by a shared or exclusive lock.
func (*RWMutexGuard) CanRLock ¶ added in v0.2.0
func (g *RWMutexGuard) CanRLock() bool
CanRLock returns true if the guard can become a shared lock.
func (*RWMutexGuard) Lock ¶ added in v0.2.0
func (g *RWMutexGuard) Lock(ctx context.Context) error
Lock attempts to obtain an exclusive lock for the guard. Returns an error if ctx is done.
func (*RWMutexGuard) RLock ¶
func (g *RWMutexGuard) RLock(ctx context.Context) error
RLock attempts to obtain a shared lock for the guard. Returns an error if ctx is done.
func (*RWMutexGuard) State ¶ added in v0.3.0
func (g *RWMutexGuard) State() RWMutexState
State returns the current state of the guard.
func (*RWMutexGuard) TryLock ¶
func (g *RWMutexGuard) TryLock() bool
TryLock upgrades the lock from a shared lock to an exclusive lock. This is a no-op if the lock is already an exclusive lock. This function will trigger OnLockStateChange on the mutex, if set, and if state changes.
func (*RWMutexGuard) TryRLock ¶ added in v0.2.0
func (g *RWMutexGuard) TryRLock() bool
TryRLock attempts to obtain a shared lock on the mutex for the guard. This will upgrade an unlocked guard and downgrade an exclusive guard. Shared guards are a no-op.
type RWMutexState ¶
type RWMutexState int
RWMutexState represents the lock state of an RWMutex or RWMutexGuard.
func (RWMutexState) String ¶ added in v0.2.0
func (s RWMutexState) String() string
String returns the string representation of the state.
type ReadyStreamFrame ¶ added in v0.2.0
type ReadyStreamFrame struct{}
func (*ReadyStreamFrame) ReadFrom ¶ added in v0.2.0
func (f *ReadyStreamFrame) ReadFrom(r io.Reader) (int64, error)
func (*ReadyStreamFrame) Type ¶ added in v0.2.0
func (f *ReadyStreamFrame) Type() StreamFrameType
type StaticLease ¶ added in v0.2.0
type StaticLease struct {
// contains filtered or unexported fields
}
StaticLease represents a lease for a fixed primary.
func (*StaticLease) Close ¶ added in v0.2.0
func (l *StaticLease) Close() error
func (*StaticLease) Handoff ¶ added in v0.4.0
func (l *StaticLease) Handoff(ctx context.Context, nodeID uint64) error
Handoff always returns an error.
func (*StaticLease) HandoffCh ¶ added in v0.4.0
func (l *StaticLease) HandoffCh() <-chan uint64
HandoffCh always returns a nil channel.
func (*StaticLease) ID ¶ added in v0.4.0
func (l *StaticLease) ID() string
ID always returns a blank string.
func (*StaticLease) Renew ¶ added in v0.2.0
func (l *StaticLease) Renew(ctx context.Context) error
Renew is a no-op.
func (*StaticLease) RenewedAt ¶ added in v0.2.0
func (l *StaticLease) RenewedAt() time.Time
RenewedAt returns the Unix epoch in UTC.
func (*StaticLease) TTL ¶ added in v0.2.0
func (l *StaticLease) TTL() time.Duration
TTL returns the duration until the lease expires which is a time well into the future.
type StaticLeaser ¶ added in v0.2.0
type StaticLeaser struct {
// contains filtered or unexported fields
}
StaticLeaser always returns a lease to a static primary.
func NewStaticLeaser ¶ added in v0.2.0
func NewStaticLeaser(isPrimary bool, hostname, advertiseURL string) *StaticLeaser
NewStaticLeaser returns a new instance of StaticLeaser.
func (*StaticLeaser) Acquire ¶ added in v0.2.0
func (l *StaticLeaser) Acquire(ctx context.Context) (Lease, error)
Acquire returns a lease if this node is the static primary. Otherwise returns ErrPrimaryExists.
func (*StaticLeaser) AcquireExisting ¶ added in v0.4.0
AcquireExisting always returns an error. Static leasing does not support handoff.
func (*StaticLeaser) AdvertiseURL ¶ added in v0.2.0
func (l *StaticLeaser) AdvertiseURL() string
AdvertiseURL returns the primary URL if this is the primary. Otherwise returns blank.
func (*StaticLeaser) Close ¶ added in v0.2.0
func (l *StaticLeaser) Close() (err error)
Close is a no-op.
func (*StaticLeaser) ClusterID ¶ added in v0.5.0
func (l *StaticLeaser) ClusterID(ctx context.Context) (string, error)
ClusterID always returns a blank string for the static leaser.
func (*StaticLeaser) Hostname ¶ added in v0.5.7
func (l *StaticLeaser) Hostname() string
func (*StaticLeaser) IsPrimary ¶ added in v0.2.0
func (l *StaticLeaser) IsPrimary() bool
IsPrimary returns true if the current node is the primary.
func (*StaticLeaser) PrimaryInfo ¶ added in v0.2.0
func (l *StaticLeaser) PrimaryInfo(ctx context.Context) (PrimaryInfo, error)
PrimaryInfo returns the primary's info. Returns ErrNoPrimary if the node is the primary.
func (*StaticLeaser) SetClusterID ¶ added in v0.5.0
func (l *StaticLeaser) SetClusterID(ctx context.Context, clusterID string) error
SetClusterID is always a no-op for the static leaser.
func (*StaticLeaser) Type ¶ added in v0.5.0
func (l *StaticLeaser) Type() string
Type returns "static".
type Store ¶
type Store struct { // The operating system interface to use for system calls. Defaults to SystemOS. OS OS Exit func(int) // Client used to connect to other LiteFS instances. Client Client // Leaser manages the lease that controls leader election. Leaser Leaser // BackupClient is the client to connect to an external backup service. BackupClient BackupClient // If true, LTX files are compressed using LZ4. Compress bool // Time to wait after disconnecting from the primary to reconnect. ReconnectDelay time.Duration // Time to wait after manually demoting before trying to become primary again. DemoteDelay time.Duration // Length of time to retain LTX files. Retention time.Duration RetentionMonitorInterval time.Duration // Max time to hold HALT lock and interval between expiration checks. HaltLockTTL time.Duration HaltLockMonitorInterval time.Duration // Time to wait to acquire the HALT lock. HaltAcquireTimeout time.Duration // Time after a change is made before it is sent to the backup service. // This allows multiple changes in quick succession to be batched together. BackupDelay time.Duration // Interval between checks to re-fetch the position map. This ensures that // restores on the backup server are detected by the LiteFS primary. BackupFullSyncInterval time.Duration // Callback to notify kernel of file changes. Invalidator Invalidator // Interface to interact with the host environment. Environment Environment // Specifies a subset of databases to replicate from the primary. DatabaseFilter []string // If true, computes and verifies the checksum of the entire database // after every transaction. Should only be used during testing. StrictVerify bool // contains filtered or unexported fields }
Store represents a collection of databases.
func (*Store) Candidate ¶ added in v0.2.0
Candidate returns true if store is eligible to be the primary.
func (*Store) ClusterIDPath ¶ added in v0.5.0
ClusterIDPath returns the filename where the cluster ID is stored.
func (*Store) CreateDB ¶
CreateDB creates a new database with the given name. The returned file handle must be closed by the caller. Returns an error if a database with the same name already exists.
func (*Store) CreateDBIfNotExists ¶ added in v0.2.0
CreateDBIfNotExists creates an empty database with the given name.
func (*Store) Demote ¶ added in v0.3.0
func (s *Store) Demote()
Demote instructs store to destroy its primary lease, if any. Store will wait momentarily before attempting to become primary again.
func (*Store) EnforceHaltLockExpiration ¶ added in v0.4.0
EnforceHaltLockExpiration expires any overdue HALT locks.
func (*Store) EnforceRetention ¶ added in v0.2.0
EnforceRetention enforces retention of LTX files on all databases.
func (*Store) Handoff ¶ added in v0.4.0
Handoff instructs store to send its lease to a connected replica.
func (*Store) ID ¶ added in v0.2.0
ID returns the unique identifier for this instance. Available after Open(). Persistent across restarts if underlying storage is persistent.
func (*Store) Lag ¶ added in v0.5.3
Lag returns the number of seconds that the local instance is lagging behind the primary node. Returns 0 if the node is the primary or if the node is not marked as ready yet.
func (*Store) NotifyEvent ¶ added in v0.5.5
NotifyEvent sends event to all event subscribers. If a subscriber has no additional buffer space available then it is closed.
func (*Store) PrimaryCtx ¶ added in v0.2.0
PrimaryCtx wraps ctx with another context that will cancel when no longer primary.
func (*Store) PrimaryInfo ¶ added in v0.2.0
func (s *Store) PrimaryInfo() (isPrimary bool, info *PrimaryInfo)
PrimaryInfo returns info about the current primary.
func (*Store) PrimaryInfoWithContext ¶ added in v0.5.7
func (s *Store) PrimaryInfoWithContext(ctx context.Context) (isPrimary bool, info *PrimaryInfo)
PrimaryInfoWithContext continually attempts to fetch the primary info until available. Returns when isPrimary is true, info is non-nil, or when ctx is done.
func (*Store) PrimaryTimestamp ¶ added in v0.5.0
PrimaryTimestamp returns the last timestamp (ms since epoch) received from the primary. Returns -1 if we are the primary or if we haven't finished initial replication yet.
func (*Store) ReadyCh ¶ added in v0.2.0
func (s *Store) ReadyCh() chan struct{}
ReadyCh returns a channel that is closed once the store has become primary or once it has connected to the primary.
func (*Store) Recover ¶ added in v0.3.0
Recover forces a rollback (journal) or checkpoint (wal) on all open databases. This is done when switching the primary/replica state.
func (*Store) SubscribeChangeSet ¶ added in v0.5.5
func (s *Store) SubscribeChangeSet(nodeID uint64) *ChangeSetSubscriber
SubscribeChangeSet creates a new subscriber for store changes.
func (*Store) SubscribeEvents ¶ added in v0.5.5
func (s *Store) SubscribeEvents() *EventSubscriber
SubscribeEvents creates a new subscriber for store events.
func (*Store) SubscriberByNodeID ¶ added in v0.4.0
func (s *Store) SubscriberByNodeID(nodeID uint64) *ChangeSetSubscriber
SubscriberByNodeID returns a subscriber by node ID. Returns nil if the node is not currently subscribed to the store.
func (*Store) SyncBackup ¶ added in v0.5.0
SyncBackup connects to a backup server and performs a one-time sync.
func (*Store) UnsubscribeChangeSet ¶ added in v0.5.5
func (s *Store) UnsubscribeChangeSet(sub *ChangeSetSubscriber)
UnsubscribeChangeSet removes a subscriber from the store.
func (*Store) UnsubscribeEvents ¶ added in v0.5.5
func (s *Store) UnsubscribeEvents(sub *EventSubscriber)
UnsubscribeEvents removes an event subscriber from the store.
type Stream ¶ added in v0.5.0
type Stream interface { io.ReadCloser // ClusterID of the primary node. ClusterID() string }
Stream represents a stream of frames.
type StreamFrame ¶
type StreamFrame interface { io.ReaderFrom io.WriterTo Type() StreamFrameType }
func ReadStreamFrame ¶
func ReadStreamFrame(r io.Reader) (StreamFrame, error)
ReadStreamFrame reads the stream type & frame from the reader.
type StreamFrameType ¶
type StreamFrameType uint32
type TxEventData ¶ added in v0.5.5
type WALReader ¶ added in v0.3.0
type WALReader struct {
// contains filtered or unexported fields
}
WALReader wraps an io.Reader and parses SQLite WAL frames.
This reader verifies the salt & checksum integrity while it reads. It does not enforce transaction boundaries (i.e. it may return uncommitted frames). It is the responsibility of the caller to handle this.
func NewWALReader ¶ added in v0.3.0
NewWALReader returns a new instance of WALReader.
func (*WALReader) Offset ¶ added in v0.3.0
Offset returns the file offset of the last read frame. Returns zero if no frames have been read.
func (*WALReader) PageSize ¶ added in v0.3.0
PageSize returns the page size from the header. Must call ReadHeader() first.
func (*WALReader) ReadFrame ¶ added in v0.3.0
ReadFrame reads the next frame from the WAL and returns the page number. Returns io.EOF at the end of the valid WAL.
func (*WALReader) ReadHeader ¶ added in v0.3.0
ReadHeader reads the WAL header into the reader. Returns io.EOF if WAL is invalid.