workload

package
v0.0.0-...-0236cb5 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2023 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Overview

package workload contains an interface for orchestrating goroutines. The idea is that we can experiment with different concurrency models without disrupting our call sites.

Index

Constants

View Source
const (
	// For batched inserts, how many rows per API call
	DefaultBatchSize = 100
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Constructor

type Constructor func(WorkloadConfig) (Workload, error)

func GetWorkloadConstructor

func GetWorkloadConstructor(workloadType string) (Constructor, error)

GetWorkloadConstructor adds future support for different concurrency models

type CoreWorkload

type CoreWorkload struct {
	Context         context.Context
	Config          *config.Config
	Schema          schema.Schema
	MetricsRegistry metrics.Registry

	DataWriteGenerationTimer metrics.Timer // Used to time data generation
	DataReadGenerationTimer  metrics.Timer // Used to time data generation
	DataWriteTimer           metrics.Timer // Used to time writes
	DataWriteMeter           metrics.Meter // Used to measure volume of writes
	DataReadTimer            metrics.Timer // Used to time reads
	DataReadMeter            metrics.Meter // Used to measure volume of reads
	// contains filtered or unexported fields
}

func (*CoreWorkload) Execute

func (c *CoreWorkload) Execute() error

func (*CoreWorkload) GetGeneratorMap

func (c *CoreWorkload) GetGeneratorMap(t schema.Table) (data.GeneratorMap, error)

GetGeneratorMap will return a generator map suitable for creating insert operations against a table

func (*CoreWorkload) GetOperationSelector

func (c *CoreWorkload) GetOperationSelector() (selector.Selector, error)

func (*CoreWorkload) GetReadGeneratorMap

func (c *CoreWorkload) GetReadGeneratorMap(t schema.Table) (*sample.SampleGenerator, error)

GetReadGeneratorMap will sample rows from the table and create a map structure for creating point reads

func (*CoreWorkload) Initialize

func (c *CoreWorkload) Initialize() error

func (*CoreWorkload) Load

func (c *CoreWorkload) Load(x []string) error

func (*CoreWorkload) Plan

func (c *CoreWorkload) Plan(pt JobType, targets []string) error

Plan will create *Targets for each TargetName

func (*CoreWorkload) Run

func (c *CoreWorkload) Run(x string) error

Run will execute a Run phase against the target table.

func (*CoreWorkload) SampleTable

func (c *CoreWorkload) SampleTable(t schema.Table) (map[string]interface{}, error)

SampleTable will return a map[string]interface{} of values using the table's primary keys

func (*CoreWorkload) Stop

func (c *CoreWorkload) Stop() error

func (*CoreWorkload) SummarizePlan

func (c *CoreWorkload) SummarizePlan()

type Job

type Job struct {
	JobType           JobType           // Job Type (load or run)
	Context           context.Context   // Context
	Client            *spanner.Client   // Spanner Client
	Table             string            // Table name to execute against
	Operations        int               // How many operations in this job
	Batched           bool              // When true, batch $operations mostly used for load
	BatchSize         int               // Write batch size
	Columns           []string          // Tables column names to ask for during reads
	StaleReads        bool              // Perform stale reads if true
	Staleness         time.Duration     // If performing stale reads, use this exact staleness
	OperationSelector selector.Selector // Weighted choice selector (read or write)

	// Generators
	WriteGenerator data.GeneratorMap       // Generator for making row data
	ReadGenerator  *sample.SampleGenerator // Generator for point reads

	// Metrics
	DataWriteGenerationTimer metrics.Timer // Used to time data generation
	DataReadGenerationTimer  metrics.Timer // Used to time data generation
	DataWriteTimer           metrics.Timer // Used to time writes
	DataWriteMeter           metrics.Meter // Used to measure volume of writes
	DataReadTimer            metrics.Timer // Used to time reads
	DataReadMeter            metrics.Meter // Used to measure volume of reads

	FatalErr error
}

func (*Job) Execute

func (j *Job) Execute()

func (*Job) InsertBatch

func (j *Job) InsertBatch() error

InsertBatch will insert $operations rows in batches

func (*Job) InsertOne

func (j *Job) InsertOne() error

InsertOne will insert one row into the job's table

func (*Job) ReadOne

func (j *Job) ReadOne() error

type JobType

type JobType uint8
const (
	JobLoad JobType = 1 + iota
	JobRun
)

type Target

type Target struct {
	Config                   *config.Config
	Context                  context.Context
	Client                   *spanner.Client
	JobType                  JobType                 // Determines if we are in a 'run' phase or a 'load' phase
	Table                    schema.Table            // Which table this target points at
	TableName                string                  // string name of the table
	Operations               int                     // Total number of operations to execute against this target
	ColumnNames              []string                // Col names for reads
	OperationSelector        selector.Selector       // If JobType == JobRun this is used to determine if it should be a read op or a write op
	WriteGenerator           data.GeneratorMap       // Map used for generating row data on inserts
	ReadGenerator            *sample.SampleGenerator // Sample generator for generating point reads
	DataWriteGenerationTimer metrics.Timer           // Used to time data generation
	DataReadGenerationTimer  metrics.Timer           // Used to time data generation
	DataWriteTimer           metrics.Timer           // Used to time writes
	DataWriteMeter           metrics.Meter           // Used to measure volume of writes
	DataReadTimer            metrics.Timer           // Used to time reads
	DataReadMeter            metrics.Meter           // Used to measure volume of reads
}

func FindTargetByName

func FindTargetByName(plan []*Target, name string) *Target

func (*Target) CreateMaps

func (t *Target) CreateMaps(j *Job)

func (*Target) GetGeneratorMap

func (t *Target) GetGeneratorMap() (data.GeneratorMap, error)

GetGeneratorMap will return a generator map suitable for creating insert operations against a table

func (*Target) NewJob

func (t *Target) NewJob() *Job

type WorkerPool

type WorkerPool struct {
	Context context.Context
	Config  *config.Config
	Schema  schema.Schema

	Pool *pool.Pool
	Jobs []pool.Job

	MetricsRegistry metrics.Registry
	// contains filtered or unexported fields
}

func (*WorkerPool) Initialize

func (w *WorkerPool) Initialize() error

func (*WorkerPool) Load

func (w *WorkerPool) Load(tables []string) error

func (*WorkerPool) Run

func (w *WorkerPool) Run(tableName string) error

func (*WorkerPool) Stop

func (w *WorkerPool) Stop() error

Stop the worker pool

type WorkerPoolLoadJob

type WorkerPoolLoadJob struct {
	Context         context.Context
	Client          *spanner.Client
	TableName       string
	RowCount        int
	Statement       string
	GeneratorMap    data.GeneratorMap
	Batch           bool
	BatchSize       int
	WaitGroup       *sync.WaitGroup
	MetricsRegistry metrics.Registry
}

WorkerPoolLoadJob is responsible for inserting data into a table

func (*WorkerPoolLoadJob) Execute

func (j *WorkerPoolLoadJob) Execute()

func (*WorkerPoolLoadJob) InsertDML

func (j *WorkerPoolLoadJob) InsertDML()

func (*WorkerPoolLoadJob) InsertMap

func (j *WorkerPoolLoadJob) InsertMap()

func (*WorkerPoolLoadJob) InsertMapBatch

func (j *WorkerPoolLoadJob) InsertMapBatch()

type WorkerPoolRunJob

type WorkerPoolRunJob struct {
	Context           context.Context
	Client            *spanner.Client
	TableName         string
	ReadMap           data.GeneratorMap // Generate data for point reads
	WriteMap          data.GeneratorMap // Generate data for writes
	OperationSelector selector.Selector // Weighted choice selector (read or write)
	WaitGroup         *sync.WaitGroup
	StaleReads        bool // Should we perform stale reads? If not, strong reads
	Staleness         time.Duration
	Operations        int // How many operations to perform

	Table         schema.Table
	ReadGenerator *sample.SampleGenerator

	MetricsRegistry metrics.Registry
	// contains filtered or unexported fields
}

func (*WorkerPoolRunJob) Execute

func (j *WorkerPoolRunJob) Execute()

func (*WorkerPoolRunJob) Initialize

func (j *WorkerPoolRunJob) Initialize() error

func (*WorkerPoolRunJob) Insert

func (j *WorkerPoolRunJob) Insert() error

func (*WorkerPoolRunJob) ReadStale

func (j *WorkerPoolRunJob) ReadStale() error

func (*WorkerPoolRunJob) ReadStrong

func (j *WorkerPoolRunJob) ReadStrong() error

type Workload

type Workload interface {
	Load([]string) error
	Run(string) error
	Stop() error
}

func NewCoreWorkload

func NewCoreWorkload(cfg WorkloadConfig) (Workload, error)

NewCoreWorkload initializes a "core" type workload

func NewPoolWorkload

func NewPoolWorkload(cfg WorkloadConfig) (Workload, error)

NewPoolWorkload initializes a "worker pool" type workload

type WorkloadConfig

type WorkloadConfig struct {
	Context        context.Context
	Config         *config.Config
	Schema         schema.Schema
	MetricRegistry metrics.Registry
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL