Documentation ¶
Index ¶
- func CreateAppEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, ...) *si.EventRecord
- func CreateNodeEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, ...) *si.EventRecord
- func CreateQueueEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, ...) *si.EventRecord
- func CreateRequestEventRecord(objectID, referenceID, message string, resource *resources.Resource) *si.EventRecord
- func CreateUserGroupEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, ...) *si.EventRecord
- func Init()
- type EventPublisher
- type EventStore
- type EventStream
- type EventStreamData
- type EventStreaming
- func (e *EventStreaming) Close()
- func (e *EventStreaming) CreateEventStream(name string, count uint64) *EventStream
- func (e *EventStreaming) GetEventStreams() []EventStreamData
- func (e *EventStreaming) PublishEvent(event *si.EventRecord)
- func (e *EventStreaming) RemoveEventStream(consumer *EventStream)
- type EventSystem
- type EventSystemImpl
- func (ec *EventSystemImpl) AddEvent(event *si.EventRecord)
- func (ec *EventSystemImpl) CloseAllStreams()
- func (ec *EventSystemImpl) CreateEventStream(name string, count uint64) *EventStream
- func (ec *EventSystemImpl) GetEventStreams() []EventStreamData
- func (ec *EventSystemImpl) GetEventsFromID(id, count uint64) ([]*si.EventRecord, uint64, uint64)
- func (ec *EventSystemImpl) GetRequestCapacity() int
- func (ec *EventSystemImpl) GetRingBufferCapacity() uint64
- func (ec *EventSystemImpl) IsEventTrackingEnabled() bool
- func (ec *EventSystemImpl) RemoveStream(consumer *EventStream)
- func (ec *EventSystemImpl) Restart()
- func (ec *EventSystemImpl) StartService()
- func (ec *EventSystemImpl) StartServiceWithPublisher(withPublisher bool)
- func (ec *EventSystemImpl) Stop()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateAppEventRecord ¶
func CreateAppEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, changeDetail si.EventRecord_ChangeDetail, resource *resources.Resource) *si.EventRecord
func CreateNodeEventRecord ¶
func CreateNodeEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, changeDetail si.EventRecord_ChangeDetail, resource *resources.Resource) *si.EventRecord
func CreateQueueEventRecord ¶
func CreateQueueEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, changeDetail si.EventRecord_ChangeDetail, resource *resources.Resource) *si.EventRecord
func CreateRequestEventRecord ¶
func CreateRequestEventRecord(objectID, referenceID, message string, resource *resources.Resource) *si.EventRecord
func CreateUserGroupEventRecord ¶ added in v1.5.0
func CreateUserGroupEventRecord(objectID, message, referenceID string, changeType si.EventRecord_ChangeType, changeDetail si.EventRecord_ChangeDetail, resource *resources.Resource) *si.EventRecord
Types ¶
type EventPublisher ¶
type EventPublisher struct {
// contains filtered or unexported fields
}
func CreateShimPublisher ¶
func CreateShimPublisher(store *EventStore) *EventPublisher
func (*EventPublisher) StartService ¶
func (sp *EventPublisher) StartService()
func (*EventPublisher) Stop ¶
func (sp *EventPublisher) Stop()
type EventStore ¶
The EventStore operates under the following assumptions:
- there is a cap for the number of events stored
- the CollectEvents() function clears the currently stored events in the EventStore
Assuming the rate of events generated by the scheduler component in a given time period is high, calling CollectEvents() periodically should be fine.
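The two assumptions above (a cap on stored events, and a collect call that clears the store) can be illustrated with a small stand-alone sketch. Everything in it is hypothetical: `record` stands in for `si.EventRecord`, `cappedStore` is not the package's `EventStore`, and dropping new events once the cap is reached is an assumed overflow policy, not documented behavior.

```go
package main

import (
	"fmt"
	"sync"
)

// record is a hypothetical stand-in for si.EventRecord.
type record struct {
	Message string
}

// cappedStore is an illustrative store, not the package's EventStore.
type cappedStore struct {
	mu     sync.Mutex
	events []*record
	limit  int
}

func newCappedStore(limit int) *cappedStore {
	return &cappedStore{events: make([]*record, 0, limit), limit: limit}
}

// Store appends the event unless the cap is reached and reports whether
// it was kept. Rejecting the newest event is an assumption of this sketch.
func (s *cappedStore) Store(e *record) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if len(s.events) >= s.limit {
		return false
	}
	s.events = append(s.events, e)
	return true
}

// CollectEvents returns everything stored so far and resets the store.
func (s *cappedStore) CollectEvents() []*record {
	s.mu.Lock()
	defer s.mu.Unlock()
	collected := s.events
	s.events = make([]*record, 0, s.limit)
	return collected
}

// CountStoredEvents returns the number of events currently held.
func (s *cappedStore) CountStoredEvents() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.events)
}

func main() {
	store := newCappedStore(2)
	store.Store(&record{Message: "a"})
	store.Store(&record{Message: "b"})
	store.Store(&record{Message: "c"}) // rejected: the cap is reached
	fmt.Println(len(store.CollectEvents()), store.CountStoredEvents())
}
```

Because collecting empties the store, a periodic collector frees the capacity for the next batch of events.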
func (*EventStore) CollectEvents ¶
func (es *EventStore) CollectEvents() []*si.EventRecord
func (*EventStore) CountStoredEvents ¶
func (es *EventStore) CountStoredEvents() int
func (*EventStore) Store ¶
func (es *EventStore) Store(event *si.EventRecord)
type EventStream ¶ added in v1.5.0
type EventStream struct {
Events <-chan *si.EventRecord
}
EventStream is the handle type returned to a client that wants to capture the stream of events.
type EventStreamData ¶ added in v1.5.0
EventStreamData contains data about an event stream.
type EventStreaming ¶ added in v1.5.0
EventStreaming implements the event streaming logic. New events are immediately forwarded to all active consumers.
func NewEventStreaming ¶ added in v1.5.0
func NewEventStreaming(eventBuffer *eventRingBuffer) *EventStreaming
NewEventStreaming creates a new event streaming infrastructure.
func (*EventStreaming) Close ¶ added in v1.5.0
func (e *EventStreaming) Close()
Close stops event streaming completely.
func (*EventStreaming) CreateEventStream ¶ added in v1.5.0
func (e *EventStreaming) CreateEventStream(name string, count uint64) *EventStream
CreateEventStream sets up event streaming for a consumer. The returned EventStream object contains a channel that can be used for reading.
When a consumer is finished, it must call RemoveEventStream to free up resources.
Consumers have an arbitrary name for logging purposes. The "count" parameter defines the maximum number of historical events replayed from the ring buffer. "0" is a valid value and means no past events.
func (*EventStreaming) GetEventStreams ¶ added in v1.5.0
func (e *EventStreaming) GetEventStreams() []EventStreamData
GetEventStreams returns the current active event streams.
func (*EventStreaming) PublishEvent ¶ added in v1.5.0
func (e *EventStreaming) PublishEvent(event *si.EventRecord)
PublishEvent publishes an event to all event stream consumers.
The streaming logic uses bridging to ensure proper ordering of existing and new events. Events are sent to the "local" channel, from where they are forwarded to the "consumer" channel.
If "local" is full, it means that the consumer side has not processed the events at an appropriate pace. Such a consumer is removed and the related channels are closed.
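The slow-consumer rule can be sketched with a non-blocking send: if the buffered "local" channel cannot accept the event, the consumer is dropped and its channel closed. This is a minimal illustration, not the package's implementation. The `record`, `consumer`, and `fanout` types are hypothetical, the bridging to a separate "consumer" channel is omitted, and no locking is shown, so the sketch assumes a single publishing goroutine.

```go
package main

import "fmt"

// record is a hypothetical stand-in for si.EventRecord.
type record struct{ ID uint64 }

// consumer holds the buffered "local" channel of one stream (illustrative).
type consumer struct {
	name  string
	local chan *record
}

// fanout is an illustrative registry of active consumers.
type fanout struct {
	consumers map[string]*consumer
}

// publish offers the event to every consumer without blocking. A consumer
// whose local channel is full is closed and removed; the names of removed
// consumers are returned.
func (f *fanout) publish(e *record) []string {
	var removed []string
	for name, c := range f.consumers {
		select {
		case c.local <- e:
			// delivered to the local buffer
		default:
			// buffer full: the consumer is too slow, so drop it
			close(c.local)
			delete(f.consumers, name)
			removed = append(removed, name)
		}
	}
	return removed
}

func main() {
	fast := &consumer{name: "fast", local: make(chan *record, 1)}
	slow := &consumer{name: "slow", local: make(chan *record, 1)}
	f := &fanout{consumers: map[string]*consumer{"fast": fast, "slow": slow}}

	f.publish(&record{ID: 1})
	<-fast.local // only the fast consumer drains its channel
	removed := f.publish(&record{ID: 2})
	fmt.Println(removed, len(f.consumers))
}
```

The `select` with a `default` branch keeps the publisher from stalling on any single consumer, at the cost of disconnecting consumers that fall behind.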
func (*EventStreaming) RemoveEventStream ¶ added in v1.5.0
func (e *EventStreaming) RemoveEventStream(consumer *EventStream)
RemoveEventStream stops the streaming for a given consumer. Must be called to avoid resource leaks.
type EventSystem ¶ added in v1.4.0
type EventSystem interface {
	// AddEvent adds an event record to the event system for processing:
	// 1. It is added to a slice from where it is periodically read by the shim publisher.
	// 2. It is added to an internal ring buffer so that clients can retrieve the event history.
	// 3. Streaming clients are updated.
	AddEvent(event *si.EventRecord)

	// StartService starts the event system.
	// This method does not block. Events are processed on a separate goroutine.
	StartService()

	// Stop stops the event system.
	Stop()

	// IsEventTrackingEnabled whether history tracking is currently enabled or not.
	IsEventTrackingEnabled() bool

	// GetEventsFromID retrieves "count" number of elements from the history buffer from "id". Every
	// event has a unique ID inside the ring buffer.
	// If "id" is not in the buffer, then no record is returned, but the currently available range
	// [low..high] is set.
	GetEventsFromID(id, count uint64) ([]*si.EventRecord, uint64, uint64)

	// CreateEventStream creates an event stream (channel) for a consumer.
	// The "name" argument is an arbitrary string for a consumer, which is used for logging. It does not need to be unique.
	// The "count" argument defines how many historical elements should be returned on the stream. Zero is a valid value for "count".
	// The returned type contains a read-only channel which is updated as soon as there is a new event record.
	// It is also used as a handle to stop the streaming.
	// Consumers must read the channel and process the event objects as soon as they can to avoid
	// events piling up inside the channel buffers.
	CreateEventStream(name string, count uint64) *EventStream

	// RemoveStream stops streaming for a given consumer.
	// Consumers that no longer wish to be updated (e.g., a remote client
	// disconnected) *must* call this method to gracefully stop the streaming.
	RemoveStream(*EventStream)

	// GetEventStreams returns the current active event streams.
	GetEventStreams() []EventStreamData
}
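The GetEventsFromID contract (return up to "count" records starting at "id", or no records plus the available [low..high] range when "id" is not buffered) can be sketched with a small ring buffer. This is an illustration under assumptions, not the package's ring buffer: `record` and `history` are hypothetical, IDs are assumed to start at 0 and increase by one, the capacity is assumed to be non-zero, and returning 0, 0 for an empty buffer is a choice made for this sketch.

```go
package main

import "fmt"

// record is a hypothetical stand-in for si.EventRecord, tagged with its buffer ID.
type record struct{ ID uint64 }

// history is an illustrative fixed-size ring buffer keyed by a growing ID.
type history struct {
	slots  []*record
	nextID uint64 // ID assigned to the next added record
}

// newHistory creates a buffer; capacity must be greater than zero.
func newHistory(capacity uint64) *history {
	return &history{slots: make([]*record, capacity)}
}

// add stores a new record, overwriting the oldest one once the buffer is full.
func (h *history) add() {
	capacity := uint64(len(h.slots))
	h.slots[h.nextID%capacity] = &record{ID: h.nextID}
	h.nextID++
}

// fromID returns up to count records starting at id, plus the lowest and
// highest IDs currently buffered. If id is outside [low..high], no record
// is returned but the range is still reported.
func (h *history) fromID(id, count uint64) ([]*record, uint64, uint64) {
	if h.nextID == 0 {
		return nil, 0, 0 // empty buffer (assumed convention)
	}
	capacity := uint64(len(h.slots))
	high := h.nextID - 1
	low := uint64(0)
	if h.nextID > capacity {
		low = h.nextID - capacity
	}
	if id < low || id > high {
		return nil, low, high
	}
	n := h.nextID - id // records available from id onwards
	if count < n {
		n = count
	}
	out := make([]*record, 0, n)
	for i := id; i < id+n; i++ {
		out = append(out, h.slots[i%capacity])
	}
	return out, low, high
}

func main() {
	h := newHistory(3)
	for i := 0; i < 5; i++ {
		h.add() // IDs 0..4; only 2..4 remain buffered
	}
	miss, low, high := h.fromID(1, 2)
	hit, _, _ := h.fromID(3, 10)
	fmt.Println(len(miss), low, high, len(hit))
}
```

A client that receives no records can use the reported range to retry from an ID that is still available.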
func GetEventSystem ¶ added in v1.4.0
func GetEventSystem() EventSystem
GetEventSystem returns the event system instance. Initialization happens during the first call.
type EventSystemImpl ¶ added in v1.4.0
type EventSystemImpl struct {
	Store *EventStore // storing eventChannel

	locking.RWMutex
	// contains filtered or unexported fields
}
EventSystemImpl is the main implementation of the event system, which is used for history tracking.
func (*EventSystemImpl) AddEvent ¶ added in v1.4.0
func (ec *EventSystemImpl) AddEvent(event *si.EventRecord)
AddEvent adds an event record to the event system. See the interface for details.
func (*EventSystemImpl) CloseAllStreams ¶ added in v1.5.0
func (ec *EventSystemImpl) CloseAllStreams()
CloseAllStreams is exported for testing only.
func (*EventSystemImpl) CreateEventStream ¶ added in v1.5.0
func (ec *EventSystemImpl) CreateEventStream(name string, count uint64) *EventStream
CreateEventStream creates an event stream. See the interface for details.
func (*EventSystemImpl) GetEventStreams ¶ added in v1.5.0
func (ec *EventSystemImpl) GetEventStreams() []EventStreamData
GetEventStreams returns the current active event streams.
func (*EventSystemImpl) GetEventsFromID ¶ added in v1.4.0
func (ec *EventSystemImpl) GetEventsFromID(id, count uint64) ([]*si.EventRecord, uint64, uint64)
GetEventsFromID retrieves historical elements. See the interface for details.
func (*EventSystemImpl) GetRequestCapacity ¶ added in v1.4.0
func (ec *EventSystemImpl) GetRequestCapacity() int
GetRequestCapacity returns the capacity of an intermediate storage which is used by the shim publisher.
func (*EventSystemImpl) GetRingBufferCapacity ¶ added in v1.4.0
func (ec *EventSystemImpl) GetRingBufferCapacity() uint64
GetRingBufferCapacity returns the capacity of the buffer which stores historical elements.
func (*EventSystemImpl) IsEventTrackingEnabled ¶ added in v1.4.0
func (ec *EventSystemImpl) IsEventTrackingEnabled() bool
IsEventTrackingEnabled reports whether history tracking is currently enabled.
func (*EventSystemImpl) RemoveStream ¶ added in v1.5.0
func (ec *EventSystemImpl) RemoveStream(consumer *EventStream)
RemoveStream gracefully terminates event streaming for a consumer. See the interface for details.
func (*EventSystemImpl) Restart ¶ added in v1.4.0
func (ec *EventSystemImpl) Restart()
Restart restarts the event system, used during config update.
func (*EventSystemImpl) StartService ¶ added in v1.4.0
func (ec *EventSystemImpl) StartService()
StartService starts the event processing in the background. See the interface for details.
func (*EventSystemImpl) StartServiceWithPublisher ¶ added in v1.4.0
func (ec *EventSystemImpl) StartServiceWithPublisher(withPublisher bool)
StartServiceWithPublisher starts the event processing background routines. Only exported for testing.
func (*EventSystemImpl) Stop ¶ added in v1.4.0
func (ec *EventSystemImpl) Stop()
Stop stops the event system.