Documentation ¶
Index ¶
- Constants
- Variables
- func GenerateNewBufFName(now time.Time, oldFName string, isWithGZ bool) (string, error)
- func OpenBufFile(filepath string, fizeByte int64) (fp *os.File, err error)
- func PrepareDir(path string) error
- type BaseSerializer
- type BufFileStat
- type Data
- type DataDecoder
- type DataEncoder
- type IdsDecoder
- type IdsEncoder
- type Int64Set
- type Int64SetItf
- type Int64SetWithTTL
- type Journal
- func (j *Journal) Close()
- func (j *Journal) Flush() (err error)
- func (j *Journal) GetMetric() map[string]interface{}
- func (j *Journal) IsLegacyRunning() bool
- func (j *Journal) LoadLegacyBuf(data *Data) (err error)
- func (j *Journal) LoadMaxId() (int64, error)
- func (j *Journal) LockLegacy() bool
- func (j *Journal) Rotate(ctx context.Context) (err error)
- func (j *Journal) UnLockLegacy() bool
- func (j *Journal) WriteData(data *Data) (err error)
- func (j *Journal) WriteId(id int64) error
- type JournalConfig
- type LegacyLoader
- func (l *LegacyLoader) AddID(id int64)
- func (l *LegacyLoader) Clean() error
- func (l *LegacyLoader) GetIdsLen() int
- func (l *LegacyLoader) IsIDExists(id int64) bool
- func (l *LegacyLoader) Load(data *Data) (err error)
- func (l *LegacyLoader) LoadAllids(ids Int64SetItf) (allErr error)
- func (l *LegacyLoader) LoadMaxId() (maxId int64, err error)
- func (l *LegacyLoader) Reset(dataFNames, idsFNames []string)
- type Uint32Set
Examples ¶
Constants ¶
const ( // FileMode default file mode FileMode = 0664 // DirMode default directory mode DirMode = 0774 // BufSize default buf file size BufSize = 1024 * 1024 * 4 // 4 MB )
const ( // FlushInterval interval to flush serializer // deafultFlushInterval = 5 * time.Second // RotateCheckInterval interval to rotate journal files RotateCheckInterval = 1 * time.Second )
Variables ¶
var ( // DataFileNameReg journal data file name pattern DataFileNameReg = regexp.MustCompile(`^\d{8}_\d{8}\.buf(.gz)?$`) // IDFileNameReg journal id file name pattern IDFileNameReg = regexp.MustCompile(`^\d{8}_\d{8}\.ids(.gz)?$`) )
var ( // DuringRotateErr rotate error DuringRotateErr = fmt.Errorf("during rotating") )
Functions ¶
func GenerateNewBufFName ¶
GenerateNewBufFName return new buf file name depends on current time file name looks like `yyyymmddnnnn.ids`, nnnn begin from 0001 for each day
func OpenBufFile ¶
OpenBufFile create and open file
Types ¶
type BaseSerializer ¶ added in v1.6.1
BaseSerializer base serializer
type BufFileStat ¶
BufFileStat current journal files' stats
func PrepareNewBufFile ¶
func PrepareNewBufFile(dirPath string, oldFsStat *BufFileStat, isScan, isWithGZ bool, sizeBytes int64) (fsStat *BufFileStat, err error)
PrepareNewBufFile create new data & id files, and update BufFileStat
type Data ¶
Data msgp data schema
func (*Data) MarshalMsg ¶
MarshalMsg implements msgp.Marshaler
type DataDecoder ¶
type DataDecoder struct { BaseSerializer // contains filtered or unexported fields }
DataDecoder data deserializer
func NewDataDecoder ¶
func NewDataDecoder(fp *os.File, isCompress bool) (decoder *DataDecoder, err error)
NewDataDecoder create new DataDecoder
func (*DataDecoder) Read ¶
func (dec *DataDecoder) Read(data *Data) (err error)
Read deserialize data from fp
type DataEncoder ¶
type DataEncoder struct { BaseSerializer // contains filtered or unexported fields }
DataEncoder data serializer
func NewDataEncoder ¶
func NewDataEncoder(fp *os.File, isCompress bool) (enc *DataEncoder, err error)
NewDataEncoder create new DataEncoder
func (*DataEncoder) Close ¶ added in v1.6.0
func (enc *DataEncoder) Close() (err error)
Close close data gzip writer
func (*DataEncoder) Write ¶
func (enc *DataEncoder) Write(msg *Data) (err error)
Write serialize data info fp
type IdsDecoder ¶
type IdsDecoder struct { BaseSerializer // contains filtered or unexported fields }
IdsDecoder ids deserializer
func NewIdsDecoder ¶
func NewIdsDecoder(fp *os.File, isCompress bool) (decoder *IdsDecoder, err error)
NewIdsDecoder create new IdsDecoder
func (*IdsDecoder) LoadMaxId ¶
func (dec *IdsDecoder) LoadMaxId() (maxId int64, err error)
LoadMaxId load the maxium id in all files
func (*IdsDecoder) ReadAllToBmap ¶
func (dec *IdsDecoder) ReadAllToBmap() (ids *roaring.Bitmap, err error)
ReadAllToBmap read all ids in all files into bmap
func (*IdsDecoder) ReadAllToInt64Set ¶ added in v1.3.0
func (dec *IdsDecoder) ReadAllToInt64Set(ids Int64SetItf) (err error)
ReadAllToBmap read all ids in all files into set
type IdsEncoder ¶
type IdsEncoder struct { BaseSerializer // contains filtered or unexported fields }
IdsEncoder ids serializer
func NewIdsEncoder ¶
func NewIdsEncoder(fp *os.File, isCompress bool) (enc *IdsEncoder, err error)
NewIdsEncoder create new IdsEncoder
func (*IdsEncoder) Close ¶ added in v1.6.0
func (enc *IdsEncoder) Close() (err error)
Close close ids gzip writer
func (*IdsEncoder) Write ¶
func (enc *IdsEncoder) Write(id int64) (err error)
Write serialize id info fp
type Int64Set ¶ added in v1.3.0
type Int64Set struct {
// contains filtered or unexported fields
}
Int64Set set depends on sync.Map. cost much more memory than bitmap
Example ¶
package main import ( "github.com/Laisky/go-utils/journal" ) func main() { s := journal.NewInt64Set() s.Add(5) s.CheckAndRemove(5) // true s.CheckAndRemove(3) // false }
Output:
func (*Int64Set) CheckAndRemove ¶ added in v1.3.0
CheckAndRemove return true if exists
type Int64SetItf ¶ added in v1.7.1
Int64SetItf set for int64
type Int64SetWithTTL ¶ added in v1.7.0
Int64SetWithTTL int64 set with TTL
func NewInt64SetWithTTL ¶ added in v1.7.0
func NewInt64SetWithTTL(ctx context.Context, ttl time.Duration) *Int64SetWithTTL
NewInt64SetWithTTL create new int64 set with ttl
func (*Int64SetWithTTL) AddInt64 ¶ added in v1.7.0
func (s *Int64SetWithTTL) AddInt64(id int64)
AddInt64 add int64
func (*Int64SetWithTTL) CheckAndRemove ¶ added in v1.7.0
func (s *Int64SetWithTTL) CheckAndRemove(id int64) (ok bool)
CheckAndRemove return true if id committed
func (*Int64SetWithTTL) Close ¶ added in v1.7.0
func (s *Int64SetWithTTL) Close()
Close close set, stop rotate
func (*Int64SetWithTTL) GetLen ¶ added in v1.7.0
func (s *Int64SetWithTTL) GetLen() (r int)
GetLen get items number of set
func (*Int64SetWithTTL) StartRotate ¶ added in v1.7.6
func (s *Int64SetWithTTL) StartRotate(ctx context.Context)
StartRotate start counter rotate
type Journal ¶
type Journal struct { sync.RWMutex // journal rwlock *JournalConfig // contains filtered or unexported fields }
Journal redo log consist by msgs and committed ids
func NewJournal ¶
func NewJournal(ctx context.Context, cfg *JournalConfig) *Journal
NewJournal create new Journal
func (*Journal) IsLegacyRunning ¶
IsLegacyRunning check whether running legacy loading
func (*Journal) LoadLegacyBuf ¶
LoadLegacyBuf load legacy data one by one ⚠️Warn: should call `j.LockLegacy()` before invoke this method
func (*Journal) LockLegacy ¶
LockLegacy lock legacy to prevent rotate
func (*Journal) UnLockLegacy ¶
UnLockLegacy release legacy lock
type JournalConfig ¶
type JournalConfig struct { BufDirPath string BufSizeBytes int64 RotateDuration time.Duration IsAggresiveGC, IsCompress bool // [beta] enable gc when writing journal FlushInterval, CommittedIDTTL time.Duration // remain ids in memory until ttl, to reduce duplicate msg }
JournalConfig configuration of Journal
func NewConfig ¶ added in v1.3.8
func NewConfig() *JournalConfig
NewConfig get JournalConfig with default configuration
type LegacyLoader ¶
LegacyLoader loader to handle legacy data and ids
func NewLegacyLoader ¶
func NewLegacyLoader(ctx context.Context, dataFNames, idsFNames []string, isCompress bool, committedIDTTL time.Duration) *LegacyLoader
NewLegacyLoader create new LegacyLoader
func (*LegacyLoader) AddID ¶ added in v1.3.0
func (l *LegacyLoader) AddID(id int64)
AddID add id in ids
func (*LegacyLoader) GetIdsLen ¶ added in v1.4.0
func (l *LegacyLoader) GetIdsLen() int
GetIdsLen return length of ids
func (*LegacyLoader) IsIDExists ¶ added in v1.10.1
func (l *LegacyLoader) IsIDExists(id int64) bool
func (*LegacyLoader) Load ¶
func (l *LegacyLoader) Load(data *Data) (err error)
func (*LegacyLoader) LoadAllids ¶
func (l *LegacyLoader) LoadAllids(ids Int64SetItf) (allErr error)
LoadAllids read all ids from ids file into ids set
func (*LegacyLoader) LoadMaxId ¶
func (l *LegacyLoader) LoadMaxId() (maxId int64, err error)
LoadMaxId load max id from all ids files
func (*LegacyLoader) Reset ¶
func (l *LegacyLoader) Reset(dataFNames, idsFNames []string)
Reset reset journal legacy link to existing files
type Uint32Set ¶ added in v1.4.2
Uint32Set set depends on bitmap. only support uint32, so cannot support more than 4294967295 numbers.
func NewUint32Set ¶ added in v1.4.2
func NewUint32Set() *Uint32Set
NewUint32Set create new Uint32Set
func (*Uint32Set) CheckAndRemoveInt64 ¶ added in v1.4.2
CheckAndRemoveInt64 return true if exists
func (*Uint32Set) CheckAndRemoveUint32 ¶ added in v1.4.2
CheckAndRemoveUint32 return true if exists