Documentation
Overview ¶
Package cdc provides interfaces for reading and writing change data capture (CDC) events.
Constants ¶
const (
	// @todo: make this dynamically generated
	DEFAULT_CHUNK_WINDOW = 3600000 // milliseconds
)
const (
	// Interval at which feed components emit log lines.
	DEFAULT_LOG_INTERVAL = 30 // seconds
)
Variables ¶
var (
	ErrWebsocketConnect   = errors.New("cannot connect to websocket")
	ErrFeedAlreadyRunning = errors.New("feed already running")
	ErrChannelClosed      = errors.New("feed channel is closed")
	ErrStreamerStopped    = errors.New("feed event streamer was stopped")
)
var (
	ErrPollerNotRunning     = errors.New("poller not running")
	ErrPollerAlreadyRunning = errors.New("poller already running")
)
var CurrentTimestamp func() int64 = func() int64 { return time.Now().UnixNano() / int64(time.Millisecond) }
CurrentTimestamp returns the current time in milliseconds since the Unix epoch. It is a variable so that tests can set the time.
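A minimal sketch of how a test might override a clock of this shape. The names currentTimestamp, withFixedTime, and ageMillis are hypothetical stand-ins defined only for this example; they are not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// currentTimestamp mirrors the shape of cdc.CurrentTimestamp: a package-level
// function variable that returns milliseconds since the Unix epoch.
var currentTimestamp = func() int64 {
	return time.Now().UnixNano() / int64(time.Millisecond)
}

// withFixedTime (hypothetical helper) replaces the clock with one that always
// returns ts, runs fn, and then restores the previous clock.
func withFixedTime(ts int64, fn func()) {
	orig := currentTimestamp
	currentTimestamp = func() int64 { return ts }
	defer func() { currentTimestamp = orig }()
	fn()
}

// ageMillis (hypothetical) is an example of code that depends on the clock.
func ageMillis(eventTs int64) int64 {
	return currentTimestamp() - eventTs
}

func main() {
	withFixedTime(10000, func() {
		fmt.Println(ageMillis(4000)) // prints 6000
	})
}
```

Restoring the original function in a defer keeps one test's clock from leaking into the next.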
var (
ErrStreamerLag = errors.New("streamer fell too far behind the CDC event poller")
)
Functions ¶
This section is empty.
Types ¶
type Delayer ¶
type Delayer interface {
	// Returns the maximum upper-bound timestamp that is safe to use when
	// querying for CDC events.
	MaxTimestamp() (int64, error)

	// BeginChange marks an entity change as having started. The only
	// argument it takes is a string (id) that must uniquely identify
	// the change.
	BeginChange(changeId string) error

	// EndChange takes the id of an entity change and marks it as having
	// ended.
	EndChange(changeId string) error
}
A Delayer keeps track of the maximum upper-bound timestamp that is safe to use when querying for CDC events. As long as queries use this timestamp as their upper bound, they are guaranteed to return consistent data. The guarantee does not apply to queries that do not use the Delayer's timestamp.
The Delayer is a singleton. It is needed because there is no guarantee that CDC events are written in the order in which they happen (entities and CDC events are not written in a single transaction). For example, event1 can happen before event2, yet event2 can be written to the datastore before event1. Until event1 is written, the Delayer returns a timestamp that is LESS than the starting time of both event1 and event2. Once both events have been written, the Delayer returns a timestamp GREATER than or equal to the completion timestamp of both events. This ensures that any query to the Store that uses this timestamp as its maximum upper-bound timestamp never gets event2 without getting event1 first.
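The event1/event2 scenario can be sketched with a single-process, in-memory Delayer. This is an illustration under stated assumptions, not Etre's implementation: memDelayer, newMemDelayer, and replay are hypothetical names, the clock is injected so the run is deterministic, and "less than the starting time" is modeled as the oldest active start time minus one millisecond.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// memDelayer is a hypothetical in-memory Delayer for illustration only.
type memDelayer struct {
	mu     sync.Mutex
	now    func() int64     // injected clock, in milliseconds
	active map[string]int64 // change id -> start timestamp
}

func newMemDelayer(now func() int64) *memDelayer {
	return &memDelayer{now: now, active: make(map[string]int64)}
}

// BeginChange records the start time of a change.
func (d *memDelayer) BeginChange(changeId string) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.active[changeId]; ok {
		return errors.New("change already active: " + changeId)
	}
	d.active[changeId] = d.now()
	return nil
}

// EndChange forgets a change once it has been written.
func (d *memDelayer) EndChange(changeId string) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.active[changeId]; !ok {
		return errors.New("unknown change: " + changeId)
	}
	delete(d.active, changeId)
	return nil
}

// MaxTimestamp returns a timestamp below the start of the oldest active
// change, or the current time if no change is active.
func (d *memDelayer) MaxTimestamp() (int64, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	ts := d.now()
	for _, start := range d.active {
		if start-1 < ts {
			ts = start - 1
		}
	}
	return ts, nil
}

// replay runs the scenario: event1 starts first, event2 is written first.
func replay() (during, after int64) {
	clock := int64(100)
	d := newMemDelayer(func() int64 { return clock })
	d.BeginChange("event1") // starts at 100
	clock = 110
	d.BeginChange("event2") // starts at 110
	clock = 120
	d.EndChange("event2") // event2 is written before event1
	during, _ = d.MaxTimestamp()
	clock = 130
	d.EndChange("event1")
	after, _ = d.MaxTimestamp()
	return during, after
}

func main() {
	during, after := replay()
	fmt.Println(during, after) // prints "99 130"
}
```

While event1 is still in flight the safe upper bound stays at 99, below the start of both events, so a reader cannot see event2 without event1.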
func NewDynamicDelayer ¶
NewDynamicDelayer returns a Delayer that dynamically updates the maximum upper-bound timestamp. It keeps track of all active entity changes (inserts, updates, deletes) on this Etre instance and continually writes the start time of the oldest active change to a persistent data store. Each Etre instance maintains its own "oldest active change" record in the data store. The max upper-bound timestamp that is safe to use is calculated by querying the smallest "oldest active change" value from the data store.
func NewStaticDelayer ¶
NewStaticDelayer returns a Delayer that uses a static value for determining the maximum upper-bound timestamp. It always returns time.Now() - delay as the maximum upper-bound timestamp.
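The static behavior amounts to subtracting a fixed delay from the current time. The sketch below assumes that timestamps are in milliseconds (as with CurrentTimestamp) and that the delay is a time.Duration; the signature of NewStaticDelayer is not shown here, so staticDelayer and fixedExample are hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"time"
)

// staticDelayer is a hypothetical stand-in for a statically delayed Delayer.
type staticDelayer struct {
	now   func() time.Time // injected clock so the example is deterministic
	delay time.Duration    // assumed unit; the real parameter type may differ
}

// MaxTimestamp returns now - delay in milliseconds since the Unix epoch.
func (d staticDelayer) MaxTimestamp() (int64, error) {
	return d.now().Add(-d.delay).UnixNano() / int64(time.Millisecond), nil
}

// A static delayer does not track individual changes, so these are no-ops.
func (d staticDelayer) BeginChange(changeId string) error { return nil }
func (d staticDelayer) EndChange(changeId string) error   { return nil }

// fixedExample evaluates MaxTimestamp at a fixed instant (100 s after the
// epoch) with a 5 s delay.
func fixedExample() int64 {
	d := staticDelayer{
		now:   func() time.Time { return time.Unix(100, 0) },
		delay: 5 * time.Second,
	}
	ts, _ := d.MaxTimestamp()
	return ts
}

func main() {
	fmt.Println(fixedExample()) // prints 95000
}
```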
type ErrWriteEvent ¶
type ErrWriteEvent struct {
// contains filtered or unexported fields
}
ErrWriteEvent represents an error in writing a CDC event to a persistent data store. It satisfies the error interface.
func (ErrWriteEvent) Error ¶
func (e ErrWriteEvent) Error() string
type FeedFactory ¶
type FeedFactory interface {
	// MakeWebsocket makes a websocket feed.
	MakeWebsocket(*websocket.Conn) *WebsocketFeed

	// MakeInternal makes an internal feed.
	MakeInternal(bufferSize int) *InternalFeed
}
A FeedFactory creates feeds. A feed produces a change feed of Change Data Capture (CDC) events for all entity types. Feeds are used by clients to stream events as they happen within Etre. Each client has its own feed.
func NewFeedFactory ¶
func NewFeedFactory(poller Poller, cdcs Store) FeedFactory
type Filter ¶
type Filter struct {
	SinceTs int64 // Only read events that have a timestamp greater than or equal to this value.
	UntilTs int64 // Only read events that have a timestamp less than this value.
	Limit   int
}
Filter contains fields that are used to filter events that the CDC reads. Unset fields are ignored.
var NoFilter Filter
NoFilter is a convenience var for calls like Read(cdc.NoFilter). Other packages must not modify this var.
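The filter semantics (SinceTs inclusive, UntilTs exclusive, unset fields ignored) can be sketched against an in-memory slice. The event type is a simplified stand-in for etre.CDCEvent, and apply and sample are hypothetical helpers; treating zero as "unset" and Limit as a maximum result count are assumptions based on the field descriptions.

```go
package main

import "fmt"

// event is a simplified stand-in for etre.CDCEvent.
type event struct {
	Id string
	Ts int64
}

// Filter mirrors the documented fields.
type Filter struct {
	SinceTs int64
	UntilTs int64
	Limit   int
}

// NoFilter is the zero value: every field is unset, so nothing is filtered.
var NoFilter Filter

// apply returns the events that satisfy f, in their original order.
// Zero-valued fields are treated as unset (an assumption of this sketch).
func apply(events []event, f Filter) []event {
	var out []event
	for _, e := range events {
		if f.SinceTs != 0 && e.Ts < f.SinceTs {
			continue // before the inclusive lower bound
		}
		if f.UntilTs != 0 && e.Ts >= f.UntilTs {
			continue // at or after the exclusive upper bound
		}
		out = append(out, e)
		if f.Limit != 0 && len(out) == f.Limit {
			break
		}
	}
	return out
}

// sample returns four events with timestamps 10, 20, 30, and 40.
func sample() []event {
	return []event{{"a", 10}, {"b", 20}, {"c", 30}, {"d", 40}}
}

func main() {
	got := apply(sample(), Filter{SinceTs: 20, UntilTs: 40})
	fmt.Println(len(got), got[0].Ts, got[1].Ts) // prints "2 20 30"
	fmt.Println(len(apply(sample(), NoFilter))) // prints 4
}
```

A half-open range of this kind lets consecutive reads use one call's UntilTs as the next call's SinceTs without skipping or repeating events.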
type InternalFeed ¶
type InternalFeed struct {
	*sync.Mutex // guard function calls
	// contains filtered or unexported fields
}
InternalFeed represents a feed that works on a channel. It is used by internal components in Etre that need access to a feed of CDC events (currently it is only used in tests). Call Start on a feed to start it and to get access to the channel that the feed sends events on.
func NewInternalFeed ¶
func NewInternalFeed(bufferSize int, poller Poller, cdcs Store) *InternalFeed
NewInternalFeed creates a feed that works on a channel.
bufferSize causes Start to create and return a buffered feed channel. A value of 10 is reasonable. If the channel blocks, it is closed and Error returns ErrCallerBlocked.
func (*InternalFeed) Error ¶
func (f *InternalFeed) Error() error
Error returns the error that caused the feed channel to be closed.
func (*InternalFeed) Start ¶
func (f *InternalFeed) Start(startTs int64) <-chan etre.CDCEvent
Start starts a feed from a given timestamp. It returns a feed channel on which the caller can receive CDC events for as long as the feed is running. Calling Start again returns the same feed channel if already started. On error or when Stop is called, the feed channel is closed and Error returns the error. A feed cannot be restarted once it has stopped.
func (*InternalFeed) Stop ¶
func (f *InternalFeed) Stop()
Stop stops the feed and closes the feed channel returned by Start. It is safe to call multiple times.
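A consumer of a feed with this Start/Stop/Error shape typically ranges over the channel until it is closed and then checks Error. The sketch below uses a hypothetical feed interface and a fakeFeed test double with a simplified event type in place of etre.CDCEvent; it does not use the real InternalFeed.

```go
package main

import (
	"errors"
	"fmt"
)

// event is a simplified stand-in for etre.CDCEvent.
type event struct{ Id string }

// feed is the subset of the InternalFeed method set used by this sketch.
type feed interface {
	Start(startTs int64) <-chan event
	Stop()
	Error() error
}

// consume handles events until the feed channel is closed, then returns the
// error that caused the close (nil if there was none).
func consume(f feed, startTs int64, handle func(event)) error {
	for e := range f.Start(startTs) {
		handle(e)
	}
	return f.Error()
}

// errStopped is a hypothetical error used by the fake feed.
var errStopped = errors.New("feed stopped")

// fakeFeed is a test double that sends fixed events and then closes.
type fakeFeed struct {
	events []event
	err    error
	ch     chan event
}

func (f *fakeFeed) Start(startTs int64) <-chan event {
	if f.ch != nil {
		return f.ch // already started: return the same channel
	}
	f.ch = make(chan event, len(f.events))
	for _, e := range f.events {
		f.ch <- e
	}
	close(f.ch)
	return f.ch
}

func (f *fakeFeed) Stop()        {}
func (f *fakeFeed) Error() error { return f.err }

func main() {
	f := &fakeFeed{events: []event{{"e1"}, {"e2"}}, err: errStopped}
	n := 0
	err := consume(f, 0, func(event) { n++ })
	fmt.Println(n, err) // prints "2 feed stopped"
}
```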
type Poller ¶
type Poller interface {
	// Run runs the Poller. It only stops running if it encounters an error
	// (which can be inspected with Error), else it runs forever. Clients
	// can register with the Poller once it is running.
	Run() error

	// Register registers a client with the Poller. It takes a brief lock
	// on the Poller to ensure that we get a consistent snapshot of when
	// the client is registered. It returns a buffered channel that the
	// client can select on to get all events that the Poller polls after
	// the client is registered. It also returns a timestamp which
	// represents the last upper-bound timestamp that it polled before the
	// client was registered. The client can assume that all events after
	// this timestamp will come through the aforementioned channel. If the
	// buffer on the channel fills up (if the client isn't consuming from
	// it fast enough), the Poller automatically deregisters the client.
	//
	// The only argument that Register takes is a string (id) that must
	// uniquely identify a client.
	Register(id string) (<-chan etre.CDCEvent, int64, error)

	// Deregister takes the id of a registered client and closes the event
	// channel for it (i.e., stops sending it newly polled events).
	Deregister(id string)

	// Error returns the error that caused the Poller to stop running. This
	// allows clients to see why the Poller stopped. Start resets the error.
	Error() error
}
A Poller polls the Store for new events and broadcasts them to all registered clients. The Poller is a singleton. Having each client poll the Store for new events does not scale, and it causes duplicate reads of the same events. Instead, the Poller reads once from the Store and “writes many” to each client.
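The "read once, write many" pattern with automatic deregistration of slow clients can be sketched with buffered channels and a non-blocking send. The broadcaster type and its broadcast method are hypothetical; the polling loop against the Store is omitted, and event is a simplified stand-in for etre.CDCEvent.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// event is a simplified stand-in for etre.CDCEvent.
type event struct{ Ts int64 }

// broadcaster is a hypothetical fan-out core of a poller.
type broadcaster struct {
	mu      sync.Mutex
	bufSize int
	lastTs  int64 // last upper-bound timestamp that was polled
	clients map[string]chan event
}

func newBroadcaster(bufSize int) *broadcaster {
	return &broadcaster{bufSize: bufSize, clients: make(map[string]chan event)}
}

// Register returns a buffered channel and the last polled upper bound.
func (b *broadcaster) Register(id string) (<-chan event, int64, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if _, ok := b.clients[id]; ok {
		return nil, 0, errors.New("client already registered: " + id)
	}
	ch := make(chan event, b.bufSize)
	b.clients[id] = ch
	return ch, b.lastTs, nil
}

// Deregister closes and forgets the client's channel.
func (b *broadcaster) Deregister(id string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if ch, ok := b.clients[id]; ok {
		close(ch)
		delete(b.clients, id)
	}
}

// broadcast sends one batch of polled events to every client. A client whose
// buffer is full is deregistered instead of blocking the others.
func (b *broadcaster) broadcast(events []event, upperTs int64) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, e := range events {
		for id, ch := range b.clients {
			select {
			case ch <- e:
			default: // buffer full: drop the slow client
				close(ch)
				delete(b.clients, id)
			}
		}
	}
	b.lastTs = upperTs
}

// drain counts the events left in a closed channel.
func drain(ch <-chan event) int {
	n := 0
	for range ch {
		n++
	}
	return n
}

func main() {
	b := newBroadcaster(2)
	slow, _, _ := b.Register("slow")
	b.broadcast([]event{{10}, {20}, {30}}, 30) // third event overflows the buffer
	fmt.Println(drain(slow))                   // prints 2
	_, since, _ := b.Register("late")
	fmt.Println(since) // prints 30
}
```

The non-blocking send is what keeps one stalled client from delaying delivery to every other client.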
type RetryPolicy ¶
RetryPolicy represents the retry policy that the Store uses when reading and writing events.
var NoRetryPolicy RetryPolicy
NoRetryPolicy is a convenience var for calls like NewStore(cdc.NoRetryPolicy). Other packages must not modify this var.
type Store ¶
type Store interface {
	// Write writes the CDC event to a persistent data store. If writing
	// fails, it retries according to the RetryPolicy. If retrying fails,
	// the event is written to the fallbackFile. An ErrWriteEvent is
	// returned if writing to the persistent data store fails, even if
	// writing to fallbackFile succeeds.
	Write(etre.CDCEvent) error

	// Read queries a persistent data store for events that satisfy the
	// given filter.
	Read(Filter) ([]etre.CDCEvent, error)
}
A Store writes CDC events to, and reads them from, a persistent data store.
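The write path described for Write (retry, then fall back to a file, and still return an error) might look like the sketch below. Everything here is an assumption for illustration: the fields of RetryPolicy are not shown in this documentation, so retryPolicy with RetryCount and RetryWait is hypothetical, as are writeWithFallback, errWriteEvent, simulateOutage, and the JSON-lines fallback format.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"time"
)

// event is a simplified stand-in for etre.CDCEvent.
type event struct {
	Id string `json:"id"`
	Ts int64  `json:"ts"`
}

// retryPolicy is hypothetical; the real RetryPolicy fields may differ.
type retryPolicy struct {
	RetryCount int           // retries after the first attempt
	RetryWait  time.Duration // pause between attempts
}

// errWriteEvent is a hypothetical analogue of ErrWriteEvent.
type errWriteEvent struct {
	eventId string
	cause   error
}

func (e errWriteEvent) Error() string {
	return fmt.Sprintf("cannot write CDC event %s: %v", e.eventId, e.cause)
}

// writeWithFallback tries write up to 1+RetryCount times. If every attempt
// fails, it appends the event to fallback as one JSON line and returns an
// errWriteEvent, even when the fallback write succeeds.
func writeWithFallback(e event, write func(event) error, fallback io.Writer, rp retryPolicy) error {
	var err error
	for attempt := 0; attempt <= rp.RetryCount; attempt++ {
		if err = write(e); err == nil {
			return nil
		}
		if attempt < rp.RetryCount {
			time.Sleep(rp.RetryWait)
		}
	}
	if ferr := json.NewEncoder(fallback).Encode(e); ferr != nil {
		err = fmt.Errorf("%v (fallback also failed: %v)", err, ferr)
	}
	return errWriteEvent{eventId: e.Id, cause: err}
}

// simulateOutage writes one event while the datastore is always failing.
func simulateOutage() (attempts int, fallback string, err error) {
	var buf bytes.Buffer
	write := func(event) error {
		attempts++
		return errors.New("datastore unavailable")
	}
	rp := retryPolicy{RetryCount: 2, RetryWait: time.Millisecond}
	err = writeWithFallback(event{Id: "e1", Ts: 100}, write, &buf, rp)
	return attempts, buf.String(), err
}

func main() {
	attempts, fallback, err := simulateOutage()
	fmt.Println(attempts) // prints 3
	fmt.Print(fallback)   // prints {"id":"e1","ts":100}
	fmt.Println(err)      // prints cannot write CDC event e1: datastore unavailable
}
```

Returning an error even after a successful fallback write tells the caller that the event is not yet in the data store and still has to be recovered from the file.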
type WebsocketFeed ¶
type WebsocketFeed struct {
	*sync.Mutex // guard function calls
	// contains filtered or unexported fields
}
WebsocketFeed represents a feed that works over a websocket. Call Run on a feed to make it listen to control messages on the websocket. When the feed receives a "start" control message, it begins sending events over the websocket. If the feed encounters an error it attempts to send an "error" control message over the websocket before closing.
func NewWebsocketFeed ¶
func NewWebsocketFeed(wsConn *websocket.Conn, poller Poller, cdcs Store) *WebsocketFeed
NewWebsocketFeed creates a feed that works on a given websocket.
func (*WebsocketFeed) Run ¶
func (f *WebsocketFeed) Run() error
Run makes a feed listen to messages on the websocket. The feed won't do anything until it receives a control message on the websocket. Run is a blocking operation that only returns if it encounters an error. Calling Run on a running feed returns ErrFeedAlreadyRunning. Calling Run on a feed that has stopped returns ErrWebsocketConnect.