kafka

package
v0.4.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 28, 2020 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Package kafka https://github.com/Shopify/sarama/blob/master/examples/sasl_scram_client/scram_client.go

Index

Constants

This section is empty.

Variables

View Source
var (
	// SHA256 todo
	SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
	// SHA512 todo
	SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
)

Functions

This section is empty.

Types

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher kafka broker

func NewPublisher

func NewPublisher(conf *PublisherConfig) (*Publisher, error)

NewPublisher kafka broker

func (*Publisher) Connect

func (b *Publisher) Connect() error

Connect 连接

func (*Publisher) Debug

func (b *Publisher) Debug(l logger.Logger)

Debug 日志

func (*Publisher) Disconnect

func (b *Publisher) Disconnect() error

Disconnect 端口连接

func (*Publisher) Pub

func (b *Publisher) Pub(topic string, e *event.Event) error

Pub 发布事件

type PublisherConfig

type PublisherConfig struct {
	BulkMaxSize        int           `json:"bulk_max_size"`
	PublishTimeout     time.Duration `json:"publish_timeout"      `
	BulkFlushFrequency time.Duration `json:"bulk_flush_frequency"`
	Partitioner        string        `json:"partitioner"`
	Compression        string        `json:"compression"`
	CompressionLevel   int           `json:"compression_level"`
	MaxRetries         int           `json:"max_retries"`
	MaxMessageBytes    *int          `json:"max_message_bytes"`
	RequiredACKs       *int          `json:"required_acks"`
	// contains filtered or unexported fields
}

PublisherConfig todo

func DefaultPublisherConfig

func DefaultPublisherConfig() *PublisherConfig

DefaultPublisherConfig 默认配置

func (*PublisherConfig) Validate

func (c *PublisherConfig) Validate() error

Validate 校验配置

type Subscriber

type Subscriber struct {
	// contains filtered or unexported fields
}

Subscriber kafka broker

func NewSubscriber

func NewSubscriber(conf *SubscriberConfig) (*Subscriber, error)

NewSubscriber kafka broker

func (*Subscriber) Cleanup

Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited

func (*Subscriber) Connect

func (s *Subscriber) Connect() error

Connect 连接

func (*Subscriber) ConsumeClaim

func (s *Subscriber) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error

ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().

func (*Subscriber) Disconnect

func (s *Subscriber) Disconnect() error

Disconnect 端口连接

func (*Subscriber) Setup

Setup is run at the beginning of a new session, before ConsumeClaim

func (*Subscriber) Sub

func (s *Subscriber) Sub(topic string, h bus.EventHandler) error

Sub 订阅事件

type SubscriberConfig

type SubscriberConfig struct {
	GroupID         string `json:"group_id,omitempty"`
	Offset          string `json:"offset,omitempty"`
	BalanceStrategy string `json:"balance_strategy,omitempty"`
	// contains filtered or unexported fields
}

SubscriberConfig todo

func DefaultSubscriberConfig

func DefaultSubscriberConfig() *SubscriberConfig

DefaultSubscriberConfig 默认配置

func (*SubscriberConfig) Validate

func (s *SubscriberConfig) Validate() error

Validate 校验配置

type XDGSCRAMClient

type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

XDGSCRAMClient todo

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

Begin todo

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

Done todo

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Step todo

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL