Documentation ¶
Index ¶
- type FetcherState
- type PartitionConsumer
- type PartitionConsumerConfig
- type Strategy
- type TopicAndPartition
- type TopicAndPartitionSet
- func (this *TopicAndPartitionSet) Add(tp TopicAndPartition) bool
- func (this *TopicAndPartitionSet) AddAll(tps []TopicAndPartition)
- func (this *TopicAndPartitionSet) Contains(tp TopicAndPartition) bool
- func (this *TopicAndPartitionSet) ContainsAll(tps []TopicAndPartition) bool
- func (this *TopicAndPartitionSet) GetArray() []TopicAndPartition
- func (this *TopicAndPartitionSet) IsEmpty() bool
- func (this *TopicAndPartitionSet) Remove(tp TopicAndPartition) bool
- func (this *TopicAndPartitionSet) RemoveAll(tps []TopicAndPartition)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type FetcherState ¶
type FetcherState struct {
	LastCommitted int64
	Removed       bool
	// contains filtered or unexported fields
}
func NewFetcherState ¶
func NewFetcherState(initialOffset int64) *FetcherState
func (*FetcherState) GetOffset ¶
func (this *FetcherState) GetOffset() int64
func (*FetcherState) GetStopChannel ¶
func (this *FetcherState) GetStopChannel() chan<- bool
func (*FetcherState) SetOffset ¶
func (this *FetcherState) SetOffset(offset int64)
type PartitionConsumer ¶
type PartitionConsumer struct {
// contains filtered or unexported fields
}
func NewPartitionConsumer ¶
func NewPartitionConsumer(consumerConfig PartitionConsumerConfig) *PartitionConsumer
func (*PartitionConsumer) Add ¶
func (this *PartitionConsumer) Add(topic string, partition int32, strategy Strategy) error
func (*PartitionConsumer) GetTopicPartitions ¶
func (this *PartitionConsumer) GetTopicPartitions() *TopicAndPartitionSet
func (*PartitionConsumer) Remove ¶
func (this *PartitionConsumer) Remove(topic string, partition int32)
type PartitionConsumerConfig ¶
type PartitionConsumerConfig struct {
	// Consumer group.
	Group string

	// Interval to commit offsets at.
	CommitInterval time.Duration

	// BrokerList is a bootstrap list to discover other brokers in a cluster. At least one broker is required.
	BrokerList []string

	// ReadTimeout is a timeout to read the response from a TCP socket.
	ReadTimeout time.Duration

	// WriteTimeout is a timeout to write the request to a TCP socket.
	WriteTimeout time.Duration

	// ConnectTimeout is a timeout to connect to a TCP socket.
	ConnectTimeout time.Duration

	// Sets whether the connection should be kept alive.
	KeepAlive bool

	// A keep alive period for a TCP connection.
	KeepAliveTimeout time.Duration

	// Maximum number of open connections for a connector.
	MaxConnections int

	// Maximum number of open connections for a single broker for a connector.
	MaxConnectionsPerBroker int

	// Maximum fetch size in bytes which will be used in all Consume() calls.
	FetchSize int32

	// The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will block.
	FetchMinBytes int32

	// The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy FetchMinBytes.
	FetchMaxWaitTime int32

	// Number of retries to get topic metadata.
	MetadataRetries int

	// Backoff value between topic metadata requests.
	MetadataBackoff time.Duration

	// Number of retries to commit an offset.
	CommitOffsetRetries int

	// Backoff value between commit offset requests.
	CommitOffsetBackoff time.Duration

	// Number of retries to get consumer metadata.
	ConsumerMetadataRetries int

	// Backoff value between consumer metadata requests.
	ConsumerMetadataBackoff time.Duration

	// ClientID that will be used by a connector to identify client requests by broker.
	ClientID string

	// Backoff value between fetches in case of errors.
	FetchErrorBackoff time.Duration
}
func NewPartitionConsumerConfig ¶
func NewPartitionConsumerConfig(group string) *PartitionConsumerConfig
type Strategy ¶
type Strategy func(topic string, partition int32, messages []*siesta.MessageAndOffset) error
type TopicAndPartition ¶
type TopicAndPartitionSet ¶
type TopicAndPartitionSet struct {
// contains filtered or unexported fields
}
func NewTopicAndPartitionSet ¶
func NewTopicAndPartitionSet() *TopicAndPartitionSet
func (*TopicAndPartitionSet) Add ¶
func (this *TopicAndPartitionSet) Add(tp TopicAndPartition) bool
func (*TopicAndPartitionSet) AddAll ¶
func (this *TopicAndPartitionSet) AddAll(tps []TopicAndPartition)
func (*TopicAndPartitionSet) Contains ¶
func (this *TopicAndPartitionSet) Contains(tp TopicAndPartition) bool
func (*TopicAndPartitionSet) ContainsAll ¶
func (this *TopicAndPartitionSet) ContainsAll(tps []TopicAndPartition) bool
func (*TopicAndPartitionSet) GetArray ¶
func (this *TopicAndPartitionSet) GetArray() []TopicAndPartition
func (*TopicAndPartitionSet) IsEmpty ¶
func (this *TopicAndPartitionSet) IsEmpty() bool
func (*TopicAndPartitionSet) Remove ¶
func (this *TopicAndPartitionSet) Remove(tp TopicAndPartition) bool
func (*TopicAndPartitionSet) RemoveAll ¶
func (this *TopicAndPartitionSet) RemoveAll(tps []TopicAndPartition)
Click to show internal directories.
Click to hide internal directories.