pulsarKit

package
v3.1.112 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 24, 2024 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
const (
	UrlPrefix = "pulsar://"
)

Variables

View Source
var (
	NotSetupError = errorKit.Newf("haven’t been set up correctly")
)

Functions

func GetTag

func GetTag(msg pulsar.Message) string

GetTag

@param msg 不能为nil @return 可能为""

func MustSetUp

func MustSetUp(pulsarConfig *Config, verifyConfig *VerifyConfig)

MustSetUp

PS: Pulsar服务中途挂掉的话,恢复后,Consumer实例、Producer实例还能正常工作.

@param verifyConfig 可以为nil(将不验证,但不推荐这么干)

func NewClient

func NewClient(addresses []string, logPath string) (pulsar.Client, error)

NewClient

@param logPath 客户端的日志输出(为空则输出到控制台; 不会rotate)

func SetTag

func SetTag(pMsg *pulsar.ProducerMessage, tag string)

SetTag

@param pMsg 不能为nil @param tag 可以为""

func SetUp

func SetUp(pulsarConfig *Config, verifyConfig *VerifyConfig) (err error)

Types

type Config

type Config struct {
	// Addrs Broker地址(s)
	Addrs []string `json:"addrs" yaml:"addrs" validate:"required,gte=1,dive,hostname_port"`
}

type Consumer

type Consumer struct {
	pulsar.Client
	pulsar.Consumer
}

func NewConsumer

func NewConsumer(ctx context.Context, options pulsar.ConsumerOptions, clientLogPath string) (*Consumer, error)

NewConsumer

前提: 成功调用 SetUp() || MustSetUp().

@param options 必需属性: Topic、SubscriptionName、Type @param logPath 客户端的日志输出("": 输出到控制台)

func NewConsumerOriginally

func NewConsumerOriginally(ctx context.Context, addresses []string, options pulsar.ConsumerOptions, clientLogPath string) (rst *Consumer, err error)

NewConsumerOriginally

PS: 目标Pulsar服务未启动的情况下,如果ctx不加以限制,要过约 1min 才会返回error(期间客户端日志有connection refused输出).

@param ctx 建议设置超时时间,否则可能卡死(addresses无效的情况下) @param options 必须的属性: Topic、SubscriptionName、Type @param clientLogPath 客户端的日志输出(为空则输出到控制台; 不会rotate)

func (*Consumer) Close

func (c *Consumer) Close()

type Producer

type Producer struct {
	pulsar.Client
	pulsar.Producer
}

func NewProducer

func NewProducer(ctx context.Context, options pulsar.ProducerOptions, clientLogPath string) (*Producer, error)

NewProducer

前提: 成功调用 SetUp() || MustSetUp().

@param options 必需属性: Topic、SendTimeout @param logPath 客户端的日志输出("": 输出到控制台)

func NewProducerOriginally

func NewProducerOriginally(ctx context.Context, addresses []string, options pulsar.ProducerOptions, clientLogPath string) (rst *Producer, err error)

NewProducerOriginally

PS: 目标Pulsar服务未启动的情况下,如果ctx不加以限制,要过约 1min 才会返回error(期间客户端日志有connection refused输出).

@param ctx 建议设置超时时间,否则可能卡死(addresses无效的情况下) @param options 必须的属性: Topic

建议的属性: SendTimeout

@param clientLogPath 客户端的日志输出(为空则输出到控制台; 不会rotate)

func (*Producer) Close

func (p *Producer) Close()

func (*Producer) SendWithTimeout

func (p *Producer) SendWithTimeout(pMsg *pulsar.ProducerMessage, timeout time.Duration) (pulsar.MessageID, error)

type VerifyConfig

type VerifyConfig struct {
	// Topic 用于验证"pulsar服务是否正常启动"的topic
	Topic string `json:"topic" yaml:"topic"`
	// Print 是否输出 验证日志 到控制台?
	Print bool `json:"print" yaml:"print"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL