device

package
v2.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 11, 2023 License: Apache-2.0 Imports: 30 Imported by: 4

Documentation

Index

Constants

View Source
// Default durations exported by this package.
// NOTE(review): the consumers of these values are not visible here; the names
// suggest a per-message timeout and a device-list refresh interval — confirm.
const (
	// DefaultMessageTimeout is two minutes.
	DefaultMessageTimeout time.Duration = 2 * time.Minute
	// DefaultListRefresh is ten seconds.
	DefaultListRefresh    time.Duration = 10 * time.Second
)
View Source
// The EventType values below are numbered consecutively from zero via iota,
// so their order is significant and new values must be appended after
// TransactionBroken.  InvalidEventString shares the block but is an untyped-iota-free
// string constant with an explicit value.
const (
	// Connect indicates a successful device connection.  After receipt of this event, the given
	// Device is able to receive requests.
	Connect EventType = iota

	// Disconnect indicates a device disconnection.  After receipt of this event, the given
	// Device can no longer receive requests.
	Disconnect

	// MessageSent indicates that a message was successfully dispatched to a device.
	MessageSent

	// MessageReceived indicates that a message has been successfully received and
	// dispatched to any goroutine waiting on it, as would be the case for a response.
	MessageReceived

	// MessageFailed indicates that a message could not be sent to a device, either because
	// of a communications error or due to the device disconnecting.  For each enqueued message
	// at the time of a device's disconnection, there will be (1) MessageFailed event.
	MessageFailed

	// TransactionComplete indicates that a response to a transaction has been received, and the
	// transaction completed successfully (at least as far as the routing infrastructure can tell).
	TransactionComplete

	// TransactionBroken indicates receipt of a message that had a transaction key for which there
	// was no waiting transaction.
	TransactionBroken

	// InvalidEventString is a placeholder string for an invalid event type.
	// NOTE(review): presumably returned by EventType.String for values outside
	// the range above — confirm against the implementation.
	InvalidEventString string = "!!INVALID DEVICE EVENT TYPE!!"
)
View Source
// Reserved metadata keys.  Metadata.Store is documented to fail for reserved keys.
const (
	// JWTClaimsKey is the reserved metadata key "jwt-claims".
	JWTClaimsKey = "jwt-claims"
	// SessionIDKey is the reserved metadata key "session-id".
	SessionIDKey = "session-id"
)

Reserved metadata keys

View Source
// Top level JWT claim keys.
// NOTE(review): presumably these are the keys read by Metadata.PartnerIDClaim
// and Metadata.TrustClaim respectively — confirm.
const (
	PartnerIDClaimKey = "partner-id"
	TrustClaimKey     = "trust"
)

Top level JWTClaim keys

View Source
// Metric name constants for this package.
// NOTE(review): presumably these are the names declared by Metrics() and
// resolved by NewMeasures — confirm against those functions.
const (
	DeviceCounter             = "device_count"
	DuplicatesCounter         = "duplicate_count"
	RequestResponseCounter    = "request_response_count"
	PingCounter               = "ping_count"
	PongCounter               = "pong_count"
	ConnectCounter            = "connect_count"
	DisconnectCounter         = "disconnect_count"
	DeviceLimitReachedCounter = "device_limit_reached_count"
	ModelGauge                = "hardware_model"
	WRPSourceCheck            = "wrp_source_check"
)
View Source
// HTTP header names and default tuning values for device connections.
const (
	// DeviceNameHeader is the name of the HTTP header which contains the device service name.
	// This header is primarily required at connect time to identify the device.
	DeviceNameHeader = "X-Webpa-Device-Name"

	// ConveyHeader is the name of the optional HTTP header which contains the encoded convey JSON.
	ConveyHeader = "X-Webpa-Convey"

	// Default durations.  Options documents DefaultIdlePeriod and DefaultWriteTimeout as
	// the fallbacks for Options.IdlePeriod and Options.WriteTimeout.
	// NOTE(review): where DefaultRequestTimeout and DefaultPingPeriod are applied is not
	// visible here — confirm.
	DefaultIdlePeriod     time.Duration = 135 * time.Second
	DefaultRequestTimeout time.Duration = 30 * time.Second
	DefaultWriteTimeout   time.Duration = 60 * time.Second
	DefaultPingPeriod     time.Duration = 45 * time.Second

	// Default sizes.  Options documents DefaultDeviceMessageQueueSize as the fallback for
	// Options.DeviceMessageQueueSize.
	// NOTE(review): the buffer sizes of 0 presumably defer to the websocket library's own
	// defaults — confirm.
	DefaultReadBufferSize         = 0
	DefaultWriteBufferSize        = 0
	DefaultDeviceMessageQueueSize = 100
)
View Source
// DefaultWRPContentType is the content type used on inbound WRP messages which don't provide one.
const DefaultWRPContentType = "application/octet-stream"

DefaultWRPContentType is the content type used on inbound WRP messages which don't provide one.

View Source
const (
	// DeviceManagerKey is the Viper subkey under which device.Options are typically stored.
	// In a JSON configuration file, this will be expressed as:
	//
	//   {
	//     /* other stuff can be here */
	//
	//     "device": {
	//       "manager": {
	//       }
	//     }
	//   }
	DeviceManagerKey = "device.manager"
)
View Source
// MaxDevicesHeader is the name of an HTTP header.
// NOTE(review): presumably it carries the configured device limit (see
// Options.MaxDevices) — confirm where it is read or written.
const MaxDevicesHeader = "X-Xmidt-Max-Devices"
View Source
// Default values.
const (
	// UnknownPartner is the string "unknown", which matches the documented
	// default of Metadata.PartnerIDClaim.
	// NOTE(review): confirm that PartnerIDClaim actually returns this constant.
	UnknownPartner = "unknown"
)

Default values

Variables

View Source
// Sentinel errors exposed by this package.  Within the visible code,
// ErrorMissingDeviceNameHeader, ErrorMissingPathVars and ErrorMissingDeviceNameVar
// are returned by the UseID extraction strategies.
//
// NOTE(review): two messages contain typos ("More than once device",
// "That Transactions is already closed").  The text is left untouched here because
// it is emitted at runtime and callers or log matchers may depend on it.
var (
	ErrorMissingDeviceNameContext     = errors.New("Missing device ID in request context")
	ErrorMissingSecureContext         = errors.New("Missing security information in request context")
	ErrorMissingDeviceNameHeader      = errors.New("Missing device name header")
	ErrorMissingDeviceNameVar         = errors.New("Missing device name path variable")
	ErrorMissingPathVars              = errors.New("Missing URI path variables")
	ErrorInvalidDeviceName            = errors.New("Invalid device name")
	ErrorDeviceNotFound               = errors.New("The device does not exist")
	ErrorNonUniqueID                  = errors.New("More than once device with that identifier is connected")
	ErrorDuplicateKey                 = errors.New("That key is a duplicate")
	ErrorDuplicateDevice              = errors.New("That device is already in this registry")
	ErrorInvalidTransactionKey        = errors.New("Transaction keys must be non-empty strings")
	ErrorNoSuchTransactionKey         = errors.New("That transaction key is not registered")
	ErrorTransactionAlreadyRegistered = errors.New("That transaction is already registered")
	ErrorTransactionCanceled          = errors.New("The transaction has been canceled")
	ErrorResponseNoContents           = errors.New("The response has no contents")
	ErrorDeviceBusy                   = errors.New("That device is busy")
	ErrorDeviceClosed                 = errors.New("That device has been closed")
	ErrorTransactionsClosed           = errors.New("Transactions are closed for that device")
	ErrorTransactionsAlreadyClosed    = errors.New("That Transactions is already closed")
	ErrorDeviceFilteredOut            = errors.New("Device blocked from connecting due to filters")
)
View Source
// UseID groups Alice-style constructors which place the device ID into the
// delegate's request Context.  Each member differs only in how the ID is
// extracted from the incoming request.
var UseID = struct {
	// F builds a constructor from any caller-supplied IDFromRequest strategy.
	F func(IDFromRequest) func(http.Handler) http.Handler

	// FromHeader reads the device identifier from the device name header.
	// It takes no configuration and is used directly: device.UseID.FromHeader.
	FromHeader func(http.Handler) http.Handler

	// FromPath reads the device identifier from the URI path variable with
	// the supplied name, e.g. device.UseID.FromPath("deviceId").
	FromPath func(string) func(http.Handler) http.Handler
}{
	F: useID,

	FromHeader: useID(func(r *http.Request) (ID, error) {
		// An absent or empty header is reported before any parsing is attempted.
		if name := r.Header.Get(DeviceNameHeader); name != "" {
			return ParseID(name)
		}

		return invalidID, ErrorMissingDeviceNameHeader
	}),

	FromPath: func(pathVariable string) func(http.Handler) http.Handler {
		extract := func(r *http.Request) (ID, error) {
			pathVars := mux.Vars(r)

			// A nil variable map and a missing/empty variable are distinct failures.
			switch {
			case pathVars == nil:
				return invalidID, ErrorMissingPathVars
			case pathVars[pathVariable] == "":
				return invalidID, ErrorMissingDeviceNameVar
			}

			return ParseID(pathVars[pathVariable])
		}

		return useID(extract)
	},
}

UseID is a collection of Alice-style constructors that all insert the device ID into the delegate's request Context using various strategies.

Functions

func EncodeResponse

func EncodeResponse(output http.ResponseWriter, response *Response, format wrp.Format) (err error)

EncodeResponse writes out a device transaction Response to an http Response.

If response.Error is set, a JSON-formatted error with status http.StatusInternalServerError is written to the HTTP response.

If the encoder pool is nil, or if the pool is supplied but its format is the same as the response, this function assumes that the format of the HTTP response is the same as response.Contents. It is an error if response.Contents is empty in this case. The response.Format field dictates the Content-Type of the HTTP response.

If none of the above applies, the encoder pool is used to encode response.Routing to the HTTP response. The content type is set to pool.Format(). nolint: typecheck

func IDHashParser

func IDHashParser(request *http.Request) ([]byte, error)

IDHashParser is a parsing function that examines an HTTP request to produce a []byte key for consistent hashing. It examines the given request's header and invokes ParseID on the value.

If deviceNameHeader is the empty string, DefaultDeviceNameHeader is used.

func Metrics

func Metrics() []xmetrics.Metric

Metrics is the device module function that adds default device metrics

func MustDialDevice

func MustDialDevice(d Dialer, deviceName, url string, extra http.Header) (*websocket.Conn, *http.Response)

MustDialDevice panics if the dial operation fails. Mostly useful for test code.

func NewDeadline

func NewDeadline(timeout time.Duration, now func() time.Time) func() time.Time

NewDeadline creates a deadline closure given a timeout and a now function. If timeout is nonpositive, the return closure always returns zero time. If now is nil (and timeout is positive), then time.Now is used.

func NewPinger

