Documentation ¶
Index ¶
- Constants
- type Elastic
- type Filter
- type FilterType
- type Flavor
- type Http
- type Instance
- type Kafka
- type Mode
- type Mysql
- type OptionPipeline
- func WithAddFilter(filter *Filter) OptionPipeline
- func WithPipeDelete(d bool) OptionPipeline
- func WithPipeMode(mode Mode) OptionPipeline
- func WithPipeSafe(uPipe *Pipeline) OptionPipeline
- func WithPipeStatus(status Status) OptionPipeline
- func WithUpdateFilter(index int, filter *Filter) OptionPipeline
- type OptionPosition
- type OptionRecord
- type Output
- type Pipeline
- type Position
- type RabbitMQ
- type RecordPosition
- type Redis
- type RocketMQ
- type Sender
- type Status
- type Stdout
Constants ¶
const SENDER_TYPE_Elastic = "elastic"
const SENDER_TYPE_KAFKA = "kafka"
const SENDER_TYPE_REDIS = "redis"
const SENDER_TYPE_ROCKETMQ = "rocketMQ"
const SNEDER_TYPE_HTTP = "http"
const SNEDER_TYPE_RABBITMQ = "rabbitMQ"
const SNEDER_TYPE_STDOUT = "stdout"
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Filter ¶
type Filter struct {
	Type FilterType `json:"type"`
	Rule string     `json:"rule"`
}
Filter is a pipeline filter.
func BlackFilter ¶ added in v1.1.3
BlackFilter returns a black filter
func WhiteFilter ¶ added in v1.1.3
WhiteFilter returns a white filter
type FilterType ¶
type FilterType string
FilterType is the type of a filter.
const (
	// FILTER_WHITE white list
	FILTER_WHITE FilterType = "white"
	// FILTER_BLACK black list
	FILTER_BLACK FilterType = "black"
)
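As an illustration of how the `json` tags on Filter shape its serialized form, here is a minimal sketch. The types are local copies of the declarations above so the example compiles on its own; `encodeFilter` is a hypothetical helper, and the rule string `shop.orders` is an assumed value, since this page does not describe the rule syntax.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented types, so the sketch is self-contained.
type FilterType string

const (
	FILTER_WHITE FilterType = "white"
	FILTER_BLACK FilterType = "black"
)

type Filter struct {
	Type FilterType `json:"type"`
	Rule string     `json:"rule"`
}

// encodeFilter is a hypothetical helper (not part of the package)
// that serializes one filter to its JSON form.
func encodeFilter(f Filter) (string, error) {
	b, err := json.Marshal(f)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// "shop.orders" is an assumed rule value; the rule syntax is not documented here.
	s, err := encodeFilter(Filter{Type: FILTER_WHITE, Rule: "shop.orders"})
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // prints {"type":"white","rule":"shop.orders"}
}
```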
type Instance ¶
type Instance struct {
	PipelineName string    `json:"pipeline_name"`
	NodeName     string    `json:"node_name"`
	CreateTime   time.Time `json:"create_time"`
}
Instance store struct
type Kafka ¶
type Kafka struct {
	Brokers      string                   `json:"brokers"`
	Topic        string                   `json:"topic"`
	RequiredAcks *sarama.RequiredAcks     `json:"require_acks"`
	Compression  *sarama.CompressionCodec `json:"compression"`
	Retries      *int                     `json:"retries"`
	Idepotent    *bool                    `json:"idepotent"`
}
Kafka output configuration
type Mysql ¶
type Mysql struct {
	Address  string `json:"address"`
	Port     uint16 `json:"port"`
	User     string `json:"user"`
	Password string `json:"password"`
	ServerId uint32 `json:"server_id"`
	Flavor   Flavor `json:"flavor"`
	Mode     Mode   `json:"mode"`
}
Mysql store struct
type OptionPipeline ¶
type OptionPipeline func(p *Pipeline)
OptionPipeline configures a Pipeline.
func WithAddFilter ¶
func WithAddFilter(filter *Filter) OptionPipeline
func WithPipeDelete ¶
func WithPipeDelete(d bool) OptionPipeline
func WithPipeMode ¶
func WithPipeMode(mode Mode) OptionPipeline
func WithPipeStatus ¶
func WithPipeStatus(status Status) OptionPipeline
WithPipeStatus sets pipeline status
func WithUpdateFilter ¶
func WithUpdateFilter(index int, filter *Filter) OptionPipeline
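The OptionPipeline constructors above follow the functional-options pattern: each returns a closure that mutates a `*Pipeline`. This page shows only their signatures, so the sketch below guesses at the behavior. The bodies of `WithAddFilter`, `WithUpdateFilter`, and `WithPipeDelete` are assumptions, `applyOptions` is a hypothetical helper, and `Pipeline` is trimmed to the fields the sketch touches.

```go
package main

import "fmt"

type FilterType string

type Filter struct {
	Type FilterType `json:"type"`
	Rule string     `json:"rule"`
}

// Pipeline is trimmed to the fields this sketch uses.
type Pipeline struct {
	Filters  []*Filter `json:"filters"`
	IsDelete bool      `json:"is_delete"`
}

type OptionPipeline func(p *Pipeline)

// Assumed behavior: append the filter to the pipeline's filter list.
func WithAddFilter(filter *Filter) OptionPipeline {
	return func(p *Pipeline) { p.Filters = append(p.Filters, filter) }
}

// Assumed behavior: replace the filter at index; out-of-range indexes are ignored.
func WithUpdateFilter(index int, filter *Filter) OptionPipeline {
	return func(p *Pipeline) {
		if index >= 0 && index < len(p.Filters) {
			p.Filters[index] = filter
		}
	}
}

// Assumed behavior: set the IsDelete flag.
func WithPipeDelete(d bool) OptionPipeline {
	return func(p *Pipeline) { p.IsDelete = d }
}

// applyOptions is a hypothetical helper that runs each option in order.
func applyOptions(p *Pipeline, opts ...OptionPipeline) {
	for _, opt := range opts {
		opt(p)
	}
}

func main() {
	p := &Pipeline{}
	applyOptions(p,
		WithAddFilter(&Filter{Type: "white", Rule: "shop.orders"}),
		WithUpdateFilter(0, &Filter{Type: "black", Rule: "shop.tmp"}),
		WithPipeDelete(true),
	)
	fmt.Println(len(p.Filters), p.Filters[0].Type, p.IsDelete) // prints 1 black true
}
```

Because options run in order, the update in this example sees the filter added just before it.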
type OptionPosition ¶
type OptionPosition func(position *Position)
OptionPosition configures a Position.
func WithBinlogFile ¶
func WithBinlogFile(b string) OptionPosition
WithBinlogFile returns an OptionPosition that sets the binlog file.
func WithGTIDSet ¶
func WithGTIDSet(g string) OptionPosition
WithGTIDSet returns an OptionPosition that sets the GTID set.
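A minimal sketch of how the two position options might be combined. The option bodies are assumed from their doc comments, `newPosition` is a hypothetical constructor (this page documents none for Position), and the binlog file name and GTID set are example values.

```go
package main

import "fmt"

// Local copy of the documented Position type.
type Position struct {
	BinlogFile     string `json:"binlog_file"`
	BinlogPosition uint32 `json:"binlog_position"`
	GTIDSet        string `json:"gtid_set"`
	PipelineName   string `json:"pipeline_name"`
	TotalRows      int    `json:"total_rows"`
	ConsumeRows    int    `json:"consume_rows"`
}

type OptionPosition func(position *Position)

// Assumed behavior: store the binlog file name on the position.
func WithBinlogFile(b string) OptionPosition {
	return func(position *Position) { position.BinlogFile = b }
}

// Assumed behavior: store the GTID set on the position.
func WithGTIDSet(g string) OptionPosition {
	return func(position *Position) { position.GTIDSet = g }
}

// newPosition is a hypothetical constructor that applies options to a zero Position.
func newPosition(opts ...OptionPosition) *Position {
	p := &Position{}
	for _, opt := range opts {
		opt(p)
	}
	return p
}

func main() {
	p := newPosition(
		WithBinlogFile("mysql-bin.000003"),
		WithGTIDSet("3E11FA47-71CA-11E1-9E33-C80AA9429562:1-5"),
	)
	fmt.Println(p.BinlogFile, p.GTIDSet)
}
```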
type OptionRecord ¶ added in v1.1.3
type OptionRecord func(record *RecordPosition)
OptionRecord configures a RecordPosition.
func WithPipelineName ¶ added in v1.1.3
func WithPipelineName(name string) OptionRecord
WithPipelineName sets the RecordPosition pipeline name.
type Output ¶
type Output struct {
Sender *Sender `json:"sender"`
}
Output is the pipeline output.
func EmptyOutput ¶ added in v1.1.3
func EmptyOutput() *Output
EmptyOutput returns a new empty Output object.
type Pipeline ¶
type Pipeline struct {
	Name       string    `json:"name"`
	Status     Status    `json:"status"`
	AliasName  string    `json:"aliasName"`
	Mysql      *Mysql    `json:"mysql"`
	Filters    []*Filter `json:"filters"`
	Output     *Output   `json:"output"`
	Replicas   int       `json:"replicas"`
	CreateTime time.Time `json:"create_time"`
	Remark     string    `json:"remark"`
	IsDelete   bool      `json:"is_delete"`
	// If true, sync MySQL replication from the newest position when MySQL returns error 1236 (could not find binary log index)
	FixPosNewest bool `json:"fix_pos_newest"`
}
Pipeline is the pipeline definition.
func NewPipeline ¶ added in v1.1.3
NewPipeline returns a new pipeline with default values
type Position ¶
type Position struct {
	BinlogFile     string `json:"binlog_file"`
	BinlogPosition uint32 `json:"binlog_position"`
	GTIDSet        string `json:"gtid_set"`
	PipelineName   string `json:"pipeline_name"`
	TotalRows      int    `json:"total_rows"`
	ConsumeRows    int    `json:"consume_rows"`
}
Position is a MySQL replication position.
type RabbitMQ ¶ added in v1.1.3
type RabbitMQ struct {
	// rabbitMQ url
	Url          string `json:"url"`
	ExchangeName string `json:"exchange_name"`
}
RabbitMQ basic model for pipeline config
type RecordPosition ¶ added in v1.1.3
type RecordPosition struct {
	PipelineName string    `json:"pipeline_name"`
	Pre          *Position `json:"pre"`
	Now          *Position `json:"now"`
}
RecordPosition is a MySQL replication position together with the previous position.
func NewRecordPosition ¶ added in v1.1.3
func NewRecordPosition(opts ...OptionRecord) *RecordPosition
NewRecordPosition returns an empty RecordPosition.
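To show how a RecordPosition nests its `pre` and `now` positions in JSON, here is a sketch using local copies of the documented types. `roundTrip` is a hypothetical helper, and the pipeline name and binlog values are made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local copies of the documented types.
type Position struct {
	BinlogFile     string `json:"binlog_file"`
	BinlogPosition uint32 `json:"binlog_position"`
	GTIDSet        string `json:"gtid_set"`
	PipelineName   string `json:"pipeline_name"`
	TotalRows      int    `json:"total_rows"`
	ConsumeRows    int    `json:"consume_rows"`
}

type RecordPosition struct {
	PipelineName string    `json:"pipeline_name"`
	Pre          *Position `json:"pre"`
	Now          *Position `json:"now"`
}

// roundTrip is a hypothetical helper: it encodes a record to JSON and decodes it back.
func roundTrip(r *RecordPosition) (*RecordPosition, []byte, error) {
	data, err := json.Marshal(r)
	if err != nil {
		return nil, nil, err
	}
	out := &RecordPosition{}
	if err := json.Unmarshal(data, out); err != nil {
		return nil, nil, err
	}
	return out, data, nil
}

func main() {
	// Example values only; Pre is left nil to show how a missing previous position encodes.
	rec := &RecordPosition{
		PipelineName: "orders",
		Now:          &Position{BinlogFile: "mysql-bin.000003", BinlogPosition: 154},
	}
	_, data, err := roundTrip(rec)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))
}
```

Since `Pre` and `Now` are pointers without `omitempty`, a nil value is written as `null` rather than dropped from the output.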
type Redis ¶ added in v1.1.3
type Redis struct {
	Addr     string `json:"address"`
	UserName string `json:"username"`
	Password string `json:"password"`
	DB       int    `json:"db"`
	List     string `json:"list"`
}
Redis output configuration
type RocketMQ ¶ added in v1.1.3
type RocketMQ struct {
	// Endpoint http endpoint
	Endpoint string `json:"endpoint"`
	// TopicName topic name
	TopicName string `json:"topic_name"`
	// Instance name
	InstanceId string `json:"instance_id"`
	AccessKey  string `json:"access_key"`
	SecretKey  string `json:"secret_key"`
}
RocketMQ is the Aliyun RocketMQ configuration.
type Sender ¶
type Sender struct {
	Name     string    `json:"name"`
	Type     string    `json:"type"`
	Kafka    *Kafka    `json:"kafka"`
	Stdout   *Stdout   `json:"stdout"`
	Http     *Http     `json:"http"`
	RabbitMQ *RabbitMQ `json:"rabbitMQ"`
	Redis    *Redis    `json:"redis"`
	RocketMQ *RocketMQ `json:"rocketMQ"`
	// todo use interface for sender params
	Elastic Elastic `json:"elastic"`
}
Sender output configuration
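A Sender pairs a `type` string with one backend-specific block. The sketch below decodes a Redis sender from JSON using local, trimmed copies of the documented types (only the Redis branch is kept so the example needs no third-party imports). `parseSender` is a hypothetical helper, and the address and list name are example values. Note that `Redis.Addr` is serialized under the key `address`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

const SENDER_TYPE_REDIS = "redis"

// Local copy of the documented Redis type.
type Redis struct {
	Addr     string `json:"address"`
	UserName string `json:"username"`
	Password string `json:"password"`
	DB       int    `json:"db"`
	List     string `json:"list"`
}

// Sender is trimmed to the Redis branch for this sketch.
type Sender struct {
	Name  string `json:"name"`
	Type  string `json:"type"`
	Redis *Redis `json:"redis"`
}

// parseSender is a hypothetical helper that decodes a sender configuration from JSON.
func parseSender(data []byte) (*Sender, error) {
	s := &Sender{}
	if err := json.Unmarshal(data, s); err != nil {
		return nil, err
	}
	return s, nil
}

func main() {
	// Example configuration; the address and list name are made-up values.
	cfg := []byte(`{
		"name": "cache",
		"type": "redis",
		"redis": {"address": "127.0.0.1:6379", "db": 0, "list": "binlog_events"}
	}`)
	s, err := parseSender(cfg)
	if err != nil {
		panic(err)
	}
	if s.Type == SENDER_TYPE_REDIS && s.Redis != nil {
		fmt.Println(s.Name, s.Redis.Addr, s.Redis.List)
	}
}
```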