event

package
v0.10.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2021 License: GPL-3.0 Imports: 7 Imported by: 1

Documentation

Overview

Package event implements an event bus. For a great introduction to the event bus pattern in Go, see: https://levelup.gitconnected.com/lets-write-a-simple-event-bus-in-go-79b9480d8997

Example
// The bus is bound to a cancellable context. Per the NewBus documentation,
// cancelling that context stops the bus from emitting events.
ctx, done := context.WithCancel(context.Background())
defer done()

bus := NewBus(ctx)

// makeDoneHandler builds a Handler that prints its label every time it is
// invoked and always returns a nil error.
makeDoneHandler := func(label string) Handler {
	return func(ctx context.Context, e Event) error {
		fmt.Printf("%s handler called\n", label)
		return nil
	}
}

// "first" subscribes to both event types; "second" and "third" subscribe to
// ETMainSaidHello only.
bus.SubscribeTypes(makeDoneHandler("first"), ETMainSaidHello, ETMainOpSucceeded)
bus.SubscribeTypes(makeDoneHandler("second"), ETMainSaidHello)
bus.SubscribeTypes(makeDoneHandler("third"), ETMainSaidHello)

// ETMainSaidHello reaches all three handlers, in subscription order;
// ETMainOpSucceeded reaches only "first". Publish returns an error, which
// this example deliberately discards for brevity.
bus.Publish(ctx, ETMainSaidHello, "hello")
bus.Publish(ctx, ETMainOpSucceeded, "operation worked!")
Output:

first handler called
second handler called
third handler called
first handler called

Index

Examples

Constants

View Source
// Dataset event types. The change events (ETDatasetNameInit through
// ETDatasetCreateLink) carry a DsChange payload; the save events
// (ETDatasetSaveStarted, ETDatasetSaveProgress, ETDatasetSaveCompleted) carry
// a DsSaveEvent payload.
const (
	// ETDatasetNameInit fires when a dataset is initialized.
	// Payload is a DsChange.
	ETDatasetNameInit = Type("dataset:Init")
	// ETDatasetCommitChange fires when a dataset changes its newest commit.
	// Payload is a DsChange.
	ETDatasetCommitChange = Type("dataset:CommitChange")
	// ETDatasetDeleteAll fires when a dataset is entirely deleted.
	// Payload is a DsChange.
	ETDatasetDeleteAll = Type("dataset:DeleteAll")
	// ETDatasetRename fires when a dataset is renamed.
	// Payload is a DsChange.
	ETDatasetRename = Type("dataset:Rename")
	// ETDatasetCreateLink fires when a dataset is linked to a working directory.
	// Payload is a DsChange.
	ETDatasetCreateLink = Type("dataset:CreateLink")

	// ETDatasetSaveStarted fires when saving a dataset starts.
	// Subscriptions do not block the publisher.
	// Payload will be a DsSaveEvent.
	ETDatasetSaveStarted = Type("dataset:SaveStarted")
	// ETDatasetSaveProgress indicates a change in progress of dataset version
	// creation.
	// Subscriptions do not block the publisher.
	// Payload will be a DsSaveEvent.
	ETDatasetSaveProgress = Type("dataset:SaveProgress")
	// ETDatasetSaveCompleted indicates creating a dataset version finished.
	// Payload will be a DsSaveEvent.
	ETDatasetSaveCompleted = Type("dataset:SaveCompleted")
)
View Source
// Filesystem-watcher ("watchfs") event types.
// NOTE(review): the payload type is not documented on these constants;
// presumably it is WatchfsChange — confirm against the publisher.
const (
	// ETCreatedNewFile is the event for creating a new file.
	ETCreatedNewFile = Type("watchfs:CreatedNewFile")
	// ETModifiedFile is the event for modifying a file.
	ETModifiedFile = Type("watchfs:ModifiedFile")
	// ETDeletedFile is the event for deleting a file.
	ETDeletedFile = Type("watchfs:DeletedFile")
	// ETRenamedFolder is the event for renaming a folder.
	ETRenamedFolder = Type("watchfs:RenamedFolder")
	// ETRemovedFolder is the event for removing a folder.
	ETRemovedFolder = Type("watchfs:RemovedFolder")
)
View Source
// Remote-client event types covering push, pull, and remove operations
// against a remote. Every event in this group carries a RemoteEvent payload.
const (
	// ETRemoteClientPushVersionProgress indicates a change in progress of a
	// dataset version push. Progress can fire as much as once-per-block.
	// Subscriptions do not block the publisher.
	// Payload will be a RemoteEvent.
	ETRemoteClientPushVersionProgress = Type("remoteClient:PushVersionProgress")
	// ETRemoteClientPushVersionCompleted indicates a version was successfully
	// pushed to a remote.
	// Payload will be a RemoteEvent.
	ETRemoteClientPushVersionCompleted = Type("remoteClient:PushVersionCompleted")
	// ETRemoteClientPushDatasetCompleted indicates pushing a dataset
	// (logbook + versions) completed.
	// Payload will be a RemoteEvent.
	ETRemoteClientPushDatasetCompleted = Type("remoteClient:PushDatasetCompleted")
	// ETRemoteClientPullVersionProgress indicates a change in progress of a
	// dataset version pull. Progress can fire as much as once-per-block.
	// Subscriptions do not block the publisher.
	// Payload will be a RemoteEvent.
	ETRemoteClientPullVersionProgress = Type("remoteClient:PullVersionProgress")
	// ETRemoteClientPullVersionCompleted indicates a version was successfully
	// pulled from a remote.
	// Payload will be a RemoteEvent.
	ETRemoteClientPullVersionCompleted = Type("remoteClient:PullVersionCompleted")
	// ETRemoteClientPullDatasetCompleted indicates pulling a dataset
	// (logbook + versions) completed.
	// Payload will be a RemoteEvent.
	ETRemoteClientPullDatasetCompleted = Type("remoteClient:PullDatasetCompleted")
	// ETRemoteClientRemoveDatasetCompleted indicates removing a dataset
	// (logbook + versions) completed.
	// Payload will be a RemoteEvent.
	ETRemoteClientRemoveDatasetCompleted = Type("remoteClient:RemoveDatasetCompleted")
)
View Source
// Transform ("tf") event types, emitted while a transform script executes.
const (
	// ETTransformStart signals the start of a transform execution.
	// Payload will be a TransformLifecycle.
	ETTransformStart = Type("tf:Start")
	// ETTransformStop signals the completion of a transform execution.
	// Payload will be a TransformLifecycle.
	ETTransformStop = Type("tf:Stop")

	// ETTransformStepStart signals a step is starting.
	// Payload will be a TransformStepLifecycle.
	ETTransformStepStart = Type("tf:StepStart")
	// ETTransformStepStop signals a step has stopped.
	// Payload will be a TransformStepLifecycle.
	ETTransformStepStop = Type("tf:StepStop")
	// ETTransformStepSkip signals a step was skipped.
	// Payload will be a TransformStepLifecycle.
	ETTransformStepSkip = Type("tf:StepSkip")

	// ETTransformPrint is sent by print commands.
	// Payload will be a TransformMessage.
	ETTransformPrint = Type("tf:Print")
	// ETTransformError is for when a transform program execution error occurs.
	// Payload will be a TransformMessage.
	ETTransformError = Type("tf:Error")
	// ETTransformDatasetPreview is an abbreviated dataset document in a transform.
	// Payload will be a *dataset.Dataset preview.
	ETTransformDatasetPreview = Type("tf:DatasetPreview")
)
View Source
// Logging levels for transform messages; these are the values carried in
// TransformMessage.Lvl. The empty string stands for an unknown level.
const (
	// TransformMsgLvlNone defines an unknown logging level
	TransformMsgLvlNone = TransformMsgLvl("")
	// TransformMsgLvlDebug defines logging level debug
	TransformMsgLvlDebug = TransformMsgLvl("debug")
	// TransformMsgLvlInfo defines logging level info
	TransformMsgLvlInfo = TransformMsgLvl("info")
	// TransformMsgLvlWarn defines logging level warn
	TransformMsgLvlWarn = TransformMsgLvl("warn")
	// TransformMsgLvlError defines logging level error
	TransformMsgLvlError = TransformMsgLvl("error")
)
View Source
const (
	// ETFSICreateLinkEvent is the event type for when FSI creates a link
	// between a dataset and a working directory.
	// NOTE(review): payload is presumably an FSICreateLinkEvent — confirm
	// against the publisher.
	ETFSICreateLinkEvent = Type("fsi:CreateLinkEvent")
)

Variables

View Source
var (

	// ErrBusClosed indicates the event bus is no longer coordinating events
	// because its parent context has closed.
	ErrBusClosed = fmt.Errorf("event bus is closed")
	// NowFunc is the function that generates timestamps. It is a package-level
	// variable so that tests may override it.
	NowFunc = time.Now
)
View Source
// Peer-to-peer ("p2p") event types.
// NOTE(review): unlike the other event-type groups in this package, these are
// declared with var rather than const — confirm whether that is intentional.
var (
	// ETP2PGoneOnline occurs when a p2p node opens up for peer-2-peer connections.
	// Payload will be []multiaddr.Addr, the listening addresses of this peer.
	ETP2PGoneOnline = Type("p2p:GoneOnline")
	// ETP2PGoneOffline occurs when a p2p node has finished disconnecting from
	// a peer-2-peer network.
	// Payload will be nil.
	ETP2PGoneOffline = Type("p2p:GoneOffline")
	// ETP2PQriPeerConnected fires whenever a peer-2-peer connection that
	// supports the qri protocol is established.
	// Payload is a *profile.Profile.
	// Subscribers cannot block the publisher.
	ETP2PQriPeerConnected = Type("p2p:QriPeerConnected")
	// ETP2PQriPeerDisconnected fires whenever a qri peer-2-peer connection
	// is closed.
	// Payload is a *profile.Profile.
	// A nil payload means we never successfully obtained the peer's profile
	// information.
	// Subscribers cannot block the publisher.
	ETP2PQriPeerDisconnected = Type("p2p:QriPeerDisconnected")
	// ETP2PPeerConnected occurs after any peer has connected to this node.
	// Payload will be a libp2p.peerInfo.
	ETP2PPeerConnected = Type("p2p:PeerConnected")
	// ETP2PPeerDisconnected occurs after any peer has disconnected from this
	// node.
	// Payload will be a libp2p.peerInfo.
	ETP2PPeerDisconnected = Type("p2p:PeerDisconnected")
	// ETP2PMessageReceived fires whenever the p2p protocol receives a message
	// from a Qri peer.
	// Payload will be a p2p.Message.
	ETP2PMessageReceived = Type("p2p:MessageReceived")
)
View Source
// ETInstanceConstructed is fired once a node is created.
// Payload is nil.
var ETInstanceConstructed = Type("lib:InstanceConstructed")

ETInstanceConstructed is fired once a node is created. Payload is nil.

View Source
// NilBus stands in for a nil Bus value. It implements the Bus interface, but
// does nothing.
var NilBus = nilBus{}

NilBus replaces a nil value. It implements the Bus interface, but does nothing.

Functions

This section is empty.

Types

type Bus

// Bus is a central coordination point for event publication and subscription.
type Bus interface {
	// Publish publishes an event to the bus
	Publish(ctx context.Context, typ Type, data interface{}) error
	// PublishID publishes an event with an arbitrary session id
	PublishID(ctx context.Context, typ Type, sessionID string, data interface{}) error
	// SubscribeTypes subscribes to one or more eventTypes with a handler
	// function that will be called whenever one of those event types is
	// published
	SubscribeTypes(handler Handler, eventTypes ...Type)
	// SubscribeID subscribes to only events that have a matching session id
	SubscribeID(handler Handler, sessionID string)
	// SubscribeAll subscribes to all events
	SubscribeAll(handler Handler)
	// NumSubscribers returns the number of subscribers to the bus's events
	NumSubscribers() int
}

Bus is a central coordination point for event publication and subscription. Zero or more subscribers register to be notified of events, optionally by type or ID. A publisher then writes an event to the bus, which broadcasts it to all matching subscribers.

func NewBus

func NewBus(ctx context.Context) Bus

NewBus creates a new event bus. Event busses should be instantiated as a singleton. If the passed in context is cancelled, the bus will stop emitting events and close all subscribed channels

TODO (b5) - finish context-closing cleanup

type DsChange added in v0.9.10

// DsChange represents the result of a change to a dataset. It is the payload
// of the dataset change events: ETDatasetNameInit, ETDatasetCommitChange,
// ETDatasetDeleteAll, ETDatasetRename, and ETDatasetCreateLink.
// NOTE(review): which fields are populated for which event type is not
// documented here — confirm against the publishers.
type DsChange struct {
	InitID     string             `json:"initID"`
	TopIndex   int                `json:"topIndex"`
	ProfileID  string             `json:"profileID"`
	Username   string             `json:"username"`
	PrettyName string             `json:"prettyName"`
	HeadRef    string             `json:"headRef"`
	Info       *dsref.VersionInfo `json:"info"`
	Dir        string             `json:"dir"`
}

DsChange represents the result of a change to a dataset

type DsSaveEvent added in v0.10.0

// DsSaveEvent represents a change in version creation progress. It is the
// payload of ETDatasetSaveStarted, ETDatasetSaveProgress, and
// ETDatasetSaveCompleted.
type DsSaveEvent struct {
	Username string `json:"username"`
	Name     string `json:"name"`
	// either message or error will be populated. message should be a
	// human-centric description of progress
	Message string `json:"message"`
	// saving error. only populated on a failed ETDatasetSaveCompleted event
	// NOTE(review): plain error values generally do not JSON-encode to their
	// message text — confirm how this field is serialized.
	Error error `json:"error,omitempty"`
	// completion fraction from 0-1. note the JSON key is "complete"
	Completion float64 `json:"complete"`
	// only populated on a successful ETDatasetSaveCompleted event
	Path string `json:"path,omitempty"`
}

DsSaveEvent represents a change in version creation progress

type Event

// Event represents an event that subscribers will receive from the bus.
type Event struct {
	// Type is the kind of event that was published
	Type Type
	// Timestamp of the event.
	// NOTE(review): presumably derived from NowFunc; the unit (seconds vs.
	// nanoseconds) is not visible here — confirm.
	Timestamp int64
	// SessionID is the session id associated with the event.
	// NOTE(review): presumably the value passed to PublishID and empty for
	// plain Publish — confirm.
	SessionID string
	// Payload is the data published with the event; the expected concrete
	// type is documented on each event Type
	Payload interface{}
}

Event represents an event that subscribers will receive from the bus

type FSICreateLinkEvent

// FSICreateLinkEvent describes an FSI created link.
// NOTE(review): presumably the payload of the ETFSICreateLinkEvent event
// type — confirm against the publisher.
type FSICreateLinkEvent struct {
	FSIPath  string `json:"fsiPath"`
	Username string `json:"username"`
	Dsname   string `json:"dsName"`
}

FSICreateLinkEvent describes an FSI created link

type Handler added in v0.9.10

// Handler is a function the event bus calls whenever a matching event is
// published. Handler calls are blocking and made in order of subscription;
// any error returned is passed back to the event publisher. The context
// originates from the publisher.
type Handler func(ctx context.Context, e Event) error

Handler is a function that will be called by the event bus whenever a matching event is published. Handler calls are blocking, called in order of subscription. Any error returned by a handler is passed back to the event publisher. The handler context originates from the publisher, and in practice will often be scoped to a "request context" like an HTTP request or CLI command invocation. Generally, event handlers should aim to return quickly, and only delegate to goroutines when the publishing event is firing on a long-running process.

type Publisher added in v0.9.6

// Publisher is an interface that can only publish an event. Its two methods
// have the same signatures as Publish and PublishID on Bus, so any Bus also
// satisfies Publisher.
type Publisher interface {
	// Publish publishes an event of the given type with the given payload
	Publish(ctx context.Context, typ Type, payload interface{}) error
	// PublishID publishes an event with an arbitrary session id
	PublishID(ctx context.Context, typ Type, sessionID string, payload interface{}) error
}

Publisher is an interface that can only publish an event

type RemoteEvent added in v0.9.10

// RemoteEvent encapsulates the push / pull progress of a dataset version. It
// is the payload of the ETRemoteClient* event types.
type RemoteEvent struct {
	Ref        dsref.Ref      `json:"ref"`
	RemoteAddr string         `json:"remoteAddr"`
	Progress   dag.Completion `json:"progress"`
	// NOTE(review): plain error values generally do not JSON-encode to their
	// message text — confirm how this field is serialized.
	Error      error          `json:"error,omitempty"`
}

RemoteEvent encapsulates the push / pull progress of a dataset version

type TransformLifecycle added in v0.10.0

// TransformLifecycle captures state about the execution of an entire
// transform script. It is the payload of ETTransformStart and ETTransformStop.
type TransformLifecycle struct {
	StepCount int    `json:"stepCount"`
	Status    string `json:"status,omitempty"`
}

TransformLifecycle captures state about the execution of an entire transform script. It is the payload of ETTransformStart/Stop.

type TransformMessage added in v0.10.0

// TransformMessage is the payload for print and error events
// (ETTransformPrint and ETTransformError).
type TransformMessage struct {
	// Lvl is the logging level of the message; see the TransformMsgLvl*
	// constants
	Lvl TransformMsgLvl `json:"lvl"`
	// Msg is the message text
	Msg string          `json:"msg"`
}

TransformMessage is the payload for print and error events

type TransformMsgLvl added in v0.10.0

// TransformMsgLvl is an enumeration of message logging levels. The defined
// values are the TransformMsgLvl* constants: "", "debug", "info", "warn",
// and "error".
type TransformMsgLvl string

TransformMsgLvl is an enumeration of all possible degrees of message logging in an implicit hierarchy (levels)

type TransformStepLifecycle added in v0.10.0

// TransformStepLifecycle describes the state of transform step execution at a
// moment in time. It is the payload of ETTransformStepStart,
// ETTransformStepStop, and ETTransformStepSkip.
type TransformStepLifecycle struct {
	Name     string `json:"name"`
	Category string `json:"category"`
	Status   string `json:"status,omitempty"`
}

TransformStepLifecycle describes the state of transform step execution at a moment in time. It is the payload for ETTransformStepStart/Stop/Skip.

type Type added in v0.9.10

// Type identifies the kind of an event emitted by the bus. The values declared
// in this package use a "namespace:Name" string form, e.g. "dataset:Init".
type Type string

Type is the set of all kinds of events emitted by the bus. Use "Type" to distinguish between different events. Event emitters should declare Types as constants and document the expected payload type. This term, although similar to the keyword in Go, is used to match what React/Redux use in their event system.

type WatchfsChange added in v0.9.10

// WatchfsChange represents events for filesystem changes.
// NOTE(review): presumably the payload of the "watchfs:*" event types
// (ETCreatedNewFile, ETModifiedFile, ...) — confirm against the publisher.
type WatchfsChange struct {
	Username    string    `json:"username"`
	Dsname      string    `json:"dsName"`
	Source      string    `json:"source"`
	Destination string    `json:"destination"`
	Time        time.Time `json:"time"`
}

WatchfsChange represents events for filesystem changes

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL