logstore

package
v0.4.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 5, 2022 | License: Apache-2.0 | Imports: 21 | Imported by: 0

Documentation

Index

Constants

View Source
// DefaultVersionFileSize is 300 times common.M, converted to int.
// NOTE(review): common.M is presumably one mebibyte, which would make this
// 300 MiB — confirm against package common.
const DefaultVersionFileSize = 300 * int(common.M)

Variables

View Source
// Byte widths used to compute EntryMetaSize.
var (
	// EntryTypeSize is the in-memory width of an EntryType value (ETFlush).
	EntryTypeSize = int(unsafe.Sizeof(ETFlush))
	// EntrySizeSize is the width of a uint32.
	EntrySizeSize = int(unsafe.Sizeof(uint32(0)))
	// EntryReservedSize is eight times the width of a uint64.
	EntryReservedSize = 8 * int(unsafe.Sizeof(uint64(0)))
	// EntryMetaSize is the sum of the three widths above.
	EntryMetaSize = EntryTypeSize + EntrySizeSize + EntryReservedSize
)

// FlushEntry is a package-level *BaseEntry. It has no initializer here, so it
// is nil until assigned elsewhere in the package.
var FlushEntry *BaseEntry
View Source
// DefaultSuffix is the string ".rot".
// NOTE(review): presumably the file-name suffix for rotated files — confirm
// against the callers of MakeVersionFile / OpenRotational.
var DefaultSuffix = ".rot"

// DefaultRotationFileMaxSize is 100 times common.M, converted to int.
var DefaultRotationFileMaxSize = 100 * int(common.M)
View Source
// DefaltHistoryFactory builds a new *baseHistory whose versions slice is
// empty but non-nil and returns it as an IHistory.
// The spelling "Defalt" is part of the exported name and is kept unchanged.
var DefaltHistoryFactory = func() IHistory {
	history := &baseHistory{}
	history.versions = make([]*VersionFile, 0)
	return history
}
View Source
// DefaultHBInterval is a duration of 100 milliseconds.
// NOTE(review): presumably the default for SyncerCfg.Interval — confirm at
// the point where SyncerCfg is consumed.
var DefaultHBInterval = 100 * time.Millisecond
View Source
// DefaultMaxBatchSize is the int value 500.
// NOTE(review): presumably the entry-count cap used by the batch store —
// confirm in NewBatchStore's implementation.
var DefaultMaxBatchSize = 500
View Source
// ErrVersionNotFound is a sentinel error with the message
// "aoe: version not found".
var ErrVersionNotFound = errors.New("aoe: version not found")

Functions

func MakeVersionFile

func MakeVersionFile(prefix, suffix string, version uint64) string

func MarshallEntrySize

func MarshallEntrySize(size uint32) []byte

func MarshallEntrySizeWithBuf

func MarshallEntrySizeWithBuf(buf []byte, size uint32)

func MarshallEntryType

func MarshallEntryType(typ EntryType) []byte

func MarshallEntryTypeWithBuf

func MarshallEntryTypeWithBuf(buf []byte, typ EntryType)

func New

func New(dir, name string, cfg *RotationCfg) (*store, error)

func NewBatchStore

func NewBatchStore(dir, name string, rotationCfg *RotationCfg) (*batchStore, error)

func NewObservers

func NewObservers(o1, o2 Observer) *observers

func NewSimpleReplayer

func NewSimpleReplayer() *simpleReplayer

func NewSyncAwareStore

func NewSyncAwareStore(dir, name string, rotationCfg *RotationCfg, syncerCfg *SyncerCfg) (*syncAwareStore, error)

func ParseVersion

func ParseVersion(name, prefix, suffix string) (uint64, error)

func UnmarshallEntrySize

func UnmarshallEntrySize(buf []byte) uint32

Types

type AsyncBaseEntry

// AsyncBaseEntry embeds BaseEntry. Its listed methods add DoneWithErr,
// GetError and WaitDone, and declare their own Free and IsAsync, which take
// precedence over the ones promoted from BaseEntry.
type AsyncBaseEntry struct {
	BaseEntry
	// contains filtered or unexported fields
}

func NewAsyncBaseEntry

func NewAsyncBaseEntry() *AsyncBaseEntry

func (*AsyncBaseEntry) DoneWithErr

func (e *AsyncBaseEntry) DoneWithErr(err error)

func (*AsyncBaseEntry) Free

func (e *AsyncBaseEntry) Free()

func (*AsyncBaseEntry) GetError

func (e *AsyncBaseEntry) GetError() error

func (*AsyncBaseEntry) IsAsync

func (e *AsyncBaseEntry) IsAsync() bool

func (*AsyncBaseEntry) WaitDone

func (e *AsyncBaseEntry) WaitDone() error

type AsyncEntry

// AsyncEntry is an Entry that additionally exposes GetError, WaitDone and
// DoneWithErr.
// NOTE(review): the names suggest a completion signal carrying an error —
// confirm the exact blocking semantics in the implementation.
type AsyncEntry interface {
	Entry
	GetError() error
	WaitDone() error
	DoneWithErr(error)
}

type AwareStore

// AwareStore extends Store with Start, Checkpoint, and getter/setter pairs
// for a synced id and a checkpoint id (both uint64).
// It is the argument type of HBHandleFactory.
type AwareStore interface {
	Store
	Start()
	Checkpoint(Entry) error
	GetSyncedId() uint64
	SetSyncedId(uint64)
	GetCheckpointId() uint64
	SetCheckpointId(uint64)
}

type BaseEntry

// BaseEntry is a concrete entry holding metadata, a payload and an auxiliary
// value. The methods listed on *BaseEntry cover the Entry interface.
type BaseEntry struct {
	// Meta is the entry's metadata.
	Meta      *EntryMeta
	// Payload is the entry's raw payload bytes.
	Payload   []byte
	// Auxiliary is an arbitrary value attached to the entry.
	// NOTE(review): presumably what GetAuxilaryInfo/SetAuxilaryInfo access — confirm.
	Auxiliary interface{}
	// contains filtered or unexported fields
}

func NewBaseEntryWithMeta

func NewBaseEntryWithMeta(meta *EntryMeta) *BaseEntry

func (*BaseEntry) Clone

func (e *BaseEntry) Clone(o Entry)

func (*BaseEntry) Free

func (e *BaseEntry) Free()

func (*BaseEntry) GetAuxilaryInfo

func (e *BaseEntry) GetAuxilaryInfo() interface{}

func (*BaseEntry) GetMeta

func (e *BaseEntry) GetMeta() *EntryMeta

func (*BaseEntry) GetPayload

func (e *BaseEntry) GetPayload() []byte

func (*BaseEntry) IsAsync

func (e *BaseEntry) IsAsync() bool

func (*BaseEntry) Marshal

func (e *BaseEntry) Marshal() ([]byte, error)

func (*BaseEntry) ReadFrom

func (e *BaseEntry) ReadFrom(r io.Reader) (int64, error)

func (*BaseEntry) SetAuxilaryInfo

func (e *BaseEntry) SetAuxilaryInfo(info interface{})

func (*BaseEntry) SetMeta

func (e *BaseEntry) SetMeta(meta *EntryMeta)

func (*BaseEntry) Unmarshal

func (e *BaseEntry) Unmarshal(buf []byte) error

func (*BaseEntry) WriteTo

func (e *BaseEntry) WriteTo(w StoreFileWriter, locker sync.Locker) (int64, error)

type Entry

// Entry is the interface for a log entry: metadata and payload accessors,
// byte-slice and stream (de)serialization, an auxiliary-info slot, and Free.
type Entry interface {
	// IsAsync reports whether the entry is asynchronous.
	IsAsync() bool
	GetMeta() *EntryMeta
	SetMeta(*EntryMeta)
	GetPayload() []byte
	Unmarshal([]byte) error
	Marshal() ([]byte, error)
	// ReadFrom reads from an io.Reader and returns an int64 and an error.
	// NOTE(review): the int64 is presumably the byte count, as in io.ReaderFrom — confirm.
	ReadFrom(io.Reader) (int64, error)
	// WriteTo writes to a StoreFileWriter and is also handed a sync.Locker.
	// NOTE(review): how the locker is used is not visible here — confirm.
	WriteTo(StoreFileWriter, sync.Locker) (int64, error)
	GetAuxilaryInfo() interface{}
	SetAuxilaryInfo(info interface{})
	Free()
}

type EntryHandler

// EntryHandler is an alias for a function that takes an io.Reader and an
// *EntryMeta and returns an Entry, an int64 and an error. Handlers are
// registered per EntryType through Replayer.RegisterEntryHandler.
// NOTE(review): the int64 is presumably the number of bytes consumed — confirm.
type EntryHandler = func(io.Reader, *EntryMeta) (Entry, int64, error)

type EntryMeta

// EntryMeta wraps a single byte buffer. Its methods read and write an entry
// type, a payload size and a reserved region.
// NOTE(review): Buf is presumably EntryMetaSize bytes laid out as
// type | size | reserved — confirm in NewEntryMeta.
type EntryMeta struct {
	Buf []byte
}

func NewEntryMeta

func NewEntryMeta() *EntryMeta

func (*EntryMeta) GetReservedBuf

func (meta *EntryMeta) GetReservedBuf() []byte

func (*EntryMeta) GetType

func (meta *EntryMeta) GetType() EntryType

func (*EntryMeta) IsCheckpoint

func (meta *EntryMeta) IsCheckpoint() bool

func (*EntryMeta) IsFlush

func (meta *EntryMeta) IsFlush() bool

func (*EntryMeta) PayloadSize

func (meta *EntryMeta) PayloadSize() uint32

func (*EntryMeta) ReadFrom

func (meta *EntryMeta) ReadFrom(r io.Reader) (int64, error)

func (*EntryMeta) SetPayloadSize

func (meta *EntryMeta) SetPayloadSize(size uint32)

func (*EntryMeta) SetType

func (meta *EntryMeta) SetType(typ EntryType)

func (*EntryMeta) Size

func (meta *EntryMeta) Size() uint32

func (*EntryMeta) String

func (meta *EntryMeta) String() string

func (*EntryMeta) WriteTo

func (meta *EntryMeta) WriteTo(w io.Writer) (int64, error)

type EntryType

// EntryType is an alias for uint16.
type EntryType = uint16
// Entry type values, numbered from zero with iota.
const (
	// ETInvalid is the zero value (0).
	ETInvalid EntryType = iota
	// ETFlush is 1.
	ETFlush
	// ETCheckpoint is 2.
	ETCheckpoint
	// ETCustomizeStart is 3.
	// NOTE(review): presumably the first value available for caller-defined types — confirm.
	ETCustomizeStart
)

func UnmarshallEntryType

func UnmarshallEntryType(buf []byte) EntryType

type HBHandleFactory

// HBHandleFactory is an alias for a function that builds a base.IHBHandle
// from an AwareStore. It is the type of SyncerCfg.Factory.
type HBHandleFactory = func(AwareStore) base.IHBHandle

type HistoryFactory

// HistoryFactory is a defined function type (not an alias) that returns an
// IHistory. It is accepted by OpenRotational and carried in RotationCfg.
type HistoryFactory func() IHistory

type IHistory

// IHistory is a collection of *VersionFile values addressed by uint64
// version numbers.
type IHistory interface {
	String() string
	// Append adds a single version file.
	Append(*VersionFile)
	// Extend adds a slice of version files.
	Extend([]*VersionFile)
	// Versions returns version numbers as a []uint64.
	Versions() []uint64
	VersionCnt() int
	// Truncate takes a uint64 and may fail.
	// NOTE(review): whether the argument is inclusive is not visible here — confirm.
	Truncate(uint64) error
	// Version looks up a version file by number.
	// NOTE(review): the result for a missing version is presumably nil — confirm.
	Version(uint64) *VersionFile
	GetOldest() *VersionFile
	Empty() bool
	ReplayVersions(VersionReplayHandler, ReplayObserver) error
}

type IRotateChecker

// IRotateChecker has a single method, PrepareAppend, which receives a
// *VersionFile and an int64 and returns a bool and an error.
// NOTE(review): the bool presumably means "rotation required" and the int64
// the number of bytes about to be appended — confirm.
type IRotateChecker interface {
	PrepareAppend(*VersionFile, int64) (bool, error)
}

type MaxSizeRotationChecker

// MaxSizeRotationChecker carries a MaxSize int; its pointer type has the
// PrepareAppend method required by IRotateChecker.
// NOTE(review): MaxSize is presumably a byte limit per version file — confirm.
type MaxSizeRotationChecker struct {
	MaxSize int
}

func (*MaxSizeRotationChecker) PrepareAppend

func (c *MaxSizeRotationChecker) PrepareAppend(f *VersionFile, delta int64) (bool, error)

type Observer

// Observer receives two callbacks: OnSynced, with no arguments, and
// OnRotated, with a *VersionFile.
type Observer interface {
	OnSynced()
	OnRotated(*VersionFile)
}

type PostVersionDeleteCB

// PostVersionDeleteCB is an alias for a callback taking a uint64.
// NOTE(review): presumably called with a version number after that version
// is deleted — confirm at the call site.
type PostVersionDeleteCB = func(uint64)

type ReplayObserver

// ReplayObserver receives three callbacks: OnNewVersion and OnReplayCommit
// take a uint64, and OnReplayCheckpoint takes a common.Range. It is the
// second argument of VersionReplayHandler.
type ReplayObserver interface {
	OnNewVersion(uint64)
	OnReplayCommit(uint64)
	OnReplayCheckpoint(common.Range)
}

type Replayer

// Replayer operates on a Store: it can replay it, truncate it, register an
// EntryHandler for an EntryType, and report an int64 offset.
type Replayer interface {
	Replay(Store) error
	Truncate(Store) error
	RegisterEntryHandler(EntryType, EntryHandler) error
	GetOffset() int64
}

type RotationCfg

// RotationCfg bundles a rotate checker, an observer and a history factory.
// These are the same three collaborator types that OpenRotational takes as
// parameters. New, NewBatchStore and NewSyncAwareStore accept a *RotationCfg.
type RotationCfg struct {
	RotateChecker  IRotateChecker
	Observer       Observer
	HistoryFactory HistoryFactory
}

type Rotational

// Rotational embeds sync.RWMutex and holds a directory, a name prefix and
// suffix, and an IRotateChecker. The methods listed on *Rotational, together
// with the promoted lock methods, cover the StoreFile and ReplayObserver
// interfaces.
type Rotational struct {
	sync.RWMutex
	Dir        string
	NamePrefix string
	NameSuffix string
	Checker    IRotateChecker
	// contains filtered or unexported fields
}

func OpenRotational

func OpenRotational(dir, prefix, suffix string, historyFactory HistoryFactory, checker IRotateChecker, observer Observer) (*Rotational, error)

func (*Rotational) ApplyCheckpoint

func (r *Rotational) ApplyCheckpoint(rng common.Range)

func (*Rotational) ApplyCommit

func (r *Rotational) ApplyCommit(id uint64)

func (*Rotational) Close

func (r *Rotational) Close() error

func (*Rotational) GetHistory

func (r *Rotational) GetHistory() IHistory

func (*Rotational) GetNextVersion

func (r *Rotational) GetNextVersion() uint64

func (*Rotational) OnNewVersion

func (r *Rotational) OnNewVersion(id uint64)

func (*Rotational) OnReplayCheckpoint

func (r *Rotational) OnReplayCheckpoint(rng common.Range)

func (*Rotational) OnReplayCommit

func (r *Rotational) OnReplayCommit(id uint64)

func (*Rotational) PrepareWrite

func (r *Rotational) PrepareWrite(size int) error

func (*Rotational) ReplayHistoryVersion

func (r *Rotational) ReplayHistoryVersion(handler VersionReplayHandler) error

func (*Rotational) ReplayVersions

func (r *Rotational) ReplayVersions(handler VersionReplayHandler) error

func (*Rotational) Stat

func (r *Rotational) Stat() (os.FileInfo, error)

func (*Rotational) String

func (r *Rotational) String() string

func (*Rotational) Sync

func (r *Rotational) Sync() error

func (*Rotational) Truncate

func (r *Rotational) Truncate(size int64) error

func (*Rotational) TryCompact

func (r *Rotational) TryCompact()

func (*Rotational) Write

func (r *Rotational) Write(buf []byte) (n int, err error)

type Store

// Store is a closable log store: it appends entries, syncs, replays version
// files through a VersionReplayHandler, truncates, exposes its IHistory and
// can attempt compaction.
type Store interface {
	io.Closer
	AppendEntry(Entry) error
	Sync() error
	ReplayVersions(VersionReplayHandler) error
	Truncate(int64) error
	GetHistory() IHistory
	TryCompact()
}

type StoreFile

// StoreFile combines StoreFileWriter, io.Closer and sync.Locker with read
// locking (RLock/RUnlock) and file-level operations: Sync, Truncate, Stat,
// history access, version replay and compaction.
type StoreFile interface {
	StoreFileWriter
	io.Closer
	sync.Locker
	RLock()
	RUnlock()
	Sync() error
	Truncate(int64) error
	Stat() (os.FileInfo, error)
	GetHistory() IHistory
	ReplayVersions(VersionReplayHandler) error
	TryCompact()
}

type StoreFileWriter

// StoreFileWriter is an io.Writer with three extra methods: PrepareWrite,
// taking an int, ApplyCommit, taking a uint64, and ApplyCheckpoint, taking a
// common.Range. It is the destination type of Entry.WriteTo.
// NOTE(review): PrepareWrite's int is presumably the size of the upcoming write — confirm.
type StoreFileWriter interface {
	io.Writer
	PrepareWrite(int) error
	ApplyCommit(uint64)
	ApplyCheckpoint(common.Range)
}

type SyncerCfg

// SyncerCfg holds an HBHandleFactory and a time.Duration interval. It is
// passed to NewSyncAwareStore.
// NOTE(review): Interval is presumably the heartbeat period, defaulting to
// DefaultHBInterval — confirm.
type SyncerCfg struct {
	Factory  HBHandleFactory
	Interval time.Duration
}

type VersionFile

// VersionFile embeds *os.File and tags it with a uint64 version and an int64
// size. Its own Truncate method takes precedence over the one promoted from
// *os.File.
type VersionFile struct {
	*os.File
	Version uint64
	// NOTE(review): Size is presumably the tracked byte length of the file — confirm.
	Size    int64
}

func (*VersionFile) Destroy

func (vf *VersionFile) Destroy() error

func (*VersionFile) Truncate

func (vf *VersionFile) Truncate(size int64) error

type VersionReplayHandler

// VersionReplayHandler is an alias for a function that receives a
// *VersionFile and a ReplayObserver and returns an error. It is the argument
// type of the ReplayVersions methods.
type VersionReplayHandler = func(*VersionFile, ReplayObserver) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL