kafka

package
v0.0.27 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 17, 2022 License: MIT Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Carrier

// Carrier is the Kafka carrier for the meander flow framework. It groups the
// configuration, the Kafka reader and writer handles, a logger and a context.
type Carrier struct {
	// Config is the carrier configuration; New constructs the carrier based on it.
	Config     *Config
	// Reader is a Kafka reader handle.
	// NOTE(review): presumably consumes Config.InputTopic -- confirm in New.
	Reader     *kafka.Reader
	// ErrReader is a second Kafka reader handle.
	// NOTE(review): presumably consumes the error topic when error topic
	// processing is enabled -- confirm in New.
	ErrReader  *kafka.Reader
	// Writer is the main writer, the one WriteOutput writes to.
	Writer     *kafka.Writer
	// ErrWriter is the main error writer, the one WriteErr writes to.
	// ValidErrorCarrier returns an error when the carrier has no ErrWriter.
	ErrWriter  *kafka.Writer
	// FailWriter is the main failure writer, the one WriteFail writes to.
	FailWriter *kafka.Writer
	// Logger is the carrier's logger.
	// NOTE(review): presumably the logger passed to New -- confirm.
	Logger     journal.Logger
	// Context is the carrier's context.
	// NOTE(review): presumably the ctx passed to New -- confirm.
	Context    context.Context
	// contains filtered or unexported fields
}

Carrier signifies a Kafka carrier for the meander flow framework.

func New

func New(ctx context.Context, logger journal.Logger, config *Config) (*Carrier, error)

New constructs a new Kafka carrier instance based on the config.

func (*Carrier) AcknowledgeMessage

func (c *Carrier) AcknowledgeMessage(m *flow.MessageContext, mErr error) error

AcknowledgeMessage commits the message, signaling that we processed it.

func (*Carrier) Close

func (c *Carrier) Close() error

Close closes all kafka handles.

func (*Carrier) InternalError

func (c *Carrier) InternalError(m *flow.MessageContext, mErr error) error

InternalError is a no-op for this carrier, as we just want to keep these messages in a queue.

func (*Carrier) SetupInput

func (c *Carrier) SetupInput(inChan chan *flow.MessageContext) error

SetupInput sets up goroutines that write messages to inChan.

func (*Carrier) ValidErrorCarrier added in v0.0.10

func (c *Carrier) ValidErrorCarrier() error

ValidErrorCarrier checks if we have an ErrWriter, and returns an error if not.

func (*Carrier) WriteErr

func (c *Carrier) WriteErr(m *flow.MessageContext) error

WriteErr writes a message to the main error writer.

func (*Carrier) WriteFail

func (c *Carrier) WriteFail(m *flow.MessageContext) error

WriteFail writes a message to the main failure writer.

func (*Carrier) WriteOutput

func (c *Carrier) WriteOutput(m *flow.MessageContext) error

WriteOutput writes a message to the main writer.

type Config

// Config is the Kafka configuration object used to construct a Carrier.
type Config struct {
	// Brokers is the list of Kafka brokers to connect to.
	// Required.
	Brokers []string
	// InputTopic is the topic to get input from. If empty, will not receive input, and be called in a loop.
	// Default: empty
	InputTopic string
	// OutputTopic is the topic to output to. If empty, will not send output.
	// Default: empty
	OutputTopic string
	// ErrorTopic is the topic to write errors to when the "Separate" ErrorStrategy is in use.
	// Default: $InputTopic-error
	ErrorTopic string
	// ErrorStrategy is the error handling strategy to use.
	// Default: Requeue
	ErrorStrategy ErrorStrategy
	// ErrorTopicProcessing states whether this plugin processes the error topic. Only relevant for the
	// "Separate" ErrorStrategy. This is an embedded field (declared without a separate field name), so it
	// is accessed as Config.ErrorTopicProcessing.
	// Default: "Off"
	ErrorTopicProcessing
	// FailStrategy is the strategy to use once the RetryLimit has been reached.
	// Default: Move
	FailStrategy FailStrategy
	// FailTopic is the topic that messages that have been branded as a hard fault will be moved to.
	// Default: "failures"
	FailTopic string
	// ConsumerGroup is the consumer group to use.
	// Required if InputTopic is set.
	ConsumerGroup string
	// CACertificateFilename is the file containing the CA certificate in PEM format.
	CACertificateFilename string
	// ServiceKeyFilename is the file containing the client's service key in PEM format.
	ServiceKeyFilename string
	// ServiceCertificateFilename is the file containing the client's service certificate in PEM format.
	ServiceCertificateFilename string
	// InsecureSkipVerify controls whether a client verifies the server's certificate chain and host name.
	InsecureSkipVerify bool
}

Config is the kafka configuration object.

type ErrorStrategy

// ErrorStrategy signifies how errors are handled; see the ErrorStrategy* constants.
type ErrorStrategy uint8

ErrorStrategy signifies how we handle errors.

// Values of ErrorStrategy. They are numbered by iota in declaration order
// (0, 1, 2), so ErrorStrategyRequeue is the zero value.
const (
	// ErrorStrategyRequeue requeues messages in the configured InputTopic.
	ErrorStrategyRequeue ErrorStrategy = iota
	// ErrorStrategySeparate requeues messages in a separate topic (named $InputTopic-error).
	ErrorStrategySeparate
	// ErrorStrategyDisable disables the error writer; will exit with an error if used as such.
	ErrorStrategyDisable
)

type ErrorTopicProcessing

// ErrorTopicProcessing defines the error topic processing options; see the
// ErrorTopic* constants.
type ErrorTopicProcessing uint8

ErrorTopicProcessing defines the error topic processing options.

// Values of ErrorTopicProcessing. They are numbered by iota in declaration
// order (0, 1, 2), so ErrorTopicOff is the zero value.
const (
	// ErrorTopicOff means the error topic is not processed.
	ErrorTopicOff ErrorTopicProcessing = iota
	// ErrorTopicOnly means only the error topic is processed.
	ErrorTopicOnly
	// ErrorTopicBoth means both the input topic and the error topic are processed.
	ErrorTopicBoth
)

type FailStrategy

// FailStrategy selects what happens to a message once the RetryLimit has been
// reached (see Config.FailStrategy).
type FailStrategy uint8

// Values of FailStrategy. They are numbered by iota in declaration order
// (0, 1, 2), so FailMove is the zero value.
const (
	// FailMove moves the message to FailTopic.
	FailMove FailStrategy = iota
	// FailFatal exits without committing the message.
	FailFatal
	// FailDrop silently drops the message.
	FailDrop
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL