producer

package
v1.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2024 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package producer contains functions that can be used to produce (that is send) messages to properly configured Kafka broker.

Package producer contains functions that can be used to produce (that is send) messages to properly configured Kafka broker.

Package producer contains functions that can be used to produce (that is send) messages to properly configured Kafka broker.

Index

Constants

View Source
const (
	// StatusReceived is reported when a new payload is received.
	StatusReceived = "received"
	// StatusMessageProcessed is reported when the message of a payload has been processed.
	StatusMessageProcessed = "processed"
	// StatusSuccess is reported upon a successful handling of a payload.
	StatusSuccess = "success"
	// StatusError is reported when the handling of a payload fails for any reason.
	StatusError = "error"
)

Variables

View Source
var NewDeadLetterProducer = func(brokerCfg broker.Configuration) (*DeadLetterProducer, error) {
	if brokerCfg.DeadLetterQueueTopic == "" {
		return nil, nil
	}
	p, err := New(brokerCfg)
	if err != nil {
		log.Error().Err(err).Msg("unable to create a new dead letter producer")
		return nil, err
	}
	return &DeadLetterProducer{
		KafkaProducer: *p,
		Configuration: brokerCfg,
	}, nil
}

NewDeadLetterProducer constructs producer for payload tracker topic. It is implemented as variable in order to allow monkey patching in unit tests.

View Source
var NewPayloadTrackerProducer = func(brokerCfg broker.Configuration) (*PayloadTrackerProducer, error) {
	if brokerCfg.PayloadTrackerTopic == "" {
		return nil, nil
	}

	p, err := New(brokerCfg)
	if err != nil {
		log.Error().Err(err).Msg("unable to create a new payload tracker producer")
		return nil, err
	}
	return &PayloadTrackerProducer{
		KafkaProducer: *p,
		Configuration: brokerCfg,
	}, nil
}

NewPayloadTrackerProducer constructs producer for payload tracker topic. It is implemented as variable in order to allow monkey patching in unit tests.

Functions

This section is empty.

Types

type DeadLetterProducer added in v1.2.4

type DeadLetterProducer struct {
	KafkaProducer KafkaProducer
	Configuration broker.Configuration
}

DeadLetterProducer is a producer for dead letter queue

func (*DeadLetterProducer) Close added in v1.2.4

func (producer *DeadLetterProducer) Close() error

Close allow the Sarama producer to be gracefully closed

func (*DeadLetterProducer) SendDeadLetter added in v1.2.4

func (producer *DeadLetterProducer) SendDeadLetter(msg *sarama.ConsumerMessage) error

SendDeadLetter loads the unprocessed message to the dedicated Kafka topic for further analysis

type KafkaProducer

type KafkaProducer struct {
	Producer sarama.SyncProducer
}

KafkaProducer is an implementation of Producer interface

func New

func New(brokerCfg broker.Configuration) (*KafkaProducer, error)

New constructs new implementation of Producer interface

func (*KafkaProducer) Close

func (producer *KafkaProducer) Close() error

Close allow the Sarama producer to be gracefully closed

type PayloadTrackerMessage

type PayloadTrackerMessage struct {
	Service   string `json:"service"`
	RequestID string `json:"request_id"`
	Status    string `json:"status"`
	Date      string `json:"date"`
	OrgID     string `json:"org_id,omitempty"`
	Account   string `json:"account,omitempty"`
}

PayloadTrackerMessage represents content of messages sent to the Payload Tracker topic in Kafka.

type PayloadTrackerProducer added in v1.2.4

type PayloadTrackerProducer struct {
	KafkaProducer KafkaProducer
	Configuration broker.Configuration
}

PayloadTrackerProducer is a producer for payload tracker topic

func (*PayloadTrackerProducer) Close added in v1.2.4

func (producer *PayloadTrackerProducer) Close() error

Close allow the Sarama producer to be gracefully closed

func (*PayloadTrackerProducer) TrackPayload added in v1.2.4

func (producer *PayloadTrackerProducer) TrackPayload(
	reqID types.RequestID,
	timestamp time.Time,
	orgID *types.OrgID,
	account *types.Account,
	status string,
) error

TrackPayload publishes the status of a payload with the given request ID to the payload tracker Kafka topic. Please keep in mind that if the request ID is empty, the payload will not be tracked and no error will be raised because this can happen in some scenarios and it is not considered an error. Instead, only a warning is logged and no error is returned.

type Producer

type Producer interface {
	Close() error
}

Producer represents any producer

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL