conf

package
v2.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 9, 2024 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
// Configuration file name and the log level names accepted by this package.
const (
	// ConfFileName is a file name constant; presumably the main
	// configuration file looked up in the conf directory — confirm.
	ConfFileName  = "kuiper.yaml"
	// The *LogLevel constants are the lower-case level names; see
	// SetLogLevel, which takes the level as a string.
	DebugLogLevel = "debug"
	InfoLogLevel  = "info"
	WarnLogLevel  = "warn"
	ErrorLogLevel = "error"
	FatalLogLevel = "fatal"
	PanicLogLevel = "panic"
)
View Source
const (
	// KuiperBaseKey is a string key whose value equals its own name.
	// NOTE(review): looks like the name of the environment variable that
	// points at the base directory — confirm against GetLoc.
	KuiperBaseKey = "KuiperBaseKey"
)
View Source
// Separator is a two-underscore delimiter string.
// NOTE(review): which composite keys it joins is not visible in this listing.
const Separator = "__"

Variables

View Source
// Package-level state shared by callers of this package.
var (
	// Config points to the package-wide KuiperConf.
	// Presumably populated by InitConf — confirm.
	Config    *KuiperConf
	// IsTesting and TestId are test-mode markers; how they are set is not
	// visible in this listing.
	IsTesting bool
	TestId    string
)
View Source
// CloseLogger is an alias for logger.CloseLogger, exposed through this package.
var CloseLogger = logger.CloseLogger
View Source
// LoadConfigCache is a two-level map with string keys at both levels.
// NOTE(review): presumably caches configs already loaded from disk; the
// meaning of the outer and inner keys is not visible here — confirm.
var LoadConfigCache map[string]map[string]interface{}
View Source
var (
	// Log is an alias for logger.Log, exposed through this package.
	Log = logger.Log
)

Functions

func ClearKVStorage

func ClearKVStorage() error

ClearKVStorage is only used in unit tests.

func CorrectsConfigKeysByJson

func CorrectsConfigKeysByJson(configs map[string]interface{}, jsonFilePath string) error

func DropCfgKeyFromStorage

func DropCfgKeyFromStorage(typ string, plugin string, confKey string) error

DropCfgKeyFromStorage ...

func GetAllConnConfigs

func GetAllConnConfigs() (map[string]map[string]map[string]any, error)

GetAllConnConfigs returns the connection configs as a nested map: plugin -> confKey -> props.

func GetCfgFromKVStorage

func GetCfgFromKVStorage(typ string, plugin string, confKey string) (map[string]map[string]interface{}, error)

GetCfgFromKVStorage ...

func GetConfLoc

func GetConfLoc() (s string, err error)

func GetDataLoc

func GetDataLoc() (s string, err error)

func GetEnv

func GetEnv() map[string]string

func GetLoc

func GetLoc(subdir string) (string, error)

GetLoc returns the location for subdir; subdir must be a relative path.

func GetLogLoc

func GetLogLoc() (string, error)

func GetPluginsLoc

func GetPluginsLoc() (s string, err error)

func GetYamlConfigAllKeys

func GetYamlConfigAllKeys(typ string) (map[string]struct{}, error)

GetYamlConfigAllKeys gets all plugin keys for sources/sinks/connections.

func InitConf

func InitConf()

func LoadCfgKeyKV

func LoadCfgKeyKV(key string) (map[string]interface{}, error)

func LoadConfig

func LoadConfig(c interface{}) error

func LoadConfigByName

func LoadConfigByName(name string, c interface{}) error

func LoadConfigFromPath

func LoadConfigFromPath(p string, c interface{}) error

func NewSqliteKVStore

func NewSqliteKVStore(table string) (*sqlKVStore, error)

func Printable

func Printable(m map[string]interface{}) map[string]interface{}

func ProcessPath

func ProcessPath(p string) (string, error)

func RedisStorageConSelectorApply

func RedisStorageConSelectorApply(connectionSelector string, conf *KuiperConf) error

func SaveCfgKeyToKV

func SaveCfgKeyToKV(key string, cfg map[string]interface{}) error

SaveCfgKeyToKV ...

func SetConsoleAndFileLog

func SetConsoleAndFileLog(consoleLog, fileLog bool) error

func SetLogFormat

func SetLogFormat(disableTimestamp bool)

func SetLogLevel

func SetLogLevel(level string, debug bool)

func SetupConnectionProps

func SetupConnectionProps()

func SetupEnv

func SetupEnv()

func ValidateRuleOption

func ValidateRuleOption(option *def.RuleOption) error

func WriteCfgIntoKVStorage

func WriteCfgIntoKVStorage(typ string, plugin string, confKey string, confData map[string]interface{}) error

WriteCfgIntoKVStorage ...

Types

type ConSelector

// ConSelector identifies a connection configuration by type and config key.
// Its Init and ReadCfgFromYaml methods are declared on *ConSelector.
type ConSelector struct {
	// ConnSelectorStr is the selector in string form.
	// Presumably parsed by Init into Type and CfgKey — confirm.
	ConnSelectorStr string
	Type            string // mqtt edgex
	CfgKey          string // config key
}

func (*ConSelector) Init

func (c *ConSelector) Init() error

func (*ConSelector) ReadCfgFromYaml

func (c *ConSelector) ReadCfgFromYaml() (props map[string]interface{}, err error)

type ConfKeysOperator

// ConfKeysOperator is the method set for reading and modifying config keys.
// The Copy* and Get*ConfKeys methods come in three flavours: all configs,
// read-only configs, and updatable configs.
// DeleteConfKeyField, AddConfKey and AddConfKeyField return an error;
// DeleteConfKey and ClearConfKeys return nothing.
type ConfKeysOperator interface {
	GetPluginName() string
	GetConfContentByte() ([]byte, error)
	// CopyConfContent get the configurations in etc and data folder
	CopyConfContent() map[string]map[string]interface{}
	// CopyReadOnlyConfContent get the configurations in etc folder
	CopyReadOnlyConfContent() map[string]map[string]interface{}
	// CopyUpdatableConfContent get the configurations in data folder
	CopyUpdatableConfContent() map[string]map[string]interface{}
	// CopyUpdatableConfContentFor get the configuration for the specific configKeys
	CopyUpdatableConfContentFor(configKeys []string) map[string]map[string]interface{}
	// LoadConfContent load the configurations into data configuration part
	LoadConfContent(cf map[string]map[string]interface{})
	GetConfKeys() (keys []string)
	GetReadOnlyConfKeys() (keys []string)
	GetUpdatableConfKeys() (keys []string)
	DeleteConfKey(confKey string)
	DeleteConfKeyField(confKey string, reqField map[string]interface{}) error
	AddConfKey(confKey string, reqField map[string]interface{}) error
	AddConfKeyField(confKey string, reqField map[string]interface{}) error
	ClearConfKeys()
}

ConfKeysOperator defines the interface to query/add/update/delete the configs in memory.

type ConfigKeys

// ConfigKeys has no exported fields. Its exported methods are declared on
// *ConfigKeys and have the same names and signatures as the methods of
// ConfKeysOperator.
type ConfigKeys struct {
	// contains filtered or unexported fields
}

ConfigKeys implements the ConfKeysOperator interface. It loads the configs from etc/sources/xx.yaml and etc/connections/connection.yaml, holds the connection configs for each connection type in the etcCfg field, and provides methods to query/add/update/delete the configs.

func (*ConfigKeys) AddConfKey

func (c *ConfigKeys) AddConfKey(confKey string, reqField map[string]interface{}) error

func (*ConfigKeys) AddConfKeyField

func (c *ConfigKeys) AddConfKeyField(confKey string, reqField map[string]interface{}) error

func (*ConfigKeys) ClearConfKeys

func (c *ConfigKeys) ClearConfKeys()

func (*ConfigKeys) CopyConfContent

func (c *ConfigKeys) CopyConfContent() map[string]map[string]interface{}

func (*ConfigKeys) CopyReadOnlyConfContent

func (c *ConfigKeys) CopyReadOnlyConfContent() map[string]map[string]interface{}

func (*ConfigKeys) CopyUpdatableConfContent

func (c *ConfigKeys) CopyUpdatableConfContent() map[string]map[string]interface{}

func (*ConfigKeys) CopyUpdatableConfContentFor

func (c *ConfigKeys) CopyUpdatableConfContentFor(configKeys []string) map[string]map[string]interface{}

func (*ConfigKeys) DeleteConfKey

func (c *ConfigKeys) DeleteConfKey(confKey string)

func (*ConfigKeys) DeleteConfKeyField

func (c *ConfigKeys) DeleteConfKeyField(confKey string, reqField map[string]interface{}) error

func (*ConfigKeys) GetConfContentByte

func (c *ConfigKeys) GetConfContentByte() ([]byte, error)

func (*ConfigKeys) GetConfKeys

func (c *ConfigKeys) GetConfKeys() (keys []string)

func (*ConfigKeys) GetPluginName

func (c *ConfigKeys) GetPluginName() string

func (*ConfigKeys) GetReadOnlyConfKeys

func (c *ConfigKeys) GetReadOnlyConfKeys() (keys []string)

func (*ConfigKeys) GetUpdatableConfKeys

func (c *ConfigKeys) GetUpdatableConfKeys() (keys []string)

func (*ConfigKeys) LoadConfContent

func (c *ConfigKeys) LoadConfContent(cf map[string]map[string]interface{})

type ConfigOperator

// ConfigOperator extends ConfKeysOperator with SaveCfgToStorage.
type ConfigOperator interface {
	// ConfKeysOperator is embedded, so all of its methods are part of
	// this interface.
	ConfKeysOperator
	// SaveCfgToStorage returns an error if saving fails.
	SaveCfgToStorage() error
}

ConfigOperator defines the interface to query/add/update/delete the configs on disk.

func NewConfigOperatorForConnection

func NewConfigOperatorForConnection(pluginName string) ConfigOperator

NewConfigOperatorForConnection construct function

func NewConfigOperatorForSink

func NewConfigOperatorForSink(pluginName string) ConfigOperator

NewConfigOperatorForSink construct function

func NewConfigOperatorForSource

func NewConfigOperatorForSource(pluginName string) ConfigOperator

NewConfigOperatorForSource construct function

func NewConfigOperatorFromConnectionStorage

func NewConfigOperatorFromConnectionStorage(pluginName string) (ConfigOperator, error)

NewConfigOperatorFromConnectionStorage is a constructor that loads the configs from etc/connections/connection.yaml.

func NewConfigOperatorFromSinkStorage

func NewConfigOperatorFromSinkStorage(pluginName string) (ConfigOperator, error)

NewConfigOperatorFromSinkStorage is a constructor that loads the configs from etc/sinks/xx.yaml.

func NewConfigOperatorFromSourceStorage

func NewConfigOperatorFromSourceStorage(pluginName string) (ConfigOperator, error)

NewConfigOperatorFromSourceStorage is a constructor that loads the configs from etc/sources/xx.yaml.

type ConnectionConfigKeysOps

// ConnectionConfigKeysOps wraps ConfigKeys for connection configs and adds
// SaveCfgToStorage (declared on *ConnectionConfigKeysOps).
type ConnectionConfigKeysOps struct {
	// Embedded, so the *ConfigKeys methods are promoted to this type.
	*ConfigKeys
}

ConnectionConfigKeysOps implements the ConfigOperator interface and loads the configs from etc/connections/connection.yaml.

func (*ConnectionConfigKeysOps) SaveCfgToStorage

func (p *ConnectionConfigKeysOps) SaveCfgToStorage() error

type EnvManager

// EnvManager has no exported fields. Its exported methods, declared on
// *EnvManager, are GetEnv, Setup and SetupConnectionProps.
type EnvManager struct {
	// contains filtered or unexported fields
}

func (*EnvManager) GetEnv

func (e *EnvManager) GetEnv() map[string]string

func (*EnvManager) Setup

func (e *EnvManager) Setup()

func (*EnvManager) SetupConnectionProps

func (e *EnvManager) SetupConnectionProps()

type JsonPathEval

// JsonPathEval is returned by GetJsonPathEval, which builds one from a
// jsonpath string.
type JsonPathEval interface {
	// Eval takes arbitrary data and returns a result or an error.
	// Presumably it applies the jsonpath expression to data — confirm.
	Eval(data interface{}) (interface{}, error)
}

func GetJsonPathEval

func GetJsonPathEval(jsonpath string) (JsonPathEval, error)

type KuiperConf

// KuiperConf is the top-level configuration structure. Fields carrying a
// yaml tag are read from the YAML key named in the tag.
type KuiperConf struct {
	// Basic groups logging, network endpoint (ip/port, REST, TLS),
	// Prometheus, authentication and other general switches.
	Basic struct {
		LogLevel                string            `yaml:"logLevel"`
		Debug                   bool              `yaml:"debug"`
		ConsoleLog              bool              `yaml:"consoleLog"`
		FileLog                 bool              `yaml:"fileLog"`
		LogDisableTimestamp     bool              `yaml:"logDisableTimestamp"`
		Syslog                  *syslogConf       `yaml:"syslog"`
		RotateTime              int               `yaml:"rotateTime"`
		MaxAge                  int               `yaml:"maxAge"`
		RotateSize              int64             `yaml:"rotateSize"`
		RotateCount             int               `yaml:"rotateCount"`
		TimeZone                string            `yaml:"timezone"`
		Ip                      string            `yaml:"ip"`
		Port                    int               `yaml:"port"`
		RestIp                  string            `yaml:"restIp"`
		RestPort                int               `yaml:"restPort"`
		RestTls                 *TlsConf          `yaml:"restTls"`
		Prometheus              bool              `yaml:"prometheus"`
		PrometheusPort          int               `yaml:"prometheusPort"`
		PluginHosts             string            `yaml:"pluginHosts"`
		Authentication          bool              `yaml:"authentication"`
		IgnoreCase              bool              `yaml:"ignoreCase"`
		SQLConf                 *SQLConf          `yaml:"sql"`
		RulePatrolInterval      cast.DurationConf `yaml:"rulePatrolInterval"`
		EnableOpenZiti          bool              `yaml:"enableOpenZiti"`
		AesKey                  string            `yaml:"aesKey"`
		GracefulShutdownTimeout cast.DurationConf `yaml:"gracefulShutdownTimeout"`
		EnableResourceProfiling bool              `yaml:"enableResourceProfiling"`
	}
	// Rule, Sink and Source have no yaml tag in this listing.
	Rule   def.RuleOption
	Sink   *SinkConf
	Source *SourceConf
	// Store selects the storage type and holds one settings group per
	// backend (Redis, Sqlite, Fdb).
	Store  struct {
		Type         string `yaml:"type"`
		ExtStateType string `yaml:"extStateType"`
		Redis        struct {
			Host               string            `yaml:"host"`
			Port               int               `yaml:"port"`
			Password           string            `yaml:"password"`
			Timeout            cast.DurationConf `yaml:"timeout"`
			ConnectionSelector string            `yaml:"connectionSelector"`
		}
		Sqlite struct {
			Name string `yaml:"name"`
		}
		Fdb struct {
			Path string `yaml:"path"`
		}
	}
	// Portable holds the Python binary setting and an init timeout.
	Portable struct {
		PythonBin   string            `yaml:"pythonBin"`
		InitTimeout cast.DurationConf `yaml:"initTimeout"`
	}
	Connection struct {
		BackoffMaxElapsedDuration cast.DurationConf `yaml:"backoffMaxElapsedDuration"`
	}
	OpenTelemetry OpenTelemetry `yaml:"openTelemetry"`

	// AesKey holds the key as bytes and has no yaml tag.
	// NOTE(review): presumably derived from Basic.AesKey — confirm.
	AesKey []byte
}

type OpenTelemetry

// OpenTelemetry is the type of KuiperConf.OpenTelemetry (YAML key
// "openTelemetry"). Each field is read from the YAML key named in its tag.
type OpenTelemetry struct {
	ServiceName           string `yaml:"serviceName"`
	EnableRemoteCollector bool   `yaml:"enableRemoteCollector"`
	RemoteEndpoint        string `yaml:"remoteEndpoint"`
	LocalTraceCapacity    int    `yaml:"localTraceCapacity"`
	EnableLocalStorage    bool   `yaml:"enableLocalStorage"`
}

type PathConfigure

// PathConfigure is the type of the package-level PathConfig variable.
type PathConfigure struct {
	// NOTE(review): the accepted LoadFileType values are not visible here.
	LoadFileType string
	// Dirs is a string-to-string map; presumably directory name to path —
	// confirm.
	Dirs         map[string]string
}
var (
	// PathConfig is the package-level PathConfigure value.
	PathConfig      PathConfigure
	// AbsoluteMapping is a string-to-string map; its initial entries are
	// elided from this listing.
	AbsoluteMapping = map[string]string{
					// contains filtered or unexported fields
	}
)

type SQLConf

// SQLConf is the type of KuiperConf.Basic.SQLConf (YAML key "sql").
type SQLConf struct {
	// MaxConnections is read from the YAML key "maxConnections".
	MaxConnections int `yaml:"maxConnections"`
}

type SinkConf

// SinkConf holds the sink cache and resend settings. Every field uses the
// same key name for its json and yaml tags. It is the type of
// KuiperConf.Sink and has a Validate method declared on *SinkConf.
type SinkConf struct {
	MemoryCacheThreshold int               `json:"memoryCacheThreshold" yaml:"memoryCacheThreshold"`
	MaxDiskCache         int               `json:"maxDiskCache" yaml:"maxDiskCache"`
	BufferPageSize       int               `json:"bufferPageSize" yaml:"bufferPageSize"`
	EnableCache          bool              `json:"enableCache" yaml:"enableCache"`
	ResendInterval       cast.DurationConf `json:"resendInterval" yaml:"resendInterval"`
	CleanCacheAtStop     bool              `json:"cleanCacheAtStop" yaml:"cleanCacheAtStop"`
	ResendAlterQueue     bool              `json:"resendAlterQueue" yaml:"resendAlterQueue"`
	ResendPriority       int               `json:"resendPriority" yaml:"resendPriority"`
	ResendIndicatorField string            `json:"resendIndicatorField" yaml:"resendIndicatorField"`
	ResendDestination    string            `json:"resendDestination" yaml:"resendDestination"`
}

func (*SinkConf) Validate

func (sc *SinkConf) Validate() error

Validate the configuration and reset to the default value for invalid values.

type SinkConfigKeysOps

// SinkConfigKeysOps wraps ConfigKeys for sink configs and adds
// SaveCfgToStorage (declared on *SinkConfigKeysOps).
type SinkConfigKeysOps struct {
	// Embedded, so the *ConfigKeys methods are promoted to this type.
	*ConfigKeys
}

SinkConfigKeysOps implements the ConfigOperator interface and loads the configs from data/sinks/xx.yaml.

func (*SinkConfigKeysOps) SaveCfgToStorage

func (c *SinkConfigKeysOps) SaveCfgToStorage() error

type SourceConf

// SourceConf holds the HTTP server IP, port and optional TLS settings. Every
// field uses the same key name for its json and yaml tags. It is the type of
// KuiperConf.Source and has a Validate method declared on *SourceConf.
type SourceConf struct {
	HttpServerIp   string   `json:"httpServerIp" yaml:"httpServerIp"`
	HttpServerPort int      `json:"httpServerPort" yaml:"httpServerPort"`
	HttpServerTls  *TlsConf `json:"httpServerTls" yaml:"httpServerTls"`
}

func (*SourceConf) Validate

func (sc *SourceConf) Validate() error

type SourceConfigKeysOps

// SourceConfigKeysOps wraps ConfigKeys for source configs and adds
// SaveCfgToStorage (declared on *SourceConfigKeysOps).
type SourceConfigKeysOps struct {
	// Embedded, so the *ConfigKeys methods are promoted to this type.
	*ConfigKeys
}

SourceConfigKeysOps implements the ConfigOperator interface and loads the configs from etc/sources/xx.yaml.

func (*SourceConfigKeysOps) SaveCfgToStorage

func (c *SourceConfigKeysOps) SaveCfgToStorage() error

type TlsConf

// TlsConf names a certificate file and a key file. It is used by
// KuiperConf.Basic.RestTls and SourceConf.HttpServerTls.
type TlsConf struct {
	// Certfile is read from the YAML key "certfile".
	Certfile string `yaml:"certfile"`
	// Keyfile is read from the YAML key "keyfile".
	Keyfile  string `yaml:"keyfile"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL