log_buffer

package
v0.0.0-...-45e1a9a Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 15, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const BufferSize = 4 * 1024 * 1024
View Source
const PreviousBufferCount = 3

Variables

View Source
var (
	ResumeError         = fmt.Errorf("resume")
	ResumeFromDiskError = fmt.Errorf("resumeFromDisk")
)

Functions

This section is empty.

Types

type EachLogEntryFuncType

type EachLogEntryFuncType func(logEntry *filer_pb.LogEntry) (isDone bool, err error)

type LogBuffer

type LogBuffer struct {
	LastFlushTsNs int64

	ReadFromDiskFn LogReadFromDiskFuncType

	LastTsNs int64
	sync.RWMutex
	// contains filtered or unexported fields
}

func NewLogBuffer

func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFuncType,
	readFromDiskFn LogReadFromDiskFuncType, notifyFn func()) *LogBuffer

func (*LogBuffer) AddDataToBuffer

func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64)

func (*LogBuffer) AddToBuffer

func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage)

func (*LogBuffer) GetEarliestPosition

func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition

func (*LogBuffer) GetEarliestTime

func (logBuffer *LogBuffer) GetEarliestTime() time.Time

func (*LogBuffer) IsAllFlushed

func (logBuffer *LogBuffer) IsAllFlushed() bool

IsAllFlushed returns true if all data in the buffer has been flushed after ShutdownLogBuffer() was called.

func (*LogBuffer) IsStopping

func (logBuffer *LogBuffer) IsStopping() bool

func (*LogBuffer) LoopProcessLogData

func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition MessagePosition, stopTsNs int64,
	waitForDataFn func() bool, eachLogDataFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)

func (*LogBuffer) ReadFromBuffer

func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bufferCopy *bytes.Buffer, batchIndex int64, err error)

func (*LogBuffer) ReleaseMemory

func (logBuffer *LogBuffer) ReleaseMemory(b *bytes.Buffer)

func (*LogBuffer) ShutdownLogBuffer

func (logBuffer *LogBuffer) ShutdownLogBuffer()

ShutdownLogBuffer flushes the buffer and then stops the log buffer.

type LogFlushFuncType

type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte)

type LogReadFromDiskFuncType

type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)

type MemBuffer

type MemBuffer struct {
	// contains filtered or unexported fields
}

func (*MemBuffer) String

func (mb *MemBuffer) String() string

type MessagePosition

type MessagePosition struct {
	time.Time        // this is the timestamp of the message
	BatchIndex int64 // this is only used when the timestamp is not enough to identify the next message, when the timestamp is in the previous batch.
}

func NewMessagePosition

func NewMessagePosition(tsNs int64, batchIndex int64) MessagePosition

type SealedBuffers

type SealedBuffers struct {
	// contains filtered or unexported fields
}

func (*SealedBuffers) SealBuffer

func (sbs *SealedBuffers) SealBuffer(startTime, stopTime time.Time, buf []byte, pos int, batchIndex int64) (newBuf []byte)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL