Documentation ¶
Index ¶
- Constants
- Variables
- type AggregateType
- type MetricName
- type MetricsAggregator
- type TimeslotAggregator
- func (tsaggr *TimeslotAggregator) DataChannelLength() int
- func (tsaggr *TimeslotAggregator) Get(hostID string, groupTag string, metricName MetricName, ...) (int64, error)
- func (tsaggr *TimeslotAggregator) Put(hostID string, groupTag string, metricName MetricName, metricValue int64, ...) error
- func (tsaggr *TimeslotAggregator) Start()
- func (tsaggr *TimeslotAggregator) Stop()
Constants ¶
const ( // NumConns is a guage that refers to number of connections NumConns MetricName = "numConns" // NumExtentsActive is a guage that refers to number of active extents NumExtentsActive = "numExtentsActive" // RemDiskSpaceBytes is a guage that refers to remaining disk space RemDiskSpaceBytes = "remDiskSpaceBytes" // MsgsInPerSec refers to incoming messages per sec counter MsgsInPerSec MetricName = "msgsInPerSec" // MsgsOutPerSec refers to outgoing messages per sec counter MsgsOutPerSec = "msgsOutPerSec" // BytesInPerSec refers to incoming bytes per sec counter BytesInPerSec = "bytesInPerSec" // BytesOutPerSec refers to outgoing messages per sec counter BytesOutPerSec = "bytesOutPerSec" // SmartRetryOn is a 0/1 counter that indicates if a // consumer group has smart retry on or off SmartRetryOn = "smartRetryOn" )
const ( // EmptyTag represents an empty tag EmptyTag string = "" )
Variables ¶
var ErrBackpressure = errors.New("Data channel full")
ErrBackpressure is returned when a data point cannot be accepted due to downstream backpressure
var ErrNoData = errors.New("No data found for input")
ErrNoData is returned when there is not enough data to perform the requested aggregation operation
Functions ¶
This section is empty.
Types ¶
type AggregateType ¶
type AggregateType int
AggregateType is an enum type that refers to some aggregation function. Examples are Average, Sum etc
const ( // OneMinAvg is an aggregation operation // that does average of all data points // in the past minute OneMinAvg AggregateType = iota // OneMinSum gives the sum of // all data points over the // past minute OneMinSum // FiveMinAvg is an aggregation function // that does average of all data points // across the past five minutes FiveMinAvg // FiveMinSum gives the sum of // all data points over the // past five minutes FiveMinSum )
type MetricsAggregator ¶
type MetricsAggregator interface { // Start starts the metrics aggregator Start() // Stop stops the metrics aggregator Stop() // Put collects a metrics data point // for aggregation. Returns an error // if the data point cannot be accepted Put(hostID string, groupTag string, metricName MetricName, metricValue int64, timestamp int64) error // Get returns an aggregated metric value // identified by the given parameters // resultType refers to the aggregation // operation that needs to be performed for // obtaining the result. Examples are // OneMinAvg or FiveMinAvg. Get(hostID string, groupTag string, metricName MetricName, resultType AggregateType) (int64, error) }
MetricsAggregator collects and aggregates metrics, usually by time. Any implementation of such time series aggregator MUST support the following methods.
type TimeslotAggregator ¶
type TimeslotAggregator struct {
// contains filtered or unexported fields
}
TimeslotAggregator is an implementation of metrics aggregator that aggregates data by time slots
func NewTimeSlotAggregator ¶
func NewTimeSlotAggregator(clock common.TimeSource, logger bark.Logger) *TimeslotAggregator
NewTimeSlotAggregator creates and returns an instance of MetricsAggregator that aggregates data at fixed interval timeslots. Internally, the TimeSlotAggregator maintains a circular buffer of timeslots, where each timeslot is typically, a minute. The size of the buffer determines the history that's stored by the aggregator. Incoming data is always added to the current timeslot whereas readers always start reading from the timeslot preceding the current one (and go back). This allows lockless access to the hashmaps that hold the aggregated metric data.
MetricGroups:
Metrics reported to the aggregator are composed of a
timesamp, metricName, metricValue and a groupTag. A group tag allows for aggregation of same metric at different levels. For example, when a store reports an extent metric nExtentsActive, this metric can be aggregated by many levels, i.e. storeHostUUID, extentUUID, dstUUID. This problem is solved by the group tag.
Inside a TimeSlot: Each timeslot internally maintains a single hashmap that's indexed by a key representing a specific metric. Since the same metric can be used by multiple metric groups, the indexed string is a combination of groupTag and metricName.
SigChannel: As mentioned before, timeslots are only advanced when there is incoming data. So, when there is no incoming data for a while, the data in the current time slot will not be consumed by readers. To overcome this problem, we have a signchannel that will force advancement of timeslot when the input stream is idle.
func (*TimeslotAggregator) DataChannelLength ¶
func (tsaggr *TimeslotAggregator) DataChannelLength() int
DataChannelLength returns the length of the internal data channel, this method should only be used by unit tests.
func (*TimeslotAggregator) Get ¶
func (tsaggr *TimeslotAggregator) Get(hostID string, groupTag string, metricName MetricName, resultType AggregateType) (int64, error)
Get returns an aggregated value for a metric identified by the given inputs
func (*TimeslotAggregator) Put ¶
func (tsaggr *TimeslotAggregator) Put(hostID string, groupTag string, metricName MetricName, metricValue int64, timestamp int64) error
Put adds a data point to the aggregator
func (*TimeslotAggregator) Start ¶
func (tsaggr *TimeslotAggregator) Start()
Start stats the timeslot aggregator
func (*TimeslotAggregator) Stop ¶
func (tsaggr *TimeslotAggregator) Stop()
Stop stops the time slot aggregator