Documentation ¶
Index ¶
- Constants
- func ConvertMsgId(messageId pulsar.MessageID) int64
- type Broker
- func (b *Broker) AuthGroupTopic(topic, groupId string) bool
- func (b *Broker) Close()
- func (b *Broker) Disconnect(addr net.Addr)
- func (b *Broker) Fetch(addr net.Addr, req *codec.FetchReq) ([]*codec.FetchTopicResp, error)
- func (b *Broker) FetchPartition(addr net.Addr, kafkaTopic, clientID string, req *codec.FetchPartitionReq, ...) *codec.FetchPartitionResp
- func (b *Broker) GetOffsetManager() OffsetManager
- func (b *Broker) GroupJoin(addr net.Addr, req *codec.JoinGroupReq) (*codec.JoinGroupResp, error)
- func (b *Broker) GroupLeave(addr net.Addr, req *codec.LeaveGroupReq) (*codec.LeaveGroupResp, error)
- func (b *Broker) GroupSync(addr net.Addr, req *codec.SyncGroupReq) (*codec.SyncGroupResp, error)
- func (b *Broker) HeartBeat(addr net.Addr, req codec.HeartbeatReq) *codec.HeartbeatResp
- func (b *Broker) OffsetCommitPartition(addr net.Addr, kafkaTopic, clientID string, ...) (*codec.OffsetCommitPartitionResp, error)
- func (b *Broker) OffsetFetch(addr net.Addr, topic, clientID, groupID string, ...) (*codec.OffsetFetchPartitionResp, error)
- func (b *Broker) OffsetLeaderEpoch(addr net.Addr, topic string, req *codec.OffsetLeaderEpochPartitionReq) (*codec.OffsetForLeaderEpochPartitionResp, error)
- func (b *Broker) OffsetListPartition(addr net.Addr, kafkaTopic, clientID string, req *codec.ListOffsetsPartition) (*codec.ListOffsetsPartitionResp, error)
- func (b *Broker) PartitionNum(addr net.Addr, kafkaTopic string) (int, error)
- func (b *Broker) Produce(addr net.Addr, kafkaTopic string, partition int, ...) (*codec.ProducePartitionResp, error)
- func (b *Broker) Run() error
- func (b *Broker) SaslAuth(addr net.Addr, req codec.SaslAuthenticateReq) (bool, codec.ErrorCode)
- func (b *Broker) SaslAuthConsumerGroup(addr net.Addr, req codec.SaslAuthenticateReq, consumerGroup string) (bool, codec.ErrorCode)
- func (b *Broker) SaslAuthTopic(addr net.Addr, req codec.SaslAuthenticateReq, topic, permissionType string) (bool, codec.ErrorCode)
- func (b *Broker) TopicList(addr net.Addr) ([]string, error)
- type Config
- type ConsumerMetadata
- type Group
- type GroupCoordinator
- type GroupCoordinatorCluster
- func (gcc *GroupCoordinatorCluster) GetGroup(username, groupId string) (*Group, error)
- func (gcc *GroupCoordinatorCluster) HandleHeartBeat(username, groupId, memberId string) *codec.HeartbeatResp
- func (gcc *GroupCoordinatorCluster) HandleJoinGroup(username, groupId, memberId, clientId, protocolType string, ...) (*codec.JoinGroupResp, error)
- func (gcc *GroupCoordinatorCluster) HandleLeaveGroup(username, groupId string, members []*codec.LeaveGroupMember) (*codec.LeaveGroupResp, error)
- func (gcc *GroupCoordinatorCluster) HandleSyncGroup(username, groupId, memberId string, generation int, ...) (*codec.SyncGroupResp, error)
- type GroupCoordinatorStandalone
- func (g *GroupCoordinatorStandalone) GetGroup(username, groupId string) (*Group, error)
- func (g *GroupCoordinatorStandalone) HandleHeartBeat(username, groupId, memberId string) *codec.HeartbeatResp
- func (g *GroupCoordinatorStandalone) HandleJoinGroup(username, groupId, memberId, clientId, protocolType string, ...) (*codec.JoinGroupResp, error)
- func (g *GroupCoordinatorStandalone) HandleLeaveGroup(username, groupId string, members []*codec.LeaveGroupMember) (*codec.LeaveGroupResp, error)
- func (g *GroupCoordinatorStandalone) HandleSyncGroup(username, groupId, memberId string, generation int, ...) (*codec.SyncGroupResp, error)
- type GroupCoordinatorType
- type GroupStatus
- type KafsarConfig
- type LocalSpan
- type MemberInfo
- type MessageIdPair
- type NoErrorTracer
- type OffsetManager
- type OffsetManagerImpl
- func (o *OffsetManagerImpl) AcquireOffset(username, kafkaTopic, groupId string, partition int) (MessageIdPair, bool)
- func (o *OffsetManagerImpl) Close()
- func (o *OffsetManagerImpl) CommitOffset(username, kafkaTopic, groupId string, partition int, pair MessageIdPair) error
- func (o *OffsetManagerImpl) GenerateKey(username, kafkaTopic, groupId string, partition int) string
- func (o *OffsetManagerImpl) GetOffsetMap() map[string]MessageIdPair
- func (o *OffsetManagerImpl) RemoveOffset(username, kafkaTopic, groupId string, partition int) bool
- func (o *OffsetManagerImpl) RemoveOffsetWithKey(key string)
- func (o *OffsetManagerImpl) Start() chan bool
- type OtelTracerConfig
- func (ot *OtelTracerConfig) EndSpan(span LocalSpan, logs ...string)
- func (ot *OtelTracerConfig) IsDisabled() bool
- func (ot *OtelTracerConfig) NewProvider()
- func (ot *OtelTracerConfig) NewSpan(ctx context.Context, opName string, logs ...string) LocalSpan
- func (ot *OtelTracerConfig) NewSubSpan(span LocalSpan, opName string, logs ...string) LocalSpan
- func (ot *OtelTracerConfig) SetAttribute(span LocalSpan, k, v string)
- type PulsarConfig
- type Server
- type ServerControl
- type SkywalkingTracerConfig
- func (st *SkywalkingTracerConfig) EndSpan(span LocalSpan, logs ...string)
- func (st *SkywalkingTracerConfig) IsDisabled() bool
- func (st *SkywalkingTracerConfig) NewProvider()
- func (st *SkywalkingTracerConfig) NewSpan(ctx context.Context, operateName string, logs ...string) LocalSpan
- func (st *SkywalkingTracerConfig) NewSubSpan(span LocalSpan, operateName string, logs ...string) LocalSpan
- func (st *SkywalkingTracerConfig) SetAttribute(span LocalSpan, k, v string)
- type TraceConfig
- type TraceType
Constants ¶
View Source
const (
EmptyMemberId = ""
)
Variables ¶
This section is empty.
Functions ¶
func ConvertMsgId ¶
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
func SetupKafsar ¶
func (*Broker) AuthGroupTopic ¶
func (*Broker) Disconnect ¶
func (*Broker) FetchPartition ¶
func (b *Broker) FetchPartition(addr net.Addr, kafkaTopic, clientID string, req *codec.FetchPartitionReq, maxBytes int, minBytes int, maxWaitMs int, span LocalSpan) *codec.FetchPartitionResp
FetchPartition is visible for testing.
func (*Broker) GetOffsetManager ¶
func (b *Broker) GetOffsetManager() OffsetManager
func (*Broker) GroupJoin ¶
func (b *Broker) GroupJoin(addr net.Addr, req *codec.JoinGroupReq) (*codec.JoinGroupResp, error)
func (*Broker) GroupLeave ¶
func (b *Broker) GroupLeave(addr net.Addr, req *codec.LeaveGroupReq) (*codec.LeaveGroupResp, error)
func (*Broker) GroupSync ¶
func (b *Broker) GroupSync(addr net.Addr, req *codec.SyncGroupReq) (*codec.SyncGroupResp, error)
func (*Broker) HeartBeat ¶
func (b *Broker) HeartBeat(addr net.Addr, req codec.HeartbeatReq) *codec.HeartbeatResp
func (*Broker) OffsetCommitPartition ¶
func (b *Broker) OffsetCommitPartition(addr net.Addr, kafkaTopic, clientID string, req *codec.OffsetCommitPartitionReq) (*codec.OffsetCommitPartitionResp, error)
func (*Broker) OffsetFetch ¶
func (b *Broker) OffsetFetch(addr net.Addr, topic, clientID, groupID string, req *codec.OffsetFetchPartitionReq) (*codec.OffsetFetchPartitionResp, error)
func (*Broker) OffsetLeaderEpoch ¶
func (b *Broker) OffsetLeaderEpoch(addr net.Addr, topic string, req *codec.OffsetLeaderEpochPartitionReq) (*codec.OffsetForLeaderEpochPartitionResp, error)
func (*Broker) OffsetListPartition ¶
func (b *Broker) OffsetListPartition(addr net.Addr, kafkaTopic, clientID string, req *codec.ListOffsetsPartition) (*codec.ListOffsetsPartitionResp, error)
func (*Broker) PartitionNum ¶
func (*Broker) Produce ¶
func (b *Broker) Produce(addr net.Addr, kafkaTopic string, partition int, req *codec.ProducePartitionReq) (*codec.ProducePartitionResp, error)
func (*Broker) SaslAuthConsumerGroup ¶
func (*Broker) SaslAuthTopic ¶
type Config ¶
type Config struct {
	PulsarConfig PulsarConfig
	KafsarConfig KafsarConfig
	TraceConfig  NoErrorTracer
}
type ConsumerMetadata ¶
type ConsumerMetadata struct {
// contains filtered or unexported fields
}
type GroupCoordinator ¶
type GroupCoordinator interface {
	HandleJoinGroup(username, groupId, memberId, clientId, protocolType string, sessionTimeoutMs int, protocols []*codec.GroupProtocol) (*codec.JoinGroupResp, error)
	HandleSyncGroup(username, groupId, memberId string, generation int, groupAssignments []*codec.GroupAssignment) (*codec.SyncGroupResp, error)
	HandleLeaveGroup(username, groupId string, members []*codec.LeaveGroupMember) (*codec.LeaveGroupResp, error)
	HandleHeartBeat(username, groupId, memberId string) *codec.HeartbeatResp
	GetGroup(username, groupId string) (*Group, error)
}
type GroupCoordinatorCluster ¶
type GroupCoordinatorCluster struct { }
func NewGroupCoordinatorCluster ¶
func NewGroupCoordinatorCluster() *GroupCoordinatorCluster
func (*GroupCoordinatorCluster) GetGroup ¶
func (gcc *GroupCoordinatorCluster) GetGroup(username, groupId string) (*Group, error)
func (*GroupCoordinatorCluster) HandleHeartBeat ¶
func (gcc *GroupCoordinatorCluster) HandleHeartBeat(username, groupId, memberId string) *codec.HeartbeatResp
func (*GroupCoordinatorCluster) HandleJoinGroup ¶
func (gcc *GroupCoordinatorCluster) HandleJoinGroup(username, groupId, memberId, clientId, protocolType string, sessionTimeoutMs int, protocols []*codec.GroupProtocol) (*codec.JoinGroupResp, error)
func (*GroupCoordinatorCluster) HandleLeaveGroup ¶
func (gcc *GroupCoordinatorCluster) HandleLeaveGroup(username, groupId string, members []*codec.LeaveGroupMember) (*codec.LeaveGroupResp, error)
func (*GroupCoordinatorCluster) HandleSyncGroup ¶
func (gcc *GroupCoordinatorCluster) HandleSyncGroup(username, groupId, memberId string, generation int, groupAssignments []*codec.GroupAssignment) (*codec.SyncGroupResp, error)
type GroupCoordinatorStandalone ¶
type GroupCoordinatorStandalone struct {
// contains filtered or unexported fields
}
func NewGroupCoordinatorStandalone ¶
func NewGroupCoordinatorStandalone(pulsarConfig PulsarConfig, kafsarConfig KafsarConfig, pulsarClient pulsar.Client) *GroupCoordinatorStandalone
func (*GroupCoordinatorStandalone) GetGroup ¶
func (g *GroupCoordinatorStandalone) GetGroup(username, groupId string) (*Group, error)
func (*GroupCoordinatorStandalone) HandleHeartBeat ¶
func (g *GroupCoordinatorStandalone) HandleHeartBeat(username, groupId, memberId string) *codec.HeartbeatResp
func (*GroupCoordinatorStandalone) HandleJoinGroup ¶
func (g *GroupCoordinatorStandalone) HandleJoinGroup(username, groupId, memberId, clientId, protocolType string, sessionTimeoutMs int, protocols []*codec.GroupProtocol) (*codec.JoinGroupResp, error)
func (*GroupCoordinatorStandalone) HandleLeaveGroup ¶
func (g *GroupCoordinatorStandalone) HandleLeaveGroup(username, groupId string, members []*codec.LeaveGroupMember) (*codec.LeaveGroupResp, error)
func (*GroupCoordinatorStandalone) HandleSyncGroup ¶
func (g *GroupCoordinatorStandalone) HandleSyncGroup(username, groupId, memberId string, generation int, groupAssignments []*codec.GroupAssignment) (*codec.SyncGroupResp, error)
type GroupCoordinatorType ¶
type GroupCoordinatorType int
const (
	Standalone GroupCoordinatorType = 0 + iota
	Cluster
)
type GroupStatus ¶
type GroupStatus int
const (
	PreparingRebalance GroupStatus = 1 + iota
	CompletingRebalance
	Stable
	Dead
	Empty
)
type KafsarConfig ¶
type KafsarConfig struct {
	// network config
	GnetConfig kgnet.GnetServerConfig
	NeedSasl   bool
	MaxConn    int32

	// Kafka protocol config
	ClusterId                string
	AdvertiseHost            string
	AdvertisePort            int
	MaxProducerRecordSize    int
	MaxBatchSize             int
	MaxConsumersPerGroup     int
	GroupMinSessionTimeoutMs int
	GroupMaxSessionTimeoutMs int
	ConsumerReceiveQueueSize int
	MaxFetchRecord           int
	MinFetchWaitMs           int
	MaxFetchWaitMs           int
	ContinuousOffset         bool
	// PulsarTenant is used internally by kafsar.
	PulsarTenant string
	// PulsarNamespace is used internally by kafsar.
	PulsarNamespace string
	// OffsetTopic is used to store Kafka offsets.
	OffsetTopic string
	// GroupCoordinatorType is one of Standalone or Cluster; the default is Standalone.
	GroupCoordinatorType GroupCoordinatorType
	// InitialDelayedJoinMs
	InitialDelayedJoinMs int
	// RebalanceTickMs
	RebalanceTickMs int
}
type MemberInfo ¶
type MemberInfo struct {
// contains filtered or unexported fields
}
type MessageIdPair ¶
type NoErrorTracer ¶
type NoErrorTracer interface {
	// IsDisabled reports whether the tracer is disabled.
	IsDisabled() bool
	// NewProvider creates a trace driver.
	NewProvider()
	// NewSpan creates a span.
	NewSpan(ctx context.Context, operateName string, logs ...string) LocalSpan
	// SetAttribute sets an attribute on the given LocalSpan.
	SetAttribute(span LocalSpan, k, v string)
	// NewSubSpan creates a child span from the given LocalSpan.
	NewSubSpan(span LocalSpan, operateName string, logs ...string) LocalSpan
	// EndSpan closes the span.
	EndSpan(span LocalSpan, logs ...string)
}
type OffsetManager ¶
type OffsetManager interface {
	Start() chan bool
	CommitOffset(username, kafkaTopic, groupId string, partition int, pair MessageIdPair) error
	AcquireOffset(username, kafkaTopic, groupId string, partition int) (MessageIdPair, bool)
	RemoveOffset(username, kafkaTopic, groupId string, partition int) bool
	GenerateKey(username, kafkaTopic, groupId string, partition int) string
	RemoveOffsetWithKey(key string)
	GetOffsetMap() map[string]MessageIdPair
	Close()
}
func NewOffsetManager ¶
func NewOffsetManager(client pulsar.Client, config KafsarConfig, pulsarHttpAddr string) (OffsetManager, error)
type OffsetManagerImpl ¶
type OffsetManagerImpl struct {
// contains filtered or unexported fields
}
func (*OffsetManagerImpl) AcquireOffset ¶
func (o *OffsetManagerImpl) AcquireOffset(username, kafkaTopic, groupId string, partition int) (MessageIdPair, bool)
func (*OffsetManagerImpl) Close ¶
func (o *OffsetManagerImpl) Close()
func (*OffsetManagerImpl) CommitOffset ¶
func (o *OffsetManagerImpl) CommitOffset(username, kafkaTopic, groupId string, partition int, pair MessageIdPair) error
func (*OffsetManagerImpl) GenerateKey ¶
func (o *OffsetManagerImpl) GenerateKey(username, kafkaTopic, groupId string, partition int) string
func (*OffsetManagerImpl) GetOffsetMap ¶
func (o *OffsetManagerImpl) GetOffsetMap() map[string]MessageIdPair
func (*OffsetManagerImpl) RemoveOffset ¶
func (o *OffsetManagerImpl) RemoveOffset(username, kafkaTopic, groupId string, partition int) bool
func (*OffsetManagerImpl) RemoveOffsetWithKey ¶
func (o *OffsetManagerImpl) RemoveOffsetWithKey(key string)
func (*OffsetManagerImpl) Start ¶
func (o *OffsetManagerImpl) Start() chan bool
type OtelTracerConfig ¶
type OtelTracerConfig TraceConfig
func (*OtelTracerConfig) EndSpan ¶
func (ot *OtelTracerConfig) EndSpan(span LocalSpan, logs ...string)
func (*OtelTracerConfig) IsDisabled ¶
func (ot *OtelTracerConfig) IsDisabled() bool
func (*OtelTracerConfig) NewProvider ¶
func (ot *OtelTracerConfig) NewProvider()
func (*OtelTracerConfig) NewSubSpan ¶
func (ot *OtelTracerConfig) NewSubSpan(span LocalSpan, opName string, logs ...string) LocalSpan
func (*OtelTracerConfig) SetAttribute ¶
func (ot *OtelTracerConfig) SetAttribute(span LocalSpan, k, v string)
type PulsarConfig ¶
type Server ¶
type Server interface {
	Auth(username string, password string, clientId string) (bool, error)
	AuthTopic(username string, password, clientId, topic, permissionType string) (bool, error)
	AuthTopicGroup(username string, password, clientId, consumerGroup string) (bool, error)
	AuthGroupTopic(topic, groupId string) bool
	SubscriptionName(groupId string) (string, error)
	// PulsarTopic returns the corresponding topic in Pulsar.
	PulsarTopic(username, topic string) (string, error)
	PartitionNum(username, topic string) (int, error)
	ListTopic(username string) ([]string, error)
	HasFlowQuota(username, topic string) bool
}
type ServerControl ¶
type ServerControl struct {
// contains filtered or unexported fields
}
func (*ServerControl) DisConnect ¶
func (s *ServerControl) DisConnect(addr net.Addr) error
type SkywalkingTracerConfig ¶
type SkywalkingTracerConfig TraceConfig
func (*SkywalkingTracerConfig) EndSpan ¶
func (st *SkywalkingTracerConfig) EndSpan(span LocalSpan, logs ...string)
func (*SkywalkingTracerConfig) IsDisabled ¶
func (st *SkywalkingTracerConfig) IsDisabled() bool
func (*SkywalkingTracerConfig) NewProvider ¶
func (st *SkywalkingTracerConfig) NewProvider()
func (*SkywalkingTracerConfig) NewSubSpan ¶
func (st *SkywalkingTracerConfig) NewSubSpan(span LocalSpan, operateName string, logs ...string) LocalSpan
func (*SkywalkingTracerConfig) SetAttribute ¶
func (st *SkywalkingTracerConfig) SetAttribute(span LocalSpan, k, v string)
type TraceConfig ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.