Documentation
¶
Index ¶
- type Accumulator
- type CachedStatisticsRepository
- func (cachedStatisticsRepository *CachedStatisticsRepository) EmitIntervalStatisticsForKeys(keys []string, fromOrdinal int64, untilOrdinal int64, ...) error
- func (cachedStatisticsRepository *CachedStatisticsRepository) RegisterCache(key string, cache *IntervalStatisticsCache) error
- func (cachedStatisticsRepository *CachedStatisticsRepository) UnregisterCache(key string) error
- type ClearParams
- type Controller
- func (controller *Controller) ClearStatisticsCache(params *ClearParams) (err error)
- func (controller *Controller) GetStatistics(params *StatisticsParams) (err error)
- func (controller *Controller) ModifyTags(params *TagParams) (err error)
- func (controller *Controller) Push(params *PushParams) (err error)
- func (controller *Controller) Register(params *RegisterParams) (err error)
- func (controller *Controller) Unregister(params *UnregisterParams) (err error)
- type IntervalRouter
- func (intervalRouter *IntervalRouter) Accumulate(ordinalValue OrdinalValue, output chan IntervalStatistics)
- func (intervalRouter *IntervalRouter) AccumulateFromChannel(input chan OrdinalValue, output chan IntervalStatistics, done chan bool)
- func (intervalRouter *IntervalRouter) FinaliseAll()
- func (intervalRouter *IntervalRouter) FinalisePriorTo(ordinal int64)
- func (intervalRouter *IntervalRouter) FinalisePriorToTime(t time.Time)
- type IntervalStatistics
- type IntervalStatisticsCache
- func (intervalStatisticsCache *IntervalStatisticsCache) EmitIntervalStatistics(fromOrdinal int64, untilOrdinal int64, output chan IntervalStatistics) error
- func (intervalStatisticsCache *IntervalStatisticsCache) GetFromOrdinal(fromOrdinal int64) []IntervalStatistics
- func (intervalStatisticsCache *IntervalStatisticsCache) GetLast(maxCount int) []IntervalStatistics
- func (intervalStatisticsCache *IntervalStatisticsCache) GetOrdinalRange(fromOrdinal int64, untilOrdinal int64) []IntervalStatistics
- func (intervalStatisticsCache *IntervalStatisticsCache) ProcessAndForward(input chan IntervalStatistics, output chan IntervalStatistics)
- type IntervalType
- type OrdinalValue
- type PushParams
- type RegisterParams
- type Router
- type StatisticsParams
- type TagParams
- type TagRepository
- func (tagRepository *TagRepository) ApplyTags(key string, tags []string, removeExisting bool)
- func (tagRepository *TagRepository) Clear()
- func (tagRepository *TagRepository) ClearForKey(key string)
- func (tagRepository *TagRepository) GetMatchingKeys(withTags []string, excludingTags []string) []string
- func (tagRepository *TagRepository) GetTagsForKey(key string) []string
- func (tagRepository *TagRepository) RemoveTags(key string, tags []string)
- type UnregisterParams
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Accumulator ¶
type Accumulator struct {
// contains filtered or unexported fields
}
Accumulator used to calculate statistics for a period of time by processing provided values
func NewAccumulator ¶
func NewAccumulator(streamKey string, intervalStart int64, intervalEnd int64, intervalType IntervalType, targetSampleCount uint32) *Accumulator
NewAccumulator creates an accumulator
func (*Accumulator) Accumulate ¶
func (accumulator *Accumulator) Accumulate(input chan OrdinalValue, output chan IntervalStatistics, done chan bool)
Accumulate values from a channel
func (*Accumulator) Finalise ¶
func (accumulator *Accumulator) Finalise() IntervalStatistics
Finalise calculates statistics from the accumulator and prevents any further accumulation
func (*Accumulator) Include ¶
func (accumulator *Accumulator) Include(ordinalValue OrdinalValue)
Include a new value within the accumulation
type CachedStatisticsRepository ¶
type CachedStatisticsRepository struct {
// contains filtered or unexported fields
}
CachedStatisticsRepository provides access to generated interval statistics
func NewCachedStatisticsRepository ¶
func NewCachedStatisticsRepository() CachedStatisticsRepository
NewCachedStatisticsRepository creates an empty statistics repository
func (*CachedStatisticsRepository) EmitIntervalStatisticsForKeys ¶
func (cachedStatisticsRepository *CachedStatisticsRepository) EmitIntervalStatisticsForKeys(keys []string, fromOrdinal int64, untilOrdinal int64, output chan IntervalStatistics) error
EmitIntervalStatisticsForKeys outputs matching interval statistics to a provided channel
func (*CachedStatisticsRepository) RegisterCache ¶
func (cachedStatisticsRepository *CachedStatisticsRepository) RegisterCache(key string, cache *IntervalStatisticsCache) error
RegisterCache includes a new key and cache within the repository
func (*CachedStatisticsRepository) UnregisterCache ¶
func (cachedStatisticsRepository *CachedStatisticsRepository) UnregisterCache(key string) error
UnregisterCache removes a cache and key from the repository
type ClearParams ¶
type ClearParams struct { // Identifies the stream Stream string `json:"stream,omitempty" xml:"stream,omitempty"` // If true, all data for the stream(s) will be cleared ClearAll bool `json:"clearAll,omitempty" xml:"clearAll,omitempty"` // An array of tags to be matched WithTags []string `json:"tagsToAssign,omitempty" xml:"tagsToAssign,omitempty"` // An array of tags that exclude a stream from a match ExcludingTags []string `json:"tagsToUnassign,omitempty" xml:"tagsToUnassign,omitempty"` // Specifies a maximum ordinal value used to restrict the clear operation. MaxOrdinal int `json:"maxOrdinal,omitempty" xml:"maxOrdinal,omitempty"` }
ClearParams encapsulates the information required for the clear action
type Controller ¶
type Controller struct {
// contains filtered or unexported fields
}
Controller is used to dispatch all operations related to stream processing
func NewController ¶
func NewController(contextKey string) *Controller
NewController creates a controller used to manage streams
func (*Controller) ClearStatisticsCache ¶
func (controller *Controller) ClearStatisticsCache(params *ClearParams) (err error)
ClearStatisticsCache removes existing statistics data from cache
func (*Controller) GetStatistics ¶
func (controller *Controller) GetStatistics(params *StatisticsParams) (err error)
GetStatistics returns statistics matching search parameters
func (*Controller) ModifyTags ¶
func (controller *Controller) ModifyTags(params *TagParams) (err error)
ModifyTags modifies the tags associated with a stream
func (*Controller) Push ¶
func (controller *Controller) Push(params *PushParams) (err error)
Push new values onto a stream
func (*Controller) Register ¶
func (controller *Controller) Register(params *RegisterParams) (err error)
Register a stream within the controller
func (*Controller) Unregister ¶
func (controller *Controller) Unregister(params *UnregisterParams) (err error)
Unregister streams from the controller
type IntervalRouter ¶
type IntervalRouter struct {
// contains filtered or unexported fields
}
IntervalRouter sends stream data to an accumulator based on matching the ordinal value to an interval
func NewIntervalRouter ¶
func NewIntervalRouter(key string, intervalSize int64, intervalType IntervalType, maxIntervalLag uint32, targetSampleCount uint32) *IntervalRouter
NewIntervalRouter creates a new router used to assign values to intervals
func (*IntervalRouter) Accumulate ¶
func (intervalRouter *IntervalRouter) Accumulate(ordinalValue OrdinalValue, output chan IntervalStatistics)
Accumulate directs a value to the appropriate accumulator for an ordinal value
func (*IntervalRouter) AccumulateFromChannel ¶
func (intervalRouter *IntervalRouter) AccumulateFromChannel(input chan OrdinalValue, output chan IntervalStatistics, done chan bool)
AccumulateFromChannel directs each value received from the input channel to the appropriate accumulator for its ordinal value
func (*IntervalRouter) FinaliseAll ¶
func (intervalRouter *IntervalRouter) FinaliseAll()
FinaliseAll causes all accumulators to be finalised
func (*IntervalRouter) FinalisePriorTo ¶
func (intervalRouter *IntervalRouter) FinalisePriorTo(ordinal int64)
FinalisePriorTo causes all accumulators for intervals prior to that related to the specified ordinal to be finalised, and removes them from the intervalRouter
func (*IntervalRouter) FinalisePriorToTime ¶
func (intervalRouter *IntervalRouter) FinalisePriorToTime(t time.Time)
FinalisePriorToTime causes all accumulators for intervals prior to that related to the specified time to be finalised, and removes them from the intervalRouter
type IntervalStatistics ¶
type IntervalStatistics struct { StreamKey string IntervalStart int64 IntervalEnd int64 IntervalType IntervalType Minimum float64 Maximum float64 Mean float64 Count uint64 Sum float64 SampleMean float64 SampleSum float64 SampleCount uint32 SampleStandardDeviation float64 CoefficientOfVariation float64 }
IntervalStatistics is a set of statistics generated based on processed events within a period of time or over a range of ordinal positions
type IntervalStatisticsCache ¶
type IntervalStatisticsCache struct {
// contains filtered or unexported fields
}
IntervalStatisticsCache stores a limited set of interval statistics for a stream
func NewIntervalStatisticsCache ¶
func NewIntervalStatisticsCache(streamKey string, size uint32) *IntervalStatisticsCache
NewIntervalStatisticsCache creates a new cache for interval statistics
func (*IntervalStatisticsCache) EmitIntervalStatistics ¶
func (intervalStatisticsCache *IntervalStatisticsCache) EmitIntervalStatistics(fromOrdinal int64, untilOrdinal int64, output chan IntervalStatistics) error
EmitIntervalStatistics outputs matching interval statistics to a provided channel
func (*IntervalStatisticsCache) GetFromOrdinal ¶
func (intervalStatisticsCache *IntervalStatisticsCache) GetFromOrdinal(fromOrdinal int64) []IntervalStatistics
GetFromOrdinal returns the cached statistics that have an ordinal value equal to or greater than the value provided
func (*IntervalStatisticsCache) GetLast ¶
func (intervalStatisticsCache *IntervalStatisticsCache) GetLast(maxCount int) []IntervalStatistics
GetLast returns the most recent statistics from the cache up to the specified maxCount
func (*IntervalStatisticsCache) GetOrdinalRange ¶
func (intervalStatisticsCache *IntervalStatisticsCache) GetOrdinalRange(fromOrdinal int64, untilOrdinal int64) []IntervalStatistics
GetOrdinalRange returns the cached statistics that exist between a range of ordinal values
func (*IntervalStatisticsCache) ProcessAndForward ¶
func (intervalStatisticsCache *IntervalStatisticsCache) ProcessAndForward(input chan IntervalStatistics, output chan IntervalStatistics)
ProcessAndForward stores values from an input channel and then forwards values to an output channel. Only the most recent set of results is stored
type IntervalType ¶
type IntervalType int
IntervalType is a specifier used to differentiate between Ordinal and Time intervals
const ( //OrdinalInterval indicates that intervals are determined based on ordinal position OrdinalInterval IntervalType = iota //TimeInterval indicates that intervals are determined based on time TimeInterval )
type OrdinalValue ¶
OrdinalValue represents a value with an ordinal position within a stream
func NewOrdinalValue ¶
func NewOrdinalValue(streamKey string, ordinal int64, value float64) OrdinalValue
NewOrdinalValue creates a new ordinal value
func NewOrdinalValueForTime ¶
func NewOrdinalValueForTime(streamKey string, t time.Time, value float64) OrdinalValue
NewOrdinalValueForTime creates a new ordinal value for a time
type PushParams ¶
type PushParams struct { // The ordinal position within the stream Ordinal int `json:"ordinal,omitempty" xml:"ordinal,omitempty"` // Identifies the stream that the ordinal value relates to Stream string `json:"stream,omitempty" xml:"stream,omitempty"` // The value at the ordinal position Value float64 `json:"value,omitempty" xml:"value,omitempty"` }
PushParams encapsulates the information required for the push action
type RegisterParams ¶
type RegisterParams struct { // Identifies the stream that the definition relates to Stream string `json:"stream,omitempty" xml:"stream,omitempty"` // The size of the intervals used for the stream IntervalSize int `json:"intervalSize,omitempty" xml:"intervalSize,omitempty"` // The maximum interval lag for the stream MaxIntervalLag int `json:"maxIntervalLag,omitempty" xml:"maxIntervalLag,omitempty"` // A set of tag values to be assigned to the stream Tags []string `json:"tags,omitempty" xml:"tags,omitempty"` // The target sample size for the stream TargetSampleSize int `json:"targetSampleSize,omitempty" xml:"targetSampleSize,omitempty"` }
RegisterParams encapsulates the information required for the register action
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router sends data to an appropriate channel based on key
func NewRouter ¶
func NewRouter() *Router
NewRouter creates a router for an input channel and begins reading immediately
func (*Router) Register ¶
func (router *Router) Register(streamKey string, c chan OrdinalValue)
Register the channel for a stream based on key
func (*Router) Route ¶
func (router *Router) Route(input chan OrdinalValue, unassigned chan OrdinalValue)
Route values from an input channel
func (*Router) StopAndUnregister ¶
StopAndUnregister removes a stream key from the router and closes the stream channel
type StatisticsParams ¶
type StatisticsParams struct { // Specifies a maximum date time used to restrict the interval statistics returned. Only statistics for intervals that are for a time range up until this date time value will be returned. MaxDateTime time.Time `json:"maxDateTime,omitempty" xml:"maxDateTime,omitempty"` // Specifies a maximum ordinal value used to restrict the interval statistics returned. Only statistics for intervals that end on or before this ordinal value will be returned. MaxOrdinal int `json:"maxOrdinal,omitempty" xml:"maxOrdinal,omitempty"` // If true, results across multiple intervals will be merged together to produce a summary result. MergeIntervals bool `json:"mergeIntervals,omitempty" xml:"mergeIntervals,omitempty"` // If true, results from multiple streams will be merged together to produce a summary result. MergeStreams bool `json:"mergeStreams,omitempty" xml:"mergeStreams,omitempty"` // Specifies a minimum date time used to restrict the interval statistics returned. Only statistics for intervals that are for a time range on or after this date time value will be returned. MinDateTime time.Time `json:"minDateTime,omitempty" xml:"minDateTime,omitempty"` // Specifies a minimum ordinal value used to restrict the interval statistics returned. Only statistics for intervals that begin on or after this ordinal value will be returned. MinOrdinal int `json:"minOrdinal,omitempty" xml:"minOrdinal,omitempty"` // Specifies the criteria by which streams are to be matched StreamMatchSearchParams streamMatchSearchParams `json:"streamMatchCriteria,omitempty" xml:"streamMatchCriteria,omitempty"` }
StatisticsParams encapsulates the information required for the GetStatistics action
type TagParams ¶
type TagParams struct { // If true, previously assigned tags will be cleared ClearAll bool `json:"clearAll,omitempty" xml:"clearAll,omitempty"` // Identifies the stream that the definition relates to Stream string `json:"stream,omitempty" xml:"stream,omitempty"` // An array of tags to be assigned TagsToAssign []string `json:"tagsToAssign,omitempty" xml:"tagsToAssign,omitempty"` // An array of tags to be unassigned TagsToUnassign []string `json:"tagsToUnassign,omitempty" xml:"tagsToUnassign,omitempty"` }
TagParams encapsulates the information required for the ModifyTags action
type TagRepository ¶
type TagRepository struct {
// contains filtered or unexported fields
}
TagRepository manages sets of tags by key
func NewTagRepository ¶
func NewTagRepository() TagRepository
NewTagRepository creates an empty tag repository
func (*TagRepository) ApplyTags ¶
func (tagRepository *TagRepository) ApplyTags(key string, tags []string, removeExisting bool)
ApplyTags applies a set of tags to a key
func (*TagRepository) Clear ¶
func (tagRepository *TagRepository) Clear()
Clear removes all tag information
func (*TagRepository) ClearForKey ¶
func (tagRepository *TagRepository) ClearForKey(key string)
ClearForKey removes all tag information for a key
func (*TagRepository) GetMatchingKeys ¶
func (tagRepository *TagRepository) GetMatchingKeys(withTags []string, excludingTags []string) []string
GetMatchingKeys returns all keys that have all tags within the withTags slice and no tags within the excludingTags slice
func (*TagRepository) GetTagsForKey ¶
func (tagRepository *TagRepository) GetTagsForKey(key string) []string
GetTagsForKey returns a set of tags for a key
func (*TagRepository) RemoveTags ¶
func (tagRepository *TagRepository) RemoveTags(key string, tags []string)
RemoveTags removes a set of tags from a key
type UnregisterParams ¶
type UnregisterParams struct { // Identifies the stream Stream string `json:"stream,omitempty" xml:"stream,omitempty"` // An array of tags to be matched WithTags []string `json:"tagsToAssign,omitempty" xml:"tagsToAssign,omitempty"` // An array of tags that exclude a stream from a match ExcludingTags []string `json:"tagsToUnassign,omitempty" xml:"tagsToUnassign,omitempty"` }
UnregisterParams encapsulates the information required for the unregister action