events

package
v0.0.0-...-3d39b65 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 20, 2024 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateAppEventRecord

func CreateAppEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, changeDetail si.EventRecord_ChangeDetail, resource *resources.Resource, state string) *si.EventRecord

func CreateNodeEventRecord

func CreateNodeEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, changeDetail si.EventRecord_ChangeDetail, resource *resources.Resource, state string) *si.EventRecord

func CreateQueueEventRecord

func CreateQueueEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, changeDetail si.EventRecord_ChangeDetail, resource *resources.Resource, state string) *si.EventRecord

func CreateRequestEventRecord

func CreateRequestEventRecord(objectID, referenceID, message string, resource *resources.Resource, state string) *si.EventRecord

func CreateUserGroupEventRecord

func CreateUserGroupEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, changeDetail si.EventRecord_ChangeDetail, resource *resources.Resource) *si.EventRecord

func Init

func Init()

Init Initializes the event system. Only exported for testing.

Types

type EventPublisher

type EventPublisher struct {
	// contains filtered or unexported fields
}

func CreateShimPublisher

func CreateShimPublisher(store *EventStore) *EventPublisher

func (*EventPublisher) StartService

func (sp *EventPublisher) StartService()

func (*EventPublisher) Stop

func (sp *EventPublisher) Stop()

type EventStore

type EventStore struct {
	locking.RWMutex
	// contains filtered or unexported fields
}

The EventStore operates under the following assumptions:

  • there is a cap for the number of events stored
  • the CollectEvents() function clears the currently stored events in the EventStore

Assuming the rate of events generated by the scheduler component in a given time period is high, calling CollectEvents() periodically should be fine.

func (*EventStore) CollectEvents

func (es *EventStore) CollectEvents() []*si.EventRecord

func (*EventStore) CountStoredEvents

func (es *EventStore) CountStoredEvents() uint64

func (*EventStore) SetStoreSize

func (es *EventStore) SetStoreSize(size uint64)

func (*EventStore) Store

func (es *EventStore) Store(event *si.EventRecord)

type EventStream

type EventStream struct {
	Events <-chan *si.EventRecord
}

EventStream handle type returned to the client that wants to capture the stream of events.

type EventStreamData

type EventStreamData struct {
	Name      string
	CreatedAt time.Time
}

EventStreamData contains data about an event stream.

type EventStreaming

type EventStreaming struct {
	locking.RWMutex
	// contains filtered or unexported fields
}

EventStreaming implements the event streaming logic. New events are immediately forwarded to all active consumers.

func NewEventStreaming

func NewEventStreaming(eventBuffer *eventRingBuffer) *EventStreaming

NewEventStreaming creates a new event streaming infrastructure.

func (*EventStreaming) Close

func (e *EventStreaming) Close()

Close stops event streaming completely.

func (*EventStreaming) CreateEventStream

func (e *EventStreaming) CreateEventStream(name string, count uint64) *EventStream

CreateEventStream sets up event streaming for a consumer. The returned EventStream object contains a channel that can be used for reading.

When a consumer is finished, it must call RemoveEventStream to free up resources.

Consumers have an arbitrary name for logging purposes. The "count" parameter defines the number of maximum historical events from the ring buffer. "0" is a valid value and means no past events.

func (*EventStreaming) GetEventStreams

func (e *EventStreaming) GetEventStreams() []EventStreamData

GetEventStreams returns the current active event streams.

func (*EventStreaming) PublishEvent

func (e *EventStreaming) PublishEvent(event *si.EventRecord)

PublishEvent publishes an event to all event stream consumers.

The streaming logic uses bridging to ensure proper ordering of existing and new events. Events are sent to the "local" channel from where it is forwarded to the "consumer" channel.

If "local" is full, it means that the consumer side has not processed the events at an appropriate pace. Such a consumer is removed and the related channels are closed.

func (*EventStreaming) RemoveEventStream

func (e *EventStreaming) RemoveEventStream(consumer *EventStream)

RemoveEventStream stops the streaming for a given consumer. Must be called to avoid resource leaks.

type EventSystem

type EventSystem interface {
	// AddEvent adds an event record to the event system for processing:
	// 1. It is added to a slice from where it is periodically read by the shim publisher.
	// 2. It is added to an internal ring buffer so that clients can retrieve the event history.
	// 3. Streaming clients are updated.
	AddEvent(event *si.EventRecord)

	// StartService starts the event system.
	// This method does not block. Events are processed on a separate goroutine.
	StartService()

	// Stop stops the event system.
	Stop()

	// IsEventTrackingEnabled whether history tracking is currently enabled or not.
	IsEventTrackingEnabled() bool

	// GetEventsFromID retrieves "count" number of elements from the history buffer from "id". Every
	// event has a unique ID inside the ring buffer.
	// If "id" is not in the buffer, then no record is returned, but the currently available range
	// [low..high] is set.
	GetEventsFromID(id, count uint64) ([]*si.EventRecord, uint64, uint64)

	// CreateEventStream creates an event stream (channel) for a consumer.
	// The "name" argument is an arbitrary string for a consumer, which is used for logging. It does not need to be unique.
	// The "count" argument defines how many historical elements should be returned on the stream. Zero is a valid value for "count".
	// The returned type contains a read-only channel which is updated as soon as there is a new event record.
	// It is also used as a handle to stop the streaming.
	// Consumers must read the channel and process the event objects as soon as they can to avoid
	// events piling up inside the channel buffers.
	CreateEventStream(name string, count uint64) *EventStream

	// RemoveStream stops streaming for a given consumer.
	// Consumers that no longer wish to be updated (e.g., a remote client
	// disconnected) *must* call this method to gracefully stop the streaming.
	RemoveStream(*EventStream)

	// GetEventStreams returns the current active event streams.
	GetEventStreams() []EventStreamData
}

func GetEventSystem

func GetEventSystem() EventSystem

GetEventSystem returns the event system instance. Initialization happens during the first call.

type EventSystemImpl

type EventSystemImpl struct {
	Store *EventStore // storing eventChannel

	locking.RWMutex
	// contains filtered or unexported fields
}

EventSystemImpl main implementation of the event system which is used for history tracking.

func (*EventSystemImpl) AddEvent

func (ec *EventSystemImpl) AddEvent(event *si.EventRecord)

AddEvent adds an event record to the event system. See the interface for details.

func (*EventSystemImpl) CloseAllStreams

func (ec *EventSystemImpl) CloseAllStreams()

VisibleForTesting

func (*EventSystemImpl) CreateEventStream

func (ec *EventSystemImpl) CreateEventStream(name string, count uint64) *EventStream

CreateEventStream creates an event stream. See the interface for details.

func (*EventSystemImpl) GetEventStreams

func (ec *EventSystemImpl) GetEventStreams() []EventStreamData

GetEventStreams returns the current active event streams.

func (*EventSystemImpl) GetEventsFromID

func (ec *EventSystemImpl) GetEventsFromID(id, count uint64) ([]*si.EventRecord, uint64, uint64)

GetEventsFromID retrieves historical elements. See the interface for details.

func (*EventSystemImpl) GetRequestCapacity

func (ec *EventSystemImpl) GetRequestCapacity() uint64

GetRequestCapacity returns the capacity of an intermediate storage which is used by the shim publisher.

func (*EventSystemImpl) GetRingBufferCapacity

func (ec *EventSystemImpl) GetRingBufferCapacity() uint64

GetRingBufferCapacity returns the capacity of the buffer which stores historical elements.

func (*EventSystemImpl) IsEventTrackingEnabled

func (ec *EventSystemImpl) IsEventTrackingEnabled() bool

IsEventTrackingEnabled whether history tracking is currently enabled or not.

func (*EventSystemImpl) RemoveStream

func (ec *EventSystemImpl) RemoveStream(consumer *EventStream)

RemoveStream graceful termination of an event streaming for a consumer. See the interface for details.

func (*EventSystemImpl) Restart

func (ec *EventSystemImpl) Restart()

Restart restarts the event system, used during config update.

func (*EventSystemImpl) StartService

func (ec *EventSystemImpl) StartService()

StartService starts the event processing in the background. See the interface for details.

func (*EventSystemImpl) StartServiceWithPublisher

func (ec *EventSystemImpl) StartServiceWithPublisher(withPublisher bool)

StartServiceWithPublisher starts the event processing background routines. Only exported for testing.

func (*EventSystemImpl) Stop

func (ec *EventSystemImpl) Stop()

Stop stops the event system.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL