Documentation ¶
Index ¶
- Constants
- type CacheManager
- type DatabaseManager
- type DbManager
- func (mgr DbManager) AddJob(job core.JobDescription, onSave func(job core.JobDescription) error) error
- func (mgr DbManager) Close()
- func (mgr DbManager) DeleteJob(id string, onDelete func(id string) error) error
- func (mgr DbManager) GetJob(id string) (*core.JobDescription, error)
- func (mgr DbManager) GetMeasurements(query MeasurementsQuery) ([]core.Measurement, error)
- func (mgr DbManager) ListJobs() ([]core.JobDescription, error)
- func (mgr DbManager) SaveMeasurements(ctx context.Context, in <-chan *core.Measurement) (<-chan int, <-chan error)
- type DbTestSuite
- func (s *DbTestSuite) SetupTest()
- func (s *DbTestSuite) TearDownSuite()
- func (s *DbTestSuite) TestAddJobCallbackError()
- func (s *DbTestSuite) TestAddJobDupe()
- func (s *DbTestSuite) TestAddJobOk()
- func (s *DbTestSuite) TestDeleteJobCallbackError()
- func (s *DbTestSuite) TestDeleteJobMissing()
- func (s *DbTestSuite) TestDeleteJobOk()
- func (s *DbTestSuite) TestGetJobNotFound()
- func (s *DbTestSuite) TestGetJobSuccess()
- func (s *DbTestSuite) TestGetMeasurements()
- func (s *DbTestSuite) TestListJobs()
- func (s *DbTestSuite) TestSaveMeasurements()
- func (s *DbTestSuite) TestSaveMeasurementsCancel()
- func (s *DbTestSuite) TestSaveMeasurementsChunks()
- type EmbeddedCacheManager
- type MeasurementsQuery
- type PostgresManager
- type RedisCacheManager
- func (cache RedisCacheManager) Close()
- func (cache RedisCacheManager) LoadGaugeStatuses(jobID string) (map[string]core.Status, error)
- func (cache RedisCacheManager) LoadJobStatuses() (map[string]core.Status, error)
- func (cache RedisCacheManager) LoadLatestMeasurements(from map[string]core.StringSet) (map[core.GaugeID]core.Measurement, error)
- func (cache RedisCacheManager) SaveLatestMeasurements(ctx context.Context, in <-chan *core.Measurement) <-chan error
- func (cache RedisCacheManager) SaveStatus(jobID, code string, err error, count int) error
- type SqliteManager
Constants ¶
const ( // NSStatus is redis namespace prefix for job/gauge statuses NSStatus = "status" // NSLatest is redis namespace prefix for latest measurements NSLatest = "latest" )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CacheManager ¶
type CacheManager interface { // LoadJobStatuses returns statuses of currenly running jobs. // returns map where keys are job ids LoadJobStatuses() (map[string]core.Status, error) // LoadGaugeStatuses returns statuses of gauges for given job // returns map where keys are gauge codes LoadGaugeStatuses(jobID string) (map[string]core.Status, error) // SaveStatus saves harvest status for entire job (if code is empty) or single gauge // count means number of saved measurements SaveStatus(jobID, code string, err error, count int) error // LoadLatestMeasurements returns latest measurements // it accepts a map where keys are scripts (not job ids!) and values are sets of gauge codes LoadLatestMeasurements(from map[string]core.StringSet) (map[core.GaugeID]core.Measurement, error) // SaveLatestMeasurements saves given measurements. If there're multiple values per gauge, the most recent one will be saved // Input measurements are supposed to be filtered against previous latest values from cache // This is done inside job (it also ensures we don't save dupe measurements in db) SaveLatestMeasurements(ctx context.Context, in <-chan *core.Measurement) <-chan error // Close is callled when cache must be shut down Close() }
CacheManager manager is used to store latest measurement for each gauge and auxiliary information that is safe to lose
type DatabaseManager ¶
type DatabaseManager interface { // ListJobs returns slice of currently active jobs ListJobs() ([]core.JobDescription, error) // GetJob returns active job by its id GetJob(id string) (*core.JobDescription, error) // AddJob creates new job from description and starts it immediately // onSave argument is used to ensure transactional behavior when adding job to scheduler AddJob(job core.JobDescription, onSave func(job core.JobDescription) error) error // DeleteJon stops running job and deletes it // onDelete argument is used to ensure transactional behavior when adding job to scheduler DeleteJob(id string, onDelete func(id string) error) error // SaveMeasurements saves measurements from the channel in db, until the channel is closed // It supports context cancelation // returns channel where one single int will be written: total number of saved mesurements SaveMeasurements(ctx context.Context, in <-chan *core.Measurement) (<-chan int, <-chan error) // GetMeasurements returns measurements stored in db GetMeasurements(query MeasurementsQuery) ([]core.Measurement, error) // Close is called when db should be shut down Close() }
DatabaseManager is used to store all harvested measurements it's also used to store jobs so that they persist between service restarts
type DbManager ¶
type DbManager struct {
// contains filtered or unexported fields
}
DbManager implements DatabaseManager using sql database
func (DbManager) AddJob ¶
func (mgr DbManager) AddJob(job core.JobDescription, onSave func(job core.JobDescription) error) error
AddJob implements DatabaseManager interface
func (DbManager) GetJob ¶
func (mgr DbManager) GetJob(id string) (*core.JobDescription, error)
GetJob implements DatabaseManager interface
func (DbManager) GetMeasurements ¶
func (mgr DbManager) GetMeasurements(query MeasurementsQuery) ([]core.Measurement, error)
GetMeasurements implements DatabaseManager interface
func (DbManager) ListJobs ¶
func (mgr DbManager) ListJobs() ([]core.JobDescription, error)
ListJobs implements DatabaseManager interface
func (DbManager) SaveMeasurements ¶
func (mgr DbManager) SaveMeasurements(ctx context.Context, in <-chan *core.Measurement) (<-chan int, <-chan error)
SaveMeasurements implements DatabaseManager interface
type DbTestSuite ¶
func (*DbTestSuite) SetupTest ¶
func (s *DbTestSuite) SetupTest()
func (*DbTestSuite) TearDownSuite ¶
func (s *DbTestSuite) TearDownSuite()
func (*DbTestSuite) TestAddJobCallbackError ¶
func (s *DbTestSuite) TestAddJobCallbackError()
func (*DbTestSuite) TestAddJobDupe ¶
func (s *DbTestSuite) TestAddJobDupe()
func (*DbTestSuite) TestAddJobOk ¶
func (s *DbTestSuite) TestAddJobOk()
func (*DbTestSuite) TestDeleteJobCallbackError ¶
func (s *DbTestSuite) TestDeleteJobCallbackError()
func (*DbTestSuite) TestDeleteJobMissing ¶
func (s *DbTestSuite) TestDeleteJobMissing()
func (*DbTestSuite) TestDeleteJobOk ¶
func (s *DbTestSuite) TestDeleteJobOk()
func (*DbTestSuite) TestGetJobNotFound ¶
func (s *DbTestSuite) TestGetJobNotFound()
func (*DbTestSuite) TestGetJobSuccess ¶
func (s *DbTestSuite) TestGetJobSuccess()
func (*DbTestSuite) TestGetMeasurements ¶
func (s *DbTestSuite) TestGetMeasurements()
func (*DbTestSuite) TestListJobs ¶
func (s *DbTestSuite) TestListJobs()
func (*DbTestSuite) TestSaveMeasurements ¶
func (s *DbTestSuite) TestSaveMeasurements()
func (*DbTestSuite) TestSaveMeasurementsCancel ¶
func (s *DbTestSuite) TestSaveMeasurementsCancel()
func (*DbTestSuite) TestSaveMeasurementsChunks ¶
func (s *DbTestSuite) TestSaveMeasurementsChunks()
type EmbeddedCacheManager ¶
type EmbeddedCacheManager struct { RedisCacheManager // contains filtered or unexported fields }
EmbeddedCacheManager is cache manager that uses embedded redis https://github.com/alicebob/miniredis
func NewEmbeddedCacheManager ¶
func NewEmbeddedCacheManager() (*EmbeddedCacheManager, error)
NewEmbeddedCacheManager creates new miniredis cache manager
func (EmbeddedCacheManager) Close ¶
func (cache EmbeddedCacheManager) Close()
Close implements CacheManager interface
type MeasurementsQuery ¶
MeasurementsQuery is intermediate data struct to convert HTTP request to database queries
func NewMeasurementsQuery ¶
func NewMeasurementsQuery(script, code, fromS, toS string) (*MeasurementsQuery, error)
NewMeasurementsQuery builds db query from raw string arguments (passed via URL) If both TO and FROM are empty, a period of 30 days from current db time will be used If TO is empty string, current time from db will be used If given period is longer than 30 days, it will be trimmed to 30 days endig at TO timestamp
type PostgresManager ¶
type PostgresManager struct {
DbManager
}
PostgresManager implements DatabaseManager interface
func NewPostgresManager ¶
func NewPostgresManager(pgConnStr string, chunkSize int) (*PostgresManager, error)
NewPostgresManager creates new PostgresManager with connection string and chunk size
type RedisCacheManager ¶
type RedisCacheManager struct {
// contains filtered or unexported fields
}
RedisCacheManager is cache manager that uses real redis
func NewRedisCacheManager ¶
func NewRedisCacheManager(host, port string) (*RedisCacheManager, error)
NewRedisCacheManager creates new redis cache manager
func (RedisCacheManager) Close ¶
func (cache RedisCacheManager) Close()
Close implements CacheManager interface
func (RedisCacheManager) LoadGaugeStatuses ¶
LoadGaugeStatuses implements CacheManager interface
func (RedisCacheManager) LoadJobStatuses ¶
func (cache RedisCacheManager) LoadJobStatuses() (map[string]core.Status, error)
LoadJobStatuses implements CacheManager interface
func (RedisCacheManager) LoadLatestMeasurements ¶
func (cache RedisCacheManager) LoadLatestMeasurements(from map[string]core.StringSet) (map[core.GaugeID]core.Measurement, error)
LoadLatestMeasurements implements CacheManager interface
func (RedisCacheManager) SaveLatestMeasurements ¶
func (cache RedisCacheManager) SaveLatestMeasurements(ctx context.Context, in <-chan *core.Measurement) <-chan error
SaveLatestMeasurements implements CacheManager interface
func (RedisCacheManager) SaveStatus ¶
func (cache RedisCacheManager) SaveStatus(jobID, code string, err error, count int) error
SaveStatus implements CacheManager interface
type SqliteManager ¶
type SqliteManager struct {
DbManager
}
SqliteManager implements DatabaseManager interface for Sqlite datbase https://github.com/mattn/go-sqlite3
func NewSqliteDb ¶
func NewSqliteDb(chunkSize int) (*SqliteManager, error)
NewSqliteDb creates SqliteManager with given chunkSize SqliteManager cannot be used for write access concurrently. Test usage only