Documentation ¶
Index ¶
- Constants
- Variables
- func GetTag(msg pulsar.Message) string
- func MustSetUp(pulsarConfig *Config, verifyConfig *VerifyConfig)
- func NewClient(addresses []string, logPath string) (pulsar.Client, error)
- func SetTag(pMsg *pulsar.ProducerMessage, tag string)
- func SetUp(pulsarConfig *Config, verifyConfig *VerifyConfig) (err error)
- type Config
- type Consumer
- type Producer
- type VerifyConfig
Constants ¶
const (
UrlPrefix = "pulsar://"
)
Variables ¶
var (
NotSetupError = errorKit.Newf("haven’t been set up correctly")
)
Functions ¶
func MustSetUp ¶
func MustSetUp(pulsarConfig *Config, verifyConfig *VerifyConfig)
MustSetUp
PS: Pulsar服务中途挂掉的话,恢复后,Consumer实例、Producer实例还能正常工作.
@param verifyConfig 可以为nil(将不验证,但不推荐这么干)
func SetTag ¶
func SetTag(pMsg *pulsar.ProducerMessage, tag string)
SetTag
@param pMsg 不能为nil @param tag 可以为""
func SetUp ¶
func SetUp(pulsarConfig *Config, verifyConfig *VerifyConfig) (err error)
Types ¶
type Config ¶
type Config struct { // Addrs Broker地址(s) Addrs []string `json:"addrs" yaml:"addrs" validate:"required,gte=1,dive,hostname_port"` }
type Consumer ¶
func NewConsumer ¶
func NewConsumer(ctx context.Context, options pulsar.ConsumerOptions, clientLogPath string) (*Consumer, error)
NewConsumer
前提: 成功调用 SetUp() || MustSetUp().
@param options 必需属性: Topic、SubscriptionName、Type @param logPath 客户端的日志输出("": 输出到控制台)
func NewConsumerOriginally ¶
func NewConsumerOriginally(ctx context.Context, addresses []string, options pulsar.ConsumerOptions, clientLogPath string) (rst *Consumer, err error)
NewConsumerOriginally
PS: 目标Pulsar服务未启动的情况下,如果ctx不加以限制,要过约 1min 才会返回error(期间客户端日志有connection refused输出).
@param ctx 建议设置超时时间,否则可能卡死(addresses无效的情况下) @param options 必须的属性: Topic、SubscriptionName、Type @param clientLogPath 客户端的日志输出(为空则输出到控制台; 不会rotate)
type Producer ¶
func NewProducer ¶
func NewProducer(ctx context.Context, options pulsar.ProducerOptions, clientLogPath string) (*Producer, error)
NewProducer
前提: 成功调用 SetUp() || MustSetUp().
@param options 必需属性: Topic、SendTimeout @param logPath 客户端的日志输出("": 输出到控制台)
func NewProducerOriginally ¶
func NewProducerOriginally(ctx context.Context, addresses []string, options pulsar.ProducerOptions, clientLogPath string) (rst *Producer, err error)
NewProducerOriginally
PS: 目标Pulsar服务未启动的情况下,如果ctx不加以限制,要过约 1min 才会返回error(期间客户端日志有connection refused输出).
@param ctx 建议设置超时时间,否则可能卡死(addresses无效的情况下) @param options 必须的属性: Topic
建议的属性: SendTimeout
@param clientLogPath 客户端的日志输出(为空则输出到控制台; 不会rotate)