Documentation ¶
Overview ¶
The cosmos package implements a higher-level opinionated interface to Cosmos. The goal is to encourage safe programming practices, not to support any operation. It is always possible to drop down to the lower-level REST API wrapper in cosmosapi.
WARNING ¶
The package assumes that session-level consistency model is selected on the account.
Collection ¶
The Collection type is a simple config struct where information is provided once. If one wants to perform inconsistent operations one uses Collection directly.
collection := Collection{ Client: cosmosapi.New(url, config, httpClient), DbName: "mydb", Name: "mycollection", PartitionKey: "mypartitionkey", } var entity MyModel err = collection.StaleGet(partitionKey, id, &entity) // possibly inconsistent read err = collection.RacingPut(&entity) // can be overwritten
Collection is simply a read-config struct and therefore thread-safe.
Session ¶
Use a Session to enable Cosmos' session-level consistency. The underlying session token changes for every write to the database, so it is fundamentally not-thread-safe. Additionally there is a non-thread-safe entity cache in use. For instance it makes sense to create a new Session for each HTTP request handled. It is possible to connect a session to an end-user of your service by saving and resuming the session token.
You can't actually Get or Put directly on a session; instead, you have to start a Transaction and pass in a closure to perform these operations.
Reason 1) To safely do Put(), you need to do Compare-and-Swap (CAS). To do CAS, the operation should be written in such a way that it can be retried a number of times. This is best expressed as an idempotent closure.
Reason 2) By enforcing that the Get() happens as part of the closure we encourage writing idempotent code; where you do not build up state that assumes that the function only runs once.
Note: The Session itself is a struct passed by value, and WithRetries(), WithContext() and friends return a new struct. However they will all share a common underlying state constructed by collection.Session().
Usage:
session := collection.Session() // or ResumeSession() err := session.Transactional(func(txn cosmos.Transaction) error { var entity MyModel err := txn.Get(partitionKey, id, &entity) if err != nil { return err } entity.SomeCounter = entity.SomeCounter + 1 txn.Put(&entity) // only registers entity for put if foo { return cosmos.Rollback() // we regret the Put(), and want to return nil without commit } return nil // this actually does the commit and writes entity })
Session cache ¶
Every CAS-write through Transaction.Put() will, if successful, populate the session in-memory cache. This makes sense as we are optimistically concurrent, it is assumed that the currently running request is the only one running touching the entities. Example:
err := session.Transactional(func (txn cosmos.Transaction) error { var entity MyModel err := txn.Get(partitionKey, id, &entity) if err != nil { return err } entity.SomeCounter = entity.SomeCounter + 1 txn.Put(&entity) // ...not put in cache yet, only after successful commit return nil }) if err != nil { return err } // Cache is now populated < snip something else that required a break in transaction, e.g., external HTTP request > err = session.Transactional(func (txn cosmos.Transaction) error { var entity MyModel err := txn.Get(partitionKey, id, &entity) // Normally, the statement above simply fetched data from the in-memory cache, populated // from the closure just above. However, if the closure needs to be re-run due to another // process racing us, there will be a new network access to get the updated data. <...> })
No cache eviction is been implemented. If one is iterating over a lot of entities in the same Session, one should call session.Drop() to release memory once one is done with a given ID.
Index ¶
- Constants
- Variables
- func AddMigration(fromPrototype, toPrototype Model, convFunc migrationFunc) (dummyResult struct{})
- func CheckModel(entityPtr Model) string
- func Rollback() error
- func SessionsMiddleware(next http.Handler) http.Handler
- func WithSessions(ctx context.Context) context.Context
- type BaseModel
- type Client
- type Collection
- func (c Collection) ExecuteSproc(sprocName string, partitionKeyValue interface{}, ret interface{}, ...) error
- func (c Collection) GetContext() context.Context
- func (c Collection) GetEntityInfo(entityPtr Model) (res BaseModel, partitionValue interface{})
- func (c Collection) GetPartitionKeyRanges() ([]cosmosapi.PartitionKeyRange, error)
- func (c Collection) Init() Collection
- func (c Collection) Query(query string, entities interface{}) (cosmosapi.QueryDocumentsResponse, error)
- func (c Collection) RacingPut(entityPtr Model) error
- func (c Collection) ReadFeed(etag, partitionKeyRangeId string, maxItems int, documents interface{}) (cosmosapi.ListDocumentsResponse, error)
- func (c Collection) ResumeSession(token string) Session
- func (c Collection) Session() Session
- func (c Collection) SessionContext(ctx context.Context) Session
- func (c Collection) StaleGet(partitionValue interface{}, id string, target Model) error
- func (c Collection) StaleGetExisting(partitionValue interface{}, id string, target Model) error
- func (c Collection) WithContext(ctx context.Context) Collection
- type Model
- type Session
- func (session Session) Drop(partitionValue interface{}, id string)
- func (session Session) Get(partitionValue interface{}, id string, target Model) error
- func (session Session) Token() string
- func (session Session) Transaction(closure func(*Transaction) error) error
- func (session Session) WithContext(ctx context.Context) Session
- func (session Session) WithRetries(n int) Session
- type Transaction
Constants ¶
const DefaultConflictRetries = 3
Variables ¶
var ContentionError = errors.New("Contention error; optimistic concurrency control did not succeed after all the retries")
var ModelNameRegexp = regexp.MustCompile(`^[a-zA-Z_]+/[0-9]+$`)
ModelNameRegexp defines the names that are accepted in the cosmosmodel:\"\" specifier (`^[a-zA-Z_]+/[0-9]+$`)
var NotImplementedError = errors.New("Not implemented")
var PutWithoutGetError = errors.New("Attempting to put an entity that has not been get first")
Functions ¶
func AddMigration ¶
func AddMigration(fromPrototype, toPrototype Model, convFunc migrationFunc) (dummyResult struct{})
func CheckModel ¶
CheckModel will check that the Model attribute is correctly set; also return the value. Pass pointer to interface.
func SessionsMiddleware ¶
SessionMiddleware is a convenience middleware for initializing the session state container on the request context. See: WithSessions()
func WithSessions ¶
WithSessions initializes a container for the session states on the context. This enables restoring the cosmos session from the context. Can be used on a child context to reset the sessions without resetting the sessions of the parent context.
Types ¶
type Client ¶
type Client interface { GetDocument(ctx context.Context, dbName, colName, id string, ops cosmosapi.GetDocumentOptions, out interface{}) (cosmosapi.DocumentResponse, error) CreateDocument(ctx context.Context, dbName, colName string, doc interface{}, ops cosmosapi.CreateDocumentOptions) (*cosmosapi.Resource, cosmosapi.DocumentResponse, error) ReplaceDocument(ctx context.Context, dbName, colName, id string, doc interface{}, ops cosmosapi.ReplaceDocumentOptions) (*cosmosapi.Resource, cosmosapi.DocumentResponse, error) QueryDocuments(ctx context.Context, dbName, collName string, qry cosmosapi.Query, docs interface{}, ops cosmosapi.QueryDocumentsOptions) (cosmosapi.QueryDocumentsResponse, error) ListDocuments(ctx context.Context, dbName, colName string, ops *cosmosapi.ListDocumentsOptions, docs interface{}) (cosmosapi.ListDocumentsResponse, error) GetCollection(ctx context.Context, dbName, colName string) (*cosmosapi.Collection, error) DeleteCollection(ctx context.Context, dbName, colName string) error DeleteDatabase(ctx context.Context, dbName string, ops *cosmosapi.RequestOptions) error ExecuteStoredProcedure(ctx context.Context, dbName, colName, sprocName string, ops cosmosapi.ExecuteStoredProcedureOptions, ret interface{}, args ...interface{}) error GetPartitionKeyRanges(ctx context.Context, dbName, colName string, options *cosmosapi.GetPartitionKeyRangesOptions) (cosmosapi.GetPartitionKeyRangesResponse, error) ListOffers(ctx context.Context, ops *cosmosapi.RequestOptions) (*cosmosapi.Offers, error) ReplaceOffer(ctx context.Context, offerOps cosmosapi.OfferReplaceOptions, ops *cosmosapi.RequestOptions) (*cosmosapi.Offer, error) }
Client is an interface exposing the public API of the cosmosapi.Client struct
type Collection ¶
type Collection struct { Client Client DbName string Name string PartitionKey string Context context.Context // contains filtered or unexported fields }
func (Collection) ExecuteSproc ¶
func (c Collection) ExecuteSproc(sprocName string, partitionKeyValue interface{}, ret interface{}, args ...interface{}) error
Execute a StoredProcedure on the collection
func (Collection) GetContext ¶
func (c Collection) GetContext() context.Context
func (Collection) GetEntityInfo ¶
func (c Collection) GetEntityInfo(entityPtr Model) (res BaseModel, partitionValue interface{})
GetEntityInfo uses reflection to return information about the entity without each entity having to implement getters. One should pass a pointer to a struct that embeds "BaseModel" as well as a field having the partition field name; failure to do so will panic.
Note: GetEntityInfo will also always assert that the Model property is set to the declared value
func (Collection) GetPartitionKeyRanges ¶
func (c Collection) GetPartitionKeyRanges() ([]cosmosapi.PartitionKeyRange, error)
func (Collection) Init ¶
func (c Collection) Init() Collection
Init the collection. Certain features requires this to be called on the collection, for backwards compatibility many features can be used without initializing. Currently only required if you want to store session state on the context (Collection.SessionContext())
func (Collection) Query ¶
func (c Collection) Query(query string, entities interface{}) (cosmosapi.QueryDocumentsResponse, error)
func (Collection) RacingPut ¶
func (c Collection) RacingPut(entityPtr Model) error
RacingPut simply does a raw write of document passed in without any considerations about races or consistency. An "upsert" will be performed without any Etag checks. `entityPtr` should be a pointer to the struct
func (Collection) ReadFeed ¶
func (c Collection) ReadFeed(etag, partitionKeyRangeId string, maxItems int, documents interface{}) (cosmosapi.ListDocumentsResponse, error)
Retrieve <maxItems> documents that have changed within the partition key range since <etag>. Note that according to https://docs.microsoft.com/en-us/rest/api/cosmos-db/list-documents (as of Jan 14 16:30:27 UTC 2019) <maxItems>, which corresponds to the x-ms-max-item-count HTTP request header, is (quote):
"An integer indicating the maximum number of items to be returned per page."
However incremental feed reads seems to always return maximum one page, ie. the continuation token (x-ms-continuation HTTP response header) is always empty.
func (Collection) ResumeSession ¶
func (c Collection) ResumeSession(token string) Session
func (Collection) Session ¶
func (c Collection) Session() Session
func (Collection) SessionContext ¶
func (c Collection) SessionContext(ctx context.Context) Session
func (Collection) StaleGet ¶
func (c Collection) StaleGet(partitionValue interface{}, id string, target Model) error
StaleGet reads an element from the database. `target` should be a pointer to a struct that empeds BaseModel. If the document does not exist, the recipient struct is filled with the zero-value, including Etag which will become an empty String.
func (Collection) StaleGetExisting ¶
func (c Collection) StaleGetExisting(partitionValue interface{}, id string, target Model) error
StaleGetExisting is similar to StaleGet, but returns an error if the document is not found instead of an empty document. Test for this condition using errors.Cause(e) == cosmosapi.ErrNotFound
func (Collection) WithContext ¶
func (c Collection) WithContext(ctx context.Context) Collection
type Model ¶
type Model interface { // This method is called on entities after a successful Get() (whether from database or cache). // If the result of a Collection.StaleGet() is used, txn==nil; if Transaction.Get() is used, // txn is set. PostGet(txn *Transaction) error // This method is called on entities right before the write to database. // If Collection.RacingPut() is used, txn==nil; if we are inside a transaction // commit, txn is set. PrePut(txn *Transaction) error // Exported by BaseModel IsNew() bool }
type Session ¶
type Session struct { Context context.Context ConflictRetries int Collection Collection // contains filtered or unexported fields }
func (Session) Drop ¶
Drop removes an entity from the session cache, so that the next fetch will always go out externally to fetch it.
func (Session) Get ¶
Convenience method for doing a simple Get within a session without explicitly starting a transaction
func (Session) Transaction ¶
func (session Session) Transaction(closure func(*Transaction) error) error
Transaction <todo rest of docs>. Note: On commit, the Etag is updated on all relevant entities (but normally these should never be used outside)
func (Session) WithRetries ¶
type Transaction ¶
type Transaction struct {
// contains filtered or unexported fields
}
Transaction is simply a wrapper around Session which unlocks some of the methods that should only be called inside an idempotent closure
func (*Transaction) Get ¶
func (txn *Transaction) Get(partitionValue interface{}, id string, target Model) (err error)
func (*Transaction) Put ¶
func (txn *Transaction) Put(entityPtr Model)