Documentation
¶
Index ¶
- Constants
- func ConvertMsgId(messageId pulsar.MessageID) int64
- type Broker
- func (b *Broker) AcceptError(conn *knet.Conn, err error)
- func (b *Broker) ApiVersion(conn *knet.Conn, req *codec.ApiReq) (*codec.ApiResp, error)
- func (b *Broker) Authed(ctx *NetworkContext) bool
- func (b *Broker) Close()
- func (b *Broker) CloseServer()
- func (b *Broker) ConnectionClosed(conn *knet.Conn)
- func (b *Broker) ConnectionOpened(conn *knet.Conn)
- func (b *Broker) ConsumePulsarConsumerSize() int
- func (b *Broker) DisconnectAction(addr net.Addr)
- func (b *Broker) DisconnectAll()
- func (b *Broker) DisconnectAllLocalAddr(localAddr string)
- func (b *Broker) DisconnectConsumer(topic string)
- func (b *Broker) DisconnectRemoteAddr(addrList []string)
- func (b *Broker) Fetch(conn *knet.Conn, req *codec.FetchReq) (*codec.FetchResp, error)
- func (b *Broker) FetchAction(addr net.Addr, req *codec.FetchReq) ([]*codec.FetchTopicResp, error)
- func (b *Broker) FetchPartition(addr net.Addr, kafkaTopic, clientID string, req *codec.FetchPartitionReq, ...) (*codec.FetchPartitionResp, error)
- func (b *Broker) FindCoordinator(conn *knet.Conn, req *codec.FindCoordinatorReq) (*codec.FindCoordinatorResp, error)
- func (b *Broker) GroupJoinAction(addr net.Addr, req *codec.JoinGroupReq) (*codec.JoinGroupResp, error)
- func (b *Broker) GroupLeaveAction(addr net.Addr, req *codec.LeaveGroupReq) (*codec.LeaveGroupResp, error)
- func (b *Broker) GroupSyncAction(addr net.Addr, req *codec.SyncGroupReq) (*codec.SyncGroupResp, error)
- func (b *Broker) HeartBeatAction(addr net.Addr, req codec.HeartbeatReq) *codec.HeartbeatResp
- func (b *Broker) Heartbeat(conn *knet.Conn, req *codec.HeartbeatReq) (*codec.HeartbeatResp, error)
- func (b *Broker) JoinGroup(conn *knet.Conn, req *codec.JoinGroupReq) (*codec.JoinGroupResp, error)
- func (b *Broker) LeaveGroup(conn *knet.Conn, req *codec.LeaveGroupReq) (*codec.LeaveGroupResp, error)
- func (b *Broker) ListOffsets(conn *knet.Conn, req *codec.ListOffsetsReq) (*codec.ListOffsetsResp, error)
- func (b *Broker) ListOffsetsVersion(ctx *NetworkContext, req *codec.ListOffsetsReq) (*codec.ListOffsetsResp, error)
- func (b *Broker) Metadata(conn *knet.Conn, req *codec.MetadataReq) (*codec.MetadataResp, error)
- func (b *Broker) OffsetCommit(conn *knet.Conn, req *codec.OffsetCommitReq) (*codec.OffsetCommitResp, error)
- func (b *Broker) OffsetCommitPartitionAction(addr net.Addr, topic, clientID string, req *codec.OffsetCommitPartitionReq) (*codec.OffsetCommitPartitionResp, error)
- func (b *Broker) OffsetCommitVersion(ctx *NetworkContext, req *codec.OffsetCommitReq) (*codec.OffsetCommitResp, error)
- func (b *Broker) OffsetFetch(conn *knet.Conn, req *codec.OffsetFetchReq) (*codec.OffsetFetchResp, error)
- func (b *Broker) OffsetFetchAction(addr net.Addr, topic, clientID, groupID string, ...) (*codec.OffsetFetchPartitionResp, error)
- func (b *Broker) OffsetFetchVersion(ctx *NetworkContext, req *codec.OffsetFetchReq) (*codec.OffsetFetchResp, error)
- func (b *Broker) OffsetForLeaderEpoch(conn *knet.Conn, req *codec.OffsetForLeaderEpochReq) (*codec.OffsetForLeaderEpochResp, error)
- func (b *Broker) OffsetForLeaderEpochVersion(ctx *NetworkContext, req *codec.OffsetForLeaderEpochReq) (*codec.OffsetForLeaderEpochResp, error)
- func (b *Broker) OffsetLeaderEpochAction(addr net.Addr, topic string, req *codec.OffsetLeaderEpochPartitionReq) (*codec.OffsetForLeaderEpochPartitionResp, error)
- func (b *Broker) OffsetListPartitionAction(addr net.Addr, topic, clientID string, req *codec.ListOffsetsPartition) (*codec.ListOffsetsPartitionResp, error)
- func (b *Broker) PartitionNumAction(addr net.Addr, topic string) (int, error)
- func (b *Broker) Produce(conn *knet.Conn, req *codec.ProduceReq) (*codec.ProduceResp, error)
- func (b *Broker) ProduceAction(addr net.Addr, topic string, partition int, req *codec.ProducePartitionReq) (*codec.ProducePartitionResp, error)
- func (b *Broker) ProducePulsarProducerSize() int
- func (b *Broker) ReactApiVersion(apiRequest *codec.ApiReq) (*codec.ApiResp, error)
- func (b *Broker) ReactError(conn *knet.Conn, err error)
- func (b *Broker) ReactFetch(ctx *NetworkContext, req *codec.FetchReq) (*codec.FetchResp, error)
- func (b *Broker) ReactFindCoordinator(req *codec.FindCoordinatorReq, config *Config) (*codec.FindCoordinatorResp, error)
- func (b *Broker) ReactHeartbeat(heartbeatReqV4 *codec.HeartbeatReq, context *NetworkContext) (*codec.HeartbeatResp, error)
- func (b *Broker) ReactJoinGroup(ctx *NetworkContext, req *codec.JoinGroupReq) (*codec.JoinGroupResp, error)
- func (b *Broker) ReactLeaveGroup(ctx *NetworkContext, req *codec.LeaveGroupReq) (*codec.LeaveGroupResp, error)
- func (b *Broker) ReactMetadata(ctx *NetworkContext, req *codec.MetadataReq, config *Config) (*codec.MetadataResp, error)
- func (b *Broker) ReactProduce(ctx *NetworkContext, req *codec.ProduceReq, config *Config) (*codec.ProduceResp, error)
- func (b *Broker) ReactSasl(req *codec.SaslHandshakeReq) (*codec.SaslHandshakeResp, error)
- func (b *Broker) ReactSaslHandshakeAuth(req *codec.SaslAuthenticateReq, context *NetworkContext) (*codec.SaslAuthenticateResp, error)
- func (b *Broker) ReactSyncGroup(ctx *NetworkContext, req *codec.SyncGroupReq) (*codec.SyncGroupResp, error)
- func (b *Broker) ReadError(conn *knet.Conn, err error)
- func (b *Broker) Run() error
- func (b *Broker) SaslAuthAction(addr net.Addr, req codec.SaslAuthenticateReq) (bool, codec.ErrorCode)
- func (b *Broker) SaslAuthConsumerGroupAction(addr net.Addr, req codec.SaslAuthenticateReq, consumerGroup string) (bool, codec.ErrorCode)
- func (b *Broker) SaslAuthTopicAction(addr net.Addr, req codec.SaslAuthenticateReq, topic, permissionType string) (bool, codec.ErrorCode)
- func (b *Broker) SaslAuthenticate(conn *knet.Conn, req *codec.SaslAuthenticateReq) (*codec.SaslAuthenticateResp, error)
- func (b *Broker) SaslHandshake(conn *knet.Conn, req *codec.SaslHandshakeReq) (*codec.SaslHandshakeResp, error)
- func (b *Broker) SyncGroup(conn *knet.Conn, req *codec.SyncGroupReq) (*codec.SyncGroupResp, error)
- func (b *Broker) TopicListAction(addr net.Addr) ([]string, error)
- func (b *Broker) UnSupportedApi(conn *knet.Conn, apiKey codec.ApiCode, apiVersion int16)
- func (b *Broker) WriteError(conn *knet.Conn, err error)
- type Config
- type Group
- type GroupCoordinator
- type GroupCoordinatorMemory
- func (g *GroupCoordinatorMemory) DelGroup(username, groupId string)
- func (g *GroupCoordinatorMemory) GetGroup(username, groupId string) (*Group, error)
- func (g *GroupCoordinatorMemory) HandleHeartBeat(username, groupId, memberId string) *codec.HeartbeatResp
- func (g *GroupCoordinatorMemory) HandleJoinGroup(username, groupId, memberId, clientId, protocolType string, ...) (*codec.JoinGroupResp, error)
- func (g *GroupCoordinatorMemory) HandleLeaveGroup(username, groupId string, members []*codec.LeaveGroupMember) (*codec.LeaveGroupResp, error)
- func (g *GroupCoordinatorMemory) HandleSyncGroup(username, groupId, memberId string, generation int, ...) (*codec.SyncGroupResp, error)
- type GroupCoordinatorRedis
- func (gcc *GroupCoordinatorRedis) DelGroup(username, groupId string)
- func (gcc *GroupCoordinatorRedis) GetGroup(username, groupId string) (*Group, error)
- func (gcc *GroupCoordinatorRedis) HandleHeartBeat(username, groupId, memberId string) *codec.HeartbeatResp
- func (gcc *GroupCoordinatorRedis) HandleJoinGroup(username, groupId, memberId, clientId, protocolType string, ...) (*codec.JoinGroupResp, error)
- func (gcc *GroupCoordinatorRedis) HandleLeaveGroup(username, groupId string, members []*codec.LeaveGroupMember) (*codec.LeaveGroupResp, error)
- func (gcc *GroupCoordinatorRedis) HandleSyncGroup(username, groupId, memberId string, generation int, ...) (*codec.SyncGroupResp, error)
- type GroupCoordinatorType
- type GroupForRedis
- type GroupStatus
- type MemberForRedis
- type MemberInfo
- type MessageIdPair
- type MockMessageID
- type MockProducer
- func (mp *MockProducer) Close()
- func (mp *MockProducer) Flush() error
- func (mp *MockProducer) LastSequenceID() int64
- func (mp *MockProducer) Name() string
- func (mp *MockProducer) Send(ctx context.Context, msg *pulsar.ProducerMessage) (pulsar.MessageID, error)
- func (mp *MockProducer) SendAsync(ctx context.Context, msg *pulsar.ProducerMessage, ...)
- func (mp *MockProducer) Topic() string
- type NetworkContext
- type OffsetManager
- type OffsetManagerImpl
- func (o *OffsetManagerImpl) AcquireOffset(username, kafkaTopic, groupId string, partition int) (MessageIdPair, bool)
- func (o *OffsetManagerImpl) Close()
- func (o *OffsetManagerImpl) CommitOffset(username, kafkaTopic, groupId string, partition int, pair MessageIdPair) error
- func (o *OffsetManagerImpl) GenerateKey(username, kafkaTopic, groupId string, partition int) string
- func (o *OffsetManagerImpl) GetOffsetMap() map[string]MessageIdPair
- func (o *OffsetManagerImpl) GracefulSendOffsetMessage(key string, consumerHandle *PulsarConsumerHandle) error
- func (o *OffsetManagerImpl) GracefulSendOffsetMessages(consumerHandle map[string]*PulsarConsumerHandle) error
- func (o *OffsetManagerImpl) RemoveOffset(username, kafkaTopic, groupId string, partition int) bool
- func (o *OffsetManagerImpl) RemoveOffsetWithKey(key string)
- func (o *OffsetManagerImpl) Start() chan bool
- type PulsarConfig
- type PulsarConsumerHandle
- type RedisConfig
- type RedisType
- type Server
Constants ¶
View Source
const (
EmptyMemberId = ""
)
Variables ¶
This section is empty.
Functions ¶
func ConvertMsgId ¶
Types ¶
type Broker ¶
type Broker struct { ConnMap syncx.Map[string, net.Conn] SaslMap syncx.Map[string, codec.SaslAuthenticateReq] // contains filtered or unexported fields }
func (*Broker) ApiVersion ¶
func (*Broker) Authed ¶
func (b *Broker) Authed(ctx *NetworkContext) bool
func (*Broker) CloseServer ¶
func (b *Broker) CloseServer()
func (*Broker) ConnectionClosed ¶
func (*Broker) ConnectionOpened ¶
func (*Broker) ConsumePulsarConsumerSize ¶
func (*Broker) DisconnectAction ¶
func (*Broker) DisconnectAll ¶
func (b *Broker) DisconnectAll()
func (*Broker) DisconnectAllLocalAddr ¶
func (*Broker) DisconnectConsumer ¶
func (*Broker) DisconnectRemoteAddr ¶
func (*Broker) FetchAction ¶
func (*Broker) FetchPartition ¶
func (b *Broker) FetchPartition(addr net.Addr, kafkaTopic, clientID string, req *codec.FetchPartitionReq, maxBytes int, minBytes int, maxWaitMs int) (*codec.FetchPartitionResp, error)
FetchPartition is exported only so that tests can call it directly.
func (*Broker) FindCoordinator ¶
func (b *Broker) FindCoordinator(conn *knet.Conn, req *codec.FindCoordinatorReq) (*codec.FindCoordinatorResp, error)
func (*Broker) GroupJoinAction ¶
func (b *Broker) GroupJoinAction(addr net.Addr, req *codec.JoinGroupReq) (*codec.JoinGroupResp, error)
func (*Broker) GroupLeaveAction ¶
func (b *Broker) GroupLeaveAction(addr net.Addr, req *codec.LeaveGroupReq) (*codec.LeaveGroupResp, error)
func (*Broker) GroupSyncAction ¶
func (b *Broker) GroupSyncAction(addr net.Addr, req *codec.SyncGroupReq) (*codec.SyncGroupResp, error)
func (*Broker) HeartBeatAction ¶
func (b *Broker) HeartBeatAction(addr net.Addr, req codec.HeartbeatReq) *codec.HeartbeatResp
func (*Broker) Heartbeat ¶
func (b *Broker) Heartbeat(conn *knet.Conn, req *codec.HeartbeatReq) (*codec.HeartbeatResp, error)
func (*Broker) JoinGroup ¶
func (b *Broker) JoinGroup(conn *knet.Conn, req *codec.JoinGroupReq) (*codec.JoinGroupResp, error)
func (*Broker) LeaveGroup ¶
func (b *Broker) LeaveGroup(conn *knet.Conn, req *codec.LeaveGroupReq) (*codec.LeaveGroupResp, error)
func (*Broker) ListOffsets ¶
func (b *Broker) ListOffsets(conn *knet.Conn, req *codec.ListOffsetsReq) (*codec.ListOffsetsResp, error)
func (*Broker) ListOffsetsVersion ¶
func (b *Broker) ListOffsetsVersion(ctx *NetworkContext, req *codec.ListOffsetsReq) (*codec.ListOffsetsResp, error)
func (*Broker) Metadata ¶
func (b *Broker) Metadata(conn *knet.Conn, req *codec.MetadataReq) (*codec.MetadataResp, error)
func (*Broker) OffsetCommit ¶
func (b *Broker) OffsetCommit(conn *knet.Conn, req *codec.OffsetCommitReq) (*codec.OffsetCommitResp, error)
func (*Broker) OffsetCommitPartitionAction ¶
func (b *Broker) OffsetCommitPartitionAction(addr net.Addr, topic, clientID string, req *codec.OffsetCommitPartitionReq) (*codec.OffsetCommitPartitionResp, error)
func (*Broker) OffsetCommitVersion ¶
func (b *Broker) OffsetCommitVersion(ctx *NetworkContext, req *codec.OffsetCommitReq) (*codec.OffsetCommitResp, error)
func (*Broker) OffsetFetch ¶
func (b *Broker) OffsetFetch(conn *knet.Conn, req *codec.OffsetFetchReq) (*codec.OffsetFetchResp, error)
func (*Broker) OffsetFetchAction ¶
func (b *Broker) OffsetFetchAction(addr net.Addr, topic, clientID, groupID string, req *codec.OffsetFetchPartitionReq) (*codec.OffsetFetchPartitionResp, error)
func (*Broker) OffsetFetchVersion ¶
func (b *Broker) OffsetFetchVersion(ctx *NetworkContext, req *codec.OffsetFetchReq) (*codec.OffsetFetchResp, error)
func (*Broker) OffsetForLeaderEpoch ¶
func (b *Broker) OffsetForLeaderEpoch(conn *knet.Conn, req *codec.OffsetForLeaderEpochReq) (*codec.OffsetForLeaderEpochResp, error)
func (*Broker) OffsetForLeaderEpochVersion ¶
func (b *Broker) OffsetForLeaderEpochVersion(ctx *NetworkContext, req *codec.OffsetForLeaderEpochReq) (*codec.OffsetForLeaderEpochResp, error)
func (*Broker) OffsetLeaderEpochAction ¶
func (b *Broker) OffsetLeaderEpochAction(addr net.Addr, topic string, req *codec.OffsetLeaderEpochPartitionReq) (*codec.OffsetForLeaderEpochPartitionResp, error)
func (*Broker) OffsetListPartitionAction ¶
func (b *Broker) OffsetListPartitionAction(addr net.Addr, topic, clientID string, req *codec.ListOffsetsPartition) (*codec.ListOffsetsPartitionResp, error)
func (*Broker) PartitionNumAction ¶
func (*Broker) Produce ¶
func (b *Broker) Produce(conn *knet.Conn, req *codec.ProduceReq) (*codec.ProduceResp, error)
func (*Broker) ProduceAction ¶
func (b *Broker) ProduceAction(addr net.Addr, topic string, partition int, req *codec.ProducePartitionReq) (*codec.ProducePartitionResp, error)
func (*Broker) ProducePulsarProducerSize ¶
func (*Broker) ReactApiVersion ¶
func (*Broker) ReactFetch ¶
func (*Broker) ReactFindCoordinator ¶
func (b *Broker) ReactFindCoordinator(req *codec.FindCoordinatorReq, config *Config) (*codec.FindCoordinatorResp, error)
func (*Broker) ReactHeartbeat ¶
func (b *Broker) ReactHeartbeat(heartbeatReqV4 *codec.HeartbeatReq, context *NetworkContext) (*codec.HeartbeatResp, error)
func (*Broker) ReactJoinGroup ¶
func (b *Broker) ReactJoinGroup(ctx *NetworkContext, req *codec.JoinGroupReq) (*codec.JoinGroupResp, error)
func (*Broker) ReactLeaveGroup ¶
func (b *Broker) ReactLeaveGroup(ctx *NetworkContext, req *codec.LeaveGroupReq) (*codec.LeaveGroupResp, error)
func (*Broker) ReactMetadata ¶
func (b *Broker) ReactMetadata(ctx *NetworkContext, req *codec.MetadataReq, config *Config) (*codec.MetadataResp, error)
func (*Broker) ReactProduce ¶
func (b *Broker) ReactProduce(ctx *NetworkContext, req *codec.ProduceReq, config *Config) (*codec.ProduceResp, error)
func (*Broker) ReactSasl ¶
func (b *Broker) ReactSasl(req *codec.SaslHandshakeReq) (*codec.SaslHandshakeResp, error)
func (*Broker) ReactSaslHandshakeAuth ¶
func (b *Broker) ReactSaslHandshakeAuth(req *codec.SaslAuthenticateReq, context *NetworkContext) (*codec.SaslAuthenticateResp, error)
func (*Broker) ReactSyncGroup ¶
func (b *Broker) ReactSyncGroup(ctx *NetworkContext, req *codec.SyncGroupReq) (*codec.SyncGroupResp, error)
func (*Broker) SaslAuthAction ¶
func (*Broker) SaslAuthConsumerGroupAction ¶
func (*Broker) SaslAuthTopicAction ¶
func (*Broker) SaslAuthenticate ¶
func (b *Broker) SaslAuthenticate(conn *knet.Conn, req *codec.SaslAuthenticateReq) (*codec.SaslAuthenticateResp, error)
func (*Broker) SaslHandshake ¶
func (b *Broker) SaslHandshake(conn *knet.Conn, req *codec.SaslHandshakeReq) (*codec.SaslHandshakeResp, error)
func (*Broker) SyncGroup ¶
func (b *Broker) SyncGroup(conn *knet.Conn, req *codec.SyncGroupReq) (*codec.SyncGroupResp, error)
func (*Broker) UnSupportedApi ¶
type Config ¶
type Config struct { PulsarConfig PulsarConfig NetConfig knet.KafkaNetServerConfig RedisConfig RedisConfig ClusterId string NodeId int32 AdvertiseHost string AdvertisePort int NeedSasl bool ContinuousOffset bool RecordHeaderSupport bool MaxConn int32 MaxConsumersPerGroup int GroupMinSessionTimeoutMs int GroupMaxSessionTimeoutMs int ConsumerReceiveQueueSize int MaxFetchRecord int MinFetchWaitMs int MaxFetchWaitMs int MaxProducerRecordSize int MaxBatchSize int // PulsarTenant use for kop internal PulsarTenant string // PulsarNamespace use for kop internal PulsarNamespace string // OffsetTopic use to store kafka offset OffsetTopic string // OffsetPersistentFrequency specifies the frequency at which committed offsets should be persistent // per topic per consumer group per partition, in seconds. 0 means immediately OffsetPersistentFrequency int // AutoCreateOffsetTopic if true, create offset topic automatically AutoCreateOffsetTopic bool // GroupCoordinatorType enum: Standalone, Cluster; default Standalone GroupCoordinatorType GroupCoordinatorType // InitialDelayedJoinMs InitialDelayedJoinMs int // RebalanceTickMs RebalanceTickMs int // TopicLevelMetricsDisable if true, disable topic level metrics TopicLevelMetricsDisable bool // NetworkDebugEnable print network layer log NetworkDebugEnable bool DebugKafkaTopicSet set.Set[string] DebugPulsarTopicSet set.Set[string] LogFormatter logrus.Formatter }
type GroupCoordinator ¶
type GroupCoordinator interface { HandleJoinGroup(username, groupId, memberId, clientId, protocolType string, sessionTimeoutMs int, protocols []*codec.GroupProtocol) (*codec.JoinGroupResp, error) HandleSyncGroup(username, groupId, memberId string, generation int, groupAssignments []*codec.GroupAssignment) (*codec.SyncGroupResp, error) HandleLeaveGroup(username, groupId string, members []*codec.LeaveGroupMember) (*codec.LeaveGroupResp, error) HandleHeartBeat(username, groupId, memberId string) *codec.HeartbeatResp GetGroup(username, groupId string) (*Group, error) DelGroup(username, groupId string) }
type GroupCoordinatorMemory ¶
type GroupCoordinatorMemory struct {
// contains filtered or unexported fields
}
func NewGroupCoordinatorMemory ¶
func NewGroupCoordinatorMemory(config *Config) *GroupCoordinatorMemory
func (*GroupCoordinatorMemory) DelGroup ¶
func (g *GroupCoordinatorMemory) DelGroup(username, groupId string)
func (*GroupCoordinatorMemory) GetGroup ¶
func (g *GroupCoordinatorMemory) GetGroup(username, groupId string) (*Group, error)
func (*GroupCoordinatorMemory) HandleHeartBeat ¶
func (g *GroupCoordinatorMemory) HandleHeartBeat(username, groupId, memberId string) *codec.HeartbeatResp
func (*GroupCoordinatorMemory) HandleJoinGroup ¶
func (g *GroupCoordinatorMemory) HandleJoinGroup(username, groupId, memberId, clientId, protocolType string, sessionTimeoutMs int, protocols []*codec.GroupProtocol) (*codec.JoinGroupResp, error)
func (*GroupCoordinatorMemory) HandleLeaveGroup ¶
func (g *GroupCoordinatorMemory) HandleLeaveGroup(username, groupId string, members []*codec.LeaveGroupMember) (*codec.LeaveGroupResp, error)
func (*GroupCoordinatorMemory) HandleSyncGroup ¶
func (g *GroupCoordinatorMemory) HandleSyncGroup(username, groupId, memberId string, generation int, groupAssignments []*codec.GroupAssignment) (*codec.SyncGroupResp, error)
type GroupCoordinatorRedis ¶
type GroupCoordinatorRedis struct {
// contains filtered or unexported fields
}
func NewGroupCoordinatorRedis ¶
func NewGroupCoordinatorRedis(redisConfig RedisConfig) (*GroupCoordinatorRedis, error)
func (*GroupCoordinatorRedis) DelGroup ¶
func (gcc *GroupCoordinatorRedis) DelGroup(username, groupId string)
func (*GroupCoordinatorRedis) GetGroup ¶
func (gcc *GroupCoordinatorRedis) GetGroup(username, groupId string) (*Group, error)
func (*GroupCoordinatorRedis) HandleHeartBeat ¶
func (gcc *GroupCoordinatorRedis) HandleHeartBeat(username, groupId, memberId string) *codec.HeartbeatResp
func (*GroupCoordinatorRedis) HandleJoinGroup ¶
func (gcc *GroupCoordinatorRedis) HandleJoinGroup(username, groupId, memberId, clientId, protocolType string, sessionTimeoutMs int, protocols []*codec.GroupProtocol) (*codec.JoinGroupResp, error)
func (*GroupCoordinatorRedis) HandleLeaveGroup ¶
func (gcc *GroupCoordinatorRedis) HandleLeaveGroup(username, groupId string, members []*codec.LeaveGroupMember) (*codec.LeaveGroupResp, error)
func (*GroupCoordinatorRedis) HandleSyncGroup ¶
func (gcc *GroupCoordinatorRedis) HandleSyncGroup(username, groupId, memberId string, generation int, groupAssignments []*codec.GroupAssignment) (*codec.SyncGroupResp, error)
type GroupCoordinatorType ¶
type GroupCoordinatorType string
const ( GroupCoordinatorTypeMemory GroupCoordinatorType = "memory" GroupCoordinatorTypeRedis GroupCoordinatorType = "redis" )
type GroupForRedis ¶
type GroupForRedis struct { GroupStatus GroupStatus `json:"GroupStatus,omitempty"` MemberIds map[string]*MemberForRedis `json:"MemberIds,omitempty"` CanRebalance bool `json:"CanRebalance,omitempty"` GenerationId int `json:"GenerationId,omitempty"` Leader string `json:"Leader,omitempty"` SupportedProtocol string `json:"SupportedProtocol,omitempty"` ProtocolType string `json:"ProtocolType,omitempty"` }
type GroupStatus ¶
type GroupStatus int
const ( PreparingRebalance GroupStatus = 1 + iota CompletingRebalance Stable Dead Empty )
type MemberForRedis ¶
type MemberForRedis struct { ClientId string `json:"ClientId,omitempty"` MemberId string `json:"MemberId,omitempty"` JoinGenerationId int `json:"JoinGenerationId,omitempty"` SyncGenerationId int `json:"SyncGenerationId,omitempty"` Metadata []byte `json:"Metadata,omitempty"` Assignment []byte `json:"Assignment,omitempty"` ProtocolType string `json:"ProtocolType,omitempty"` Protocols map[string][]byte `json:"Protocols,omitempty"` }
type MemberInfo ¶
type MemberInfo struct {
// contains filtered or unexported fields
}
type MessageIdPair ¶
type MockMessageID ¶
type MockMessageID struct{}
func (MockMessageID) BatchIdx ¶
func (m MockMessageID) BatchIdx() int32
func (MockMessageID) BatchSize ¶
func (m MockMessageID) BatchSize() int32
func (MockMessageID) EntryID ¶
func (m MockMessageID) EntryID() int64
func (MockMessageID) LedgerID ¶
func (m MockMessageID) LedgerID() int64
func (MockMessageID) PartitionIdx ¶
func (m MockMessageID) PartitionIdx() int32
func (MockMessageID) Serialize ¶
func (m MockMessageID) Serialize() []byte
func (MockMessageID) String ¶
func (m MockMessageID) String() string
type MockProducer ¶
func (*MockProducer) Close ¶
func (mp *MockProducer) Close()
func (*MockProducer) Flush ¶
func (mp *MockProducer) Flush() error
func (*MockProducer) LastSequenceID ¶
func (mp *MockProducer) LastSequenceID() int64
func (*MockProducer) Name ¶
func (mp *MockProducer) Name() string
func (*MockProducer) Send ¶
func (mp *MockProducer) Send(ctx context.Context, msg *pulsar.ProducerMessage) (pulsar.MessageID, error)
func (*MockProducer) SendAsync ¶
func (mp *MockProducer) SendAsync(ctx context.Context, msg *pulsar.ProducerMessage, callback func(pulsar.MessageID, *pulsar.ProducerMessage, error))
func (*MockProducer) Topic ¶
func (mp *MockProducer) Topic() string
type NetworkContext ¶
NetworkContext records the Kafka authentication status (authed) of a connection.
func (*NetworkContext) Authed ¶
func (n *NetworkContext) Authed(authed bool)
func (*NetworkContext) IsAuthed ¶
func (n *NetworkContext) IsAuthed() bool
type OffsetManager ¶
type OffsetManager interface { Start() chan bool CommitOffset(username, kafkaTopic, groupId string, partition int, pair MessageIdPair) error AcquireOffset(username, kafkaTopic, groupId string, partition int) (MessageIdPair, bool) RemoveOffset(username, kafkaTopic, groupId string, partition int) bool GenerateKey(username, kafkaTopic, groupId string, partition int) string RemoveOffsetWithKey(key string) GracefulSendOffsetMessages(map[string]*PulsarConsumerHandle) error GracefulSendOffsetMessage(partitionTopic string, consumerHandle *PulsarConsumerHandle) error GetOffsetMap() map[string]MessageIdPair Close() }
func NewOffsetManager ¶
func NewOffsetManager(client pulsar.Client, config *Config, admin *padmin.PulsarAdmin) (OffsetManager, error)
type OffsetManagerImpl ¶
type OffsetManagerImpl struct {
// contains filtered or unexported fields
}
func (*OffsetManagerImpl) AcquireOffset ¶
func (o *OffsetManagerImpl) AcquireOffset(username, kafkaTopic, groupId string, partition int) (MessageIdPair, bool)
func (*OffsetManagerImpl) Close ¶
func (o *OffsetManagerImpl) Close()
func (*OffsetManagerImpl) CommitOffset ¶
func (o *OffsetManagerImpl) CommitOffset(username, kafkaTopic, groupId string, partition int, pair MessageIdPair) error
func (*OffsetManagerImpl) GenerateKey ¶
func (o *OffsetManagerImpl) GenerateKey(username, kafkaTopic, groupId string, partition int) string
func (*OffsetManagerImpl) GetOffsetMap ¶
func (o *OffsetManagerImpl) GetOffsetMap() map[string]MessageIdPair
func (*OffsetManagerImpl) GracefulSendOffsetMessage ¶
func (o *OffsetManagerImpl) GracefulSendOffsetMessage(key string, consumerHandle *PulsarConsumerHandle) error
func (*OffsetManagerImpl) GracefulSendOffsetMessages ¶
func (o *OffsetManagerImpl) GracefulSendOffsetMessages(consumerHandle map[string]*PulsarConsumerHandle) error
func (*OffsetManagerImpl) RemoveOffset ¶
func (o *OffsetManagerImpl) RemoveOffset(username, kafkaTopic, groupId string, partition int) bool
func (*OffsetManagerImpl) RemoveOffsetWithKey ¶
func (o *OffsetManagerImpl) RemoveOffsetWithKey(key string)
func (*OffsetManagerImpl) Start ¶
func (o *OffsetManagerImpl) Start() chan bool
type PulsarConfig ¶
type PulsarConsumerHandle ¶
type PulsarConsumerHandle struct {
// contains filtered or unexported fields
}
type RedisConfig ¶
type Server ¶
type Server interface { Auth(username string, password string, clientId string) (bool, error) AuthTopic(username string, password, clientId, topic, permissionType string) (bool, error) // AuthTopicGroup check if group is valid AuthTopicGroup(username string, password, clientId, consumerGroup string) (bool, error) // AuthGroupTopic check if group has permission on topic AuthGroupTopic(topic, groupId string) bool SubscriptionName(groupId string) (string, error) // PulsarTopic the corresponding topic in pulsar PulsarTopic(username, topic string) (string, error) PartitionNum(username, topic string) (int, error) ListTopic(username string) ([]string, error) HasFlowQuota(username, topic string) bool }
Source Files
¶
- group_coordinator.go
- group_coordinator_api.go
- group_coordinator_cluster.go
- group_coordinator_standalone.go
- kop.go
- kop_api_versions.go
- kop_fetch.go
- kop_find_coordinator.go
- kop_heartbeat.go
- kop_impl.go
- kop_join_group.go
- kop_leave_group.go
- kop_list_offsets.go
- kop_matedata.go
- kop_offset_commit.go
- kop_offset_fetch.go
- kop_offset_leader_epoch.go
- kop_produce.go
- kop_sasl_authenticate.go
- kop_sasl_handshake.go
- kop_sync_group.go
- kop_util.go
- member_info.go
- message_id_pair.go
- mock.go
- network_context.go
- offset_helper.go
- offset_manager.go
- offset_manager_impl.go
- pulsar_consumer_handle.go
- user_info.go
Click to show internal directories.
Click to hide internal directories.