Documentation ¶
Index ¶
- Constants
- Variables
- type Broker
- func (b *Broker) Acknowledge(ctx context.Context, ...) (bool, error)
- func (b *Broker) AcknowledgeMessages(ctx context.Context, ...) error
- func (b *Broker) Bootstrap() error
- func (b *Broker) Commit(ctx context.Context, ...) (bool, error)
- func (b *Broker) Conf() *Config
- func (b *Broker) Consume(ctx context.Context, topicName, partition, consumerGroup, consumerName string, ...) error
- func (b *Broker) CreateTopic(ctx context.Context, params *sgproto.CreateTopicParams) error
- func (b *Broker) FetchFromSync(topicName, partition string, from []byte, fn func(msg *sgproto.Message) error) error
- func (b *Broker) FetchRange(ctx context.Context, req *sgproto.FetchRangeRequest, ...) error
- func (b *Broker) Get(ctx context.Context, topicName string, partition string, key []byte) (*sgproto.Message, error)
- func (b *Broker) GetController() *sandglass.Node
- func (b *Broker) GetMarkStateMessage(ctx context.Context, ...) (*sgproto.Message, error)
- func (b *Broker) GetTopic(name string) *topic.Topic
- func (b *Broker) HasKey(ctx context.Context, topicName string, partition string, ...) (bool, error)
- func (b *Broker) IsController() bool
- func (b *Broker) Join(clusterAddrs ...string) (err error)
- func (b *Broker) LastOffset(ctx context.Context, ...) (sandflake.ID, error)
- func (b *Broker) LaunchWatchers() error
- func (b *Broker) MarkConsumed(ctx context.Context, ...) (bool, error)
- func (b *Broker) Members() []*sandglass.Node
- func (b *Broker) Name() string
- func (b *Broker) NotAcknowledge(ctx context.Context, ...) (bool, error)
- func (b *Broker) Produce(ctx context.Context, req *sgproto.ProduceMessageRequest) (*sgproto.ProduceResponse, error)
- func (b *Broker) Stop(ctx context.Context) error
- func (b *Broker) Topics() []*topic.Topic
- func (b *Broker) TriggerSyncRequest() error
- func (b *Broker) WaitForIt() error
- type Config
- type ConsumerGroup
Constants ¶
View Source
const (
ConsumerOffsetTopicName = "consumer_offsets"
)
Variables ¶
View Source
var ( ErrTopicAlreadyExist = errors.New("ErrTopicAlreadyExist") ErrInvalidTopicName = errors.New("ErrInvalidTopicName") ErrUnableToSelectReplicas = errors.New("ErrUnableToSelectReplicas") ErrTopicNotFound = errors.New("ErrTopicNotFound") ErrPartitionNotFound = errors.New("ErrPartitionNotFound") ErrNoPartitionSet = errors.New("ErrNoPartitionSet") ErrNoControllerSet = errors.New("ErrNoControllerSet") ErrNoLeaderFound = errors.New("ErrNoLeaderFound") ErrNoConsumerFound = errors.New("ErrNoConsumerFound") )
View Source
var DefaultStateCheckInterval = 1 * time.Second
View Source
var (
ErrNoKeySet = errors.New("ErrNoKeySet")
)
View Source
var (
RedeliveryTimeout = 10 * time.Second
)
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker struct { logy.Logger ShutdownCh chan struct{} // contains filtered or unexported fields }
func (*Broker) Acknowledge ¶
func (*Broker) AcknowledgeMessages ¶
func (b *Broker) AcknowledgeMessages(ctx context.Context, topicName, partitionName, consumerGroup, consumerName string, offsets []sandflake.ID) error
FIXME: share same code between AcknowledgeMessages a AcknowledgeMessage
func (*Broker) CreateTopic ¶
func (*Broker) FetchFromSync ¶
func (*Broker) FetchRange ¶
func (*Broker) GetController ¶
func (*Broker) GetMarkStateMessage ¶ added in v0.1.5
func (*Broker) IsController ¶
func (*Broker) LastOffset ¶
func (*Broker) LaunchWatchers ¶
func (*Broker) MarkConsumed ¶ added in v0.1.5
func (*Broker) NotAcknowledge ¶ added in v0.1.7
func (*Broker) Produce ¶ added in v0.1.7
func (b *Broker) Produce(ctx context.Context, req *sgproto.ProduceMessageRequest) (*sgproto.ProduceResponse, error)
func (*Broker) TriggerSyncRequest ¶
type Config ¶
type Config struct { Name string `yaml:"name,omitempty"` DCName string `yaml:"dc_name,omitempty"` BindAddr string `yaml:"bind_addr,omitempty"` AdvertiseAddr string `yaml:"advertise_addr,omitempty"` DBPath string `yaml:"db_path,omitempty"` GossipPort string `yaml:"gossip_port,omitempty"` HTTPPort string `yaml:"http_port,omitempty"` GRPCPort string `yaml:"grpc_port,omitempty"` RaftPort string `yaml:"raft_port,omitempty"` InitialPeers []string `yaml:"initial_peers,omitempty"` BootstrapRaft bool `yaml:"bootstrap_raft,omitempty"` LoggingLevel *logy.Level `yaml:"-"` OffsetReplicationFactor int `yaml:"-"` }
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
func NewConsumerGroup ¶
func NewConsumerGroup(b *Broker, topic, partition, name string) *ConsumerGroup
Click to show internal directories.
Click to hide internal directories.