Documentation ¶
Overview ¶
Package client implements the synchronous and asynchronous kafka Producers and the kafka Consumer.
Index ¶
- Constants
- func NewClient(config *Config, partitioner string) (sarama.Client, error)
- type AsyncProducer
- func (ref *AsyncProducer) Close(async ...bool) error
- func (ref *AsyncProducer) GetCloseChannel() <-chan struct{}
- func (ref *AsyncProducer) IsClosed() bool
- func (ref *AsyncProducer) SendMsg(topic string, key sarama.Encoder, msg sarama.Encoder, metadata interface{})
- func (ref *AsyncProducer) SendMsgByte(topic string, key []byte, msg []byte, metadata interface{})
- func (ref *AsyncProducer) SendMsgToPartition(topic string, partition int32, key Encoder, msg Encoder, metadata interface{})
- func (ref *AsyncProducer) WaitForClose()
- type Config
- func (ref *Config) ConsumerConfig() *cluster.Config
- func (ref *Config) ProducerConfig() *sarama.Config
- func (ref *Config) SetAcks(acks RequiredAcks)
- func (ref *Config) SetBrokers(brokers ...string)
- func (ref *Config) SetDebug(val bool)
- func (ref *Config) SetErrorChan(val chan *ProducerError)
- func (ref *Config) SetGroup(id string)
- func (ref *Config) SetInitialOffset(offset int64)
- func (ref *Config) SetPartition(val int32)
- func (ref *Config) SetPartitioner(val string)
- func (ref *Config) SetRecvError(val bool)
- func (ref *Config) SetRecvErrorChan(val chan error)
- func (ref *Config) SetRecvMessageChan(val chan *ConsumerMessage)
- func (ref *Config) SetRecvNotification(val bool)
- func (ref *Config) SetRecvNotificationChan(val chan *cluster.Notification)
- func (ref *Config) SetSendError(val bool)
- func (ref *Config) SetSendSuccess(val bool)
- func (ref *Config) SetSuccessChan(val chan *ProducerMessage)
- func (ref *Config) SetTopics(topics string)
- func (ref *Config) ValidateAsyncProducerConfig() error
- func (ref *Config) ValidateConsumerConfig() error
- func (ref *Config) ValidateSyncProducerConfig() error
- type Consumer
- func (ref *Consumer) Close() error
- func (ref *Consumer) CommitOffsets() error
- func (ref *Consumer) ConsumerErrorHandler(in <-chan *sarama.ConsumerError)
- func (ref *Consumer) GetCloseChannel() <-chan struct{}
- func (ref *Consumer) IsClosed() bool
- func (ref *Consumer) MarkOffset(msg *ConsumerMessage, metadata string)
- func (ref *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)
- func (ref *Consumer) MessageHandler(in <-chan *sarama.ConsumerMessage)
- func (ref *Consumer) PrintNotification(note map[string][]int32)
- func (ref *Consumer) Subscriptions() map[string][]int32
- func (ref *Consumer) WaitForClose()
- type ConsumerMessage
- type Encoder
- type ProducerError
- type ProducerMessage
- type ProtoConsumerMessage
- type ProtoProducerMessage
- type ProtoProducerMessageErr
- type RequiredAcks
- type SyncProducer
- func (ref *SyncProducer) Close() error
- func (ref *SyncProducer) GetCloseChannel() <-chan struct{}
- func (ref *SyncProducer) IsClosed() bool
- func (ref *SyncProducer) SendMsg(topic string, key sarama.Encoder, msg sarama.Encoder) (*ProducerMessage, error)
- func (ref *SyncProducer) SendMsgByte(topic string, key []byte, msg []byte) (*ProducerMessage, error)
- func (ref *SyncProducer) SendMsgToPartition(topic string, partition int32, key sarama.Encoder, msg sarama.Encoder) (*ProducerMessage, error)
- func (ref *SyncProducer) WaitForClose()
Examples ¶
Constants ¶
const ( // Hash scheme (messages with the same key always end up on the same partition) Hash = "hash" // Random scheme (random partition is always used) Random = "random" // Manual scheme (partitions are manually set in the provided message's partition field) Manual = "manual" )
Partitioner schemes
Variables ¶
This section is empty.
Functions ¶
Types ¶
type AsyncProducer ¶
type AsyncProducer struct { logging.Logger Config *Config Client sarama.Client Producer sarama.AsyncProducer Partition int32 sync.Mutex // contains filtered or unexported fields }
AsyncProducer allows publishing messages to Kafka using the asynchronous API. Sending a message using the SendMsgToPartition and SendMsgByte functions does not block. The status indicating whether a message was sent successfully or not is delivered using the channels specified in the config structure.
Example ¶
log := logroot.StandardLogger() //init config config := NewConfig(logroot.StandardLogger()) config.SetBrokers("localhost:9091", "localhost:9092") config.SetSendSuccess(true) config.SetSuccessChan(make(chan *ProducerMessage)) config.SetSendError(true) config.SetErrorChan(make(chan *ProducerError)) // init client sClient, err := NewClient(config, Hash) if err != nil { return } // init producer producer, err := NewAsyncProducer(config, sClient, Hash, nil) if err != nil { log.Errorf("NewAsyncProducer errored: %v\n", err) return } // send a message producer.SendMsgByte("test-topic", []byte("key"), []byte("test message"), nil) select { case msg := <-config.SuccessChan: log.Info("message sent successfully - ", msg) case err := <-config.ErrorChan: log.Error("message errored - ", err) } // close producer and release resources err = producer.Close(true) if err != nil { log.Errorf("AsyncProducer close errored: %v\n", err) return } log.Info("AsyncProducer closed")
Output:
func GetAsyncProducerMock ¶
func GetAsyncProducerMock(t mocks.ErrorReporter) (*AsyncProducer, *mocks.AsyncProducer)
GetAsyncProducerMock returns mocked implementation of async producer that doesn't need connection to Kafka broker and can be used for testing purposes.
func NewAsyncProducer ¶
func NewAsyncProducer(config *Config, sClient sarama.Client, partitioner string, wg *sync.WaitGroup) (*AsyncProducer, error)
NewAsyncProducer returns a new AsyncProducer instance. Producer is created from provided sarama client which can be nil; in that case a new client will be created. Also the partitioner is set here. Note: provided sarama client partitioner should match the one used in config.
func (*AsyncProducer) Close ¶
func (ref *AsyncProducer) Close(async ...bool) error
Close closes the client and producer
func (*AsyncProducer) GetCloseChannel ¶
func (ref *AsyncProducer) GetCloseChannel() <-chan struct{}
GetCloseChannel returns a channel that is closed on asyncProducer cleanup
func (*AsyncProducer) IsClosed ¶
func (ref *AsyncProducer) IsClosed() bool
IsClosed returns the "closed" status
func (*AsyncProducer) SendMsg ¶
func (ref *AsyncProducer) SendMsg(topic string, key sarama.Encoder, msg sarama.Encoder, metadata interface{})
SendMsg sends a message to Kafka using default partition
func (*AsyncProducer) SendMsgByte ¶
func (ref *AsyncProducer) SendMsgByte(topic string, key []byte, msg []byte, metadata interface{})
SendMsgByte sends an async message to Kafka.
func (*AsyncProducer) SendMsgToPartition ¶ added in v1.0.4
func (ref *AsyncProducer) SendMsgToPartition(topic string, partition int32, key Encoder, msg Encoder, metadata interface{})
SendMsgToPartition sends an async message to Kafka
func (*AsyncProducer) WaitForClose ¶
func (ref *AsyncProducer) WaitForClose()
WaitForClose returns when the producer is closed
type Config ¶
type Config struct { logging.Logger // Config extends the sarama-cluster.Config with the kafkaclient namespace *cluster.Config // Context Package carries deadlines, cancelation signals, and other values. // see: http://golang.org/x/net/context Context context.Context // Cancel is a function that can be call, e.g. config.Cancel(), to cancel and close // the producer/consumer Cancel context.CancelFunc // Brokers contains "{domain:port}" array of Kafka brokers. // This list of brokers is used by the kafkaclient to determine the 'lead' broker for each topic // and the 'lead' consumer for each topic. If only one broker is supplied then it will be used to // communicate with the other brokers. // REQUIRED: PRODUCER AND CONSUMER. Brokers []string // GroupID contains the name of the consumer's group. // REQUIRED: CONSUMER. GroupID string // Debug determines if debug code should be 'turned-on'. // DEFAULT: false. OPTIONAL. Debug bool // Topics contains the topics that a consumer should retrieve messages for. // REQUIRED: CONSUMER. Topics []string // Partition is the partition. Used when configuring partitions manually. Partition int32 // Partitioner is the method used to determine a topic's partition. // REQUIRED: PRODUCER. DEFAULT: HASH Partitioner sarama.PartitionerConstructor // InitialOffset indicates the initial offset that should be used when a consumer is initialized and begins reading // the Kafka message log for the topic. If the offset was previously committed then the committed offset is used // rather than the initial offset. // REQUIRED: CONSUMER InitialOffset int64 // RequiredAcks is the level of acknowledgement reliability needed from the broker // REQUIRED: PRODUCER. DEFAULT(Async) WaitForLocal DEFAULT(Sync) WaitForAll RequiredAcks RequiredAcks // RecvNotification indicates that a Consumer return "Notification" messages after it has rebalanced. // REQUIRED: CONSUMER. DEFAULT: false. 
RecvNotification bool // NotificationChan function called when a "Notification" message is received by a consumer. // REQUIRED: CONSUMER if 'RecvNotification=true' RecvNotificationChan chan *cluster.Notification // RecvError indicates that "receive" errors should not be ignored and should be returned to the consumer. // REQUIRED: CONSUMER. DEFAULT: true. RecvError bool // RecvErrorChan channel is for delivery of "Error" messages received by the consumer. // REQUIRED: CONSUMER if 'RecvError=true' RecvErrorChan chan error // MessageChan channel is used for delivery of consumer messages. // REQUIRED: CONSUMER RecvMessageChan chan *ConsumerMessage // SendSuccess indicates that the Async Producer should return "Success" messages when a message // has been successfully received by the Kafka. // REQUIRED: CONSUMER. DEFAULT: false. SendSuccess bool // SuccessChan is used for delivery of message when a "Success" is returned by Async Producer. // REQUIRED: PRODUCER if 'SendSuccess=true' SuccessChan chan *ProducerMessage // SendError indicates that an Async Producer should return "Error" messages when a message transmission to Kafka // failed. // REQUIRED: CONSUMER. DEFAULT: true. SendError bool // ErrorChan is used for delivery of "Error" message if an error is returned by Async Producer. // REQUIRED: PRODUCER if 'SendError=true' ErrorChan chan *ProducerError }
Config struct provides the configuration for a Producer (Sync or Async) and Consumer.
func (*Config) ConsumerConfig ¶
func (ref *Config) ConsumerConfig() *cluster.Config
ConsumerConfig returns the consumer configuration as a *cluster.Config
func (*Config) ProducerConfig ¶
ProducerConfig returns the producer configuration as a *sarama.Config
func (*Config) SetAcks ¶
func (ref *Config) SetAcks(acks RequiredAcks)
SetAcks sets the Config.RequiredAcks field
func (*Config) SetBrokers ¶
SetBrokers sets the Config.Brokers field
func (*Config) SetErrorChan ¶
func (ref *Config) SetErrorChan(val chan *ProducerError)
SetErrorChan sets the Config.ErrorChan field
func (*Config) SetInitialOffset ¶
SetInitialOffset sets the Config.InitialOffset field
func (*Config) SetPartition ¶
SetPartition sets the Config.Partition field
func (*Config) SetPartitioner ¶
SetPartitioner sets the Config.Partitioner field
func (*Config) SetRecvError ¶
SetRecvError sets the Config.RecvError field
func (*Config) SetRecvErrorChan ¶
SetRecvErrorChan sets the Config.RecvErrorChan field
func (*Config) SetRecvMessageChan ¶
func (ref *Config) SetRecvMessageChan(val chan *ConsumerMessage)
SetRecvMessageChan sets the Config.RecvMessageChan field
func (*Config) SetRecvNotification ¶
SetRecvNotification sets the Config.RecvNotification field
func (*Config) SetRecvNotificationChan ¶
func (ref *Config) SetRecvNotificationChan(val chan *cluster.Notification)
SetRecvNotificationChan sets the Config.RecvNotificationChan field
func (*Config) SetSendError ¶
SetSendError sets the Config.SendError field
func (*Config) SetSendSuccess ¶
SetSendSuccess sets the Config.SendSuccess field
func (*Config) SetSuccessChan ¶
func (ref *Config) SetSuccessChan(val chan *ProducerMessage)
SetSuccessChan sets the Config.SuccessChan field
func (*Config) ValidateAsyncProducerConfig ¶
ValidateAsyncProducerConfig validates config for an Async Producer
func (*Config) ValidateConsumerConfig ¶
ValidateConsumerConfig validates config for Consumer
func (*Config) ValidateSyncProducerConfig ¶
ValidateSyncProducerConfig validates config for a Sync Producer
type Consumer ¶
type Consumer struct { logging.Logger Config *Config Client sarama.Client Consumer clusterConsumer sync.Mutex // contains filtered or unexported fields }
Consumer allows consuming messages belonging to a specified set of Kafka topics.
Example ¶
//init config config := NewConfig(logroot.StandardLogger()) config.SetBrokers("localhost:9091,localhost:9092") config.SetRecvNotification(true) config.SetRecvNotificationChan(make(chan *cluster.Notification)) config.SetRecvError(true) config.SetRecvErrorChan(make(chan error)) config.SetRecvMessageChan(make(chan *ConsumerMessage)) config.SetTopics("topic1,topic2,topic3") config.SetGroup("test-group") // init consumer with message handlers consumer, err := NewConsumer(config, true, nil) if err != nil { log.Errorf("NewConsumer Error: %v", err) } go watchChannels(consumer, config) // wait for consumer to finish receiving messages consumer.WaitForClose() log.Info("consumer closed") // do something
Output:
func GetConsumerMock ¶
func GetConsumerMock(t mocks.ErrorReporter) *Consumer
GetConsumerMock returns mocked implementation of consumer that doesn't need connection to kafka cluster.
func NewConsumer ¶
NewConsumer returns a Consumer instance. If startHandlers is set to true, reading of messages, errors and notifications is started using the new consumer. Otherwise, only the instance is returned.
func (*Consumer) CommitOffsets ¶
CommitOffsets manually commits marked offsets
func (*Consumer) ConsumerErrorHandler ¶ added in v1.0.4
func (ref *Consumer) ConsumerErrorHandler(in <-chan *sarama.ConsumerError)
ConsumerErrorHandler processes each error message
func (*Consumer) GetCloseChannel ¶
func (ref *Consumer) GetCloseChannel() <-chan struct{}
GetCloseChannel returns a channel that is closed on consumer cleanup
func (*Consumer) MarkOffset ¶
func (ref *Consumer) MarkOffset(msg *ConsumerMessage, metadata string)
MarkOffset marks the provided message as processed, alongside a metadata string that represents the state of the partition consumer at that point in time. The metadata string can be used by another consumer to restore that state, so it can resume consumption.
Note: calling MarkOffset does not necessarily commit the offset to the backend store immediately for efficiency reasons, and it may never be committed if your application crashes. This means that you may end up processing the same message twice, and your processing should ideally be idempotent.
func (*Consumer) MarkPartitionOffset ¶
func (ref *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)
MarkPartitionOffset marks an offset of the provided topic/partition as processed. See MarkOffset for additional explanation.
func (*Consumer) MessageHandler ¶ added in v1.0.4
func (ref *Consumer) MessageHandler(in <-chan *sarama.ConsumerMessage)
MessageHandler processes each incoming message
func (*Consumer) PrintNotification ¶
PrintNotification prints the topics and partitions
func (*Consumer) Subscriptions ¶
Subscriptions returns the consumed topics and partitions
func (*Consumer) WaitForClose ¶
func (ref *Consumer) WaitForClose()
WaitForClose waits for the consumer to close
type ConsumerMessage ¶
type ConsumerMessage struct {
Key, Value []byte
Topic string
Partition int32
Offset int64
Timestamp time.Time
}
ConsumerMessage encapsulates a Kafka message returned by the consumer.
func (*ConsumerMessage) GetKey ¶
func (cm *ConsumerMessage) GetKey() string
GetKey returns the key associated with the message.
func (*ConsumerMessage) GetOffset ¶ added in v1.0.3
func (cm *ConsumerMessage) GetOffset() int64
GetOffset returns the offset associated with the message
func (*ConsumerMessage) GetPartition ¶ added in v1.0.3
func (cm *ConsumerMessage) GetPartition() int32
GetPartition returns the partition associated with the message
func (*ConsumerMessage) GetTopic ¶
func (cm *ConsumerMessage) GetTopic() string
GetTopic returns the topic associated with the message
func (*ConsumerMessage) GetValue ¶
func (cm *ConsumerMessage) GetValue() []byte
GetValue returns the value associated with the message.
type Encoder ¶
Encoder defines an interface that is used as an argument of producer functions. It wraps the sarama.Encoder.
type ProducerError ¶
type ProducerError struct { *ProducerMessage Err error }
ProducerError is the type of error generated when the producer fails to deliver a message. It contains the original ProducerMessage as well as the actual error value.
func (*ProducerError) Error ¶
func (ref *ProducerError) Error() error
func (*ProducerError) String ¶
func (ref *ProducerError) String() string
type ProducerMessage ¶
type ProducerMessage struct { // The Kafka topic for this message. Topic string // The partitioning key for this message. Pre-existing Encoders include // StringEncoder and ByteEncoder. Key Encoder // The actual message to store in Kafka. Pre-existing Encoders include // StringEncoder and ByteEncoder. Value Encoder // This field is used to hold arbitrary data you wish to include so it // will be available when receiving on the Successes and Errors channels. // Sarama completely ignores this field and is only to be used for // pass-through data. Metadata interface{} // Offset is the offset of the message stored on the broker. This is only // guaranteed to be defined if the message was successfully delivered and // RequiredAcks is not NoResponse. Offset int64 // Partition is the partition that the message was sent to. This is only // guaranteed to be defined if the message was successfully delivered. Partition int32 }
ProducerMessage is the collection of elements passed to the Producer in order to send a message.
func (*ProducerMessage) GetKey ¶
func (pm *ProducerMessage) GetKey() string
GetKey returns the key associated with the message.
func (*ProducerMessage) GetOffset ¶ added in v1.0.3
func (pm *ProducerMessage) GetOffset() int64
GetOffset returns the offset associated with the message.
func (*ProducerMessage) GetPartition ¶ added in v1.0.3
func (pm *ProducerMessage) GetPartition() int32
GetPartition returns the partition associated with the message.
func (*ProducerMessage) GetTopic ¶
func (pm *ProducerMessage) GetTopic() string
GetTopic returns the topic associated with the message.
func (*ProducerMessage) GetValue ¶
func (pm *ProducerMessage) GetValue() []byte
GetValue returns the content of the message.
func (*ProducerMessage) String ¶
func (pm *ProducerMessage) String() string
type ProtoConsumerMessage ¶
type ProtoConsumerMessage struct { *ConsumerMessage // contains filtered or unexported fields }
ProtoConsumerMessage encapsulates a Kafka message returned by the consumer and provides means to unmarshal the value into proto.Message.
func NewProtoConsumerMessage ¶
func NewProtoConsumerMessage(msg *ConsumerMessage, serializer keyval.Serializer) *ProtoConsumerMessage
NewProtoConsumerMessage creates new instance of ProtoConsumerMessage
func (*ProtoConsumerMessage) GetKey ¶
func (cm *ProtoConsumerMessage) GetKey() string
GetKey returns the key associated with the message.
func (*ProtoConsumerMessage) GetOffset ¶ added in v1.0.3
func (cm *ProtoConsumerMessage) GetOffset() int64
GetOffset returns the offset associated with the message.
func (*ProtoConsumerMessage) GetPartition ¶ added in v1.0.3
func (cm *ProtoConsumerMessage) GetPartition() int32
GetPartition returns the partition associated with the message.
func (*ProtoConsumerMessage) GetTopic ¶
func (cm *ProtoConsumerMessage) GetTopic() string
GetTopic returns the topic associated with the message.
type ProtoProducerMessage ¶
type ProtoProducerMessage struct { *ProducerMessage Serializer keyval.Serializer }
ProtoProducerMessage is a wrapper of a producer message that simplifies working with proto-modelled data.
func (*ProtoProducerMessage) GetKey ¶
func (ppm *ProtoProducerMessage) GetKey() string
GetKey returns the key associated with the message.
func (*ProtoProducerMessage) GetOffset ¶ added in v1.0.3
func (ppm *ProtoProducerMessage) GetOffset() int64
GetOffset returns the offset associated with the message.
func (*ProtoProducerMessage) GetPartition ¶ added in v1.0.3
func (ppm *ProtoProducerMessage) GetPartition() int32
GetPartition returns the partition associated with the message.
func (*ProtoProducerMessage) GetTopic ¶
func (ppm *ProtoProducerMessage) GetTopic() string
GetTopic returns the topic associated with the message.
type ProtoProducerMessageErr ¶
type ProtoProducerMessageErr struct { *ProtoProducerMessage Err error }
ProtoProducerMessageErr represents a proto-modelled message that was not published successfully.
func (*ProtoProducerMessageErr) Error ¶
func (pme *ProtoProducerMessageErr) Error() error
type RequiredAcks ¶
type RequiredAcks int16
RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements it must see before responding. Any of the constants defined here are valid except AcksUnset.
const ( // AcksUnset indicates that no valid value has been set AcksUnset RequiredAcks = -32768 // NoResponse doesn't send any response, the TCP ACK is all you get. NoResponse RequiredAcks = 0 // WaitForLocal waits for only the local commit to succeed before responding. WaitForLocal RequiredAcks = 1 // WaitForAll waits for all replicas to commit before responding. WaitForAll RequiredAcks = -1 )
type SyncProducer ¶
type SyncProducer struct { logging.Logger Config *Config Client sarama.Client Producer sarama.SyncProducer Partition int32 sync.Mutex // contains filtered or unexported fields }
SyncProducer allows publishing messages to Kafka using the synchronous API.
Example ¶
// init config config := NewConfig(logroot.StandardLogger()) config.ProducerConfig().Producer.RequiredAcks = sarama.WaitForAll config.SetBrokers("localhost:9091", "localhost:9092") // init client sClient, err := NewClient(config, Hash) if err != nil { return } // init producer producer, err := NewSyncProducer(config, sClient, Hash, nil) if err != nil { log.Errorf("NewSyncProducer errored: %v\n", err) return } // send message _, err = producer.SendMsgByte("test-topic", nil, []byte("test message")) if err != nil { log.Errorf("SendMsg errored: %v", err) } // close producer and release resources err = producer.Close() if err != nil { log.Errorf("SyncProducer close errored: %v", err) return } log.Info("SyncProducer closed")
Output:
func GetSyncProducerMock ¶
func GetSyncProducerMock(t mocks.ErrorReporter) (*SyncProducer, *mocks.SyncProducer)
GetSyncProducerMock returns mocked implementation of sync producer that doesn't need connection to Kafka broker and can be used for testing purposes.
func NewSyncProducer ¶
func NewSyncProducer(config *Config, sClient sarama.Client, partitioner string, wg *sync.WaitGroup) (*SyncProducer, error)
NewSyncProducer returns a new SyncProducer instance. Producer is created from provided sarama client which can be nil; in that case, a new client is created. Also the partitioner is set here. Note: provided sarama client partitioner should match the one used in config.
func (*SyncProducer) Close ¶
func (ref *SyncProducer) Close() error
Close closes the client and producer
func (*SyncProducer) GetCloseChannel ¶
func (ref *SyncProducer) GetCloseChannel() <-chan struct{}
GetCloseChannel returns a channel that is closed on syncProducer cleanup
func (*SyncProducer) IsClosed ¶
func (ref *SyncProducer) IsClosed() bool
IsClosed returns the "closed" status
func (*SyncProducer) SendMsg ¶
func (ref *SyncProducer) SendMsg(topic string, key sarama.Encoder, msg sarama.Encoder) (*ProducerMessage, error)
SendMsg sends a message to Kafka using default partition
func (*SyncProducer) SendMsgByte ¶
func (ref *SyncProducer) SendMsgByte(topic string, key []byte, msg []byte) (*ProducerMessage, error)
SendMsgByte sends a message to Kafka
func (*SyncProducer) SendMsgToPartition ¶ added in v1.0.4
func (ref *SyncProducer) SendMsgToPartition(topic string, partition int32, key sarama.Encoder, msg sarama.Encoder) (*ProducerMessage, error)
SendMsgToPartition sends a message to Kafka
func (*SyncProducer) WaitForClose ¶
func (ref *SyncProducer) WaitForClose()
WaitForClose returns when the producer is closed