Documentation ¶
Index ¶
- Constants
- type Authorizer
- func (c *Authorizer) ConsumeAcls()
- func (c *Authorizer) CreateAcl(operation models.ACLOperation, resourceType models.ACLResourceType, ...) error
- func (c *Authorizer) CreateTestACLs() error
- func (c *Authorizer) GetAcls(resourceType models.ACLResourceType, resourceName *string, ...) []*models.ACL
- func (c *Authorizer) WaitSynced()
- type Cluster
- func (c *Cluster) APICreateTopic(topicName string, partitions int32, replicationFactor int16, ...) (*models.Topic, error)
- func (c *Cluster) APIDeleteTopicCluster(topicName string) error
- func (c *Cluster) APIGetTopic(topicName string) (*models.Topic, error)
- func (c *Cluster) APIUpdateTopic(topicName string, partitions int32, config map[string]*string) (*models.Topic, error)
- func (c *Cluster) Close() error
- func (c *Cluster) ConsumeTopicConfigs()
- func (c *Cluster) Fetch(brokerID int32, request *kmsg.FetchRequest) (*kmsg.FetchResponse, error)
- func (c *Cluster) ListOffsets(brokerID int32, request *kmsg.ListOffsetsRequest) (*kmsg.ListOffsetsResponse, error)
- func (c *Cluster) Produce(brokerID int32, transactionID *string, timeoutMillis int32, ...) (*kmsg.ProduceResponse, error)
- func (c *Cluster) TopicMetadata(ctx context.Context, topics []string) (*kmsg.MetadataResponse, error)
- func (c *Cluster) WaitSynced()
- type Controller
- func (c *Controller) APIDeleteTopicPointer(topicName string) error
- func (c *Controller) APIGetTopicPointer(topicName string) (*string, error)
- func (c *Controller) APISetTopicPointer(topicName string, cluster string) error
- func (c *Controller) ConsumeTopicPointers()
- func (c *Controller) CreateInternalTopics() error
- func (c *Controller) DescribeGroup(group string) (*kmsg.DescribeGroupsResponse, error)
- func (c *Controller) FindGroupCoordinator(consumerGroup string) (*kmsg.FindCoordinatorResponse, error)
- func (c *Controller) FindTransactionCoordinator(transaction string) (*kmsg.FindCoordinatorResponse, error)
- func (c *Controller) GetAuthorizer() *Authorizer
- func (c *Controller) HeartBeat(request *kmsg.HeartbeatRequest) (*kmsg.HeartbeatResponse, error)
- func (c *Controller) InitProducer(transactionTimeoutDuration time.Duration) (*kmsg.InitProducerIDResponse, error)
- func (c *Controller) JoinGroup(request *kmsg.JoinGroupRequest) (*kmsg.JoinGroupResponse, error)
- func (c *Controller) LeaveGroup(request *kmsg.LeaveGroupRequest) (*kmsg.LeaveGroupResponse, error)
- func (c *Controller) OffsetCommit(group, topic string, groupGenerationId, partition int32, offset int64) (errors.KafkaError, error)
- func (c *Controller) OffsetFetch(group, topic string, partition int32) (int64, error)
- func (c *Controller) OffsetFetchAllTopics(group string) (map[string]map[int32]int64, error)
- func (c *Controller) Start() error
- func (c *Controller) SyncGroupCustom(request *franz.SyncGroupRequest) (*kmsg.SyncGroupResponse, error)
- type LogicalBroker
- func (b *LogicalBroker) Close() error
- func (b *LogicalBroker) ConsumeBrokers()
- func (b *LogicalBroker) GetBroker(brokerID int32) *models.Broker
- func (b *LogicalBroker) GetClusterByTopic(topicName string) *Cluster
- func (b *LogicalBroker) GetClusters() map[string]*Cluster
- func (b *LogicalBroker) GetController() *Controller
- func (b *LogicalBroker) GetRegisteredBrokers() []*models.Broker
- func (b *LogicalBroker) GetTopic(topicName string) (*Cluster, *models.Topic)
- func (b *LogicalBroker) GetTopics() ([]*models.Topic, error)
- func (b *LogicalBroker) Start() error
Constants ¶
View Source
const (
InternalControlKey = "__krouter_control"
InternalTopicTopicPointers = "__krouter_topic_pointers"
)
View Source
const (
GroupCoordinatorRedisKeyFmt = "{group-%s}-coordinator"
GroupGenerationRedisKeyFmt = "{group-%s}-generation"
GroupTopicOffsetRedisKeyFmt = "{group-%s}-offset-topic"
GroupTopicPartitionOffsetRedisKeyFmt = GroupTopicOffsetRedisKeyFmt + "-%s-partition-%d"
)
View Source
const (
InternalTopicAcls = "__krouter_acls"
)
View Source
const (
InternalTopicBrokers = "__krouter_brokers"
)
View Source
const (
InternalTopicTopicConfig = "__krouter_topic_configs"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Authorizer ¶
type Authorizer struct {
// contains filtered or unexported fields
}
func NewAuthorizer ¶
func (*Authorizer) ConsumeAcls ¶
func (c *Authorizer) ConsumeAcls()
func (*Authorizer) CreateAcl ¶
func (c *Authorizer) CreateAcl(operation models.ACLOperation, resourceType models.ACLResourceType, patternType models.ACLPatternType, resourceName string, principal string, permission models.ACLPermission) error
func (*Authorizer) CreateTestACLs ¶
func (c *Authorizer) CreateTestACLs() error
func (*Authorizer) GetAcls ¶
func (c *Authorizer) GetAcls(resourceType models.ACLResourceType, resourceName *string, patternType models.ACLPatternType, principal *string, operation models.ACLOperation, permission models.ACLPermission) []*models.ACL
func (*Authorizer) WaitSynced ¶
func (c *Authorizer) WaitSynced()
type Cluster ¶
type Cluster struct {
Name string
// contains filtered or unexported fields
}
func NewCluster ¶
func (*Cluster) APICreateTopic ¶
func (*Cluster) APIDeleteTopicCluster ¶
func (c *Cluster) APIDeleteTopicCluster(topicName string) error
func (*Cluster) APIGetTopic ¶
func (c *Cluster) APIGetTopic(topicName string) (*models.Topic, error)
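func (*Cluster) APIUpdateTopic ¶
func (c *Cluster) APIUpdateTopic(topicName string, partitions int32, config map[string]*string) (*models.Topic, error)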
func (*Cluster) ConsumeTopicConfigs ¶
func (c *Cluster) ConsumeTopicConfigs()
func (*Cluster) Fetch ¶
func (c *Cluster) Fetch(brokerID int32, request *kmsg.FetchRequest) (*kmsg.FetchResponse, error)
func (*Cluster) ListOffsets ¶
func (c *Cluster) ListOffsets(brokerID int32, request *kmsg.ListOffsetsRequest) (*kmsg.ListOffsetsResponse, error)
func (*Cluster) Produce ¶
func (c *Cluster) Produce(brokerID int32, transactionID *string, timeoutMillis int32, topics []kmsg.ProduceRequestTopic) (*kmsg.ProduceResponse, error)
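func (*Cluster) TopicMetadata ¶
func (c *Cluster) TopicMetadata(ctx context.Context, topics []string) (*kmsg.MetadataResponse, error)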
func (*Cluster) WaitSynced ¶
func (c *Cluster) WaitSynced()
type Controller ¶
type Controller struct {
// contains filtered or unexported fields
}
func NewController ¶
func NewController(log logr.Logger, addrs []string, redisClient *redisw.RedisClient) (*Controller, error)
func (*Controller) APIDeleteTopicPointer ¶
func (c *Controller) APIDeleteTopicPointer(topicName string) error
func (*Controller) APIGetTopicPointer ¶
func (c *Controller) APIGetTopicPointer(topicName string) (*string, error)
func (*Controller) APISetTopicPointer ¶
func (c *Controller) APISetTopicPointer(topicName string, cluster string) error
func (*Controller) ConsumeTopicPointers ¶
func (c *Controller) ConsumeTopicPointers()
func (*Controller) CreateInternalTopics ¶
func (c *Controller) CreateInternalTopics() error
func (*Controller) DescribeGroup ¶
func (c *Controller) DescribeGroup(group string) (*kmsg.DescribeGroupsResponse, error)
func (*Controller) FindGroupCoordinator ¶
func (c *Controller) FindGroupCoordinator(consumerGroup string) (*kmsg.FindCoordinatorResponse, error)
func (*Controller) FindTransactionCoordinator ¶
func (c *Controller) FindTransactionCoordinator(transaction string) (*kmsg.FindCoordinatorResponse, error)
func (*Controller) GetAuthorizer ¶
func (c *Controller) GetAuthorizer() *Authorizer
func (*Controller) HeartBeat ¶
func (c *Controller) HeartBeat(request *kmsg.HeartbeatRequest) (*kmsg.HeartbeatResponse, error)
func (*Controller) InitProducer ¶
func (c *Controller) InitProducer(transactionTimeoutDuration time.Duration) (*kmsg.InitProducerIDResponse, error)
func (*Controller) JoinGroup ¶
func (c *Controller) JoinGroup(request *kmsg.JoinGroupRequest) (*kmsg.JoinGroupResponse, error)
func (*Controller) LeaveGroup ¶
func (c *Controller) LeaveGroup(request *kmsg.LeaveGroupRequest) (*kmsg.LeaveGroupResponse, error)
func (*Controller) OffsetCommit ¶
func (c *Controller) OffsetCommit(group, topic string, groupGenerationId, partition int32, offset int64) (errors.KafkaError, error)
func (*Controller) OffsetFetch ¶
func (c *Controller) OffsetFetch(group, topic string, partition int32) (int64, error)
func (*Controller) OffsetFetchAllTopics ¶
func (c *Controller) OffsetFetchAllTopics(group string) (map[string]map[int32]int64, error)
func (*Controller) Start ¶
func (c *Controller) Start() error
func (*Controller) SyncGroupCustom ¶
func (c *Controller) SyncGroupCustom(request *franz.SyncGroupRequest) (*kmsg.SyncGroupResponse, error)
type LogicalBroker ¶
type LogicalBroker struct {
AdvertiseListener *net.TCPAddr
ClusterID string
BrokerID int32
// contains filtered or unexported fields
}
func InitBroker ¶
func (*LogicalBroker) Close ¶
func (b *LogicalBroker) Close() error
func (*LogicalBroker) ConsumeBrokers ¶
func (b *LogicalBroker) ConsumeBrokers()
func (*LogicalBroker) GetClusterByTopic ¶
func (b *LogicalBroker) GetClusterByTopic(topicName string) *Cluster
func (*LogicalBroker) GetClusters ¶
func (b *LogicalBroker) GetClusters() map[string]*Cluster
func (*LogicalBroker) GetController ¶
func (b *LogicalBroker) GetController() *Controller
func (*LogicalBroker) GetRegisteredBrokers ¶
func (b *LogicalBroker) GetRegisteredBrokers() []*models.Broker
func (*LogicalBroker) GetTopic ¶
func (b *LogicalBroker) GetTopic(topicName string) (*Cluster, *models.Topic)
func (*LogicalBroker) Start ¶
func (b *LogicalBroker) Start() error
Source Files ¶
Click to show internal directories.
Click to hide internal directories.