Documentation
¶
Index ¶
- Constants
- Variables
- func FromControlChannel(in <-chan ControlSnip) <-chan interface{}
- func FromSnipChannel(in <-chan QuerySnip) <-chan interface{}
- func NewControlRunner(run func(c <-chan ControlSnip)) func(c <-chan interface{})
- func NewMqttOptions(broker string, user string, password string, clientID string) *MQTT.ClientOptions
- func NewSnipRunner(run func(c <-chan QuerySnip)) func(c <-chan interface{})
- func ServeWebsocket(hub *SocketHub, w http.ResponseWriter, r *http.Request)
- func ToControlChannel(in <-chan interface{}) <-chan ControlSnip
- type Broadcaster
- type Cache
- type ControlSnip
- type DeviceInfo
- type DeviceStatus
- type Handler
- type HomieRunner
- type Httpd
- type Influx
- type MemoryStatus
- type MeterReadings
- type ModbusStatus
- type MqttClient
- type MqttRunner
- type QueryEngine
- type QuerySnip
- type Readings
- type RuntimeInfo
- type SocketClient
- type SocketHub
- type Status
Constants ¶
const AssetsDir = "assets"
AssetsDir is the assets directory relative to the module root
Variables ¶
var ( // Version of executable Version = "unknown version" // Commit of executable Commit = "unknown commit" )
var Assets fs.FS
Assets is the embedded assets file system
Functions ¶
func FromControlChannel ¶
func FromControlChannel(in <-chan ControlSnip) <-chan interface{}
FromControlChannel adapts a chan ControlSnip to chan interface
func FromSnipChannel ¶
func FromSnipChannel(in <-chan QuerySnip) <-chan interface{}
FromSnipChannel adapts a chan QuerySnip to chan interface
func NewControlRunner ¶
func NewControlRunner(run func(c <-chan ControlSnip)) func(c <-chan interface{})
NewControlRunner adapts a chan ControlSnip to chan interface
func NewMqttOptions ¶
func NewMqttOptions( broker string, user string, password string, clientID string, ) *MQTT.ClientOptions
NewMqttOptions creates MQTT client options
func NewSnipRunner ¶
func NewSnipRunner(run func(c <-chan QuerySnip)) func(c <-chan interface{})
NewSnipRunner adapts a chan QuerySnip to chan interface
func ServeWebsocket ¶
func ServeWebsocket(hub *SocketHub, w http.ResponseWriter, r *http.Request)
ServeWebsocket handles websocket requests from the peer.
func ToControlChannel ¶
func ToControlChannel(in <-chan interface{}) <-chan ControlSnip
ToControlChannel adapts a chan interface to chan ControlSnip
Types ¶
type Broadcaster ¶
type Broadcaster struct { sync.Mutex // guard recipients // contains filtered or unexported fields }
Broadcaster acts as hub for broadcating snips to multiple recipients
func NewBroadcaster ¶
func NewBroadcaster(in <-chan interface{}) *Broadcaster
NewBroadcaster creates a Broadcaster that implements a hub and spoke message replication pattern
func (*Broadcaster) Attach ¶
func (b *Broadcaster) Attach() <-chan interface{}
Attach creates and attaches a channel to the broadcaster
func (*Broadcaster) AttachRunner ¶
func (b *Broadcaster) AttachRunner(runner func(<-chan interface{}))
AttachRunner attaches a Run method as broadcast receiver and adds it to the waitgroup
func (*Broadcaster) Done ¶
func (b *Broadcaster) Done() <-chan struct{}
Done returns a channel signalling when broadcasting has stopped
type Cache ¶
Cache caches and aggregates meter reasings
type ControlSnip ¶
type ControlSnip struct { Device string Status RuntimeInfo }
ControlSnip wraps device status information
type DeviceInfo ¶
type DeviceInfo interface {
DeviceDescriptorByID(id string) meters.DeviceDescriptor
}
DeviceInfo returns device descriptor by device id
type DeviceStatus ¶
type DeviceStatus struct { Device string Type string Online bool ModbusStatus }
DeviceStatus represents a devices runtime status
type Handler ¶
Handler is responsible for querying a single connection
func NewHandler ¶
NewHandler creates a connection handler. The handler is responsible for querying all devices attached to the connection.
type HomieRunner ¶
type HomieRunner struct {
// contains filtered or unexported fields
}
HomieRunner publishes query results as homie mqtt topics
func NewHomieRunner ¶
func NewHomieRunner(qe DeviceInfo, cc <-chan ControlSnip, options *MQTT.ClientOptions, qos byte, rootTopic string, verbose bool) *HomieRunner
NewHomieRunner create new runner for homie IoT spec
type Httpd ¶
type Httpd struct {
// contains filtered or unexported fields
}
Httpd is an http server
type Influx ¶
type Influx struct {
// contains filtered or unexported fields
}
Influx is an InfluxDB v2 publisher
type MemoryStatus ¶
MemoryStatus represents daemon memory allocation
type MeterReadings ¶
MeterReadings holds entire sets of current and recent meter readings for a single device
func NewMeterReadings ¶
func NewMeterReadings(maxAge time.Duration) *MeterReadings
NewMeterReadings container for current and recent meter readings
func (*MeterReadings) Add ¶
func (mr *MeterReadings) Add(snip QuerySnip)
Add adds a meter reading for specified device
func (*MeterReadings) Average ¶
func (mr *MeterReadings) Average(timestamp time.Time) *Readings
Average averages historic readings after given timestamp
func (*MeterReadings) TrimBefore ¶
func (mr *MeterReadings) TrimBefore(timestamp time.Time)
TrimBefore removes historic readings older than timestamp
type ModbusStatus ¶
type ModbusStatus struct { Requests uint64 RequestsPerMinute float64 Errors uint64 ErrorsPerMinute float64 }
ModbusStatus represents device request and error status
type MqttClient ¶
MqttClient is a MQTT publisher
func NewMqttClient ¶
func NewMqttClient( options *MQTT.ClientOptions, qos byte, verbose bool, ) *MqttClient
NewMqttClient creates new publisher for MQTT
func (*MqttClient) Publish ¶
func (m *MqttClient) Publish(topic string, retained bool, message interface{})
Publish MQTT message with error handling
func (*MqttClient) WaitForToken ¶
func (m *MqttClient) WaitForToken(token MQTT.Token)
WaitForToken synchronously waits until token operation completed
type MqttRunner ¶
type MqttRunner struct { *MqttClient // contains filtered or unexported fields }
MqttRunner allows to attach an MqttClient as broadcast receiver
func NewMqttRunner ¶
func NewMqttRunner(options *MQTT.ClientOptions, qos byte, topic string, verbose bool) *MqttRunner
NewMqttRunner create a new runer for plain MQTT
type QueryEngine ¶
type QueryEngine struct {
// contains filtered or unexported fields
}
QueryEngine executes queries on connections and attached devices
func NewQueryEngine ¶
func NewQueryEngine(managers map[string]*meters.Manager) *QueryEngine
NewQueryEngine creates new query engine
func (*QueryEngine) DeviceDescriptorByID ¶
func (q *QueryEngine) DeviceDescriptorByID(id string) (res meters.DeviceDescriptor)
DeviceDescriptorByID implements DeviceInfo interface
func (*QueryEngine) Run ¶
func (q *QueryEngine) Run( ctx context.Context, rate time.Duration, control chan<- ControlSnip, results chan<- QuerySnip, )
Run executes the query engine to produce measurement results
type QuerySnip ¶
type QuerySnip struct { Device string meters.MeasurementResult }
QuerySnip wraps query results
func (*QuerySnip) MarshalJSON ¶
MarshalJSON converts QuerySnip to json, replacing Timestamp with unix time representation
type Readings ¶
Readings combines readings of all measurements into one data structure
func (*Readings) Add ¶
Add adds the values represented by the QuerySnip to the Readings and updates the current time stamp
type RuntimeInfo ¶
type RuntimeInfo struct { Online bool Requests uint64 Errors uint64 // contains filtered or unexported fields }
RuntimeInfo represents a single modbus device status
func (*RuntimeInfo) Available ¶
func (r *RuntimeInfo) Available(online bool)
Available sets the device online status
func (*RuntimeInfo) IsQueryable ¶
func (r *RuntimeInfo) IsQueryable() (queryable bool, elapsed bool)
IsQueryable determines if a device can be queries. This is the case if either the device is online or the device is offline and the retryTimeout has elapsed. Returns queryable status and if the offline timeout has elapsed.
type SocketClient ¶
type SocketClient struct {
// contains filtered or unexported fields
}
SocketClient is a middleman between the websocket connection and the hub.
type SocketHub ¶
type SocketHub struct {
// contains filtered or unexported fields
}
SocketHub maintains the set of active clients and broadcasts messages to the clients.
func NewSocketHub ¶
NewSocketHub creates a web socket hub that distributes meter status and query results for the ui or other clients
type Status ¶
type Status struct { sync.Mutex StartTime time.Time UpTime float64 Goroutines int Memory MemoryStatus Meters []DeviceStatus // contains filtered or unexported fields }
Status represents the daemon and device status. It is updated when marshaled to JSON
func NewStatus ¶
func NewStatus(qe DeviceInfo, control <-chan ControlSnip) *Status
NewStatus creates status cache that collects device status from control channel. It needs to be Update()d in order to refresh its data for consumption
func (*Status) MarshalJSON ¶
MarshalJSON will syncronize access to the status object see http://choly.ca/post/go-json-marshalling/ for avoiding infinite loop