Documentation ¶
Index ¶
- Constants
- Variables
- func CorrectsConfigKeysByJson(configs map[string]interface{}, jsonFilePath string) error
- func GetConfLoc() (string, error)
- func GetDataLoc() (string, error)
- func GetLoc(subdir string) (string, error)
- func GetLocalZone() int
- func GetLogLoc() (string, error)
- func GetNow() time.Time
- func GetNowInMilli() int64
- func GetPluginsLoc() (string, error)
- func GetTicker(duration int64) *clock.Ticker
- func GetTimer(duration int64) *clock.Timer
- func GetTimerByTime(t time.Time) *clock.Timer
- func InitClock()
- func InitConf()
- func LoadConfig(c interface{}) error
- func LoadConfigByName(name string, c interface{}) error
- func LoadConfigFromPath(p string, c interface{}) error
- func NewSqliteKVStore(table string) (*sqlKVStore, error)
- func Printable(m map[string]interface{}) map[string]interface{}
- func ProcessPath(p string) (string, error)
- func RedisStorageConSelectorApply(connectionSelector string, conf *KuiperConf) error
- func SetConsoleAndFileLog(consoleLog, fileLog bool) error
- func SetLogFormat(disableTimestamp bool)
- func SetLogLevel(level string, debug bool)
- func ValidateRuleOption(option *api.RuleOption) error
- type ConSelector
- type ConfKeysOperator
- type ConfigKeys
- func (c *ConfigKeys) AddConfKey(confKey string, reqField map[string]interface{}) error
- func (c *ConfigKeys) AddConfKeyField(confKey string, reqField map[string]interface{}) error
- func (c *ConfigKeys) ClearConfKeys()
- func (c *ConfigKeys) CopyConfContent() map[string]map[string]interface{}
- func (c *ConfigKeys) CopyReadOnlyConfContent() map[string]map[string]interface{}
- func (c *ConfigKeys) CopyUpdatableConfContent() map[string]map[string]interface{}
- func (c *ConfigKeys) CopyUpdatableConfContentFor(configKeys []string) map[string]map[string]interface{}
- func (c *ConfigKeys) DeleteConfKey(confKey string)
- func (c *ConfigKeys) DeleteConfKeyField(confKey string, reqField map[string]interface{}) error
- func (c *ConfigKeys) GetConfContentByte() ([]byte, error)
- func (c *ConfigKeys) GetConfKeys() (keys []string)
- func (c *ConfigKeys) GetPluginName() string
- func (c *ConfigKeys) GetReadOnlyConfKeys() (keys []string)
- func (c *ConfigKeys) GetUpdatableConfKeys() (keys []string)
- func (c *ConfigKeys) LoadConfContent(cf map[string]map[string]interface{})
- type ConfigOperator
- func NewConfigOperatorForConnection(pluginName string) ConfigOperator
- func NewConfigOperatorForSink(pluginName string) ConfigOperator
- func NewConfigOperatorForSource(pluginName string) ConfigOperator
- func NewConfigOperatorFromConnectionStorage(pluginName string) (ConfigOperator, error)
- func NewConfigOperatorFromSinkStorage(pluginName string) (ConfigOperator, error)
- func NewConfigOperatorFromSourceStorage(pluginName string) (ConfigOperator, error)
- type ConnectionConfigKeysOps
- type JsonPathEval
- type KuiperConf
- type PathConfigure
- type SQLConf
- type SinkConf
- type SinkConfigKeysOps
- type SourceConf
- type SourceConfigKeysOps
Constants ¶
const ( ConfFileName = "kuiper.yaml" DebugLogLevel = "debug" InfoLogLevel = "info" WarnLogLevel = "warn" ErrorLogLevel = "error" FatalLogLevel = "fatal" PanicLogLevel = "panic" )
const (
KuiperBaseKey = "KuiperBaseKey"
)
const Separator = "__"
Variables ¶
var ( Config *KuiperConf IsTesting bool TestId string )
var Clock clock.Clock
var CloseLogger = logger.CloseLogger
var FuncMap template.FuncMap
var (
Log = logger.Log
)
Functions ¶
func GetConfLoc ¶
func GetDataLoc ¶
func GetLocalZone ¶
func GetLocalZone() int
func GetNowInMilli ¶
func GetNowInMilli() int64
func GetPluginsLoc ¶
func LoadConfig ¶
func LoadConfig(c interface{}) error
func LoadConfigByName ¶
func LoadConfigFromPath ¶
func NewSqliteKVStore ¶
func ProcessPath ¶
func RedisStorageConSelectorApply ¶
func RedisStorageConSelectorApply(connectionSelector string, conf *KuiperConf) error
func SetConsoleAndFileLog ¶
func SetLogFormat ¶
func SetLogFormat(disableTimestamp bool)
func SetLogLevel ¶
func ValidateRuleOption ¶
func ValidateRuleOption(option *api.RuleOption) error
Types ¶
type ConSelector ¶
type ConSelector struct { ConnSelectorStr string Type string // mqtt edgex CfgKey string // config key }
func (*ConSelector) Init ¶
func (c *ConSelector) Init() error
func (*ConSelector) ReadCfgFromYaml ¶
func (c *ConSelector) ReadCfgFromYaml() (props map[string]interface{}, err error)
type ConfKeysOperator ¶
type ConfKeysOperator interface { GetPluginName() string GetConfContentByte() ([]byte, error) // CopyConfContent get the configurations in etc and data folder CopyConfContent() map[string]map[string]interface{} // CopyReadOnlyConfContent get the configurations in etc folder CopyReadOnlyConfContent() map[string]map[string]interface{} // CopyUpdatableConfContent get the configurations in data folder CopyUpdatableConfContent() map[string]map[string]interface{} // CopyUpdatableConfContentFor get the configuration for the specific configKeys CopyUpdatableConfContentFor(configKeys []string) map[string]map[string]interface{} // LoadConfContent load the configurations into data configuration part LoadConfContent(cf map[string]map[string]interface{}) GetConfKeys() (keys []string) GetReadOnlyConfKeys() (keys []string) GetUpdatableConfKeys() (keys []string) DeleteConfKey(confKey string) DeleteConfKeyField(confKey string, reqField map[string]interface{}) error AddConfKey(confKey string, reqField map[string]interface{}) error AddConfKeyField(confKey string, reqField map[string]interface{}) error ClearConfKeys() }
ConfKeysOperator defines the interface to query/add/update/delete the configs in memory.
type ConfigKeys ¶
type ConfigKeys struct {
// contains filtered or unexported fields
}
ConfigKeys implements the ConfKeysOperator interface. It loads the configs from etc/sources/xx.yaml and etc/connections/connection.yaml, holds the connection configs for each connection type in the etcCfg field, and provides methods to query/add/update/delete the configs.
func (*ConfigKeys) AddConfKey ¶
func (c *ConfigKeys) AddConfKey(confKey string, reqField map[string]interface{}) error
func (*ConfigKeys) AddConfKeyField ¶
func (c *ConfigKeys) AddConfKeyField(confKey string, reqField map[string]interface{}) error
func (*ConfigKeys) ClearConfKeys ¶
func (c *ConfigKeys) ClearConfKeys()
func (*ConfigKeys) CopyConfContent ¶
func (c *ConfigKeys) CopyConfContent() map[string]map[string]interface{}
func (*ConfigKeys) CopyReadOnlyConfContent ¶
func (c *ConfigKeys) CopyReadOnlyConfContent() map[string]map[string]interface{}
func (*ConfigKeys) CopyUpdatableConfContent ¶
func (c *ConfigKeys) CopyUpdatableConfContent() map[string]map[string]interface{}
func (*ConfigKeys) CopyUpdatableConfContentFor ¶
func (c *ConfigKeys) CopyUpdatableConfContentFor(configKeys []string) map[string]map[string]interface{}
func (*ConfigKeys) DeleteConfKey ¶
func (c *ConfigKeys) DeleteConfKey(confKey string)
func (*ConfigKeys) DeleteConfKeyField ¶
func (c *ConfigKeys) DeleteConfKeyField(confKey string, reqField map[string]interface{}) error
func (*ConfigKeys) GetConfContentByte ¶
func (c *ConfigKeys) GetConfContentByte() ([]byte, error)
func (*ConfigKeys) GetConfKeys ¶
func (c *ConfigKeys) GetConfKeys() (keys []string)
func (*ConfigKeys) GetPluginName ¶
func (c *ConfigKeys) GetPluginName() string
func (*ConfigKeys) GetReadOnlyConfKeys ¶
func (c *ConfigKeys) GetReadOnlyConfKeys() (keys []string)
func (*ConfigKeys) GetUpdatableConfKeys ¶
func (c *ConfigKeys) GetUpdatableConfKeys() (keys []string)
func (*ConfigKeys) LoadConfContent ¶
func (c *ConfigKeys) LoadConfContent(cf map[string]map[string]interface{})
type ConfigOperator ¶
type ConfigOperator interface { ConfKeysOperator SaveCfgToStorage() error }
ConfigOperator defines the interface to query/add/update/delete the configs on disk.
func NewConfigOperatorForConnection ¶
func NewConfigOperatorForConnection(pluginName string) ConfigOperator
NewConfigOperatorForConnection is a constructor function.
func NewConfigOperatorForSink ¶
func NewConfigOperatorForSink(pluginName string) ConfigOperator
NewConfigOperatorForSink is a constructor function.
func NewConfigOperatorForSource ¶
func NewConfigOperatorForSource(pluginName string) ConfigOperator
NewConfigOperatorForSource is a constructor function.
func NewConfigOperatorFromConnectionStorage ¶
func NewConfigOperatorFromConnectionStorage(pluginName string) (ConfigOperator, error)
NewConfigOperatorFromConnectionStorage is a constructor function; it loads the configs from etc/connections/connection.yaml.
func NewConfigOperatorFromSinkStorage ¶
func NewConfigOperatorFromSinkStorage(pluginName string) (ConfigOperator, error)
NewConfigOperatorFromSinkStorage is a constructor function; it loads the configs from data/sinks/xx.yaml.
func NewConfigOperatorFromSourceStorage ¶
func NewConfigOperatorFromSourceStorage(pluginName string) (ConfigOperator, error)
NewConfigOperatorFromSourceStorage is a constructor function; it loads the configs from etc/sources/xx.yaml.
type ConnectionConfigKeysOps ¶
type ConnectionConfigKeysOps struct {
*ConfigKeys
}
ConnectionConfigKeysOps implements the ConfigOperator interface and loads the configs from etc/connections/connection.yaml.
func (*ConnectionConfigKeysOps) SaveCfgToStorage ¶
func (p *ConnectionConfigKeysOps) SaveCfgToStorage() error
type JsonPathEval ¶
type JsonPathEval interface {
Eval(data interface{}) (interface{}, error)
}
func GetJsonPathEval ¶
func GetJsonPathEval(jsonpath string) (JsonPathEval, error)
type KuiperConf ¶
type KuiperConf struct { Basic struct { LogLevel string `yaml:"logLevel"` Debug bool `yaml:"debug"` ConsoleLog bool `yaml:"consoleLog"` FileLog bool `yaml:"fileLog"` LogDisableTimestamp bool `yaml:"logDisableTimestamp"` Syslog *syslogConf `yaml:"syslog"` RotateTime int `yaml:"rotateTime"` MaxAge int `yaml:"maxAge"` RotateSize int64 `yaml:"rotateSize"` RotateCount int `yaml:"rotateCount"` TimeZone string `yaml:"timezone"` Ip string `yaml:"ip"` Port int `yaml:"port"` RestIp string `yaml:"restIp"` RestPort int `yaml:"restPort"` RestTls *tlsConf `yaml:"restTls"` Prometheus bool `yaml:"prometheus"` PrometheusPort int `yaml:"prometheusPort"` PluginHosts string `yaml:"pluginHosts"` Authentication bool `yaml:"authentication"` IgnoreCase bool `yaml:"ignoreCase"` SQLConf *SQLConf `yaml:"sql"` RulePatrolInterval string `yaml:"rulePatrolInterval"` CfgStorageType string `yaml:"cfgStorageType"` EnableOpenZiti bool `yaml:"enableOpenZiti"` } Rule api.RuleOption Sink *SinkConf Source *SourceConf Store struct { Type string `yaml:"type"` ExtStateType string `yaml:"extStateType"` Redis struct { Host string `yaml:"host"` Port int `yaml:"port"` Password string `yaml:"password"` Timeout int `yaml:"timeout"` ConnectionSelector string `yaml:"connectionSelector"` } Sqlite struct { Name string `yaml:"name"` } Fdb struct { Path string `yaml:"path"` } } Portable struct { PythonBin string `yaml:"pythonBin"` InitTimeout int `yaml:"initTimeout"` } }
type PathConfigure ¶
var ( PathConfig PathConfigure AbsoluteMapping = map[string]string{ // contains filtered or unexported fields } )
type SinkConf ¶
type SinkConf struct { MemoryCacheThreshold int `json:"memoryCacheThreshold" yaml:"memoryCacheThreshold"` MaxDiskCache int `json:"maxDiskCache" yaml:"maxDiskCache"` BufferPageSize int `json:"bufferPageSize" yaml:"bufferPageSize"` EnableCache bool `json:"enableCache" yaml:"enableCache"` ResendInterval int `json:"resendInterval" yaml:"resendInterval"` CleanCacheAtStop bool `json:"cleanCacheAtStop" yaml:"cleanCacheAtStop"` ResendAlterQueue bool `json:"resendAlterQueue" yaml:"resendAlterQueue"` ResendPriority int `json:"resendPriority" yaml:"resendPriority"` ResendIndicatorField string `json:"resendIndicatorField" yaml:"resendIndicatorField"` }
type SinkConfigKeysOps ¶
type SinkConfigKeysOps struct {
*ConfigKeys
}
SinkConfigKeysOps implements the ConfigOperator interface and loads the configs from data/sinks/xx.yaml.
func (*SinkConfigKeysOps) SaveCfgToStorage ¶
func (c *SinkConfigKeysOps) SaveCfgToStorage() error
type SourceConf ¶
type SourceConf struct { HttpServerIp string `json:"httpServerIp" yaml:"httpServerIp"` HttpServerPort int `json:"httpServerPort" yaml:"httpServerPort"` HttpServerTls *tlsConf `json:"httpServerTls" yaml:"httpServerTls"` }
func (*SourceConf) Validate ¶
func (sc *SourceConf) Validate() error
type SourceConfigKeysOps ¶
type SourceConfigKeysOps struct {
*ConfigKeys
}
SourceConfigKeysOps implements the ConfigOperator interface and loads the configs from etc/sources/xx.yaml.
func (*SourceConfigKeysOps) SaveCfgToStorage ¶
func (c *SourceConfigKeysOps) SaveCfgToStorage() error