Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func InitKafkaSender ¶
func InitKafkaSender(config config.InternalDataConfig)
InitKafkaSender constructs the singleton instance of a messagesender using the kafka transport
func ShutdownKafkaSender ¶
func ShutdownKafkaSender()
ShutdownKafkaSender stops the kafka producer and unsets the singleton
Types ¶
type KafkaMessageReceiver ¶
type KafkaMessageReceiver struct {
// contains filtered or unexported fields
}
KafkaMessageReceiver consumes messages from the configured 'messagetopic'.
func (*KafkaMessageReceiver) Initialized ¶
func (r *KafkaMessageReceiver) Initialized() bool
Initialized indicates whether the receiver has completed initial read of all pending (non-acknowledged) messages from the transport.
func (*KafkaMessageReceiver) SetNotificationFunc ¶
func (r *KafkaMessageReceiver) SetNotificationFunc(notifier NotificationFunc)
SetNotificationFunc assigns a function to be called for each message that is delivered on the topic.
func (*KafkaMessageReceiver) Shutdown ¶
func (r *KafkaMessageReceiver) Shutdown()
Shutdown stops the message consumer
func (*KafkaMessageReceiver) Start ¶
func (r *KafkaMessageReceiver) Start()
Start instructs the messagereceiver to begin accepting messages
type KafkaMessageSender ¶
type KafkaMessageSender struct {
// contains filtered or unexported fields
}
KafkaMessageSender is a message Sender for Kafka.
func (KafkaMessageSender) Ack ¶
func (s KafkaMessageSender) Ack(msg Message) error
Ack acknowledges that the message has been received and completed; it will not be delivered again. Because of the inherent raciness of kafka message delivery this cannot be guaranteed, and all message processing within a node must be idempotent.
func (KafkaMessageSender) Send ¶
func (s KafkaMessageSender) Send(msg Message) error
Send sends the message on the configured kafka topic
func (KafkaMessageSender) Shutdown ¶
func (s KafkaMessageSender) Shutdown()
Shutdown stops the underlying kafka producer
type Message ¶
type Message struct { MessageType string `json:"messagetype"` // e.g. RequestConsumerRecovery or UpdateRateData or CancelConsumerRecovery or RestartSource (for ES recovery?) Key string `json:"key"` // key for the Kafka message carrying this message; this can be either be a unique business key or synthetic unique key Payload []byte `json:"payload"` // the payload (optional) delivered to the target along with the message itself }
Message is a single message to be delivered to a source or node
type NotificationFunc ¶
NotificationFunc is the method used to send a new message to all sources / nodes that accept it
type Receiver ¶
type Receiver interface { Start() Initialized() bool SetNotificationFunc(notifier NotificationFunc) Shutdown() }
Receiver receives messages using the configured transport
func NewKafkaReceiver ¶
func NewKafkaReceiver(config *config.InternalDataConfig) (Receiver, error)
NewKafkaReceiver creates a new instance of the messagereceiver
type Sender ¶
Sender sends messages using the configured transport
func NewKafkaMessageSender ¶
func NewKafkaMessageSender(config *config.InternalDataConfig) Sender
NewKafkaMessageSender creates and configures a Sender that uses a compact kafka topic as its transport.