Documentation ¶
Index ¶
- Constants
- Variables
- func BuildTopicName(namespace, recordName string) string
- func ParseLocation(s string) (*protos.LocationValue, error)
- type BinarySerializer
- func (bs *BinarySerializer) Ack(record *Record) []byte
- func (bs *BinarySerializer) Deserialize(msg []byte, socketID string) (record *Record, err error)
- func (bs *BinarySerializer) Dispatch(record *Record)
- func (bs *BinarySerializer) Error(err error, record *Record) []byte
- func (bs *BinarySerializer) Logger() *logrus.Logger
- type Dispatcher
- type NonAnonymizedError
- type Producer
- type Record
- func (record *Record) Ack() []byte
- func (record *Record) Dispatch()
- func (record *Record) Encode() ([]byte, error)
- func (record *Record) Error(err error) []byte
- func (record *Record) GetJSONPayload() ([]byte, error)
- func (record *Record) GetProtoMessage() proto.Message
- func (record *Record) Length() int
- func (record *Record) LengthRawBytes() int
- func (record *Record) Metadata() map[string]string
- func (record *Record) Payload() []byte
- func (record *Record) Raw() []byte
- type RequestIdentity
- type UnauthorizedSenderIDError
- type UnknownMessageType
Constants ¶
const (
	// SizeLimit maximum incoming payload size from the vehicle
	SizeLimit = 1000000 // 1mb
)
Variables ¶
var ErrMessageTooBig = fmt.Errorf("can't process message, size above 1mb")
ErrMessageTooBig is the error reported when an incoming payload is too large
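A minimal sketch of how a size guard built on these two values might look. The names `sizeLimit`, `errMessageTooBig`, and `checkSize` are local stand-ins introduced for illustration, and the strict `>` comparison is an assumption; the package may apply the limit differently.

```go
package main

import (
	"errors"
	"fmt"
)

// sizeLimit mirrors the documented SizeLimit constant (1,000,000 bytes).
const sizeLimit = 1000000

// errMessageTooBig mirrors the documented ErrMessageTooBig value.
var errMessageTooBig = fmt.Errorf("can't process message, size above 1mb")

// checkSize is a hypothetical helper: it rejects payloads larger than
// sizeLimit. Treating a payload of exactly sizeLimit bytes as valid is an
// assumption of this sketch.
func checkSize(msg []byte) error {
	if len(msg) > sizeLimit {
		return errMessageTooBig
	}
	return nil
}

func main() {
	err := checkSize(make([]byte, sizeLimit+1))
	// errors.Is lets callers match the sentinel even if it is wrapped later.
	fmt.Println(errors.Is(err, errMessageTooBig))
}
```

Comparing against a sentinel with `errors.Is` keeps the check valid if the error is later wrapped with extra context.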
Functions ¶
func BuildTopicName ¶ added in v0.0.5
func BuildTopicName(namespace, recordName string) string
BuildTopicName creates a topic name from a namespace and a recordName
func ParseLocation ¶ added in v0.1.1
func ParseLocation(s string) (*protos.LocationValue, error)
ParseLocation parses a location string (such as "(37.412374 N, 122.145867 W)") into a *protos.LocationValue.
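The string format above can be parsed along the following lines. This is a sketch only: `location` is a hypothetical stand-in for protos.LocationValue, `parseLocation` is not the package's implementation, and mapping south and west to negative values is an assumption.

```go
package main

import "fmt"

// location is a hypothetical stand-in for protos.LocationValue.
type location struct {
	Latitude  float64
	Longitude float64
}

// parseLocation parses strings such as "(37.412374 N, 122.145867 W)".
// South and west hemispheres become negative values (an assumption).
func parseLocation(s string) (*location, error) {
	var lat, lon float64
	var latDir, lonDir rune
	if _, err := fmt.Sscanf(s, "(%f %c, %f %c)", &lat, &latDir, &lon, &lonDir); err != nil {
		return nil, fmt.Errorf("invalid location %q: %w", s, err)
	}
	switch latDir {
	case 'N':
	case 'S':
		lat = -lat
	default:
		return nil, fmt.Errorf("invalid latitude direction %q", latDir)
	}
	switch lonDir {
	case 'E':
	case 'W':
		lon = -lon
	default:
		return nil, fmt.Errorf("invalid longitude direction %q", lonDir)
	}
	return &location{Latitude: lat, Longitude: lon}, nil
}

func main() {
	loc, err := parseLocation("(37.412374 N, 122.145867 W)")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(loc.Latitude, loc.Longitude)
}
```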
Types ¶
type BinarySerializer ¶
type BinarySerializer struct {
	DispatchRules   map[string][]Producer
	RequestIdentity *RequestIdentity
	// contains filtered or unexported fields
}
BinarySerializer serializes records
func NewBinarySerializer ¶
func NewBinarySerializer(requestIdentity *RequestIdentity, dispatchRules map[string][]Producer, logger *logrus.Logger) *BinarySerializer
NewBinarySerializer returns a dedicated serializer for the current socket connection
func (*BinarySerializer) Ack ¶
func (bs *BinarySerializer) Ack(record *Record) []byte
Ack returns an ack response
func (*BinarySerializer) Deserialize ¶
func (bs *BinarySerializer) Deserialize(msg []byte, socketID string) (record *Record, err error)
Deserialize transforms a csv byte array into a Record
func (*BinarySerializer) Dispatch ¶
func (bs *BinarySerializer) Dispatch(record *Record)
Dispatch pushes the record to kafka for every rule associated to it
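The fan-out described here can be sketched as follows. The types `record`, `producer`, and `memoryProducer` are trimmed, hypothetical stand-ins for Record and Producer, and keying the rules map by the record's TxType is an assumption of this sketch rather than documented behavior.

```go
package main

import "fmt"

// record is a trimmed, hypothetical stand-in for Record.
type record struct {
	TxType string
	Vin    string
}

// producer is a trimmed stand-in for Producer that keeps only Produce.
type producer interface {
	Produce(entry *record)
}

// memoryProducer is a hypothetical producer that collects records in memory.
type memoryProducer struct {
	name    string
	entries []*record
}

func (m *memoryProducer) Produce(entry *record) {
	m.entries = append(m.entries, entry)
}

// dispatch hands the record to every producer registered for its TxType
// (assumed map key) and returns how many producers received it.
func dispatch(rules map[string][]producer, rec *record) int {
	producers := rules[rec.TxType]
	for _, p := range producers {
		p.Produce(rec)
	}
	return len(producers)
}

func main() {
	kafka := &memoryProducer{name: "kafka"}
	logger := &memoryProducer{name: "logger"}
	rules := map[string][]producer{
		"V":      {kafka, logger},
		"alerts": {logger},
	}
	n := dispatch(rules, &record{TxType: "V", Vin: "TESTVIN0000000001"})
	fmt.Println(n, len(kafka.entries), len(logger.entries))
}
```

A record whose type has no rule is simply not dispatched in this sketch, because indexing a missing map key yields a nil slice.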
func (*BinarySerializer) Error ¶
func (bs *BinarySerializer) Error(err error, record *Record) []byte
Error returns an error response
func (*BinarySerializer) Logger ¶
func (bs *BinarySerializer) Logger() *logrus.Logger
Logger returns logger for the serializer
type Dispatcher ¶
type Dispatcher string
Dispatcher is the type of telemetry record dispatcher
const (
	// Pubsub registers a Google pubsub dispatcher
	Pubsub Dispatcher = "pubsub"
	// Kafka registers a kafka dispatcher
	Kafka Dispatcher = "kafka"
	// Kinesis registers a kinesis publisher
	Kinesis Dispatcher = "kinesis"
	// Logger registers a simple logger
	Logger Dispatcher = "logger"
	// ZMQ registers a zmq logger
	ZMQ Dispatcher = "zmq"
	// MQTT registers an MQTT dispatcher
	MQTT Dispatcher = "mqtt"
)
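Because Dispatcher is a string type, a configured name can be converted and checked against the known constants. The sketch below redeclares the type and constants locally so it is self-contained; `isKnown` is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// Dispatcher and its constants are redeclared here to keep the sketch
// self-contained; the values match the documented ones.
type Dispatcher string

const (
	Pubsub  Dispatcher = "pubsub"
	Kafka   Dispatcher = "kafka"
	Kinesis Dispatcher = "kinesis"
	Logger  Dispatcher = "logger"
	ZMQ     Dispatcher = "zmq"
	MQTT    Dispatcher = "mqtt"
)

// isKnown is a hypothetical helper that reports whether d is one of the
// documented dispatcher names.
func isKnown(d Dispatcher) bool {
	switch d {
	case Pubsub, Kafka, Kinesis, Logger, ZMQ, MQTT:
		return true
	}
	return false
}

func main() {
	for _, name := range []string{"kafka", "sqs"} {
		fmt.Println(name, isKnown(Dispatcher(name)))
	}
}
```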
type NonAnonymizedError ¶
type NonAnonymizedError struct { }
NonAnonymizedError is an error struct representing an ID mismatch
func (*NonAnonymizedError) Error ¶
func (e *NonAnonymizedError) Error() string
Error returns an error string implementing the error interface
type Producer ¶
type Producer interface {
	Close() error
	Produce(entry *Record)
	ProcessReliableAck(entry *Record)
	ReportError(message string, err error, logInfo logrus.LogInfo)
}
Producer handles dispatching data received from the vehicle
type Record ¶
type Record struct {
	ProduceTime       time.Time
	ReceivedTimestamp int64
	Serializer        *BinarySerializer
	SocketID          string
	Timestamp         int64
	Txid              string
	TxType            string
	TripID            string
	Version           int
	Vin               string
	PayloadBytes      []byte
	RawBytes          []byte
	// contains filtered or unexported fields
}
Record is a struct that represents the telemetry records vehicles send to the backend. The VIN is used as the Kafka produce partitioning key by default; this can be configured to be random.
func NewRecord ¶
func NewRecord(ts *BinarySerializer, msg []byte, socketID string, transmitDecodedRecords bool) (*Record, error)
NewRecord sanitizes a message and instantiates a Record from it. Note: callers expect the returned *Record to be non-nil.
func (*Record) Dispatch ¶
func (record *Record) Dispatch()
Dispatch uses the configuration to send records to the list of backends/data stores they belong to
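func (*Record) GetJSONPayload ¶ added in v0.1.9
func (record *Record) GetJSONPayload() ([]byte, error)
GetJSONPayload marshals to JSON if requested
func (*Record) GetProtoMessage ¶ added in v0.1.9
func (record *Record) GetProtoMessage() proto.Message
GetProtoMessage gets the extracted protobuf message
func (*Record) LengthRawBytes ¶ added in v0.2.1
func (record *Record) LengthRawBytes() int
LengthRawBytes gets the record's flatbuffer payload byte size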
type RequestIdentity ¶
RequestIdentity stores identifiers for the socket connection
type UnauthorizedSenderIDError ¶
type UnauthorizedSenderIDError struct {}
UnauthorizedSenderIDError is an error struct representing an ID mismatch
func (*UnauthorizedSenderIDError) Error ¶
func (e *UnauthorizedSenderIDError) Error() string
Error returns an error string implementing the error interface
type UnknownMessageType ¶
UnknownMessageType is an error struct representing a message that cannot be parsed
func (*UnknownMessageType) Error ¶
func (e *UnknownMessageType) Error() string
Error returns an error string implementing the error interface