pipeline

package
v0.0.0-kmdagger2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 12, 2023 License: AGPL-3.0 Imports: 41 Imported by: 0

Documentation

Overview

Package pipeline is a generated GoMock package.

Index

Constants

View Source
const ConverterTypeInfluxAuto = "influxAuto"
View Source
const ConverterTypeJsonAuto = "jsonAuto"
View Source
const ConverterTypeJsonFrame = "jsonFrame"
View Source
const DataOutputTypeBuiltin = "builtin"
View Source
const DataOutputTypeLocalSubscribers = "localSubscribers"
View Source
const DataOutputTypeLoki = "loki"
View Source
const DataOutputTypeRedirect = "redirect"
View Source
const FrameConditionCheckerTypeMultiple = "multiple"
View Source
const FrameConditionCheckerTypeNumberCompare = "numberCompare"
View Source
const FrameOutputTypeChangeLog = "changeLog"
View Source
const FrameOutputTypeConditional = "conditional"
View Source
const FrameOutputTypeLocalSubscribers = "localSubscribers"
View Source
const FrameOutputTypeLoki = "loki"
View Source
const FrameOutputTypeManagedStream = "managedStream"
View Source
const FrameOutputTypeMultiple = "multiple"
View Source
const FrameOutputTypeRedirect = "redirect"
View Source
const FrameOutputTypeRemoteWrite = "remoteWrite"
View Source
const FrameOutputTypeThreshold = "threshold"
View Source
const FrameProcessorTypeDropFields = "dropFields"
View Source
const FrameProcessorTypeKeepFields = "keepFields"
View Source
const FrameProcessorTypeMultiple = "multiple"
View Source
const SubscriberTypeBuiltin = "builtin"
View Source
const SubscriberTypeManagedStream = "managedStream"
View Source
const SubscriberTypeMultiple = "multiple"

Variables

View Source
var ConvertersRegistry = []EntityInfo{
	{
		Type:        ConverterTypeJsonAuto,
		Description: "automatic recursive JSON to Frame conversion",
	},
	{
		Type:        ConverterTypeInfluxAuto,
		Description: "accept influx line protocol",
		Example: AutoInfluxConverterConfig{
			FrameFormat: "labels_column",
		},
	},
	{
		Type:        ConverterTypeJsonFrame,
		Description: "JSON-encoded Grafana data frame",
	},
}
View Source
var DataOutputsRegistry = []EntityInfo{
	{
		Type:        DataOutputTypeBuiltin,
		Description: "use builtin publish handler",
	},
	{
		Type:        DataOutputTypeRedirect,
		Description: "redirect data processing to another channel rule",
	},
	{
		Type:        DataOutputTypeLoki,
		Description: "output data to Loki as logs",
	},
}
View Source
var FrameOutputsRegistry = []EntityInfo{
	{
		Type:        FrameOutputTypeManagedStream,
		Description: "only send schema when structure changes (note this also requires a matching subscriber)",
		Example:     ManagedStreamOutputConfig{},
	},
	{
		Type:        FrameOutputTypeConditional,
		Description: "send to an output depending on frame values",
		Example:     ConditionalOutputConfig{},
	},
	{
		Type:        FrameOutputTypeRedirect,
		Description: "redirect for processing by another channel rule",
	},
	{
		Type:        FrameOutputTypeThreshold,
		Description: "output field threshold boundaries cross into new channel",
	},
	{
		Type:        FrameOutputTypeChangeLog,
		Description: "output field changes into new channel",
	},
	{
		Type:        FrameOutputTypeRemoteWrite,
		Description: "output to remote write endpoint",
	},
	{
		Type:        FrameOutputTypeLoki,
		Description: "output frame as JSON to Loki",
	},
}
View Source
var FrameProcessorsRegistry = []EntityInfo{
	{
		Type:        FrameProcessorTypeKeepFields,
		Description: "list the fields that should stay",
		Example:     KeepFieldsFrameProcessorConfig{},
	},
	{
		Type:        FrameProcessorTypeDropFields,
		Description: "list the fields that should be removed",
		Example:     DropFieldsFrameProcessorConfig{},
	},
}
View Source
var SubscribersRegistry = []EntityInfo{
	{
		Type:        SubscriberTypeBuiltin,
		Description: "apply builtin feature subscribe logic",
	},
	{
		Type:        SubscriberTypeManagedStream,
		Description: "apply managed stream subscribe logic",
	},
}

Functions

This section is empty.

Types

type AutoInfluxConverter

type AutoInfluxConverter struct {
	// contains filtered or unexported fields
}

AutoInfluxConverter decodes Influx line protocol input and transforms it to several ChannelFrame objects where Channel is constructed from original channel + / + <metric_name>.

func NewAutoInfluxConverter

func NewAutoInfluxConverter(config AutoInfluxConverterConfig) *AutoInfluxConverter

NewAutoInfluxConverter creates new AutoInfluxConverter.

func (*AutoInfluxConverter) Convert

func (c *AutoInfluxConverter) Convert(_ context.Context, vars Vars, body []byte) ([]*ChannelFrame, error)

func (*AutoInfluxConverter) Type

func (c *AutoInfluxConverter) Type() string

type AutoInfluxConverterConfig

type AutoInfluxConverterConfig struct {
	FrameFormat string `json:"frameFormat"`
}

AutoInfluxConverterConfig ...

type AutoJsonConverter

type AutoJsonConverter struct {
	// contains filtered or unexported fields
}

func (*AutoJsonConverter) Convert

func (c *AutoJsonConverter) Convert(_ context.Context, vars Vars, body []byte) ([]*ChannelFrame, error)

Automatic conversion works this way: * Time added automatically * Nulls dropped To preserve nulls we need FieldTips from a user. Custom time can be injected on FrameProcessor stage theoretically. Custom labels can be injected on FrameProcessor stage theoretically.

func (*AutoJsonConverter) Type

func (c *AutoJsonConverter) Type() string

type AutoJsonConverterConfig

type AutoJsonConverterConfig struct {
	FieldTips map[string]Field `json:"fieldTips,omitempty"`
}

type BasicAuth

type BasicAuth struct {
	// User is a user for remote write request.
	User string `json:"user,omitempty"`
	// Password is a plain text non-encrypted password.
	// TODO: remove after integrating with the database.
	Password string `json:"password,omitempty"`
}

type BuiltinDataOutput

type BuiltinDataOutput struct {
	// contains filtered or unexported fields
}

func NewBuiltinDataOutput

func NewBuiltinDataOutput(channelHandlerGetter ChannelHandlerGetter) *BuiltinDataOutput

func (*BuiltinDataOutput) OutputData

func (s *BuiltinDataOutput) OutputData(ctx context.Context, vars Vars, data []byte) ([]*ChannelData, error)

func (*BuiltinDataOutput) Type

func (s *BuiltinDataOutput) Type() string

type BuiltinSubscriber

type BuiltinSubscriber struct {
	// contains filtered or unexported fields
}

func NewBuiltinSubscriber

func NewBuiltinSubscriber(channelHandlerGetter ChannelHandlerGetter) *BuiltinSubscriber

func (*BuiltinSubscriber) Subscribe

func (*BuiltinSubscriber) Type

func (s *BuiltinSubscriber) Type() string

type CacheSegmentedTree

type CacheSegmentedTree struct {
	// contains filtered or unexported fields
}

CacheSegmentedTree provides a fast access to channel rule configuration.

func NewCacheSegmentedTree

func NewCacheSegmentedTree(storage RuleBuilder) *CacheSegmentedTree

func (*CacheSegmentedTree) Get

func (s *CacheSegmentedTree) Get(orgID int64, channel string) (*LiveChannelRule, bool, error)

type ChangeLogFrameOutput

type ChangeLogFrameOutput struct {
	// contains filtered or unexported fields
}

ChangeLogFrameOutput can monitor value changes of the specified field and output special change frame to the configured channel.

func NewChangeLogFrameOutput

func NewChangeLogFrameOutput(frameStorage FrameGetSetter, config ChangeLogOutputConfig) *ChangeLogFrameOutput

func (*ChangeLogFrameOutput) OutputFrame

func (out *ChangeLogFrameOutput) OutputFrame(_ context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error)

func (*ChangeLogFrameOutput) Type

func (out *ChangeLogFrameOutput) Type() string

type ChangeLogOutputConfig

type ChangeLogOutputConfig struct {
	FieldName string `json:"fieldName"`
	Channel   string `json:"channel"`
}

type ChannelAuthCheckConfig

type ChannelAuthCheckConfig struct {
	RequireRole org.RoleType `json:"role,omitempty"`
}

ChannelAuthCheckConfig is used to define auth rules for a channel.

type ChannelAuthConfig

type ChannelAuthConfig struct {
	// By default anyone can subscribe.
	Subscribe *ChannelAuthCheckConfig `json:"subscribe,omitempty"`

	// By default HTTP and WS require admin permissions to publish.
	Publish *ChannelAuthCheckConfig `json:"publish,omitempty"`
}

type ChannelData

type ChannelData struct {
	Channel string
	Data    []byte
}

ChannelData is a wrapper over raw data with additional channel information. Channel is used for rule routing, if the channel is empty then data processing stops. If channel is not empty then data processing will be redirected to a corresponding channel rule.

type ChannelFrame

type ChannelFrame struct {
	Channel string      `json:"channel"`
	Frame   *data.Frame `json:"frame"`
}

ChannelFrame is a wrapper over data.Frame with additional channel information. Channel is used for rule routing, if the channel is empty then frame processing will try to take current rule FrameProcessor and FrameOutputter. If channel is not empty then frame processing will be redirected to a corresponding channel rule.

type ChannelHandlerGetter

type ChannelHandlerGetter interface {
	GetChannelHandler(ctx context.Context, user *user.SignedInUser, channel string) (model.ChannelHandler, live.Channel, error)
}

type ChannelRule

type ChannelRule struct {
	OrgId    int64               `json:"-"`
	Pattern  string              `json:"pattern"`
	Settings ChannelRuleSettings `json:"settings"`
}

func (ChannelRule) Valid

func (r ChannelRule) Valid() (bool, string)

type ChannelRuleCreateCmd

type ChannelRuleCreateCmd struct {
	Pattern  string              `json:"pattern"`
	Settings ChannelRuleSettings `json:"settings"`
}

type ChannelRuleDeleteCmd

type ChannelRuleDeleteCmd struct {
	Pattern string `json:"pattern"`
}

type ChannelRuleGetter

type ChannelRuleGetter interface {
	Get(orgID int64, channel string) (*LiveChannelRule, bool, error)
}

type ChannelRuleSettings

type ChannelRuleSettings struct {
	Auth            *ChannelAuthConfig      `json:"auth,omitempty"`
	Subscribers     []*SubscriberConfig     `json:"subscribers,omitempty"`
	DataOutputters  []*DataOutputterConfig  `json:"dataOutputs,omitempty"`
	Converter       *ConverterConfig        `json:"converter,omitempty"`
	FrameProcessors []*FrameProcessorConfig `json:"frameProcessors,omitempty"`
	FrameOutputters []*FrameOutputterConfig `json:"frameOutputs,omitempty"`
}

type ChannelRuleUpdateCmd

type ChannelRuleUpdateCmd struct {
	Pattern  string              `json:"pattern"`
	Settings ChannelRuleSettings `json:"settings"`
}

type ChannelRules

type ChannelRules struct {
	Rules []ChannelRule `json:"rules"`
}

type ConditionType

type ConditionType string

ConditionType represents multiple condition operator type.

const (
	ConditionAll ConditionType = "all"
	ConditionAny ConditionType = "any"
)

Known condition types.

type ConditionalOutput

type ConditionalOutput struct {
	Condition FrameConditionChecker
	Outputter FrameOutputter
}

func NewConditionalOutput

func NewConditionalOutput(condition FrameConditionChecker, outputter FrameOutputter) *ConditionalOutput

func (ConditionalOutput) OutputFrame

func (out ConditionalOutput) OutputFrame(ctx context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error)

func (*ConditionalOutput) Type

func (out *ConditionalOutput) Type() string

type ConditionalOutputConfig

type ConditionalOutputConfig struct {
	Condition *FrameConditionCheckerConfig `json:"condition"`
	Outputter *FrameOutputterConfig        `json:"output"`
}

type Converter

type Converter interface {
	Type() string
	Convert(ctx context.Context, vars Vars, body []byte) ([]*ChannelFrame, error)
}

Converter converts raw bytes to slice of ChannelFrame. Each element of resulting slice will be then individually processed and outputted according configured channel rules.

type ConverterConfig

type ConverterConfig struct {
	Type                      string                     `json:"type" ts_type:"Omit<keyof ConverterConfig, 'type'>"`
	AutoJsonConverterConfig   *AutoJsonConverterConfig   `json:"jsonAuto,omitempty"`
	ExactJsonConverterConfig  *ExactJsonConverterConfig  `json:"jsonExact,omitempty"`
	AutoInfluxConverterConfig *AutoInfluxConverterConfig `json:"influxAuto,omitempty"`
	JsonFrameConverterConfig  *JsonFrameConverterConfig  `json:"jsonFrame,omitempty"`
}

type Data

type Data struct {
	Value1     float64                `json:"value1"`
	Value2     float64                `json:"value2"`
	Value3     *float64               `json:"value3"`
	Value4     float64                `json:"value4"`
	Annotation string                 `json:"annotation"`
	Array      []float64              `json:"array"`
	Map        map[string]interface{} `json:"map"`
	Host       string                 `json:"host"`
	Status     string                 `json:"status"`
}

type DataOutputter

type DataOutputter interface {
	Type() string
	OutputData(ctx context.Context, vars Vars, data []byte) ([]*ChannelData, error)
}

DataOutputter can output incoming data before conversion to frames.

type DataOutputterConfig

type DataOutputterConfig struct {
	Type                     string                    `json:"type" ts_type:"Omit<keyof DataOutputterConfig, 'type'>"`
	RedirectDataOutputConfig *RedirectDataOutputConfig `json:"redirect,omitempty"`
	LokiOutputConfig         *LokiOutputConfig         `json:"loki,omitempty"`
}

type DevRuleBuilder

type DevRuleBuilder struct {
	Node                 *centrifuge.Node
	ManagedStream        *managedstream.Runner
	FrameStorage         *FrameStorage
	ChannelHandlerGetter ChannelHandlerGetter
}

func (*DevRuleBuilder) BuildRules

func (f *DevRuleBuilder) BuildRules(_ context.Context, _ int64) ([]*LiveChannelRule, error)

type DropFieldsFrameProcessor

type DropFieldsFrameProcessor struct {
	// contains filtered or unexported fields
}

DropFieldsFrameProcessor can drop specified fields from a data.Frame.

func (*DropFieldsFrameProcessor) ProcessFrame

func (p *DropFieldsFrameProcessor) ProcessFrame(_ context.Context, _ Vars, frame *data.Frame) (*data.Frame, error)

func (*DropFieldsFrameProcessor) Type

func (p *DropFieldsFrameProcessor) Type() string

type DropFieldsFrameProcessorConfig

type DropFieldsFrameProcessorConfig struct {
	FieldNames []string `json:"fieldNames"`
}

type EntityInfo

type EntityInfo struct {
	Type        string      `json:"type"`
	Description string      `json:"description"`
	Example     interface{} `json:"example,omitempty"`
}

type ExactJsonConverterConfig

type ExactJsonConverterConfig struct {
	Fields []Field `json:"fields"`
}

type Field

type Field struct {
	Name   string            `json:"name"`
	Type   data.FieldType    `json:"type"`
	Value  string            `json:"value"` // Can be JSONPath or Goja script.
	Labels []Label           `json:"labels,omitempty"`
	Config *data.FieldConfig `json:"config,omitempty" ts_type:"FieldConfig"`
}

Field description.

type FileStorage

type FileStorage struct {
	DataPath       string
	SecretsService secrets.Service
}

FileStorage can load channel rules from a file on disk.

func (*FileStorage) CreateChannelRule

func (f *FileStorage) CreateChannelRule(_ context.Context, orgID int64, cmd ChannelRuleCreateCmd) (ChannelRule, error)

func (*FileStorage) CreateWriteConfig

func (f *FileStorage) CreateWriteConfig(ctx context.Context, orgID int64, cmd WriteConfigCreateCmd) (WriteConfig, error)

func (*FileStorage) DeleteChannelRule

func (f *FileStorage) DeleteChannelRule(_ context.Context, orgID int64, cmd ChannelRuleDeleteCmd) error

func (*FileStorage) DeleteWriteConfig

func (f *FileStorage) DeleteWriteConfig(_ context.Context, orgID int64, cmd WriteConfigDeleteCmd) error

func (*FileStorage) GetWriteConfig

func (f *FileStorage) GetWriteConfig(_ context.Context, orgID int64, cmd WriteConfigGetCmd) (WriteConfig, bool, error)

func (*FileStorage) ListChannelRules

func (f *FileStorage) ListChannelRules(_ context.Context, orgID int64) ([]ChannelRule, error)

func (*FileStorage) ListWriteConfigs

func (f *FileStorage) ListWriteConfigs(_ context.Context, orgID int64) ([]WriteConfig, error)

func (*FileStorage) UpdateChannelRule

func (f *FileStorage) UpdateChannelRule(ctx context.Context, orgID int64, cmd ChannelRuleUpdateCmd) (ChannelRule, error)

func (*FileStorage) UpdateWriteConfig

func (f *FileStorage) UpdateWriteConfig(ctx context.Context, orgID int64, cmd WriteConfigUpdateCmd) (WriteConfig, error)

type FrameConditionChecker

type FrameConditionChecker interface {
	Type() string
	CheckFrameCondition(ctx context.Context, frame *data.Frame) (bool, error)
}

FrameConditionChecker checks conditions in context of data.Frame being processed.

type FrameConditionCheckerConfig

type FrameConditionCheckerConfig struct {
	Type                           string                               `json:"type" ts_type:"Omit<keyof FrameConditionCheckerConfig, 'type'>"`
	MultipleConditionCheckerConfig *MultipleFrameConditionCheckerConfig `json:"multiple,omitempty"`
	NumberCompareConditionConfig   *NumberCompareFrameConditionConfig   `json:"numberCompare,omitempty"`
}

type FrameGetSetter

type FrameGetSetter interface {
	Get(orgID int64, channel string) (*data.Frame, bool, error)
	Set(orgID int64, channel string, frame *data.Frame) error
}

type FrameNumberCompareCondition

type FrameNumberCompareCondition struct {
	FieldName string
	Op        NumberCompareOp
	Value     float64
}

FrameNumberCompareCondition can compare numbers.

func NewFrameNumberCompareCondition

func NewFrameNumberCompareCondition(fieldName string, op NumberCompareOp, value float64) *FrameNumberCompareCondition

func (*FrameNumberCompareCondition) CheckFrameCondition

func (c *FrameNumberCompareCondition) CheckFrameCondition(_ context.Context, frame *data.Frame) (bool, error)

func (*FrameNumberCompareCondition) Type

type FrameOutputter

type FrameOutputter interface {
	Type() string
	OutputFrame(ctx context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error)
}

FrameOutputter outputs data.Frame to a custom destination. Or simply do nothing if some conditions not met.

type FrameOutputterConfig

type FrameOutputterConfig struct {
	Type                    string                     `json:"type" ts_type:"Omit<keyof FrameOutputterConfig, 'type'>"`
	ManagedStreamConfig     *ManagedStreamOutputConfig `json:"managedStream,omitempty"`
	MultipleOutputterConfig *MultipleOutputterConfig   `json:"multiple,omitempty"`
	RedirectOutputConfig    *RedirectOutputConfig      `json:"redirect,omitempty"`
	ConditionalOutputConfig *ConditionalOutputConfig   `json:"conditional,omitempty"`
	ThresholdOutputConfig   *ThresholdOutputConfig     `json:"threshold,omitempty"`
	RemoteWriteOutputConfig *RemoteWriteOutputConfig   `json:"remoteWrite,omitempty"`
	LokiOutputConfig        *LokiOutputConfig          `json:"loki,omitempty"`
	ChangeLogOutputConfig   *ChangeLogOutputConfig     `json:"changeLog,omitempty"`
}

type FrameProcessor

type FrameProcessor interface {
	Type() string
	ProcessFrame(ctx context.Context, vars Vars, frame *data.Frame) (*data.Frame, error)
}

FrameProcessor can modify data.Frame in a custom way before it will be outputted.

type FrameProcessorConfig

type FrameProcessorConfig struct {
	Type                      string                          `json:"type" ts_type:"Omit<keyof FrameProcessorConfig, 'type'>"`
	DropFieldsProcessorConfig *DropFieldsFrameProcessorConfig `json:"dropFields,omitempty"`
	KeepFieldsProcessorConfig *KeepFieldsFrameProcessorConfig `json:"keepFields,omitempty"`
	MultipleProcessorConfig   *MultipleFrameProcessorConfig   `json:"multiple,omitempty"`
}

type FrameStorage

type FrameStorage struct {
	// contains filtered or unexported fields
}

FrameStorage keeps last channel frame in memory. Not usable in HA setup.

func NewFrameStorage

func NewFrameStorage() *FrameStorage

func (*FrameStorage) Get

func (s *FrameStorage) Get(orgID int64, channel string) (*data.Frame, bool, error)

func (*FrameStorage) Set

func (s *FrameStorage) Set(orgID int64, channel string, frame *data.Frame) error

type JsonFrameConverter

type JsonFrameConverter struct {
	// contains filtered or unexported fields
}

JsonFrameConverter decodes single data.Frame from JSON.

func (*JsonFrameConverter) Convert

func (c *JsonFrameConverter) Convert(_ context.Context, _ Vars, body []byte) ([]*ChannelFrame, error)

func (*JsonFrameConverter) Type

func (c *JsonFrameConverter) Type() string

type JsonFrameConverterConfig

type JsonFrameConverterConfig struct{}

type KeepFieldsFrameProcessor

type KeepFieldsFrameProcessor struct {
	// contains filtered or unexported fields
}

KeepFieldsFrameProcessor can keep specified fields in a data.Frame dropping all other fields.

func (*KeepFieldsFrameProcessor) ProcessFrame

func (p *KeepFieldsFrameProcessor) ProcessFrame(_ context.Context, _ Vars, frame *data.Frame) (*data.Frame, error)

func (*KeepFieldsFrameProcessor) Type

func (p *KeepFieldsFrameProcessor) Type() string

type KeepFieldsFrameProcessorConfig

type KeepFieldsFrameProcessorConfig struct {
	FieldNames []string `json:"fieldNames"`
}

type Label

type Label struct {
	Name  string `json:"name"`
	Value string `json:"value"` // Can be JSONPath or Goja script.
}

Label ...

type LiveChannelRule

type LiveChannelRule struct {
	// OrgId this rule belongs to.
	OrgId int64
	// Pattern is a pattern for a channel which when matched results in the rule execution
	// during Subscribe or Publish operations. This is very similar to HTTP router functionality but
	// adapted for Grafana Live channels.
	// We use a modified version of github.com/julienschmidt/httprouter for pattern matching logic
	// (see tree package's README for more information).
	Pattern string

	// SubscribeAuth allows providing authorization logic for subscribing to a channel.
	// If SubscribeAuth is not set then all authenticated users can subscribe to a channel.
	SubscribeAuth SubscribeAuthChecker
	// Subscribers allow modifying subscription properties and optionally call additional logic
	// like opening a single stream to a plugin to consume channel events. If not set then
	// subscription will have all options disabled, no initial data.
	Subscribers []Subscriber

	// PublishAuth allows providing authorization logic for publishing into a channel.
	// If PublishAuth is not set then RoleAdmin is required to publish.
	PublishAuth PublishAuthChecker
	// DataOutputters if set allows doing something useful with raw input data. If not set then
	// we step further to the converter. Each DataOutputter can optionally return a slice
	// of ChannelData to pass the control to a rule defined by ChannelData.Channel - i.e.
	// DataOutputters for the returned ChannelData.Channel will be executed. Note that in
	// this case input processing will skip PublishAuth of ChannelData.Channel. I.e. authorization
	// rules defined by the first channel in a pipeline chain.
	DataOutputters []DataOutputter
	// Converter allows transforming raw input data to frames. The Converter can split raw data to
	// slice of ChannelFrame. Each ChannelFrame is then processed according to ChannelFrame.Channel
	// rules - i.e. FrameProcessors for the returned ChannelFrame.Channel will be executed.
	// If ChannelFrame.Channel is empty then we proceed with the current rule towards
	// applying its FrameProcessors.
	Converter Converter
	// FrameProcessors can have logic to modify data.Frame before applying FrameOutputters.
	FrameProcessors []FrameProcessor
	// FrameOutputters if set allow doing something useful with data.Frame. Each FrameOutputter
	// can optionally return a slice of ChannelFrame to pass the control to a rule defined
	// by ChannelFrame.Channel.
	FrameOutputters []FrameOutputter
}

LiveChannelRule is an in-memory representation of each specific rule to be executed by Pipeline.

type LocalSubscribersDataOutput

type LocalSubscribersDataOutput struct {
	// contains filtered or unexported fields
}

func NewLocalSubscribersDataOutput

func NewLocalSubscribersDataOutput(node *centrifuge.Node) *LocalSubscribersDataOutput

func (*LocalSubscribersDataOutput) OutputData

func (out *LocalSubscribersDataOutput) OutputData(_ context.Context, vars Vars, data []byte) ([]*ChannelData, error)

func (*LocalSubscribersDataOutput) Type

func (out *LocalSubscribersDataOutput) Type() string

type LocalSubscribersFrameOutput

type LocalSubscribersFrameOutput struct {
	// contains filtered or unexported fields
}

func NewLocalSubscribersFrameOutput

func NewLocalSubscribersFrameOutput(node *centrifuge.Node) *LocalSubscribersFrameOutput

func (*LocalSubscribersFrameOutput) OutputFrame

func (out *LocalSubscribersFrameOutput) OutputFrame(_ context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error)

func (*LocalSubscribersFrameOutput) Type

func (out *LocalSubscribersFrameOutput) Type() string

type LokiDataOutput

type LokiDataOutput struct {
	// contains filtered or unexported fields
}

LokiDataOutput can output raw data to Loki (as string value).

func NewLokiDataOutput

func NewLokiDataOutput(endpoint string, basicAuth *BasicAuth) *LokiDataOutput

func (*LokiDataOutput) OutputData

func (out *LokiDataOutput) OutputData(_ context.Context, vars Vars, data []byte) ([]*ChannelData, error)

func (*LokiDataOutput) Type

func (out *LokiDataOutput) Type() string

type LokiFrameOutput

type LokiFrameOutput struct {
	// contains filtered or unexported fields
}

LokiFrameOutput can output frame encoded to JSON to Loki.

func NewLokiFrameOutput

func NewLokiFrameOutput(endpoint string, basicAuth *BasicAuth) *LokiFrameOutput

func (*LokiFrameOutput) OutputFrame

func (out *LokiFrameOutput) OutputFrame(_ context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error)

func (*LokiFrameOutput) Type

func (out *LokiFrameOutput) Type() string

type LokiOutputConfig

type LokiOutputConfig struct {
	UID string `json:"uid"`
}

type LokiStream

type LokiStream struct {
	Stream map[string]string `json:"stream"`
	Values []interface{}     `json:"values"`
}

type LokiStreamsEntry

type LokiStreamsEntry struct {
	Streams []LokiStream `json:"streams"`
}

type ManagedStreamFrameOutput

type ManagedStreamFrameOutput struct {
	// contains filtered or unexported fields
}

func NewManagedStreamFrameOutput

func NewManagedStreamFrameOutput(managedStream *managedstream.Runner) *ManagedStreamFrameOutput

func (*ManagedStreamFrameOutput) OutputFrame

func (out *ManagedStreamFrameOutput) OutputFrame(ctx context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error)

func (*ManagedStreamFrameOutput) Type

func (out *ManagedStreamFrameOutput) Type() string

type ManagedStreamOutputConfig

type ManagedStreamOutputConfig struct{}

type ManagedStreamSubscriber

type ManagedStreamSubscriber struct {
	// contains filtered or unexported fields
}

func NewManagedStreamSubscriber

func NewManagedStreamSubscriber(managedStream *managedstream.Runner) *ManagedStreamSubscriber

func (*ManagedStreamSubscriber) Subscribe

func (*ManagedStreamSubscriber) Type

func (s *ManagedStreamSubscriber) Type() string

type MockFrameGetSetter

type MockFrameGetSetter struct {
	// contains filtered or unexported fields
}

MockFrameGetSetter is a mock of FrameGetSetter interface.

func NewMockFrameGetSetter

func NewMockFrameGetSetter(ctrl *gomock.Controller) *MockFrameGetSetter

NewMockFrameGetSetter creates a new mock instance.

func (*MockFrameGetSetter) EXPECT

EXPECT returns an object that allows the caller to indicate expected use.

func (*MockFrameGetSetter) Get

func (m *MockFrameGetSetter) Get(arg0 int64, arg1 string) (*data.Frame, bool, error)

Get mocks base method.

func (*MockFrameGetSetter) Set

func (m *MockFrameGetSetter) Set(arg0 int64, arg1 string, arg2 *data.Frame) error

Set mocks base method.

type MockFrameGetSetterMockRecorder

type MockFrameGetSetterMockRecorder struct {
	// contains filtered or unexported fields
}

MockFrameGetSetterMockRecorder is the mock recorder for MockFrameGetSetter.

func (*MockFrameGetSetterMockRecorder) Get

func (mr *MockFrameGetSetterMockRecorder) Get(arg0, arg1 interface{}) *gomock.Call

Get indicates an expected call of Get.

func (*MockFrameGetSetterMockRecorder) Set

func (mr *MockFrameGetSetterMockRecorder) Set(arg0, arg1, arg2 interface{}) *gomock.Call

Set indicates an expected call of Set.

type MultipleFrameConditionChecker

type MultipleFrameConditionChecker struct {
	ConditionType ConditionType
	Conditions    []FrameConditionChecker
}

MultipleFrameConditionChecker can check multiple conditions according to ConditionType.

func NewMultipleFrameConditionChecker

func NewMultipleFrameConditionChecker(conditionType ConditionType, conditions ...FrameConditionChecker) *MultipleFrameConditionChecker

NewMultipleFrameConditionChecker creates new MultipleFrameConditionChecker.

func (*MultipleFrameConditionChecker) CheckFrameCondition

func (c *MultipleFrameConditionChecker) CheckFrameCondition(ctx context.Context, frame *data.Frame) (bool, error)

func (*MultipleFrameConditionChecker) Type

type MultipleFrameConditionCheckerConfig

type MultipleFrameConditionCheckerConfig struct {
	ConditionType ConditionType                 `json:"conditionType"`
	Conditions    []FrameConditionCheckerConfig `json:"conditions"`
}

type MultipleFrameOutput

type MultipleFrameOutput struct {
	Outputters []FrameOutputter
}

MultipleFrameOutput can combine several FrameOutputter and execute them sequentially.

func NewMultipleFrameOutput

func NewMultipleFrameOutput(outputters ...FrameOutputter) *MultipleFrameOutput

func (MultipleFrameOutput) OutputFrame

func (out MultipleFrameOutput) OutputFrame(ctx context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error)

func (*MultipleFrameOutput) Type

func (out *MultipleFrameOutput) Type() string

type MultipleFrameProcessor

type MultipleFrameProcessor struct {
	Processors []FrameProcessor
}

MultipleFrameProcessor can combine several FrameProcessor and execute them sequentially.

func NewMultipleFrameProcessor

func NewMultipleFrameProcessor(processors ...FrameProcessor) *MultipleFrameProcessor

func (*MultipleFrameProcessor) ProcessFrame

func (p *MultipleFrameProcessor) ProcessFrame(ctx context.Context, vars Vars, frame *data.Frame) (*data.Frame, error)

func (*MultipleFrameProcessor) Type

func (p *MultipleFrameProcessor) Type() string

type MultipleFrameProcessorConfig

type MultipleFrameProcessorConfig struct {
	Processors []FrameProcessorConfig `json:"processors"`
}

type MultipleOutputterConfig

type MultipleOutputterConfig struct {
	Outputters []FrameOutputterConfig `json:"outputs"`
}

type MultipleSubscriber

type MultipleSubscriber struct {
	Subscribers []Subscriber
}

func NewMultipleSubscriber

func NewMultipleSubscriber(subscribers ...Subscriber) *MultipleSubscriber

func (*MultipleSubscriber) Subscribe

func (*MultipleSubscriber) Type

func (s *MultipleSubscriber) Type() string

type MultipleSubscriberConfig

type MultipleSubscriberConfig struct {
	Subscribers []SubscriberConfig `json:"subscribers"`
}

type NumberCompareFrameConditionConfig

type NumberCompareFrameConditionConfig struct {
	FieldName string          `json:"fieldName"`
	Op        NumberCompareOp `json:"op"`
	Value     float64         `json:"value"`
}

type NumberCompareOp

type NumberCompareOp string

NumberCompareOp is an comparison operator.

const (
	NumberCompareOpLt  NumberCompareOp = "lt"
	NumberCompareOpGt  NumberCompareOp = "gt"
	NumberCompareOpLte NumberCompareOp = "lte"
	NumberCompareOpGte NumberCompareOp = "gte"
	NumberCompareOpEq  NumberCompareOp = "eq"
	NumberCompareOpNe  NumberCompareOp = "ne"
)

Known NumberCompareOp types.

type Pipeline

type Pipeline struct {
	// contains filtered or unexported fields
}

Pipeline allows processing custom input data according to user-defined rules. This includes: * transforming custom input to data.Frame objects * do some processing on these frames * output resulting frames to various destinations.

func New

func New(ruleGetter ChannelRuleGetter) (*Pipeline, error)

New creates new Pipeline.

func (*Pipeline) DataToChannelFrames

func (p *Pipeline) DataToChannelFrames(ctx context.Context, rule LiveChannelRule, orgID int64, channelID string, body []byte) ([]*ChannelFrame, error)

func (*Pipeline) Get

func (p *Pipeline) Get(orgID int64, channel string) (*LiveChannelRule, bool, error)

func (*Pipeline) ProcessInput

func (p *Pipeline) ProcessInput(ctx context.Context, orgID int64, channelID string, body []byte) (bool, error)

type PublishAuthChecker

type PublishAuthChecker interface {
	CanPublish(ctx context.Context, u *user.SignedInUser) (bool, error)
}

PublishAuthChecker checks whether current user can publish to a channel.

type RedirectDataOutput

type RedirectDataOutput struct {
	// contains filtered or unexported fields
}

RedirectDataOutput passes processing control to the rule defined for a configured channel.

func NewRedirectDataOutput

func NewRedirectDataOutput(config RedirectDataOutputConfig) *RedirectDataOutput

func (*RedirectDataOutput) OutputData

func (out *RedirectDataOutput) OutputData(_ context.Context, vars Vars, data []byte) ([]*ChannelData, error)

func (*RedirectDataOutput) Type

func (out *RedirectDataOutput) Type() string

type RedirectDataOutputConfig

type RedirectDataOutputConfig struct {
	Channel string `json:"channel"`
}

RedirectDataOutputConfig ...

type RedirectFrameOutput

type RedirectFrameOutput struct {
	// contains filtered or unexported fields
}

RedirectFrameOutput passes processing control to the rule defined for a configured channel.

func NewRedirectFrameOutput

func NewRedirectFrameOutput(config RedirectOutputConfig) *RedirectFrameOutput

func (*RedirectFrameOutput) OutputFrame

func (out *RedirectFrameOutput) OutputFrame(_ context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error)

func (*RedirectFrameOutput) Type

func (out *RedirectFrameOutput) Type() string

type RedirectOutputConfig

type RedirectOutputConfig struct {
	Channel string `json:"channel"`
}

RedirectOutputConfig ...

type RemoteWriteFrameOutput

type RemoteWriteFrameOutput struct {

	// Endpoint to send streaming frames to.
	Endpoint string

	// BasicAuth is an optional basic auth params.
	BasicAuth *BasicAuth

	// SampleMilliseconds allow defining an interval to sample points inside a channel
	// when outputting to remote write endpoint (on __name__ label basis). For example
	// when having a 20Hz stream and SampleMilliseconds 1000 then only one point in a
	// second will be sent to remote write endpoint. This reduces data resolution of course.
	// If not set - then no down-sampling will be performed. If SampleMilliseconds is
	// greater than flushInterval then each flush will include a point as we only keeping
	// track of timestamps in terms of each individual flush at the moment.
	SampleMilliseconds int64
	// contains filtered or unexported fields
}

func NewRemoteWriteFrameOutput

func NewRemoteWriteFrameOutput(endpoint string, basicAuth *BasicAuth, sampleMilliseconds int64) *RemoteWriteFrameOutput

func (*RemoteWriteFrameOutput) OutputFrame

func (out *RemoteWriteFrameOutput) OutputFrame(_ context.Context, _ Vars, frame *data.Frame) ([]*ChannelFrame, error)

func (*RemoteWriteFrameOutput) Type

func (out *RemoteWriteFrameOutput) Type() string

type RemoteWriteOutputConfig

type RemoteWriteOutputConfig struct {
	UID                string `json:"uid"`
	SampleMilliseconds int64  `json:"sampleMilliseconds"`
}

type RoleCheckAuthorizer

type RoleCheckAuthorizer struct {
	// contains filtered or unexported fields
}

func NewRoleCheckAuthorizer

func NewRoleCheckAuthorizer(role org.RoleType) *RoleCheckAuthorizer

func (*RoleCheckAuthorizer) CanPublish

func (s *RoleCheckAuthorizer) CanPublish(_ context.Context, u *user.SignedInUser) (bool, error)

func (*RoleCheckAuthorizer) CanSubscribe

func (s *RoleCheckAuthorizer) CanSubscribe(_ context.Context, u *user.SignedInUser) (bool, error)

type RuleBuilder

type RuleBuilder interface {
	BuildRules(ctx context.Context, orgID int64) ([]*LiveChannelRule, error)
}

RuleBuilder constructs in-memory representation of channel rules.

type Storage

type Storage interface {
	ListWriteConfigs(_ context.Context, orgID int64) ([]WriteConfig, error)
	GetWriteConfig(_ context.Context, orgID int64, cmd WriteConfigGetCmd) (WriteConfig, bool, error)
	CreateWriteConfig(_ context.Context, orgID int64, cmd WriteConfigCreateCmd) (WriteConfig, error)
	UpdateWriteConfig(_ context.Context, orgID int64, cmd WriteConfigUpdateCmd) (WriteConfig, error)
	DeleteWriteConfig(_ context.Context, orgID int64, cmd WriteConfigDeleteCmd) error
	ListChannelRules(_ context.Context, orgID int64) ([]ChannelRule, error)
	CreateChannelRule(_ context.Context, orgID int64, cmd ChannelRuleCreateCmd) (ChannelRule, error)
	UpdateChannelRule(_ context.Context, orgID int64, cmd ChannelRuleUpdateCmd) (ChannelRule, error)
	DeleteChannelRule(_ context.Context, orgID int64, cmd ChannelRuleDeleteCmd) error
}

Storage describes all methods to manage Live pipeline persistent data.

type StorageRuleBuilder

type StorageRuleBuilder struct {
	Node                 *centrifuge.Node
	ManagedStream        *managedstream.Runner
	FrameStorage         *FrameStorage
	Storage              Storage
	ChannelHandlerGetter ChannelHandlerGetter
	SecretsService       secrets.Service
}

func (*StorageRuleBuilder) BuildRules

func (f *StorageRuleBuilder) BuildRules(ctx context.Context, orgID int64) ([]*LiveChannelRule, error)

type SubscribeAuthChecker

type SubscribeAuthChecker interface {
	CanSubscribe(ctx context.Context, u *user.SignedInUser) (bool, error)
}

SubscribeAuthChecker checks whether current user can subscribe to a channel.

type Subscriber

type Subscriber interface {
	Type() string
	Subscribe(ctx context.Context, vars Vars, data []byte) (model.SubscribeReply, backend.SubscribeStreamStatus, error)
}

Subscriber can handle channel subscribe events.

type SubscriberConfig

type SubscriberConfig struct {
	Type                     string                    `json:"type" ts_type:"Omit<keyof SubscriberConfig, 'type'>"`
	MultipleSubscriberConfig *MultipleSubscriberConfig `json:"multiple,omitempty"`
}

type ThresholdOutput

type ThresholdOutput struct {
	// contains filtered or unexported fields
}

ThresholdOutput can monitor threshold transitions of the specified field and output special state frame to the configured channel.

func NewThresholdOutput

func NewThresholdOutput(frameStorage FrameGetSetter, config ThresholdOutputConfig) *ThresholdOutput

func (*ThresholdOutput) OutputFrame

func (out *ThresholdOutput) OutputFrame(_ context.Context, vars Vars, frame *data.Frame) ([]*ChannelFrame, error)

func (*ThresholdOutput) Type

func (out *ThresholdOutput) Type() string

type ThresholdOutputConfig

type ThresholdOutputConfig struct {
	FieldName string `json:"fieldName"`
	Channel   string `json:"channel"`
}

type Vars

type Vars struct {
	OrgID     int64
	Channel   string
	Scope     string
	Namespace string
	Path      string
}

Vars has some helpful things pipeline entities could use.

type WriteConfig

type WriteConfig struct {
	OrgId          int64             `json:"-"`
	UID            string            `json:"uid"`
	Settings       WriteSettings     `json:"settings"`
	SecureSettings map[string][]byte `json:"secureSettings,omitempty"`
}

func (WriteConfig) Valid

func (r WriteConfig) Valid() (bool, string)

type WriteConfigCreateCmd

type WriteConfigCreateCmd struct {
	UID            string            `json:"uid"`
	Settings       WriteSettings     `json:"settings"`
	SecureSettings map[string]string `json:"secureSettings"`
}

type WriteConfigDeleteCmd

type WriteConfigDeleteCmd struct {
	UID string `json:"uid"`
}

type WriteConfigDto

type WriteConfigDto struct {
	UID          string          `json:"uid"`
	Settings     WriteSettings   `json:"settings"`
	SecureFields map[string]bool `json:"secureFields"`
}

func WriteConfigToDto

func WriteConfigToDto(b WriteConfig) WriteConfigDto

type WriteConfigGetCmd

type WriteConfigGetCmd struct {
	UID string `json:"uid"`
}

type WriteConfigUpdateCmd

type WriteConfigUpdateCmd struct {
	UID            string            `json:"uid"`
	Settings       WriteSettings     `json:"settings"`
	SecureSettings map[string]string `json:"secureSettings"`
}

TODO: add version field later.

type WriteConfigs

type WriteConfigs struct {
	Configs []WriteConfig `json:"writeConfigs"`
}

type WriteSettings

type WriteSettings struct {
	// Endpoint to send streaming frames to.
	Endpoint string `json:"endpoint"`
	// BasicAuth is an optional basic auth settings.
	BasicAuth *BasicAuth `json:"basicAuth,omitempty"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL