Documentation
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BatchQueries ¶
func BatchQueries(values <-chan *model.EventRequest, maxItems int, maxTimeout time.Duration) chan []*model.EventRequest
BatchQueries reads from a channel of EventRequests and groups them into batches. A batch is released when it contains maxItems requests or when maxTimeout has elapsed since the batch was started, whichever occurs first. Once a batch has been released, the next batch is started.
Types ¶
type DefaultSequenceManager ¶
type DefaultSequenceManager struct {
// contains filtered or unexported fields
}
func NewStaticSequenceManager ¶
func NewStaticSequenceManager(initialSequences map[int64]int64) *DefaultSequenceManager
NewStaticSequenceManager returns a SequenceManager that only updates when `Update` is called. This is mainly useful for testing.
func NewUpdatingSequenceManager ¶
func NewUpdatingSequenceManager(ctx context.Context, eventDb *eventdb.EventDb, pulsarClient pulsar.Client, updateTopic string) (*DefaultSequenceManager, error)
NewUpdatingSequenceManager returns a SequenceManager that is initialised from the eventDb and then receives updates from Pulsar.
func (*DefaultSequenceManager) Get ¶
func (sm *DefaultSequenceManager) Get(jobsetId int64) (int64, bool)
Get retrieves the latest sequence for the given jobset. The returned boolean is true if a sequence exists for the jobset and false otherwise.
func (*DefaultSequenceManager) Update ¶
func (sm *DefaultSequenceManager) Update(newSequences map[int64]int64)
Update updates the sequences for the supplied jobsets. Any sequences in the update which are lower than the sequences we already store will be ignored.
type DefaultSubscriptionManager ¶
type DefaultSubscriptionManager struct {
// contains filtered or unexported fields
}
func NewSubscriptionManager ¶
func NewSubscriptionManager(sequenceManager SequenceManager, db eventDbRO, maxBatchSize int, maxTimeout time.Duration, pollPeriod time.Duration, queryConcurrency int, maxFetchSize int, clock clock.Clock) *DefaultSubscriptionManager
NewSubscriptionManager returns a DefaultSubscriptionManager that can fetch events from Postgres and manage subscription requests for new data.
func (*DefaultSubscriptionManager) Subscribe ¶
func (sm *DefaultSubscriptionManager) Subscribe(jobset int64, fromOffset int64) *model.EventSubscription
Subscribe returns an EventSubscription, which consists of a stream of events along with a subscription id. Callers should pass back the subscription id when they want to Unsubscribe.
func (*DefaultSubscriptionManager) Unsubscribe ¶
func (sm *DefaultSubscriptionManager) Unsubscribe(subscriptionId int64)
Unsubscribe frees up the resources associated with the subscription's event stream.
type EventApi ¶
type EventApi struct {
// contains filtered or unexported fields
}
EventApi is responsible for serving user requests for event messages.
func NewEventApi ¶
func NewEventApi(jobsetMapper eventapi.JobsetMapper, subscriptionManager SubscriptionManager, sequenceManager SequenceManager) *EventApi
func (*EventApi) GetJobSetEvents ¶
func (r *EventApi) GetJobSetEvents(request *api.JobSetRequest, stream api.Event_GetJobSetEventsServer) error
GetJobSetEvents returns a stream of events from the events Db. If request.Watch is set, the stream will only end when the user requests it; otherwise it will return all events present in the database when the request was made.
type SequenceManager ¶
type SequenceManager interface {
	Get(jobsetId int64) (int64, bool)
	Update(newOffsets map[int64]int64)
}
SequenceManager is responsible for storing the latest available Sequence number for each jobset
type SubscriptionManager ¶
type SubscriptionManager interface {
	Subscribe(jobset int64, fromOffset int64) *model.EventSubscription
	Unsubscribe(subscriptionId int64)
}
SubscriptionManager lets callers subscribe to channels of events in an efficient manner