Documentation ¶
Index ¶
- Constants
- Variables
- func DecryptString(key, text string) (string, error)
- func EncryptString(key, text string) (string, error)
- func GetBinaryDir() string
- func PrepareLinuxServiceEnv() error
- func RemoveLinuxServiceEnv() error
- func UpdateLinuxServiceBinary() error
- type CdfClient
- func (co *CdfClient) BasicUploadFileBody(filePath, fileName, mimeType, uploadUrl string) error
- func (co *CdfClient) Client() *cognite.Client
- func (co *CdfClient) CompareAssets(asset1, asset2 core.Asset) bool
- func (co *CdfClient) UploadFile(filePath, externalId, name, mimeType string, assetId uint64) error
- func (co *CdfClient) UploadInMemoryBody(body []byte, fileName, mimeType, uploadUrl string) error
- func (co *CdfClient) UploadInMemoryFile(body []byte, externalId, name, mimeType string, assetId uint64) error
- func (co *CdfClient) UploadMultipartFileBody(filePath, fileName, mimeType, uploadUrl string) error
- type CdfConfigObserver
- type ConfigAction
- type ConfigActionQueue
- type ProcessorState
- type RemoteConfig
- type SecretManager
- type StateTracker
- func (intgr *StateTracker) GetProcessorState(procId uint64) *ProcessorState
- func (intgr *StateTracker) SetProcessorCurrentState(procId uint64, state string)
- func (intgr *StateTracker) SetProcessorTargetState(procId uint64, state string)
- func (intgr *StateTracker) WaitForProcessorTargetState(procId uint64, timeout time.Duration) bool
- type StaticConfig
Constants ¶
const ( LINUX_USER = "edge-extractor" LINUX_BIN = "/usr/local/bin/edge-extractor" LINUX_CONFIG_FILE = "/etc/edge-extractor/config.json" LINUX_LOG_DIR = "/var/log/edge-extractor" )
const ( ProcessorStateRunning = "RUNNING" ProcessorStateStarting = "STARTING" ProcessorStateShutdown = "SHUTDOWN" ProcessorStateStopped = "STOPPED" ProcessorStateNotFound = "NOT_FOUND" )
const ConfigSourceExtPipelines = "ext_pipeline_config"
const ConfigSourceLocal = "local"
const NewConfigAction = 1
const StartProcessorAction = 1
const RestartProcessorAction = 2
const StartProcessorLoopAction = 4
const StopProcessorAction = 3
Variables ¶
var Key = ""
Functions ¶
func DecryptString ¶
func EncryptString ¶
func GetBinaryDir ¶
func GetBinaryDir() string
func PrepareLinuxServiceEnv ¶
func PrepareLinuxServiceEnv() error
func RemoveLinuxServiceEnv ¶
func RemoveLinuxServiceEnv() error
RemoveLinuxServiceEnv removes the edge-extractor Linux user and group, removes the edge-extractor binary from /usr/local/bin, removes the config file from /etc/edge-extractor, and removes the log directory from /var/log/edge-extractor.
func UpdateLinuxServiceBinary ¶
func UpdateLinuxServiceBinary() error
Types ¶
type CdfClient ¶
type CdfClient struct {
// contains filtered or unexported fields
}
func NewCdfClient ¶
func (*CdfClient) BasicUploadFileBody ¶
func (*CdfClient) CompareAssets ¶
CompareAssets compares two assets and returns true if they are equal.
func (*CdfClient) UploadFile ¶
func (*CdfClient) UploadInMemoryBody ¶
func (*CdfClient) UploadInMemoryFile ¶
func (*CdfClient) UploadMultipartFileBody ¶
UploadMultipartFileBody is currently not supported by CDF.
type CdfConfigObserver ¶
type CdfConfigObserver struct {
// contains filtered or unexported fields
}
func NewCdfConfigObserver ¶
func NewCdfConfigObserver(extractorID string, cogClient *CdfClient, remoteConfigSource string, secretManager *SecretManager) *CdfConfigObserver
func (*CdfConfigObserver) Start ¶
func (intgr *CdfConfigObserver) Start(reloadInterval time.Duration)
Start starts the observer process using the provided asset filter and reload interval. The operation is non-blocking.
func (*CdfConfigObserver) Stop ¶
func (intgr *CdfConfigObserver) Stop()
func (*CdfConfigObserver) SubscribeToConfigUpdates ¶
func (intgr *CdfConfigObserver) SubscribeToConfigUpdates(name string, config interface{}) ConfigActionQueue
SubscribeToConfigUpdates registers an Integration in the config observer and returns a config action queue that the Integration can use to receive config updates. The queue has a capacity of 5 items. If the queue is full, the oldest item will be dropped; this is done to avoid blocking the config observer. Config events aren't filtered, and it is the responsibility of the Integration to do change detection. Parameters: name - name of the Integration; config - pointer to the Integration config struct.
type ConfigAction ¶
type ConfigActionQueue ¶
type ConfigActionQueue chan ConfigAction
type ProcessorState ¶
type RemoteConfig ¶
type RemoteConfig map[string]json.RawMessage
type SecretManager ¶
func NewSecretManager ¶
func NewSecretManager(key string) *SecretManager
func (*SecretManager) GetEncryptedSecrets ¶
func (sm *SecretManager) GetEncryptedSecrets() (map[string]string, error)
func (*SecretManager) GetSecret ¶
func (sm *SecretManager) GetSecret(key string) string
GetSecret returns the secret from the internal secret store or, if it is not found in the store, from the ENV variable. If the secret is not found in the ENV variable either, it returns the key itself (plain text).
func (*SecretManager) LoadEncryptedSecrets ¶
func (sm *SecretManager) LoadEncryptedSecrets(secrets map[string]string) error
LoadEncryptedSecrets loads secrets in encrypted form from map[string]string, decrypts them and stores in internal secret store
func (*SecretManager) LoadSecrets ¶
func (sm *SecretManager) LoadSecrets(secrets map[string]string)
LoadSecrets loads secrets in plain text from map[string]string into internal secret store
type StateTracker ¶
type StateTracker struct {
// contains filtered or unexported fields
}
StateTracker keeps track of the current and target states for all processors.
func NewStateTracker ¶
func NewStateTracker() *StateTracker
func (*StateTracker) GetProcessorState ¶
func (intgr *StateTracker) GetProcessorState(procId uint64) *ProcessorState
GetProcessorState is the public version of getProcessorState.
func (*StateTracker) SetProcessorCurrentState ¶
func (intgr *StateTracker) SetProcessorCurrentState(procId uint64, state string)
func (*StateTracker) SetProcessorTargetState ¶
func (intgr *StateTracker) SetProcessorTargetState(procId uint64, state string)
func (*StateTracker) WaitForProcessorTargetState ¶
func (intgr *StateTracker) WaitForProcessorTargetState(procId uint64, timeout time.Duration) bool
WaitForProcessorTargetState blocks execution until the processor reaches its target state or the wait operation times out.
type StaticConfig ¶
type StaticConfig struct { ProjectName string CdfCluster string AdTenantId string AuthTokenUrl string ClientID string Secret string Scopes []string CdfDatasetID int ExtractorID string RemoteConfigSource string // local, ext_pipeline_config ConfigReloadInterval time.Duration EnabledIntegrations []string LogLevel string LogDir string Integrations map[string]json.RawMessage // map of integration configs (key is integration name, value is integration config) IsEncrypted bool Secrets map[string]string // map of encrypted secrets (key is secret name, value is encrypted secret) }
func (*StaticConfig) DecryptSecrets ¶
func (config *StaticConfig) DecryptSecrets(key string) error
func (*StaticConfig) EncryptSecrets ¶
func (config *StaticConfig) EncryptSecrets(key string) error