Documentation ¶
Index ¶
- Constants
- func NewBatchDecoder(ctx context.Context, codecConfig *common.Config, db *sql.DB) (codec.RowEventDecoder, error)
- func NewBatchEncoderBuilder(config *common.Config) codec.RowEventEncoderBuilder
- func NewCanalJSONTxnEventDecoder(codecConfig *common.Config) *canalJSONTxnEventDecoder
- func NewJSONRowEventEncoderBuilder(ctx context.Context, config *common.Config) (codec.RowEventEncoderBuilder, error)
- func NewJSONTxnEventEncoderBuilder(config *common.Config) codec.TxnEventEncoderBuilder
- type BatchEncoder
- func (d *BatchEncoder) AppendRowChangedEvent(_ context.Context, _ string, e *model.RowChangedEvent, callback func()) error
- func (d *BatchEncoder) Build() []*common.Message
- func (d *BatchEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error)
- func (d *BatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*common.Message, error)
- type JSONMessage
- type JSONRowEventEncoder
- func (c *JSONRowEventEncoder) AppendRowChangedEvent(ctx context.Context, _ string, e *model.RowChangedEvent, callback func()) error
- func (c *JSONRowEventEncoder) Build() []*common.Message
- func (c *JSONRowEventEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error)
- func (c *JSONRowEventEncoder) EncodeDDLEvent(e *model.DDLEvent) (*common.Message, error)
- type JSONTxnEventEncoder
Constants ¶
const (
	CanalPacketVersion   int32  = 1
	CanalProtocolVersion int32  = 1
	CanalServerEncode    string = "UTF-8"
)
These constants are compatible with canal-1.1.4; see https://github.com/alibaba/canal/tree/canal-1.1.4
Variables ¶
This section is empty.
Functions ¶
func NewBatchDecoder ¶
func NewBatchDecoder( ctx context.Context, codecConfig *common.Config, db *sql.DB, ) (codec.RowEventDecoder, error)
NewBatchDecoder returns a decoder for canal-json.
func NewBatchEncoderBuilder ¶
func NewBatchEncoderBuilder(config *common.Config) codec.RowEventEncoderBuilder
NewBatchEncoderBuilder creates a canal batchEncoderBuilder.
func NewCanalJSONTxnEventDecoder ¶
NewCanalJSONTxnEventDecoder returns a new CanalJSONTxnEventDecoder.
func NewJSONRowEventEncoderBuilder ¶
func NewJSONRowEventEncoderBuilder(ctx context.Context, config *common.Config) (codec.RowEventEncoderBuilder, error)
NewJSONRowEventEncoderBuilder creates a canal-json batchEncoderBuilder.
func NewJSONTxnEventEncoderBuilder ¶
func NewJSONTxnEventEncoderBuilder(config *common.Config) codec.TxnEventEncoderBuilder
NewJSONTxnEventEncoderBuilder creates a jsonTxnEventEncoderBuilder.
Types ¶
type BatchEncoder ¶
type BatchEncoder struct {
// contains filtered or unexported fields
}
BatchEncoder encodes events into the bytes of a batch.
func (*BatchEncoder) AppendRowChangedEvent ¶
func (d *BatchEncoder) AppendRowChangedEvent( _ context.Context, _ string, e *model.RowChangedEvent, callback func(), ) error
AppendRowChangedEvent implements the RowEventEncoder interface
func (*BatchEncoder) Build ¶
func (d *BatchEncoder) Build() []*common.Message
Build implements the RowEventEncoder interface
func (*BatchEncoder) EncodeCheckpointEvent ¶
func (d *BatchEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error)
EncodeCheckpointEvent implements the RowEventEncoder interface
func (*BatchEncoder) EncodeDDLEvent ¶
EncodeDDLEvent implements the RowEventEncoder interface
type JSONMessage ¶
type JSONMessage struct {
	// ignored by consumers
	ID        int64    `json:"id"`
	Schema    string   `json:"database"`
	Table     string   `json:"table"`
	PKNames   []string `json:"pkNames"`
	IsDDL     bool     `json:"isDdl"`
	EventType string   `json:"type"`
	// officially the timestamp of the event-time of the message, in milliseconds since Epoch.
	ExecutionTime int64 `json:"es"`
	// officially the timestamp of building the message, in milliseconds since Epoch.
	BuildTime int64 `json:"ts"`
	// SQL that generated the change event, DDL or Query
	Query string `json:"sql"`
	// only works for INSERT / UPDATE / DELETE events, records each column's java representation type.
	SQLType map[string]int32 `json:"sqlType"`
	// only works for INSERT / UPDATE / DELETE events, records each column's mysql representation type.
	MySQLType map[string]string `json:"mysqlType"`
	// A Datum should be a string or nil
	Data []map[string]interface{} `json:"data"`
	Old  []map[string]interface{} `json:"old"`
}
JSONMessage adapted from https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java#L1
type JSONRowEventEncoder ¶
type JSONRowEventEncoder struct {
// contains filtered or unexported fields
}
JSONRowEventEncoder encodes row events in JSON format.
func (*JSONRowEventEncoder) AppendRowChangedEvent ¶
func (c *JSONRowEventEncoder) AppendRowChangedEvent( ctx context.Context, _ string, e *model.RowChangedEvent, callback func(), ) error
AppendRowChangedEvent implements the RowEventEncoder interface
func (*JSONRowEventEncoder) Build ¶
func (c *JSONRowEventEncoder) Build() []*common.Message
Build implements the RowEventEncoder interface
func (*JSONRowEventEncoder) EncodeCheckpointEvent ¶
func (c *JSONRowEventEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error)
EncodeCheckpointEvent implements the RowEventEncoder interface
func (*JSONRowEventEncoder) EncodeDDLEvent ¶
EncodeDDLEvent encodes DDL events
type JSONTxnEventEncoder ¶
type JSONTxnEventEncoder struct {
// contains filtered or unexported fields
}
JSONTxnEventEncoder encodes txn events in JSON format.
func (*JSONTxnEventEncoder) AppendTxnEvent ¶
func (j *JSONTxnEventEncoder) AppendTxnEvent( txn *model.SingleTableTxn, callback func(), ) error
AppendTxnEvent appends a txn event to the encoder.
func (*JSONTxnEventEncoder) Build ¶
func (j *JSONTxnEventEncoder) Build() []*common.Message
Build builds a message from the encoder and resets the encoder.