pulsarKit

package
v2.2.20 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 13, 2023 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

View Source
const (
	UrlPrefix = "pulsar://"
)

Variables

View Source
var (
	NotSetupError = errorKit.New("hasn't been set up")
)

Functions

func GetTag

func GetTag(msg pulsar.Message) string

GetTag

@param msg 不能为nil @return 可能为""

func MustSetUp

func MustSetUp(config Config, tmpDirPath string)

MustSetUp

@param tmpDirPath 用于存放生成日志文件的临时目录(可以为空字符串,此时将采用默认值: 系统临时目录)

func NewClient

func NewClient(addresses []string, logPath string) (pulsar.Client, error)

NewClient

@param logPath 客户端的日志输出(为空则输出到控制台; 不会rotate)

func SetTag

func SetTag(pMsg *pulsar.ProducerMessage, tag string)

SetTag

@param pMsg 不能为nil @param tag 可以为""

func SetUp

func SetUp(pulsarConfig Config, tmpDirPath string) (err error)

Types

type Config

type Config struct {
	// Addresses Broker地址
	Addresses []string `json:"addresses" yaml:"addresses"`

	VerifyConfig VerifyConfig `json:"verify,optional" yaml:"verify"`
}

type Consumer

type Consumer struct {
	pulsar.Client
	pulsar.Consumer
}

func NewConsumer

func NewConsumer(ctx context.Context, options pulsar.ConsumerOptions, clientLogPath string) (*Consumer, error)

NewConsumer

前提: 成功调用 SetUp() || MustSetUp().

@param options 必需属性: Topic、SubscriptionName、Type @param logPath 客户端的日志输出("": 输出到控制台)

func NewConsumerOriginally

func NewConsumerOriginally(ctx context.Context, addresses []string, options pulsar.ConsumerOptions, clientLogPath string) (*Consumer, error)

NewConsumerOriginally

PS: 目标Pulsar服务未启动的情况下,如果ctx不加以限制,要过约 1min 才会返回error(期间客户端日志有connection refused输出).

@param options 必须的属性: Topic、SubscriptionName、Type @param clientLogPath 客户端的日志输出(为空则输出到控制台; 不会rotate)

func (*Consumer) Close

func (c *Consumer) Close()

type Producer

type Producer struct {
	pulsar.Client
	pulsar.Producer
}

func NewProducer

func NewProducer(ctx context.Context, options pulsar.ProducerOptions, clientLogPath string) (*Producer, error)

NewProducer

前提: 成功调用 SetUp() || MustSetUp().

@param options 必需属性: Topic、SendTimeout @param logPath 客户端的日志输出("": 输出到控制台)

func NewProducerOriginally

func NewProducerOriginally(ctx context.Context, addresses []string, options pulsar.ProducerOptions, clientLogPath string) (*Producer, error)

NewProducerOriginally

PS: 目标Pulsar服务未启动的情况下,如果ctx不加以限制,要过约 1min 才会返回error(期间客户端日志有connection refused输出).

@param options 必须的属性: Topic

建议的属性: SendTimeout

@param clientLogPath 客户端的日志输出(为空则输出到控制台; 不会rotate)

func (*Producer) Close

func (p *Producer) Close()

func (*Producer) SendWithTimeout

func (p *Producer) SendWithTimeout(pMsg *pulsar.ProducerMessage, timeout time.Duration) (pulsar.MessageID, error)

type VerifyConfig

type VerifyConfig struct {
	// Topic 用于验证"pulsar服务是否正常启动"的topic
	Topic string `json:"topic,optional" yaml:"topic"`
	// Print 是否输出 验证日志 到控制台?
	Print bool `json:"print,default=false" yaml:"print"`
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL