Documentation ¶
Index ¶
- Constants
- Variables
- func DeriveKey(key, salt []byte, length int) []byte
- func ExportBlueprint(project Project) (map[string]interface{}, error)
- func FixDates(object Model, data map[string]interface{}) error
- func GetBlueprintsPaths(settings Settings) ([]string, error)
- func JSONData(model Model) map[string]interface{}
- func LoadBlueprintConfig(settingsObj Settings, filename, version string) (map[string]interface{}, error)
- func NormalizePath(path string) (string, error)
- func RandomBytes(n int) ([]byte, error)
- func RandomID() []byte
- func Sort[T Model](models []T)
- func StructuredHash(source interface{}) ([]byte, error)
- type Action
- type ActionConfig
- type ActionDefinition
- type ActionDefinitions
- type ActionMaker
- type ActionMakerParams
- type ActionSpecification
- type BaseAction
- func (b *BaseAction) Config() map[string]interface{}
- func (b *BaseAction) ConfigGroup() map[string]interface{}
- func (b *BaseAction) ConfigHash() ([]byte, error)
- func (b *BaseAction) Description() string
- func (b *BaseAction) HasParams() bool
- func (b *BaseAction) ID() []byte
- func (b *BaseAction) MarshalJSON() ([]byte, error)
- func (b *BaseAction) Name() string
- func (b *BaseAction) ParameterGroup(item *Item) (*ParameterGroup, error)
- func (b *BaseAction) Type() string
- type BaseActionConfig
- func (b *BaseActionConfig) Action() (Action, error)
- func (b *BaseActionConfig) Create(values map[string]interface{}) error
- func (b *BaseActionConfig) MarshalJSON() ([]byte, error)
- func (b *BaseActionConfig) Project() Project
- func (b *BaseActionConfig) Type() string
- func (b *BaseActionConfig) Update(values map[string]interface{}) error
- type BaseChannelWriter
- func (b *BaseChannelWriter) Error(item *Item, itemError error) error
- func (b *BaseChannelWriter) Message(item *Item, data map[string]interface{}, mt MessageType) error
- func (b *BaseChannelWriter) Warning(item *Item, itemError error) error
- func (b *BaseChannelWriter) Write(channel string, items []*Item) error
- type BaseConfig
- func (b *BaseConfig) ChannelWriter(internal bool) (ChannelWriter, error)
- func (b *BaseConfig) Create(values map[string]interface{}) error
- func (b *BaseConfig) DeleteRelated() error
- func (b *BaseConfig) MarshalJSON() ([]byte, error)
- func (b *BaseConfig) Processor(internal bool) (*Processor, error)
- func (b *BaseConfig) Stream() Stream
- func (b *BaseConfig) Type() string
- func (b *BaseConfig) Update(values map[string]interface{}) error
- type BaseController
- func (b *BaseController) Definitions() *Definitions
- func (b *BaseController) GetVar(key string) (interface{}, bool)
- func (b *BaseController) InitializePlugin(plugin Plugin) error
- func (b *BaseController) InitializePlugins() error
- func (b *BaseController) ParameterStore() ParameterStore
- func (b *BaseController) RunHooks(name string, data interface{}) (interface{}, error)
- func (b *BaseController) SetVar(key string, value interface{}) error
- func (b *BaseController) Settings() Settings
- type BaseDataset
- type BaseDestination
- func (b *BaseDestination) Create(values map[string]interface{}) error
- func (b *BaseDestination) MarshalJSON() ([]byte, error)
- func (b *BaseDestination) Project() Project
- func (b *BaseDestination) Type() string
- func (b *BaseDestination) Update(values map[string]interface{}) error
- func (b *BaseDestination) Writer() (Writer, error)
- type BaseDestinationMap
- func (b *BaseDestinationMap) Create(values map[string]interface{}) error
- func (b *BaseDestinationMap) InternalWriter() (Writer, error)
- func (b *BaseDestinationMap) MarshalJSON() ([]byte, error)
- func (b *BaseDestinationMap) Type() string
- func (b *BaseDestinationMap) Update(values map[string]interface{}) error
- type BaseProject
- func (b *BaseProject) Controller() Controller
- func (b *BaseProject) Create(values map[string]interface{}) error
- func (b *BaseProject) DeleteRelated() error
- func (b *BaseProject) MarshalJSON() ([]byte, error)
- func (b *BaseProject) Type() string
- func (b *BaseProject) Update(values map[string]interface{}) error
- type BaseSource
- func (b *BaseSource) Create(values map[string]interface{}) error
- func (b *BaseSource) MarshalJSON() ([]byte, error)
- func (b *BaseSource) Project() Project
- func (b *BaseSource) Reader() (Reader, error)
- func (b *BaseSource) SetStat(name string, value float64) error
- func (b *BaseSource) Stat(name string) (float64, error)
- func (b *BaseSource) Stats() (map[string]float64, error)
- func (b *BaseSource) Type() string
- func (b *BaseSource) Update(values map[string]interface{}) error
- type BaseSourceMap
- type BaseStream
- type BasicInternalPayload
- type BasicInternalReader
- type BasicInternalWriter
- type BasicPayload
- type Blueprint
- type ByID
- type ChannelWriter
- type ClosableWriter
- type CommandsDefinition
- type CommandsDefinitions
- type CommandsMaker
- type Config
- type ConfigStatus
- type ConfigurableAction
- type Controller
- type ControllerDefinitions
- type ControllerMaker
- type CustomHashValue
- type Dataset
- type Dates
- type DatesSettable
- type Definitions
- type Destination
- type DestinationMap
- type DestinationStatus
- type DoableAction
- type Error
- type ErrorPolicy
- type Hook
- type HookDefinition
- type HookDefinitions
- type In
- type InMemoryChannelWriter
- func (c *InMemoryChannelWriter) Error(item *Item, err error) error
- func (c *InMemoryChannelWriter) Message(item *Item, data map[string]interface{}, mt MessageType) error
- func (c *InMemoryChannelWriter) Warning(item *Item, warn error) error
- func (c *InMemoryChannelWriter) Write(channel string, items []*Item) error
- type InternalChannel
- type InternalReader
- type InternalWriter
- type IsActionConfig
- type IsActionSpecification
- type IsActionSpecifications
- type IsDataset
- type IsItem
- type IsItems
- type Item
- func (f *Item) All() map[string]interface{}
- func (f *Item) Delete(key string)
- func (f *Item) Get(key string) (interface{}, bool)
- func (f *Item) Keys() []string
- func (f *Item) MarshalJSON() ([]byte, error)
- func (f *Item) Serialize(format string) ([]byte, error)
- func (f *Item) SerializeJSON() ([]byte, error)
- func (f *Item) Set(key string, value interface{})
- func (f *Item) Values() []interface{}
- type ItemStore
- type Level
- type Logger
- func (l *Logger) Debug(args ...interface{})
- func (l *Logger) Debugf(format string, args ...interface{})
- func (l *Logger) Error(args ...interface{})
- func (l *Logger) Errorf(format string, args ...interface{})
- func (l *Logger) Fatal(args ...interface{})
- func (l *Logger) Info(args ...interface{})
- func (l *Logger) Infof(format string, args ...interface{})
- func (l *Logger) SetLevel(level Level)
- func (l *Logger) Warning(args ...interface{})
- func (l *Logger) Warningf(format string, args ...interface{})
- type Message
- type MessageType
- type Meter
- type Metric
- type Model
- type ModelMeter
- type ModelReader
- type ModelWriter
- type ParameterGroup
- type ParameterSet
- func (p *ParameterSet) Actions() []Action
- func (p *ParameterSet) Empty() bool
- func (p *ParameterSet) Hash() []byte
- func (a ParameterSet) MarshalJSON() ([]byte, error)
- func (p *ParameterSet) ParameterStore() ParameterStore
- func (p *ParameterSet) Parameters() []*Parameters
- func (p *ParameterSet) ParametersFor(action Action, parameterGroup *ParameterGroup) (*Parameters, bool, error)
- func (p *ParameterSet) Save() error
- func (p *ParameterSet) SetParameterStore(parameterStore ParameterStore)
- func (p *ParameterSet) UpdateHash() error
- func (p *ParameterSet) UpdateParameters(action Action, params interface{}, parameterGroup *ParameterGroup) error
- type ParameterStore
- type ParameterStoreDefinition
- type ParameterStoreDefinitions
- type ParameterStoreMaker
- type Parameters
- func (p *Parameters) Action() Action
- func (p *Parameters) ID() []byte
- func (a Parameters) MarshalJSON() ([]byte, error)
- func (p *Parameters) ParameterGroup() *ParameterGroup
- func (p *Parameters) ParameterStore() ParameterStore
- func (p *Parameters) Parameters() interface{}
- func (p *Parameters) Save() error
- func (p *Parameters) SetParameterStore(parameterStore ParameterStore)
- func (p *Parameters) Valid(action Action, parameterGroup *ParameterGroup) (bool, error)
- type Payload
- type PeekingReader
- type Plugin
- type PluginDefinition
- type PluginDefinitions
- type PluginMaker
- type PriorityModel
- type Processable
- type ProcessingStats
- type Processor
- func (p *Processor) Advance() ([]*Item, error)
- func (p *Processor) ErrorPolicy() ErrorPolicy
- func (p *Processor) Finalize() ([]*Item, error)
- func (p *Processor) ParameterSet() *ParameterSet
- func (p *Processor) Process(items []*Item, paramsMap map[string]interface{}) ([]*Item, error)
- func (p *Processor) Reset() error
- func (p *Processor) SetErrorPolicy(policy ErrorPolicy)
- func (p *Processor) SetKey(key []byte)
- func (p *Processor) SetSalt(salt []byte)
- func (p *Processor) SetWriter(channelWriter ChannelWriter)
- func (p *Processor) Setup() error
- func (p *Processor) Teardown() error
- func (p *Processor) Undo(items []*Item, paramsMap map[string]interface{}) ([]*Item, error)
- func (p *Processor) Writer() ChannelWriter
- type Project
- type Reader
- type ReaderDefinition
- type ReaderDefinitions
- type ReaderMaker
- type Schedule
- type ScheduledAction
- type Service
- type Settings
- type SetupAction
- type Source
- type SourceMap
- type SourceStatus
- type StatefulAction
- type StatsModel
- type Stream
- type StreamStats
- type StreamStatus
- type Tag
- type TeardownAction
- type TimeWindow
- type TimeWindowFunc
- type UndoableAction
- type Warning
- type Writer
- type WriterDefinition
- type WriterDefinitions
- type WriterMaker
Constants ¶
const ( PanicLogLevel = Level(log.PanicLevel) FatalLogLevel = Level(log.FatalLevel) ErrorLogLevel = Level(log.ErrorLevel) WarnLogLevel = Level(log.WarnLevel) InfoLogLevel = Level(log.InfoLevel) DebugLogLevel = Level(log.DebugLevel) TraceLogLevel = Level(log.TraceLevel) )
const RANDOM_ID_LENGTH = 16
Variables ¶
var ActionConfigForm = forms.Form{ ErrorMsg: "invalid data encountered in the action config", Fields: []forms.Field{ { Name: "name", Validators: append([]forms.Validator{ forms.IsRequired{}}, NameValidators...), }, { Name: "type", Validators: []forms.Validator{ forms.IsString{}, }, }, { Name: "config", Validators: []forms.Validator{ forms.IsOptional{Default: map[string]interface{}{}}, forms.IsStringMap{}, }, }, { Name: "data", Validators: []forms.Validator{ forms.IsOptional{}, forms.IsStringMap{}, }, }, { Name: "description", Validators: append([]forms.Validator{ forms.IsOptional{Default: ""}}, DescriptionValidators...), }, }, }
var ActionSpecificationForm = forms.Form{ Fields: []forms.Field{ forms.Field{ Name: "name", Validators: []forms.Validator{ forms.IsOptional{Default: ""}, forms.IsString{}, }, }, forms.Field{ Name: "description", Validators: []forms.Validator{ forms.IsOptional{Default: ""}, forms.IsString{}, }, }, forms.Field{ Name: "id", Validators: []forms.Validator{ forms.IsOptional{ DefaultGenerator: func() interface{} { return RandomID() }, }, forms.IsBytes{ Encoding: "hex", }, }, }, forms.Field{ Name: "type", Validators: []forms.Validator{ forms.IsRequired{}, forms.IsString{}, }, }, forms.Field{ Name: "config", Validators: []forms.Validator{ forms.IsRequired{}, forms.IsStringMap{}, }, }, }, }
var BlueprintConfigForm = forms.Form{ Fields: []forms.Field{ { Name: "id", Validators: []forms.Validator{ forms.IsOptional{}, forms.IsBytes{Encoding: "hex"}, }, }, { Name: "name", Validators: []forms.Validator{ forms.IsString{}, }, }, { Name: "description", Validators: []forms.Validator{ forms.IsOptional{Default: ""}, forms.IsString{}, }, }, { Name: "status", Validators: []forms.Validator{ forms.IsOptional{Default: "active"}, forms.IsIn{Choices: []interface{}{"active", "inactive", "testing"}}, }, }, { Name: "data", Validators: []forms.Validator{ forms.IsOptional{}, forms.IsStringMap{}, }, }, }, }
var BlueprintDateForm = forms.Form{ Fields: []forms.Field{ { Name: "created_at", Validators: []forms.Validator{ forms.IsOptional{}, forms.IsTime{Format: "rfc3339"}, }, }, { Name: "updated_at", Validators: []forms.Validator{ forms.IsOptional{}, forms.IsTime{Format: "rfc3339"}, }, }, { Name: "deleted_at", Validators: []forms.Validator{ forms.IsOptional{}, forms.IsTime{Format: "rfc3339"}, }, }, }, }
var BlueprintProjectForm = forms.Form{ Fields: []forms.Field{ { Name: "id", Validators: []forms.Validator{ forms.IsBytes{Encoding: "hex"}, }, }, { Name: "name", Validators: []forms.Validator{ forms.IsString{}, }, }, { Name: "description", Validators: []forms.Validator{ forms.IsOptional{Default: ""}, forms.IsString{}, }, }, }, }
var BlueprintStreamForm = forms.Form{ Fields: []forms.Field{ { Name: "id", Validators: []forms.Validator{ forms.IsOptional{}, forms.IsBytes{Encoding: "hex"}, }, }, { Name: "name", Validators: []forms.Validator{ forms.IsString{}, }, }, { Name: "description", Validators: []forms.Validator{ forms.IsOptional{Default: ""}, forms.IsString{}, }, }, { Name: "data", Validators: []forms.Validator{ forms.IsOptional{}, forms.IsStringMap{}, }, }, { Name: "status", Validators: []forms.Validator{ forms.IsOptional{Default: "active"}, forms.IsIn{Choices: []interface{}{"active", "inactive", "testing"}}, }, }, }, }
var ConfigForm = forms.Form{ ErrorMsg: "invalid data encountered in the stream config form", Fields: []forms.Field{ { Name: "name", Validators: append([]forms.Validator{ forms.IsRequired{}}, NameValidators...), }, { Name: "description", Validators: append([]forms.Validator{ forms.IsOptional{Default: ""}}, DescriptionValidators...), }, { Name: "status", Validators: []forms.Validator{ forms.IsOptional{Default: string(DisabledConfig)}, IsValidConfigStatus, }, }, { Name: "source", Validators: []forms.Validator{ forms.IsOptional{}, forms.IsString{MinLength: 1, MaxLength: 40}, }, }, { Name: "version", Validators: []forms.Validator{ forms.IsOptional{}, forms.IsString{MinLength: 1, MaxLength: 40}, }, }, { Name: "data", Validators: []forms.Validator{ forms.IsOptional{}, forms.IsStringMap{}, }, }, }, }
var DatasetForm = forms.Form{ ErrorMsg: "invalid data encountered in the action config", Fields: []forms.Field{ { Name: "name", Validators: append([]forms.Validator{ forms.IsRequired{}}, NameValidators...), }, { Name: "items", Validators: []forms.Validator{ forms.IsOptional{Default: []map[string]any{}}, forms.IsList{ Validators: []forms.Validator{ forms.IsStringMap{}, }, }, }, }, { Name: "data", Validators: []forms.Validator{ forms.IsOptional{}, forms.IsStringMap{}, }, }, { Name: "description", Validators: append([]forms.Validator{ forms.IsOptional{Default: ""}}, DescriptionValidators...), }, }, }
var DefaultSettings embed.FS
var DescriptionValidators = []forms.Validator{ forms.IsString{MaxLength: 10000}, }
var DestinationForm = forms.Form{ ErrorMsg: "invalid data encountered in the destination form", Fields: []forms.Field{ { Name: "name", Validators: append([]forms.Validator{ forms.IsRequired{}}, NameValidators...), }, { Name: "description", Validators: append([]forms.Validator{ forms.IsOptional{Default: ""}}, DescriptionValidators...), }, { Name: "config", Validators: []forms.Validator{forms.IsStringMap{}}, }, { Name: "data", Validators: []forms.Validator{forms.IsOptional{}, forms.IsStringMap{}}, }, { Name: "type", Validators: []forms.Validator{forms.IsString{}}, }, }, }
var EOS = fmt.Errorf("end of stream")
var IsValidConfigStatus = forms.IsIn{ Choices: []interface{}{ string(ActiveConfig), string(DisabledConfig), string(TestingConfig)}, }
var IsValidStreamStatus = forms.IsIn{ Choices: []interface{}{ string(ActiveStream), string(DisabledStream), string(TestingStream)}, }
var Log = Logger{}
var NameValidators = []forms.Validator{ forms.IsString{MinLength: 2, MaxLength: 100}, }
var NotFound = fmt.Errorf("object not found")
var NullValue = fmt.Errorf("null")
var ParameterForm = forms.Form{ ErrorMsg: "invalid data encountered in the parameter form", Fields: []forms.Field{ { Name: "action", Validators: []forms.Validator{ forms.IsRequired{}, forms.IsStringMap{ Form: &forms.Form{ Fields: []forms.Field{ { Name: "type", Validators: []forms.Validator{ forms.IsRequired{}, forms.IsString{}, }, }, { Name: "id", Validators: []forms.Validator{ forms.IsRequired{}, forms.IsBytes{Encoding: "hex"}, }, }, { Name: "name", Validators: []forms.Validator{ forms.IsRequired{}, forms.IsString{}, }, }, { Name: "description", Validators: []forms.Validator{ forms.IsRequired{}, forms.IsString{}, }, }, { Name: "config", Validators: []forms.Validator{ forms.IsRequired{}, forms.IsStringMap{}, }, }, }, }, }, }, }, { Name: "parameter_group", Validators: []forms.Validator{ forms.IsRequired{}, forms.IsStringMap{ Form: &forms.Form{ Fields: []forms.Field{ { Name: "data", Validators: []forms.Validator{ forms.IsRequired{}, }, }, { Name: "hash", Validators: []forms.Validator{ forms.IsRequired{}, forms.IsBytes{ Encoding: "hex", }, }, }, }, }, }, }, }, { Name: "id", Validators: []forms.Validator{ forms.IsRequired{}, forms.IsBytes{Encoding: "hex"}, }, }, { Name: "parameters", Validators: []forms.Validator{ forms.IsOptional{Default: nil}, }, }, }, }
var ParameterSetForm = forms.Form{ ErrorMsg: "invalid data encountered in the parameter set form", Fields: []forms.Field{ { Name: "parameters", Validators: []forms.Validator{ forms.IsRequired{}, forms.IsList{ Validators: []forms.Validator{ forms.IsBytes{ Encoding: "hex", }, }, }, }, }, { Name: "hash", Validators: []forms.Validator{ forms.IsRequired{}, forms.IsBytes{ Encoding: "hex", }, }, }, }, }
var ProjectForm = forms.Form{ ErrorMsg: "invalid data encountered in the project form", Fields: []forms.Field{ { Name: "name", Validators: append([]forms.Validator{ forms.IsRequired{}}, NameValidators...), }, { Name: "description", Validators: append([]forms.Validator{ forms.IsOptional{Default: ""}}, DescriptionValidators...), }, { Name: "data", Validators: []forms.Validator{ forms.IsOptional{}, forms.IsStringMap{}, }, }, }, }
var SourceForm = forms.Form{ ErrorMsg: "invalid data encountered in the source form", Fields: []forms.Field{ { Name: "name", Validators: append([]forms.Validator{ forms.IsRequired{}}, NameValidators...), }, { Name: "description", Validators: append([]forms.Validator{ forms.IsOptional{Default: ""}}, DescriptionValidators...), }, { Name: "config", Validators: []forms.Validator{forms.IsStringMap{}}, }, { Name: "data", Validators: []forms.Validator{forms.IsOptional{}, forms.IsStringMap{}}, }, { Name: "type", Validators: []forms.Validator{forms.IsString{}}, }, }, }
var StreamForm = forms.Form{ ErrorMsg: "invalid data encountered in the stream form", Fields: []forms.Field{ { Name: "name", Validators: append([]forms.Validator{ forms.IsRequired{}}, NameValidators...), }, { Name: "status", Validators: []forms.Validator{ forms.IsOptional{Default: string(ActiveStream)}, IsValidStreamStatus, }, }, { Name: "description", Validators: append([]forms.Validator{ forms.IsOptional{Default: ""}}, DescriptionValidators...), }, { Name: "data", Validators: []forms.Validator{forms.IsOptional{}, forms.IsStringMap{}}, }, }, }
var Version = "development"
Functions ¶
func ExportBlueprint ¶ added in v0.0.9
func GetBlueprintsPaths ¶ added in v0.0.4
func LoadBlueprintConfig ¶ added in v0.0.4
func NormalizePath ¶ added in v0.0.4
func RandomBytes ¶
func StructuredHash ¶
StructuredHash computes a hash of a structured value that may contain various types, such as strings or []byte slices. The hash reflects both the typed values and the structure of the source.
Types ¶
type Action ¶
type Action interface { ConfigGroup() map[string]interface{} ConfigHash() ([]byte, error) ParameterGroup(item *Item) (*ParameterGroup, error) Params() interface{} HasParams() bool SetParams(interface{}) error GenerateParams(key, salt []byte) error ID() []byte Name() string Description() string Type() string Config() map[string]interface{} }
func MakeAction ¶
func MakeActions ¶
func MakeActions(specs []ActionSpecification, definitions *Definitions) ([]Action, error)
type ActionConfig ¶
type ActionConfig interface { Model Action() (Action, error) Project() Project SetData(interface{}) error Data() interface{} ConfigData() map[string]interface{} SetConfigData(map[string]interface{}) error Name() string Description() string ActionType() string SetName(string) error SetDescription(string) error SetActionType(string) error }
type ActionDefinition ¶
type ActionDefinitions ¶
type ActionDefinitions map[string]ActionDefinition
type ActionMaker ¶
type ActionMaker func(params ActionSpecification) (Action, error)
type ActionMakerParams ¶
type ActionSpecification ¶
type ActionSpecification struct {
Name, Description, Type string
Definitions *Definitions
ID []byte
Config map[string]interface{}
}
type BaseAction ¶
type BaseAction struct { Spec ActionSpecification Type_ string // contains filtered or unexported fields }
func MakeBaseAction ¶
func MakeBaseAction(spec ActionSpecification, actionType string) BaseAction
func (*BaseAction) Config ¶
func (b *BaseAction) Config() map[string]interface{}
func (*BaseAction) ConfigGroup ¶
func (b *BaseAction) ConfigGroup() map[string]interface{}
By default, the config group contains the full config. Specific actions can override this to include only the config that is relevant to the functioning of the action.
func (*BaseAction) ConfigHash ¶
func (b *BaseAction) ConfigHash() ([]byte, error)
func (*BaseAction) Description ¶
func (b *BaseAction) Description() string
func (*BaseAction) HasParams ¶
func (b *BaseAction) HasParams() bool
func (*BaseAction) ID ¶
func (b *BaseAction) ID() []byte
func (*BaseAction) MarshalJSON ¶
func (b *BaseAction) MarshalJSON() ([]byte, error)
func (*BaseAction) Name ¶
func (b *BaseAction) Name() string
func (*BaseAction) ParameterGroup ¶
func (b *BaseAction) ParameterGroup(item *Item) (*ParameterGroup, error)
ParameterGroup returns the parameter group for a specific item.
func (*BaseAction) Type ¶
func (b *BaseAction) Type() string
type BaseActionConfig ¶
type BaseActionConfig struct { Self ActionConfig Project_ Project }
func (*BaseActionConfig) Action ¶
func (b *BaseActionConfig) Action() (Action, error)
func (*BaseActionConfig) Create ¶
func (b *BaseActionConfig) Create(values map[string]interface{}) error
func (*BaseActionConfig) MarshalJSON ¶
func (b *BaseActionConfig) MarshalJSON() ([]byte, error)
func (*BaseActionConfig) Project ¶
func (b *BaseActionConfig) Project() Project
func (*BaseActionConfig) Type ¶
func (b *BaseActionConfig) Type() string
func (*BaseActionConfig) Update ¶
func (b *BaseActionConfig) Update(values map[string]interface{}) error
type BaseChannelWriter ¶
type BaseChannelWriter struct {
// contains filtered or unexported fields
}
func (*BaseChannelWriter) Error ¶
func (b *BaseChannelWriter) Error(item *Item, itemError error) error
func (*BaseChannelWriter) Message ¶
func (b *BaseChannelWriter) Message( item *Item, data map[string]interface{}, mt MessageType) error
type BaseConfig ¶
func (*BaseConfig) ChannelWriter ¶
func (b *BaseConfig) ChannelWriter(internal bool) (ChannelWriter, error)
func (*BaseConfig) Create ¶
func (b *BaseConfig) Create(values map[string]interface{}) error
func (*BaseConfig) DeleteRelated ¶ added in v0.0.10
func (b *BaseConfig) DeleteRelated() error
func (*BaseConfig) MarshalJSON ¶
func (b *BaseConfig) MarshalJSON() ([]byte, error)
func (*BaseConfig) Stream ¶
func (b *BaseConfig) Stream() Stream
func (*BaseConfig) Type ¶
func (b *BaseConfig) Type() string
func (*BaseConfig) Update ¶
func (b *BaseConfig) Update(values map[string]interface{}) error
type BaseController ¶
type BaseController struct {
// contains filtered or unexported fields
}
func MakeBaseController ¶
func MakeBaseController(settings Settings, definitions *Definitions) (BaseController, error)
func (*BaseController) Definitions ¶
func (b *BaseController) Definitions() *Definitions
func (*BaseController) GetVar ¶
func (b *BaseController) GetVar(key string) (interface{}, bool)
func (*BaseController) InitializePlugin ¶
func (b *BaseController) InitializePlugin(plugin Plugin) error
func (*BaseController) InitializePlugins ¶
func (b *BaseController) InitializePlugins() error
func (*BaseController) ParameterStore ¶
func (b *BaseController) ParameterStore() ParameterStore
func (*BaseController) RunHooks ¶
func (b *BaseController) RunHooks(name string, data interface{}) (interface{}, error)
func (*BaseController) SetVar ¶
func (b *BaseController) SetVar(key string, value interface{}) error
func (*BaseController) Settings ¶
func (b *BaseController) Settings() Settings
type BaseDataset ¶ added in v0.1.0
func (*BaseDataset) Create ¶ added in v0.1.0
func (b *BaseDataset) Create(values map[string]interface{}) error
func (*BaseDataset) MarshalJSON ¶ added in v0.1.0
func (b *BaseDataset) MarshalJSON() ([]byte, error)
func (*BaseDataset) Project ¶ added in v0.1.0
func (b *BaseDataset) Project() Project
func (*BaseDataset) Type ¶ added in v0.1.0
func (b *BaseDataset) Type() string
func (*BaseDataset) Update ¶ added in v0.1.0
func (b *BaseDataset) Update(values map[string]interface{}) error
type BaseDestination ¶
type BaseDestination struct { Self Destination Project_ Project }
func (*BaseDestination) Create ¶
func (b *BaseDestination) Create(values map[string]interface{}) error
func (*BaseDestination) MarshalJSON ¶
func (b *BaseDestination) MarshalJSON() ([]byte, error)
func (*BaseDestination) Project ¶
func (b *BaseDestination) Project() Project
func (*BaseDestination) Type ¶
func (b *BaseDestination) Type() string
func (*BaseDestination) Update ¶
func (b *BaseDestination) Update(values map[string]interface{}) error
func (*BaseDestination) Writer ¶
func (b *BaseDestination) Writer() (Writer, error)
type BaseDestinationMap ¶
type BaseDestinationMap struct {
Self DestinationMap
}
func (*BaseDestinationMap) Create ¶
func (b *BaseDestinationMap) Create(values map[string]interface{}) error
func (*BaseDestinationMap) InternalWriter ¶
func (b *BaseDestinationMap) InternalWriter() (Writer, error)
func (*BaseDestinationMap) MarshalJSON ¶
func (b *BaseDestinationMap) MarshalJSON() ([]byte, error)
func (*BaseDestinationMap) Type ¶
func (b *BaseDestinationMap) Type() string
func (*BaseDestinationMap) Update ¶
func (b *BaseDestinationMap) Update(values map[string]interface{}) error
type BaseProject ¶
type BaseProject struct { Self Project Controller_ Controller }
func (*BaseProject) Controller ¶
func (b *BaseProject) Controller() Controller
func (*BaseProject) Create ¶
func (b *BaseProject) Create(values map[string]interface{}) error
func (*BaseProject) DeleteRelated ¶ added in v0.0.10
func (b *BaseProject) DeleteRelated() error
func (*BaseProject) MarshalJSON ¶
func (b *BaseProject) MarshalJSON() ([]byte, error)
func (*BaseProject) Type ¶
func (b *BaseProject) Type() string
func (*BaseProject) Update ¶
func (b *BaseProject) Update(values map[string]interface{}) error
type BaseSource ¶
func (*BaseSource) Create ¶
func (b *BaseSource) Create(values map[string]interface{}) error
func (*BaseSource) MarshalJSON ¶
func (b *BaseSource) MarshalJSON() ([]byte, error)
func (*BaseSource) Project ¶
func (b *BaseSource) Project() Project
func (*BaseSource) Reader ¶
func (b *BaseSource) Reader() (Reader, error)
func (*BaseSource) Type ¶
func (b *BaseSource) Type() string
func (*BaseSource) Update ¶
func (b *BaseSource) Update(values map[string]interface{}) error
type BaseSourceMap ¶
type BaseSourceMap struct {
Self SourceMap
}
func (*BaseSourceMap) Create ¶
func (b *BaseSourceMap) Create(values map[string]interface{}) error
func (*BaseSourceMap) MarshalJSON ¶
func (b *BaseSourceMap) MarshalJSON() ([]byte, error)
func (*BaseSourceMap) Type ¶
func (b *BaseSourceMap) Type() string
func (*BaseSourceMap) Update ¶
func (b *BaseSourceMap) Update(values map[string]interface{}) error
type BaseStream ¶
BaseStream contains common functionality, such as validation, that should be shared by all implementations of the Stream interface.
func (*BaseStream) Create ¶
func (b *BaseStream) Create(values map[string]interface{}) error
func (*BaseStream) DeleteRelated ¶ added in v0.0.10
func (b *BaseStream) DeleteRelated() error
func (*BaseStream) MarshalJSON ¶
func (b *BaseStream) MarshalJSON() ([]byte, error)
func (*BaseStream) Project ¶
func (b *BaseStream) Project() Project
func (*BaseStream) Type ¶
func (b *BaseStream) Type() string
func (*BaseStream) Update ¶
func (b *BaseStream) Update(values map[string]interface{}) error
type BasicInternalPayload ¶
type BasicInternalPayload struct {
// contains filtered or unexported fields
}
func (*BasicInternalPayload) Acknowledge ¶
func (p *BasicInternalPayload) Acknowledge() error
func (*BasicInternalPayload) EndOfStream ¶
func (p *BasicInternalPayload) EndOfStream() bool
func (*BasicInternalPayload) Headers ¶
func (p *BasicInternalPayload) Headers() map[string]interface{}
func (*BasicInternalPayload) Items ¶
func (p *BasicInternalPayload) Items() []*Item
func (*BasicInternalPayload) Reject ¶
func (p *BasicInternalPayload) Reject() error
type BasicInternalReader ¶
func (*BasicInternalReader) Purge ¶
func (i *BasicInternalReader) Purge() error
func (*BasicInternalReader) Read ¶
func (i *BasicInternalReader) Read() (Payload, error)
func (*BasicInternalReader) Setup ¶
func (i *BasicInternalReader) Setup(Stream) error
func (*BasicInternalReader) SetupWithModel ¶
func (i *BasicInternalReader) SetupWithModel(model Model) error
func (*BasicInternalReader) Teardown ¶
func (i *BasicInternalReader) Teardown() error
type BasicInternalWriter ¶
func (*BasicInternalWriter) Close ¶
func (i *BasicInternalWriter) Close() error
func (*BasicInternalWriter) Setup ¶
func (i *BasicInternalWriter) Setup(Config) error
func (*BasicInternalWriter) SetupWithModel ¶
func (i *BasicInternalWriter) SetupWithModel(model Model) error
func (*BasicInternalWriter) Teardown ¶
func (i *BasicInternalWriter) Teardown() error
func (*BasicInternalWriter) Write ¶
func (i *BasicInternalWriter) Write(payload Payload) error
type BasicPayload ¶
type BasicPayload struct {
// contains filtered or unexported fields
}
func MakeBasicPayload ¶
func MakeBasicPayload(items []*Item, headers map[string]interface{}, endOfStream bool) *BasicPayload
func (*BasicPayload) Acknowledge ¶
func (b *BasicPayload) Acknowledge() error
func (*BasicPayload) EndOfStream ¶
func (b *BasicPayload) EndOfStream() bool
func (*BasicPayload) Headers ¶
func (b *BasicPayload) Headers() map[string]interface{}
func (*BasicPayload) Items ¶
func (b *BasicPayload) Items() []*Item
func (*BasicPayload) Reject ¶
func (b *BasicPayload) Reject() error
type Blueprint ¶
type Blueprint struct {
// contains filtered or unexported fields
}
func MakeBlueprint ¶
func (*Blueprint) Create ¶
func (b *Blueprint) Create(controller Controller, createProject bool) (Project, error)
func (*Blueprint) CreateWithProject ¶ added in v0.1.0
func (b *Blueprint) CreateWithProject(controller Controller, project Project) error
type ChannelWriter ¶
type ClosableWriter ¶
type ClosableWriter interface {
Close() error
}
type CommandsDefinition ¶
type CommandsDefinition struct { Name string `json:"name"` Description string `json:"description"` Maker CommandsMaker `json:"-"` }
type CommandsDefinitions ¶
type CommandsDefinitions []CommandsDefinition
type CommandsMaker ¶
type CommandsMaker func(controller Controller, definitions interface{}) ([]cli.Command, error)
type Config ¶
type Config interface { Model Status() ConfigStatus Version() string Description() string Source() string Name() string Stream() Stream Data() interface{} ChannelWriter(internal bool) (ChannelWriter, error) SetData(interface{}) error SetStatus(ConfigStatus) error SetVersion(string) error SetDescription(string) error SetSource(string) error SetName(string) error AddDestination(Destination, string, DestinationStatus) error RemoveDestination(Destination) error Destinations() (map[string][]DestinationMap, error) ActionConfigs() ([]ActionConfig, error) AddActionConfig(ActionConfig, int) error RemoveActionConfig(ActionConfig) error Processor(internal bool) (*Processor, error) }
type ConfigStatus ¶
type ConfigStatus string
const ( ActiveConfig ConfigStatus = "active" DisabledConfig ConfigStatus = "disabled" TestingConfig ConfigStatus = "testing" )
type ConfigurableAction ¶
type ConfigurableAction interface {
DoWithConfig(*Item, ChannelWriter, Config) (*Item, error)
}
type Controller ¶
type Controller interface { SetVar(key string, value interface{}) error GetVar(key string) (interface{}, bool) // Clone the controller Clone() (Controller, error) // Transaction Helpers Begin() error Commit() error Rollback() error // Retrieve Settings Settings() Settings // Initialize a plugin InitializePlugin(Plugin) error // Initialize all plugins as defined in the settings InitializePlugins() error // Streams Streams(filters map[string]interface{}) ([]Stream, error) Stream(streamID []byte) (Stream, error) // Sources Sources(filters map[string]interface{}) ([]Source, error) Source(sourceID []byte) (Source, error) // Destinations Destinations(filters map[string]interface{}) ([]Destination, error) Destination(destinationID []byte) (Destination, error) // Configs Config(configID []byte) (Config, error) // Action Configs ActionConfigs(filters map[string]interface{}) ([]ActionConfig, error) ActionConfig(configID []byte) (ActionConfig, error) Definitions() *Definitions // Retrieve a list of streams by urgency StreamsByUrgency(n int) ([]Stream, error) // Retrieve a list of sources by urgency SourcesByUrgency(n int) ([]SourceMap, error) // Retrieve a list of destinations by urgency DestinationsByUrgency(n int) ([]DestinationMap, error) // Acquire a processable entity Acquire(Processable, []byte) (bool, error) // Release a processable entity Release(Processable, []byte) (bool, error) // Send a pingback with stats for a processable entity Ping(Processable, ProcessingStats) error // Datasets Dataset(id []byte) (Dataset, error) Datasets(filter map[string]any) ([]Dataset, error) // Projects MakeProject(id []byte) Project Project(projectID []byte) (Project, error) Projects(filters map[string]interface{}) ([]Project, error) // Resets the database (warning, this is a destructive action...) ResetDB() error // Parameter store ParameterStore() ParameterStore // Run all hooks of the given name RunHooks(name string, data interface{}) (interface{}, error) }
type ControllerDefinitions ¶
type ControllerDefinitions map[string]ControllerMaker
type ControllerMaker ¶
type ControllerMaker func(map[string]interface{}, Settings, *Definitions) (Controller, error)
type CustomHashValue ¶ added in v0.0.4
type CustomHashValue interface {
HashValue() interface{}
}
type DatesSettable ¶ added in v0.1.0
type Definitions ¶
type Definitions struct { CommandsDefinitions ParameterStoreDefinitions PluginDefinitions ActionDefinitions WriterDefinitions ReaderDefinitions ControllerDefinitions HookDefinitions }
func MergeDefinitions ¶
func MergeDefinitions(a, b Definitions) Definitions
func (Definitions) Marshal ¶
func (d Definitions) Marshal() map[string]interface{}
func (Definitions) MarshalJSON ¶
func (d Definitions) MarshalJSON() ([]byte, error)
We perform JSON marshalling manually to gain more flexibility...
type Destination ¶
type Destination interface { Processable // Processable includes Model Writer() (Writer, error) ConfigData() map[string]interface{} SetConfigData(map[string]interface{}) error Name() string DestinationType() string SetDestinationType(string) error SetName(string) error Description() string SetDescription(string) error SetData(interface{}) error Data() interface{} Project() Project }
type DestinationMap ¶
type DestinationMap interface { Processable PriorityModel Destination() Destination Config() Config Name() string SetName(string) error Status() DestinationStatus SetStatus(DestinationStatus) error InternalWriter() (Writer, error) }
type DestinationStatus ¶
type DestinationStatus string
const ( ActiveDestination DestinationStatus = "active" OnDemandDestination DestinationStatus = "on-demand" DisabledDestination DestinationStatus = "disabled" TestingDestination DestinationStatus = "testing" ErrorDestination DestinationStatus = "error" WarningDestination DestinationStatus = "warning" MessageDestination DestinationStatus = "message" )
type DoableAction ¶
type DoableAction interface {
Do(*Item, ChannelWriter) (*Item, error)
}
type ErrorPolicy ¶
type ErrorPolicy string
const ( AbortOnError ErrorPolicy = "abort" ReportErrors ErrorPolicy = "report" IgnoreErrors ErrorPolicy = "ignore" )
type HookDefinition ¶
type HookDefinitions ¶
type HookDefinitions map[string][]HookDefinition
type InMemoryChannelWriter ¶
type InMemoryChannelWriter struct { Items map[string][]*Item Messages []*Message Errors []*Error Warnings []*Warning }
func MakeInMemoryChannelWriter ¶
func MakeInMemoryChannelWriter() *InMemoryChannelWriter
func (*InMemoryChannelWriter) Error ¶
func (c *InMemoryChannelWriter) Error(item *Item, err error) error
func (*InMemoryChannelWriter) Message ¶
func (c *InMemoryChannelWriter) Message(item *Item, data map[string]interface{}, mt MessageType) error
type InternalChannel ¶
func MakeInternalChannel ¶
func MakeInternalChannel() *InternalChannel
func (*InternalChannel) Purge ¶
func (a *InternalChannel) Purge() error
func (*InternalChannel) Read ¶
func (s *InternalChannel) Read() (Payload, error)
func (*InternalChannel) Setup ¶
func (s *InternalChannel) Setup(controller Controller, model Model) error
Sets up the Internal reader, which relies on an internal queue to process items.
func (*InternalChannel) Teardown ¶
func (s *InternalChannel) Teardown() error
func (*InternalChannel) Write ¶
func (s *InternalChannel) Write(payload Payload) error
Writes items to the internal channel.
type InternalReader ¶
type InternalReader struct {
*InternalChannel
}
Adaptor for using an internal channel as a reader
func MakeInternalReader ¶
func MakeInternalReader(channel *InternalChannel) *InternalReader
func (*InternalReader) Setup ¶
func (s *InternalReader) Setup(stream Stream) error
type InternalWriter ¶
type InternalWriter struct {
*InternalChannel
}
Adaptor for using an internal channel as a writer
func MakeInternalWriter ¶
func MakeInternalWriter(channel *InternalChannel) *InternalWriter
func (*InternalWriter) Setup ¶
func (s *InternalWriter) Setup(config Config) error
type IsActionConfig ¶
type IsActionConfig struct{}
func (IsActionConfig) Validate ¶
func (i IsActionConfig) Validate(value interface{}, values map[string]interface{}) (interface{}, error)
type IsActionSpecification ¶
type IsActionSpecification struct {
// contains filtered or unexported fields
}
func (IsActionSpecification) Validate ¶
func (i IsActionSpecification) Validate(value interface{}, values map[string]interface{}) (interface{}, error)
func (IsActionSpecification) ValidateWithContext ¶
func (i IsActionSpecification) ValidateWithContext(value interface{}, values map[string]interface{}, context map[string]interface{}) (interface{}, error)
type IsActionSpecifications ¶
type IsActionSpecifications struct{}
func (IsActionSpecifications) Validate ¶
func (f IsActionSpecifications) Validate(value interface{}, values map[string]interface{}) (interface{}, error)
type Item ¶
type Item struct {
// contains filtered or unexported fields
}
func (*Item) MarshalJSON ¶
func (*Item) SerializeJSON ¶
type ItemStore ¶
type ItemStore struct { Items map[string]map[string][]Payload // contains filtered or unexported fields }
func MakeItemStore ¶
func MakeItemStore() *ItemStore
type Message ¶
type Message struct { Type MessageType `json:"type"` Item *Item `json:"item"` Data map[string]interface{} `json:"data"` }
type MessageType ¶
type MessageType string
const ( Info MessageType = "INFO" Debug MessageType = "DEBUG" Quota MessageType = "QUOTA" )
type Meter ¶ added in v0.0.4
type Meter interface { // Add the given value to the metric Add(id string, name string, data map[string]string, tw TimeWindow, value int64) error // Return the metric and its assigned quota Get(id string, name string, data map[string]string, tw TimeWindow) (*Metric, error) // Return metrics for a given ID and time interval Range(id string, from, to int64, name, twType string) ([]*Metric, error) N(id string, to int64, n int64, name, twType string) ([]*Metric, error) }
type Metric ¶ added in v0.0.4
type Metric struct { Name string TimeWindow TimeWindow Value int64 Data map[string]string }
type ModelMeter ¶ added in v0.0.4
type ModelReader ¶
A reader that is able to read objects for a specific model such as a stream.
type ModelWriter ¶
A writer that is able to write objects for a specific model such as a stream or a destination. Used for internal data routing.
type ParameterGroup ¶
type ParameterGroup struct {
// contains filtered or unexported fields
}
func (*ParameterGroup) Data ¶
func (p *ParameterGroup) Data() map[string]interface{}
func (*ParameterGroup) Hash ¶
func (p *ParameterGroup) Hash() []byte
type ParameterSet ¶
type ParameterSet struct {
// contains filtered or unexported fields
}
func MakeParameterSet ¶
func MakeParameterSet(actions []Action, parameterStore ParameterStore) (*ParameterSet, error)
func RestoreParameterSet ¶
func RestoreParameterSet(data map[string]interface{}, parameterStore ParameterStore) (*ParameterSet, error)
func (*ParameterSet) Actions ¶
func (p *ParameterSet) Actions() []Action
func (*ParameterSet) Empty ¶
func (p *ParameterSet) Empty() bool
func (*ParameterSet) Hash ¶
func (p *ParameterSet) Hash() []byte
func (ParameterSet) MarshalJSON ¶
func (a ParameterSet) MarshalJSON() ([]byte, error)
func (*ParameterSet) ParameterStore ¶
func (p *ParameterSet) ParameterStore() ParameterStore
func (*ParameterSet) Parameters ¶
func (p *ParameterSet) Parameters() []*Parameters
Returns the parameters (in order) for the set.
func (*ParameterSet) ParametersFor ¶
func (p *ParameterSet) ParametersFor(action Action, parameterGroup *ParameterGroup) (*Parameters, bool, error)
func (*ParameterSet) SetParameterStore ¶
func (p *ParameterSet) SetParameterStore(parameterStore ParameterStore)
func (*ParameterSet) UpdateHash ¶
func (p *ParameterSet) UpdateHash() error
The hash uniquely identifies a given parameter set based on the IDs of the constituent parameters.
func (*ParameterSet) UpdateParameters ¶
func (p *ParameterSet) UpdateParameters(action Action, params interface{}, parameterGroup *ParameterGroup) error
type ParameterStore ¶
type ParameterStore interface { Definitions() *Definitions ParametersById(id []byte) (*Parameters, error) Parameters(action Action, parameterGroup *ParameterGroup) (*Parameters, error) ParameterSet(hash []byte) (*ParameterSet, error) SaveParameterSet(*ParameterSet) (bool, error) SaveParameters(*Parameters) (bool, error) AllParameters() ([]*Parameters, error) AllParameterSets() ([]*ParameterSet, error) }
An interface that manages action parameters
func MakeParameterStore ¶
func MakeParameterStore(settings Settings, definitions *Definitions) (ParameterStore, error)
type ParameterStoreDefinitions ¶
type ParameterStoreDefinitions map[string]ParameterStoreDefinition
type ParameterStoreMaker ¶
type ParameterStoreMaker func(map[string]interface{}, *Definitions) (ParameterStore, error)
type Parameters ¶
type Parameters struct {
// contains filtered or unexported fields
}
Represents parameters for a given ActionConfig
func MakeParameters ¶
func MakeParameters(action Action, parameterStore ParameterStore, parameters interface{}, parameterGroup *ParameterGroup) *Parameters
func RestoreParameters ¶
func RestoreParameters(data map[string]interface{}, parameterStore ParameterStore) (*Parameters, error)
func (*Parameters) Action ¶
func (p *Parameters) Action() Action
Returns the associated action.
func (Parameters) MarshalJSON ¶
func (a Parameters) MarshalJSON() ([]byte, error)
func (*Parameters) ParameterGroup ¶
func (p *Parameters) ParameterGroup() *ParameterGroup
func (*Parameters) ParameterStore ¶
func (p *Parameters) ParameterStore() ParameterStore
func (*Parameters) Parameters ¶
func (p *Parameters) Parameters() interface{}
Returns the parameters
func (*Parameters) SetParameterStore ¶
func (p *Parameters) SetParameterStore(parameterStore ParameterStore)
func (*Parameters) Valid ¶
func (p *Parameters) Valid(action Action, parameterGroup *ParameterGroup) (bool, error)
Returns whether the parameters are valid for a given parameter group
type PeekingReader ¶
type PeekingReader interface { // Read a payload but immediately reject it (if possible) Peek() (Payload, error) Reader }
A peeking reader is able to "peek" into the data stream, i.e. to read a payload but immediately put it back (by rejecting it, if possible).
type Plugin ¶
type Plugin interface {
Initialize(*Definitions) error
}
type PluginDefinition ¶
type PluginDefinition struct { Name string Description string Maker PluginMaker `json:"-"` Form forms.Form `json:"form"` }
type PluginDefinitions ¶
type PluginDefinitions map[string]PluginDefinition
type PluginMaker ¶
type PriorityModel ¶
type PriorityModel interface { SetPriority(float64) error Priority() float64 PriorityTime() time.Time SetPriorityTime(time.Time) error SetPriorityAndTime(float64, time.Time) error }
A model that has an associated priority
type Processable ¶
type Processable interface { Model }
The Processable interface should be implemented by any object that can be processed (e.g. a source, stream or destination).
type ProcessingStats ¶
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
func MakeProcessor ¶
func MakeProcessor(parameterSet *ParameterSet, channelWriter ChannelWriter, config Config) (*Processor, error)
func (*Processor) ErrorPolicy ¶
func (p *Processor) ErrorPolicy() ErrorPolicy
func (*Processor) ParameterSet ¶
func (p *Processor) ParameterSet() *ParameterSet
func (*Processor) SetErrorPolicy ¶
func (p *Processor) SetErrorPolicy(policy ErrorPolicy)
func (*Processor) SetWriter ¶
func (p *Processor) SetWriter(channelWriter ChannelWriter)
func (*Processor) Writer ¶
func (p *Processor) Writer() ChannelWriter
type Project ¶
type Project interface { Model Name() string SetName(string) error Description() string SetDescription(string) error Data() interface{} SetData(interface{}) error SetCreatedAt(time time.Time) error SetUpdatedAt(time time.Time) error SetDeletedAt(time *time.Time) error MakeActionConfig(id []byte) ActionConfig MakeDestination(id []byte) Destination MakeSource(id []byte) Source MakeStream(id []byte) Stream DeleteRelated() error // datasets (for testing, error logging, ...) MakeDataset(id []byte) Dataset Controller() Controller }
type ReaderDefinition ¶
type ReaderDefinitions ¶
type ReaderDefinitions map[string]ReaderDefinition
type ReaderMaker ¶
type ScheduledAction ¶
type SetupAction ¶
type Source ¶
type Source interface { Processable // Processable includes Model Reader() (Reader, error) ConfigData() map[string]interface{} SetConfigData(map[string]interface{}) error SourceType() string SetSourceType(string) error Name() string SetName(string) error Description() string SetDescription(string) error // Return all streams with a given source status for this source Streams(SourceStatus) ([]Stream, error) SetData(interface{}) error Data() interface{} Project() Project Service() Service SetService(Service) error }
type SourceMap ¶
type SourceMap interface { Processable PriorityModel Source() Source Stream() Stream Status() SourceStatus SetStatus(SourceStatus) error // Return the current session (only applicable for batch sources) Session() interface{} // Update the current session (only applicable for batch sources) SetSession(interface{}) error }
type SourceStatus ¶
type SourceStatus string
const ( // If a source is active, we try to read from it ActiveSource SourceStatus = "active" // If it is disabled, we ignore it DisabledSource SourceStatus = "disabled" )
type StatefulAction ¶
type StatefulAction interface { // Resets the action Reset() error // Finalizes the action Finalize(ChannelWriter) ([]*Item, error) // Advances the action Advance(ChannelWriter) ([]*Item, error) }
type StatsModel ¶
type StatsModel interface { Stats() (map[string]int64, error) Stat(string) (int64, error) // Set a given statistic SetStat(string, int64) error // Add to a given statistic AddToStat(string, int64) error }
A model that allows storing/retrieving statistics
type Stream ¶
type Stream interface { Processable // Processable includes Model PriorityModel Configs() ([]Config, error) Config(id []byte) (Config, error) MakeConfig(id []byte) Config AddSource(Source, SourceStatus) error RemoveSource(Source) error Sources() (map[string]SourceMap, error) Status() StreamStatus SetStatus(StreamStatus) error Name() string SetName(string) error Description() string SetDescription(string) error SetData(interface{}) error Data() interface{} Project() Project }
type StreamStats ¶
type StreamStatus ¶
type StreamStatus string
const ( ActiveStream StreamStatus = "active" DisabledStream StreamStatus = "disabled" TestingStream StreamStatus = "testing" )
type Tag ¶ added in v0.0.4
func ExtractTags ¶ added in v0.0.4
func ExtractTags(field reflect.StructField, tag string) []Tag
type TeardownAction ¶
type TeardownAction interface {
Teardown() error
}
type TimeWindow ¶ added in v0.0.4
func Day ¶ added in v0.0.4
func Day(value int64) TimeWindow
func Hour ¶ added in v0.0.4
func Hour(value int64) TimeWindow
func MakeTimeWindow ¶ added in v0.0.4
func MakeTimeWindow(t int64, twType string) TimeWindow
func Minute ¶ added in v0.0.4
func Minute(value int64) TimeWindow
func Month ¶ added in v0.0.4
func Month(value int64) TimeWindow
func Second ¶ added in v0.0.4
func Second(value int64) TimeWindow
func Week ¶ added in v0.0.4
func Week(value int64) TimeWindow
func (*TimeWindow) Copy ¶ added in v0.0.4
func (t *TimeWindow) Copy() TimeWindow
func (*TimeWindow) IncreaseBy ¶ added in v0.0.4
func (t *TimeWindow) IncreaseBy(n int64)
type TimeWindowFunc ¶ added in v0.0.4
type TimeWindowFunc func(int64) TimeWindow
type UndoableAction ¶
type WriterDefinition ¶
type WriterDefinitions ¶
type WriterDefinitions map[string]WriterDefinition
type WriterMaker ¶
Source Files ¶
- action.go
- action_config.go
- blueprint.go
- channel_writer.go
- command.go
- config.go
- controller.go
- dataset.go
- definitions.go
- destination.go
- destination_map.go
- hash.go
- hook.go
- internal_channel.go
- item.go
- log.go
- meter.go
- model.go
- ops.go
- parameters.go
- payload.go
- plugin.go
- processable.go
- processor.go
- project.go
- random.go
- reader.go
- service.go
- settings.go
- source.go
- source_map.go
- stream.go
- time_window.go
- version.go
- writer.go
Directories ¶
Path | Synopsis |
---|---|
v1/resources/pcap/pcapgo | Package pcapgo provides some native PCAP support, not requiring C libpcap to be installed. |
writers/example (Module) | |