batcher

package
v0.0.0-...-c4caace Latest
This package is not in the latest version of its module.
Published: Aug 27, 2024 License: Apache-2.0 Imports: 10 Imported by: 0

README

package batcher

The batcher is a variant of the aggregator: instead of aggregating data, it batches data and flushes when the batch grows too large.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitBatcher

func InitBatcher(serializer serializer.AgentV1Serializer, hostname, agentName string, maxCapacity int)

InitBatcher initializes the global batcher instance

Types

type AsynchronousBatcher

type AsynchronousBatcher struct {
	// contains filtered or unexported fields
}

AsynchronousBatcher is the implementation of the batcher. It works asynchronously and publishes data to the serializer.
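To make "asynchronous" concrete, here is a minimal sketch of one common way to build such a batcher in Go: submissions go onto a channel, a single goroutine owns the batch, and a publish callback stands in for the serializer. The names asyncBatcher, newAsyncBatcher, Submit and publish are hypothetical, and flushing leftovers on shutdown is an assumption of this sketch, not something the package documents.

```go
package main

import (
	"fmt"
	"sync"
)

// asyncBatcher is a hypothetical stand-in, not the package's AsynchronousBatcher.
type asyncBatcher struct {
	input       chan string
	done        sync.WaitGroup
	publish     func(batch []string) // stands in for the serializer
	maxCapacity int
}

func newAsyncBatcher(maxCapacity int, publish func([]string)) *asyncBatcher {
	b := &asyncBatcher{
		input:       make(chan string, 16),
		publish:     publish,
		maxCapacity: maxCapacity,
	}
	b.done.Add(1)
	go b.run()
	return b
}

// run is the only goroutine that touches the batch, so no lock is needed.
func (b *asyncBatcher) run() {
	defer b.done.Done()
	var batch []string
	for item := range b.input {
		batch = append(batch, item)
		if len(batch) >= b.maxCapacity {
			b.publish(batch)
			batch = nil
		}
	}
	// Assumption of this sketch: publish whatever is left on shutdown.
	if len(batch) > 0 {
		b.publish(batch)
	}
}

// Submit hands an item to the worker. It must not be called after Shutdown.
func (b *asyncBatcher) Submit(item string) { b.input <- item }

// Shutdown stops accepting input and waits for the worker to finish.
func (b *asyncBatcher) Shutdown() {
	close(b.input)
	b.done.Wait()
}

func main() {
	var published [][]string
	b := newAsyncBatcher(2, func(batch []string) {
		published = append(published, batch)
	})
	b.Submit("a")
	b.Submit("b")
	b.Submit("c")
	b.Shutdown()
	fmt.Println(published) // [[a b] [c]]
}
```

Keeping the batch inside one goroutine is what lets the underlying builder stay non-thread-safe while callers submit from many goroutines.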

func (AsynchronousBatcher) Shutdown

func (batcher AsynchronousBatcher) Shutdown()

Shutdown shuts down the batcher

func (AsynchronousBatcher) SubmitComplete

func (batcher AsynchronousBatcher) SubmitComplete(checkID check.ID)

SubmitComplete signals completion of a check. It may trigger a flush, but only if the check produced data.

func (AsynchronousBatcher) SubmitComponent

func (batcher AsynchronousBatcher) SubmitComponent(checkID check.ID, instance topology.Instance, component topology.Component)

SubmitComponent submits a component to the batch

func (AsynchronousBatcher) SubmitDelete

func (batcher AsynchronousBatcher) SubmitDelete(checkID check.ID, instance topology.Instance, topologyElementID string)

SubmitDelete submits a deletion of a topology element.

func (AsynchronousBatcher) SubmitError

func (batcher AsynchronousBatcher) SubmitError(checkID check.ID, err error)

SubmitError accepts an error for use in the testing code. Errors are not yet accounted for in health or anything else.

func (AsynchronousBatcher) SubmitHealthCheckData

func (batcher AsynchronousBatcher) SubmitHealthCheckData(checkID check.ID, stream health.Stream, data health.CheckData)

SubmitHealthCheckData submits a Health check data record to the batch

func (AsynchronousBatcher) SubmitHealthStartSnapshot

func (batcher AsynchronousBatcher) SubmitHealthStartSnapshot(checkID check.ID, stream health.Stream, intervalSeconds int, expirySeconds int)

SubmitHealthStartSnapshot submits start of a Health snapshot

func (AsynchronousBatcher) SubmitHealthStopSnapshot

func (batcher AsynchronousBatcher) SubmitHealthStopSnapshot(checkID check.ID, stream health.Stream)

SubmitHealthStopSnapshot submits a stop of a Health snapshot. This always causes a flush of the data downstream

func (AsynchronousBatcher) SubmitRawMetricsData

func (batcher AsynchronousBatcher) SubmitRawMetricsData(checkID check.ID, rawMetric telemetry.RawMetrics)

SubmitRawMetricsData submits a raw metrics data record to the batch

func (AsynchronousBatcher) SubmitRelation

func (batcher AsynchronousBatcher) SubmitRelation(checkID check.ID, instance topology.Instance, relation topology.Relation)

SubmitRelation submits a relation to the batch

func (AsynchronousBatcher) SubmitStartSnapshot

func (batcher AsynchronousBatcher) SubmitStartSnapshot(checkID check.ID, instance topology.Instance)

SubmitStartSnapshot submits start of a snapshot

func (AsynchronousBatcher) SubmitStopSnapshot

func (batcher AsynchronousBatcher) SubmitStopSnapshot(checkID check.ID, instance topology.Instance)

SubmitStopSnapshot submits a stop of a snapshot. This always causes a flush of the data downstream

type BatchBuilder

type BatchBuilder struct {
	// contains filtered or unexported fields
}

BatchBuilder is a helper that builds topology from submitted data. This data structure is not thread safe.

func NewBatchBuilder

func NewBatchBuilder(maxCapacity int) BatchBuilder

NewBatchBuilder constructs a BatchBuilder
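The maxCapacity argument suggests a size-triggered flush. The sketch below shows one way such a builder could behave: every add increments a counter, and once the counter reaches the capacity the collected data is returned and the builder is reset. All names here (CheckID, Batches, capacityBuilder) are simplified stand-ins, and the exact threshold comparison is an assumption.

```go
package main

import "fmt"

// CheckID and Batches are simplified stand-ins for check.ID and
// CheckInstanceBatchStates; they are assumptions of this sketch.
type CheckID string
type Batches map[CheckID][]string

// capacityBuilder is a hypothetical, stripped-down builder.
type capacityBuilder struct {
	batches     Batches
	elements    int
	maxCapacity int
}

func newCapacityBuilder(maxCapacity int) capacityBuilder {
	return capacityBuilder{batches: Batches{}, maxCapacity: maxCapacity}
}

// Add stores one element and returns the flushed batches once the element
// count reaches maxCapacity; otherwise it returns nil.
func (b *capacityBuilder) Add(id CheckID, element string) Batches {
	b.batches[id] = append(b.batches[id], element)
	b.elements++
	if b.elements >= b.maxCapacity {
		return b.Flush()
	}
	return nil
}

// Flush returns everything collected so far and resets the builder.
func (b *capacityBuilder) Flush() Batches {
	flushed := b.batches
	b.batches = Batches{}
	b.elements = 0
	return flushed
}

func main() {
	b := newCapacityBuilder(2)
	fmt.Println(b.Add("check-a", "component-1") == nil) // true: below capacity
	fmt.Println(len(b.Add("check-b", "component-2")))   // 2: both checks flushed together
}
```

Note that the counter in this sketch is shared across checks, so a flush returns data for every check that has contributed so far.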

func (*BatchBuilder) AddComponent

func (builder *BatchBuilder) AddComponent(checkID check.ID, instance topology.Instance, component topology.Component) CheckInstanceBatchStates

AddComponent adds a component

func (*BatchBuilder) AddHealthCheckData

func (builder *BatchBuilder) AddHealthCheckData(checkID check.ID, stream health.Stream, data health.CheckData) CheckInstanceBatchStates

AddHealthCheckData adds a health check data record

func (*BatchBuilder) AddRawMetricsData

func (builder *BatchBuilder) AddRawMetricsData(checkID check.ID, rawMetric telemetry.RawMetrics) CheckInstanceBatchStates

AddRawMetricsData adds raw metric data

func (*BatchBuilder) AddRelation

func (builder *BatchBuilder) AddRelation(checkID check.ID, instance topology.Instance, relation topology.Relation) CheckInstanceBatchStates

AddRelation adds a relation

func (*BatchBuilder) Delete

func (builder *BatchBuilder) Delete(checkID check.ID, instance topology.Instance, deleteID string) CheckInstanceBatchStates

Delete deletes a topology element

func (*BatchBuilder) DisabledForceFlush

func (builder *BatchBuilder) DisabledForceFlush()

DisabledForceFlush disables flushing data when a topology or health snapshot stop signal is received

func (*BatchBuilder) Flush

func (builder *BatchBuilder) Flush() CheckInstanceBatchStates

Flush flushes the collected data, returning it and wiping the currently built-up topology

func (*BatchBuilder) FlushIfDataProduced

func (builder *BatchBuilder) FlushIfDataProduced(checkID check.ID) CheckInstanceBatchStates

FlushIfDataProduced checks whether the check produced data and, if so, flushes
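A conditional flush of this kind can be sketched as a map lookup followed by a regular flush. The types below (CheckID, States, builder) are hypothetical simplifications. Whether the real method flushes only the given check or everything collected is not stated here, so the sketch assumes it flushes everything.

```go
package main

import "fmt"

// CheckID and States are simplified stand-ins for check.ID and
// CheckInstanceBatchStates.
type CheckID string
type States map[CheckID][]string

// builder is a hypothetical, stripped-down batch builder.
type builder struct {
	states States
}

// Add records one element for the given check.
func (b *builder) Add(id CheckID, element string) {
	if b.states == nil {
		b.states = States{}
	}
	b.states[id] = append(b.states[id], element)
}

// Flush returns all collected data and resets the builder.
func (b *builder) Flush() States {
	flushed := b.states
	b.states = States{}
	return flushed
}

// FlushIfDataProduced flushes only when the given check has contributed data.
// It returns nil when there is nothing from that check.
func (b *builder) FlushIfDataProduced(id CheckID) States {
	if _, ok := b.states[id]; ok {
		return b.Flush()
	}
	return nil
}

func main() {
	b := &builder{}
	b.Add("check-a", "component-1")
	fmt.Println(b.FlushIfDataProduced("check-b") == nil) // true: check-b produced nothing
	fmt.Println(len(b.FlushIfDataProduced("check-a")))   // 1: check-a had data
}
```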

func (*BatchBuilder) HealthStartSnapshot

func (builder *BatchBuilder) HealthStartSnapshot(checkID check.ID, stream health.Stream, repeatIntervalSeconds int, expirySeconds int) CheckInstanceBatchStates

HealthStartSnapshot starts a Health snapshot

func (*BatchBuilder) HealthStopSnapshot

func (builder *BatchBuilder) HealthStopSnapshot(checkID check.ID, stream health.Stream) CheckInstanceBatchStates

HealthStopSnapshot stops a Health snapshot. This will always flush

func (*BatchBuilder) TopologyStartSnapshot

func (builder *BatchBuilder) TopologyStartSnapshot(checkID check.ID, instance topology.Instance) CheckInstanceBatchStates

TopologyStartSnapshot starts a snapshot

func (*BatchBuilder) TopologyStopSnapshot

func (builder *BatchBuilder) TopologyStopSnapshot(checkID check.ID, instance topology.Instance) CheckInstanceBatchStates

TopologyStopSnapshot stops a snapshot. This will always flush
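The stop-snapshot methods are documented as always flushing, while DisabledForceFlush turns that behavior off. How the two interact is not spelled out on this page; the sketch below assumes a boolean switch that is consulted when a stop signal arrives. snapshotBuilder and its methods are hypothetical names.

```go
package main

import "fmt"

type CheckID string

// snapshotState is a hypothetical per-check record.
type snapshotState struct {
	Components   []string
	StopSnapshot bool
}

type States map[CheckID]snapshotState

// snapshotBuilder is a simplified stand-in for the package's builder.
type snapshotBuilder struct {
	states     States
	forceFlush bool
}

func newSnapshotBuilder() *snapshotBuilder {
	return &snapshotBuilder{states: States{}, forceFlush: true}
}

func (b *snapshotBuilder) AddComponent(id CheckID, component string) {
	s := b.states[id]
	s.Components = append(s.Components, component)
	b.states[id] = s
}

// disableForceFlush turns off the flush-on-stop behavior (assumed semantics).
func (b *snapshotBuilder) disableForceFlush() { b.forceFlush = false }

// StopSnapshot marks the snapshot as stopped. With force flush enabled it
// returns all collected data immediately; otherwise it returns nil and the
// data waits for a later flush.
func (b *snapshotBuilder) StopSnapshot(id CheckID) States {
	s := b.states[id]
	s.StopSnapshot = true
	b.states[id] = s
	if b.forceFlush {
		return b.Flush()
	}
	return nil
}

func (b *snapshotBuilder) Flush() States {
	flushed := b.states
	b.states = States{}
	return flushed
}

func main() {
	b := newSnapshotBuilder()
	b.AddComponent("check-a", "component-1")
	flushed := b.StopSnapshot("check-a")
	fmt.Println(len(flushed), flushed["check-a"].StopSnapshot) // 1 true

	b.disableForceFlush()
	fmt.Println(b.StopSnapshot("check-a") == nil) // true: held until a later flush
}
```

Flushing on stop keeps latency low for complete snapshots; disabling it trades that for fewer, larger payloads.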

type Batcher

type Batcher interface {
	// Topology
	SubmitComponent(checkID check.ID, instance topology.Instance, component topology.Component)
	SubmitRelation(checkID check.ID, instance topology.Instance, relation topology.Relation)
	SubmitStartSnapshot(checkID check.ID, instance topology.Instance)
	SubmitStopSnapshot(checkID check.ID, instance topology.Instance)
	SubmitDelete(checkID check.ID, instance topology.Instance, topologyElementID string)

	// Health
	SubmitHealthCheckData(checkID check.ID, stream health.Stream, data health.CheckData)
	SubmitHealthStartSnapshot(checkID check.ID, stream health.Stream, intervalSeconds int, expirySeconds int)
	SubmitHealthStopSnapshot(checkID check.ID, stream health.Stream)
	SubmitError(checkID check.ID, err error)

	// Raw Metrics
	SubmitRawMetricsData(checkID check.ID, data telemetry.RawMetrics)

	// lifecycle
	SubmitComplete(checkID check.ID)
	Shutdown()
}

The Batcher interface receives data for sending to the intake and accumulates it in batches. Unlike the aggregator, it does not work on a fixed schedule; it flushes either when data exceeds a threshold or when data is complete.
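From a check's point of view, the interface is used as a sequence of submit calls. The sketch below shows one plausible call order with a reduced, hypothetical interface (miniBatcher) that uses plain strings in place of check.ID, topology.Instance and topology.Component. The ordering is an assumption for illustration, not a documented contract.

```go
package main

import "fmt"

// miniBatcher is a hypothetical, reduced version of the Batcher interface.
type miniBatcher interface {
	SubmitStartSnapshot(checkID, instance string)
	SubmitComponent(checkID, instance, component string)
	SubmitStopSnapshot(checkID, instance string)
	SubmitComplete(checkID string)
}

// recordingBatcher records the order of calls instead of batching data.
type recordingBatcher struct {
	calls []string
}

func (r *recordingBatcher) SubmitStartSnapshot(checkID, instance string) {
	r.calls = append(r.calls, "start:"+checkID)
}

func (r *recordingBatcher) SubmitComponent(checkID, instance, component string) {
	r.calls = append(r.calls, "component:"+component)
}

func (r *recordingBatcher) SubmitStopSnapshot(checkID, instance string) {
	r.calls = append(r.calls, "stop:"+checkID)
}

func (r *recordingBatcher) SubmitComplete(checkID string) {
	r.calls = append(r.calls, "complete:"+checkID)
}

// runCheck shows the order in which a check might submit its data:
// open a snapshot, submit elements, close the snapshot, signal completion.
func runCheck(b miniBatcher, checkID, instance string, components []string) {
	b.SubmitStartSnapshot(checkID, instance)
	for _, c := range components {
		b.SubmitComponent(checkID, instance, c)
	}
	b.SubmitStopSnapshot(checkID, instance)
	b.SubmitComplete(checkID)
}

func main() {
	r := &recordingBatcher{}
	runCheck(r, "check-a", "instance-1", []string{"host", "service"})
	fmt.Println(r.calls)
	// [start:check-a component:host component:service stop:check-a complete:check-a]
}
```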

func GetBatcher

func GetBatcher() Batcher

GetBatcher returns a handle on the global batcher instance
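InitBatcher and GetBatcher together describe a package-level instance that is set once and read everywhere. A minimal sketch of that pattern, with hypothetical names (noopBatcher, initBatcher, getBatcher) and no claim about how the package guards against repeated or concurrent initialization:

```go
package main

import "fmt"

// Batcher is reduced to a single method for this sketch.
type Batcher interface {
	Shutdown()
}

// noopBatcher is a hypothetical implementation that does nothing.
type noopBatcher struct {
	name string
}

func (noopBatcher) Shutdown() {}

// batcherInstance is the package-level handle. This sketch does not
// synchronize access, so initBatcher should run before any goroutine
// calls getBatcher.
var batcherInstance Batcher

// initBatcher sets the global instance.
func initBatcher(name string) {
	batcherInstance = noopBatcher{name: name}
}

// getBatcher returns the global instance, or nil if it was never initialized.
func getBatcher() Batcher {
	return batcherInstance
}

func main() {
	fmt.Println(getBatcher() == nil) // true: not initialized yet
	initBatcher("agent")
	fmt.Println(getBatcher() != nil) // true
}
```

Callers in this sketch have to handle a nil result if they may run before initialization.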

type CheckInstanceBatchState

type CheckInstanceBatchState struct {
	Topology *topology.Topology
	Metrics  *[]telemetry.RawMetrics
	Health   map[string]health.Health
}

CheckInstanceBatchState is the type representing batched data per check instance

type CheckInstanceBatchStates

type CheckInstanceBatchStates map[check.ID]CheckInstanceBatchState

CheckInstanceBatchStates is the type representing batched data for all check instances
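Because the flushed result is a map keyed by check ID, a consumer typically iterates over it per check. The sketch below uses simplified stand-in types (batchState, batchStates) with plain slices and strings rather than the real topology, telemetry and health types, and sorts the keys because Go map iteration order is not stable.

```go
package main

import (
	"fmt"
	"sort"
)

type CheckID string

// batchState is a simplified stand-in for CheckInstanceBatchState.
type batchState struct {
	Components []string
	Metrics    []float64
	Health     map[string]string
}

// batchStates is a simplified stand-in for CheckInstanceBatchStates.
type batchStates map[CheckID]batchState

// summarize returns one line per check, ordered by check ID.
func summarize(states batchStates) []string {
	ids := make([]string, 0, len(states))
	for id := range states {
		ids = append(ids, string(id))
	}
	sort.Strings(ids)

	lines := make([]string, 0, len(ids))
	for _, id := range ids {
		s := states[CheckID(id)]
		lines = append(lines, fmt.Sprintf("%s: %d components, %d metrics, %d health streams",
			id, len(s.Components), len(s.Metrics), len(s.Health)))
	}
	return lines
}

func main() {
	states := batchStates{
		"check-b": {Metrics: []float64{1, 2}},
		"check-a": {Components: []string{"host"}, Health: map[string]string{"stream-1": "ok"}},
	}
	for _, line := range summarize(states) {
		fmt.Println(line)
	}
}
```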

type MockBatcher

type MockBatcher struct {
	CollectedTopology BatchBuilder
	Errors            []error
}

MockBatcher mocks implementation of a batcher

func NewMockBatcher

func NewMockBatcher() *MockBatcher

NewMockBatcher initializes the global batcher with a mock version, intended for testing

func (*MockBatcher) Shutdown

func (batcher *MockBatcher) Shutdown()

Shutdown mock

func (*MockBatcher) SubmitComplete

func (batcher *MockBatcher) SubmitComplete(checkID check.ID)

SubmitComplete mock

func (*MockBatcher) SubmitComponent

func (batcher *MockBatcher) SubmitComponent(checkID check.ID, instance topology.Instance, component topology.Component)

SubmitComponent mock

func (*MockBatcher) SubmitDelete

func (batcher *MockBatcher) SubmitDelete(checkID check.ID, instance topology.Instance, topologyElementID string)

SubmitDelete mock

func (*MockBatcher) SubmitError

func (batcher *MockBatcher) SubmitError(checkID check.ID, err error)

SubmitError keeps track of thrown errors
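A mock that keeps track of errors lets a test assert on what the code under test reported. The sketch below illustrates the idea with hypothetical names (errorSink, mockSink, submitComponent, errEmptyName) and plain strings for check IDs; it does not use the package's MockBatcher.

```go
package main

import (
	"errors"
	"fmt"
)

// errorSink is a hypothetical one-method slice of the Batcher interface.
type errorSink interface {
	SubmitError(checkID string, err error)
}

// mockSink collects submitted errors, similar in spirit to the Errors field
// on MockBatcher.
type mockSink struct {
	Errors []error
}

func (m *mockSink) SubmitError(checkID string, err error) {
	m.Errors = append(m.Errors, err)
}

var errEmptyName = errors.New("component name is empty")

// submitComponent is hypothetical code under test: it reports invalid input
// through the sink instead of returning an error.
func submitComponent(sink errorSink, checkID, name string) {
	if name == "" {
		sink.SubmitError(checkID, errEmptyName)
	}
}

func main() {
	m := &mockSink{}
	submitComponent(m, "check-a", "host")
	submitComponent(m, "check-a", "")
	fmt.Println(len(m.Errors), errors.Is(m.Errors[0], errEmptyName)) // 1 true
}
```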

func (*MockBatcher) SubmitHealthCheckData

func (batcher *MockBatcher) SubmitHealthCheckData(checkID check.ID, stream health.Stream, data health.CheckData)

SubmitHealthCheckData mock

func (*MockBatcher) SubmitHealthStartSnapshot

func (batcher *MockBatcher) SubmitHealthStartSnapshot(checkID check.ID, stream health.Stream, intervalSeconds int, repeatSeconds int)

SubmitHealthStartSnapshot mock

func (*MockBatcher) SubmitHealthStopSnapshot

func (batcher *MockBatcher) SubmitHealthStopSnapshot(checkID check.ID, stream health.Stream)

SubmitHealthStopSnapshot mock

func (*MockBatcher) SubmitRawMetricsData

func (batcher *MockBatcher) SubmitRawMetricsData(checkID check.ID, rawMetric telemetry.RawMetrics)

SubmitRawMetricsData mock

func (*MockBatcher) SubmitRelation

func (batcher *MockBatcher) SubmitRelation(checkID check.ID, instance topology.Instance, relation topology.Relation)

SubmitRelation mock

func (*MockBatcher) SubmitStartSnapshot

func (batcher *MockBatcher) SubmitStartSnapshot(checkID check.ID, instance topology.Instance)

SubmitStartSnapshot mock

func (*MockBatcher) SubmitStopSnapshot

func (batcher *MockBatcher) SubmitStopSnapshot(checkID check.ID, instance topology.Instance)

SubmitStopSnapshot mock

type Topologies

type Topologies map[check.ID]topology.Topology

Topologies is the type representing topologies gathered per check

type TopologyBuilder

type TopologyBuilder struct {
	// contains filtered or unexported fields
}

TopologyBuilder is a helper that builds topology from submitted data. This data structure is not thread safe.

func NewTopologyBuilder

func NewTopologyBuilder(maxCapacity int) TopologyBuilder

NewTopologyBuilder constructs a TopologyBuilder

func (*TopologyBuilder) AddComponent

func (builder *TopologyBuilder) AddComponent(checkID check.ID, instance topology.Instance, component topology.Component) Topologies

AddComponent adds a component

func (*TopologyBuilder) AddRelation

func (builder *TopologyBuilder) AddRelation(checkID check.ID, instance topology.Instance, relation topology.Relation) Topologies

AddRelation adds a relation

func (*TopologyBuilder) Delete

func (builder *TopologyBuilder) Delete(checkID check.ID, instance topology.Instance, topologyElementID string) Topologies

Delete adds a delete identifier

func (*TopologyBuilder) Flush

func (builder *TopologyBuilder) Flush() Topologies

Flush flushes the collected data, returning it and wiping the currently built-up topology

func (*TopologyBuilder) FlushIfDataProduced

func (builder *TopologyBuilder) FlushIfDataProduced(checkID check.ID) Topologies

FlushIfDataProduced checks whether the check produced data and, if so, flushes

func (*TopologyBuilder) StartSnapshot

func (builder *TopologyBuilder) StartSnapshot(checkID check.ID, instance topology.Instance) Topologies

StartSnapshot starts a snapshot

func (*TopologyBuilder) StopSnapshot

func (builder *TopologyBuilder) StopSnapshot(checkID check.ID, instance topology.Instance) Topologies

StopSnapshot stops a snapshot. This will always flush
