Documentation ¶
Index ¶
- Constants
- func StartWrite(ctx context.Context, w BatchWriter, receiver plugin.RecordReceiver) (err error)
- type BaseBatchWriter
- func (b *BaseBatchWriter) BatchSize() int
- func (b *BaseBatchWriter) BatchTimeout() time.Duration
- func (b *BaseBatchWriter) BatchWrite(ctx context.Context, records []element.Record) (err error)
- func (b *BaseBatchWriter) JobID() int64
- func (b *BaseBatchWriter) TaskGroupID() int64
- func (b *BaseBatchWriter) TaskID() int64
- type BaseConfig
- func (b *BaseConfig) GetBaseTable() *database.BaseTable
- func (b *BaseConfig) GetBatchSize() int
- func (b *BaseConfig) GetBatchTimeout() time.Duration
- func (b *BaseConfig) GetColumns() (columns []dbmsreader.Column)
- func (b *BaseConfig) GetPassword() string
- func (b *BaseConfig) GetPostSQL() []string
- func (b *BaseConfig) GetPreSQL() []string
- func (b *BaseConfig) GetRetryStrategy(j schedule.RetryJudger) (schedule.RetryStrategy, error)
- func (b *BaseConfig) GetURL() string
- func (b *BaseConfig) GetUsername() string
- func (b *BaseConfig) GetWriteMode() string
- func (b *BaseConfig) IgnoreOneByOneError() bool
- type BaseDbHandler
- type BatchWriter
- type Config
- type DbHandler
- type Execer
- type Job
- func (j *Job) Destroy(ctx context.Context) (err error)
- func (j *Job) Init(ctx context.Context) (err error)
- func (j *Job) Post(ctx context.Context) (err error)
- func (j *Job) Prepare(ctx context.Context) (err error)
- func (j *Job) Split(ctx context.Context, number int) (confs []*config.JSON, err error)
- type Task
Constants ¶
const ( ExecModeNormal = "Normal" // Non-Transactional Execution ExecModeStmt = "Stmt" // prepare/exec without Transaction ExecModeTx = "Tx" // Transactional Execution ExecModeStmtTx = "StmtTx" // prepare/exec with Transaction )
Execution Mode
Variables ¶
This section is empty.
Functions ¶
func StartWrite ¶
func StartWrite(ctx context.Context, w BatchWriter, receiver plugin.RecordReceiver) (err error)
StartWrite - Begins the process of writing records to the database using the batch writer and record receiver.
Types ¶
type BaseBatchWriter ¶
type BaseBatchWriter struct { Task *Task // contains filtered or unexported fields }
BaseBatchWriter - A basic implementation of a batch writer, providing the fundamental functionality for writing data in batches.
func NewBaseBatchWriter ¶
func NewBaseBatchWriter(task *Task, execMode string, opts *sql.TxOptions) *BaseBatchWriter
NewBaseBatchWriter - Creates a new instance of the basic batch writer based on the task, execution mode, and transaction options.
func (*BaseBatchWriter) BatchSize ¶
func (b *BaseBatchWriter) BatchSize() int
BatchSize - The number of records to be inserted in a single batch.
func (*BaseBatchWriter) BatchTimeout ¶
func (b *BaseBatchWriter) BatchTimeout() time.Duration
BatchTimeout - The maximum time allowed for a single batch insertion.
func (*BaseBatchWriter) BatchWrite ¶
BatchWrite - The process of writing data in batches.
func (*BaseBatchWriter) JobID ¶
func (b *BaseBatchWriter) JobID() int64
JobID - The unique identifier for a job.
func (*BaseBatchWriter) TaskGroupID ¶
func (b *BaseBatchWriter) TaskGroupID() int64
TaskGroupID - The unique identifier for a group of tasks.
func (*BaseBatchWriter) TaskID ¶
func (b *BaseBatchWriter) TaskID() int64
TaskID - The unique identifier for a specific task within a task group.
type BaseConfig ¶
type BaseConfig struct { Username string `json:"username"` // Username Password string `json:"password"` // Password Column []string `json:"column"` // Column Information Connection dbmsreader.ConnConfig `json:"connection"` // Connection Information WriteMode string `json:"writeMode"` // Write Mode, e.g., Insert BatchSize int `json:"batchSize"` // Batch Size for Single Write BatchTimeout time2.Duration `json:"batchTimeout"` // Batch Timeout for Single Write PreSQL []string `json:"preSQL"` // Prepared SQL Statement PostSQL []string `json:"postSQL"` // Ending SQL Statement // contains filtered or unexported fields }
BaseConfig - Basic Relational Database Configuration for writers. Unless there are special requirements, this configuration can be used to quickly implement writers.
func NewBaseConfig ¶
func NewBaseConfig(conf *config.JSON) (c *BaseConfig, err error)
NewBaseConfig - Extract relational database configuration from the configuration file.
func (*BaseConfig) GetBaseTable ¶
func (b *BaseConfig) GetBaseTable() *database.BaseTable
GetBaseTable - Retrieve table information.
func (*BaseConfig) GetBatchSize ¶
func (b *BaseConfig) GetBatchSize() int
GetBatchSize - Retrieve the batch size for a single write.
func (*BaseConfig) GetBatchTimeout ¶
func (b *BaseConfig) GetBatchTimeout() time.Duration
GetBatchTimeout - Retrieve the batch timeout for a single write.
func (*BaseConfig) GetColumns ¶
func (b *BaseConfig) GetColumns() (columns []dbmsreader.Column)
GetColumns - Retrieve column information.
func (*BaseConfig) GetPassword ¶
func (b *BaseConfig) GetPassword() string
GetPassword - Retrieve the password.
func (*BaseConfig) GetPostSQL ¶
func (b *BaseConfig) GetPostSQL() []string
GetPostSQL - Retrieve the ending SQL statement.
func (*BaseConfig) GetPreSQL ¶
func (b *BaseConfig) GetPreSQL() []string
GetPreSQL - Retrieve the prepared SQL statement.
func (*BaseConfig) GetRetryStrategy ¶
func (b *BaseConfig) GetRetryStrategy(j schedule.RetryJudger) (schedule.RetryStrategy, error)
GetRetryStrategy - Retrieve the retry strategy.
func (*BaseConfig) GetURL ¶
func (b *BaseConfig) GetURL() string
GetURL - Retrieve the connection URL.
func (*BaseConfig) GetUsername ¶
func (b *BaseConfig) GetUsername() string
GetUsername - Retrieve the username.
func (*BaseConfig) GetWriteMode ¶
func (b *BaseConfig) GetWriteMode() string
GetWriteMode - Retrieve the write mode.
func (*BaseConfig) IgnoreOneByOneError ¶
func (b *BaseConfig) IgnoreOneByOneError() bool
IgnoreOneByOneError - Ignore individual retry errors.
type BaseDbHandler ¶
type BaseDbHandler struct {
// contains filtered or unexported fields
}
BaseDbHandler Basic Database Execution Handler Encapsulation
func NewBaseDbHandler ¶
func NewBaseDbHandler(newExecer func(name string, conf *config.JSON) (Execer, error), opts *sql.TxOptions) *BaseDbHandler
NewBaseDbHandler Create a database execution handler encapsulation using the executor function newExecer and database transaction execution options opts
func (*BaseDbHandler) Config ¶
func (d *BaseDbHandler) Config(conf *config.JSON) (Config, error)
Config Obtain relational database configuration through configuration
func (*BaseDbHandler) Execer ¶
Execer Obtain an executor through the database name and configuration
func (*BaseDbHandler) TableParam ¶
func (d *BaseDbHandler) TableParam(config Config, execer Execer) database.Parameter
TableParam Obtain table parameters through relational database configuration and executor
type BatchWriter ¶
type BatchWriter interface { JobID() int64 // Job ID - A unique identifier for a job or task. TaskGroupID() int64 // Task Group ID - A unique identifier for a group of tasks. TaskID() int64 // Task ID - A unique identifier for a specific task within a task group. BatchSize() int // Batch Size - The number of records to be written in a single batch. BatchTimeout() time.Duration // Batch Timeout - The maximum time allowed for a single batch write operation. BatchWrite(ctx context.Context, records []element.Record) error // Batch Write - The process of writing data in batches. }
BatchWriter - A tool or component used for writing data in batches.
type Config ¶
type Config interface { GetUsername() string // Get Username GetPassword() string // Get Password GetURL() string // Get Connection URL GetColumns() []dbmsreader.Column // Get Column Information GetBaseTable() *database.BaseTable // Get Table Information GetWriteMode() string // Get Write Mode GetBatchSize() int // Batch Size for Single Write GetBatchTimeout() time.Duration // Batch Timeout for Single Write GetRetryStrategy(j schedule.RetryJudger) (schedule.RetryStrategy, error) // Get Retry Strategy IgnoreOneByOneError() bool // Ignore Individual Retry Errors GetPreSQL() []string // Get Prepared SQL Statement GetPostSQL() []string // Get Ending SQL Statement }
Config - Relational Database Writer Configuration
type DbHandler ¶
type DbHandler interface { Execer(name string, conf *config.JSON) (Execer, error) // Obtain an executor through the database name and configuration Config(conf *config.JSON) (Config, error) // Obtain relational database configuration through configuration TableParam(config Config, execer Execer) database.Parameter // Obtain table parameters through relational database configuration and executor }
DbHandler Database Execution Handler Encapsulation
type Execer ¶
type Execer interface { Table(*database.BaseTable) database.Table // Obtain relational database configuration through configuration PingContext(ctx context.Context) error QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) // BaseDbHandler Basic Database Execution Handler Encapsulation ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) FetchTableWithParam(ctx context.Context, param database.Parameter) (database.Table, error) BatchExec(ctx context.Context, opts *database.ParameterOptions) (err error) // NewBaseDbHandler Create a database execution handler encapsulation using the executor function newExecer and database transaction execution options opts BatchExecStmt(ctx context.Context, opts *database.ParameterOptions) (err error) BatchExecWithTx(ctx context.Context, opts *database.ParameterOptions) (err error) BatchExecStmtWithTx(ctx context.Context, opts *database.ParameterOptions) (err error) Close() error }
type Job ¶
type Job struct { *plugin.BaseJob Handler DbHandler // Database handle Execer Execer // Executor // contains filtered or unexported fields }
Job Work