func NewPinger(w Writer, pings xmetrics.Incrementer, data []byte, deadline func() time.Time) (func() error, error)

NewPinger creates a ping closure for the given connection. Internally, a prepared message is created using the supplied data, and the given counter is incremented for each successful update of the write deadline.

func SetPongHandler

func SetPongHandler(r Reader, pongs xmetrics.Incrementer, deadline func() time.Time)

SetPongHandler establishes an instrumented pong handler for the given connection that enforces the given read timeout.

func WithDeviceMetadata

func WithDeviceMetadata(parent context.Context, metadata *Metadata) context.Context

WithDeviceMetadata returns a new context with the given metadata as a value.

func WithID

func WithID(parent context.Context, id ID) context.Context

WithID returns a new context with the given device ID as a value.

func WithIDRequest

func WithIDRequest(id ID, original *http.Request) *http.Request

WithIDRequest returns a new HTTP request with the given device ID in the associated Context.

Types

type CloseReason

// CloseReason exposes metadata around why a particular device was closed.
type CloseReason struct {
	// Err is the optional field that specifies the underlying error that occurred, such as
	// an I/O error.  If nil, the close reason is assumed to be due to application logic, e.g. a rehash.
	Err error

	// Text is the required field indicating a JSON-friendly value describing the reason for closure.
	Text string
}

CloseReason exposes metadata around why a particular device was closed

func (CloseReason) String

func (c CloseReason) String() string

type ConnectHandler

// ConnectHandler is an HTTP handler (see its ServeHTTP method) used to initiate a
// connection between a Talaria and a device by upgrading an HTTP connection to a websocket.
type ConnectHandler struct {
	// Logger is the logging sink for this handler.
	Logger         *zap.Logger
	// Connector is the strategy that manages device connections.
	Connector      Connector
	// ResponseHeader is a set of HTTP headers.
	// NOTE(review): presumably passed as the http.Header argument of Connector.Connect — confirm.
	ResponseHeader http.Header
}

ConnectHandler is used to initiate a concurrent connection between a Talaria and a device by upgrading an HTTP connection to a websocket.

func (*ConnectHandler) ServeHTTP

func (ch *ConnectHandler) ServeHTTP(response http.ResponseWriter, request *http.Request)

type Connection

// Connection describes the set of behaviors for device connections used by this package.
// It is the union of io.Closer with this package's Reader and Writer interfaces.
type Connection interface {
	io.Closer
	Reader
	Writer
}

Connection describes the set of behaviors for device connections used by this package.

type Connector

// Connector is a strategy interface for managing device connections to a server.
// Implementations are responsible for upgrading websocket connections and providing
// for explicit disconnection.
type Connector interface {
	// Connect upgrades an HTTP connection to a websocket and begins concurrent
	// management of the device.
	Connect(http.ResponseWriter, *http.Request, http.Header) (Interface, error)

	// Disconnect disconnects the device associated with the given id.
	// If the id was found, this method returns true.
	Disconnect(ID, CloseReason) bool

	// DisconnectIf iterates over all devices known to this manager, applying the
	// given predicate.  For any devices that result in true, this method disconnects them.
	// Note that this method may pause connections and disconnections while it is executing.
	// This method returns the number of devices that were disconnected.
	//
	// Only disconnection by ID is supported, which means that any identifier matching
	// the predicate will result in *all* duplicate devices under that ID being removed.
	//
	// No methods on this Manager should be called from within the predicate function, or
	// a deadlock will likely occur.
	DisconnectIf(func(ID) (CloseReason, bool)) int

	// DisconnectAll disconnects all devices from this instance, and returns the count of
	// devices disconnected.
	DisconnectAll(CloseReason) int

	// GetFilter returns the Filter interface used for filtering connection requests.
	GetFilter() Filter
}

Connector is a strategy interface for managing device connections to a server. Implementations are responsible for upgrading websocket connections and providing for explicit disconnection.

type Dialer

// Dialer is a device-specific dialer for device websocket connections.  This interface
// has a similar signature and usage pattern as gorilla's websocket.Dialer.
type Dialer interface {
	// DialDevice attempts to connect to the given device.  If supplied, the extra headers are passed
	// along in the Dial call.  However, the extra http.Header object is not modified by this method.
	DialDevice(deviceName, url string, extra http.Header) (*websocket.Conn, *http.Response, error)
}

Dialer is a device-specific dialer for device websocket connections. This interface has a similar signature and usage pattern as gorilla's websocket.Dialer.

func DefaultDialer

func DefaultDialer() Dialer

DefaultDialer returns a useful default device Dialer

func NewDialer

func NewDialer(o DialerOptions) Dialer

NewDialer produces a device dialer using the supplied set of options

type DialerOptions

// DialerOptions represents the set of options available when creating a custom
// device Dialer via NewDialer.
type DialerOptions struct {
	// DeviceHeader is the HTTP header which carries the device name.
	// If unset, DeviceNameHeader is used.
	DeviceHeader string

	// WSDialer is the low-level websocket dialer to use.  If unset, an internal default gorilla dialer is used.
	WSDialer websocketDialer
}

DialerOptions represents the set of options available when creating a custom device Dialer

type Event

// Event represents a single occurrence of interest for device-related applications.
// Instances should be considered immutable by application code and should not be
// stored across calls to a listener, as the infrastructure is free to reuse them.
type Event struct {
	// Type describes the kind of this event.  This field is always set.
	Type EventType

	// Device refers to the device, possibly disconnected, for which this event is being set.
	// This field is always set.
	Device Interface

	// Message is the WRP message relevant to this event.
	//
	// Never assume that it is safe to use this Message outside the listener invocation.  Make
	// a copy if this Message is needed by other goroutines or if it needs to be part of a long-lived
	// data structure.
	// nolint: typecheck
	Message wrp.Typed

	// Format is the encoding format of the Contents field
	// nolint: typecheck
	Format wrp.Format

	// Contents is the encoded representation of the Message field.  It is always set if and only if
	// the Message field is set.
	//
	// Never assume that it is safe to use this byte slice outside the listener invocation.  Make
	// a copy if this byte slice is needed by other goroutines or if it needs to be part of a long-lived
	// data structure.
	Contents []byte

	// Error is the error which occurred during an attempt to send a message.  This field is only populated
	// for MessageFailed events when there was an actual error.  For MessageFailed events that indicate a
	// device was disconnected with enqueued messages, this field will be nil.
	Error error
}

Event represents a single occurrence of interest for device-related applications. Instances of Event should be considered immutable by application code. Also, Event instances should not be stored across calls to a listener, as the infrastructure is free to reuse Event instances.

type EventType

// EventType is the type of device-related event.  It is an 8-bit unsigned
// integer and has a String method.
type EventType uint8

EventType is the type of device-related event

func (EventType) String

func (et EventType) String() string

type Filter

// Filter is used for filtering device connection requests; see Connector.GetFilter
// and Options.Filter.
type Filter interface {
	// AllowConnection evaluates the given device.
	// NOTE(review): presumably the bool reports whether the device may connect and the
	// MatchResult identifies which filter matched — confirm against an implementation.
	AllowConnection(d Interface) (bool, MatchResult)
}

type FilterFunc

// FilterFunc is a function type with the same signature as Filter.AllowConnection.
// It has an AllowConnection method of its own, so a plain function can be used
// wherever a Filter is required.
type FilterFunc func(d Interface) (bool, MatchResult)

func (FilterFunc) AllowConnection

func (filter FilterFunc) AllowConnection(d Interface) (bool, MatchResult)

type ID

// ID represents a normalized identifier for a device.  Use ParseID to obtain
// one from a raw device name.
type ID string

ID represents a normalized identifier for a device.

func GetID

func GetID(ctx context.Context) (id ID, ok bool)

GetID returns the device ID from the context if any.

func IntToMAC

func IntToMAC(value uint64) ID

IntToMAC accepts a 64-bit integer and formats that as a device MAC address identifier. The returned ID will be of the form mac:XXXXXXXXXXXX, where X is a hexadecimal digit using lowercased letters.

func ParseID

func ParseID(deviceName string) (ID, error)

ParseID parses a raw device name into a canonicalized identifier.

func (ID) Bytes

func (id ID) Bytes() []byte

Bytes is a convenience function to obtain the []byte representation of an ID.

type IDFromRequest

// IDFromRequest is a strategy type for extracting the device identifier from an
// HTTP request.  UseID.F accepts a value of this type.
type IDFromRequest func(*http.Request) (ID, error)

IDFromRequest is a strategy type for extracting the device identifier from an HTTP request

type Interface

// Interface is the core type for this package.  It provides access to public device
// metadata and the ability to send messages directly to a device.  A device starts
// open and, once closed, cannot be reused or reopened.
type Interface interface {
	fmt.Stringer
	json.Marshaler

	// ID returns the canonicalized identifier for this device.  Note that
	// this is NOT globally unique.  It is possible for multiple devices
	// with the same ID to be connected.  This typically occurs due to fraud,
	// but we don't want to turn away duped devices.
	ID() ID

	// Pending returns the count of pending messages for this device.
	Pending() int

	// Closed tests if this device is closed.  When this method returns true,
	// any attempt to send messages to this device will result in an error.
	//
	// Once closed, a device cannot be reopened.
	Closed() bool

	// Send dispatches a message to this device.  This method is useful outside
	// a Manager if multiple messages should be sent to the device.  The Request.Message field
	// is not required if Request.Contents and Request.Format are set appropriately.  However,
	// a Request.Message is the only way to start a transaction.
	//
	// This method is synchronous.  If the request is of a type that should expect a response,
	// that response is returned.  An error is returned if this device has been closed or
	// if there were any I/O issues sending the request.
	//
	// Internally, the requests passed to this method are serviced by the write pump in
	// the enclosing Manager instance.  The read pump will handle sending the response.
	Send(*Request) (*Response, error)

	// Statistics returns the current, tracked Statistics instance for this device.
	Statistics() Statistics

	// Convey returns a read-only view of the device convey information.
	Convey() convey.Interface

	// ConveyCompliance returns the result of attempting to parse the convey information
	// sent during device connection.
	ConveyCompliance() convey.Compliance

	// Metadata returns a key value store object for information that's useful to guide interactions
	// with a device such as security credentials.
	Metadata() *Metadata

	// CloseReason returns the metadata explaining why a device was closed.  If this device
	// is not closed, this method's return is undefined.
	CloseReason() CloseReason
}

Interface is the core type for this package. It provides access to public device metadata and the ability to send messages directly to a device.

Instances are mostly immutable, and have a strict lifecycle. Devices are initially open, and when closed cannot be reused or reopened. A new device instance is required if further communication is desired after the original device instance is closed.

The only piece of metadata that is mutable is the Key. A device Manager allows clients to change the routing Key of a device. All other public metadata is immutable.

Each device will have a pair of goroutines within the enclosing manager: a read and write, referred to as pumps. The write pump services the queue of messages used by Send, while the read pump rarely needs to interact with devices directly.

The String() method will always return a valid JSON object representation of this device.

type ListHandler

// ListHandler is an HTTP handler which can take updated JSON device lists.
type ListHandler struct {
	// Logger is the logging sink for this handler.
	Logger   *zap.Logger
	// Registry is the source of connected devices.
	Registry Registry
	// Refresh is a duration.
	// NOTE(review): presumably the interval at which the cached device list is rebuilt,
	// with DefaultListRefresh as the fallback — confirm.
	Refresh  time.Duration
	// contains filtered or unexported fields
}

ListHandler is an HTTP handler which can take updated JSON device lists.

func (*ListHandler) ServeHTTP

func (lh *ListHandler) ServeHTTP(response http.ResponseWriter, request *http.Request)

type Listener

// Listener is an event sink.  Listeners should never modify events and should never
// store events for later use; copy any data that must outlive the invocation.
type Listener func(*Event)

Listener is an event sink. Listeners should never modify events and should never store events for later use. If data from an event is needed for another goroutine or for long-term storage, a copy should be made.

type Manager

// Manager supplies a hub for connecting and disconnecting devices as well as an
// access point for obtaining device metadata.  It is the union of the Connector,
// Router and Registry interfaces.
type Manager interface {
	Connector
	Router
	Registry
}

Manager supplies a hub for connecting and disconnecting devices as well as an access point for obtaining device metadata.

func NewManager

func NewManager(o *Options) Manager

NewManager constructs a Manager from a set of options. A ConnectionFactory will be created from the options if one is not supplied.

type ManagerOption

// ManagerOption is a configuration option for a manager.  It operates on the
// unexported manager type.
type ManagerOption func(*manager)

ManagerOption is a configuration option for a manager

type MatchResult

// MatchResult is the second value returned by Filter.AllowConnection.
// NOTE(review): the field semantics are not documented in the visible source;
// presumably Location says where the matched value was found and Key names the
// filter key that matched — confirm.
type MatchResult struct {
	Location string
	Key      string
}

type Measures

// Measures is a convenient struct that holds all the device-related metric objects
// for runtime consumption.  Use NewMeasures to build one from a go-kit metrics Provider.
// NOTE(review): each field presumably corresponds to the similarly named metric name
// constant (DeviceCounter, PingCounter, ModelGauge, ...) — confirm in NewMeasures.
type Measures struct {
	Device          xmetrics.Setter
	LimitReached    xmetrics.Incrementer
	Duplicates      xmetrics.Incrementer
	RequestResponse metrics.Counter
	Ping            xmetrics.Incrementer
	Pong            xmetrics.Incrementer
	Connect         xmetrics.Incrementer
	Disconnect      xmetrics.Adder
	Models          metrics.Gauge
	WRPSourceCheck  metrics.Counter
}

Measures is a convenient struct that holds all the device-related metric objects for runtime consumption.

func NewMeasures

func NewMeasures(p provider.Provider) Measures

NewMeasures constructs a Measures given a go-kit metrics Provider

type MessageHandler

// MessageHandler is a configurable http.Handler which handles inbound WRP traffic
// to be sent to devices.
type MessageHandler struct {
	// Logger is the sink for logging output.  If not set, logging will be sent to a NOP logger.
	Logger *zap.Logger

	// Router is the device message Router to use.  This field is required.
	Router Router
}

MessageHandler is a configurable http.Handler which handles inbound WRP traffic to be sent to devices.

func (*MessageHandler) ServeHTTP

func (mh *MessageHandler) ServeHTTP(httpResponse http.ResponseWriter, httpRequest *http.Request)

type Metadata

// Metadata contains information such as security credentials related to a device.
// Per the package documentation it uses an atomic.Value internally, so it should
// not be copied after creation; pass *Metadata instead.
type Metadata struct {
	// contains filtered or unexported fields
}

Metadata contains information such as security credentials related to a device. Read operations are optimized with a copy-on-write strategy. Client code must further synchronize concurrent writers to avoid stale data. Metadata uses an atomic.Value internally and thus it should not be copied after creation.

func GetDeviceMetadata

func GetDeviceMetadata(ctx context.Context) (metadata *Metadata, ok bool)

GetDeviceMetadata returns the device metadata from the context if any.

func (*Metadata) Claims

func (m *Metadata) Claims() (claims map[string]interface{})

Claims returns the claims attached to a device. The returned map should not be modified to avoid any race conditions. To update the claims, take a look at the ClaimsCopy() function

func (*Metadata) ClaimsCopy

func (m *Metadata) ClaimsCopy() map[string]interface{}

ClaimsCopy returns a deep copy of the claims. Use this, along with the SetClaims() method to update the claims.

func (*Metadata) Load

func (m *Metadata) Load(key string) interface{}

Load returns the value associated with the given key in the metadata map. It is not recommended modifying values returned by reference.

func (*Metadata) PartnerIDClaim

func (m *Metadata) PartnerIDClaim() string

PartnerIDClaim returns the partner ID claim. If no claim is found, the value defaults to "unknown"

func (*Metadata) SessionID

func (m *Metadata) SessionID() (sessionID string)

SessionID returns the UUID associated with a device's current connection to the cluster if one has been set. The zero value is returned as default.

func (*Metadata) SetClaims

func (m *Metadata) SetClaims(claims map[string]interface{})

SetClaims updates the claims associated with the device that's owner of the metadata. To avoid updating the claims with stale data, client code will need to synchronize the entire transaction of reading, copying, modifying and writing back the value.

func (*Metadata) SetSessionID

func (m *Metadata) SetSessionID(sessionID string)

SetSessionID sets the UUID associated with the device's current connection to the cluster. It uses sync.Once to ensure the sessionID is unchanged through the metadata's lifecycle.

func (*Metadata) Store

func (m *Metadata) Store(key string, value interface{}) bool

Store updates the key value mapping in the device metadata map. A boolean result is given indicating whether the operation was successful. Operations will fail for reserved keys. To avoid updating keys with stale data/value, client code will need to synchronize the entire transaction of reading, copying, modifying and writing back the value.

func (*Metadata) TrustClaim

func (m *Metadata) TrustClaim() int

TrustClaim returns the device's trust level claim. By Default, a device is untrusted (trust = 0).

type MockConnector

// MockConnector is a test double that embeds mock.Mock.  Its methods (Connect,
// Disconnect, DisconnectAll, DisconnectIf, GetFilter) mirror the Connector interface.
type MockConnector struct {
	mock.Mock
}

func (*MockConnector) Connect

func (m *MockConnector) Connect(response http.ResponseWriter, request *http.Request, header http.Header) (Interface, error)

func (*MockConnector) Disconnect

func (m *MockConnector) Disconnect(id ID, reason CloseReason) bool

func (*MockConnector) DisconnectAll

func (m *MockConnector) DisconnectAll(reason CloseReason) int

func (*MockConnector) DisconnectIf

func (m *MockConnector) DisconnectIf(predicate func(ID) (CloseReason, bool)) int

func (*MockConnector) GetFilter

func (m *MockConnector) GetFilter() Filter

type MockDevice

// MockDevice is a test double that embeds mock.Mock.  Its methods mirror the
// device Interface, with an additional Close method.
type MockDevice struct {
	mock.Mock
}

func (*MockDevice) Close

func (m *MockDevice) Close() error

func (*MockDevice) CloseReason

func (m *MockDevice) CloseReason() CloseReason

func (*MockDevice) Closed

func (m *MockDevice) Closed() bool

func (*MockDevice) Convey

func (m *MockDevice) Convey() convey.Interface

func (*MockDevice) ConveyCompliance

func (m *MockDevice) ConveyCompliance() convey.Compliance

func (*MockDevice) ID

func (m *MockDevice) ID() ID

func (*MockDevice) MarshalJSON

func (m *MockDevice) MarshalJSON() ([]byte, error)

func (*MockDevice) Metadata

func (m *MockDevice) Metadata() *Metadata

func (*MockDevice) Pending

func (m *MockDevice) Pending() int

func (*MockDevice) Send

func (m *MockDevice) Send(request *Request) (*Response, error)

func (*MockDevice) Statistics

func (m *MockDevice) Statistics() Statistics

func (*MockDevice) String

func (m *MockDevice) String() string

type MockRegistry

// MockRegistry is a test double that embeds mock.Mock.  Its methods (Get, Len,
// VisitAll) mirror the Registry interface.
type MockRegistry struct {
	mock.Mock
}

func (*MockRegistry) Get

func (m *MockRegistry) Get(id ID) (Interface, bool)

func (*MockRegistry) Len

func (m *MockRegistry) Len() int

func (*MockRegistry) VisitAll

func (m *MockRegistry) VisitAll(f func(Interface) bool) int

type Options

// Options represent the available configuration options for components within this
// package.  NewOptions unmarshals one from a Viper environment; Listeners must be
// configured separately.
type Options struct {
	// Upgrader is the gorilla websocket.Upgrader injected into these options.
	Upgrader websocket.Upgrader

	// MaxDevices is the maximum number of devices allowed to connect to any one Manager.
	// If unset (i.e. zero), math.MaxUint32 is used as the maximum.
	MaxDevices int

	// DeviceMessageQueueSize is the capacity of the channel which stores messages waiting
	// to be transmitted to a device.  If not supplied, DefaultDeviceMessageQueueSize is used.
	DeviceMessageQueueSize int

	// PingPeriod is the time between pings sent to each device.
	// NOTE(review): presumably DefaultPingPeriod applies when this is unset — confirm.
	PingPeriod time.Duration

	// IdlePeriod is the length of time a device connection is allowed to be idle,
	// with no traffic coming from the device.  If not supplied, DefaultIdlePeriod is used.
	IdlePeriod time.Duration

	// WriteTimeout is the write timeout for each device's websocket.  If not supplied,
	// DefaultWriteTimeout is used.
	WriteTimeout time.Duration

	// Listeners contains the event sinks for managers created using these options.
	Listeners []Listener

	// Logger is the output sink for log messages.  If not supplied, log output
	// is sent to a NOP logger.
	Logger *zap.Logger

	// MetricsProvider is the go-kit factory for metrics.
	MetricsProvider provider.Provider

	// Now is the closure used to determine the current time.  If not set, time.Now is used.
	Now func() time.Time

	// WRPSourceCheck defines behavior around checking the Source field in WRP messages originating
	// from devices. All the following are cases of an invalid WRP wrt the source:
	// 1) Source is empty.
	// 2) Canonical ID can't be parsed from Source.
	// 3) Canonical ID doesn't match that of the established websocket connection.
	// Note: when the check type is "monitor", no messages are dropped but they are logged as an error and update the "wrp_source_check"
	// counter.
	WRPSourceCheck wrpSourceCheckConfig

	// Filter determines whether or not a device should be able to connect to talaria based on the filters in place.
	Filter Filter
}

Options represent the available configuration options for components within this package

func NewOptions

func NewOptions(logger *zap.Logger, v *viper.Viper) (o *Options, err error)

NewOptions unmarshals a device.Options from a Viper environment. Listeners must be configured separately.

type ReadCloser

// ReadCloser adds io.Closer behavior to Reader.
type ReadCloser interface {
	io.Closer
	Reader
}

ReadCloser adds io.Closer behavior to Reader

func InstrumentReader

func InstrumentReader(r ReadCloser, s Statistics) ReadCloser

type Reader

// Reader represents the read behavior of a device connection.
// NOTE(review): the method set appears to match the read-side methods of gorilla's
// *websocket.Conn (the int presumably being the websocket message type) — confirm.
type Reader interface {
	// ReadMessage reads the next message from the connection.
	ReadMessage() (int, []byte, error)
	// SetReadDeadline sets the deadline applied to reads.
	SetReadDeadline(time.Time) error
	// SetPongHandler installs the callback invoked with the data of a received pong.
	SetPongHandler(func(string) error)
}

Reader represents the read behavior of a device connection

type Registry

// Registry is the strategy interface for querying the set of connected devices.
// Methods in this interface follow the Visitor pattern and are typically executed
// under a read lock.
type Registry interface {
	// Len returns the count of devices currently in this registry.
	Len() int

	// Get returns the device associated with the given ID, if any.
	Get(ID) (Interface, bool)

	// VisitAll applies the given visitor function to each device known to this manager.
	//
	// No methods on this Manager should be called from within the visitor function, or
	// a deadlock will likely occur.
	//
	// NOTE(review): the meaning of the visitor's bool result and of the returned int
	// (presumably "continue visiting" and "devices visited") is not stated here — confirm.
	VisitAll(func(Interface) bool) int
}

Registry is the strategy interface for querying the set of connected devices. Methods in this interface follow the Visitor pattern and are typically executed under a read lock.

type Request

// Request represents a single device Request, carrying routing information and
// message contents.
//
// NOTE(review): the field comments below mention "Routing", but no such exported
// field exists on this struct; they appear to refer to the Message field — confirm.
type Request struct {
	// Message is the original, decoded WRP message containing the routing information.  When sending a request
	// through Manager.Route, this field is required and must also implement wrp.Routable.
	// nolint: typecheck
	Message wrp.Typed

	// Format is the WRP format of the Contents member.  If Format is not JSON, then Routing
	// will be encoded prior to sending to devices.
	// nolint: typecheck
	Format wrp.Format

	// Contents is the encoded form of Routing in Format format.  If this member is of 0 length,
	// then Routing will be encoded prior to sending to devices.
	Contents []byte
	// contains filtered or unexported fields
}

Request represents a single device Request, carrying routing information and message contents.

func DecodeRequest

func DecodeRequest(source io.Reader, format wrp.Format) (*Request, error)

DecodeRequest decodes a WRP source into a device Request. Typically, this is used to produce a device Request from an http.Request.

The returned request will not be associated with any context. nolint: typecheck

func (*Request) Context

func (r *Request) Context() context.Context

Context returns the context.Context object associated with this Request. This method never returns nil. If no context is associated with this Request, this method returns context.Background().

func (*Request) ID

func (r *Request) ID() (i ID, err error)

ID returns the device id for this request. If Message is nil or does not implement wrp.Routable, this method returns an empty identifier.

func (*Request) Transactional

func (r *Request) Transactional() (string, bool)

Transactional tests if Message is Routable and, if so, returns the transactional information from the request. This method returns a tuple containing the transaction key (if any) combined with whether this request represents part of a transaction.

func (*Request) WithContext

func (r *Request) WithContext(ctx context.Context) *Request

WithContext is similar to net/http.Request.WithContext. This method does not, however, create a copy of the original device Request. Rather, it returns the request modified with the next context.

type Response

// Response represents the response to a device request.
type Response struct {
	// Device is the sink to which the corresponding Request was sent
	Device Interface

	// Message is the decoded WRP message received from the device
	// nolint: typecheck
	Message *wrp.Message

	// Format is the encoding Format of the Contents field.  Almost always, this will be Msgpack.
	// nolint: typecheck
	Format wrp.Format

	// Contents is the encoded form of Message, formatted in Format
	Contents []byte
}

Response represents the response to a device request. Some requests have no response, in which case a Response without a Message or Contents will be returned.

type Router

// Router handles dispatching messages to devices.
type Router interface {
	// Route dispatches a WRP request to exactly one device, identified by the ID
	// field of the request.  Route is synchronous, and honors the cancellation semantics
	// of the Request's context.
	//
	// The result is a Response paired with an error.
	// NOTE(review): presumably the Response is only meaningful when the error is nil — confirm.
	Route(*Request) (*Response, error)
}

Router handles dispatching messages to devices.

type StatHandler

// StatHandler is an http.Handler that returns device statistics.
type StatHandler struct {
	// Logger is the zap logger available to this handler.
	// NOTE(review): presumably used to report problems while serving a request — confirm.
	Logger   *zap.Logger
	// Registry is the device Registry associated with this handler.
	// NOTE(review): presumably the source from which the requested device is looked up — confirm.
	Registry Registry
	// Variable is a string setting for this handler.
	// NOTE(review): presumably the name of the gorilla path variable that carries the
	// device name — confirm against ServeHTTP.
	Variable string
}

StatHandler is an http.Handler that returns device statistics. The device name is specified as a gorilla path variable.

func (*StatHandler) ServeHTTP

func (sh *StatHandler) ServeHTTP(response http.ResponseWriter, request *http.Request)

type Statistics

// Statistics represents a set of device statistics.  Because it embeds fmt.Stringer
// and json.Marshaler, every implementation can also be rendered as a string and
// marshaled as JSON.
type Statistics interface {
	fmt.Stringer
	json.Marshaler

	// BytesReceived returns the total bytes received since this instance was created
	BytesReceived() int

	// AddBytesReceived increments the BytesReceived count
	AddBytesReceived(int)

	// MessagesReceived returns the total messages received since this instance was created
	MessagesReceived() int

	// AddMessagesReceived increments the MessagesReceived count
	AddMessagesReceived(int)

	// BytesSent returns the total bytes sent since this instance was created
	BytesSent() int

	// AddBytesSent increments the BytesSent count
	AddBytesSent(int)

	// MessagesSent returns the total messages sent since this instance was created
	MessagesSent() int

	// AddMessagesSent increments the MessagesSent count
	AddMessagesSent(int)

	// Duplications returns the number of times this device has had a duplicate connected, i.e.
	// a device with the same device ID.
	Duplications() int

	// AddDuplications increments the count of duplications
	AddDuplications(int)

	// ConnectedAt returns the connection time at which these statistics began tracking
	ConnectedAt() time.Time

	// UpTime computes the duration for which the device has been connected
	UpTime() time.Duration
}

Statistics represents a set of device statistics.

func NewStatistics

func NewStatistics(now func() time.Time, connectedAt time.Time) Statistics

NewStatistics creates a Statistics instance with the given connection time. If now is nil, this method uses time.Now.

type Transactions

// Transactions represents a set of pending transactions.  Instances are safe
// for concurrent access.
type Transactions struct {
	// contains filtered or unexported fields
}

Transactions represents a set of pending transactions. Instances are safe for concurrent access.

func NewTransactions

func NewTransactions() *Transactions

func (*Transactions) Cancel

func (t *Transactions) Cancel(transactionKey string)

Cancel simply cancels a transaction. The transaction key is removed from the pending set. If that transaction key is not registered, this method does nothing. The channel returned from Register is closed, which will cause any code waiting for a response to get a nil Response.

This method is normally called by the same goroutine that calls Register to ensure that transactions are cleaned up.

func (*Transactions) Close

func (t *Transactions) Close() error

Close cancels all pending transactions and marks this Transactions so that no future Register calls will succeed. Typically useful during a device disconnection to cleanup waiting goroutines.

func (*Transactions) Complete

func (t *Transactions) Complete(transactionKey string, response *Response) error

Complete dispatches the given response to the appropriate channel returned from Register and removes the transaction from the internal pending set. This method is intended for goroutines that are servicing queues of messages, e.g. the read pump of a Manager. Such goroutines use this method to indicate that a transaction is complete.

If this method is passed a nil response, it panics.

func (*Transactions) Keys

func (t *Transactions) Keys() []string

Keys returns a slice containing the transaction keys that are pending

func (*Transactions) Len

func (t *Transactions) Len() int

Len returns the count of pending transactions

func (*Transactions) Register

func (t *Transactions) Register(transactionKey string) (<-chan *Response, error)

Register inserts a transaction key into the pending set and returns a channel that a Response will be reported on. This method is intended to be called by goroutines which want to wait for a transaction to complete.

This method returns an error if either transactionKey is the empty string or if a transaction with this key has already been registered. The latter is a more serious problem, since it indicates that higher-level code has generated duplicate transaction identifiers. For safety, a Transactions instance expressly does not allow that case.

The returned channel will either receive a non-nil response from some code calling Complete, or will see a channel closure (nil Response) from some code calling Cancel.

type WRPSourceCheckType

// WRPSourceCheckType is used to define the different modes in which the source check can run.
type WRPSourceCheckType string

WRPSourceCheckType is used to define the different modes in which the source check can run.

// Check types for the WRP Source check.
const (
	// CheckTypeMonitor selects the "monitor" mode of the source check.
	// NOTE(review): presumably failures are only observed/reported in this mode — confirm.
	CheckTypeMonitor WRPSourceCheckType = "monitor"
	// CheckTypeEnforce selects the "enforce" mode of the source check.
	// NOTE(review): presumably failing messages are rejected in this mode — confirm.
	CheckTypeEnforce WRPSourceCheckType = "enforce"
)

Check types for the WRP Source check

type WriteCloser

// WriteCloser adds io.Closer behavior to Writer.
type WriteCloser interface {
	// Close, as declared by the standard library io.Closer.
	io.Closer
	// The write-side methods declared by Writer.
	Writer
}

WriteCloser adds io.Closer behavior to Writer

func InstrumentWriter

func InstrumentWriter(w WriteCloser, s Statistics) WriteCloser

type Writer

// Writer represents the write behavior of a device connection.
// NOTE(review): the method set looks like the write side of a gorilla websocket
// connection — confirm against the concrete implementation.
type Writer interface {
	// WriteMessage writes one message.
	// NOTE(review): presumably the int is the websocket message type and the
	// []byte is the payload — confirm.
	WriteMessage(int, []byte) error
	// WritePreparedMessage writes the given websocket.PreparedMessage.
	WritePreparedMessage(*websocket.PreparedMessage) error
	// SetWriteDeadline takes a time.Time.
	// NOTE(review): presumably the deadline applied to subsequent writes — confirm.
	SetWriteDeadline(time.Time) error
}

Writer represents the write behavior of a device connection

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL