Documentation ¶
Index ¶
- Constants
- Variables
- func EncodeTags(tags map[string]string) string
- func IsConsoleOutput(sink SampleProcessor) bool
- func IsFileClosedError(err error) bool
- func IsValidFilename(path string) bool
- func RegisterBuiltinMarshallers(factory *EndpointFactory)
- func RegisterConsoleBoxOutput(e *EndpointFactory)
- func RegisterDefaults(factory *EndpointFactory)
- func RegisterEmptyInputOutput(factory *EndpointFactory)
- func RegisterGolibFlags()
- func RequiredValues(numFields int, sink SampleSink) int
- func ResolveTagTemplate(template string, missingValues string, sample *Sample) string
- type AbstractMarshallingSampleOutput
- type AbstractSampleOutput
- type AbstractSampleProcessor
- type AbstractSampleSource
- type AbstractTcpSink
- type AbstractUnmarshallingSampleSource
- type BatchProcessingStep
- type BatchProcessor
- func (p *BatchProcessor) Add(step BatchProcessingStep) *BatchProcessor
- func (p *BatchProcessor) Close()
- func (p *BatchProcessor) ContainedStringers() []fmt.Stringer
- func (p *BatchProcessor) MergeProcessor(other SampleProcessor) bool
- func (p *BatchProcessor) OutputSampleSize(sampleSize int) int
- func (p *BatchProcessor) Sample(sample *Sample, header *Header) (err error)
- func (p *BatchProcessor) Start(wg *sync.WaitGroup) golib.StopChan
- func (p *BatchProcessor) String() string
- type BidiMarshaller
- type BinaryMarshaller
- func (BinaryMarshaller) ParseSample(header *UnmarshalledHeader, minValueCapacity int, data []byte) (sample *Sample, err error)
- func (m BinaryMarshaller) Read(reader *bufio.Reader, previousHeader *UnmarshalledHeader) (*UnmarshalledHeader, []byte, error)
- func (BinaryMarshaller) String() string
- func (BinaryMarshaller) WriteHeader(header *Header, withTags bool, writer io.Writer) error
- func (m BinaryMarshaller) WriteSample(sample *Sample, header *Header, withTags bool, writer io.Writer) error
- type BufferedWriteCloser
- type ConsoleBoxSink
- type CsvMarshaller
- func (CsvMarshaller) ParseSample(header *UnmarshalledHeader, minValueCapacity int, data []byte) (sample *Sample, err error)
- func (c CsvMarshaller) Read(reader *bufio.Reader, previousHeader *UnmarshalledHeader) (*UnmarshalledHeader, []byte, error)
- func (CsvMarshaller) String() string
- func (CsvMarshaller) WriteHeader(header *Header, withTags bool, writer io.Writer) error
- func (CsvMarshaller) WriteSample(sample *Sample, header *Header, withTags bool, writer io.Writer) error
- type DroppingSampleProcessor
- type EmptySampleSource
- type EndpointDescription
- type EndpointFactory
- func (f *EndpointFactory) Clear()
- func (f *EndpointFactory) CreateInput(inputs ...string) (SampleSource, error)
- func (f *EndpointFactory) CreateMarshaller(format MarshallingFormat) (Marshaller, error)
- func (f *EndpointFactory) CreateOutput(output string) (SampleProcessor, error)
- func (f *EndpointFactory) ParseEndpointDescription(endpoint string, isOutput bool) (EndpointDescription, error)
- func (f *EndpointFactory) ParseParameters(params map[string]string) (err error)
- func (f *EndpointFactory) ParseUrlEndpointDescription(endpoint string) (res EndpointDescription, err error)
- func (f *EndpointFactory) Reader(um Unmarshaller) SampleReader
- func (f *EndpointFactory) RegisterFlags()
- func (f *EndpointFactory) RegisterGeneralFlagsTo(fs *flag.FlagSet)
- func (f *EndpointFactory) RegisterInputFlagsTo(fs *flag.FlagSet)
- func (f *EndpointFactory) RegisterOutputFlagsTo(fs *flag.FlagSet)
- func (f *EndpointFactory) Writer() SampleWriter
- type EndpointType
- type FileGroup
- func (group *FileGroup) AllFiles() (all []string, err error)
- func (group *FileGroup) BuildFilename(num int) string
- func (group *FileGroup) BuildFilenameStr(suffix string) string
- func (group *FileGroup) DeleteFiles() error
- func (group *FileGroup) FileRegex() *regexp.Regexp
- func (group *FileGroup) OpenNewFile(counter *int) (file *os.File, err error)
- func (group *FileGroup) WalkFiles(walk func(string, os.FileInfo) error) (num int, err error)
- type FileSink
- type FileSource
- type Header
- type HeaderChecker
- type HttpServerSink
- type IndentPrinter
- type KeyValuePair
- type Marshaller
- type MarshallingFormat
- type MarshallingSampleOutput
- type MergeableProcessor
- type NoopProcessor
- type ParallelSampleHandler
- type ProcessorTaskWrapper
- type ReadSampleHandler
- type ReaderSource
- type ResizingBatchProcessingStep
- type ResizingSampleProcessor
- type Sample
- func (sample *Sample) AddTagsFrom(other *Sample)
- func (sample *Sample) Clone() *Sample
- func (sample *Sample) CopyMetadataFrom(other *Sample)
- func (sample *Sample) DeepClone() *Sample
- func (sample *Sample) DeleteTag(name string)
- func (sample *Sample) HasTag(name string) (ok bool)
- func (sample *Sample) Metadata() *SampleMetadata
- func (sample *Sample) NumTags() (l int)
- func (sample *Sample) ParseTagString(tags string) (err error)
- func (sample *Sample) Resize(newSize int) bool
- func (sample *Sample) SetTag(name, value string)
- func (sample *Sample) SortedTags() (res []KeyValuePair)
- func (sample *Sample) Tag(name string) (value string)
- func (sample *Sample) TagMap() (res map[string]string)
- func (sample *Sample) TagString() (res string)
- type SampleAndHeader
- type SampleHeaderIndex
- type SampleInputStream
- func (stream *SampleInputStream) Close() error
- func (stream *SampleInputStream) Format() string
- func (stream *SampleInputStream) ReadNamedSamples(sourceName string) (err error)
- func (stream *SampleInputStream) ReadSamples(source string) (int, error)
- func (stream *SampleInputStream) ReadTcpSamples(conn io.ReadCloser, remote string, checkClosed func() bool)
- type SampleMetadata
- type SampleOutputStream
- type SamplePipeline
- func (p *SamplePipeline) Add(processor SampleProcessor) *SamplePipeline
- func (p *SamplePipeline) Batch(steps ...BatchProcessingStep) *SamplePipeline
- func (p *SamplePipeline) Construct(tasks *golib.TaskGroup)
- func (p *SamplePipeline) ContainedStringers() []fmt.Stringer
- func (p *SamplePipeline) FormatLines() []string
- func (p *SamplePipeline) StartAndWait(extraTasks ...golib.Task) int
- func (p *SamplePipeline) String() string
- type SampleProcessor
- type SampleReader
- type SampleRing
- type SampleSink
- type SampleSource
- type SampleWriter
- type SimpleBatchProcessingStep
- type SimpleProcessor
- type SortedStringPairs
- type SortedStringers
- type SourceTaskWrapper
- type String
- type StringerContainer
- type SynchronizedReadCloser
- type SynchronizingSampleSink
- type TCPConnCounter
- type TCPListenerSink
- type TCPListenerSource
- type TCPSink
- type TCPSource
- type TagTemplate
- type TcpWriteConn
- type TextMarshaller
- type TitledSamplePipeline
- type UnmarshalledHeader
- type Unmarshaller
- type UnmarshallingSampleSource
- type Value
- type WriteCascade
- type WriterSink
Constants ¶
const ( UndefinedEndpoint = EndpointType("") TcpEndpoint = EndpointType("tcp") TcpListenEndpoint = EndpointType("listen") FileEndpoint = EndpointType("file") StdEndpoint = EndpointType("std") HttpEndpoint = EndpointType("http") EmptyEndpoint = EndpointType("empty") UndefinedFormat = MarshallingFormat("") TextFormat = MarshallingFormat("text") CsvFormat = MarshallingFormat("csv") BinaryFormat = MarshallingFormat("bin") )
const ( // CsvSeparator is the character separating fields in the marshalled output // of CsvMarshaller. CsvSeparator = ',' // CsvNewline is used by CsvMarshaller after outputting the header line and // each sample. CsvNewline = '\n' // CsvDateFormat is the format used by CsvMarshaller to marshall the timestamp // of samples. CsvDateFormat = "2006-01-02 15:04:05.999999999" )
const ( // TextMarshallerDateFormat is the date format used by TextMarshaller to // print the timestamp of each sample. TextMarshallerDateFormat = "2006-01-02 15:04:05.999" // TextMarshallerDefaultSpacing is the default spacing between the columns // printed by TextMarshaller. TextMarshallerDefaultSpacing = 3 // TextMarshallerHeaderChar is used as fill-character in the header line // preceding each sample marshalled by TextMarshaller. TextMarshallerHeaderChar = '=' )
const ( // MaxOutputFileErrors is the number of retries that are accepted before // giving up to open a new output file. After each try, the output filename // will be changed. MaxOutputFileErrors = 5 // MkdirsPermissions defines the permission bits used when creating new // directories for storing output files. MkdirsPermissions = 0755 )
const ( // BinarySeparator is the character separating fields in the marshalled output // of BinaryMarshaller. Every field is marshalled on a separate line. BinarySeparator = '\n' )
const ConsoleBoxEndpoint = EndpointType("box")
const MinimumInputIoBuffer = 16 // Needed for auto-detecting stream format
const TAG_TEMPLATE_ENV_PREFIX = "ENV_"
Variables ¶
var ( ConsoleBoxSettings = gotermBox.CliLogBox{ NoUtf8: false, LogLines: 10, MessageBuffer: 500, } ConsoleBoxUpdateInterval = 500 * time.Millisecond ConsoleBoxMinUpdateInterval = 50 * time.Millisecond )
var DefaultEndpointFactory = EndpointFactory{ FlagOutputFilesClean: false, FlagIoBuffer: 4096, FlagTcpConnectionLimit: 0, FlagParallelHandler: ParallelSampleHandler{ ParallelParsers: runtime.NumCPU(), BufferedSamples: 10000, }, FlagFilesKeepAlive: false, FlagInputFilesRobust: false, FlagInputTcpAcceptLimit: 0, FlagTcpSourceDropErrors: false, FlagOutputTcpListenBuffer: 0, FlagFilesAppend: false, FlagFileVanishedCheck: 0, }
var StopWalking = errors.New("stop walking")
StopWalking can be returned from the walk function passed to WalkFiles to indicate that the tree should not be walked any further down the current directory.
var ( TagStringEscaper = strings.NewReplacer( tag_equals, tag_replacement, tag_separator, tag_replacement, string(BinarySeparator), tag_replacement, string(CsvSeparator), tag_replacement, string(CsvNewline), tag_replacement) )
var WarnObsoleteBinaryFormat = true
Functions ¶
func EncodeTags ¶ added in v0.0.6
func IsConsoleOutput ¶
func IsConsoleOutput(sink SampleProcessor) bool
IsConsoleOutput returns true if the given processor will output to the standard output when started.
func IsFileClosedError ¶
IsFileClosedError returns true, if the given error likely originates from intentionally closing a file, while it is still being read concurrently.
func IsValidFilename ¶
IsValidFilename tries to infer in a system-independent way, if the given path is a valid file name.
func RegisterBuiltinMarshallers ¶
func RegisterBuiltinMarshallers(factory *EndpointFactory)
func RegisterConsoleBoxOutput ¶
func RegisterConsoleBoxOutput(e *EndpointFactory)
func RegisterDefaults ¶
func RegisterDefaults(factory *EndpointFactory)
func RegisterEmptyInputOutput ¶
func RegisterEmptyInputOutput(factory *EndpointFactory)
func RegisterGolibFlags ¶
func RegisterGolibFlags()
func RequiredValues ¶
func RequiredValues(numFields int, sink SampleSink) int
RequiredValues returns the number of Values that should be large enough to hold the end-result after processing a Sample by all intermediate SampleProcessors. The result is based on ResizingSampleProcessor.OutputSampleSize(). SampleProcessor instances that do not implement the ResizingSampleProcessor interface are assumed to not increase the number of metrics.
Types ¶
type AbstractMarshallingSampleOutput ¶
type AbstractMarshallingSampleOutput struct { AbstractSampleOutput // Marshaller will be used when converting Samples to byte buffers before // writing them to the given output stream. Marshaller Marshaller // Writer contains variables that control the marshalling and writing process. // They must be configured before calling Start() on this AbstractSampleOutput. Writer SampleWriter }
AbstractMarshallingSampleOutput is a partial implementation of MarshallingSampleOutput with a simple implementation of SetMarshaller().
func (*AbstractMarshallingSampleOutput) SetMarshaller ¶
func (out *AbstractMarshallingSampleOutput) SetMarshaller(marshaller Marshaller)
SetMarshaller implements the SampleOutput interface.
type AbstractSampleOutput ¶
type AbstractSampleOutput struct { AbstractSampleProcessor // DontForwardSamples can be set to true to disable forwarding of received samples // to the subsequent SampleProcessor. DontForwardSamples bool // DropOutputErrors can be set to true to make this AbstractSampleOutput ignore // errors that occurred from outputting samples to byte streams like files or network Connections. // In that case, such errors will be logged and the samples will be forwarded to subsequent // processing steps. DropOutputErrors bool }
AbstractSampleOutput is a partial implementation of SampleProcessor intended for processors that output samples to an external data sink (e.g. console, file, ...). Configuration variables are provided for controlling the error handling.
func (*AbstractSampleOutput) Sample ¶
func (out *AbstractSampleOutput) Sample(err error, sample *Sample, header *Header) error
Sample forwards the received header and sample to the subsequent SampleProcessor, unless the DontForwardSamples flag has been set. Actual implementations of SampleOutput should provide an implementation that writes the samples to some destination. The error parameter should be an error (possibly nil) that resulted from previously writing the sample to some byte stream output (like a file or network connection). Depending on the configuration of this AbstractSampleOutput, this error will be returned immediately or simply logged so that the sample can be forwarded to the subsequent processing step.
type AbstractSampleProcessor ¶
type AbstractSampleProcessor struct {
AbstractSampleSource
}
AbstractSampleProcessor provides a few basic methods for implementations of SampleProcessor. It currently simply embeds the AbstractSampleSource type, but should be used instead of it to make the purpose more clear.
type AbstractSampleSource ¶
type AbstractSampleSource struct {
// contains filtered or unexported fields
}
AbstractSampleSource is a partial implementation of SampleSource that stores the SampleProcessor and closes the outgoing SampleProcessor after all samples have been generated.
func (*AbstractSampleSource) CloseSink ¶
func (s *AbstractSampleSource) CloseSink()
CloseSink closes the subsequent SampleProcessor. It must be called after the receiving AbstractSampleSource has finished producing samples.
func (*AbstractSampleSource) CloseSinkParallel ¶
func (s *AbstractSampleSource) CloseSinkParallel(wg *sync.WaitGroup)
CloseSinkParallel closes the subsequent SampleProcessor in a concurrent goroutine, which is registered in the WaitGroup. This can be useful compared to CloseSink() in certain cases to avoid deadlocks due to long-running Close() invocations. As a general rule of thumb, implementations of SampleSource should use CloseSinkParallel(), while SampleProcessors should simply use CloseSink().
func (*AbstractSampleSource) GetSink ¶
func (s *AbstractSampleSource) GetSink() SampleProcessor
GetSink implements the SampleSource interface.
func (*AbstractSampleSource) SetSink ¶
func (s *AbstractSampleSource) SetSink(sink SampleProcessor)
SetSink implements the SampleSource interface.
type AbstractTcpSink ¶
type AbstractTcpSink struct { AbstractMarshallingSampleOutput TCPConnCounter // LogReceivedTraffic enables logging received TCP traffic, which is usually not expected. // Only the values log.ErrorLevel, log.WarnLevel, log.InfoLevel, log.DebugLevel enable logging. LogReceivedTraffic log.Level // Protocol is used for more detailed logging Protocol string }
AbstractTcpSink is a helper type for TCP-based SampleSink implementations. The two embedded fields AbstractMarshallingSampleOutput and TCPConnCounter can be used to configure different aspects of the marshalling and writing of the data. The purpose of AbstractTcpSink is to create instances of TcpWriteConn with the configured parameters.
func (*AbstractTcpSink) OpenWriteConn ¶
func (sink *AbstractTcpSink) OpenWriteConn(wg *sync.WaitGroup, remoteAddr string, conn io.WriteCloser) *TcpWriteConn
OpenWriteConn wraps a net.TCPConn in a new TcpWriteConn using the parameters defined in the receiving AbstractTcpSink.
type AbstractUnmarshallingSampleSource ¶
type AbstractUnmarshallingSampleSource struct { AbstractSampleSource // Reader configures aspects of parallel reading and parsing. See SampleReader for more info. Reader SampleReader }
AbstractUnmarshallingSampleSource extends AbstractSampleSource by adding configuration fields required for unmarshalling samples.
func (*AbstractUnmarshallingSampleSource) SetSampleHandler ¶
func (s *AbstractUnmarshallingSampleSource) SetSampleHandler(handler ReadSampleHandler)
SetSampleHandler implements the UnmarshallingSampleSource interface
type BatchProcessingStep ¶
type BatchProcessor ¶
type BatchProcessor struct { NoopProcessor Steps []BatchProcessingStep FlushTimeout time.Duration // If > 0, flush when no new samples are received for the given duration. The wall-time is used for this (not sample timestamps) SampleTimestampFlushTimeout time.Duration // If > 0, flush when a sample is received with a timestamp jump bigger than this FlushTags []string // If set, flush every time any of these tags change // contains filtered or unexported fields }
func (*BatchProcessor) Add ¶
func (p *BatchProcessor) Add(step BatchProcessingStep) *BatchProcessor
func (*BatchProcessor) Close ¶
func (p *BatchProcessor) Close()
func (*BatchProcessor) ContainedStringers ¶
func (p *BatchProcessor) ContainedStringers() []fmt.Stringer
func (*BatchProcessor) MergeProcessor ¶
func (p *BatchProcessor) MergeProcessor(other SampleProcessor) bool
func (*BatchProcessor) OutputSampleSize ¶
func (p *BatchProcessor) OutputSampleSize(sampleSize int) int
func (*BatchProcessor) Sample ¶
func (p *BatchProcessor) Sample(sample *Sample, header *Header) (err error)
func (*BatchProcessor) String ¶
func (p *BatchProcessor) String() string
type BidiMarshaller ¶
type BidiMarshaller interface { Read(input *bufio.Reader, previousHeader *UnmarshalledHeader) (newHeader *UnmarshalledHeader, sampleData []byte, err error) ParseSample(header *UnmarshalledHeader, minValueCapacity int, data []byte) (*Sample, error) WriteHeader(header *Header, withTags bool, output io.Writer) error WriteSample(sample *Sample, header *Header, withTags bool, output io.Writer) error String() string }
BidiMarshaller is a bidirectional marshaller that combines the Marshaller and Unmarshaller interfaces.
type BinaryMarshaller ¶
type BinaryMarshaller struct { }
BinaryMarshaller marshals every sample to a dense binary format.
The header is marshalled to a newline-separated list of strings. The first field is 'timB', the second field is 'tags' if the following samples include tags. The following fields are the names of the metrics in the header. An empty line denotes the end of the header.
After the header, every sample is marshalled as follows. A special byte sequence signals the start of a sample. This is used to distinguish between sample data and a new header. Headers always start with the string "timB". Then, the timestamp is marshalled as a big-endian unsigned int64 value containing the nanoseconds since the Unix epoch (8 bytes). Then the tags are marshalled as a newline-delimited string containing a space-separated list of key-value pairs for the tags. If the 'tags' field was missing in the header fields, this tags string is missing, including the newline delimiter. After the optional tags string, the values for the sample are marshalled as an array of big-endian double-precision values, 8 bytes each. Since the number of metrics is known from the header, the number of bytes for one sample is given as 8 * number of metrics.
func (BinaryMarshaller) ParseSample ¶
func (BinaryMarshaller) ParseSample(header *UnmarshalledHeader, minValueCapacity int, data []byte) (sample *Sample, err error)
ParseSample implements the Unmarshaller interface by parsing the byte buffer to a new Sample instance. See the godoc for BinaryMarshaller for details on the format.
func (BinaryMarshaller) Read ¶
func (m BinaryMarshaller) Read(reader *bufio.Reader, previousHeader *UnmarshalledHeader) (*UnmarshalledHeader, []byte, error)
Read implements the Unmarshaller interface. It peeks a few bytes from the input stream to decide if the stream contains a header or a sample. In case of a header, Read() continues reading until an empty line and parses the data to a header instance. In case of a sample, the size is derived from the previousHeader parameter.
func (BinaryMarshaller) String ¶
func (BinaryMarshaller) String() string
String implements the Marshaller interface.
func (BinaryMarshaller) WriteHeader ¶
WriteHeader implements the Marshaller interface by writing a newline-separated list of header field strings and an additional empty line.
func (BinaryMarshaller) WriteSample ¶
func (m BinaryMarshaller) WriteSample(sample *Sample, header *Header, withTags bool, writer io.Writer) error
WriteSample implements the Marshaller interface by writing the Sample out in a dense binary format. See the BinaryMarshaller godoc for information on the format.
type BufferedWriteCloser ¶
BufferedWriteCloser is a helper type that wraps a bufio.Writer around an io.WriteCloser, while still implementing the io.WriteCloser interface and forwarding all method calls to the correct receiver. The Writer field should not be accessed directly.
func NewBufferedWriteCloser ¶
func NewBufferedWriteCloser(writer io.WriteCloser, io_buffer int) *BufferedWriteCloser
NewBufferedWriteCloser creates a BufferedWriteCloser instance wrapping the writer parameter. It creates a bufio.Writer with a buffer size of the io_buffer parameter.
func (*BufferedWriteCloser) Close ¶
func (writer *BufferedWriteCloser) Close() (err error)
Close implements the Close method in io.WriteCloser by flushing its bufio.Writer and forwarding the Close call to the io.WriteCloser used to create it.
type ConsoleBoxSink ¶
type ConsoleBoxSink struct { AbstractSampleOutput gotermBox.CliLogBoxTask // ImmediateScreenUpdate causes the console box to be updated immediately // whenever a sample is received by this ConsoleBoxSink. Otherwise, the screen // will be updated in regular intervals based on the settings in CliLogBoxTask. ImmediateScreenUpdate bool // contains filtered or unexported fields }
ConsoleBoxSink implements the SampleSink interface by printing the received samples to the standard out. Contrary to the ConsoleSink, the screen is erased before printing a new sample, and the output is embedded in a box that shows the last lines of log output at the bottom. ConsoleBoxSink does not implement MarshallingSampleSink, because it uses its own, fixed marshaller.
Multiple embedded fields provide access to configuration options.
Init() must be called as early as possible when using ConsoleBoxSink, to make sure that all log messages are captured and none are overwritten by the box.
func (*ConsoleBoxSink) Close ¶
func (sink *ConsoleBoxSink) Close()
Close implements the SampleSink interface. It stops the screen refresh goroutine.
func (*ConsoleBoxSink) Sample ¶
func (sink *ConsoleBoxSink) Sample(sample *Sample, header *Header) error
Sample implements the SampleSink interface. The latest sample is stored and displayed on the console on the next screen refresh. Intermediate samples might get lost without being displayed.
func (*ConsoleBoxSink) Start ¶
func (sink *ConsoleBoxSink) Start(wg *sync.WaitGroup) golib.StopChan
Start implements the SampleSink interface. It starts a goroutine that regularly refreshes the screen to display the current sample values and latest log output lines.
func (*ConsoleBoxSink) Stop ¶
func (sink *ConsoleBoxSink) Stop()
Stop shadows the Stop() method from gotermBox.CliLogBoxTask to make sure that this SampleSink is actually closed in the Close() method.
func (*ConsoleBoxSink) String ¶
func (sink *ConsoleBoxSink) String() string
String implements the SampleSink interface.
type CsvMarshaller ¶
type CsvMarshaller struct { }
CsvMarshaller marshals Headers and Samples to a CSV format.
Every header is marshalled as a comma-separated CSV header line. The first field is 'time', the second field is 'tags' (if the following samples contain tags). After that the header contains a list of all metrics.
Every sample is marshalled to a comma-separated line starting with a textual representation of the timestamp (see CsvDateFormat, UTC timezone), then a space-separated key-value list for the tags (only if the 'tags' field was included in the header), and then all the metric values in the same order as on the preceding header line. To follow the semantics of a correct CSV file, every changed header should start a new CSV file.
Every CSV line must be terminated by a newline character (including the last line in a file).
CsvMarshaller can deal with multiple header declarations in the same file or data stream. A line that begins with the string "time" is assumed to start a new header, since samples usually start with a timestamp, which cannot be formatted as "time".
There are no configuration options for CsvMarshaller.
func (CsvMarshaller) ParseSample ¶
func (CsvMarshaller) ParseSample(header *UnmarshalledHeader, minValueCapacity int, data []byte) (sample *Sample, err error)
ParseSample implements the Unmarshaller interface by parsing a CSV line.
func (CsvMarshaller) Read ¶
func (c CsvMarshaller) Read(reader *bufio.Reader, previousHeader *UnmarshalledHeader) (*UnmarshalledHeader, []byte, error)
Read implements the Unmarshaller interface by reading a CSV line from the input stream. Based on the first field, Read decides whether the line represents a header or a Sample. In case of a header, the CSV fields are split and parsed to a Header instance. In case of a Sample, the data for the line is returned without parsing it.
func (CsvMarshaller) String ¶
func (CsvMarshaller) String() string
String implements the Marshaller interface.
func (CsvMarshaller) WriteHeader ¶
WriteHeader implements the Marshaller interface by printing a CSV header line.
func (CsvMarshaller) WriteSample ¶
func (CsvMarshaller) WriteSample(sample *Sample, header *Header, withTags bool, writer io.Writer) error
WriteSample implements the Marshaller interface by writing a CSV line.
type DroppingSampleProcessor ¶
type DroppingSampleProcessor struct {
AbstractSampleProcessor
}
DroppingSampleProcessor implements the SampleProcessor interface by dropping any incoming samples.
func (*DroppingSampleProcessor) Close ¶
func (s *DroppingSampleProcessor) Close()
Close implements the SampleProcessor interface.
func (*DroppingSampleProcessor) Sample ¶
func (s *DroppingSampleProcessor) Sample(sample *Sample, header *Header) error
Sample implements the SampleProcessor interface.
func (*DroppingSampleProcessor) Start ¶
func (s *DroppingSampleProcessor) Start(wg *sync.WaitGroup) (_ golib.StopChan)
Start implements the golib.Task interface.
func (*DroppingSampleProcessor) String ¶
func (s *DroppingSampleProcessor) String() string
String implements the golib.Task interface.
type EmptySampleSource ¶
type EmptySampleSource struct { AbstractSampleSource // contains filtered or unexported fields }
EmptySampleSource implements SampleSource but does not generate any samples. It is used in cases where a source is required but no real implementation is available.
func (*EmptySampleSource) Close ¶
func (s *EmptySampleSource) Close()
Close implements the SampleSource interface.
func (*EmptySampleSource) SetSampleHandler ¶
func (s *EmptySampleSource) SetSampleHandler(handler ReadSampleHandler)
SetSampleHandler implements the UnmarshallingSampleSource interface.
func (*EmptySampleSource) Start ¶
func (s *EmptySampleSource) Start(wg *sync.WaitGroup) (_ golib.StopChan)
Start implements the golib.Task interface.
func (*EmptySampleSource) String ¶
func (s *EmptySampleSource) String() string
String implements the golib.Task interface.
type EndpointDescription ¶
type EndpointDescription struct { Format MarshallingFormat Type EndpointType IsCustomType bool Target string Params map[string]string }
EndpointDescription describes a data endpoint, regardless of the data direction (input or output).
func GuessEndpointDescription ¶
func GuessEndpointDescription(endpoint string) (res EndpointDescription, err error)
GuessEndpointDescription guesses the transport type and format of the given endpoint target. See GuessEndpointType for details.
func (EndpointDescription) DefaultOutputFormat ¶
func (e EndpointDescription) DefaultOutputFormat() MarshallingFormat
DefaultOutputFormat returns the default MarshallingFormat that should be used when sending data to the described endpoint, if no format is specified by the user.
func (EndpointDescription) OutputFormat ¶
func (e EndpointDescription) OutputFormat() MarshallingFormat
OutputFormat returns the MarshallingFormat that should be used when sending data to the described endpoint.
type EndpointFactory ¶
type EndpointFactory struct { FlagSourceTag string FlagInputFilesRobust bool FlagOutputFilesClean bool FlagIoBuffer int FlagFilesKeepAlive bool FlagFilesAppend bool FlagFileVanishedCheck time.Duration FlagOutputTcpListenBuffer uint FlagTcpConnectionLimit uint FlagInputTcpAcceptLimit uint FlagTcpSourceDropErrors bool FlagTcpLogReceivedData bool FlagParallelHandler ParallelSampleHandler // CustomDataSources can be filled by client code before EndpointFactory.CreateInput or similar // methods to allow creation of custom data sources. The map key is a short name of the data source // that can be used in URL endpoint descriptions. The parameter for the function will be // the URL path of the endpoint. Example: When registering a function with the key "http", the following // URL endpoint: // http://localhost:5555/abc // will invoke the factory function with the parameter "localhost:5555/abc" CustomDataSources map[EndpointType]func(string) (SampleSource, error) // CustomDataSinks can be filled by client code before EndpointFactory.CreateOutput or similar // methods to allow creation of custom data sinks. See CustomDataSources for the meaning of the // map keys and values. CustomDataSinks map[EndpointType]func(string) (SampleProcessor, error) // Marshallers can be filled by client code before EndpointFactory.CreateOutput or similar // methods to allow custom marshalling formats in output files, network connections and so on. Marshallers map[MarshallingFormat]func() Marshaller // CustomGeneralFlags, CustomInputFlags and CustomOutputFlags lets client code // register custom command line flags that configure aspects of endpoints created // through CustomDataSources and CustomDataSinks. CustomGeneralFlags []func(f *flag.FlagSet) CustomInputFlags []func(f *flag.FlagSet) CustomOutputFlags []func(f *flag.FlagSet) }
EndpointFactory creates SampleSink and SampleSource instances for a SamplePipeline. It defines command line flags for configuring the objects it creates. All fields named Flag* are set by the corresponding command line flags and evaluated in CreateInput() and CreateOutput(). FlagInputs is not set by command line flags automatically. After flag.Parse(), those fields can be modified to override the command line flags defined by the user.
func NewEndpointFactory ¶
func NewEndpointFactory() *EndpointFactory
func (*EndpointFactory) Clear ¶
func (f *EndpointFactory) Clear()
func (*EndpointFactory) CreateInput ¶
func (f *EndpointFactory) CreateInput(inputs ...string) (SampleSource, error)
CreateInput creates a SampleSource object based on the given input endpoint descriptions and the configuration flags in the EndpointFactory.
func (*EndpointFactory) CreateMarshaller ¶
func (f *EndpointFactory) CreateMarshaller(format MarshallingFormat) (Marshaller, error)
func (*EndpointFactory) CreateOutput ¶
func (f *EndpointFactory) CreateOutput(output string) (SampleProcessor, error)
CreateOutput creates a SampleSink object based on the given output endpoint description and the configuration flags in the EndpointFactory.
func (*EndpointFactory) ParseEndpointDescription ¶
func (f *EndpointFactory) ParseEndpointDescription(endpoint string, isOutput bool) (EndpointDescription, error)
ParseEndpointDescription parses the given string to an EndpointDescription object. The string can be one of two forms: the URL-style description will be parsed by ParseUrlEndpointDescription, other descriptions will be parsed by GuessEndpointDescription.
func (*EndpointFactory) ParseParameters ¶
func (f *EndpointFactory) ParseParameters(params map[string]string) (err error)
func (*EndpointFactory) ParseUrlEndpointDescription ¶
func (f *EndpointFactory) ParseUrlEndpointDescription(endpoint string) (res EndpointDescription, err error)
ParseUrlEndpointDescription parses the endpoint string as a URL endpoint description. It has the form:
format+transport://target
One of the format and transport parts must be specified, optionally both. If one of format or transport is missing, it will be guessed. The order does not matter. The 'target' part must not be empty.
func (*EndpointFactory) Reader ¶
func (f *EndpointFactory) Reader(um Unmarshaller) SampleReader
Reader returns an instance of SampleReader, configured by the values stored in the EndpointFactory.
func (*EndpointFactory) RegisterFlags ¶
func (f *EndpointFactory) RegisterFlags()
RegisterFlags registers all flags to the global CommandLine object.
func (*EndpointFactory) RegisterGeneralFlagsTo ¶
func (f *EndpointFactory) RegisterGeneralFlagsTo(fs *flag.FlagSet)
RegisterGeneralFlagsTo registers flags that configure different aspects of both data input and data output. These flags affect both the performance and the functionality of TCP, file and std I/O.
func (*EndpointFactory) RegisterInputFlagsTo ¶
func (f *EndpointFactory) RegisterInputFlagsTo(fs *flag.FlagSet)
RegisterInputFlagsTo registers flags that configure aspects of data input.
func (*EndpointFactory) RegisterOutputFlagsTo ¶
func (f *EndpointFactory) RegisterOutputFlagsTo(fs *flag.FlagSet)
RegisterOutputFlagsTo registers flags that configure data outputs.
func (*EndpointFactory) Writer ¶
func (f *EndpointFactory) Writer() SampleWriter
Writer returns an instance of SampleWriter, configured by the values stored in the EndpointFactory.
type EndpointType ¶
type EndpointType string
func GuessEndpointType ¶
func GuessEndpointType(target string) (EndpointType, error)
GuessEndpointType guesses the EndpointType for the given target. Four forms are recognized for the target:
- A host:port pair indicates an active TCP endpoint
- A :port pair (without the host part, but with the colon) indicates a passive TCP endpoint listening on the given port.
- The hyphen '-' is interpreted as standard input/output.
- All other targets are treated as file names.
type FileGroup ¶
type FileGroup struct {
// contains filtered or unexported fields
}
FileGroup provides utility functionality when dealing with a group of files sharing the same directory, file prefix and file extension. It provides methods for listing, walking or deleting files that belong to that group.
func NewFileGroup ¶
NewFileGroup returns a new FileGroup instance. The filename parameter is parsed and split into directory, file name prefix and file extension. The file can also have no extension.
func (*FileGroup) AllFiles ¶
AllFiles returns a slice of all files that belong to the receiving FileGroup, and a non-nil error if the list could not be determined. AllFiles returns all files matching the regular expression returned by FileRegex().
The files are returned sorted in the order they would be written out by FileSink.
func (*FileGroup) BuildFilename ¶
BuildFilename returns a file belonging to the receiving group, with the added number as suffix. The suffix is added before the file extension, separated with a hyphen, like so:
dir1/dir2/filePrefix-<num>.ext
func (*FileGroup) BuildFilenameStr ¶
BuildFilenameStr returns a file belonging to the receiving group, with the added string as suffix. The suffix is added before the file extension, separated with a hyphen, like so:
dir1/dir2/filePrefix-<suffix>.ext
func (*FileGroup) DeleteFiles ¶
DeleteFiles tries to delete all files that belong to the receiving FileGroup and returns a non-nil error when deleting any of the files failed. DeleteFiles deletes all files matching the regular expression returned by FileRegex().
func (*FileGroup) FileRegex ¶
FileRegex returns a regular expression that matches file names belonging to the receiving group. Only files with an optional numeric suffix are matched, e.g.:
dir1/dir2/filePrefix(-[0-9]+)?.ext
For empty 'filePrefix':
dir1/dir2/[0-9]+.ext
func (*FileGroup) OpenNewFile ¶
OpenNewFile attempts to open a new file that will belong to the file group. An integer suffix is counted up to find a non-existing file. A small number of errors is tolerated before giving up.
func (*FileGroup) WalkFiles ¶
WalkFiles walks all files that belong to the receiving FileGroup. It returns the number of walked files and a non-nil error if there was an error while walking. The walk function parameter is called for every file, providing the file name and the respective os.FileInfo.
WalkFiles walks all files that match the regular expression returned by FileRegex().
The files are walked in lexical order, which does not represent the order the files would be written by FileSink.
type FileSink ¶
type FileSink struct { // AbstractMarshallingSampleOutput defines the Marshaller and SampleWriter that will // be used when writing Samples. See their documentation for further info. AbstractMarshallingSampleOutput // Filename defines the file that will be used for writing Samples. Each time a new Header // is received by FileSink, a new file will be opened automatically. The file names are built // by FileGroup.BuildFilename(), using an automatically incrementing integer suffix. The first // filename will not have any suffix, the second file will have suffix "-0", the third "-1", and so on. // If one of those files already exists, the suffix keeps incrementing, until a free slot is found. // If errors occur while opening output files, a number of retries is attempted while incrementing // the suffix, until the number of errors exceeds MaxOutputFileErrors. After this, the FileSink stops // and reports the last error. All intermediate errors are logged as warnings. Filename string // IoBuffer defines the output buffer when writing samples to a file. It should be large // enough to minimize the number of write() calls in the operating system. IoBuffer int // CleanFiles can be set to true to delete all files that would potentially collide with output files. // In particular, this causes the following when starting the FileSink: // NewFileGroup(sink.Filename).DeleteFiles() // When deleting these files fails, the FileSink stops and reports an error. CleanFiles bool // Append can be set to true to make the FileSink append data to a file, if it exists. Append bool // VanishedFileCheck can be set to > 0 to enable a periodic check, if the currently opened // output file is still available under the same file path as it was opened. The check will // be performed whenever a sample is to be written and the last check is older than the given // duration. If the check fails, the output file is reopened, including the creation of all necessary directories. 
// This can happen, if the output file is deleted while still being written to, and enabling // the VanishedFileCheck leads to the file being recreated, which could be the more expected behavior. VanishedFileCheck time.Duration // contains filtered or unexported fields }
FileSink is an implementation of SampleSink that writes output Headers and Samples to a given file. Every time a new Header is received by the FileSink, a new file is opened using an automatically incremented number as suffix (see FileGroup). Other parameters define the parsing behavior of the FileSink.
func (*FileSink) Close ¶
func (sink *FileSink) Close()
Close implements the SampleSink interface. It flushes and closes the currently open file. No more data should be written to Sample/Header after calling Close.
type FileSource ¶
type FileSource struct { AbstractUnmarshallingSampleSource // FileNames is a slice of all files that will be read by the FileSource in sequence. // For every Filename, the FileSource will not only read the file itself, // but also all files that belong to the same FileGroup, as returned by: // NewFileGroup(filename).AllFiles() FileNames []string // ReadFileGroups can be set to true to extend the input files to the associated // file groups. For an input file named 'data.bin', all files named 'data-[0-9]+.bin' // will be read as well. The file group for 'data' is 'data-[0-9]+', the file // group for '.bin' is '[0-9]+.bin'. ReadFileGroups bool // Robust can be set to true to allow errors when reading or parsing files, // and only print Warnings instead. This is useful if the files to be parsed // are mostly valid, but have garbage at the end. Robust bool // IoBuffer configures the buffer size for read files. It should be large enough // to allow multiple goroutines to parse the read data in parallel. IoBuffer int // ConvertFilename is an optional hook for converting the filename to a custom string. // The custom string will then be passed to the ReadSampleHandler configured in // the Reader field, instead of simply using the filename. ConvertFilename func(string) string // KeepAlive makes this FileSource not close after all files have been read. // Instead, it will stay open without producing any more data. KeepAlive bool // UnsynchronizedFileAccess can be set to true to disable synchronizing Read() and Close() // methods of files through a sync.RWMutex. Tests show no measurable performance difference // from the additional Lock/Unlock operations, but they prevent potential race conditions // when accessing the underlying fd (file descriptor) field, as reported by the Go race detector. UnsynchronizedFileAccess bool // contains filtered or unexported fields }
FileSource is an implementation of UnmarshallingSampleSource that reads samples from one or more files. Various parameters control the behavior and performance of the FileSource.
func (*FileSource) Close ¶
func (source *FileSource) Close()
Close implements the SampleSource interface. It stops all goroutines that are spawned for reading files and prints any errors to the logger. Calling it after the FileSource finished on its own will have no effect.
func (*FileSource) Start ¶
func (source *FileSource) Start(wg *sync.WaitGroup) golib.StopChan
Start implements the SampleSource interface. It starts reading all configured files in sequence using background goroutines. Depending on the Robust flag of the receiving FileSource, the reading exits after the first error, or continues until all configured files have been opened.
func (*FileSource) String ¶
func (source *FileSource) String() string
String implements the SampleSource interface.
type Header ¶
type Header struct { // Fields defines the names of the metrics of samples belonging to this header. Fields []string }
Header defines the structure of samples that belong to this header. When unmarshalling headers and sample, usually one header precedes a number of samples. Those samples are defined by the header.
func (*Header) BuildIndex ¶
BuildIndex creates a dictionary of the header field names to their index in the header to optimize access to sample values.
func (*Header) Clone ¶
Clone creates a copy of the Header receiver, using a new string-array as the header fields.
func (*Header) Equals ¶
Equals compares the receiving header with the argument header and returns true, if the two represent the same header. This method tries to optimize the comparison by first comparing the header pointers and the length of the Fields slices, and pointers to the arrays backing the Fields slices. If all the checks fail, the last resort is to compare all the fields string-by-string.
type HeaderChecker ¶
type HeaderChecker struct {
LastHeader *Header
}
HeaderChecker is a helper type for implementations of SampleSink to find out, when the incoming header changes.
func (*HeaderChecker) HeaderChanged ¶
func (h *HeaderChecker) HeaderChanged(newHeader *Header) bool
HeaderChanged returns true, if the newHeader parameter represents a different header from the last time HeaderChanged was called. The result will also be true for the first time this method is called.
func (*HeaderChecker) InitializedHeaderChanged ¶
func (h *HeaderChecker) InitializedHeaderChanged(newHeader *Header) bool
InitializedHeaderChanged returns true, if the newHeader parameter represents a different header from the last time HeaderChanged was called. The first call to this method will return false, so this can be used in situations where the header has to be initialized.
type HttpServerSink ¶ added in v0.0.5
type HttpServerSink struct { AbstractTcpSink // Endpoint defines the TCP host and port to listen on for incoming TCP connections. // The host can be empty (e.g. ":1234"). If not, it must contain a hostname or IP of the // local host. Endpoint string // If BufferedSamples is >0, the given number of latest samples will be kept in a ring buffer. // New requests will first receive all samples currently in the buffer, and will // afterwards continue receiving live incoming samples. BufferedSamples uint // SubPathTag can be set to allow requesting samples on HTTP path /<val>, which will only output samples that // contain the tag <SubPathTag>=<val>. The root path '/' still serves all samples. SubPathTag string // RootPathPrefix is the base path for requests. A '/' will be appended. RootPathPrefix string // contains filtered or unexported fields }
HttpServerSink implements the SampleSink interface as an HTTP server. It listens for incoming HTTP connections on a port and provides incoming data on certain HTTP request paths.
func (*HttpServerSink) Close ¶ added in v0.0.5
func (sink *HttpServerSink) Close()
Close implements the SampleSink interface. It closes any existing connection and shuts down the HTTP server.
func (*HttpServerSink) Sample ¶ added in v0.0.5
func (sink *HttpServerSink) Sample(sample *Sample, header *Header) error
Sample implements the SampleSink interface. It stores the sample in a ring buffer and sends it to all established connections. New connections will first receive all samples stored in the buffer, before getting the live samples directly. If the buffer is disabled or full, and there are no established connections, samples are dropped.
func (*HttpServerSink) Start ¶ added in v0.0.5
func (sink *HttpServerSink) Start(wg *sync.WaitGroup) golib.StopChan
Start implements the SampleSink interface. It creates the TCP socket and starts listening on it in a separate goroutine. Every incoming connection is then handled in its own goroutine.
func (*HttpServerSink) String ¶ added in v0.0.5
func (sink *HttpServerSink) String() string
String implements the SampleSink interface.
type IndentPrinter ¶
type IndentPrinter struct { OuterIndent string InnerIndent string FillerIndent string CornerIndent string }
func (IndentPrinter) PrintLines ¶
func (p IndentPrinter) PrintLines(obj fmt.Stringer) []string
type KeyValuePair ¶
KeyValuePair represents a key-value string pair
type Marshaller ¶
type Marshaller interface { String() string WriteHeader(header *Header, withTags bool, output io.Writer) error WriteSample(sample *Sample, header *Header, withTags bool, output io.Writer) error }
Marshaller is an interface for converting Samples and Headers into byte streams. The byte streams can be anything including files, network connections, console output, or in-memory byte buffers.
type MarshallingFormat ¶
type MarshallingFormat string
type MarshallingSampleOutput ¶
type MarshallingSampleOutput interface { SampleProcessor // SetMarshaller must configure a valid instance of Marshaller before Start() is called. // All received samples will be converted to a byte stream using the configured marshaller. SetMarshaller(marshaller Marshaller) }
MarshallingSampleOutput is a SampleProcessor that outputs the received samples to a byte stream that is generated by a Marshaller instance.
type MergeableProcessor ¶
type MergeableProcessor interface { SampleProcessor MergeProcessor(other SampleProcessor) bool }
MergeableProcessor is an extension of SampleProcessor, that also allows merging two processor instances of the same type into one. Merging is only allowed when the result of the merge would have exactly the same functionality as using the two separate instances. This can be used as an optional optimization.
type NoopProcessor ¶
type NoopProcessor struct { AbstractSampleProcessor StopChan golib.StopChan }
NoopProcessor is an empty implementation of SampleProcessor. It can be directly added to a SamplePipeline and will behave as a no-op processing step. Other implementations of SampleProcessor can embed this and override parts of the methods as required. No initialization is needed for this type, but an instance can only be used once, in one pipeline.
func (*NoopProcessor) Close ¶
func (p *NoopProcessor) Close()
Close implements the SampleProcessor interface by closing the outgoing sink and internal golib.StopChan. Other types that embed NoopProcessor can override this to perform specific actions when closing, but CloseSink() should always be called in the end.
func (*NoopProcessor) CloseSink ¶
func (p *NoopProcessor) CloseSink()
CloseSink reports that this NoopProcessor is finished processing. All goroutines must be stopped, and all Headers and Samples must be already forwarded to the outgoing sink, when this is called. CloseSink forwards the Close() invocation to the outgoing sink.
func (*NoopProcessor) Error ¶
func (p *NoopProcessor) Error(err error)
Error reports that NoopProcessor has encountered an error and has stopped operation. After calling this, no more Headers and Samples can be forwarded to the outgoing sink. Ultimately, p.Close() will be called for cleaning up.
func (*NoopProcessor) Sample ¶
func (p *NoopProcessor) Sample(sample *Sample, header *Header) error
Sample implements the SampleProcessor interface. It forwards the sample to the subsequent processor.
func (*NoopProcessor) Start ¶
func (p *NoopProcessor) Start(wg *sync.WaitGroup) golib.StopChan
Start implements the SampleProcessor interface. It creates an error-channel with a small channel buffer. Calling CloseSink() or Error() writes a value to that channel to signalize that this NoopProcessor is finished.
func (*NoopProcessor) String ¶
func (p *NoopProcessor) String() string
String implements the SampleProcessor interface. This should be overridden by types that are embedding NoopProcessor.
type ParallelSampleHandler ¶
type ParallelSampleHandler struct { // BufferedSamples is the number of Samples that are buffered between the // marshall/unmarshall routines and the routine that writes/reads the input // or output streams. // The purpose of the buffer is, for example, to allow the routine reading a file // to read the data for multiple Samples in one read operation, which then // allows the parallel parsing routines to parse all the read Samples at the same time. // Setting BufferedSamples is a trade-off between memory consumption and // parallelism, but most of the time a value of around 1000 or so should be enough. // If this value is not set, no parallelism will be possible because // the channel between the cooperating routines will block on each operation. BufferedSamples int // ParallelParsers can be set to the number of goroutines that will be // used when marshalling or unmarshalling samples. These routines can // parallelize the parsing and marshalling operations. The most benefit // from the parallelism comes when reading samples from e.g. files, because // reading the file into memory can be decoupled from parsing Samples, // and multiple Samples can be parsed at the same time. // // This must be set to a value greater than zero, otherwise no goroutines // will be started. ParallelParsers int }
ParallelSampleHandler is a configuration type that is included in SampleReader and SampleWriter. Both the reader and writer can marshall and unmarshall Samples in parallel, and these routines are controlled through the two parameters in ParallelSampleHandler.
type ProcessorTaskWrapper ¶
type ProcessorTaskWrapper struct {
SampleProcessor
}
ProcessorTaskWrapper can be used to convert an instance of SampleProcessor to a golib.Task. The Stop() method of the resulting Task is ignored.
func (*ProcessorTaskWrapper) Stop ¶
func (t *ProcessorTaskWrapper) Stop()
Stop implements the golib.Task interface. Calls to this Stop() method are ignored, because SampleProcessor instances should be shutdown through the Close() method.
type ReadSampleHandler ¶
type ReadSampleHandler interface { // HandleSample allows modifying received Samples. It can be used to modify // the tags of the Sample based on the source string. The source string depends on the // SampleSource that is using the SampleReader that contains this ReadSampleHandler. // In general it represents the data source of the sample. For FileSource this will be // the file name, for TCPSource it will be the remote TCP endpoint sending the data. // It might also be useful to change the values or the timestamp of the Sample here, // but that should rather be done in a later processing step. HandleSample(sample *Sample, source string) }
ReadSampleHandler defines a hook for modifying unmarshalled Samples.
type ReaderSource ¶
type ReaderSource struct { AbstractUnmarshallingSampleSource Input io.ReadCloser Description string // contains filtered or unexported fields }
ReaderSource implements the SampleSource interface by reading Headers and Samples from an arbitrary io.ReadCloser instance. An instance of SampleReader is used to read the data in parallel.
func NewConsoleSource ¶
func NewConsoleSource() *ReaderSource
NewConsoleSource creates a SampleSource that reads from the standard input.
func (*ReaderSource) Close ¶
func (source *ReaderSource) Close()
Close implements the SampleSource interface. It stops the underlying stream and prints any errors to the logger.
func (*ReaderSource) Start ¶
func (source *ReaderSource) Start(wg *sync.WaitGroup) golib.StopChan
Start implements the SampleSource interface by starting a SampleInputStream instance that reads from the given io.ReadCloser.
func (*ReaderSource) String ¶
func (source *ReaderSource) String() string
String implements the SampleSource interface.
type ResizingBatchProcessingStep ¶
type ResizingBatchProcessingStep interface { BatchProcessingStep OutputSampleSize(sampleSize int) int }
type ResizingSampleProcessor ¶
type ResizingSampleProcessor interface { SampleProcessor OutputSampleSize(sampleSize int) int }
ResizingSampleProcessor is a helper interface that can be implemented by SampleProcessors in order to make RequiredValues() more reliable. The result of the OutputSampleSize() method should give a worst-case estimation of the number of values that will be present in Samples after this SampleProcessor is done processing a sample. This allows the optimization of pre-allocating a value array large enough to hold the final amount of metrics. The optimization works best when all samples are processed in a one-to-one fashion, i.e. no samples are split into multiple samples.
type Sample ¶
Sample contains an array of Values, a timestamp, and a string-to-string map of tags. The values are explained by the header belonging to this sample. There is no direct pointer from the sample to the header, so the header must be known from the context. In other words, the Sample should always be passed along with the header it belongs to. Without the header, the meaning of the Value array is not defined. The Values slice and the timestamp can be accessed and modified directly, but the tags map should only be manipulated through methods to ensure concurrency safe map operations. The Values and Tags should be modified by only one goroutine at a time.
func (*Sample) AddTagsFrom ¶
AddTagsFrom adds the tags of the parameter sample to the set of tags already present in the receiving sample. Colliding tags are overwritten, but other existing tags are not deleted.
func (*Sample) Clone ¶
Clone returns a copy of the receiving sample. The metadata (timestamp and tags) is copied deeply, but values are referencing the old values. After using this, the old Sample should either not be used anymore, or the Values slice in the new Sample should be replaced by a new slice.
func (*Sample) CopyMetadataFrom ¶
CopyMetadataFrom copies the timestamp and tags from the argument Sample into the receiving Sample. All previous tags in the receiving Sample are discarded.
func (*Sample) DeepClone ¶
DeepClone returns a deep copy of the receiving sample, including the timestamp, tags and actual metric values.
func (*Sample) HasTag ¶
HasTag returns true if the receiving Sample includes a tag with the given name.
func (*Sample) Metadata ¶
func (sample *Sample) Metadata() *SampleMetadata
Metadata returns an instance of SampleMetadata containing the tags and timestamp of the receiving Sample.
func (*Sample) ParseTagString ¶
ParseTagString parses a string in the format produced by TagString(). The resulting tags and tag values directly replace the tags inside the receiving Sample. Old tags are discarded.
A non-nil error is returned if the format of the input string does not follow the defined format (see TagString).
This method is used on freshly created Samples by CsvMarshaller and BinaryMarshaller when unmarshalling Samples from the respective format.
func (*Sample) Resize ¶
Resize ensures that the Values slice of the sample has the given length. If possible, the current Values slice will be reused (shrinking or growing within the limits of its capacity). Otherwise, a new slice will be allocated, without copying any values. The result value will be true, if the current slice was reused, and false if a new slice was allocated.
func (*Sample) SetTag ¶
SetTag sets the tag of the given name to the given value in the receiving Sample.
func (*Sample) SortedTags ¶
func (sample *Sample) SortedTags() (res []KeyValuePair)
SortedTags returns a slice of key-value tag pairs, sorted by key
func (*Sample) Tag ¶
Tag returns the value of the given tag inside the receiving Sample. If the tag is not defined in the sample, an empty string is returned. HasTag can be used to find out if a tag is defined or not.
func (*Sample) TagString ¶
TagString returns a string representation of all the tags and tag values in the receiving Sample. This representation is used for marshalling by the CsvMarshaller and BinaryMarshaller. The format is a space-separated string of key-value pairs separated by '=' characters.
Example:
tag1=value1 tag2=value2
type SampleAndHeader ¶
SampleAndHeader is a convenience type combining pointers to a Sample and a Header.
type SampleHeaderIndex ¶ added in v0.0.2
type SampleHeaderIndex struct {
// contains filtered or unexported fields
}
SampleHeaderIndex builds a `map[string]int` index of the fields of a Header instance. The index is cached and only updated when the header changes. This allows efficient access to specific header fields.
func (*SampleHeaderIndex) GetSingle ¶ added in v0.0.2
func (index *SampleHeaderIndex) GetSingle(sample *Sample, field string) (Value, bool)
func (*SampleHeaderIndex) Update ¶ added in v0.0.2
func (index *SampleHeaderIndex) Update(header *Header)
type SampleInputStream ¶
type SampleInputStream struct {
// contains filtered or unexported fields
}
SampleInputStream represents one input stream of Headers and Samples that reads and parses data from one io.ReadCloser instance. A SampleInputStream can be created using SampleReader.Open or .OpenBuffered. The stream then has to be started using one of the Read* methods. The Read* method will block until the stream is finished. Reading and parsing the Samples will be done in parallel goroutines. The Read* methods behave differently in terms of printing errors. The stream can be closed forcefully using the Close method.
func (*SampleInputStream) Close ¶
func (stream *SampleInputStream) Close() error
Close closes the receiving SampleInputStream. Close should be called even if the Read* method, that started the stream, returns an error. Close() might return the same error as the Read* method.
func (*SampleInputStream) Format ¶
func (stream *SampleInputStream) Format() string
Format returns a string description of the unmarshalling format used by the receiving SampleInputStream. It returns "auto-detected", if no Unmarshaller is configured, and if the unmarshalling format was not yet detected automatically. After the unmarshalling format is detected, Format will return the correct format description.
func (*SampleInputStream) ReadNamedSamples ¶
func (stream *SampleInputStream) ReadNamedSamples(sourceName string) (err error)
ReadNamedSamples calls ReadSamples with the given source string, and prints some additional logging information. It is a convenience function for different implementations of SampleSource.
func (*SampleInputStream) ReadSamples ¶
func (stream *SampleInputStream) ReadSamples(source string) (int, error)
ReadSamples starts the receiving input stream and blocks until the stream is finished or closed by Close(). It returns the number of successfully received samples and a non-nil error, if any occurred while reading or parsing. The source string parameter will be forwarded to the ReadSampleHandler, if one is set in the SampleReader that created this SampleInputStream. The source string will be used for the HandleSample() method.
func (*SampleInputStream) ReadTcpSamples ¶
func (stream *SampleInputStream) ReadTcpSamples(conn io.ReadCloser, remote string, checkClosed func() bool)
ReadTcpSamples reads Samples from the given net.TCPConn and blocks until the connection is closed by the remote host, or Close() is called on the input stream. Any error is logged instead of being returned. The checkClosed() function parameter is used when a read error occurs: if it returns true, ReadTcpSamples assumes that the connection was closed by the local host, because of a call to Close() or some other external reason. If checkClosed() returns false, it is assumed that a network error or timeout caused the connection to be closed.
type SampleMetadata ¶
SampleMetadata is a helper type containing the timestamp and the tags of a Sample. It can be used to store samples in cases where the actual Values of the Sample are not relevant, or stored in a different location. The timestamp can be accessed directly, but the tags are hidden to ensure consistency. If a SampleMetadata instance is required with specific tags, a Sample with those tags should be created, and Metadata() should be called on that Sample to create the desired SampleMetadata instance.
func (*SampleMetadata) NewSample ¶
func (meta *SampleMetadata) NewSample(values []Value) *Sample
NewSample returns a new Sample instance containing the tags and timestamp defined in the receiving SampleMetadata instance and the Values given as argument. The metadata is copied deeply, so the resulting Sample can be modified independently of the receiving SampleMetadata instance.
type SampleOutputStream ¶
type SampleOutputStream struct {
// contains filtered or unexported fields
}
SampleOutputStream represents one open output stream that marshals and writes Headers and Samples in parallel. It is created by using SampleWriter.Open or SampleWriter.OpenBuffered. The Sample() method can be used to output data on this stream, and the Close() method must be called when no more Samples are expected. No more data can be written after calling Close().
func (*SampleOutputStream) Close ¶
func (stream *SampleOutputStream) Close() error
Close closes the receiving SampleOutputStream. After calling this, neither Sample nor Header can be called anymore! The returned error is the first error that ever occurred in any of the Sample/Header/Close calls on this stream.
func (*SampleOutputStream) Sample ¶
func (stream *SampleOutputStream) Sample(sample *Sample, header *Header) error
Sample marshals the given Sample and writes the resulting byte buffer into the writer behind the stream receiver. If a non-nil error is returned here, the stream should not be used any further, but still must be closed externally.
type SamplePipeline ¶
type SamplePipeline struct { Source SampleSource Processors []SampleProcessor // contains filtered or unexported fields }
SamplePipeline reads data from a source and pipes it through zero or more SampleProcessor instances. The job of the SamplePipeline is to connect all the processing steps in the Construct method. After calling Construct, the SamplePipeline should not be used any further.
func (*SamplePipeline) Add ¶
func (p *SamplePipeline) Add(processor SampleProcessor) *SamplePipeline
Add adds the SampleProcessor parameter to the list of SampleProcessors in the receiving SamplePipeline. The Source field must be accessed directly. The Processors field can also be accessed directly, but the Add method allows chaining multiple Add invocations like so:
pipeline.Add(processor1).Add(processor2)
func (*SamplePipeline) Batch ¶
func (p *SamplePipeline) Batch(steps ...BatchProcessingStep) *SamplePipeline
func (*SamplePipeline) Construct ¶
func (p *SamplePipeline) Construct(tasks *golib.TaskGroup)
Construct connects the SampleSource and all SampleProcessors. It adds small wrapping golib.StoppableTask instances to the given golib.TaskGroup. Afterwards, tasks.WaitAndStop() can be called to start the entire pipeline. If the Source field is missing, it will be replaced with a new EmptySampleSource instance. nil values in the Processors field will be ignored. A new instance of DroppingSampleProcessor is added to the list of Processors to ensure that every step has a valid subsequent step.
Additionally, all SampleProcessor instances will be wrapped in small wrapper objects that ensure that the samples and headers forwarded between the processors are consistent.
func (*SamplePipeline) ContainedStringers ¶
func (p *SamplePipeline) ContainedStringers() []fmt.Stringer
func (*SamplePipeline) FormatLines ¶
func (p *SamplePipeline) FormatLines() []string
func (*SamplePipeline) StartAndWait ¶
func (p *SamplePipeline) StartAndWait(extraTasks ...golib.Task) int
StartAndWait constructs the pipeline and starts it. It blocks until the pipeline is finished. The Sink and Source fields must be set to non-nil values, for example using Configure* methods or setting the fields directly.
The sequence of operations to start a SamplePipeline should roughly follow the following example:
// ... Define additional flags using the "flag" package (Optional) var p sample.SamplePipeline var f EndpointFactory f.RegisterFlags() flag.Parse() // ... Modify f.Flag* values (Optional) defer golib.ProfileCpu()() // (Optional) // ... Set p.Processors (Optional, e.g. using f.CreateSink()) // ... Set p.Source using f.CreateSource() os.Exit(p.StartAndWait()) // os.Exit() should be called in an outer method if 'defer' is used here
An additional golib.Task is started along with the pipeline; it listens for the external Ctrl-C user interrupt and allows the user to stop the pipeline cleanly.
StartAndWait returns the number of errors that occurred in the pipeline.
func (*SamplePipeline) String ¶
func (p *SamplePipeline) String() string
type SampleProcessor ¶
type SampleProcessor interface { SampleSource SampleSink }
SampleProcessor is the basic interface to receive and process samples. It receives Samples through the Sample method and sends samples to the subsequent SampleProcessor configured over SetSink. The forwarded Samples can be the same as received, completely new generated samples, and also a different number of Samples from the incoming ones. The Header can also be changed, but then the SampleProcessor implementation must take care to adjust the outgoing Samples accordingly. All required goroutines must be started in Start() and stopped when Close() is called. When Start() is called, it can be assumed that SetSink() has already been called to configure a non-nil subsequent SampleProcessor. As a special case, some SampleProcessor implementations output samples to external sinks like files or network connections. In this case, the incoming samples should usually be forwarded to the subsequent SampleProcessor without changes.
type SampleReader ¶
type SampleReader struct { ParallelSampleHandler // Handler is an optional hook for modifying Headers and Samples that were // read by this SampleReader. The hook method receives a string-representation of // the data source and can use it to modify tags in the Samples. Handler ReadSampleHandler // Unmarshaller will be used when reading and parsing Headers and Samples. // If this field is nil when creating an input stream, the SampleInputStream will try // to automatically determine the format of the incoming data and create // a fitting Unmarshaller instance accordingly. Unmarshaller Unmarshaller }
SampleReader is used to read Headers and Samples from an io.Reader, parallelizing the reading and parsing procedures. The parallelization must be configured through the ParallelSampleHandler parameters before starting this SampleReader.
func (*SampleReader) Format ¶
func (r *SampleReader) Format() string
Format returns a string description of the unmarshalling format used by the receiving SampleReader. It returns "auto-detected", if no Unmarshaller is configured.
func (*SampleReader) Open ¶
func (r *SampleReader) Open(input io.ReadCloser, sink SampleSink) *SampleInputStream
Open creates an input stream reading from the given io.ReadCloser and writing the received Headers and Samples to the given SampleSink. Although no buffer size is given, the stream will actually have a small input buffer to enable automatically detecting the format of incoming data, if no Unmarshaller was configured in the receiving SampleReader.
func (*SampleReader) OpenBuffered ¶
func (r *SampleReader) OpenBuffered(input io.ReadCloser, sink SampleSink, bufSize int) *SampleInputStream
OpenBuffered creates an input stream with a given buffer size. The buffer size should be at least MinimumInputIoBuffer bytes to support automatically discovering the input stream format. See Open() for more details.
type SampleRing ¶
type SampleRing struct {
// contains filtered or unexported fields
}
SampleRing is a one-way circular queue of Sample instances. There is no dequeue operation. The stored samples can be copied into a correctly ordered slice.
func NewSampleRing ¶
func NewSampleRing(capacity int) *SampleRing
func (*SampleRing) Get ¶
func (r *SampleRing) Get() []*SampleAndHeader
func (*SampleRing) IsFull ¶
func (r *SampleRing) IsFull() bool
func (*SampleRing) Len ¶
func (r *SampleRing) Len() int
func (*SampleRing) Push ¶
func (r *SampleRing) Push(sample *Sample, header *Header) *SampleRing
func (*SampleRing) PushSampleAndHeader ¶
func (r *SampleRing) PushSampleAndHeader(sample *SampleAndHeader) *SampleRing
type SampleSink ¶
A SampleSink receives samples and headers to do arbitrary operations on them. The usual interface for this is SampleProcessor, but sometimes this simpler interface is useful.
type SampleSource ¶
type SampleSource interface { golib.Startable String() string SetSink(sink SampleProcessor) GetSink() SampleProcessor Close() }
SampleSource is the interface used for producing Headers and Samples. It should start producing samples in a separate goroutine when Start() is called, and should stop all goroutines when Close() is called. Before Start() is called, SetSink() must be called to inform the SampleSource about the SampleSink it should output the Headers/Samples into. After all samples have been generated (for example because the data source is finished, like a file, or because Close() has been called) the Close() method must be called on the outgoing SampleProcessor. After calling Close(), no more headers or samples are allowed to go into the SampleProcessor. See the golib.Task interface for info about the Start() method.
type SampleWriter ¶
type SampleWriter struct {
ParallelSampleHandler
}
SampleWriter implements parallel writing of Headers and Samples to an instance of io.WriteCloser. The WriteCloser can be anything like a file or a network connection. The parallel writing must be configured before using the SampleWriter. See ParallelSampleHandler for the configuration variables.
SampleWriter instances are mainly used by implementations of SampleOutput that write to output streams, like FileSink or TCPSink.
func (*SampleWriter) Open ¶
func (w *SampleWriter) Open(writer io.WriteCloser, marshaller Marshaller) *SampleOutputStream
Open returns an output stream that sends the marshalled samples directly to the given writer. Marshalling and writing is done in separate routines, as configured in the SampleWriter configuration parameters.
func (*SampleWriter) OpenBuffered ¶
func (w *SampleWriter) OpenBuffered(writer io.WriteCloser, marshaller Marshaller, io_buffer int) *SampleOutputStream
OpenBuffered returns a buffered output stream with a buffer of the size io_buffer. Samples coming into that stream are marshalled using marshaller and finally written to the given writer.
type SimpleBatchProcessingStep ¶
type SimpleBatchProcessingStep struct { Description string Process func(header *Header, samples []*Sample) (*Header, []*Sample, error) OutputSampleSizeFunc func(sampleSize int) int }
func (*SimpleBatchProcessingStep) OutputSampleSize ¶
func (s *SimpleBatchProcessingStep) OutputSampleSize(sampleSize int) int
func (*SimpleBatchProcessingStep) ProcessBatch ¶
func (*SimpleBatchProcessingStep) String ¶
func (s *SimpleBatchProcessingStep) String() string
type SimpleProcessor ¶
type SimpleProcessor struct { NoopProcessor Description string Process func(sample *Sample, header *Header) (*Sample, *Header, error) OnClose func() OutputSampleSizeFunc func(sampleSize int) int }
func (*SimpleProcessor) Close ¶
func (p *SimpleProcessor) Close()
func (*SimpleProcessor) OutputSampleSize ¶
func (p *SimpleProcessor) OutputSampleSize(sampleSize int) int
func (*SimpleProcessor) Sample ¶
func (p *SimpleProcessor) Sample(sample *Sample, header *Header) error
func (*SimpleProcessor) String ¶
func (p *SimpleProcessor) String() string
type SortedStringPairs ¶
func (*SortedStringPairs) FillFromMap ¶
func (s *SortedStringPairs) FillFromMap(values map[string]string)
func (*SortedStringPairs) Len ¶
func (s *SortedStringPairs) Len() int
func (*SortedStringPairs) Less ¶
func (s *SortedStringPairs) Less(i, j int) bool
func (*SortedStringPairs) String ¶
func (s *SortedStringPairs) String() string
func (*SortedStringPairs) Swap ¶
func (s *SortedStringPairs) Swap(i, j int)
type SortedStringers ¶
func (SortedStringers) Len ¶
func (t SortedStringers) Len() int
func (SortedStringers) Less ¶
func (t SortedStringers) Less(a, b int) bool
func (SortedStringers) Swap ¶
func (t SortedStringers) Swap(a, b int)
type SourceTaskWrapper ¶
type SourceTaskWrapper struct {
SampleSource
}
SourceTaskWrapper can be used to convert an instance of SampleSource to a golib.Task. Calls to the Stop() method are mapped to the Close() method of the underlying SampleSource.
func (*SourceTaskWrapper) Stop ¶
func (t *SourceTaskWrapper) Stop()
type StringerContainer ¶
type SynchronizedReadCloser ¶
type SynchronizedReadCloser struct { ReadCloser io.ReadCloser // contains filtered or unexported fields }
SynchronizedReadCloser is a helper type to wrap *os.File and synchronize calls to Read() and Close(). This prevents race condition warnings from the Go race detector due to parallel access to the fd field of the internal os.file type. The performance overhead is not measurable, but this can be deactivated by setting the UnsynchronizedFileAccess flag in FileSource.
func (*SynchronizedReadCloser) Close ¶
func (s *SynchronizedReadCloser) Close() error
type SynchronizingSampleSink ¶
type SynchronizingSampleSink struct { Out SampleSink // contains filtered or unexported fields }
SynchronizingSampleSink is a SampleSink implementation that allows multiple goroutines to write data to the same sink and synchronizes these writes through a mutex.
type TCPConnCounter ¶
type TCPConnCounter struct { // TcpConnLimit defines a limit for the number of TCP connections that should be accepted // or initiated. When this is <= 0, the number is not limited. TcpConnLimit uint // contains filtered or unexported fields }
TCPConnCounter contains the TcpConnLimit configuration parameter that optionally defines a limit for the number of TCP connections that are accepted or initiated by the SampleSink and SampleSource implementations using TCP connections.
type TCPListenerSink ¶
type TCPListenerSink struct { // AbstractTcpSink defines parameters for controlling TCP and marshalling // aspects of the TCPListenerSink. See AbstractTcpSink for details. AbstractTcpSink // Endpoint defines the TCP host and port to listen on for incoming TCP connections. // The host can be empty (e.g. ":1234"). If not, it must contain a hostname or IP of the // local host. Endpoint string // If BufferedSamples is >0, the given number of samples will be kept in a ring buffer. // New incoming connections will first receive all samples currently in the buffer, and will // afterwards continue receiving live incoming samples. BufferedSamples uint // contains filtered or unexported fields }
TCPListenerSink implements the SampleSink interface through a TCP server. It creates a socket listening on a local TCP endpoint and listens for incoming TCP connections. Once one or more connections are established, it forwards all incoming Headers and Samples to those connections. If a new header should be sent into a TCP connection, the old connection is instead closed and the TCPListenerSink waits for a new connection to be created.
func (*TCPListenerSink) Close ¶
func (sink *TCPListenerSink) Close()
Close implements the SampleSink interface. It closes any existing connection and closes the TCP socket.
func (*TCPListenerSink) Sample ¶
func (sink *TCPListenerSink) Sample(sample *Sample, header *Header) error
Sample implements the SampleSink interface. It stores the sample in a ring buffer and sends it to all established connections. New connections will first receive all samples stored in the buffer, before getting the live samples directly. If the buffer is disabled or full, and there are no established connections, samples are dropped.
func (*TCPListenerSink) Start ¶
func (sink *TCPListenerSink) Start(wg *sync.WaitGroup) golib.StopChan
Start implements the SampleSink interface. It creates the TCP socket and starts listening on it in a separate goroutine. Each incoming connection is then handled in its own goroutine.
func (*TCPListenerSink) String ¶
func (sink *TCPListenerSink) String() string
String implements the SampleSink interface.
type TCPListenerSource ¶
type TCPListenerSource struct { AbstractUnmarshallingSampleSource // TCPConnCounter has a configuration for limiting the total number of // accepted connections. After that number of connections were accepted, no // further connections are accepted. After they all are closed, the TCPListenerSource // automatically stops. TCPConnCounter // SimultaneousConnections can limit the number of TCP connections accepted // at the same time. Set to >0 to activate the limit. Connections going over // the limit will be immediately closed, and a warning will be printed on the logger. SimultaneousConnections uint // contains filtered or unexported fields }
TCPListenerSource implements the SampleSource interface as a TCP server. It listens for incoming TCP connections on a port and reads Headers and Samples from every accepted connection. See the doc for the different fields for options affecting the TCP connections and aspects of reading and parsing.
func NewTcpListenerSource ¶
func NewTcpListenerSource(endpoint string) *TCPListenerSource
NewTcpListenerSource creates a new instance of TCPListenerSource listening on the given TCP endpoint. It must be an IP/hostname combined with a port that can be bound on the local machine.
func (*TCPListenerSource) Close ¶
func (source *TCPListenerSource) Close()
Close implements the SampleSource interface. It closes all active TCP connections and closes the listening socket.
func (*TCPListenerSource) Start ¶
func (source *TCPListenerSource) Start(wg *sync.WaitGroup) golib.StopChan
Start implements the SampleSource interface. It creates a socket listening for incoming connections on the configured endpoint. New connections are handled in separate goroutines.
func (*TCPListenerSource) String ¶
func (source *TCPListenerSource) String() string
String implements the SampleSource interface.
type TCPSink ¶
type TCPSink struct { // AbstractTcpSink contains different configuration options regarding the // marshalling and writing of data to the remote TCP connection. AbstractTcpSink // Endpoint is the target TCP endpoint to connect to for sending marshalled data. Endpoint string // DialTimeout can be set to time out automatically when connecting to a remote TCP endpoint DialTimeout time.Duration // contains filtered or unexported fields }
TCPSink implements SampleSink by sending the received Headers and Samples to a given remote TCP endpoint. Every time it receives a Header or a Sample, it checks whether a TCP connection is already established. If so, it sends the data on the existing connection. Otherwise, it tries to connect to the configured endpoint and sends the data there, if the connection is successful.
func (*TCPSink) Close ¶
func (sink *TCPSink) Close()
Close implements the SampleSink interface. It stops the current TCP connection, if one is running, and prevents future connections from being created. No more data can be sent into the TCPSink after this.
func (*TCPSink) Sample ¶
Sample implements the SampleSink interface. If a connection is already established, the Sample is directly sent through it. Otherwise, a new connection is established, and the sample is sent there.
type TCPSource ¶
type TCPSource struct { AbstractUnmarshallingSampleSource TCPConnCounter // RemoteAddrs defines the list of remote TCP endpoints that the TCPSource will try to // connect to. If there are more than one connection, all connections will run in parallel. // In that case, an additional instance of SynchronizedSampleSink is used to synchronize all // received data. For multiple connections, all samples and headers will be pushed into the // outgoing SampleSink in an interleaved fashion, so the outgoing SampleSink must be able to handle that. RemoteAddrs []string // PrintErrors controls whether errors from establishing download TCP connections are logged or not. PrintErrors bool // RetryInterval defines the time to wait before trying to reconnect after a closed connection // or failed connection attempt. RetryInterval time.Duration // DialTimeout can be set to time out automatically when connecting to a remote TCP endpoint DialTimeout time.Duration // UseHTTP instructs this data source to use the HTTP protocol instead of TCP. In this case, the RemoteAddrs // strings are treated as HTTP URLs, but without the http:// prefix. This prefix is appended before attempting to // send an HTTP request. UseHTTP bool // contains filtered or unexported fields }
TCPSource implements the SampleSource interface by connecting to a list of remote TCP endpoints and downloading Header and Sample data from there. A background goroutine continuously tries to establish the required TCP connections and reads data from it whenever a connection succeeds. The contained AbstractUnmarshallingSampleSource and TCPConnCounter fields provide various parameters for configuring different aspects of the TCP connections and reading of data from them.
func (*TCPSource) Close ¶
func (source *TCPSource) Close()
Close implements the SampleSource interface. It stops all background goroutines and tries to gracefully close all established TCP connections.
func (*TCPSource) SourceString ¶
SourceString returns a string representation of the TCP endpoints the TCPSource will download data from.
type TagTemplate ¶
type TagTemplate struct { Template string // Placeholders like ${xxx} will be replaced by tag values. Values matching ENV_* will be replaced by the environment variable. MissingValue string // Replacement for missing values IgnoreEnvVars bool // Set to true to not treat ENV_ replacement templates specially }
func (TagTemplate) Resolve ¶
func (t TagTemplate) Resolve(sample *Sample) string
type TcpWriteConn ¶
type TcpWriteConn struct {
// contains filtered or unexported fields
}
TcpWriteConn is a helper type for TCP-based SampleSink implementations. It can send Headers and Samples over an opened TCP connection. It is created from AbstractTcpSink.OpenWriteConn() and can be used until Sample() returns an error or Close() is called explicitly.
func (*TcpWriteConn) Close ¶
func (conn *TcpWriteConn) Close()
Close explicitly closes the underlying TCP connection of the receiving TcpWriteConn.
func (*TcpWriteConn) IsRunning ¶
func (conn *TcpWriteConn) IsRunning() bool
IsRunning returns true, if the receiving TcpWriteConn is connected to a remote TCP endpoint.
func (*TcpWriteConn) Sample ¶
func (conn *TcpWriteConn) Sample(sample *Sample, header *Header)
Sample writes the given sample into the receiving TcpWriteConn and closes the underlying TCP connection if there is an error.
type TextMarshaller ¶
type TextMarshaller struct { // TextWidth sets the width of the header line and value table. // If Columns > 0, this value is ignored as the width is determined by the // number of columns. If this is 0, the width will be determined automatically: // If the output is a TTY (or if AssumeStdout is true), the width of the terminal // will be used. If it cannot be obtained, golib.GetTerminalSize() will return // a default value. TextWidth int // Columns can be set to > 0 to override TextWidth and set a fixed number of // columns in the table. Otherwise it will be computed automatically based // on TextWidth. Columns int // Set additional spacing between the columns of the output table. If <= 0, the // default value TextMarshallerDefaultSpacing will be used. Spacing int // If true, assume the output is a TTY and try to obtain the TextWidth from // the operating system. AssumeStdout bool }
TextMarshaller marshals Headers and Samples to a human readable text format. It is mainly intended for easily readable output on the console. Headers are not printed separately. Every Sample is preceded by a header line containing the timestamp and tags. Afterwards, all values are printed in an aligned table in a key = value format. The width of the header line, the number of columns in the table, and the spacing between the columns in the table can be configured.
func (TextMarshaller) String ¶
func (TextMarshaller) String() string
String implements the Marshaller interface.
func (TextMarshaller) WriteHeader ¶
WriteHeader implements the Marshaller interface. It is empty, because TextMarshaller prints a separate header for each Sample.
func (TextMarshaller) WriteSample ¶
func (m TextMarshaller) WriteSample(sample *Sample, header *Header, withTags bool, writer io.Writer) error
WriteSample implements the Marshaller interface. See the TextMarshaller godoc for information about the format.
type TitledSamplePipeline ¶
type TitledSamplePipeline struct { *SamplePipeline Title string }
func (*TitledSamplePipeline) String ¶
func (t *TitledSamplePipeline) String() string
type UnmarshalledHeader ¶
UnmarshalledHeader extends a Header by adding a flag that indicates whether the unmarshalled samples will contain tags or not. This enables backwards-compatibility for data input without tags.
type Unmarshaller ¶
type Unmarshaller interface { // String returns a short description of the Unmarshaller. String() string // Read must inspect the data in the stream and perform exactly one of two tasks: // read a header, or read a sample. // The Unmarshaller must be able to distinguish between a header and a sample based // on the first bytes received from the stream. If the previousHeader parameter is nil, // the Unmarshaller must attempt to receive a header, regardless of the stream contents. // // If a header is read, it is also parsed and a Header instance is allocated. // A pointer to the new header is returned, the sampleData byte-slice must be returned as nil. // // If sample data is read, Read must read data from the stream, until a full Sample has been read. // The sample data is not parsed, the ParseSample() method will be invoked separately. // The size of the Sample should be known based on the previousHeader parameter. // If sample data is read, is must be returned as the sampleData return value, and the Header pointer // must be returned as nil. // // Error handling: // The io.EOF error can be returned in two cases: 1) the read operation was successful and complete, // but the stream ended immediately afterwards, or 2) the stream was already empty. In the second // case, both other return values must be nil. // If io.EOF occurs in the middle of reading the stream, it must be converted to io.ErrUnexpectedEOF // to indicate an actual error condition. Read(input *bufio.Reader, previousHeader *UnmarshalledHeader) (newHeader *UnmarshalledHeader, sampleData []byte, err error) // ParseSample uses a header and a byte buffer to parse it to a newly // allocated Sample instance. The resulting Sample must have a Value slice with at least the capacity // of minValueCapacity. A non-nil error indicates that the data was in the wrong format. ParseSample(header *UnmarshalledHeader, minValueCapacity int, data []byte) (*Sample, error) }
Unmarshaller is an interface for reading Samples and Headers from byte streams. The byte streams can be anything including files, network connections, console output, or in-memory byte buffers. Reading is split into three parts: reading the header, receiving the bytes for a sample, and parsing those bytes into the actual sample. This separation is done for optimization purposes, to enable parallel parsing of samples by separating the data reading part from the parsing part. One goroutine can continuously call Read(), while multiple other routines execute ParseSample() in parallel.
func DetectFormatFrom ¶
func DetectFormatFrom(start string) (Unmarshaller, error)
DetectFormatFrom uses the start of a marshalled header to determine what unmarshaller should be used to decode the header and all following samples.
type UnmarshallingSampleSource ¶
type UnmarshallingSampleSource interface { SampleSource SetSampleHandler(handler ReadSampleHandler) }
UnmarshallingSampleSource extends SampleSource and adds a configuration setter that gives access to the samples that are read by this data source.
type Value ¶
type Value float64
Value is a defined type based on float64 and defines the type for metric values.
type WriteCascade ¶
type WriteCascade struct { // Writer must be set before calling Write. It will receive the Write calls. Writer io.Writer // Err stores the error that occurred in one of the write calls. Err error }
WriteCascade is a helper type for more concise Write code by avoiding error checks on every Write() invocation. Multiple Write calls can be cascaded without intermediate checks for errors. The trade-off/overhead are additional no-op Write()/WriteStr() calls after an error has occurred (which is the exception).
func (*WriteCascade) Write ¶
func (w *WriteCascade) Write(bytes []byte) error
Write forwards the call to the contained Writer, but only if no error has been encountered yet. If an error occurs, it is stored in the Err field.
func (*WriteCascade) WriteAny ¶
func (w *WriteCascade) WriteAny(i interface{}) error
WriteAny uses the fmt package to format the given object directly into the underlying writer. The write is only executed, if previous writes have been successful.
func (*WriteCascade) WriteByte ¶
func (w *WriteCascade) WriteByte(b byte) error
WriteByte calls Write with the single parameter byte.
func (*WriteCascade) WriteStr ¶
func (w *WriteCascade) WriteStr(str string) error
WriteStr calls Write with a []byte representation of the string parameter.
type WriterSink ¶
type WriterSink struct { AbstractMarshallingSampleOutput Output io.WriteCloser Description string // contains filtered or unexported fields }
WriterSink implements SampleSink by writing all Headers and Samples to a single io.WriteCloser instance. An instance of SampleWriter is used to write the data in parallel.
func NewConsoleSink ¶
func NewConsoleSink() *WriterSink
NewConsoleSink creates a SampleSink that writes to the standard output.
func (*WriterSink) Close ¶
func (sink *WriterSink) Close()
Close implements the SampleSink interface. It flushes the remaining data to the underlying io.WriteCloser and closes it.
func (*WriterSink) Sample ¶
func (sink *WriterSink) Sample(sample *Sample, header *Header) error
Sample implements the SampleSink interface by using a SampleOutputStream to write the given Sample to the configured io.WriteCloser.
func (*WriterSink) Start ¶
func (sink *WriterSink) Start(wg *sync.WaitGroup) (_ golib.StopChan)
Start implements the SampleSink interface. No additional goroutines are spawned, only a log message is printed.
func (*WriterSink) String ¶
func (sink *WriterSink) String() string
String implements the SampleSink interface.
Source Files ¶
- endpoints.go
- marshall.go
- marshall_binary.go
- marshall_csv.go
- marshall_text.go
- pipeline.go
- printing.go
- sample.go
- sample_batch.go
- sample_processor.go
- sample_source.go
- transport.go
- transport_console.go
- transport_console_box.go
- transport_console_box_flags.go
- transport_file.go
- transport_http.go
- transport_read.go
- transport_server.go
- transport_tcp.go
- transport_write.go