Documentation ¶
Index ¶
- type DDLProducer
- func NewKafkaDDLProducer(_ context.Context, changefeedID model.ChangeFeedID, ...) DDLProducer
- func NewMockDDLProducer(_ context.Context, _ model.ChangeFeedID, _ kafka.SyncProducer) DDLProducer
- func NewMockPulsarProducerDDL(ctx context.Context, changefeedID model.ChangeFeedID, ...) (DDLProducer, error)
- func NewPulsarProducer(ctx context.Context, changefeedID model.ChangeFeedID, ...) (DDLProducer, error)
- type Factory
- type MockDDLProducer
- func (m *MockDDLProducer) Close()
- func (m *MockDDLProducer) GetAllEvents() []*common.Message
- func (m *MockDDLProducer) GetEvents(topic string, partitionNum int32) []*common.Message
- func (m *MockDDLProducer) SyncBroadcastMessage(ctx context.Context, topic string, totalPartitionsNum int32, ...) error
- func (m *MockDDLProducer) SyncSendMessage(_ context.Context, topic string, partitionNum int32, message *common.Message) error
- type PulsarFactory
- type PulsarMockProducers
- func (p *PulsarMockProducers) Close()
- func (p *PulsarMockProducers) Flush(ctx context.Context) error
- func (p *PulsarMockProducers) GetAllEvents() []*pulsar.ProducerMessage
- func (p *PulsarMockProducers) GetEvents(topic string) []*pulsar.ProducerMessage
- func (p *PulsarMockProducers) GetProducerByTopic(topicName string) (producer pulsar.Producer, err error)
- func (p *PulsarMockProducers) SyncBroadcastMessage(ctx context.Context, topic string, totalPartitionsNum int32, ...) error
- func (p *PulsarMockProducers) SyncSendMessage(ctx context.Context, topic string, partitionNum int32, message *common.Message) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DDLProducer ¶
type DDLProducer interface { // SyncBroadcastMessage broadcasts a message synchronously. SyncBroadcastMessage( ctx context.Context, topic string, totalPartitionsNum int32, message *common.Message, ) error // SyncSendMessage sends a message for a partition synchronously. SyncSendMessage( ctx context.Context, topic string, partitionNum int32, message *common.Message, ) error // Close closes the producer. Close() }
DDLProducer is the interface for DDL message producers.
func NewKafkaDDLProducer ¶
func NewKafkaDDLProducer(_ context.Context, changefeedID model.ChangeFeedID, syncProducer kafka.SyncProducer, ) DDLProducer
NewKafkaDDLProducer creates a new kafka producer for replicating DDL.
func NewMockDDLProducer ¶
func NewMockDDLProducer(_ context.Context, _ model.ChangeFeedID, _ kafka.SyncProducer) DDLProducer
NewMockDDLProducer creates a mock producer.
func NewMockPulsarProducerDDL ¶
func NewMockPulsarProducerDDL( ctx context.Context, changefeedID model.ChangeFeedID, pConfig *config.PulsarConfig, client pulsar.Client, sinkConfig *config.SinkConfig, ) (DDLProducer, error)
NewMockPulsarProducerDDL creates a mock pulsar producer and returns it as a DDLProducer.
func NewPulsarProducer ¶
func NewPulsarProducer( ctx context.Context, changefeedID model.ChangeFeedID, pConfig *config.PulsarConfig, client pulsar.Client, sinkConfig *config.SinkConfig, ) (DDLProducer, error)
NewPulsarProducer creates a pulsar producer
type Factory ¶
type Factory func(ctx context.Context, changefeedID model.ChangeFeedID, syncProducer kafka.SyncProducer) DDLProducer
Factory is a function to create a producer.
type MockDDLProducer ¶
type MockDDLProducer struct {
// contains filtered or unexported fields
}
MockDDLProducer is a mock producer for test.
func (*MockDDLProducer) GetAllEvents ¶
func (m *MockDDLProducer) GetAllEvents() []*common.Message
GetAllEvents returns the events received by the mock producer.
func (*MockDDLProducer) GetEvents ¶
func (m *MockDDLProducer) GetEvents(topic string, partitionNum int32, ) []*common.Message
GetEvents returns the events filtered by the key (topic and partition number).
func (*MockDDLProducer) SyncBroadcastMessage ¶
func (m *MockDDLProducer) SyncBroadcastMessage(ctx context.Context, topic string, totalPartitionsNum int32, message *common.Message, ) error
SyncBroadcastMessage stores a message to all partitions of the topic.
func (*MockDDLProducer) SyncSendMessage ¶
func (m *MockDDLProducer) SyncSendMessage(_ context.Context, topic string, partitionNum int32, message *common.Message, ) error
SyncSendMessage stores a message to a partition of the topic.
type PulsarFactory ¶
type PulsarFactory func(ctx context.Context, changefeedID model.ChangeFeedID, pConfig *config.PulsarConfig, client pulsar.Client, sinkConfig *config.SinkConfig) (DDLProducer, error)
PulsarFactory is a function to create a pulsar producer.
type PulsarMockProducers ¶
type PulsarMockProducers struct {
// contains filtered or unexported fields
}
PulsarMockProducers is a mock pulsar producer
func NewMockPulsarProducer ¶
func NewMockPulsarProducer( ctx context.Context, changefeedID model.ChangeFeedID, pConfig *config.PulsarConfig, client pulsar.Client, ) (*PulsarMockProducers, error)
NewMockPulsarProducer creates a mock pulsar producer.
func (*PulsarMockProducers) Flush ¶
func (p *PulsarMockProducers) Flush(ctx context.Context) error
Flush waits for all the messages in the async producer to be sent to Pulsar. Notice: this method is not thread-safe. Do not call AsyncSendMessage and Flush from different threads; otherwise Flush will not work as expected: it may never finish, or it may flush the wrong messages, because inflight will be modified by mistake.
func (*PulsarMockProducers) GetAllEvents ¶
func (p *PulsarMockProducers) GetAllEvents() []*pulsar.ProducerMessage
GetAllEvents returns the events received by the mock producer.
func (*PulsarMockProducers) GetEvents ¶
func (p *PulsarMockProducers) GetEvents(topic string) []*pulsar.ProducerMessage
GetEvents returns the events filtered by the key (topic).
func (*PulsarMockProducers) GetProducerByTopic ¶
func (p *PulsarMockProducers) GetProducerByTopic(topicName string) (producer pulsar.Producer, err error)
GetProducerByTopic returns a producer by topic name
func (*PulsarMockProducers) SyncBroadcastMessage ¶
func (p *PulsarMockProducers) SyncBroadcastMessage(ctx context.Context, topic string, totalPartitionsNum int32, message *common.Message, ) error
SyncBroadcastMessage broadcasts a message. Pulsar consumes all partitions.
func (*PulsarMockProducers) SyncSendMessage ¶
func (p *PulsarMockProducers) SyncSendMessage(ctx context.Context, topic string, partitionNum int32, message *common.Message, ) error
SyncSendMessage sends a message. partitionNum is not used; pulsar consumes all partitions.