transport

package
v0.0.0-...-1348794 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 22, 2024 License: ISC Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateKafkaAsyncProducer

func CreateKafkaAsyncProducer(ac *config.AppConfig) (sarama.AsyncProducer, error)

CreateKafkaAsyncProducer creates a new Sarama AsyncProducer

func CreateKafkaConsumer

func CreateKafkaConsumer(ac *config.AppConfig) (sarama.ConsumerGroup, error)

CreateKafkaConsumer creates a new Sarama ConsumerGroup

func CreateKafkaProducer

func CreateKafkaProducer(ac *config.AppConfig) (sarama.SyncProducer, error)

CreateKafkaProducer creates a new Sarama SyncProducer

Types

type KafkaClient

// KafkaClient groups the three Sarama handles this package works with:
// a synchronous producer, an asynchronous producer and a consumer group.
type KafkaClient struct {
	// Producer is the Sarama synchronous producer.
	// NOTE(review): presumably the handle used by SendMessage — confirm.
	Producer      sarama.SyncProducer
	// AsyncProducer is the Sarama asynchronous producer.
	// NOTE(review): presumably the handle used by SendAsyncMessage — confirm.
	AsyncProducer sarama.AsyncProducer
	// Consumer is the Sarama consumer group.
	// NOTE(review): presumably the handle used by ConsumeMessages — confirm.
	Consumer      sarama.ConsumerGroup
}

KafkaClient is a wrapper around a Sarama SyncProducer, AsyncProducer and ConsumerGroup

func NewKafkaClient

func NewKafkaClient(ac *config.AppConfig) (*KafkaClient, error)

NewKafkaClient creates a new KafkaClient

func (*KafkaClient) Close

func (kc *KafkaClient) Close()

Close closes the KafkaClient

func (*KafkaClient) ConsumeMessages

func (kc *KafkaClient) ConsumeMessages(ctx context.Context, topics []string, handler *MessageHandler)

ConsumeMessages consumes messages from Kafka

func (*KafkaClient) PostAsyncMessage

func (kc *KafkaClient) PostAsyncMessage(c *gin.Context)

PostAsyncMessage is a handler for POST /async-messages/:topic

func (*KafkaClient) PostMessage

func (kc *KafkaClient) PostMessage(c *gin.Context)

PostMessage is a handler for POST /messages/:topic

func (*KafkaClient) SendAsyncMessage

func (kc *KafkaClient) SendAsyncMessage(topic, key string, message []byte) error

SendAsyncMessage sends a message to Kafka asynchronously

func (*KafkaClient) SendMessage

func (kc *KafkaClient) SendMessage(topic, key string, message []byte) error

SendMessage sends a message to Kafka

type Message

// Message is the JSON-serializable representation of a Kafka message.
type Message struct {
	// Metadata carries additional information about the message; encoded
	// under the "metadata" JSON key.
	Metadata  MessageMetadata `json:"metadata"`
	// Value is the message content as a string; encoded under "value".
	// NOTE(review): presumably the Kafka record value converted from bytes — confirm.
	Value     string          `json:"value"`
	// Partition is encoded under "partition".
	// NOTE(review): presumably the Kafka partition the message was read from — confirm.
	Partition int32           `json:"partition"`
	// Offset is encoded under "offset".
	// NOTE(review): presumably the message's offset within its partition — confirm.
	Offset    int64           `json:"offset"`
}

Message represents a Kafka message

type MessageBuffer

// MessageBuffer is a buffer of Kafka messages. Its remaining state lives in
// unexported fields that are not shown in this listing.
type MessageBuffer struct {
	// MaxSize is an integer size setting for the buffer.
	// NOTE(review): presumably the maximum number of messages retained by
	// SaveMessage — confirm against the implementation.
	MaxSize int
	// contains filtered or unexported fields
}

MessageBuffer is a buffer of Kafka messages. Concurrent access to the buffer is protected via an RWMutex.

func (*MessageBuffer) GetMessages

func (mb *MessageBuffer) GetMessages(c *gin.Context)

GetMessages returns the messages in the buffer in JSON format

func (*MessageBuffer) SaveMessage

func (mb *MessageBuffer) SaveMessage(msg Message)

SaveMessage saves a message to the buffer

type MessageHandler

// MessageHandler is a Sarama consumer group handler. Its remaining state
// lives in unexported fields that are not shown in this listing.
type MessageHandler struct {
	// Ready is a boolean channel exposed to callers.
	// NOTE(review): presumably signalled or closed once the consumer session
	// has been set up — confirm against Setup.
	Ready chan bool
	// contains filtered or unexported fields
}

MessageHandler is a Sarama consumer group handler

func NewMessageHandler

func NewMessageHandler(buffer *MessageBuffer) *MessageHandler

NewMessageHandler creates a new MessageHandler

func (*MessageHandler) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*MessageHandler) ConsumeClaim

func (c *MessageHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*MessageHandler) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

type MessageMetadata

// MessageMetadata holds metadata about a Kafka message.
type MessageMetadata struct {
	// ReceivedAt is a timestamp encoded under the "receivedAt" JSON key.
	// NOTE(review): presumably the time the consumer received the message — confirm.
	ReceivedAt time.Time `json:"receivedAt"`
}

MessageMetadata represents metadata about a Kafka message

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL