Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AsyncKafkaWriter ¶ added in v0.3.0
type AsyncKafkaWriter struct {
// contains filtered or unexported fields
}
AsyncKafkaWriter represents a writer that sends messages to Kafka using an async producer.
Fields:
- produce: a channel used to send messages to the Kafka producer
- conf: the configuration for the Kafka writer
- debug: a flag indicating whether debug mode is enabled
- wg: a WaitGroup used for synchronization
- once: a sync.Once used for one-time initialization
func NewAsyncKafkaWriter ¶ added in v0.3.0
func NewAsyncKafkaWriter(conf Config) (*AsyncKafkaWriter, error)
NewAsyncKafkaWriter creates a new AsyncKafkaWriter with the provided configuration. It initializes a new Kafka configuration and sets the producer properties. If no producer is defined in the configuration, it creates a new asynchronous producer using the given endpoints and Kafka configuration. If an error occurs during producer creation, it returns an error. The AsyncKafkaWriter struct is then initialized with the configuration and a buffered channel for producing messages. It launches a goroutine that listens for producer errors and incoming messages to be produced. If an error occurs during message production, it prints the error message and the failed message to stderr. If the channel for producing messages is closed, the goroutine will close the producer and finish execution. Finally, it returns a pointer to the AsyncKafkaWriter and a nil error.
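The background goroutine described above can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: runLoop, drainDemo, and the send and closeProducer callbacks are hypothetical stand-ins for the real producer, and the error text is simulated.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// runLoop is a hypothetical sketch of the background goroutine: it reports
// producer errors to stderr, forwards queued messages through send, and calls
// closeProducer once the produce channel has been closed.
func runLoop(produce <-chan []byte, errs <-chan error, send func([]byte), closeProducer func()) {
	for {
		select {
		case err, ok := <-errs:
			if !ok {
				errs = nil // a nil channel is never selected again
				continue
			}
			fmt.Fprintln(os.Stderr, "kafka produce error:", err)
		case msg, ok := <-produce:
			if !ok {
				closeProducer()
				return
			}
			send(msg)
		}
	}
}

// drainDemo feeds msgs through runLoop and reports what was sent and whether
// the stand-in producer was closed afterwards.
func drainDemo(msgs ...string) (sent []string, closed bool) {
	produce := make(chan []byte, len(msgs))
	errs := make(chan error) // unbuffered: the send below waits for the loop
	done := make(chan struct{})
	go func() {
		defer close(done)
		runLoop(produce, errs,
			func(b []byte) { sent = append(sent, string(b)) },
			func() { closed = true })
	}()
	errs <- errors.New("simulated broker failure") // reported on stderr
	for _, m := range msgs {
		produce <- []byte(m)
	}
	close(produce) // tells the loop to close the producer and exit
	<-done
	return sent, closed
}

func main() {
	sent, closed := drainDemo("first", "second")
	fmt.Println(sent, closed) // prints: [first second] true
}
```

Closing the produce channel is the only shutdown signal in this sketch, which matches the documented behaviour that the goroutine closes the producer and finishes once the channel is closed.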
func NewAsyncKafkaWriterWithContext ¶ added in v0.3.0
func NewAsyncKafkaWriterWithContext(ctx context.Context, conf Config) (writer *AsyncKafkaWriter, err error)
NewAsyncKafkaWriterWithContext creates a new AsyncKafkaWriter using the provided context and configuration. It launches a goroutine that creates the AsyncKafkaWriter from the configuration, then waits for the creation to complete or for the context to be cancelled. If the context is cancelled before the creation completes, it returns an error indicating a connect timeout. If the creation completes successfully, it returns the created AsyncKafkaWriter.
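The "construct in a goroutine, wait on the context" pattern can be sketched as follows. Everything here is an assumption made for illustration: sketchWriter, newWithContext, errConnectTimeout, and the construct callback are hypothetical names, and the real constructor may report the timeout differently.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sketchWriter is a hypothetical placeholder for the writer type.
type sketchWriter struct{ topic string }

// errConnectTimeout is an assumed sentinel; the real error value is not documented.
var errConnectTimeout = errors.New("kafka connect timeout")

// newWithContext runs a possibly slow constructor in a goroutine and returns
// whichever happens first: the constructor's result or context cancellation.
func newWithContext(ctx context.Context, construct func() (*sketchWriter, error)) (*sketchWriter, error) {
	type result struct {
		w   *sketchWriter
		err error
	}
	done := make(chan result, 1) // buffered so the goroutine can always finish
	go func() {
		w, err := construct()
		done <- result{w: w, err: err}
	}()
	select {
	case r := <-done:
		return r.w, r.err
	case <-ctx.Done():
		return nil, fmt.Errorf("%w: %v", errConnectTimeout, ctx.Err())
	}
}

// demoConstruct exercises both outcomes: a fast constructor under a generous
// deadline and a slow constructor under a short one.
func demoConstruct() (fastOK, slowTimedOut bool) {
	fast := func() (*sketchWriter, error) { return &sketchWriter{topic: "logs"}, nil }
	slow := func() (*sketchWriter, error) {
		time.Sleep(300 * time.Millisecond) // simulates an unreachable broker
		return &sketchWriter{topic: "logs"}, nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	w, err := newWithContext(ctx, fast)
	fastOK = err == nil && w != nil && w.topic == "logs"

	shortCtx, cancelShort := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancelShort()
	_, err = newWithContext(shortCtx, slow)
	slowTimedOut = errors.Is(err, errConnectTimeout)
	return fastOK, slowTimedOut
}

func main() {
	fastOK, slowTimedOut := demoConstruct()
	fmt.Println(fastOK, slowTimedOut)
}
```

The result channel is buffered with capacity one so that a constructor finishing after the context has expired does not leave its goroutine blocked forever.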
func (*AsyncKafkaWriter) Close ¶ added in v0.3.0
func (h *AsyncKafkaWriter) Close() error
Close closes the AsyncKafkaWriter's producer by calling the Close method on the underlying producer. It returns an error if there was an error closing the producer.
func (*AsyncKafkaWriter) Write ¶ added in v0.3.0
func (h *AsyncKafkaWriter) Write(buf []byte) (n int, err error)
Write writes a byte buffer to the AsyncKafkaWriter's producer input channel. It appends a copy of the buffer to the producer input channel. If the input channel buffer is full, it returns an error indicating a buffer overflow. The error message will include the contents of the buffer that was dropped. This function returns the number of bytes written and a nil error on success.
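A non-blocking write of this kind is usually built from a select with a default case. The sketch below assumes that approach; sketchWriter, newSketchWriter, and errBufferFull are hypothetical names, and the exact error text of the real package is not known.

```go
package main

import (
	"errors"
	"fmt"
)

// errBufferFull is an assumed sentinel for the documented buffer overflow error.
var errBufferFull = errors.New("buffer overflow")

// sketchWriter is a hypothetical writer holding only the buffered input channel.
type sketchWriter struct {
	produce chan []byte
}

func newSketchWriter(size int) *sketchWriter {
	return &sketchWriter{produce: make(chan []byte, size)}
}

// Write queues a copy of buf without blocking. When the channel is full the
// message is dropped and the returned error includes its contents.
func (w *sketchWriter) Write(buf []byte) (int, error) {
	msg := make([]byte, len(buf))
	copy(msg, buf) // the caller may reuse buf after Write returns
	select {
	case w.produce <- msg:
		return len(buf), nil
	default:
		return 0, fmt.Errorf("%w, dropped message: %q", errBufferFull, buf)
	}
}

func main() {
	w := newSketchWriter(1)
	n, err := w.Write([]byte("first"))
	fmt.Println(n, err) // prints: 5 <nil>
	n, err = w.Write([]byte("second"))
	fmt.Println(n, errors.Is(err, errBufferFull)) // prints: 0 true
}
```

Copying the buffer matters because callers such as loggers commonly reuse the slice passed to Write as soon as the call returns.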
func (*AsyncKafkaWriter) WriteWithContext ¶ added in v0.3.0
WriteWithContext writes a byte buffer to the AsyncKafkaWriter's producer input channel, but unlike the Write method it respects the context timeout. It appends a copy of the buffer to the producer input channel. If the context expires before the method can write to the channel, it returns an error indicating a deadline exceeded. If the input channel buffer is full, it returns an error indicating a buffer overflow. The error message will include the contents of the buffer that was dropped. This function returns the number of bytes written and a nil error on success.
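One plausible reading of the context-aware variant is that the send waits for buffer space until the context expires. The sketch below implements only that reading, using a select on the channel send and ctx.Done(); sketchWriter and demoWrite are hypothetical, and the real method may combine this with the overflow check differently.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sketchWriter is a hypothetical writer holding only the buffered input channel.
type sketchWriter struct {
	produce chan []byte
}

// WriteWithContext queues a copy of buf, waiting for buffer space until ctx
// is done. This is an assumed behaviour, not the package's confirmed logic.
func (w *sketchWriter) WriteWithContext(ctx context.Context, buf []byte) (int, error) {
	if err := ctx.Err(); err != nil {
		return 0, err // already cancelled or expired: do not queue anything
	}
	msg := append([]byte(nil), buf...) // copy so the caller can reuse buf
	select {
	case w.produce <- msg:
		return len(buf), nil
	case <-ctx.Done():
		return 0, fmt.Errorf("message %q not queued: %w", buf, ctx.Err())
	}
}

// demoWrite fills a one-slot buffer, then shows a second write giving up
// when the deadline passes because nothing drains the channel.
func demoWrite() (written int, timedOut bool) {
	w := &sketchWriter{produce: make(chan []byte, 1)}
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	written, _ = w.WriteWithContext(ctx, []byte("hello"))
	_, err := w.WriteWithContext(ctx, []byte("again"))
	return written, errors.Is(err, context.DeadlineExceeded)
}

func main() {
	written, timedOut := demoWrite()
	fmt.Println(written, timedOut)
}
```

Wrapping ctx.Err() with %w keeps the error comparable through errors.Is, so callers can still distinguish context.DeadlineExceeded from context.Canceled.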
type Config ¶
type Config struct {
	BufferSize    int
	Endpoints     []string
	Topic         string
	AsyncProducer sarama.AsyncProducer
	SyncProducer  sarama.SyncProducer
}
Config represents a configuration for an AsyncKafkaWriter
type KafkaWriter ¶
type KafkaWriter interface {
	Write(buf []byte) (n int, err error)
	WriteWithContext(ctx context.Context, buf []byte) (n int, err error)
	Close() error
}
KafkaWriter represents a type that provides functionality to write data to a Kafka topic.
The Write method is used to write the provided buffer of bytes to the Kafka topic. It returns the number of bytes written and any error that occurred during the write operation.
The WriteWithContext method is similar to Write, but it also accepts a context.Context as the first argument. The context can be used to control the write operation, such as setting a timeout or cancellation.
The Close method is used to close the KafkaWriter and release any resources associated with it. It returns an error if there was an issue closing the KafkaWriter.
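Because callers depend only on the interface, code that writes to Kafka can be exercised against an in-memory substitute. In the sketch below the KafkaWriter interface is copied from the declaration above, while memoryWriter and writeAll are hypothetical helpers invented for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// KafkaWriter mirrors the interface documented above.
type KafkaWriter interface {
	Write(buf []byte) (n int, err error)
	WriteWithContext(ctx context.Context, buf []byte) (n int, err error)
	Close() error
}

// memoryWriter is a hypothetical in-memory implementation for tests.
type memoryWriter struct {
	messages [][]byte
	closed   bool
}

func (m *memoryWriter) Write(buf []byte) (int, error) {
	if m.closed {
		return 0, errors.New("writer is closed")
	}
	m.messages = append(m.messages, append([]byte(nil), buf...))
	return len(buf), nil
}

func (m *memoryWriter) WriteWithContext(ctx context.Context, buf []byte) (int, error) {
	if err := ctx.Err(); err != nil {
		return 0, err
	}
	return m.Write(buf)
}

func (m *memoryWriter) Close() error {
	m.closed = true
	return nil
}

// Compile-time check that memoryWriter satisfies the interface.
var _ KafkaWriter = (*memoryWriter)(nil)

// writeAll sends each line as one message and returns the total byte count.
func writeAll(w KafkaWriter, lines ...string) (int, error) {
	total := 0
	for _, line := range lines {
		n, err := w.Write([]byte(line))
		total += n
		if err != nil {
			return total, err
		}
	}
	return total, nil
}

func main() {
	m := &memoryWriter{}
	total, err := writeAll(m, "ab", "cde")
	fmt.Println(total, err, len(m.messages)) // prints: 5 <nil> 2

	// Write has the io.Writer signature, so fmt.Fprintf accepts the writer.
	fmt.Fprintf(m, "user=%s", "alice")
	fmt.Println(len(m.messages))
}
```

Since Write matches the io.Writer method signature, any KafkaWriter can also be handed to standard library functions that accept an io.Writer.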
func NewSyncKafkaWriter ¶ added in v0.3.0
func NewSyncKafkaWriter(conf Config) (KafkaWriter, error)
NewSyncKafkaWriter creates a new SyncKafkaWriter with the provided configuration. It initializes a new Kafka configuration and sets the producer properties. If no producer is defined in the configuration, it creates a new synchronous producer using the given endpoints and Kafka configuration. If an error occurs during producer creation, it returns an error. The SyncKafkaWriter struct is then initialized with the configuration and a buffered channel for producing messages. It launches a goroutine that listens for producer errors and incoming messages to be produced. If an error occurs during message production, it prints the error message and the failed message to stderr. If the channel for producing messages is closed, the goroutine will close the producer and finish execution. Finally, it returns a pointer to the SyncKafkaWriter and a nil error.
func NewSyncKafkaWriterWithContext ¶ added in v0.3.0
func NewSyncKafkaWriterWithContext(ctx context.Context, conf Config) (writer KafkaWriter, err error)
NewSyncKafkaWriterWithContext creates a new SyncKafkaWriter using the provided context and configuration. It launches a goroutine that creates the SyncKafkaWriter from the configuration, then waits for the creation to complete or for the context to be cancelled. If the context is cancelled before the creation completes, it returns an error indicating a connect timeout. If the creation completes successfully, it returns the created SyncKafkaWriter.
type SyncKafkaWriter ¶ added in v0.3.0
type SyncKafkaWriter struct {
// contains filtered or unexported fields
}
SyncKafkaWriter represents a writer that sends messages to Kafka using a sync producer.
Fields:
- produce: a channel used to send messages to the Kafka producer
- conf: the configuration for the Kafka writer
- debug: a flag indicating whether debug mode is enabled
- wg: a WaitGroup used for synchronization
- once: a sync.Once used for one-time initialization
func (*SyncKafkaWriter) Close ¶ added in v0.3.0
func (h *SyncKafkaWriter) Close() error
Close closes the SyncKafkaWriter's producer connection to the Kafka cluster. It returns an error if the producer is unable to successfully close the connection. Once closed, the SyncKafkaWriter should not be used for further writing operations.
func (*SyncKafkaWriter) Write ¶ added in v0.3.0
func (h *SyncKafkaWriter) Write(buf []byte) (n int, err error)
Write writes a byte buffer to the SyncKafkaWriter's producer input channel. It appends a copy of the buffer to the producer input channel. If the input channel buffer is full, it returns an error indicating a buffer overflow. The error message will include the contents of the buffer that was dropped. This function returns the number of bytes written and a nil error on success.
func (*SyncKafkaWriter) WriteWithContext ¶ added in v0.3.0
WriteWithContext writes a byte buffer to the SyncKafkaWriter's producer input channel, but unlike the Write method it respects the context timeout. It appends a copy of the buffer to the producer input channel. If the context expires before the method can write to the channel, it returns an error indicating a deadline exceeded. If the input channel buffer is full, it returns an error indicating a buffer overflow. The error message will include the contents of the buffer that was dropped. This function returns the number of bytes written and a nil error on success.