Documentation ¶
Overview ¶
Package pipeline describes a metrics reporting pipeline that accepts reports as input and eventually delivers them (possibly after aggregation) to one or more downstream services. A pipeline generally consists of a collection of aggregators, dispatchers, and endpoints wrapped in RetryingSender objects. Metric values can be provided by an external source (reported via an API), or can be generated by a component such as a heartbeat.
                 -> Aggregator -> ... -> RetryingSender -> Endpoint A
HTTP -> Selector -> Aggregator -> Dispatcher -> RetryingSender -> Endpoint B
                 -> Aggregator -> ... -> RetryingSender -> Endpoint C

Heartbeat -> Dispatcher -> RetryingSender -> Endpoint A
Functions ¶
func ReleaseAll ¶
ReleaseAll calls Release on all of the given Components in parallel, returning a multierror if one or more calls fail, or nil if all succeed.
Types ¶
type Component ¶
type Component interface {
	// Use registers new usage of this component. Use should be called whenever this component is
	// added downstream of some other component. When no longer used, Release should be called.
	Use()

	// Release is called when the caller is no longer using this component. It should perform the
	// following steps in order:
	//  1. Decrement the usage counter. If the usage counter is still greater than 0, return nil.
	//  2. Gracefully shut down background processes and wait for completion. Following this step,
	//     no data shall be sent from this component to downstream components.
	//  3. Call Release on all downstream components, waiting for their release operations to
	//     complete.
	//
	// As a result, calling Release on all of the pipeline Input components should result in a graceful
	// shutdown of all Components in the correct order.
	//
	// Release returns an error if it or any of its downstream components generate one.
	Release() error
}
Component represents a single component in a pipeline. Components can be used downstream of multiple other components, enabling creation of fork/join pipeline patterns. Because of this, components implement a reference counting strategy that determines when they should clean up underlying resources.
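The reference counting described above can be sketched with a small fork/join example in which two upstream components share one downstream component. The `node` type, its fields, and the `run` helper are hypothetical and exist only for this illustration; real components would also stop background work before releasing downstream.

```go
package main

import (
	"fmt"
	"sync"
)

// Component mirrors the interface documented in this package.
type Component interface {
	Use()
	Release() error
}

// node is a hypothetical component with a usage count and downstream links.
type node struct {
	name       string
	mu         sync.Mutex
	uses       int
	downstream []Component
	log        *[]string // records the order in which nodes shut down
}

// newNode registers a use on each downstream component, as Use requires.
func newNode(name string, log *[]string, downstream ...Component) *node {
	for _, d := range downstream {
		d.Use()
	}
	return &node{name: name, log: log, downstream: downstream}
}

func (n *node) Use() {
	n.mu.Lock()
	n.uses++
	n.mu.Unlock()
}

func (n *node) Release() error {
	n.mu.Lock()
	n.uses--
	remaining := n.uses
	n.mu.Unlock()
	if remaining > 0 {
		return nil // still used by another upstream component
	}
	*n.log = append(*n.log, n.name) // stands in for a graceful shutdown
	for _, d := range n.downstream {
		if err := d.Release(); err != nil {
			return err
		}
	}
	return nil
}

// run builds A -> C and B -> C, then releases A and B in turn.
func run() []string {
	var log []string
	c := newNode("C", &log)
	a := newNode("A", &log, c)
	b := newNode("B", &log, c)
	a.Use() // the pipeline owner holds A and B
	b.Use()
	_ = a.Release()
	_ = b.Release()
	return log
}

func main() {
	fmt.Println(run()) // prints [A B C]
}
```

C shuts down only after both A and B have released it, which is the ordering the Release contract is meant to guarantee.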
type Endpoint ¶
type Endpoint interface {
	// Endpoint is a Component.
	Component

	// Name returns the name of this endpoint. The name must be unique across all endpoints in the
	// system, and should be constant across restarts of the agent. There can be multiple instances
	// of the same type of endpoint with different names.
	Name() string

	// Send sends the given EndpointReport - previously built by this endpoint - to the reporting
	// service.
	Send(EndpointReport) error

	// BuildReport builds an EndpointReport from the given StampedMetricReport, optionally attaching
	// context.
	BuildReport(report metrics.StampedMetricReport) (EndpointReport, error)

	// IsTransient returns true if the given error indicates that the operation failed due to some
	// transient error and can be retried.
	IsTransient(error) bool
}
Endpoint represents a metric reporting endpoint that the agent reports to, such as Cloud Service Control or PubSub.
type EndpointReport ¶
type EndpointReport struct {
	metrics.StampedMetricReport `json:",inline"`
	Context                     json.RawMessage
}
EndpointReport is a metrics.StampedMetricReport containing optional endpoint-specific context that can be used to help ensure idempotence across retries. For example, if a reporting service requires a unique ID or timestamp that remains the same during each retry so that requests can be deduplicated, that identifier can be generated in BuildReport, persisted in the EndpointReport's context, and resent with each retry.
func NewEndpointReport ¶
func NewEndpointReport(report metrics.StampedMetricReport, context interface{}) (EndpointReport, error)
func (*EndpointReport) UnmarshalContext ¶
func (er *EndpointReport) UnmarshalContext(ctx interface{}) error
UnmarshalContext unmarshals an EndpointReport's context into the given struct.
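A context round trip might look like the sketch below. The bodies of `newEndpointReport` and `unmarshalContext` are guesses at how NewEndpointReport and UnmarshalContext could work (JSON encoding via `encoding/json`), not the package's confirmed implementation; `StampedMetricReport` is a simplified stand-in and `sendContext` is a hypothetical context type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StampedMetricReport is a simplified stand-in for metrics.StampedMetricReport.
type StampedMetricReport struct {
	Name string
}

// EndpointReport mirrors the documented struct, using the stand-in above.
type EndpointReport struct {
	StampedMetricReport
	Context json.RawMessage
}

// newEndpointReport assumes the context is stored as its JSON encoding.
func newEndpointReport(r StampedMetricReport, context interface{}) (EndpointReport, error) {
	raw, err := json.Marshal(context)
	if err != nil {
		return EndpointReport{}, err
	}
	return EndpointReport{StampedMetricReport: r, Context: raw}, nil
}

// unmarshalContext decodes the stored JSON into ctx, which must be a pointer.
func (er *EndpointReport) unmarshalContext(ctx interface{}) error {
	return json.Unmarshal(er.Context, ctx)
}

// sendContext is a hypothetical endpoint-specific context.
type sendContext struct {
	OperationID string `json:"operation_id"`
}

func main() {
	er, err := newEndpointReport(
		StampedMetricReport{Name: "requests"},
		sendContext{OperationID: "op-123"},
	)
	if err != nil {
		fmt.Println("build failed:", err)
		return
	}
	var got sendContext
	if err := er.unmarshalContext(&got); err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(got.OperationID) // prints op-123
}
```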
type Input ¶
type Input interface {
	// Input is also a Component.
	Component

	// AddReport adds a report to the pipeline. It returns an error if one is known immediately,
	// such as a report that refers to unknown metrics. See aggregator.Aggregator.
	AddReport(metrics.MetricReport) error
}
Input represents a Component that accepts reports from an external source.
type InputAdapter ¶
type InputAdapter struct {
Sender Sender
}
InputAdapter is an Input that converts incoming reports to StampedMetricReport objects and sends them directly to a delegate Sender.
func (*InputAdapter) AddReport ¶
func (a *InputAdapter) AddReport(report metrics.MetricReport) error
func (*InputAdapter) Release ¶
func (a *InputAdapter) Release() error
func (*InputAdapter) Use ¶
func (a *InputAdapter) Use()
type Sender ¶
type Sender interface {
	// Sender is a Component.
	Component

	// Send sends the report downstream. The behavior of the Send operation depends on the type of
	// sender. Some implementations - the Dispatcher, for instance - simply forward the Send to
	// subsequent Senders. Others - like the RetryingSender - may queue the report and attempt to
	// send it at a later time.
	//
	// An error indicates that something failed quickly, but it does not
	// indicate that the operation failed completely (i.e., some senders behind a Dispatcher may have
	// succeeded). Likewise, the lack of an error response does not indicate that the Send operation
	// succeeded, due to the asynchronous nature of a RetryingSender.
	Send(report metrics.StampedMetricReport) error

	// Endpoints returns the transitive list of endpoints that this sender will ultimately send to.
	Endpoints() []string
}
A Sender handles sending StampedMetricReports to remote endpoints.
type Source ¶
type Source interface {
	// Shutdown instructs the source to stop sending metric data, release any held components, and
	// clean up resources.
	Shutdown() error
}
Source represents an autonomous metric data source that runs within the Agent.
type UsageTracker ¶
type UsageTracker struct {
// contains filtered or unexported fields
}
UsageTracker is a utility that helps track the usage of a Component. It provides Use and Release methods, and calls a close function when Release decrements the usage count to 0.
func (*UsageTracker) Release ¶
func (u *UsageTracker) Release(close func() error) error
func (*UsageTracker) Use ¶
func (u *UsageTracker) Use()
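Since the fields of UsageTracker are unexported, the sketch below shows one way such a tracker might work and how a component could embed it. The `usageTracker` and `queue` types and their fields are hypothetical; only the Use and Release(close) method shapes come from the signatures above.

```go
package main

import (
	"fmt"
	"sync"
)

// usageTracker is a hypothetical reimplementation of the documented behavior.
type usageTracker struct {
	mu   sync.Mutex
	uses int
}

func (u *usageTracker) Use() {
	u.mu.Lock()
	u.uses++
	u.mu.Unlock()
}

// Release decrements the count and runs close only when it reaches 0.
func (u *usageTracker) Release(close func() error) error {
	u.mu.Lock()
	u.uses--
	last := u.uses == 0
	u.mu.Unlock()
	if !last {
		return nil
	}
	return close() // run outside the lock so close may block safely
}

// queue is a hypothetical component that embeds the tracker.
type queue struct {
	usageTracker
	closed int // how many times the close function has run
}

// Release satisfies Component by delegating to the embedded tracker.
func (q *queue) Release() error {
	return q.usageTracker.Release(func() error {
		q.closed++ // stands in for flushing and releasing downstream
		return nil
	})
}

func main() {
	q := &queue{}
	q.Use()
	q.Use()
	_ = q.Release()
	fmt.Println("after first release:", q.closed)
	_ = q.Release()
	fmt.Println("after second release:", q.closed)
}
```

Embedding the tracker lets the component inherit Use directly and wrap only Release with its own cleanup logic.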