Documentation ¶
Index ¶
- Variables
- func PartitionMapToReplicaAssignment(pm *kafkazk.PartitionMap) admin.ReplicaAssignment
- type BrokerSet
- type Config
- type ErrReservedTag
- type KafkaObject
- type RequestThrottle
- type RequestThrottleConfig
- type ReservedFields
- type Server
- func (s *Server) BrokerMappings(ctx context.Context, req *pb.BrokerRequest) (*pb.TopicResponse, error)
- func (s *Server) CreateTopic(ctx context.Context, req *pb.CreateTopicRequest) (*pb.Empty, error)
- func (s *Server) DeleteBrokerTags(ctx context.Context, req *pb.BrokerRequest) (*pb.TagResponse, error)
- func (s *Server) DeleteTopicTags(ctx context.Context, req *pb.TopicRequest) (*pb.TagResponse, error)
- func (s *Server) DialZK(ctx context.Context, wg *sync.WaitGroup, c *kafkazk.Config) error
- func (s *Server) GetBrokers(ctx context.Context, req *pb.BrokerRequest) (*pb.BrokerResponse, error)
- func (s *Server) GetTopics(ctx context.Context, req *pb.TopicRequest) (*pb.TopicResponse, error)
- func (s *Server) InitKafkaAdmin(ctx context.Context, wg *sync.WaitGroup, cfg admin.Config) error
- func (s *Server) ListBrokers(ctx context.Context, req *pb.BrokerRequest) (*pb.BrokerResponse, error)
- func (s *Server) ListTopics(ctx context.Context, req *pb.TopicRequest) (*pb.TopicResponse, error)
- func (s *Server) LogRequest(ctx context.Context, params string, reqID uint64)
- func (s *Server) RunHTTP(ctx context.Context, wg *sync.WaitGroup) error
- func (s *Server) RunRPC(ctx context.Context, wg *sync.WaitGroup) error
- func (s *Server) TagBroker(ctx context.Context, req *pb.BrokerRequest) (*pb.TagResponse, error)
- func (s *Server) TagTopic(ctx context.Context, req *pb.TopicRequest) (*pb.TagResponse, error)
- func (s *Server) TopicMappings(ctx context.Context, req *pb.TopicRequest) (*pb.BrokerResponse, error)
- func (s *Server) ValidateRequest(ctx context.Context, req interface{}, kind int) (context.Context, error)
- type TagHandler
- type TagHandlerConfig
- type TagSet
- type TagStorage
- type Tags
- type TopicSet
- type ZKTagStorage
- func (t *ZKTagStorage) DeleteTags(o KafkaObject, ts Tags) error
- func (t *ZKTagStorage) FieldReserved(o KafkaObject, f string) bool
- func (t *ZKTagStorage) GetTags(o KafkaObject) (TagSet, error)
- func (t *ZKTagStorage) Init() error
- func (t *ZKTagStorage) LoadReservedFields(r ReservedFields) error
- func (t *ZKTagStorage) SetTags(o KafkaObject, ts TagSet) error
- type ZKTagStorageConfig
Constants ¶
This section is empty.
Variables ¶
var ( // ErrFetchingBrokers error. ErrFetchingBrokers = errors.New("error fetching brokers") // ErrBrokerNotExist error. ErrBrokerNotExist = errors.New("broker does not exist") // ErrBrokerIDEmpty error. ErrBrokerIDEmpty = errors.New("broker Id field must be specified") )
var ( // ErrFetchingTopics error. ErrFetchingTopics = errors.New("error fetching topics") // ErrTopicNotExist error. ErrTopicNotExist = errors.New("topic does not exist") // ErrTopicNameEmpty error. ErrTopicNameEmpty = errors.New("topic Name field must be specified") // ErrTopicFieldMissing error. ErrTopicFieldMissing = errors.New("topic field missing in request body") // ErrTopicAlreadyExists error. ErrTopicAlreadyExists = errors.New("topic already exists") // ErrInsufficientBrokers error. ErrInsufficientBrokers = errors.New("insufficient number of brokers") )
var ( // ErrInvalidKafkaObjectType error. ErrInvalidKafkaObjectType = errors.New("invalid Kafka object type") // ErrKafkaObjectDoesNotExist error. ErrKafkaObjectDoesNotExist = errors.New("requested Kafka object does not exist") // ErrNilTagSet error. ErrNilTagSet = errors.New("must provide a non-nil or non-empty TagSet") // ErrNilTags error. ErrNilTags = errors.New("must provide a non-nil or non-empty tags") )
var ( // ErrRequestThrottleTimeout error. ErrRequestThrottleTimeout = errors.New("wait time exceeded") )
Functions ¶
func PartitionMapToReplicaAssignment ¶
func PartitionMapToReplicaAssignment(pm *kafkazk.PartitionMap) admin.ReplicaAssignment
PartitionMapToReplicaAssignment takes a *kafkazk.PartitionMap and transforms it into an admin.ReplicaAssignment.
Types ¶
type Config ¶
type Config struct { HTTPListen string GRPCListen string ReadReqRate int WriteReqRate int ZKTagsPrefix string // contains filtered or unexported fields }
Config holds Server configurations.
type ErrReservedTag ¶
type ErrReservedTag struct {
// contains filtered or unexported fields
}
ErrReservedTag error.
func (ErrReservedTag) Error ¶
func (e ErrReservedTag) Error() string
type KafkaObject ¶
KafkaObject holds an object type (broker, topic) and object identifier (ID, name).
func (KafkaObject) Complete ¶
func (o KafkaObject) Complete() bool
Complete checks if a KafkaObject is valid and has a non-empty ID field value.
func (KafkaObject) Valid ¶
func (o KafkaObject) Valid() bool
Valid checks if a KafkaObject has a valid Type field value.
type RequestThrottle ¶
RequestThrottle controls request rates with a configurable burst capacity and per-second rate, backed by a token bucket.
func NewRequestThrottle ¶
func NewRequestThrottle(cfg RequestThrottleConfig) (RequestThrottle, error)
NewRequestThrottle initializes a RequestThrottle.
type RequestThrottleConfig ¶
type RequestThrottleConfig struct { // Burst capacity. Capacity int // Request rate (reqs/s). Rate int }
RequestThrottleConfig specifies the RequestThrottle burst capacity and per-second rate limit.
type ReservedFields ¶
ReservedFields is a mapping of object types (topic, broker) to a set of fields reserved for internal use; these are default fields that become searchable through the tags interface.
func GetReservedFields ¶
func GetReservedFields() ReservedFields
GetReservedFields returns a map of proto message types to field names considered reserved for internal use. All fields specified in the Registry proto messages are discovered here and reserved by default.
type Server ¶
type Server struct { HTTPListen string GRPCListen string ZK kafkazk.Handler Tags *TagHandler // contains filtered or unexported fields }
Server implements the registry APIs.
func (*Server) BrokerMappings ¶
func (s *Server) BrokerMappings(ctx context.Context, req *pb.BrokerRequest) (*pb.TopicResponse, error)
BrokerMappings returns all topic names that have at least one partition held by the requested broker. The broker is specified in the BrokerRequest.ID field.
func (*Server) CreateTopic ¶
CreateTopic creates a topic if it doesn't exist. Topic tags can optionally be set at topic creation time. Additionally, topics can be created on a target set of brokers by specifying the broker tag(s) in the request.
func (*Server) DeleteBrokerTags ¶
func (s *Server) DeleteBrokerTags(ctx context.Context, req *pb.BrokerRequest) (*pb.TagResponse, error)
DeleteBrokerTags deletes custom tags for the specified broker.
func (*Server) DeleteTopicTags ¶
func (s *Server) DeleteTopicTags(ctx context.Context, req *pb.TopicRequest) (*pb.TagResponse, error)
DeleteTopicTags deletes custom tags for the specified topic.
func (*Server) DialZK ¶
DialZK takes a Context, WaitGroup and *kafkazk.Config and initializes a kafkazk.Handler. A background shutdown procedure is called when the context is cancelled.
func (*Server) GetBrokers ¶
func (s *Server) GetBrokers(ctx context.Context, req *pb.BrokerRequest) (*pb.BrokerResponse, error)
GetBrokers gets brokers. If the input *pb.BrokerRequest Id field is non-zero, the specified broker is matched if it exists. Otherwise, all brokers found in ZooKeeper are matched. Matched brokers are then filtered by all tags, if any are specified, in the *pb.BrokerRequest tag field.
func (*Server) GetTopics ¶
func (s *Server) GetTopics(ctx context.Context, req *pb.TopicRequest) (*pb.TopicResponse, error)
GetTopics gets topics. If the input *pb.TopicRequest Name field is non-nil, the specified topic is matched if it exists. Otherwise, all topics found in ZooKeeper are matched. Matched topics are then filtered by all tags, if any are specified, in the *pb.TopicRequest tag field.
func (*Server) InitKafkaAdmin ¶
InitKafkaAdmin takes a Context, WaitGroup and an admin.Config and initializes an admin.Client. A background shutdown procedure is called when the context is cancelled.
func (*Server) ListBrokers ¶
func (s *Server) ListBrokers(ctx context.Context, req *pb.BrokerRequest) (*pb.BrokerResponse, error)
ListBrokers gets broker IDs. If the input *pb.BrokerRequest Id field is non-zero, the specified broker is matched if it exists. Otherwise, all brokers found in ZooKeeper are matched. Matched brokers are then filtered by all tags, if any are specified, in the *pb.BrokerRequest tag field.
func (*Server) ListTopics ¶
func (s *Server) ListTopics(ctx context.Context, req *pb.TopicRequest) (*pb.TopicResponse, error)
ListTopics gets topic names. If the input *pb.TopicRequest Name field is non-nil, the specified topic is matched if it exists. Otherwise, all topics found in ZooKeeper are matched. Matched topics are then filtered by all tags, if any are specified, in the *pb.TopicRequest tag field.
func (*Server) LogRequest ¶
LogRequest takes a request context and input parameters as a string and logs the request data.
func (*Server) TagBroker ¶
func (s *Server) TagBroker(ctx context.Context, req *pb.BrokerRequest) (*pb.TagResponse, error)
TagBroker sets custom tags for the specified broker. Any previously existing tags that were not specified in the request remain unmodified.
func (*Server) TagTopic ¶
func (s *Server) TagTopic(ctx context.Context, req *pb.TopicRequest) (*pb.TagResponse, error)
TagTopic sets custom tags for the specified topic. Any previously existing tags that were not specified in the request remain unmodified.
func (*Server) TopicMappings ¶
func (s *Server) TopicMappings(ctx context.Context, req *pb.TopicRequest) (*pb.BrokerResponse, error)
TopicMappings returns all broker IDs that hold at least one partition for the requested topic. The topic is specified in the TopicRequest.Name field.
func (*Server) ValidateRequest ¶
func (s *Server) ValidateRequest(ctx context.Context, req interface{}, kind int) (context.Context, error)
ValidateRequest takes an incoming request context, params, and request kind. The request is logged and checked against the appropriate request throttler. If the incoming context did not have a deadline set, a derived context is created with the server default timeout. The child context and error are returned.
type TagHandler ¶
type TagHandler struct {
Store TagStorage
}
TagHandler provides object filtering by tags along with tag storage and retrieval.
func NewTagHandler ¶
func NewTagHandler(c TagHandlerConfig) (*TagHandler, error)
NewTagHandler initializes a TagHandler.
func (*TagHandler) FilterBrokers ¶
func (t *TagHandler) FilterBrokers(in BrokerSet, tags Tags) (BrokerSet, error)
FilterBrokers takes a map of broker IDs to *pb.Broker and tags KV list. A filtered map is returned that includes brokers where all tag values match the provided input tag KVs. Additionally, any custom tags persisted in the TagStorage backend are populated into the Tags field for each matched object.
func (*TagHandler) FilterTopics ¶
func (t *TagHandler) FilterTopics(in TopicSet, tags Tags) (TopicSet, error)
FilterTopics takes a map of topic names to *pb.Topic and tags KV list. A filtered map is returned that includes topics where all tag values match the provided input tag KVs. Additionally, any custom tags persisted in the TagStorage backend are populated into the Tags field for each matched object.
func (*TagHandler) TagSetFromObject ¶
func (t *TagHandler) TagSetFromObject(o interface{}) (TagSet, error)
TagSetFromObject takes a protobuf type and returns the default TagSet along with any user-defined tags.
type TagHandlerConfig ¶
type TagHandlerConfig struct {
Prefix string
}
TagHandlerConfig holds TagHandler configuration.
type TagSet ¶
TagSet is a map of key:values.
type TagStorage ¶
type TagStorage interface { LoadReservedFields(ReservedFields) error FieldReserved(KafkaObject, string) bool SetTags(KafkaObject, TagSet) error GetTags(KafkaObject) (TagSet, error) DeleteTags(KafkaObject, Tags) error }
TagStorage handles tag persistence to stable storage.
type ZKTagStorage ¶
type ZKTagStorage struct { ReservedFields ReservedFields Prefix string ZK kafkazk.Handler }
ZKTagStorage implements tag persistence in ZooKeeper.
func NewZKTagStorage ¶
func NewZKTagStorage(c ZKTagStorageConfig) (*ZKTagStorage, error)
NewZKTagStorage initializes a ZKTagStorage.
func (*ZKTagStorage) DeleteTags ¶
func (t *ZKTagStorage) DeleteTags(o KafkaObject, ts Tags) error
DeleteTags deletes all tags in the Tags for the requested KafkaObject.
func (*ZKTagStorage) FieldReserved ¶
func (t *ZKTagStorage) FieldReserved(o KafkaObject, f string) bool
FieldReserved takes a KafkaObject and field name. A bool is returned that indicates whether the field is reserved for the respective KafkaObject type.
func (*ZKTagStorage) GetTags ¶
func (t *ZKTagStorage) GetTags(o KafkaObject) (TagSet, error)
GetTags returns the TagSet for the requested KafkaObject.
func (*ZKTagStorage) Init ¶
func (t *ZKTagStorage) Init() error
Init ensures the ZooKeeper connection is ready and any required znodes are created.
func (*ZKTagStorage) LoadReservedFields ¶
func (t *ZKTagStorage) LoadReservedFields(r ReservedFields) error
LoadReservedFields takes a ReservedFields and stores it at ZKTagStorage.ReservedFields and returns an error.
func (*ZKTagStorage) SetTags ¶
func (t *ZKTagStorage) SetTags(o KafkaObject, ts TagSet) error
SetTags takes a KafkaObject and TagSet and sets the tag key:values for the object.
type ZKTagStorageConfig ¶
ZKTagStorageConfig holds ZKTagStorage configs.