Documentation ¶
Overview ¶
Contains an implementation of OCPP message dispatcher via JSON over WebSocket.
Index ¶
- Constants
- Variables
- func FormatErrorType(d dialector) ocpp.ErrorCode
- func IsErrorCodeValid(fl validator.FieldLevel) bool
- func ParseJsonMessage(dataJson string) ([]interface{}, error)
- func ParseRawJsonMessage(dataJson []byte) ([]interface{}, error)
- func SetHTMLEscape(flag bool)
- func SetLogger(logger logging.Logger)
- func SetMessageIdGenerator(generator func() string)
- func SetMessageValidation(enabled bool)
- type Call
- type CallError
- type CallResult
- type CanceledRequestHandler
- type Client
- func (c *Client) HandleFailedResponseError(requestID string, err error, featureName string)
- func (c *Client) IsConnected() bool
- func (c *Client) SendError(requestId string, errorCode ocpp.ErrorCode, description string, ...) error
- func (c *Client) SendRequest(request ocpp.Request) error
- func (c *Client) SendResponse(requestId string, response ocpp.Response) error
- func (c *Client) SetErrorHandler(handler func(err *ocpp.Error, details interface{}))
- func (c *Client) SetInvalidMessageHook(...)
- func (c *Client) SetOnDisconnectedHandler(handler func(err error))
- func (c *Client) SetOnReconnectedHandler(handler func())
- func (c *Client) SetOnRequestCanceled(handler func(requestId string, request ocpp.Request, err *ocpp.Error))
- func (c *Client) SetRequestHandler(handler func(request ocpp.Request, requestId string, action string))
- func (c *Client) SetResponseHandler(handler func(response ocpp.Response, requestId string))
- func (c *Client) Start(serverURL string) error
- func (c *Client) StartWithRetries(serverURL string)
- func (c *Client) Stop()
- type ClientDispatcher
- type ClientHandler
- type ClientState
- type DefaultClientDispatcher
- func (d *DefaultClientDispatcher) CompleteRequest(requestId string)
- func (d *DefaultClientDispatcher) IsPaused() bool
- func (d *DefaultClientDispatcher) IsRunning() bool
- func (d *DefaultClientDispatcher) Pause()
- func (d *DefaultClientDispatcher) Resume()
- func (d *DefaultClientDispatcher) SendRequest(req RequestBundle) error
- func (d *DefaultClientDispatcher) SetNetworkClient(client ws.WsClient)
- func (d *DefaultClientDispatcher) SetOnRequestCanceled(cb func(requestID string, request ocpp.Request, err *ocpp.Error))
- func (d *DefaultClientDispatcher) SetPendingRequestState(state ClientState)
- func (d *DefaultClientDispatcher) SetTimeout(timeout time.Duration)
- func (d *DefaultClientDispatcher) Start()
- func (d *DefaultClientDispatcher) Stop()
- type DefaultServerDispatcher
- func (d *DefaultServerDispatcher) CompleteRequest(clientID string, requestID string)
- func (d *DefaultServerDispatcher) CreateClient(clientID string)
- func (d *DefaultServerDispatcher) DeleteClient(clientID string)
- func (d *DefaultServerDispatcher) IsRunning() bool
- func (d *DefaultServerDispatcher) SendRequest(clientID string, req RequestBundle) error
- func (d *DefaultServerDispatcher) SetNetworkServer(server ws.WsServer)
- func (d *DefaultServerDispatcher) SetOnRequestCanceled(cb CanceledRequestHandler)
- func (d *DefaultServerDispatcher) SetPendingRequestState(state ServerState)
- func (d *DefaultServerDispatcher) SetTimeout(timeout time.Duration)
- func (d *DefaultServerDispatcher) Start()
- func (d *DefaultServerDispatcher) Stop()
- type Endpoint
- func (endpoint *Endpoint) AddProfile(profile *ocpp.Profile)
- func (endpoint *Endpoint) CreateCall(request ocpp.Request) (*Call, error)
- func (endpoint *Endpoint) CreateCallError(uniqueId string, code ocpp.ErrorCode, description string, details interface{}) (*CallError, error)
- func (endpoint *Endpoint) CreateCallResult(confirmation ocpp.Response, uniqueId string) (*CallResult, error)
- func (endpoint *Endpoint) Dialect() ocpp.Dialect
- func (endpoint *Endpoint) GetProfile(name string) (*ocpp.Profile, bool)
- func (endpoint *Endpoint) GetProfileForFeature(featureName string) (*ocpp.Profile, bool)
- func (endpoint *Endpoint) ParseMessage(arr []interface{}, pendingRequestState ClientState) (Message, error)
- func (endpoint *Endpoint) SetDialect(d ocpp.Dialect)
- type ErrorHandler
- type FIFOClientQueue
- type FIFOQueueMap
- type InvalidMessageHook
- type Message
- type MessageType
- type RequestBundle
- type RequestHandler
- type RequestQueue
- type ResponseHandler
- type Server
- func (s *Server) HandleFailedResponseError(clientID string, requestID string, err error, featureName string)
- func (s *Server) SendError(clientID string, requestId string, errorCode ocpp.ErrorCode, ...) error
- func (s *Server) SendRequest(clientID string, request ocpp.Request) error
- func (s *Server) SendResponse(clientID string, requestId string, response ocpp.Response) error
- func (s *Server) SetCanceledRequestHandler(handler CanceledRequestHandler)
- func (s *Server) SetDisconnectedClientHandler(handler ClientHandler)
- func (s *Server) SetErrorHandler(handler ErrorHandler)
- func (s *Server) SetInvalidMessageHook(hook InvalidMessageHook)
- func (s *Server) SetNewClientHandler(handler ClientHandler)
- func (s *Server) SetNewClientValidationHandler(handler ws.CheckClientHandler)
- func (s *Server) SetRequestHandler(handler RequestHandler)
- func (s *Server) SetResponseHandler(handler ResponseHandler)
- func (s *Server) Start(listenPort int, listenPath string)
- func (s *Server) Stop()
- type ServerDispatcher
- type ServerQueueMap
- type ServerState
Constants ¶
const ( NotImplemented ocpp.ErrorCode = "NotImplemented" // Requested Action is not known by receiver. NotSupported ocpp.ErrorCode = "NotSupported" // Requested Action is recognized but not supported by the receiver. InternalError ocpp.ErrorCode = "InternalError" // An internal error occurred and the receiver was not able to process the requested Action successfully. MessageTypeNotSupported ocpp.ErrorCode = "MessageTypeNotSupported" // A message with an Message Type Number received that is not supported by this implementation. ProtocolError ocpp.ErrorCode = "ProtocolError" // Payload for Action is incomplete. SecurityError ocpp.ErrorCode = "SecurityError" // During the processing of Action a security issue occurred preventing receiver from completing the Action successfully. PropertyConstraintViolation ocpp.ErrorCode = "PropertyConstraintViolation" // Payload is syntactically correct but at least one field contains an invalid value. OccurrenceConstraintViolation ocpp.ErrorCode = "OccurrenceConstraintViolation" // Payload for Action is syntactically correct but at least one of the fields violates occurrence constraints. TypeConstraintViolation ocpp.ErrorCode = "TypeConstraintViolation" // Payload for Action is syntactically correct but at least one of the fields violates data type constraints (e.g. “somestring”: 12). GenericError ocpp.ErrorCode = "GenericError" // Any other error not covered by the previous ones. FormatViolationV2 ocpp.ErrorCode = "FormatViolation" // Payload for Action is syntactically incorrect. This is only valid for OCPP 2.0.1 FormatViolationV16 ocpp.ErrorCode = "FormationViolation" // Payload for Action is syntactically incorrect or not conform the PDU structure for Action. This is only valid for OCPP 1.6 )
Variables ¶
var EscapeHTML = true
var Validate = validator.New()
The validator, used for validating incoming/outgoing OCPP messages.
Functions ¶
func FormatErrorType ¶
func IsErrorCodeValid ¶
func IsErrorCodeValid(fl validator.FieldLevel) bool
func ParseJsonMessage ¶
Unmarshals an OCPP-J json object from a JSON string. Returns the array of elements contained in the message.
func ParseRawJsonMessage ¶
Unmarshals an OCPP-J json object from a byte array. Returns the array of elements contained in the message.
func SetHTMLEscape ¶
func SetHTMLEscape(flag bool)
Allows an instance of ocppj to configure if the message is Marshaled by escaping special caracters like "<", ">", "&" etc For more info https://pkg.go.dev/encoding/json#HTMLEscape
func SetLogger ¶
Sets a custom Logger implementation, allowing the ocpp-j package to log events. By default, a VoidLogger is used, so no logs will be sent to any output.
The function panics, if a nil logger is passed.
func SetMessageIdGenerator ¶
func SetMessageIdGenerator(generator func() string)
SetMessageIdGenerator sets a lambda function for generating unique IDs for new messages. The function is invoked automatically when creating a new Call.
Settings this overrides the default behavior, which is:
fmt.Sprintf("%v", rand.Uint32())
func SetMessageValidation ¶
func SetMessageValidation(enabled bool)
Allows to enable/disable automatic validation for OCPP messages (this includes the field constraints defined for every request/response). The feature may be useful when working with OCPP implementations that don't fully comply to the specs.
Validation is enabled by default.
⚠️ Use at your own risk! When disabled, outgoing and incoming OCPP messages will not be validated anymore, potentially leading to errors.
Types ¶
type Call ¶
type Call struct { Message `validate:"-"` MessageTypeId MessageType `json:"messageTypeId" validate:"required,eq=2"` UniqueId string `json:"uniqueId" validate:"required,max=36"` Action string `json:"action" validate:"required,max=36"` Payload ocpp.Request `json:"payload" validate:"required"` }
An OCPP-J Call message, containing an OCPP Request.
func (*Call) GetMessageTypeId ¶
func (call *Call) GetMessageTypeId() MessageType
func (*Call) GetUniqueId ¶
func (*Call) MarshalJSON ¶
type CallError ¶
type CallError struct { Message MessageTypeId MessageType `json:"messageTypeId" validate:"required,eq=4"` UniqueId string `json:"uniqueId" validate:"required,max=36"` ErrorCode ocpp.ErrorCode `json:"errorCode" validate:"errorCode"` ErrorDescription string `json:"errorDescription" validate:"omitempty"` ErrorDetails interface{} `json:"errorDetails" validate:"omitempty"` }
An OCPP-J CallError message, containing an OCPP Error.
func (*CallError) GetMessageTypeId ¶
func (callError *CallError) GetMessageTypeId() MessageType
func (*CallError) GetUniqueId ¶
func (*CallError) MarshalJSON ¶
type CallResult ¶
type CallResult struct { Message MessageTypeId MessageType `json:"messageTypeId" validate:"required,eq=3"` UniqueId string `json:"uniqueId" validate:"required,max=36"` Payload ocpp.Response `json:"payload" validate:"required"` }
An OCPP-J CallResult message, containing an OCPP Response.
func (*CallResult) GetMessageTypeId ¶
func (callResult *CallResult) GetMessageTypeId() MessageType
func (*CallResult) GetUniqueId ¶
func (callResult *CallResult) GetUniqueId() string
func (*CallResult) MarshalJSON ¶
func (callResult *CallResult) MarshalJSON() ([]byte, error)
type CanceledRequestHandler ¶
type CanceledRequestHandler func(clientID string, requestID string, request ocpp.Request, err *ocpp.Error)
Handler function to be invoked when a request gets canceled (either due to timeout or to other external factors).
type Client ¶
type Client struct { Endpoint Id string RequestState ClientState // contains filtered or unexported fields }
The endpoint initiating the connection to an OCPP server, in an OCPP-J topology. During message exchange, the two roles may be reversed (depending on the message direction), but a client struct remains associated to a charge point/charging station.
func NewClient ¶
func NewClient(id string, wsClient ws.WsClient, dispatcher ClientDispatcher, stateHandler ClientState, profiles ...*ocpp.Profile) *Client
Creates a new Client endpoint. Requires a unique client ID, a websocket client, a struct for queueing/dispatching requests, a state handler and a list of supported profiles (optional).
You may create a simple new server by using these default values:
s := ocppj.NewClient(ws.NewClient(), nil, nil)
The wsClient parameter cannot be nil. Refer to the ws package for information on how to create and customize a websocket client.
func (*Client) HandleFailedResponseError ¶
HandleFailedResponseError allows to handle failures while sending responses (either CALL_RESULT or CALL_ERROR). It internally analyzes and creates an ocpp.Error based on the given error. It will the attempt to send it to the server.
The function helps to prevent starvation on the other endpoint, which is caused by a response never reaching it. The method will, however, only attempt to send a default error once. If this operation fails, the other endpoint may still starve.
func (*Client) IsConnected ¶
func (*Client) SendError ¶
func (c *Client) SendError(requestId string, errorCode ocpp.ErrorCode, description string, details interface{}) error
Sends an OCPP Error to the server. The requestID parameter is required and identifies the previously received request.
Returns an error in the following cases:
- message validation fails (error is malformed)
- a network error occurred
func (*Client) SendRequest ¶
Sends an OCPP Request to the server. The protocol is based on request-response and cannot send multiple messages concurrently. To guarantee this, outgoing messages are added to a queue and processed sequentially.
Returns an error in the following cases:
- the client wasn't started
- message validation fails (request is malformed)
- the endpoint doesn't support the feature
- the output queue is full
func (*Client) SendResponse ¶
Sends an OCPP Response to the server. The requestID parameter is required and identifies the previously received request.
Returns an error in the following cases:
- message validation fails (response is malformed)
- the endpoint doesn't support the feature
- a network error occurred
func (*Client) SetErrorHandler ¶
Registers a handler for incoming error messages.
func (*Client) SetInvalidMessageHook ¶
func (c *Client) SetInvalidMessageHook(hook func(err *ocpp.Error, rawMessage string, parsedFields []interface{}) *ocpp.Error)
SetInvalidMessageHook registers an optional hook for incoming messages that couldn't be parsed. This hook is called when a message is received but cannot be parsed to the target OCPP message struct.
The application is notified synchronously of the error. The callback provides the raw JSON string, along with the parsed fields. The application MUST return as soon as possible, since the hook is called synchronously and awaits a return value.
While the hook does not allow responding to the message directly, the return value will be used to send an OCPP error to the other endpoint.
If no handler is registered (or no error is returned by the hook), the internal error message is sent to the client without further processing.
Note: Failing to return from the hook will cause the client to block indefinitely.
func (*Client) SetOnDisconnectedHandler ¶
func (*Client) SetOnReconnectedHandler ¶
func (c *Client) SetOnReconnectedHandler(handler func())
func (*Client) SetOnRequestCanceled ¶
func (c *Client) SetOnRequestCanceled(handler func(requestId string, request ocpp.Request, err *ocpp.Error))
Registers the handler to be called on timeout.
func (*Client) SetRequestHandler ¶
func (c *Client) SetRequestHandler(handler func(request ocpp.Request, requestId string, action string))
Registers a handler for incoming requests.
func (*Client) SetResponseHandler ¶
Registers a handler for incoming responses.
func (*Client) Start ¶
Connects to the given serverURL and starts running the I/O loop for the underlying connection.
If the connection is established successfully, the function returns control to the caller immediately. The read/write routines are run on dedicated goroutines, so the main thread can perform other operations.
In case of disconnection, the client handles re-connection automatically. The client will attempt to re-connect to the server forever, until it is stopped by invoking the Stop method.
An error may be returned, if establishing the connection failed.
func (*Client) StartWithRetries ¶ added in v0.18.2
type ClientDispatcher ¶
type ClientDispatcher interface { // Starts the dispatcher. Depending on the implementation, this may // start a dedicated goroutine or simply allocate the necessary state. Start() // Sets the maximum timeout to be considered after sending a request. // If a response to the request is not received within the specified period, the request // is discarded and an error is returned to the caller. // // The timeout is reset upon a disconnection/reconnection. // // This function must be called before starting the dispatcher, otherwise it may lead to unexpected behavior. SetTimeout(timeout time.Duration) // Returns true, if the dispatcher is currently running, false otherwise. // If the dispatcher is paused, the function still returns true. IsRunning() bool // Returns true, if the dispatcher is currently paused, false otherwise. // If the dispatcher is not running at all, the function will still return false. IsPaused() bool // Dispatches a request. Depending on the implementation, this may first queue a request // and process it later, asynchronously, or write it directly to the networking layer. // // If no network client was set, or the request couldn't be processed, an error is returned. SendRequest(req RequestBundle) error // Notifies the dispatcher that a request has been completed (i.e. a response was received). // The dispatcher takes care of removing the request marked by the requestID from // the pending requests. It will then attempt to process the next queued request. CompleteRequest(requestID string) // Sets a callback to be invoked when a request gets canceled, due to network timeouts or internal errors. // The callback passes the original message ID and request struct of the failed request, along with an error. // // Calling Stop on the dispatcher will not trigger this callback. // // If no callback is set, a request will still be removed from the dispatcher when a timeout occurs. SetOnRequestCanceled(cb func(requestID string, request ocpp.Request, err *ocpp.Error)) // Sets the network client, so the dispatcher may send requests using the networking layer directly. // // This needs to be set before calling the Start method. If not, sending requests will fail. SetNetworkClient(client ws.WsClient) // Sets the state manager for pending requests in the dispatcher. // // The state should only be accessed by the dispatcher while running. SetPendingRequestState(stateHandler ClientState) // Stops a running dispatcher. This will clear all state and empty the internal queues. // // If an onRequestCanceled callback is set, it won't be triggered by stopping the dispatcher. Stop() // Notifies that an external event (typically network-related) should pause // the dispatcher. Internal timers will be stopped an no further requests // will be set to pending. You may keep enqueuing requests. // Use the Resume method for re-starting the dispatcher. Pause() // Undoes a previous pause operation, restarting internal timers and the // regular request flow. // // If there was a pending request before pausing the dispatcher, a response/timeout // for this request shall be awaited anew. Resume() }
ClientDispatcher contains the state and logic for handling outgoing messages on a client endpoint. This allows the ocpp-j layer to delegate queueing and processing logic to an external entity.
The dispatcher writes outgoing messages directly to the networking layer, using a previously set websocket client.
A ClientState needs to be passed to the dispatcher, before starting it. The dispatcher is in charge of managing pending requests while handling the request flow.
type ClientHandler ¶
type ClientState ¶
type ClientState interface { // Sets a Request as pending on the endpoint. Requests are considered pending until a response was received. // The function expects a unique message ID and the Request. // If an element with the same requestID exists, the new one will be ignored. AddPendingRequest(requestID string, req ocpp.Request) // Retrieves a pending Request, using the message ID. // If no request for the passed message ID is found, a false flag is returned. GetPendingRequest(requestID string) (ocpp.Request, bool) // Deletes a pending Request from the endpoint, using the message ID. // If no such message is currently stored as pending, the call has no effect. DeletePendingRequest(requestID string) // Clears all currently pending requests. Any confirmation/error, // received as a response to a cleared request, will be ignored. ClearPendingRequests() // Returns true if there currently is at least one pending request, false otherwise. HasPendingRequest() bool }
Contains the pending request state for messages, associated to a single client-server channel. It is used to separate endpoint logic from state management.
func NewClientState ¶
func NewClientState() ClientState
Creates a simple struct implementing ClientState, to be used by client/server dispatchers.
type DefaultClientDispatcher ¶
type DefaultClientDispatcher struct {
// contains filtered or unexported fields
}
DefaultClientDispatcher is a default implementation of the ClientDispatcher interface.
The dispatcher implements the ClientState as well for simplicity. Access to pending requests is thread-safe.
func NewDefaultClientDispatcher ¶
func NewDefaultClientDispatcher(queue RequestQueue) *DefaultClientDispatcher
NewDefaultClientDispatcher creates a new DefaultClientDispatcher struct.
func (*DefaultClientDispatcher) CompleteRequest ¶
func (d *DefaultClientDispatcher) CompleteRequest(requestId string)
func (*DefaultClientDispatcher) IsPaused ¶
func (d *DefaultClientDispatcher) IsPaused() bool
func (*DefaultClientDispatcher) IsRunning ¶
func (d *DefaultClientDispatcher) IsRunning() bool
func (*DefaultClientDispatcher) Pause ¶
func (d *DefaultClientDispatcher) Pause()
func (*DefaultClientDispatcher) Resume ¶
func (d *DefaultClientDispatcher) Resume()
func (*DefaultClientDispatcher) SendRequest ¶
func (d *DefaultClientDispatcher) SendRequest(req RequestBundle) error
func (*DefaultClientDispatcher) SetNetworkClient ¶
func (d *DefaultClientDispatcher) SetNetworkClient(client ws.WsClient)
func (*DefaultClientDispatcher) SetOnRequestCanceled ¶
func (*DefaultClientDispatcher) SetPendingRequestState ¶
func (d *DefaultClientDispatcher) SetPendingRequestState(state ClientState)
func (*DefaultClientDispatcher) SetTimeout ¶
func (d *DefaultClientDispatcher) SetTimeout(timeout time.Duration)
func (*DefaultClientDispatcher) Start ¶
func (d *DefaultClientDispatcher) Start()
func (*DefaultClientDispatcher) Stop ¶
func (d *DefaultClientDispatcher) Stop()
type DefaultServerDispatcher ¶
type DefaultServerDispatcher struct {
// contains filtered or unexported fields
}
DefaultServerDispatcher is a default implementation of the ServerDispatcher interface.
The dispatcher implements the ClientState as well for simplicity. Access to pending requests is thread-safe.
func NewDefaultServerDispatcher ¶
func NewDefaultServerDispatcher(queueMap ServerQueueMap) *DefaultServerDispatcher
NewDefaultServerDispatcher creates a new DefaultServerDispatcher struct.
func (*DefaultServerDispatcher) CompleteRequest ¶
func (d *DefaultServerDispatcher) CompleteRequest(clientID string, requestID string)
func (*DefaultServerDispatcher) CreateClient ¶
func (d *DefaultServerDispatcher) CreateClient(clientID string)
func (*DefaultServerDispatcher) DeleteClient ¶
func (d *DefaultServerDispatcher) DeleteClient(clientID string)
func (*DefaultServerDispatcher) IsRunning ¶
func (d *DefaultServerDispatcher) IsRunning() bool
func (*DefaultServerDispatcher) SendRequest ¶
func (d *DefaultServerDispatcher) SendRequest(clientID string, req RequestBundle) error
func (*DefaultServerDispatcher) SetNetworkServer ¶
func (d *DefaultServerDispatcher) SetNetworkServer(server ws.WsServer)
func (*DefaultServerDispatcher) SetOnRequestCanceled ¶
func (d *DefaultServerDispatcher) SetOnRequestCanceled(cb CanceledRequestHandler)
func (*DefaultServerDispatcher) SetPendingRequestState ¶
func (d *DefaultServerDispatcher) SetPendingRequestState(state ServerState)
func (*DefaultServerDispatcher) SetTimeout ¶
func (d *DefaultServerDispatcher) SetTimeout(timeout time.Duration)
func (*DefaultServerDispatcher) Start ¶
func (d *DefaultServerDispatcher) Start()
func (*DefaultServerDispatcher) Stop ¶
func (d *DefaultServerDispatcher) Stop()
type Endpoint ¶
An OCPP-J endpoint is one of the two entities taking part in the communication. The endpoint keeps state for supported OCPP profiles and current pending requests.
func (*Endpoint) AddProfile ¶
Adds support for a new profile on the endpoint.
func (*Endpoint) CreateCall ¶
Creates a Call message, given an OCPP request. A unique ID for the message is automatically generated. Returns an error in case the request's feature is not supported on this endpoint.
The created call is not automatically scheduled for transmission and is not added to the list of pending requests.
func (*Endpoint) CreateCallError ¶
func (endpoint *Endpoint) CreateCallError(uniqueId string, code ocpp.ErrorCode, description string, details interface{}) (*CallError, error)
Creates a CallError message, given the message's unique ID and the error.
func (*Endpoint) CreateCallResult ¶
func (endpoint *Endpoint) CreateCallResult(confirmation ocpp.Response, uniqueId string) (*CallResult, error)
Creates a CallResult message, given an OCPP response and the message's unique ID.
Returns an error in case the response's feature is not supported on this endpoint.
func (*Endpoint) GetProfile ¶
Retrieves a profile for the given profile name. Returns a false flag in case no profile matching the specified name was found.
func (*Endpoint) GetProfileForFeature ¶
Retrieves a profile for a given feature. Returns a false flag in case no profile supporting the specified feature was found.
func (*Endpoint) ParseMessage ¶
func (endpoint *Endpoint) ParseMessage(arr []interface{}, pendingRequestState ClientState) (Message, error)
Parses an OCPP-J message. The function expects an array of elements, as contained in the JSON message.
Pending requests are automatically cleared, in case the received message is a CallResponse or CallError.
func (*Endpoint) SetDialect ¶
Sets endpoint dialect.
type FIFOClientQueue ¶
type FIFOClientQueue struct {
// contains filtered or unexported fields
}
FIFOClientQueue is a default queue implementation. The queue is thread-safe.
func NewFIFOClientQueue ¶
func NewFIFOClientQueue(capacity int) *FIFOClientQueue
NewFIFOClientQueue creates a new FIFOClientQueue with the given capacity.
A FIFOQueue is backed by a slice, and the capacity represents the maximum capacity of the queue. Passing capacity = 0 will create a queue without a maximum capacity. The capacity cannot change after creation.
func (*FIFOClientQueue) Init ¶
func (q *FIFOClientQueue) Init()
func (*FIFOClientQueue) IsEmpty ¶
func (q *FIFOClientQueue) IsEmpty() bool
func (*FIFOClientQueue) IsFull ¶
func (q *FIFOClientQueue) IsFull() bool
func (*FIFOClientQueue) Peek ¶
func (q *FIFOClientQueue) Peek() interface{}
func (*FIFOClientQueue) Pop ¶
func (q *FIFOClientQueue) Pop() interface{}
func (*FIFOClientQueue) Push ¶
func (q *FIFOClientQueue) Push(element interface{}) error
func (*FIFOClientQueue) Size ¶
func (q *FIFOClientQueue) Size() int
type FIFOQueueMap ¶
type FIFOQueueMap struct {
// contains filtered or unexported fields
}
FIFOQueueMap is a default implementation of ServerQueueMap. A FIFOQueueMap is backed by a map[string]RequestQueue. The data structure is thread-safe.
When calling the GetOrCreate function, if no entry for a key was found in the map, a new RequestQueue with the given capacity will be created.
func NewFIFOQueueMap ¶
func NewFIFOQueueMap(clientQueueCapacity int) *FIFOQueueMap
NewFIFOQueueMap creates a new FIFOQueueMap, which will automatically create queues with the specified capacity.
Passing capacity = 0 will generate queues without a maximum capacity. The capacity cannot change after creation.
func (*FIFOQueueMap) Add ¶
func (f *FIFOQueueMap) Add(clientID string, queue RequestQueue)
func (*FIFOQueueMap) Get ¶
func (f *FIFOQueueMap) Get(clientID string) (RequestQueue, bool)
func (*FIFOQueueMap) GetOrCreate ¶
func (f *FIFOQueueMap) GetOrCreate(clientID string) RequestQueue
func (*FIFOQueueMap) Init ¶
func (f *FIFOQueueMap) Init()
func (*FIFOQueueMap) Remove ¶
func (f *FIFOQueueMap) Remove(clientID string)
type InvalidMessageHook ¶
type Message ¶
type Message interface { // Returns the message type identifier of the message. GetMessageTypeId() MessageType // Returns the unique identifier of the message. GetUniqueId() string json.Marshaler }
An OCPP-J message.
type MessageType ¶
type MessageType int
MessageType identifies the type of message exchanged between two OCPP endpoints.
const ( CALL MessageType = 2 CALL_RESULT MessageType = 3 CALL_ERROR MessageType = 4 )
type RequestBundle ¶
RequestBundle is a convenience struct for passing a call object struct and the raw byte data into the queue containing outgoing requests.
type RequestHandler ¶
type RequestQueue ¶
type RequestQueue interface { // Init puts the queue in its initial state. May be used for initial setup or clearing. Init() // Push appends the given element at the end of the queue. // Returns an error if the operation failed (e.g. the queue is full). Push(element interface{}) error // Peek returns the first element of the queue, without removing it from the data structure. Peek() interface{} // Pop returns the first element of the queue, removing it from the queue. Pop() interface{} // Size returns the current size of the queue. Size() int // IsFull returns true if the queue is currently full, false otherwise. IsFull() bool // IsEmpty returns true if the queue is currently empty, false otherwise. IsEmpty() bool }
RequestQueue can be arbitrarily implemented, as long as it conforms to the Queue interface.
A RequestQueue is used by ocppj client and server to manage outgoing requests. The underlying data structure must be thread-safe, since different goroutines may access it at the same time.
type ResponseHandler ¶
type Server ¶
type Server struct { Endpoint RequestState ServerState // contains filtered or unexported fields }
The endpoint waiting for incoming connections from OCPP clients, in an OCPP-J topology. During message exchange, the two roles may be reversed (depending on the message direction), but a server struct remains associated to a central system.
func NewServer ¶
func NewServer(wsServer ws.WsServer, dispatcher ServerDispatcher, stateHandler ServerState, profiles ...*ocpp.Profile) *Server
Creates a new Server endpoint. Requires a a websocket server. Optionally a structure for queueing/dispatching requests, a custom state handler and a list of profiles may be passed.
You may create a simple new server by using these default values:
s := ocppj.NewServer(ws.NewServer(), nil, nil)
The dispatcher's associated ClientState will be set during initialization.
func (*Server) HandleFailedResponseError ¶
func (s *Server) HandleFailedResponseError(clientID string, requestID string, err error, featureName string)
HandleFailedResponseError allows to handle failures while sending responses (either CALL_RESULT or CALL_ERROR). It internally analyzes and creates an ocpp.Error based on the given error. It will the attempt to send it to the client.
The function helps to prevent starvation on the other endpoint, which is caused by a response never reaching it. The method will, however, only attempt to send a default error once. If this operation fails, the other endpoint may still starve.
func (*Server) SendError ¶
func (s *Server) SendError(clientID string, requestId string, errorCode ocpp.ErrorCode, description string, details interface{}) error
Sends an OCPP Error to a client, identified by the clientID parameter. The requestID parameter is required and identifies the previously received request.
Returns an error in the following cases:
- message validation fails (error is malformed)
- a network error occurred
func (*Server) SendRequest ¶
Sends an OCPP Request to a client, identified by the clientID parameter.
Returns an error in the following cases:
- the server wasn't started
- message validation fails (request is malformed)
- the endpoint doesn't support the feature
- the output queue is full
func (*Server) SendResponse ¶
Sends an OCPP Response to a client, identified by the clientID parameter. The requestID parameter is required and identifies the previously received request.
Returns an error in the following cases:
- message validation fails (response is malformed)
- the endpoint doesn't support the feature
- a network error occurred
func (*Server) SetCanceledRequestHandler ¶
func (s *Server) SetCanceledRequestHandler(handler CanceledRequestHandler)
Registers a handler for canceled request messages.
func (*Server) SetDisconnectedClientHandler ¶
func (s *Server) SetDisconnectedClientHandler(handler ClientHandler)
Registers a handler for client disconnections.
func (*Server) SetErrorHandler ¶
func (s *Server) SetErrorHandler(handler ErrorHandler)
Registers a handler for incoming error messages.
func (*Server) SetInvalidMessageHook ¶
func (s *Server) SetInvalidMessageHook(hook InvalidMessageHook)
SetInvalidMessageHook registers an optional hook for incoming messages that couldn't be parsed. This hook is called when a message is received but cannot be parsed to the target OCPP message struct.
The application is notified synchronously of the error. The callback provides the raw JSON string, along with the parsed fields. The application MUST return as soon as possible, since the hook is called synchronously and awaits a return value.
The hook does not allow responding to the message directly, but the return value will be used to send an OCPP error to the other endpoint.
If no handler is registered (or no error is returned by the hook), the internal error message is sent to the client without further processing.
Note: Failing to return from the hook will cause the handler for this client to block indefinitely.
func (*Server) SetNewClientHandler ¶
func (s *Server) SetNewClientHandler(handler ClientHandler)
Registers a handler for incoming client connections.
func (*Server) SetNewClientValidationHandler ¶
func (s *Server) SetNewClientValidationHandler(handler ws.CheckClientHandler)
Registers a handler for validate incoming client connections.
func (*Server) SetRequestHandler ¶
func (s *Server) SetRequestHandler(handler RequestHandler)
Registers a handler for incoming requests.
func (*Server) SetResponseHandler ¶
func (s *Server) SetResponseHandler(handler ResponseHandler)
Registers a handler for incoming responses.
func (*Server) Start ¶
Starts the underlying Websocket server on a specified listenPort and listenPath.
The function runs indefinitely, until the server is stopped. Invoke this function in a separate goroutine, to perform other operations on the main thread.
An error may be returned, if the websocket server couldn't be started.
type ServerDispatcher ¶
type ServerDispatcher interface { // Starts the dispatcher. Depending on the implementation, this may // start a dedicated goroutine or simply allocate the necessary state. Start() // Returns true, if the dispatcher is currently running, false otherwise. // If the dispatcher is paused, the function still returns true. IsRunning() bool // Sets the maximum timeout to be considered after sending a request. // If a response to the request is not received within the specified period, the request // is discarded and an error is returned to the caller. // // One timeout per client runs in the background. // The timeout is reset whenever a response comes in, the connection is closed, or the server is stopped. // // This function must be called before starting the dispatcher, otherwise it may lead to unexpected behavior. SetTimeout(timeout time.Duration) // Dispatches a request for a specific client. Depending on the implementation, this may first queue // a request and process it later (asynchronously), or write it directly to the networking layer. // // If no network server was set, or the request couldn't be processed, an error is returned. SendRequest(clientID string, req RequestBundle) error // Notifies the dispatcher that a request has been completed (i.e. a response was received), // for a specific client. // The dispatcher takes care of removing the request marked by the requestID from // that client's pending requests. It will then attempt to process the next queued request. CompleteRequest(clientID string, requestID string) // Sets a callback to be invoked when a request gets canceled, due to network timeouts. // The callback passes the original client ID, message ID, and request struct of the failed request, // along with an error. // // Calling Stop on the dispatcher will not trigger this callback. // // If no callback is set, a request will still be removed from the dispatcher when a timeout occurs. SetOnRequestCanceled(cb CanceledRequestHandler) // Sets the network server, so the dispatcher may send requests using the networking layer directly. // // This needs to be set before calling the Start method. If not, sending requests will fail. SetNetworkServer(server ws.WsServer) // Sets the state manager for pending requests in the dispatcher. // // The state should only be accessed by the dispatcher while running. SetPendingRequestState(stateHandler ServerState) // Stops a running dispatcher. This will clear all state and empty the internal queues. // // If an onRequestCanceled callback is set, it won't be triggered by stopping the dispatcher. Stop() // Notifies that it is now possible to dispatch requests for a new client. // // Internal queues are created and requests for the client are now accepted. CreateClient(clientID string) // Notifies that a client was invalidated (typically caused by a network event). // // The dispatcher will stop dispatching requests for that specific client. // Internal queues for that client are cleared and no further requests will be accepted. // Undelivered pending requests are also cleared. // The OnRequestCanceled callback will be invoked for each discarded request. DeleteClient(clientID string) }
ServerDispatcher contains the state and logic for handling outgoing messages on a server endpoint. This allows the ocpp-j layer to delegate queueing and processing logic to an external entity.
The dispatcher writes outgoing messages directly to the networking layer, using a previously set websocket server.
A ClientState needs to be passed to the dispatcher, before starting it. The dispatcher is in charge of managing all pending requests to clients, while handling the request flow.
type ServerQueueMap ¶
type ServerQueueMap interface { // Init puts the queue map in its initial state. May be used for initial setup or clearing. Init() // Get retrieves the queue associated to a specific clientID. // If no such element exists, the returned flag will be false. Get(clientID string) (RequestQueue, bool) // GetOrCreate retrieves the queue associated to a specific clientID. // If no such element exists, it is created, added to the map and returned. GetOrCreate(clientID string) RequestQueue // Remove deletes the queue associated to a specific clientID. // If no such element exists, nothing happens. Remove(clientID string) // Add inserts a new RequestQueue into the map structure. // If such element already exists, it will be replaced with the new queue. Add(clientID string, queue RequestQueue) }
ServerQueueMap defines the interface for managing client request queues.
An OCPP-J server may serve multiple clients at the same time, so it will need to provide a queue for each client.
type ServerState ¶
type ServerState interface { // Sets a Request as pending on the endpoint, for a specific client. // Requests are considered pending until a response was received. // The function expects a client ID, a unique message ID and the Request itself. // If an element with the same clientID/requestID exists, the new one will be ignored. AddPendingRequest(clientID string, requestID string, req ocpp.Request) // Deletes a pending Request from the endpoint, for a specific client, using the message ID. // If no such message is currently stored as pending, the call has no effect. DeletePendingRequest(clientID string, requestID string) // Retrieves a ClientState object, associated to a specific client. // If no such state exists, an empty state is returned. GetClientState(clientID string) ClientState // Returns true if there currently are pending requests for a client, false otherwise. HasPendingRequest(clientID string) bool // Returns true if there currently is at least one pending request, false otherwise. HasPendingRequests() bool // Clears currently pending requests for a client. Any confirmation/error, // received as a response to a cleared request, will be ignored. ClearClientPendingRequest(clientID string) // Clears all currently pending requests inside the map. Any confirmation/error, // received as a response to a previously sent request, will be ignored. // // Does not perform a deep deletion; is references to client state objects // are stored elsewhere, those will remain unaffected and become invalid. ClearAllPendingRequests() }
Contains the pending request state for messages associated to all client-server channels. It is used to separate endpoint logic from state management.
func NewServerState ¶
func NewServerState(m *sync.RWMutex) ServerState
Creates a simple struct implementing ServerState, to be used by server dispatchers.
If no mutex is passed, then atomic access to the data struct is not guaranteed, and race conditions may arise.