Documentation ¶
Overview ¶
Package etre provides API clients and low-level primitive data types.
Index ¶
- Constants
- Variables
- func IsMetalabel(label string) bool
- type ByRev
- type CDCClient
- type CDCEvent
- type Entity
- type EntityClient
- type EntityClients
- type Error
- type Latency
- type MockCDCClient
- type MockEntityClient
- func (c MockEntityClient) Delete(query string) (WriteResult, error)
- func (c MockEntityClient) DeleteLabel(id string, label string) (WriteResult, error)
- func (c MockEntityClient) DeleteOne(id string) (WriteResult, error)
- func (c MockEntityClient) EntityType() string
- func (c MockEntityClient) Insert(entities []Entity) (WriteResult, error)
- func (c MockEntityClient) Labels(id string) ([]string, error)
- func (c MockEntityClient) Query(query string, filter QueryFilter) ([]Entity, error)
- func (c MockEntityClient) Update(query string, patch Entity) (WriteResult, error)
- func (c MockEntityClient) UpdateOne(id string, patch Entity) (WriteResult, error)
- func (c MockEntityClient) WithSet(set Set) EntityClient
- type QueryFilter
- type RevOrder
- type Set
- type Write
- type WriteResult
Constants ¶
const (
	API_ROOT          string = "/api/v1"
	META_LABEL_ID            = "_id"
	META_LABEL_TYPE          = "_type"
	VERSION                  = "0.9.0-alpha"
	CDC_WRITE_TIMEOUT int    = 5 // seconds
)
const DEFAULT_MAX_ENTITIES = 1000
Variables ¶
var (
	ErrTypeMismatch   = errors.New("entity _type and Client entity type are different")
	ErrIdSet          = errors.New("entity _id is set but not allowed on insert")
	ErrIdNotSet       = errors.New("entity _id is not set")
	ErrNoEntity       = errors.New("empty entity or id slice; at least one required")
	ErrNoLabel        = errors.New("empty label slice; at least one required")
	ErrNoQuery        = errors.New("empty query string")
	ErrBadData        = errors.New("data from CDC feed is not event or control")
	ErrCallerBlocked  = errors.New("caller blocked")
	ErrEntityNotFound = errors.New("entity not found")
)
var Debug = false
Functions ¶
func IsMetalabel ¶
func IsMetalabel(label string) bool
Types ¶
type CDCClient ¶
type CDCClient interface {
	// Start starts the CDC feed from the given time. On success, a feed channel
	// is returned on which the caller can receive CDC events for as long as
	// the feed remains connected. Calling Start again returns the same feed
	// channel if already started. To restart the feed, call Stop then Start
	// again. On error or abnormal shutdown, the feed channel is closed, and
	// Error returns the error.
	Start(time.Time) (<-chan CDCEvent, error)

	// Stop stops the feed and closes the feed channel returned by Start. It is
	// safe to call multiple times.
	Stop()

	// Ping pings the API and reports latency. Latency values are all zero on
	// timeout or error. On error, the feed is most likely closed.
	Ping(timeout time.Duration) Latency

	// Error returns the error that caused the feed channel to be closed. Start
	// resets the error.
	Error() error
}
A CDCClient consumes a change feed of Change Data Capture (CDC) events for all entity types. It handles all control messages. The caller should call Stop when done to shut down the feed. On error or abnormal shutdown, Error returns the last error.
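The Start/Stop/Error life cycle described above can be sketched as follows. This is a minimal illustration, not the package's own code: the interface is copied locally so the program compiles on its own, CDCEvent and Latency are trimmed stand-ins, and consume, fakeFeed, and runDemo are hypothetical names introduced only for this example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// CDCEvent and Latency are trimmed local stand-ins for the etre types.
type CDCEvent struct {
	EntityId string
	Op       string
	Rev      uint
}

type Latency struct{ Send, Recv, RTT int64 }

// CDCClient copies the method set documented above.
type CDCClient interface {
	Start(time.Time) (<-chan CDCEvent, error)
	Stop()
	Ping(timeout time.Duration) Latency
	Error() error
}

// consume is a hypothetical helper. It starts the feed, handles events until
// the feed channel is closed, then returns whatever Error reports.
func consume(c CDCClient, since time.Time, handle func(CDCEvent) error) error {
	events, err := c.Start(since)
	if err != nil {
		return err
	}
	defer c.Stop() // documented as safe to call multiple times
	for e := range events {
		if err := handle(e); err != nil {
			return err
		}
	}
	return c.Error()
}

// fakeFeed is a test double that replays canned events, then closes the
// channel as if the feed had shut down with err.
type fakeFeed struct {
	events []CDCEvent
	err    error
}

func (f *fakeFeed) Start(time.Time) (<-chan CDCEvent, error) {
	ch := make(chan CDCEvent, len(f.events))
	for _, e := range f.events {
		ch <- e
	}
	close(ch)
	return ch, nil
}
func (f *fakeFeed) Stop()                      {}
func (f *fakeFeed) Ping(time.Duration) Latency { return Latency{} }
func (f *fakeFeed) Error() error               { return f.err }

// runDemo feeds two canned events through consume and returns how many were
// handled plus the error reported when the feed closed.
func runDemo() (int, error) {
	feed := &fakeFeed{
		events: []CDCEvent{{EntityId: "a1", Op: "i"}, {EntityId: "a1", Op: "u", Rev: 1}},
		err:    errors.New("feed closed"),
	}
	handled := 0
	err := consume(feed, time.Now(), func(e CDCEvent) error {
		fmt.Println(e.Op, e.EntityId, e.Rev)
		handled++
		return nil
	})
	return handled, err
}

func main() {
	n, err := runDemo()
	fmt.Println("handled:", n, "feed ended:", err)
}
```

With a real client, the same loop would range over the channel returned by Start; the fake only exists so the sketch runs without a server.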
func NewCDCClient ¶
NewCDCClient creates a CDC feed consumer on the given websocket address. addr must be ws://host:port or wss://host:port.
bufferSize causes Start to create and return a buffered feed channel. A value of 10 is reasonable. If the channel blocks, it is closed and Error returns ErrCallerBlocked.
Enabling debug prints a lot of low-level feed/websocket logging to STDERR.
The client does not automatically ping the server. The caller should run a separate goroutine to periodically call Ping. Every 10-60s is reasonable.
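One way to run the periodic ping in its own goroutine is sketched below. It assumes only the documented Ping method and the "all zero on timeout or error" convention; pinger, pingLoop, allZero, fakePinger, and runDemo are hypothetical names, and the 10ms interval is used only so the demo finishes quickly (the text above suggests 10-60s in practice).

```go
package main

import (
	"fmt"
	"time"
)

// Latency is a local copy of the documented struct.
type Latency struct{ Send, Recv, RTT int64 }

// pinger is the subset of CDCClient that this sketch needs.
type pinger interface {
	Ping(timeout time.Duration) Latency
}

// pingLoop calls Ping every interval and hands each result to report.
// It returns when done is closed.
func pingLoop(p pinger, interval, timeout time.Duration, done <-chan struct{}, report func(Latency)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			report(p.Ping(timeout))
		}
	}
}

// allZero reports whether a ping timed out or failed, using the documented
// convention that all latency values are zero in that case.
func allZero(l Latency) bool { return l == (Latency{}) }

// fakePinger always answers with fixed, non-zero latencies.
type fakePinger struct{}

func (fakePinger) Ping(time.Duration) Latency { return Latency{Send: 1, Recv: 1, RTT: 2} }

// runDemo waits for three successful pings, stops the loop, and returns the count.
func runDemo() int {
	done := make(chan struct{})
	results := make(chan Latency)
	go pingLoop(fakePinger{}, 10*time.Millisecond, time.Second, done, func(l Latency) {
		select {
		case results <- l:
		case <-done: // do not block once the caller has stopped listening
		}
	})
	ok := 0
	for ok < 3 {
		if l := <-results; !allZero(l) {
			ok++
		}
	}
	close(done)
	return ok
}

func main() {
	fmt.Println("successful pings:", runDemo())
}
```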
type CDCEvent ¶
type CDCEvent struct {
	EventId    string  `json:"eventId" bson:"eventId"`
	EntityId   string  `json:"entityId" bson:"entityId"`     // _id of entity
	EntityType string  `json:"entityType" bson:"entityType"` // user-defined
	Rev        uint    `json:"rev" bson:"rev"`               // entity revision as of this op, 0 on insert
	Ts         int64   `json:"ts" bson:"ts"`                 // Unix nanoseconds
	User       string  `json:"user" bson:"user"`
	Op         string  `json:"op" bson:"op"`                       // i=insert, u=update, d=delete
	Old        *Entity `json:"old,omitempty" bson:"old,omitempty"` // old values of affected labels, null on insert
	New        *Entity `json:"new,omitempty" bson:"new,omitempty"` // new values of affected labels, null on delete

	// Set op fields are optional, copied from entity if set. The three
	// fields are all or nothing: all should be set, or none should be set.
	// Etre has no semantic awareness of set op values, nor does it validate
	// them. The caller is responsible for ensuring they're correct.
	SetId   string `json:"setId,omitempty" bson:"setId,omitempty"`
	SetOp   string `json:"setOp,omitempty" bson:"setOp,omitempty"`
	SetSize int    `json:"setSize,omitempty" bson:"setSize,omitempty"`
}
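As an illustration of the field comments above, the following sketch decodes one event from JSON and interprets Op and Ts. The struct is a local copy without the bson tags and set fields, the sample payload is made up for this example rather than captured from a real feed, and decode and describe are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type Entity map[string]interface{}

// CDCEvent is a local copy of the documented struct (bson tags and set op
// fields omitted for brevity).
type CDCEvent struct {
	EventId    string  `json:"eventId"`
	EntityId   string  `json:"entityId"`
	EntityType string  `json:"entityType"`
	Rev        uint    `json:"rev"`
	Ts         int64   `json:"ts"`
	User       string  `json:"user"`
	Op         string  `json:"op"`
	Old        *Entity `json:"old,omitempty"`
	New        *Entity `json:"new,omitempty"`
}

// sample is a made-up update event used only for this sketch.
const sample = `{"eventId":"e1","entityId":"abc","entityType":"node","rev":1,
"ts":1500000000000000000,"user":"ops","op":"u",
"old":{"zone":"a"},"new":{"zone":"b"}}`

// decode unmarshals one JSON-encoded event.
func decode(data []byte) (CDCEvent, error) {
	var e CDCEvent
	err := json.Unmarshal(data, &e)
	return e, err
}

// describe names an op code using the mapping in the struct comments.
func describe(op string) string {
	switch op {
	case "i":
		return "insert"
	case "u":
		return "update"
	case "d":
		return "delete"
	}
	return "unknown"
}

func main() {
	e, err := decode([]byte(sample))
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	when := time.Unix(0, e.Ts).UTC() // Ts is Unix nanoseconds
	fmt.Println(describe(e.Op), e.EntityType, e.EntityId, "rev", e.Rev, "at", when.Format(time.RFC3339))
	// Old is nil on insert and New is nil on delete, so check before dereferencing.
	if e.Old != nil && e.New != nil {
		fmt.Println("zone:", (*e.Old)["zone"], "->", (*e.New)["zone"])
	}
}
```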
type Entity ¶
type Entity map[string]interface{}
Entity represents a single Etre entity. The caller is responsible for knowing or determining the type of value for each key.
If label _type is set, the Client verifies that it matches its type. For example, if _type = "foo", Insert or Update with a Client bound to entity type "bar" returns ErrTypeMismatch. If label _type is not set, the Client entity type is presumed.
Label _id cannot be set on insert. If set, Insert returns ErrIdSet. On update, label _id must be set; if not, Update returns ErrIdNotSet. _id corresponds to WriteResult.Writes[].Id.
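The _type and _id rules above can be restated as a small sketch. It mirrors the documented behavior only and is not the client's implementation: checkType, checkInsert, and checkUpdate are hypothetical helpers, the error values are local copies of the documented ones, and the order of the checks is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

type Entity map[string]interface{}

// Local copies of the documented error values.
var (
	ErrTypeMismatch = errors.New("entity _type and Client entity type are different")
	ErrIdSet        = errors.New("entity _id is set but not allowed on insert")
	ErrIdNotSet     = errors.New("entity _id is not set")
)

// checkType applies the _type rule: if the label is set, it must match the
// client's entity type; if it is not set, the client's type is presumed.
func checkType(e Entity, clientType string) error {
	t, ok := e["_type"]
	if !ok {
		return nil
	}
	if s, isString := t.(string); !isString || s != clientType {
		return ErrTypeMismatch
	}
	return nil
}

// checkInsert applies the insert rule: _id must not be set.
func checkInsert(e Entity, clientType string) error {
	if _, ok := e["_id"]; ok {
		return ErrIdSet
	}
	return checkType(e, clientType)
}

// checkUpdate applies the update rule: _id must be set.
func checkUpdate(e Entity, clientType string) error {
	if _, ok := e["_id"]; !ok {
		return ErrIdNotSet
	}
	return checkType(e, clientType)
}

func main() {
	fmt.Println(checkInsert(Entity{"_type": "foo", "x": 1}, "bar"))
	fmt.Println(checkInsert(Entity{"_id": "abc", "x": 1}, "bar"))
	fmt.Println(checkUpdate(Entity{"x": 2}, "bar"))
	fmt.Println(checkUpdate(Entity{"_id": "abc", "x": 2}, "bar"))
}
```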
type EntityClient ¶
type EntityClient interface {
	// Query returns entities that match the query and pass the filter.
	Query(query string, filter QueryFilter) ([]Entity, error)

	// Insert is a bulk operation that creates the given entities.
	Insert([]Entity) (WriteResult, error)

	// Update is a bulk operation that patches entities that match the query.
	Update(query string, patch Entity) (WriteResult, error)

	// UpdateOne patches the given entity by internal ID.
	UpdateOne(id string, patch Entity) (WriteResult, error)

	// Delete is a bulk operation that removes all entities that match the query.
	Delete(query string) (WriteResult, error)

	// DeleteOne removes the given entity by internal ID.
	DeleteOne(id string) (WriteResult, error)

	// Labels returns all labels on the given entity by internal ID.
	Labels(id string) ([]string, error)

	// DeleteLabel removes the given label from the given entity by internal ID.
	// Labels should be stable, long-lived. Consequently, there's no bulk label delete.
	DeleteLabel(id string, label string) (WriteResult, error)

	// EntityType returns the entity type of the client.
	EntityType() string

	// WithSet returns a new EntityClient that uses the given Set for all write operations.
	// The Set cannot be removed. Therefore, when the set is complete, discard the new
	// EntityClient (let its reference count become zero). On insert, the given Set is added
	// to entities that do not have explicit set labels (_setOp, _setId, and _setSize).
	// On update and delete, the given Set is passed as URL query parameters (setOp, setId,
	// and setSize). Sets do not apply to queries. The Set is not checked or validated; the
	// caller must ensure that Set.Size is greater than zero and Set.Op and Set.Id are nonempty
	// strings.
	WithSet(Set) EntityClient
}
EntityClient represents an entity type-specific client. No interface method has an entity type argument because a client is bound to only one entity type. Use an EntityClients map to pass multiple clients for different entity types.
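The WithSet comment describes two different ways a Set reaches the API: as labels on insert and as URL query parameters on update and delete. The sketch below illustrates both under stated assumptions. The Set struct is a guess based on the field names in that comment (Op and Id as strings, Size as an int); validSet, withSetLabels, and setParams are hypothetical helpers; and treating any one of the three labels as "explicit" is an assumption, since the text does not say whether one or all must be present.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

type Entity map[string]interface{}

// Set is an assumed shape based on the fields named in the WithSet comment.
type Set struct {
	Op   string
	Id   string
	Size int
}

// validSet performs the check that the comment leaves to the caller.
func validSet(s Set) bool {
	return s.Size > 0 && s.Op != "" && s.Id != ""
}

// withSetLabels returns a copy of e with set labels added, unless e already
// carries any explicit set label, in which case e is returned unchanged.
func withSetLabels(e Entity, s Set) Entity {
	for _, label := range []string{"_setOp", "_setId", "_setSize"} {
		if _, ok := e[label]; ok {
			return e
		}
	}
	out := make(Entity, len(e)+3)
	for k, v := range e {
		out[k] = v
	}
	out["_setOp"] = s.Op
	out["_setId"] = s.Id
	out["_setSize"] = s.Size
	return out
}

// setParams builds the query parameters named in the comment for update and
// delete requests.
func setParams(s Set) url.Values {
	v := url.Values{}
	v.Set("setOp", s.Op)
	v.Set("setId", s.Id)
	v.Set("setSize", strconv.Itoa(s.Size))
	return v
}

func main() {
	s := Set{Op: "rollout", Id: "xyz", Size: 3}
	fmt.Println("valid:", validSet(s))
	fmt.Println(withSetLabels(Entity{"hostname": "db1"}, s))
	fmt.Println(withSetLabels(Entity{"hostname": "db2", "_setId": "other"}, s))
	fmt.Println(setParams(s).Encode())
}
```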
func NewEntityClient ¶
func NewEntityClient(entityType, addr string, httpClient *http.Client) EntityClient
NewEntityClient creates a new type-specific Etre API client that makes requests with the given http.Client. An Etre client is bound to the specified entity type. Use an etre.EntityClients map to pass multiple type-specific clients. Like the given http.Client, an Etre client is safe for use by multiple goroutines, so only one client per entity type should be created.
type EntityClients ¶
type EntityClients map[string]EntityClient
EntityClients represents type-specific entity clients keyed on user-defined constants that define each entity type. For example:
const (
	ENTITY_TYPE_FOO string = "foo"
	ENTITY_TYPE_BAR        = "bar"
)
Pass an etre.EntityClients to use like:
func CreateFoo(ec etre.EntityClients) {
	ec[ENTITY_TYPE_FOO].Insert(...)
}
Using EntityClients and const entity types is optional but helps avoid typos.
type Error ¶
type Error struct {
	Message    string `json:"message"`    // human-readable and loggable error message
	Type       string `json:"type"`       // error slug (e.g. db-error, missing-param, etc.)
	EntityId   string `json:"entityId"`   // entity ID that caused error, if any
	HTTPStatus int    `json:"httpStatus"` // HTTP status code
}
Error is the standard response for all handled errors. Client errors (HTTP 400 codes) and internal errors (HTTP 500 codes) are returned as an Error, if handled. If not handled (API crash, panic, etc.), Etre returns an HTTP 500 code and the response data is undefined; the client should print any response data as a string.
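A caller that talks to the API directly might separate handled from unhandled errors roughly as follows. This is a sketch under an assumption: a response body counts as a handled Error when it decodes as JSON with a non-empty type slug. Otherwise the raw body is kept as a string, as the paragraph above recommends. parseError is a hypothetical helper and the payloads in main are made up.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Error is a local copy of the documented struct.
type Error struct {
	Message    string `json:"message"`
	Type       string `json:"type"`
	EntityId   string `json:"entityId"`
	HTTPStatus int    `json:"httpStatus"`
}

// parseError tries to decode a handled Error from a response body. The bool
// is false when the body is not a recognizable Error, in which case the raw
// body is preserved in Message so it can be printed as a string.
func parseError(status int, body []byte) (Error, bool) {
	var e Error
	if err := json.Unmarshal(body, &e); err != nil || e.Type == "" {
		return Error{Message: string(body), HTTPStatus: status}, false
	}
	return e, true
}

func main() {
	handled := []byte(`{"message":"query is empty","type":"missing-param","httpStatus":400}`)
	if e, ok := parseError(400, handled); ok {
		fmt.Printf("handled %d %s: %s\n", e.HTTPStatus, e.Type, e.Message)
	}
	if e, ok := parseError(500, []byte("panic: boom")); !ok {
		fmt.Printf("unhandled %d: %s\n", e.HTTPStatus, e.Message)
	}
}
```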
type Latency ¶
type Latency struct {
	Send int64 // client -> server
	Recv int64 // server -> client
	RTT  int64 // client -> server -> client
}
Latency represents network latencies in milliseconds.
type MockCDCClient ¶
type MockCDCClient struct {
	StartFunc func(time.Time) (<-chan CDCEvent, error)
	StopFunc  func()
	PingFunc  func(time.Duration) Latency
	ErrorFunc func() error
}
func (MockCDCClient) Error ¶
func (c MockCDCClient) Error() error
func (MockCDCClient) Start ¶
func (c MockCDCClient) Start(startTs time.Time) (<-chan CDCEvent, error)
func (MockCDCClient) Stop ¶
func (c MockCDCClient) Stop()
type MockEntityClient ¶
type MockEntityClient struct {
	QueryFunc       func(string, QueryFilter) ([]Entity, error)
	InsertFunc      func([]Entity) (WriteResult, error)
	UpdateFunc      func(query string, patch Entity) (WriteResult, error)
	UpdateOneFunc   func(id string, patch Entity) (WriteResult, error)
	DeleteFunc      func(query string) (WriteResult, error)
	DeleteOneFunc   func(id string) (WriteResult, error)
	LabelsFunc      func(id string) ([]string, error)
	DeleteLabelFunc func(id string, label string) (WriteResult, error)
	EntityTypeFunc  func() string
	WithSetFunc     func(Set) EntityClient
}
MockEntityClient implements EntityClient for testing. If a callback func is defined, it is called for the respective interface method; otherwise, the default method returns empty slices and no error. Defining a callback function allows tests to intercept, save, and inspect Client calls and simulate Etre API returns.
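The callback pattern can be illustrated with a trimmed stand-in. The sketch below does not use the real MockEntityClient: mockClient copies only the Insert callback so the program compiles on its own, Write and WriteResult are reduced to the fields used, and inserter, createNode, and runDemo are hypothetical names for the code under test and its driver.

```go
package main

import (
	"errors"
	"fmt"
)

type Entity map[string]interface{}

// Write and WriteResult are trimmed local stand-ins.
type Write struct{ Id string }
type WriteResult struct{ Writes []Write }

// mockClient is a trimmed stand-in for MockEntityClient with one callback.
type mockClient struct {
	InsertFunc func([]Entity) (WriteResult, error)
}

// Insert calls the callback if one is defined, else returns an empty result
// and no error.
func (c mockClient) Insert(entities []Entity) (WriteResult, error) {
	if c.InsertFunc != nil {
		return c.InsertFunc(entities)
	}
	return WriteResult{}, nil
}

// inserter is the part of EntityClient that the code under test needs.
type inserter interface {
	Insert([]Entity) (WriteResult, error)
}

// createNode is hypothetical application code under test.
func createNode(c inserter, host string) (string, error) {
	wr, err := c.Insert([]Entity{{"hostname": host}})
	if err != nil {
		return "", err
	}
	if len(wr.Writes) != 1 {
		return "", errors.New("expected exactly one write")
	}
	return wr.Writes[0].Id, nil
}

// runDemo intercepts the Insert call, saves what was sent, and simulates an
// API return with a made-up ID.
func runDemo() (id string, sent []Entity, err error) {
	mock := mockClient{
		InsertFunc: func(entities []Entity) (WriteResult, error) {
			sent = entities
			return WriteResult{Writes: []Write{{Id: "id-1"}}}, nil
		},
	}
	id, err = createNode(mock, "db1")
	return id, sent, err
}

func main() {
	id, sent, err := runDemo()
	fmt.Println(id, sent, err)

	// With no callback defined, the default method returns an empty result.
	_, err = createNode(mockClient{}, "db2")
	fmt.Println(err)
}
```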
func (MockEntityClient) Delete ¶
func (c MockEntityClient) Delete(query string) (WriteResult, error)
func (MockEntityClient) DeleteLabel ¶
func (c MockEntityClient) DeleteLabel(id string, label string) (WriteResult, error)
func (MockEntityClient) DeleteOne ¶
func (c MockEntityClient) DeleteOne(id string) (WriteResult, error)
func (MockEntityClient) EntityType ¶
func (c MockEntityClient) EntityType() string
func (MockEntityClient) Insert ¶
func (c MockEntityClient) Insert(entities []Entity) (WriteResult, error)
func (MockEntityClient) Query ¶
func (c MockEntityClient) Query(query string, filter QueryFilter) ([]Entity, error)
func (MockEntityClient) Update ¶
func (c MockEntityClient) Update(query string, patch Entity) (WriteResult, error)
func (MockEntityClient) UpdateOne ¶
func (c MockEntityClient) UpdateOne(id string, patch Entity) (WriteResult, error)
func (MockEntityClient) WithSet ¶
func (c MockEntityClient) WithSet(set Set) EntityClient
type QueryFilter ¶
type QueryFilter struct {
	// ReturnLabels defines labels included in matching entities. An empty slice
	// returns all labels, including meta-labels. Else, only labels in the slice
	// are returned.
	ReturnLabels []string
}
QueryFilter represents filtering options for EntityClient.Query().
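To make the ReturnLabels semantics concrete, the sketch below applies the same rule to an entity in memory. It is only an illustration of the documented behavior, not how Etre applies the filter: project is a hypothetical helper and the sample entity is made up.

```go
package main

import "fmt"

type Entity map[string]interface{}

// QueryFilter is a local copy of the documented struct.
type QueryFilter struct {
	ReturnLabels []string
}

// project returns the labels of e that the filter would return: all labels
// (including meta-labels) when ReturnLabels is empty, else only those listed.
func project(e Entity, f QueryFilter) Entity {
	if len(f.ReturnLabels) == 0 {
		return e
	}
	out := Entity{}
	for _, label := range f.ReturnLabels {
		if v, ok := e[label]; ok {
			out[label] = v
		}
	}
	return out
}

func main() {
	e := Entity{"_id": "abc", "_type": "node", "hostname": "db1", "zone": "a"}
	fmt.Println(project(e, QueryFilter{}))
	fmt.Println(project(e, QueryFilter{ReturnLabels: []string{"hostname"}}))
}
```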
type RevOrder ¶
type RevOrder struct {
// contains filtered or unexported fields
}
RevOrder handles entity revision ordering. Normally, an Etre CDC feed sends entities in revision order (ascending revision numbers). It is possible, although extremely unlikely, that revisions can be received out of order. A RevOrder handles this by buffering the out-of-order revisions until a complete, in-order sequence is received. It can be used by a CDC feed consumer like:
revo := etre.NewRevOrder(0) // 0 = DEFAULT_MAX_ENTITIES

// Handle out-of-order revisions (per entity ID)
ok, prev := revo.InOrder(event)
if !ok {
	// This rev is out of order, skip (buffered in revo)
	continue
}

// Sync ordered set of previously out-of-order revs
if prev != nil {
	for _, p := range prev {
		if err := sync(p); err != nil {
			return err
		}
	}
}

// Sync the current rev (in order)
if err := sync(event); err != nil {
	return err
}
Using a RevOrder is optional but a best practice to be safe.
func NewRevOrder ¶
NewRevOrder returns a new RevOrder that tracks the last revision of at most maxEntities using an LRU cache. If maxEntities is zero, DEFAULT_MAX_ENTITIES is used. It's not possible or necessary to track all entities. If an entity hasn't been seen in a while (e.g. 1,000 entities later), we presume that it won't be seen again. If it is seen again, we further presume that its next revision correctly follows the last revision seen before the entity was evicted from the LRU cache.
func (*RevOrder) InOrder ¶
InOrder returns true if the given event is in order. If not (false), the caller should skip and ignore the event. If true, a non-nil slice of previous revisions is also returned when they complete an in-order sequence up to but not including the given event. In this case, the caller should sync all previous revisions first, then sync the given event.
type Write ¶
type Write struct {
	Id    string `json:"id"`              // internal _id of entity (all write ops)
	URI   string `json:"uri,omitempty"`   // fully-qualified address of new entity (insert)
	Diff  Entity `json:"diff,omitempty"`  // previous entity label values (update)
	Error string `json:"error,omitempty"` // v0.8 backward-compatibility
}
Write represents the successful write of one entity.
type WriteResult ¶
type WriteResult struct {
	Writes []Write `json:"writes"`          // successful writes
	Error  *Error  `json:"error,omitempty"` // error before, during, or after writes
}
WriteResult represents the result of a write operation (insert, update, delete). On success or failure, all write ops return a WriteResult.
If Error is set (not nil), some or all writes failed. Writes stop on the first error, so len(Writes) is the index, in the slice of entities sent by the client, of the entity that failed. For example, if the first entity causes an error, len(Writes) = 0. If the third entity fails, len(Writes) = 2 (zero indexed).
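The index rule above can be sketched in a few lines. The types are trimmed local copies, failedIndex is a hypothetical helper, and the sample result is made up to match the "third entity fails" case.

```go
package main

import "fmt"

// Error, Write, and WriteResult are trimmed local copies of the documented types.
type Error struct {
	Message string
	Type    string
}

type Write struct{ Id string }

type WriteResult struct {
	Writes []Write
	Error  *Error
}

// failedIndex returns the index of the entity that failed, in the slice the
// caller sent, and true if the write result carries an error. Because writes
// stop on the first error, that index equals len(Writes).
func failedIndex(wr WriteResult) (int, bool) {
	if wr.Error == nil {
		return 0, false
	}
	return len(wr.Writes), true
}

func main() {
	sent := []string{"db1", "db2", "db3"} // stand-ins for the entities sent
	wr := WriteResult{
		Writes: []Write{{Id: "id-1"}, {Id: "id-2"}},
		Error:  &Error{Message: "duplicate entity", Type: "db-error"},
	}
	if i, failed := failedIndex(wr); failed {
		fmt.Printf("entity %d (%s) failed: %s\n", i, sent[i], wr.Error.Message)
		fmt.Println("not attempted:", sent[i+1:])
	}
}
```

Entities after the failed index were never attempted, so a caller can retry from that position once the cause is fixed.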
func (WriteResult) IsZero ¶
func (wr WriteResult) IsZero() bool
Directories ¶
| Path | Synopsis |
|---|---|
| | Package api provides API endpoints and controllers. |
| bin | |
| | Package cdc provides interfaces for reading and writing change data capture (CDC) events. |
| | Package entity is a connector to execute CRUD commands for a single entity and many entities on a DB instance. |
| | Package es provides a framework for integration with other programs. |
| app | Package app provides app-wide data structs and functions. |
| config | Package config handles config files, -config, and env vars at startup. |
| | Package kls provides a light-weight parser for Kubernetes label selectors. |
| | Package query is a wrapper for Kubernetes Labels Selector (KLS). |
| | Package test provides helper functions for tests. |