Documentation
¶
Index ¶
- func New(setters ...Setter) consumer.Consumer
- type Consumer
- func (c *Consumer) AddHandler(handlers ...consumer.Handler)
- func (c *Consumer) Cleanup(session sarama.ConsumerGroupSession) error
- func (c *Consumer) Close()
- func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
- func (c *Consumer) Run()
- func (c *Consumer) Setup(session sarama.ConsumerGroupSession) error
- type Setter
- func SetAdmin(maxRetry int, backoff, timeout time.Duration) Setter
- func SetApiVersionRequest(apiVersionRequest bool) Setter
- func SetBrokers(brokers ...string) Setter
- func SetChannelBufferSize(channelBufferSize int) Setter
- func SetClientID(clientID string) Setter
- func SetConsumerFetch(min int32, defaultBytes int32, max int32) Setter
- func SetConsumerGroupHeartbeatInterval(interval time.Duration) Setter
- func SetConsumerGroupInstanceId(instanceId string) Setter
- func SetConsumerGroupMemberUserData(userData []byte) Setter
- func SetConsumerGroupRebalanceGroupStrategies(groupStrategies []sarama.BalanceStrategy) Setter
- func SetConsumerGroupRebalanceRetry(max int, backoff time.Duration) Setter
- func SetConsumerGroupRebalanceTimeout(timeout time.Duration) Setter
- func SetConsumerGroupResetInvalidOffsets(resetInvalidOffsets bool) Setter
- func SetConsumerGroupSessionTimeout(timeout time.Duration) Setter
- func SetConsumerInterceptors(interceptors []sarama.ConsumerInterceptor) Setter
- func SetConsumerIsolationLevel(isolationLevel sarama.IsolationLevel) Setter
- func SetConsumerMaxWaitTime(maxWaitTime time.Duration) Setter
- func SetConsumerOffsetsAutoCommit(enable bool, interval time.Duration) Setter
- func SetConsumerOffsetsInitial(initial int64) Setter
- func SetConsumerOffsetsRetention(retention time.Duration) Setter
- func SetConsumerOffsetsRetry(max int) Setter
- func SetConsumerRetry(backoff time.Duration, backoffFunc func(retries int) time.Duration) Setter
- func SetConsumerReturn(errors bool) Setter
- func SetGroupID(groupID string) Setter
- func SetLogger(logger logger.LogWriter) Setter
- func SetMetadataAllowAutoTopicCreation(allowAutoTopicCreation bool) Setter
- func SetMetadataFull(full bool) Setter
- func SetMetadataRefreshFrequency(refreshFrequency time.Duration) Setter
- func SetMetadataRetry(max int, backoff time.Duration, ...) Setter
- func SetMetadataTimeout(timeout time.Duration) Setter
- func SetMetricRegistry(metricRegistry metrics.Registry) Setter
- func SetNetDialTimeout(dialTimeout time.Duration) Setter
- func SetNetKeepAlive(keepAlive time.Duration) Setter
- func SetNetLocalAddr(addr net.Addr) Setter
- func SetNetMaxOpenRequests(maxOpenRequests int) Setter
- func SetNetProxy(enable bool, dialer proxy.Dialer) Setter
- func SetNetReadTimeout(readTimeout time.Duration) Setter
- func SetNetSASLAuthIdentity(authIdentity string) Setter
- func SetNetSASLEnable(enable bool) Setter
- func SetNetSASLGSSAPI(gssAPI sarama.GSSAPIConfig) Setter
- func SetNetSASLHandshake(handshake bool) Setter
- func SetNetSASLMechanism(saslMechanism sarama.SASLMechanism) Setter
- func SetNetSASLPassword(password string) Setter
- func SetNetSASLSCRAMAuthzID(scramAuthzID string) Setter
- func SetNetSASLTokenProvider(tokenProvider sarama.AccessTokenProvider) Setter
- func SetNetSASLUser(user string) Setter
- func SetNetSASLVersion(version int16) Setter
- func SetNetTLS(enable bool, config *tls.Config) Setter
- func SetNetWriteTimeout(writeTimeout time.Duration) Setter
- func SetRackID(rackID string) Setter
- func SetVersion(version sarama.KafkaVersion) Setter
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
func (*Consumer) AddHandler ¶
func (*Consumer) ConsumeClaim ¶
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
type Setter ¶
type Setter func(c *Consumer)
func SetApiVersionRequest ¶
func SetBrokers ¶
func SetChannelBufferSize ¶
func SetClientID ¶
func SetConsumerGroupRebalanceGroupStrategies ¶
func SetConsumerGroupRebalanceGroupStrategies(groupStrategies []sarama.BalanceStrategy) Setter
func SetConsumerInterceptors ¶
func SetConsumerInterceptors(interceptors []sarama.ConsumerInterceptor) Setter
func SetConsumerIsolationLevel ¶
func SetConsumerIsolationLevel(isolationLevel sarama.IsolationLevel) Setter
func SetConsumerMaxWaitTime ¶
func SetConsumerOffsetsRetry ¶
func SetConsumerRetry ¶
func SetConsumerReturn ¶
func SetGroupID ¶
func SetMetadataFull ¶
func SetMetadataRetry ¶
func SetMetadataTimeout ¶
func SetMetricRegistry ¶
func SetMetricRegistry(metricRegistry metrics.Registry) Setter
func SetNetDialTimeout ¶
func SetNetKeepAlive ¶
func SetNetLocalAddr ¶
func SetNetMaxOpenRequests ¶
func SetNetReadTimeout ¶
func SetNetSASLAuthIdentity ¶
func SetNetSASLEnable ¶
func SetNetSASLGSSAPI ¶
func SetNetSASLGSSAPI(gssAPI sarama.GSSAPIConfig) Setter
func SetNetSASLHandshake ¶
func SetNetSASLMechanism ¶
func SetNetSASLMechanism(saslMechanism sarama.SASLMechanism) Setter
func SetNetSASLPassword ¶
func SetNetSASLSCRAMAuthzID ¶
func SetNetSASLTokenProvider ¶
func SetNetSASLTokenProvider(tokenProvider sarama.AccessTokenProvider) Setter
func SetNetSASLUser ¶
func SetNetSASLVersion ¶
func SetNetWriteTimeout ¶
func SetVersion ¶
func SetVersion(version sarama.KafkaVersion) Setter
Click to show internal directories.
Click to hide internal directories.