Documentation ¶
Overview ¶
Package storage provides a generic interface for interacting with a storage backend.
Index ¶
- Variables
- func GetRecordIndex(msg proto.Message) *structpb.Struct
- func GetRecordIndexCIDR(msg proto.Message) *netip.Prefix
- func IsNotFound(err error) bool
- func MatchAny(any *anypb.Any, query string) bool
- func PatchRecord(existing, record *databroker.Record, fields *fieldmaskpb.FieldMask) error
- func RecordStreamToList(recordStream RecordStream) ([]*databroker.Record, error)
- func WithQuerier(ctx context.Context, querier Querier) context.Context
- type AndFilterExpression
- type Backend
- type Cache
- type EqualsFilterExpression
- type FilterExpression
- type OrFilterExpression
- type Querier
- type QueryTrace
- type RecordStream
- type RecordStreamFilter
- type RecordStreamGenerator
- type TracingQuerier
Constants ¶
This section is empty.
Variables ¶
var (
	ErrNotFound             = errors.New("record not found")
	ErrStreamDone           = errors.New("record stream done")
	ErrInvalidServerVersion = status.Error(codes.Aborted, "invalid server version")
)
Errors
Functions ¶
func GetRecordIndex ¶ added in v0.18.0
GetRecordIndex gets a record's index. If there is no index, nil is returned.
func GetRecordIndexCIDR ¶ added in v0.18.0
GetRecordIndexCIDR returns the $index.cidr for a record's data. If none is available, nil is returned.
func IsNotFound ¶ added in v0.18.0
IsNotFound returns true if the error is because a record was not found.
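As a rough illustration of how a sentinel check like this is typically written, the sketch below mirrors ErrNotFound locally and tests for it with errors.Is. The names isNotFound and lookup are hypothetical, and the use of errors.Is is an assumption; the package's actual check may consider more cases.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNotFound mirrors the sentinel error listed under Variables.
var ErrNotFound = errors.New("record not found")

// isNotFound is a hypothetical stand-in for storage.IsNotFound. It assumes
// the check is done with errors.Is, so wrapped errors also match.
func isNotFound(err error) bool {
	return errors.Is(err, ErrNotFound)
}

// lookup is a hypothetical helper that always fails with a wrapped ErrNotFound.
func lookup(id string) error {
	return fmt.Errorf("get %q: %w", id, ErrNotFound)
}

func main() {
	fmt.Println(isNotFound(lookup("user-1")))                 // true
	fmt.Println(isNotFound(errors.New("connection refused"))) // false
}
```

Checking with a helper instead of comparing errors directly keeps callers working when a backend wraps the sentinel with extra context.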
func PatchRecord ¶ added in v0.24.0
func PatchRecord(existing, record *databroker.Record, fields *fieldmaskpb.FieldMask) error
PatchRecord extracts the data from existing and record, updates the existing data subject to the provided field mask, and stores the result back into record. The existing record is not modified.
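The merge semantics described above can be sketched with flat string maps standing in for protobuf messages and a list of keys standing in for the field mask. The function patch and its map-based types are hypothetical; this is an analogy for the behavior, not the package's implementation.

```go
package main

import "fmt"

// patch is a hypothetical analogue of PatchRecord over flat maps.
// Keys listed in paths are taken from record; every other key is taken
// from existing. The merged result is written back into record, and
// existing is left untouched.
func patch(existing, record map[string]string, paths []string) {
	// Start from a copy of the existing data.
	merged := make(map[string]string, len(existing))
	for k, v := range existing {
		merged[k] = v
	}
	// Apply only the masked keys from the incoming record.
	for _, p := range paths {
		if v, ok := record[p]; ok {
			merged[p] = v
		} else {
			delete(merged, p)
		}
	}
	// Store the result back into record.
	for k := range record {
		delete(record, k)
	}
	for k, v := range merged {
		record[k] = v
	}
}

func main() {
	existing := map[string]string{"name": "old", "email": "a@example.com"}
	record := map[string]string{"name": "new", "email": "ignored@example.com"}
	patch(existing, record, []string{"name"})
	fmt.Println(record["name"], record["email"]) // new a@example.com
	fmt.Println(existing["name"])                // old
}
```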
func RecordStreamToList ¶ added in v0.17.3
func RecordStreamToList(recordStream RecordStream) ([]*databroker.Record, error)
RecordStreamToList converts a record stream to a list.
Types ¶
type AndFilterExpression ¶ added in v0.18.0
type AndFilterExpression []FilterExpression
An AndFilterExpression represents a logical-and comparison operator.
type Backend ¶
type Backend interface {
	// Close closes the backend.
	Close() error
	// Get is used to retrieve a record.
	Get(ctx context.Context, recordType, id string) (*databroker.Record, error)
	// GetOptions gets the options for a type.
	GetOptions(ctx context.Context, recordType string) (*databroker.Options, error)
	// Lease acquires a lease, or renews an existing one. If the lease is acquired true is returned.
	Lease(ctx context.Context, leaseName, leaseID string, ttl time.Duration) (bool, error)
	// ListTypes lists all the known record types.
	ListTypes(ctx context.Context) ([]string, error)
	// Put is used to insert or update records.
	Put(ctx context.Context, records []*databroker.Record) (serverVersion uint64, err error)
	// Patch is used to update specific fields of existing records.
	Patch(ctx context.Context, records []*databroker.Record, fields *fieldmaskpb.FieldMask) (serverVersion uint64, patchedRecords []*databroker.Record, err error)
	// SetOptions sets the options for a type.
	SetOptions(ctx context.Context, recordType string, options *databroker.Options) error
	// Sync syncs record changes after the specified version.
	Sync(ctx context.Context, recordType string, serverVersion, recordVersion uint64) (RecordStream, error)
	// SyncLatest syncs all the records.
	SyncLatest(ctx context.Context, recordType string, filter FilterExpression) (serverVersion, recordVersion uint64, stream RecordStream, err error)
}
Backend is the interface required for a storage backend.
type Cache ¶ added in v0.18.0
type Cache interface {
	GetOrUpdate(
		ctx context.Context,
		key []byte,
		update func(ctx context.Context) ([]byte, error),
	) ([]byte, error)
	Invalidate(key []byte)
}
A Cache will return cached data when available or call update when not.
func NewGlobalCache ¶ added in v0.18.0
NewGlobalCache creates a new Cache backed by fastcache and a TTL.
func NewLocalCache ¶ added in v0.18.0
func NewLocalCache() Cache
NewLocalCache creates a new Cache backed by a map.
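To make the GetOrUpdate contract concrete, here is a minimal sketch of a map-backed cache with the same method set as Cache. The type mapCache, its constructor, and demoCalls are hypothetical names; the sketch assumes errors from update are not cached, which the documentation above does not state.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// mapCache is a hypothetical map-backed cache with the same method set as
// the Cache interface. It is not the package's implementation.
type mapCache struct {
	mu sync.Mutex
	m  map[string][]byte
}

func newMapCache() *mapCache { return &mapCache{m: make(map[string][]byte)} }

// GetOrUpdate returns the cached value for key, or calls update and stores
// its result when the key is missing. Errors from update are not cached
// (an assumption of this sketch).
func (c *mapCache) GetOrUpdate(
	ctx context.Context,
	key []byte,
	update func(ctx context.Context) ([]byte, error),
) ([]byte, error) {
	c.mu.Lock()
	v, ok := c.m[string(key)]
	c.mu.Unlock()
	if ok {
		return v, nil
	}
	v, err := update(ctx)
	if err != nil {
		return nil, err
	}
	c.mu.Lock()
	c.m[string(key)] = v
	c.mu.Unlock()
	return v, nil
}

// Invalidate removes key so the next GetOrUpdate calls update again.
func (c *mapCache) Invalidate(key []byte) {
	c.mu.Lock()
	delete(c.m, string(key))
	c.mu.Unlock()
}

// demoCalls reports how many times update ran for get, get, invalidate, get.
func demoCalls() int {
	c := newMapCache()
	calls := 0
	update := func(context.Context) ([]byte, error) {
		calls++
		return []byte("value"), nil
	}
	ctx := context.Background()
	key := []byte("k")
	c.GetOrUpdate(ctx, key, update) // miss: calls update
	c.GetOrUpdate(ctx, key, update) // hit: served from the map
	c.Invalidate(key)
	c.GetOrUpdate(ctx, key, update) // miss again after Invalidate
	return calls
}

func main() {
	fmt.Println(demoCalls()) // 2
}
```

The lock is released while update runs, so two concurrent misses for the same key may both call update. That keeps the sketch short; a production cache might deduplicate in-flight updates.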
type EqualsFilterExpression ¶ added in v0.18.0
An EqualsFilterExpression represents a field comparison operator.
type FilterExpression ¶ added in v0.18.0
type FilterExpression interface {
// contains filtered or unexported methods
}
A FilterExpression describes an AST for record stream filters.
func FilterExpressionFromStruct ¶ added in v0.18.0
func FilterExpressionFromStruct(s *structpb.Struct) (FilterExpression, error)
FilterExpressionFromStruct creates a FilterExpression from a protobuf struct.
type OrFilterExpression ¶ added in v0.18.0
type OrFilterExpression []FilterExpression
An OrFilterExpression represents a logical-or comparison operator.
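The and, or, and equals expressions form a small tree. The sketch below shows one way such a tree could be evaluated against a record's fields. All types here (expr, andExpr, orExpr, equalsExpr) are hypothetical local stand-ins, and the Field/Value shape of the equals node is an assumption, since the real struct definition is not shown on this page.

```go
package main

import "fmt"

// expr is a hypothetical stand-in for FilterExpression.
type expr interface {
	match(fields map[string]string) bool
}

// andExpr and orExpr mirror the slice-based shape of AndFilterExpression
// and OrFilterExpression.
type andExpr []expr
type orExpr []expr

// equalsExpr is an assumed shape for a field comparison node.
type equalsExpr struct{ Field, Value string }

// match is true when every sub-expression matches (true for an empty list).
func (e andExpr) match(f map[string]string) bool {
	for _, sub := range e {
		if !sub.match(f) {
			return false
		}
	}
	return true
}

// match is true when at least one sub-expression matches (false for an empty list).
func (e orExpr) match(f map[string]string) bool {
	for _, sub := range e {
		if sub.match(f) {
			return true
		}
	}
	return false
}

// match is true when the field is present and equal to Value.
func (e equalsExpr) match(f map[string]string) bool {
	v, ok := f[e.Field]
	return ok && v == e.Value
}

// userAOrB matches records of type "user" whose id is "a" or "b".
var userAOrB = andExpr{
	equalsExpr{Field: "type", Value: "user"},
	orExpr{
		equalsExpr{Field: "id", Value: "a"},
		equalsExpr{Field: "id", Value: "b"},
	},
}

func main() {
	fmt.Println(userAOrB.match(map[string]string{"type": "user", "id": "b"})) // true
	fmt.Println(userAOrB.match(map[string]string{"type": "user", "id": "c"})) // false
}
```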
type Querier ¶ added in v0.18.0
type Querier interface {
	InvalidateCache(ctx context.Context, in *databroker.QueryRequest)
	Query(ctx context.Context, in *databroker.QueryRequest, opts ...grpc.CallOption) (*databroker.QueryResponse, error)
}
A Querier is a read-only subset of the client methods.
func GetQuerier ¶ added in v0.18.0
GetQuerier gets the databroker Querier from the context.
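Carrying a querier in a context usually follows the standard context.WithValue pattern with an unexported key type. The sketch below shows that pattern with hypothetical names (querier, namedQuerier, withQuerier, getQuerier, queryVia). What the real GetQuerier returns when no querier was stored is not described here, so the sketch reports absence with a boolean instead.

```go
package main

import (
	"context"
	"fmt"
)

// querier is a hypothetical, reduced stand-in for the Querier interface.
type querier interface {
	Name() string
}

// namedQuerier is a hypothetical querier that only knows its own name.
type namedQuerier string

func (q namedQuerier) Name() string { return string(q) }

// ctxKey is unexported so other packages cannot collide with this key.
type ctxKey struct{}

// withQuerier stores q in a child context, in the spirit of WithQuerier.
func withQuerier(ctx context.Context, q querier) context.Context {
	return context.WithValue(ctx, ctxKey{}, q)
}

// getQuerier retrieves the querier, reporting whether one was stored.
func getQuerier(ctx context.Context) (querier, bool) {
	q, ok := ctx.Value(ctxKey{}).(querier)
	return q, ok
}

// queryVia is a hypothetical caller that only receives the context.
func queryVia(ctx context.Context) string {
	q, ok := getQuerier(ctx)
	if !ok {
		return "no querier"
	}
	return q.Name()
}

// demo runs queryVia with and without a stored querier.
func demo() (with, without string) {
	base := context.Background()
	return queryVia(withQuerier(base, namedQuerier("static"))), queryVia(base)
}

func main() {
	with, without := demo()
	fmt.Println(with)    // static
	fmt.Println(without) // no querier
}
```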
func NewCachingQuerier ¶ added in v0.18.0
NewCachingQuerier creates a new querier that caches results in a Cache.
func NewQuerier ¶ added in v0.18.0
func NewQuerier(client databroker.DataBrokerServiceClient) Querier
NewQuerier creates a new Querier that implements the Querier interface by making calls to the databroker over gRPC.
func NewStaticQuerier ¶ added in v0.18.0
NewStaticQuerier creates a Querier that returns statically defined protobuf records.
type QueryTrace ¶ added in v0.18.0
type QueryTrace struct {
ServerVersion, RecordVersion uint64
RecordType string
Query string
Filter *structpb.Struct
}
A QueryTrace traces a call to Query.
type RecordStream ¶ added in v0.14.0
type RecordStream interface {
	// Close closes the record stream and releases any underlying resources.
	Close() error
	// Next is called to retrieve the next record. If one is available it will
	// be returned immediately. If none is available and block is true, the method
	// will block until one is available or an error occurs. The error should be
	// checked with a call to `.Err()`.
	Next(block bool) bool
	// Record returns the current record.
	Record() *databroker.Record
	// Err returns any error that occurred while streaming.
	Err() error
}
A RecordStream is a stream of records.
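The Next/Record/Err/Close methods follow the usual Go iterator idiom: loop while Next returns true, read the current item with Record, then check Err once the loop ends. The sketch below demonstrates that loop against a hypothetical slice-backed stream. The record, recordStream, sliceStream, and drain names are local stand-ins, not part of the package.

```go
package main

import "fmt"

// record is a hypothetical stand-in for *databroker.Record.
type record struct{ ID string }

// recordStream mirrors the method set of RecordStream using the local
// record type.
type recordStream interface {
	Close() error
	Next(block bool) bool
	Record() *record
	Err() error
}

// sliceStream is a hypothetical stream over an in-memory slice. It never
// blocks and never fails.
type sliceStream struct {
	records []*record
	idx     int // number of records already handed out
}

func (s *sliceStream) Close() error { return nil }

func (s *sliceStream) Next(block bool) bool {
	if s.idx >= len(s.records) {
		return false
	}
	s.idx++
	return true
}

func (s *sliceStream) Record() *record {
	if s.idx == 0 {
		return nil // Next has not been called yet
	}
	return s.records[s.idx-1]
}

func (s *sliceStream) Err() error { return nil }

// drain reads every immediately available record, then closes the stream.
func drain(stream recordStream) ([]*record, error) {
	defer stream.Close()
	var out []*record
	for stream.Next(false) {
		out = append(out, stream.Record())
	}
	// Next returning false can mean "done" or "failed"; Err distinguishes them.
	return out, stream.Err()
}

func main() {
	recs, err := drain(&sliceStream{records: []*record{{ID: "a"}, {ID: "b"}}})
	fmt.Println(len(recs), err) // 2 <nil>
}
```

Passing false for block reads only what is already available; a caller that wants to wait for new records would pass true and rely on Err to learn why the loop stopped.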
func NewConcatenatedRecordStream ¶ added in v0.18.0
func NewConcatenatedRecordStream(streams ...RecordStream) RecordStream
NewConcatenatedRecordStream creates a new record stream that streams all the records from the first stream before streaming all the records of the subsequent streams.
func NewRecordStream ¶ added in v0.17.3
func NewRecordStream(
	ctx context.Context,
	backendClosed chan struct{},
	generators []RecordStreamGenerator,
	onClose func(),
) RecordStream
NewRecordStream creates a new RecordStream from a list of generators and an onClose function.
func RecordListToStream ¶ added in v0.17.3
func RecordListToStream(ctx context.Context, records []*databroker.Record) RecordStream
RecordListToStream converts a record list to a stream.
type RecordStreamFilter ¶ added in v0.18.0
type RecordStreamFilter func(record *databroker.Record) (keep bool)
A RecordStreamFilter filters a RecordStream.
func RecordStreamFilterFromFilterExpression ¶ added in v0.18.0
func RecordStreamFilterFromFilterExpression(
	expr FilterExpression,
) (filter RecordStreamFilter, err error)
RecordStreamFilterFromFilterExpression returns a RecordStreamFilter from a FilterExpression.
func (RecordStreamFilter) And ¶ added in v0.18.0
func (filter RecordStreamFilter) And(
	then RecordStreamFilter,
) RecordStreamFilter
And creates a new RecordStreamFilter by applying both functions to a record.
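Composing predicate functions with a method on the function type is a compact Go pattern. The sketch below reproduces it with hypothetical local types (record, filter) and helpers (isUser, hasID). It assumes the second filter is skipped when the first rejects a record, which the description above does not specify.

```go
package main

import "fmt"

// record is a hypothetical stand-in for *databroker.Record.
type record struct{ Type, ID string }

// filter mirrors the shape of RecordStreamFilter.
type filter func(r *record) (keep bool)

// and returns a filter that keeps a record only when both filters keep it.
// The second filter is not evaluated if the first one rejects the record
// (an assumption of this sketch).
func (f filter) and(then filter) filter {
	return func(r *record) bool {
		return f(r) && then(r)
	}
}

// isUser keeps records whose type is "user".
func isUser(r *record) bool { return r.Type == "user" }

// hasID returns a filter that keeps records with the given ID.
func hasID(id string) filter {
	return func(r *record) bool { return r.ID == id }
}

func main() {
	keep := filter(isUser).and(hasID("u1"))
	fmt.Println(keep(&record{Type: "user", ID: "u1"}))  // true
	fmt.Println(keep(&record{Type: "group", ID: "u1"})) // false
}
```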
type RecordStreamGenerator ¶ added in v0.17.3
A RecordStreamGenerator generates records for a record stream.
func FilteredRecordStreamGenerator ¶ added in v0.18.0
func FilteredRecordStreamGenerator(
	generator RecordStreamGenerator,
	filter RecordStreamFilter,
) RecordStreamGenerator
FilteredRecordStreamGenerator creates a RecordStreamGenerator that only returns records that pass the filter.
type TracingQuerier ¶ added in v0.18.0
type TracingQuerier struct {
// contains filtered or unexported fields
}
A TracingQuerier records calls to Query.
func NewTracingQuerier ¶ added in v0.18.0
func NewTracingQuerier(q Querier) *TracingQuerier
NewTracingQuerier creates a new TracingQuerier.
func (*TracingQuerier) InvalidateCache ¶ added in v0.18.0
func (q *TracingQuerier) InvalidateCache(ctx context.Context, in *databroker.QueryRequest)
InvalidateCache invalidates the cache.
func (*TracingQuerier) Query ¶ added in v0.18.0
func (q *TracingQuerier) Query(ctx context.Context, in *databroker.QueryRequest, opts ...grpc.CallOption) (*databroker.QueryResponse, error)
Query queries for records.
func (*TracingQuerier) Traces ¶ added in v0.18.0
func (q *TracingQuerier) Traces() []QueryTrace
Traces returns all the traces.
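A tracing querier is a decorator: it forwards each call to an underlying querier and remembers what was asked. The sketch below shows the idea with reduced, hypothetical types (request, querier, staticQuerier, trace, tracingQuerier). It records only successful calls and returns a copy from Traces; both choices are assumptions, not documented behavior.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// request is a hypothetical, reduced stand-in for *databroker.QueryRequest.
type request struct{ Type, Query string }

// querier is a reduced stand-in for the Querier interface.
type querier interface {
	Query(ctx context.Context, in *request) ([]string, error)
}

// staticQuerier is a hypothetical querier that always returns the same IDs.
type staticQuerier []string

func (q staticQuerier) Query(context.Context, *request) ([]string, error) {
	return q, nil
}

// trace holds a subset of the fields listed in QueryTrace.
type trace struct{ RecordType, Query string }

// tracingQuerier wraps another querier and records each successful call.
type tracingQuerier struct {
	underlying querier
	mu         sync.Mutex
	traces     []trace
}

func newTracingQuerier(q querier) *tracingQuerier {
	return &tracingQuerier{underlying: q}
}

// Query forwards the call and appends a trace when it succeeds.
func (q *tracingQuerier) Query(ctx context.Context, in *request) ([]string, error) {
	res, err := q.underlying.Query(ctx, in)
	if err == nil {
		q.mu.Lock()
		q.traces = append(q.traces, trace{RecordType: in.Type, Query: in.Query})
		q.mu.Unlock()
	}
	return res, err
}

// Traces returns a copy so callers cannot mutate the internal slice.
func (q *tracingQuerier) Traces() []trace {
	q.mu.Lock()
	defer q.mu.Unlock()
	return append([]trace(nil), q.traces...)
}

// demoTraces issues two queries and returns what was recorded.
func demoTraces() []trace {
	q := newTracingQuerier(staticQuerier{"u1"})
	ctx := context.Background()
	q.Query(ctx, &request{Type: "user", Query: "alice"})
	q.Query(ctx, &request{Type: "group", Query: "admins"})
	return q.Traces()
}

func main() {
	for _, t := range demoTraces() {
		fmt.Println(t.RecordType, t.Query)
	}
}
```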
Directories ¶
Path | Synopsis |
---|---|
inmemory | Package inmemory contains an in-memory implementation of the databroker backend. |
postgres | Package postgres contains an implementation of the storage.Backend backed by postgres. |
storagetest | Package storagetest contains test cases for use in verifying the behavior of a storage.Backend implementation. |