Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
var ( ErrDiscard = errors.New("discard current element") ErrDiscardOldest = errors.New("discard oldest element") )
Functions ¶
This section is empty.
Types ¶
type Config ¶
type Config struct {
// contains filtered or unexported fields
}
Config defines params for Coordinator
type Coordinator ¶
type Coordinator struct {
// contains filtered or unexported fields
}
Coordinator implements a producer-consumer workflow. Put() add new elements into inner buffer queue, and will be processed by Consumer.
func NewCoordinator ¶
func NewCoordinator(config Config) Coordinator
func (Coordinator) Close ¶
func (c Coordinator) Close(graceful bool) error
Close closes the Coordinator, no more element can be put any more. It can be graceful, which means: 1. blocking 2. all remaining elements in buffer queue will make sure to be consumed.
func (Coordinator) Put ¶
Put new element into inner buffer queue. It return error when inner buffer queue is full, and elements failed putting to queue is the first return value. Due to different rejectPolicy, multiple elements may be discarded before current element put successfully. Common usages pattern:
discarded, err := c.Put(e) if err != nil { fmt.Errorf("discarded elements %+v for err %v", discarded, err) }
func (Coordinator) RemainingCapacity ¶
func (c Coordinator) RemainingCapacity() int
RemainingCapacity return how many elements inner buffer queue can hold.
func (Coordinator) Start ¶
func (c Coordinator) Start()
Start workers to consume elements from queue.
type Option ¶
type Option func(*Config)
Option constructs a Config
func SetBatchInterval ¶
func SetBatchSize ¶
func SetBufferSize ¶
SetBufferSize defines inner buffer queue's size
func SetCallback ¶
SetCallback defines callback invoked with elements and err returned from consumer
func SetNumConsumer ¶
func SetRejectPolicy ¶
func SetRejectPolicy(rp RejectPolicy) Option
SetRejectPolicy defines which elements get discarded when the queue is full
type RejectPolicy ¶
type RejectPolicy int
rejectPolicy control which elements get discarded when the queue is full
const ( // Block current goroutine, no elements discarded Block RejectPolicy = iota // Discard current element Discard // DiscardOldest remove the oldest to make room for new element DiscardOldest )