Documentation ¶
Functions ¶
func DecodeMarkerV1 ¶
DecodeMarkerV1 decodes the segment number from a segment marker, encoded with EncodeMarkerV1.
func EncodeMarkerV1 ¶
EncodeMarkerV1 encodes the segment number for which a marker is to be created, using the marker file format. In v1, that format includes the segment number and a trailing CRC code of the first 10 bytes.
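To make the v1 layout concrete, the following is a minimal sketch of an encode/decode round trip. The description above only states that the marker holds the segment number and a trailing CRC of the first 10 bytes; the 2-byte version header, the big-endian byte order, the CRC-32 (IEEE) polynomial, and the names encodeMarker and decodeMarker are assumptions made for illustration, not the package's actual implementation.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"hash/crc32"
)

const (
	headerLen  = 2  // assumed: version header preceding the segment number
	payloadLen = 10 // header plus 8-byte segment number; the bytes covered by the CRC
	markerLen  = 14 // payload plus an assumed 4-byte CRC-32
)

// encodeMarker is a hypothetical encoder for the assumed layout.
func encodeMarker(segment uint64) []byte {
	buf := make([]byte, markerLen)
	binary.BigEndian.PutUint16(buf[0:headerLen], 1) // assumed version value
	binary.BigEndian.PutUint64(buf[headerLen:payloadLen], segment)
	binary.BigEndian.PutUint32(buf[payloadLen:], crc32.ChecksumIEEE(buf[:payloadLen]))
	return buf
}

// decodeMarker is a hypothetical decoder that verifies the trailing CRC
// before returning the segment number.
func decodeMarker(buf []byte) (uint64, error) {
	if len(buf) != markerLen {
		return 0, errors.New("unexpected marker length")
	}
	want := binary.BigEndian.Uint32(buf[payloadLen:])
	if crc32.ChecksumIEEE(buf[:payloadLen]) != want {
		return 0, errors.New("marker checksum mismatch")
	}
	return binary.BigEndian.Uint64(buf[headerLen:payloadLen]), nil
}

func main() {
	m := encodeMarker(42)
	seg, err := decodeMarker(m)
	fmt.Println(seg, err) // prints "42 <nil>"

	m[5] ^= 0xff // corrupt one byte of the segment number
	_, err = decodeMarker(m)
	fmt.Println(err) // prints "marker checksum mismatch"
}
```

Checking the CRC on decode lets a reader reject a marker file that was truncated or partially written, rather than resuming from a bogus segment number.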
func FindMarkableSegment ¶
func FindMarkableSegment(segmentDataCount map[int]*countDataItem, tooOldThreshold time.Duration) int
FindMarkableSegment finds the segment that should be marked as last consumed, given a summary of the data updates received and a threshold on how long a segment can go without receiving updates and still be considered "live". The algorithm finds the highest-numbered segment that is considered "consumed" and whose predecessors are all "consumed" as well.
A consumed segment is one with a data count of zero, meaning that no data is left in flight for it, or one that has not received any updates for tooOldThreshold.
While reviewing the data items in segmentDataCount, the function also deletes those that are consumed, to free up space.
The algorithm runs in O(N log N) time, where N is the size of segmentDataCount, and allocates O(N) memory.
Types ¶
type MarkerFileHandler ¶
type MarkerFileHandler interface {
	wal.Marker

	// MarkSegment writes in the backing file-store that a particular segment is the last one marked.
	MarkSegment(segment int)
}
MarkerFileHandler is a file-backed wal.Marker that also allows one to write a particular segment number to the backing store as the last one marked.
func NewMarkerFileHandler ¶
func NewMarkerFileHandler(logger log.Logger, walDir string) (MarkerFileHandler, error)
NewMarkerFileHandler creates a new markerFileHandler.
type MarkerHandler ¶
type MarkerHandler interface {
	wal.Marker

	// UpdateReceivedData sends an update event to the handler, that informs that some dataUpdate, coming from a particular WAL
	// segment, has been read out of the WAL and enqueued for sending.
	UpdateReceivedData(segmentId, dataCount int)

	// UpdateSentData sends an update event to the handler, informing that some dataUpdate, coming from a particular WAL
	// segment, has been delivered, or the sender has given up on it.
	UpdateSentData(segmentId, dataCount int) // Data which was sent or given up on sending

	// Stop stops the handler, and its async processing of receive/send dataUpdate updates.
	Stop()
}
func NewMarkerHandler ¶
func NewMarkerHandler(mfh MarkerFileHandler, maxSegmentAge time.Duration, logger log.Logger, metrics *MarkerMetrics) MarkerHandler
NewMarkerHandler creates a new markerHandler.
type MarkerMetrics ¶
type MarkerMetrics struct {
// contains filtered or unexported fields
}
func NewMarkerMetrics ¶
func NewMarkerMetrics(reg prometheus.Registerer) *MarkerMetrics
func (*MarkerMetrics) WithCurriedId ¶
func (m *MarkerMetrics) WithCurriedId(id string) *MarkerMetrics
WithCurriedId returns a curried version of MarkerMetrics, with the id label pre-filled. This helper avoids having to pass the id around to places that do not otherwise need it, since the id does not change inside the consumer of the metrics.