Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CacheState ¶
type CacheState int
type Client ¶
type Client struct { State State Exchanger *Exchanger Consumer *kafka.Consumer ConsumerTopic string Producer *kafka.Producer ProducerTopic string }
Client is the interface to interact with a kafka cluster
func NewClient ¶
func NewClient(config ClientConfig, requestTopic, responseTopic string, opts ...ClientOption) (*Client, error)
NewClient creates a new client object with the specified configuration and options
func (*Client) ExchangeLoop ¶
ExchangeLoop starts a loop to indefinitely wait and handle messages on the Kafka cluster
func (*Client) ExchangeMessage ¶
ExchangeMessage waits for a message and sends a message to the kafka cluster with the parameterized Exchanger
func (*Client) ReadMessage ¶
ReadMessage waits for a message from the Kafka cluster to be received on the current request topic
func (*Client) SendMessage ¶
SendMessage sends a message from the Kafka cluster to be received on the current request topic
type ClientConfig ¶
type ClientConfig struct { BootstrapServers string SecurityProtocol string SaslMechanisms string SaslUsername string SaslPassword string }
ClientConfig defines the parameters to be used by the client
type ClientHandler ¶
ClientHandler is a function that takes a specified object and creates a response
type ClientOption ¶
type ClientOption func(client *Client)
ClientOption defines possible options that can be used to customize the client
func WithExchanger ¶
func WithExchanger(e *Exchanger) ClientOption
WithExchanger customizes the client to use a specified Exchanger
func WithNewExchanger ¶
func WithNewExchanger(h ClientHandler, reqSchema, respSchema interface{}) ClientOption
WithNewExchanger customizes the client to use a newly specified Exchanger
type Error ¶
type Error struct {
Message error `json:"message"`
}
Error is the interface for sending errors through the Kafka client
type Exchanger ¶
type Exchanger struct { Client *Client Handler ClientHandler RequestSchema interface{} ResponseSchema interface{} }
Exchanger represents a request-response exchanger, handling requests and responses with the specified schemas
func NewExchanger ¶
func NewExchanger(c *Client, h ClientHandler, reqSchema, respSchema interface{}) *Exchanger
NewExchanger creates a new message exchanger for a client with a custom handler for messages
type KafkakeConfig ¶
type MessageState ¶
type MessageState struct {
// contains filtered or unexported fields
}
type Processor ¶
type Processor struct { ProcessorConfig State State RequestTopic string ResponseTopic string Consumer *kafka.Consumer Producers map[int32]*kafka.Producer Handler Handler MessageStates map[string]*MessageState }
func NewProcessor ¶
func NewProcessor(config ProcessorConfig, requestTopic, responseTopic string, handler Handler) (*Processor, error)