Documentation ¶
Index ¶
- Constants
- Variables
- func EncodeResponse(output http.ResponseWriter, response *Response, format wrp.Format) (err error)
- func IDHashParser(request *http.Request) ([]byte, error)
- func Metrics() []xmetrics.Metric
- func MustDialDevice(d Dialer, deviceName, url string, extra http.Header) (*websocket.Conn, *http.Response)
- func NewDeadline(timeout time.Duration, now func() time.Time) func() time.Time
- func NewPinger(w Writer, pings xmetrics.Incrementer, data []byte, deadline func() time.Time) (func() error, error)
- func SetPongHandler(r Reader, pongs xmetrics.Incrementer, deadline func() time.Time)
- func Timeout(o *Options) func(http.Handler) http.Handler
- func WithID(id ID, parent context.Context) context.Context
- func WithIDRequest(id ID, original *http.Request) *http.Request
- type CloseReason
- type ConnectHandler
- type Connection
- type Connector
- type Dialer
- type DialerOptions
- type Event
- type EventType
- type ID
- type IDFromRequest
- type Interface
- type ListHandler
- type Listener
- type Manager
- type Measures
- type MessageHandler
- type MockConnector
- func (m *MockConnector) Connect(response http.ResponseWriter, request *http.Request, header http.Header) (Interface, error)
- func (m *MockConnector) Disconnect(id ID, reason CloseReason) bool
- func (m *MockConnector) DisconnectAll(reason CloseReason) int
- func (m *MockConnector) DisconnectIf(predicate func(ID) (CloseReason, bool)) int
- type MockDevice
- func (m *MockDevice) Close() error
- func (m *MockDevice) CloseReason() CloseReason
- func (m *MockDevice) Closed() bool
- func (m *MockDevice) Convey() convey.Interface
- func (m *MockDevice) ConveyCompliance() convey.Compliance
- func (m *MockDevice) ID() ID
- func (m *MockDevice) MarshalJSON() ([]byte, error)
- func (m *MockDevice) PartnerIDs() []string
- func (m *MockDevice) Pending() int
- func (m *MockDevice) SatClientID() string
- func (m *MockDevice) Send(request *Request) (*Response, error)
- func (m *MockDevice) Statistics() Statistics
- func (m *MockDevice) String() string
- func (m *MockDevice) Trust() string
- type MockRegistry
- type Options
- type ReadCloser
- type Reader
- type Registry
- type Request
- type Response
- type Router
- type StatHandler
- type Statistics
- type Transactions
- func (t *Transactions) Cancel(transactionKey string)
- func (t *Transactions) Close() error
- func (t *Transactions) Complete(transactionKey string, response *Response) error
- func (t *Transactions) Keys() []string
- func (t *Transactions) Len() int
- func (t *Transactions) Register(transactionKey string) (<-chan *Response, error)
- type WriteCloser
- type Writer
Constants ¶
const ( DefaultMessageTimeout time.Duration = 2 * time.Minute DefaultListRefresh time.Duration = 10 * time.Second )
const ( // Connect indicates a successful device connection. After receipt of this event, the given // Device is able to receive requests. Connect EventType = iota // Disconnect indicates a device disconnection. After receipt of this event, the given // Device can no longer receive requests. Disconnect // MessageSent indicates that a message was successfully dispatched to a device. MessageSent // MessageReceived indicates that a message has been successfully received and // dispatched to any goroutine waiting on it, as would be the case for a response. MessageReceived // MessageFailed indicates that a message could not be sent to a device, either because // of a communications error or due to the device disconnecting. For each enqueued message // at the time of a device's disconnection, there will be (1) MessageFailed event. MessageFailed // TransactionComplete indicates that a response to a transaction has been received, and the // transaction completed successfully (at least as far as the routing infrastructure can tell). TransactionComplete // TransactionBroken indicates receipt of a message that had a transaction key for which there // was no waiting transaction TransactionBroken InvalidEventString string = "!!INVALID DEVICE EVENT TYPE!!" )
const ( DeviceCounter = "device_count" DuplicatesCounter = "duplicate_count" RequestResponseCounter = "request_response_count" PingCounter = "ping_count" PongCounter = "pong_count" ConnectCounter = "connect_count" DisconnectCounter = "disconnect_count" DeviceLimitReachedCounter = "device_limit_reached_count" ModelGauge = "hardware_model" )
const ( // DeviceNameHeader is the name of the HTTP header which contains the device service name. // This header is primarily required at connect time to identify the device. DeviceNameHeader = "X-Webpa-Device-Name" // ConveyHeader is the name of the optional HTTP header which contains the encoded convey JSON. ConveyHeader = "X-Webpa-Convey" DefaultIdlePeriod time.Duration = 135 * time.Second DefaultRequestTimeout time.Duration = 30 * time.Second DefaultWriteTimeout time.Duration = 60 * time.Second DefaultPingPeriod time.Duration = 45 * time.Second DefaultReadBufferSize = 0 DefaultWriteBufferSize = 0 DefaultDeviceMessageQueueSize = 100 )
const ( // DeviceManagerKey is the Viper subkey under which device.Options are typically stored // In a JSON configuration file, this will be expressed as: // // { // /* other stuff can be here */ // // "device": { // "manager": { // } // } // } DeviceManagerKey = "device.manager" )
const MaxDevicesHeader = "X-Xmidt-Max-Devices"
Variables ¶
var ( ErrorMissingDeviceNameContext = errors.New("Missing device ID in request context") ErrorMissingSecureContext = errors.New("Missing security information in request context") ErrorMissingDeviceNameHeader = errors.New("Missing device name header") ErrorMissingDeviceNameVar = errors.New("Missing device name path variable") ErrorMissingPathVars = errors.New("Missing URI path variables") ErrorInvalidDeviceName = errors.New("Invalid device name") ErrorDeviceNotFound = errors.New("The device does not exist") ErrorNonUniqueID = errors.New("More than once device with that identifier is connected") ErrorDuplicateKey = errors.New("That key is a duplicate") ErrorDuplicateDevice = errors.New("That device is already in this registry") ErrorInvalidTransactionKey = errors.New("Transaction keys must be non-empty strings") ErrorNoSuchTransactionKey = errors.New("That transaction key is not registered") ErrorTransactionAlreadyRegistered = errors.New("That transaction is already registered") ErrorTransactionCancelled = errors.New("The transaction has been cancelled") ErrorResponseNoContents = errors.New("The response has no contents") ErrorDeviceBusy = errors.New("That device is busy") ErrorDeviceClosed = errors.New("That device has been closed") ErrorTransactionsClosed = errors.New("Transactions are closed for that device") ErrorTransactionsAlreadyClosed = errors.New("That Transactions is already closed") )
var UseID = struct { // F is a configurable constructor that allows an arbitrary IDFromRequest strategy F func(IDFromRequest) func(http.Handler) http.Handler // FromHeader uses the device name header to extract the device identifier. // This constructor isn't configurable, and is used as-is: device.UseID.FromHeader. FromHeader func(http.Handler) http.Handler // FromPath is a configurable constructor that extracts the device identifier // from the URI path using the supplied variable name. This constructor is // configurable: device.UseID.FromPath("deviceId"). FromPath func(string) func(http.Handler) http.Handler }{ F: useID, FromHeader: useID( func(request *http.Request) (ID, error) { deviceName := request.Header.Get(DeviceNameHeader) if len(deviceName) == 0 { return invalidID, ErrorMissingDeviceNameHeader } return ParseID(deviceName) }, ), FromPath: func(variableName string) func(http.Handler) http.Handler { return useID( func(request *http.Request) (ID, error) { vars := mux.Vars(request) if vars == nil { return invalidID, ErrorMissingPathVars } deviceName := vars[variableName] if len(deviceName) == 0 { return invalidID, ErrorMissingDeviceNameVar } return ParseID(deviceName) }, ) }, }
UseID is a collection of Alice-style constructors that all insert the device ID into the delegate's request Context using various strategies.
Functions ¶
func EncodeResponse ¶
EncodeResponse writes out a device transaction Response to an http Response.
If response.Error is set, a JSON-formatted error with status http.StatusInternalServerError is written to the HTTP response.
If the encoder pool is nil, or if the pool is supplied but its format is the same as the response, this function assumes that the format of the HTTP response is the same as response.Contents. It is an error if response.Contents is empty in this case. The response.Format field dictates the Content-Type of the HTTP response.
If none of the above applies, the encoder pool is used to encode response.Routing to the HTTP response. The content type is set to pool.Format().
func IDHashParser ¶
IDHashParser is a parsing function that examines an HTTP request to produce a []byte key for consistent hashing. The returned function examines the given request header and invokes ParseID on the value.
If deviceNameHeader is the empty string, DefaultDeviceNameHeader is used.
func MustDialDevice ¶
func MustDialDevice(d Dialer, deviceName, url string, extra http.Header) (*websocket.Conn, *http.Response)
MustDialDevice panics if the dial operation fails. Mostly useful for test code.
func NewDeadline ¶
NewDeadline creates a deadline closure given a timeout and a now function. If timeout is nonpositive, the returned closure always returns the zero time. If now is nil (and timeout is positive), then time.Now is used.
func NewPinger ¶
func NewPinger(w Writer, pings xmetrics.Incrementer, data []byte, deadline func() time.Time) (func() error, error)
NewPinger creates a ping closure for the given connection. Internally, a prepared message is created using the supplied data, and the given counter is incremented for each successful update of the write deadline.
func SetPongHandler ¶
func SetPongHandler(r Reader, pongs xmetrics.Incrementer, deadline func() time.Time)
SetPongHandler establishes an instrumented pong handler for the given connection that enforces the given read timeout.
func Timeout ¶
Timeout returns an Alice-style constructor which enforces a timeout for all device request contexts.
Types ¶
type CloseReason ¶
type CloseReason struct { // Err is the optional field that specifies the underlying error that occurred, such as // an I/O error. If nil, the close reason is assumed to be due to application logic, e.g. a rehash Err error // Text is the required field indicating a JSON-friendly value describing the reason for closure. Text string }
CloseReason exposes metadata around why a particular device was closed
func (CloseReason) String ¶
func (c CloseReason) String() string
type ConnectHandler ¶
func (*ConnectHandler) ServeHTTP ¶
func (ch *ConnectHandler) ServeHTTP(response http.ResponseWriter, request *http.Request)
type Connection ¶
Connection describes the set of behaviors for device connections used by this package.
type Connector ¶
type Connector interface { // Connect upgrades an HTTP connection to a websocket and begins concurrent // management of the device. Connect(http.ResponseWriter, *http.Request, http.Header) (Interface, error) // Disconnect disconnects the device associated with the given id. // If the id was found, this method returns true. Disconnect(ID, CloseReason) bool // DisconnectIf iterates over all devices known to this manager, applying the // given predicate. For any devices that result in true, this method disconnects them. // Note that this method may pause connections and disconnections while it is executing. // This method returns the number of devices that were disconnected. // // Only disconnection by ID is supported, which means that any identifier matching // the predicate will result in *all* duplicate devices under that ID being removed. // // No methods on this Manager should be called from within the predicate function, or // a deadlock will likely occur. DisconnectIf(func(ID) (CloseReason, bool)) int // DisconnectAll disconnects all devices from this instance, and returns the count of // devices disconnected. DisconnectAll(CloseReason) int }
Connector is a strategy interface for managing device connections to a server. Implementations are responsible for upgrading websocket connections and providing for explicit disconnection.
type Dialer ¶
type Dialer interface { // DialDevice attempts to connect to the given device. If supplied, the extra headers are passed // along in the Dial call. However, the extra http.Header object is not modified by this method. DialDevice(deviceName, url string, extra http.Header) (*websocket.Conn, *http.Response, error) }
Dialer is a device-specific dialer for device websocket connections. This interface has a similar signature and usage pattern as gorilla's websocket.Dialer.
func DefaultDialer ¶
func DefaultDialer() Dialer
DefaultDialer returns a useful default device Dialer
func NewDialer ¶
func NewDialer(o DialerOptions) Dialer
NewDialer produces a device dialer using the supplied set of options
type DialerOptions ¶
type DialerOptions struct { // DeviceHeader is the HTTP header which carries the device name. // If unset, DeviceNameHeader is used DeviceHeader string // WSDialer is the low-level websocket dialer to use. If unset, an internal default gorilla dialer is used. WSDialer websocketDialer }
DialerOptions represents the set of options available when creating a custom device Dialer
type Event ¶
type Event struct { // Type describes the kind of this event. This field is always set. Type EventType // Device refers to the device, possibly disconnected, for which this event is being set. // This field is always set. Device Interface // Message is the WRP message relevant to this event. // // Never assume that it is safe to use this Message outside the listener invocation. Make // a copy if this Message is needed by other goroutines or if it needs to be part of a long-lived // data structure. Message wrp.Typed // Format is the encoding format of the Contents field Format wrp.Format // Contents is the encoded representation of the Message field. It is always set if and only if // the Message field is set. // // Never assume that it is safe to use this byte slice outside the listener invocation. Make // a copy if this byte slice is needed by other goroutines or if it needs to be part of a long-lived // data structure. Contents []byte // Error is the error which occurred during an attempt to send a message. This field is only populated // for MessageFailed events when there was an actual error. For MessageFailed events that indicate a // device was disconnected with enqueued messages, this field will be nil. Error error }
Event represents a single occurrence of interest for device-related applications. Instances of Event should be considered immutable by application code. Also, Event instances should not be stored across calls to a listener, as the infrastructure is free to reuse Event instances.
type ID ¶
type ID string
ID represents a normalized identifier for a device.
func GetID ¶
GetID returns the device ID from a Context. If no device ID is present, this function returns false for the second parameter.
func IntToMAC ¶
IntToMAC accepts a 64-bit integer and formats that as a device MAC address identifier. The returned ID will be of the form mac:XXXXXXXXXXXX, where X is a hexadecimal digit using lowercased letters.
type IDFromRequest ¶
IDFromRequest is a strategy type for extracting the device identifier from an HTTP request
type Interface ¶
type Interface interface { fmt.Stringer json.Marshaler // ID returns the canonicalized identifier for this device. Note that // this is NOT globally unique. It is possible for multiple devices // with the same ID to be connected. This typically occurs due to fraud, // but we don't want to turn away duped devices. ID() ID // Pending returns the count of pending messages for this device Pending() int // Closed tests if this device is closed. When this method returns true, // any attempt to send messages to this device will result in an error. // // Once closed, a device cannot be reopened. Closed() bool // Send dispatches a message to this device. This method is useful outside // a Manager if multiple messages should be sent to the device. The Request.Message field // is not required if Request.Contents and Request.Format are set appropriately. However, // a Request.Message is the only way to start a transaction. // // This method is synchronous. If the request is of a type that should expect a response, // that response is returned. An error is returned if this device has been closed or // if there were any I/O issues sending the request. // // Internally, the requests passed to this method are serviced by the write pump in // the enclosing Manager instance. The read pump will handle sending the response. 
Send(*Request) (*Response, error) // Statistics returns the current, tracked Statistics instance for this device Statistics() Statistics // Convey returns a read-only view of the device convey information Convey() convey.Interface // ConveyCompliance returns the result of attempting to parse the convey information // sent during device connection ConveyCompliance() convey.Compliance // PartnerIDs returns the array of partner ids established when the device connected PartnerIDs() []string // SatClientID returns the SAT JWT token passed when the device connected SatClientID() string // Trust returns the trust level of this device Trust() string // CloseReason returns the metadata explaining why a device was closed. If this device // is not closed, this method's return is undefined. CloseReason() CloseReason }
Interface is the core type for this package. It provides access to public device metadata and the ability to send messages directly to a device.
Instances are mostly immutable, and have a strict lifecycle. Devices are initially open, and when closed cannot be reused or reopened. A new device instance is required if further communication is desired after the original device instance is closed.
The only piece of metadata that is mutable is the Key. A device Manager allows clients to change the routing Key of a device. All other public metadata is immutable.
Each device will have a pair of goroutines within the enclosing manager: a read and write, referred to as pumps. The write pump services the queue of messages used by Send, while the read pump rarely needs to interact with devices directly.
The String() method will always return a valid JSON object representation of this device.
type ListHandler ¶
type ListHandler struct { Logger log.Logger Registry Registry Refresh time.Duration // contains filtered or unexported fields }
ListHandler is an HTTP handler which can take updated JSON device lists.
func (*ListHandler) ServeHTTP ¶
func (lh *ListHandler) ServeHTTP(response http.ResponseWriter, request *http.Request)
type Listener ¶
type Listener func(*Event)
Listener is an event sink. Listeners should never modify events and should never store events for later use. If data from an event is needed for another goroutine or for long-term storage, a copy should be made.
type Manager ¶
Manager supplies a hub for connecting and disconnecting devices as well as an access point for obtaining device metadata.
func NewManager ¶
NewManager constructs a Manager from a set of options. A ConnectionFactory will be created from the options if one is not supplied.
type Measures ¶
type Measures struct { Device xmetrics.Setter LimitReached xmetrics.Incrementer Duplicates xmetrics.Incrementer RequestResponse metrics.Counter Ping xmetrics.Incrementer Pong xmetrics.Incrementer Connect xmetrics.Incrementer Disconnect xmetrics.Adder Models metrics.Gauge }
Measures is a convenient struct that holds all the device-related metric objects for runtime consumption.
func NewMeasures ¶
NewMeasures constructs a Measures given a go-kit metrics Provider
type MessageHandler ¶
type MessageHandler struct { // Logger is the sink for logging output. If not set, logging will be sent to a NOP logger Logger log.Logger // Router is the device message Router to use. This field is required. Router Router }
MessageHandler is a configurable http.Handler which handles inbound WRP traffic to be sent to devices.
func (*MessageHandler) ServeHTTP ¶
func (mh *MessageHandler) ServeHTTP(httpResponse http.ResponseWriter, httpRequest *http.Request)
type MockConnector ¶
func (*MockConnector) Connect ¶
func (m *MockConnector) Connect(response http.ResponseWriter, request *http.Request, header http.Header) (Interface, error)
func (*MockConnector) Disconnect ¶
func (m *MockConnector) Disconnect(id ID, reason CloseReason) bool
func (*MockConnector) DisconnectAll ¶
func (m *MockConnector) DisconnectAll(reason CloseReason) int
func (*MockConnector) DisconnectIf ¶
func (m *MockConnector) DisconnectIf(predicate func(ID) (CloseReason, bool)) int
type MockDevice ¶
func (*MockDevice) Close ¶
func (m *MockDevice) Close() error
func (*MockDevice) CloseReason ¶
func (m *MockDevice) CloseReason() CloseReason
func (*MockDevice) Closed ¶
func (m *MockDevice) Closed() bool
func (*MockDevice) Convey ¶
func (m *MockDevice) Convey() convey.Interface
func (*MockDevice) ConveyCompliance ¶
func (m *MockDevice) ConveyCompliance() convey.Compliance
func (*MockDevice) ID ¶
func (m *MockDevice) ID() ID
func (*MockDevice) MarshalJSON ¶
func (m *MockDevice) MarshalJSON() ([]byte, error)
func (*MockDevice) PartnerIDs ¶
func (m *MockDevice) PartnerIDs() []string
func (*MockDevice) Pending ¶
func (m *MockDevice) Pending() int
func (*MockDevice) SatClientID ¶
func (m *MockDevice) SatClientID() string
func (*MockDevice) Statistics ¶
func (m *MockDevice) Statistics() Statistics
func (*MockDevice) String ¶
func (m *MockDevice) String() string
func (*MockDevice) Trust ¶
func (m *MockDevice) Trust() string
type MockRegistry ¶
func (*MockRegistry) Len ¶
func (m *MockRegistry) Len() int
type Options ¶
type Options struct { // Upgrader is the gorilla websocket.Upgrader injected into these options. Upgrader websocket.Upgrader // MaxDevices is the maximum number of devices allowed to connect to any one Manager. // If unset (i.e. zero), math.MaxUint32 is used as the maximum. MaxDevices int // DeviceMessageQueueSize is the capacity of the channel which stores messages waiting // to be transmitted to a device. If not supplied, DefaultDeviceMessageQueueSize is used. DeviceMessageQueueSize int // PingPeriod is the time between pings sent to each device PingPeriod time.Duration // IdlePeriod is the length of time a device connection is allowed to be idle, // with no traffic coming from the device. If not supplied, DefaultIdlePeriod is used. IdlePeriod time.Duration // RequestTimeout is the timeout for all inbound HTTP requests RequestTimeout time.Duration // WriteTimeout is the write timeout for each device's websocket. If not supplied, // DefaultWriteTimeout is used. WriteTimeout time.Duration // Listeners contains the event sinks for managers created using these options Listeners []Listener // Logger is the output sink for log messages. If not supplied, log output // is sent to a NOP logger. Logger log.Logger // MetricsProvider is the go-kit factory for metrics MetricsProvider provider.Provider // Now is the closure used to determine the current time. If not set, time.Now is used. Now func() time.Time }
Options represent the available configuration options for components within this package
type ReadCloser ¶
ReadCloser adds io.Closer behavior to Reader
func InstrumentReader ¶
func InstrumentReader(r ReadCloser, s Statistics) ReadCloser
type Reader ¶
type Reader interface { ReadMessage() (int, []byte, error) SetReadDeadline(time.Time) error SetPongHandler(func(string) error) }
Reader represents the read behavior of a device connection
type Registry ¶
type Registry interface { // Len returns the count of devices currently in this registry Len() int // Get returns the device associated with the given ID, if any Get(ID) (Interface, bool) // VisitAll applies the given visitor function to each device known to this manager. // // No methods on this Manager should be called from within the visitor function, or // a deadlock will likely occur. VisitAll(func(Interface) bool) int }
Registry is the strategy interface for querying the set of connected devices. Methods in this interface follow the Visitor pattern and are typically executed under a read lock.
type Request ¶
type Request struct { // Message is the original, decoded WRP message containing the routing information. When sending a request // through Manager.Route, this field is required and must also implement wrp.Routable. Message wrp.Typed // Format is the WRP format of the Contents member. If Format is not JSON, then Routing // will be encoded prior to sending to devices. Format wrp.Format // Contents is the encoded form of Routing in Format format. If this member is of 0 length, // then Routing will be encoded prior to sending to devices. Contents []byte // contains filtered or unexported fields }
Request represents a single device Request, carrying routing information and message contents.
func DecodeRequest ¶
DecodeRequest decodes a WRP source into a device Request. Typically, this is used to produce a device Request from an http.Request.
The returned request will not be associated with any context.
func (*Request) Context ¶
Context returns the context.Context object associated with this Request. This method never returns nil. If no context is associated with this Request, this method returns context.Background().
func (*Request) ID ¶
ID returns the device id for this request. If Message is nil or does not implement wrp.Routable, this method returns an empty identifier.
func (*Request) Transactional ¶
Transactional tests if Message is Routable and, if so, returns the transactional information from the request. This method returns a tuple containing the transaction key (if any) combined with whether this request represents part of a transaction.
type Response ¶
type Response struct { // Device is the sink to which the corresponding Request was sent Device Interface // Message is the decoded WRP message received from the device Message *wrp.Message // Format is the encoding Format of the Contents field. Almost always, this will be Msgpack. Format wrp.Format // Contents is the encoded form of Message, formatted in Format Contents []byte }
Response represents the response to a device request. Some requests have no response, in which case a Response without a Message or Contents will be returned.
type Router ¶
type Router interface { // Route dispatches a WRP request to exactly one device, identified by the ID // field of the request. Route is synchronous, and honors the cancellation semantics // of the Request's context. Route(*Request) (*Response, error) }
Router handles dispatching messages to devices.
type StatHandler ¶
StatHandler is an http.Handler that returns device statistics. The device name is specified as a gorilla path variable.
func (*StatHandler) ServeHTTP ¶
func (sh *StatHandler) ServeHTTP(response http.ResponseWriter, request *http.Request)
type Statistics ¶
type Statistics interface { fmt.Stringer json.Marshaler // BytesReceived returns the total bytes received since this instance was created BytesReceived() int // AddBytesReceived increments the BytesReceived count AddBytesReceived(int) // MessagesReceived returns the total messages received since this instance was created MessagesReceived() int // AddMessagesReceived increments the MessagesReceived count AddMessagesReceived(int) // BytesSent returns the total bytes sent since this instance was created BytesSent() int // AddBytesSent increments the BytesSent count AddBytesSent(int) // MessagesSent returns the total messages sent since this instance was created MessagesSent() int // AddMessagesSent increments the MessagesSent count AddMessagesSent(int) // Duplications returns the number of times this device has had a duplicate connected, i.e. // a device with the same device ID. Duplications() int // AddDuplications increments the count of duplications AddDuplications(int) // ConnectedAt returns the connection time at which this statistics began tracking ConnectedAt() time.Time // UpTime computes the duration for which the device has been connected UpTime() time.Duration }
Statistics represents a set of device statistics.
func NewStatistics ¶
func NewStatistics(now func() time.Time, connectedAt time.Time) Statistics
NewStatistics creates a Statistics instance with the given connection time. If now is nil, this method uses time.Now.
type Transactions ¶
type Transactions struct {
// contains filtered or unexported fields
}
Transactions represents a set of pending transactions. Instances are safe for concurrent access.
func NewTransactions ¶
func NewTransactions() *Transactions
func (*Transactions) Cancel ¶
func (t *Transactions) Cancel(transactionKey string)
Cancel simply cancels a transaction. The transaction key is removed from the pending set. If that transaction key is not registered, this method does nothing. The channel returned from Register is closed, which will cause any code waiting for a response to get a nil Response.
This method is normally called by the same goroutine that calls Register to ensure that transactions are cleaned up.
func (*Transactions) Close ¶
func (t *Transactions) Close() error
Close cancels all pending transactions and marks this Transactions so that no future Register calls will succeed. Typically useful during a device disconnection to clean up waiting goroutines.
func (*Transactions) Complete ¶
func (t *Transactions) Complete(transactionKey string, response *Response) error
Complete dispatches the given response to the appropriate channel returned from Register and removes the transaction from the internal pending set. This method is intended for goroutines that are servicing queues of messages, e.g. the read pump of a Manager. Such goroutines use this method to indicate that a transaction is complete.
If this method is passed a nil response, it panics.
func (*Transactions) Keys ¶
func (t *Transactions) Keys() []string
Keys returns a slice containing the transaction keys that are pending
func (*Transactions) Len ¶
func (t *Transactions) Len() int
Len returns the count of pending transactions
func (*Transactions) Register ¶
func (t *Transactions) Register(transactionKey string) (<-chan *Response, error)
Register inserts a transaction key into the pending set and returns a channel that a Response will be reported on. This method is intended to be called by goroutines which want to wait for a transaction to complete.
This method returns an error if either transactionKey is the empty string or if a transaction with this key has already been registered. The latter is a more serious problem, since it indicates that higher-level code has generated duplicate transaction identifiers. For safety, a Transactions instance expressly does not allow that case.
The returned channel will either receive a non-nil response from some code calling Complete, or will see a channel closure (nil Response) from some code calling Cancel.
type WriteCloser ¶
WriteCloser adds io.Closer behavior to Writer
func InstrumentWriter ¶
func InstrumentWriter(w WriteCloser, s Statistics) WriteCloser