streaming

package
v0.1.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 29, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const ReadWriteExitDeadline = 50 * time.Millisecond

ReadWriteExitDeadline is the deadline to set when existing the reader/writer

View Source
const SocketContext contextKeyType = iota + 1

SocketContext is the name of variable holding socket data in the context

View Source
const WriteLoopDeadline = 10 * time.Second

WriteLoopDeadline is the read/write deadline in the main loop

Variables

This section is empty.

Functions

This section is empty.

Types

type Metrics added in v0.0.6

type Metrics struct {
	// contains filtered or unexported fields
}

Metrics stores metrics reported from this package

type Server

type Server struct {
	// DispatchRules is a mapping of topics (records type) to their dispatching methods (loaded from Records json)
	DispatchRules map[string][]telemetry.Producer
	// contains filtered or unexported fields
}

Server stores server resources

func InitServer

func InitServer(c *config.Config, producerRules map[string][]telemetry.Producer, logger *logrus.Logger, registry *SocketRegistry) (*http.Server, *Server, error)

InitServer initializes the main server

func (*Server) ServeBinaryWs

func (s *Server) ServeBinaryWs(config *config.Config, registry *SocketRegistry) func(w http.ResponseWriter, r *http.Request)

ServeBinaryWs serves a http query and upgrades it to a websocket -- only serves binary data coming from the ws

func (*Server) Status

func (s *Server) Status() func(w http.ResponseWriter, r *http.Request)

Status API shows server with mtls config is up

type SocketManager

type SocketManager struct {
	Ws           *websocket.Conn
	MsgType      int
	RecordsStats map[string]int
	StartTime    time.Time
	UUID         string
	// contains filtered or unexported fields
}

SocketManager is a struct responsible for managing the socket connection with the clients

func NewSocketManager

func NewSocketManager(ctx context.Context, requestIdentity *telemetry.RequestIdentity, ws *websocket.Conn, config *config.Config, logger *logrus.Logger) *SocketManager

NewSocketManager instantiates a SocketManager

func (*SocketManager) Close

func (sm *SocketManager) Close()

Close shuts down a socket connection for a single client and log metrics

func (SocketManager) DatastoreAckProcessor

func (sm SocketManager) DatastoreAckProcessor(ackChan chan (*telemetry.Record))

DatastoreAckProcessor records metrics after acking records

func (*SocketManager) ListenToWriteChannel

func (sm *SocketManager) ListenToWriteChannel() SocketMessage

ListenToWriteChannel to the write channel

func (*SocketManager) ParseAndProcessRecord

func (sm *SocketManager) ParseAndProcessRecord(serializer *telemetry.BinarySerializer, message []byte)

ParseAndProcessRecord reads incoming client message and dispatches to relevant producer

func (*SocketManager) ProcessTelemetry

func (sm *SocketManager) ProcessTelemetry(serializer *telemetry.BinarySerializer)

ProcessTelemetry uses the serializer to dispatch telemetry records

func (*SocketManager) RecordsStatsToLogInfo

func (sm *SocketManager) RecordsStatsToLogInfo() map[string]interface{}

RecordsStatsToLogInfo formats the stats map into a string

func (SocketManager) ReportMetricBytesPerRecords added in v0.0.6

func (sm SocketManager) ReportMetricBytesPerRecords(recordType string, byteSize int)

ReportMetricBytesPerRecords records metrics for metric size

type SocketMessage

type SocketMessage struct {
	MsgType int
	Msg     []byte
}

SocketMessage represents incoming socket connection

type SocketRegistry

type SocketRegistry struct {
	// contains filtered or unexported fields
}

SocketRegistry is a library to handle keeping track of connected sockets

func NewSocketRegistry

func NewSocketRegistry() *SocketRegistry

NewSocketRegistry returns an empty socket registry

func (*SocketRegistry) DeregisterSocket

func (s *SocketRegistry) DeregisterSocket(socket *SocketManager)

DeregisterSocket removes a disconnecting socket

func (*SocketRegistry) GetSocket

func (s *SocketRegistry) GetSocket(uuid string) *SocketManager

GetSocket returns a socket if connected

func (*SocketRegistry) NumConnectedSockets

func (s *SocketRegistry) NumConnectedSockets() int

NumConnectedSockets returns the number of connected sockets

func (*SocketRegistry) RegisterSocket

func (s *SocketRegistry) RegisterSocket(socket *SocketManager)

RegisterSocket registers a new socket

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL