Documentation ¶
Index ¶
- func CreateAppEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, ...) *si.EventRecord
- func CreateNodeEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, ...) *si.EventRecord
- func CreateQueueEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, ...) *si.EventRecord
- func CreateRequestEventRecord(objectID, referenceID, message string, resource *resources.Resource, ...) *si.EventRecord
- func CreateUserGroupEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, ...) *si.EventRecord
- func Init()
- type EventPublisher
- type EventStore
- type EventStream
- type EventStreamData
- type EventStreaming
- func (e *EventStreaming) Close()
- func (e *EventStreaming) CreateEventStream(name string, count uint64) *EventStream
- func (e *EventStreaming) GetEventStreams() []EventStreamData
- func (e *EventStreaming) PublishEvent(event *si.EventRecord)
- func (e *EventStreaming) RemoveEventStream(consumer *EventStream)
- type EventSystem
- type EventSystemImpl
- func (ec *EventSystemImpl) AddEvent(event *si.EventRecord)
- func (ec *EventSystemImpl) CloseAllStreams()
- func (ec *EventSystemImpl) CreateEventStream(name string, count uint64) *EventStream
- func (ec *EventSystemImpl) GetEventStreams() []EventStreamData
- func (ec *EventSystemImpl) GetEventsFromID(id, count uint64) ([]*si.EventRecord, uint64, uint64)
- func (ec *EventSystemImpl) GetRequestCapacity() uint64
- func (ec *EventSystemImpl) GetRingBufferCapacity() uint64
- func (ec *EventSystemImpl) IsEventTrackingEnabled() bool
- func (ec *EventSystemImpl) RemoveStream(consumer *EventStream)
- func (ec *EventSystemImpl) Restart()
- func (ec *EventSystemImpl) StartService()
- func (ec *EventSystemImpl) StartServiceWithPublisher(withPublisher bool)
- func (ec *EventSystemImpl) Stop()
Functions ¶
func CreateAppEventRecord ¶
func CreateAppEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, changeDetail si.EventRecord_ChangeDetail, resource *resources.Resource, state string) *si.EventRecord
func CreateNodeEventRecord ¶
func CreateNodeEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, changeDetail si.EventRecord_ChangeDetail, resource *resources.Resource, state string) *si.EventRecord
func CreateQueueEventRecord ¶
func CreateQueueEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, changeDetail si.EventRecord_ChangeDetail, resource *resources.Resource, state string) *si.EventRecord
func CreateUserGroupEventRecord ¶
func CreateUserGroupEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, changeDetail si.EventRecord_ChangeDetail, resource *resources.Resource) *si.EventRecord
Types ¶
type EventPublisher ¶
type EventPublisher struct {
// contains filtered or unexported fields
}
func CreateShimPublisher ¶
func CreateShimPublisher(store *EventStore) *EventPublisher
func (*EventPublisher) StartService ¶
func (sp *EventPublisher) StartService()
func (*EventPublisher) Stop ¶
func (sp *EventPublisher) Stop()
type EventStore ¶
The EventStore operates under the following assumptions:
- there is a cap for the number of events stored
- the CollectEvents() function clears the currently stored events in the EventStore
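Because the store is capped, this works as long as the rate of events generated by the scheduler component in a given time period is low enough that the cap is not reached between two collections. Under that assumption, calling CollectEvents() periodically should be fine.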
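The two assumptions above can be illustrated with a small model. This is a minimal sketch, not the package's EventStore: the names record, cappedStore and newCappedStore are hypothetical, and dropping events once the cap is reached is an assumption made for the illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// record is a hypothetical stand-in for *si.EventRecord.
type record struct {
	ObjectID string
	Message  string
}

// cappedStore is an illustrative model, not the package's EventStore.
type cappedStore struct {
	mu     sync.Mutex
	events []*record
	size   int
}

func newCappedStore(size int) *cappedStore {
	return &cappedStore{events: make([]*record, 0, size), size: size}
}

// Store keeps the event unless the cap is reached and reports whether it was kept.
// Assumption: events arriving over the cap are dropped.
func (s *cappedStore) Store(e *record) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.events) >= s.size {
		return false
	}
	s.events = append(s.events, e)
	return true
}

// CollectEvents returns the stored events and clears the store.
func (s *cappedStore) CollectEvents() []*record {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := s.events
	s.events = make([]*record, 0, s.size)
	return out
}

// Count returns the number of events currently stored.
func (s *cappedStore) Count() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.events)
}

func main() {
	store := newCappedStore(2)
	for i := 0; i < 3; i++ {
		kept := store.Store(&record{ObjectID: fmt.Sprintf("app-%d", i), Message: "submitted"})
		fmt.Println("stored:", kept)
	}
	fmt.Println("collected:", len(store.CollectEvents()))
	fmt.Println("remaining:", store.Count())
}
```

In this model the third event is lost because nothing collected the first two in time, which is why the periodic collection only works when the event rate stays below the cap per collection interval.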
func (*EventStore) CollectEvents ¶
func (es *EventStore) CollectEvents() []*si.EventRecord
func (*EventStore) CountStoredEvents ¶
func (es *EventStore) CountStoredEvents() uint64
func (*EventStore) SetStoreSize ¶
func (es *EventStore) SetStoreSize(size uint64)
func (*EventStore) Store ¶
func (es *EventStore) Store(event *si.EventRecord)
type EventStream ¶
type EventStream struct {
Events <-chan *si.EventRecord
}
EventStream is the handle type returned to a client that wants to capture the stream of events.
type EventStreamData ¶
EventStreamData contains data about an event stream.
type EventStreaming ¶
EventStreaming implements the event streaming logic. New events are immediately forwarded to all active consumers.
func NewEventStreaming ¶
func NewEventStreaming(eventBuffer *eventRingBuffer) *EventStreaming
NewEventStreaming creates a new event streaming infrastructure.
func (*EventStreaming) Close ¶
func (e *EventStreaming) Close()
Close stops event streaming completely.
func (*EventStreaming) CreateEventStream ¶
func (e *EventStreaming) CreateEventStream(name string, count uint64) *EventStream
CreateEventStream sets up event streaming for a consumer. The returned EventStream object contains a channel that can be used for reading.
When a consumer is finished, it must call RemoveEventStream to free up resources.
Consumers have an arbitrary name for logging purposes. The "count" parameter defines the maximum number of historical events to replay from the ring buffer. "0" is a valid value and means no past events.
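The meaning of "count" can be shown with a small helper. This is only a sketch of the selection rule described above (at most "count" of the most recent events, none for zero). The function lastN and the string slice standing in for the ring buffer are hypothetical and are not part of this package.

```go
package main

import "fmt"

// lastN returns at most count of the most recent entries in history, oldest first.
// A count of zero yields no historical entries.
func lastN(history []string, count uint64) []string {
	if count == 0 {
		return nil
	}
	n := uint64(len(history))
	if count > n {
		count = n
	}
	return history[n-count:]
}

func main() {
	history := []string{"e1", "e2", "e3"}
	fmt.Println(lastN(history, 0))  // prints []
	fmt.Println(lastN(history, 2))  // prints [e2 e3]
	fmt.Println(lastN(history, 10)) // prints [e1 e2 e3]
}
```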
func (*EventStreaming) GetEventStreams ¶
func (e *EventStreaming) GetEventStreams() []EventStreamData
GetEventStreams returns the current active event streams.
func (*EventStreaming) PublishEvent ¶
func (e *EventStreaming) PublishEvent(event *si.EventRecord)
PublishEvent publishes an event to all event stream consumers.
The streaming logic uses bridging to ensure proper ordering of existing and new events. Events are sent to the "local" channel, from where they are forwarded to the "consumer" channel.
If "local" is full, it means that the consumer side has not processed the events at an appropriate pace. Such a consumer is removed and the related channels are closed.
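The removal of slow consumers can be sketched with a non-blocking channel send. The example below is a simplified, single-goroutine model under stated assumptions: the types event, consumer and streaming are hypothetical, the forwarding from "local" to the "consumer" channel is left out, and no locking is shown. It is not the package's implementation.

```go
package main

import "fmt"

// event is a hypothetical stand-in for *si.EventRecord.
type event struct{ ID uint64 }

// consumer models one stream; "local" buffers events coming from the publisher.
type consumer struct {
	name  string
	local chan event
}

// streaming keeps track of the active consumers.
type streaming struct {
	consumers map[string]*consumer
}

func newStreaming() *streaming {
	return &streaming{consumers: map[string]*consumer{}}
}

func (s *streaming) add(name string, buffer int) *consumer {
	c := &consumer{name: name, local: make(chan event, buffer)}
	s.consumers[name] = c
	return c
}

// publish tries a non-blocking send to every consumer. A consumer whose
// "local" channel is full is closed and removed. The names of the removed
// consumers are returned.
func (s *streaming) publish(e event) []string {
	var removed []string
	for name, c := range s.consumers {
		select {
		case c.local <- e:
			// delivered to the buffer
		default:
			// buffer full: the consumer did not keep up
			close(c.local)
			delete(s.consumers, name)
			removed = append(removed, name)
		}
	}
	return removed
}

func main() {
	s := newStreaming()
	s.add("slow", 1) // never reads, so its buffer fills after one event
	fast := s.add("fast", 4)
	for id := uint64(1); id <= 2; id++ {
		removed := s.publish(event{ID: id})
		fmt.Printf("event %d removed %v\n", id, removed)
	}
	fmt.Println("buffered for fast:", len(fast.local))
}
```

The select with a default branch is what keeps the publisher from blocking on a stalled consumer. The trade-off is that a consumer which falls behind loses its stream instead of slowing down everyone else.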
func (*EventStreaming) RemoveEventStream ¶
func (e *EventStreaming) RemoveEventStream(consumer *EventStream)
RemoveEventStream stops the streaming for a given consumer. Must be called to avoid resource leaks.
type EventSystem ¶
type EventSystem interface {
	// AddEvent adds an event record to the event system for processing:
	// 1. It is added to a slice from where it is periodically read by the shim publisher.
	// 2. It is added to an internal ring buffer so that clients can retrieve the event history.
	// 3. Streaming clients are updated.
	AddEvent(event *si.EventRecord)

	// StartService starts the event system.
	// This method does not block. Events are processed on a separate goroutine.
	StartService()

	// Stop stops the event system.
	Stop()

	// IsEventTrackingEnabled reports whether history tracking is currently enabled.
	IsEventTrackingEnabled() bool

	// GetEventsFromID retrieves "count" number of elements from the history buffer, starting from "id".
	// Every event has a unique ID inside the ring buffer.
	// If "id" is not in the buffer, then no record is returned, but the currently available range
	// [low..high] is set.
	GetEventsFromID(id, count uint64) ([]*si.EventRecord, uint64, uint64)

	// CreateEventStream creates an event stream (channel) for a consumer.
	// The "name" argument is an arbitrary string for a consumer, which is used for logging. It does not need to be unique.
	// The "count" argument defines how many historical elements should be returned on the stream. Zero is a valid value for "count".
	// The returned type contains a read-only channel which is updated as soon as there is a new event record.
	// It is also used as a handle to stop the streaming.
	// Consumers must read the channel and process the event objects as soon as they can to avoid
	// events piling up inside the channel buffers.
	CreateEventStream(name string, count uint64) *EventStream

	// RemoveStream stops streaming for a given consumer.
	// Consumers that no longer wish to be updated (e.g., a remote client
	// disconnected) *must* call this method to gracefully stop the streaming.
	RemoveStream(*EventStream)

	// GetEventStreams returns the current active event streams.
	GetEventStreams() []EventStreamData
}
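The GetEventsFromID contract (a slice of records plus the available [low..high] range) can be illustrated with a small ring buffer. This is a sketch under assumptions and not the package's eventRingBuffer: the ringBuffer type is hypothetical, IDs start at 0, string payloads stand in for event records, and an empty buffer reports the range as 0 and 0.

```go
package main

import "fmt"

// ringBuffer is an illustrative model that assigns consecutive IDs to events.
type ringBuffer struct {
	events   []string // payloads; stand-in for []*si.EventRecord
	capacity uint64
	next     uint64 // ID that the next added event will receive
}

func newRingBuffer(capacity uint64) *ringBuffer {
	return &ringBuffer{events: make([]string, capacity), capacity: capacity}
}

// add stores the event, overwriting the oldest one when the buffer is full.
func (r *ringBuffer) add(e string) {
	r.events[r.next%r.capacity] = e
	r.next++
}

// getFromID returns up to count events starting at id, together with the
// lowest and highest IDs currently held. If id is outside that range,
// no events are returned but the range is still reported.
func (r *ringBuffer) getFromID(id, count uint64) ([]string, uint64, uint64) {
	if r.next == 0 {
		return nil, 0, 0 // assumption: empty buffer reports a zero range
	}
	high := r.next - 1
	var low uint64
	if r.next > r.capacity {
		low = r.next - r.capacity
	}
	if id < low || id > high {
		return nil, low, high
	}
	if available := high - id + 1; count > available {
		count = available
	}
	out := make([]string, 0, count)
	for i := id; i < id+count; i++ {
		out = append(out, r.events[i%r.capacity])
	}
	return out, low, high
}

func main() {
	rb := newRingBuffer(3)
	for _, e := range []string{"e0", "e1", "e2", "e3", "e4"} {
		rb.add(e)
	}
	fmt.Println(rb.getFromID(0, 2)) // ID 0 was overwritten: prints [] 2 4
	fmt.Println(rb.getFromID(3, 5)) // prints [e3 e4] 2 4
}
```

A caller that receives no records can use the returned range to retry from "low", which is the reason the range is reported even when the requested ID is gone.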
func GetEventSystem ¶
func GetEventSystem() EventSystem
GetEventSystem returns the event system instance. Initialization happens during the first call.
type EventSystemImpl ¶
type EventSystemImpl struct {
	Store *EventStore // storing eventChannel

	locking.RWMutex
	// contains filtered or unexported fields
}
EventSystemImpl is the main implementation of the event system, which is used for history tracking.
func (*EventSystemImpl) AddEvent ¶
func (ec *EventSystemImpl) AddEvent(event *si.EventRecord)
AddEvent adds an event record to the event system. See the interface for details.
func (*EventSystemImpl) CloseAllStreams ¶
func (ec *EventSystemImpl) CloseAllStreams()
CloseAllStreams is visible for testing.
func (*EventSystemImpl) CreateEventStream ¶
func (ec *EventSystemImpl) CreateEventStream(name string, count uint64) *EventStream
CreateEventStream creates an event stream. See the interface for details.
func (*EventSystemImpl) GetEventStreams ¶
func (ec *EventSystemImpl) GetEventStreams() []EventStreamData
GetEventStreams returns the current active event streams.
func (*EventSystemImpl) GetEventsFromID ¶
func (ec *EventSystemImpl) GetEventsFromID(id, count uint64) ([]*si.EventRecord, uint64, uint64)
GetEventsFromID retrieves historical elements. See the interface for details.
func (*EventSystemImpl) GetRequestCapacity ¶
func (ec *EventSystemImpl) GetRequestCapacity() uint64
GetRequestCapacity returns the capacity of an intermediate storage which is used by the shim publisher.
func (*EventSystemImpl) GetRingBufferCapacity ¶
func (ec *EventSystemImpl) GetRingBufferCapacity() uint64
GetRingBufferCapacity returns the capacity of the buffer which stores historical elements.
func (*EventSystemImpl) IsEventTrackingEnabled ¶
func (ec *EventSystemImpl) IsEventTrackingEnabled() bool
IsEventTrackingEnabled reports whether history tracking is currently enabled.
func (*EventSystemImpl) RemoveStream ¶
func (ec *EventSystemImpl) RemoveStream(consumer *EventStream)
RemoveStream gracefully terminates event streaming for a consumer. See the interface for details.
func (*EventSystemImpl) Restart ¶
func (ec *EventSystemImpl) Restart()
Restart restarts the event system. It is used during config updates.
func (*EventSystemImpl) StartService ¶
func (ec *EventSystemImpl) StartService()
StartService starts the event processing in the background. See the interface for details.
func (*EventSystemImpl) StartServiceWithPublisher ¶
func (ec *EventSystemImpl) StartServiceWithPublisher(withPublisher bool)
StartServiceWithPublisher starts the event processing background routines. Only exported for testing.