Documentation ¶
Index ¶
- Constants
- func NewTriggerMessageProcessor(bnd trigger.ServiceBinding, metricsManager bootstrapInterfaces.MetricsManager) *triggerMessageProcessor
- func NewTriggerServiceBinding(svc *Service) trigger.ServiceBinding
- type BackgroundMessage
- type ConfigUpdateProcessor
- type Configurable
- func (app *Configurable) AddTags(parameters map[string]string) interfaces.AppFunction
- func (app *Configurable) Batch(parameters map[string]string) interfaces.AppFunction
- func (app *Configurable) Compress(parameters map[string]string) interfaces.AppFunction
- func (app *Configurable) Encrypt(parameters map[string]string) interfaces.AppFunction
- func (app *Configurable) FilterByDeviceName(parameters map[string]string) interfaces.AppFunction
- func (app *Configurable) FilterByProfileName(parameters map[string]string) interfaces.AppFunction
- func (app *Configurable) FilterByResourceName(parameters map[string]string) interfaces.AppFunction
- func (app *Configurable) FilterBySourceName(parameters map[string]string) interfaces.AppFunction
- func (app *Configurable) HTTPExport(parameters map[string]string) interfaces.AppFunction
- func (app *Configurable) JSONLogic(parameters map[string]string) interfaces.AppFunction
- func (app *Configurable) MQTTExport(parameters map[string]string) interfaces.AppFunction
- func (app *Configurable) SetResponseData(parameters map[string]string) interfaces.AppFunction
- func (app *Configurable) ToLineProtocol(parameters map[string]string) interfaces.AppFunction
- func (app *Configurable) Transform(parameters map[string]string) interfaces.AppFunction
- func (app *Configurable) WrapIntoEvent(parameters map[string]string) interfaces.AppFunction
- type Service
- func (svc *Service) AddBackgroundPublisher(capacity int) (interfaces.BackgroundPublisher, error)
- func (svc *Service) AddBackgroundPublisherWithTopic(capacity int, topic string) (interfaces.BackgroundPublisher, error)
- func (svc *Service) AddCustomRoute(route string, authentication interfaces.Authentication, ...) error
- func (svc *Service) AddFunctionsPipelineForTopics(id string, topics []string, transforms ...interfaces.AppFunction) error
- func (svc *Service) AddRoute(route string, handler func(nethttp.ResponseWriter, *nethttp.Request), ...) error
- func (svc *Service) AppContext() context.Context
- func (svc *Service) ApplicationSettings() map[string]string
- func (svc *Service) BuildContext(correlationId string, contentType string) interfaces.AppFunctionContext
- func (svc *Service) CommandClient() clientInterfaces.CommandClient
- func (svc *Service) DeviceClient() clientInterfaces.DeviceClient
- func (svc *Service) DeviceProfileClient() clientInterfaces.DeviceProfileClient
- func (svc *Service) DeviceServiceClient() clientInterfaces.DeviceServiceClient
- func (svc *Service) EventClient() clientInterfaces.EventClient
- func (svc *Service) GetAppSetting(setting string) (string, error)
- func (svc *Service) GetAppSettingStrings(setting string) ([]string, error)
- func (svc *Service) Initialize() error
- func (svc *Service) ListenForCustomConfigChanges(configToWatch interface{}, sectionName string, ...) error
- func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.FunctionPipeline, error)
- func (svc *Service) LoadCustomConfig(customConfig interfaces.UpdatableConfig, sectionName string) error
- func (svc *Service) LoggingClient() logger.LoggingClient
- func (svc *Service) MetricsManager() bootstrapInterfaces.MetricsManager
- func (svc *Service) NotificationClient() clientInterfaces.NotificationClient
- func (svc *Service) Publish(data any, contentType string) error
- func (svc *Service) PublishWithTopic(topic string, data any, contentType string) error
- func (svc *Service) ReadingClient() clientInterfaces.ReadingClient
- func (svc *Service) RegisterCustomStoreFactory(name string, ...) error
- func (svc *Service) RegisterCustomTriggerFactory(name string, ...) error
- func (svc *Service) RegistryClient() registry.Client
- func (svc *Service) RemoveAllFunctionPipelines()
- func (svc *Service) RequestTimeout() time.Duration
- func (svc *Service) Run() error
- func (svc *Service) SecretProvider() bootstrapInterfaces.SecretProvider
- func (svc *Service) SetDefaultFunctionsPipeline(transforms ...interfaces.AppFunction) error
- func (svc *Service) Stop()
- func (svc *Service) SubscriptionClient() clientInterfaces.SubscriptionClient
Constants ¶
const ( ProfileNames = "profilenames" DeviceNames = "devicenames" SourceNames = "sourcenames" ResourceNames = "resourcenames" FilterOut = "filterout" EncryptionKey = "key" InitVector = "initvector" Url = "url" ExportMethod = "method" ExportMethodPost = "post" ExportMethodPut = "put" MimeType = "mimetype" PersistOnError = "persistonerror" ContinueOnSendError = "continueonsenderror" ReturnInputData = "returninputdata" SkipVerify = "skipverify" Qos = "qos" Retain = "retain" AutoReconnect = "autoreconnect" ConnectTimeout = "connecttimeout" ProfileName = "profilename" DeviceName = "devicename" ResourceName = "resourcename" ValueType = "valuetype" MediaType = "mediatype" Rule = "rule" BatchThreshold = "batchthreshold" TimeInterval = "timeinterval" HeaderName = "headername" SecretName = "secretname" SecretValueKey = "secretvaluekey" BrokerAddress = "brokeraddress" ClientID = "clientid" KeepAlive = "keepalive" Topic = "topic" TransformType = "type" TransformXml = "xml" TransformJson = "json" AuthMode = "authmode" Tags = "tags" ResponseContentType = "responsecontenttype" Algorithm = "algorithm" CompressGZIP = "gzip" CompressZLIB = "zlib" EncryptAES256 = "aes256" Mode = "mode" BatchByCount = "bycount" BatchByTime = "bytime" BatchByTimeAndCount = "bytimecount" IsEventData = "iseventdata" MergeOnSend = "mergeonsend" HttpRequestHeaders = "httprequestheaders" WillEnabled = "willenabled" WillTopic = "willtopic" WillQos = "willqos" WillPayload = "willpayload" WillRetained = "willretained" MaxReconnectInterval = "maxreconnectinterval" PreConnect = "preconnect" PreConnectRetryCount = "preconnectretrycount" PreConnectRetryInterval = "preconnectretryinterval" PreConnectRetryCountDefault = 6 PreConnectRetryIntervalDefault = time.Second * 10 )
const ( // Valid types of App Service triggers TriggerTypeMessageBus = "EDGEX-MESSAGEBUS" TriggerTypeMQTT = "EXTERNAL-MQTT" TriggerTypeHTTP = "HTTP" )
Variables ¶
This section is empty.
Functions ¶
func NewTriggerMessageProcessor ¶
func NewTriggerMessageProcessor(bnd trigger.ServiceBinding, metricsManager bootstrapInterfaces.MetricsManager) *triggerMessageProcessor
func NewTriggerServiceBinding ¶
func NewTriggerServiceBinding(svc *Service) trigger.ServiceBinding
Types ¶
type BackgroundMessage ¶
type BackgroundMessage struct { PublishTopic string Payload types.MessageEnvelope }
func (BackgroundMessage) Message ¶
func (bg BackgroundMessage) Message() types.MessageEnvelope
func (BackgroundMessage) Topic ¶
func (bg BackgroundMessage) Topic() string
type ConfigUpdateProcessor ¶
type ConfigUpdateProcessor struct {
// contains filtered or unexported fields
}
ConfigUpdateProcessor contains the data needed to process configuration updates
func NewConfigUpdateProcessor ¶
func NewConfigUpdateProcessor(svc *Service) *ConfigUpdateProcessor
NewConfigUpdateProcessor creates a new ConfigUpdateProcessor which processes configuration updates triggered from the Configuration Provider
func (*ConfigUpdateProcessor) WaitForConfigUpdates ¶
func (processor *ConfigUpdateProcessor) WaitForConfigUpdates(configUpdated config.UpdatedStream)
WaitForConfigUpdates waits for a signal that configuration has been updated (triggered by the Configuration Provider) and then determines what was updated and does any special processing, if needed, for the updates.
type Configurable ¶
type Configurable struct {
// contains filtered or unexported fields
}
Configurable contains the helper functions that return the function pointers for building the configurable function pipeline. They transform the parameters map from the Pipeline configuration into the actual parameters required by the function.
func NewConfigurable ¶
func NewConfigurable(lc logger.LoggingClient, sp bootstrapInterfaces.SecretProvider) *Configurable
NewConfigurable returns a new instance of Configurable
func (*Configurable) AddTags ¶
func (app *Configurable) AddTags(parameters map[string]string) interfaces.AppFunction
AddTags adds the configured list of tags to Events passed to the transform. This function is a configuration function and returns a function pointer.
func (*Configurable) Batch ¶
func (app *Configurable) Batch(parameters map[string]string) interfaces.AppFunction
Batch sets up Batching of events based on the specified mode parameter (BatchByCount, BatchByTime or BatchByTimeAndCount) and mode specific parameters. This function is a configuration function and returns a function pointer.
func (*Configurable) Compress ¶
func (app *Configurable) Compress(parameters map[string]string) interfaces.AppFunction
Compress compresses data received as either a string, []byte, or json.Marshaler using the specified algorithm (GZIP or ZLIB) and returns a base64 encoded string as a []byte. This function is a configuration function and returns a function pointer.
func (*Configurable) Encrypt ¶
func (app *Configurable) Encrypt(parameters map[string]string) interfaces.AppFunction
Encrypt encrypts either a string, []byte, or json.Marshaler type using the specified encryption algorithm (AES only at this time). It will return a []byte of the encrypted data. This function is a configuration function and returns a function pointer.
func (*Configurable) FilterByDeviceName ¶
func (app *Configurable) FilterByDeviceName(parameters map[string]string) interfaces.AppFunction
FilterByDeviceName - Specify the device names of interest to filter for data coming from certain sensors. The Filter by Device Name transform looks at the Event in the message and looks at the device names of interest list, provided by this function, and filters out those messages whose Event is for device names not in the device names of interest. This function will return an error and stop the pipeline if a non-edgex event is received or if no data is received. For example, data generated by a motor does not get passed to functions only interested in data from a thermostat. This function is a configuration function and returns a function pointer.
func (*Configurable) FilterByProfileName ¶
func (app *Configurable) FilterByProfileName(parameters map[string]string) interfaces.AppFunction
FilterByProfileName - Specify the profile names of interest to filter for data coming from certain sensors. The Filter by Profile Name transform looks at the Event in the message and looks at the profile names of interest list, provided by this function, and filters out those messages whose Event is for profile names not in the profile names of interest. This function will return an error and stop the pipeline if a non-edgex event is received or if no data is received. For example, data generated by a motor does not get passed to functions only interested in data from a thermostat. This function is a configuration function and returns a function pointer.
func (*Configurable) FilterByResourceName ¶
func (app *Configurable) FilterByResourceName(parameters map[string]string) interfaces.AppFunction
FilterByResourceName - Specify the resource name of interest to filter for data from certain types of IoT objects, such as temperatures, motion, and so forth, that may come from an array of sensors or devices. The Filter by resource name assesses the data in each Event and Reading, and removes readings that have a resource name that is not in the list of resource names of interest for the application. This function will return an error and stop the pipeline if a non-edgex event is received or if no data is received. For example, pressure reading data does not go to functions only interested in motion data. This function is a configuration function and returns a function pointer.
func (*Configurable) FilterBySourceName ¶
func (app *Configurable) FilterBySourceName(parameters map[string]string) interfaces.AppFunction
FilterBySourceName - Specify the source names (resources and/or commands) of interest to filter for data coming from certain sensors. The Filter by Source Name transform looks at the Event in the message and looks at the source names of interest list, provided by this function, and filters out those messages whose Event is for source names not in the source names of interest. This function will return an error and stop the pipeline if a non-edgex event is received or if no data is received. For example, data generated by a motor does not get passed to functions only interested in data from a thermostat. This function is a configuration function and returns a function pointer.
func (*Configurable) HTTPExport ¶
func (app *Configurable) HTTPExport(parameters map[string]string) interfaces.AppFunction
HTTPExport will send data from the previous function to the specified Endpoint via http POST or PUT. If no previous function exists, then the event that triggered the pipeline will be used. Passing an empty string to the mimetype method will default to application/json. This function is a configuration function and returns a function pointer.
func (*Configurable) JSONLogic ¶
func (app *Configurable) JSONLogic(parameters map[string]string) interfaces.AppFunction
JSONLogic ...
func (*Configurable) MQTTExport ¶
func (app *Configurable) MQTTExport(parameters map[string]string) interfaces.AppFunction
MQTTExport will send data from the previous function to the specified Endpoint via MQTT publish. If no previous function exists, then the event that triggered the pipeline will be used. This function is a configuration function and returns a function pointer.
func (*Configurable) SetResponseData ¶
func (app *Configurable) SetResponseData(parameters map[string]string) interfaces.AppFunction
SetResponseData sets the response data to that passed in from the previous function and the response content type to that set in the ResponseContentType configuration parameter. It will return an error and stop the pipeline if data passed in is not of type []byte, string or json.Marshaler. This function is a configuration function and returns a function pointer.
func (*Configurable) ToLineProtocol ¶
func (app *Configurable) ToLineProtocol(parameters map[string]string) interfaces.AppFunction
ToLineProtocol transforms the Metric DTO passed to the transform to a string conforming to Line Protocol syntax. This function is a configuration function and returns a function pointer.
func (*Configurable) Transform ¶
func (app *Configurable) Transform(parameters map[string]string) interfaces.AppFunction
Transform transforms an EdgeX event to XML or JSON based on specified transform type. It will return an error and stop the pipeline if a non-edgex event is received or if no data is received. This function is a configuration function and returns a function pointer.
func (*Configurable) WrapIntoEvent ¶
func (app *Configurable) WrapIntoEvent(parameters map[string]string) interfaces.AppFunction
WrapIntoEvent wraps the provided value as an EdgeX Event using the configured event/reading metadata that have been set. The new Event/Reading is returned to the next pipeline function. This function is a configuration function and returns a function pointer.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service provides the necessary struct and functions to create an instance of the interfaces.ApplicationService interface.
func NewService ¶
func NewService(serviceKey string, targetType interface{}, profileSuffixPlaceholder string) *Service
NewService creates, initializes and returns a new instance of app.Service which implements the interfaces.ApplicationService interface
func (*Service) AddBackgroundPublisher ¶
func (svc *Service) AddBackgroundPublisher(capacity int) (interfaces.BackgroundPublisher, error)
AddBackgroundPublisher will create a channel of provided capacity to be consumed by the MessageBus output and return a publisher that writes to it
func (*Service) AddBackgroundPublisherWithTopic ¶
func (svc *Service) AddBackgroundPublisherWithTopic(capacity int, topic string) (interfaces.BackgroundPublisher, error)
AddBackgroundPublisherWithTopic will create a channel of provided capacity to be consumed by the MessageBus output and return a publisher that writes to it on a different topic than configured for messagebus output.
func (*Service) AddCustomRoute ¶
func (svc *Service) AddCustomRoute(route string, authentication interfaces.Authentication, handler echo.HandlerFunc, methods ...string) error
AddCustomRoute allows you to leverage the existing webserver to add routes. TODO: Change signature in 4.0 to use "handler echo.HandlerFunc" once addContext is removed
func (*Service) AddFunctionsPipelineForTopics ¶
func (svc *Service) AddFunctionsPipelineForTopics(id string, topics []string, transforms ...interfaces.AppFunction) error
AddFunctionsPipelineForTopics adds a functions pipeline for the specified id and topics
func (*Service) AddRoute ¶
func (svc *Service) AddRoute(route string, handler func(nethttp.ResponseWriter, *nethttp.Request), methods ...string) error
AddRoute allows you to leverage the existing webserver to add routes. Deprecated: Use AddCustomRoute instead. TODO: Remove in 4.0.
func (*Service) AppContext ¶
AppContext returns the application service context used to detect cancelled context when the service is terminating. Used by custom app service to appropriately exit any long-running functions.
func (*Service) ApplicationSettings ¶
ApplicationSettings returns the values specified in the custom configuration section.
func (*Service) BuildContext ¶
func (svc *Service) BuildContext(correlationId string, contentType string) interfaces.AppFunctionContext
BuildContext allows external callers that may need a context (e.g. background publishers) to easily create one around the service's dependency injection container.
func (*Service) CommandClient ¶
func (svc *Service) CommandClient() clientInterfaces.CommandClient
CommandClient returns the Command client, which may be nil, from the dependency injection container
func (*Service) DeviceClient ¶
func (svc *Service) DeviceClient() clientInterfaces.DeviceClient
DeviceClient returns the Device client, which may be nil, from the dependency injection container
func (*Service) DeviceProfileClient ¶
func (svc *Service) DeviceProfileClient() clientInterfaces.DeviceProfileClient
DeviceProfileClient returns the DeviceProfile client, which may be nil, from the dependency injection container
func (*Service) DeviceServiceClient ¶
func (svc *Service) DeviceServiceClient() clientInterfaces.DeviceServiceClient
DeviceServiceClient returns the DeviceService client, which may be nil, from the dependency injection container
func (*Service) EventClient ¶
func (svc *Service) EventClient() clientInterfaces.EventClient
EventClient returns the Event client, which may be nil, from the dependency injection container
func (*Service) GetAppSetting ¶
GetAppSetting returns the string for the specified App Setting.
func (*Service) GetAppSettingStrings ¶
GetAppSettingStrings returns the strings slice for the specified App Setting.
func (*Service) Initialize ¶
Initialize bootstraps the service making it ready to accept functions for the pipeline and to run the configured trigger.
func (*Service) ListenForCustomConfigChanges ¶
func (svc *Service) ListenForCustomConfigChanges(configToWatch interface{}, sectionName string, changedCallback func(interface{})) error
ListenForCustomConfigChanges uses the Config Processor from go-mod-bootstrap to attempt to listen for changes to the specified custom configuration section. LoadCustomConfig must be called previously so that the instance of svc.configProcessor has already been set.
func (*Service) LoadConfigurableFunctionPipelines ¶
func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.FunctionPipeline, error)
LoadConfigurableFunctionPipelines returns the configured function pipelines (default and per topic) from configuration.
func (*Service) LoadCustomConfig ¶
func (svc *Service) LoadCustomConfig(customConfig interfaces.UpdatableConfig, sectionName string) error
LoadCustomConfig uses the Config Processor from go-mod-bootstrap to attempt to load the service's custom configuration. It uses the same command line flags to process the custom config in the same manner as the standard configuration.
func (*Service) LoggingClient ¶
func (svc *Service) LoggingClient() logger.LoggingClient
LoggingClient returns the Logging client from the dependency injection container
func (*Service) MetricsManager ¶
func (svc *Service) MetricsManager() bootstrapInterfaces.MetricsManager
MetricsManager returns the Metrics Manager used to register counter, gauge, gaugeFloat64 or timer metric types from github.com/rcrowley/go-metrics
func (*Service) NotificationClient ¶
func (svc *Service) NotificationClient() clientInterfaces.NotificationClient
NotificationClient returns the Notifications client, which may be nil, from the dependency injection container
func (*Service) Publish ¶
Publish pushes data to the MessageBus using configured topic
func (*Service) PublishWithTopic ¶
PublishWithTopic pushes data to the MessageBus using given topic
func (*Service) ReadingClient ¶
func (svc *Service) ReadingClient() clientInterfaces.ReadingClient
ReadingClient returns the Reading client, which may be nil, from the dependency injection container
func (*Service) RegisterCustomStoreFactory ¶
func (svc *Service) RegisterCustomStoreFactory(name string, factory func(cfg bootstrapConfig.Database, cred bootstrapConfig.Credentials) (interfaces.StoreClient, error)) error
RegisterCustomStoreFactory allows registration of alternative storage implementation to back the Store&Forward loop
func (*Service) RegisterCustomTriggerFactory ¶
func (svc *Service) RegisterCustomTriggerFactory(name string, factory func(interfaces.TriggerConfig) (interfaces.Trigger, error)) error
RegisterCustomTriggerFactory allows users to register builders for custom trigger types
func (*Service) RegistryClient ¶
RegistryClient returns the Registry client, which may be nil, from the dependency injection container
func (*Service) RemoveAllFunctionPipelines ¶
func (svc *Service) RemoveAllFunctionPipelines()
RemoveAllFunctionPipelines removes all existing function pipelines
func (*Service) RequestTimeout ¶
RequestTimeout returns the Request Timeout duration that was parsed from the Service.RequestTimeout configuration
func (*Service) Run ¶
Run initializes and starts the trigger as specified in the configuration. It will also configure the webserver and start listening on the specified port.
func (*Service) SecretProvider ¶
func (svc *Service) SecretProvider() bootstrapInterfaces.SecretProvider
SecretProvider returns the SecretProvider instance
func (*Service) SetDefaultFunctionsPipeline ¶
func (svc *Service) SetDefaultFunctionsPipeline(transforms ...interfaces.AppFunction) error
SetDefaultFunctionsPipeline sets the default functions pipeline to the list of specified functions in the order provided.
func (*Service) Stop ¶
func (svc *Service) Stop()
Stop will force the service loop to exit in the same fashion as SIGINT/SIGTERM received from the OS
func (*Service) SubscriptionClient ¶
func (svc *Service) SubscriptionClient() clientInterfaces.SubscriptionClient
SubscriptionClient returns the Subscription client, which may be nil, from the dependency injection container