pipeline

package
v1.1.21-alpha Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 19, 2024 License: MIT Imports: 4 Imported by: 0

Documentation

Index

Constants

View Source
const SENDER_TYPE_Elastic = "elastic"
View Source
const SENDER_TYPE_KAFKA = "kafka"
View Source
const SENDER_TYPE_REDIS = "redis"
View Source
const SENDER_TYPE_ROCKETMQ = "rocketMQ"
View Source
const SNEDER_TYPE_HTTP = "http"
View Source
const SNEDER_TYPE_RABBITMQ = "rabbitMQ"
View Source
const SNEDER_TYPE_STDOUT = "stdout"

Variables

This section is empty.

Functions

This section is empty.

Types

type Elastic added in v1.1.3

type Elastic struct {
	// Endpoints is elastic addresses
	Endpoints string `json:"endpoints"`
	Index     string `json:"index"`
	Type      string `json:"type"`
	Username  string `json:"username"`
	Password  string `json:"password"`
}

type Filter

type Filter struct {
	Type FilterType `json:"type"`
	Rule string     `json:"rule"`
}

Filter of pipeline

func BlackFilter added in v1.1.3

func BlackFilter(rule string) (f *Filter)

BlackFilter returns a black filter

func WhiteFilter added in v1.1.3

func WhiteFilter(rule string) (f *Filter)

WhiteFilter returns a white filter

type FilterType

type FilterType string

FilterType types of filter

const (
	// FILTER_WHITE white list
	FILTER_WHITE FilterType = "white"
	// FILTER_BLACK black list
	FILTER_BLACK FilterType = "black"
)

type Flavor

type Flavor string

Flavor mysql or mariaDB

const (
	// FLAVOR_MYSQL MySQL DB
	FLAVOR_MYSQL Flavor = "MySQL"
	// FLAVOR_MARIADB MariaDB
	FLAVOR_MARIADB Flavor = "MariaDB"
)

func (Flavor) YaString

func (f Flavor) YaString() string

YaString convert binlogo flavor string

type Http added in v1.0.11

type Http struct {
	API     string `json:"api"`
	Retries int    `json:"retries"`
}

Http output configuration

func EmptyHttp added in v1.1.3

func EmptyHttp() *Http

EmptyHttp return a new empty Http object

type Instance

type Instance struct {
	PipelineName string    `json:"pipeline_name"`
	NodeName     string    `json:"node_name"`
	CreateTime   time.Time `json:"create_time"`
}

Instance store struct

type Kafka

type Kafka struct {
	Brokers      string                   `json:"brokers"`
	Topic        string                   `json:"topic"`
	RequiredAcks *sarama.RequiredAcks     `json:"require_acks"`
	Compression  *sarama.CompressionCodec `json:"compression"`
	Retries      *int                     `json:"retries"`
	Idepotent    *bool                    `json:"idepotent"`
}

Kafka output configuration

type Mode

type Mode string

Mode mysql replication mode

const (
	// MODE_GTID GTID Mode
	MODE_GTID Mode = "gtid"
	// MODE_POSITION common Mode
	MODE_POSITION Mode = "position"
)

type Mysql

type Mysql struct {
	Address  string `json:"address"`
	Port     uint16 `json:"port"`
	User     string `json:"user"`
	Password string `json:"password"`
	ServerId uint32 `json:"server_id"`
	Flavor   Flavor `json:"flavor"`
	Mode     Mode   `json:"mode"`
}

Mysql store struct

type OptionPipeline

type OptionPipeline func(p *Pipeline)

OptionPipeline configure pipeline

func WithAddFilter

func WithAddFilter(filter *Filter) OptionPipeline

func WithPipeDelete

func WithPipeDelete(d bool) OptionPipeline

func WithPipeMode

func WithPipeMode(mode Mode) OptionPipeline

func WithPipeSafe

func WithPipeSafe(uPipe *Pipeline) OptionPipeline

WithPipeSafe sets pipeline

func WithPipeStatus

func WithPipeStatus(status Status) OptionPipeline

WithPipeStatus sets pipeline status

func WithUpdateFilter

func WithUpdateFilter(index int, filter *Filter) OptionPipeline

type OptionPosition

type OptionPosition func(position *Position)

OptionPosition Position options

func WithBinlogFile

func WithBinlogFile(b string) OptionPosition

WithBinlogFile sets binlog file to OptionPosition

func WithGTIDSet

func WithGTIDSet(g string) OptionPosition

WithGTIDSet sets GTIDSet to OptionPosition

func WithPos

func WithPos(p uint32) OptionPosition

WithPos sets binlog position to OptionPosition

type OptionRecord added in v1.1.3

type OptionRecord func(record *RecordPosition)

OptionRecord record position options

func WithNow added in v1.1.3

func WithNow(n *Position) OptionRecord

WithNow record position now

func WithPipelineName added in v1.1.3

func WithPipelineName(name string) OptionRecord

WithPipelineName set RecordPosition pipeline name

func WithPre added in v1.1.3

func WithPre(p *Position) OptionRecord

WithPre record position pre

type Output

type Output struct {
	Sender *Sender `json:"sender"`
}

Output pipeline output

func EmptyOutput added in v1.1.3

func EmptyOutput() *Output

EmptyOutput return a new empty Output object

type Pipeline

type Pipeline struct {
	Name       string    `json:"name"`
	Status     Status    `json:"status"`
	AliasName  string    `json:"aliasName"`
	Mysql      *Mysql    `json:"mysql"`
	Filters    []*Filter `json:"filters"`
	Output     *Output   `json:"output"`
	Replicas   int       `json:"replicas"`
	CreateTime time.Time `json:"create_time"`
	Remark     string    `json:"remark"`
	IsDelete   bool      `json:"is_delete"`
	// If use newest posion to sync mysql replication when get mysql error 1236 (could not find binary log index)
	FixPosNewest bool `json:"fix_pos_newest"`
}

Pipeline pipeline's definition

func NewPipeline added in v1.1.3

func NewPipeline(name string) (pipe *Pipeline)

NewPipeline returns a new pipeline with default values

func (*Pipeline) ExpectRun

func (s *Pipeline) ExpectRun() bool

ExpectRun determine whether the pipeline should run

func (*Pipeline) Key

func (s *Pipeline) Key() (key string)

Key generate etcd key

func (*Pipeline) Unmarshal

func (s *Pipeline) Unmarshal(val []byte) (err error)

Unmarshal generate from json data

func (*Pipeline) Val

func (s *Pipeline) Val() (val string)

Val generate json data

type Position

type Position struct {
	BinlogFile     string `json:"binlog_file"`
	BinlogPosition uint32 `json:"binlog_position"`
	GTIDSet        string `json:"gtid_set"`
	PipelineName   string `json:"pipeline_name"`
	TotalRows      int    `json:"total_rows"`
	ConsumeRows    int    `json:"consume_rows"`
}

Position mysql replication position

func (*Position) Key

func (s *Position) Key() (key string)

Key get etcd key prefix

func (*Position) Reset added in v1.1.3

func (p *Position) Reset()

Reset reset position

func (*Position) Unmarshal

func (s *Position) Unmarshal(val []byte) (err error)

Unmarshal unmarshal json data to object

func (*Position) Val

func (s *Position) Val() (val string)

Val get position json data

type RabbitMQ added in v1.1.3

type RabbitMQ struct {
	// rabbitMQ url
	Url          string `json:"url"`
	ExchangeName string `json:"exchange_name"`
}

RabbitMQ basic model for pipeline config

type RecordPosition added in v1.1.3

type RecordPosition struct {
	PipelineName string    `json:"pipeline_name"`
	Pre          *Position `json:"pre"`
	Now          *Position `json:"now"`
}

RecordPosition mysql replication position with pre position

func NewRecordPosition added in v1.1.3

func NewRecordPosition(opts ...OptionRecord) *RecordPosition

NewRecordPosition return empty RecordPosition

type Redis added in v1.1.3

type Redis struct {
	Addr     string `json:"address"`
	UserName string `json:"username"`
	Password string `json:"password"`
	DB       int    `json:"db"`
	List     string `json:"list"`
}

Redis output configuration

type RocketMQ added in v1.1.3

type RocketMQ struct {
	// Endpoint http endpoint
	Endpoint string `json:"endpoint"`
	// TopicName topic name
	TopicName string `json:"topic_name"`
	// Instance name
	InstanceId string `json:"instance_id"`
	AccessKey  string `json:"access_key"`
	SecretKey  string `json:"secret_key"`
}

RocketMQ aliyun rocketmq configuration

type Sender

type Sender struct {
	Name     string    `json:"name"`
	Type     string    `json:"type"`
	Kafka    *Kafka    `json:"kafka"`
	Stdout   *Stdout   `json:"stdout"`
	Http     *Http     `json:"http"`
	RabbitMQ *RabbitMQ `json:"rabbitMQ"`
	Redis    *Redis    `json:"redis"`
	RocketMQ *RocketMQ `json:"rocketMQ"`
	// todo use interface for sender params
	Elastic Elastic `json:"elastic"`
}

Sender output configuration

type Status

type Status string

Status of Pipeline

const (
	// STATUS_RUN run
	STATUS_RUN Status = "run"
	// STATUS_STOP stop
	STATUS_STOP Status = "stop"
)

type Stdout

type Stdout struct {
}

Stdout output configuration

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL