plugin

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 13, 2022 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseJob

type BaseJob struct {
	*BasePlugin
	// contains filtered or unexported fields
}

BaseJob 基础工作,用于辅助和简化工作接口的实现

func NewBaseJob

func NewBaseJob() *BaseJob

NewBaseJob 获取NewBaseJob

func (*BaseJob) Collector

func (b *BaseJob) Collector() JobCollector

Collector 采集器

func (*BaseJob) SetCollector

func (b *BaseJob) SetCollector(collector JobCollector)

SetCollector 设置采集器

type BasePluggable

type BasePluggable struct {
	// contains filtered or unexported fields
}

BasePluggable 基础可插件化 用于辅助各类可插件化接口实现,简化其实现

func NewBasePluggable

func NewBasePluggable() *BasePluggable

NewBasePluggable 创建可插件化插件

func (*BasePluggable) Description

func (b *BasePluggable) Description() (string, error)

Description 插件描述,当description不存在或者不是字符串时会返回错误

func (*BasePluggable) Developer

func (b *BasePluggable) Developer() (string, error)

Developer 插件开发者,当developer不存在或者不是字符串时会返回错误

func (*BasePluggable) PeerPluginJobConf

func (b *BasePluggable) PeerPluginJobConf() *config.JSON

PeerPluginJobConf 设置个性化配置

func (*BasePluggable) PeerPluginName

func (b *BasePluggable) PeerPluginName() string

PeerPluginName 对应插件名称

func (*BasePluggable) PluginConf

func (b *BasePluggable) PluginConf() *config.JSON

PluginConf 插件配置

func (*BasePluggable) PluginJobConf

func (b *BasePluggable) PluginJobConf() *config.JSON

PluginJobConf 工作配置

func (*BasePluggable) PluginName

func (b *BasePluggable) PluginName() (string, error)

PluginName 插件名称,当name不存在或者不是字符串时会返回错误

func (*BasePluggable) SetPeerPluginJobConf

func (b *BasePluggable) SetPeerPluginJobConf(conf *config.JSON)

SetPeerPluginJobConf 设置对应工作配置

func (*BasePluggable) SetPeerPluginName

func (b *BasePluggable) SetPeerPluginName(name string)

SetPeerPluginName 设置对应工作名

func (*BasePluggable) SetPluginConf

func (b *BasePluggable) SetPluginConf(conf *config.JSON)

SetPluginConf 设置插件配置

func (*BasePluggable) SetPluginJobConf

func (b *BasePluggable) SetPluginJobConf(conf *config.JSON)

SetPluginJobConf 设置插件工作配置

type BasePlugin

type BasePlugin struct {
	*BasePluggable
}

BasePlugin 基础插件,用于辅助和简化插件的实现

func NewBasePlugin

func NewBasePlugin() *BasePlugin

NewBasePlugin 创建基础插件

func (*BasePlugin) Post

func (b *BasePlugin) Post(ctx context.Context) error

Post 后置通知空方法

func (*BasePlugin) PostHandler

func (b *BasePlugin) PostHandler(ctx context.Context, conf *config.JSON) error

PostHandler 后置通知处理空方法

func (*BasePlugin) PreCheck

func (b *BasePlugin) PreCheck(ctx context.Context) error

PreCheck 预检查空方法

func (*BasePlugin) PreHandler

func (b *BasePlugin) PreHandler(ctx context.Context, conf *config.JSON) error

PreHandler 预处理空方法

func (*BasePlugin) Prepare

func (b *BasePlugin) Prepare(ctx context.Context) error

Prepare 预备空方法

type BaseTask

type BaseTask struct {
	*BasePlugin
	// contains filtered or unexported fields
}

BaseTask 基础任务,用于辅助和简化任务接口的实现

func NewBaseTask

func NewBaseTask() *BaseTask

NewBaseTask 创建基础任务

func (*BaseTask) Format

func (b *BaseTask) Format(format string) string

Format 日志格式

func (*BaseTask) JobID

func (b *BaseTask) JobID() int64

JobID 工作ID

func (*BaseTask) SetJobID

func (b *BaseTask) SetJobID(jobID int64)

SetJobID 设置工作ID

func (*BaseTask) SetTaskCollector

func (b *BaseTask) SetTaskCollector(collector TaskCollector)

SetTaskCollector 设置任务信息收集器

func (*BaseTask) SetTaskGroupID

func (b *BaseTask) SetTaskGroupID(taskGroupID int64)

SetTaskGroupID 设置任务组ID

func (*BaseTask) SetTaskID

func (b *BaseTask) SetTaskID(taskID int64)

SetTaskID 设置任务ID

func (*BaseTask) TaskCollector

func (b *BaseTask) TaskCollector() TaskCollector

TaskCollector 任务信息收集器

func (*BaseTask) TaskGroupID

func (b *BaseTask) TaskGroupID() int64

TaskGroupID 任务组ID

func (*BaseTask) TaskID

func (b *BaseTask) TaskID() int64

TaskID 任务ID

func (*BaseTask) Wrapf

func (b *BaseTask) Wrapf(err error, format string, args ...interface{}) error

Wrapf 包裹错误

type Job

type Job interface {
	Plugin
	Collector() JobCollector   //todo 工作采集器目前未使用
	SetCollector(JobCollector) //todo  设置工作采集器目前未使用
}

Job 工作

type JobCollector

type JobCollector interface {
	MessageMap() map[string][]string
	MessageByKey(key string) []string
}

JobCollector 工作信息采集器,用于统计整个工作的进度,错误信息等 toto 当前未实现监控模块,为此需要在后面来实现这个接口的结构体

type Pluggable

type Pluggable interface {
	//插件开发者,一般写入插件配置中
	Developer() (string, error)
	//插件描述,一般写入插件配置中
	Description() (string, error)
	//插件名称,一般写入插件配置中
	PluginName() (string, error)
	/*插件配置,基础配置如下,其余可以根据个性化定制
	{
		"name" : "mysqlreader",
		"developer":"Breeze0806",
		"description":"use github.com/go-sql-driver/mysql. database/sql DB execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter."
	}
	*/
	PluginConf() *config.JSON
	//插件工作配置
	PluginJobConf() *config.JSON
	//对应插件名(对于Writer来说就是Reader,对应Reader来说就是Wirter)
	PeerPluginName() string
	//对应插件配置(对于Writer来说就是Reader,对应Reader来说就是Wirter)
	PeerPluginJobConf() *config.JSON
	//设置工作插件
	SetPluginJobConf(conf *config.JSON)
	//设置对应插件配置(对于Writer来说就是Reader,对应Reader来说就是Wirter)
	SetPeerPluginJobConf(conf *config.JSON)
	//设置对应插件名(对于Writer来说就是Reader,对应Reader来说就是Wirter)
	SetPeerPluginName(name string)
	//设置插件配置
	SetPluginConf(conf *config.JSON)
	//初始化插件,需要实现者个性化实现
	Init(ctx context.Context) error
	//销毁插件,需要实现者个性化实现
	Destroy(ctx context.Context) error
}

Pluggable 可插件化接口

type Plugin

type Plugin interface {
	Pluggable
	//预检查,在数据库中用于检查权限
	PreCheck(ctx context.Context) error
	//准备
	Prepare(ctx context.Context) error
	//后置通知
	Post(ctx context.Context) error
	//预备处理, todo当前未用到
	PreHandler(ctx context.Context, conf *config.JSON) error
	//后置通知处理, todo当前未用到
	PostHandler(ctx context.Context, conf *config.JSON) error
}

Plugin 插件

type RecordReceiver

type RecordReceiver interface {
	GetFromReader() (element.Record, error) //从reader中读取记录
	Shutdown() error                        // 关闭
}

RecordReceiver 记录接收器

type RecordSender

type RecordSender interface {
	CreateRecord() (element.Record, error)  //创建记录
	SendWriter(record element.Record) error //将记录发往写入器
	Flush() error                           //将记录刷新到记录发送器
	Terminate() error                       //终止发送信号
	Shutdown() error                        //关闭
}

RecordSender 记录发送器

type Task

type Task interface {
	Plugin

	//任务信息收集器,todo 未使用
	TaskCollector() TaskCollector
	//设置任务信息收集器,todo 未使用
	SetTaskCollector(collector TaskCollector)

	//工作ID
	JobID() int64
	//设置工作ID
	SetJobID(jobID int64)
	//任务组ID
	TaskGroupID() int64
	//设置任务组ID
	SetTaskGroupID(taskGroupID int64)
	//任务ID
	TaskID() int64
	//设置任务ID
	SetTaskID(taskID int64)
	//包裹错误
	Wrapf(err error, format string, args ...interface{}) error
	//Format 日志格式
	Format(format string) string
}

Task 任务接口

type TaskCollector

type TaskCollector interface {
	CollectDirtyRecordWithError(record element.Record, err error)
	CollectDirtyRecordWithMsg(record element.Record, msgErr string)
	CollectDirtyRecord(record element.Record, err error, msgErr string)
	CollectMessage(key string, value string)
}

TaskCollector 任务收集器 todo 当前未使用

type Type

type Type string

Type 插件类型

var (
	Reader      Type = "reader"      //读取器
	Writer      Type = "writer"      //写入器
	Transformer Type = "transformer" //转化器
	Handler     Type = "handler"     //处理器
)

插件类型枚举

func NewType

func NewType(s string) Type

NewType 新增类型

func (Type) IsValid

func (t Type) IsValid() bool

IsValid 是否合法

func (Type) String

func (t Type) String() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL