Documentation ¶
Index ¶
- func Topic(name string, partition int32) common.Topic
- type DirectStreamer
- func (ds *DirectStreamer) Config() (bool, StreamConfig)
- func (ds *DirectStreamer) Consumer() *internal.StreamConsumer
- func (ds *DirectStreamer) GroupId() string
- func (ds *DirectStreamer) Run()
- func (ds *DirectStreamer) SetConfig(config StreamConfig)
- func (ds *DirectStreamer) Stop() error
- func (ds *DirectStreamer) Topic() common.Topic
- type PassThroughSerializer
- type StreamConfig
- type TopicStreamer
- func (ts *TopicStreamer) AddConfig(config StreamConfig)
- func (ts *TopicStreamer) Configs() []StreamConfig
- func (ts *TopicStreamer) Consumer() *internal.StreamConsumer
- func (ts *TopicStreamer) GroupId() string
- func (ts *TopicStreamer) Run()
- func (ts *TopicStreamer) Stop() error
- func (ts *TopicStreamer) Topic() common.Topic
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type DirectStreamer ¶ added in v0.2.0
type DirectStreamer struct {
// contains filtered or unexported fields
}
DirectStreamer is a streamer that streams messages from one topic to another topic.
func NewDirectStreamer ¶ added in v0.2.0
func NewDirectStreamer(brokers []string, src common.Topic, groupId string, args ...interface{}) *DirectStreamer
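NewDirectStreamer creates a new direct streamer that streams messages from one topic to another topic. The streamer is configured with a list of brokers, a source topic to stream from, and a consumer group id. If you want to override the default configuration of the sarama consumer and producer, you can pass them as additional arguments, consumer configuration first and producer configuration second; pass nil for the consumer configuration to override only the producer configuration.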
- ds := NewDirectStreamer(brokers, topic, groupId)
- ds := NewDirectStreamer(brokers, topic, groupId, consumerConfig, producerConfig)
- ds := NewDirectStreamer(brokers, topic, groupId, nil, producerConfig)
func (*DirectStreamer) Config ¶ added in v0.2.0
func (ds *DirectStreamer) Config() (bool, StreamConfig)
func (*DirectStreamer) Consumer ¶ added in v0.2.0
func (ds *DirectStreamer) Consumer() *internal.StreamConsumer
func (*DirectStreamer) GroupId ¶ added in v0.2.0
func (ds *DirectStreamer) GroupId() string
func (*DirectStreamer) Run ¶ added in v0.2.0
func (ds *DirectStreamer) Run()
func (*DirectStreamer) SetConfig ¶ added in v0.2.0
func (ds *DirectStreamer) SetConfig(config StreamConfig)
func (*DirectStreamer) Stop ¶ added in v0.2.0
func (ds *DirectStreamer) Stop() error
func (*DirectStreamer) Topic ¶ added in v0.2.0
func (ds *DirectStreamer) Topic() common.Topic
type PassThroughSerializer ¶
type PassThroughSerializer struct { }
func NewPassThroughSerializer ¶
func NewPassThroughSerializer() *PassThroughSerializer
func (*PassThroughSerializer) MessageToProduceMessage ¶
func (ts *PassThroughSerializer) MessageToProduceMessage(value string) string
type StreamConfig ¶
type StreamConfig struct {
// contains filtered or unexported fields
}
func NewStreamConfig ¶
func NewStreamConfig(ms common.MessageSerializer, topic common.Topic) StreamConfig
func (StreamConfig) MessageSerializer ¶
func (ss StreamConfig) MessageSerializer() common.MessageSerializer
func (StreamConfig) Topic ¶
func (ss StreamConfig) Topic() common.Topic
type TopicStreamer ¶
type TopicStreamer struct {
// contains filtered or unexported fields
}
TopicStreamer is a streamer that streams messages from a topic to other topics.
func NewTopicStreamer ¶
func NewTopicStreamer(brokers []string, topic common.Topic, groupId string, args ...interface{}) *TopicStreamer
NewTopicStreamer creates a new topic streamer that streams messages from a topic to other topics. The streamer is configured with a list of brokers, a topic to stream from, and a consumer group id. If you want to override the default configuration of the sarama consumer and producer, you can pass them as additional arguments, consumer configuration first and producer configuration second; pass nil for the consumer configuration to override only the producer configuration.
- ts := NewTopicStreamer(brokers, topic, groupId)
- ts := NewTopicStreamer(brokers, topic, groupId, consumerConfig, producerConfig)
- ts := NewTopicStreamer(brokers, topic, groupId, nil, producerConfig)
func (*TopicStreamer) AddConfig ¶
func (ts *TopicStreamer) AddConfig(config StreamConfig)
func (*TopicStreamer) Configs ¶ added in v0.1.1
func (ts *TopicStreamer) Configs() []StreamConfig
func (*TopicStreamer) Consumer ¶ added in v0.1.1
func (ts *TopicStreamer) Consumer() *internal.StreamConsumer
func (*TopicStreamer) GroupId ¶ added in v0.2.0
func (ts *TopicStreamer) GroupId() string
func (*TopicStreamer) Run ¶
func (ts *TopicStreamer) Run()
func (*TopicStreamer) Stop ¶
func (ts *TopicStreamer) Stop() error
func (*TopicStreamer) Topic ¶ added in v0.1.1
func (ts *TopicStreamer) Topic() common.Topic