server

package
v3.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 22, 2020 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrFetchingBrokers error.
	ErrFetchingBrokers = errors.New("error fetching brokers")
	// ErrBrokerNotExist error.
	ErrBrokerNotExist = errors.New("broker does not exist")
	// ErrBrokerIDEmpty error.
	ErrBrokerIDEmpty = errors.New("broker Id field must be specified")
)
View Source
var (
	// ErrFetchingTopics error.
	ErrFetchingTopics = errors.New("error fetching topics")
	// ErrTopicNotExist error.
	ErrTopicNotExist = errors.New("topic does not exist")
	// ErrTopicNameEmpty error.
	ErrTopicNameEmpty = errors.New("topic Name field must be specified")
	// ErrTopicFieldMissing error.
	ErrTopicFieldMissing = errors.New("topic field missing in request body")
	// ErrTopicAlreadyExists error.
	ErrTopicAlreadyExists = errors.New("topic already exists")
	// ErrInsufficientBrokers error.
	ErrInsufficientBrokers = errors.New("insufficient number of brokers")
)
View Source
var (
	// ErrInvalidKafkaObjectType error.
	ErrInvalidKafkaObjectType = errors.New("invalid Kafka object type")
	// ErrKafkaObjectDoesNotExist error.
	ErrKafkaObjectDoesNotExist = errors.New("requested Kafka object does not exist")
	// ErrNilTagSet error.
	ErrNilTagSet = errors.New("must provide a non-nil or non-empty TagSet")
	// ErrNilTags error.
	ErrNilTags = errors.New("must provide a non-nil or non-empty tags")
)
View Source
var (
	// ErrRequestThrottleTimeout error.
	ErrRequestThrottleTimeout = errors.New("wait time exceeded")
)

Functions

func PartitionMapToReplicaAssignment

func PartitionMapToReplicaAssignment(pm *kafkazk.PartitionMap) admin.ReplicaAssignment

PartitionMapToReplicaAssignment takes a *kafkazk.PartitionMap and transforms it into an admin.ReplicaAssignment.

Types

type BrokerSet

type BrokerSet map[uint32]*pb.Broker

BrokerSet is a mapping of broker IDs to *pb.Broker.

func (BrokerSet) IDs

func (b BrokerSet) IDs() []uint32

IDs returns a []uint32 of IDs from a BrokerSet.

type Config

type Config struct {
	HTTPListen   string
	GRPCListen   string
	ReadReqRate  int
	WriteReqRate int
	ZKTagsPrefix string
	// contains filtered or unexported fields
}

Config holds Server configurations.

type ErrReservedTag

type ErrReservedTag struct {
	// contains filtered or unexported fields
}

ErrReservedTag error.

func (ErrReservedTag) Error

func (e ErrReservedTag) Error() string

type KafkaObject

type KafkaObject struct {
	Type string
	ID   string
}

KafkaObject holds an object type (broker, topic) and object identifier (ID, name).

func (KafkaObject) Complete

func (o KafkaObject) Complete() bool

Complete checks if a KafkaObject is valid and has a non-empty ID field value.

func (KafkaObject) Valid

func (o KafkaObject) Valid() bool

Valid checks if a KafkaObject has a valid Type field value.

type RequestThrottle

type RequestThrottle interface {
	Request(context.Context) error
}

RequestThrottle controls request rates with a configurable burst capacity and per-second rate backed with a token bucket.

func NewRequestThrottle

func NewRequestThrottle(cfg RequestThrottleConfig) (RequestThrottle, error)

NewRequestThrottle initializes a RequestThrottle.

type RequestThrottleConfig

type RequestThrottleConfig struct {
	// Burst capacity.
	Capacity int
	// Request rate (reqs/s).
	Rate int
}

RequestThrottleConfig specifies the RequestThrottle burst capacity and per-second rate limit.

type ReservedFields

type ReservedFields map[string]map[string]struct{}

ReservedFields is a mapping of object types (topic, broker) to a set of fields reserved for internal use; these are default fields that become searchable through the tags interface.

func GetReservedFields

func GetReservedFields() ReservedFields

GetReservedFields returns a map of proto message types to field names considered reserved for internal use. All fields specified in the Registry proto messages are discovered here and reserved by default.

type Server

type Server struct {
	HTTPListen string
	GRPCListen string
	ZK         kafkazk.Handler

	Tags *TagHandler
	// contains filtered or unexported fields
}

Server implements the registry APIs.

func NewServer

func NewServer(c Config) (*Server, error)

NewServer initializes a *Server.

func (*Server) BrokerMappings

func (s *Server) BrokerMappings(ctx context.Context, req *pb.BrokerRequest) (*pb.TopicResponse, error)

BrokerMappings returns all topic names that have at least one partition held by the requested broker. The broker is specified in the BrokerRequest.ID field.

func (*Server) CreateTopic

func (s *Server) CreateTopic(ctx context.Context, req *pb.CreateTopicRequest) (*pb.Empty, error)

CreateTopic creates a topic if it doesn't exist. Topic tags can optionally be set at topic creation time. Additionally, topics can be created on a target set of brokers by specifying the broker tag(s) in the request.

func (*Server) DeleteBrokerTags

func (s *Server) DeleteBrokerTags(ctx context.Context, req *pb.BrokerRequest) (*pb.TagResponse, error)

DeleteBrokerTags deletes custom tags for the specified broker.

func (*Server) DeleteTopicTags

func (s *Server) DeleteTopicTags(ctx context.Context, req *pb.TopicRequest) (*pb.TagResponse, error)

DeleteTopicTags deletes custom tags for the specified topic.

func (*Server) DialZK

func (s *Server) DialZK(ctx context.Context, wg *sync.WaitGroup, c *kafkazk.Config) error

DialZK takes a Context, WaitGroup and *kafkazk.Config and initializes a kafkazk.Handler. A background shutdown procedure is called when the context is cancelled.

func (*Server) GetBrokers

func (s *Server) GetBrokers(ctx context.Context, req *pb.BrokerRequest) (*pb.BrokerResponse, error)

GetBrokers gets brokers. If the input *pb.BrokerRequest Id field is non-zero, the specified broker is matched if it exists. Otherwise, all brokers found in ZooKeeper are matched. If any tags are specified in the *pb.BrokerRequest tag field, the matched brokers are then filtered by all of those tags.

func (*Server) GetTopics

func (s *Server) GetTopics(ctx context.Context, req *pb.TopicRequest) (*pb.TopicResponse, error)

GetTopics gets topics. If the input *pb.TopicRequest Name field is non-nil, the specified topic is matched if it exists. Otherwise, all topics found in ZooKeeper are matched. If any tags are specified in the *pb.TopicRequest tag field, the matched topics are then filtered by all of those tags.

func (*Server) InitKafkaAdmin

func (s *Server) InitKafkaAdmin(ctx context.Context, wg *sync.WaitGroup, cfg admin.Config) error

InitKafkaAdmin takes a Context, WaitGroup and an admin.Config and initializes an admin.Client. A background shutdown procedure is called when the context is cancelled.

func (*Server) ListBrokers

func (s *Server) ListBrokers(ctx context.Context, req *pb.BrokerRequest) (*pb.BrokerResponse, error)

ListBrokers gets broker IDs. If the input *pb.BrokerRequest Id field is non-zero, the specified broker is matched if it exists. Otherwise, all brokers found in ZooKeeper are matched. If any tags are specified in the *pb.BrokerRequest tag field, the matched brokers are then filtered by all of those tags.

func (*Server) ListTopics

func (s *Server) ListTopics(ctx context.Context, req *pb.TopicRequest) (*pb.TopicResponse, error)

ListTopics gets topic names. If the input *pb.TopicRequest Name field is non-nil, the specified topic is matched if it exists. Otherwise, all topics found in ZooKeeper are matched. If any tags are specified in the *pb.TopicRequest tag field, the matched topics are then filtered by all of those tags.

func (*Server) LogRequest

func (s *Server) LogRequest(ctx context.Context, params string, reqID uint64)

LogRequest takes a request context and input parameters as a string and logs the request data.

func (*Server) RunHTTP

func (s *Server) RunHTTP(ctx context.Context, wg *sync.WaitGroup) error

RunHTTP runs the HTTP endpoint.

func (*Server) RunRPC

func (s *Server) RunRPC(ctx context.Context, wg *sync.WaitGroup) error

RunRPC runs the gRPC endpoint.

func (*Server) TagBroker

func (s *Server) TagBroker(ctx context.Context, req *pb.BrokerRequest) (*pb.TagResponse, error)

TagBroker sets custom tags for the specified broker. Any previously existing tags that were not specified in the request remain unmodified.

func (*Server) TagTopic

func (s *Server) TagTopic(ctx context.Context, req *pb.TopicRequest) (*pb.TagResponse, error)

TagTopic sets custom tags for the specified topic. Any previously existing tags that were not specified in the request remain unmodified.

func (*Server) TopicMappings

func (s *Server) TopicMappings(ctx context.Context, req *pb.TopicRequest) (*pb.BrokerResponse, error)

TopicMappings returns all broker IDs that hold at least one partition for the requested topic. The topic is specified in the TopicRequest.Name field.

func (*Server) ValidateRequest

func (s *Server) ValidateRequest(ctx context.Context, req interface{}, kind int) (context.Context, error)

ValidateRequest takes an incoming request context, params, and request kind. The request is logged and checked against the appropriate request throttler. If the incoming context did not have a deadline set, a derived context is created with the server default timeout. The child context and error are returned.

type TagHandler

type TagHandler struct {
	Store TagStorage
}

TagHandler provides object filtering by tags along with tag storage and retrieval.

func NewTagHandler

func NewTagHandler(c TagHandlerConfig) (*TagHandler, error)

NewTagHandler initializes a TagHandler.

func (*TagHandler) FilterBrokers

func (t *TagHandler) FilterBrokers(in BrokerSet, tags Tags) (BrokerSet, error)

FilterBrokers takes a map of broker IDs to *pb.Broker and a list of tag KVs. A filtered map is returned that includes brokers where all tag values match the provided input tag KVs. Additionally, any custom tags persisted in the TagStorage backend are populated into the Tags field for each matched object.

func (*TagHandler) FilterTopics

func (t *TagHandler) FilterTopics(in TopicSet, tags Tags) (TopicSet, error)

FilterTopics takes a map of topic names to *pb.Topic and a list of tag KVs. A filtered map is returned that includes topics where all tag values match the provided input tag KVs. Additionally, any custom tags persisted in the TagStorage backend are populated into the Tags field for each matched object.

func (*TagHandler) TagSetFromObject

func (t *TagHandler) TagSetFromObject(o interface{}) (TagSet, error)

TagSetFromObject takes a protobuf type and returns the default TagSet along with any user-defined tags.

type TagHandlerConfig

type TagHandlerConfig struct {
	Prefix string
}

TagHandlerConfig holds TagHandler configuration.

type TagSet

type TagSet map[string]string

TagSet is a map of key:values.

func (TagSet) Equal

func (t1 TagSet) Equal(t2 TagSet) bool

Equal checks if the input TagSet has the same key:value pairs as the calling TagSet.

func (TagSet) Tags

func (t TagSet) Tags() Tags

Tags takes a TagSet and returns a Tags.

type TagStorage

type TagStorage interface {
	LoadReservedFields(ReservedFields) error
	FieldReserved(KafkaObject, string) bool
	SetTags(KafkaObject, TagSet) error
	GetTags(KafkaObject) (TagSet, error)
	DeleteTags(KafkaObject, Tags) error
}

TagStorage handles tag persistence to stable storage.

type Tags

type Tags []string

Tags is a []string of "key:value" pairs.

func (Tags) TagSet

func (t Tags) TagSet() (TagSet, error)

TagSet takes a Tags and returns a TagSet and error for any malformed tags. Tags are expected to be formatted as a comma delimited "key:value,key2:value2" string. TODO normalize all tag usage to lower case.

type TopicSet

type TopicSet map[string]*pb.Topic

TopicSet is a mapping of topic name to *pb.Topic.

func (TopicSet) Names

func (t TopicSet) Names() []string

Names returns a []string of topic names from a TopicSet.

type ZKTagStorage

type ZKTagStorage struct {
	ReservedFields ReservedFields
	Prefix         string
	ZK             kafkazk.Handler
}

ZKTagStorage implements tag persistence in ZooKeeper.

func NewZKTagStorage

func NewZKTagStorage(c ZKTagStorageConfig) (*ZKTagStorage, error)

NewZKTagStorage initializes a ZKTagStorage.

func (*ZKTagStorage) DeleteTags

func (t *ZKTagStorage) DeleteTags(o KafkaObject, ts Tags) error

DeleteTags deletes all tags in the Tags for the requested KafkaObject.

func (*ZKTagStorage) FieldReserved

func (t *ZKTagStorage) FieldReserved(o KafkaObject, f string) bool

FieldReserved takes a KafkaObject and field name. A bool is returned that indicates whether the field is reserved for the respective KafkaObject type.

func (*ZKTagStorage) GetTags

func (t *ZKTagStorage) GetTags(o KafkaObject) (TagSet, error)

GetTags returns the TagSet for the requested KafkaObject.

func (*ZKTagStorage) Init

func (t *ZKTagStorage) Init() error

Init ensures the ZooKeeper connection is ready and any required znodes are created.

func (*ZKTagStorage) LoadReservedFields

func (t *ZKTagStorage) LoadReservedFields(r ReservedFields) error

LoadReservedFields takes a ReservedFields and stores it at ZKTagStorage.ReservedFields and returns an error.

func (*ZKTagStorage) SetTags

func (t *ZKTagStorage) SetTags(o KafkaObject, ts TagSet) error

SetTags takes a KafkaObject and TagSet and sets the tag key:values for the object.

type ZKTagStorageConfig

type ZKTagStorageConfig struct {
	ZKAddr string
	Prefix string
}

ZKTagStorageConfig holds ZKTagStorage configs.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL