Documentation ¶
Index ¶
- Constants
- Variables
- func DefaultConfig() *config
- func NewMechanism(saslMechanism, saslUserName, saslPassword string) (sasl.Mechanism, error)
- func Register(comp Compressor)
- type AssignedPartitions
- type Authentication
- type Balancer
- type Client
- func (wc *Client) CreateTopics(ctx context.Context, req *kafka.CreateTopicsRequest) (res *kafka.CreateTopicsResponse, err error)
- func (wc *Client) DeleteTopics(ctx context.Context, req *kafka.DeleteTopicsRequest) (res *kafka.DeleteTopicsResponse, err error)
- func (wc *Client) ListOffsets(ctx context.Context, req *kafka.ListOffsetsRequest) (res *kafka.ListOffsetsResponse, err error)
- func (wc *Client) Metadata(ctx context.Context, req *kafka.MetadataRequest) (res *kafka.MetadataResponse, err error)
- func (wc *Client) OffsetFetch(ctx context.Context, req *kafka.OffsetFetchRequest) (res *kafka.OffsetFetchResponse, err error)
- type ClientInterceptor
- type Component
- type Compressor
- type Consumer
- func (r *Consumer) Close() error
- func (r *Consumer) CommitMessages(ctx context.Context, msgs ...*Message) (err error)
- func (r *Consumer) FetchMessage(ctx context.Context) (msg Message, ctxOutput context.Context, err error)
- func (r *Consumer) Lag() int64
- func (r *Consumer) Offset() int64
- func (r *Consumer) ReadLag(ctx context.Context) (lag int64, err error)
- func (r *Consumer) ReadMessage(ctx context.Context) (msg Message, ctxOutput context.Context, err error)
- func (r *Consumer) SetOffset(offset int64) (err error)
- func (r *Consumer) SetOffsetAt(ctx context.Context, t time.Time) (err error)
- type ConsumerGroup
- type ConsumerGroupOptions
- type Container
- type CtxMessage
- type Message
- type Messages
- type Option
- type Producer
- type RevokedPartitions
- type ServerInterceptor
- type TLSConfig
- type TopicPartition
Constants ¶
View Source
const PackageName = "component.ekafka"
Variables ¶
View Source
var ( ErrRecoverableError = errors.New("recoverable error is retryable") ErrDoNotCommit = errors.New("do not commit") )
Functions ¶
func NewMechanism ¶ added in v1.0.2
func Register ¶ added in v1.0.4
func Register(comp Compressor)
Types ¶
type AssignedPartitions ¶
type AssignedPartitions struct {
Partitions []TopicPartition
}
type Authentication ¶ added in v1.0.2
type Authentication struct { // TLS authentication TLS *TLSConfig }
func (*Authentication) ConfigureDialerAuthentication ¶ added in v1.0.2
func (config *Authentication) ConfigureDialerAuthentication(opts *kafka.Dialer) (err error)
func (*Authentication) ConfigureTransportAuthentication ¶ added in v1.0.2
func (config *Authentication) ConfigureTransportAuthentication(opts *kafka.Transport) (err error)
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) CreateTopics ¶
func (*Client) DeleteTopics ¶
func (*Client) ListOffsets ¶
type ClientInterceptor ¶
type ClientInterceptor func(oldProcessFn clientProcessFn) (newProcessFn clientProcessFn)
func InterceptorClientChain ¶
func InterceptorClientChain(interceptors ...ClientInterceptor) ClientInterceptor
type Component ¶
type Component struct {
// contains filtered or unexported fields
}
Component kafka 组件,包含Client、Producers、Consumers
func (*Component) ConsumerGroup ¶
func (cmp *Component) ConsumerGroup(name string) *ConsumerGroup
ConsumerGroup 返回指定名称的 ConsumerGroup
func (*Component) GetCompName ¶
type Compressor ¶ added in v1.0.4
type Compressor interface { Compress(input []byte) (output []byte, err error) DeCompress(input []byte) (output []byte, err error) ContentEncoding() string }
func GetCompressor ¶ added in v1.0.4
func GetCompressor(encoding string) Compressor
type Consumer ¶
type Consumer struct { Config consumerConfig Brokers []string `json:"brokers" toml:"brokers"` // contains filtered or unexported fields }
Consumer 消费者/消费者组,
func (*Consumer) CommitMessages ¶
func (*Consumer) FetchMessage ¶
func (*Consumer) ReadMessage ¶
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
func NewConsumerGroup ¶
func NewConsumerGroup(options ConsumerGroupOptions) (*ConsumerGroup, error)
func (*ConsumerGroup) Close ¶
func (cg *ConsumerGroup) Close() error
func (*ConsumerGroup) CommitMessages ¶
func (cg *ConsumerGroup) CommitMessages(ctx context.Context, messages ...Message) error
type ConsumerGroupOptions ¶
type ConsumerGroupOptions struct { Logger *elog.Component Brokers []string GroupID string Topic string HeartbeatInterval time.Duration PartitionWatchInterval time.Duration WatchPartitionChanges bool SessionTimeout time.Duration RebalanceTimeout time.Duration JoinGroupBackoff time.Duration StartOffset int64 RetentionTime time.Duration Timeout time.Duration Reader readerOptions EnableAutoRun bool SASLUserName string SASLPassword string SASLMechanism string // contains filtered or unexported fields }
type CtxMessage ¶ added in v1.0.4
type Option ¶
type Option func(c *Container)
func WithClientInterceptor ¶
func WithClientInterceptor(interceptors ...ClientInterceptor) Option
WithClientInterceptor 注入拦截器
func WithRegisterBalancer ¶
WithRegisterBalancer 注册名字为<balancerName>的balancer 注册之后可通过在producer配置文件中可通过<balancerName>来指定使用此balancer
func WithServerInterceptor ¶
func WithServerInterceptor(interceptors ...ServerInterceptor) Option
WithServerInterceptor 注入拦截器
type RevokedPartitions ¶
type RevokedPartitions struct {
Partitions []TopicPartition
}
type ServerInterceptor ¶
type ServerInterceptor func(oldProcessFn serverProcessFn) (newProcessFn serverProcessFn)
func InterceptorServerChain ¶
func InterceptorServerChain(interceptors ...ServerInterceptor) ServerInterceptor
type TLSConfig ¶ added in v1.0.2
type TLSConfig struct { // Enable TLS Enabled bool // Path to the CA cert. For a client this verifies the server certificate. CAFile string // Path to the TLS cert to use for TLS required connections. (optional) CertFile string // Path to the TLS key to use for TLS required connections. (optional) KeyFile string // InsecureSkipVerify will enable TLS but not verify the certificate InsecureSkipVerify bool // MinVersion sets the minimum TLS version that is acceptable. // If not set, TLS 1.2 will be used. (optional) MinVersion string // MaxVersion sets the maximum TLS version that is acceptable. // If not set, refer to crypto/tls for defaults. (optional) MaxVersion string }
TLSConfig is the interface used to configure a tcp client or server from a `Config`
type TopicPartition ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.