kafka

package
v0.0.0-...-9a33093 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 6, 2025 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

Functions

This section is empty.

Types

type ACLFilterRequest

type ACLFilterRequest struct {
	ResourceType        sarama.AclResourceType        `json:"resource_type"`
	ResourceName        string                        `json:"resource_name"`
	ResourcePatternType sarama.AclResourcePatternType `json:"resource_pattern_type"`
	Principal           string                        `json:"principal"`
	Host                string                        `json:"host"`
	Operation           sarama.AclOperation           `json:"operation"`
	PermissionType      sarama.AclPermissionType      `json:"permission_type"`
}

type ACLRequest

type ACLRequest struct {
	ResourceType        sarama.AclResourceType        `json:"resource_type"`
	ResourceName        string                        `json:"resource_name"`
	ResourcePatternType sarama.AclResourcePatternType `json:"resource_pattern_type"`
	Principal           string                        `json:"principal"`
	Host                string                        `json:"host"`
	Operation           sarama.AclOperation           `json:"operation"`
	PermissionType      sarama.AclPermissionType      `json:"permission_type"`
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client handles produce, consume and ACL-related operations

func NewClient

func NewClient(config *Config, logger *zap.Logger) *Client

NewClient creates a new ACLManager

func (*Client) ConsumeMessages

func (c *Client) ConsumeMessages(consumer sarama.Consumer, topic string, logMsg bool)

ConsumeMessages consumes messages from a topic

func (*Client) CreateACL

func (c *Client) CreateACL(resource sarama.Resource, acl sarama.Acl) error

CreateACL creates a new ACL

func (*Client) CreateConsumer

func (c *Client) CreateConsumer() (sarama.Consumer, error)

CreateConsumer creates a new Consumer

func (*Client) CreateProducer

func (c *Client) CreateProducer() (sarama.SyncProducer, error)

CreateProducer creates a new SyncProducer

func (*Client) CreateTopic

func (c *Client) CreateTopic(topicName string, detail *sarama.TopicDetail) error

CreateTopic creates a new topic

func (*Client) DeleteACL

func (c *Client) DeleteACL(filter sarama.AclFilter) ([]sarama.MatchingAcl, error)

DeleteACL deletes ACLs based on the provided filter

func (*Client) ListAcls

func (c *Client) ListAcls(filter sarama.AclFilter) ([]sarama.ResourceAcls, error)

ListAcls lists ACLs based on the provided filter

func (*Client) ListTopics

func (c *Client) ListTopics() (map[string]sarama.TopicDetail, error)

ListTopics lists all topics

func (*Client) ProduceMessage

func (c *Client) ProduceMessage(producer sarama.SyncProducer, topic string, message []byte) error

ProduceMessage produces a message to a topic

type Config

type Config struct {
	Brokers       []string
	Version       string
	SASL          SASLConfig
	TLS           TLSConfig
	ProducerTopic string
}

Config represents the Kafka configuration options

func NewConfig

func NewConfig() *Config

NewConfig creates a new Kafka configuration with default values

func (*Config) GetBrokers

func (c *Config) GetBrokers() []string

GetBrokers returns the list of Kafka brokers

func (*Config) ToSaramaConfig

func (c *Config) ToSaramaConfig() (*sarama.Config, error)

ToSaramaConfig converts the Config to a sarama.Config

type KafkaRequest

type KafkaRequest struct {
	Message string `json:"message"`
	Topic   string `json:"topic"`
}

type PeerKafka

type PeerKafka struct {
	// contains filtered or unexported fields
}

func NewPeerKafka

func NewPeerKafka(logger *zap.Logger) *PeerKafka

func (*PeerKafka) Connect

func (p *PeerKafka) Connect(config json.RawMessage, args ...any) error

func (*PeerKafka) Disconnect

func (p *PeerKafka) Disconnect() error

func (*PeerKafka) Pub

func (p *PeerKafka) Pub(event pglogrepl.CDC, args ...any) error

func (*PeerKafka) Sub

func (p *PeerKafka) Sub(args ...any) (<-chan pglogrepl.CDC, error)

func (*PeerKafka) Type

func (p *PeerKafka) Type() pipeline.ConnectorType

type SASLConfig

type SASLConfig struct {
	Enable    bool
	Username  string
	Password  string
	Algorithm string // "sha256" or "sha512"
}

SASLConfig represents SASL authentication configuration

type TLSConfig

type TLSConfig struct {
	Enable     bool
	CertFile   string
	KeyFile    string
	CAFile     string
	SkipVerify bool
}

TLSConfig represents TLS configuration

type TopicResponse

type TopicResponse struct {
	Name   string             `json:"name"`
	Detail sarama.TopicDetail `json:"detail"`
}

type XDGSCRAMClient

type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (*XDGSCRAMClient) Begin

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error)

func (*XDGSCRAMClient) Done

func (x *XDGSCRAMClient) Done() bool

func (*XDGSCRAMClient) Step

func (x *XDGSCRAMClient) Step(challenge string) (response string, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL