Documentation ¶
Index ¶
- func GetReaderConfigConstructor(name string) func() interface{}
- func NewHttpReaderConfigFunc() interface{}
- func NewKafkaReaderConfigFunc() interface{}
- func SetReaderConfigConstructor(name string, fn func() interface{})
- func SetReaderConstructor(name string, fn ReaderConstructor)
- type HttpReader
- type HttpReaderConfig
- type KafkaReader
- type KafkaReaderConfig
- type Reader
- type ReaderBase
- type ReaderConfig
- type ReaderConfigByType
- type ReaderConstructor
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetReaderConfigConstructor ¶
func GetReaderConfigConstructor(name string) func() interface{}
GetReaderConfigConstructor 获取读取器配置构造函数
func NewHttpReaderConfigFunc ¶
func NewHttpReaderConfigFunc() interface{}
func NewKafkaReaderConfigFunc ¶
func NewKafkaReaderConfigFunc() interface{}
func SetReaderConfigConstructor ¶
func SetReaderConfigConstructor(name string, fn func() interface{})
SetReaderConfigConstructor 注册读取器配置构造函数
func SetReaderConstructor ¶
func SetReaderConstructor(name string, fn ReaderConstructor)
SetReaderConstructor 注册读取器构造函数
Types ¶
type HttpReader ¶
type HttpReader struct { ReaderBase // contains filtered or unexported fields }
func (*HttpReader) Close ¶
func (h *HttpReader) Close() error
func (*HttpReader) Complete ¶
func (h *HttpReader) Complete(params *types.BinlogParams) error
func (*HttpReader) Read ¶
func (h *HttpReader) Read() (*types.BinlogParams, error)
type HttpReaderConfig ¶
type HttpReaderConfig struct { Listen string `json:"listen" yaml:"listen"` // 监听端口 PushPath string `json:"push_path" yaml:"push_path"` // 请求接收路径 PreParamsLen int32 `json:"pre_params_len,omitempty" yaml:"pre_params_len,omitempty" default:"10"` // params 管道缓冲长度 PushTimeout time.Duration `json:"push_timeout,omitempty" yaml:"push_timeout,omitempty" default:"1s"` // 超时时间 }
func (*HttpReaderConfig) Equal ¶
func (h *HttpReaderConfig) Equal(config ReaderConfig) bool
func (*HttpReaderConfig) GetUniqueId ¶
func (h *HttpReaderConfig) GetUniqueId() string
type KafkaReader ¶
type KafkaReader struct { ReaderBase // contains filtered or unexported fields }
func (*KafkaReader) Close ¶
func (k *KafkaReader) Close() error
func (*KafkaReader) Complete ¶
func (k *KafkaReader) Complete(params *types.BinlogParams) error
func (*KafkaReader) Read ¶
func (k *KafkaReader) Read() (*types.BinlogParams, error)
type KafkaReaderConfig ¶
type KafkaReaderConfig struct { Brokers []string `json:"brokers" yaml:"brokers" default:"localhost:9092"` Username string `json:"username,omitempty" yaml:"username,omitempty"` Password string `json:"password,omitempty" yaml:"password,omitempty"` Group string `json:"group" yaml:"group" default:"test"` Topic string `json:"topic" yaml:"topic"` Partition int `json:"partition" yaml:"partition"` MinBytes int `json:"min_bytes,omitempty" yaml:"min_bytes,omitempty" default:"10240"` MaxBytes int `json:"max_bytes,omitempty" yaml:"max_bytes,omitempty" default:"10485760"` StartOffset int64 `json:"start_offset,omitempty" yaml:"start_offset,omitempty" default:"-1"` MaxWait time.Duration `json:"max_wait,omitempty" yaml:"max_wait,omitempty" default:"1s"` CommitInterval time.Duration `json:"commit_interval,omitempty" yaml:"commit_interval,omitempty" default:"1s"` QueueCapacity int `json:"queue_capacity,omitempty" yaml:"queue_capacity,omitempty" default:"1000"` }
func (*KafkaReaderConfig) Equal ¶
func (k *KafkaReaderConfig) Equal(config ReaderConfig) bool
func (*KafkaReaderConfig) GetUniqueId ¶
func (k *KafkaReaderConfig) GetUniqueId() string
type Reader ¶
type Reader interface { Read() (*types.BinlogParams, error) Complete(params *types.BinlogParams) error GetConfig() ReaderConfig GetCtx() context.Context Close() error }
func NewHttpReaderFunc ¶
func NewKafkaReaderFunc ¶
type ReaderBase ¶
type ReaderBase struct {
// contains filtered or unexported fields
}
func NewReaderBase ¶
func NewReaderBase(conf ReaderConfig, parent context.Context) ReaderBase
func (*ReaderBase) FirstClose ¶
func (r *ReaderBase) FirstClose() bool
func (*ReaderBase) GetConfig ¶
func (r *ReaderBase) GetConfig() ReaderConfig
func (*ReaderBase) GetCtx ¶
func (r *ReaderBase) GetCtx() context.Context
type ReaderConfig ¶
type ReaderConfig interface { GetUniqueId() string Equal(ReaderConfig) bool }
type ReaderConfigByType ¶
type ReaderConfigByType struct { Type string `json:"type"` Config ReaderConfig `json:"-"` }
ReaderConfigByType 读取器配置携带type参数 json 反序列化时通过type获取具体的类型,再进行实例化
func (*ReaderConfigByType) UnmarshalJSON ¶
func (r *ReaderConfigByType) UnmarshalJSON(bytes []byte) error
UnmarshalJSON 根据type 获取到具体到 config 结构体,并重新序列化赋值
type ReaderConstructor ¶
func GetReaderConstructor ¶
func GetReaderConstructor(name string) ReaderConstructor
GetReaderConstructor 获取读取器构造函数
Click to show internal directories.
Click to hide internal directories.