Documentation
¶
Index ¶
- Constants
- func MergeDataStructure(fileNames []string, topic *TopicDataId, offset int64, ...) error
- func NewEmptyChunk(start int64) SegmentChunk
- func ReadFileStructure(topicId *TopicDataId, offset int64, config conf.DatalogConfig) ([]string, error)
- type Datalog
- type LocalWriteItem
- type ReadItem
- type ReadSegmentChunk
- type ReplicationDataItem
- type ReplicationReader
- type SegmentReader
- type SegmentWriter
Constants ¶
Variables ¶
This section is empty.
Functions ¶
func MergeDataStructure ¶
func MergeDataStructure(fileNames []string, topic *TopicDataId, offset int64, config conf.DatalogConfig) error
func NewEmptyChunk ¶
func NewEmptyChunk(start int64) SegmentChunk
func ReadFileStructure ¶
func ReadFileStructure(topicId *TopicDataId, offset int64, config conf.DatalogConfig) ([]string, error)
Reads segment file names that will contain the data starting from offset
Types ¶
type Datalog ¶
type Datalog interface { Initializer // Seeks the position and fills the buffer with chunks until maxSize or maxRecords is reached. // Opens and close the file handle. It may issue several reads to reach to the position. ReadFileFrom( buf []byte, maxSize int, segmentId int64, startOffset int64, maxRecords int, topic *TopicDataId, ) ([]byte, error) // Blocks until there's an available buffer to be used to stream. // After use, it should be released StreamBuffer() []byte // Releases the stream buffer ReleaseStreamBuffer(buf []byte) // Gets the max producer offset from local. // Returns an error when not found. ReadProducerOffset(topicId *TopicDataId) (int64, error) // Gets a sorted list of offsets representing the name of the segment files, where the offset is less than maxOffset SegmentFileList(topic *TopicDataId, maxOffset int64) ([]int64, error) }
func NewDatalog ¶
func NewDatalog(config conf.DatalogConfig) Datalog
type LocalWriteItem ¶
type LocalWriteItem interface { SegmentChunk Replication() ReplicationInfo SetResult(error) }
type ReadItem ¶
type ReadItem interface { Origin() string // An identifier of the source of the poll used to determine whether the reader should use the last stored offset and not auto commit CommitOnly() bool // Determines whether it should only commit and not read as part of this request SetResult(error, SegmentChunk) }
Represents a queued message to read from a segment. When the read is completed, `SetResult()` is invoked.
type ReadSegmentChunk ¶
type ReadSegmentChunk struct { Buffer []byte Start int64 // The offset of the first message Length uint32 // The amount of messages in the chunk }
func (*ReadSegmentChunk) DataBlock ¶
func (s *ReadSegmentChunk) DataBlock() []byte
func (*ReadSegmentChunk) RecordLength ¶
func (s *ReadSegmentChunk) RecordLength() uint32
func (*ReadSegmentChunk) StartOffset ¶
func (s *ReadSegmentChunk) StartOffset() int64
type ReplicationDataItem ¶
type ReplicationReader ¶
type ReplicationReader interface { MergeFileStructure() (bool, error) // Merge the index files content and file structures // Reads at least a chunk from a replica and returns the amount of bytes written in the buffer StreamFile( segmentId int64, topic *TopicDataId, startOffset int64, maxRecords int, buf []byte) (int, error) }
type SegmentReader ¶
type SegmentReader struct { Items chan ReadItem Topic TopicDataId TopicRangeClusterSize int SourceVersion GenId // The version in which this reader was created, a consumer might be on Gen=v3 but the current is v4. In this case, source would be v4 and topic.Version = v3 MaxProducedOffset *int64 // When set, it determines the last offset produced for this topicId for an old generation, inclusive // contains filtered or unexported fields }
func NewSegmentReader ¶
func NewSegmentReader( group string, isLeader bool, replicationReader ReplicationReader, topic TopicDataId, topicRangeClusterSize int, sourceVersion GenId, initialOffset int64, offsetState OffsetState, maxProducedOffset *int64, datalog Datalog, config conf.DatalogConfig, ) (*SegmentReader, error)
Returns a log file reader.
The segment reader instance is valid for a single generation, closed when the generation ends or the broker is no longer the leader.
It aggressively reads ahead and maintains local cache, so there should there should be a different reader instance per consumer group.
func (*SegmentReader) HasStoppedReceiving ¶
func (s *SegmentReader) HasStoppedReceiving() bool
Determines that the reader has stopped polling the channel, no further ReadItems will be processed.
It signals that either the current offset info changed in a way that generations don't match, the during an offset
func (*SegmentReader) StoredOffsetAsCompleted ¶
func (s *SegmentReader) StoredOffsetAsCompleted() bool
Returns true when the offset state has been set as completed (previous generations only)
type SegmentWriter ¶
type SegmentWriter struct { Items chan SegmentChunk Topic TopicDataId // contains filtered or unexported fields }
SegmentWriter contains the logic to write segments on disk and replicate them.
There should be an instance per topic+token+generation. When the generation changes for a token, the channel should be closed.
func NewSegmentWriter ¶
func NewSegmentWriter( topic TopicDataId, gossiper Replicator, config conf.DatalogConfig, segmentId *int64, ) (*SegmentWriter, error)