Documentation ¶
Overview ¶
Package storage provides a generic interface for interacting with storage backends.
Index ¶
- Variables
- func GetRecordIndex(msg proto.Message) *structpb.Struct
- func GetRecordIndexCIDR(msg proto.Message) *netip.Prefix
- func IsNotFound(err error) bool
- func MatchAny(any *anypb.Any, query string) bool
- func RecordStreamToList(recordStream RecordStream) ([]*databroker.Record, error)
- func WithQuerier(ctx context.Context, querier Querier) context.Context
- type AndFilterExpression
- type Backend
- type Cache
- type EqualsFilterExpression
- type FilterExpression
- type OrFilterExpression
- type Querier
- type QueryTrace
- type RecordStream
- type RecordStreamFilter
- type RecordStreamGenerator
- type TracingQuerier
Constants ¶
This section is empty.
Variables ¶
var (
	ErrNotFound             = errors.New("record not found")
	ErrStreamDone           = errors.New("record stream done")
	ErrInvalidServerVersion = status.Error(codes.Aborted, "invalid server version")
)
Errors
Functions ¶
func GetRecordIndex ¶ added in v0.18.0
GetRecordIndex gets a record's index. If there is no index, nil is returned.
func GetRecordIndexCIDR ¶ added in v0.18.0
GetRecordIndexCIDR returns the $index.cidr for a record's data. If none is available nil is returned.
func IsNotFound ¶ added in v0.18.0
IsNotFound returns true if the error is because a record was not found.
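The sentinel-error check described above can be sketched as follows. This is a minimal illustration, assuming the check is built on errors.Is; the names errNotFound, isNotFound, and lookup are local stand-ins and not the package's own code, which may also inspect gRPC status codes.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound is a local stand-in for the package's ErrNotFound sentinel.
var errNotFound = errors.New("record not found")

// isNotFound is a hypothetical equivalent of IsNotFound. It relies on
// errors.Is, so errors that wrap the sentinel still match.
func isNotFound(err error) bool {
	return errors.Is(err, errNotFound)
}

// lookup is a hypothetical helper that always fails, wrapping the sentinel
// with extra context.
func lookup(id string) error {
	return fmt.Errorf("get %q: %w", id, errNotFound)
}

func main() {
	fmt.Println(isNotFound(lookup("user-1")))         // wrapped sentinel
	fmt.Println(isNotFound(errors.New("unrelated"))) // different error
}
```

Callers would typically treat a not-found result as an empty answer and propagate every other error.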
func RecordStreamToList ¶ added in v0.17.3
func RecordStreamToList(recordStream RecordStream) ([]*databroker.Record, error)
RecordStreamToList converts a record stream to a list.
Types ¶
type AndFilterExpression ¶ added in v0.18.0
type AndFilterExpression []FilterExpression
An AndFilterExpression represents a logical-and comparison operator.
type Backend ¶
type Backend interface {
	// Close closes the backend.
	Close() error
	// Get is used to retrieve a record.
	Get(ctx context.Context, recordType, id string) (*databroker.Record, error)
	// GetOptions gets the options for a type.
	GetOptions(ctx context.Context, recordType string) (*databroker.Options, error)
	// Lease acquires a lease, or renews an existing one. If the lease is acquired true is returned.
	Lease(ctx context.Context, leaseName, leaseID string, ttl time.Duration) (bool, error)
	// Put is used to insert or update records.
	Put(ctx context.Context, records []*databroker.Record) (serverVersion uint64, err error)
	// SetOptions sets the options for a type.
	SetOptions(ctx context.Context, recordType string, options *databroker.Options) error
	// Sync syncs record changes after the specified version.
	Sync(ctx context.Context, recordType string, serverVersion, recordVersion uint64) (RecordStream, error)
	// SyncLatest syncs all the records.
	SyncLatest(ctx context.Context, recordType string, filter FilterExpression) (serverVersion, recordVersion uint64, stream RecordStream, err error)
}
Backend is the interface required for a storage backend.
type Cache ¶ added in v0.18.0
type Cache interface {
	GetOrUpdate(
		ctx context.Context,
		key []byte,
		update func(ctx context.Context) ([]byte, error),
	) ([]byte, error)
	Invalidate(key []byte)
}
A Cache will return cached data when available or call update when not.
func NewGlobalCache ¶ added in v0.18.0
NewGlobalCache creates a new Cache backed by fastcache and a TTL.
func NewLocalCache ¶ added in v0.18.0
func NewLocalCache() Cache
NewLocalCache creates a new Cache backed by a map.
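To illustrate the Cache contract (return cached data when present, otherwise call update), here is a minimal map-backed sketch. The type mapCache and the helper countUpdates are hypothetical names for this example; the package's actual implementation may differ, for instance in how it handles concurrent misses.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// mapCache is a hypothetical map-backed cache with the same method set as
// the Cache interface.
type mapCache struct {
	mu sync.Mutex
	m  map[string][]byte
}

func newMapCache() *mapCache {
	return &mapCache{m: make(map[string][]byte)}
}

// GetOrUpdate returns the cached value for key, calling update on a miss.
func (c *mapCache) GetOrUpdate(
	ctx context.Context,
	key []byte,
	update func(ctx context.Context) ([]byte, error),
) ([]byte, error) {
	c.mu.Lock()
	value, ok := c.m[string(key)]
	c.mu.Unlock()
	if ok {
		return value, nil
	}

	// The lock is not held while update runs, so concurrent misses for the
	// same key may each call update.
	value, err := update(ctx)
	if err != nil {
		return nil, err
	}

	c.mu.Lock()
	c.m[string(key)] = value
	c.mu.Unlock()
	return value, nil
}

// Invalidate removes key so the next GetOrUpdate calls update again.
func (c *mapCache) Invalidate(key []byte) {
	c.mu.Lock()
	delete(c.m, string(key))
	c.mu.Unlock()
}

// countUpdates calls GetOrUpdate n times and reports how often update ran.
func countUpdates(c *mapCache, key string, n int) int {
	calls := 0
	for i := 0; i < n; i++ {
		_, _ = c.GetOrUpdate(context.Background(), []byte(key),
			func(context.Context) ([]byte, error) {
				calls++
				return []byte("value"), nil
			})
	}
	return calls
}

func main() {
	c := newMapCache()
	fmt.Println(countUpdates(c, "k", 3)) // update runs once, then hits
	c.Invalidate([]byte("k"))
	fmt.Println(countUpdates(c, "k", 2)) // update runs once more
}
```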
type EqualsFilterExpression ¶ added in v0.18.0
An EqualsFilterExpression represents a field comparison operator.
type FilterExpression ¶ added in v0.18.0
type FilterExpression interface {
	// contains filtered or unexported methods
}
A FilterExpression describes an AST for record stream filters.
func FilterExpressionFromStruct ¶ added in v0.18.0
func FilterExpressionFromStruct(s *structpb.Struct) (FilterExpression, error)
FilterExpressionFromStruct creates a FilterExpression from a protobuf struct.
type OrFilterExpression ¶ added in v0.18.0
type OrFilterExpression []FilterExpression
An OrFilterExpression represents a logical-or comparison operator.
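The three expression types form a small AST: and/or nodes hold child expressions and equals nodes are the leaves. The sketch below shows how such a tree could be evaluated. The types andExpr, orExpr, and equalsExpr and the flat map[string]string record are assumptions made for illustration; the fields of EqualsFilterExpression are not shown in this documentation.

```go
package main

import "fmt"

// expr is a hypothetical stand-in for FilterExpression.
type expr interface {
	matches(fields map[string]string) bool
}

// andExpr matches when every child matches (an empty andExpr matches).
type andExpr []expr

// orExpr matches when at least one child matches.
type orExpr []expr

// equalsExpr matches when the named field is present with the given value.
type equalsExpr struct {
	field, value string
}

func (e andExpr) matches(fields map[string]string) bool {
	for _, sub := range e {
		if !sub.matches(fields) {
			return false
		}
	}
	return true
}

func (e orExpr) matches(fields map[string]string) bool {
	for _, sub := range e {
		if sub.matches(fields) {
			return true
		}
	}
	return false
}

func (e equalsExpr) matches(fields map[string]string) bool {
	v, ok := fields[e.field]
	return ok && v == e.value
}

func main() {
	// type == "user" AND (id == "a" OR id == "b")
	q := andExpr{
		equalsExpr{field: "type", value: "user"},
		orExpr{
			equalsExpr{field: "id", value: "a"},
			equalsExpr{field: "id", value: "b"},
		},
	}
	fmt.Println(q.matches(map[string]string{"type": "user", "id": "b"}))
	fmt.Println(q.matches(map[string]string{"type": "user", "id": "c"}))
}
```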
type Querier ¶ added in v0.18.0
type Querier interface {
	InvalidateCache(ctx context.Context, in *databroker.QueryRequest)
	Query(ctx context.Context, in *databroker.QueryRequest, opts ...grpc.CallOption) (*databroker.QueryResponse, error)
}
A Querier is a read-only subset of the client methods.
func GetQuerier ¶ added in v0.18.0
GetQuerier gets the databroker Querier from the context.
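WithQuerier and GetQuerier follow the usual pattern of carrying a value in a context.Context. The sketch below shows that pattern with a simplified querier interface. Everything here is hypothetical: the names querier, namedQuerier, and querierKey, and in particular the fallback returned when no querier is attached, which is an assumption about how a missing value might be handled.

```go
package main

import (
	"context"
	"fmt"
)

// querier is a simplified, hypothetical stand-in for the Querier interface.
type querier interface {
	Name() string
}

type namedQuerier string

func (q namedQuerier) Name() string { return string(q) }

// querierKey is an unexported key type, so other packages cannot collide
// with this context value.
type querierKey struct{}

// withQuerier returns a child context carrying q.
func withQuerier(ctx context.Context, q querier) context.Context {
	return context.WithValue(ctx, querierKey{}, q)
}

// getQuerier returns the querier stored in ctx, or a fallback (assumed
// behavior for this sketch) when none was attached.
func getQuerier(ctx context.Context) querier {
	q, ok := ctx.Value(querierKey{}).(querier)
	if !ok {
		return namedQuerier("fallback")
	}
	return q
}

// lookupName reports which querier a request context would resolve to.
func lookupName(attach bool) string {
	ctx := context.Background()
	if attach {
		ctx = withQuerier(ctx, namedQuerier("caching"))
	}
	return getQuerier(ctx).Name()
}

func main() {
	fmt.Println(lookupName(true))
	fmt.Println(lookupName(false))
}
```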
func NewCachingQuerier ¶ added in v0.18.0
NewCachingQuerier creates a new querier that caches results in a Cache.
func NewQuerier ¶ added in v0.18.0
func NewQuerier(client databroker.DataBrokerServiceClient) Querier
NewQuerier creates a new Querier that implements the Querier interface by making calls to the databroker over gRPC.
func NewStaticQuerier ¶ added in v0.18.0
NewStaticQuerier creates a Querier that returns statically defined protobuf records.
type QueryTrace ¶ added in v0.18.0
type QueryTrace struct {
	ServerVersion, RecordVersion uint64
	RecordType                   string
	Query                        string
	Filter                       *structpb.Struct
}
A QueryTrace traces a call to Query.
type RecordStream ¶ added in v0.14.0
type RecordStream interface {
	// Close closes the record stream and releases any underlying resources.
	Close() error
	// Next is called to retrieve the next record. If one is available it will
	// be returned immediately. If none is available and block is true, the method
	// will block until one is available or an error occurs. The error should be
	// checked with a call to `.Err()`.
	Next(block bool) bool
	// Record returns the current record.
	Record() *databroker.Record
	// Err returns any error that occurred while streaming.
	Err() error
}
A RecordStream is a stream of records.
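The Next/Record/Err methods follow the same iteration protocol as bufio.Scanner: loop while Next returns true, read the current item, then check Err once the loop ends. The sketch below drains a stream into a slice, similar in spirit to RecordStreamToList. The record type, the recordStream interface, and sliceStream are local stand-ins so the example runs on its own.

```go
package main

import "fmt"

// record is a hypothetical stand-in for *databroker.Record.
type record struct{ ID string }

// recordStream mirrors the method set of RecordStream.
type recordStream interface {
	Close() error
	Next(block bool) bool
	Record() *record
	Err() error
}

// sliceStream is a hypothetical stream over a fixed list of IDs.
type sliceStream struct {
	ids []string
	pos int
	cur *record
}

func newSliceStream(ids ...string) *sliceStream { return &sliceStream{ids: ids} }

func (s *sliceStream) Close() error { return nil }

func (s *sliceStream) Next(block bool) bool {
	if s.pos >= len(s.ids) {
		return false
	}
	s.cur = &record{ID: s.ids[s.pos]}
	s.pos++
	return true
}

func (s *sliceStream) Record() *record { return s.cur }
func (s *sliceStream) Err() error      { return nil }

// collect drains the stream without blocking, closes it, and returns any
// streaming error reported by Err.
func collect(s recordStream) ([]*record, error) {
	defer s.Close()

	var out []*record
	for s.Next(false) {
		out = append(out, s.Record())
	}
	return out, s.Err()
}

func main() {
	records, err := collect(newSliceStream("a", "b", "c"))
	if err != nil {
		fmt.Println("stream error:", err)
		return
	}
	for _, r := range records {
		fmt.Println(r.ID)
	}
}
```

Passing block=false makes the loop stop as soon as no record is immediately available; a long-lived sync loop would pass true instead.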
func NewConcatenatedRecordStream ¶ added in v0.18.0
func NewConcatenatedRecordStream(streams ...RecordStream) RecordStream
NewConcatenatedRecordStream creates a new record stream that streams all the records from the first stream before streaming all the records of the subsequent streams.
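One way to obtain the concatenation behavior described above is to keep an index into the list of streams and advance it whenever the current stream is exhausted. This is a sketch under that assumption, not the package's implementation; record, recordStream, sliceStream, and concatStream are hypothetical local types.

```go
package main

import "fmt"

type record struct{ ID string }

// recordStream mirrors the method set of RecordStream.
type recordStream interface {
	Close() error
	Next(block bool) bool
	Record() *record
	Err() error
}

// sliceStream is a hypothetical stream over a fixed list of IDs.
type sliceStream struct {
	ids []string
	pos int
	cur *record
}

func newSliceStream(ids ...string) *sliceStream { return &sliceStream{ids: ids} }
func (s *sliceStream) Close() error             { return nil }
func (s *sliceStream) Record() *record          { return s.cur }
func (s *sliceStream) Err() error               { return nil }
func (s *sliceStream) Next(block bool) bool {
	if s.pos >= len(s.ids) {
		return false
	}
	s.cur = &record{ID: s.ids[s.pos]}
	s.pos++
	return true
}

// concatStream yields every record of streams[0], then streams[1], and so on.
type concatStream struct {
	streams []recordStream
	idx     int
}

func newConcatStream(streams ...recordStream) *concatStream {
	return &concatStream{streams: streams}
}

func (c *concatStream) Next(block bool) bool {
	for c.idx < len(c.streams) {
		if c.streams[c.idx].Next(block) {
			return true
		}
		if c.streams[c.idx].Err() != nil {
			return false // stop on the failing stream so Err reports it
		}
		c.idx++ // current stream is exhausted, move to the next one
	}
	return false
}

func (c *concatStream) Record() *record {
	if c.idx >= len(c.streams) {
		return nil
	}
	return c.streams[c.idx].Record()
}

func (c *concatStream) Err() error {
	if c.idx >= len(c.streams) {
		return nil
	}
	return c.streams[c.idx].Err()
}

// Close closes every underlying stream and returns the first error seen.
func (c *concatStream) Close() error {
	var first error
	for _, s := range c.streams {
		if err := s.Close(); err != nil && first == nil {
			first = err
		}
	}
	return first
}

// collectIDs drains a stream without blocking and returns the record IDs.
func collectIDs(s recordStream) ([]string, error) {
	defer s.Close()
	var ids []string
	for s.Next(false) {
		ids = append(ids, s.Record().ID)
	}
	return ids, s.Err()
}

func main() {
	s := newConcatStream(newSliceStream("a", "b"), newSliceStream(), newSliceStream("c"))
	ids, err := collectIDs(s)
	fmt.Println(ids, err)
}
```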
func NewRecordStream ¶ added in v0.17.3
func NewRecordStream(
	ctx context.Context,
	backendClosed chan struct{},
	generators []RecordStreamGenerator,
	onClose func(),
) RecordStream
NewRecordStream creates a new RecordStream from a list of generators and an onClose function.
func RecordListToStream ¶ added in v0.17.3
func RecordListToStream(ctx context.Context, records []*databroker.Record) RecordStream
RecordListToStream converts a record list to a stream.
type RecordStreamFilter ¶ added in v0.18.0
type RecordStreamFilter func(record *databroker.Record) (keep bool)
A RecordStreamFilter filters a RecordStream.
func RecordStreamFilterFromFilterExpression ¶ added in v0.18.0
func RecordStreamFilterFromFilterExpression(
	expr FilterExpression,
) (filter RecordStreamFilter, err error)
RecordStreamFilterFromFilterExpression returns a RecordStreamFilter from a FilterExpression.
func (RecordStreamFilter) And ¶ added in v0.18.0
func (filter RecordStreamFilter) And(
	then RecordStreamFilter,
) RecordStreamFilter
And creates a new RecordStreamFilter by applying both functions to a record.
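Because a RecordStreamFilter is a plain function type, And can be written as a method that returns a new closure. The sketch below assumes the combined filter keeps a record only when both filters keep it. The record type and the example filters are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// record is a hypothetical stand-in for *databroker.Record.
type record struct {
	Type, ID string
}

// recordFilter mirrors the shape of RecordStreamFilter.
type recordFilter func(r *record) (keep bool)

// And returns a filter that keeps a record only if both filter and then
// keep it. The second filter is skipped when the first rejects the record.
func (filter recordFilter) And(then recordFilter) recordFilter {
	return func(r *record) bool {
		return filter(r) && then(r)
	}
}

// Example filters (hypothetical).
func isUser(r *record) bool      { return r.Type == "user" }
func hasAdminID(r *record) bool  { return strings.HasPrefix(r.ID, "admin-") }

func main() {
	adminUsers := recordFilter(isUser).And(hasAdminID)

	fmt.Println(adminUsers(&record{Type: "user", ID: "admin-1"}))
	fmt.Println(adminUsers(&record{Type: "user", ID: "guest-1"}))
	fmt.Println(adminUsers(&record{Type: "session", ID: "admin-1"}))
}
```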
type RecordStreamGenerator ¶ added in v0.17.3
A RecordStreamGenerator generates records for a record stream.
func FilteredRecordStreamGenerator ¶ added in v0.18.0
func FilteredRecordStreamGenerator(
	generator RecordStreamGenerator,
	filter RecordStreamFilter,
) RecordStreamGenerator
FilteredRecordStreamGenerator creates a RecordStreamGenerator that only returns records that pass the filter.
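A filtered generator can be sketched as a wrapper that keeps pulling from the underlying generator until a record passes the filter or an error is returned. The generator signature used here, func(ctx, block) (*record, error), is an assumption, since the definition of RecordStreamGenerator is not shown in this documentation; errStreamDone, sliceGenerator, filtered, and drain are likewise hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type record struct{ ID string }

// errStreamDone is a local stand-in for the ErrStreamDone sentinel.
var errStreamDone = errors.New("record stream done")

// generator is an assumed shape for a record stream generator.
type generator func(ctx context.Context, block bool) (*record, error)

type filter func(r *record) (keep bool)

// sliceGenerator yields one record per ID, then errStreamDone.
func sliceGenerator(ids ...string) generator {
	i := 0
	return func(ctx context.Context, block bool) (*record, error) {
		if i >= len(ids) {
			return nil, errStreamDone
		}
		r := &record{ID: ids[i]}
		i++
		return r, nil
	}
}

// filtered wraps gen so that only records accepted by keep are returned.
// Errors from gen, including errStreamDone, are passed through unchanged.
func filtered(gen generator, keep filter) generator {
	return func(ctx context.Context, block bool) (*record, error) {
		for {
			r, err := gen(ctx, block)
			if err != nil {
				return nil, err
			}
			if keep(r) {
				return r, nil
			}
		}
	}
}

// drain pulls records until the generator returns any error and returns
// the IDs seen so far.
func drain(gen generator) []string {
	var ids []string
	for {
		r, err := gen(context.Background(), false)
		if err != nil {
			return ids
		}
		ids = append(ids, r.ID)
	}
}

func main() {
	gen := filtered(sliceGenerator("a", "bb", "c", "dd"), func(r *record) bool {
		return len(r.ID) == 2
	})
	fmt.Println(drain(gen))
}
```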
type TracingQuerier ¶ added in v0.18.0
type TracingQuerier struct {
	// contains filtered or unexported fields
}
A TracingQuerier records calls to Query.
func NewTracingQuerier ¶ added in v0.18.0
func NewTracingQuerier(q Querier) *TracingQuerier
NewTracingQuerier creates a new TracingQuerier.
func (*TracingQuerier) InvalidateCache ¶ added in v0.18.0
func (q *TracingQuerier) InvalidateCache(ctx context.Context, in *databroker.QueryRequest)
InvalidateCache invalidates the cache.
func (*TracingQuerier) Query ¶ added in v0.18.0
func (q *TracingQuerier) Query(ctx context.Context, in *databroker.QueryRequest, opts ...grpc.CallOption) (*databroker.QueryResponse, error)
Query queries for records.
func (*TracingQuerier) Traces ¶ added in v0.18.0
func (q *TracingQuerier) Traces() []QueryTrace
Traces returns all the traces.
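A tracing querier is a decorator: it records each call and then delegates to the wrapped querier. The sketch below shows that shape with a simplified Query signature. The types querier, queryTrace, tracingQuerier, and staticQuerier are hypothetical stand-ins, and the real TracingQuerier records more fields per trace (see QueryTrace).

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// querier is a simplified, hypothetical stand-in for Querier.
type querier interface {
	Query(ctx context.Context, recordType, query string) ([]string, error)
}

// queryTrace records the arguments of one Query call.
type queryTrace struct {
	RecordType string
	Query      string
}

// tracingQuerier wraps another querier and remembers every Query call.
type tracingQuerier struct {
	underlying querier

	mu     sync.Mutex
	traces []queryTrace
}

func newTracingQuerier(q querier) *tracingQuerier {
	return &tracingQuerier{underlying: q}
}

// Query records the call, then delegates to the wrapped querier.
func (q *tracingQuerier) Query(ctx context.Context, recordType, query string) ([]string, error) {
	q.mu.Lock()
	q.traces = append(q.traces, queryTrace{RecordType: recordType, Query: query})
	q.mu.Unlock()
	return q.underlying.Query(ctx, recordType, query)
}

// Traces returns a copy of the recorded traces, so callers cannot mutate
// the internal slice.
func (q *tracingQuerier) Traces() []queryTrace {
	q.mu.Lock()
	defer q.mu.Unlock()
	return append([]queryTrace(nil), q.traces...)
}

// staticQuerier is a hypothetical querier that always returns the same IDs.
type staticQuerier []string

func (s staticQuerier) Query(context.Context, string, string) ([]string, error) {
	return s, nil
}

// traceDemo issues two queries and returns what was recorded.
func traceDemo() []queryTrace {
	tq := newTracingQuerier(staticQuerier{"r1"})
	_, _ = tq.Query(context.Background(), "user", "alice")
	_, _ = tq.Query(context.Background(), "session", "")
	return tq.Traces()
}

func main() {
	for _, t := range traceDemo() {
		fmt.Printf("type=%s query=%q\n", t.RecordType, t.Query)
	}
}
```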
Directories ¶
Path | Synopsis |
---|---|
inmemory | Package inmemory contains an in-memory implementation of the databroker backend. |
postgres | Package postgres contains an implementation of the storage.Backend backed by postgres. |
redis | Package redis implements the storage.Backend interface for redis. |