Documentation ¶
Index ¶
- Constants
- Variables
- func GetDriverType(db *sql.DB) int
- type ConnectionPoolOptions
- type DBKey
- type DBOption
- type DBStore
- func (d *DBStore) Count() (int, error)
- func (d *DBStore) GetAll() ([]StoredMessage, error)
- func (d *DBStore) GetStoredMessage(row *sql.Rows) (StoredMessage, error)
- func (d *DBStore) MostRecentTimestamp() (int64, error)
- func (d *DBStore) Put(env *protocol.Envelope) error
- func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, error)
- func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) error
- func (d *DBStore) Stop()
- func (d *DBStore) Validate(env *protocol.Envelope) error
- type Hash
- type MessageProvider
- type Metrics
- type MigrationFn
- type Queries
- type StoredMessage
Constants ¶
const ( TimestampLength = 8 HashLength = 32 DigestLength = HashLength PubsubTopicLength = HashLength DBKeyLength = TimestampLength + PubsubTopicLength + DigestLength )
const ( UndefinedDriver = iota PostgresDriver SQLiteDriver )
const MaxTimeVariance = time.Duration(20) * time.Second
MaxTimeVariance is the maximum duration in the future allowed for a message timestamp
const WALMode = "wal"
WALMode for sqlite.
Variables ¶
var ErrFutureMessage = errors.New("message timestamp in the future")
ErrFutureMessage indicates that a message with timestamp in future was requested to be stored
var ( // ErrInvalidByteSize is returned when DBKey can't be created // from a byte slice because it has invalid length. ErrInvalidByteSize = errors.New("byte slice has invalid length") )
var ErrInvalidCursor = errors.New("invalid cursor")
ErrInvalidCursor indicates that an invalid cursor has been passed to access store
var ErrMessageTooOld = errors.New("message too old")
ErrMessageTooOld indicates that a message that was too old was requested to be stored.
Functions ¶
func GetDriverType ¶ added in v0.9.0
Types ¶
type ConnectionPoolOptions ¶
type ConnectionPoolOptions struct { MaxOpenConnections int MaxIdleConnections int ConnectionMaxLifetime time.Duration ConnectionMaxIdleTime time.Duration }
ConnectionPoolOptions is the options to be used for DB connection pooling
type DBKey ¶
type DBKey struct {
// contains filtered or unexported fields
}
DBKey key to be stored in a db.
type DBOption ¶
DBOption is an optional setting that can be used to configure the DBStore
func DefaultOptions ¶
func DefaultOptions() []DBOption
DefaultOptions returns the default DBoptions to be used.
func WithDriver ¶
func WithDriver(driverName string, datasourceName string, connectionPoolOptions ...ConnectionPoolOptions) DBOption
WithDriver is a DBOption that will open a *sql.DB connection
func WithMigrations ¶ added in v0.4.0
func WithMigrations(migrationFn MigrationFn) DBOption
WithMigrations is a DBOption used to determine if migrations should be executed, and what driver to use
type DBStore ¶
type DBStore struct { MessageProvider // contains filtered or unexported fields }
DBStore is a MessageProvider that has a *sql.DB connection
func NewDBStore ¶
func NewDBStore(reg prometheus.Registerer, log *zap.Logger, options ...DBOption) (*DBStore, error)
Creates a new DB store using the db specified via options. It will create a messages table if it does not exist and clean up records according to the retention policy used
func (*DBStore) GetAll ¶
func (d *DBStore) GetAll() ([]StoredMessage, error)
GetAll returns all the stored WakuMessages
func (*DBStore) GetStoredMessage ¶
func (d *DBStore) GetStoredMessage(row *sql.Rows) (StoredMessage, error)
GetStoredMessage is a helper function used to convert a `*sql.Rows` into a `StoredMessage`
func (*DBStore) MostRecentTimestamp ¶
MostRecentTimestamp returns an unix timestamp with the most recent senderTimestamp in the message table
func (*DBStore) Query ¶
func (d *DBStore) Query(query *pb.HistoryQuery) (*pb.Index, []StoredMessage, error)
Query retrieves messages from the DB
func (*DBStore) Start ¶
func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) error
Start starts the store server functionality
type Hash ¶
type Hash [HashLength]byte
type MessageProvider ¶
type MessageProvider interface { GetAll() ([]StoredMessage, error) Validate(env *protocol.Envelope) error Put(env *protocol.Envelope) error Query(query *pb.HistoryQuery) ([]StoredMessage, error) MostRecentTimestamp() (int64, error) Start(ctx context.Context, timesource timesource.Timesource) error Stop() }
MessageProvider is an interface that provides access to store/retrieve messages from a persistence store.
type Metrics ¶ added in v0.8.0
type Metrics interface { RecordMessage(num int) RecordError(err metricsErrCategory) RecordInsertDuration(duration time.Duration) RecordQueryDuration(duration time.Duration) }
Metrics exposes the functions required to update prometheus metrics for archive protocol
type MigrationFn ¶ added in v0.8.0
type Queries ¶ added in v0.8.0
type Queries struct {
// contains filtered or unexported fields
}
Queries are the SQL queries for a given table.
func CreateQueries ¶ added in v0.8.0
CreateQueries Function creates a set of queries for an SQL table. Note: Do not use this function to create queries for a table, rather use <rdb>.NewQueries to create table as well as queries.
func (Queries) GetSize ¶ added in v0.8.0
GetSize returns the query for determining the size of a value.
func (Queries) Offset ¶ added in v0.8.0
Offset returns the query fragment for returning rows from a given offset.
func (Queries) Prefix ¶ added in v0.8.0
Prefix returns the query fragment for getting a rows with a key prefix.
type StoredMessage ¶
type StoredMessage struct { ID []byte PubsubTopic string ReceiverTime int64 Message *wpb.WakuMessage }
StoredMessage is the format of the message stored in persistence store