Documentation
¶
Overview ¶
package Consumerconsumer implements a Consumer and consumer model to handle data generation and processing in goroutines.
package producerconsumer implements a producer and consumer model to handle data generation and processing in goroutines.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct { // Buffer is a buffered channel that holds data to be consumed. Buffer chan interface{} // NumProcs is the number of concurrent goroutines that will // process data. NumProcs int // ConsumeFunc is a handler function that will be invoked for // each data item to process it. ConsumeFunc func(interface{}) error // ErrHandler is a handler function that will be called // if an error occurs during processing. ErrHandler func(error) // Notifier is a callback function that will be invoked // on specific events. Notifier func(string) }
Consumer represents a consumer for processing data concurrently.
func NewConsumer ¶
NewConsumer creates a new Consumer instance.
func (*Consumer) Close ¶
func (c *Consumer) Close()
Close gracefully closes the Consumer. It closes the Buffer channel and notifies shutdown.
func (*Consumer) HandleError ¶
func (c *Consumer) HandleError(handler ErrHandler)
sets the error handler function.
type Producer ¶
type Producer struct { // Buffer is the buffered channel for holding produced data before // it is consumed. Buffer chan interface{} // NumProcs controls the number of goroutines that invoke ProduceFunc // to generate data concurrently. NumProcs int // ProduceFunc is the custom data generation function provided by // clients. It should return generated data and any error encountered. ProduceFunc func() (interface{}, error) // ErrHandler handles any errors returned by ProduceFunc. // If not set, errors will be ignored. ErrHandler func(error) // Notifier sends notifications about the producer lifecycle, // e.g. when data generation starts and finishes. // This can be used to add monitoring and logging. Notifier func(string) }
Producer generates data and writes to a buffered channel. It controls a number of goroutines that invoke the custom ProduceFunc to generate data and handle errors.
func NewProducer ¶
NewProducer creates a new Producer instance. It accepts bufferSize and numProcs arguments to configure the Producer.
bufferSize controls the size of the buffered channel used by Producer. numProcs controls the number of goroutines that will generate data.
A configured Producer instance is returned ready to Start data generation.
Example usage:
p := NewProducer(100, 4) // bufferSize 100, four goroutines p.Start()
func (*Producer) Close ¶
func (p *Producer) Close()
Close closes the Producer's buffer channel.
This signals to any goroutines waiting to send to the channel that no more data will arrive. They will exit once the channel is drained.
It should be called when data generation is complete and before disposing the Producer object.
func (*Producer) HandleError ¶
func (p *Producer) HandleError(handler ErrHandler)
HandleError sets a custom error handler function.
The handler will be invoked whenever an error is returned by the ProduceFunc during data generation.
If no custom handler is set, errors will be ignored.
Example:
p.HandleError(func(err error){ log.Printf("data generation failed: %v", err) })
func (*Producer) Inject ¶
Inject writes to out channel in a non-blocking manner, dropping messages if the out channel is full.
func (*Producer) Notify ¶
Notify sets a notifier function to receive lifecycle notifications.
The notifier will be invoked during key events like start/stop of data generation.
This allows monitoring systems to track the Producer lifecycle events.
Example:
p.Notify(func(msg string){ log.Println(msg) })
func (*Producer) Run ¶
Run starts the Producer goroutines to generate data.
It will start NumProcs goroutines to execute the ProduceFunc concurrently to generate data.
The provided context can be used to control cancelation and timeouts.
Run blocks until all goroutines finish or the context is canceled.
Example usage:
ctx := context.Background() p.Run(ctx)