Documentation ¶
Index ¶
- Variables
- func BodyParse(bin []byte, p interface{}) error
- func NewKafkaPubSub(brokerUrl ...string) *kafkaPubSub
- func NewSubscriber(appCtx AppContext) *subscriber
- func ToInt32(in *int32) int32
- func ToPInt32(in int32) *int32
- type AppContext
- type Client
- func (ps *Client) Close() error
- func (ps *Client) InitConsumer(brokerURLs ...string) error
- func (ps *Client) InitConsumerGroup(consumerGroup string) error
- func (ps *Client) InitPublisher(brokerURLs ...string)
- func (ps *Client) ListTopics(brokers ...string) ([]string, error)
- func (ps *Client) OnAsyncSubscribe(topics []*Topic, numberPuller int, buf chan *Message) error
- func (ps *Client) OnScanMessages(topics []string, bufMessage chan Message) error
- func (ps *Client) Publish(topic string, messages ...interface{}) error
- type ConsumerGroupHandle
- type ConsumerJob
- type GroupJob
- type Message
- type PubSub
- type SenderConfig
- type Subscriber
- type Topic
- type User
Constants ¶
This section is empty.
Variables ¶
View Source
var ( NUM_PARTITION = 3 REPLICATION_FACTOR = 1 )
Functions ¶
func NewKafkaPubSub ¶
func NewKafkaPubSub(brokerUrl ...string) *kafkaPubSub
func NewSubscriber ¶
func NewSubscriber(appCtx AppContext) *subscriber
Types ¶
type AppContext ¶
type AppContext interface {
GetKafka() PubSub
}
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
func (*Client) InitConsumer ¶
func (*Client) InitConsumerGroup ¶
func (*Client) InitPublisher ¶
InitPublisher init with addr is url of lookupd
func (*Client) OnAsyncSubscribe ¶
func (*Client) OnScanMessages ¶
type ConsumerGroupHandle ¶
type ConsumerGroupHandle struct {
// contains filtered or unexported fields
}
ConsumerGroupHandle represents a Sarama consumer group consumer
func (*ConsumerGroupHandle) Cleanup ¶
func (consumer *ConsumerGroupHandle) Cleanup(ss sarama.ConsumerGroupSession) error
Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (*ConsumerGroupHandle) ConsumeClaim ¶
func (consumer *ConsumerGroupHandle) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error
ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (*ConsumerGroupHandle) Setup ¶
func (consumer *ConsumerGroupHandle) Setup(ss sarama.ConsumerGroupSession) error
type ConsumerJob ¶
type Message ¶
type Message struct { Id string Offset int64 `json:"offset,omitempty"` Partition int `json:"partition,omitempty"` Topic string `json:"topic,omitempty"` Body []byte `json:"body,omitempty"` Timestamp int64 `json:"timestamp,omitempty"` ConsumerGroup string `json:"consumer_group,omitempty"` Commit func() Headers map[string]string }
Message define message encode/decode sarama message
func NewMessage ¶
func NewMessage(data interface{}) *Message
type SenderConfig ¶
SenderConfig addion config when publish message
type Subscriber ¶
type Subscriber interface { Start() error StartSubTopic(topic string, isConcurrency bool, consumerJobs ...ConsumerJob) GetAppContext() AppContext }
Click to show internal directories.
Click to hide internal directories.