Documentation ¶
Index ¶
- Constants
- Variables
- func ClearKVStorage() error
- func CorrectsConfigKeysByJson(configs map[string]interface{}, jsonFilePath string) error
- func DropCfgKeyFromStorage(typ string, plugin string, confKey string) error
- func GetAllConnConfigs() (map[string]map[string]map[string]any, error)
- func GetCfgFromKVStorage(typ string, plugin string, confKey string) (map[string]map[string]interface{}, error)
- func GetConfLoc() (s string, err error)
- func GetDataLoc() (s string, err error)
- func GetEnv() map[string]string
- func GetLoc(subdir string) (string, error)
- func GetLogLoc() (string, error)
- func GetPluginsLoc() (s string, err error)
- func GetYamlConfigAllKeys(typ string) (map[string]struct{}, error)
- func InitConf()
- func LoadCfgKeyKV(key string) (map[string]interface{}, error)
- func LoadConfig(c interface{}) error
- func LoadConfigByName(name string, c interface{}) error
- func LoadConfigFromPath(p string, c interface{}) error
- func NewSqliteKVStore(table string) (*sqlKVStore, error)
- func Printable(m map[string]interface{}) map[string]interface{}
- func ProcessPath(p string) (string, error)
- func RedisStorageConSelectorApply(connectionSelector string, conf *KuiperConf) error
- func SaveCfgKeyToKV(key string, cfg map[string]interface{}) error
- func SetConsoleAndFileLog(consoleLog, fileLog bool) error
- func SetLogFormat(disableTimestamp bool)
- func SetLogLevel(level string, debug bool)
- func SetupConnectionProps()
- func SetupEnv()
- func ValidateRuleOption(option *def.RuleOption) error
- func WriteCfgIntoKVStorage(typ string, plugin string, confKey string, confData map[string]interface{}) error
- type ConSelector
- type ConfKeysOperator
- type ConfigKeys
- func (c *ConfigKeys) AddConfKey(confKey string, reqField map[string]interface{}) error
- func (c *ConfigKeys) AddConfKeyField(confKey string, reqField map[string]interface{}) error
- func (c *ConfigKeys) ClearConfKeys()
- func (c *ConfigKeys) CopyConfContent() map[string]map[string]interface{}
- func (c *ConfigKeys) CopyReadOnlyConfContent() map[string]map[string]interface{}
- func (c *ConfigKeys) CopyUpdatableConfContent() map[string]map[string]interface{}
- func (c *ConfigKeys) CopyUpdatableConfContentFor(configKeys []string) map[string]map[string]interface{}
- func (c *ConfigKeys) DeleteConfKey(confKey string)
- func (c *ConfigKeys) DeleteConfKeyField(confKey string, reqField map[string]interface{}) error
- func (c *ConfigKeys) GetConfContentByte() ([]byte, error)
- func (c *ConfigKeys) GetConfKeys() (keys []string)
- func (c *ConfigKeys) GetPluginName() string
- func (c *ConfigKeys) GetReadOnlyConfKeys() (keys []string)
- func (c *ConfigKeys) GetUpdatableConfKeys() (keys []string)
- func (c *ConfigKeys) LoadConfContent(cf map[string]map[string]interface{})
- type ConfigOperator
- func NewConfigOperatorForConnection(pluginName string) ConfigOperator
- func NewConfigOperatorForSink(pluginName string) ConfigOperator
- func NewConfigOperatorForSource(pluginName string) ConfigOperator
- func NewConfigOperatorFromConnectionStorage(pluginName string) (ConfigOperator, error)
- func NewConfigOperatorFromSinkStorage(pluginName string) (ConfigOperator, error)
- func NewConfigOperatorFromSourceStorage(pluginName string) (ConfigOperator, error)
- type ConnectionConfigKeysOps
- type EnvManager
- type JsonPathEval
- type KuiperConf
- type OpenTelemetry
- type PathConfigure
- type SQLConf
- type SinkConf
- type SinkConfigKeysOps
- type SourceConf
- type SourceConfigKeysOps
- type TlsConf
Constants ¶
const ( ConfFileName = "kuiper.yaml" DebugLogLevel = "debug" InfoLogLevel = "info" WarnLogLevel = "warn" ErrorLogLevel = "error" FatalLogLevel = "fatal" PanicLogLevel = "panic" )
const (
KuiperBaseKey = "KuiperBaseKey"
)
const Separator = "__"
Variables ¶
var ( Config *KuiperConf IsTesting bool TestId string )
var CloseLogger = logger.CloseLogger
var FuncMap template.FuncMap
var LoadConfigCache map[string]map[string]interface{}
var (
Log = logger.Log
)
Functions ¶
func DropCfgKeyFromStorage ¶
DropCfgKeyFromStorage ...
func GetAllConnConfigs ¶
GetAllConnConfigs return connections' plugin -> confKey -> props
func GetCfgFromKVStorage ¶
func GetCfgFromKVStorage(typ string, plugin string, confKey string) (map[string]map[string]interface{}, error)
GetCfgFromKVStorage ...
func GetConfLoc ¶
func GetDataLoc ¶
func GetPluginsLoc ¶
func GetYamlConfigAllKeys ¶
GetYamlConfigAllKeys get all plugin keys about sources/sinks/connections
func LoadCfgKeyKV ¶
func LoadConfig ¶
func LoadConfig(c interface{}) error
func LoadConfigByName ¶
func LoadConfigFromPath ¶
func NewSqliteKVStore ¶
func ProcessPath ¶
func RedisStorageConSelectorApply ¶
func RedisStorageConSelectorApply(connectionSelector string, conf *KuiperConf) error
func SaveCfgKeyToKV ¶
SaveCfgKeyToKV ...
func SetConsoleAndFileLog ¶
func SetLogFormat ¶
func SetLogFormat(disableTimestamp bool)
func SetLogLevel ¶
func SetupConnectionProps ¶
func SetupConnectionProps()
func ValidateRuleOption ¶
func ValidateRuleOption(option *def.RuleOption) error
Types ¶
type ConSelector ¶
type ConSelector struct { ConnSelectorStr string Type string // mqtt edgex CfgKey string // config key }
func (*ConSelector) Init ¶
func (c *ConSelector) Init() error
func (*ConSelector) ReadCfgFromYaml ¶
func (c *ConSelector) ReadCfgFromYaml() (props map[string]interface{}, err error)
type ConfKeysOperator ¶
type ConfKeysOperator interface { GetPluginName() string GetConfContentByte() ([]byte, error) // CopyConfContent get the configurations in etc and data folder CopyConfContent() map[string]map[string]interface{} // CopyReadOnlyConfContent get the configurations in etc folder CopyReadOnlyConfContent() map[string]map[string]interface{} // CopyUpdatableConfContent get the configurations in data folder CopyUpdatableConfContent() map[string]map[string]interface{} // CopyUpdatableConfContentFor get the configuration for the specific configKeys CopyUpdatableConfContentFor(configKeys []string) map[string]map[string]interface{} // LoadConfContent load the configurations into data configuration part LoadConfContent(cf map[string]map[string]interface{}) GetConfKeys() (keys []string) GetReadOnlyConfKeys() (keys []string) GetUpdatableConfKeys() (keys []string) DeleteConfKey(confKey string) DeleteConfKeyField(confKey string, reqField map[string]interface{}) error AddConfKey(confKey string, reqField map[string]interface{}) error AddConfKeyField(confKey string, reqField map[string]interface{}) error ClearConfKeys() }
ConfKeysOperator define interface to query/add/update/delete the configs in memory
type ConfigKeys ¶
type ConfigKeys struct {
// contains filtered or unexported fields
}
ConfigKeys implement ConfKeysOperator interface, load the configs from etc/sources/xx.yaml and et/connections/connection.yaml Hold the connection configs for each connection type in etcCfg field Provide method to query/add/update/delete the configs
func (*ConfigKeys) AddConfKey ¶
func (c *ConfigKeys) AddConfKey(confKey string, reqField map[string]interface{}) error
func (*ConfigKeys) AddConfKeyField ¶
func (c *ConfigKeys) AddConfKeyField(confKey string, reqField map[string]interface{}) error
func (*ConfigKeys) ClearConfKeys ¶
func (c *ConfigKeys) ClearConfKeys()
func (*ConfigKeys) CopyConfContent ¶
func (c *ConfigKeys) CopyConfContent() map[string]map[string]interface{}
func (*ConfigKeys) CopyReadOnlyConfContent ¶
func (c *ConfigKeys) CopyReadOnlyConfContent() map[string]map[string]interface{}
func (*ConfigKeys) CopyUpdatableConfContent ¶
func (c *ConfigKeys) CopyUpdatableConfContent() map[string]map[string]interface{}
func (*ConfigKeys) CopyUpdatableConfContentFor ¶
func (c *ConfigKeys) CopyUpdatableConfContentFor(configKeys []string) map[string]map[string]interface{}
func (*ConfigKeys) DeleteConfKey ¶
func (c *ConfigKeys) DeleteConfKey(confKey string)
func (*ConfigKeys) DeleteConfKeyField ¶
func (c *ConfigKeys) DeleteConfKeyField(confKey string, reqField map[string]interface{}) error
func (*ConfigKeys) GetConfContentByte ¶
func (c *ConfigKeys) GetConfContentByte() ([]byte, error)
func (*ConfigKeys) GetConfKeys ¶
func (c *ConfigKeys) GetConfKeys() (keys []string)
func (*ConfigKeys) GetPluginName ¶
func (c *ConfigKeys) GetPluginName() string
func (*ConfigKeys) GetReadOnlyConfKeys ¶
func (c *ConfigKeys) GetReadOnlyConfKeys() (keys []string)
func (*ConfigKeys) GetUpdatableConfKeys ¶
func (c *ConfigKeys) GetUpdatableConfKeys() (keys []string)
func (*ConfigKeys) LoadConfContent ¶
func (c *ConfigKeys) LoadConfContent(cf map[string]map[string]interface{})
type ConfigOperator ¶
type ConfigOperator interface { ConfKeysOperator SaveCfgToStorage() error }
ConfigOperator define interface to query/add/update/delete the configs in disk
func NewConfigOperatorForConnection ¶
func NewConfigOperatorForConnection(pluginName string) ConfigOperator
NewConfigOperatorForConnection construct function
func NewConfigOperatorForSink ¶
func NewConfigOperatorForSink(pluginName string) ConfigOperator
NewConfigOperatorForSink construct function
func NewConfigOperatorForSource ¶
func NewConfigOperatorForSource(pluginName string) ConfigOperator
NewConfigOperatorForSource construct function
func NewConfigOperatorFromConnectionStorage ¶
func NewConfigOperatorFromConnectionStorage(pluginName string) (ConfigOperator, error)
NewConfigOperatorFromConnectionStorage construct function, Load the configs from et/connections/connection.yaml
func NewConfigOperatorFromSinkStorage ¶
func NewConfigOperatorFromSinkStorage(pluginName string) (ConfigOperator, error)
NewConfigOperatorFromSinkStorage construct function, Load the configs from etc/sources/xx.yaml
func NewConfigOperatorFromSourceStorage ¶
func NewConfigOperatorFromSourceStorage(pluginName string) (ConfigOperator, error)
NewConfigOperatorFromSourceStorage construct function, Load the configs from etc/sources/xx.yaml
type ConnectionConfigKeysOps ¶
type ConnectionConfigKeysOps struct {
*ConfigKeys
}
ConnectionConfigKeysOps implement ConfOperator interface, load the configs from et/connections/connection.yaml
func (*ConnectionConfigKeysOps) SaveCfgToStorage ¶
func (p *ConnectionConfigKeysOps) SaveCfgToStorage() error
type EnvManager ¶
type EnvManager struct {
// contains filtered or unexported fields
}
func (*EnvManager) GetEnv ¶
func (e *EnvManager) GetEnv() map[string]string
func (*EnvManager) Setup ¶
func (e *EnvManager) Setup()
func (*EnvManager) SetupConnectionProps ¶
func (e *EnvManager) SetupConnectionProps()
type JsonPathEval ¶
type JsonPathEval interface {
Eval(data interface{}) (interface{}, error)
}
func GetJsonPathEval ¶
func GetJsonPathEval(jsonpath string) (JsonPathEval, error)
type KuiperConf ¶
type KuiperConf struct { Basic struct { LogLevel string `yaml:"logLevel"` Debug bool `yaml:"debug"` ConsoleLog bool `yaml:"consoleLog"` FileLog bool `yaml:"fileLog"` LogDisableTimestamp bool `yaml:"logDisableTimestamp"` Syslog *syslogConf `yaml:"syslog"` RotateTime int `yaml:"rotateTime"` MaxAge int `yaml:"maxAge"` RotateSize int64 `yaml:"rotateSize"` RotateCount int `yaml:"rotateCount"` TimeZone string `yaml:"timezone"` Ip string `yaml:"ip"` Port int `yaml:"port"` RestIp string `yaml:"restIp"` RestPort int `yaml:"restPort"` RestTls *TlsConf `yaml:"restTls"` Prometheus bool `yaml:"prometheus"` PrometheusPort int `yaml:"prometheusPort"` PluginHosts string `yaml:"pluginHosts"` Authentication bool `yaml:"authentication"` IgnoreCase bool `yaml:"ignoreCase"` SQLConf *SQLConf `yaml:"sql"` RulePatrolInterval cast.DurationConf `yaml:"rulePatrolInterval"` EnableOpenZiti bool `yaml:"enableOpenZiti"` AesKey string `yaml:"aesKey"` GracefulShutdownTimeout cast.DurationConf `yaml:"gracefulShutdownTimeout"` EnableResourceProfiling bool `yaml:"enableResourceProfiling"` } Rule def.RuleOption Sink *SinkConf Source *SourceConf Store struct { Type string `yaml:"type"` ExtStateType string `yaml:"extStateType"` Redis struct { Host string `yaml:"host"` Port int `yaml:"port"` Password string `yaml:"password"` Timeout cast.DurationConf `yaml:"timeout"` ConnectionSelector string `yaml:"connectionSelector"` } Sqlite struct { Name string `yaml:"name"` } Fdb struct { Path string `yaml:"path"` } } Portable struct { PythonBin string `yaml:"pythonBin"` InitTimeout cast.DurationConf `yaml:"initTimeout"` } Connection struct { BackoffMaxElapsedDuration cast.DurationConf `yaml:"backoffMaxElapsedDuration"` } OpenTelemetry OpenTelemetry `yaml:"openTelemetry"` AesKey []byte }
type OpenTelemetry ¶
type PathConfigure ¶
var ( PathConfig PathConfigure AbsoluteMapping = map[string]string{ // contains filtered or unexported fields } )
type SinkConf ¶
type SinkConf struct { MemoryCacheThreshold int `json:"memoryCacheThreshold" yaml:"memoryCacheThreshold"` MaxDiskCache int `json:"maxDiskCache" yaml:"maxDiskCache"` BufferPageSize int `json:"bufferPageSize" yaml:"bufferPageSize"` EnableCache bool `json:"enableCache" yaml:"enableCache"` ResendInterval cast.DurationConf `json:"resendInterval" yaml:"resendInterval"` CleanCacheAtStop bool `json:"cleanCacheAtStop" yaml:"cleanCacheAtStop"` ResendAlterQueue bool `json:"resendAlterQueue" yaml:"resendAlterQueue"` ResendPriority int `json:"resendPriority" yaml:"resendPriority"` ResendIndicatorField string `json:"resendIndicatorField" yaml:"resendIndicatorField"` ResendDestination string `json:"resendDestination" yaml:"resendDestination"` }
type SinkConfigKeysOps ¶
type SinkConfigKeysOps struct {
*ConfigKeys
}
SinkConfigKeysOps implement ConfOperator interface, load the configs from data/sinks/xx.yaml
func (*SinkConfigKeysOps) SaveCfgToStorage ¶
func (c *SinkConfigKeysOps) SaveCfgToStorage() error
type SourceConf ¶
type SourceConf struct { HttpServerIp string `json:"httpServerIp" yaml:"httpServerIp"` HttpServerPort int `json:"httpServerPort" yaml:"httpServerPort"` HttpServerTls *TlsConf `json:"httpServerTls" yaml:"httpServerTls"` }
func (*SourceConf) Validate ¶
func (sc *SourceConf) Validate() error
type SourceConfigKeysOps ¶
type SourceConfigKeysOps struct {
*ConfigKeys
}
SourceConfigKeysOps implement ConfOperator interface, load the configs from etc/sources/xx.yaml
func (*SourceConfigKeysOps) SaveCfgToStorage ¶
func (c *SourceConfigKeysOps) SaveCfgToStorage() error