Documentation ¶
Index ¶
- type MessageQueueBroker
- func (b *MessageQueueBroker) AdjustedUrl(location *filer_pb.Location) string
- func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error)
- func (b *MessageQueueBroker) BalanceTopics(ctx context.Context, request *mq_pb.BalanceTopicsRequest) (resp *mq_pb.BalanceTopicsResponse, err error)
- func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string, stopCh chan struct{}) error
- func (b *MessageQueueBroker) ClosePublishers(ctx context.Context, request *mq_pb.ClosePublishersRequest) (resp *mq_pb.ClosePublishersResponse, err error)
- func (b *MessageQueueBroker) CloseSubscribers(ctx context.Context, request *mq_pb.CloseSubscribersRequest) (resp *mq_pb.CloseSubscribersResponse, err error)
- func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.ConfigureTopicRequest) (resp *mq_pb.ConfigureTopicResponse, err error)
- func (b *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error)
- func (b *MessageQueueBroker) GetDataCenter() string
- func (b *MessageQueueBroker) GetFiler() pb.ServerAddress
- func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partition topic.Partition) (localTopicPartition *topic.LocalPartition, getOrGenError error)
- func (b *MessageQueueBroker) KeepConnectedToBrokerBalancer(newBrokerBalancerCh chan string)
- func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error)
- func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error)
- func (b *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
- func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_PublishFollowMeServer) (err error)
- func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer) error
- func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessaging_PublisherToPubBalancerServer) error
- func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, ...) (err error)
- func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMessaging_SubscriberToSubCoordinatorServer) error
- func (b *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error
- type MessageQueueBrokerOption
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type MessageQueueBroker ¶
type MessageQueueBroker struct { mq_pb.UnimplementedSeaweedMessagingServer MasterClient *wdclient.MasterClient Balancer *pub_balancer.Balancer Coordinator *sub_coordinator.Coordinator // contains filtered or unexported fields }
func NewMessageBroker ¶
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error)
func (*MessageQueueBroker) AdjustedUrl ¶
func (b *MessageQueueBroker) AdjustedUrl(location *filer_pb.Location) string
func (*MessageQueueBroker) AssignTopicPartitions ¶
func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *mq_pb.AssignTopicPartitionsRequest) (*mq_pb.AssignTopicPartitionsResponse, error)
AssignTopicPartitions Runs on the assigned broker, to execute the topic partition assignment
func (*MessageQueueBroker) BalanceTopics ¶
func (b *MessageQueueBroker) BalanceTopics(ctx context.Context, request *mq_pb.BalanceTopicsRequest) (resp *mq_pb.BalanceTopicsResponse, err error)
func (*MessageQueueBroker) BrokerConnectToBalancer ¶
func (b *MessageQueueBroker) BrokerConnectToBalancer(brokerBalancer string, stopCh chan struct{}) error
BrokerConnectToBalancer connects to the broker balancer and sends stats
func (*MessageQueueBroker) ClosePublishers ¶
func (b *MessageQueueBroker) ClosePublishers(ctx context.Context, request *mq_pb.ClosePublishersRequest) (resp *mq_pb.ClosePublishersResponse, err error)
func (*MessageQueueBroker) CloseSubscribers ¶
func (b *MessageQueueBroker) CloseSubscribers(ctx context.Context, request *mq_pb.CloseSubscribersRequest) (resp *mq_pb.CloseSubscribersResponse, err error)
func (*MessageQueueBroker) ConfigureTopic ¶
func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.ConfigureTopicRequest) (resp *mq_pb.ConfigureTopicResponse, err error)
ConfigureTopic Runs on any broker, but proxied to the balancer if not the balancer It generates an assignments based on existing allocations, and then assign the partitions to the brokers.
func (*MessageQueueBroker) FindBrokerLeader ¶
func (b *MessageQueueBroker) FindBrokerLeader(c context.Context, request *mq_pb.FindBrokerLeaderRequest) (*mq_pb.FindBrokerLeaderResponse, error)
func (*MessageQueueBroker) GetDataCenter ¶
func (b *MessageQueueBroker) GetDataCenter() string
func (*MessageQueueBroker) GetFiler ¶
func (b *MessageQueueBroker) GetFiler() pb.ServerAddress
func (*MessageQueueBroker) GetOrGenerateLocalPartition ¶
func (b *MessageQueueBroker) GetOrGenerateLocalPartition(t topic.Topic, partition topic.Partition) (localTopicPartition *topic.LocalPartition, getOrGenError error)
func (*MessageQueueBroker) KeepConnectedToBrokerBalancer ¶
func (b *MessageQueueBroker) KeepConnectedToBrokerBalancer(newBrokerBalancerCh chan string)
func (*MessageQueueBroker) ListTopics ¶
func (b *MessageQueueBroker) ListTopics(ctx context.Context, request *mq_pb.ListTopicsRequest) (resp *mq_pb.ListTopicsResponse, err error)
func (*MessageQueueBroker) LookupTopicBrokers ¶
func (b *MessageQueueBroker) LookupTopicBrokers(ctx context.Context, request *mq_pb.LookupTopicBrokersRequest) (resp *mq_pb.LookupTopicBrokersResponse, err error)
LookupTopicBrokers returns the brokers that are serving the topic
func (*MessageQueueBroker) OnBrokerUpdate ¶
func (b *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
func (*MessageQueueBroker) PublishFollowMe ¶
func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_PublishFollowMeServer) (err error)
func (*MessageQueueBroker) PublishMessage ¶
func (b *MessageQueueBroker) PublishMessage(stream mq_pb.SeaweedMessaging_PublishMessageServer) error
func (*MessageQueueBroker) PublisherToPubBalancer ¶
func (b *MessageQueueBroker) PublisherToPubBalancer(stream mq_pb.SeaweedMessaging_PublisherToPubBalancerServer) error
PublisherToPubBalancer receives connections from brokers and collects stats
func (*MessageQueueBroker) SubscribeMessage ¶
func (b *MessageQueueBroker) SubscribeMessage(req *mq_pb.SubscribeMessageRequest, stream mq_pb.SeaweedMessaging_SubscribeMessageServer) (err error)
func (*MessageQueueBroker) SubscriberToSubCoordinator ¶
func (b *MessageQueueBroker) SubscriberToSubCoordinator(stream mq_pb.SeaweedMessaging_SubscriberToSubCoordinatorServer) error
SubscriberToSubCoordinator coordinates the subscribers
func (*MessageQueueBroker) WithFilerClient ¶
func (b *MessageQueueBroker) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error
type MessageQueueBrokerOption ¶
type MessageQueueBrokerOption struct { Masters map[string]pb.ServerAddress FilerGroup string DataCenter string Rack string DefaultReplication string MaxMB int Ip string Port int Cipher bool VolumeServerAccess string // how to access volume servers }
func (*MessageQueueBrokerOption) BrokerAddress ¶
func (option *MessageQueueBrokerOption) BrokerAddress() pb.ServerAddress
Source Files ¶
- broker_connect.go
- broker_grpc_admin.go
- broker_grpc_assign.go
- broker_grpc_balance.go
- broker_grpc_configure.go
- broker_grpc_lookup.go
- broker_grpc_pub.go
- broker_grpc_pub_balancer.go
- broker_grpc_pub_follow.go
- broker_grpc_sub.go
- broker_grpc_sub_coordinator.go
- broker_grpc_topic_partition_control.go
- broker_server.go
- broker_topic_conf_read_write.go
- broker_topic_partition_read_write.go
- broker_write.go
Click to show internal directories.
Click to hide internal directories.