Documentation ¶
Index ¶
- Variables
- func PartitionMapToReplicaAssignment(pm *kafkazk.PartitionMap) kafkaadmin.ReplicaAssignment
- type BrokerSet
- type Checkpoint
- type Config
- type ErrReservedTag
- type KafkaObject
- type RequestThrottle
- type RequestThrottleConfig
- type ReservedFields
- type Server
- func (s *Server) BrokerMappings(ctx context.Context, req *pb.BrokerRequest) (*pb.TopicResponse, error)
- func (s *Server) CreateTopic(ctx context.Context, req *pb.CreateTopicRequest) (*pb.Empty, error)
- func (s *Server) DeleteBrokerTags(ctx context.Context, req *pb.BrokerRequest) (*pb.TagResponse, error)
- func (s *Server) DeleteStaleTags(ctx context.Context, now func() time.Time, c Config) error
- func (s *Server) DeleteTopic(ctx context.Context, req *pb.TopicRequest) (*pb.Empty, error)
- func (s *Server) DeleteTopicTags(ctx context.Context, req *pb.TopicRequest) (*pb.TagResponse, error)
- func (s *Server) DialZK(ctx context.Context, wg *sync.WaitGroup, c *kafkazk.Config) error
- func (s *Server) EnablingLocking(c *kafkazk.Config) error
- func (s *Server) GetBrokers(ctx context.Context, req *pb.BrokerRequest) (*pb.BrokerResponse, error)
- func (s *Server) GetTopics(ctx context.Context, req *pb.TopicRequest) (*pb.TopicResponse, error)
- func (s *Server) InitKafkaAdmin(ctx context.Context, wg *sync.WaitGroup, cfg kafkaadmin.Config) error
- func (s *Server) InitKafkaConsumer(ctx context.Context, wg *sync.WaitGroup, cfg kafkaadmin.Config) error
- func (s *Server) ListBrokers(ctx context.Context, req *pb.BrokerRequest) (*pb.BrokerResponse, error)
- func (s *Server) ListTopics(ctx context.Context, req *pb.TopicRequest) (*pb.TopicResponse, error)
- func (s *Server) LogRequest(ctx context.Context, params string, reqID uint64)
- func (s *Server) MarkForDeletion(ctx context.Context, now func() time.Time) error
- func (s *Server) ReassigningTopics(ctx context.Context, _ *pb.Empty) (*pb.TopicResponse, error)
- func (s *Server) RunHTTP(ctx context.Context, wg *sync.WaitGroup) error
- func (s *Server) RunRPC(ctx context.Context, wg *sync.WaitGroup) error
- func (s *Server) RunTagCleanup(ctx context.Context, wg *sync.WaitGroup, c Config) error
- func (s *Server) TagBroker(ctx context.Context, req *pb.BrokerRequest) (*pb.TagResponse, error)
- func (s *Server) TagBrokers(ctx context.Context, req *pb.TagBrokersRequest) (*pb.TagBrokersResponse, error)
- func (s *Server) TagTopic(ctx context.Context, req *pb.TopicRequest) (*pb.TagResponse, error)
- func (s *Server) TopicMappings(ctx context.Context, req *pb.TopicRequest) (*pb.BrokerResponse, error)
- func (s *Server) TranslateOffsets(ctx context.Context, req *pb.TranslateOffsetRequest) (*pb.TranslateOffsetResponse, error)
- func (s *Server) UnderReplicatedTopics(ctx context.Context, _ *pb.Empty) (*pb.TopicResponse, error)
- func (s *Server) UnmappedBrokers(ctx context.Context, req *pb.UnmappedBrokersRequest) (*pb.BrokerResponse, error)
- func (s *Server) ValidateRequest(ctx context.Context, req interface{}, kind int) (context.Context, context.CancelFunc, error)
- type TagCleaner
- type TagHandler
- type TagHandlerConfig
- type TagSet
- type TagStorage
- type Tags
- type TopicSet
- type ZKTagStorage
- func (t *ZKTagStorage) DeleteTags(o KafkaObject, keysToDelete []string) error
- func (t *ZKTagStorage) FieldReserved(o KafkaObject, f string) bool
- func (t *ZKTagStorage) GetAllTags() (map[KafkaObject]TagSet, error)
- func (t *ZKTagStorage) GetAllTagsForType(kafkaObjectType string) (map[KafkaObject]TagSet, error)
- func (t *ZKTagStorage) GetTags(o KafkaObject) (TagSet, error)
- func (t *ZKTagStorage) Init() error
- func (t *ZKTagStorage) LoadReservedFields(r ReservedFields) error
- func (t *ZKTagStorage) SetTags(o KafkaObject, ts TagSet) error
- type ZKTagStorageConfig
Constants ¶
This section is empty.
Variables ¶
var ( // ErrFetchingBrokers error. ErrFetchingBrokers = errors.New("error fetching brokers") // ErrBrokerNotExist error. ErrBrokerNotExist = errors.New("broker does not exist") // ErrBrokerIDEmpty error. ErrBrokerIDEmpty = errors.New("broker Id field must be specified") // ErrBrokerIDsEmpty error. ErrBrokerIDsEmpty = errors.New("broker Ids field must be specified") )
var ( // ErrFetchingTopics error. ErrFetchingTopics = errors.New("error fetching topics") // ErrTopicNotExist error. ErrTopicNotExist = errors.New("topic does not exist") // ErrTopicNameEmpty error. ErrTopicNameEmpty = errors.New("topic Name field must be specified") // ErrTopicFieldMissing error. ErrTopicFieldMissing = errors.New("topic field missing in request body") // ErrTopicAlreadyExists error. ErrTopicAlreadyExists = errors.New("topic already exists") // ErrInsufficientBrokers error. ErrInsufficientBrokers = errors.New("insufficient number of brokers") // ErrInvalidBrokerId error. ErrInvalidBrokerId = errors.New("invalid broker id") )
var ( // ErrInvalidKafkaObjectType error. ErrInvalidKafkaObjectType = errors.New("invalid Kafka object type") // ErrKafkaObjectDoesNotExist error. ErrKafkaObjectDoesNotExist = errors.New("requested Kafka object does not exist") // ErrNilTagSet error. ErrNilTagSet = errors.New("must provide a non-nil or non-empty TagSet") // ErrNilTags error. ErrNilTags = errors.New("must provide a non-nil or non-empty tags") )
var ( // ErrGroupIDEmpty error. ErrGroupIDEmpty = errors.New("GroupId field must be specified") )
var ( // ErrRequestThrottleTimeout error. ErrRequestThrottleTimeout = errors.New("wait time exceeded") )
var TagMarkTimeKey = "tagMarkedForDeletionTime"
Functions ¶
func PartitionMapToReplicaAssignment ¶
func PartitionMapToReplicaAssignment(pm *kafkazk.PartitionMap) kafkaadmin.ReplicaAssignment
PartitionMapToReplicaAssignment takes a *kafkazk.PartitionMap and transforms it into a kafkaadmin.ReplicaAssignment.
Types ¶
type Checkpoint ¶ added in v3.4.0
type Checkpoint struct { Topic string Partition uint32 ConsumerGroupID string UpstreamOffset uint64 Offset uint64 Metadata string }
Checkpoint holds a record emitted from the MirrorCheckpointConnector in MirrorMaker2.
type Config ¶
type Config struct { HTTPListen string GRPCListen string ReadReqRate int WriteReqRate int ZKTagsPrefix string DefaultRequestTimeout time.Duration TagCleanupFrequencyMinutes int TagAllowedStalenessMinutes int // contains filtered or unexported fields }
Config holds Server configurations.
type ErrReservedTag ¶
type ErrReservedTag struct {
// contains filtered or unexported fields
}
ErrReservedTag error.
func (ErrReservedTag) Error ¶
func (e ErrReservedTag) Error() string
type KafkaObject ¶
KafkaObject holds an object type (broker, topic) and object identifier (ID, name).
func (KafkaObject) Complete ¶
func (o KafkaObject) Complete() bool
Complete checks if a KafkaObject is valid and has a non-empty ID field value.
func (KafkaObject) Valid ¶
func (o KafkaObject) Valid() bool
Valid checks if a KafkaObject has a valid Type field value.
type RequestThrottle ¶
RequestThrottle controls request rates with a configurable burst capacity and per-second rate backed with a token bucket.
func NewRequestThrottle ¶
func NewRequestThrottle(cfg RequestThrottleConfig) (RequestThrottle, error)
NewRequestThrottle initializes a RequestThrottle.
type RequestThrottleConfig ¶
type RequestThrottleConfig struct { // Burst capacity. Capacity int // Request rate (reqs/s). Rate int }
RequestThrottleConfig specifies the RequestThrottle burst capacity and per-second rate limit.
type ReservedFields ¶
ReservedFields is a mapping of object types (topic, broker) to a set of fields reserved for internal use; these are default fields that become searchable through the tags interface.
func GetReservedFields ¶
func GetReservedFields() ReservedFields
GetReservedFields returns a map of proto message types to field names considered reserved for internal use. All fields specified in the Registry proto messages are discovered here and reserved by default.
type Server ¶
type Server struct { pb.UnimplementedRegistryServer Locking cluster.Lock HTTPListen string GRPCListen string ZK kafkazk.Handler Tags *TagHandler // contains filtered or unexported fields }
Server implements the registry APIs.
func (*Server) BrokerMappings ¶
func (s *Server) BrokerMappings(ctx context.Context, req *pb.BrokerRequest) (*pb.TopicResponse, error)
BrokerMappings returns all topic names that have at least one partition held by the requested broker. The broker is specified in the BrokerRequest.ID field.
func (*Server) CreateTopic ¶
CreateTopic creates a topic if it doesn't exist. Topic tags can optionally be set at topic creation time. Additionally, topics can be created on a target set of brokers by specifying the broker tag(s) in the request.
func (*Server) DeleteBrokerTags ¶
func (s *Server) DeleteBrokerTags(ctx context.Context, req *pb.BrokerRequest) (*pb.TagResponse, error)
DeleteBrokerTags deletes custom tags for the specified broker.
func (*Server) DeleteStaleTags ¶ added in v3.8.0
DeleteStaleTags deletes any tags that have not had a kafka resource associated with them.
func (*Server) DeleteTopic ¶ added in v3.6.0
DeleteTopic deletes the topic specified in the req.Topic.Name field.
func (*Server) DeleteTopicTags ¶
func (s *Server) DeleteTopicTags(ctx context.Context, req *pb.TopicRequest) (*pb.TagResponse, error)
DeleteTopicTags deletes custom tags for the specified topic.
func (*Server) DialZK ¶
DialZK takes a Context, WaitGroup and *kafkazk.Config and initializes a kafkazk.Handler. A background shutdown procedure is called when the context is cancelled.
func (*Server) EnablingLocking ¶ added in v3.13.0
EnablingLocking uses distributed locking for write operations.
func (*Server) GetBrokers ¶
func (s *Server) GetBrokers(ctx context.Context, req *pb.BrokerRequest) (*pb.BrokerResponse, error)
GetBrokers gets brokers. If the input *pb.BrokerRequest Id field is non-zero, the specified broker is matched if it exists. Otherwise, all brokers found in ZooKeeper are matched. Matched brokers are then filtered by all tags specified, if specified, in the *pb.BrokerRequest tag field.
func (*Server) GetTopics ¶
func (s *Server) GetTopics(ctx context.Context, req *pb.TopicRequest) (*pb.TopicResponse, error)
GetTopics gets topics. If the input *pb.TopicRequest Name field is non-nil, the specified topic is matched if it exists. Otherwise, all topics found in ZooKeeper are matched. Matched topics are then filtered by all tags specified, if specified, in the *pb.TopicRequest tag field.
func (*Server) InitKafkaAdmin ¶
func (s *Server) InitKafkaAdmin(ctx context.Context, wg *sync.WaitGroup, cfg kafkaadmin.Config) error
InitKafkaAdmin takes a Context, WaitGroup and an admin.Config and initializes an admin.Client. A background shutdown procedure is called when the context is cancelled.
func (*Server) InitKafkaConsumer ¶ added in v3.4.0
func (s *Server) InitKafkaConsumer(ctx context.Context, wg *sync.WaitGroup, cfg kafkaadmin.Config) error
InitKafkaConsumer takes a Context, WaitGroup and an admin.Config and initializes a kafka.Consumer. A background shutdown procedure is called when the context is cancelled.
func (*Server) ListBrokers ¶
func (s *Server) ListBrokers(ctx context.Context, req *pb.BrokerRequest) (*pb.BrokerResponse, error)
ListBrokers gets broker IDs. If the input *pb.BrokerRequest Id field is non-zero, the specified broker is matched if it exists. Otherwise, all brokers found in ZooKeeper are matched. Matched brokers are then filtered by all tags specified, if specified, in the *pb.BrokerRequest tag field.
func (*Server) ListTopics ¶
func (s *Server) ListTopics(ctx context.Context, req *pb.TopicRequest) (*pb.TopicResponse, error)
ListTopics gets topic names. If the input *pb.TopicRequest Name field is non-nil, the specified topic is matched if it exists. Otherwise, all topics found in ZooKeeper are matched. Matched topics are then filtered by all tags specified, if specified, in the *pb.TopicRequest tag field.
func (*Server) LogRequest ¶
LogRequest takes a request context and input parameters as a string and logs the request data.
func (*Server) MarkForDeletion ¶ added in v3.8.0
MarkForDeletion marks stored tags that have been stranded without an associated kafka resource.
func (*Server) ReassigningTopics ¶ added in v3.4.0
ReassigningTopics returns a *pb.TopicResponse holding the names of all topics currently undergoing reassignment.
func (*Server) RunTagCleanup ¶ added in v3.8.0
RunTagCleanup starts a background process deleting stale tags.
func (*Server) TagBroker ¶
func (s *Server) TagBroker(ctx context.Context, req *pb.BrokerRequest) (*pb.TagResponse, error)
TagBroker sets custom tags for the specified broker. Any previously existing tags that were not specified in the request remain unmodified.
func (*Server) TagBrokers ¶ added in v3.18.1
func (s *Server) TagBrokers(ctx context.Context, req *pb.TagBrokersRequest) (*pb.TagBrokersResponse, error)
TagBrokers sets custom tags for the specified brokers. Any previously existing tags that were not specified in the request remain unmodified.
func (*Server) TagTopic ¶
func (s *Server) TagTopic(ctx context.Context, req *pb.TopicRequest) (*pb.TagResponse, error)
TagTopic sets custom tags for the specified topic. Any previously existing tags that were not specified in the request remain unmodified.
func (*Server) TopicMappings ¶
func (s *Server) TopicMappings(ctx context.Context, req *pb.TopicRequest) (*pb.BrokerResponse, error)
TopicMappings returns all broker IDs that hold at least one partition for the requested topic. The topic is specified in the TopicRequest.Name field.
func (*Server) TranslateOffsets ¶ added in v3.4.0
func (s *Server) TranslateOffsets(ctx context.Context, req *pb.TranslateOffsetRequest) (*pb.TranslateOffsetResponse, error)
TranslateOffsets translates the last committed remote consumer group's offset into the corresponding local offsets.
func (*Server) UnderReplicatedTopics ¶ added in v3.5.0
UnderReplicatedTopics returns a *pb.TopicResponse holding the names of all under replicated topics.
func (*Server) UnmappedBrokers ¶ added in v3.6.0
func (s *Server) UnmappedBrokers(ctx context.Context, req *pb.UnmappedBrokersRequest) (*pb.BrokerResponse, error)
UnmappedBrokers returns a list of broker IDs that hold no partitions. An optional list of topic names can be specified in the UnmappedBrokersRequest exclude field where partitions for those topics are not considered. For example, broker 1000 holds no partitions other than one belonging to the 'test0' topic. If UnmappedBrokers is called with 'test0' specified as an exclude name, broker 1000 will be returned in the BrokerResponse as an unmapped broker.
func (*Server) ValidateRequest ¶
func (s *Server) ValidateRequest(ctx context.Context, req interface{}, kind int) (context.Context, context.CancelFunc, error)
ValidateRequest takes an incoming request context, params, and request kind. The request is logged and checked against the appropriate request throttler. If the incoming context did not have a deadline set, a derived context is created with the server default timeout. The child context, its cancel function, and an error are returned.
type TagCleaner ¶ added in v3.8.0
type TagCleaner struct {
// contains filtered or unexported fields
}
func (*TagCleaner) RunTagCleanup ¶ added in v3.8.0
func (tc *TagCleaner) RunTagCleanup(s *Server, ctx context.Context, c Config)
RunTagCleanup regularly checks for tags that are stale and need cleanup.
type TagHandler ¶
type TagHandler struct {
Store TagStorage
}
TagHandler provides object filtering by tags along with tag storage and retrieval.
func NewTagHandler ¶
func NewTagHandler(c TagHandlerConfig) (*TagHandler, error)
NewTagHandler initializes a TagHandler.
func (*TagHandler) FilterBrokers ¶
func (t *TagHandler) FilterBrokers(in BrokerSet, tags Tags) (BrokerSet, error)
FilterBrokers takes a map of broker IDs to *pb.Broker and tags KV list. A filtered map is returned that includes brokers where all tag values match the provided input tag KVs. Additionally, any custom tags persisted in the TagStorage backend are populated into the Tags field for each matched object.
func (*TagHandler) FilterTopics ¶
func (t *TagHandler) FilterTopics(in TopicSet, tags Tags) (TopicSet, error)
FilterTopics takes a map of topic names to *pb.Topic and tags KV list. A filtered map is returned that includes topics where all tag values match the provided input tag KVs. Additionally, any custom tags persisted in the TagStorage backend are populated into the Tags field for each matched object.
func (*TagHandler) TagSetFromObject ¶
func (t *TagHandler) TagSetFromObject(o interface{}) (TagSet, error)
TagSetFromObject takes a protobuf type and returns the default TagSet along with any user-defined tags.
type TagHandlerConfig ¶
type TagHandlerConfig struct {
Prefix string
}
TagHandlerConfig holds TagHandler configuration.
type TagSet ¶
TagSet is a map of key:values.
func (TagSet) Equal ¶
Equal checks if the input TagSet has the same key:value pairs as the calling TagSet.
type TagStorage ¶
type TagStorage interface { LoadReservedFields(ReservedFields) error FieldReserved(KafkaObject, string) bool SetTags(KafkaObject, TagSet) error GetTags(KafkaObject) (TagSet, error) DeleteTags(KafkaObject, []string) error GetAllTags() (map[KafkaObject]TagSet, error) }
TagStorage handles tag persistence to stable storage.
type Tags ¶
type Tags []string
Tags is a []string of "key:value" pairs.
type TopicSet ¶
TopicSet is a mapping of topic name to *pb.Topic.
func TopicSetFromSlice ¶ added in v3.8.0
TopicSetFromSlice converts a slice into a TopicSet for convenience.
type ZKTagStorage ¶
type ZKTagStorage struct { ReservedFields ReservedFields Prefix string ZK kafkazk.Handler }
ZKTagStorage implements tag persistence in ZooKeeper.
func NewZKTagStorage ¶
func NewZKTagStorage(c ZKTagStorageConfig) (*ZKTagStorage, error)
NewZKTagStorage initializes a ZKTagStorage.
func (*ZKTagStorage) DeleteTags ¶
func (t *ZKTagStorage) DeleteTags(o KafkaObject, keysToDelete []string) error
DeleteTags deletes all tags in the list of keys for the requested KafkaObject.
func (*ZKTagStorage) FieldReserved ¶
func (t *ZKTagStorage) FieldReserved(o KafkaObject, f string) bool
FieldReserved takes a KafkaObject and field name. A bool is returned that indicates whether the field is reserved for the respective KafkaObject type.
func (*ZKTagStorage) GetAllTags ¶ added in v3.8.0
func (t *ZKTagStorage) GetAllTags() (map[KafkaObject]TagSet, error)
GetAllTags returns all tags stored in the tagstore, keyed by the resource they correspond to.
func (*ZKTagStorage) GetAllTagsForType ¶ added in v3.8.0
func (t *ZKTagStorage) GetAllTagsForType(kafkaObjectType string) (map[KafkaObject]TagSet, error)
GetAllTagsForType gets all the tags for objects of the given type. A convenience method that makes getting every tag a little easier.
func (*ZKTagStorage) GetTags ¶
func (t *ZKTagStorage) GetTags(o KafkaObject) (TagSet, error)
GetTags returns the TagSet for the requested KafkaObject.
func (*ZKTagStorage) Init ¶
func (t *ZKTagStorage) Init() error
Init ensures the ZooKeeper connection is ready and any required znodes are created.
func (*ZKTagStorage) LoadReservedFields ¶
func (t *ZKTagStorage) LoadReservedFields(r ReservedFields) error
LoadReservedFields takes a ReservedFields and stores it at ZKTagStorage.ReservedFields and returns an error.
func (*ZKTagStorage) SetTags ¶
func (t *ZKTagStorage) SetTags(o KafkaObject, ts TagSet) error
SetTags takes a KafkaObject and TagSet and sets the tag key:values for the object.
type ZKTagStorageConfig ¶
ZKTagStorageConfig holds ZKTagStorage configs.