query

package
v0.0.0-...-35a3c3a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 14, 2024 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DefaultStatsCollector

type DefaultStatsCollector struct {
	sync.Mutex // Not using RWMutex because in both ops we need to block on R and W.
	// contains filtered or unexported fields
}

DefaultStatsCollector collects Stats from query executions.

func (*DefaultStatsCollector) Add

func (c *DefaultStatsCollector) Add(startedAt, finishedAt time.Time)

Add stores a new data point.

func (*DefaultStatsCollector) Stats

func (c *DefaultStatsCollector) Stats() Stats

Stats generates a new Stats struct based on sent data points.

type DefaultWorker

type DefaultWorker struct {
	Input  chan Query
	Output chan Result
	// contains filtered or unexported fields
}

DefaultWorker is a basic implementation of a Worker.

func NewDefaultWorker

func NewDefaultWorker(ID int, runner Runner, input chan Query, output chan Result) *DefaultWorker

NewDefaultWorker creates a new DefaultWorker.

func (*DefaultWorker) Enqueue

func (w *DefaultWorker) Enqueue(_ context.Context, query Query)

Enqueue enqueues a new query into the worker's processing line. Implements the Worker interface.

func (*DefaultWorker) ID

func (w *DefaultWorker) ID() int

ID returns the ID of the worker. Implements the Worker interface.

func (*DefaultWorker) Start

func (w *DefaultWorker) Start(ctx context.Context) error

Start boots the Worker by starting its processing loop. Implements the Worker interface.

func (*DefaultWorker) Stop

func (w *DefaultWorker) Stop(_ context.Context) error

Stop gracefully stops the worker by waiting for all executing queries. Implements the Worker interface.

type Query

type Query interface {
	fmt.Stringer

	// EntityID returns the ID representing the entity being queried. Useful for sharding, debugging, etc.
	EntityID() string
}

Query represents a query in a DB.

type Result

type Result struct {
	Err      error
	EntityID string
	TS       time.Time
	Max      float64
	Min      float64
}

Result is the product of an executed Query.

type Runner

type Runner func(context.Context, Query) []Result

Runner runs queries in a DB. Runners are Context aware, including Context timeouts.

func WithContextTimeout

func WithContextTimeout(r Runner, maxDuration time.Duration) Runner

WithContextTimeout is a Runner wrapper that sets a timeout in the context of the execution.

func WithStats

func WithStats(r Runner, statsCollector StatsCollector) Runner

WithStats is a Runner wrapper that collect stats through a StatsCollector.

type ShardedWorkerPool

type ShardedWorkerPool struct {
	Input          chan Query
	Output         chan Result
	Workers        []Worker
	WorkerAssigner WorkerAssigner
	// contains filtered or unexported fields
}

ShardedWorkerPool is a WorkerPool that uses standard sharding in order to distribute queries across workers. The hashing algorithm is implemented on WorkerAssigner.

func NewShardedWorkerPool

func NewShardedWorkerPool(queryRunner Runner, numOfWorkers uint, workerAssigner WorkerAssigner, input chan Query, output chan Result) *ShardedWorkerPool

NewShardedWorkerPool creates a ShardedWorkerPool.

func (*ShardedWorkerPool) Start

func (s *ShardedWorkerPool) Start(ctx context.Context) error

Start initiates the pool and enters into a read loop.

func (*ShardedWorkerPool) Stop

func (s *ShardedWorkerPool) Stop(ctx context.Context) error

Stop waits for all queries to be executed and then stop workers and close their channels.

type Stats

type Stats struct {
	// TotalQueries is the # of executed queries
	TotalQueries int

	// TimeAcrossAllQueries is the execution time since the very first query until the last one.
	TimeAcrossAllQueries time.Duration

	// MinTime is the lowest query execution time.
	MinTime time.Duration

	// MedianTime is by the duration in the middle, where one half of the durations are higher and the other half are slower.
	MedianTime time.Duration

	// AvgTime is the sum of all durations divided by the total number of queries.
	AvgTime time.Duration

	// MaxTime is the highest query execution time.
	MaxTime time.Duration
}

Stats are stats collected during Query executions.

type StatsCollector

type StatsCollector interface {
	Add(startedAt, finishedAt time.Time)
	Stats() Stats
}

StatsCollector collect stats from query executions.

type Worker

type Worker interface {
	run.Startable
	run.Stoppable

	// ID returns the ID of the worker.
	ID() int

	// Enqueue enqueues a new query into the worker's processing line.
	Enqueue(context.Context, Query)
}

Worker represents a worker of a WorkerPool.

func ShardingFNV1aWorkerAssigner

func ShardingFNV1aWorkerAssigner(q Query, workers []Worker) Worker

ShardingFNV1aWorkerAssigner is a WorkerAssigner that foll

type WorkerAssigner

type WorkerAssigner func(Query, []Worker) Worker

WorkerAssigner assigns a Worker based on a Query and available workers.

type WorkerPool

type WorkerPool interface {
	run.Startable
	run.Stoppable
}

WorkerPool is a pool of Worker

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL