databusc

package
v0.1.64 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 28, 2022 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrEmptyCircle = errors.New("empty circle")

ErrEmptyCircle is the error returned when trying to get an element when nothing has been added to the hash.

Functions

func NewConsumerHandle added in v0.1.49

func NewConsumerHandle(param *ConsumerParam, appname string, Id int) (*consumerEvent, error)

Types

type Consistent added in v0.1.46

// Consistent holds the information about the members of the consistent hash
// circle.
type Consistent struct {
	// NumberOfReplicas is the number of replicas kept for each entry. New sets
	// a default of 20; to change it, set this field before adding entries.
	NumberOfReplicas int

	// UseFnv presumably switches the hash function to FNV — TODO confirm
	// against the implementation, which is not visible here.
	UseFnv bool
	// The embedded RWMutex exports Lock/Unlock/RLock/RUnlock on Consistent.
	// NOTE(review): how the methods use it is not visible from this listing.
	sync.RWMutex
	// contains filtered or unexported fields
}

Consistent holds the information about the members of the consistent hash circle.

func New added in v0.1.46

func New() *Consistent

New creates a new Consistent object with a default setting of 20 replicas for each entry.

To change the number of replicas, set NumberOfReplicas before adding entries.

func (*Consistent) Add added in v0.1.46

func (c *Consistent) Add(elt string)

Add inserts a string element in the consistent hash.

func (*Consistent) Get added in v0.1.46

func (c *Consistent) Get(name string) (string, error)

Get returns an element close to where name hashes to in the circle.

func (*Consistent) GetN added in v0.1.46

func (c *Consistent) GetN(name string, n int) ([]string, error)

GetN returns the N closest distinct elements to the name input in the circle.

func (*Consistent) GetTwo added in v0.1.46

func (c *Consistent) GetTwo(name string) (string, string, error)

GetTwo returns the two closest distinct elements to the name input in the circle.

func (*Consistent) Members added in v0.1.46

func (c *Consistent) Members() []string

func (*Consistent) Remove added in v0.1.46

func (c *Consistent) Remove(elt string)

Remove removes an element from the hash.

func (*Consistent) Set added in v0.1.46

func (c *Consistent) Set(elts []string)

Set sets all the elements in the hash. If there are existing elements not present in elts, they will be removed.

type ConsumerDeal

// ConsumerDeal is the interface that users of the consumer must implement.
type ConsumerDeal interface {
	// DealMessage is the implementation of the data processing for a message.
	DealMessage(msg *kafka.Message) error
}

使用者必须实现的接口

type ConsumerEvent

// ConsumerEvent is the consumer handle returned by NewConsumer.
type ConsumerEvent interface {
	// Start begins polling to consume data.
	Start() error

	// Close shuts down the consumer; it must be called.
	Close()

	// CommitMessage commits the offset.
	CommitMessage(msg *kafka.Message) error
}

func NewConsumer

func NewConsumer(param *ConsumerParam, appname string, Id int) (ConsumerEvent, error)

type ConsumerParam

// ConsumerParam carries the settings passed to NewConsumer and
// NewConsumerHandle.
type ConsumerParam struct {
	// Address is presumably the Kafka broker address list — TODO confirm.
	Address   string
	// GroupId is presumably the consumer group id — TODO confirm.
	GroupId   string
	// Topic is presumably the topic to consume from — TODO confirm.
	Topic     string
	// Dealhanle is the user-supplied ConsumerDeal implementation.
	// NOTE(review): the name looks like a misspelling of "DealHandle"; it is
	// part of the exported API, so it is left as is.
	Dealhanle ConsumerDeal
	// ConsumerMode selects the consumption mode: 0 = poll, 1 = channel.
	ConsumerMode int
	// CommitMode selects the offset commit mode:
	// 0 = timer commitmsg, 1 = commitmsg, 2 = auto.
	CommitMode int
	// ThreadNum, QueueLen, SessionMs and PollMs are undocumented in the
	// original. Presumably: worker count, internal queue length, session
	// timeout in milliseconds and poll timeout in milliseconds — TODO confirm.
	ThreadNum  int
	QueueLen   int
	SessionMs  int
	PollMs     int
}

type ProducerEvent

// ProducerEvent is the producer handle returned by NewAsyncProducer.
type ProducerEvent interface {
	// SendMessage sends a message.
	SendMessage(data []byte, key string) error

	// SendMessagePartition sends a message to the specified partition.
	SendMessagePartition(data []byte, partition uint32) error

	// SendMessageByMod sends a message to a partition chosen by modulo.
	// Presumably key modulo the partition count — TODO confirm.
	SendMessageByMod(data []byte, key uint32) error

	// Close shuts down the producer.
	Close()
}

func NewAsyncProducer

func NewAsyncProducer(param *ProducerParam) (ProducerEvent, error)

type ProducerParam

// ProducerParam carries the settings passed to NewAsyncProducer.
type ProducerParam struct {
	// Address is presumably the Kafka broker address list — TODO confirm.
	Address  string
	// Topic is presumably the topic to produce to — TODO confirm.
	Topic    string
	// IsAck presumably controls whether broker acknowledgement is
	// required — TODO confirm.
	IsAck    bool
	// KafkaVer is presumably the Kafka protocol version string — TODO confirm.
	KafkaVer string
	// ConsumerMode selects the mode: 0 = channel, 1 = sync.
	// NOTE(review): despite its name, this field sits on the producer
	// parameters; it presumably selects the producer send mode — confirm.
	ConsumerMode int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL