Documentation ¶
Index ¶
- Constants
- Variables
- type DataUtils
- type DataUtilsImpl
- type FeedbackLoop
- type Manager
- type ManagerImpl
- type NotificationParser
- type NotificationParserImpl
- type Processor
- type ProcessorImpl
- func (p *ProcessorImpl) ProcessSegmentChangeUpdate(update *dtos.SegmentChangeUpdate) error
- func (p *ProcessorImpl) ProcessSplitChangeUpdate(update *dtos.SplitChangeUpdate) error
- func (p *ProcessorImpl) ProcessSplitKillUpdate(update *dtos.SplitKillUpdate) error
- func (p *ProcessorImpl) StartWorkers()
- func (p *ProcessorImpl) StopWorkers()
- type SegmentUpdateWorker
- type SplitUpdateWorker
- type StatusTracker
- type StatusTrackerImpl
- func (p *StatusTrackerImpl) HandleAblyError(errorEvent *dtos.AblyError) (newStatus *int64)
- func (p *StatusTrackerImpl) HandleControl(controlUpdate *dtos.ControlUpdate) *int64
- func (p *StatusTrackerImpl) HandleDisconnection() *int64
- func (p *StatusTrackerImpl) HandleOccupancy(message *dtos.OccupancyMessage) (newStatus *int64)
- func (p *StatusTrackerImpl) NotifySSEShutdownExpected()
- func (p *StatusTrackerImpl) Reset()
Constants ¶
const ( StatusUp = iota StatusDown StatusRetryableError StatusNonRetryableError )
Status update constants that will be propagated to the push manager's user
Variables ¶
var ErrAlreadyRunning = errors.New("push manager already running")
ErrAlreadyRunning is the error to be returned when .Start() is called on an already running instance
var ErrEmptyEvent = errors.New("empty incoming event")
ErrEmptyEvent indicates an event without message and event fields
var ErrNotRunning = errors.New("push manager not running")
ErrNotRunning is the error to be returned when .Stop() is called on a non-running instance
Functions ¶
This section is empty.
Types ¶
type DataUtils ¶
type DataUtils interface { Decode(data string) ([]byte, error) Decompress(data []byte, compressType int) ([]byte, error) }
func NewDataUtilsImpl ¶
func NewDataUtilsImpl() DataUtils
type DataUtilsImpl ¶
type DataUtilsImpl struct { }
func (*DataUtilsImpl) Decompress ¶
func (d *DataUtilsImpl) Decompress(data []byte, compressType int) ([]byte, error)
type FeedbackLoop ¶
type FeedbackLoop = chan<- int64
FeedbackLoop is a type alias for the type of chan that must be supplied for push status to be propagated
type Manager ¶
type Manager interface { Start() error Stop() error StopWorkers() StartWorkers() NextRefresh() time.Time }
Manager interface contains public methods for push manager
type ManagerImpl ¶
type ManagerImpl struct {
// contains filtered or unexported fields
}
ManagerImpl implements the manager interface
func NewManager ¶
func NewManager( logger logging.LoggerInterface, synchronizer synchronizerInterface, cfg *conf.AdvancedConfig, feedbackLoop chan<- int64, authAPI service.AuthClient, runtimeTelemetry storage.TelemetryRuntimeProducer, metadata dtos.Metadata, clientKey *string, ) (*ManagerImpl, error)
NewManager constructs a new push manager
func (*ManagerImpl) NextRefresh ¶
func (m *ManagerImpl) NextRefresh() time.Time
NextRefresh returns the time when the next token refresh will happen
func (*ManagerImpl) Start ¶
func (m *ManagerImpl) Start() error
Start initiates the authentication flow and if successful initiates a connection
func (*ManagerImpl) StartWorkers ¶
func (m *ManagerImpl) StartWorkers()
StartWorkers starts the splits & segments workers
func (*ManagerImpl) Stop ¶
func (m *ManagerImpl) Stop() error
Stop method stops the sse client and its status monitoring goroutine
func (*ManagerImpl) StopWorkers ¶
func (m *ManagerImpl) StopWorkers()
StopWorkers stops the splits & segments workers
type NotificationParser ¶
type NotificationParser interface {
ParseAndForward(sse.IncomingMessage) (*int64, error)
}
NotificationParser interface
type NotificationParserImpl ¶
type NotificationParserImpl struct {
// contains filtered or unexported fields
}
NotificationParserImpl implements the NotificationParser interface
func NewNotificationParserImpl ¶
func NewNotificationParserImpl( loggerInterface logging.LoggerInterface, onSplitUpdate func(update *dtos.SplitChangeUpdate) error, onSplitKill func(*dtos.SplitKillUpdate) error, onSegmentUpdate func(*dtos.SegmentChangeUpdate) error, onControlUpdate func(*dtos.ControlUpdate) *int64, onOccupancyMessage func(*dtos.OccupancyMessage) *int64, onAblyError func(*dtos.AblyError) *int64) *NotificationParserImpl
func (*NotificationParserImpl) ParseAndForward ¶
func (p *NotificationParserImpl) ParseAndForward(raw sse.IncomingMessage) (*int64, error)
ParseAndForward accepts an incoming RAW event and returns a properly parsed & typed event
type Processor ¶
type Processor interface { ProcessSplitChangeUpdate(update *dtos.SplitChangeUpdate) error ProcessSplitKillUpdate(update *dtos.SplitKillUpdate) error ProcessSegmentChangeUpdate(update *dtos.SegmentChangeUpdate) error StartWorkers() StopWorkers() }
Processor provides the interface for an update-message processor
type ProcessorImpl ¶
type ProcessorImpl struct {
// contains filtered or unexported fields
}
ProcessorImpl struct for notification processor
func NewProcessor ¶
func NewProcessor( splitQueueSize int64, segmentQueueSize int64, synchronizer synchronizerInterface, logger logging.LoggerInterface, ) (*ProcessorImpl, error)
NewProcessor creates new processor
func (*ProcessorImpl) ProcessSegmentChangeUpdate ¶
func (p *ProcessorImpl) ProcessSegmentChangeUpdate(update *dtos.SegmentChangeUpdate) error
ProcessSegmentChangeUpdate accepts a segment change notification and schedules a fetch
func (*ProcessorImpl) ProcessSplitChangeUpdate ¶
func (p *ProcessorImpl) ProcessSplitChangeUpdate(update *dtos.SplitChangeUpdate) error
ProcessSplitChangeUpdate accepts a split change notification and schedules a fetch
func (*ProcessorImpl) ProcessSplitKillUpdate ¶
func (p *ProcessorImpl) ProcessSplitKillUpdate(update *dtos.SplitKillUpdate) error
ProcessSplitKillUpdate accepts a split kill notification, issues a local kill and schedules a fetch
func (*ProcessorImpl) StartWorkers ¶
func (p *ProcessorImpl) StartWorkers()
StartWorkers enables split & segments workers
func (*ProcessorImpl) StopWorkers ¶
func (p *ProcessorImpl) StopWorkers()
StopWorkers pauses split & segments workers
type SegmentUpdateWorker ¶
type SegmentUpdateWorker struct {
// contains filtered or unexported fields
}
SegmentUpdateWorker struct
func NewSegmentUpdateWorker ¶
func NewSegmentUpdateWorker( segmentQueue chan dtos.SegmentChangeUpdate, synchronizer synchronizerInterface, logger logging.LoggerInterface, ) (*SegmentUpdateWorker, error)
NewSegmentUpdateWorker creates SegmentUpdateWorker
func (*SegmentUpdateWorker) IsRunning ¶
func (s *SegmentUpdateWorker) IsRunning() bool
IsRunning indicates if worker is running or not
type SplitUpdateWorker ¶
type SplitUpdateWorker struct {
// contains filtered or unexported fields
}
SplitUpdateWorker struct
func NewSplitUpdateWorker ¶
func NewSplitUpdateWorker( splitQueue chan dtos.SplitChangeUpdate, synchronizer synchronizerInterface, logger logging.LoggerInterface, ) (*SplitUpdateWorker, error)
NewSplitUpdateWorker creates SplitUpdateWorker
func (*SplitUpdateWorker) IsRunning ¶
func (s *SplitUpdateWorker) IsRunning() bool
IsRunning indicates if worker is running or not
type StatusTracker ¶
type StatusTracker interface { HandleOccupancy(*dtos.OccupancyMessage) *int64 HandleControl(*dtos.ControlUpdate) *int64 HandleAblyError(*dtos.AblyError) *int64 HandleDisconnection() *int64 NotifySSEShutdownExpected() Reset() }
StatusTracker keeps track of the status of the push subsystem and generates appropriate status change notifications.
type StatusTrackerImpl ¶
type StatusTrackerImpl struct {
// contains filtered or unexported fields
}
StatusTrackerImpl is a concrete implementation of the StatusTracker interface
func NewStatusTracker ¶
func NewStatusTracker(logger logging.LoggerInterface, runtimeTelemetry storage.TelemetryRuntimeProducer) *StatusTrackerImpl
NewStatusTracker returns a new StatusTracker
func (*StatusTrackerImpl) HandleAblyError ¶
func (p *StatusTrackerImpl) HandleAblyError(errorEvent *dtos.AblyError) (newStatus *int64)
HandleAblyError should be called whenever an ably error is received
func (*StatusTrackerImpl) HandleControl ¶
func (p *StatusTrackerImpl) HandleControl(controlUpdate *dtos.ControlUpdate) *int64
HandleControl should be called whenever a control notification is received
func (*StatusTrackerImpl) HandleDisconnection ¶
func (p *StatusTrackerImpl) HandleDisconnection() *int64
HandleDisconnection should be called whenever the SSE client gets disconnected
func (*StatusTrackerImpl) HandleOccupancy ¶
func (p *StatusTrackerImpl) HandleOccupancy(message *dtos.OccupancyMessage) (newStatus *int64)
HandleOccupancy should be called for every occupancy notification received
func (*StatusTrackerImpl) NotifySSEShutdownExpected ¶
func (p *StatusTrackerImpl) NotifySSEShutdownExpected()
NotifySSEShutdownExpected should be called when we are forcefully closing the SSE client
func (*StatusTrackerImpl) Reset ¶
func (p *StatusTrackerImpl) Reset()
Reset should be called on initialization and when a new connection is being established (to start from scratch)