push

package
v5.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 25, 2024 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	StatusUp = iota
	StatusDown
	StatusRetryableError
	StatusNonRetryableError
)

Status update contants that will be propagated to the push manager's user

Variables

View Source
var ErrAlreadyRunning = errors.New("push manager already running")

ErrAlreadyRunning is the error to be returned when .Start() is called on an already running instance

View Source
var ErrEmptyEvent = errors.New("empty incoming event")

ErrEmptyEvent indicates an event without message and event fields

View Source
var ErrNotRunning = errors.New("push manager not running")

ErrNotRunning is the error to be returned when .Stop() is called on a non-running instance

Functions

This section is empty.

Types

type DataUtils

type DataUtils interface {
	Decode(data string) ([]byte, error)
	Decompress(data []byte, compressType int) ([]byte, error)
}

func NewDataUtilsImpl

func NewDataUtilsImpl() DataUtils

type DataUtilsImpl

type DataUtilsImpl struct {
}

func (*DataUtilsImpl) Decode

func (d *DataUtilsImpl) Decode(data string) ([]byte, error)

func (*DataUtilsImpl) Decompress

func (d *DataUtilsImpl) Decompress(data []byte, compressType int) ([]byte, error)

type FeedbackLoop

type FeedbackLoop = chan<- int64

FeedbackLoop is a type alias for the type of chan that must be supplied for push status tobe propagated

type Manager

type Manager interface {
	Start() error
	Stop() error
	StopWorkers()
	StartWorkers()
	NextRefresh() time.Time
}

Manager interface contains public methods for push manager

type ManagerImpl

type ManagerImpl struct {
	// contains filtered or unexported fields
}

ManagerImpl implements the manager interface

func NewManager

func NewManager(
	logger logging.LoggerInterface,
	synchronizer synchronizerInterface,
	cfg *conf.AdvancedConfig,
	feedbackLoop chan<- int64,
	authAPI service.AuthClient,
	runtimeTelemetry storage.TelemetryRuntimeProducer,
	metadata dtos.Metadata,
	clientKey *string,
) (*ManagerImpl, error)

NewManager constructs a new push manager

func (*ManagerImpl) NextRefresh

func (m *ManagerImpl) NextRefresh() time.Time

NextRefresh returns the time when the next token refresh will happen

func (*ManagerImpl) Start

func (m *ManagerImpl) Start() error

Start initiates the authentication flow and if successful initiates a connection

func (*ManagerImpl) StartWorkers

func (m *ManagerImpl) StartWorkers()

StartWorkers start the splits & segments workers

func (*ManagerImpl) Stop

func (m *ManagerImpl) Stop() error

Stop method stops the sse client and it's status monitoring goroutine

func (*ManagerImpl) StopWorkers

func (m *ManagerImpl) StopWorkers()

StopWorkers stops the splits & segments workers

type NotificationParser

type NotificationParser interface {
	ParseAndForward(sse.IncomingMessage) (*int64, error)
}

NotificationParser interface

type NotificationParserImpl

type NotificationParserImpl struct {
	// contains filtered or unexported fields
}

NotificationParserImpl implementas the NotificationParser interface

func NewNotificationParserImpl

func NewNotificationParserImpl(
	loggerInterface logging.LoggerInterface,
	onSplitUpdate func(update *dtos.SplitChangeUpdate) error,
	onSplitKill func(*dtos.SplitKillUpdate) error,
	onSegmentUpdate func(*dtos.SegmentChangeUpdate) error,
	onControlUpdate func(*dtos.ControlUpdate) *int64,
	onOccupancyMessage func(*dtos.OccupancyMessage) *int64,
	onAblyError func(*dtos.AblyError) *int64) *NotificationParserImpl

func (*NotificationParserImpl) ParseAndForward

func (p *NotificationParserImpl) ParseAndForward(raw sse.IncomingMessage) (*int64, error)

ParseAndForward accepts an incoming RAW event and returns a properly parsed & typed event

type Processor

type Processor interface {
	ProcessSplitChangeUpdate(update *dtos.SplitChangeUpdate) error
	ProcessSplitKillUpdate(update *dtos.SplitKillUpdate) error
	ProcessSegmentChangeUpdate(update *dtos.SegmentChangeUpdate) error
	StartWorkers()
	StopWorkers()
}

Processor provides the interface for an update-message processor

type ProcessorImpl

type ProcessorImpl struct {
	// contains filtered or unexported fields
}

ProcessorImpl struct for notification processor

func NewProcessor

func NewProcessor(
	splitQueueSize int64,
	segmentQueueSize int64,
	synchronizer synchronizerInterface,
	logger logging.LoggerInterface,
) (*ProcessorImpl, error)

NewProcessor creates new processor

func (*ProcessorImpl) ProcessSegmentChangeUpdate

func (p *ProcessorImpl) ProcessSegmentChangeUpdate(update *dtos.SegmentChangeUpdate) error

ProcessSegmentChangeUpdate accepts a segment change notification and schedules a fetch

func (*ProcessorImpl) ProcessSplitChangeUpdate

func (p *ProcessorImpl) ProcessSplitChangeUpdate(update *dtos.SplitChangeUpdate) error

ProcessSplitChangeUpdate accepts a split change notifications and schedules a fetch

func (*ProcessorImpl) ProcessSplitKillUpdate

func (p *ProcessorImpl) ProcessSplitKillUpdate(update *dtos.SplitKillUpdate) error

ProcessSplitKillUpdate accepts a split kill notification, issues a local kill and schedules a fetch

func (*ProcessorImpl) StartWorkers

func (p *ProcessorImpl) StartWorkers()

StartWorkers enables split & segments workers

func (*ProcessorImpl) StopWorkers

func (p *ProcessorImpl) StopWorkers()

StopWorkers pauses split & segments workers

type SegmentUpdateWorker

type SegmentUpdateWorker struct {
	// contains filtered or unexported fields
}

SegmentUpdateWorker struct

func NewSegmentUpdateWorker

func NewSegmentUpdateWorker(
	segmentQueue chan dtos.SegmentChangeUpdate,
	synchronizer synchronizerInterface,
	logger logging.LoggerInterface,
) (*SegmentUpdateWorker, error)

NewSegmentUpdateWorker creates SegmentUpdateWorker

func (*SegmentUpdateWorker) IsRunning

func (s *SegmentUpdateWorker) IsRunning() bool

IsRunning indicates if worker is running or not

func (*SegmentUpdateWorker) Start

func (s *SegmentUpdateWorker) Start()

Start starts worker

func (*SegmentUpdateWorker) Stop

func (s *SegmentUpdateWorker) Stop()

Stop stops worker

type SplitUpdateWorker

type SplitUpdateWorker struct {
	// contains filtered or unexported fields
}

SplitUpdateWorker struct

func NewSplitUpdateWorker

func NewSplitUpdateWorker(
	splitQueue chan dtos.SplitChangeUpdate,
	synchronizer synchronizerInterface,
	logger logging.LoggerInterface,
) (*SplitUpdateWorker, error)

NewSplitUpdateWorker creates SplitUpdateWorker

func (*SplitUpdateWorker) IsRunning

func (s *SplitUpdateWorker) IsRunning() bool

IsRunning indicates if worker is running or not

func (*SplitUpdateWorker) Start

func (s *SplitUpdateWorker) Start()

Start starts worker

func (*SplitUpdateWorker) Stop

func (s *SplitUpdateWorker) Stop()

Stop stops worker

type StatusTracker

type StatusTracker interface {
	HandleOccupancy(*dtos.OccupancyMessage) *int64
	HandleControl(*dtos.ControlUpdate) *int64
	HandleAblyError(*dtos.AblyError) *int64
	HandleDisconnection() *int64
	NotifySSEShutdownExpected()
	Reset()
}

StatusTracker keeps track of the status of the push subsystem and generates appropriate status change notifications.

type StatusTrackerImpl

type StatusTrackerImpl struct {
	// contains filtered or unexported fields
}

StatusTrackerImpl is a concrete implementation of the StatusTracker interface

func NewStatusTracker

func NewStatusTracker(logger logging.LoggerInterface, runtimeTelemetry storage.TelemetryRuntimeProducer) *StatusTrackerImpl

NewStatusTracker returns a new StatusTracker

func (*StatusTrackerImpl) HandleAblyError

func (p *StatusTrackerImpl) HandleAblyError(errorEvent *dtos.AblyError) (newStatus *int64)

HandleAblyError should be called whenever an ably error is received

func (*StatusTrackerImpl) HandleControl

func (p *StatusTrackerImpl) HandleControl(controlUpdate *dtos.ControlUpdate) *int64

HandleControl should be called whenever a control notification is received

func (*StatusTrackerImpl) HandleDisconnection

func (p *StatusTrackerImpl) HandleDisconnection() *int64

HandleDisconnection should be called whenver the SSE client gets disconnected

func (*StatusTrackerImpl) HandleOccupancy

func (p *StatusTrackerImpl) HandleOccupancy(message *dtos.OccupancyMessage) (newStatus *int64)

HandleOccupancy should be called for every occupancy notification received

func (*StatusTrackerImpl) NotifySSEShutdownExpected

func (p *StatusTrackerImpl) NotifySSEShutdownExpected()

NotifySSEShutdownExpected should be called when we are forcefully closing the SSE client

func (*StatusTrackerImpl) Reset

func (p *StatusTrackerImpl) Reset()

Reset should be called on initialization and when the a new connection is being established (to start from scratch)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL