cosmos

package
v0.0.1
Warning

This package is not in the latest version of its module.

Published: May 7, 2021 License: MIT Imports: 10 Imported by: 0

Documentation

Overview

The cosmos package implements a higher-level, opinionated interface to Cosmos. The goal is to encourage safe programming practices, not to support every operation. It is always possible to drop down to the lower-level REST API wrapper in cosmosapi.

WARNING

The package assumes that the session-level consistency model is selected on the account.

Collection

The Collection type is a simple config struct where information is provided once. If one wants to perform inconsistent operations one uses Collection directly.

collection := Collection{
  Client: cosmosapi.New(url, config, httpClient),
  DbName: "mydb",
  Name: "mycollection",
  PartitionKey: "mypartitionkey",
}
var entity MyModel
err = collection.StaleGet(partitionKey, id, &entity)  // possibly inconsistent read
err = collection.RacingPut(&entity)  // can be overwritten

Collection is simply a read-only config struct and therefore thread-safe.

Session

Use a Session to enable Cosmos' session-level consistency. The underlying session token changes for every write to the database, so a Session is fundamentally not thread-safe. Additionally, a non-thread-safe entity cache is in use. It makes sense, for instance, to create a new Session for each HTTP request handled. It is also possible to connect a session to an end-user of your service by saving and resuming the session token.

You can't Put directly on a session, and Session.Get exists only as a convenience for a simple read; to read and write safely, you have to start a Transaction and pass in a closure to perform these operations.

Reason 1) To safely do Put(), you need to do Compare-and-Swap (CAS). To do CAS, the operation should be written in such a way that it can be retried a number of times. This is best expressed as an idempotent closure.

Reason 2) By enforcing that the Get() happens as part of the closure, we encourage writing idempotent code, in which you do not build up state that assumes the function runs only once.

Note: The Session itself is a struct passed by value, and WithRetries(), WithContext() and friends return a new struct. However they will all share a common underlying state constructed by collection.Session().
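The note above can be illustrated with a minimal, self-contained sketch. It does not use the real package: the `sessionState` type, the `NewSession` constructor and the `setToken` helper are hypothetical stand-ins, chosen only to show how a struct passed by value can still share state through a pointer field.

```go
package main

import "fmt"

// sessionState is a hypothetical stand-in for the shared, mutable part
// of a session (for example the session token).
type sessionState struct {
	token string
}

// Session is passed by value; every copy points at the same sessionState.
type Session struct {
	ConflictRetries int
	state           *sessionState
}

// NewSession is a hypothetical constructor playing the role of collection.Session().
func NewSession() Session {
	return Session{ConflictRetries: 3, state: &sessionState{}}
}

// WithRetries returns a modified copy; the receiver is left untouched.
func (s Session) WithRetries(n int) Session {
	s.ConflictRetries = n
	return s
}

// setToken mutates the shared state, so all copies observe the change.
func (s Session) setToken(t string) { s.state.token = t }

// Token reads the shared state.
func (s Session) Token() string { return s.state.token }

func main() {
	a := NewSession()
	b := a.WithRetries(10)
	b.setToken("token-after-write")

	fmt.Println(a.ConflictRetries, b.ConflictRetries) // 3 10
	fmt.Println(a.Token() == b.Token())               // true
}
```

The per-copy settings diverge while the token stays common, which is also why copies of one session should not be used from several goroutines at once.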

Usage:

session := collection.Session()  // or ResumeSession()
err := session.Transaction(func(txn *cosmos.Transaction) error {
  var entity MyModel
  err := txn.Get(partitionKey, id, &entity)
  if err != nil {
    return err
  }
  entity.SomeCounter = entity.SomeCounter + 1
  txn.Put(&entity)  // only registers entity for put
  if foo {
    return cosmos.Rollback()  // we regret the Put(), and want to return nil without commit
  }
  return nil // this actually does the commit and writes entity
})
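To make the reasoning behind the closure-based design concrete, here is a rough sketch of a compare-and-swap retry loop over a toy in-memory store. Everything in it (`store`, `doc`, `transact`, `errContention`, the `interfere` hook, and the meaning of `attempts` as the total number of tries) is an assumption made for illustration; it is not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// errContention is a hypothetical analogue of the package's ContentionError.
var errContention = errors.New("contention: retries exhausted")

// doc is a toy document with an integer etag acting as a version.
type doc struct {
	counter int
	etag    int
}

// store holds a single document and supports compare-and-swap.
type store struct {
	d         doc
	interfere func() // simulates one competing writer; may be nil
}

func (s *store) get() doc { return s.d }

// replace writes only if the etag is unchanged since the read.
func (s *store) replace(counter, ifMatch int) bool {
	if f := s.interfere; f != nil {
		s.interfere = nil
		f() // the competing write lands between our read and our write
	}
	if s.d.etag != ifMatch {
		return false
	}
	s.d = doc{counter: counter, etag: ifMatch + 1}
	return true
}

// transact re-runs the closure from a fresh read until the write succeeds
// or the attempts are used up. The closure must therefore be idempotent.
func transact(s *store, attempts int, closure func(current doc) (int, error)) error {
	for i := 0; i < attempts; i++ {
		current := s.get()
		next, err := closure(current)
		if err != nil {
			return err
		}
		if s.replace(next, current.etag) {
			return nil
		}
	}
	return errContention
}

func main() {
	s := &store{}
	s.interfere = func() { s.d = doc{counter: 100, etag: s.d.etag + 1} }

	runs := 0
	err := transact(s, 3, func(cur doc) (int, error) {
		runs++
		return cur.counter + 1, nil
	})
	fmt.Println(err, runs, s.d.counter) // <nil> 2 101
}
```

Because the closure derives the new counter from the value it just read, the second run produces 101 rather than overwriting the competing write with a stale 1.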

Session cache

Every CAS-write through Transaction.Put() will, if successful, populate the session's in-memory cache. This makes sense because we use optimistic concurrency: it is assumed that the currently running request is the only one touching the entities. Example:

err := session.Transaction(func(txn *cosmos.Transaction) error {
  var entity MyModel
  err := txn.Get(partitionKey, id, &entity)
  if err != nil {
    return err
  }
  entity.SomeCounter = entity.SomeCounter + 1
  txn.Put(&entity)  // ...not put in cache yet, only after successful commit
  return nil
})
if err != nil {
  return err
}
// Cache is now populated

< snip something else that required a break in transaction, e.g., external HTTP request >

err = session.Transaction(func(txn *cosmos.Transaction) error {
  var entity MyModel

  err := txn.Get(partitionKey, id, &entity)
  // Normally, the statement above simply fetched data from the in-memory cache, populated
  // from the closure just above. However, if the closure needs to be re-run due to another
  // process racing us, there will be a new network access to get the updated data.
  <...>
})

No cache eviction has been implemented. If one is iterating over a lot of entities in the same Session, one should call session.Drop() to release memory once one is done with a given ID.
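The effect of dropping entries while iterating can be sketched with a toy cache. The `sessionCache` and `cacheKey` types below are hypothetical, and the partition value is simplified to a string (the real Drop takes an interface{} partition value).

```go
package main

import "fmt"

// cacheKey identifies an entity by partition value and ID (both simplified to strings).
type cacheKey struct {
	partition string
	id        string
}

// sessionCache is a hypothetical stand-in for a session's entity cache:
// a plain map with no eviction policy.
type sessionCache struct {
	entries map[cacheKey][]byte
}

// put stores an entity, as would happen after a successful commit.
func (c *sessionCache) put(partition, id string, payload []byte) {
	c.entries[cacheKey{partition, id}] = payload
}

// drop removes one entity so that the map does not grow without bound.
func (c *sessionCache) drop(partition, id string) {
	delete(c.entries, cacheKey{partition, id})
}

func main() {
	c := &sessionCache{entries: map[cacheKey][]byte{}}
	for i := 0; i < 1000; i++ {
		id := fmt.Sprintf("doc-%d", i)
		c.put("p", id, []byte("payload"))
		// ... work with the entity ...
		c.drop("p", id) // done with this ID, release its memory
	}
	fmt.Println(len(c.entries)) // 0
}
```

Without the drop call, the map in this sketch would hold all 1000 payloads until the cache itself is discarded.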

Index

Constants

const DefaultConflictRetries = 3

Variables

var ContentionError = errors.New("Contention error; optimistic concurrency control did not succeed after all the retries")
var ModelNameRegexp = regexp.MustCompile(`^[a-zA-Z_]+/[0-9]+$`)

ModelNameRegexp defines the names that are accepted in the cosmosmodel:"" specifier (`^[a-zA-Z_]+/[0-9]+$`)
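As a quick check of what this pattern accepts, the following sketch compiles the same expression under a local name and tries a few candidate names. The sample names are made up for illustration.

```go
package main

import (
	"fmt"
	"regexp"
)

// Same pattern as ModelNameRegexp, compiled locally so the example is self-contained.
var modelNameRegexp = regexp.MustCompile(`^[a-zA-Z_]+/[0-9]+$`)

func main() {
	names := []string{
		"MyModel/1",   // letters, slash, digits: accepted
		"my_model/42", // underscores are allowed in the name part
		"MyModel",     // missing "/<number>" suffix: rejected
		"MyModel/v1",  // suffix must be digits only: rejected
		"Model2/1",    // digits are not allowed in the name part: rejected
	}
	for _, name := range names {
		fmt.Printf("%-12s %v\n", name, modelNameRegexp.MatchString(name))
	}
}
```

In other words, a name consists of letters and underscores, a slash, and a purely numeric suffix.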

var NotImplementedError = errors.New("Not implemented")
var PutWithoutGetError = errors.New("Attempting to put an entity that has not been get first")

Functions

func AddMigration

func AddMigration(fromPrototype, toPrototype Model, convFunc migrationFunc) (dummyResult struct{})

func CheckModel

func CheckModel(entityPtr Model) string

CheckModel checks that the Model attribute is correctly set and returns its value. Pass a pointer to the interface.

func Rollback

func Rollback() error

func SessionsMiddleware

func SessionsMiddleware(next http.Handler) http.Handler

SessionsMiddleware is a convenience middleware for initializing the session state container on the request context. See: WithSessions()

func WithSessions

func WithSessions(ctx context.Context) context.Context

WithSessions initializes a container for the session states on the context. This enables restoring the cosmos session from the context. Can be used on a child context to reset the sessions without resetting the sessions of the parent context.
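The general pattern described here, a mutable container stored on the context plus a middleware that installs it per request, can be sketched as follows. The names `container`, `sessionsKey`, `withSessions`, `sessionsFrom` and `sessionsMiddleware` are hypothetical and only mirror the idea; the package's internals may differ, and locking is omitted for brevity.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// sessionsKey is an unexported context key type, avoiding collisions.
type sessionsKey struct{}

// container is a hypothetical holder mapping a collection name to its session token.
type container struct{ tokens map[string]string }

// withSessions installs a fresh, empty container on the returned context.
func withSessions(ctx context.Context) context.Context {
	return context.WithValue(ctx, sessionsKey{}, &container{tokens: map[string]string{}})
}

// sessionsFrom returns the nearest container, or nil if none was installed.
func sessionsFrom(ctx context.Context) *container {
	c, _ := ctx.Value(sessionsKey{}).(*container)
	return c
}

// sessionsMiddleware gives every request its own container.
func sessionsMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		next.ServeHTTP(w, r.WithContext(withSessions(r.Context())))
	})
}

// handlerSawContainer runs one request through the middleware and reports
// whether the wrapped handler found a container on its context.
func handlerSawContainer() bool {
	saw := false
	h := sessionsMiddleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		saw = sessionsFrom(r.Context()) != nil
	}))
	h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "/", nil))
	return saw
}

// resetDemo shows that a child context can start from an empty container
// without affecting the container of its parent.
func resetDemo() (parentToken string, childCount int) {
	parent := withSessions(context.Background())
	sessionsFrom(parent).tokens["mycollection"] = "token-1"
	child := withSessions(parent)
	return sessionsFrom(parent).tokens["mycollection"], len(sessionsFrom(child).tokens)
}

func main() {
	fmt.Println(handlerSawContainer()) // true
	fmt.Println(resetDemo())           // token-1 0
}
```

Since context values are looked up from the innermost context outwards, installing a new container on a child shadows the parent's container without touching it.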

Types

type BaseModel

type BaseModel cosmosapi.Resource

func (*BaseModel) IsNew

func (bm *BaseModel) IsNew() bool

IsNew returns true if the document is new (the document was not found on get, or get has not been attempted).

type Client

type Client interface {
	GetDocument(ctx context.Context, dbName, colName, id string, ops cosmosapi.GetDocumentOptions, out interface{}) (cosmosapi.DocumentResponse, error)
	CreateDocument(ctx context.Context, dbName, colName string, doc interface{}, ops cosmosapi.CreateDocumentOptions) (*cosmosapi.Resource, cosmosapi.DocumentResponse, error)
	ReplaceDocument(ctx context.Context, dbName, colName, id string, doc interface{}, ops cosmosapi.ReplaceDocumentOptions) (*cosmosapi.Resource, cosmosapi.DocumentResponse, error)
	QueryDocuments(ctx context.Context, dbName, collName string, qry cosmosapi.Query, docs interface{}, ops cosmosapi.QueryDocumentsOptions) (cosmosapi.QueryDocumentsResponse, error)
	ListDocuments(ctx context.Context, dbName, colName string, ops *cosmosapi.ListDocumentsOptions, docs interface{}) (cosmosapi.ListDocumentsResponse, error)
	GetCollection(ctx context.Context, dbName, colName string) (*cosmosapi.Collection, error)
	DeleteCollection(ctx context.Context, dbName, colName string) error
	DeleteDatabase(ctx context.Context, dbName string, ops *cosmosapi.RequestOptions) error
	ExecuteStoredProcedure(ctx context.Context, dbName, colName, sprocName string, ops cosmosapi.ExecuteStoredProcedureOptions, ret interface{}, args ...interface{}) error
	GetPartitionKeyRanges(ctx context.Context, dbName, colName string, options *cosmosapi.GetPartitionKeyRangesOptions) (cosmosapi.GetPartitionKeyRangesResponse, error)
	ListOffers(ctx context.Context, ops *cosmosapi.RequestOptions) (*cosmosapi.Offers, error)
	ReplaceOffer(ctx context.Context, offerOps cosmosapi.OfferReplaceOptions, ops *cosmosapi.RequestOptions) (*cosmosapi.Offer, error)
}

Client is an interface exposing the public API of the cosmosapi.Client struct

type Collection

type Collection struct {
	Client       Client
	DbName       string
	Name         string
	PartitionKey string
	Context      context.Context
	// contains filtered or unexported fields
}

func (Collection) ExecuteSproc

func (c Collection) ExecuteSproc(sprocName string, partitionKeyValue interface{}, ret interface{}, args ...interface{}) error

ExecuteSproc executes a stored procedure on the collection.

func (Collection) GetContext

func (c Collection) GetContext() context.Context

func (Collection) GetEntityInfo

func (c Collection) GetEntityInfo(entityPtr Model) (res BaseModel, partitionValue interface{})

GetEntityInfo uses reflection to return information about the entity without each entity having to implement getters. One should pass a pointer to a struct that embeds "BaseModel" as well as a field having the partition field name; failure to do so will panic.

Note: GetEntityInfo will also always assert that the Model property is set to the declared value
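A reflection-based lookup of this kind might look roughly like the sketch below. It is an illustration under stated assumptions, not the package's code: `BaseModel` is a local stand-in with two made-up fields, `entityInfo` is a hypothetical helper, and the partition field is looked up by its Go field name (the real package may resolve it differently, for example through struct tags).

```go
package main

import (
	"fmt"
	"reflect"
)

// BaseModel is a local stand-in; the real type is based on cosmosapi.Resource.
type BaseModel struct {
	Id   string
	Etag string
}

// MyModel embeds BaseModel and carries a partition field named UserId.
type MyModel struct {
	BaseModel
	UserId string
}

// entityInfo returns the embedded BaseModel and the value of the named
// partition field. It panics if the entity does not have the expected shape.
func entityInfo(entityPtr interface{}, partitionField string) (BaseModel, interface{}) {
	v := reflect.ValueOf(entityPtr)
	if v.Kind() != reflect.Ptr || v.Elem().Kind() != reflect.Struct {
		panic("entityInfo: expected a pointer to a struct")
	}
	s := v.Elem()

	// The struct must embed BaseModel (an anonymous field of exactly that type).
	sf, ok := s.Type().FieldByName("BaseModel")
	if !ok || !sf.Anonymous || sf.Type != reflect.TypeOf(BaseModel{}) {
		panic("entityInfo: struct does not embed BaseModel")
	}
	base := s.FieldByIndex(sf.Index).Interface().(BaseModel)

	// The partition value is read from the field with the given name.
	part := s.FieldByName(partitionField)
	if !part.IsValid() {
		panic("entityInfo: no field named " + partitionField)
	}
	return base, part.Interface()
}

func main() {
	m := &MyModel{BaseModel: BaseModel{Id: "doc-1", Etag: "abc"}, UserId: "user-7"}
	base, partition := entityInfo(m, "UserId")
	fmt.Println(base.Id, base.Etag, partition) // doc-1 abc user-7
}
```

The benefit of this approach is that model types only need to embed one struct and declare one field, rather than implement getters.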

func (Collection) GetPartitionKeyRanges

func (c Collection) GetPartitionKeyRanges() ([]cosmosapi.PartitionKeyRange, error)

func (Collection) Init

func (c Collection) Init() Collection

Init initializes the collection. Certain features require this to be called on the collection; for backwards compatibility, many features can be used without initializing. Currently it is only required if you want to store session state on the context (Collection.SessionContext()).

func (Collection) Query

func (c Collection) Query(query string, entities interface{}) (cosmosapi.QueryDocumentsResponse, error)

func (Collection) RacingPut

func (c Collection) RacingPut(entityPtr Model) error

RacingPut simply does a raw write of document passed in without any considerations about races or consistency. An "upsert" will be performed without any Etag checks. `entityPtr` should be a pointer to the struct

func (Collection) ReadFeed

func (c Collection) ReadFeed(etag, partitionKeyRangeId string, maxItems int, documents interface{}) (cosmosapi.ListDocumentsResponse, error)

Retrieve <maxItems> documents that have changed within the partition key range since <etag>. Note that according to https://docs.microsoft.com/en-us/rest/api/cosmos-db/list-documents (as of Jan 14 16:30:27 UTC 2019) <maxItems>, which corresponds to the x-ms-max-item-count HTTP request header, is (quote):

"An integer indicating the maximum number of items to be returned per page."

However, incremental feed reads seem to always return at most one page, i.e. the continuation token (x-ms-continuation HTTP response header) is always empty.

func (Collection) ResumeSession

func (c Collection) ResumeSession(token string) Session

func (Collection) Session

func (c Collection) Session() Session

func (Collection) SessionContext

func (c Collection) SessionContext(ctx context.Context) Session

func (Collection) StaleGet

func (c Collection) StaleGet(partitionValue interface{}, id string, target Model) error

StaleGet reads an element from the database. `target` should be a pointer to a struct that embeds BaseModel. If the document does not exist, the recipient struct is filled with the zero value, including Etag, which will become an empty string.

func (Collection) StaleGetExisting

func (c Collection) StaleGetExisting(partitionValue interface{}, id string, target Model) error

StaleGetExisting is similar to StaleGet, but returns an error if the document is not found instead of an empty document. Test for this condition using errors.Cause(e) == cosmosapi.ErrNotFound

func (Collection) WithContext

func (c Collection) WithContext(ctx context.Context) Collection

type Model

type Model interface {
	// This method is called on entities after a successful Get() (whether from database or cache).
	// If the result of a Collection.StaleGet() is used, txn==nil; if Transaction.Get() is used,
	// txn is set.
	PostGet(txn *Transaction) error
	// This method is called on entities right before the write to database.
	// If Collection.RacingPut() is used, txn==nil; if we are inside a transaction
	// commit, txn is set.
	PrePut(txn *Transaction) error
	// Exported by BaseModel
	IsNew() bool
}
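A model satisfying this interface could be shaped as in the sketch below. To stay self-contained, `Transaction`, `BaseModel` and `Model` are redeclared locally as stand-ins, and the `IsNew` rule used here (an empty Etag) is an assumption. The `Account` type and its validation rule are invented for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package types, so the example compiles on its own.
type Transaction struct{}

type BaseModel struct{ Etag string }

// IsNew is assumed here to mean "no Etag yet"; the real rule may differ.
func (bm *BaseModel) IsNew() bool { return bm.Etag == "" }

type Model interface {
	PostGet(txn *Transaction) error
	PrePut(txn *Transaction) error
	IsNew() bool
}

var errNegativeBalance = errors.New("balance must not be negative")

// Account is a hypothetical entity that validates itself before every write.
type Account struct {
	BaseModel
	Balance     int
	loadedInTxn bool
}

// PostGet records whether the entity was loaded inside a transaction (txn != nil).
func (a *Account) PostGet(txn *Transaction) error {
	a.loadedInTxn = txn != nil
	return nil
}

// PrePut rejects invalid state right before the write to the database.
func (a *Account) PrePut(txn *Transaction) error {
	if a.Balance < 0 {
		return errNegativeBalance
	}
	return nil
}

// Compile-time check that *Account satisfies Model.
var _ Model = (*Account)(nil)

func main() {
	a := &Account{Balance: -5}
	fmt.Println(a.IsNew())     // true
	fmt.Println(a.PrePut(nil)) // balance must not be negative

	_ = a.PostGet(&Transaction{})
	fmt.Println(a.loadedInTxn) // true
}
```

Embedding BaseModel supplies IsNew, so a model only has to write the two hooks itself.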

type Session

type Session struct {
	Context         context.Context
	ConflictRetries int
	Collection      Collection
	// contains filtered or unexported fields
}

func (Session) Drop

func (session Session) Drop(partitionValue interface{}, id string)

Drop removes an entity from the session cache, so that the next fetch will always go out externally to fetch it.

func (Session) Get

func (session Session) Get(partitionValue interface{}, id string, target Model) error

Get is a convenience method for doing a simple Get within a session without explicitly starting a transaction.

func (Session) Token

func (session Session) Token() string

func (Session) Transaction

func (session Session) Transaction(closure func(*Transaction) error) error

Transaction <todo rest of docs>. Note: On commit, the Etag is updated on all relevant entities (but normally these should never be used outside)

func (Session) WithContext

func (session Session) WithContext(ctx context.Context) Session

func (Session) WithRetries

func (session Session) WithRetries(n int) Session

type Transaction

type Transaction struct {
	// contains filtered or unexported fields
}

Transaction is simply a wrapper around Session which unlocks some of the methods that should only be called inside an idempotent closure

func (*Transaction) Get

func (txn *Transaction) Get(partitionValue interface{}, id string, target Model) (err error)

func (*Transaction) Put

func (txn *Transaction) Put(entityPtr Model)
