journal

package
v1.10.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2020 License: MIT Imports: 23 Imported by: 0

Documentation

Index

Examples

Constants

View Source
const (
	// FileMode default file mode
	FileMode = 0664
	// DirMode default directory mode
	DirMode = 0774
	// BufSize default buf file size
	BufSize = 1024 * 1024 * 4 // 4 MB
)
View Source
const (
	// FlushInterval interval to flush serializer
	// deafultFlushInterval = 5 * time.Second
	// RotateCheckInterval interval to rotate journal files
	RotateCheckInterval = 1 * time.Second
)

Variables

View Source
var (
	// DataFileNameReg journal data file name pattern
	DataFileNameReg = regexp.MustCompile(`^\d{8}_\d{8}\.buf(.gz)?$`)
	// IDFileNameReg journal id file name pattern
	IDFileNameReg = regexp.MustCompile(`^\d{8}_\d{8}\.ids(.gz)?$`)
)
View Source
var (
	// DuringRotateErr rotate error
	DuringRotateErr = fmt.Errorf("during rotating")
)

Functions

func GenerateNewBufFName

func GenerateNewBufFName(now time.Time, oldFName string, isWithGZ bool) (string, error)

GenerateNewBufFName returns a new buf file name based on the current time. The file name looks like `yyyymmddnnnn.ids`, where nnnn begins from 0001 for each day.

func OpenBufFile

func OpenBufFile(filepath string, fizeByte int64) (fp *os.File, err error)

OpenBufFile create and open file

func PrepareDir

func PrepareDir(path string) error

PrepareDir `mkdir -p`

Types

type BaseSerializer added in v1.6.1

type BaseSerializer struct {
	sync.Mutex
	// contains filtered or unexported fields
}

BaseSerializer base serializer

type BufFileStat

type BufFileStat struct {
	NewDataFp, NewIDsFp            *os.File
	OldDataFnames, OldIdsDataFname []string
}

BufFileStat current journal files' stats

func PrepareNewBufFile

func PrepareNewBufFile(dirPath string, oldFsStat *BufFileStat, isScan, isWithGZ bool, sizeBytes int64) (fsStat *BufFileStat, err error)

PrepareNewBufFile create new data & id files, and update BufFileStat

type Data

type Data struct {
	Data map[string]interface{}
	ID   int64
}

Data msgp data schema

func (*Data) DecodeMsg

func (z *Data) DecodeMsg(dc *msgp.Reader) (err error)

DecodeMsg implements msgp.Decodable

func (*Data) EncodeMsg

func (z *Data) EncodeMsg(en *msgp.Writer) (err error)

EncodeMsg implements msgp.Encodable

func (*Data) MarshalMsg

func (z *Data) MarshalMsg(b []byte) (o []byte, err error)

MarshalMsg implements msgp.Marshaler

func (*Data) Msgsize

func (z *Data) Msgsize() (s int)

Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message

func (*Data) UnmarshalMsg

func (z *Data) UnmarshalMsg(bts []byte) (o []byte, err error)

UnmarshalMsg implements msgp.Unmarshaler

type DataDecoder

type DataDecoder struct {
	BaseSerializer
	// contains filtered or unexported fields
}

DataDecoder data deserializer

func NewDataDecoder

func NewDataDecoder(fp *os.File, isCompress bool) (decoder *DataDecoder, err error)

NewDataDecoder create new DataDecoder

func (*DataDecoder) Read

func (dec *DataDecoder) Read(data *Data) (err error)

Read deserialize data from fp

type DataEncoder

type DataEncoder struct {
	BaseSerializer
	// contains filtered or unexported fields
}

DataEncoder data serializer

func NewDataEncoder

func NewDataEncoder(fp *os.File, isCompress bool) (enc *DataEncoder, err error)

NewDataEncoder create new DataEncoder

func (*DataEncoder) Close added in v1.6.0

func (enc *DataEncoder) Close() (err error)

Close close data gzip writer

func (*DataEncoder) Flush

func (enc *DataEncoder) Flush() (err error)

Flush flush buf to fp

func (*DataEncoder) Write

func (enc *DataEncoder) Write(msg *Data) (err error)

Write serializes data into fp

type IdsDecoder

type IdsDecoder struct {
	BaseSerializer
	// contains filtered or unexported fields
}

IdsDecoder ids deserializer

func NewIdsDecoder

func NewIdsDecoder(fp *os.File, isCompress bool) (decoder *IdsDecoder, err error)

NewIdsDecoder create new IdsDecoder

func (*IdsDecoder) LoadMaxId

func (dec *IdsDecoder) LoadMaxId() (maxId int64, err error)

LoadMaxId loads the maximum id in all files

func (*IdsDecoder) ReadAllToBmap

func (dec *IdsDecoder) ReadAllToBmap() (ids *roaring.Bitmap, err error)

ReadAllToBmap read all ids in all files into bmap

func (*IdsDecoder) ReadAllToInt64Set added in v1.3.0

func (dec *IdsDecoder) ReadAllToInt64Set(ids Int64SetItf) (err error)

ReadAllToInt64Set reads all ids in all files into the set

type IdsEncoder

type IdsEncoder struct {
	BaseSerializer
	// contains filtered or unexported fields
}

IdsEncoder ids serializer

func NewIdsEncoder

func NewIdsEncoder(fp *os.File, isCompress bool) (enc *IdsEncoder, err error)

NewIdsEncoder create new IdsEncoder

func (*IdsEncoder) Close added in v1.6.0

func (enc *IdsEncoder) Close() (err error)

Close close ids gzip writer

func (*IdsEncoder) Flush

func (enc *IdsEncoder) Flush() (err error)

Flush flush buf to fp

func (*IdsEncoder) Write

func (enc *IdsEncoder) Write(id int64) (err error)

Write serializes id into fp

type Int64Set added in v1.3.0

type Int64Set struct {
	// contains filtered or unexported fields
}

Int64Set is a set that depends on sync.Map. It costs much more memory than a bitmap.

Example
package main

import (
	"github.com/Laisky/go-utils/journal"
)

func main() {
	s := journal.NewInt64Set()
	s.Add(5)

	s.CheckAndRemove(5) // true
	s.CheckAndRemove(3) // false
}
Output:

func NewInt64Set added in v1.3.0

func NewInt64Set() *Int64Set

NewInt64Set create new Int64Set

func (*Int64Set) Add added in v1.3.0

func (s *Int64Set) Add(i int)

Add add new number

func (*Int64Set) AddInt64 added in v1.7.1

func (s *Int64Set) AddInt64(i int64)

AddInt64 add int64

func (*Int64Set) CheckAndRemove added in v1.3.0

func (s *Int64Set) CheckAndRemove(i int64) (ok bool)

CheckAndRemove return true if exists

func (*Int64Set) GetLen added in v1.3.3

func (s *Int64Set) GetLen() int

GetLen return length

type Int64SetItf added in v1.7.1

type Int64SetItf interface {
	Add(int)
	AddInt64(int64)
	CheckAndRemove(int64) bool
	GetLen() int
}

Int64SetItf set for int64

type Int64SetWithTTL added in v1.7.0

type Int64SetWithTTL struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

Int64SetWithTTL int64 set with TTL

func NewInt64SetWithTTL added in v1.7.0

func NewInt64SetWithTTL(ctx context.Context, ttl time.Duration) *Int64SetWithTTL

NewInt64SetWithTTL create new int64 set with ttl

func (*Int64SetWithTTL) Add added in v1.7.0

func (s *Int64SetWithTTL) Add(id int)

Add add int

func (*Int64SetWithTTL) AddInt64 added in v1.7.0

func (s *Int64SetWithTTL) AddInt64(id int64)

AddInt64 add int64

func (*Int64SetWithTTL) CheckAndRemove added in v1.7.0

func (s *Int64SetWithTTL) CheckAndRemove(id int64) (ok bool)

CheckAndRemove return true if id committed

func (*Int64SetWithTTL) Close added in v1.7.0

func (s *Int64SetWithTTL) Close()

Close close set, stop rotate

func (*Int64SetWithTTL) GetLen added in v1.7.0

func (s *Int64SetWithTTL) GetLen() (r int)

GetLen get items number of set

func (*Int64SetWithTTL) StartRotate added in v1.7.6

func (s *Int64SetWithTTL) StartRotate(ctx context.Context)

StartRotate start counter rotate

type Journal

type Journal struct {
	sync.RWMutex // journal rwlock
	*JournalConfig
	// contains filtered or unexported fields
}

Journal is a redo log consisting of msgs and committed ids

func NewJournal

func NewJournal(ctx context.Context, cfg *JournalConfig) *Journal

NewJournal create new Journal

func (*Journal) Close added in v1.7.0

func (j *Journal) Close()

func (*Journal) Flush

func (j *Journal) Flush() (err error)

Flush flush journal files

func (*Journal) GetMetric added in v1.3.3

func (j *Journal) GetMetric() map[string]interface{}

GetMetric monitor interface

func (*Journal) IsLegacyRunning

func (j *Journal) IsLegacyRunning() bool

IsLegacyRunning check whether running legacy loading

func (*Journal) LoadLegacyBuf

func (j *Journal) LoadLegacyBuf(data *Data) (err error)

LoadLegacyBuf loads legacy data one by one. ⚠️Warn: `j.LockLegacy()` should be called before invoking this method.

func (*Journal) LoadMaxId

func (j *Journal) LoadMaxId() (int64, error)

LoadMaxId load max id from journal ids files

func (*Journal) LockLegacy

func (j *Journal) LockLegacy() bool

LockLegacy lock legacy to prevent rotate

func (*Journal) Rotate

func (j *Journal) Rotate(ctx context.Context) (err error)

Rotate creates new data and ids buf files. This function is not thread-safe.

func (*Journal) UnLockLegacy

func (j *Journal) UnLockLegacy() bool

UnLockLegacy release legacy lock

func (*Journal) WriteData

func (j *Journal) WriteData(data *Data) (err error)

WriteData write data to journal

func (*Journal) WriteId

func (j *Journal) WriteId(id int64) error

WriteId write id to journal

type JournalConfig

type JournalConfig struct {
	BufDirPath     string
	BufSizeBytes   int64
	RotateDuration time.Duration
	IsAggresiveGC,
	IsCompress bool // [beta] enable gc when writing journal
	FlushInterval,
	CommittedIDTTL time.Duration // remain ids in memory until ttl, to reduce duplicate msg
}

JournalConfig configuration of Journal

func NewConfig added in v1.3.8

func NewConfig() *JournalConfig

NewConfig get JournalConfig with default configuration

type LegacyLoader

type LegacyLoader struct {
	sync.Mutex
	// contains filtered or unexported fields
}

LegacyLoader loader to handle legacy data and ids

func NewLegacyLoader

func NewLegacyLoader(ctx context.Context, dataFNames, idsFNames []string, isCompress bool, committedIDTTL time.Duration) *LegacyLoader

NewLegacyLoader create new LegacyLoader

func (*LegacyLoader) AddID added in v1.3.0

func (l *LegacyLoader) AddID(id int64)

AddID add id in ids

func (*LegacyLoader) Clean

func (l *LegacyLoader) Clean() error

Clean clean legacy files

func (*LegacyLoader) GetIdsLen added in v1.4.0

func (l *LegacyLoader) GetIdsLen() int

GetIdsLen return length of ids

func (*LegacyLoader) IsIDExists added in v1.10.1

func (l *LegacyLoader) IsIDExists(id int64) bool

func (*LegacyLoader) Load

func (l *LegacyLoader) Load(data *Data) (err error)

func (*LegacyLoader) LoadAllids

func (l *LegacyLoader) LoadAllids(ids Int64SetItf) (allErr error)

LoadAllids read all ids from ids file into ids set

func (*LegacyLoader) LoadMaxId

func (l *LegacyLoader) LoadMaxId() (maxId int64, err error)

LoadMaxId load max id from all ids files

func (*LegacyLoader) Reset

func (l *LegacyLoader) Reset(dataFNames, idsFNames []string)

Reset reset journal legacy link to existing files

type Uint32Set added in v1.4.2

type Uint32Set struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Uint32Set is a set that depends on a bitmap. It only supports uint32 values, so it cannot hold numbers greater than 4294967295.

func NewUint32Set added in v1.4.2

func NewUint32Set() *Uint32Set

NewUint32Set create new Uint32Set

func (*Uint32Set) AddInt64 added in v1.4.2

func (s *Uint32Set) AddInt64(i int64)

AddInt64 add new number

func (*Uint32Set) AddUint32 added in v1.4.2

func (s *Uint32Set) AddUint32(i uint32)

AddUint32 add new number

func (*Uint32Set) CheckAndRemoveInt64 added in v1.4.2

func (s *Uint32Set) CheckAndRemoveInt64(i int64) (ok bool)

CheckAndRemoveInt64 return true if exists

func (*Uint32Set) CheckAndRemoveUint32 added in v1.4.2

func (s *Uint32Set) CheckAndRemoveUint32(i uint32) (ok bool)

CheckAndRemoveUint32 return true if exists

func (*Uint32Set) GetLen added in v1.4.2

func (s *Uint32Set) GetLen() int

GetLen return length

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL