Documentation ¶
Overview ¶
Package gated implements a Filter that buffers events by ID until one of them is flushed. When an individual gated event is flushed, the filter builds and emits a composite event for the flushed event, using its ID to identify all the related events gated up to that point in time.
Index ¶
- Constants
- type EventPayload
- type EventPayloadDetails
- type Filter
- func (w *Filter) Close(ctx context.Context) error
- func (w *Filter) FlushAll(ctx context.Context) error
- func (w *Filter) Now() time.Time
- func (w *Filter) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error)
- func (w *Filter) Reopen() error
- func (w *Filter) Type() eventlogger.NodeType
- type Gateable
- type Payload
- type Sender
Examples ¶
Constants ¶
const DefaultEventTimeout = time.Second * 10
DefaultEventTimeout defines a default expiry for events processed by a gated.Filter
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type EventPayload ¶
type EventPayload struct {
	ID      string                 `json:"id"`
	Header  map[string]interface{} `json:"header,omitempty"`
	Details []EventPayloadDetails  `json:"details,omitempty"`
}
EventPayload defines the resulting Event.Payload from gated Payload.ComposeFrom
type EventPayloadDetails ¶
type EventPayloadDetails struct {
	Type      string                 `json:"type"`
	CreatedAt string                 `json:"created_at"`
	Payload   map[string]interface{} `json:"payload,omitempty"`
}
EventPayloadDetails defines the struct used in the gated EventPayload.Details slice.
type Filter ¶
type Filter struct {
	// Broker used to send along expired gated events
	Broker Sender

	// Expiration for gated events. It's important because without an
	// expiration gated events that aren't flushed/processed could consume all
	// available memory. Expired events will be sent along if there's a Broker
	// or deleted if there's no Broker. If no expiration is set the
	// DefaultEventTimeout will be used.
	Expiration time.Duration

	// NowFunc is a func that returns the current time for the Filter. If
	// unset, it defaults to time.Now().
	NowFunc func() time.Time
	// contains filtered or unexported fields
}
Filter provides the ability to buffer events identified by a Gateable.GetID() until an event is processed that returns true for Gateable.FlushEvent().
When a Gateable Event returns true for FlushEvent(), the filter will call Gateable.ComposeFrom(...) with the list of gated events with the corresponding Gateable.GetID() up to that point in time and return the resulting composed event. Handling an event that returns true for FlushEvent() does not depend on Filter.Broker, since the Filter simply needs to return the flushed event from Filter.Process(...).
Filter.Broker is only used when handling expired events or when handling calls to Filter.FlushAll(). If Filter.Broker is nil, expired gated events will simply be deleted. If the Broker is not nil, then the expiring gated events will be flushed using Gateable.ComposeFrom(...) and the resulting composed event is sent using the Broker. If the Broker is nil when Filter.FlushAll() is called, then the gated events will just be deleted. If the Broker is not nil when Filter.FlushAll() is called, then all the gated events will be sent using the Broker.
Example ¶
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/hashicorp/eventlogger"
	"github.com/hashicorp/eventlogger/filters/gated"
	"github.com/hashicorp/eventlogger/sinks/writer"
)

func main() {
	then := time.Date(
		2009, 11, 17, 20, 34, 58, 651387237, time.UTC)

	// Create a broker
	b, _ := eventlogger.NewBroker()
	b.StopTimeAt(then) // setting this so the output timestamps are predictable for testing.

	// A gated.Filter for events
	gf := &gated.Filter{
		Broker:  b,
		NowFunc: func() time.Time { return then }, // setting this so the output timestamps are predictable for testing.
	}

	// Marshal to JSON
	jsonFmt := &eventlogger.JSONFormatter{}

	// Send the output to stdout
	stdoutSink := &writer.Sink{
		Writer: os.Stdout,
	}

	// Register the nodes with the broker
	nodes := []eventlogger.Node{gf, jsonFmt, stdoutSink}
	nodeIDs := make([]eventlogger.NodeID, len(nodes))
	for i, node := range nodes {
		id := eventlogger.NodeID(fmt.Sprintf("node-%d", i))
		err := b.RegisterNode(id, node)
		if err != nil {
			// handle error
		}
		nodeIDs[i] = id
	}

	et := eventlogger.EventType("test-event")

	// Register a pipeline for our event type
	err := b.RegisterPipeline(eventlogger.Pipeline{
		EventType:  et,
		PipelineID: "gated-filter-pipeline",
		NodeIDs:    nodeIDs,
	})
	if err != nil {
		// handle error
	}

	// define a common event ID for a set of events we want gated together.
	eventID := "event-1"
	payloads := []*gated.Payload{
		{
			// our first event
			ID: eventID,
			Header: map[string]interface{}{
				"tmz":  "EST",
				"user": "alice",
			},
			Detail: map[string]interface{}{
				"file_name":   "file1.txt",
				"total_bytes": 1024,
			},
		},
		{
			// our 2nd event
			ID: eventID,
			Header: map[string]interface{}{
				"roles": []string{"admin", "individual-contributor"},
			},
		},
		// the last event
		{
			ID:    eventID,
			Flush: true,
			Detail: map[string]interface{}{
				"file_name":   "file2.txt",
				"total_bytes": 512,
			},
		},
	}

	ctx := context.Background()
	for _, p := range payloads {
		// Send our gated event payloads
		if status, err := b.Send(ctx, et, p); err != nil {
			// handle err and status.Warnings
			fmt.Println("err: ", err)
			fmt.Println("warnings: ", status.Warnings)
		}
	}
}
Output:
{"created_at":"2009-11-17T20:34:58.651387237Z","event_type":"test-event","payload":{"id":"event-1","header":{"roles":["admin","individual-contributor"],"tmz":"EST","user":"alice"},"details":[{"type":"test-event","created_at":"2009-11-17 20:34:58.651387237 +0000 UTC","payload":{"file_name":"file1.txt","total_bytes":1024}},{"type":"test-event","created_at":"2009-11-17 20:34:58.651387237 +0000 UTC","payload":{"file_name":"file2.txt","total_bytes":512}}]}}
func (*Filter) Close ¶ added in v0.2.0
func (w *Filter) Close(ctx context.Context) error
Close implements the eventlogger.Closer interface, so the gated.Filter will call FlushAll() when asked to close.
func (*Filter) FlushAll ¶
func (w *Filter) FlushAll(ctx context.Context) error
FlushAll will flush all events that have been gated. It is useful when the system is shutting down and you need to flush everything that's been gated.
If the Broker is nil when Filter.FlushAll() is called then the gated events will just be deleted. If the Broker is not nil when Filter.FlushAll() is called, then all the gated events will be sent using the Broker.
func (*Filter) Now ¶
func (w *Filter) Now() time.Time
Now returns the current time. If Filter.NowFunc is unset, then time.Now() is used as a default.
func (*Filter) Process ¶
func (w *Filter) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error)
Process will determine if an Event is Gateable. Events that are not Gateable are immediately returned. If the Event is Gateable, it's added to a list of Events using its Gateable.GetID() as an index, until an event with a matching Gateable.GetID() is processed where Gateable.FlushEvent() returns true. If Gateable.FlushEvent() returns true, then Gateable.ComposeFrom([]*Event) is called with all the gated events for the ID.
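The gating flow described for Process can be sketched with plain Go types. This is an illustration under stated assumptions rather than the Filter's implementation: `event`, `gate`, `newGate` and `process` are hypothetical names, expiry and locking are left out, and the composition step is reduced to returning the gated group.

```go
package main

import "fmt"

// event is a hypothetical stand-in for an event carrying a Gateable payload.
type event struct {
	ID    string // stands in for Gateable.GetID()
	Flush bool   // stands in for Gateable.FlushEvent()
	Data  string
}

// gate buffers events per ID until a flush event arrives.
type gate struct {
	pending map[string][]event
}

func newGate() *gate {
	return &gate{pending: make(map[string][]event)}
}

// process returns nil while events are being held. When a flush event
// arrives it returns every event gated under that ID, including the flush
// event itself, and forgets the group.
func (g *gate) process(e event) []event {
	g.pending[e.ID] = append(g.pending[e.ID], e)
	if !e.Flush {
		return nil
	}
	group := g.pending[e.ID]
	delete(g.pending, e.ID)
	return group
}

func main() {
	g := newGate()
	fmt.Println(len(g.process(event{ID: "event-1", Data: "a"})))
	fmt.Println(len(g.process(event{ID: "event-2", Data: "b"})))
	fmt.Println(len(g.process(event{ID: "event-1", Flush: true, Data: "c"})))
}
```

In the sketch, flushing "event-1" leaves the "event-2" group gated, which matches the per-ID grouping described above.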
func (*Filter) Type ¶
func (w *Filter) Type() eventlogger.NodeType
Type describes the type of the node as a Filter.
type Gateable ¶
type Gateable interface {
	// GetID returns an ID which allows the gated.Filter to determine that the
	// payload is part of a group of Gateable payloads.
	GetID() string

	// FlushEvent returns true when the Gateable event payload includes a Flush
	// indicator.
	FlushEvent() bool

	// ComposeFrom creates one payload which is a composition of a list of
	// events. When ComposeFrom(...) is called by a gated.Filter the receiver
	// will always be nil. The payload returned must not be a Gateable payload.
	ComposeFrom(events []*eventlogger.Event) (t eventlogger.EventType, payload interface{}, err error)
}
Gateable defines an interface for Event payloads which are "gateable" by the gated.Filter
type Payload ¶
type Payload struct {
	// ID must be a unique ID
	ID string `json:"id"`

	// Flush value is returned from FlushEvent()
	Flush bool `json:"-"`

	// Header is top level header info
	Header map[string]interface{} `json:"header,omitempty"`

	// Detail is detail info
	Detail map[string]interface{} `json:"detail,omitempty"`
}
Payload implements the Gateable interface for an Event payload and can be used when sending events with a Broker.
func (*Payload) ComposeFrom ¶
func (s *Payload) ComposeFrom(events []*eventlogger.Event) (eventlogger.EventType, interface{}, error)
ComposeFrom will build a single event payload, to be flushed/processed, from a collection of gated events. The payload returned is intentionally not a Gateable payload. Note: the Payload receiver is always nil when this function is called.
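A composition step of this kind might look roughly like the following sketch. It is inferred from the shape of the example output (headers merged into one map, one details entry per event that carries a Detail) and is not the library's code: the `payload`, `event`, `detail`, `composed` and `compose` names are hypothetical stand-ins, and the error handling is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for gated.Payload, eventlogger.Event,
// gated.EventPayloadDetails and gated.EventPayload.
type payload struct {
	ID     string
	Header map[string]interface{}
	Detail map[string]interface{}
}

type event struct {
	Type      string
	CreatedAt time.Time
	Payload   payload
}

type detail struct {
	Type      string
	CreatedAt string
	Payload   map[string]interface{}
}

type composed struct {
	ID      string
	Header  map[string]interface{}
	Details []detail
}

// compose merges the headers of all events into one map and collects one
// detail entry for every event that carries a Detail.
func compose(events []event) (composed, error) {
	if len(events) == 0 {
		return composed{}, fmt.Errorf("no events to compose")
	}
	out := composed{ID: events[0].Payload.ID, Header: map[string]interface{}{}}
	for _, e := range events {
		if e.Payload.ID != out.ID {
			return composed{}, fmt.Errorf("event ID %q does not match %q", e.Payload.ID, out.ID)
		}
		for k, v := range e.Payload.Header {
			out.Header[k] = v // later events overwrite duplicate keys
		}
		if e.Payload.Detail != nil {
			out.Details = append(out.Details, detail{
				Type:      e.Type,
				CreatedAt: e.CreatedAt.String(),
				Payload:   e.Payload.Detail,
			})
		}
	}
	return out, nil
}

func main() {
	now := time.Date(2009, 11, 17, 20, 34, 58, 651387237, time.UTC)
	events := []event{
		{Type: "test-event", CreatedAt: now, Payload: payload{
			ID:     "event-1",
			Header: map[string]interface{}{"user": "alice"},
			Detail: map[string]interface{}{"file_name": "file1.txt"},
		}},
		{Type: "test-event", CreatedAt: now, Payload: payload{
			ID:     "event-1",
			Header: map[string]interface{}{"tmz": "EST"},
		}},
	}
	out, err := compose(events)
	if err != nil {
		fmt.Println("err:", err)
		return
	}
	fmt.Println(out.ID, len(out.Header), len(out.Details))
}
```

Because the real method is called with a nil receiver, a plain function that works only from its arguments, as in this sketch, is a natural shape for the logic.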
func (*Payload) FlushEvent ¶
FlushEvent tells the Filter to flush/process the events associated with the Gateable ID
type Sender ¶
type Sender interface {
Send(ctx context.Context, t eventlogger.EventType, payload interface{}) (eventlogger.Status, error)
}
Sender defines an interface for sending events via a broker.