dbms

package
v0.2.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 8, 2024 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ExecModeNormal = "Normal" // Non-Transactional Execution
	ExecModeStmt   = "Stmt"   // prepare/exec without Transaction
	ExecModeTx     = "Tx"     // Transactional Execution
	ExecModeStmtTx = "StmtTx" // prepare/exec with Transaction
)

Execution Mode

Variables

This section is empty.

Functions

func StartWrite

func StartWrite(ctx context.Context, w BatchWriter,
	receiver plugin.RecordReceiver) (err error)

StartWrite - Begins the process of writing records to the database using the batch writer and record receiver.

Types

type BaseBatchWriter

type BaseBatchWriter struct {
	Task *Task
	// contains filtered or unexported fields
}

BaseBatchWriter - A basic implementation of a batch writer, providing the fundamental functionality for writing data in batches.

func NewBaseBatchWriter

func NewBaseBatchWriter(task *Task, execMode string, opts *sql.TxOptions) *BaseBatchWriter

NewBaseBatchWriter - Creates a new instance of the basic batch writer based on the task, execution mode, and transaction options.

func (*BaseBatchWriter) BatchSize

func (b *BaseBatchWriter) BatchSize() int

BatchSize - The number of records to be inserted in a single batch.

func (*BaseBatchWriter) BatchTimeout

func (b *BaseBatchWriter) BatchTimeout() time.Duration

BatchTimeout - The maximum time allowed for a single batch insertion.

func (*BaseBatchWriter) BatchWrite

func (b *BaseBatchWriter) BatchWrite(ctx context.Context, records []element.Record) (err error)

BatchWrite - The process of writing data in batches.

func (*BaseBatchWriter) JobID

func (b *BaseBatchWriter) JobID() int64

JobID - The unique identifier for a job.

func (*BaseBatchWriter) TaskGroupID

func (b *BaseBatchWriter) TaskGroupID() int64

TaskGroupID - The unique identifier for a group of tasks.

func (*BaseBatchWriter) TaskID

func (b *BaseBatchWriter) TaskID() int64

TaskID - The unique identifier for a specific task within a task group.

type BaseConfig

type BaseConfig struct {
	Username     string                `json:"username"`     // Username
	Password     string                `json:"password"`     // Password
	Column       []string              `json:"column"`       // Column Information
	Connection   dbmsreader.ConnConfig `json:"connection"`   // Connection Information
	WriteMode    string                `json:"writeMode"`    // Write Mode, e.g., Insert
	BatchSize    int                   `json:"batchSize"`    // Batch Size for Single Write
	BatchTimeout time2.Duration        `json:"batchTimeout"` // Batch Timeout for Single Write
	PreSQL       []string              `json:"preSQL"`       // Prepared SQL Statement
	PostSQL      []string              `json:"postSQL"`      // Ending SQL Statement
	// contains filtered or unexported fields
}

BaseConfig - Basic Relational Database Configuration for writers. Unless there are special requirements, this configuration can be used to quickly implement writers.

func NewBaseConfig

func NewBaseConfig(conf *config.JSON) (c *BaseConfig, err error)

NewBaseConfig - Extract relational database configuration from the configuration file.

func (*BaseConfig) GetBaseTable

func (b *BaseConfig) GetBaseTable() *database.BaseTable

GetBaseTable - Retrieve table information.

func (*BaseConfig) GetBatchSize

func (b *BaseConfig) GetBatchSize() int

GetBatchSize - Retrieve the batch size for a single write.

func (*BaseConfig) GetBatchTimeout

func (b *BaseConfig) GetBatchTimeout() time.Duration

GetBatchTimeout - Retrieve the batch timeout for a single write.

func (*BaseConfig) GetColumns

func (b *BaseConfig) GetColumns() (columns []dbmsreader.Column)

GetColumns - Retrieve column information.

func (*BaseConfig) GetPassword

func (b *BaseConfig) GetPassword() string

GetPassword - Retrieve the password.

func (*BaseConfig) GetPostSQL

func (b *BaseConfig) GetPostSQL() []string

GetPostSQL - Retrieve the ending SQL statement.

func (*BaseConfig) GetPreSQL

func (b *BaseConfig) GetPreSQL() []string

GetPreSQL - Retrieve the prepared SQL statement.

func (*BaseConfig) GetRetryStrategy

func (b *BaseConfig) GetRetryStrategy(j schedule.RetryJudger) (schedule.RetryStrategy,
	error)

GetRetryStrategy - Retrieve the retry strategy.

func (*BaseConfig) GetURL

func (b *BaseConfig) GetURL() string

GetURL - Retrieve the connection URL.

func (*BaseConfig) GetUsername

func (b *BaseConfig) GetUsername() string

GetUsername - Retrieve the username.

func (*BaseConfig) GetWriteMode

func (b *BaseConfig) GetWriteMode() string

GetWriteMode - Retrieve the write mode.

func (*BaseConfig) IgnoreOneByOneError

func (b *BaseConfig) IgnoreOneByOneError() bool

IgnoreOneByOneError - Ignore individual retry errors.

type BaseDbHandler

type BaseDbHandler struct {
	// contains filtered or unexported fields
}

BaseDbHandler Basic Database Execution Handler Encapsulation

func NewBaseDbHandler

func NewBaseDbHandler(newExecer func(name string, conf *config.JSON) (Execer, error), opts *sql.TxOptions) *BaseDbHandler

NewBaseDbHandler Create a database execution handler encapsulation using the executor function newExecer and database transaction execution options opts

func (*BaseDbHandler) Config

func (d *BaseDbHandler) Config(conf *config.JSON) (Config, error)

Config Obtain relational database configuration through configuration

func (*BaseDbHandler) Execer

func (d *BaseDbHandler) Execer(name string, conf *config.JSON) (Execer, error)

Execer Obtain an executor through the database name and configuration

func (*BaseDbHandler) TableParam

func (d *BaseDbHandler) TableParam(config Config, execer Execer) database.Parameter

TableParam Obtain table parameters through relational database configuration and executor

type BatchWriter

type BatchWriter interface {
	JobID() int64                                                   // Job ID - A unique identifier for a job or task.
	TaskGroupID() int64                                             // Task Group ID - A unique identifier for a group of tasks.
	TaskID() int64                                                  // Task ID - A unique identifier for a specific task within a task group.
	BatchSize() int                                                 // Batch Size - The number of records to be written in a single batch.
	BatchTimeout() time.Duration                                    // Batch Timeout - The maximum time allowed for a single batch write operation.
	BatchWrite(ctx context.Context, records []element.Record) error // Batch Write - The process of writing data in batches.
}

BatchWriter - A tool or component used for writing data in batches.

type Config

type Config interface {
	GetUsername() string                                                     // Get Username
	GetPassword() string                                                     // Get Password
	GetURL() string                                                          // Get Connection URL
	GetColumns() []dbmsreader.Column                                         // Get Column Information
	GetBaseTable() *database.BaseTable                                       // Get Table Information
	GetWriteMode() string                                                    // Get Write Mode
	GetBatchSize() int                                                       // Batch Size for Single Write
	GetBatchTimeout() time.Duration                                          // Batch Timeout for Single Write
	GetRetryStrategy(j schedule.RetryJudger) (schedule.RetryStrategy, error) // Get Retry Strategy
	IgnoreOneByOneError() bool                                               // Ignore Individual Retry Errors
	GetPreSQL() []string                                                     // Get Prepared SQL Statement
	GetPostSQL() []string                                                    // Get Ending SQL Statement
}

Config - Relational Database Writer Configuration

type DbHandler

type DbHandler interface {
	Execer(name string, conf *config.JSON) (Execer, error)      // Obtain an executor through the database name and configuration
	Config(conf *config.JSON) (Config, error)                   // Obtain relational database configuration through configuration
	TableParam(config Config, execer Execer) database.Parameter // Obtain table parameters through relational database configuration and executor
}

DbHandler Database Execution Handler Encapsulation

type Execer

type Execer interface {
	Table(*database.BaseTable) database.Table
	// Obtain relational database configuration through configuration
	PingContext(ctx context.Context) error

	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	// BaseDbHandler Basic Database Execution Handler Encapsulation
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)

	FetchTableWithParam(ctx context.Context, param database.Parameter) (database.Table, error)

	BatchExec(ctx context.Context, opts *database.ParameterOptions) (err error)
	// NewBaseDbHandler Create a database execution handler encapsulation using the executor function newExecer and database transaction execution options opts
	BatchExecStmt(ctx context.Context, opts *database.ParameterOptions) (err error)

	BatchExecWithTx(ctx context.Context, opts *database.ParameterOptions) (err error)

	BatchExecStmtWithTx(ctx context.Context, opts *database.ParameterOptions) (err error)

	Close() error
}

type Job

type Job struct {
	*plugin.BaseJob

	Handler DbHandler // Database handle
	Execer  Execer    // Executor
	// contains filtered or unexported fields
}

Job Work

func NewJob

func NewJob(handler DbHandler) *Job

NewJob Get work through database handle

func (*Job) Destroy

func (j *Job) Destroy(ctx context.Context) (err error)

Destroy Destruction

func (*Job) Init

func (j *Job) Init(ctx context.Context) (err error)

Init Initialization

func (*Job) Post

func (j *Job) Post(ctx context.Context) (err error)

Post Post-processing

func (*Job) Prepare

func (j *Job) Prepare(ctx context.Context) (err error)

Prepare Preparation

func (*Job) Split

func (j *Job) Split(ctx context.Context, number int) (confs []*config.JSON, err error)

Split Task division

type Task

type Task struct {
	*writer.BaseTask

	Handler DbHandler
	Execer  Execer
	Config  Config
	Table   database.Table
}

Task

func NewTask

func NewTask(handler DbHandler) *Task

NewTask Create a task through the database handle

func (*Task) Destroy

func (t *Task) Destroy(ctx context.Context) (err error)

Destroy

func (*Task) Init

func (t *Task) Init(ctx context.Context) (err error)

Init

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL