Documentation ¶
Index ¶
- Constants
- Variables
- type EachLogEntryFuncType
- type LogBuffer
- func (logBuffer *LogBuffer) AddDataToBuffer(partitionKey, data []byte, processingTsNs int64)
- func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage)
- func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition
- func (logBuffer *LogBuffer) GetEarliestTime() time.Time
- func (logBuffer *LogBuffer) IsAllFlushed() bool
- func (logBuffer *LogBuffer) IsStopping() bool
- func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition MessagePosition, stopTsNs int64, ...) (lastReadPosition MessagePosition, isDone bool, err error)
- func (logBuffer *LogBuffer) ReadFromBuffer(lastReadPosition MessagePosition) (bufferCopy *bytes.Buffer, batchIndex int64, err error)
- func (logBuffer *LogBuffer) ReleaseMemory(b *bytes.Buffer)
- func (logBuffer *LogBuffer) ShutdownLogBuffer()
- type LogFlushFuncType
- type LogReadFromDiskFuncType
- type MemBuffer
- type MessagePosition
- type SealedBuffers
Constants ¶
View Source
const BufferSize = 4 * 1024 * 1024
View Source
const PreviousBufferCount = 3
Variables ¶
View Source
var ( ResumeError = fmt.Errorf("resume") ResumeFromDiskError = fmt.Errorf("resumeFromDisk") )
Functions ¶
This section is empty.
Types ¶
type EachLogEntryFuncType ¶
type LogBuffer ¶
type LogBuffer struct { LastFlushTsNs int64 ReadFromDiskFn LogReadFromDiskFuncType LastTsNs int64 sync.RWMutex // contains filtered or unexported fields }
func NewLogBuffer ¶
func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFuncType, readFromDiskFn LogReadFromDiskFuncType, notifyFn func()) *LogBuffer
func (*LogBuffer) AddDataToBuffer ¶
func (*LogBuffer) AddToBuffer ¶
func (logBuffer *LogBuffer) AddToBuffer(message *mq_pb.DataMessage)
func (*LogBuffer) GetEarliestPosition ¶
func (logBuffer *LogBuffer) GetEarliestPosition() MessagePosition
func (*LogBuffer) GetEarliestTime ¶
func (*LogBuffer) IsAllFlushed ¶
IsAllFlushed returns true if all data in the buffer has been flushed, after calling ShutdownLogBuffer().
func (*LogBuffer) IsStopping ¶
func (*LogBuffer) LoopProcessLogData ¶
func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition MessagePosition, stopTsNs int64, waitForDataFn func() bool, eachLogDataFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
func (*LogBuffer) ReadFromBuffer ¶
func (*LogBuffer) ReleaseMemory ¶
func (*LogBuffer) ShutdownLogBuffer ¶
func (logBuffer *LogBuffer) ShutdownLogBuffer()
ShutdownLogBuffer flushes the buffer and stops the log buffer
type LogFlushFuncType ¶
type LogReadFromDiskFuncType ¶
type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
type MessagePosition ¶
type MessagePosition struct { time.Time // this is the timestamp of the message BatchIndex int64 // this is only used when the timestamp is not enough to identify the next message, when the timestamp is in the previous batch. }
func NewMessagePosition ¶
func NewMessagePosition(tsNs int64, batchIndex int64) MessagePosition
type SealedBuffers ¶
type SealedBuffers struct {
// contains filtered or unexported fields
}
func (*SealedBuffers) SealBuffer ¶
Click to show internal directories.
Click to hide internal directories.