repeater

package
v0.0.0-...-108d9fe Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 28, 2019 License: MIT Imports: 9 Imported by: 0

Documentation

Index

Constants

View Source
const (
	PASSWORD     = "NET-KAFKA-REPEATER"
	AUTH         = 0xef01
	DATA         = 0xfe01
	MAX_BUF_SIZE = 0xffffffff
	MAX_MSG_SIZE = 100000
)

Message struct FLAG LEN ID DATA

2    4   4

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Exe     string
	PidFile string
	LogFile string
	WorkDir string

	Mode string `sender or receiver`

	// Relay
	Relay map[string]interface{}

	// Kafka Topic
	Topic map[string]interface{}
}

Configuration for the repeater

'Exe' is the 'net-kafka-repeater' executable path

Default 'WorkDir' (working directory) is current path

'Mode' specifies a 'sender' or a 'receiver'

'Relay' contains configurations for the netwokring

'Topic' containers kafka configurations

type ISession

type ISession interface {
	SendMessage([]byte) (int, error)
}

Sesssion interface for sending message to remote receiver

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

Receiver containers a kafka topic producer and a pointer to relayer configurations

func NewReceiver

func NewReceiver(config *Config) *Receiver

Create a receiver and parse the networking configurations

func (*Receiver) Start

func (r *Receiver) Start()

Start the receiver, will would create listeners and setup kafka producer

type RelayConfig

type RelayConfig struct {
	Address  string
	Mode     string
	Password string
	Ack      bool
}

RelayConfig is the configuration of the networking

'Mode' specifies using 'tcp' or 'kcp'(UDP based)

'Password' specifies the authentication credential for new connection

'Ack' specifies if need to confirm each message's delivery from receiver before next message goes out.(Not implemented yet)

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

Sender containers a kafka topic consumer and a pointer to relay configuraitons

'cs' is sink handler to send the data out

func NewSender

func NewSender(config *Config) *Sender

Create a sender and parse the relay networking configuration forit

func (*Sender) Process

func (s *Sender) Process(msg *kafka.Message)

Process would send the kafka message out to receiver, it will retry in 1 second if it failed.

func (*Sender) Start

func (s *Sender) Start()

Start the sender. Would setup a outgoing connection to receiver and maintain it.

It will also create a kafka consumer and subscribe to the desire topic.

type Session

type Session struct {
	Ip string

	Login bool
	// contains filtered or unexported fields
}

Session is designed to maintain a reliable networking connection with is a tcp connectin or kcp simulated connection

'sigs' is signal chan to indicate the status change of the connection for reconnecting

func (*Session) Close

func (sess *Session) Close()

Close the networking connection

func (*Session) Connect

func (sess *Session) Connect(config *RelayConfig)

Setup the session network connection

func (*Session) Handle

func (sess *Session) Handle(data []byte, size uint32, config *RelayConfig) (bool, []byte, error)

Handle a message received from a sender

If the message flag(the first two bytes) indicates it's authentication, then process login.

If it's kafka message and deliver it to kafka producer. Login status will be checked before the delivery.

Anything else, drop the connection

func (*Session) MakeAckMessage

func (sess *Session) MakeAckMessage(id []byte) []byte

Compose the Acknowlegde message for a kafka message with a id (random uint32) of the message

func (*Session) MakeAuthMessage

func (sess *Session) MakeAuthMessage(login bool, config *RelayConfig) []byte

Compose the Authentation message

func (*Session) MakeMessage

func (sess *Session) MakeMessage(data []byte) []byte

Compose the kafka message to send out

func (*Session) Send

func (sess *Session) Send(data []byte) (int, error)

Send raw bytes out to receiver

func (*Session) SendMessage

func (sess *Session) SendMessage(data []byte) (int, error)

Send Kafka message out to receiver (Only the message payload without headers)

type TopicConsumer

type TopicConsumer struct {
	Topic string
	// contains filtered or unexported fields
}

TopicConsumer is wrapper of librdkafka consumer, it's holding a kafka consumer and a kafka ConfigMap

func NewTopicConsumer

func NewTopicConsumer(config *Config) *TopicConsumer

Create a TopicConsumer instance and set it up

func (*TopicConsumer) Poll

func (c *TopicConsumer) Poll(n int) *kafka.Message

Poll would call the kafka consumer's Poll to retrieve a kafka message

func (*TopicConsumer) Setup

func (c *TopicConsumer) Setup(config *Config)

Setup a TopicConsunmer with kafka configurations, and subscribe to topics

type TopicProducer

type TopicProducer struct {
	Topic string

	Order bool
	// contains filtered or unexported fields
}

TopicProducer is a wrapper of the librdkafka producer.

It's holding a kafka ConfigMap annd a kafka producer

Order is to indicates the message should be delivered successfully to kafka before the next one

'toChannel' specified if to use producer Events to receive kafka delivery events

'delivery' is the golang channel to receive the kafka delivery event

func NewTopicProducer

func NewTopicProducer(config *Config) *TopicProducer

Create a TopicProducer instance and set it up.

func (*TopicProducer) Process

func (p *TopicProducer) Process(data []byte)

Process is the entry function to handle the task of delivery a message to kakfa

When 'order' is true, it will block until it received the delivery success event

func (*TopicProducer) Setup

func (p *TopicProducer) Setup(config *Config)

Setup the producer with configurations

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL