producerconsumer

package
v0.0.0-...-2824e21 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 8, 2023 License: MIT Imports: 4 Imported by: 0

README

Producer-Consumer

这个包实现了生产者-消费者模式,通过Goroutine和channel进行数据传输和处理。

特性

  • Producer通过Goroutine并发生成数据,写入Buffer Channel

  • Consumer通过Goroutine从Buffer Channel并发读取数据处理

  • 使用缓冲Channel进行解耦

  • 支持配置生产者和消费者的并发数

  • 非阻塞写保证生产者效率

  • 超时机制防止永久阻塞

  • 反压机制防止生产速度过快

  • 自定义错误处理

示例

p := newProducer() 

c := newConsumer()

// 实现生产和消费函数

p.Run(ctx) 

c.Run(ctx)

p.Inject(ctx, c.Buffer)

接口

  • NewProducerNewConsumer 创建实例

  • Producer.Run 启动生产goroutine

  • Consumer.Run 启动消费goroutine

  • Producer.Inject 将数据从生产者输入消费者channel

  • Producer.CloseConsumer.Close 关闭

生产者接口

  • SetProduceFunc 自定义生产函数

  • SetErrorHandler 错误处理

  • SetNotifier 生命周期通知

消费者接口

  • SetConsumeFunc 自定义消费函数

  • SetErrorHandler 错误处理

  • SetNotifier 生命周期通知

Documentation

Overview

package Consumerconsumer implements a Consumer and consumer model to handle data generation and processing in goroutines.

package producerconsumer implements a producer and consumer model to handle data generation and processing in goroutines.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Consumer

type Consumer struct {

	// Buffer is a buffered channel that holds data to be consumed.
	Buffer chan interface{}

	// NumProcs is the number of concurrent goroutines that will
	// process data.
	NumProcs int

	// ConsumeFunc is a handler function that will be invoked for
	// each data item to process it.
	ConsumeFunc func(interface{}) error

	// ErrHandler is a handler function that will be called
	// if an error occurs during processing.
	ErrHandler func(error)

	// Notifier is a callback function that will be invoked
	// on specific events.
	Notifier func(string)
}

Consumer represents a consumer for processing data concurrently.

func NewConsumer

func NewConsumer(bufferSize int, numProcs int) *Consumer

NewConsumer creates a new Consumer instance.

func (*Consumer) Close

func (c *Consumer) Close()

Close gracefully closes the Consumer. It closes the Buffer channel and notifies shutdown.

func (*Consumer) HandleError

func (c *Consumer) HandleError(handler ErrHandler)

sets the error handler function.

func (*Consumer) Notify

func (c *Consumer) Notify(notifier Notifier)

sets the Notify handler function.

func (*Consumer) Run

func (c *Consumer) Run(ctx context.Context)

Run starts the consumer by spinning up multiple concurrent goroutines to process data.

type ErrHandler

type ErrHandler func(error)

ErrHandler Consumer err

type Notifier

type Notifier func(string)

Notifier

type Producer

type Producer struct {

	// Buffer is the buffered channel for holding produced data before
	// it is consumed.
	Buffer chan interface{}

	// NumProcs controls the number of goroutines that invoke ProduceFunc
	// to generate data concurrently.
	NumProcs int

	// ProduceFunc is the custom data generation function provided by
	// clients. It should return generated data and any error encountered.
	ProduceFunc func() (interface{}, error)

	// ErrHandler handles any errors returned by ProduceFunc.
	// If not set, errors will be ignored.
	ErrHandler func(error)

	// Notifier sends notifications about the producer lifecycle,
	// e.g. when data generation starts and finishes.
	// This can be used to add monitoring and logging.
	Notifier func(string)
}

Producer generates data and writes to a buffered channel. It controls a number of goroutines that invoke the custom ProduceFunc to generate data and handle errors.

func NewProducer

func NewProducer(bufferSize int, numProcs int) *Producer

NewProducer creates a new Producer instance. It accepts bufferSize and numProcs arguments to configure the Producer.

bufferSize controls the size of the buffered channel used by Producer. numProcs controls the number of goroutines that will generate data.

A configured Producer instance is returned ready to Start data generation.

Example usage:

p := NewProducer(100, 4) // bufferSize 100, four goroutines
p.Start()

func (*Producer) Close

func (p *Producer) Close()

Close closes the Producer's buffer channel.

This signals to any goroutines waiting to send to the channel that no more data will arrive. They will exit once the channel is drained.

It should be called when data generation is complete and before disposing the Producer object.

func (*Producer) HandleError

func (p *Producer) HandleError(handler ErrHandler)

HandleError sets a custom error handler function.

The handler will be invoked whenever an error is returned by the ProduceFunc during data generation.

If no custom handler is set, errors will be ignored.

Example:

p.HandleError(func(err error){
  log.Printf("data generation failed: %v", err)
})

func (*Producer) Inject

func (p *Producer) Inject(ctx context.Context, out chan interface{})

Inject writes to out channel in a non-blocking manner, dropping messages if the out channel is full.

func (*Producer) Notify

func (p *Producer) Notify(notifier Notifier)

Notify sets a notifier function to receive lifecycle notifications.

The notifier will be invoked during key events like start/stop of data generation.

This allows monitoring systems to track the Producer lifecycle events.

Example:

p.Notify(func(msg string){
  log.Println(msg)
})

func (*Producer) Run

func (p *Producer) Run(ctx context.Context)

Run starts the Producer goroutines to generate data.

It will start NumProcs goroutines to execute the ProduceFunc concurrently to generate data.

The provided context can be used to control cancelation and timeouts.

Run blocks until all goroutines finish or the context is canceled.

Example usage:

ctx := context.Background()
p.Run(ctx)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL