Documentation ¶
Index ¶
- Constants
- type Config
- type ISession
- type Receiver
- type RelayConfig
- type Sender
- type Session
- func (sess *Session) Close()
- func (sess *Session) Connect(config *RelayConfig)
- func (sess *Session) Handle(data []byte, size uint32, config *RelayConfig) (bool, []byte, error)
- func (sess *Session) MakeAckMessage(id []byte) []byte
- func (sess *Session) MakeAuthMessage(login bool, config *RelayConfig) []byte
- func (sess *Session) MakeMessage(data []byte) []byte
- func (sess *Session) Send(data []byte) (int, error)
- func (sess *Session) SendMessage(data []byte) (int, error)
- type TopicConsumer
- type TopicProducer
Constants ¶
const ( PASSWORD = "NET-KAFKA-REPEATER" AUTH = 0xef01 DATA = 0xfe01 MAX_BUF_SIZE = 0xffffffff MAX_MSG_SIZE = 100000 )
Message struct FLAG LEN ID DATA
2 4 4
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct { Exe string PidFile string LogFile string WorkDir string Mode string `sender or receiver` // Relay Relay map[string]interface{} // Kafka Topic Topic map[string]interface{} }
Configuration for the repeater
'Exe' is the 'net-kafka-repeater' executable path
Default 'WorkDir' (working directory) is current path
'Mode' specifies a 'sender' or a 'receiver'
'Relay' contains configurations for the netwokring
'Topic' containers kafka configurations
type Receiver ¶
type Receiver struct {
// contains filtered or unexported fields
}
Receiver containers a kafka topic producer and a pointer to relayer configurations
func NewReceiver ¶
Create a receiver and parse the networking configurations
type RelayConfig ¶
RelayConfig is the configuration of the networking
'Mode' specifies using 'tcp' or 'kcp'(UDP based)
'Password' specifies the authentication credential for new connection
'Ack' specifies if need to confirm each message's delivery from receiver before next message goes out.(Not implemented yet)
type Sender ¶
type Sender struct {
// contains filtered or unexported fields
}
Sender containers a kafka topic consumer and a pointer to relay configuraitons
'cs' is sink handler to send the data out
type Session ¶
Session is designed to maintain a reliable networking connection with is a tcp connectin or kcp simulated connection
'sigs' is signal chan to indicate the status change of the connection for reconnecting
func (*Session) Connect ¶
func (sess *Session) Connect(config *RelayConfig)
Setup the session network connection
func (*Session) Handle ¶
Handle a message received from a sender
If the message flag(the first two bytes) indicates it's authentication, then process login.
If it's kafka message and deliver it to kafka producer. Login status will be checked before the delivery.
Anything else, drop the connection
func (*Session) MakeAckMessage ¶
Compose the Acknowlegde message for a kafka message with a id (random uint32) of the message
func (*Session) MakeAuthMessage ¶
func (sess *Session) MakeAuthMessage(login bool, config *RelayConfig) []byte
Compose the Authentation message
func (*Session) MakeMessage ¶
Compose the kafka message to send out
type TopicConsumer ¶
type TopicConsumer struct { Topic string // contains filtered or unexported fields }
TopicConsumer is wrapper of librdkafka consumer, it's holding a kafka consumer and a kafka ConfigMap
func NewTopicConsumer ¶
func NewTopicConsumer(config *Config) *TopicConsumer
Create a TopicConsumer instance and set it up
func (*TopicConsumer) Poll ¶
func (c *TopicConsumer) Poll(n int) *kafka.Message
Poll would call the kafka consumer's Poll to retrieve a kafka message
func (*TopicConsumer) Setup ¶
func (c *TopicConsumer) Setup(config *Config)
Setup a TopicConsunmer with kafka configurations, and subscribe to topics
type TopicProducer ¶
TopicProducer is a wrapper of the librdkafka producer.
It's holding a kafka ConfigMap annd a kafka producer ¶
Order is to indicates the message should be delivered successfully to kafka before the next one ¶
'toChannel' specified if to use producer Events to receive kafka delivery events
'delivery' is the golang channel to receive the kafka delivery event
func NewTopicProducer ¶
func NewTopicProducer(config *Config) *TopicProducer
Create a TopicProducer instance and set it up.
func (*TopicProducer) Process ¶
func (p *TopicProducer) Process(data []byte)
Process is the entry function to handle the task of delivery a message to kakfa
When 'order' is true, it will block until it received the delivery success event
func (*TopicProducer) Setup ¶
func (p *TopicProducer) Setup(config *Config)
Setup the producer with configurations