kop

package
v0.0.0-...-dc14e26 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 6, 2023 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
// EmptyMemberId is the empty string.
// NOTE(review): presumably the member ID of a client that has not yet been
// assigned one by the group coordinator — confirm against the join-group code.
const EmptyMemberId = ""

Variables

This section is empty.

Functions

func ConvertMsgId

func ConvertMsgId(messageId pulsar.MessageID) int64

Types

type Broker

type Broker struct {
	ConnMap syncx.Map[string, net.Conn]
	SaslMap syncx.Map[string, codec.SaslAuthenticateReq]
	// contains filtered or unexported fields
}

func NewKop

func NewKop(impl Server, config *Config) (*Broker, error)

func (*Broker) AcceptError

func (b *Broker) AcceptError(conn *knet.Conn, err error)

func (*Broker) ApiVersion

func (b *Broker) ApiVersion(conn *knet.Conn, req *codec.ApiReq) (*codec.ApiResp, error)

func (*Broker) Authed

func (b *Broker) Authed(ctx *NetworkContext) bool

func (*Broker) Close

func (b *Broker) Close()

func (*Broker) CloseServer

func (b *Broker) CloseServer()

func (*Broker) ConnectionClosed

func (b *Broker) ConnectionClosed(conn *knet.Conn)

func (*Broker) ConnectionOpened

func (b *Broker) ConnectionOpened(conn *knet.Conn)

func (*Broker) ConsumePulsarConsumerSize

func (b *Broker) ConsumePulsarConsumerSize() int

func (*Broker) DisconnectAction

func (b *Broker) DisconnectAction(addr net.Addr)

func (*Broker) DisconnectAll

func (b *Broker) DisconnectAll()

func (*Broker) DisconnectAllLocalAddr

func (b *Broker) DisconnectAllLocalAddr(localAddr string)

func (*Broker) DisconnectConsumer

func (b *Broker) DisconnectConsumer(topic string)

func (*Broker) DisconnectRemoteAddr

func (b *Broker) DisconnectRemoteAddr(addrList []string)

func (*Broker) Fetch

func (b *Broker) Fetch(conn *knet.Conn, req *codec.FetchReq) (*codec.FetchResp, error)

func (*Broker) FetchAction

func (b *Broker) FetchAction(addr net.Addr, req *codec.FetchReq) ([]*codec.FetchTopicResp, error)

func (*Broker) FetchPartition

func (b *Broker) FetchPartition(addr net.Addr, kafkaTopic, clientID string, req *codec.FetchPartitionReq, maxBytes int, minBytes int, maxWaitMs int) (*codec.FetchPartitionResp, error)

FetchPartition is exported so that it is visible for testing.

func (*Broker) FindCoordinator

func (b *Broker) FindCoordinator(conn *knet.Conn, req *codec.FindCoordinatorReq) (*codec.FindCoordinatorResp, error)

func (*Broker) GroupJoinAction

func (b *Broker) GroupJoinAction(addr net.Addr, req *codec.JoinGroupReq) (*codec.JoinGroupResp, error)

func (*Broker) GroupLeaveAction

func (b *Broker) GroupLeaveAction(addr net.Addr, req *codec.LeaveGroupReq) (*codec.LeaveGroupResp, error)

func (*Broker) GroupSyncAction

func (b *Broker) GroupSyncAction(addr net.Addr, req *codec.SyncGroupReq) (*codec.SyncGroupResp, error)

func (*Broker) HeartBeatAction

func (b *Broker) HeartBeatAction(addr net.Addr, req codec.HeartbeatReq) *codec.HeartbeatResp

func (*Broker) Heartbeat

func (b *Broker) Heartbeat(conn *knet.Conn, req *codec.HeartbeatReq) (*codec.HeartbeatResp, error)

func (*Broker) JoinGroup

func (b *Broker) JoinGroup(conn *knet.Conn, req *codec.JoinGroupReq) (*codec.JoinGroupResp, error)

func (*Broker) LeaveGroup

func (b *Broker) LeaveGroup(conn *knet.Conn, req *codec.LeaveGroupReq) (*codec.LeaveGroupResp, error)

func (*Broker) ListOffsets

func (b *Broker) ListOffsets(conn *knet.Conn, req *codec.ListOffsetsReq) (*codec.ListOffsetsResp, error)

func (*Broker) ListOffsetsVersion

func (b *Broker) ListOffsetsVersion(ctx *NetworkContext, req *codec.ListOffsetsReq) (*codec.ListOffsetsResp, error)

func (*Broker) Metadata

func (b *Broker) Metadata(conn *knet.Conn, req *codec.MetadataReq) (*codec.MetadataResp, error)

func (*Broker) OffsetCommit

func (b *Broker) OffsetCommit(conn *knet.Conn, req *codec.OffsetCommitReq) (*codec.OffsetCommitResp, error)

func (*Broker) OffsetCommitPartitionAction

func (b *Broker) OffsetCommitPartitionAction(addr net.Addr, topic, clientID string, req *codec.OffsetCommitPartitionReq) (*codec.OffsetCommitPartitionResp, error)

func (*Broker) OffsetCommitVersion

func (b *Broker) OffsetCommitVersion(ctx *NetworkContext, req *codec.OffsetCommitReq) (*codec.OffsetCommitResp, error)

func (*Broker) OffsetFetch

func (b *Broker) OffsetFetch(conn *knet.Conn, req *codec.OffsetFetchReq) (*codec.OffsetFetchResp, error)

func (*Broker) OffsetFetchAction

func (b *Broker) OffsetFetchAction(addr net.Addr, topic, clientID, groupID string, req *codec.OffsetFetchPartitionReq) (*codec.OffsetFetchPartitionResp, error)

func (*Broker) OffsetFetchVersion

func (b *Broker) OffsetFetchVersion(ctx *NetworkContext, req *codec.OffsetFetchReq) (*codec.OffsetFetchResp, error)

func (*Broker) OffsetForLeaderEpoch

func (b *Broker) OffsetForLeaderEpoch(conn *knet.Conn, req *codec.OffsetForLeaderEpochReq) (*codec.OffsetForLeaderEpochResp, error)

func (*Broker) OffsetForLeaderEpochVersion

func (b *Broker) OffsetForLeaderEpochVersion(ctx *NetworkContext, req *codec.OffsetForLeaderEpochReq) (*codec.OffsetForLeaderEpochResp, error)

func (*Broker) OffsetLeaderEpochAction

func (b *Broker) OffsetLeaderEpochAction(addr net.Addr, topic string, req *codec.OffsetLeaderEpochPartitionReq) (*codec.OffsetForLeaderEpochPartitionResp, error)

func (*Broker) OffsetListPartitionAction

func (b *Broker) OffsetListPartitionAction(addr net.Addr, topic, clientID string, req *codec.ListOffsetsPartition) (*codec.ListOffsetsPartitionResp, error)

func (*Broker) PartitionNumAction

func (b *Broker) PartitionNumAction(addr net.Addr, topic string) (int, error)

func (*Broker) Produce

func (b *Broker) Produce(conn *knet.Conn, req *codec.ProduceReq) (*codec.ProduceResp, error)

func (*Broker) ProduceAction

func (b *Broker) ProduceAction(addr net.Addr, topic string, partition int, req *codec.ProducePartitionReq) (*codec.ProducePartitionResp, error)

func (*Broker) ProducePulsarProducerSize

func (b *Broker) ProducePulsarProducerSize() int

func (*Broker) ReactApiVersion

func (b *Broker) ReactApiVersion(apiRequest *codec.ApiReq) (*codec.ApiResp, error)

func (*Broker) ReactError

func (b *Broker) ReactError(conn *knet.Conn, err error)

func (*Broker) ReactFetch

func (b *Broker) ReactFetch(ctx *NetworkContext, req *codec.FetchReq) (*codec.FetchResp, error)

func (*Broker) ReactFindCoordinator

func (b *Broker) ReactFindCoordinator(req *codec.FindCoordinatorReq, config *Config) (*codec.FindCoordinatorResp, error)

func (*Broker) ReactHeartbeat

func (b *Broker) ReactHeartbeat(heartbeatReqV4 *codec.HeartbeatReq, context *NetworkContext) (*codec.HeartbeatResp, error)

func (*Broker) ReactJoinGroup

func (b *Broker) ReactJoinGroup(ctx *NetworkContext, req *codec.JoinGroupReq) (*codec.JoinGroupResp, error)

func (*Broker) ReactLeaveGroup

func (b *Broker) ReactLeaveGroup(ctx *NetworkContext, req *codec.LeaveGroupReq) (*codec.LeaveGroupResp, error)

func (*Broker) ReactMetadata

func (b *Broker) ReactMetadata(ctx *NetworkContext, req *codec.MetadataReq, config *Config) (*codec.MetadataResp, error)

func (*Broker) ReactProduce

func (b *Broker) ReactProduce(ctx *NetworkContext, req *codec.ProduceReq, config *Config) (*codec.ProduceResp, error)

func (*Broker) ReactSasl

func (b *Broker) ReactSasl(req *codec.SaslHandshakeReq) (*codec.SaslHandshakeResp, error)

func (*Broker) ReactSaslHandshakeAuth

func (b *Broker) ReactSaslHandshakeAuth(req *codec.SaslAuthenticateReq, context *NetworkContext) (*codec.SaslAuthenticateResp, error)

func (*Broker) ReactSyncGroup

func (b *Broker) ReactSyncGroup(ctx *NetworkContext, req *codec.SyncGroupReq) (*codec.SyncGroupResp, error)

func (*Broker) ReadError

func (b *Broker) ReadError(conn *knet.Conn, err error)

func (*Broker) Run

func (b *Broker) Run() error

func (*Broker) SaslAuthAction

func (b *Broker) SaslAuthAction(addr net.Addr, req codec.SaslAuthenticateReq) (bool, codec.ErrorCode)

func (*Broker) SaslAuthConsumerGroupAction

func (b *Broker) SaslAuthConsumerGroupAction(addr net.Addr, req codec.SaslAuthenticateReq, consumerGroup string) (bool, codec.ErrorCode)

func (*Broker) SaslAuthTopicAction

func (b *Broker) SaslAuthTopicAction(addr net.Addr, req codec.SaslAuthenticateReq, topic, permissionType string) (bool, codec.ErrorCode)

func (*Broker) SaslAuthenticate

func (b *Broker) SaslAuthenticate(conn *knet.Conn, req *codec.SaslAuthenticateReq) (*codec.SaslAuthenticateResp, error)

func (*Broker) SaslHandshake

func (b *Broker) SaslHandshake(conn *knet.Conn, req *codec.SaslHandshakeReq) (*codec.SaslHandshakeResp, error)

func (*Broker) SyncGroup

func (b *Broker) SyncGroup(conn *knet.Conn, req *codec.SyncGroupReq) (*codec.SyncGroupResp, error)

func (*Broker) TopicListAction

func (b *Broker) TopicListAction(addr net.Addr) ([]string, error)

func (*Broker) UnSupportedApi

func (b *Broker) UnSupportedApi(conn *knet.Conn, apiKey codec.ApiCode, apiVersion int16)

func (*Broker) WriteError

func (b *Broker) WriteError(conn *knet.Conn, err error)

type Config

