Documentation ¶
Overview ¶
Package pstore and its sub-packages handle writing metrics to persistent storage.
Index ¶
- Constants
- Variables
- type AsyncConsumer
- type Consumer
- type ConsumerAttributes
- type ConsumerMetrics
- type ConsumerMetricsStore
- func (s *ConsumerMetricsStore) AddToRecordCount(count uint64)
- func (s *ConsumerMetricsStore) DisableWrites()
- func (s *ConsumerMetricsStore) Filter(r *store.Record) bool
- func (s *ConsumerMetricsStore) Metrics(m *ConsumerMetrics)
- func (s *ConsumerMetricsStore) Pause()
- func (s *ConsumerMetricsStore) RemoveFromRecordCount(count uint64)
- func (s *ConsumerMetricsStore) Resume()
- func (s *ConsumerMetricsStore) SetMetrics(m *ConsumerMetrics)
- type ConsumerWithMetrics
- func (c *ConsumerWithMetrics) Attributes(a *ConsumerAttributes)
- func (c *ConsumerWithMetrics) Flush()
- func (c *ConsumerWithMetrics) MetricsStore() *ConsumerMetricsStore
- func (c *ConsumerWithMetrics) Name() string
- func (c *ConsumerWithMetrics) Write(n store.NamedIterator, host string, tagGroup TagGroup)
- type ConsumerWithMetricsBuilder
- func (b *ConsumerWithMetricsBuilder) AddHook(hook RecordWriteHooker)
- func (b *ConsumerWithMetricsBuilder) Attributes(attrs *ConsumerAttributes)
- func (b *ConsumerWithMetricsBuilder) Build() *ConsumerWithMetrics
- func (b *ConsumerWithMetricsBuilder) Name() string
- func (b *ConsumerWithMetricsBuilder) Paused() bool
- func (b *ConsumerWithMetricsBuilder) SetBatchSizeDist(d *tricorder.CumulativeDistribution)
- func (b *ConsumerWithMetricsBuilder) SetBufferSize(size uint)
- func (b *ConsumerWithMetricsBuilder) SetConcurrency(concurrency uint)
- func (b *ConsumerWithMetricsBuilder) SetConsumerMetrics(m *ConsumerMetrics)
- func (b *ConsumerWithMetricsBuilder) SetLogger(logger log.Logger)
- func (b *ConsumerWithMetricsBuilder) SetName(name string)
- func (b *ConsumerWithMetricsBuilder) SetPaused(paused bool)
- func (b *ConsumerWithMetricsBuilder) SetPerMetricWriteTimeDist(d *tricorder.CumulativeDistribution)
- func (b *ConsumerWithMetricsBuilder) SetRecordsPerSecond(recordsPerSecond uint)
- func (b *ConsumerWithMetricsBuilder) SetRegexesOfMetricsToExclude(regexes []string) error
- func (b *ConsumerWithMetricsBuilder) SetRollUpSpan(dur time.Duration)
- type LimitedBySubTypeRecordWriter
- type LimitedRecordWriter
- type Record
- type RecordWriteHooker
- type RecordWriter
- type RecordWriterMetrics
- type RecordWriterWithMetrics
- func (w *RecordWriterWithMetrics) Disable()
- func (w *RecordWriterWithMetrics) Metrics(m *RecordWriterMetrics)
- func (w *RecordWriterWithMetrics) Pause()
- func (w *RecordWriterWithMetrics) Resume()
- func (w *RecordWriterWithMetrics) SetMetrics(m *RecordWriterMetrics)
- func (w *RecordWriterWithMetrics) Write(records []Record) error
- type TagGroup
Constants ¶
const (
	TagAppName    = "appname"
	TagRegionName = "region"
	TagIpAddress  = "ipaddress"
)
Commonly used keys in TagGroup instances
Variables ¶
var (
ErrDisabled = errors.New("pstore: Writer disabled.")
)
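Callers can tell a disabled writer apart from other failures by comparing the returned error with the sentinel. The following is a minimal sketch of that check; `errDisabled`, `write`, and `isDisabled` are hypothetical local stand-ins so the example runs on its own, not part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// errDisabled is a local stand-in for pstore.ErrDisabled.
var errDisabled = errors.New("pstore: Writer disabled.")

// write is a hypothetical write function that fails once disabled.
func write(disabled bool) error {
	if disabled {
		return fmt.Errorf("write failed: %w", errDisabled)
	}
	return nil
}

// isDisabled reports whether err is, or wraps, the sentinel error.
func isDisabled(err error) bool {
	return errors.Is(err, errDisabled)
}

func main() {
	fmt.Println(isDisabled(write(true)))  // true
	fmt.Println(isDisabled(write(false))) // false
}
```

Using `errors.Is` rather than `==` keeps the check working even if an intermediate layer wraps the error.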
Functions ¶
This section is empty.
Types ¶
type AsyncConsumer ¶
type AsyncConsumer struct {
// contains filtered or unexported fields
}
AsyncConsumer works like Consumer but does the work asynchronously. For certain persistent stores, AsyncConsumer can offer greater throughput than Consumer. However, unlike Consumer, AsyncConsumer methods do not return errors to the caller. Like Consumer, AsyncConsumer is NOT safe to use with multiple goroutines.
func NewAsyncConsumer ¶
func NewAsyncConsumer(w RecordWriter, bufferSize, concurrency uint) *AsyncConsumer
NewAsyncConsumer creates a new AsyncConsumer instance. w is the underlying writer. concurrency is the count of goroutines doing the consuming. bufferSize is the size of each buffer. Each goroutine gets its own buffer.
func (*AsyncConsumer) Flush ¶
func (a *AsyncConsumer) Flush()
Flush works like Consumer.Flush, but first waits until previous calls to WriteAsync resolve before flushing the buffers. Flush does not return until it has finished flushing all the buffers and committing all necessary NamedIterator instances.
After calling Flush, the client can safely assume that this instance is not holding onto any NamedIterator instances.
func (*AsyncConsumer) WriteAsync ¶
func (a *AsyncConsumer) WriteAsync(n store.NamedIterator, host string, tagGroup TagGroup)
WriteAsync works like Consumer.Write except that it returns before completing the work. Sometime in the future, a separate goroutine does the work of consuming values from n, writing them to the underlying RecordWriter, and committing progress on n.
Like Consumer.Write, the caller must avoid creating and using another NamedIterator instance with the same name iterating over the same values until it has called Flush on this instance.
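The hand-off and flush-barrier behaviour described above can be sketched with a job channel and a sync.WaitGroup. This is only an illustration under stated assumptions: `asyncRunner`, `RunAsync`, and `runJobs` are hypothetical names, the sketch does not model per-goroutine buffers or NamedIterator commits, and it is not the package's implementation.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// asyncRunner is a hypothetical stand-in, not the package's AsyncConsumer.
// Like AsyncConsumer, it is meant to be driven from a single goroutine.
type asyncRunner struct {
	jobs    chan func()
	pending sync.WaitGroup
}

// newAsyncRunner starts concurrency worker goroutines (assumed >= 1).
func newAsyncRunner(concurrency int) *asyncRunner {
	r := &asyncRunner{jobs: make(chan func())}
	for i := 0; i < concurrency; i++ {
		go func() {
			for job := range r.jobs {
				job()
				r.pending.Done()
			}
		}()
	}
	return r
}

// RunAsync hands job to a worker and returns without waiting for it to finish.
func (r *asyncRunner) RunAsync(job func()) {
	r.pending.Add(1)
	r.jobs <- job
}

// Flush blocks until every job handed to RunAsync has completed.
func (r *asyncRunner) Flush() {
	r.pending.Wait()
}

// runJobs runs n counting jobs and returns how many had completed by the
// time Flush returned.
func runJobs(n, concurrency int) int64 {
	var done int64
	r := newAsyncRunner(concurrency)
	for i := 0; i < n; i++ {
		r.RunAsync(func() { atomic.AddInt64(&done, 1) })
	}
	r.Flush()
	close(r.jobs) // let the workers exit
	return atomic.LoadInt64(&done)
}

func main() {
	fmt.Println(runJobs(100, 4)) // prints 100
}
```

Because Flush waits on the WaitGroup, every job submitted before Flush is guaranteed to have finished when it returns, which mirrors the guarantee that no NamedIterator is still held after Flush.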
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer writes values from NamedIterator instances to persistent storage. Consumer buffers values to be written. Whenever the buffer becomes full, the Consumer instance clears the buffer, writes the values out to the underlying writer, and commits progress on the corresponding NamedIterator instances. Multiple goroutines cannot share the same Consumer instance.
func NewConsumer ¶
func NewConsumer(w RecordWriter, bufferSize uint) *Consumer
NewConsumer creates a new Consumer instance. w is the underlying writer. bufferSize is how many values the buffer holds.
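The buffer-then-write behaviour can be sketched as follows. All names here (`record`, `recordWriter`, `sliceWriter`, `bufferedConsumer`, `batchSizesFor`) are hypothetical stand-ins so the example is self-contained; the sketch leaves out NamedIterator handling, and keeping the buffer intact on a failed write is an assumption of the sketch rather than documented behaviour.

```go
package main

import "fmt"

// record and recordWriter are simplified stand-ins for Record and RecordWriter.
type record struct {
	Path  string
	Value float64
}

type recordWriter interface {
	Write(records []record) error
}

// sliceWriter remembers the size of each batch it receives.
type sliceWriter struct {
	batchSizes []int
}

func (w *sliceWriter) Write(records []record) error {
	w.batchSizes = append(w.batchSizes, len(records))
	return nil
}

// bufferedConsumer is a hypothetical type illustrating the buffering idea.
type bufferedConsumer struct {
	w      recordWriter
	buffer []record
	size   int
}

// newBufferedConsumer assumes bufferSize >= 1.
func newBufferedConsumer(w recordWriter, bufferSize int) *bufferedConsumer {
	return &bufferedConsumer{w: w, buffer: make([]record, 0, bufferSize), size: bufferSize}
}

// Add buffers r and writes the buffer out as soon as it is full.
func (c *bufferedConsumer) Add(r record) error {
	c.buffer = append(c.buffer, r)
	if len(c.buffer) >= c.size {
		return c.Flush()
	}
	return nil
}

// Flush writes pending records. In this sketch a failed write leaves the
// buffer untouched so that nothing is lost.
func (c *bufferedConsumer) Flush() error {
	if len(c.buffer) == 0 {
		return nil
	}
	if err := c.w.Write(c.buffer); err != nil {
		return err
	}
	c.buffer = c.buffer[:0]
	return nil
}

// batchSizesFor feeds n records through a consumer and returns the batch sizes written.
func batchSizesFor(n, bufferSize int) []int {
	w := &sliceWriter{}
	c := newBufferedConsumer(w, bufferSize)
	for i := 0; i < n; i++ {
		if err := c.Add(record{Path: "/metric", Value: float64(i)}); err != nil {
			panic(err)
		}
	}
	if err := c.Flush(); err != nil {
		panic(err)
	}
	return w.batchSizes
}

func main() {
	fmt.Println(batchSizesFor(5, 2)) // prints [2 2 1]
}
```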
func (*Consumer) Flush ¶
Flush writes any pending values out to the underlying writer, committing progress on the corresponding NamedIterator instances.
After calling Flush, the client can safely assume that this instance is not holding onto any NamedIterator instances.
If an error happens writing out pending values, Flush returns that error and does not commit progress on the corresponding NamedIterator instances.
func (*Consumer) Write ¶
host and tagGroup are the host and tag group for the values in n.
When the caller passes a NamedIterator instance to Write, this instance holds onto that NamedIterator until either its values are written out to the underlying RecordWriter or an error happens. Therefore, the caller should avoid creating and using another NamedIterator instance with the same name iterating over the same values until it has called Flush on this instance.
type ConsumerAttributes ¶
type ConsumerAttributes struct {
	// The number of writing goroutines
	Concurrency uint
	// The number of records written each time
	BatchSize uint
	// The maximum records per second per goroutine. 0 means unlimited.
	RecordsPerSecond uint
	// The time period length for rolled up values.
	// A value bigger than 0 means that client should feed this consumer
	// store.NamedIterator instances that report summarised values for
	// the same time period length. 0 means client should feed this
	// consumer store.NamedIterator instances that report all metric
	// values.
	RollUpSpan time.Duration
	// Compiled regex forms of metrics to exclude
	MetricsToExclude    []*regexp.Regexp
	BatchSizes          *tricorder.CumulativeDistribution
	PerMetricWriteTimes *tricorder.CumulativeDistribution
}
ConsumerAttributes represent the unchanging attributes of a particular ConsumerWithMetrics instance.
func (*ConsumerAttributes) TotalRecordsPerSecond ¶
func (c *ConsumerAttributes) TotalRecordsPerSecond() uint
TotalRecordsPerSecond returns RecordsPerSecond * Concurrency.
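As a quick worked example of that product, the sketch below uses a hypothetical local `attributes` type carrying only the two fields involved; it is a stand-in, not the package's ConsumerAttributes.

```go
package main

import "fmt"

// attributes is a local stand-in holding only the two fields involved.
type attributes struct {
	Concurrency      uint
	RecordsPerSecond uint
}

// totalRecordsPerSecond mirrors the documented formula.
func (a *attributes) totalRecordsPerSecond() uint {
	return a.RecordsPerSecond * a.Concurrency
}

func main() {
	a := &attributes{Concurrency: 4, RecordsPerSecond: 250}
	fmt.Println(a.totalRecordsPerSecond()) // prints 1000
}
```

Since RecordsPerSecond is a per-goroutine limit and 0 means unlimited, the product is also 0 when no throttling is configured.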
func (*ConsumerAttributes) WritesSameAs ¶
func (c *ConsumerAttributes) WritesSameAs(other *ConsumerAttributes) bool
WritesSameAs returns true if these consumer attributes would cause the same values to be written as other.
type ConsumerMetrics ¶
type ConsumerMetrics struct {
	RecordWriterMetrics
	// Total values to write or skip
	TotalValues uint64
	// Values skipped
	SkippedValues uint64
}
ConsumerMetrics represents metrics for a consumer.
func (*ConsumerMetrics) ValuesNotWritten ¶
func (c *ConsumerMetrics) ValuesNotWritten() uint64
ValuesNotWritten returns the number of values not written.
func (*ConsumerMetrics) ZeroValuesNotWritten ¶
func (c *ConsumerMetrics) ZeroValuesNotWritten()
ZeroValuesNotWritten zeros out the values not written by setting TotalValues to ValuesWritten and SkippedValues to 0.
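The relationship between these counters can be sketched as below. The formula used for `valuesNotWritten` (total minus written minus skipped) is an assumption inferred from the field comments and from what ZeroValuesNotWritten does; `consumerMetrics` is a hypothetical local stand-in, not the package's type.

```go
package main

import "fmt"

// consumerMetrics is a local stand-in carrying only the counters involved.
type consumerMetrics struct {
	ValuesWritten uint64
	TotalValues   uint64
	SkippedValues uint64
}

// valuesNotWritten ASSUMES not written = total - written - skipped.
func (c *consumerMetrics) valuesNotWritten() uint64 {
	return c.TotalValues - c.ValuesWritten - c.SkippedValues
}

// zeroValuesNotWritten follows the documented behaviour: TotalValues
// becomes ValuesWritten and SkippedValues becomes 0.
func (c *consumerMetrics) zeroValuesNotWritten() {
	c.TotalValues = c.ValuesWritten
	c.SkippedValues = 0
}

func main() {
	m := &consumerMetrics{ValuesWritten: 70, TotalValues: 100, SkippedValues: 20}
	fmt.Println(m.valuesNotWritten()) // prints 10
	m.zeroValuesNotWritten()
	fmt.Println(m.valuesNotWritten()) // prints 0
}
```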
type ConsumerMetricsStore ¶
type ConsumerMetricsStore struct {
// contains filtered or unexported fields
}
ConsumerMetricsStore stores metrics for a consumer. ConsumerMetricsStore instances are safe to use with multiple goroutines.
func (*ConsumerMetricsStore) AddToRecordCount ¶
func (s *ConsumerMetricsStore) AddToRecordCount(count uint64)
AddToRecordCount adds count to the total number of records the consumer must write out.
func (*ConsumerMetricsStore) DisableWrites ¶
func (s *ConsumerMetricsStore) DisableWrites()
DisableWrites causes all future writes to the underlying persistent store of the corresponding consumer to return an error. Any in-progress writes will complete before DisableWrites returns.
func (*ConsumerMetricsStore) Filter ¶
func (s *ConsumerMetricsStore) Filter(r *store.Record) bool
Filter returns true if the consumer is to write out this record, false otherwise.
func (*ConsumerMetricsStore) Metrics ¶
func (s *ConsumerMetricsStore) Metrics(m *ConsumerMetrics)
Metrics writes the metrics of the corresponding consumer to m.
func (*ConsumerMetricsStore) Pause ¶
func (s *ConsumerMetricsStore) Pause()
Pause pauses the writer in the corresponding consumer, waiting for any in-progress writes to complete.
func (*ConsumerMetricsStore) RemoveFromRecordCount ¶
func (s *ConsumerMetricsStore) RemoveFromRecordCount(count uint64)
RemoveFromRecordCount removes count from the total number of records the consumer must write out.
func (*ConsumerMetricsStore) Resume ¶
func (s *ConsumerMetricsStore) Resume()
Resume resumes the writer in the corresponding consumer.
func (*ConsumerMetricsStore) SetMetrics ¶
func (s *ConsumerMetricsStore) SetMetrics(m *ConsumerMetrics)
SetMetrics sets the metrics for the corresponding consumer to m.
type ConsumerWithMetrics ¶
type ConsumerWithMetrics struct {
// contains filtered or unexported fields
}
ConsumerWithMetrics instances work like Consumer instances but also have metrics. Like Consumer instances, ConsumerWithMetrics instances are NOT safe to use with multiple goroutines.
func (*ConsumerWithMetrics) Attributes ¶
func (c *ConsumerWithMetrics) Attributes(a *ConsumerAttributes)
Attributes writes this instance's attributes to a.
func (*ConsumerWithMetrics) Flush ¶
func (c *ConsumerWithMetrics) Flush()
Flush works like Consumer.Flush but does not return an error.
func (*ConsumerWithMetrics) MetricsStore ¶
func (c *ConsumerWithMetrics) MetricsStore() *ConsumerMetricsStore
MetricsStore returns the ConsumerMetricsStore for this instance.
func (*ConsumerWithMetrics) Name ¶
func (c *ConsumerWithMetrics) Name() string
func (*ConsumerWithMetrics) Write ¶
func (c *ConsumerWithMetrics) Write(n store.NamedIterator, host string, tagGroup TagGroup)
Write works like Consumer.Write but does not return an error.
type ConsumerWithMetricsBuilder ¶
type ConsumerWithMetricsBuilder struct {
// contains filtered or unexported fields
}
ConsumerWithMetricsBuilder builds a ConsumerWithMetrics instance. Each instance is good for building one and only one ConsumerWithMetrics instance. These instances are NOT safe to use with multiple goroutines.
func NewConsumerWithMetricsBuilder ¶
func NewConsumerWithMetricsBuilder(w LimitedRecordWriter) *ConsumerWithMetricsBuilder
NewConsumerWithMetricsBuilder creates a new instance that will build a consumer that uses w to write values out.
func (*ConsumerWithMetricsBuilder) AddHook ¶
func (b *ConsumerWithMetricsBuilder) AddHook(hook RecordWriteHooker)
AddHook adds a hook for writes. hook must be non-nil.
func (*ConsumerWithMetricsBuilder) Attributes ¶
func (b *ConsumerWithMetricsBuilder) Attributes(attrs *ConsumerAttributes)
Attributes writes the attributes in this builder object to attrs.
func (*ConsumerWithMetricsBuilder) Build ¶
func (b *ConsumerWithMetricsBuilder) Build() *ConsumerWithMetrics
Build builds the ConsumerWithMetrics instance and destroys this builder.
func (*ConsumerWithMetricsBuilder) Name ¶
func (b *ConsumerWithMetricsBuilder) Name() string
func (*ConsumerWithMetricsBuilder) Paused ¶
func (b *ConsumerWithMetricsBuilder) Paused() bool
Paused returns whether the built consumer will be paused.
func (*ConsumerWithMetricsBuilder) SetBatchSizeDist ¶
func (b *ConsumerWithMetricsBuilder) SetBatchSizeDist(d *tricorder.CumulativeDistribution)
SetBatchSizeDist sets the distribution that the consumer will use to record the batch size of values written out. The default is not to record batch sizes.
func (*ConsumerWithMetricsBuilder) SetBufferSize ¶
func (b *ConsumerWithMetricsBuilder) SetBufferSize(size uint)
SetBufferSize sets how many values the consumer will buffer before writing them out. The default is 1000. SetBufferSize panics if size < 1.
func (*ConsumerWithMetricsBuilder) SetConcurrency ¶
func (b *ConsumerWithMetricsBuilder) SetConcurrency(concurrency uint)
SetConcurrency sets how many goroutines will write to the underlying writer. Default is 1. SetConcurrency panics if concurrency < 1.
func (*ConsumerWithMetricsBuilder) SetConsumerMetrics ¶
func (b *ConsumerWithMetricsBuilder) SetConsumerMetrics(m *ConsumerMetrics)
SetConsumerMetrics ensures that the built instance will have metrics equal to m.
func (*ConsumerWithMetricsBuilder) SetLogger ¶
func (b *ConsumerWithMetricsBuilder) SetLogger(logger log.Logger)
SetLogger sets the logger that the built instance is to use.
func (*ConsumerWithMetricsBuilder) SetName ¶
func (b *ConsumerWithMetricsBuilder) SetName(name string)
SetName sets the name of the consumer. Default is the empty string.
func (*ConsumerWithMetricsBuilder) SetPaused ¶
func (b *ConsumerWithMetricsBuilder) SetPaused(paused bool)
SetPaused sets whether the built consumer's writer is paused.
func (*ConsumerWithMetricsBuilder) SetPerMetricWriteTimeDist ¶
func (b *ConsumerWithMetricsBuilder) SetPerMetricWriteTimeDist(d *tricorder.CumulativeDistribution)
SetPerMetricWriteTimeDist sets the distribution that the consumer will use to record write times. The default is not to record write times.
func (*ConsumerWithMetricsBuilder) SetRecordsPerSecond ¶
func (b *ConsumerWithMetricsBuilder) SetRecordsPerSecond(recordsPerSecond uint)
SetRecordsPerSecond throttles writes. Default is 0 which means no throttling.
func (*ConsumerWithMetricsBuilder) SetRegexesOfMetricsToExclude ¶
func (b *ConsumerWithMetricsBuilder) SetRegexesOfMetricsToExclude(regexes []string) error
SetRegexesOfMetricsToExclude sets what metric names to exclude. SetRegexesOfMetricsToExclude returns an error if one of regexes is an invalid regular expression.
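Compiling a list of patterns and rejecting the whole list on the first invalid one can be sketched with the standard regexp package. `compileAll` and `excluded` are hypothetical helpers, and treating an unanchored match anywhere in the metric name as an exclusion is an assumption of this sketch, not a documented property of the package.

```go
package main

import (
	"fmt"
	"regexp"
)

// compileAll compiles every pattern, failing on the first invalid one.
func compileAll(patterns []string) ([]*regexp.Regexp, error) {
	compiled := make([]*regexp.Regexp, 0, len(patterns))
	for _, p := range patterns {
		re, err := regexp.Compile(p)
		if err != nil {
			return nil, fmt.Errorf("invalid pattern %q: %w", p, err)
		}
		compiled = append(compiled, re)
	}
	return compiled, nil
}

// excluded reports whether name matches any of the exclusion patterns.
func excluded(name string, exclude []*regexp.Regexp) bool {
	for _, re := range exclude {
		if re.MatchString(name) {
			return true
		}
	}
	return false
}

func main() {
	exclude, err := compileAll([]string{`^/proc/`, `/debug$`})
	if err != nil {
		panic(err)
	}
	fmt.Println(excluded("/proc/cpu", exclude))   // true
	fmt.Println(excluded("/sys/memory", exclude)) // false

	_, err = compileAll([]string{`(`}) // unbalanced parenthesis
	fmt.Println(err != nil)            // true
}
```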
func (*ConsumerWithMetricsBuilder) SetRollUpSpan ¶
func (b *ConsumerWithMetricsBuilder) SetRollUpSpan(dur time.Duration)
SetRollUpSpan sets the length of time periods for rolled up values. Other than setting the RollUpSpan consumer attribute, this method has no effect on the built consumer.
type LimitedBySubTypeRecordWriter ¶
type LimitedBySubTypeRecordWriter interface {
	// IsTypeSupported(kind) =
	// IsTypeAndSubTypeSupported(kind, types.Unknown)
	LimitedRecordWriter
	// IsTypeAndSubTypeSupported returns true if this writer supports
	// metrics of a particular kind.
	IsTypeAndSubTypeSupported(kind, subType types.Type) bool
}
LimitedBySubTypeRecordWriter is a sub interface of LimitedRecordWriter that allows filtering by both kind and sub-type. For instance, if a RecordWriter can write lists of int64s but not lists of strings, it should implement this interface.
type LimitedRecordWriter ¶
type LimitedRecordWriter interface {
	RecordWriter
	// IsTypeSupported returns true if this writer supports metrics
	// of a particular kind.
	IsTypeSupported(kind types.Type) bool
}
LimitedRecordWriter is a RecordWriter which provides information on what types of values it can write.
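A writer that accepts lists of int64s but not lists of strings could express its type checks roughly as follows. This is a sketch under stated assumptions: `kind` and its constants are hypothetical stand-ins for types.Type and its values, `numericWriter` is invented for illustration, and the Write method is omitted to keep the focus on the two predicates.

```go
package main

import "fmt"

// kind is a hypothetical stand-in for types.Type; the constants below are
// invented for this sketch.
type kind int

const (
	unknownKind kind = iota
	int64Kind
	stringKind
	listKind
)

// numericWriter accepts int64 values and lists of int64s, but no strings.
type numericWriter struct{}

// IsTypeAndSubTypeSupported filters by both kind and sub-type.
func (numericWriter) IsTypeAndSubTypeSupported(k, subType kind) bool {
	switch k {
	case int64Kind:
		return true
	case listKind:
		// With no sub-type information, accept the list kind itself.
		return subType == unknownKind || subType == int64Kind
	default:
		return false
	}
}

// IsTypeSupported follows the documented identity:
// IsTypeSupported(kind) = IsTypeAndSubTypeSupported(kind, unknown).
func (w numericWriter) IsTypeSupported(k kind) bool {
	return w.IsTypeAndSubTypeSupported(k, unknownKind)
}

func main() {
	var w numericWriter
	fmt.Println(w.IsTypeSupported(listKind))                        // true
	fmt.Println(w.IsTypeAndSubTypeSupported(listKind, int64Kind))  // true
	fmt.Println(w.IsTypeAndSubTypeSupported(listKind, stringKind)) // false
}
```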
type Record ¶
type Record struct {
	// Originating machine
	HostName string
	// Path of metric
	Path string
	// Arbitrary key-value pairs describing this metric
	Tags TagGroup
	// Kind of metric
	Kind types.Type
	// Subtype of metric
	SubType types.Type
	// Unit of metric
	Unit units.Unit
	// Value of metric
	Value interface{}
	// The timestamp of the metric value.
	Timestamp time.Time
}
Record represents one value of one metric in persistent storage.
type RecordWriteHooker ¶
type RecordWriteHooker interface {
	// WriteHook is called just after a write or attempted write to
	// pstore. records are the records written; result is the resulting
	// error if any. Implementations must not modify the records array.
	WriteHook(records []Record, result error)
}
Instances that want to know when a batch of records is written to persistent store implement this interface.
type RecordWriter ¶
type RecordWriter interface {
	// Write writes given collection of records to persistent storage
	Write(records []Record) error
}
RecordWriter is the interface for writing to persistent store. Implementations of RecordWriter must be safe to use with multiple goroutines.
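One simple way to meet the goroutine-safety requirement is to guard shared state with a mutex. The sketch below uses hypothetical stand-ins (`record`, `memoryWriter`, `writeConcurrently`) and stores records in memory purely for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// record is a simplified stand-in for Record.
type record struct {
	Path  string
	Value float64
}

// memoryWriter keeps records in memory; the mutex makes Write safe to call
// from multiple goroutines.
type memoryWriter struct {
	mu      sync.Mutex
	records []record
}

func (w *memoryWriter) Write(records []record) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.records = append(w.records, records...)
	return nil
}

// Len returns how many records have been written so far.
func (w *memoryWriter) Len() int {
	w.mu.Lock()
	defer w.mu.Unlock()
	return len(w.records)
}

// writeConcurrently writes one batch from each of several goroutines and
// waits for all of them to finish.
func writeConcurrently(w *memoryWriter, goroutines, batch int) {
	var wg sync.WaitGroup
	for i := 0; i < goroutines; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = w.Write(make([]record, batch))
		}()
	}
	wg.Wait()
}

func main() {
	w := &memoryWriter{}
	writeConcurrently(w, 8, 25)
	fmt.Println(w.Len()) // prints 200
}
```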
type RecordWriterMetrics ¶
type RecordWriterMetrics struct {
	ValuesWritten         uint64
	WriteAttempts         uint64
	SuccessfulWrites      uint64
	LastWriteError        string
	LastWriteErrorTS      time.Time
	LastSuccessfulWriteTS time.Time
	TimeSpentWriting      time.Duration
	Paused                bool
	Disabled              bool
}
RecordWriterMetrics represents writing metrics.
func (*RecordWriterMetrics) SuccessfulWriteRatio ¶
func (w *RecordWriterMetrics) SuccessfulWriteRatio() float64
type RecordWriterWithMetrics ¶
type RecordWriterWithMetrics struct {
	// Client must provide the underlying writer
	W RecordWriter
	// Client populates this to collect write times per metric
	PerMetricWriteTimes *tricorder.CumulativeDistribution
	// Client populates this to collect batch sizes
	BatchSizes *tricorder.CumulativeDistribution
	Logger     log.Logger
	// contains filtered or unexported fields
}
RecordWriterWithMetrics implements RecordWriter and provides metrics.
func NewRecordWriterWithMetrics ¶
func NewRecordWriterWithMetrics(writer RecordWriter) *RecordWriterWithMetrics
NewRecordWriterWithMetrics returns a new RecordWriterWithMetrics. Use this function instead of initialising RecordWriterWithMetrics directly.
func (*RecordWriterWithMetrics) Disable ¶
func (w *RecordWriterWithMetrics) Disable()
Disable disables this writer so that future calls to Write return ErrDisabled. Any in-progress calls to Write will complete before Disable returns. If this writer is currently paused, Disable resumes it so that any blocked calls to Write return ErrDisabled. Calling Write on a disabled writer does not update any metrics.
func (*RecordWriterWithMetrics) Metrics ¶
func (w *RecordWriterWithMetrics) Metrics(m *RecordWriterMetrics)
Metrics stores the current metrics at m.
func (*RecordWriterWithMetrics) Pause ¶
func (w *RecordWriterWithMetrics) Pause()
Pause pauses this writer so that subsequent calls to Write block. Any in-progress calls to Write will complete before Pause returns.
func (*RecordWriterWithMetrics) Resume ¶
func (w *RecordWriterWithMetrics) Resume()
Resume resumes this writer. Any blocked calls to Write resume immediately.
func (*RecordWriterWithMetrics) SetMetrics ¶
func (w *RecordWriterWithMetrics) SetMetrics(m *RecordWriterMetrics)
SetMetrics sets the metrics in this instance to m but leaves the Paused and Disabled metrics in this instance unchanged.
func (*RecordWriterWithMetrics) Write ¶
func (w *RecordWriterWithMetrics) Write(records []Record) error
Directories ¶
Path | Synopsis |
---|---|
config | Package config includes utilities for handling configuration files. |
influx | Package influx enables writing metric values to influxdb. |
kafka | Package kafka enables writing metric values to kafka. |
lmm | Package lmm enables writing metric values to lmm. |
mock | Package mock enables writing metric values to a mockdb for testing. |
tsdb | Package tsdb enables writing metric values to tsdb. |