publisher

package
v0.0.0-...-476b4f6
Published: Sep 30, 2020 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package publisher publishes updates of registered entities.

It handles:

  - publishing updates to nodes, inputs and outputs when they are (re)discovered
  - configuration of nodes
  - control of inputs
  - updates of security keys and the identity signature

The package is thread-safe. All public functions can be invoked from multiple goroutines.

The package also provides facade functions for nodes, inputs and outputs that work with nodeIDs instead of full addresses on the internal Nodes, Inputs and Outputs collections. These are mostly intended to reduce boilerplate code when managing nodes, inputs and outputs.

Constants

const (
	// DefaultPollInterval is the interval, in seconds, at which the registered nodes,
	// inputs and outputs are queried for polling based sources
	DefaultPollInterval = 600

	// RegisteredNodesFileSuffix to append to name of the file containing registered nodes
	RegisteredNodesFileSuffix = "-nodes.json"
	// RegisteredIdentityFileSuffix to append to the name of the file containing publisher saved identity
	RegisteredIdentityFileSuffix = "-identity.json"
	// DomainPublishersFileSuffix to append to the name of the file containing domain publisher identities
	DomainPublishersFileSuffix = "-domainpublishers.json"
)
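
As a rough illustration of how these suffixes might be used, the sketch below appends each one to a publisher ID. The NewPublisher documentation mentions `<publisherID>-nodes.json`; using the same prefix for the identity and domain publishers files is an assumption, and `stateFileName` is a hypothetical helper that is not part of this package.

```go
package main

import "fmt"

// Suffixes copied from the constants documented above.
const (
	RegisteredNodesFileSuffix    = "-nodes.json"
	RegisteredIdentityFileSuffix = "-identity.json"
	DomainPublishersFileSuffix   = "-domainpublishers.json"
)

// stateFileName is a hypothetical helper (not part of the package).
// It builds a file name by appending one of the suffixes to the publisher ID.
func stateFileName(publisherID, suffix string) string {
	return publisherID + suffix
}

func main() {
	suffixes := []string{
		RegisteredNodesFileSuffix,
		RegisteredIdentityFileSuffix,
		DomainPublishersFileSuffix,
	}
	for _, suffix := range suffixes {
		fmt.Println(stateFileName("mypub", suffix))
	}
}
```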

Variables

This section is empty.

Functions

func PublishOutputEvent

func PublishOutputEvent(
	node *types.NodeDiscoveryMessage,
	registeredOutputs *outputs.RegisteredOutputs,
	outputValues *outputs.RegisteredOutputValues,
	messageSigner *messaging.MessageSigner,
) error

PublishOutputEvent publishes all node output values in the $event command, on the address zone/publisher/nodealias/$event. TODO: decide when to invoke this.

func SetLogging

func SetLogging(levelName string, filename string) error

SetLogging sets the logging level and output file for this publisher. Intended for setting logging from configuration.

levelName is the requested logging level: error, warning, info, debug
filename is the output log file full name including path, use "" for stderr

Types

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher carries the operating state of 'this' publisher.

func NewAppPublisher

func NewAppPublisher(appID string, configFolder string, appConfig interface{},
	cacheFolder string, cacheDiscovery bool) (*Publisher, error)

NewAppPublisher creates a publisher and takes care of all the boilerplate. It:

  1. Loads the messenger config and creates a messenger instance

  2. Loads PublisherConfig from <appID>.yaml

  3. Loads appConfig from <appID>.yaml (yes, the same file)

  4. Creates a publisher using the domain from the messenger config and the publisherID from <appID>.yaml

  5. Sets the publisher to persist nodes and loads previously saved nodes

    - appID is the application ID, used as publisher ID unless overridden in <appID>.yaml.
    - configFolder contains the identity, messenger and application configuration. Use "" for the default location (~/.config/iotdomain).
    - appConfig is an optional application object to load the <appID>.yaml configuration into.
    - cacheFolder contains the saved discovered nodes and publishers files. Use "" for the default location (~/.cache/iotdomain).
    - cacheDiscovery loads and saves discovered publisher identities and nodes from cache.

This returns a publisher instance, or an error if the messenger fails to load.
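
The "" convention for configFolder and cacheFolder can be sketched as follows. This is only an illustration of the documented defaults (~/.config/iotdomain and ~/.cache/iotdomain). `resolveFolder` is a hypothetical helper and may differ from how the package resolves folders internally.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// resolveFolder is a hypothetical helper (not part of the package).
// A non-empty folder is returned unchanged. An empty folder resolves to
// defaultRel below the given home directory.
func resolveFolder(folder, home, defaultRel string) string {
	if folder != "" {
		return folder
	}
	return filepath.Join(home, defaultRel)
}

func main() {
	home, err := os.UserHomeDir()
	if err != nil {
		home = "." // fall back to the working directory in this sketch
	}
	configDefault := filepath.Join(".config", "iotdomain")
	cacheDefault := filepath.Join(".cache", "iotdomain")

	fmt.Println(resolveFolder("", home, configDefault))
	fmt.Println(resolveFolder("", home, cacheDefault))
	fmt.Println(resolveFolder("/etc/iotdomain", home, configDefault))
}
```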

func NewPublisher

func NewPublisher(config *PublisherConfig, messenger messaging.IMessenger,
) *Publisher

NewPublisher creates a new publisher instance. This is used for all publications.

The configFolder contains the publisher's saved identity and the node configuration file <publisherID>-nodes.json, which is loaded during Start(). Use "" for the default config folder. When autosave is set, the configuration files are written whenever the identity or the registered nodes are updated.

domain and publisherID identify this publisher. If the identity file does not match these, it is discarded and a new identity is created. If the publisher has joined the domain and the DSS has issued the identity, then changing the domain or publisherID invalidates the publisher and it has to rejoin the domain. If no domain is provided, the default 'local' is used.

signingMethod indicates if and how publications must be signed. The default is jws. For testing, 'none' can be used.

A messenger for publishing onto the message bus is required.
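
The identity check described above can be illustrated with a small sketch. The `savedIdentity` type and both helpers are hypothetical stand-ins, and the package's actual identity structure and matching logic may differ. Only the 'local' default and the "discard on mismatch" rule come from the documentation.

```go
package main

import "fmt"

// defaultDomain is the documented fallback when no domain is provided.
const defaultDomain = "local"

// savedIdentity is a hypothetical stand-in for the identity loaded from file.
type savedIdentity struct {
	Domain      string
	PublisherID string
}

// effectiveDomain applies the documented default domain.
func effectiveDomain(domain string) string {
	if domain == "" {
		return defaultDomain
	}
	return domain
}

// identityUsable reports whether a saved identity matches the requested
// domain and publisherID. A mismatch means the saved identity would be
// discarded and a new one created.
func identityUsable(saved *savedIdentity, domain, publisherID string) bool {
	if saved == nil {
		return false
	}
	return saved.Domain == effectiveDomain(domain) && saved.PublisherID == publisherID
}

func main() {
	saved := &savedIdentity{Domain: "local", PublisherID: "mypub"}
	fmt.Println(identityUsable(saved, "", "mypub"))
	fmt.Println(identityUsable(saved, "other", "mypub"))
}
```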

func (*Publisher) Address

func (pub *Publisher) Address() string

Address returns the publisher's identity address

func (*Publisher) CreateInput

func (pub *Publisher) CreateInput(nodeHWID string, inputType types.InputType, instance string,
	setCommandHandler func(input *types.InputDiscoveryMessage, sender string, value string)) *types.InputDiscoveryMessage

CreateInput creates a new node input that handles set commands and adds it to the registered inputs.

If an input of the given nodeHWID, type and instance already exists, it will be replaced. This returns the new input.

func (*Publisher) CreateInputFromFile

func (pub *Publisher) CreateInputFromFile(
	nodeHWID string, inputType types.InputType, instance string, path string,
	handler func(input *types.InputDiscoveryMessage, sender string, value string)) *types.InputDiscoveryMessage

CreateInputFromFile sends a file or folder to an input when it is modified (TODO). The input handler is triggered with a message containing the path as value.

func (*Publisher) CreateInputFromHTTP

func (pub *Publisher) CreateInputFromHTTP(
	nodeHWID string, inputType types.InputType, instance string, url string, login string, password string, intervalSec int,
	handler func(input *types.InputDiscoveryMessage, sender string, value string))

CreateInputFromHTTP periodically polls an HTTP address and sends the response to an input when it is modified.

func (*Publisher) CreateInputFromOutput

func (pub *Publisher) CreateInputFromOutput(
	nodeHWID string, inputType types.InputType, instance string, outputAddress string,
	handler func(input *types.InputDiscoveryMessage, sender string, value string))

CreateInputFromOutput subscribes to an output and triggers the input when a new value is received

func (*Publisher) CreateNode

func (pub *Publisher) CreateNode(nodeHWID string, nodeType types.NodeType) *types.NodeDiscoveryMessage

CreateNode creates a new node and adds it to this publisher's registered nodes. This returns the new node instance.

func (*Publisher) CreateOutput

func (pub *Publisher) CreateOutput(nodeHWID string, outputType types.OutputType,
	instance string) *types.OutputDiscoveryMessage

CreateOutput creates a new node output and adds it to this publisher's outputs list. This returns the output object to allow for easy updates.

func (*Publisher) DeleteNode

func (pub *Publisher) DeleteNode(hwAddress string)

DeleteNode deletes a node from the collection of registered nodes

func (*Publisher) Domain

func (pub *Publisher) Domain() string

Domain returns the publication domain

func (*Publisher) GetDomainInput

func (pub *Publisher) GetDomainInput(address string) *types.InputDiscoveryMessage

GetDomainInput returns a discovered domain input

func (*Publisher) GetDomainInputs

func (pub *Publisher) GetDomainInputs() []*types.InputDiscoveryMessage

GetDomainInputs returns all discovered domain inputs

func (*Publisher) GetDomainNode

func (pub *Publisher) GetDomainNode(address string) *types.NodeDiscoveryMessage

GetDomainNode returns a discovered domain node by its address

func (*Publisher) GetDomainNodes

func (pub *Publisher) GetDomainNodes() []*types.NodeDiscoveryMessage

GetDomainNodes returns all discovered domain nodes

func (*Publisher) GetDomainOutput

func (pub *Publisher) GetDomainOutput(address string) *types.OutputDiscoveryMessage

GetDomainOutput returns a discovered domain output by its address

func (*Publisher) GetDomainOutputs

func (pub *Publisher) GetDomainOutputs() []*types.OutputDiscoveryMessage

GetDomainOutputs returns all discovered domain outputs

func (*Publisher) GetDomainPublishers

func (pub *Publisher) GetDomainPublishers() []*types.PublisherIdentityMessage

GetDomainPublishers returns all discovered domain publishers

func (*Publisher) GetIdentity

func (pub *Publisher) GetIdentity() *types.PublisherIdentityMessage

GetIdentity returns the publisher public identity including public signing key

func (*Publisher) GetIdentityKeys

func (pub *Publisher) GetIdentityKeys() *ecdsa.PrivateKey

GetIdentityKeys returns the private/public key pair of this publisher

func (*Publisher) GetInputByAddress

func (pub *Publisher) GetInputByAddress(address string) *types.InputDiscoveryMessage

GetInputByAddress returns a registered input by its full address

func (*Publisher) GetInputByID

func (pub *Publisher) GetInputByID(inputID string) *types.InputDiscoveryMessage

GetInputByID returns a registered input by its inputID

func (*Publisher) GetInputByNodeHWID

func (pub *Publisher) GetInputByNodeHWID(nodeHWID string, inputType types.InputType, instance string) *types.InputDiscoveryMessage

GetInputByNodeHWID returns a registered input by its node HWID, input type and instance

func (*Publisher) GetInputs

func (pub *Publisher) GetInputs() []*types.InputDiscoveryMessage

GetInputs returns a list of all registered inputs

func (*Publisher) GetNodeAttr

func (pub *Publisher) GetNodeAttr(nodeHWID string, attrName types.NodeAttr) string

GetNodeAttr returns a node attribute value

func (*Publisher) GetNodeByAddress

func (pub *Publisher) GetNodeByAddress(address string) *types.NodeDiscoveryMessage

GetNodeByAddress returns a registered node by its full address using its nodeID

func (*Publisher) GetNodeByHWID

func (pub *Publisher) GetNodeByHWID(nodeHWID string) (node *types.NodeDiscoveryMessage)

GetNodeByHWID returns a node from this publisher or nil if the nodeHWID isn't found

func (*Publisher) GetNodeByNodeID

func (pub *Publisher) GetNodeByNodeID(nodeID string) (node *types.NodeDiscoveryMessage)

GetNodeByNodeID returns a node from this publisher or nil if the node id isn't found

func (*Publisher) GetNodeConfigBool

func (pub *Publisher) GetNodeConfigBool(
	nodeHWID string, attrName types.NodeAttr, defaultValue bool) (value bool, err error)

GetNodeConfigBool returns a node configuration value as a boolean. This returns the given default if no configuration value exists and no configuration default is set.

func (*Publisher) GetNodeConfigFloat

func (pub *Publisher) GetNodeConfigFloat(
	nodeHWID string, attrName types.NodeAttr, defaultValue float32) (value float32, err error)

GetNodeConfigFloat returns a node configuration value as a float number. This returns the given default if no configuration value exists and no configuration default is set.

func (*Publisher) GetNodeConfigInt

func (pub *Publisher) GetNodeConfigInt(
	nodeHWID string, attrName types.NodeAttr, defaultValue int) (value int, err error)

GetNodeConfigInt returns a node configuration value as an integer. This returns the given default if no configuration value exists and no configuration default is set.

func (*Publisher) GetNodeConfigString

func (pub *Publisher) GetNodeConfigString(
	nodeHWID string, attrName types.NodeAttr, defaultValue string) (value string, err error)

GetNodeConfigString returns a node configuration value as a string. This returns the given default if no configuration value exists and no configuration default is set.
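
The fallback order shared by the GetNodeConfig* functions (configuration value, then configuration default, then the given default) can be sketched for the integer case as follows. The `configAttr` type and `configInt` function are hypothetical stand-ins, not the package's types, and the behaviour on a conversion error is an assumption.

```go
package main

import (
	"fmt"
	"strconv"
)

// configAttr is a hypothetical stand-in for a node configuration attribute
// that holds a current value and a configured default, both as strings.
type configAttr struct {
	Value   string
	Default string
}

// configInt returns the attribute value as an int. It falls back to the
// attribute's own default, and then to defaultValue, when nothing is set.
// Assumption: on a conversion error the given default is returned with the error.
func configInt(config map[string]configAttr, name string, defaultValue int) (int, error) {
	attr := config[name] // zero value when the attribute does not exist
	raw := attr.Value
	if raw == "" {
		raw = attr.Default
	}
	if raw == "" {
		return defaultValue, nil
	}
	value, err := strconv.Atoi(raw)
	if err != nil {
		return defaultValue, err
	}
	return value, nil
}

func main() {
	config := map[string]configAttr{
		"pollInterval": {Value: "30"},
		"retries":      {Default: "5"},
	}
	for _, name := range []string{"pollInterval", "retries", "missing"} {
		value, err := configInt(config, name, 10)
		fmt.Println(name, value, err)
	}
}
```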

func (*Publisher) GetNodeStatus

func (pub *Publisher) GetNodeStatus(nodeHWID string, attrName types.NodeStatus) (value string, exists bool)

GetNodeStatus returns a status attribute of a registered node

func (*Publisher) GetNodes

func (pub *Publisher) GetNodes() []*types.NodeDiscoveryMessage

GetNodes returns a list of all registered nodes

func (*Publisher) GetOutputByID

func (pub *Publisher) GetOutputByID(outputID string) *types.OutputDiscoveryMessage

GetOutputByID gets a registered output by its outputID

func (*Publisher) GetOutputByNodeHWID

func (pub *Publisher) GetOutputByNodeHWID(nodeHWID string, outputType types.OutputType, instance string) *types.OutputDiscoveryMessage

GetOutputByNodeHWID returns a registered output by its node HWID, output type and instance

func (*Publisher) GetOutputValueByID

func (pub *Publisher) GetOutputValueByID(outputID string) *types.OutputValue

GetOutputValueByID returns the registered output's value object including timestamp

func (*Publisher) GetOutputValueByNodeHWID

func (pub *Publisher) GetOutputValueByNodeHWID(nodeHWID string, outputType types.OutputType, instance string) *types.OutputValue

GetOutputValueByNodeHWID returns the registered output's value object including timestamp

func (*Publisher) GetOutputs

func (pub *Publisher) GetOutputs() []*types.OutputDiscoveryMessage

GetOutputs returns a list of all registered outputs

func (*Publisher) GetPublisherKey

func (pub *Publisher) GetPublisherKey(address string) *ecdsa.PublicKey

GetPublisherKey returns the public key of the publisher contained in the given address. The address must contain at least a domain and publisherId.

func (*Publisher) HandleSetNodeIDCommand

func (pub *Publisher) HandleSetNodeIDCommand(address string, message *types.SetNodeIDMessage)

HandleSetNodeIDCommand handles the command to change the ID of a node. This updates the address of a node, its inputs and its outputs.

func (*Publisher) LoadDomainPublishers

func (pub *Publisher) LoadDomainPublishers() error

LoadDomainPublishers loads discovered publisher identities from the cache folder. Intended to cache the public signing keys to verify messages from these publishers

func (*Publisher) LoadRegisteredNodes

func (pub *Publisher) LoadRegisteredNodes() error

LoadRegisteredNodes loads saved registered nodes from the config folder. Intended to restore node configuration.

func (*Publisher) MakeNodeDiscoveryAddress

func (pub *Publisher) MakeNodeDiscoveryAddress(nodeID string) string

MakeNodeDiscoveryAddress makes the node discovery address using the publisher domain and publisherID

func (*Publisher) PublishNodeConfigure

func (pub *Publisher) PublishNodeConfigure(domainNodeAddr string, attr types.NodeAttrMap) bool

PublishNodeConfigure publishes a $configure command to a domain node. Returns true if successful, or false if the domain node's publisher cannot be found or has no public key, in which case the message is not sent.

func (*Publisher) PublishOutputEvent

func (pub *Publisher) PublishOutputEvent(node *types.NodeDiscoveryMessage) error

PublishOutputEvent publishes all outputs of the node in a single event

func (*Publisher) PublishRaw

func (pub *Publisher) PublishRaw(output *types.OutputDiscoveryMessage, sign bool, value string)

PublishRaw immediately publishes the given value of a node, output type and instance on the $raw output address. The content can be signed but is not encrypted. This is intended for publishing large values that should not be stored, for example images

func (*Publisher) PublishSetInput

func (pub *Publisher) PublishSetInput(inputAddr string, value string) error

PublishSetInput publishes a $setInput input command to the given input address.

This requires that the publisher identity of the receiving input is known so the command can be encrypted. Returns an error if the destination publisher is unknown and the message cannot be sent.

func (*Publisher) PublishSetNodeID

func (pub *Publisher) PublishSetNodeID(nodeAddr string, newNodeID string) error

PublishSetNodeID publishes a set node ID command to the given node address.

This requires that the publisher identity of the receiving node is known so the command can be encrypted. Returns an error if the destination publisher is unknown and the message cannot be sent.

func (*Publisher) PublishUpdatedOutputValues

func (publisher *Publisher) PublishUpdatedOutputValues(
	updatedOutputIDs []string,
	messageSigner *messaging.MessageSigner)

PublishUpdatedOutputValues publishes the updated discovery and values of registered outputs. This uses the node config to determine which output publications to use, e.g. raw, latest, history.

func (*Publisher) PublishUpdates

func (publisher *Publisher) PublishUpdates()

PublishUpdates publishes changes to registered nodes, inputs, outputs, values and this publisher identity

func (*Publisher) PublisherID

func (pub *Publisher) PublisherID() string

PublisherID returns the publisher's ID

func (*Publisher) SaveDomainPublishers

func (pub *Publisher) SaveDomainPublishers() error

SaveDomainPublishers saves discovered domain publisher identities

func (*Publisher) SaveRegisteredNodes

func (pub *Publisher) SaveRegisteredNodes() error

SaveRegisteredNodes saves current registered nodes to the config folder

func (*Publisher) SetNodeConfigHandler

func (pub *Publisher) SetNodeConfigHandler(
	handler func(nodeHWID string, config types.NodeAttrMap))

SetNodeConfigHandler sets the handler for updating node configuration. The handler is invoked if a configuration update for a node is received and the node exists.

func (*Publisher) SetPollInterval

func (pub *Publisher) SetPollInterval(seconds int, handler func(pub *Publisher))

SetPollInterval is a convenience function for periodic polling of updates to registered nodes, inputs, outputs and output values. It is intended for publishers that need to poll for values.

seconds is the interval at which to perform another poll. The default (0) is DefaultPollInterval.
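
A periodic poll with a "0 means default" interval is commonly built on time.Ticker. The sketch below shows that general pattern under stated assumptions: `effectivePollSeconds` and `pollLoop` are hypothetical helpers and do not represent the package's internal heartbeat loop. Treating negative values like 0 is also an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// DefaultPollInterval mirrors the documented constant, in seconds.
const DefaultPollInterval = 600

// effectivePollSeconds applies the documented default for 0.
// Assumption: negative values are treated like 0.
func effectivePollSeconds(seconds int) int {
	if seconds <= 0 {
		return DefaultPollInterval
	}
	return seconds
}

// pollLoop invokes poll on every tick until stop is closed.
func pollLoop(interval time.Duration, poll func(), stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			poll()
		case <-stop:
			return
		}
	}
}

func main() {
	fmt.Println("interval for 0:", effectivePollSeconds(0), "seconds")

	// Run the loop briefly with a short interval to show the pattern.
	stop := make(chan struct{})
	done := make(chan struct{})
	count := 0
	go func() {
		pollLoop(10*time.Millisecond, func() { count++ }, stop)
		close(done)
	}()
	time.Sleep(55 * time.Millisecond)
	close(stop)
	<-done // count is safe to read after the loop goroutine has finished
	fmt.Println("polls performed:", count)
}
```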

func (*Publisher) SetPublisherStatus

func (pub *Publisher) SetPublisherStatus(status types.PublisherRunState)

SetPublisherStatus sets the publisher runtime status and publishes the message

func (*Publisher) SetSigningOnOff

func (pub *Publisher) SetSigningOnOff(onOff bool)

SetSigningOnOff turns signing of publications on or off.

The default is on (true)

func (*Publisher) Start

func (pub *Publisher) Start()

Start starts publishing registered nodes, inputs and outputs, and listens for command messages. Start will fail if no messenger has been provided.

func (*Publisher) Stop

func (pub *Publisher) Stop()

Stop stops publishing, sets the run status to disconnected and disconnects from the message bus. It waits until the heartbeat loop has finished processing messages.

func (*Publisher) Subscribe

func (pub *Publisher) Subscribe(domain string, publisherID string)

Subscribe subscribes to receive nodes, inputs and outputs from the selected domain and/or publisher. To subscribe to all domains or all publishers, use "" as the domain or publisherID.
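
The "" wildcard convention can be pictured as a simple filter. The sketch below only illustrates the documented semantics. The `subscription` type is hypothetical, and how the package maps a subscription onto the message bus is not shown here.

```go
package main

import "fmt"

// subscription is a hypothetical stand-in for a domain/publisher filter.
// An empty field matches everything, following the documented "" convention.
type subscription struct {
	Domain      string
	PublisherID string
}

// matches reports whether a message from the given domain and publisher
// falls within this subscription.
func (s subscription) matches(domain, publisherID string) bool {
	domainOK := s.Domain == "" || s.Domain == domain
	publisherOK := s.PublisherID == "" || s.PublisherID == publisherID
	return domainOK && publisherOK
}

func main() {
	allInLocal := subscription{Domain: "local"}
	fmt.Println(allInLocal.matches("local", "weather"))
	fmt.Println(allInLocal.matches("remote", "weather"))

	everything := subscription{}
	fmt.Println(everything.matches("remote", "weather"))
}
```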

func (*Publisher) Unsubscribe

func (pub *Publisher) Unsubscribe(domain string, publisherID string)

Unsubscribe unsubscribes from receiving nodes, inputs and outputs from the selected domain and/or publisher. Use the same domain and publisherID as used in Subscribe.

func (*Publisher) UpdateNodeAttr

func (pub *Publisher) UpdateNodeAttr(nodeHWID string, attrParams types.NodeAttrMap) (changed bool)

UpdateNodeAttr updates one or more attributes of a registered node. This only updates the node if the status or lastError message changes.

func (*Publisher) UpdateNodeConfig

func (pub *Publisher) UpdateNodeConfig(nodeHWID string, attrName types.NodeAttr, configAttr *types.ConfigAttr)

UpdateNodeConfig updates a registered node's configuration and publishes the updated node.

If a config already exists then its value is retained but its configuration parameters are replaced.
Nodes are immutable. A new node is created and published and the old node instance is discarded.
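
The immutability rule above is a copy-on-update pattern: the old instance is never modified, and a new instance carries the change. A minimal sketch, assuming a simplified hypothetical `node` type rather than types.NodeDiscoveryMessage:

```go
package main

import "fmt"

// node is a hypothetical, simplified stand-in for a registered node.
type node struct {
	HWID   string
	Config map[string]string
}

// withConfig returns a new node with one configuration value set.
// The old node and its map are left untouched, so readers that still hold
// the old instance never observe a partial update.
func withConfig(old *node, name, value string) *node {
	updated := &node{
		HWID:   old.HWID,
		Config: make(map[string]string, len(old.Config)+1),
	}
	for k, v := range old.Config {
		updated.Config[k] = v
	}
	updated.Config[name] = value
	return updated
}

func main() {
	original := &node{HWID: "sensor1", Config: map[string]string{"name": "old"}}
	replaced := withConfig(original, "name", "new")

	fmt.Println(original.Config["name"], replaced.Config["name"])
	fmt.Println(original != replaced)
}
```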

func (*Publisher) UpdateNodeConfigValues

func (pub *Publisher) UpdateNodeConfigValues(nodeHWID string, params types.NodeAttrMap) (changed bool)

UpdateNodeConfigValues updates the configuration values for the given registered node. This takes a map of key-value pairs with the configuration attribute name and new value. Intended for updating the node configuration based on what the registered node reports.

func (*Publisher) UpdateNodeErrorStatus

func (pub *Publisher) UpdateNodeErrorStatus(nodeHWID string, status string, lastError string)

UpdateNodeErrorStatus sets a registered node's RunState to the given status with a lastError message. Use NodeRunStateError for errors and NodeRunStateReady to clear the error. This only updates the node if the status or lastError message changes.

func (*Publisher) UpdateNodeStatus

func (pub *Publisher) UpdateNodeStatus(nodeHWID string, status map[types.NodeStatus]string) (changed bool)

UpdateNodeStatus updates one or more status attributes of a registered node. This only updates the node if the status changes.

func (*Publisher) UpdateOutput

func (pub *Publisher) UpdateOutput(output *types.OutputDiscoveryMessage)

UpdateOutput replaces a registered output with a new instance. Intended to update an output attribute. If the output does not exist, this is ignored.

func (*Publisher) UpdateOutputForecast

func (pub *Publisher) UpdateOutputForecast(outputID string, forecast outputs.OutputForecast)

UpdateOutputForecast replaces a forecast

func (*Publisher) UpdateOutputValue

func (pub *Publisher) UpdateOutputValue(nodeHWID string, outputType types.OutputType, instance string, newValue string) bool

UpdateOutputValue adds the registered node's output value to the front of the value history
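
Adding a value to the front of a history keeps the newest entry at index 0. The sketch below illustrates that idea with a hypothetical `outputValue` type and `pushFront` helper. The length cap is an assumption added for the example and is not documented above.

```go
package main

import (
	"fmt"
	"time"
)

// outputValue is a hypothetical stand-in for a value with its timestamp.
type outputValue struct {
	Timestamp time.Time
	Value     string
}

// pushFront returns a history with v as the first (newest) element.
// Assumption: when maxLen > 0 the oldest entries beyond maxLen are dropped.
func pushFront(history []outputValue, v outputValue, maxLen int) []outputValue {
	history = append([]outputValue{v}, history...)
	if maxLen > 0 && len(history) > maxLen {
		history = history[:maxLen]
	}
	return history
}

func main() {
	var history []outputValue
	for _, reading := range []string{"20.1", "20.4", "20.9"} {
		history = pushFront(history, outputValue{Timestamp: time.Now(), Value: reading}, 2)
	}
	for i, entry := range history {
		fmt.Println(i, entry.Value)
	}
}
```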

func (*Publisher) WaitForSignal

func (pub *Publisher) WaitForSignal()

WaitForSignal waits until a TERM or INT signal is received

type PublisherConfig

type PublisherConfig struct {
	SaveDiscoveredPublishers bool   `yaml:"cachePublishers"`   // load/save discovered publisher identities to cache
	SaveDiscoveredNodes      bool   `yaml:"cacheNodes"`        // load/save discovered nodes to cache
	CacheFolder              string `yaml:"cacheFolder"`       // location of discovered domain nodes and publishers
	ConfigFolder             string `yaml:"configFolder"`      // location of yaml configuration files and registered nodes and identity
	Domain                   string `yaml:"domain"`            // optional override per publisher. Default is local
	PublisherID              string `yaml:"publisherId"`       // this publisher's ID
	Loglevel                 string `yaml:"loglevel"`          // error, warning, info, debug
	Logfile                  string `yaml:"logfile"`           //
	DisableConfig            bool   `yaml:"disableConfig"`     // disable configuration over the bus, default is enabled
	DisableInput             bool   `yaml:"disableInput"`      // disable inputs over the bus, default is enabled
	DisablePublishers        bool   `yaml:"disablePublishers"` // disable listening for available publishers (enable for signature verification)
	SecuredDomain            bool   `yaml:"securedDomain"`     // require secured domain and signed messages
}

PublisherConfig defines the configuration fields read from the application configuration.
