Documentation ¶
Index ¶
- Variables
- type ACLFilterRequest
- type ACLRequest
- type Client
- func (c *Client) ConsumeMessages(consumer sarama.Consumer, topic string, logMsg bool)
- func (c *Client) CreateACL(resource sarama.Resource, acl sarama.Acl) error
- func (c *Client) CreateConsumer() (sarama.Consumer, error)
- func (c *Client) CreateProducer() (sarama.SyncProducer, error)
- func (c *Client) CreateTopic(topicName string, detail *sarama.TopicDetail) error
- func (c *Client) DeleteACL(filter sarama.AclFilter) ([]sarama.MatchingAcl, error)
- func (c *Client) ListAcls(filter sarama.AclFilter) ([]sarama.ResourceAcls, error)
- func (c *Client) ListTopics() (map[string]sarama.TopicDetail, error)
- func (c *Client) ProduceMessage(producer sarama.SyncProducer, topic string, message []byte) error
- type Config
- type KafkaRequest
- type PeerKafka
- type SASLConfig
- type TLSConfig
- type TopicResponse
- type XDGSCRAMClient
Constants ¶
This section is empty.
Variables ¶
View Source
var (
	SHA256 scram.HashGeneratorFcn = sha256.New
	SHA512 scram.HashGeneratorFcn = sha512.New
)
Functions ¶
This section is empty.
Types ¶
type ACLFilterRequest ¶
type ACLFilterRequest struct {
	ResourceType        sarama.AclResourceType        `json:"resource_type"`
	ResourceName        string                        `json:"resource_name"`
	ResourcePatternType sarama.AclResourcePatternType `json:"resource_pattern_type"`
	Principal           string                        `json:"principal"`
	Host                string                        `json:"host"`
	Operation           sarama.AclOperation           `json:"operation"`
	PermissionType      sarama.AclPermissionType      `json:"permission_type"`
}
type ACLRequest ¶
type ACLRequest struct {
	ResourceType        sarama.AclResourceType        `json:"resource_type"`
	ResourceName        string                        `json:"resource_name"`
	ResourcePatternType sarama.AclResourcePatternType `json:"resource_pattern_type"`
	Principal           string                        `json:"principal"`
	Host                string                        `json:"host"`
	Operation           sarama.AclOperation           `json:"operation"`
	PermissionType      sarama.AclPermissionType      `json:"permission_type"`
}
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client handles produce, consume and ACL-related operations
func (*Client) ConsumeMessages ¶
func (c *Client) ConsumeMessages(consumer sarama.Consumer, topic string, logMsg bool)
ConsumeMessages consumes messages from a topic
func (*Client) CreateConsumer ¶
func (c *Client) CreateConsumer() (sarama.Consumer, error)
CreateConsumer creates a new Consumer
func (*Client) CreateProducer ¶
func (c *Client) CreateProducer() (sarama.SyncProducer, error)
CreateProducer creates a new SyncProducer
func (*Client) CreateTopic ¶
func (c *Client) CreateTopic(topicName string, detail *sarama.TopicDetail) error
CreateTopic creates a new topic
func (*Client) ListTopics ¶
func (c *Client) ListTopics() (map[string]sarama.TopicDetail, error)
ListTopics lists all topics
func (*Client) ProduceMessage ¶
func (c *Client) ProduceMessage(producer sarama.SyncProducer, topic string, message []byte) error
ProduceMessage produces a message to a topic
type Config ¶
type Config struct {
	Brokers       []string
	Version       string
	SASL          SASLConfig
	TLS           TLSConfig
	ProducerTopic string
}
Config represents the Kafka configuration options
func NewConfig ¶
func NewConfig() *Config
NewConfig creates a new Kafka configuration with default values
func (*Config) GetBrokers ¶
GetBrokers returns the list of Kafka brokers
type KafkaRequest ¶
type PeerKafka ¶
type PeerKafka struct {
// contains filtered or unexported fields
}
func NewPeerKafka ¶
func (*PeerKafka) Disconnect ¶
func (*PeerKafka) Type ¶
func (p *PeerKafka) Type() pipeline.ConnectorType
type SASLConfig ¶
type SASLConfig struct {
	Enable    bool
	Username  string
	Password  string
	Algorithm string // "sha256" or "sha512"
}
SASLConfig represents SASL authentication configuration
type TopicResponse ¶
type TopicResponse struct {
	Name   string             `json:"name"`
	Detail sarama.TopicDetail `json:"detail"`
}
type XDGSCRAMClient ¶
type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}
func (*XDGSCRAMClient) Begin ¶
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)
func (*XDGSCRAMClient) Done ¶
func (x *XDGSCRAMClient) Done() bool
Click to show internal directories.
Click to hide internal directories.