Documentation ¶
Index ¶
- type Carrier
- func (c *Carrier) AcknowledgeMessage(m *flow.MessageContext, mErr error) error
- func (c *Carrier) Close() error
- func (c *Carrier) InternalError(m *flow.MessageContext, mErr error) error
- func (c *Carrier) SetupInput(inChan chan *flow.MessageContext) error
- func (c *Carrier) ValidErrorCarrier() error
- func (c *Carrier) WriteErr(m *flow.MessageContext) error
- func (c *Carrier) WriteFail(m *flow.MessageContext) error
- func (c *Carrier) WriteOutput(m *flow.MessageContext) error
- type Config
- type ErrorStrategy
- type ErrorTopicProcessing
- type FailStrategy
Types ¶
type Carrier ¶
type Carrier struct {
    Config     *Config
    Reader     *kafka.Reader
    ErrReader  *kafka.Reader
    Writer     *kafka.Writer
    ErrWriter  *kafka.Writer
    FailWriter *kafka.Writer
    Logger     journal.Logger
    Context    context.Context
    // contains filtered or unexported fields
}
Carrier signifies a Kafka carrier for the meander flow framework.
func (*Carrier) AcknowledgeMessage ¶
func (c *Carrier) AcknowledgeMessage(m *flow.MessageContext, mErr error) error
AcknowledgeMessage commits the message, signaling that we processed it.
func (*Carrier) InternalError ¶
func (c *Carrier) InternalError(m *flow.MessageContext, mErr error) error
InternalError is a no-op for this carrier, as we just want to keep the affected messages in a queue.
func (*Carrier) SetupInput ¶
func (c *Carrier) SetupInput(inChan chan *flow.MessageContext) error
SetupInput sets up goroutines that write messages to inChan.
func (*Carrier) ValidErrorCarrier ¶ added in v0.0.10
func (c *Carrier) ValidErrorCarrier() error
ValidErrorCarrier checks whether the Carrier has an ErrWriter and returns an error if it does not.
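A check of this kind usually comes down to a nil test on the writer field. The sketch below assumes that shape; the writer and carrier types are local stand-ins, and the error text is invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// writer stands in for *kafka.Writer so the sketch has no dependencies.
type writer struct{}

// carrier is a local stand-in holding only the field this check needs.
type carrier struct {
	ErrWriter *writer
}

// validErrorCarrier is a hypothetical equivalent of the documented check:
// it reports an error when no error writer is configured.
func (c *carrier) validErrorCarrier() error {
	if c.ErrWriter == nil {
		return errors.New("carrier has no error writer configured")
	}
	return nil
}

func main() {
	fmt.Println((&carrier{}).validErrorCarrier())
	fmt.Println((&carrier{ErrWriter: &writer{}}).validErrorCarrier())
}
```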
func (*Carrier) WriteErr ¶
func (c *Carrier) WriteErr(m *flow.MessageContext) error
WriteErr writes a message to the main error writer.
func (*Carrier) WriteFail ¶
func (c *Carrier) WriteFail(m *flow.MessageContext) error
WriteFail writes a message to the main failure writer.
func (*Carrier) WriteOutput ¶
func (c *Carrier) WriteOutput(m *flow.MessageContext) error
WriteOutput writes a message to the main writer.
type Config ¶
type Config struct {
    // Brokers -- a list of the Kafka brokers to connect to, not optional.
    // Required
    Brokers []string
    // InputTopic -- the topic to get input from. If empty, will not receive input, and be called in a loop.
    // Default: empty
    InputTopic string
    // OutputTopic -- the topic to output to. If empty, will not send output.
    // Default: empty
    OutputTopic string
    // ErrorTopic -- the topic to write errors to when "Separate" strategy in use, defaults
    // to $input-topic-error if not set
    ErrorTopic string
    // ErrorStrategy -- the error handling strategy to use.
    // Default: Requeue
    ErrorStrategy ErrorStrategy
    // ErrorTopicProcessing -- does this plugin process the error topic. Only relevant for the "Separate"
    // ErrorStrategy.
    // Default: "Off"
    ErrorTopicProcessing
    // FailStrategy -- The strategy to use once the RetryLimit has been reached.
    // Default: Move
    FailStrategy FailStrategy
    // FailTopic is the topic that messages that have been branded as a hard fault will be moved to.
    // Default: "failures"
    FailTopic string
    // ConsumerGroup -- The consumer group to use
    // Required if InputTopic set
    ConsumerGroup string
    // CACertificateFilename is the file containing the CA certificate in PEM format.
    CACertificateFilename string
    // ServiceKeyFilename is the file containing the client's service key in PEM format.
    ServiceKeyFilename string
    // ServiceCertificateFilename is the file containing the client's service certificate in PEM format.
    ServiceCertificateFilename string
    // InsecureSkipVerify controls whether a client verifies the server's certificate chain and host name.
    InsecureSkipVerify bool
}
Config is the Kafka configuration object.
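The field comments on Config state two requirements (Brokers always, ConsumerGroup when InputTopic is set) and two defaults (ErrorTopic and FailTopic). The sketch below restates those rules in code. It uses a local stand-in struct with only the relevant fields; validate and withDefaults are hypothetical helpers, and leaving ErrorTopic empty when InputTopic is empty is an assumption, since the documentation does not cover that case.

```go
package main

import (
	"errors"
	"fmt"
)

// Config is a local stand-in with only the fields this sketch needs;
// it is not the package's Config type.
type Config struct {
	Brokers       []string
	InputTopic    string
	ErrorTopic    string
	FailTopic     string
	ConsumerGroup string
}

// validate is a hypothetical helper enforcing the documented requirements.
func validate(c Config) error {
	if len(c.Brokers) == 0 {
		return errors.New("Brokers is required")
	}
	if c.InputTopic != "" && c.ConsumerGroup == "" {
		return errors.New("ConsumerGroup is required when InputTopic is set")
	}
	return nil
}

// withDefaults is a hypothetical helper filling in the documented defaults.
func withDefaults(c Config) Config {
	// Assumption: with no InputTopic there is nothing to derive a name from.
	if c.ErrorTopic == "" && c.InputTopic != "" {
		c.ErrorTopic = c.InputTopic + "-error"
	}
	if c.FailTopic == "" {
		c.FailTopic = "failures"
	}
	return c
}

func main() {
	cfg := withDefaults(Config{
		Brokers:       []string{"localhost:9092"}, // placeholder address
		InputTopic:    "orders",                   // placeholder topic
		ConsumerGroup: "orders-workers",           // placeholder group
	})
	fmt.Println(validate(cfg), cfg.ErrorTopic, cfg.FailTopic)
}
```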
type ErrorStrategy ¶
type ErrorStrategy uint8
ErrorStrategy signifies how we handle errors.
const (
    // Requeue -- requeue messages in the configured InputTopic.
    ErrorStrategyRequeue ErrorStrategy = iota
    // Separate -- requeue messages in a separate topic. (Named $InputTopic-error).
    ErrorStrategySeparate
    // Disable -- disable error writer, will exit with an error if used as such.
    ErrorStrategyDisable
)
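Because the constants are declared with iota, ErrorStrategyRequeue is the zero value of ErrorStrategy, which is why an unset Config.ErrorStrategy behaves as Requeue. The sketch below redeclares the type locally to show this; the String method is a hypothetical addition for readable output and is not documented as part of the package.

```go
package main

import "fmt"

// ErrorStrategy and its constants are redeclared locally, mirroring the
// documented declaration, so the sketch runs without the package.
type ErrorStrategy uint8

const (
	ErrorStrategyRequeue ErrorStrategy = iota
	ErrorStrategySeparate
	ErrorStrategyDisable
)

// String is a hypothetical convenience method, added here for illustration.
func (s ErrorStrategy) String() string {
	switch s {
	case ErrorStrategyRequeue:
		return "Requeue"
	case ErrorStrategySeparate:
		return "Separate"
	case ErrorStrategyDisable:
		return "Disable"
	default:
		return fmt.Sprintf("ErrorStrategy(%d)", uint8(s))
	}
}

func main() {
	var unset ErrorStrategy // zero value, as in a Config with no strategy set
	fmt.Println(unset == ErrorStrategyRequeue, unset)
}
```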
type ErrorTopicProcessing ¶
type ErrorTopicProcessing uint8
ErrorTopicProcessing defines the error topic processing options.
const (
    // Off -- Not processing the error topic.
    ErrorTopicOff ErrorTopicProcessing = iota
    // Only -- Only processing the error topic.
    ErrorTopicOnly
    // Both -- processing both the input topic and the error topic.
    ErrorTopicBoth
)
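One way to read the three options is as a choice of which topics a consumer subscribes to. The sketch below expresses that reading; topicsToRead is a hypothetical helper, the type is redeclared locally, and how the package actually wires its readers is not shown in this documentation.

```go
package main

import "fmt"

// ErrorTopicProcessing is redeclared locally, mirroring the documented type.
type ErrorTopicProcessing uint8

const (
	ErrorTopicOff ErrorTopicProcessing = iota
	ErrorTopicOnly
	ErrorTopicBoth
)

// topicsToRead is a hypothetical helper mapping each option to the topics
// that would be consumed under it.
func topicsToRead(p ErrorTopicProcessing, inputTopic, errorTopic string) []string {
	switch p {
	case ErrorTopicOnly:
		return []string{errorTopic}
	case ErrorTopicBoth:
		return []string{inputTopic, errorTopic}
	default: // ErrorTopicOff
		return []string{inputTopic}
	}
}

func main() {
	for _, p := range []ErrorTopicProcessing{ErrorTopicOff, ErrorTopicOnly, ErrorTopicBoth} {
		fmt.Println(topicsToRead(p, "orders", "orders-error"))
	}
}
```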
type FailStrategy ¶
type FailStrategy uint8
const (
    // Move -- Moves the message to FailTopic.
    FailMove FailStrategy = iota
    // Fatal -- Exit code without committing the message.
    FailFatal
    // Drop -- Silently drop it.
    FailDrop
)