state

package
v1.14.0-rc.6
Warning

This package is not in the latest version of its module.

Published: Aug 2, 2024 License: Apache-2.0 Imports: 9 Imported by: 88

README

State Stores

State Stores provide a common way to interact with different data store implementations, and allow users to opt in to advanced capabilities using defined metadata.

Implementing a new State Store

A compliant state store needs to implement one or more interfaces: Store and TransactionalStore, defined in the store.go file.

See the documentation site for examples.

Implementing State Query API

A state store can optionally implement an API for querying the state.

Please refer to the documentation site for API description and definition.

// Querier is an interface to execute queries.
type Querier interface {
        Query(ctx context.Context, req *QueryRequest) (*QueryResponse, error)
}

Below are the definitions of structures (including nested) for QueryRequest and QueryResponse.

// QueryRequest is the request object for querying the state.
type QueryRequest struct {
        Query    query.Query       `json:"query"`
        Metadata map[string]string `json:"metadata,omitempty"`
}

type Query struct {
        Filters map[string]interface{} `json:"filter"`
        Sort    []Sorting              `json:"sort"`
        Page    Pagination             `json:"page"`

        // derived from Filters
        Filter Filter
}

type Sorting struct {
        Key   string `json:"key"`
        Order string `json:"order,omitempty"`
}

type Pagination struct {
        Limit int    `json:"limit"`
        Token string `json:"token,omitempty"`
}

// QueryResponse is the response object for querying state.
type QueryResponse struct {
        Results  []QueryItem       `json:"results"`
        Token    string            `json:"token,omitempty"`
        Metadata map[string]string `json:"metadata,omitempty"`
}

// QueryItem is an object representing a single entry in query results.
type QueryItem struct {
        Key   string  `json:"key"`
        Data  []byte  `json:"data"`
        ETag  *string `json:"etag,omitempty"`
        Error string  `json:"error,omitempty"`
}

Upon receiving the query request, Dapr validates it and transforms it into a Query object, which in turn is passed on to the state store component.

The Query object has a Filter member. Each filter type implements the Filter parsing interface described below.

type Filter interface {
	Parse(interface{}) error
}

type FilterEQ struct {
	Key string
	Val interface{}
}

type FilterNEQ struct {
	Key string
	Val interface{}
}

type FilterGT struct {
	Key string
	Val interface{}
}

type FilterGTE struct {
	Key string
	Val interface{}
}

type FilterLT struct {
	Key string
	Val interface{}
}

type FilterLTE struct {
	Key string
	Val interface{}
}

type FilterIN struct {
	Key  string
	Vals []interface{}
}

type FilterAND struct {
	Filters []Filter
}

type FilterOR struct {
	Filters []Filter
}

To simplify query translation, the package uses the visitor design pattern. A state store component developer implements the Visit methods, and the runtime uses them to construct the native query statement.

type Visitor interface {
	// returns "equal" expression
	VisitEQ(*FilterEQ) (string, error)
	// returns "not equal" expression 
	VisitNEQ(*FilterNEQ) (string, error) 
	// returns "greater than" expression
	VisitGT(*FilterGT) (string, error)
	// returns "greater than or equal" expression
	VisitGTE(*FilterGTE) (string, error)
	// returns "less than" expression
	VisitLT(*FilterLT) (string, error)
	// returns "less than or equal" expression
	VisitLTE(*FilterLTE) (string, error)
	// returns "in" expression
	VisitIN(*FilterIN) (string, error)
	// returns "and" expression
	VisitAND(*FilterAND) (string, error)
	// returns "or" expression
	VisitOR(*FilterOR) (string, error)
	// receives concatenated filters and finalizes the native query
	Finalize(string, *Query) error
}

The Dapr runtime implements a QueryBuilder object that takes a Visitor implementation and constructs the native query.

type QueryBuilder struct {
	visitor Visitor
}

func (h *QueryBuilder) BuildQuery(q *Query) error {...}

The last step is to implement the Querier interface in the component:

type Querier interface {
	Query(ctx context.Context, req *QueryRequest) (*QueryResponse, error)
}

A sample implementation might look like this:

func (m *MyComponent) Query(ctx context.Context, req *state.QueryRequest) (*state.QueryResponse, error) {
	// Bound the query with a timeout derived from the caller's context.
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	q := &Query{} // Query is the component's own type; it implements the Visitor interface
	qbuilder := query.NewQueryBuilder(q)
	if err := qbuilder.BuildQuery(&req.Query); err != nil {
		return &state.QueryResponse{}, err
	}
	data, token, err := q.execute(ctx)
	if err != nil {
		return &state.QueryResponse{}, err
	}
	return &state.QueryResponse{
		Results: data,
		Token:   token,
	}, nil
}

Examples of State Query API implementations include the Redis, MongoDB, and CosmosDB state store components.

Documentation

Constants

const (
	FirstWrite = "first-write"
	LastWrite  = "last-write"
	Strong     = "strong"
	Eventual   = "eventual"
)
const (
	// GetRespMetaKeyTTLExpireTime is the key for the metadata value of the TTL
	// expire time. The value is an RFC3339-formatted string.
	GetRespMetaKeyTTLExpireTime string = "ttlExpireTime"
)

Variables

var ErrPingNotImplemented = errors.New("ping is not implemented by this state store")

ErrPingNotImplemented is returned by Ping if the state store does not implement the Pinger interface.

Functions

func CheckRequestOptions added in v0.3.0

func CheckRequestOptions(options interface{}) error

CheckRequestOptions checks if request options use supported keywords.
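
The kind of check involved can be sketched against the concurrency and consistency constants listed above. This is not the package's implementation: validateOptions is a hypothetical name, the option struct is a local stand-in, and treating an empty value as "use the store default" is an assumption.

```go
package main

import "fmt"

// Local copies of the documented keyword constants.
const (
	FirstWrite = "first-write"
	LastWrite  = "last-write"
	Strong     = "strong"
	Eventual   = "eventual"
)

// SetStateOption is a local stand-in for the package's option struct.
type SetStateOption struct {
	Concurrency string
	Consistency string
}

// validateOptions rejects keywords outside the documented sets.
// Assumption: an empty string means "use the store default" and is allowed.
func validateOptions(o SetStateOption) error {
	switch o.Concurrency {
	case "", FirstWrite, LastWrite:
	default:
		return fmt.Errorf("unrecognized concurrency model %q", o.Concurrency)
	}
	switch o.Consistency {
	case "", Strong, Eventual:
	default:
		return fmt.Errorf("unrecognized consistency model %q", o.Consistency)
	}
	return nil
}

func main() {
	fmt.Println(validateOptions(SetStateOption{Concurrency: FirstWrite, Consistency: Strong}))
	fmt.Println(validateOptions(SetStateOption{Consistency: "sometimes"}))
}
```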

func DoBulkSetDelete added in v1.11.0

func DoBulkSetDelete[T stateRequestConstraint](ctx context.Context, req []T, method func(ctx context.Context, req *T) error, opts BulkStoreOpts) error

DoBulkSetDelete performs BulkSet and BulkDelete.

func Ping added in v1.8.0

func Ping(ctx context.Context, store Store) error

Types

type BaseStore added in v1.11.0

type BaseStore interface {
	Init(ctx context.Context, metadata Metadata) error
	Features() []Feature
	Delete(ctx context.Context, req *DeleteRequest) error
	Get(ctx context.Context, req *GetRequest) (*GetResponse, error)
	Set(ctx context.Context, req *SetRequest) error
}

BaseStore is an interface that contains the base methods for each state store.

type BulkDeleteRowMismatchError added in v1.7.0

type BulkDeleteRowMismatchError struct {
	// contains filtered or unexported fields
}

BulkDeleteRowMismatchError represents a mismatch in the row count when deleting rows.

func NewBulkDeleteRowMismatchError added in v1.7.0

func NewBulkDeleteRowMismatchError(expected, affected uint64) *BulkDeleteRowMismatchError

NewBulkDeleteRowMismatchError returns a BulkDeleteRowMismatchError.

func (*BulkDeleteRowMismatchError) Error added in v1.7.0

type BulkGetOpts added in v1.11.0

type BulkGetOpts struct {
	// Number of requests made in parallel while retrieving values in bulk.
	// When set to <= 0 (the default value), will fetch all requested values in bulk without limit.
	// Note that if the component implements a native BulkGet method, this value may be ignored.
	Parallelism int
}

BulkGetOpts contains options for the BulkGet method.

type BulkGetResponse added in v1.0.0

type BulkGetResponse struct {
	Key         string            `json:"key"`
	Data        []byte            `json:"data"`
	ETag        *string           `json:"etag,omitempty"`
	Metadata    map[string]string `json:"metadata"`
	Error       string            `json:"error,omitempty"`
	ContentType *string           `json:"contentType,omitempty"`
}

BulkGetResponse is the response object for a bulk get operation.

func DoBulkGet added in v1.11.0

func DoBulkGet(ctx context.Context, req []GetRequest, opts BulkGetOpts, getFn func(ctx context.Context, req *GetRequest) (*GetResponse, error)) ([]BulkGetResponse, error)

DoBulkGet performs BulkGet.

type BulkStore added in v1.0.0

type BulkStore interface {
	BulkGet(ctx context.Context, req []GetRequest, opts BulkGetOpts) ([]BulkGetResponse, error)
	BulkSet(ctx context.Context, req []SetRequest, opts BulkStoreOpts) error
	BulkDelete(ctx context.Context, req []DeleteRequest, opts BulkStoreOpts) error
}

BulkStore is an interface to perform bulk operations on a store.

func NewDefaultBulkStore added in v1.0.0

func NewDefaultBulkStore(base BaseStore) BulkStore

NewDefaultBulkStore builds a default bulk store.

type BulkStoreError added in v1.11.0

type BulkStoreError struct {
	// contains filtered or unexported fields
}

BulkStoreError is an error object that contains details on the operations that failed.

func NewBulkStoreError added in v1.11.0

func NewBulkStoreError(key string, err error) BulkStoreError

func (BulkStoreError) ETagError added in v1.11.0

func (e BulkStoreError) ETagError() *ETagError

ETagError returns an *ETagError if the wrapped error is of that kind; otherwise, it returns nil.

func (BulkStoreError) Error added in v1.11.0

func (e BulkStoreError) Error() string

Error returns the error message. It implements the error interface.

func (BulkStoreError) Key added in v1.11.0

func (e BulkStoreError) Key() string

Key returns the key of the operation that failed.

func (BulkStoreError) Unwrap added in v1.11.0

func (e BulkStoreError) Unwrap() error

Unwrap returns the wrapped error. It implements the error wrapping interface.

type BulkStoreOpts added in v1.11.0

type BulkStoreOpts struct {
	// Number of requests made in parallel while storing/deleting values in bulk.
	// When set to <= 0 (the default value), will perform all operations in parallel without limit.
	Parallelism int
}

BulkStoreOpts contains options for the BulkSet and BulkDelete methods.

type DefaultBulkStore added in v1.0.0

type DefaultBulkStore struct {
	// contains filtered or unexported fields
}

DefaultBulkStore is a default implementation of BulkStore.

func (*DefaultBulkStore) BulkDelete added in v1.0.0

func (b *DefaultBulkStore) BulkDelete(ctx context.Context, req []DeleteRequest, opts BulkStoreOpts) error

BulkDelete performs a bulk delete operation.

func (*DefaultBulkStore) BulkGet added in v1.0.0

func (b *DefaultBulkStore) BulkGet(ctx context.Context, req []GetRequest, opts BulkGetOpts) ([]BulkGetResponse, error)

BulkGet performs a Get operation in bulk.

func (*DefaultBulkStore) BulkSet added in v1.0.0

func (b *DefaultBulkStore) BulkSet(ctx context.Context, req []SetRequest, opts BulkStoreOpts) error

BulkSet performs a bulk save operation.

type DeleteRequest

type DeleteRequest struct {
	Key      string            `json:"key"`
	ETag     *string           `json:"etag,omitempty"`
	Metadata map[string]string `json:"metadata"`
	Options  DeleteStateOption `json:"options,omitempty"`
}

DeleteRequest is the object describing a delete state request.

func (DeleteRequest) GetKey added in v0.2.0

func (r DeleteRequest) GetKey() string

GetKey gets the Key on a DeleteRequest.

func (DeleteRequest) GetMetadata added in v0.2.0

func (r DeleteRequest) GetMetadata() map[string]string

GetMetadata gets the Metadata on a DeleteRequest.

func (DeleteRequest) HasETag added in v1.11.0

func (r DeleteRequest) HasETag() bool

HasETag returns true if the request has a non-empty ETag.

func (DeleteRequest) Operation added in v1.11.0

func (r DeleteRequest) Operation() OperationType

Operation returns the operation type for DeleteRequest, implementing TransactionalStateOperation.

type DeleteStateOption

type DeleteStateOption struct {
	Concurrency string `json:"concurrency,omitempty"` // "concurrency"
	Consistency string `json:"consistency"`           // "eventual, strong"
}

DeleteStateOption controls how a state store reacts to a delete request.

type DeleteWithPrefix added in v1.13.0

type DeleteWithPrefix interface {
	DeleteWithPrefix(ctx context.Context, req DeleteWithPrefixRequest) (DeleteWithPrefixResponse, error)
}

DeleteWithPrefix is an optional interface to delete objects with a prefix.

type DeleteWithPrefixRequest added in v1.13.0

type DeleteWithPrefixRequest struct {
	Prefix string `json:"prefix"`
}

DeleteWithPrefixRequest is the object describing a delete with prefix state request used for deleting actors.

func (*DeleteWithPrefixRequest) Validate added in v1.13.0

func (r *DeleteWithPrefixRequest) Validate() error

type DeleteWithPrefixResponse added in v1.13.0

type DeleteWithPrefixResponse struct {
	Count int64 `json:"count"` // count of items removed
}

DeleteWithPrefixResponse is the object representing a delete with prefix state response containing the number of items removed.

type ETagError added in v1.0.0

type ETagError struct {
	// contains filtered or unexported fields
}

ETagError is a custom error type for etag exceptions.

func NewETagError added in v1.0.0

func NewETagError(kind ETagErrorKind, err error) *ETagError

NewETagError returns an ETagError wrapping an existing context error.

func (*ETagError) Error added in v1.0.0

func (e *ETagError) Error() string

func (*ETagError) Kind added in v1.0.0

func (e *ETagError) Kind() ETagErrorKind

func (*ETagError) Unwrap added in v1.11.0

func (e *ETagError) Unwrap() error

type ETagErrorKind added in v1.0.0

type ETagErrorKind string
const (
	ETagInvalid  ETagErrorKind = "invalid"
	ETagMismatch ETagErrorKind = "mismatch"
)

type Feature added in v1.1.1

type Feature = features.Feature[BaseStore]

Feature names a feature that can be implemented by state store components.

const (
	// FeatureETag is the feature to etag metadata in state store.
	FeatureETag Feature = "ETAG"
	// FeatureTransactional is the feature that performs transactional operations.
	FeatureTransactional Feature = "TRANSACTIONAL"
	// FeatureQueryAPI is the feature that performs query operations.
	FeatureQueryAPI Feature = "QUERY_API"
	// FeatureTTL is the feature that supports TTLs.
	FeatureTTL Feature = "TTL"
	// FeatureDeleteWithPrefix is the feature that supports deleting with prefix.
	FeatureDeleteWithPrefix Feature = "DELETE_WITH_PREFIX"
	// FeaturePartitionKey is the feature that supports the partition
	FeaturePartitionKey Feature = "PARTITION_KEY"
)
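
A caller can use the list returned by Features to decide which optional capabilities to rely on. The sketch below shows one way to do that. It simplifies Feature to a plain string type and uses a hypothetical hasFeature helper; the package's own Feature type is generic and may offer its own lookup method.

```go
package main

import "fmt"

// Feature is simplified to a string type for this sketch.
type Feature string

const (
	FeatureETag          Feature = "ETAG"
	FeatureTransactional Feature = "TRANSACTIONAL"
	FeatureTTL           Feature = "TTL"
)

// hasFeature reports whether want appears in the advertised feature list.
func hasFeature(features []Feature, want Feature) bool {
	for _, f := range features {
		if f == want {
			return true
		}
	}
	return false
}

func main() {
	advertised := []Feature{FeatureETag, FeatureTTL}
	fmt.Println(hasFeature(advertised, FeatureTTL))           // true
	fmt.Println(hasFeature(advertised, FeatureTransactional)) // false
}
```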

type GetRequest

type GetRequest struct {
	Key      string            `json:"key"`
	Metadata map[string]string `json:"metadata"`
	Options  GetStateOption    `json:"options,omitempty"`
}

GetRequest is the object describing a state "fetch" request.

func (GetRequest) GetKey added in v1.11.0

func (r GetRequest) GetKey() string

GetKey gets the Key on a GetRequest.

func (GetRequest) GetMetadata added in v1.11.0

func (r GetRequest) GetMetadata() map[string]string

GetMetadata gets the Metadata on a GetRequest.

type GetResponse

type GetResponse struct {
	Data        []byte            `json:"data"`
	ETag        *string           `json:"etag,omitempty"`
	Metadata    map[string]string `json:"metadata"`
	ContentType *string           `json:"contentType,omitempty"`
}

GetResponse is the response object for getting state.

type GetStateOption

type GetStateOption struct {
	Consistency string `json:"consistency"` // "eventual, strong"
}

GetStateOption controls how a state store reacts to a get request.

type Metadata

type Metadata struct {
	metadata.Base `json:",inline"`
}

Metadata contains a state store specific set of metadata properties.

type OperationType

type OperationType string

OperationType describes a CRUD operation performed against a state store.

const (
	// OperationUpsert is an update or create transactional operation.
	OperationUpsert OperationType = "upsert"
	// OperationDelete is a delete transactional operation.
	OperationDelete OperationType = "delete"
)

type Querier added in v1.5.0

type Querier interface {
	Query(ctx context.Context, req *QueryRequest) (*QueryResponse, error)
}

Querier is an interface to execute queries.

type QueryItem added in v1.5.0

type QueryItem struct {
	Key         string  `json:"key"`
	Data        []byte  `json:"data"`
	ETag        *string `json:"etag,omitempty"`
	Error       string  `json:"error,omitempty"`
	ContentType *string `json:"contentType,omitempty"`
}

QueryItem is an object representing a single entry in query results.

type QueryRequest added in v1.5.0

type QueryRequest struct {
	Query    query.Query       `json:"query"`
	Metadata map[string]string `json:"metadata,omitempty"`
}

type QueryResponse added in v1.5.0

type QueryResponse struct {
	Results  []QueryItem       `json:"results"`
	Token    string            `json:"token,omitempty"`
	Metadata map[string]string `json:"metadata,omitempty"`
}

QueryResponse is the response object for querying state.

type SetRequest

type SetRequest struct {
	Key         string            `json:"key"`
	Value       any               `json:"value"`
	ETag        *string           `json:"etag,omitempty"`
	Metadata    map[string]string `json:"metadata,omitempty"`
	Options     SetStateOption    `json:"options,omitempty"`
	ContentType *string           `json:"contentType,omitempty"`
}

SetRequest is the object describing an upsert request.

func (SetRequest) GetKey added in v0.2.0

func (r SetRequest) GetKey() string

GetKey gets the Key on a SetRequest.

func (SetRequest) GetMetadata added in v0.2.0

func (r SetRequest) GetMetadata() map[string]string

GetMetadata gets the Metadata on a SetRequest.

func (SetRequest) HasETag added in v1.11.0

func (r SetRequest) HasETag() bool

HasETag returns true if the request has a non-empty ETag.

func (SetRequest) Operation added in v1.11.0

func (r SetRequest) Operation() OperationType

Operation returns the operation type for SetRequest, implementing TransactionalStateOperation.

type SetStateOption

type SetStateOption struct {
	Concurrency string // first-write, last-write
	Consistency string // "eventual, strong"
}

SetStateOption controls how a state store reacts to a set request.

type StateRequest added in v1.11.0

type StateRequest interface {
	GetKey() string
	GetMetadata() map[string]string
}

StateRequest is an interface that exposes getters for the Key and Metadata of a request.

type Store

Store is an interface to perform operations on store.

type TransactionalStateOperation added in v0.3.0

type TransactionalStateOperation interface {
	StateRequest

	Operation() OperationType
}

TransactionalStateOperation is an interface for all requests that can be part of a transaction.

type TransactionalStateRequest added in v0.3.0

type TransactionalStateRequest struct {
	Operations []TransactionalStateOperation
	Metadata   map[string]string
}

TransactionalStateRequest describes a transactional operation against a state store that comprises multiple types of operations. Each entry in Operations is either a DeleteRequest or a SetRequest.

type TransactionalStore

type TransactionalStore interface {
	Multi(ctx context.Context, request *TransactionalStateRequest) error
}

TransactionalStore is an interface for state stores that support multiple transactional requests.

type TransactionalStoreMultiMaxSize added in v1.12.0

type TransactionalStoreMultiMaxSize interface {
	MultiMaxSize() int
}

TransactionalStoreMultiMaxSize is an optional interface transactional state stores can implement to indicate the maximum size for a transaction.
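
A caller might consult this optional interface before submitting a transaction. The following sketch shows one way to do so; checkTxnSize and both store types are hypothetical, and treating a non-positive return value as "no limit" is an assumption rather than documented behavior.

```go
package main

import "fmt"

// multiMaxSizer mirrors the optional interface described above.
type multiMaxSizer interface {
	MultiMaxSize() int
}

// checkTxnSize returns an error when the store advertises a limit that
// n operations would exceed. Stores without the interface are not limited.
// Assumption: a non-positive limit means "no limit".
func checkTxnSize(store any, n int) error {
	s, ok := store.(multiMaxSizer)
	if !ok {
		return nil
	}
	if limit := s.MultiMaxSize(); limit > 0 && n > limit {
		return fmt.Errorf("transaction has %d operations, store allows at most %d", n, limit)
	}
	return nil
}

// limitedStore advertises a limit; unlimitedStore does not implement the interface.
type limitedStore struct{ limit int }

func (s limitedStore) MultiMaxSize() int { return s.limit }

type unlimitedStore struct{}

func main() {
	fmt.Println(checkTxnSize(limitedStore{limit: 2}, 3))
	fmt.Println(checkTxnSize(limitedStore{limit: 2}, 2))
	fmt.Println(checkTxnSize(unlimitedStore{}, 1000))
}
```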

Directories

Path Synopsis
alicloud
aws
azure
cloudflare
gcp
hashicorp
Package mongodb is an implementation of StateStore interface to perform operations on store
oci
postgresql
v1
v2
Package zookeeper is a generated GoMock package.
