conf

package
v0.7.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 14, 2023 License: AGPL-3.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const (
	MiB = 1024 * 1024

	SegmentFileExtension   = "dlog"
	IndexFileExtension     = "index"
	ProducerOffsetFileName = "producer.offset"
	TopologyFileName       = "topology.txt" // Used for non-k8s envs
)
View Source
const (
	DefaultClientDiscoveryPort = 9250
	DefaultProducerPort        = 9251
	DefaultConsumerPort        = 9252
	DefaultMetricsPort         = 9253
	DefaultProducerBinaryPort  = 9254
	DefaultGossipPort          = 9255
	DefaultGossipDataPort      = 9256
)

Port defaults

View Source
const (
	StatusUrl = "/status"

	// Url for producing messages
	TopicMessageUrl = "/v1/topic/:topic/messages"

	// Url for client discovery service
	ClientDiscoveryUrl = "/v1/brokers"

	// Url consuming messages
	ConsumerRegisterUrl     = "/v1/consumer/register"
	ConsumerPollUrl         = "/v1/consumer/poll"
	ConsumerManualCommitUrl = "/v1/consumer/commit"
	ConsumerGoodbye         = "/v1/consumer/goodbye"

	// Url for getting/setting the generation by token
	GossipGenerationUrl = "/v1/generation/%s"
	// Url for setting one generation as proposed/accepted or two generations as accepted.
	// Token is part of the route but ignored.
	GossipGenerationProposeUrl = "/v1/generation/%s/propose"
	// Url for setting the generation and transaction as committed for token
	GossipGenerationCommmitUrl = "/v1/generation/%s/commit"
	// Url for requesting the token range to be split as a consequence of scaling up
	GossipGenerationSplitUrl = "/v1/token/split"

	GossipTokenHasHistoryUrl    = "/v1/token/%s/has-history"
	GossipTokenGetHistoryUrl    = "/v1/token/%s/history"
	GossipTokenInRange          = "/v1/token/%s/in-range"
	GossipBrokerIdentifyUrl     = "/v1/broker/identify" // Send/receive my info to the peer
	GossipHostIsUpUrl           = "/v1/broker/%s/is-up"
	GossipConsumerGroupsInfoUrl = "/v1/consumer/groups-info"          // Send/receive consumer groups info
	GossipConsumerOffsetUrl     = "/v1/consumer/offsets"              // Send/receive consumer offsets
	GossipConsumerRegisterUrl   = "/v1/consumer/register"             // Send/receive consumer register from peer
	GossipConsumerCommitUrl     = "/v1/consumer/commit/%s"            // Send/receive consumer manual commit from peer
	GossipConsumerUnregisterUrl = "/v1/consumer/unregister/%s"        // Send/receive consumer unregister from peer
	GossipReadProducerOffsetUrl = "/v1/producer/offset/%s/%s/%s/%s"   // Reads the producer offset, with params: topic, token, range, version
	GossipReadFileStructureUrl  = "/v1/file-structure/%s/%s/%s/%s/%s" // Reads the file names of a given topic & offset (topic, token, range, version and offset)
	GossipGoodbyeUrl            = "/v1/goodbye"                       // Send/receive message that a broker is shutting down

	RoutingMessageUrl = "/v1/routing/topic/%s/messages"
)
View Source
const (
	EnvDebug = "POLAR_DEBUG"
)
View Source
const IndexFileWriteFlags = os.O_APPEND | os.O_CREATE | os.O_WRONLY

Use page cache for index file as it's not critical and it won't abuse the cache space

View Source
const MaxTopicLength = 255
View Source
const ProducerOffsetFileReadFlags = os.O_RDONLY
View Source
const ProducerOffsetFileWriteFlags = os.O_CREATE | os.O_WRONLY

Use page cache for max offset producer file as it won't abuse the cache space

View Source
const SegmentFileReadFlags = readFileDirectFlags
View Source
const SegmentFileWriteFlags = os.O_APPEND | os.O_CREATE | os.O_WRONLY | syscall.O_DIRECT | syscall.O_DSYNC

Variables

View Source
var Endianness = binary.BigEndian

Functions

func SegmentFileName

func SegmentFileName(segmentId int64) string

Gets the formatted file name based on the segment id

func SegmentFilePrefix

func SegmentFilePrefix(segmentId int64) string

Gets the formatted value of the id for the file name (%020d) w/o the extension, e.g. 123 -> "00000000000000000123"

func SegmentIdFromName

func SegmentIdFromName(fileName string) int64

func StartProfiling

func StartProfiling() bool

Enables profiling when the build tag is set

func StopProfiling

func StopProfiling()

Types

type BasicConfig

type BasicConfig interface {
	HomePath() string
	ListenOnAllAddresses() bool
	DevMode() bool       // Determines whether we are running a single instance in dev mode
	ConsumerRanges() int // The number of ranges to partition any token range.
	ShutdownDelay() time.Duration
	ProducerPort() int
	ProducerBinaryPort() int
	ConsumerPort() int
}

type Config

type Config interface {
	Initializer
	LocalDbConfig
	GossipConfig
	ProducerConfig
	ConsumerConfig
	DiscovererConfig
	MetricsPort() int
	CreateAllDirs() error
}

Config represents the application configuration

func NewConfig

func NewConfig(devMode bool) Config

type ConsumerConfig

type ConsumerConfig interface {
	BasicConfig
	DatalogConfig
	ConsumerAddDelay() time.Duration
	ConsumerReadTimeout() time.Duration // The interval to set the deadline in the consumer connection
	ConsumerReadThreshold() int         // The minimum amount of bytes once reached the consumer poll is fulfilled
}

type DatalogConfig

type DatalogConfig interface {
	DatalogPath(topicDataId *TopicDataId) string
	DatalogSegmentsPath() string
	MaxSegmentSize() int    // Maximum file size in bytes
	SegmentBufferSize() int // The amount of bytes that the segment buffer can hold
	MaxMessageSize() int
	MaxGroupSize() int  // MaxGroupSize is the maximum size of an uncompressed group of messages
	ReadAheadSize() int // The amount of bytes to read each time from a segment file
	AutoCommitInterval() time.Duration
	IndexFilePeriodBytes() int // How frequently write to the index file based on the segment size.
	SegmentFlushInterval() time.Duration
	LogRetentionDuration() *time.Duration // The amount of time to keep a log file before deleting it (default = 7d)
	StreamBufferSize() int                // Max size of the file stream buffers (2 of them atm)
}

type DiscovererConfig

type DiscovererConfig interface {
	BasicConfig
	Ordinal() int
	ClientDiscoveryPort() int // port number of the HTTP discovery service to expose to client libraries
	// BaseHostName is name prefix that should be concatenated with the ordinal to
	// return the host name of a replica
	BaseHostName() string
	ServiceName() string                       // Name of the K8S service for pod stable names
	PodName() string                           // Name of the pod the broker is running
	PodNamespace() string                      // Name of the namespace of the broker pod
	FixedTopologyFilePollDelay() time.Duration // The delay between attempts to read file for changes in topology
}

type GossipConfig

type GossipConfig interface {
	BasicConfig
	DatalogConfig
	GossipPort() int
	GossipDataPort() int
	ReplicationTimeout() time.Duration
	ReplicationWriteTimeout() time.Duration
	// MaxDataBodyLength is the maximum size of an interbroker data body
	MaxDataBodyLength() int
}

type LocalDbConfig

type LocalDbConfig interface {
	LocalDbPath() string
}

type ProducerConfig

type ProducerConfig interface {
	BasicConfig
	DatalogConfig
	ProducerBufferPoolSize() int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL