app

package
v3.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 3, 2024 License: Apache-2.0 Imports: 54 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ProfileNames            = "profilenames"
	DeviceNames             = "devicenames"
	SourceNames             = "sourcenames"
	ResourceNames           = "resourcenames"
	FilterOut               = "filterout"
	EncryptionKey           = "key"
	InitVector              = "initvector"
	Url                     = "url"
	ExportMethod            = "method"
	ExportMethodPost        = "post"
	ExportMethodPut         = "put"
	MimeType                = "mimetype"
	PersistOnError          = "persistonerror"
	ContinueOnSendError     = "continueonsenderror"
	ReturnInputData         = "returninputdata"
	SkipVerify              = "skipverify"
	Qos                     = "qos"
	Retain                  = "retain"
	AutoReconnect           = "autoreconnect"
	ConnectTimeout          = "connecttimeout"
	ProfileName             = "profilename"
	DeviceName              = "devicename"
	ResourceName            = "resourcename"
	ValueType               = "valuetype"
	MediaType               = "mediatype"
	Rule                    = "rule"
	BatchThreshold          = "batchthreshold"
	TimeInterval            = "timeinterval"
	HeaderName              = "headername"
	SecretName              = "secretname"
	SecretValueKey          = "secretvaluekey"
	BrokerAddress           = "brokeraddress"
	ClientID                = "clientid"
	KeepAlive               = "keepalive"
	Topic                   = "topic"
	TransformType           = "type"
	TransformXml            = "xml"
	TransformJson           = "json"
	AuthMode                = "authmode"
	Tags                    = "tags"
	ResponseContentType     = "responsecontenttype"
	Algorithm               = "algorithm"
	CompressGZIP            = "gzip"
	CompressZLIB            = "zlib"
	EncryptAES256           = "aes256"
	Mode                    = "mode"
	BatchByCount            = "bycount"
	BatchByTime             = "bytime"
	BatchByTimeAndCount     = "bytimecount"
	IsEventData             = "iseventdata"
	MergeOnSend             = "mergeonsend"
	HttpRequestHeaders      = "httprequestheaders"
	WillEnabled             = "willenabled"
	WillTopic               = "willtopic"
	WillQos                 = "willqos"
	WillPayload             = "willpayload"
	WillRetained            = "willretained"
	MaxReconnectInterval    = "maxreconnectinterval"
	PreConnect              = "preconnect"
	PreConnectRetryCount    = "preconnectretrycount"
	PreConnectRetryInterval = "preconnectretryinterval"

	PreConnectRetryCountDefault    = 6
	PreConnectRetryIntervalDefault = time.Second * 10
)
View Source
const (
	// Valid types of App Service triggers
	TriggerTypeMessageBus = "EDGEX-MESSAGEBUS"
	TriggerTypeMQTT       = "EXTERNAL-MQTT"
	TriggerTypeHTTP       = "HTTP"
)

Variables

This section is empty.

Functions

func NewTriggerMessageProcessor

func NewTriggerMessageProcessor(bnd trigger.ServiceBinding, metricsManager bootstrapInterfaces.MetricsManager) *triggerMessageProcessor

func NewTriggerServiceBinding

func NewTriggerServiceBinding(svc *Service) trigger.ServiceBinding

Types

type BackgroundMessage

type BackgroundMessage struct {
	PublishTopic string
	Payload      types.MessageEnvelope
}

func (BackgroundMessage) Message

func (BackgroundMessage) Topic

func (bg BackgroundMessage) Topic() string

type ConfigUpdateProcessor

type ConfigUpdateProcessor struct {
	// contains filtered or unexported fields
}

ConfigUpdateProcessor contains the data need to process configuration updates

func NewConfigUpdateProcessor

func NewConfigUpdateProcessor(svc *Service) *ConfigUpdateProcessor

NewConfigUpdateProcessor creates a new ConfigUpdateProcessor which processes configuration updates triggered from the Configuration Provider

func (*ConfigUpdateProcessor) WaitForConfigUpdates

func (processor *ConfigUpdateProcessor) WaitForConfigUpdates(configUpdated config.UpdatedStream)

WaitForConfigUpdates waits for signal that configuration has been updated (triggered from by Configuration Provider) and then determines what was updated and does any special processing, if needed, for the updates.

type Configurable

type Configurable struct {
	// contains filtered or unexported fields
}

Configurable contains the helper functions that return the function pointers for building the configurable function pipeline. They transform the parameters map from the Pipeline configuration in to the actual parameters required by the function.

func NewConfigurable

NewConfigurable returns a new instance of Configurable

func (*Configurable) AddTags

func (app *Configurable) AddTags(parameters map[string]string) interfaces.AppFunction

AddTags adds the configured list of tags to Events passed to the transform. This function is a configuration function and returns a function pointer.

func (*Configurable) Batch

func (app *Configurable) Batch(parameters map[string]string) interfaces.AppFunction

Batch sets up Batching of events based on the specified mode parameter (BatchByCount, BatchByTime or BatchByTimeAndCount) and mode specific parameters. This function is a configuration function and returns a function pointer.

func (*Configurable) Compress

func (app *Configurable) Compress(parameters map[string]string) interfaces.AppFunction

Compress compresses data received as either a string,[]byte, or json.Marshaller using the specified algorithm (GZIP or ZLIB) and returns a base64 encoded string as a []byte. This function is a configuration function and returns a function pointer.

func (*Configurable) Encrypt

func (app *Configurable) Encrypt(parameters map[string]string) interfaces.AppFunction

Encrypt encrypts either a string, []byte, or json.Marshaller type using specified encryption algorithm (AES only at this time). It will return a byte[] of the encrypted data. This function is a configuration function and returns a function pointer.

func (*Configurable) FilterByDeviceName

func (app *Configurable) FilterByDeviceName(parameters map[string]string) interfaces.AppFunction

FilterByDeviceName - Specify the device names of interest to filter for data coming from certain sensors. The Filter by Device Name transform looks at the Event in the message and looks at the device names of interest list, provided by this function, and filters out those messages whose Event is for device names not in the device names of interest. This function will return an error and stop the pipeline if a non-edgex event is received or if no data is received. For example, data generated by a motor does not get passed to functions only interested in data from a thermostat. This function is a configuration function and returns a function pointer.

func (*Configurable) FilterByProfileName

func (app *Configurable) FilterByProfileName(parameters map[string]string) interfaces.AppFunction

FilterByProfileName - Specify the profile names of interest to filter for data coming from certain sensors. The Filter by Profile Name transform looks at the Event in the message and looks at the profile names of interest list, provided by this function, and filters out those messages whose Event is for profile names not in the profile names of interest. This function will return an error and stop the pipeline if a non-edgex event is received or if no data is received. For example, data generated by a motor does not get passed to functions only interested in data from a thermostat. This function is a configuration function and returns a function pointer.

func (*Configurable) FilterByResourceName

func (app *Configurable) FilterByResourceName(parameters map[string]string) interfaces.AppFunction

FilterByResourceName - Specify the resource name of interest to filter for data from certain types of IoT objects, such as temperatures, motion, and so forth, that may come from an array of sensors or devices. The Filter by resource name assesses the data in each Event and Reading, and removes readings that have a resource name that is not in the list of resource names of interest for the application. This function will return an error and stop the pipeline if a non-edgex event is received or if no data is received. For example, pressure reading data does not go to functions only interested in motion data. This function is a configuration function and returns a function pointer.

func (*Configurable) FilterBySourceName

func (app *Configurable) FilterBySourceName(parameters map[string]string) interfaces.AppFunction

FilterBySourceName - Specify the source names (resources and/or commands) of interest to filter for data coming from certain sensors. The Filter by Source Name transform looks at the Event in the message and looks at the source names of interest list, provided by this function, and filters out those messages whose Event is for source names not in the source names of interest. This function will return an error and stop the pipeline if a non-edgex event is received or if no data is received. For example, data generated by a motor does not get passed to functions only interested in data from a thermostat. This function is a configuration function and returns a function pointer.

func (*Configurable) HTTPExport

func (app *Configurable) HTTPExport(parameters map[string]string) interfaces.AppFunction

HTTPExport will send data from the previous function to the specified Endpoint via http POST or PUT. If no previous function exists, then the event that triggered the pipeline will be used. Passing an empty string to the mimetype method will default to application/json. This function is a configuration function and returns a function pointer.

func (*Configurable) JSONLogic

func (app *Configurable) JSONLogic(parameters map[string]string) interfaces.AppFunction

JSONLogic ...

func (*Configurable) MQTTExport

func (app *Configurable) MQTTExport(parameters map[string]string) interfaces.AppFunction

MQTTExport will send data from the previous function to the specified Endpoint via MQTT publish. If no previous function exists, then the event that triggered the pipeline will be used. This function is a configuration function and returns a function pointer.

func (*Configurable) SetResponseData

func (app *Configurable) SetResponseData(parameters map[string]string) interfaces.AppFunction

SetResponseData sets the response data to that passed in from the previous function and the response content type to that set in the ResponseContentType configuration parameter. It will return an error and stop the pipeline if data passed in is not of type []byte, string or json.Marshaller This function is a configuration function and returns a function pointer.

func (*Configurable) ToLineProtocol

func (app *Configurable) ToLineProtocol(parameters map[string]string) interfaces.AppFunction

ToLineProtocol transforms the Metric DTO passed to the transform to a string conforming to Line Protocol syntax. This function is a configuration function and returns a function pointer.

func (*Configurable) Transform

func (app *Configurable) Transform(parameters map[string]string) interfaces.AppFunction

Transform transforms an EdgeX event to XML or JSON based on specified transform type. It will return an error and stop the pipeline if a non-edgex event is received or if no data is received. This function is a configuration function and returns a function pointer.

func (*Configurable) WrapIntoEvent

func (app *Configurable) WrapIntoEvent(parameters map[string]string) interfaces.AppFunction

WrapIntoEvent wraps the provided value as an EdgeX Event using the configured event/reading metadata that have been set. The new Event/Reading is returned to the next pipeline function. This function is a configuration function and returns a function pointer.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service provides the necessary struct and functions to create an instance of the interfaces.ApplicationService interface.

func NewService

func NewService(serviceKey string, targetType interface{}, profileSuffixPlaceholder string) *Service

NewService create, initializes and returns new instance of app.Service which implements the interfaces.ApplicationService interface

func (*Service) AddBackgroundPublisher

func (svc *Service) AddBackgroundPublisher(capacity int) (interfaces.BackgroundPublisher, error)

AddBackgroundPublisher will create a channel of provided capacity to be consumed by the MessageBus output and return a publisher that writes to it

func (*Service) AddBackgroundPublisherWithTopic

func (svc *Service) AddBackgroundPublisherWithTopic(capacity int, topic string) (interfaces.BackgroundPublisher, error)

AddBackgroundPublisherWithTopic will create a channel of provided capacity to be consumed by the MessageBus output and return a publisher that writes to it on a different topic than configured for messagebus output.

func (*Service) AddCustomRoute

func (svc *Service) AddCustomRoute(route string, authentication interfaces.Authentication, handler echo.HandlerFunc, methods ...string) error

AddCustomRoute allows you to leverage the existing webserver to add routes. TODO: Change signature in 4.0 to use "handler echo.HandlerFunc" once addContext is removed

func (*Service) AddFunctionsPipelineForTopics

func (svc *Service) AddFunctionsPipelineForTopics(id string, topics []string, transforms ...interfaces.AppFunction) error

AddFunctionsPipelineForTopics adds a functions pipeline for the specified for the specified id and topics

func (*Service) AddRoute

func (svc *Service) AddRoute(route string, handler func(nethttp.ResponseWriter, *nethttp.Request), methods ...string) error

AddRoute allows you to leverage the existing webserver to add routes. DEPRECATED - Use AddCustomRoute TODO: Remove in 4.0

func (*Service) AppContext

func (svc *Service) AppContext() context.Context

AppContext returns the application service context used to detect cancelled context when the service is terminating. Used by custom app service to appropriately exit any long-running functions.

func (*Service) ApplicationSettings

func (svc *Service) ApplicationSettings() map[string]string

ApplicationSettings returns the values specified in the custom configuration section.

func (*Service) BuildContext

func (svc *Service) BuildContext(correlationId string, contentType string) interfaces.AppFunctionContext

BuildContext allows external callers that may need a context (e.g. background publishers) to easily create one around the service's dic

func (*Service) CommandClient

func (svc *Service) CommandClient() clientInterfaces.CommandClient

CommandClient returns the Command client, which may be nil, from the dependency injection container

func (*Service) DeviceClient

func (svc *Service) DeviceClient() clientInterfaces.DeviceClient

DeviceClient returns the Device client, which may be nil, from the dependency injection container

func (*Service) DeviceProfileClient

func (svc *Service) DeviceProfileClient() clientInterfaces.DeviceProfileClient

DeviceProfileClient returns the DeviceProfile client, which may be nil, from the dependency injection container

func (*Service) DeviceServiceClient

func (svc *Service) DeviceServiceClient() clientInterfaces.DeviceServiceClient

DeviceServiceClient returns the DeviceService client, which may be nil, from the dependency injection container

func (*Service) EventClient

func (svc *Service) EventClient() clientInterfaces.EventClient

EventClient returns the Event client, which may be nil, from the dependency injection container

func (*Service) GetAppSetting

func (svc *Service) GetAppSetting(setting string) (string, error)

GetAppSetting returns the string for the specified App Setting.

func (*Service) GetAppSettingStrings

func (svc *Service) GetAppSettingStrings(setting string) ([]string, error)

GetAppSettingStrings returns the strings slice for the specified App Setting.

func (*Service) Initialize

func (svc *Service) Initialize() error

Initialize bootstraps the service making it ready to accept functions for the pipeline and to run the configured trigger.

func (*Service) ListenForCustomConfigChanges

func (svc *Service) ListenForCustomConfigChanges(configToWatch interface{}, sectionName string, changedCallback func(interface{})) error

ListenForCustomConfigChanges uses the Config Processor from go-mod-bootstrap to attempt to listen for changes to the specified custom configuration section. LoadCustomConfig must be called previously so that the instance of svc.configProcessor has already been set.

func (*Service) LoadConfigurableFunctionPipelines

func (svc *Service) LoadConfigurableFunctionPipelines() (map[string]interfaces.FunctionPipeline, error)

LoadConfigurableFunctionPipelines return the configured function pipelines (default and per topic) from configuration.

func (*Service) LoadCustomConfig

func (svc *Service) LoadCustomConfig(customConfig interfaces.UpdatableConfig, sectionName string) error

LoadCustomConfig uses the Config Processor from go-mod-bootstrap to attempt to load service's custom configuration. It uses the same command line flags to process the custom config in the same manner as the standard configuration.

func (*Service) LoggingClient

func (svc *Service) LoggingClient() logger.LoggingClient

LoggingClient returns the Logging client from the dependency injection container

func (*Service) MetricsManager

func (svc *Service) MetricsManager() bootstrapInterfaces.MetricsManager

MetricsManager returns the Metrics Manager used to register counter, gauge, gaugeFloat64 or timer metric types from github.com/rcrowley/go-metrics

func (*Service) NotificationClient

func (svc *Service) NotificationClient() clientInterfaces.NotificationClient

NotificationClient returns the Notifications client, which may be nil, from the dependency injection container

func (*Service) Publish

func (svc *Service) Publish(data any, contentType string) error

Publish pushes data to the MessageBus using configured topic

func (*Service) PublishWithTopic

func (svc *Service) PublishWithTopic(topic string, data any, contentType string) error

PublishWithTopic pushes data to the MessageBus using given topic

func (*Service) ReadingClient

func (svc *Service) ReadingClient() clientInterfaces.ReadingClient

ReadingClient returns the Reading client, which may be nil, from the dependency injection container

func (*Service) RegisterCustomStoreFactory

func (svc *Service) RegisterCustomStoreFactory(name string, factory func(cfg bootstrapConfig.Database, cred bootstrapConfig.Credentials) (interfaces.StoreClient, error)) error

RegisterCustomStoreFactory allows registration of alternative storage implementation to back the Store&Forward loop

func (*Service) RegisterCustomTriggerFactory

func (svc *Service) RegisterCustomTriggerFactory(name string,
	factory func(interfaces.TriggerConfig) (interfaces.Trigger, error)) error

RegisterCustomTriggerFactory allows users to register builders for custom trigger types

func (*Service) RegistryClient

func (svc *Service) RegistryClient() registry.Client

RegistryClient returns the Registry client, which may be nil, from the dependency injection container

func (*Service) RemoveAllFunctionPipelines

func (svc *Service) RemoveAllFunctionPipelines()

RemoveAllFunctionPipelines removes all existing function pipelines

func (*Service) RequestTimeout

func (svc *Service) RequestTimeout() time.Duration

RequestTimeout returns the Request Timeout duration that was parsed from the Service.RequestTimeout configuration

func (*Service) Run

func (svc *Service) Run() error

Run initializes and starts the trigger as specified in the configuration. It will also configure the webserver and start listening on the specified port.

func (*Service) SecretProvider

func (svc *Service) SecretProvider() bootstrapInterfaces.SecretProvider

SecretProvider returns the SecretProvider instance

func (*Service) SetDefaultFunctionsPipeline

func (svc *Service) SetDefaultFunctionsPipeline(transforms ...interfaces.AppFunction) error

SetDefaultFunctionsPipeline sets the default functions pipeline to the list of specified functions in the order provided.

func (*Service) Stop

func (svc *Service) Stop()

Stop will force the service loop to exit in the same fashion as SIGINT/SIGTERM received from the OS

func (*Service) SubscriptionClient

func (svc *Service) SubscriptionClient() clientInterfaces.SubscriptionClient

SubscriptionClient returns the Subscription client, which may be nil, from the dependency injection container

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL