Documentation ¶
Index ¶
- Variables
- func TbMessage2OttoValue(msg *CGMessage) (otto.Value, error)
- type CGMessage
- type CompressionType
- type Consumer
- type ConsumerGroup
- type ConsumerGroupMessage
- type ConsumerGroupOption
- func OptionConsumerGroupFailedCh(ch chan<- *ConsumerGroupMessage) ConsumerGroupOption
- func OptionConsumerGroupHeartbeatInterval(d time.Duration) ConsumerGroupOption
- func OptionConsumerGroupOffsetsInitial(initial int64) ConsumerGroupOption
- func OptionConsumerGroupSessionTimeout(d time.Duration) ConsumerGroupOption
- func OptionConsumerGroupVersion(version string) ConsumerGroupOption
- type ConsumerMessage
- type ConsumerOption
- type Producer
- type ProducerMessage
- type ProducerOption
- func OptionCompression(compressionType CompressionType) ProducerOption
- func OptionFailedCh(ch chan<- *ProducerMessage) ProducerOption
- func OptionFlushBytes(size int) ProducerOption
- func OptionFlushFrequency(frequency time.Duration) ProducerOption
- func OptionFlushMaxMessages(count int) ProducerOption
- func OptionFlushMessages(count int) ProducerOption
- func OptionManualPartition() ProducerOption
- func OptionMaxMessageBytes(size int) ProducerOption
- func OptionNetTLS(config *tls.Config) ProducerOption
- func OptionProducerConcu(queue uint64, concu uint32) ProducerOption
- func OptionProducerUnshare() ProducerOption
- func OptionRequiredAcks(requireType RequireType) ProducerOption
- func OptionRetryBackoff(backoff time.Duration) ProducerOption
- func OptionRetryMax(retry int) ProducerOption
- func OptionSasl(user, passwd string) ProducerOption
- func OptionSucceedCh(ch chan<- *ProducerMessage) ProducerOption
- func OptionTimeout(timeout time.Duration) ProducerOption
- func OptionTopic(topic string, queue uint64, concu uint32) ProducerOption
- func OptionVersion(version sarama.KafkaVersion) ProducerOption
- type RequireType
- type TbProducer
Constants ¶
This section is empty.
Variables ¶
View Source
var (
	OffsetNewest = sarama.OffsetNewest
	OffsetOldest = sarama.OffsetOldest
)
Functions ¶
Types ¶
type CGMessage ¶
func CGMessage2TbCGMessage ¶
func CGMessage2TbCGMessage(cgmsg *ConsumerGroupMessage) (*CGMessage, error)
type CompressionType ¶
type CompressionType int8
const (
	CompressionNone   CompressionType = 0
	CompressionGZIP   CompressionType = 1
	CompressionSnappy CompressionType = 2
)
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func NewConsumer ¶
func NewConsumer(addrs []string, options ...ConsumerOption) (*Consumer, error)
func (*Consumer) Output ¶
func (c *Consumer) Output() <-chan *ConsumerMessage
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
func NewConsumerGroup ¶
func NewConsumerGroup(addrs []string, options ...ConsumerGroupOption) (*ConsumerGroup, error)
func (*ConsumerGroup) Add ¶
func (cg *ConsumerGroup) Add(topic string, group string, handlers ...func(*ConsumerGroupMessage)) error
func (*ConsumerGroup) Fini ¶
func (cg *ConsumerGroup) Fini()
func (*ConsumerGroup) Output ¶
func (cg *ConsumerGroup) Output() <-chan *ConsumerGroupMessage
type ConsumerGroupMessage ¶
type ConsumerGroupOption ¶
type ConsumerGroupOption func(*ConsumerGroup) error
func OptionConsumerGroupFailedCh ¶
func OptionConsumerGroupFailedCh(ch chan<- *ConsumerGroupMessage) ConsumerGroupOption
func OptionConsumerGroupHeartbeatInterval ¶
func OptionConsumerGroupHeartbeatInterval(d time.Duration) ConsumerGroupOption
func OptionConsumerGroupOffsetsInitial ¶
func OptionConsumerGroupOffsetsInitial(initial int64) ConsumerGroupOption
func OptionConsumerGroupSessionTimeout ¶
func OptionConsumerGroupSessionTimeout(d time.Duration) ConsumerGroupOption
func OptionConsumerGroupVersion ¶
func OptionConsumerGroupVersion(version string) ConsumerGroupOption
Example version strings: "0.10.2.1", "2.8.0.0".
type ConsumerMessage ¶
type ConsumerOption ¶
func OptionConsumerFailedCh ¶
func OptionConsumerFailedCh(ch chan<- *ConsumerMessage) ConsumerOption
func OptionConsumerNetTLS ¶
func OptionConsumerNetTLS(config *tls.Config) ConsumerOption
func OptionConsumerSasl ¶
func OptionConsumerSasl(user, passwd string) ConsumerOption
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer结束应该调用Fini,这样会等待本模块所有可能block的函数结束才返回。
func NewProducer ¶
func NewProducer(addrs []string, options ...ProducerOption) (*Producer, error)
func (*Producer) Input ¶
func (p *Producer) Input() chan<- *ProducerMessage
type ProducerMessage ¶
type ProducerMessage struct {
	// Topic of the message.
	Topic string
	// The partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Key sarama.Encoder
	// Payload to produce.
	Payload []byte
	// The headers are key-value pairs that are transparently passed
	// by Kafka between producers and consumers.
	Headers []sarama.RecordHeader
	// Custom data which will be enqueued to FailedQueue on failure,
	// or to SucceedQueue on success.
	Custom interface{}
	// Partition specified by the user when OptionManualPartition
	// was set, or returned by the broker.
	Partition int32
	// Offset returned by the broker.
	Offset int64
	// Error is set when the message shows up in FailedQueue.
	Error error
}
用户消息
func OttoValue2PMessage ¶
func OttoValue2PMessage(msg otto.Value) (*ProducerMessage, error)
{ "Topic": "foo", "Payload": "bar" }
type ProducerOption ¶
func OptionCompression ¶
func OptionCompression(compressionType CompressionType) ProducerOption
func OptionFailedCh ¶
func OptionFailedCh(ch chan<- *ProducerMessage) ProducerOption
func OptionFlushBytes ¶
func OptionFlushBytes(size int) ProducerOption
func OptionFlushFrequency ¶
func OptionFlushFrequency(frequency time.Duration) ProducerOption
func OptionFlushMaxMessages ¶
func OptionFlushMaxMessages(count int) ProducerOption
func OptionFlushMessages ¶
func OptionFlushMessages(count int) ProducerOption
func OptionManualPartition ¶
func OptionManualPartition() ProducerOption
func OptionMaxMessageBytes ¶
func OptionMaxMessageBytes(size int) ProducerOption
func OptionNetTLS ¶
func OptionNetTLS(config *tls.Config) ProducerOption
func OptionProducerConcu ¶
func OptionProducerConcu(queue uint64, concu uint32) ProducerOption
用户队列和分派并发
func OptionRequiredAcks ¶
func OptionRequiredAcks(requireType RequireType) ProducerOption
func OptionRetryBackoff ¶
func OptionRetryBackoff(backoff time.Duration) ProducerOption
func OptionRetryMax ¶
func OptionRetryMax(retry int) ProducerOption
func OptionSasl ¶
func OptionSasl(user, passwd string) ProducerOption
func OptionSucceedCh ¶
func OptionSucceedCh(ch chan<- *ProducerMessage) ProducerOption
如果库消息发送成功,则把该消息回写ch
func OptionTimeout ¶
func OptionTimeout(timeout time.Duration) ProducerOption
func OptionTopic ¶
func OptionTopic(topic string, queue uint64, concu uint32) ProducerOption
每个topic配置的读队列和并发
func OptionVersion ¶
func OptionVersion(version sarama.KafkaVersion) ProducerOption
type RequireType ¶
type RequireType int8
const (
	RequireNoResponse  RequireType = 0
	RequireOnlyLeader  RequireType = 1
	RequireAllReplicas RequireType = -1
)
type TbProducer ¶
type TbProducer struct {
// contains filtered or unexported fields
}
func NewTbProducer ¶
func NewTbProducer() (*TbProducer, error)
func (*TbProducer) Produce ¶
func (tbproducer *TbProducer) Produce(call otto.FunctionCall) otto.Value
Click to show internal directories.
Click to hide internal directories.