Documentation ¶
Overview ¶
Package btrdb implements a Go client driver for BTrDB.
For functions returning value, version and error channels, please pay attention to the following concurrency pattern:
- The value channel must be completely consumed, always.
- The version channel need not be consumed if not required. Only one value will ever be written to the version channel.
- The error channel need not be read, but you cannot assume that there was no error just because there were values.
- You can defer reading the error channel until after the value channel is closed (the value channel will be closed early on error).
A good pattern is the following:
    valchan, errchan := some.Method()
    for v := range valchan {
        // do stuff with v
    }
    if err := <-errchan; err != nil {
        // handle error
    }
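The pattern above can be tried out without a cluster. The following is a minimal sketch, not the driver's code: `produce` and `sumAll` are hypothetical stand-ins that mimic a method returning a value channel and an error channel, with the value channel closed early when an error occurs.

```go
package main

import (
	"errors"
	"fmt"
)

// produce is a hypothetical stand-in for a method that returns a value
// channel and an error channel. It fails before sending vals[failAt];
// pass a negative failAt for a run without errors.
func produce(vals []float64, failAt int) (chan float64, chan error) {
	valchan := make(chan float64)
	errchan := make(chan error, 1) // buffered so the producer never blocks on it
	go func() {
		defer close(valchan) // runs last: ends the consumer's range loop
		defer close(errchan) // runs first: any error is already queued
		for i, v := range vals {
			if i == failAt {
				errchan <- errors.New("simulated failure")
				return // the value channel is closed early on error
			}
			valchan <- v
		}
	}()
	return valchan, errchan
}

// sumAll drains the value channel completely, then checks the error channel.
func sumAll(valchan chan float64, errchan chan error) (float64, error) {
	sum := 0.0
	for v := range valchan {
		sum += v
	}
	if err := <-errchan; err != nil {
		return sum, err
	}
	return sum, nil
}

func main() {
	sum, err := sumAll(produce([]float64{1, 2, 3}, -1))
	fmt.Println(sum, err)
	sum, err = sumAll(produce([]float64{1, 2, 3}, 2))
	fmt.Println(sum, err)
}
```

In the failing run the consumer still receives the values sent before the failure, which is why receiving values does not imply the absence of an error.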
Index ¶
- Constants
- Variables
- func EndpointsFromEnv() []string
- func OptKV(iz ...interface{}) map[string]*string
- type AnnotationVersion
- type BTrDB
- func (b *BTrDB) Create(ctx context.Context, uu uuid.UUID, collection string, tags map[string]string, ...) (*Stream, error)
- func (b *BTrDB) Disconnect() error
- func (b *BTrDB) EndpointFor(ctx context.Context, uuid uuid.UUID) (*Endpoint, error)
- func (b *BTrDB) EndpointForHash(ctx context.Context, hash uint32) (*Endpoint, error)
- func (b *BTrDB) GetAnyEndpoint(ctx context.Context) (*Endpoint, error)
- func (b *BTrDB) GetMetadataUsage(ctx context.Context, prefix string) (tags map[string]int, annotations map[string]int, err error)
- func (b *BTrDB) Info(ctx context.Context) (*MASH, error)
- func (b *BTrDB) ListAllCollections(ctx context.Context) ([]string, error)
- func (b *BTrDB) ListCollections(ctx context.Context, prefix string) ([]string, error)
- func (b *BTrDB) LookupStreams(ctx context.Context, collection string, isCollectionPrefix bool, ...) ([]*Stream, error)
- func (b *BTrDB) ReadEndpointFor(ctx context.Context, uuid uuid.UUID) (*Endpoint, error)
- func (b *BTrDB) ResyncMash()
- func (b *BTrDB) SnoopEpErr(ep *Endpoint, err chan error) chan error
- func (b *BTrDB) StreamFromUUID(uu uuid.UUID) *Stream
- func (b *BTrDB) StreamingLookupStreams(ctx context.Context, collection string, isCollectionPrefix bool, ...) (chan *Stream, chan error)
- func (b *BTrDB) TestEpError(ep *Endpoint, err error) bool
- type ChangedRange
- type CodedError
- type Endpoint
- func (b *Endpoint) AlignedWindows(ctx context.Context, uu uuid.UUID, start int64, end int64, pointwidth uint8, ...) (chan StatPoint, chan uint64, chan error)
- func (b *Endpoint) Changes(ctx context.Context, uu uuid.UUID, fromVersion uint64, toVersion uint64, ...) (chan ChangedRange, chan uint64, chan error)
- func (b *Endpoint) Create(ctx context.Context, uu uuid.UUID, collection string, tags map[string]string, ...) error
- func (b *Endpoint) DeleteRange(ctx context.Context, uu uuid.UUID, start int64, end int64) (uint64, error)
- func (b *Endpoint) Disconnect() error
- func (b *Endpoint) FaultInject(ctx context.Context, typ uint64, args []byte) ([]byte, error)
- func (b *Endpoint) Flush(ctx context.Context, uu uuid.UUID) error
- func (b *Endpoint) GetGRPC() pb.BTrDBClient
- func (b *Endpoint) GetMetadataUsage(ctx context.Context, prefix string) (tags map[string]int, annotations map[string]int, err error)
- func (b *Endpoint) Info(ctx context.Context) (*MASH, *pb.InfoResponse, error)
- func (b *Endpoint) Insert(ctx context.Context, uu uuid.UUID, values []*pb.RawPoint) error
- func (b *Endpoint) ListAllCollections(ctx context.Context) ([]string, error)
- func (b *Endpoint) ListCollections(ctx context.Context, prefix string, from string, limit uint64) ([]string, error)
- func (b *Endpoint) LookupStreams(ctx context.Context, collection string, isCollectionPrefix bool, ...) (chan *Stream, chan error)
- func (b *Endpoint) Nearest(ctx context.Context, uu uuid.UUID, time int64, version uint64, backward bool) (RawPoint, uint64, error)
- func (b *Endpoint) Obliterate(ctx context.Context, uu uuid.UUID) error
- func (b *Endpoint) RawValues(ctx context.Context, uu uuid.UUID, start int64, end int64, version uint64) (chan RawPoint, chan uint64, chan error)
- func (b *Endpoint) SetStreamAnnotations(ctx context.Context, uu uuid.UUID, expected AnnotationVersion, ...) error
- func (b *Endpoint) StreamInfo(ctx context.Context, uu uuid.UUID, omitDescriptor bool, omitVersion bool) (collection string, aver AnnotationVersion, tags map[string]string, ...)
- func (b *Endpoint) Windows(ctx context.Context, uu uuid.UUID, start int64, end int64, width uint64, ...) (chan StatPoint, chan uint64, chan error)
- type M
- type MASH
- type RawPoint
- type StatPoint
- type Stream
- func (s *Stream) AlignedWindows(ctx context.Context, start int64, end int64, pointwidth uint8, version uint64) (chan StatPoint, chan uint64, chan error)
- func (s *Stream) Annotations(ctx context.Context) (map[string]string, AnnotationVersion, error)
- func (s *Stream) CachedAnnotations(ctx context.Context) (map[string]string, AnnotationVersion, error)
- func (s *Stream) Changes(ctx context.Context, fromVersion uint64, toVersion uint64, resolution uint8) (crv chan ChangedRange, cver chan uint64, cerr chan error)
- func (s *Stream) Collection(ctx context.Context) (string, error)
- func (s *Stream) CompareAndSetAnnotation(ctx context.Context, expected AnnotationVersion, changes map[string]*string) error
- func (s *Stream) DeleteRange(ctx context.Context, start int64, end int64) (ver uint64, err error)
- func (s *Stream) Exists(ctx context.Context) (bool, error)
- func (s *Stream) Flush(ctx context.Context) error
- func (s *Stream) Insert(ctx context.Context, vals []RawPoint) error
- func (s *Stream) InsertF(ctx context.Context, length int, time func(int) int64, val func(int) float64) error
- func (s *Stream) InsertTV(ctx context.Context, times []int64, values []float64) error
- func (s *Stream) Nearest(ctx context.Context, time int64, version uint64, backward bool) (rv RawPoint, ver uint64, err error)
- func (s *Stream) Obliterate(ctx context.Context) error
- func (s *Stream) RawValues(ctx context.Context, start int64, end int64, version uint64) (chan RawPoint, chan uint64, chan error)
- func (s *Stream) Tags(ctx context.Context) (map[string]string, error)
- func (s *Stream) UUID() uuid.UUID
- func (s *Stream) Version(ctx context.Context) (uint64, error)
- func (s *Stream) Windows(ctx context.Context, start int64, end int64, width uint64, depth uint8, ...) (chan StatPoint, chan uint64, chan error)
Constants ¶
const LatestVersion = 0
LatestVersion can be passed to any function taking a version to use the latest version of that stream.
Variables ¶
var ErrorClusterDegraded = &CodedError{&pb.Status{Code: 419, Msg: "Cluster is degraded"}}
ErrorClusterDegraded is returned when a write operation on an unmapped UUID is attempted. Generally the same operation will succeed if attempted once the cluster has recovered.
var ErrorDisconnected = &CodedError{&pb.Status{Code: 421, Msg: "Driver is disconnected"}}
ErrorDisconnected is returned when operations are attempted after Disconnect() is called.
var ErrorWrongArgs = &CodedError{&pb.Status{Code: 421, Msg: "Invalid Arguments"}}
ErrorWrongArgs is returned from API functions if the parameters are nonsensical
Functions ¶
func EndpointsFromEnv ¶
func EndpointsFromEnv() []string
EndpointsFromEnv reads the environment variable BTRDB_ENDPOINTS of the format server:port,server:port,server:port and returns it as a string slice. This function is typically used as btrdb.Connect(btrdb.EndpointsFromEnv()...)
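To illustrate the `server:port,server:port` format described above, here is a minimal sketch of splitting such a value into a string slice. `splitEndpoints` is a hypothetical helper, not the package's implementation; trimming whitespace and skipping empty entries are assumptions, and the fallback addresses are placeholders.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// splitEndpoints is a hypothetical helper that turns a comma-separated
// "server:port,server:port" string into a slice. Trimming whitespace and
// dropping empty entries are assumptions made for this sketch.
func splitEndpoints(raw string) []string {
	var out []string
	for _, part := range strings.Split(raw, ",") {
		if p := strings.TrimSpace(part); p != "" {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	raw := os.Getenv("BTRDB_ENDPOINTS")
	if raw == "" {
		// Placeholder addresses, used only when the variable is unset.
		raw = "server1:4410,server2:4410"
	}
	fmt.Println(splitEndpoints(raw))
}
```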
func OptKV ¶
OptKV is a utility function for use in SetAnnotations or LookupStreams that turns a list of arguments into a map[string]*string. Typical use:
    OptKV("key", "value", // Set or match key=value
        "key2", nil)      // Delete or match key2=*
OptKV can also take a single map[string]string and return a map[string]*string, e.g.
OptKV(stream.Tags()) //Match exactly this set of tags
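The key/value convention described above (a string sets or matches a value, nil deletes or matches anything) can be sketched as follows. `optKV` is a hypothetical re-creation written for illustration; how the real function treats malformed input is not stated in this documentation, so the panics here are assumptions.

```go
package main

import "fmt"

// optKV is a hypothetical re-creation of the conversion described above.
// It accepts either a single map[string]string or alternating key/value
// arguments, where a nil value becomes a nil *string.
func optKV(args ...interface{}) map[string]*string {
	out := make(map[string]*string)
	if len(args) == 1 {
		if m, ok := args[0].(map[string]string); ok {
			for k, v := range m {
				v := v // copy so each entry gets its own pointer
				out[k] = &v
			}
			return out
		}
	}
	if len(args)%2 != 0 {
		panic("optKV: expected key/value pairs") // assumption for this sketch
	}
	for i := 0; i < len(args); i += 2 {
		key, ok := args[i].(string)
		if !ok {
			panic("optKV: keys must be strings")
		}
		switch v := args[i+1].(type) {
		case nil:
			out[key] = nil // delete or match any value
		case string:
			s := v
			out[key] = &s // set or match this exact value
		default:
			panic("optKV: values must be strings or nil")
		}
	}
	return out
}

func main() {
	m := optKV("key", "value", "key2", nil)
	fmt.Println(*m["key"], m["key2"] == nil)
}
```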
Types ¶
type AnnotationVersion ¶
type AnnotationVersion uint64
AnnotationVersion is the version of a stream annotation. It begins at 1 for a newly created stream and increases by 1 for each SetStreamAnnotations call. An AnnotationVersion of 0 means "any version".
type BTrDB ¶
type BTrDB struct {
// contains filtered or unexported fields
}
BTrDB is the main object you should use to interact with BTrDB.
func Connect ¶
Connect takes a list of endpoints and returns a BTrDB handle. Note that only a single endpoint is technically required, but having more endpoints will make the initial connection more robust to cluster changes. Different addresses for the same endpoint are permitted
func ConnectAuth ¶
ConnectAuth takes an API key and a list of endpoints and returns a BTrDB handle. Note that only a single endpoint is technically required, but having more endpoints will make the initial connection more robust to cluster changes. Different addresses for the same endpoint are permitted
func (*BTrDB) Disconnect ¶
Disconnect will close all active connections to the cluster. All future calls will return ErrorDisconnected
func (*BTrDB) EndpointFor ¶
EndpointFor returns the endpoint that should be used to write the given uuid
func (*BTrDB) EndpointForHash ¶
EndpointForHash is a low level function that returns a single endpoint for an endpoint hash.
func (*BTrDB) GetAnyEndpoint ¶
func (*BTrDB) GetMetadataUsage ¶
func (*BTrDB) ListAllCollections ¶
func (*BTrDB) ListCollections ¶
func (*BTrDB) LookupStreams ¶
func (*BTrDB) ReadEndpointFor ¶
ReadEndpointFor returns the endpoint that should be used to read the given uuid
func (*BTrDB) ResyncMash ¶
func (b *BTrDB) ResyncMash()
func (*BTrDB) SnoopEpErr ¶
This should invalidate the endpoint if some kind of error occurs. Because some values may have already been delivered, async functions using snoopEpErr will not be able to mask cluster errors from the user
func (*BTrDB) StreamFromUUID ¶
StreamFromUUID creates a stream handle for use in stream operations. It does not ensure that the stream exists; for that, use Stream.Exists().
func (*BTrDB) StreamingLookupStreams ¶
type ChangedRange ¶
type CodedError ¶
CodedError is an error that contains a numeric code. Most errors returned by this package are actually *CodedError objects. Use ToCodedError() to convert any error into a CodedError.
func ToCodedError ¶
func ToCodedError(e error) *CodedError
ToCodedError can be used to convert any error into a CodedError. If the error object is actually not coded, it will receive code 501.
func (*CodedError) Error ¶
func (ce *CodedError) Error() string
Error() implements the error interface
type Endpoint ¶
type Endpoint struct {
// contains filtered or unexported fields
}
Endpoint is a low level connection to a single server. Prefer BTrDB, which manages creating and destroying Endpoint objects as required.
func ConnectEndpoint ¶
ConnectEndpoint is a low level call that connects to a single BTrDB server. It takes multiple arguments, but it is assumed that they are all different addresses for the same server, in decreasing order of priority. It returns an Endpoint, which is generally never used directly. Rather use Connect().
func ConnectEndpointAuth ¶
func ConnectEndpointAuth(ctx context.Context, apikey string, addresses ...string) (*Endpoint, error)
ConnectEndpointAuth is a low level call that connects to a single BTrDB server. It takes multiple arguments, but it is assumed that they are all different addresses for the same server, in decreasing order of priority. It returns an Endpoint, which is generally never used directly. Rather use ConnectAuth().
func (*Endpoint) AlignedWindows ¶
func (b *Endpoint) AlignedWindows(ctx context.Context, uu uuid.UUID, start int64, end int64, pointwidth uint8, version uint64) (chan StatPoint, chan uint64, chan error)
AlignedWindows is a low level function, rather use Stream.AlignedWindows()
func (*Endpoint) Changes ¶
func (b *Endpoint) Changes(ctx context.Context, uu uuid.UUID, fromVersion uint64, toVersion uint64, resolution uint8) (chan ChangedRange, chan uint64, chan error)
Changes is a low level function; rather use Stream.Changes().
func (*Endpoint) Create ¶
func (b *Endpoint) Create(ctx context.Context, uu uuid.UUID, collection string, tags map[string]string, annotations map[string]string) error
Create is a low level function, rather use BTrDB.Create()
func (*Endpoint) DeleteRange ¶
func (b *Endpoint) DeleteRange(ctx context.Context, uu uuid.UUID, start int64, end int64) (uint64, error)
DeleteRange is a low level function, rather use Stream.DeleteRange()
func (*Endpoint) Disconnect ¶
Disconnect will close the underlying GRPC connection. The endpoint cannot be used after calling this method.
func (*Endpoint) FaultInject ¶
FaultInject is a debugging function that allows specific low level control of the endpoint. If you have to read the documentation, this is not for you. Server must be started with $BTRDB_ENABLE_FAULT_INJECT=YES
func (*Endpoint) GetGRPC ¶
func (b *Endpoint) GetGRPC() pb.BTrDBClient
GetGRPC will return the underlying GRPC client object.
func (*Endpoint) GetMetadataUsage ¶
func (b *Endpoint) GetMetadataUsage(ctx context.Context, prefix string) (tags map[string]int, annotations map[string]int, err error)
GetMetadataUsage is a low level function. Rather use BTrDB.GetMetadataUsage
func (*Endpoint) ListAllCollections ¶
ListAllCollections is a low level function, and in particular will only work with small numbers of collections. Rather use BTrDB.ListAllCollections()
func (*Endpoint) ListCollections ¶
func (b *Endpoint) ListCollections(ctx context.Context, prefix string, from string, limit uint64) ([]string, error)
ListCollections is a low level function, and in particular has complex constraints. Rather use BTrDB.ListCollections()
func (*Endpoint) LookupStreams ¶
func (b *Endpoint) LookupStreams(ctx context.Context, collection string, isCollectionPrefix bool, tags map[string]*string, annotations map[string]*string, patchDB *BTrDB) (chan *Stream, chan error)
LookupStreams is a low level function, rather use BTrDB.LookupStreams()
func (*Endpoint) Nearest ¶
func (b *Endpoint) Nearest(ctx context.Context, uu uuid.UUID, time int64, version uint64, backward bool) (RawPoint, uint64, error)
Nearest is a low level function, rather use Stream.Nearest()
func (*Endpoint) Obliterate ¶
Obliterate is a low level function, rather use Stream.Obliterate()
func (*Endpoint) RawValues ¶
func (b *Endpoint) RawValues(ctx context.Context, uu uuid.UUID, start int64, end int64, version uint64) (chan RawPoint, chan uint64, chan error)
RawValues is a low level function, rather use Stream.RawValues()
func (*Endpoint) SetStreamAnnotations ¶
func (b *Endpoint) SetStreamAnnotations(ctx context.Context, uu uuid.UUID, expected AnnotationVersion, changes map[string]*string) error
SetStreamAnnotations is a low level function; rather use Stream.CompareAndSetAnnotation().
func (*Endpoint) StreamInfo ¶
func (b *Endpoint) StreamInfo(ctx context.Context, uu uuid.UUID, omitDescriptor bool, omitVersion bool) (collection string, aver AnnotationVersion, tags map[string]string, anns map[string]string, version uint64, err error)
StreamInfo is a low level function; rather use Stream.Annotations().
type M ¶
M is an alias to neaten code specifying tags: btrdb.LookupStream(ctx, "mycollection", btrdb.M{"tagkey":"tagval"})
type MASH ¶
The MASH struct (Master Allocation by Stable Hashing) contains information about the cluster and which fraction of the uuid space is being served by which endpoints. Generally you will not need to use this, but it is handy for checking the cluster is healthy.
type RawPoint ¶
type RawPoint struct {
    // Nanoseconds since the epoch
    Time int64
    // Value. Units are stream-dependent
    Value float64
}
RawPoint represents a single timestamped value
type StatPoint ¶
type StatPoint struct {
    // The time of the start of the window, in nanoseconds since the epoch UTC
    Time  int64
    Min   float64
    Mean  float64
    Max   float64
    Count uint64
}
StatPoint represents a statistical summary of a window. The length of that window must be determined from context (e.g. the parameters passed to the AlignedWindows or Windows methods).
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
Stream is a handle on a Stream in BTrDB. Stream operations should be done through this object.
func (*Stream) AlignedWindows ¶
func (s *Stream) AlignedWindows(ctx context.Context, start int64, end int64, pointwidth uint8, version uint64) (chan StatPoint, chan uint64, chan error)
AlignedWindows reads power-of-two aligned windows from BTrDB. It is faster than Windows(). Each returned window will be 2^pointwidth nanoseconds long, starting at start. Note that start is inclusive, but end is exclusive. That is, results will be returned for all windows that start in the interval [start, end). If end < start+2^pointwidth you will not get any results. If start and end are not multiples of 2^pointwidth, their bottom pointwidth bits will be cleared. Each returned point contains a statistical summary of its window. Statistical points with count == 0 will be omitted.
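The alignment rule above (clearing the bottom pointwidth bits of start and end) can be worked through with plain integer arithmetic. This is a sketch of the arithmetic only, using hypothetical helpers `alignDown` and `windowStarts`; it does not query a server and so cannot model the omission of empty windows. It assumes pointwidth is less than 63.

```go
package main

import "fmt"

// alignDown clears the bottom pointwidth bits of t, rounding it down to a
// multiple of 2^pointwidth. Assumes pointwidth < 63.
func alignDown(t int64, pointwidth uint8) int64 {
	return t &^ (int64(1)<<pointwidth - 1)
}

// windowStarts lists the start times of the aligned windows that begin in
// [start, end) once both bounds have been aligned.
func windowStarts(start, end int64, pointwidth uint8) []int64 {
	width := int64(1) << pointwidth
	alignedEnd := alignDown(end, pointwidth)
	var out []int64
	for t := alignDown(start, pointwidth); t < alignedEnd; t += width {
		out = append(out, t)
	}
	return out
}

func main() {
	// With pointwidth 8 each window is 256 ns long.
	fmt.Println(alignDown(1000, 8))
	fmt.Println(windowStarts(1000, 2000, 8))
	// end < start + 2^pointwidth, so no windows are produced.
	fmt.Println(len(windowStarts(0, 100, 8)))
}
```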
func (*Stream) Annotations ¶
Annotations returns the annotations of the stream (and the annotation version). It will always require a round trip to the server. If you are ok with stale data and want a higher performance version, use Stream.CachedAnnotations(). Do not modify the resulting map.
func (*Stream) CachedAnnotations ¶
func (s *Stream) CachedAnnotations(ctx context.Context) (map[string]string, AnnotationVersion, error)
CachedAnnotations returns the annotations of the stream, reusing previous results if available, otherwise fetching from the server
func (*Stream) Collection ¶
Collection returns the collection of the stream. It may require a round trip to the server depending on how the stream was acquired
func (*Stream) CompareAndSetAnnotation ¶
func (s *Stream) CompareAndSetAnnotation(ctx context.Context, expected AnnotationVersion, changes map[string]*string) error
CompareAndSetAnnotation will make the changes in the given map (where a nil pointer means delete) as long as the annotation version matches
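The compare-and-set behaviour described above can be modelled in memory. The sketch below uses a hypothetical `annotated` type and `errStale` error; it only illustrates the semantics stated in this documentation (nil pointer deletes, version 0 matches any version, the version increases by 1 per successful change) and is not the server's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// annotated is a hypothetical in-memory model of a stream's annotations.
type annotated struct {
	version uint64
	anns    map[string]string
}

// errStale is a hypothetical error for a version mismatch.
var errStale = errors.New("annotation version mismatch")

// compareAndSet applies changes only if expected matches the current
// version, or if expected is 0 ("any version"). A nil pointer deletes a key.
func (a *annotated) compareAndSet(expected uint64, changes map[string]*string) error {
	if expected != 0 && expected != a.version {
		return errStale
	}
	for k, v := range changes {
		if v == nil {
			delete(a.anns, k)
		} else {
			a.anns[k] = *v
		}
	}
	a.version++
	return nil
}

func main() {
	a := &annotated{version: 1, anns: map[string]string{"old": "x"}}
	unit := "volts"
	fmt.Println(a.compareAndSet(1, map[string]*string{"unit": &unit, "old": nil}))
	fmt.Println(a.compareAndSet(1, map[string]*string{"unit": nil})) // stale version
	fmt.Println(a.version, a.anns)
}
```

A caller that loses the race would typically re-read the annotations and their version, then retry with the fresh version.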
func (*Stream) DeleteRange ¶
DeleteRange will delete all points between start (inclusive) and end (exclusive). Note that BTrDB has persistent multiversioning, so the deleted points can still be accessed on an older version of the stream. It returns the version of the stream and any error.
func (*Stream) Exists ¶
Exists returns true if the stream exists. This is essential after using StreamFromUUID as the stream may not exist, causing a 404 error on later stream operations. Any operation that returns a stream from collection and tags will have ensured the stream exists already.
func (*Stream) Insert ¶
Insert inserts the given array of RawPoint values. If the array is larger than appropriate, this function will automatically chunk the inserts. As a consequence, the insert is not necessarily atomic, but can be used with very large arrays.
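To make the chunking behaviour concrete, the sketch below splits a slice into fixed-size batches. The `point` type, the `chunk` helper and the batch size of 4 are all hypothetical; the batch size the driver actually uses is not stated here. Because each batch would be sent separately, a failure partway through leaves earlier batches inserted, which is why the insert is not atomic.

```go
package main

import "fmt"

// point is a hypothetical stand-in for a timestamped value.
type point struct {
	Time  int64
	Value float64
}

// chunk splits points into consecutive batches of at most size elements.
// The batches share the backing array of the input slice.
func chunk(points []point, size int) [][]point {
	if size <= 0 {
		return nil
	}
	var batches [][]point
	for len(points) > 0 {
		n := size
		if len(points) < n {
			n = len(points)
		}
		batches = append(batches, points[:n])
		points = points[n:]
	}
	return batches
}

func main() {
	pts := make([]point, 10)
	for i := range pts {
		pts[i] = point{Time: int64(i), Value: float64(i)}
	}
	for _, b := range chunk(pts, 4) {
		fmt.Println(len(b))
	}
}
```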
func (*Stream) InsertF ¶
func (s *Stream) InsertF(ctx context.Context, length int, time func(int) int64, val func(int) float64) error
InsertF will call the given time and val functions to get each value of the insertion. It is similar to InsertTV but may require fewer allocations if your data is already in a different data structure. If the size is larger than appropriate, this function will automatically chunk the inserts. As a consequence, the insert is not necessarily atomic, but can be used with very large sizes.
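The accessor-function style above lets data stay in its existing structure. In this sketch, `forEachPoint` is a hypothetical stand-in for an InsertF-style consumer (it is not the driver's code), and `reading` is an example application type; the two closures expose its fields by index without building separate time and value slices.

```go
package main

import (
	"fmt"
	"time"
)

// reading is an example application type that already holds the data.
type reading struct {
	At    time.Time
	Volts float64
}

// forEachPoint is a hypothetical stand-in for an InsertF-style consumer: it
// asks the two accessor functions for each of the length points in turn.
func forEachPoint(length int, timeOf func(int) int64, valOf func(int) float64, sink func(int64, float64)) {
	for i := 0; i < length; i++ {
		sink(timeOf(i), valOf(i))
	}
}

func main() {
	base := time.Unix(1, 0)
	data := []reading{{base, 1.5}, {base.Add(time.Second), 2.5}}
	forEachPoint(len(data),
		func(i int) int64 { return data[i].At.UnixNano() }, // nanoseconds since the epoch
		func(i int) float64 { return data[i].Volts },
		func(t int64, v float64) { fmt.Println(t, v) })
}
```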
func (*Stream) InsertTV ¶
InsertTV allows insertion of two equal length arrays, one containing times and the other containing values. The arrays need not be sorted, but they must correspond (i.e. the first element of times is the time for the first element of values). If the arrays are larger than appropriate, this function will automatically chunk the inserts. As a consequence, the insert is not necessarily atomic, but can be used with very large arrays.
func (*Stream) Nearest ¶
func (s *Stream) Nearest(ctx context.Context, time int64, version uint64, backward bool) (rv RawPoint, ver uint64, err error)
Nearest will return the nearest point to the given time. If backward is false, the returned point will be >= time. If backward is true, the returned point will be < time. The version of the stream used to satisfy the query is returned.
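The forward and backward rules above can be illustrated on a sorted slice of timestamps. `nearest` is a hypothetical local helper built on `sort.Search`; it models only the comparison semantics and reports a missing point with a boolean, which is an assumption (how the driver reports a missing point is not described here).

```go
package main

import (
	"fmt"
	"sort"
)

// nearest models the lookup rule on a slice of timestamps sorted ascending.
// Forward (backward == false): the first timestamp >= t.
// Backward (backward == true): the last timestamp < t.
// The boolean is false when no such timestamp exists.
func nearest(times []int64, t int64, backward bool) (int64, bool) {
	// i is the index of the first timestamp >= t, or len(times) if none.
	i := sort.Search(len(times), func(i int) bool { return times[i] >= t })
	if backward {
		if i == 0 {
			return 0, false
		}
		return times[i-1], true
	}
	if i == len(times) {
		return 0, false
	}
	return times[i], true
}

func main() {
	times := []int64{10, 20, 30}
	fmt.Println(nearest(times, 20, false))
	fmt.Println(nearest(times, 20, true))
}
```

Note the asymmetry: a point exactly at the query time is returned by a forward search but skipped by a backward one.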
func (*Stream) Obliterate ¶
Obliterate completely removes a stream. This operation is immediate but the space will only be freed slowly
func (*Stream) RawValues ¶
func (s *Stream) RawValues(ctx context.Context, start int64, end int64, version uint64) (chan RawPoint, chan uint64, chan error)
RawValues reads raw values from BTrDB. The returned RawPoint channel must be fully consumed.
func (*Stream) Tags ¶
Tags returns the tags of the stream. It may require a round trip to the server depending on how the stream was acquired. Do not modify the resulting map as it is a reference to the internal stream state
func (*Stream) UUID ¶
UUID returns the stream's UUID. The stream may or may not exist yet, depending on how the stream object was obtained. See also Stream.Exists().
func (*Stream) Version ¶
Version returns the current data version of the stream. This is not cached; it queries each time. Take care that you do not introduce races in your code by assuming this function will always return the same value.
func (*Stream) Windows ¶
func (s *Stream) Windows(ctx context.Context, start int64, end int64, width uint64, depth uint8, version uint64) (chan StatPoint, chan uint64, chan error)
Windows returns arbitrary precision windows from BTrDB. It is slower than AlignedWindows, but still significantly faster than RawValues. Each returned window will be width nanoseconds long. start is inclusive, but end is exclusive (e.g. if end < start+width you will get no results). That is, results will be returned for all windows that start at a time less than the end timestamp. If (end - start) is not a multiple of width, then end will be decreased to the greatest value less than end such that (end - start) is a multiple of width (i.e., we set end = start + width * floordiv(end - start, width)). The depth parameter is an optimization that can be used to speed up queries. Each window will be accurate to 2^depth nanoseconds. If depth is zero, the results are accurate to the nanosecond. On a dense stream for large windows, this accuracy may not be required. For example, for a window of a day, +- one second may be appropriate, so a depth of 30 can be specified. This is much faster to execute on the database side. The StatPoint channel MUST be fully consumed.
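The end adjustment and the depth accuracy described above are simple arithmetic, sketched here with hypothetical helpers `adjustEnd` and `accuracy`. The sketch assumes end >= start, width > 0 and depth < 63; it performs no query.

```go
package main

import (
	"fmt"
	"time"
)

// adjustEnd computes start + width * floordiv(end - start, width).
// Assumes end >= start and width > 0; otherwise it returns start.
func adjustEnd(start, end int64, width uint64) int64 {
	if end <= start || width == 0 {
		return start
	}
	n := uint64(end-start) / width // number of whole windows that fit
	return start + int64(width*n)
}

// accuracy returns the 2^depth nanosecond accuracy as a time.Duration.
// Assumes depth < 63.
func accuracy(depth uint8) time.Duration {
	return time.Duration(int64(1) << depth)
}

func main() {
	// Three whole windows of width 3 fit in [0, 10), so end becomes 9.
	fmt.Println(adjustEnd(0, 10, 3))
	// Depth 30 is 2^30 ns, a little over one second.
	fmt.Println(accuracy(30))
	// Depth 0 is accurate to the nanosecond.
	fmt.Println(accuracy(0))
}
```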
Directories ¶
Path | Synopsis |
---|---|
 | Package grpcinterface is a reverse proxy. |