Documentation
¶
Index ¶
- Constants
- Variables
- func BuildTopicName(namespace, recordName string) string
- func ParseLocation(s string) (*protos.LocationValue, error)
- type BinarySerializer
- func (bs *BinarySerializer) Ack(record *Record) []byte
- func (bs *BinarySerializer) Deserialize(msg []byte, socketID string) (record *Record, err error)
- func (bs *BinarySerializer) Dispatch(record *Record)
- func (bs *BinarySerializer) Error(err error, record *Record) []byte
- func (bs *BinarySerializer) Logger() *logrus.Logger
- func (bs *BinarySerializer) ReliableAck() bool
- type Dispatcher
- type NonAnonymizedError
- type Producer
- type Record
- func (record *Record) Ack() []byte
- func (record *Record) Dispatch()
- func (record *Record) Encode() ([]byte, error)
- func (record *Record) Error(err error) []byte
- func (record *Record) Length() int
- func (record *Record) Metadata() map[string]string
- func (record *Record) Payload() []byte
- func (record *Record) Raw() []byte
- type RequestIdentity
- type UnauthorizedSenderIDError
- type UnknownMessageType
Constants ¶
const ( // SizeLimit maximum incoming payload size from the vehicle SizeLimit = 1000000 // 1mb )
Variables ¶
var ErrMessageTooBig = fmt.Errorf("can't process message, size above 1mb")
ErrMessageTooBig handles error when incoming payload is too large
Functions ¶
func BuildTopicName ¶ added in v0.0.5
BuildTopicName creates a topic from a namespace and a recordName
func ParseLocation ¶ added in v0.1.1
func ParseLocation(s string) (*protos.LocationValue, error)
ParseLocation parses a location string (such as "(37.412374 N, 122.145867 W)") into a *proto.Location type.
Types ¶
type BinarySerializer ¶
type BinarySerializer struct { DispatchRules map[string][]Producer RequestIdentity *RequestIdentity // contains filtered or unexported fields }
BinarySerializer serializes records
func NewBinarySerializer ¶
func NewBinarySerializer(requestIdentity *RequestIdentity, dispatchRules map[string][]Producer, reliableAck bool, logger *logrus.Logger) *BinarySerializer
NewBinarySerializer returns a dedicated serializer for a current socket connection
func (*BinarySerializer) Ack ¶
func (bs *BinarySerializer) Ack(record *Record) []byte
Ack returns an ack response
func (*BinarySerializer) Deserialize ¶
func (bs *BinarySerializer) Deserialize(msg []byte, socketID string) (record *Record, err error)
Deserialize transforms a csv byte array into a Record
func (*BinarySerializer) Dispatch ¶
func (bs *BinarySerializer) Dispatch(record *Record)
Dispatch pushes the record to kafka for every rule associated to it
func (*BinarySerializer) Error ¶
func (bs *BinarySerializer) Error(err error, record *Record) []byte
Error returns an error response
func (*BinarySerializer) Logger ¶
func (bs *BinarySerializer) Logger() *logrus.Logger
Logger returns logger for the serializer
func (*BinarySerializer) ReliableAck ¶
func (bs *BinarySerializer) ReliableAck() bool
ReliableAck returns true if serializer supports reliable acks (only ack to car once datastore acked the data)
type Dispatcher ¶
type Dispatcher string
Dispatcher type of telemetry record dispatcher
const ( // Pubsub registers a Google pubsub dispatcher Pubsub Dispatcher = "pubsub" // Kafka registers a kafka dispatcher Kafka Dispatcher = "kafka" // Kinesis registers a kinesis publisher Kinesis Dispatcher = "kinesis" // Logger registers a simple logger Logger Dispatcher = "logger" )
type NonAnonymizedError ¶
type NonAnonymizedError struct { }
NonAnonymizedError is an error struct representing mismatch ID
func (*NonAnonymizedError) Error ¶
func (e *NonAnonymizedError) Error() string
Error returns an error string implementing the error interface
type Producer ¶
type Producer interface {
Produce(entry *Record)
}
Producer handles dispatching data received from the vehicle
type Record ¶
type Record struct { ProduceTime time.Time ReceivedTimestamp int64 Serializer *BinarySerializer SocketID string Timestamp int64 Txid string TxType string TripID string Version int Vin string PayloadBytes []byte RawBytes []byte }
Record is a structs that represents the telemetry records vehicles send to the backend vin is used as kafka produce partitioning key by default, can be configured to random
func NewRecord ¶
func NewRecord(ts *BinarySerializer, msg []byte, socketID string) (*Record, error)
NewRecord Sanitizes and instantiates a Record from a message !! caller expect *Record to not be nil !!
func (*Record) Dispatch ¶
func (record *Record) Dispatch()
Dispatch uses the configuration to send records to the list of backends/data stores they belong
type RequestIdentity ¶
RequestIdentity stores identifiers for the socket connection
type UnauthorizedSenderIDError ¶
type UnauthorizedSenderIDError struct {}
UnauthorizedSenderIDError is an error struct representing mismatch ID
func (*UnauthorizedSenderIDError) Error ¶
func (e *UnauthorizedSenderIDError) Error() string
Error returns an error string implementing the error interface
type UnknownMessageType ¶
UnknownMessageType is an error struct representing a message that cannot be parsed
func (*UnknownMessageType) Error ¶
func (e *UnknownMessageType) Error() string
Error returns an error string implementing the error interface