Documentation ¶
Overview ¶
Package event implements an event bus. For a great introduction to the event bus pattern in Go, see: https://levelup.gitconnected.com/lets-write-a-simple-event-bus-in-go-79b9480d8997
Example ¶
ctx, done := context.WithCancel(context.Background())
defer done()

bus := NewBus(ctx)

makeDoneHandler := func(label string) Handler {
	return func(ctx context.Context, e Event) error {
		fmt.Printf("%s handler called\n", label)
		return nil
	}
}

bus.SubscribeTypes(makeDoneHandler("first"), ETMainSaidHello, ETMainOpSucceeded)
bus.SubscribeTypes(makeDoneHandler("second"), ETMainSaidHello)
bus.SubscribeTypes(makeDoneHandler("third"), ETMainSaidHello)

bus.Publish(ctx, ETMainSaidHello, "hello")
bus.Publish(ctx, ETMainOpSucceeded, "operation worked!")
Output:
first handler called
second handler called
third handler called
first handler called
Constants ¶
const (
	// ETDatasetNameInit is when a dataset is initialized
	// payload is a DsChange
	ETDatasetNameInit = Type("dataset:Init")
	// ETDatasetCommitChange is when a dataset changes its newest commit
	// payload is a DsChange
	ETDatasetCommitChange = Type("dataset:CommitChange")
	// ETDatasetDeleteAll is when a dataset is entirely deleted
	// payload is a DsChange
	ETDatasetDeleteAll = Type("dataset:DeleteAll")
	// ETDatasetRename is when a dataset is renamed
	// payload is a DsChange
	ETDatasetRename = Type("dataset:Rename")
	// ETDatasetCreateLink is when a dataset is linked to a working directory
	// payload is a DsChange
	ETDatasetCreateLink = Type("dataset:CreateLink")

	// ETDatasetSaveStarted fires when saving a dataset starts
	// subscriptions do not block the publisher
	// payload will be a DsSaveEvent
	ETDatasetSaveStarted = Type("dataset:SaveStarted")
	// ETDatasetSaveProgress indicates a change in progress of dataset version
	// creation.
	// subscriptions do not block the publisher
	// payload will be a DsSaveEvent
	ETDatasetSaveProgress = Type("dataset:SaveProgress")
	// ETDatasetSaveCompleted indicates creating a dataset version finished
	// payload will be a DsSaveEvent
	ETDatasetSaveCompleted = Type("dataset:SaveCompleted")
)
const (
	// ETCreatedNewFile is the event for creating a new file
	ETCreatedNewFile = Type("watchfs:CreatedNewFile")
	// ETModifiedFile is the event for modifying a file
	ETModifiedFile = Type("watchfs:ModifiedFile")
	// ETDeletedFile is the event for deleting a file
	ETDeletedFile = Type("watchfs:DeletedFile")
	// ETRenamedFolder is the event for renaming a folder
	ETRenamedFolder = Type("watchfs:RenamedFolder")
	// ETRemovedFolder is the event for removing a folder
	ETRemovedFolder = Type("watchfs:RemovedFolder")
)
const (
	// ETRemoteClientPushVersionProgress indicates a change in progress of a
	// dataset version push. Progress can fire as much as once-per-block.
	// subscriptions do not block the publisher
	// payload will be a RemoteEvent
	ETRemoteClientPushVersionProgress = Type("remoteClient:PushVersionProgress")
	// ETRemoteClientPushVersionCompleted indicates a version successfully pushed
	// to a remote.
	// payload will be a RemoteEvent
	ETRemoteClientPushVersionCompleted = Type("remoteClient:PushVersionCompleted")
	// ETRemoteClientPushDatasetCompleted indicates pushing a dataset
	// (logbook + versions) completed
	// payload will be a RemoteEvent
	ETRemoteClientPushDatasetCompleted = Type("remoteClient:PushDatasetCompleted")

	// ETRemoteClientPullVersionProgress indicates a change in progress of a
	// dataset version pull. Progress can fire as much as once-per-block.
	// subscriptions do not block the publisher
	// payload will be a RemoteEvent
	ETRemoteClientPullVersionProgress = Type("remoteClient:PullVersionProgress")
	// ETRemoteClientPullVersionCompleted indicates a version successfully pulled
	// from a remote.
	// payload will be a RemoteEvent
	ETRemoteClientPullVersionCompleted = Type("remoteClient:PullVersionCompleted")
	// ETRemoteClientPullDatasetCompleted indicates pulling a dataset
	// (logbook + versions) completed
	// payload will be a RemoteEvent
	ETRemoteClientPullDatasetCompleted = Type("remoteClient:PullDatasetCompleted")

	// ETRemoteClientRemoveDatasetCompleted indicates removing a dataset
	// (logbook + versions) completed
	// payload will be a RemoteEvent
	ETRemoteClientRemoveDatasetCompleted = Type("remoteClient:RemoveDatasetCompleted")
)
const (
	// ETTransformStart signals the start of a transform execution
	// Payload will be a TransformLifecycle
	ETTransformStart = Type("tf:Start")
	// ETTransformStop signals the completion of a transform execution
	// Payload will be a TransformLifecycle
	ETTransformStop = Type("tf:Stop")

	// ETTransformStepStart signals a step is starting.
	// Payload will be a TransformStepLifecycle
	ETTransformStepStart = Type("tf:StepStart")
	// ETTransformStepStop signals a step has stopped.
	// Payload will be a TransformStepLifecycle
	ETTransformStepStop = Type("tf:StepStop")
	// ETTransformStepSkip signals a step was skipped.
	// Payload will be a TransformStepLifecycle
	ETTransformStepSkip = Type("tf:StepSkip")

	// ETTransformPrint is sent by print commands.
	// Payload will be a Message
	ETTransformPrint = Type("tf:Print")
	// ETTransformError is for when a transform program execution error occurs.
	// Payload will be a Message
	ETTransformError = Type("tf:Error")
	// ETTransformDatasetPreview is an abbreviated dataset document in a transform
	// Payload will be a *dataset.Dataset Preview
	ETTransformDatasetPreview = Type("tf:DatasetPreview")
)
const (
	// TransformMsgLvlNone defines an unknown logging level
	TransformMsgLvlNone = TransformMsgLvl("")
	// TransformMsgLvlDebug defines logging level debug
	TransformMsgLvlDebug = TransformMsgLvl("debug")
	// TransformMsgLvlInfo defines logging level info
	TransformMsgLvlInfo = TransformMsgLvl("info")
	// TransformMsgLvlWarn defines logging level warn
	TransformMsgLvlWarn = TransformMsgLvl("warn")
	// TransformMsgLvlError defines logging level error
	TransformMsgLvlError = TransformMsgLvl("error")
)
const (
	// ETFSICreateLinkEvent type for when FSI creates a link between a dataset
	// and working directory
	ETFSICreateLinkEvent = Type("fsi:CreateLinkEvent")
)
Variables ¶
var (
	// ErrBusClosed indicates the event bus is no longer coordinating events
	// because its parent context has closed
	ErrBusClosed = fmt.Errorf("event bus is closed")
	// NowFunc is the function that generates timestamps (tests may override)
	NowFunc = time.Now
)
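The two variables above suggest a pattern that can be sketched without the package itself. The block below is a minimal illustration, not the package's implementation: `errBusClosed`, `nowFunc`, and `stamp` are hypothetical local stand-ins. It shows a sentinel error returned once the parent context is done, and a swappable clock that a test can pin to a fixed time.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Hypothetical local stand-ins for ErrBusClosed and NowFunc.
var (
	errBusClosed = fmt.Errorf("event bus is closed")
	nowFunc      = time.Now
)

// stamp returns a timestamp for a new event, or errBusClosed when the
// parent context has already been cancelled.
func stamp(ctx context.Context) (time.Time, error) {
	select {
	case <-ctx.Done():
		return time.Time{}, errBusClosed
	default:
		return nowFunc(), nil
	}
}

// fixedClockDemo overrides the clock the way a test might, then restores it.
func fixedClockDemo() string {
	prev := nowFunc
	defer func() { nowFunc = prev }()
	nowFunc = func() time.Time {
		return time.Date(2020, time.January, 1, 0, 0, 0, 0, time.UTC)
	}
	ts, err := stamp(context.Background())
	if err != nil {
		return err.Error()
	}
	return ts.Format(time.RFC3339)
}

// closedDemo reports whether a cancelled context yields the sentinel error.
func closedDemo() bool {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	_, err := stamp(ctx)
	return errors.Is(err, errBusClosed)
}

func main() {
	fmt.Println(fixedClockDemo())
	fmt.Println(closedDemo())
}
```

Comparing against the sentinel with `errors.Is` keeps the check working even if a caller wraps the error before returning it.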
var (
	// ETP2PGoneOnline occurs when a p2p node opens up for peer-2-peer connections
	// payload will be []multiaddr.Addr, the listening addresses of this peer
	ETP2PGoneOnline = Type("p2p:GoneOnline")
	// ETP2PGoneOffline occurs when a p2p node has finished disconnecting from
	// a peer-2-peer network
	// payload will be nil
	ETP2PGoneOffline = Type("p2p:GoneOffline")
	// ETP2PQriPeerConnected fires whenever a peer-2-peer connection that
	// supports the qri protocol is established
	// payload is a *profile.Profile
	// subscribers cannot block the publisher
	ETP2PQriPeerConnected = Type("p2p:QriPeerConnected")
	// ETP2PQriPeerDisconnected fires whenever a qri peer-2-peer connection
	// is closed
	// payload is a *profile.Profile
	// a nil payload means we never successfully obtained the peer's profile
	// information
	// subscribers cannot block the publisher
	ETP2PQriPeerDisconnected = Type("p2p:QriPeerDisconnected")
	// ETP2PPeerConnected occurs after any peer has connected to this node
	// payload will be a libp2p.peerInfo
	ETP2PPeerConnected = Type("p2p:PeerConnected")
	// ETP2PPeerDisconnected occurs after any peer has disconnected from this node
	// payload will be a libp2p.peerInfo
	ETP2PPeerDisconnected = Type("p2p:PeerDisconnected")
	// ETP2PMessageReceived fires whenever the p2p protocol receives a message
	// from a Qri peer
	// payload will be a p2p.Message
	ETP2PMessageReceived = Type("p2p:MessageReceived")
)
var ETInstanceConstructed = Type("lib:InstanceConstructed")
ETInstanceConstructed is fired once a node is created. Payload is nil.
var NilBus = nilBus{}
NilBus replaces a nil value. It implements the Bus interface, but does nothing.
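A do-nothing bus lets callers skip nil checks before every publish. The sketch below shows the idea with local stand-ins only: `bus` is a trimmed-down, hypothetical interface, and `nopBus`, `service`, and `newService` are illustrative names that are not part of this package.

```go
package main

import (
	"context"
	"fmt"
)

type Type string

// bus is a trimmed-down, hypothetical version of the Bus interface.
type bus interface {
	Publish(ctx context.Context, typ Type, data interface{}) error
	NumSubscribers() int
}

// nopBus satisfies bus but discards every event.
type nopBus struct{}

func (nopBus) Publish(ctx context.Context, typ Type, data interface{}) error { return nil }
func (nopBus) NumSubscribers() int                                           { return 0 }

// service is a hypothetical component that emits events as it works.
type service struct{ events bus }

// newService substitutes the no-op bus when the caller passes nil, so the
// rest of the code can publish unconditionally.
func newService(b bus) *service {
	if b == nil {
		b = nopBus{}
	}
	return &service{events: b}
}

func (s *service) run(ctx context.Context) error {
	return s.events.Publish(ctx, Type("demo:Ran"), nil)
}

// runNilDemo builds a service without a bus and publishes anyway.
func runNilDemo() error {
	return newService(nil).run(context.Background())
}

func main() {
	fmt.Println(runNilDemo())
}
```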
Functions ¶
This section is empty.
Types ¶
type Bus ¶
type Bus interface {
	// Publish an event to the bus
	Publish(ctx context.Context, typ Type, data interface{}) error
	// PublishID publishes an event with an arbitrary session id
	PublishID(ctx context.Context, typ Type, sessionID string, data interface{}) error
	// SubscribeTypes subscribes to one or more eventTypes with a handler
	// function that will be called whenever the event type is published
	SubscribeTypes(handler Handler, eventTypes ...Type)
	// SubscribeID subscribes to only events that have a matching session id
	SubscribeID(handler Handler, sessionID string)
	// SubscribeAll subscribes to all events
	SubscribeAll(handler Handler)
	// NumSubscribers returns the number of subscribers to the bus's events
	NumSubscribers() int
}
Bus is a central coordination point for event publication and subscription. Zero or more subscribers register to be notified of events, optionally by type or ID. A publisher then writes an event to the bus, which broadcasts it to all matching subscribers.
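To make the matching rules concrete, here is a minimal in-memory sketch of a bus with type, session ID, and catch-all subscriptions. It is an illustration under stated assumptions, not the package's implementation: `miniBus` is hypothetical, the `Event` fields are guesses made for the sketch, the delivery order across subscription kinds is an arbitrary choice, and the code is not safe for concurrent use.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Type, Event, and Handler are simplified stand-ins for the package's types.
// The Event fields shown here are assumptions made for this sketch.
type Type string

type Event struct {
	Type      Type
	SessionID string
	Payload   interface{}
}

type Handler func(ctx context.Context, e Event) error

// miniBus is a hypothetical bus for illustration only.
type miniBus struct {
	byType map[Type][]Handler
	byID   map[string][]Handler
	all    []Handler
}

func newMiniBus() *miniBus {
	return &miniBus{byType: map[Type][]Handler{}, byID: map[string][]Handler{}}
}

func (b *miniBus) SubscribeTypes(h Handler, types ...Type) {
	for _, t := range types {
		b.byType[t] = append(b.byType[t], h)
	}
}

func (b *miniBus) SubscribeID(h Handler, sessionID string) {
	b.byID[sessionID] = append(b.byID[sessionID], h)
}

func (b *miniBus) SubscribeAll(h Handler) { b.all = append(b.all, h) }

// PublishID broadcasts to type subscribers, then session subscribers, then
// catch-all subscribers. This ordering is a choice made for the sketch.
func (b *miniBus) PublishID(ctx context.Context, typ Type, sessionID string, data interface{}) error {
	e := Event{Type: typ, SessionID: sessionID, Payload: data}
	matched := append([]Handler{}, b.byType[typ]...)
	if sessionID != "" {
		matched = append(matched, b.byID[sessionID]...)
	}
	matched = append(matched, b.all...)
	for _, h := range matched {
		if err := h(ctx, e); err != nil {
			return err
		}
	}
	return nil
}

func (b *miniBus) Publish(ctx context.Context, typ Type, data interface{}) error {
	return b.PublishID(ctx, typ, "", data)
}

// runBusDemo records which subscribers were notified, in order.
func runBusDemo() string {
	var calls []string
	record := func(label string) Handler {
		return func(ctx context.Context, e Event) error {
			calls = append(calls, label)
			return nil
		}
	}
	bus := newMiniBus()
	bus.SubscribeTypes(record("typed"), Type("demo:Hello"))
	bus.SubscribeID(record("session"), "s1")
	bus.SubscribeAll(record("all"))

	ctx := context.Background()
	bus.PublishID(ctx, Type("demo:Hello"), "s1", "hi")
	bus.Publish(ctx, Type("demo:Other"), nil)
	return strings.Join(calls, ",")
}

func main() {
	fmt.Println(runBusDemo())
}
```

The first publish reaches all three subscribers. The second matches neither the type nor the session, so only the catch-all subscriber is notified.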
type DsChange ¶ added in v0.9.10
type DsChange struct {
	InitID     string             `json:"initID"`
	TopIndex   int                `json:"topIndex"`
	ProfileID  string             `json:"profileID"`
	Username   string             `json:"username"`
	PrettyName string             `json:"prettyName"`
	HeadRef    string             `json:"headRef"`
	Info       *dsref.VersionInfo `json:"info"`
	Dir        string             `json:"dir"`
}
DsChange represents the result of a change to a dataset
type DsSaveEvent ¶ added in v0.10.0
type DsSaveEvent struct {
	Username string `json:"username"`
	Name     string `json:"name"`
	// either message or error will be populated. message should be human-centric
	// description of progress
	Message string `json:"message"`
	// saving error. only populated on failed ETDatasetSaveCompleted event
	Error error `json:"error,omitempty"`
	// completion pct from 0-1
	Completion float64 `json:"complete"`
	// only populated on successful ETDatasetSaveCompleted
	Path string `json:"path,omitempty"`
}
DsSaveEvent represents a change in version creation progress
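The field comments above imply three cases a subscriber has to tell apart: failure, success, and in-progress. A small sketch of that branching follows. The struct is a local copy without JSON tags, and `summarize` and the sample path are hypothetical names introduced for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// DsSaveEvent is a local copy of the struct above, minus the JSON tags.
type DsSaveEvent struct {
	Username   string
	Name       string
	Message    string
	Error      error
	Completion float64
	Path       string
}

// summarize is a hypothetical helper that turns a save event into one line
// of human-readable status.
func summarize(e DsSaveEvent) string {
	ref := e.Username + "/" + e.Name
	switch {
	case e.Error != nil:
		// Only expected on a failed save-completed event.
		return fmt.Sprintf("%s: save failed: %v", ref, e.Error)
	case e.Path != "":
		// Only expected on a successful save-completed event.
		return fmt.Sprintf("%s: saved at %s", ref, e.Path)
	default:
		// Completion is a fraction from 0 to 1, so scale to a percentage.
		return fmt.Sprintf("%s: %.0f%% %s", ref, e.Completion*100, e.Message)
	}
}

func main() {
	fmt.Println(summarize(DsSaveEvent{Username: "me", Name: "ds", Completion: 0.5, Message: "hashing"}))
	fmt.Println(summarize(DsSaveEvent{Username: "me", Name: "ds", Completion: 1, Path: "/example/path"}))
	fmt.Println(summarize(DsSaveEvent{Username: "me", Name: "ds", Error: errors.New("disk full")}))
}
```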
type FSICreateLinkEvent ¶
type FSICreateLinkEvent struct {
	FSIPath  string `json:"fsiPath"`
	Username string `json:"username"`
	Dsname   string `json:"dsName"`
}
FSICreateLinkEvent describes an FSI created link
type Handler ¶ added in v0.9.10
Handler is a function that will be called by the event bus whenever a matching event is published. Handler calls are blocking and are made in order of subscription. Any error returned by a handler is passed back to the event publisher. The handler context originates from the publisher and, in practice, will often be scoped to a "request context" such as an HTTP request or CLI command invocation. Generally, event handlers should aim to return quickly, and only delegate to goroutines when the publishing event is firing on a long-running process.
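The blocking, in-order behavior and the goroutine advice can be sketched as follows. This is an illustration with local stand-ins, not the package's code: `publish` and `background` are hypothetical helpers, the `Event` fields are assumed, and stopping at the first handler error is an assumption of the sketch (the text above only says the error is passed back to the publisher).

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
)

type Type string

// Event is a simplified stand-in; its fields are assumptions for this sketch.
type Event struct {
	Type    Type
	Payload interface{}
}

type Handler func(ctx context.Context, e Event) error

// publish calls handlers in subscription order, blocking on each one.
// Stopping at the first error is an assumption of this sketch.
func publish(ctx context.Context, e Event, handlers ...Handler) error {
	for _, h := range handlers {
		if err := h(ctx, e); err != nil {
			return err
		}
	}
	return nil
}

// background wraps slow work in a goroutine so the handler returns at once.
func background(wg *sync.WaitGroup, work func(Event)) Handler {
	return func(ctx context.Context, e Event) error {
		wg.Add(1)
		go func() {
			defer wg.Done()
			work(e)
		}()
		return nil
	}
}

// runOrderDemo shows call order and the error reaching the publisher.
func runOrderDemo() (string, error) {
	var order []string
	mk := func(label string, err error) Handler {
		return func(ctx context.Context, e Event) error {
			order = append(order, label)
			return err
		}
	}
	err := publish(context.Background(), Event{Type: "demo:Step"},
		mk("first", nil), mk("second", errors.New("second failed")), mk("third", nil))
	return strings.Join(order, ","), err
}

// runBackgroundDemo publishes to a delegating handler, then waits for the
// goroutine so the result can be read safely.
func runBackgroundDemo() string {
	var wg sync.WaitGroup
	var result string
	h := background(&wg, func(e Event) {
		result = fmt.Sprintf("processed %v", e.Payload)
	})
	if err := publish(context.Background(), Event{Type: "demo:Slow", Payload: "hello"}, h); err != nil {
		return err.Error()
	}
	wg.Wait()
	return result
}

func main() {
	order, err := runOrderDemo()
	fmt.Println(order, err)
	fmt.Println(runBackgroundDemo())
}
```

A delegating handler can no longer report failures through its return value, which is one reason to reserve it for long-running publishers.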
type Publisher ¶ added in v0.9.6
type Publisher interface {
	Publish(ctx context.Context, typ Type, payload interface{}) error
	PublishID(ctx context.Context, typ Type, sessionID string, payload interface{}) error
}
Publisher is an interface that can only publish an event
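Accepting the narrow Publisher interface instead of a full Bus keeps a component from subscribing, and makes it easy to test with a recording fake. The sketch below assumes local stand-ins: `recordingPublisher` and `saveDataset` are hypothetical, and the interface is redeclared locally so the block compiles on its own.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

type Type string

// Publisher mirrors the interface above, redeclared locally for the sketch.
type Publisher interface {
	Publish(ctx context.Context, typ Type, payload interface{}) error
	PublishID(ctx context.Context, typ Type, sessionID string, payload interface{}) error
}

// recordingPublisher is a hypothetical test double that remembers the type
// of every event it was asked to publish.
type recordingPublisher struct{ published []string }

func (r *recordingPublisher) Publish(ctx context.Context, typ Type, payload interface{}) error {
	r.published = append(r.published, string(typ))
	return nil
}

func (r *recordingPublisher) PublishID(ctx context.Context, typ Type, sessionID string, payload interface{}) error {
	return r.Publish(ctx, typ, payload)
}

// saveDataset is a hypothetical operation that only needs to emit events,
// so it depends on Publisher rather than on a full bus.
func saveDataset(ctx context.Context, pub Publisher) error {
	if err := pub.Publish(ctx, Type("dataset:SaveStarted"), nil); err != nil {
		return err
	}
	// ... the actual save work would happen here ...
	return pub.Publish(ctx, Type("dataset:SaveCompleted"), nil)
}

// recordedTypes runs saveDataset against the fake and lists what it emitted.
func recordedTypes() string {
	rec := &recordingPublisher{}
	if err := saveDataset(context.Background(), rec); err != nil {
		return err.Error()
	}
	return strings.Join(rec.published, ",")
}

func main() {
	fmt.Println(recordedTypes())
}
```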
type RemoteEvent ¶ added in v0.9.10
type RemoteEvent struct {
	Ref        dsref.Ref      `json:"ref"`
	RemoteAddr string         `json:"remoteAddr"`
	Progress   dag.Completion `json:"progress"`
	Error      error          `json:"error,omitempty"`
}
RemoteEvent encapsulates the push / pull progress of a dataset version
type TransformLifecycle ¶ added in v0.10.0
type TransformLifecycle struct {
	StepCount int    `json:"stepCount"`
	Status    string `json:"status,omitempty"`
}
TransformLifecycle captures state about the execution of an entire transform script. It is the payload of ETTransformStart and ETTransformStop.
type TransformMessage ¶ added in v0.10.0
type TransformMessage struct {
	Lvl TransformMsgLvl `json:"lvl"`
	Msg string          `json:"msg"`
}
TransformMessage is the payload for print and error events
type TransformMsgLvl ¶ added in v0.10.0
type TransformMsgLvl string
TransformMsgLvl is an enumeration of all possible degrees of message logging in an implicit hierarchy (levels)
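Because the levels are plain strings, the hierarchy is only implicit. One way a consumer might make it explicit is a rank function. The sketch below is an assumption-laden illustration: `rank` and `atLeast` are hypothetical helpers, and the ordering debug < info < warn < error is the conventional one, not something this page states.

```go
package main

import "fmt"

type TransformMsgLvl string

// Local copies of the level constants listed in the Constants section.
const (
	TransformMsgLvlNone  = TransformMsgLvl("")
	TransformMsgLvlDebug = TransformMsgLvl("debug")
	TransformMsgLvlInfo  = TransformMsgLvl("info")
	TransformMsgLvlWarn  = TransformMsgLvl("warn")
	TransformMsgLvlError = TransformMsgLvl("error")
)

// rank maps a level to a number. The ordering is an assumption of this
// sketch; unknown levels rank lowest.
func rank(l TransformMsgLvl) int {
	switch l {
	case TransformMsgLvlDebug:
		return 1
	case TransformMsgLvlInfo:
		return 2
	case TransformMsgLvlWarn:
		return 3
	case TransformMsgLvlError:
		return 4
	default:
		return 0
	}
}

// atLeast reports whether level l is at or above the given minimum.
func atLeast(l, min TransformMsgLvl) bool {
	return rank(l) >= rank(min)
}

func main() {
	fmt.Println(atLeast(TransformMsgLvlWarn, TransformMsgLvlInfo))
	fmt.Println(atLeast(TransformMsgLvlDebug, TransformMsgLvlInfo))
}
```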
type TransformStepLifecycle ¶ added in v0.10.0
type TransformStepLifecycle struct {
	Name     string `json:"name"`
	Category string `json:"category"`
	Status   string `json:"status,omitempty"`
}
TransformStepLifecycle describes the state of transform step execution at a moment in time. It is the payload for ETTransformStepStart and ETTransformStepStop.
type Type ¶ added in v0.9.10
type Type string
Type is the set of all kinds of events emitted by the bus. Use "Type" to distinguish between different events. Event emitters should declare Types as constants and document the expected payload type. This term, although similar to the keyword in Go, is used to match what React/Redux use in their event systems.
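Since payloads travel as `interface{}`, the documented payload type is what lets a subscriber assert safely. The sketch below shows that convention with made-up names: the `ETExample...` constants, the `Event` fields, and `describe` are all hypothetical and exist only for illustration.

```go
package main

import "fmt"

type Type string

// Event is a simplified stand-in; its fields are assumptions for this sketch.
type Event struct {
	Type    Type
	Payload interface{}
}

const (
	// ETExampleGreeted is a hypothetical event fired when a greeting is sent
	// payload is a string
	ETExampleGreeted = Type("example:Greeted")
	// ETExampleCounted is a hypothetical event fired when a count changes
	// payload is an int
	ETExampleCounted = Type("example:Counted")
)

// describe switches on the event type, then asserts the documented payload
// type, returning an error instead of panicking on a mismatch.
func describe(e Event) (string, error) {
	switch e.Type {
	case ETExampleGreeted:
		s, ok := e.Payload.(string)
		if !ok {
			return "", fmt.Errorf("event %q: expected string payload, got %T", e.Type, e.Payload)
		}
		return "greeting: " + s, nil
	case ETExampleCounted:
		n, ok := e.Payload.(int)
		if !ok {
			return "", fmt.Errorf("event %q: expected int payload, got %T", e.Type, e.Payload)
		}
		return fmt.Sprintf("count: %d", n), nil
	}
	return "", fmt.Errorf("unhandled event type %q", e.Type)
}

func main() {
	fmt.Println(describe(Event{Type: ETExampleGreeted, Payload: "hi"}))
	fmt.Println(describe(Event{Type: ETExampleCounted, Payload: 3}))
	fmt.Println(describe(Event{Type: ETExampleCounted, Payload: "three"}))
}
```

Using the two-value form of the type assertion keeps a mismatched payload from crashing the publisher's goroutine.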