// Config is the configuration of a kop Broker; a *Config is what NewKop and
// NewGroupCoordinatorMemory accept.
type Config struct {
	// Connection settings for Pulsar, the Kafka network server and Redis.
	PulsarConfig PulsarConfig
	NetConfig    knet.KafkaNetServerConfig
	RedisConfig  RedisConfig

	// Identity and advertised address of this node.
	// NOTE(review): presumably the values reported to Kafka clients in
	// metadata / find-coordinator responses — confirm.
	ClusterId     string
	NodeId        int32
	AdvertiseHost string
	AdvertisePort int

	// Feature switches.
	// NOTE(review): exact semantics are not visible here; NeedSasl presumably
	// makes SASL authentication mandatory — confirm.
	NeedSasl            bool
	ContinuousOffset    bool
	RecordHeaderSupport bool

	// Limits and tuning values.
	// NOTE(review): fields suffixed "Ms" are presumably milliseconds — confirm.
	MaxConn                  int32
	MaxConsumersPerGroup     int
	GroupMinSessionTimeoutMs int
	GroupMaxSessionTimeoutMs int
	ConsumerReceiveQueueSize int
	MaxFetchRecord           int
	MinFetchWaitMs           int
	MaxFetchWaitMs           int
	MaxProducerRecordSize    int
	MaxBatchSize             int
	// PulsarTenant is the Pulsar tenant used for kop-internal purposes.
	PulsarTenant string
	// PulsarNamespace is the Pulsar namespace used for kop-internal purposes.
	PulsarNamespace string
	// OffsetTopic is the topic used to store Kafka offsets.
	OffsetTopic string
	// OffsetPersistentFrequency specifies how often committed offsets are persisted,
	// per topic per consumer group per partition, in seconds. 0 means immediately.
	OffsetPersistentFrequency int
	// AutoCreateOffsetTopic, if true, creates the offset topic automatically.
	AutoCreateOffsetTopic bool
	// GroupCoordinatorType selects the group coordinator implementation:
	// GroupCoordinatorTypeMemory ("memory") or GroupCoordinatorTypeRedis ("redis").
	// NOTE(review): this comment previously read "enum: Standalone, Cluster;
	// default Standalone", which does not match the declared constants; the
	// behavior for an unset value should be confirmed.
	GroupCoordinatorType GroupCoordinatorType
	// InitialDelayedJoinMs
	// NOTE(review): presumably the initial delay, in milliseconds, applied to
	// group joins — confirm.
	InitialDelayedJoinMs int
	// RebalanceTickMs
	// NOTE(review): presumably the rebalance polling interval in milliseconds — confirm.
	RebalanceTickMs int
	// TopicLevelMetricsDisable, if true, disables topic-level metrics.
	TopicLevelMetricsDisable bool
	// NetworkDebugEnable, if true, prints network-layer logs.
	NetworkDebugEnable bool

	// Sets of Kafka and Pulsar topic names.
	// NOTE(review): presumably the topics for which debug logging is enabled — confirm.
	DebugKafkaTopicSet  set.Set[string]
	DebugPulsarTopicSet set.Set[string]

	// LogFormatter is a logrus formatter.
	// NOTE(review): presumably installed on the broker's logger — confirm.
	LogFormatter logrus.Formatter
}

type Group

type Group struct {
	// contains filtered or unexported fields
}

type GroupCoordinator

// GroupCoordinator is the consumer-group coordination contract: it answers
// join, sync, leave and heartbeat requests and exposes lookup and deletion of
// groups. Every method is scoped by username and groupId.
// GroupCoordinatorMemory and GroupCoordinatorRedis in this package declare
// this same method set.
type GroupCoordinator interface {
	// HandleJoinGroup processes a member's join request and returns the
	// join-group response.
	// NOTE(review): sessionTimeoutMs is presumably in milliseconds — confirm.
	HandleJoinGroup(username, groupId, memberId, clientId, protocolType string, sessionTimeoutMs int,
		protocols []*codec.GroupProtocol) (*codec.JoinGroupResp, error)

	// HandleSyncGroup processes a member's sync request for the given
	// generation and returns the sync-group response.
	HandleSyncGroup(username, groupId, memberId string, generation int,
		groupAssignments []*codec.GroupAssignment) (*codec.SyncGroupResp, error)

	// HandleLeaveGroup processes a leave request for the listed members.
	HandleLeaveGroup(username, groupId string, members []*codec.LeaveGroupMember) (*codec.LeaveGroupResp, error)

	// HandleHeartBeat processes a member heartbeat. Unlike the other handlers
	// it returns no error value; any failure must be carried in the response.
	HandleHeartBeat(username, groupId, memberId string) *codec.HeartbeatResp

	// GetGroup returns the group identified by username and groupId.
	GetGroup(username, groupId string) (*Group, error)

	// DelGroup removes the group identified by username and groupId.
	DelGroup(username, groupId string)
}

type GroupCoordinatorMemory

type GroupCoordinatorMemory struct {
	// contains filtered or unexported fields
}

func NewGroupCoordinatorMemory

func NewGroupCoordinatorMemory(config *Config) *GroupCoordinatorMemory

func (*GroupCoordinatorMemory) DelGroup

func (g *GroupCoordinatorMemory) DelGroup(username, groupId string)

func (*GroupCoordinatorMemory) GetGroup

func (g *GroupCoordinatorMemory) GetGroup(username, groupId string) (*Group, error)

func (*GroupCoordinatorMemory) HandleHeartBeat

func (g *GroupCoordinatorMemory) HandleHeartBeat(username, groupId, memberId string) *codec.HeartbeatResp

func (*GroupCoordinatorMemory) HandleJoinGroup

func (g *GroupCoordinatorMemory) HandleJoinGroup(username, groupId, memberId, clientId, protocolType string, sessionTimeoutMs int,
	protocols []*codec.GroupProtocol) (*codec.JoinGroupResp, error)

func (*GroupCoordinatorMemory) HandleLeaveGroup

func (g *GroupCoordinatorMemory) HandleLeaveGroup(username, groupId string,
	members []*codec.LeaveGroupMember) (*codec.LeaveGroupResp, error)

func (*GroupCoordinatorMemory) HandleSyncGroup

func (g *GroupCoordinatorMemory) HandleSyncGroup(username, groupId, memberId string, generation int,
	groupAssignments []*codec.GroupAssignment) (*codec.SyncGroupResp, error)

type GroupCoordinatorRedis

type GroupCoordinatorRedis struct {
	// contains filtered or unexported fields
}

func NewGroupCoordinatorRedis

func NewGroupCoordinatorRedis(redisConfig RedisConfig) (*GroupCoordinatorRedis, error)

func (*GroupCoordinatorRedis) DelGroup

func (gcc *GroupCoordinatorRedis) DelGroup(username, groupId string)

func (*GroupCoordinatorRedis) GetGroup

func (gcc *GroupCoordinatorRedis) GetGroup(username, groupId string) (*Group, error)

func (*GroupCoordinatorRedis) HandleHeartBeat

func (gcc *GroupCoordinatorRedis) HandleHeartBeat(username, groupId, memberId string) *codec.HeartbeatResp

func (*GroupCoordinatorRedis) HandleJoinGroup

func (gcc *GroupCoordinatorRedis) HandleJoinGroup(username, groupId, memberId, clientId, protocolType string, sessionTimeoutMs int,
	protocols []*codec.GroupProtocol) (*codec.JoinGroupResp, error)

func (*GroupCoordinatorRedis) HandleLeaveGroup

func (gcc *GroupCoordinatorRedis) HandleLeaveGroup(username, groupId string,
	members []*codec.LeaveGroupMember) (*codec.LeaveGroupResp, error)

func (*GroupCoordinatorRedis) HandleSyncGroup

func (gcc *GroupCoordinatorRedis) HandleSyncGroup(username, groupId, memberId string, generation int,
	groupAssignments []*codec.GroupAssignment) (*codec.SyncGroupResp, error)

type GroupCoordinatorType

// GroupCoordinatorType names a group coordinator implementation.
type GroupCoordinatorType string

// Declared GroupCoordinatorType values.
const (
	// GroupCoordinatorTypeMemory is the "memory" coordinator type.
	GroupCoordinatorTypeMemory = GroupCoordinatorType("memory")
	// GroupCoordinatorTypeRedis is the "redis" coordinator type.
	GroupCoordinatorTypeRedis = GroupCoordinatorType("redis")
)

type GroupForRedis

// GroupForRedis is the JSON-serializable form of a consumer group: its status,
// members keyed by string, generation, leader and protocol information.
// Every field is tagged omitempty, so zero-valued fields (including
// CanRebalance == false and GenerationId == 0) are left out of the encoded JSON.
// NOTE(review): presumably the representation GroupCoordinatorRedis stores in
// Redis, with MemberIds keyed by member ID — confirm.
type GroupForRedis struct {
	GroupStatus       GroupStatus                `json:"GroupStatus,omitempty"`
	MemberIds         map[string]*MemberForRedis `json:"MemberIds,omitempty"`
	CanRebalance      bool                       `json:"CanRebalance,omitempty"`
	GenerationId      int                        `json:"GenerationId,omitempty"`
	Leader            string                     `json:"Leader,omitempty"`
	SupportedProtocol string                     `json:"SupportedProtocol,omitempty"`
	ProtocolType      string                     `json:"ProtocolType,omitempty"`
}

type GroupStatus

// GroupStatus enumerates the states of a consumer group.
type GroupStatus int

// Declared GroupStatus values. Numbering starts at 1, so the zero value of
// GroupStatus matches none of them.
const (
	PreparingRebalance GroupStatus = iota + 1
	CompletingRebalance
	Stable
	Dead
	Empty
)

type MemberForRedis

// MemberForRedis is the JSON-serializable record of one consumer-group member;
// GroupForRedis.MemberIds maps to values of this type.
// Every field is tagged omitempty, so zero-valued fields are left out of the
// encoded JSON. The []byte fields (Metadata, Assignment and the values of
// Protocols) are encoded by encoding/json as base64 strings.
// NOTE(review): presumably Protocols is keyed by protocol name — confirm.
type MemberForRedis struct {
	ClientId         string            `json:"ClientId,omitempty"`
	MemberId         string            `json:"MemberId,omitempty"`
	JoinGenerationId int               `json:"JoinGenerationId,omitempty"`
	SyncGenerationId int               `json:"SyncGenerationId,omitempty"`
	Metadata         []byte            `json:"Metadata,omitempty"`
	Assignment       []byte            `json:"Assignment,omitempty"`
	ProtocolType     string            `json:"ProtocolType,omitempty"`
	Protocols        map[string][]byte `json:"Protocols,omitempty"`
}

type MemberInfo

type MemberInfo struct {
	// contains filtered or unexported fields
}

type MessageIdPair

// MessageIdPair pairs a Pulsar message ID with an int64 offset. It is the
// value type committed to and returned by OffsetManager.
// NOTE(review): Offset is presumably the Kafka offset that corresponds to
// MessageId — confirm.
type MessageIdPair struct {
	MessageId pulsar.MessageID
	Offset    int64
}

type MockMessageID

// MockMessageID is a field-less type whose value-receiver methods (BatchIdx,
// BatchSize, EntryID, LedgerID, PartitionIdx, Serialize, String) are declared
// in this package.
// NOTE(review): presumably a test double satisfying pulsar.MessageID — confirm.
type MockMessageID struct{}

func (MockMessageID) BatchIdx

func (m MockMessageID) BatchIdx() int32

func (MockMessageID) BatchSize

func (m MockMessageID) BatchSize() int32

func (MockMessageID) EntryID

func (m MockMessageID) EntryID() int64

func (MockMessageID) LedgerID

func (m MockMessageID) LedgerID() int64

func (MockMessageID) PartitionIdx

func (m MockMessageID) PartitionIdx() int32

func (MockMessageID) Serialize

func (m MockMessageID) Serialize() []byte

func (MockMessageID) String

func (m MockMessageID) String() string

type MockProducer

type MockProducer struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*MockProducer) Close

func (mp *MockProducer) Close()

func (*MockProducer) Flush

func (mp *MockProducer) Flush() error

func (*MockProducer) LastSequenceID

func (mp *MockProducer) LastSequenceID() int64

func (*MockProducer) Name

func (mp *MockProducer) Name() string

func (*MockProducer) Send

func (*MockProducer) SendAsync

func (mp *MockProducer) SendAsync(ctx context.Context, msg *pulsar.ProducerMessage, callback func(pulsar.MessageID, *pulsar.ProducerMessage, error))

func (*MockProducer) Topic

func (mp *MockProducer) Topic() string

type NetworkContext

type NetworkContext struct {
	Addr net.Addr
	// contains filtered or unexported fields
}

NetworkContext records the Kafka authentication status of a connection in its authed field (set with Authed, read with IsAuthed).

func (*NetworkContext) Authed

func (n *NetworkContext) Authed(authed bool)

func (*NetworkContext) IsAuthed

func (n *NetworkContext) IsAuthed() bool

type OffsetManager

// OffsetManager manages committed offsets, stored as MessageIdPair values and
// addressed by (username, kafkaTopic, groupId, partition) or by a string key
// derived from that tuple. NewOffsetManager returns an OffsetManager, and
// OffsetManagerImpl declares this same method set.
type OffsetManager interface {
	// Start starts the manager and returns a bool channel.
	// NOTE(review): the meaning of the channel (ready signal vs. stop signal)
	// is not visible here — confirm.
	Start() chan bool

	// CommitOffset records pair for the given user, topic, group and partition.
	CommitOffset(username, kafkaTopic, groupId string, partition int, pair MessageIdPair) error

	// AcquireOffset returns the MessageIdPair for the given user, topic, group
	// and partition.
	// NOTE(review): the bool presumably reports whether an entry exists — confirm.
	AcquireOffset(username, kafkaTopic, groupId string, partition int) (MessageIdPair, bool)

	// RemoveOffset removes the entry for the given user, topic, group and partition.
	// NOTE(review): the bool presumably reports whether an entry was removed — confirm.
	RemoveOffset(username, kafkaTopic, groupId string, partition int) bool

	// GenerateKey builds the string key for the given user, topic, group and partition.
	GenerateKey(username, kafkaTopic, groupId string, partition int) string

	// RemoveOffsetWithKey removes the entry stored under key.
	// NOTE(review): key is presumably a value produced by GenerateKey — confirm.
	RemoveOffsetWithKey(key string)

	// GracefulSendOffsetMessages sends offset messages for every consumer
	// handle in the map.
	// NOTE(review): the map key is presumably the same key passed to
	// GracefulSendOffsetMessage — confirm.
	GracefulSendOffsetMessages(map[string]*PulsarConsumerHandle) error

	// GracefulSendOffsetMessage sends the offset message for one consumer handle.
	// NOTE(review): OffsetManagerImpl names the first parameter "key" rather
	// than "partitionTopic"; confirm which description is accurate.
	GracefulSendOffsetMessage(partitionTopic string, consumerHandle *PulsarConsumerHandle) error

	// GetOffsetMap returns the offsets as a map from string key to MessageIdPair.
	// NOTE(review): whether this is a copy or the live map is not visible here.
	GetOffsetMap() map[string]MessageIdPair

	// Close shuts the manager down.
	Close()
}

func NewOffsetManager

func NewOffsetManager(client pulsar.Client, config *Config, admin *padmin.PulsarAdmin) (OffsetManager, error)

type OffsetManagerImpl

type OffsetManagerImpl struct {
	// contains filtered or unexported fields
}

func (*OffsetManagerImpl) AcquireOffset

func (o *OffsetManagerImpl) AcquireOffset(username, kafkaTopic, groupId string, partition int) (MessageIdPair, bool)

func (*OffsetManagerImpl) Close

func (o *OffsetManagerImpl) Close()

func (*OffsetManagerImpl) CommitOffset

func (o *OffsetManagerImpl) CommitOffset(username, kafkaTopic, groupId string, partition int, pair MessageIdPair) error

func (*OffsetManagerImpl) GenerateKey

func (o *OffsetManagerImpl) GenerateKey(username, kafkaTopic, groupId string, partition int) string

func (*OffsetManagerImpl) GetOffsetMap

func (o *OffsetManagerImpl) GetOffsetMap() map[string]MessageIdPair

func (*OffsetManagerImpl) GracefulSendOffsetMessage

func (o *OffsetManagerImpl) GracefulSendOffsetMessage(key string, consumerHandle *PulsarConsumerHandle) error

func (*OffsetManagerImpl) GracefulSendOffsetMessages

func (o *OffsetManagerImpl) GracefulSendOffsetMessages(consumerHandle map[string]*PulsarConsumerHandle) error

func (*OffsetManagerImpl) RemoveOffset

func (o *OffsetManagerImpl) RemoveOffset(username, kafkaTopic, groupId string, partition int) bool

func (*OffsetManagerImpl) RemoveOffsetWithKey

func (o *OffsetManagerImpl) RemoveOffsetWithKey(key string)

func (*OffsetManagerImpl) Start

func (o *OffsetManagerImpl) Start() chan bool

type PulsarConfig

// PulsarConfig locates the Pulsar cluster: a host plus an HTTP port and a TCP
// port. It is embedded in Config as Config.PulsarConfig.
// NOTE(review): HttpPort is presumably the admin/REST endpoint and TcpPort the
// binary-protocol endpoint — confirm.
type PulsarConfig struct {
	Host     string
	HttpPort int
	TcpPort  int
}

type PulsarConsumerHandle

type PulsarConsumerHandle struct {
	// contains filtered or unexported fields
}

type RedisConfig

// RedisConfig holds the Redis connection settings accepted by
// NewGroupCoordinatorRedis: a list of addresses, a password, a database index
// and the RedisType (RedisStandalone or RedisCluster).
// NOTE(review): how Addr and DB are interpreted for each RedisType is not
// visible here — confirm.
type RedisConfig struct {
	Addr      []string
	Password  string
	DB        int
	RedisType RedisType
}

type RedisType

// RedisType names the kind of Redis deployment referred to by RedisConfig.
type RedisType string

// Declared RedisType values.
const (
	// RedisStandalone is the "standalone" Redis type.
	RedisStandalone = RedisType("standalone")
	// RedisCluster is the "cluster" Redis type.
	RedisCluster = RedisType("cluster")
)

type Server

// Server is the callback interface supplied by the embedding application and
// passed to NewKop. It covers authentication and authorization decisions,
// name mapping to Pulsar, and topic metadata.
type Server interface {
	// Auth checks the given username, password and client ID.
	// NOTE(review): the bool is presumably true when authentication succeeds — confirm.
	Auth(username string, password string, clientId string) (bool, error)

	// AuthTopic checks the given credentials against topic for permissionType.
	// NOTE(review): the accepted permissionType values are not visible here.
	AuthTopic(username string, password, clientId, topic, permissionType string) (bool, error)

	// AuthTopicGroup checks whether the consumer group is valid.
	AuthTopicGroup(username string, password, clientId, consumerGroup string) (bool, error)

	// AuthGroupTopic reports whether the group has permission on the topic.
	AuthGroupTopic(topic, groupId string) bool

	// SubscriptionName returns the subscription name for groupId.
	// NOTE(review): presumably the Pulsar subscription used for that Kafka
	// consumer group — confirm.
	SubscriptionName(groupId string) (string, error)

	// PulsarTopic returns the corresponding topic in Pulsar for the given
	// username and topic.
	PulsarTopic(username, topic string) (string, error)

	// PartitionNum returns the number of partitions of topic for username.
	PartitionNum(username, topic string) (int, error)

	// ListTopic returns the topics for username.
	ListTopic(username string) ([]string, error)

	// HasFlowQuota reports whether username has flow quota on topic.
	// NOTE(review): presumably consulted before producing or fetching to
	// enforce rate limits — confirm.
	HasFlowQuota(username, topic string) bool
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL