Documentation ¶
Index ¶
- Constants
- Variables
- func SegmentFileName(segmentId int64) string
- func SegmentFilePrefix(segmentId int64) string
- func SegmentIdFromName(fileName string) int64
- func StartProfiling() bool
- func StopProfiling()
- type BasicConfig
- type Config
- type ConsumerConfig
- type DatalogConfig
- type DiscovererConfig
- type GossipConfig
- type LocalDbConfig
- type ProducerConfig
Constants ¶
View Source
const ( MiB = 1024 * 1024 SegmentFileExtension = "dlog" IndexFileExtension = "index" ProducerOffsetFileName = "producer.offset" TopologyFileName = "topology.txt" // Used for non-k8s envs )
View Source
const ( DefaultClientDiscoveryPort = 9250 DefaultProducerPort = 9251 DefaultConsumerPort = 9252 DefaultMetricsPort = 9253 DefaultProducerBinaryPort = 9254 DefaultGossipPort = 9255 DefaultGossipDataPort = 9256 )
Port defaults
View Source
const ( StatusUrl = "/status" // Url for producing messages TopicMessageUrl = "/v1/topic/:topic/messages" // Url for client discovery service ClientDiscoveryUrl = "/v1/brokers" // Url consuming messages ConsumerRegisterUrl = "/v1/consumer/register" ConsumerPollUrl = "/v1/consumer/poll" ConsumerManualCommitUrl = "/v1/consumer/commit" ConsumerGoodbye = "/v1/consumer/goodbye" // Url for getting/setting the generation by token GossipGenerationUrl = "/v1/generation/%s" // Url for setting one generation as proposed/accepted or two generations as accepted. // Token is part of the route but ignored. GossipGenerationProposeUrl = "/v1/generation/%s/propose" // Url for setting the generation and transaction as committed for token GossipGenerationCommmitUrl = "/v1/generation/%s/commit" // Url for requesting the token range to be split as a consequence of scaling up GossipGenerationSplitUrl = "/v1/token/split" GossipTokenHasHistoryUrl = "/v1/token/%s/has-history" GossipTokenGetHistoryUrl = "/v1/token/%s/history" GossipTokenInRange = "/v1/token/%s/in-range" GossipBrokerIdentifyUrl = "/v1/broker/identify" // Send/receive my info to the peer GossipHostIsUpUrl = "/v1/broker/%s/is-up" GossipConsumerGroupsInfoUrl = "/v1/consumer/groups-info" // Send/receive consumer groups info GossipConsumerOffsetUrl = "/v1/consumer/offsets" // Send/receive consumer offsets GossipConsumerRegisterUrl = "/v1/consumer/register" // Send/receive consumer register from peer GossipConsumerCommitUrl = "/v1/consumer/commit/%s" // Send/receive consumer manual commit from peer GossipConsumerUnregisterUrl = "/v1/consumer/unregister/%s" // Send/receive consumer unregister from peer GossipReadProducerOffsetUrl = "/v1/producer/offset/%s/%s/%s/%s" // Reads the producer offset, with params: topic, token, range, version GossipReadFileStructureUrl = "/v1/file-structure/%s/%s/%s/%s/%s" // Reads the file names of a given topic & offset (topic, token, range, version and offset) GossipGoodbyeUrl = "/v1/goodbye" // Send/receive message that a broker is shutting down RoutingMessageUrl = "/v1/routing/topic/%s/messages" )
View Source
const (
EnvDebug = "POLAR_DEBUG"
)
Use page cache for index file as it's not critical and it won't abuse the cache space
View Source
const MaxTopicLength = 255
View Source
const ProducerOffsetFileReadFlags = os.O_RDONLY
View Source
const ProducerOffsetFileWriteFlags = os.O_CREATE | os.O_WRONLY
Use page cache for max offset producer file as it won't abuse the cache space
View Source
const SegmentFileReadFlags = readFileDirectFlags
Variables ¶
View Source
var Endianness = binary.BigEndian
Functions ¶
func SegmentFileName ¶
Gets the formatted file name based on the segment id
func SegmentFilePrefix ¶
Gets the formatted value of the id for the file name (%020d) w/o the extension, e.g. 123 -> "00000000000000000123"
func SegmentIdFromName ¶
func StopProfiling ¶
func StopProfiling()
Types ¶
type BasicConfig ¶
type BasicConfig interface { HomePath() string ListenOnAllAddresses() bool DevMode() bool // Determines whether we are running a single instance in dev mode ConsumerRanges() int // The number of ranges to partition any token range. ShutdownDelay() time.Duration ProducerPort() int ProducerBinaryPort() int ConsumerPort() int }
type Config ¶
type Config interface { Initializer LocalDbConfig GossipConfig ProducerConfig ConsumerConfig DiscovererConfig MetricsPort() int CreateAllDirs() error }
Config represents the application configuration
type ConsumerConfig ¶
type ConsumerConfig interface { BasicConfig DatalogConfig ConsumerAddDelay() time.Duration ConsumerReadTimeout() time.Duration // The interval to set the deadline in the consumer connection ConsumerReadThreshold() int // The minimum amount of bytes once reached the consumer poll is fulfilled }
type DatalogConfig ¶
type DatalogConfig interface { DatalogPath(topicDataId *TopicDataId) string DatalogSegmentsPath() string MaxSegmentSize() int // Maximum file size in bytes SegmentBufferSize() int // The amount of bytes that the segment buffer can hold MaxMessageSize() int MaxGroupSize() int // MaxGroupSize is the maximum size of an uncompressed group of messages ReadAheadSize() int // The amount of bytes to read each time from a segment file AutoCommitInterval() time.Duration IndexFilePeriodBytes() int // How frequently write to the index file based on the segment size. SegmentFlushInterval() time.Duration LogRetentionDuration() *time.Duration // The amount of time to keep a log file before deleting it (default = 7d) StreamBufferSize() int // Max size of the file stream buffers (2 of them atm) }
type DiscovererConfig ¶
type DiscovererConfig interface { BasicConfig Ordinal() int ClientDiscoveryPort() int // port number of the HTTP discovery service to expose to client libraries // BaseHostName is name prefix that should be concatenated with the ordinal to // return the host name of a replica BaseHostName() string ServiceName() string // Name of the K8S service for pod stable names PodName() string // Name of the pod the broker is running PodNamespace() string // Name of the namespace of the broker pod FixedTopologyFilePollDelay() time.Duration // The delay between attempts to read file for changes in topology }
type GossipConfig ¶
type GossipConfig interface { BasicConfig DatalogConfig GossipPort() int GossipDataPort() int ReplicationTimeout() time.Duration ReplicationWriteTimeout() time.Duration // MaxDataBodyLength is the maximum size of an interbroker data body MaxDataBodyLength() int }
type LocalDbConfig ¶
type LocalDbConfig interface {
LocalDbPath() string
}
type ProducerConfig ¶
type ProducerConfig interface { BasicConfig DatalogConfig ProducerBufferPoolSize() int }
Click to show internal directories.
Click to hide internal directories.