sinks

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 20, 2024 License: MIT Imports: 28 Imported by: 1

README

CCMetric sinks

This folder contains the SinkManager and sink implementations for the cc-metric-collector.

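Available sinks (the values accepted for the type field, as registered in AvailableSinks):

  • stdout
  • http
  • influxdb
  • influxasync
  • nats
  • ganglia
  • libganglia
  • prometheus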

Configuration

The configuration file for the sinks is a JSON object that maps a user-chosen sink name to the configuration of that sink. The type field of each entry specifies which sink to initialize.

{
  "mystdout" : {
    "type" : "stdout",
    "meta_as_tags" : [
    	"unit"
    ]
  },
  "metricstore" : {
    "type" : "http",
    "host" : "localhost",
    "port" : "4123",
    "database" : "ccmetric",
    "password" : "<jwt token>"
  }
}

Contributing own sinks

A sink contains five functions and is derived from the type sink:

  • Init(name string, config json.RawMessage) error
  • Write(point CCMetric) error
  • Flush() error
  • Close()
  • New<Typename>(name string, config json.RawMessage) (Sink, error) (calls the Init() function)

Init() should set up the sink's data structures, for example by opening a file or a server connection. The Write() function writes/sends the data. For non-blocking sinks, the Flush() method tells the sink to drain its internal buffers. The Close() function should tear down anything created in Init().

Finally, the sink needs to be registered in sinkManager.go. There is a map called AvailableSinks that maps each sink type string to the sink's constructor function (New<Typename>). Add a new entry with a descriptive type name and the constructor of the new sink.

Sample sink

package sinks

import (
	"encoding/json"
	"fmt"
	"log"

	lp "github.com/ClusterCockpit/cc-metric-collector/internal/ccMetric"
)

// SampleSinkConfig holds the JSON configuration of the sample sink.
// It adds no options of its own beyond the embedded defaults.
type SampleSinkConfig struct {
	// defines JSON tags for 'name' and 'meta_as_tags'
	defaultSinkConfig
}

// SampleSink is a minimal example sink that shows which pieces a sink
// implementation is made of.
type SampleSink struct {
	// declares 'name' and 'meta_as_tags'
	sink
	// entry point to the SampleSinkConfig; Init decodes the config JSON into it
	config SampleSinkConfig
}

// Init prepares the sink for use. It stores a display name derived from the
// given name and, when a configuration was supplied, decodes the config JSON
// into s.config. An empty config leaves s.config untouched; a decoding error
// is returned unchanged.
func (s *SampleSink) Init(name string, config json.RawMessage) error {
	// Always specify a name here
	s.name = fmt.Sprintf("SampleSink(%s)", name)

	// Without config JSON there is nothing to read in
	if len(config) == 0 {
		return nil
	}

	// Read in the config JSON
	return json.Unmarshal(config, &s.config)
}

// Write submits a single CCMetric to the sink. This sample implementation
// only prints the metric with log.Print and always returns nil.
// NOTE(review): the Sink interface listed for this package version declares
// Write(point lp.CCMessage) - confirm that lp.CCMetric still satisfies it.
func (s *SampleSink) Write(point lp.CCMetric) error {
	log.Print(point)
	return nil
}

// Flush tells a sink that uses batched sends internally to flush its buffers.
// The sample sink's Write prints each metric immediately, so this is a no-op
// that always returns nil.
func (s *SampleSink) Flush() error {
	return nil
}


// Close shuts the sink down: close network connections, files, libraries, ...
// The sample sink's Init opens none of these, so the body is empty.
func (s *SampleSink) Close() {}


// NewSampleSink creates a new instance of the sample sink and initializes it
// with the given name and config JSON. The sink is returned together with the
// result of Init, so the returned Sink is non-nil even when Init reports an error.
func NewSampleSink(name string, config json.RawMessage) (Sink, error) {
	sampleSink := &SampleSink{}
	initErr := sampleSink.Init(name, config)
	return sampleSink, initErr
}

Documentation

Index

Constants

View Source
// Constants for the libganglia-based sink.
// NOTE(review): presumably the defaults for LibgangliaSinkConfig.GangliaLib
// and LibgangliaSinkConfig.GmondConfig - confirm against the sink's Init.
const (
	// File name of the Ganglia shared library
	GANGLIA_LIB_NAME     = "libganglia.so"
	// Flags for loading the library: RTLD_LAZY combined with RTLD_GLOBAL
	GANGLIA_LIB_DL_FLAGS = dl.RTLD_LAZY | dl.RTLD_GLOBAL
	// Path of the gmond configuration file
	GMOND_CONFIG_FILE    = `/etc/ganglia/gmond.conf`
)
View Source
// DEFAULT_GANGLIA_METRIC_SLOPE is the slope string "both", one of the two
// slope values ("zero", "both") that appear in CommonGangliaMetrics.
// NOTE(review): presumably the fallback for metrics without a table entry - confirm.
const DEFAULT_GANGLIA_METRIC_SLOPE = "both"
View Source
// DEFAULT_GANGLIA_METRIC_TMAX is a Tmax value of 300.
// NOTE(review): presumably the fallback Tmax for metrics that are not listed
// in CommonGangliaMetrics; the unit is not visible here - confirm.
const DEFAULT_GANGLIA_METRIC_TMAX = 300
View Source
// GMETRIC_CONFIG is the path `/etc/ganglia/gmond.conf`, the same path as
// GMOND_CONFIG_FILE.
// NOTE(review): presumably the default for GangliaSinkConfig.GmetricConfig - confirm.
const GMETRIC_CONFIG = `/etc/ganglia/gmond.conf`
View Source
// GMETRIC_EXEC is the executable name `gmetric`.
// NOTE(review): presumably the default for GangliaSinkConfig.GmetricPath - confirm.
const GMETRIC_EXEC = `gmetric`
View Source
// SINK_MAX_FORWARD is a limit of 50.
// NOTE(review): its use is not visible in this listing; presumably it caps how
// many messages the sink manager forwards per batch - confirm in sinkManager.go.
const SINK_MAX_FORWARD = 50

Variables

View Source
// AvailableSinks maps each sink type string (the "type" value of a sink
// configuration) to the constructor that creates a sink of that type.
// New sink implementations are registered by adding an entry here.
var AvailableSinks = map[string]func(name string, config json.RawMessage) (Sink, error){
	"ganglia":     NewGangliaSink,
	"libganglia":  NewLibgangliaSink,
	"stdout":      NewStdoutSink,
	"nats":        NewNatsSink,
	"influxdb":    NewInfluxSink,
	"influxasync": NewInfluxAsyncSink,
	"http":        NewHttpSink,
	"prometheus":  NewPrometheusSink,
}

Map of all available sinks

View Source
// CommonGangliaMetrics is a table of metric definitions, organized into the
// groups memory, cpu, load, disk, network, process and system.
// Each GangliaMetric entry is a positional literal whose values follow the
// field order of the struct: Name, Type, Slope, Tmax, Unit.
var CommonGangliaMetrics = []GangliaMetricGroup{
	{
		Name: "memory",
		Metrics: []GangliaMetric{
			{"mem_total", "float", "zero", 1200, "KB"},
			{"swap_total", "float", "zero", 1200, "KB"},
			{"mem_free", "float", "both", 180, "KB"},
			{"mem_shared", "float", "both", 180, "KB"},
			{"mem_buffers", "float", "both", 180, "KB"},
			{"mem_cached", "float", "both", 180, "KB"},
			{"swap_free", "float", "both", 180, "KB"},
			{"mem_sreclaimable", "float", "both", 180, "KB"},
			{"mem_slab", "float", "both", 180, "KB"},
		},
	},
	{
		Name: "cpu",
		Metrics: []GangliaMetric{
			{"cpu_num", "uint32", "zero", 1200, "CPUs"},
			{"cpu_speed", "uint32", "zero", 1200, "MHz"},
			{"cpu_user", "float", "both", 90, "%"},
			{"cpu_nice", "float", "both", 90, "%"},
			{"cpu_system", "float", "both", 90, "%"},
			{"cpu_idle", "float", "both", 3800, "%"},
			{"cpu_aidle", "float", "both", 90, "%"},
			{"cpu_wio", "float", "both", 90, "%"},
			{"cpu_intr", "float", "both", 90, "%"},
			{"cpu_sintr", "float", "both", 90, "%"},
			{"cpu_steal", "float", "both", 90, "%"},
			{"cpu_guest", "float", "both", 90, "%"},
			{"cpu_gnice", "float", "both", 90, "%"},
		},
	},
	{
		Name: "load",
		Metrics: []GangliaMetric{
			{"load_one", "float", "both", 70, ""},
			{"load_five", "float", "both", 325, ""},
			{"load_fifteen", "float", "both", 950, ""},
		},
	},
	{
		Name: "disk",
		Metrics: []GangliaMetric{
			{"disk_total", "double", "both", 1200, "GB"},
			{"disk_free", "double", "both", 180, "GB"},
			{"part_max_used", "float", "both", 180, "%"},
		},
	},
	{
		Name: "network",
		Metrics: []GangliaMetric{
			{"bytes_out", "float", "both", 300, "bytes/sec"},
			{"bytes_in", "float", "both", 300, "bytes/sec"},
			{"pkts_in", "float", "both", 300, "packets/sec"},
			{"pkts_out", "float", "both", 300, "packets/sec"},
		},
	},
	{
		Name: "process",
		Metrics: []GangliaMetric{
			{"proc_run", "uint32", "both", 950, ""},
			{"proc_total", "uint32", "both", 950, ""},
		},
	},
	{
		Name: "system",
		Metrics: []GangliaMetric{
			{"boottime", "uint32", "zero", 1200, "s"},
			{"sys_clock", "uint32", "zero", 1200, "s"},
			{"machine_type", "string", "zero", 1200, ""},
			{"os_name", "string", "zero", 1200, ""},
			{"os_release", "string", "zero", 1200, ""},
			{"mtu", "uint32", "both", 1200, ""},
		},
	},
}

Functions

func EncoderAdd added in v0.7.0

func EncoderAdd(encoder *influx.Encoder, msg lp.CCMessage) error

func GangliaMetricName

func GangliaMetricName(point lp.CCMessage) string

func GangliaMetricRename

func GangliaMetricRename(name string) string

func GangliaSlopeType

func GangliaSlopeType(point lp.CCMessage) uint

Types

type GangliaMetric

// GangliaMetric describes one metric of the CommonGangliaMetrics table.
// The table uses positional literals, so the field order here
// (Name, Type, Slope, Tmax, Unit) must not change.
type GangliaMetric struct {
	Name  string // metric name, e.g. "mem_total"
	Type  string // value type; the table uses "float", "double", "uint32" and "string"
	Slope string // slope; the table uses "zero" and "both"
	Tmax  int    // Tmax value, e.g. 1200 (presumably seconds - TODO confirm)
	Unit  string // unit label, e.g. "KB" or "%"; empty when the metric has none
}

type GangliaMetricConfig

// GangliaMetricConfig is the result type of GetCommonGangliaConfig and
// GetGangliaConfig, both of which derive it from a single message.
// NOTE(review): Type, Slope, Tmax and Unit carry the same names and types as
// the GangliaMetric fields; Group presumably holds the GangliaMetricGroup name
// and Value the metric value rendered as a string - confirm against the
// two functions above.
type GangliaMetricConfig struct {
	Type  string
	Slope string
	Tmax  int
	Unit  string
	Group string
	Value string
	Name  string
}

func GetCommonGangliaConfig

func GetCommonGangliaConfig(point lp.CCMessage) GangliaMetricConfig

func GetGangliaConfig

func GetGangliaConfig(point lp.CCMessage) GangliaMetricConfig

type GangliaMetricGroup

// GangliaMetricGroup is a named collection of GangliaMetric definitions.
// It is the element type of CommonGangliaMetrics.
type GangliaMetricGroup struct {
	Name    string          // group name, e.g. "memory" or "cpu"
	Metrics []GangliaMetric // metric definitions that belong to the group
}

type GangliaSink

type GangliaSink struct {
	// contains filtered or unexported fields
}

func (*GangliaSink) Close

func (s *GangliaSink) Close()

func (*GangliaSink) Flush

func (s *GangliaSink) Flush() error

func (*GangliaSink) Name

func (s *GangliaSink) Name() string

Name returns the name of the metric sink

func (*GangliaSink) Write

func (s *GangliaSink) Write(msg lp.CCMessage) error

type GangliaSinkConfig

type GangliaSinkConfig struct {
	GmetricPath     string `json:"gmetric_path,omitempty"`
	GmetricConfig   string `json:"gmetric_config,omitempty"`
	AddGangliaGroup bool   `json:"add_ganglia_group,omitempty"`
	AddTagsAsDesc   bool   `json:"add_tags_as_desc,omitempty"`
	ClusterName     string `json:"cluster_name,omitempty"`
	AddTypeToName   bool   `json:"add_type_to_name,omitempty"`
	AddUnits        bool   `json:"add_units,omitempty"`
	// contains filtered or unexported fields
}

type HttpSink

type HttpSink struct {
	// contains filtered or unexported fields
}

func (*HttpSink) Close

func (s *HttpSink) Close()

func (*HttpSink) Flush

func (s *HttpSink) Flush() error

Flush sends all metrics stored in encoder to HTTP server

func (*HttpSink) Name

func (s *HttpSink) Name() string

Name returns the name of the metric sink

func (*HttpSink) Write

func (s *HttpSink) Write(msg lp.CCMessage) error

Write sends metric m as http message

type HttpSinkConfig

// HttpSinkConfig holds the JSON configuration options of the HTTP sink.
type HttpSinkConfig struct {

	// The full URL of the endpoint
	URL string `json:"url"`

	// JSON web tokens for authentication (Using the *Bearer* scheme)
	JWT string `json:"jwt,omitempty"`

	// Basic authentication
	Username string `json:"username"`
	Password string `json:"password"`

	// Time limit for requests made by the http client
	Timeout string `json:"timeout,omitempty"`

	// Maximum amount of time an idle (keep-alive) connection will remain idle before closing itself.
	// Should be larger than the measurement interval to keep the connection open
	IdleConnTimeout string `json:"idle_connection_timeout,omitempty"`

	// Batch all writes arriving during this duration
	// (default '5s', batching can be disabled by setting it to 0)
	FlushDelay string `json:"flush_delay,omitempty"`

	// Maximum number of retries to connect to the http server (default: 3)
	MaxRetries int `json:"max_retries,omitempty"`

	// Timestamp precision
	Precision string `json:"precision,omitempty"`
	// contains filtered or unexported fields
}

type InfluxAsyncSink

type InfluxAsyncSink struct {
	// contains filtered or unexported fields
}

func (*InfluxAsyncSink) Close

func (s *InfluxAsyncSink) Close()

func (*InfluxAsyncSink) Flush

func (s *InfluxAsyncSink) Flush() error

func (*InfluxAsyncSink) Name

func (s *InfluxAsyncSink) Name() string

Name returns the name of the metric sink

func (*InfluxAsyncSink) Write

func (s *InfluxAsyncSink) Write(m lp.CCMessage) error

type InfluxAsyncSinkConfig

// InfluxAsyncSinkConfig holds the JSON configuration options of the
// asynchronous InfluxDB sink.
type InfluxAsyncSinkConfig struct {
	Host         string `json:"host,omitempty"`
	Port         string `json:"port,omitempty"`
	Database     string `json:"database,omitempty"`
	User         string `json:"user,omitempty"`
	Password     string `json:"password,omitempty"`
	Organization string `json:"organization,omitempty"`
	SSL          bool   `json:"ssl,omitempty"`
	// Maximum number of points sent to the server in a single request. Default 5000
	BatchSize uint `json:"batch_size,omitempty"`
	// Interval, in ms, after which the buffer is flushed if it has not already
	// been written (by reaching the batch size). Default 1000ms
	FlushInterval         uint   `json:"flush_interval,omitempty"`
	InfluxRetryInterval   string `json:"retry_interval,omitempty"`
	InfluxExponentialBase uint   `json:"retry_exponential_base,omitempty"`
	InfluxMaxRetries      uint   `json:"max_retries,omitempty"`
	InfluxMaxRetryTime    string `json:"max_retry_time,omitempty"`
	CustomFlushInterval   string `json:"custom_flush_interval,omitempty"`
	MaxRetryAttempts      uint   `json:"max_retry_attempts,omitempty"`
	// Timestamp precision
	Precision string `json:"precision,omitempty"`
	// contains filtered or unexported fields
}

type InfluxSink

type InfluxSink struct {
	// contains filtered or unexported fields
}

func (*InfluxSink) Close

func (s *InfluxSink) Close()

func (*InfluxSink) Flush

func (s *InfluxSink) Flush() error

Flush sends all metrics stored in encoder to InfluxDB server

func (*InfluxSink) Name

func (s *InfluxSink) Name() string

Name returns the name of the metric sink

func (*InfluxSink) Write

func (s *InfluxSink) Write(msg lp.CCMessage) error

Write sends metric m in influxDB line protocol

type LibgangliaSink

type LibgangliaSink struct {
	// contains filtered or unexported fields
}

func (*LibgangliaSink) Close

func (s *LibgangliaSink) Close()

func (*LibgangliaSink) Flush

func (s *LibgangliaSink) Flush() error

func (*LibgangliaSink) Name

func (s *LibgangliaSink) Name() string

Name returns the name of the metric sink

func (*LibgangliaSink) Write

func (s *LibgangliaSink) Write(msg lp.CCMessage) error

type LibgangliaSinkConfig

type LibgangliaSinkConfig struct {
	GangliaLib      string `json:"libganglia_path,omitempty"`
	GmondConfig     string `json:"gmond_config,omitempty"`
	AddGangliaGroup bool   `json:"add_ganglia_group,omitempty"`
	AddTypeToName   bool   `json:"add_type_to_name,omitempty"`
	AddUnits        bool   `json:"add_units,omitempty"`
	ClusterName     string `json:"cluster_name,omitempty"`
	// contains filtered or unexported fields
}

type NatsSink

type NatsSink struct {
	// contains filtered or unexported fields
}

func (*NatsSink) Close

func (s *NatsSink) Close()

func (*NatsSink) Flush

func (s *NatsSink) Flush() error

func (*NatsSink) Name

func (s *NatsSink) Name() string

Name returns the name of the metric sink

func (*NatsSink) Write

func (s *NatsSink) Write(m lp.CCMessage) error

type NatsSinkConfig

type NatsSinkConfig struct {
	Host       string `json:"host,omitempty"`
	Port       string `json:"port,omitempty"`
	Subject    string `json:"subject,omitempty"`
	User       string `json:"user,omitempty"`
	Password   string `json:"password,omitempty"`
	FlushDelay string `json:"flush_delay,omitempty"`

	NkeyFile string `json:"nkey_file,omitempty"`
	// Timestamp precision
	Precision string `json:"precision,omitempty"`
	// contains filtered or unexported fields
}

type PrometheusSink

type PrometheusSink struct {
	// contains filtered or unexported fields
}

func (*PrometheusSink) Close

func (s *PrometheusSink) Close()

func (*PrometheusSink) Flush

func (s *PrometheusSink) Flush() error

func (*PrometheusSink) Name

func (s *PrometheusSink) Name() string

Name returns the name of the metric sink

func (*PrometheusSink) Write

func (s *PrometheusSink) Write(m lp.CCMessage) error

type PrometheusSinkConfig

type PrometheusSinkConfig struct {
	Host             string `json:"host,omitempty"`
	Port             string `json:"port"`
	Path             string `json:"path,omitempty"`
	GroupAsNameSpace bool   `json:"group_as_namespace,omitempty"`
	// contains filtered or unexported fields
}

type SampleSink

type SampleSink struct {
	// contains filtered or unexported fields
}

func (*SampleSink) Close

func (s *SampleSink) Close()

Close sink: close network connection, close files, close libraries, ...

func (*SampleSink) Flush

func (s *SampleSink) Flush() error

If the sink uses batched sends internally, you can tell to flush its buffers

func (*SampleSink) Name

func (s *SampleSink) Name() string

Name returns the name of the metric sink

func (*SampleSink) Write

func (s *SampleSink) Write(point lp.CCMessage) error

Code to submit a single CCMetric to the sink

type SampleSinkConfig

type SampleSinkConfig struct {
	// contains filtered or unexported fields
}

type Sink

// Sink is the interface every metric sink provides. The constructors
// registered in AvailableSinks return values of this type.
type Sink interface {
	Write(point lp.CCMessage) error // Write metric to the sink
	Flush() error                   // Flush buffered metrics
	Close()                         // Close / finish metric sink
	Name() string                   // Name of the metric sink
}

func NewGangliaSink

func NewGangliaSink(name string, config json.RawMessage) (Sink, error)

func NewHttpSink

func NewHttpSink(name string, config json.RawMessage) (Sink, error)

NewHttpSink creates a new http sink

func NewInfluxAsyncSink

func NewInfluxAsyncSink(name string, config json.RawMessage) (Sink, error)

func NewInfluxSink

func NewInfluxSink(name string, config json.RawMessage) (Sink, error)

NewInfluxSink create a new InfluxDB sink

func NewLibgangliaSink

func NewLibgangliaSink(name string, config json.RawMessage) (Sink, error)

func NewNatsSink

func NewNatsSink(name string, config json.RawMessage) (Sink, error)

func NewPrometheusSink

func NewPrometheusSink(name string, config json.RawMessage) (Sink, error)

func NewSampleSink

func NewSampleSink(name string, config json.RawMessage) (Sink, error)

NewSampleSink creates a new instance of the sink and initializes it by giving it a name and reading in the config JSON.

func NewStdoutSink

func NewStdoutSink(name string, config json.RawMessage) (Sink, error)

type SinkManager

// SinkManager lists the access functions of the sink manager. New returns an
// initialized value of this type.
type SinkManager interface {
	// Init takes the same arguments as New: a wait group and the path of the sink configuration file
	Init(wg *sync.WaitGroup, sinkConfigFile string) error
	// AddInput hands the manager a channel of lp.CCMessage
	// NOTE(review): presumably the channel the manager reads messages from - confirm
	AddInput(input chan lp.CCMessage)
	// AddOutput takes a sink name and its JSON configuration, the same arguments as the sink constructors
	// NOTE(review): presumably creates the sink via AvailableSinks - confirm
	AddOutput(name string, config json.RawMessage) error
	Start()
	Close()
}

Sink manager access functions

func New

func New(wg *sync.WaitGroup, sinkConfigFile string) (SinkManager, error)

New creates a new initialized sink manager

type StdoutSink

type StdoutSink struct {
	// contains filtered or unexported fields
}

func (*StdoutSink) Close

func (s *StdoutSink) Close()

func (*StdoutSink) Flush

func (s *StdoutSink) Flush() error

func (*StdoutSink) Name

func (s *StdoutSink) Name() string

Name returns the name of the metric sink

func (*StdoutSink) Write

func (s *StdoutSink) Write(m lp.CCMessage) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL