output

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 21, 2024 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func IterateBatchedSend

func IterateBatchedSend(msg message.Batch, fn func(int, *message.Part) error) error

IterateBatchedSend executes a closure fn on each message of a batch, where the closure is expected to attempt a send and return an error. If an error is returned then it is added to a batch error in order to support index specific error handling.

However, if a fatal error is returned such as a connection loss or shut down then it is returned immediately.

Types

type AsyncSink

type AsyncSink interface {
	// Connect attempts to establish a connection to the sink, if
	// unsuccessful returns an error. If the attempt is successful (or not
	// necessary) returns nil.
	Connect(ctx context.Context) error

	// WriteBatch should block until either the message is sent (and
	// acknowledged) to a sink, or a transport specific error has occurred, or
	// the Type is closed.
	WriteBatch(ctx context.Context, msg message.Batch) error

	// Close is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	Close(ctx context.Context) error
}

AsyncSink is a type that writes Bento messages to a third party sink. If the protocol supports a form of acknowledgement then it will be returned by the call to Write.

type AsyncWriter

type AsyncWriter struct {
	// contains filtered or unexported fields
}

AsyncWriter is an output type that writes messages to a writer.Type.

func (*AsyncWriter) ConnectionStatus added in v1.2.0

func (w *AsyncWriter) ConnectionStatus() component.ConnectionStatuses

ConnectionStatus returns the status of the given output connection.

func (*AsyncWriter) Consume

func (w *AsyncWriter) Consume(ts <-chan message.Transaction) error

Consume assigns a messages channel for the output to read.

func (*AsyncWriter) TriggerCloseNow

func (w *AsyncWriter) TriggerCloseNow()

TriggerCloseNow shuts down the output and stops processing messages.

func (*AsyncWriter) WaitForClose

func (w *AsyncWriter) WaitForClose(ctx context.Context) error

WaitForClose blocks until the File output has closed down.

type Config

type Config struct {
	Label      string             `json:"label" yaml:"label"`
	Type       string             `json:"type" yaml:"type"`
	Plugin     any                `json:"plugin,omitempty" yaml:"plugin,omitempty"`
	Processors []processor.Config `json:"processors" yaml:"processors"`
}

Config is the all encompassing configuration struct for all output types. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.

func FromAny

func FromAny(prov docs.Provider, value any) (conf Config, err error)

func NewConfig

func NewConfig() Config

NewConfig returns a configuration struct fully populated with default values. Deprecated: Do not add new components here. Instead, use the public plugin APIs. Examples can be found in: ./internal/impl.

type Streamed

type Streamed interface {
	// Consume starts the type receiving transactions from a Transactor.
	Consume(<-chan message.Transaction) error

	// ConnectionStatus returns the current status of the given component
	// connection. The result is a slice in order to accommodate higher order
	// components that wrap several others.
	ConnectionStatus() component.ConnectionStatuses

	// TriggerCloseNow triggers the shut down of this component but should not
	// block the calling goroutine.
	TriggerCloseNow()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(ctx context.Context) error
}

Streamed is a common interface implemented by outputs and provides channel based streaming APIs.

func NewAsyncWriter

func NewAsyncWriter(typeStr string, maxInflight int, w AsyncSink, mgr component.Observability) (Streamed, error)

NewAsyncWriter creates a Streamed implementation around an AsyncSink.

func OnlySinglePayloads

func OnlySinglePayloads(out Streamed) Streamed

OnlySinglePayloads expands message batches into individual payloads, respecting the max in flight of the wrapped output. This is a more efficient way of feeding messages into an output that handles its own batching mechanism internally, or does not support batching at all.

func WrapWithPipelines

func WrapWithPipelines(out Streamed, pipeConstructors ...iprocessor.PipelineConstructorFunc) (Streamed, error)

WrapWithPipelines wraps an output with a variadic number of pipelines.

type Sync

type Sync interface {
	// WriteTransaction attempts to write a transaction to an output.
	WriteTransaction(context.Context, message.Transaction) error

	// ConnectionStatus returns the current status of the given component
	// connection. The result is a slice in order to accommodate higher order
	// components that wrap several others.
	ConnectionStatus() component.ConnectionStatuses

	// TriggerStopConsuming instructs the output to start shutting down
	// resources once all pending messages are delivered and acknowledged.
	TriggerStopConsuming()

	// TriggerCloseNow triggers the shut down of this component but should not
	// block the calling goroutine.
	TriggerCloseNow()

	// WaitForClose is a blocking call to wait until the component has finished
	// shutting down and cleaning up resources.
	WaitForClose(ctx context.Context) error
}

Sync is a common interface implemented by outputs and provides synchronous based writing APIs.

type WithPipeline

type WithPipeline struct {
	// contains filtered or unexported fields
}

WithPipeline is a type that wraps both an output type and a pipeline type by routing the pipeline through the output, and implements the output.Type interface in order to act like an ordinary output.

func WrapWithPipeline

func WrapWithPipeline(out Streamed, pipeConstructor iprocessor.PipelineConstructorFunc) (*WithPipeline, error)

WrapWithPipeline routes a processing pipeline directly into an output and returns a type that manages both and acts like an ordinary output.

func (*WithPipeline) ConnectionStatus added in v1.2.0

func (i *WithPipeline) ConnectionStatus() component.ConnectionStatuses

ConnectionStatus returns the current status of the given component connection. The result is a slice in order to accommodate higher order components that wrap several others.

func (*WithPipeline) Consume

func (i *WithPipeline) Consume(tsChan <-chan message.Transaction) error

Consume starts the type listening to a message channel from a producer.

func (*WithPipeline) TriggerCloseNow

func (i *WithPipeline) TriggerCloseNow()

TriggerCloseNow triggers a closure of this object but does not block.

func (*WithPipeline) WaitForClose

func (i *WithPipeline) WaitForClose(ctx context.Context) error

WaitForClose is a blocking call to wait until the object has finished closing down and cleaning up resources.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL