cdc

package
v0.9.0-alpha
Warning

This package is not in the latest version of its module.

Published: Nov 25, 2018 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package cdc provides interfaces for reading and writing change data capture (CDC) events.

Constants

const (
	// @todo: make this dynamically generated
	DEFAULT_CHUNK_WINDOW = 3600000 // milliseconds
)
const (
	// Interval at which feed components emit log lines.
	DEFAULT_LOG_INTERVAL = 30 // seconds
)

Variables

var (
	ErrWebsocketConnect   = errors.New("cannot connect to websocket")
	ErrFeedAlreadyRunning = errors.New("feed already running")
	ErrChannelClosed      = errors.New("feed channel is closed")
	ErrStreamerStopped    = errors.New("feed event streamer was stopped")
)
var (
	ErrPollerNotRunning     = errors.New("poller not running")
	ErrPollerAlreadyRunning = errors.New("poller already running")
)
var CurrentTimestamp func() int64 = func() int64 { return time.Now().UnixNano() / int64(time.Millisecond) }

This allows tests to set the time.
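The override pattern can be sketched with a local copy of the variable. This is only an illustration: currentTimestamp mirrors cdc.CurrentTimestamp, and withFixedTime is a hypothetical helper that is not part of the package.

```go
package main

import (
	"fmt"
	"time"
)

// currentTimestamp mirrors cdc.CurrentTimestamp: a package-level func
// variable that returns the current time in milliseconds since the epoch.
var currentTimestamp func() int64 = func() int64 {
	return time.Now().UnixNano() / int64(time.Millisecond)
}

// withFixedTime (hypothetical helper) makes currentTimestamp return ts and
// returns a func that restores the original clock.
func withFixedTime(ts int64) (restore func()) {
	orig := currentTimestamp
	currentTimestamp = func() int64 { return ts }
	return func() { currentTimestamp = orig }
}

func main() {
	restore := withFixedTime(1543104000000)
	fmt.Println(currentTimestamp()) // prints 1543104000000
	restore()
}
```

In a test, the restore func would typically be deferred so that later tests see the real clock again.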

var (
	ErrStreamerLag = errors.New("streamer fell too far behind the CDC event poller")
)

Functions

This section is empty.

Types

type Delay

type Delay struct {
	Hostname string `json:"hostname"`
	Ts       int64  `json:"ts"`
}

Delay represents the maximum upper-bound timestamp for a given Etre instance.

type Delayer

type Delayer interface {
	// MaxTimestamp returns the maximum upper-bound timestamp that is safe
	// to use when querying for CDC events.
	MaxTimestamp() (int64, error)

	// BeginChange marks an entity change as having started. The only
	// argument it takes is a string (id) that must uniquely identify
	// the change.
	BeginChange(changeId string) error

	// EndChange takes the id of an entity change and marks it as having
	// ended.
	EndChange(changeId string) error
}

A Delayer keeps track of the maximum upper-bound timestamp that is safe to use when querying for CDC events. As long as queries use this timestamp as their upper bound, they are guaranteed to return consistent data. The guarantee does not apply to queries that do not use the Delayer timestamp.

The Delayer is a singleton. It is needed because CDC events are not guaranteed to be written in the order in which they happen (entities and CDC events are not written in a single transaction). For example, event1 can happen before event2, yet event2 can be written to the datastore before event1. Until event1 is written, the Delayer returns a timestamp that is LESS than the starting time for both event1 and event2. When both events have been written, the Delayer returns a timestamp GREATER than or equal to the completion timestamp of both events. This ensures that any queries to the Store that use this timestamp as the maximum upper-bound timestamp never get event2 without getting event1 first.
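The event1/event2 scenario above can be sketched with an in-memory stand-in. This is a minimal illustration, not the package's implementation: memDelayer and its injected clock are hypothetical, and it assumes the safe timestamp is one millisecond before the oldest active change.

```go
package main

import (
	"fmt"
	"sync"
)

// memDelayer is a hypothetical in-memory stand-in for a Delayer.
type memDelayer struct {
	mu     sync.Mutex
	now    func() int64     // injected clock, milliseconds
	active map[string]int64 // changeId -> start time of the change
}

func newMemDelayer(now func() int64) *memDelayer {
	return &memDelayer{now: now, active: map[string]int64{}}
}

// BeginChange records the start time of a change.
func (d *memDelayer) BeginChange(changeId string) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.active[changeId]; ok {
		return fmt.Errorf("change %q already active", changeId)
	}
	d.active[changeId] = d.now()
	return nil
}

// EndChange forgets a change once it has been fully written.
func (d *memDelayer) EndChange(changeId string) error {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.active[changeId]; !ok {
		return fmt.Errorf("change %q not active", changeId)
	}
	delete(d.active, changeId)
	return nil
}

// MaxTimestamp returns a timestamp strictly less than the start time of the
// oldest active change, or the current time if no change is active.
func (d *memDelayer) MaxTimestamp() (int64, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	safe := d.now()
	for _, start := range d.active {
		if start-1 < safe {
			safe = start - 1
		}
	}
	return safe, nil
}

func main() {
	clock := int64(100)
	d := newMemDelayer(func() int64 { return clock })

	_ = d.BeginChange("event1") // starts at 100
	clock = 110
	_ = d.BeginChange("event2") // starts at 110
	clock = 120
	_ = d.EndChange("event2") // event2 is written first

	ts, _ := d.MaxTimestamp()
	fmt.Println(ts) // prints 99: event1 is still in flight

	clock = 130
	_ = d.EndChange("event1")
	ts, _ = d.MaxTimestamp()
	fmt.Println(ts) // prints 130: nothing is in flight
}
```

A query bounded by 99 sees neither event, so it can never observe event2 without event1.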

func NewDynamicDelayer

func NewDynamicDelayer(conn db.Connector, database, collection string) (Delayer, error)

NewDynamicDelayer returns a Delayer that dynamically updates the maximum upper-bound timestamp. It keeps track of all active entity changes (inserts, updates, deletes) on this Etre instance and continually writes the start time of the oldest active change to a persistent data store. Each Etre instance maintains its own "oldest active change" record in the data store. The max upper-bound timestamp that is safe to use is calculated by querying the smallest "oldest active change" value from the data store.
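The final step, taking the smallest "oldest active change" value across instances, might look like the sketch below. The Delay struct is copied from this package; maxSafeTimestamp, errNoDelays, and the hostnames are hypothetical, and the records are passed in as a slice rather than queried from a data store.

```go
package main

import (
	"errors"
	"fmt"
)

// Delay mirrors cdc.Delay: one record per Etre instance.
type Delay struct {
	Hostname string `json:"hostname"`
	Ts       int64  `json:"ts"`
}

// errNoDelays is a hypothetical error for an empty record set.
var errNoDelays = errors.New("no delay records")

// maxSafeTimestamp returns the smallest Ts among the per-instance records,
// which is the upper bound that is safe for every instance.
func maxSafeTimestamp(delays []Delay) (int64, error) {
	if len(delays) == 0 {
		return 0, errNoDelays
	}
	smallest := delays[0].Ts
	for _, d := range delays[1:] {
		if d.Ts < smallest {
			smallest = d.Ts
		}
	}
	return smallest, nil
}

func main() {
	delays := []Delay{
		{Hostname: "etre1", Ts: 1500},
		{Hostname: "etre2", Ts: 1200},
		{Hostname: "etre3", Ts: 1800},
	}
	ts, err := maxSafeTimestamp(delays)
	fmt.Println(ts, err) // prints 1200 <nil>
}
```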

func NewStaticDelayer

func NewStaticDelayer(delay int) (Delayer, error)

NewStaticDelayer returns a Delayer that uses a static value for determining the maximum upper-bound timestamp. It always returns time.Now() - delay as the maximum upper-bound timestamp.
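The arithmetic is simple enough to show directly. The unit of delay is not documented here, so this sketch assumes milliseconds to match CurrentTimestamp; staticMaxTimestamp is a hypothetical helper.

```go
package main

import (
	"fmt"
	"time"
)

// staticMaxTimestamp returns nowMs minus delayMs. Both values are assumed
// to be in milliseconds; the package does not document the unit of delay.
func staticMaxTimestamp(nowMs int64, delayMs int) int64 {
	return nowMs - int64(delayMs)
}

func main() {
	nowMs := time.Now().UnixNano() / int64(time.Millisecond)
	// With a 5 second delay, queries never read events newer than 5s ago.
	fmt.Println(staticMaxTimestamp(nowMs, 5000))
}
```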

type ErrWriteEvent

type ErrWriteEvent struct {
	// contains filtered or unexported fields
}

ErrWriteEvent represents an error in writing a CDC event to a persistent data store. It satisfies the error interface.

func (ErrWriteEvent) Error

func (e ErrWriteEvent) Error() string

type FeedFactory

type FeedFactory interface {
	// MakeWebsocket makes a websocket feed.
	MakeWebsocket(*websocket.Conn) *WebsocketFeed
	// MakeInternal makes an internal feed.
	MakeInternal(bufferSize int) *InternalFeed
}

A FeedFactory creates feeds. A feed produces a stream of change data capture (CDC) events for all entity types. Clients use feeds to stream events as they happen within Etre. Each client has its own feed.

func NewFeedFactory

func NewFeedFactory(poller Poller, cdcs Store) FeedFactory

type Filter

type Filter struct {
	SinceTs int64 // Only read events that have a timestamp greater than or equal to this value.
	UntilTs int64 // Only read events that have a timestamp less than this value.
	Limit   int
}

Filter contains fields that are used to filter events that the CDC reads. Unset fields are ignored.
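The bounds described in the field comments (SinceTs inclusive, UntilTs exclusive) can be sketched against an in-memory slice. The event type and apply function are hypothetical, and the sketch assumes that "unset" means the zero value.

```go
package main

import "fmt"

// event is a hypothetical stand-in for etre.CDCEvent.
type event struct {
	Id string
	Ts int64
}

// Filter mirrors cdc.Filter.
type Filter struct {
	SinceTs int64
	UntilTs int64
	Limit   int
}

// apply returns the events that satisfy f. Zero-valued fields are treated
// as unset and ignored (an assumption of this sketch).
func apply(f Filter, events []event) []event {
	var out []event
	for _, e := range events {
		if f.SinceTs != 0 && e.Ts < f.SinceTs {
			continue // before the inclusive lower bound
		}
		if f.UntilTs != 0 && e.Ts >= f.UntilTs {
			continue // at or after the exclusive upper bound
		}
		out = append(out, e)
		if f.Limit > 0 && len(out) == f.Limit {
			break
		}
	}
	return out
}

func main() {
	events := []event{{"a", 10}, {"b", 20}, {"c", 30}, {"d", 40}}
	fmt.Println(len(apply(Filter{}, events)))                    // prints 4
	fmt.Println(apply(Filter{SinceTs: 20, UntilTs: 40}, events)) // prints [{b 20} {c 30}]
	fmt.Println(apply(Filter{SinceTs: 20, Limit: 1}, events))    // prints [{b 20}]
}
```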

var NoFilter Filter

NoFilter is a convenience var for calls like Read(cdc.NoFilter). Other packages must not modify this var.

type InternalFeed

type InternalFeed struct {
	*sync.Mutex // guard function calls
	// contains filtered or unexported fields
}

InternalFeed represents a feed that works on a channel. It is used by internal components in Etre that need access to a feed of CDC events (currently it is only used in tests). Call Start on a feed to start it and to get access to the channel that the feed sends events on.

func NewInternalFeed

func NewInternalFeed(bufferSize int, poller Poller, cdcs Store) *InternalFeed

NewInternalFeed creates a feed that works on a channel.

bufferSize causes Start to create and return a buffered feed channel. A value of 10 is reasonable. If the channel blocks, it is closed and Error returns ErrCallerBlocked.

func (*InternalFeed) Error

func (f *InternalFeed) Error() error

Error returns the error that caused the feed channel to be closed.

func (*InternalFeed) Start

func (f *InternalFeed) Start(startTs int64) <-chan etre.CDCEvent

Start starts a feed from a given timestamp. It returns a feed channel on which the caller can receive CDC events for as long as the feed is running. Calling Start again returns the same feed channel if already started. On error or when Stop is called, the feed channel is closed and Error returns the error. A feed cannot be restarted once it has stopped.

func (*InternalFeed) Stop

func (f *InternalFeed) Stop()

Stop stops the feed and closes the feed channel returned by Start. It is safe to call multiple times.
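The Start/Stop/Error lifecycle suggests a consume loop of the form "range over the channel, then check Error". The sketch below uses a hypothetical fakeFeed with the same method shapes so that it runs without Etre; the event type, the send helper, and the buffer size of 10 are assumptions.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical stand-in for etre.CDCEvent.
type event struct{ Ts int64 }

// fakeFeed is a hypothetical feed with the same lifecycle as InternalFeed.
type fakeFeed struct {
	mu      sync.Mutex
	ch      chan event
	stopped bool
	err     error
}

// Start returns the feed channel, creating it on the first call.
func (f *fakeFeed) Start(startTs int64) <-chan event {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.ch == nil {
		f.ch = make(chan event, 10)
		if f.stopped {
			close(f.ch) // a stopped feed cannot be restarted
		}
	}
	return f.ch
}

// Stop closes the feed channel. It is safe to call multiple times.
func (f *fakeFeed) Stop() {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.stopped {
		return
	}
	f.stopped = true
	if f.ch != nil {
		close(f.ch)
	}
}

// Error returns the error that closed the channel, if any.
func (f *fakeFeed) Error() error {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.err
}

// send (test helper) queues an event; it drops the event if the buffer is
// full or the feed is not running.
func (f *fakeFeed) send(e event) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.ch == nil || f.stopped {
		return
	}
	select {
	case f.ch <- e:
	default:
	}
}

func main() {
	f := &fakeFeed{}
	events := f.Start(0)
	f.send(event{Ts: 1})
	f.send(event{Ts: 2})
	f.Stop()
	f.Stop() // second call is a no-op

	// Buffered events are still delivered after the channel is closed.
	for e := range events {
		fmt.Println(e.Ts) // prints 1, then 2
	}
	if err := f.Error(); err != nil {
		fmt.Println("feed stopped:", err)
	}
}
```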

type Poller

type Poller interface {
	// Run runs the Poller. It only stops running if it encounters an error
	// (which can be inspected with Error), else it runs forever. Clients
	// can register with the Poller once it is running.
	Run() error

	// Register registers a client with the Poller. It takes a brief lock
	// on the Poller to ensure that we get a consistent snapshot of when
	// the client is registered. It returns a buffered channel that the
	// client can select on to get all events that the Poller polls after
	// the client is registered. It also returns a timestamp which
	// represents the last upper-bound timestamp that it polled before the
	// client was registered. The client can assume that all events after
	// this timestamp will come through the aforementioned channel. If the
	// buffer on the channel fills up (if the client isn't consuming from
	// it fast enough), the Poller automatically deregisters the client.
	//
	// The only argument that Register takes is a string (id) that must
	// uniquely identify a client.
	Register(id string) (<-chan etre.CDCEvent, int64, error)

	// Deregister takes the id of a registered client and closes the event
	// channel for it (i.e., stops sending it newly polled events).
	Deregister(id string)

	// Error returns the error that caused the Poller to stop running. This
	// allows clients to see why the Poller stopped. Run resets the error.
	Error() error
}

A Poller polls the Store for new events and broadcasts them to all registered clients. The Poller is a singleton. Having each client poll the Store for new events does not scale, and it causes duplicate reads of the same events. Instead, the Poller reads once from the Store and “writes many” to each client.
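The "read once, write many" fan-out, including the automatic deregistration of slow clients described under Register, can be sketched as follows. The broadcaster type and event struct are hypothetical, and the sketch omits polling and the timestamp that Register returns.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical stand-in for etre.CDCEvent.
type event struct{ Ts int64 }

// broadcaster is a hypothetical fan-out core for a Poller.
type broadcaster struct {
	mu      sync.Mutex
	bufSize int
	clients map[string]chan event
}

func newBroadcaster(bufSize int) *broadcaster {
	return &broadcaster{bufSize: bufSize, clients: map[string]chan event{}}
}

// Register creates a buffered channel for the client with the given id.
func (b *broadcaster) Register(id string) (<-chan event, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if _, ok := b.clients[id]; ok {
		return nil, fmt.Errorf("client %q already registered", id)
	}
	c := make(chan event, b.bufSize)
	b.clients[id] = c
	return c, nil
}

// Deregister closes the client's channel and forgets the client.
func (b *broadcaster) Deregister(id string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if c, ok := b.clients[id]; ok {
		close(c)
		delete(b.clients, id)
	}
}

// broadcast sends e to every client without blocking. A client whose
// buffer is full is deregistered.
func (b *broadcaster) broadcast(e event) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for id, c := range b.clients {
		select {
		case c <- e:
		default:
			close(c)
			delete(b.clients, id)
		}
	}
}

func main() {
	b := newBroadcaster(1)
	fast, _ := b.Register("fast")
	slow, _ := b.Register("slow")

	b.broadcast(event{Ts: 1})
	fmt.Println((<-fast).Ts) // prints 1
	b.broadcast(event{Ts: 2}) // slow has not read event 1, so it is dropped
	fmt.Println((<-fast).Ts) // prints 2

	<-slow // the buffered event 1
	_, open := <-slow
	fmt.Println(open) // prints false
}
```

The non-blocking send is what keeps one slow client from stalling delivery to all the others.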

func NewPoller

func NewPoller(cdcs Store, delayer Delayer, clientBufferSize int, pollInterval *time.Ticker) Poller

NewPoller creates a poller. The provided clientBufferSize is the buffer size used when creating event channels for registered clients (read the documentation on Poller.Register for more details on what this means).

type RetryPolicy

type RetryPolicy struct {
	RetryCount int
	RetryWait  int // milliseconds
}

RetryPolicy represents the retry policy that the Store uses when reading and writing events.
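A retry loop driven by these two fields might look like the sketch below. It assumes RetryCount is the number of retries after the first attempt, which this documentation does not state; withRetry and errTransient are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// RetryPolicy mirrors cdc.RetryPolicy.
type RetryPolicy struct {
	RetryCount int
	RetryWait  int // milliseconds
}

// errTransient is a hypothetical error used by the example.
var errTransient = errors.New("transient failure")

// withRetry calls op once and then up to p.RetryCount more times, sleeping
// p.RetryWait milliseconds between attempts. It returns nil on the first
// success, else the last error.
func withRetry(p RetryPolicy, op func() error) error {
	retries := p.RetryCount
	if retries < 0 {
		retries = 0
	}
	var err error
	for attempt := 0; attempt <= retries; attempt++ {
		if attempt > 0 {
			time.Sleep(time.Duration(p.RetryWait) * time.Millisecond)
		}
		if err = op(); err == nil {
			return nil
		}
	}
	return err
}

func main() {
	calls := 0
	err := withRetry(RetryPolicy{RetryCount: 3, RetryWait: 1}, func() error {
		calls++
		if calls < 3 {
			return errTransient // fail the first two attempts
		}
		return nil
	})
	fmt.Println(calls, err) // prints 3 <nil>
}
```

With the zero-value policy, op runs exactly once, which matches the intent of NoRetryPolicy under this assumption.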

var NoRetryPolicy RetryPolicy

NoRetryPolicy is a convenience var for calls like NewStore(cdc.NoRetryPolicy). Other packages must not modify this var.

type Store

type Store interface {
	// Write writes the CDC event to a persistent data store. If writing
	// fails, it retries according to the RetryPolicy. If retrying fails,
	// the event is written to the fallbackFile. An ErrWriteEvent is
	// returned if writing to the persistent data store fails, even if
	// writing to fallbackFile succeeds.
	Write(etre.CDCEvent) error

	// Read queries a persistent data store for events that satisfy the
	// given filter.
	Read(Filter) ([]etre.CDCEvent, error)
}

A Store writes CDC events to, and reads them from, a persistent data store.

func NewStore

func NewStore(conn db.Connector, database, collection, fallbackFile string, writeRetryPolicy RetryPolicy) Store

type WebsocketFeed

type WebsocketFeed struct {
	*sync.Mutex // guard function calls
	// contains filtered or unexported fields
}

WebsocketFeed represents a feed that works over a websocket. Call Run on a feed to make it listen to control messages on the websocket. When the feed receives a "start" control message, it begins sending events over the websocket. If the feed encounters an error, it attempts to send an "error" control message over the websocket before closing.

func NewWebsocketFeed

func NewWebsocketFeed(wsConn *websocket.Conn, poller Poller, cdcs Store) *WebsocketFeed

NewWebsocketFeed creates a feed that works on a given websocket.

func (*WebsocketFeed) Run

func (f *WebsocketFeed) Run() error

Run makes a feed listen to messages on the websocket. The feed won't do anything until it receives a control message on the websocket. Run is a blocking operation that only returns if it encounters an error. Calling Run on a running feed returns ErrFeedAlreadyRunning. Calling Run on a feed that has stopped returns ErrWebsocketConnect.
