Documentation ¶
Constants ¶
const (
	// AllValue is a special bucket key that represents all possible values.
	AllValue = "__all__"

	// InvalidValue is a special bucket key that represents an invalid message.
	InvalidValue = "__invalid__"

	// MissingValue is a special bucket key that represents a missing value in a message.
	MissingValue = "__missing__"

	// DimSeparator is a special string that's used to separate dimensions in multi-dimensional
	// bucket keys.
	DimSeparator = "∪∪"
)
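As an illustration of how DimSeparator might be used, the sketch below joins several dimension values into one bucket key and splits it back. The helpers joinDims and splitDims are hypothetical and not part of this package; substituting MissingValue for an empty dimension is likewise an assumption made for the example.

```go
package main

import (
	"fmt"
	"strings"
)

// Values copied from the constants documented above.
const (
	MissingValue = "__missing__"
	DimSeparator = "∪∪"
)

// joinDims is a hypothetical helper (not part of the package). It builds a
// multi-dimensional bucket key and substitutes MissingValue for empty
// dimensions, which is an assumption of this sketch.
func joinDims(dims ...string) string {
	parts := make([]string, len(dims))
	for i, d := range dims {
		if d == "" {
			d = MissingValue
		}
		parts[i] = d
	}
	return strings.Join(parts, DimSeparator)
}

// splitDims recovers the individual dimension values from a bucket key.
func splitDims(key string) []string {
	return strings.Split(key, DimSeparator)
}

func main() {
	key := joinDims("us-west-2", "", "checkout")
	fmt.Println(key)
	fmt.Println(len(splitDims(key)))
}
```

An unusual separator such as "∪∪" makes it unlikely that a dimension value contains the separator itself, so splitting stays unambiguous.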
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Bucket ¶
type Bucket struct {
	Key   string
	Count int
	Index int

	// Statistics about the values in this bucket
	Min float64
	Max float64
	Sum float64
}
Bucket holds the count and value statistics for a single key. Buckets are kept in a heap.
type BucketsHeap ¶
type BucketsHeap []*Bucket
BucketsHeap is a heap.Interface implementation that holds Buckets. It is based on the priority queue example at https://golang.org/pkg/container/heap/#example__priorityQueue.
func (BucketsHeap) Len ¶
func (h BucketsHeap) Len() int
Len returns the number of buckets in the heap.
func (BucketsHeap) Less ¶
func (h BucketsHeap) Less(i, j int) bool
Less returns whether the ith element in the heap is less than the jth one.
func (*BucketsHeap) Pop ¶
func (h *BucketsHeap) Pop() interface{}
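Pop removes and returns the last element of the underlying slice, as heap.Interface requires. Call heap.Pop to remove the smallest item from the heap.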
func (*BucketsHeap) Push ¶
func (h *BucketsHeap) Push(x interface{})
Push adds a new item to the heap.
func (BucketsHeap) Swap ¶
func (h BucketsHeap) Swap(i, j int)
Swap swaps the ith and jth elements and updates the indices for each.
type MessageCounter ¶
MessageCounter is a type that stores counts by partition (or file or S3 key for non-Kafka sources). It's used by the digger stats progress view.
func NewMessageCounter ¶
func NewMessageCounter() *MessageCounter
NewMessageCounter returns a new MessageCounter instance.
func (*MessageCounter) Summary ¶
func (m *MessageCounter) Summary() MessageCounterSummary
Summary returns a MessageCounterSummary instance based on the stats recorded thus far for this counter.
func (*MessageCounter) Update ¶
func (m *MessageCounter) Update(msg kafka.Message, postFilter bool)
Update updates the counter for the provided message.
type MessageCounterSummary ¶
type MessageCounterSummary struct {
	TotalMessages      int64
	PostFilterMessages int64
	FirstTime          time.Time
	LastTime           time.Time
	PartitionCounters  map[int]PartitionCounter
}
MessageCounterSummary stores a summary of the message counts seen so far.
type PartitionCounter ¶
type PartitionCounter struct {
	PartitionID        int
	TotalMessages      int64
	PostFilterMessages int64
	FirstOffset        int64
	LastOffset         int64
	FirstTime          time.Time
	LastTime           time.Time
}
PartitionCounter stores detailed stats about the messages seen so far in a specific partition (or file or S3 key).
type TimeBucketCounter ¶
TimeBucketCounter is a counter that records the approximate number of events over a recent interval; the length of this interval and the resolution are configurable. Used to measure the approximate processing rate for the digger.
func NewTimeBucketCounter ¶
func NewTimeBucketCounter(resolution time.Duration, length time.Duration) *TimeBucketCounter
NewTimeBucketCounter creates a new TimeBucketCounter instance for the given resolution and length.
func (*TimeBucketCounter) Increment ¶
func (t *TimeBucketCounter) Increment(now time.Time, count int64)
Increment adds count to the counter, treating now as the current time.
func (*TimeBucketCounter) RatePerSec ¶
func (t *TimeBucketCounter) RatePerSec() float64
RatePerSec returns the average count per second for this counter.
func (*TimeBucketCounter) Total ¶
func (t *TimeBucketCounter) Total() int64
Total gets the total count for this counter.
type TopKCounter ¶
TopKCounter is a counter that keeps stats on the top K keys seen so far. It uses a BucketsHeap behind the scenes to do this in a memory-efficient way.
func NewTopKCounter ¶
func NewTopKCounter(k int) *TopKCounter
NewTopKCounter creates a new TopKCounter instance for the given value of k.
func (*TopKCounter) Add ¶
func (t *TopKCounter) Add(key string, value float64) error
Add updates the counter state for the given key and value. If the key is not currently in the heap, then a bucket is created for it.
func (*TopKCounter) Buckets ¶
func (t *TopKCounter) Buckets(limit int, sortByName bool) []Bucket
Buckets returns all of the buckets currently in the heap, sorted by count and key.
func (*TopKCounter) Clean ¶
func (t *TopKCounter) Clean(limit int)
Clean removes items from the heap until its size is at most limit.
func (*TopKCounter) PrettyTable ¶
func (t *TopKCounter) PrettyTable(n int, numeric bool, sortByName bool) string
PrettyTable returns a pretty table that summarizes the stats for the top k values in this counter instance.
func (*TopKCounter) Summary ¶
func (t *TopKCounter) Summary() TopKCounterSummary
Summary returns a summary of the current state of this counter instance.
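The general technique of pairing a key lookup map with a min-heap can be sketched as follows. This is an illustration rather than the package's code: topK, minHeap, add and clean are hypothetical names, and evicting the lowest-count buckets first is an assumption about how trimming works.

```go
package main

import (
	"container/heap"
	"fmt"
	"math"
)

type bucket struct {
	Key           string
	Count, Index  int
	Min, Max, Sum float64
}

// minHeap orders buckets by ascending Count so the least frequent key is on top.
type minHeap []*bucket

func (h minHeap) Len() int { return len(h) }

func (h minHeap) Less(i, j int) bool { return h[i].Count < h[j].Count }

func (h minHeap) Swap(i, j int) {
	h[i], h[j] = h[j], h[i]
	h[i].Index = i
	h[j].Index = j
}

func (h *minHeap) Push(x interface{}) {
	b := x.(*bucket)
	b.Index = len(*h)
	*h = append(*h, b)
}

func (h *minHeap) Pop() interface{} {
	old := *h
	b := old[len(old)-1]
	*h = old[:len(old)-1]
	return b
}

// topK is a hypothetical sketch, not the package's TopKCounter.
type topK struct {
	k       int
	byKey   map[string]*bucket
	buckets minHeap
}

func newTopK(k int) *topK {
	return &topK{k: k, byKey: map[string]*bucket{}}
}

// add updates the bucket for key, creating it if the key is not yet tracked.
func (t *topK) add(key string, value float64) {
	b, ok := t.byKey[key]
	if !ok {
		b = &bucket{Key: key, Min: math.Inf(1), Max: math.Inf(-1)}
		t.byKey[key] = b
		heap.Push(&t.buckets, b)
	}
	b.Count++
	b.Sum += value
	b.Min = math.Min(b.Min, value)
	b.Max = math.Max(b.Max, value)
	heap.Fix(&t.buckets, b.Index) // restore heap order after the count change
}

// clean evicts the lowest-count buckets until at most limit remain.
func (t *topK) clean(limit int) {
	for t.buckets.Len() > limit {
		b := heap.Pop(&t.buckets).(*bucket)
		delete(t.byKey, b.Key)
	}
}

func main() {
	t := newTopK(2)
	for _, key := range []string{"a", "b", "a", "c", "a", "b"} {
		t.add(key, 1.5)
	}
	t.clean(t.k)
	fmt.Println(len(t.byKey), t.byKey["a"].Count, t.byKey["b"].Count)
}
```

Keeping the least frequent key at the top of the heap makes each eviction cost O(log n). The trade-off in this sketch is that counts become approximate once eviction starts, because an evicted key restarts from zero if it reappears.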