state

package
v1.8.1-rc.1
Warning

This package is not in the latest version of its module.

Published: Jul 18, 2022 License: Apache-2.0 Imports: 4 Imported by: 88

README

State Stores

State Stores provide a common way to interact with different data store implementations, and allow users to opt in to advanced capabilities using defined metadata.

Currently supported state stores are:

  • Aerospike
  • Alibaba Cloud Tablestore
  • AWS DynamoDB
  • Azure Blob Storage
  • Azure CosmosDB
  • Azure Table Storage
  • Cassandra
  • Cloud Firestore (Datastore mode)
  • Couchbase
  • Etcd
  • HashiCorp Consul
  • Hazelcast
  • Memcached
  • MongoDB
  • MySQL
  • PostgreSQL
  • Redis
  • RethinkDB
  • SQL Server
  • Zookeeper

Implementing a new State Store

A compliant state store must implement the Store interface; stores that support transactions can additionally implement TransactionalStore.

The interface for Store:

type Store interface {
	Init(metadata Metadata) error
	Delete(req *DeleteRequest) error
	BulkDelete(req []DeleteRequest) error
	Get(req *GetRequest) (*GetResponse, error)
	Set(req *SetRequest) error
	BulkSet(req []SetRequest) error
}
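As a rough illustration of the interface above, the sketch below backs it with an in-memory map. The request and response types are trimmed local stand-ins for the package's types, declared only so the example compiles on its own, and memStore is a hypothetical name. Returning an empty response for a missing key is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Trimmed local stand-ins for the package's request/response types.
type Metadata struct{ Properties map[string]string }
type GetRequest struct{ Key string }
type GetResponse struct{ Data []byte }
type DeleteRequest struct{ Key string }
type SetRequest struct {
	Key   string
	Value []byte // the package's SetRequest.Value is interface{}; simplified here
}

// memStore is a hypothetical in-memory store with the README's Store methods.
type memStore struct {
	mu   sync.RWMutex
	data map[string][]byte
}

func (s *memStore) Init(metadata Metadata) error {
	s.data = make(map[string][]byte)
	return nil
}

func (s *memStore) Get(req *GetRequest) (*GetResponse, error) {
	if req.Key == "" {
		return nil, errors.New("key is required")
	}
	s.mu.RLock()
	defer s.mu.RUnlock()
	// Assumption: a missing key yields an empty response, not an error.
	return &GetResponse{Data: s.data[req.Key]}, nil
}

func (s *memStore) Set(req *SetRequest) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[req.Key] = req.Value
	return nil
}

func (s *memStore) Delete(req *DeleteRequest) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.data, req.Key)
	return nil
}

// BulkSet and BulkDelete simply loop over the single-key operations.
func (s *memStore) BulkSet(reqs []SetRequest) error {
	for i := range reqs {
		if err := s.Set(&reqs[i]); err != nil {
			return err
		}
	}
	return nil
}

func (s *memStore) BulkDelete(reqs []DeleteRequest) error {
	for i := range reqs {
		if err := s.Delete(&reqs[i]); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	s := &memStore{}
	_ = s.Init(Metadata{})
	_ = s.Set(&SetRequest{Key: "k1", Value: []byte("v1")})
	resp, _ := s.Get(&GetRequest{Key: "k1"})
	fmt.Println(string(resp.Data))
}
```

A real component would typically read connection settings from Metadata.Properties in Init instead of ignoring them.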

The interface for TransactionalStore:

type TransactionalStore interface {
	Init(metadata Metadata) error
	Multi(request *TransactionalStateRequest) error
}

See the documentation site for examples.

Implementing State Query API

State stores can optionally implement an API for querying state.

Please refer to the documentation site for API description and definition.

// Querier is an interface to execute queries.
type Querier interface {
        Query(req *QueryRequest) (*QueryResponse, error)
}

Below are the definitions of the structures (including nested ones) for QueryRequest and QueryResponse.

// QueryRequest is the request object for querying the state.
type QueryRequest struct {
        Query    query.Query       `json:"query"`
        Metadata map[string]string `json:"metadata,omitempty"`
}

type Query struct {
        Filters map[string]interface{} `json:"filter"`
        Sort    []Sorting              `json:"sort"`
        Page    Pagination             `json:"page"`

        // derived from Filters
        Filter Filter
}

type Sorting struct {
        Key   string `json:"key"`
        Order string `json:"order,omitempty"`
}

type Pagination struct {
        Limit int    `json:"limit"`
        Token string `json:"token,omitempty"`
}

// QueryResponse is the response object on querying state.
type QueryResponse struct {
        Results  []QueryItem       `json:"results"`
        Token    string            `json:"token,omitempty"`
        Metadata map[string]string `json:"metadata,omitempty"`
}

// QueryItem is an object representing a single entry in query results.
type QueryItem struct {
        Key   string  `json:"key"`
        Data  []byte  `json:"data"`
        ETag  *string `json:"etag,omitempty"`
        Error string  `json:"error,omitempty"`
}

Upon receiving a query request, Dapr validates it and transforms it into a Query object, which is then passed to the state store component.
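To make the JSON shape implied by the struct tags concrete, here is a minimal sketch that decodes a query body with encoding/json. The types are local copies of the structures above without the derived Filter field, and decodeQuery is a hypothetical helper; the package's own decoding and validation may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the structures above, minus the derived Filter field.
type Sorting struct {
	Key   string `json:"key"`
	Order string `json:"order,omitempty"`
}

type Pagination struct {
	Limit int    `json:"limit"`
	Token string `json:"token,omitempty"`
}

type Query struct {
	Filters map[string]interface{} `json:"filter"`
	Sort    []Sorting              `json:"sort"`
	Page    Pagination             `json:"page"`
}

// decodeQuery (hypothetical) parses a JSON query body into the stand-in Query.
func decodeQuery(body []byte) (Query, error) {
	var q Query
	err := json.Unmarshal(body, &q)
	return q, err
}

func main() {
	body := []byte(`{
		"filter": {"EQ": {"state": "CA"}},
		"sort":   [{"key": "name", "order": "DESC"}],
		"page":   {"limit": 2}
	}`)
	q, err := decodeQuery(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(q.Sort[0].Key, q.Sort[0].Order, q.Page.Limit)
}
```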

The Query object has a member Filter, an interface with a Parse method that each filter type below implements.

type Filter interface {
	Parse(interface{}) error
}

type FilterEQ struct {
	Key string
	Val interface{}
}

type FilterIN struct {
	Key  string
	Vals []interface{}
}

type FilterAND struct {
	Filters []Filter
}

type FilterOR struct {
	Filters []Filter
}
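One way the Parse methods could turn the raw Filters map into a filter tree is sketched below for EQ and AND only; IN and OR would follow the same pattern. The operator keys "EQ" and "AND" and the helper parseFilter are assumptions of this sketch, not the package's confirmed implementation.

```go
package main

import (
	"errors"
	"fmt"
)

type Filter interface {
	Parse(interface{}) error
}

type FilterEQ struct {
	Key string
	Val interface{}
}

// Parse expects a single-entry object such as {"state": "CA"}.
func (f *FilterEQ) Parse(obj interface{}) error {
	m, ok := obj.(map[string]interface{})
	if !ok || len(m) != 1 {
		return errors.New("EQ filter must be an object with exactly one key")
	}
	for k, v := range m {
		f.Key, f.Val = k, v
	}
	return nil
}

type FilterAND struct {
	Filters []Filter
}

// Parse expects a list of nested filter objects.
func (f *FilterAND) Parse(obj interface{}) error {
	list, ok := obj.([]interface{})
	if !ok {
		return errors.New("AND filter must be a list")
	}
	for _, item := range list {
		m, ok := item.(map[string]interface{})
		if !ok {
			return errors.New("AND operand must be an object")
		}
		sub, err := parseFilter(m)
		if err != nil {
			return err
		}
		f.Filters = append(f.Filters, sub)
	}
	return nil
}

// parseFilter (hypothetical) picks the Filter type from the operator key.
func parseFilter(m map[string]interface{}) (Filter, error) {
	if len(m) != 1 {
		return nil, errors.New("filter must have exactly one operator")
	}
	for op, arg := range m {
		var f Filter
		switch op {
		case "EQ":
			f = &FilterEQ{}
		case "AND":
			f = &FilterAND{}
		default:
			return nil, fmt.Errorf("unsupported operator %q", op)
		}
		if err := f.Parse(arg); err != nil {
			return nil, err
		}
		return f, nil
	}
	return nil, nil // unreachable: the map has exactly one entry
}

func main() {
	raw := map[string]interface{}{
		"AND": []interface{}{
			map[string]interface{}{"EQ": map[string]interface{}{"state": "CA"}},
			map[string]interface{}{"EQ": map[string]interface{}{"org": "Dev"}},
		},
	}
	f, err := parseFilter(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(f.(*FilterAND).Filters))
}
```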

To simplify query translation, the package uses the visitor design pattern. A state store component developer implements the visit methods, and the runtime uses them to construct the native query statement.

type Visitor interface {
	// returns "equal" expression
	VisitEQ(*FilterEQ) (string, error)
	// returns "in" expression
	VisitIN(*FilterIN) (string, error)
	// returns "and" expression
	VisitAND(*FilterAND) (string, error)
	// returns "or" expression
	VisitOR(*FilterOR) (string, error)
	// receives concatenated filters and finalizes the native query
	Finalize(string, *MidQuery) error
}
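As an illustration of what a component-side visitor might look like, the sketch below renders filters as a SQL-like WHERE clause. The filter types and MidQuery are local stand-ins (MidQuery is left empty because its fields are not shown here), and sqlVisitor, visit, and literal are hypothetical names. The literal helper does no escaping, so a real component should use parameterized queries instead.

```go
package main

import (
	"fmt"
	"strings"
)

// Local stand-ins for the types shown above; MidQuery is left empty here.
type (
	Filter   interface{}
	MidQuery struct{}
	FilterEQ struct {
		Key string
		Val interface{}
	}
	FilterIN struct {
		Key  string
		Vals []interface{}
	}
	FilterAND struct{ Filters []Filter }
	FilterOR  struct{ Filters []Filter }
)

// sqlVisitor is a hypothetical Visitor that renders a SQL-like WHERE clause.
type sqlVisitor struct{ where string }

func (v *sqlVisitor) VisitEQ(f *FilterEQ) (string, error) {
	return fmt.Sprintf("%s = %s", f.Key, literal(f.Val)), nil
}

func (v *sqlVisitor) VisitIN(f *FilterIN) (string, error) {
	vals := make([]string, len(f.Vals))
	for i, val := range f.Vals {
		vals[i] = literal(val)
	}
	return fmt.Sprintf("%s IN (%s)", f.Key, strings.Join(vals, ", ")), nil
}

func (v *sqlVisitor) VisitAND(f *FilterAND) (string, error) { return v.join(f.Filters, " AND ") }
func (v *sqlVisitor) VisitOR(f *FilterOR) (string, error)   { return v.join(f.Filters, " OR ") }

// Finalize receives the combined filter expression and stores the finished clause.
func (v *sqlVisitor) Finalize(filters string, _ *MidQuery) error {
	v.where = "WHERE " + filters
	return nil
}

// join renders each child filter and combines the results with op.
func (v *sqlVisitor) join(filters []Filter, op string) (string, error) {
	parts := make([]string, 0, len(filters))
	for _, f := range filters {
		s, err := visit(v, f)
		if err != nil {
			return "", err
		}
		parts = append(parts, s)
	}
	return "(" + strings.Join(parts, op) + ")", nil
}

// visit dispatches on the concrete filter type, roughly what a query builder would do.
func visit(v *sqlVisitor, f Filter) (string, error) {
	switch t := f.(type) {
	case *FilterEQ:
		return v.VisitEQ(t)
	case *FilterIN:
		return v.VisitIN(t)
	case *FilterAND:
		return v.VisitAND(t)
	case *FilterOR:
		return v.VisitOR(t)
	default:
		return "", fmt.Errorf("unsupported filter type %T", f)
	}
}

// literal quotes strings and prints other values as-is (no escaping).
func literal(val interface{}) string {
	if s, ok := val.(string); ok {
		return "'" + s + "'"
	}
	return fmt.Sprint(val)
}

func main() {
	f := &FilterAND{Filters: []Filter{
		&FilterEQ{Key: "state", Val: "CA"},
		&FilterIN{Key: "age", Vals: []interface{}{30, 40}},
	}}
	v := &sqlVisitor{}
	expr, err := visit(v, f)
	if err != nil {
		panic(err)
	}
	_ = v.Finalize(expr, &MidQuery{})
	fmt.Println(v.where) // WHERE (state = 'CA' AND age IN (30, 40))
}
```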

The Dapr runtime implements a QueryBuilder object that takes a Visitor and constructs the native query.

type QueryBuilder struct {
	visitor Visitor
}

func (h *QueryBuilder) BuildQuery(mq *MidQuery) error {...}

The last step is to implement the Querier interface in the component:

type Querier interface {
	Query(req *QueryRequest) (*QueryResponse, error)
}

A sample implementation might look like this:

func (m *MyComponent) Query(req *state.QueryRequest) (*state.QueryResponse, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	query := &Query{} // Query implements Visitor interface
	qbuilder := state.NewQueryBuilder(query)
	if err := qbuilder.BuildQuery(&req.Query); err != nil {
		return &state.QueryResponse{}, err
	}
	data, token, err := query.execute(ctx)
	if err != nil {
		return &state.QueryResponse{}, err
	}
	return &state.QueryResponse{
		Results:  data,
		Token:    token,
	}, nil
}

Examples of State Query API implementations include the MongoDB and CosmosDB state store components.

Documentation

Constants

const (
	FirstWrite = "first-write"
	LastWrite  = "last-write"
	Strong     = "strong"
	Eventual   = "eventual"
)

Variables

This section is empty.

Functions

func CheckRequestOptions added in v0.3.0

func CheckRequestOptions(options interface{}) error

CheckRequestOptions checks if request options use supported keywords.
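The kind of keyword check described here can be sketched with the constants above. This is not the package's implementation: validateSetOptions is a hypothetical helper, SetStateOption is a local copy without JSON tags, and treating an empty value as "use the store default" is an assumption.

```go
package main

import "fmt"

const (
	FirstWrite = "first-write"
	LastWrite  = "last-write"
	Strong     = "strong"
	Eventual   = "eventual"
)

// SetStateOption is a local copy of the package type without JSON tags.
type SetStateOption struct {
	Concurrency string
	Consistency string
}

// validateSetOptions (hypothetical) rejects values outside the known keywords.
// Assumption: an empty value means "use the store default" and is accepted.
func validateSetOptions(o SetStateOption) error {
	switch o.Concurrency {
	case "", FirstWrite, LastWrite:
	default:
		return fmt.Errorf("unrecognized concurrency model %q", o.Concurrency)
	}
	switch o.Consistency {
	case "", Strong, Eventual:
	default:
		return fmt.Errorf("unrecognized consistency model %q", o.Consistency)
	}
	return nil
}

func main() {
	fmt.Println(validateSetOptions(SetStateOption{Concurrency: FirstWrite, Consistency: Strong}))
	fmt.Println(validateSetOptions(SetStateOption{Consistency: "sometimes"}))
}
```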

func DeleteWithOptions added in v0.3.0

func DeleteWithOptions(method func(req *DeleteRequest) error, req *DeleteRequest) error

DeleteWithOptions handles DeleteRequest with options.

func Ping added in v1.8.0

func Ping(store Store) error

func SetWithOptions added in v0.3.0

func SetWithOptions(method func(req *SetRequest) error, req *SetRequest) error

SetWithOptions handles SetRequest with request options.

Types

type BulkDeleteRowMismatchError added in v1.7.0

type BulkDeleteRowMismatchError struct {
	// contains filtered or unexported fields
}

BulkDeleteRowMismatchError represents a mismatch in row count while deleting rows.

func NewBulkDeleteRowMismatchError added in v1.7.0

func NewBulkDeleteRowMismatchError(expected, affected uint64) *BulkDeleteRowMismatchError

NewBulkDeleteRowMismatchError returns a BulkDeleteRowMismatchError.

func (*BulkDeleteRowMismatchError) Error added in v1.7.0

type BulkGetResponse added in v1.0.0

type BulkGetResponse struct {
	Key         string            `json:"key"`
	Data        []byte            `json:"data"`
	ETag        *string           `json:"etag,omitempty"`
	Metadata    map[string]string `json:"metadata"`
	Error       string            `json:"error,omitempty"`
	ContentType *string           `json:"contentType,omitempty"`
}

BulkGetResponse is the response object for a bulk get operation.

type BulkStore added in v1.0.0

type BulkStore interface {
	BulkDelete(req []DeleteRequest) error
	BulkGet(req []GetRequest) (bool, []BulkGetResponse, error)
	BulkSet(req []SetRequest) error
}

BulkStore is an interface to perform bulk operations on a store.

type DefaultBulkStore added in v1.0.0

type DefaultBulkStore struct {
	// contains filtered or unexported fields
}

DefaultBulkStore is a default implementation of BulkStore.

func NewDefaultBulkStore added in v1.0.0

func NewDefaultBulkStore(store Store) DefaultBulkStore

NewDefaultBulkStore builds a default bulk store.

func (*DefaultBulkStore) BulkDelete added in v1.0.0

func (b *DefaultBulkStore) BulkDelete(req []DeleteRequest) error

BulkDelete performs a bulk delete operation.

func (*DefaultBulkStore) BulkGet added in v1.0.0

func (b *DefaultBulkStore) BulkGet(req []GetRequest) (bool, []BulkGetResponse, error)

BulkGet performs a bulk get operation.

func (*DefaultBulkStore) BulkSet added in v1.0.0

func (b *DefaultBulkStore) BulkSet(req []SetRequest) error

BulkSet performs a bulk save operation.

func (*DefaultBulkStore) Features added in v1.1.1

func (b *DefaultBulkStore) Features() []Feature

Features returns the features of the encapsulated store.
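A default bulk implementation can be built by wrapping a store's single-key operations. The sketch below shows that wrapper pattern with local stand-in types; bulkWrapper, singleStore, and mapStore are hypothetical names, and the loop-and-stop-on-first-error behavior is an assumption rather than a description of DefaultBulkStore itself. BulkGet is left out because its semantics are not described here.

```go
package main

import "fmt"

// Trimmed local stand-ins for the package's request types.
type SetRequest struct {
	Key   string
	Value interface{}
}
type DeleteRequest struct{ Key string }

// singleStore is a hypothetical subset of Store: the per-key writes the wrapper needs.
type singleStore interface {
	Set(req *SetRequest) error
	Delete(req *DeleteRequest) error
}

// bulkWrapper turns single-key operations into bulk ones by looping.
type bulkWrapper struct{ store singleStore }

func (b *bulkWrapper) BulkSet(reqs []SetRequest) error {
	for i := range reqs {
		if err := b.store.Set(&reqs[i]); err != nil {
			return fmt.Errorf("set %q: %w", reqs[i].Key, err)
		}
	}
	return nil
}

func (b *bulkWrapper) BulkDelete(reqs []DeleteRequest) error {
	for i := range reqs {
		if err := b.store.Delete(&reqs[i]); err != nil {
			return fmt.Errorf("delete %q: %w", reqs[i].Key, err)
		}
	}
	return nil
}

// mapStore is a toy store used only to exercise the wrapper.
type mapStore map[string]interface{}

func (m mapStore) Set(req *SetRequest) error       { m[req.Key] = req.Value; return nil }
func (m mapStore) Delete(req *DeleteRequest) error { delete(m, req.Key); return nil }

func main() {
	m := mapStore{}
	b := &bulkWrapper{store: m}
	_ = b.BulkSet([]SetRequest{{Key: "a", Value: 1}, {Key: "b", Value: 2}})
	_ = b.BulkDelete([]DeleteRequest{{Key: "a"}})
	fmt.Println(len(m))
}
```

Because the loop stops at the first failure, earlier writes in the batch stay applied; a store that needs all-or-nothing behavior would have to provide its own bulk methods.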

type DeleteRequest

type DeleteRequest struct {
	Key      string            `json:"key"`
	ETag     *string           `json:"etag,omitempty"`
	Metadata map[string]string `json:"metadata"`
	Options  DeleteStateOption `json:"options,omitempty"`
}

DeleteRequest is the object describing a delete state request.

func (DeleteRequest) GetKey added in v0.2.0

func (r DeleteRequest) GetKey() string

GetKey gets the Key on a DeleteRequest.

func (DeleteRequest) GetMetadata added in v0.2.0

func (r DeleteRequest) GetMetadata() map[string]string

GetMetadata gets the Metadata on a DeleteRequest.

type DeleteStateOption

type DeleteStateOption struct {
	Concurrency string `json:"concurrency,omitempty"` // "first-write, last-write"
	Consistency string `json:"consistency"`           // "eventual, strong"
}

DeleteStateOption controls how a state store reacts to a delete request.

type ETagError added in v1.0.0

type ETagError struct {
	// contains filtered or unexported fields
}

ETagError is a custom error type for etag exceptions.

func NewETagError added in v1.0.0

func NewETagError(kind ETagErrorKind, err error) *ETagError

NewETagError returns an ETagError wrapping an existing context error.

func (*ETagError) Error added in v1.0.0

func (e *ETagError) Error() string

func (*ETagError) Kind added in v1.0.0

func (e *ETagError) Kind() ETagErrorKind

type ETagErrorKind added in v1.0.0

type ETagErrorKind string

const (
	ETagInvalid  ETagErrorKind = "invalid"
	ETagMismatch ETagErrorKind = "mismatch"
)

type Feature added in v1.1.1

type Feature string

Feature names a feature that can be implemented by state store components.

const (
	// FeatureETag is the feature for ETag metadata in the state store.
	FeatureETag Feature = "ETAG"
	// FeatureTransactional is the feature that performs transactional operations.
	FeatureTransactional Feature = "TRANSACTIONAL"
	// FeatureQueryAPI is the feature that performs query operations.
	FeatureQueryAPI Feature = "QUERY_API"
)

func (Feature) IsPresent added in v1.1.1

func (f Feature) IsPresent(features []Feature) bool

IsPresent checks if a given feature is present in the list.
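A typical use is to gate optional behavior on what a store advertises through Features(). The sketch below re-declares Feature locally with a plausible linear-scan IsPresent; it is not necessarily the package's own implementation.

```go
package main

import "fmt"

type Feature string

const (
	FeatureETag          Feature = "ETAG"
	FeatureTransactional Feature = "TRANSACTIONAL"
	FeatureQueryAPI      Feature = "QUERY_API"
)

// IsPresent is a plausible linear-scan implementation, not necessarily the package's own.
func (f Feature) IsPresent(features []Feature) bool {
	for _, candidate := range features {
		if candidate == f {
			return true
		}
	}
	return false
}

func main() {
	// supported stands in for the slice a store would return from Features().
	supported := []Feature{FeatureETag, FeatureTransactional}
	fmt.Println(FeatureTransactional.IsPresent(supported), FeatureQueryAPI.IsPresent(supported)) // true false
}
```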

type GetRequest

type GetRequest struct {
	Key      string            `json:"key"`
	Metadata map[string]string `json:"metadata"`
	Options  GetStateOption    `json:"options,omitempty"`
}

GetRequest is the object describing a state fetch request.

type GetResponse

type GetResponse struct {
	Data        []byte            `json:"data"`
	ETag        *string           `json:"etag,omitempty"`
	Metadata    map[string]string `json:"metadata"`
	ContentType *string           `json:"contentType,omitempty"`
}

GetResponse is the response object for getting state.

type GetStateOption

type GetStateOption struct {
	Consistency string `json:"consistency"` // "eventual, strong"
}

GetStateOption controls how a state store reacts to a get request.

type KeyInt added in v0.2.0

type KeyInt interface {
	GetKey() string
	GetMetadata() map[string]string
}

KeyInt is an interface for getting the Key and Metadata from a request.
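Because both SetRequest and DeleteRequest expose GetKey and GetMetadata, a helper can accept either one through KeyInt. The sketch below uses trimmed local stand-ins for the request types; partitionKey is a hypothetical helper, and the "partitionKey" metadata name is an assumption used only for illustration.

```go
package main

import "fmt"

type KeyInt interface {
	GetKey() string
	GetMetadata() map[string]string
}

// Trimmed local stand-ins for the package's request types.
type SetRequest struct {
	Key      string
	Metadata map[string]string
}

func (r SetRequest) GetKey() string                 { return r.Key }
func (r SetRequest) GetMetadata() map[string]string { return r.Metadata }

type DeleteRequest struct {
	Key      string
	Metadata map[string]string
}

func (r DeleteRequest) GetKey() string                 { return r.Key }
func (r DeleteRequest) GetMetadata() map[string]string { return r.Metadata }

// partitionKey (hypothetical) prefers an explicit metadata entry and falls back to the key.
// The "partitionKey" metadata name is an assumption of this sketch.
func partitionKey(r KeyInt) string {
	if pk := r.GetMetadata()["partitionKey"]; pk != "" {
		return pk
	}
	return r.GetKey()
}

func main() {
	set := SetRequest{Key: "order-1", Metadata: map[string]string{"partitionKey": "orders"}}
	del := DeleteRequest{Key: "order-2"}
	fmt.Println(partitionKey(set), partitionKey(del)) // orders order-2
}
```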

type Metadata

type Metadata struct {
	Properties map[string]string `json:"properties"`
}

Metadata contains a state store specific set of metadata properties.

type OperationType

type OperationType string

OperationType describes a CRUD operation performed against a state store.

const Delete OperationType = "delete"

Delete is a delete operation.

const Upsert OperationType = "upsert"

Upsert is an update or create operation.

type Querier added in v1.5.0

type Querier interface {
	Query(req *QueryRequest) (*QueryResponse, error)
}

Querier is an interface to execute queries.

type QueryItem added in v1.5.0

type QueryItem struct {
	Key         string  `json:"key"`
	Data        []byte  `json:"data"`
	ETag        *string `json:"etag,omitempty"`
	Error       string  `json:"error,omitempty"`
	ContentType *string `json:"contentType,omitempty"`
}

QueryItem is an object representing a single entry in query results.

type QueryRequest added in v1.5.0

type QueryRequest struct {
	Query    query.Query       `json:"query"`
	Metadata map[string]string `json:"metadata,omitempty"`
}

type QueryResponse added in v1.5.0

type QueryResponse struct {
	Results  []QueryItem       `json:"results"`
	Token    string            `json:"token,omitempty"`
	Metadata map[string]string `json:"metadata,omitempty"`
}

QueryResponse is the response object for querying state.

type SetRequest

type SetRequest struct {
	Key         string            `json:"key"`
	Value       interface{}       `json:"value"`
	ETag        *string           `json:"etag,omitempty"`
	Metadata    map[string]string `json:"metadata,omitempty"`
	Options     SetStateOption    `json:"options,omitempty"`
	ContentType *string           `json:"contentType,omitempty"`
}

SetRequest is the object describing an upsert request.

func (SetRequest) GetKey added in v0.2.0

func (r SetRequest) GetKey() string

GetKey gets the Key on a SetRequest.

func (SetRequest) GetMetadata added in v0.2.0

func (r SetRequest) GetMetadata() map[string]string

GetMetadata gets the Metadata on a SetRequest.

type SetStateOption

type SetStateOption struct {
	Concurrency string `json:"concurrency,omitempty"` // first-write, last-write
	Consistency string `json:"consistency"`           // "eventual, strong"
}

SetStateOption controls how a state store reacts to a set request.

type Store

type Store interface {
	BulkStore
	Init(metadata Metadata) error
	Features() []Feature
	Delete(req *DeleteRequest) error
	Get(req *GetRequest) (*GetResponse, error)
	Set(req *SetRequest) error
}

Store is an interface to perform operations on a store.

type TransactionalStateOperation added in v0.3.0

type TransactionalStateOperation struct {
	Operation OperationType `json:"operation"`
	Request   interface{}   `json:"request"`
}

TransactionalStateOperation describes the operation type, key, and value for a transactional operation.

type TransactionalStateRequest added in v0.3.0

type TransactionalStateRequest struct {
	Operations []TransactionalStateOperation `json:"operations"`
	Metadata   map[string]string             `json:"metadata,omitempty"`
}

TransactionalStateRequest describes a transactional operation against a state store that comprises multiple types of operations. The Request field of each operation is either a DeleteRequest or a SetRequest.
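Since Request is an interface{}, a Multi implementation has to inspect each operation before applying it. The sketch below shows one way to do that over an in-memory map, working on a copy so a bad operation leaves the original untouched. The types are trimmed local stand-ins, applyAll is a hypothetical helper, and the assumption that requests are stored as values (not pointers) should be checked against the calling code.

```go
package main

import "fmt"

type OperationType string

const (
	Upsert OperationType = "upsert"
	Delete OperationType = "delete"
)

// Trimmed local stand-ins for the package's types.
type SetRequest struct {
	Key   string
	Value interface{}
}
type DeleteRequest struct{ Key string }

type TransactionalStateOperation struct {
	Operation OperationType
	Request   interface{}
}

type TransactionalStateRequest struct {
	Operations []TransactionalStateOperation
}

// applyAll (hypothetical) applies every operation to a copy of data and returns
// the copy only if all operations are valid, giving all-or-nothing behavior.
func applyAll(data map[string]interface{}, req *TransactionalStateRequest) (map[string]interface{}, error) {
	next := make(map[string]interface{}, len(data))
	for k, v := range data {
		next[k] = v
	}
	for i, op := range req.Operations {
		switch op.Operation {
		case Upsert:
			r, ok := op.Request.(SetRequest) // assumption: requests are passed by value
			if !ok {
				return nil, fmt.Errorf("operation %d: expected SetRequest, got %T", i, op.Request)
			}
			next[r.Key] = r.Value
		case Delete:
			r, ok := op.Request.(DeleteRequest)
			if !ok {
				return nil, fmt.Errorf("operation %d: expected DeleteRequest, got %T", i, op.Request)
			}
			delete(next, r.Key)
		default:
			return nil, fmt.Errorf("operation %d: unsupported type %q", i, op.Operation)
		}
	}
	return next, nil
}

func main() {
	data := map[string]interface{}{"a": 1}
	req := &TransactionalStateRequest{Operations: []TransactionalStateOperation{
		{Operation: Upsert, Request: SetRequest{Key: "b", Value: 2}},
		{Operation: Delete, Request: DeleteRequest{Key: "a"}},
	}}
	next, err := applyAll(data, req)
	fmt.Println(len(next), err)
}
```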

type TransactionalStore

type TransactionalStore interface {
	Init(metadata Metadata) error
	Multi(request *TransactionalStateRequest) error
}

TransactionalStore is an interface for initializing a store and executing multiple operations as a single transactional request.

Directories

Path Synopsis
alicloud
aws
azure
gcp
hashicorp
oci
Package zookeeper is a generated GoMock package.