Documentation ¶
Overview ¶
Package producer contains functions that can be used to produce (that is, send) messages to a properly configured Kafka broker.
Index ¶
Constants ¶
const (
	// StatusReceived is reported when a new payload is received.
	StatusReceived = "received"
	// StatusMessageProcessed is reported when the message of a payload has been processed.
	StatusMessageProcessed = "processed"
	// StatusSuccess is reported upon a successful handling of a payload.
	StatusSuccess = "success"
	// StatusError is reported when the handling of a payload fails for any reason.
	StatusError = "error"
)
Variables ¶
var NewDeadLetterProducer = func(brokerCfg broker.Configuration) (*DeadLetterProducer, error) {
	if brokerCfg.DeadLetterQueueTopic == "" {
		return nil, nil
	}

	p, err := New(brokerCfg)
	if err != nil {
		log.Error().Err(err).Msg("unable to create a new dead letter producer")
		return nil, err
	}

	return &DeadLetterProducer{
		KafkaProducer: *p,
		Configuration: brokerCfg,
	}, nil
}
NewDeadLetterProducer constructs a producer for the dead letter queue topic. It returns nil and no error when no dead letter queue topic is configured. It is implemented as a variable to allow monkey patching in unit tests.
var NewPayloadTrackerProducer = func(brokerCfg broker.Configuration) (*PayloadTrackerProducer, error) {
	if brokerCfg.PayloadTrackerTopic == "" {
		return nil, nil
	}

	p, err := New(brokerCfg)
	if err != nil {
		log.Error().Err(err).Msg("unable to create a new payload tracker producer")
		return nil, err
	}

	return &PayloadTrackerProducer{
		KafkaProducer: *p,
		Configuration: brokerCfg,
	}, nil
}
NewPayloadTrackerProducer constructs a producer for the payload tracker topic. It returns nil and no error when no payload tracker topic is configured. It is implemented as a variable to allow monkey patching in unit tests.
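The constructor-as-variable pattern used by NewDeadLetterProducer and NewPayloadTrackerProducer can be illustrated with a minimal, self-contained sketch. Everything below is a hypothetical stand-in (Configuration, TrackerProducer, NewTrackerProducer, startService, withFailingConstructor) rather than this package's API; it only shows how a test might swap the constructor and restore it afterwards, and why callers should check for a nil producer.

```go
package main

import (
	"errors"
	"fmt"
)

// Configuration is a hypothetical stand-in for broker.Configuration.
type Configuration struct {
	PayloadTrackerTopic string
}

// TrackerProducer is a hypothetical stand-in for PayloadTrackerProducer.
type TrackerProducer struct {
	Configuration Configuration
}

// NewTrackerProducer is a package-level variable holding the constructor,
// so that a test can temporarily replace it.
var NewTrackerProducer = func(cfg Configuration) (*TrackerProducer, error) {
	if cfg.PayloadTrackerTopic == "" {
		// No topic configured: no producer and no error.
		return nil, nil
	}
	return &TrackerProducer{Configuration: cfg}, nil
}

// startService is hypothetical calling code that depends on the constructor.
func startService(cfg Configuration) error {
	p, err := NewTrackerProducer(cfg)
	if err != nil {
		return fmt.Errorf("starting service: %w", err)
	}
	if p == nil {
		fmt.Println("payload tracking disabled")
		return nil
	}
	fmt.Println("tracking on topic", p.Configuration.PayloadTrackerTopic)
	return nil
}

// errBroker simulates a broker failure injected by a test.
var errBroker = errors.New("broker unavailable")

// withFailingConstructor replaces the constructor with one that always
// fails, runs f, and restores the original constructor afterwards.
func withFailingConstructor(f func()) {
	original := NewTrackerProducer
	defer func() { NewTrackerProducer = original }()

	NewTrackerProducer = func(Configuration) (*TrackerProducer, error) {
		return nil, errBroker
	}
	f()
}

func main() {
	cfg := Configuration{PayloadTrackerTopic: "payload-tracker"}

	fmt.Println("normal constructor, error:", startService(cfg))

	withFailingConstructor(func() {
		fmt.Println("patched constructor, error:", startService(cfg))
	})
}
```

Restoring the original value with defer keeps the patch from leaking into other tests, which matters because the variable is shared package state.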
Functions ¶
This section is empty.
Types ¶
type DeadLetterProducer ¶ added in v1.2.4
type DeadLetterProducer struct {
	KafkaProducer KafkaProducer
	Configuration broker.Configuration
}
DeadLetterProducer is a producer for the dead letter queue.
func (*DeadLetterProducer) Close ¶ added in v1.2.4
func (producer *DeadLetterProducer) Close() error
Close allows the Sarama producer to be closed gracefully.
func (*DeadLetterProducer) SendDeadLetter ¶ added in v1.2.4
func (producer *DeadLetterProducer) SendDeadLetter(msg *sarama.ConsumerMessage) error
SendDeadLetter sends the unprocessed message to the dedicated Kafka topic for further analysis.
type KafkaProducer ¶
type KafkaProducer struct {
	Producer sarama.SyncProducer
}
KafkaProducer is an implementation of the Producer interface.
func New ¶
func New(brokerCfg broker.Configuration) (*KafkaProducer, error)
New constructs a new implementation of the Producer interface.
func (*KafkaProducer) Close ¶
func (producer *KafkaProducer) Close() error
Close allows the Sarama producer to be closed gracefully.
type PayloadTrackerMessage ¶
type PayloadTrackerMessage struct {
	Service   string `json:"service"`
	RequestID string `json:"request_id"`
	Status    string `json:"status"`
	Date      string `json:"date"`
	OrgID     string `json:"org_id,omitempty"`
	Account   string `json:"account,omitempty"`
}
PayloadTrackerMessage represents the content of messages sent to the Payload Tracker topic in Kafka.
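To make the wire format concrete, the sketch below marshals a PayloadTrackerMessage to JSON using only the standard library. The struct is copied from the definition above; the helper encodeTrackerMessage, the service name "example-service", the sample request ID, and the RFC 3339 date format are assumptions made for illustration and are not confirmed by this documentation.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// PayloadTrackerMessage is copied from the type definition above.
type PayloadTrackerMessage struct {
	Service   string `json:"service"`
	RequestID string `json:"request_id"`
	Status    string `json:"status"`
	Date      string `json:"date"`
	OrgID     string `json:"org_id,omitempty"`
	Account   string `json:"account,omitempty"`
}

// exampleTime is a fixed timestamp so the output is reproducible.
var exampleTime = time.Date(2024, time.January, 2, 3, 4, 5, 0, time.UTC)

// encodeTrackerMessage is a hypothetical helper that builds and marshals
// a message. The service name and date format are assumptions.
func encodeTrackerMessage(reqID, status string, ts time.Time, orgID, account string) ([]byte, error) {
	msg := PayloadTrackerMessage{
		Service:   "example-service", // placeholder service name
		RequestID: reqID,
		Status:    status,
		Date:      ts.UTC().Format(time.RFC3339), // assumed date format
		OrgID:     orgID,
		Account:   account,
	}
	return json.Marshal(msg)
}

func main() {
	// With empty org ID and account, both fields are omitted (omitempty).
	b, err := encodeTrackerMessage("req-1", "received", exampleTime, "", "")
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(string(b))
	// {"service":"example-service","request_id":"req-1","status":"received","date":"2024-01-02T03:04:05Z"}

	// With both values set, org_id and account appear in the output.
	b, err = encodeTrackerMessage("req-1", "success", exampleTime, "42", "1234")
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(string(b))
}
```

Because OrgID and Account carry the omitempty option, a consumer of the topic cannot distinguish an unset value from an empty string; both simply leave the key out.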
type PayloadTrackerProducer ¶ added in v1.2.4
type PayloadTrackerProducer struct {
	KafkaProducer KafkaProducer
	Configuration broker.Configuration
}
PayloadTrackerProducer is a producer for the payload tracker topic.
func (*PayloadTrackerProducer) Close ¶ added in v1.2.4
func (producer *PayloadTrackerProducer) Close() error
Close allows the Sarama producer to be closed gracefully.
func (*PayloadTrackerProducer) TrackPayload ¶ added in v1.2.4
func (producer *PayloadTrackerProducer) TrackPayload(
	reqID types.RequestID,
	timestamp time.Time,
	orgID *types.OrgID,
	account *types.Account,
	status string,
) error
TrackPayload publishes the status of a payload with the given request ID to the payload tracker Kafka topic. If the request ID is empty, the payload is not tracked. This can happen in some scenarios and is not considered an error, so only a warning is logged and no error is returned.
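The empty-request-ID contract described above can be sketched as a simple guard. This is not the package's implementation: RequestID, trackPayload, and the injected send function are hypothetical names, and the standard log package stands in for whatever logger the package actually uses.

```go
package main

import (
	"fmt"
	"log"
)

// RequestID is a hypothetical stand-in for types.RequestID.
type RequestID string

// trackPayload illustrates the documented contract: an empty request ID
// produces a warning and a nil error, and nothing is sent.
// The send parameter is a hypothetical hook that publishes the status.
func trackPayload(reqID RequestID, status string, send func(RequestID, string) error) error {
	if reqID == "" {
		log.Println("warning: request ID is empty, payload will not be tracked")
		return nil
	}
	return send(reqID, status)
}

func main() {
	sent := 0
	send := func(id RequestID, status string) error {
		sent++
		fmt.Printf("sent status %q for request %q\n", status, id)
		return nil
	}

	// Empty request ID: warning only, nothing is sent, no error.
	fmt.Println("error:", trackPayload("", "received", send))

	// Non-empty request ID: the status is published.
	fmt.Println("error:", trackPayload("req-1", "received", send))

	fmt.Println("messages sent:", sent)
}
```

A consequence of this contract is that a nil return value does not prove a message was published, so callers that need that guarantee would have to validate the request ID themselves.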