Documentation ¶
Index ¶
- Variables
- type AddressEventHandler
- type ChanneledSubscriber
- type ChanneledSubscriberEvent
- type EventChanneledSubscriber
- type EventController
- type EventHandler
- type EventIDStore
- type EventPollWaiter
- type EventSource
- type EventSubscriber
- type InMemoryEventIDStore
- type LabelEventHandler
- type MessageEventHandler
- type NoOpSubscribable
- type NullEventSource
- type RefreshEventHandler
- type Service
- func (s *Service) Close()
- func (s *Service) IsPaused() bool
- func (s *Service) Pause()
- func (s *Service) PauseWithWaiter() *EventPollWaiter
- func (s *Service) Resume()
- func (s *Service) RewindEventID(ctx context.Context, id string) error
- func (s *Service) Start(ctx context.Context, group *orderedtasks.OrderedCancelGroup) (string, error)
- func (s *Service) Subscribe(subscription EventSubscriber)
- func (s *Service) Unsubscribe(subscription EventSubscriber)
- type Subscribable
- type Subscription
- type UserEventHandler
- type UserSettingsHandler
- type UserUsedSpaceEventHandler
- type VaultEventIDStore
Constants ¶
This section is empty.
Variables ¶
var ErrPublishTimeoutExceeded = errors.New("event publish timed out")
Functions ¶
This section is empty.
Types ¶
type AddressEventHandler ¶
type ChanneledSubscriber ¶
type ChanneledSubscriber[T any] struct { // contains filtered or unexported fields }
func (*ChanneledSubscriber[T]) OnEventCh ¶
func (c *ChanneledSubscriber[T]) OnEventCh() <-chan *ChanneledSubscriberEvent[T]
type ChanneledSubscriberEvent ¶
type ChanneledSubscriberEvent[T any] struct { // contains filtered or unexported fields }
func (ChanneledSubscriberEvent[T]) Consume ¶
func (c ChanneledSubscriberEvent[T]) Consume(f func(T) error)
type EventChanneledSubscriber ¶
type EventChanneledSubscriber = ChanneledSubscriber[proton.Event]
func NewEventSubscriber ¶
func NewEventSubscriber(name string) *EventChanneledSubscriber
type EventController ¶
type EventController interface { Pause() Resume() }
type EventHandler ¶
type EventHandler struct { RefreshHandler RefreshEventHandler AddressHandler AddressEventHandler UserHandler UserEventHandler LabelHandler LabelEventHandler MessageHandler MessageEventHandler UsedSpaceHandler UserUsedSpaceEventHandler UserSettingsHandler UserSettingsHandler }
type EventIDStore ¶
type EventIDStore interface { // Load the last stored event, return "" for empty. Load(ctx context.Context) (string, error) // Store the new id. Store(ctx context.Context, id string) error }
EventIDStore exposes behavior expected of a type which allows us to store and retrieve event Ids. Note: this may be accessed from multiple go-routines.
type EventPollWaiter ¶
type EventPollWaiter struct {
// contains filtered or unexported fields
}
EventPollWaiter is meant to be used to wait for the event loop to finish processing the current events after being paused.
func (*EventPollWaiter) WaitPollFinished ¶
func (e *EventPollWaiter) WaitPollFinished(ctx context.Context) error
func (*EventPollWaiter) WaitPollFinishedWithDeadline ¶
type EventSource ¶
type EventSource interface { GetLatestEventID(ctx context.Context) (string, error) GetEvent(ctx context.Context, id string) ([]proton.Event, bool, error) }
EventSource represents a type which produces proton events.
type EventSubscriber ¶
type EventSubscriber = subscriber[proton.Event]
type InMemoryEventIDStore ¶
type InMemoryEventIDStore struct {
// contains filtered or unexported fields
}
func NewInMemoryEventIDStore ¶
func NewInMemoryEventIDStore() *InMemoryEventIDStore
type LabelEventHandler ¶
type MessageEventHandler ¶
type NoOpSubscribable ¶
type NoOpSubscribable struct{}
func (NoOpSubscribable) Subscribe ¶
func (n NoOpSubscribable) Subscribe(_ EventSubscriber)
func (NoOpSubscribable) Unsubscribe ¶
func (n NoOpSubscribable) Unsubscribe(_ EventSubscriber)
type NullEventSource ¶
type NullEventSource struct{}
func (NullEventSource) GetLatestEventID ¶
func (n NullEventSource) GetLatestEventID(_ context.Context) (string, error)
type RefreshEventHandler ¶
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service polls from the given event source and ensures that all the respective subscribers get notified before proceeding to the next event. The events are published in the following order: * Refresh * User * Address * Label * Message * UserUsedSpace By default this service starts paused, you need to call `Service.Resume` at least one time to begin event polling.
func NewService ¶
func NewService( userID string, eventSource EventSource, store EventIDStore, eventPublisher events.EventPublisher, pollPeriod time.Duration, jitter time.Duration, eventTimeout time.Duration, panicHandler async.PanicHandler, ) *Service
func (*Service) Close ¶
func (s *Service) Close()
Close should be called after the service has been cancelled to clean up any remaining pending operations.
func (*Service) PauseWithWaiter ¶
func (s *Service) PauseWithWaiter() *EventPollWaiter
PauseWithWaiter pauses the event polling and returns a waiter to notify when the last event has been published after the pause request.
func (*Service) RewindEventID ¶
RewindEventID sets the event id as the next event to be polled.
func (*Service) Start ¶
func (s *Service) Start(ctx context.Context, group *orderedtasks.OrderedCancelGroup) (string, error)
Start the event service and return the last EventID that was processed.
func (*Service) Subscribe ¶
func (s *Service) Subscribe(subscription EventSubscriber)
Subscribe adds new subscribers to the service. This method can safely be called during event handling.
func (*Service) Unsubscribe ¶
func (s *Service) Unsubscribe(subscription EventSubscriber)
Unsubscribe removes subscribers from the service. This method can safely be called during event handling.
type Subscribable ¶
type Subscribable interface { Subscribe(subscription EventSubscriber) Unsubscribe(subscription EventSubscriber) }
Subscribable represents a type that allows the registration of event subscribers.
type Subscription ¶
type Subscription = EventSubscriber
type UserEventHandler ¶
type UserSettingsHandler ¶
type VaultEventIDStore ¶
type VaultEventIDStore struct {
// contains filtered or unexported fields
}
func NewVaultEventIDStore ¶
func NewVaultEventIDStore(vault *vault.User) *VaultEventIDStore