eventstream

Version: v4.2.2
Published: Sep 25, 2024 License: Apache-2.0 Imports: 26 Imported by: 1

README

eventstream-go-sdk

Go SDK for integrating with AccelByte's event stream

Usage

Install

The eventstream-go-sdk is available in two major versions:

v4

The eventstream-go-sdk v4 changed its underlying Kafka library from segmentio/kafka-go to confluentinc/confluent-kafka-go, which is a lightweight wrapper around librdkafka, a finely tuned C client.

CGO must be enabled (CGO_ENABLED=1) because the Go client is based on the C library librdkafka.

go get -u github.com/AccelByte/eventstream-go-sdk/v4

When building your application in Alpine Linux (musl libc), you must pass -tags musl to go build.

v3
go get -u github.com/AccelByte/eventstream-go-sdk/v3
Importing
v4
eventstream "github.com/AccelByte/eventstream-go-sdk/v4"
v3
eventstream "github.com/AccelByte/eventstream-go-sdk/v3"

To create a new event stream client, use this function:

client, err := eventstream.NewClient(prefix, stream, brokers, config)

NewClient takes four parameters:

  • prefix : Topic prefix from the client (string)
  • stream : Stream name, e.g. kafka, stdout, none (string)
  • brokers : List of Kafka brokers (slice of strings)
  • config : Custom broker configuration from the client. Optional; only the first argument is used. (variadic *BrokerConfig)
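
The "only the first argument is used" behaviour of a variadic optional parameter can be illustrated with a small stand-alone sketch. The brokerConfig type and the pickConfig function below are hypothetical stand-ins and not the SDK's own code; the 10-second fallback is borrowed from the DialTimeout default listed in this README.

```go
package main

import (
	"fmt"
	"time"
)

// brokerConfig is a hypothetical stand-in for the SDK's BrokerConfig.
type brokerConfig struct {
	DialTimeout time.Duration
}

// pickConfig mimics a variadic optional parameter: it returns the first
// config if one was passed and it is non-nil, and ignores any further
// arguments. Otherwise it falls back to an assumed default.
func pickConfig(configs ...*brokerConfig) *brokerConfig {
	if len(configs) > 0 && configs[0] != nil {
		return configs[0]
	}
	return &brokerConfig{DialTimeout: 10 * time.Second}
}

func main() {
	// No config passed: the assumed default is used.
	fmt.Println(pickConfig().DialTimeout)

	// Two configs passed: only the first one is taken into account.
	fmt.Println(pickConfig(
		&brokerConfig{DialTimeout: time.Second},
		&brokerConfig{DialTimeout: time.Minute}, // ignored
	).DialTimeout)
}
```

Passing more than one config is therefore harmless in this sketch but has no effect, which is why a single *BrokerConfig is the usual call shape.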

Note for Maintainers

Please follow semantic versioning rules when releasing a new version. New releases must be tagged according to their corresponding major version:

  • If you make changes only in v3, you need to tag the new version as 3.x.x
  • If you make changes only in v4, you need to tag the new version as 4.x.x
  • If you make changes on both, you need to tag the new version as 3.x.x and 4.x.x

Supported Stream

The event stream currently supports these streams:

Kafka Stream

Publish events to and subscribe to events from a Kafka stream.

Currently compatible with Go 1.12+ and Kafka versions 0.10.1.0 to 2.1.0.

To create a Kafka stream client, pass kafka as the stream parameter.

Custom Configuration

The SDK supports the following custom configuration for the Kafka stream:

  • DialTimeout : Timeout for connecting to the Kafka broker. Default: 10 seconds (time.Duration)
  • ReadTimeout : Timeout for consuming a topic from the Kafka broker. Default: 10 seconds (time.Duration)
  • WriteTimeout : Timeout for publishing an event to the Kafka broker. Default: 10 seconds (time.Duration)
  • LogMode : eventstream prints logs at one of the following levels: info, warn, debug, error and off. Default: off (string)
  • StrictValidation : If set to true, eventstream enables strict validation of event fields. Default: false (boolean)
	config := &eventstream.BrokerConfig{
		LogMode:          eventstream.InfoLevel,
		StrictValidation: true,
		DialTimeout:      1 * time.Second,
		ReadTimeout:      1 * time.Second,
		WriteTimeout:     1 * time.Second,
	}
Authentication

Supported authentication modes for the Kafka stream:

1. SASL SCRAM

Example configuration:

    config := &eventstream.BrokerConfig{
        ...
        SecurityConfig: &eventstream.SecurityConfig{
            AuthenticationType: "SASL-SCRAM",
            SASLUsername:       "your-username",
            SASLPassword:       "your-password",
        },
    }
Stdout Stream

This stream is for testing purposes. It prints each event to stdout, so it should not be used in production, where it would produce unnecessary log output.

To create a stdout stream client, pass stdout as the stream parameter.

Blackhole

This is used when the client does not want the service to send event data anywhere.

To create a blackhole client, pass none as the stream parameter.

Publish Subscribe Event

Publish

Publish (send) an event into the stream. The client can publish an event to a single topic. Publishing to a Kafka stream is retried with exponential backoff (max 3x).
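
As an illustration of what retrying with exponential backoff means, here is a minimal stand-alone sketch. It is not the SDK's implementation: the function names are hypothetical, and it assumes "max 3x" means three attempts in total, with the delay doubling between attempts.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff calls fn up to maxAttempts times, doubling the wait
// between attempts (base, 2*base, 4*base, ...). It returns nil on the
// first success, or the last error once all attempts are used up.
func retryWithBackoff(maxAttempts int, base time.Duration, fn func() error) error {
	var err error
	delay := base
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		if attempt < maxAttempts {
			time.Sleep(delay)
			delay *= 2
		}
	}
	return err
}

// flakyPublish returns a hypothetical publish function that fails for the
// first `failures` calls, plus a pointer to its call counter.
func flakyPublish(failures int) (func() error, *int) {
	calls := 0
	return func() error {
		calls++
		if calls <= failures {
			return errors.New("broker unavailable")
		}
		return nil
	}, &calls
}

func main() {
	publish, calls := flakyPublish(2)
	err := retryWithBackoff(3, 10*time.Millisecond, publish)
	fmt.Println("error:", err, "calls:", *calls)
}
```

With two simulated failures the third attempt succeeds; with more failures than attempts, the last error is returned to the caller.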

To publish an event, use this function:

err := client.Publish(
    NewPublish().
        Topic(TopicName).
        EventName(EventName).
        Namespace(Namespace).
        Key(Key).
        ClientID(ClientID).
        UserID(UserID).
        SessionID(SessionID).
        TraceID(TraceID).
        SpanContext(SpanContext).
        Context(Context).
        EventID(EventID).                   // int
        EventType(EventType).               // int
        EventLevel(EventLevel).             // int
        ServiceName(ServiceName).           // string
        ClientIDs(ClientIDs).               // []string
        TargetUserIDs(TargetUserIDs).       // []string
        TargetNamespace(TargetNamespace).   // string
        Privacy(Privacy).                   // bool
        AdditionalFields(AdditionalFields). // map[string]interface{}
        Version(Version).
        Payload(Payload).
        ErrorCallback(func(event *Event, err error) {}))
Publish Parameters
  • Topic : Event topic. (string - alphaNumeric(256) - Required)
  • EventName : Event name. (string - alphaNumeric(256) - Required)
  • Namespace : Event namespace. (string - alphaNumeric(256) - Required)
  • ID : Kafka message ID. (string - default: random UUID v4 without hyphens - optional)
  • Key : Kafka message key. (string - default: random UUID v4 without hyphens - optional)
  • ClientID : Publisher client ID. (string - UUID v4 without hyphens)
  • UserID : Publisher user ID. (string - UUID v4 without hyphens)
  • SessionID : Publisher session ID. (string - UUID v4 without hyphens)
  • TraceID : Trace ID. (string - UUID v4 without hyphens)
  • SpanContext : OpenTracing Jaeger span context. (string - optional)
  • Context : Go context. (context.Context - default: context.Background())
  • Version : Version of schema. (integer - default: 1)
  • EventID : Event ID. Backward compatibility. (integer)
  • EventType : Event Type. Backward compatibility. (integer)
  • EventLevel : Event Level. Backward compatibility. (integer)
  • ServiceName : Service Name. Backward compatibility. (string)
  • ClientIDs : List of client IDs. Backward compatibility. ([]string - UUID v4 without Hyphens)
  • TargetUserIDs : List of target user IDs. Backward compatibility. ([]string - UUID v4 without Hyphens)
  • TargetNamespace : Target Namespace. Backward compatibility. (string)
  • Privacy : Privacy. Backward compatibility. (bool)
  • AdditionalFields : Additional fields. Backward compatibility. (map[string]interface{})
  • Payload : Additional attribute. (map[string]interface{})
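
Several parameters above expect a "UUID v4 without hyphens". The following sketch shows one way to generate and recognise such a value using only the standard library. Both function names are hypothetical, and the check is an assumption about what the format implies (32 hex characters with the version and variant bits set), not the SDK's own validator.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"strings"
)

// newUUIDv4NoHyphens returns a random RFC 4122 version 4 UUID encoded as
// 32 lowercase hex characters with no hyphens.
func newUUIDv4NoHyphens() (string, error) {
	var b [16]byte
	if _, err := rand.Read(b[:]); err != nil {
		return "", err
	}
	b[6] = (b[6] & 0x0f) | 0x40 // set version 4
	b[8] = (b[8] & 0x3f) | 0x80 // set RFC 4122 variant
	return hex.EncodeToString(b[:]), nil
}

// isUUIDv4NoHyphens reports whether s is 32 hex characters whose version
// nibble is 4 and whose variant nibble is 8, 9, a or b.
func isUUIDv4NoHyphens(s string) bool {
	if len(s) != 32 {
		return false
	}
	if _, err := hex.DecodeString(s); err != nil {
		return false
	}
	return s[12] == '4' && strings.ContainsRune("89abAB", rune(s[16]))
}

func main() {
	id, err := newUUIDv4NoHyphens()
	if err != nil {
		fmt.Println("could not generate ID:", err)
		return
	}
	fmt.Println(id, isUUIDv4NoHyphens(id))
}
```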
Subscribe

To subscribe to events from a specific topic in the stream, the client registers a callback function that is executed each time an event is received. A callback targets a specific topic and event name.

To subscribe an event, use this function:

err := client.Register(
    NewSubscribe().
        Topic(topicName).
        EventName(mockEvent.EventName).
        GroupID(groupID).
        Offset(offset).
        Context(ctx).
        Callback(func(ctx context.Context, event *Event, err error) error { return nil }))
Unsubscribe

To unsubscribe from a topic, the client cancels the context that was passed to Register.

To unsubscribe from the topic, use this function:

ctx, cancel := context.WithCancel(context.Background())

err := client.Register(
    NewSubscribe().
        Topic(topicName).
        EventName(mockEvent.EventName).
        GroupID(groupID).
        Context(ctx).
        Callback(func(ctx context.Context, event *Event, err error) error {
            if ctx.Err() != nil {
                // unsubscribed
                return nil
            }

            return nil
        }))

cancel() // cancel context to unsubscribe
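
The cancellation pattern itself can be tried without a broker. The sketch below is a stand-alone illustration using only the standard library; consume and runDemo are hypothetical names, and a channel stands in for the topic. It shows a consumer loop that stops as soon as its context is cancelled.

```go
package main

import (
	"context"
	"fmt"
)

// consume handles events until the channel is closed or ctx is cancelled.
// It returns the number of handled events, plus ctx.Err() when it stopped
// because of cancellation.
func consume(ctx context.Context, events <-chan string, handle func(string)) (int, error) {
	handled := 0
	for {
		select {
		case <-ctx.Done():
			return handled, ctx.Err()
		case ev, ok := <-events:
			if !ok {
				return handled, nil
			}
			handle(ev)
			handled++
		}
	}
}

// runDemo feeds two events to the consumer and then cancels the context,
// which plays the role of unsubscribing.
func runDemo() (int, error) {
	ctx, cancel := context.WithCancel(context.Background())
	events := make(chan string) // unbuffered: each send waits for the consumer
	go func() {
		events <- "first"
		events <- "second"
		cancel() // both events were received, now stop the consumer
	}()
	return consume(ctx, events, func(ev string) { fmt.Println("got", ev) })
}

func main() {
	n, err := runDemo()
	fmt.Println("handled:", n, "stopped with:", err)
}
```

Because the channel is unbuffered, the context is only cancelled after both events have been received, so the loop always handles two events before it returns.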
Subscribe Parameters
  • Topic : Subscribed topic. (string - alphaNumeric(256) - required)
  • Offset : Offset (position) inside the topic from which processing begins. (int64 - optional - default: -1 (the tail))
  • GroupID : Message broker group ID. A random ID is generated by the client if it is empty. (string - alphaNumeric(256) - default: generated ID)
  • GroupInstanceID : Message broker group instance ID for static group membership. (string - alphaNumeric(256) - optional - default: empty)
  • EventName : Event name to subscribe to. If it is empty, any event triggers the callback. (string - alphaNumeric(256) - optional)
  • Callback : Callback function called when an event is received. (func(ctx context.Context, event *Event, err error) error - required)
  • CallbackRaw : Similar to Callback, but the raw message is passed instead. (func(ctx context.Context, msgValue []byte, err error) error - optional)
  • Context : Go context. (context.Context - default: context.Background())
  • SendErrorDLQ : A flag to send the error from message processing to a DLQ topic named in the format 'yourTopic-dlq'. (bool - default: false)
  • AsyncCommitMessage : A flag to commit the message offset asynchronously. This setting is overridden by AutoCommitInterval on BrokerConfig. (bool)
  • Slug : Returns a string describing a unique subscriber (topic, eventName, groupID).

The callback function receives three parameters:

  • ctx is the context, used to check whether the consumer has unsubscribed.
  • event is the object that stores the event message.
  • err is an error that occurred while consuming the message.

The callback function returns one result (error):

  • return nil to commit the event (mark it as processed)
  • return any error to retry processing (the worker will be selected randomly)

Event Message

An event message is the set of event information that is published or consumed by a client.

Event message format:

  • id : Event ID (string - default. random UUID v4 without Hyphens)
  • name : Event name (string)
  • namespace : Event namespace (string)
  • traceId : Trace ID (string - UUID v4 without Hyphens)
  • spanContext : Opentracing Jaeger Span Context (string - optional)
  • clientId : Publisher client ID (string - UUID v4 without Hyphens)
  • userId : Publisher user ID (string - UUID v4 without Hyphens)
  • sessionId : Publisher session ID (string - UUID v4 without Hyphens)
  • timestamp : Event time (time.Time)
  • event_id : Event id. backward compatibility. (integer)
  • event_type : Event type. backward compatibility. (integer)
  • event_level : Event level. backward compatibility. (integer)
  • service : Service name. backward compatibility. (string)
  • client_ids : Client IDs. backward compatibility. ([]string - UUID v4 without Hyphens)
  • target_user_ids : Target user IDs. backward compatibility. ([]string - UUID v4 without Hyphens)
  • target_namespace : Target namespace. backward compatibility. (string)
  • privacy : Privacy. backward compatibility. (bool)
  • additional_fields : Set of data / objects provided by the producer. Each entry has its own key for a specific purpose. Backward compatibility. (map[string]interface{})
  • version : Event schema version (integer)
  • payload : Set of data / objects provided by the producer. Each entry has its own key for a specific purpose. (map[string]interface{})
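
To make the field names above concrete, the sketch below encodes a trimmed-down event to JSON with the standard library. The event type here is a hypothetical stand-in that copies only a few of the JSON tags from the Event struct documented further down; the sample values are invented for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event is a hypothetical, trimmed-down stand-in for the SDK's Event type.
// The JSON tags mirror the field names listed in the event message format.
type event struct {
	ID        string                 `json:"id,omitempty"`
	EventName string                 `json:"name,omitempty"`
	Namespace string                 `json:"namespace,omitempty"`
	UserID    string                 `json:"userId,omitempty"`
	Version   int                    `json:"version,omitempty"`
	Payload   map[string]interface{} `json:"payload,omitempty"`
}

// encodeEvent returns the JSON form of e. Empty fields are left out
// because every tag carries omitempty.
func encodeEvent(e event) (string, error) {
	b, err := json.Marshal(e)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encodeEvent(event{
		EventName: "userLoggedIn",
		Namespace: "accelbyte",
		Version:   1,
		Payload:   map[string]interface{}{"platform": "steam"},
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
}
```

Fields that are left at their zero value (id and userId in this example) do not appear in the encoded message.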

Publish Audit Log

Publish (send) an audit log into the stream. The client can publish an audit log to a single topic. Publishing to a Kafka stream is retried with exponential backoff (max 3x).

Environment Variables:

  • APP_EVENT_STREAM_AUDIT_LOG_TOPIC (optional, default: auditLog)
  • APP_EVENT_STREAM_AUDIT_LOG_ENABLED (optional, default: true)
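
A sketch of how these two variables and their defaults might be resolved is shown below. It is an illustration only: the function name is hypothetical, and the use of strconv.ParseBool for the enabled flag (falling back to the default on an unparsable value) is an assumption, since the SDK's actual parsing is not described here.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

const (
	topicEnv   = "APP_EVENT_STREAM_AUDIT_LOG_TOPIC"
	enabledEnv = "APP_EVENT_STREAM_AUDIT_LOG_ENABLED"
)

// auditLogSettings resolves the audit log topic and enabled flag through
// lookup (for example os.LookupEnv), applying the documented defaults
// when a variable is unset. Parsing the flag with strconv.ParseBool is an
// assumption made for this sketch.
func auditLogSettings(lookup func(string) (string, bool)) (topic string, enabled bool) {
	topic, enabled = "auditLog", true
	if v, ok := lookup(topicEnv); ok && v != "" {
		topic = v
	}
	if v, ok := lookup(enabledEnv); ok {
		if b, err := strconv.ParseBool(v); err == nil {
			enabled = b
		}
	}
	return topic, enabled
}

func main() {
	topic, enabled := auditLogSettings(os.LookupEnv)
	fmt.Println("topic:", topic, "enabled:", enabled)
}
```

Taking the lookup function as a parameter keeps the sketch easy to exercise without touching the real process environment.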

To publish an audit log, use this function:

err := client.PublishAuditLog(eventstream.
    NewAuditLogBuilder().
    Category(Category).
    ActionName(ActionName).
    IP(IP).
    Actor(Actor).
    IsActorTypeUser(true).
    ClientID(ClientID).
    ActorNamespace(ActorNamespace).
    ObjectID(ObjectID).
    ObjectType(ObjectType).
    ObjectNamespace(ObjectNamespace).
    TargetUserID(TargetUserID).
    DeviceID(DeviceID).
    Content(Content).
    Diff(Diff).
    ErrorCallback(func(message []byte, err error) {
    }))
Parameters
  • Category : Category. (string - Format: xxx_xxx - Required)
  • ActionName : Log action name. (string - Format: xxx_xxx - Required)
  • IP : IP address. (string - Optional)
  • Actor : Publisher/studio user ID or client ID. (string - UUID v4 without Hyphens - Required)
  • IsActorTypeUser : Whether the actor is a user. (bool - true or false - Required)
  • ClientID : OAuth client ID. (string - UUID v4 without Hyphens - Required)
  • ActorNamespace : Actor namespace, should be publisher or studio namespace. (string - alphaNumeric(256) - Required)
  • ObjectID : Target resource ID. (string - Optional)
  • ObjectType : Type of object. (string - Optional)
  • ObjectNamespace : Namespace of target resource. (string - alphaNumeric(256) - Required)
  • TargetUserID : User id related to the resource. (string - UUID v4 without Hyphens - Optional)
  • DeviceID : Device id. (string - Optional)
  • Content : Resource Content. (map - Required)
  • Diff : Changes of the resource. (map - Recommended format: {"before":{}, "after":{}} - Optional)
  • ErrorCallback : Callback function when event failed to publish. (func(message []byte, err error){} - Optional)
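
The recommended Diff shape, with "before" and "after" keys, can be previewed with a small JSON sketch. The auditLogDiff type below is a stand-in that copies the JSON tags of the AuditLogDiff type documented further down; encodeDiff and the sample values are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// auditLogDiff is a stand-in that mirrors the JSON tags of the SDK's
// AuditLogDiff type.
type auditLogDiff struct {
	Before map[string]interface{} `json:"before,omitempty"`
	After  map[string]interface{} `json:"after,omitempty"`
}

// encodeDiff returns the JSON form of a before/after pair. Because of
// omitempty, a nil or empty map is left out of the output entirely.
func encodeDiff(before, after map[string]interface{}) (string, error) {
	b, err := json.Marshal(auditLogDiff{Before: before, After: after})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encodeDiff(
		map[string]interface{}{"displayName": "old"},
		map[string]interface{}{"displayName": "new"},
	)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
}
```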

SpanContext usage

  • Create Jaeger Span Context from an event
	import "github.com/AccelByte/go-restful-plugins/v3/pkg/jaeger"

	spanContextString := event.SpanContext
	span, ctx := jaeger.ChildSpanFromRemoteSpan(rootCtx, "service-name.operation-name", spanContextString)
  • Put existing Jaeger Span Context into an event
	import "github.com/AccelByte/go-restful-plugins/v3/pkg/jaeger"

	spanContextString := jaeger.GetSpanContextString(span)

	err := client.Publish(
		NewPublish().
			Topic(topicName).
			SpanContext(spanContextString).
			// ...

Tips

v3

Faster Write

Here are some of the things you can do if you want to have faster writes.

Set RequiredAcks

From the WriterConfig documentation:

Number of acknowledges from partition replicas required before receiving a response to a produce request. The default is -1, which means to wait for all replicas, and a value above 0 is required to indicate how many replicas should acknowledge a message to be considered successful.

Set RequiredAcks to 1 so that it does not need to wait for all replicas to acknowledge.

You can set it up by injecting the WriterConfig inside BrokerConfig when creating new KafkaClient.

Note that there is a gotcha in the library: you cannot set it to 0, because 0 is converted to -1, which is RequireAll.

	if config.RequiredAcks == 0 {
		w.RequiredAcks = RequireAll
	}
Set Async

From the WriterConfig documentation:

Setting this flag to true causes the WriteMessages method to never block. It also means that errors are ignored since the caller will not receive the returned value. Use this only if you don't care about guarantees of whether the messages were written to kafka.

Set Async to true so that after the message is pushed into the writer's queue, the publishing function will return immediately.

You can set it up by injecting the WriterConfig inside BrokerConfig when creating new KafkaClient.

Use more partitions

Segmentio/kafka-go creates one partitionWriter per partition, so in theory more partitions means more writers to parallelize the writing process.

Use more brokers

Kafka divides partitions between brokers, so with more brokers, each one handles fewer partitions.

Handling Partition Change

While by default segmentio/kafka-go can automatically rebalance Consumer Group assignments whenever there's a member change (e.g. new service pod is spawned), it has specific config to detect and handle partition change.

From the ReaderConfig documentation:

WatchForPartitionChanges is used to inform kafka-go that a consumer group should be polling the brokers and rebalancing if any partition changes happen to the topic.

Set WatchPartitionChanges in ReaderConfig to true to tell the library to handle partition changes automatically. It can be injected inside BrokerConfig when creating new KafkaClient.

License

Copyright © 2020, AccelByte Inc. Released under the Apache License, Version 2.0

Documentation

Constants

const (
	OffLevel   = "off"
	InfoLevel  = "info"
	DebugLevel = "debug"
	WarnLevel  = "warn"
	ErrorLevel = "error"
)

log level

const (
	DefaultSSLCertPath = "/etc/ssl/certs/ca-certificates.crt" // Alpine certificate path

)
const TopicEventPattern = "^[a-zA-Z0-9]+((['_.-][a-zA-Z0-9])?[a-zA-Z0-9]*)*$"
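
To see which names this pattern accepts, the expression can be compiled with the standard regexp package. The sketch below copies the pattern verbatim; isValidName is a hypothetical helper, and the assumption (based only on the constant's name) is that the pattern is applied to topic and event names.

```go
package main

import (
	"fmt"
	"regexp"
)

// topicEventPattern is copied verbatim from the TopicEventPattern constant.
const topicEventPattern = "^[a-zA-Z0-9]+((['_.-][a-zA-Z0-9])?[a-zA-Z0-9]*)*$"

var topicEventRegexp = regexp.MustCompile(topicEventPattern)

// isValidName reports whether s matches the pattern: alphanumeric runs
// that may be joined by a single ', _, . or - character, never starting
// or ending with a separator and never containing two separators in a row.
func isValidName(s string) bool {
	return topicEventRegexp.MatchString(s)
}

func main() {
	for _, name := range []string{"userLoggedIn", "user.logged-in_v2", "topic--name", "-topic", "topic-"} {
		fmt.Printf("%q valid: %v\n", name, isValidName(name))
	}
}
```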

Variables

var (
	NotificationEventNamePath       = "name"
	FreeformNotificationUserIDsPath = []string{"payload", "userIds"}
)
var (
	ErrMessageTooLarge = errors.New("message to large")
)

Functions

func GetTLSCertFromFile

func GetTLSCertFromFile(path string) (*tls.Certificate, error)

GetTLSCertFromFile reads file, divides into key and certificates

Types

type AuditLog

type AuditLog struct {
	ID              string          `json:"_id" valid:"required"`
	Category        string          `json:"category" valid:"required"`
	ActionName      string          `json:"actionName" valid:"required"`
	Timestamp       int64           `json:"timestamp" valid:"required"`
	IP              string          `json:"ip,omitempty" valid:"optional"`
	Actor           string          `json:"actor" valid:"uuid4WithoutHyphens,required"`
	ActorType       string          `json:"actorType" valid:"required~actorType values: USER CLIENT"`
	ClientID        string          `json:"clientId" valid:"uuid4WithoutHyphens,required"`
	ActorNamespace  string          `json:"actorNamespace" valid:"required"`
	ObjectID        string          `json:"objectId,omitempty" valid:"optional"`
	ObjectType      string          `json:"objectType,omitempty" valid:"optional"`
	ObjectNamespace string          `json:"objectNamespace" valid:"required~use publisher namespace if resource has no namespace"`
	TargetUserID    string          `json:"targetUserId,omitempty" valid:"uuid4WithoutHyphens,optional"`
	DeviceID        string          `json:"deviceId,omitempty" valid:"optional"`
	Payload         AuditLogPayload `json:"payload" valid:"required"`
}

type AuditLogBuilder

type AuditLogBuilder struct {
	// contains filtered or unexported fields
}

func NewAuditLogBuilder

func NewAuditLogBuilder() *AuditLogBuilder

NewAuditLogBuilder create new AuditLogBuilder instance

func (*AuditLogBuilder) ActionName

func (auditLogBuilder *AuditLogBuilder) ActionName(actionName string) *AuditLogBuilder

func (*AuditLogBuilder) Actor

func (auditLogBuilder *AuditLogBuilder) Actor(actor string) *AuditLogBuilder

func (*AuditLogBuilder) ActorNamespace

func (auditLogBuilder *AuditLogBuilder) ActorNamespace(actorNamespace string) *AuditLogBuilder

func (*AuditLogBuilder) Build

func (auditLogBuilder *AuditLogBuilder) Build() (*kafka.Message, error)

func (*AuditLogBuilder) Category

func (auditLogBuilder *AuditLogBuilder) Category(category string) *AuditLogBuilder

func (*AuditLogBuilder) ClientID

func (auditLogBuilder *AuditLogBuilder) ClientID(clientID string) *AuditLogBuilder

func (*AuditLogBuilder) Content

func (auditLogBuilder *AuditLogBuilder) Content(content map[string]interface{}) *AuditLogBuilder

func (*AuditLogBuilder) DeviceID

func (auditLogBuilder *AuditLogBuilder) DeviceID(deviceID string) *AuditLogBuilder

func (*AuditLogBuilder) Diff

func (auditLogBuilder *AuditLogBuilder) Diff(diff *AuditLogDiff) *AuditLogBuilder

Diff: if diff is not nil, please make sure diff.Before and diff.After are both not nil

func (*AuditLogBuilder) IP

func (auditLogBuilder *AuditLogBuilder) IP(ip string) *AuditLogBuilder

func (*AuditLogBuilder) IsActorTypeUser

func (auditLogBuilder *AuditLogBuilder) IsActorTypeUser(isActorTypeUser bool) *AuditLogBuilder

func (*AuditLogBuilder) Key

func (auditLogBuilder *AuditLogBuilder) Key(key string) *AuditLogBuilder

func (*AuditLogBuilder) ObjectID

func (auditLogBuilder *AuditLogBuilder) ObjectID(objectID string) *AuditLogBuilder

func (*AuditLogBuilder) ObjectNamespace

func (auditLogBuilder *AuditLogBuilder) ObjectNamespace(objectNamespace string) *AuditLogBuilder

func (*AuditLogBuilder) ObjectType

func (auditLogBuilder *AuditLogBuilder) ObjectType(objectType string) *AuditLogBuilder

func (*AuditLogBuilder) TargetUserID

func (auditLogBuilder *AuditLogBuilder) TargetUserID(targetUserID string) *AuditLogBuilder

type AuditLogDiff

type AuditLogDiff struct {
	Before map[string]interface{} `json:"before,omitempty"`
	After  map[string]interface{} `json:"after,omitempty"`
}

type AuditLogPayload

type AuditLogPayload struct {
	Content map[string]interface{} `json:"content"`
	Diff    AuditLogDiff           `json:"diff"`
}

type BlackholeClient

type BlackholeClient struct{}

BlackholeClient satisfies the publisher for mocking

func (*BlackholeClient) GetMetadata added in v4.1.0

func (client *BlackholeClient) GetMetadata(_ string, _ time.Duration) (*Metadata, error)

func (*BlackholeClient) Publish

func (client *BlackholeClient) Publish(publishBuilder *PublishBuilder) error

func (*BlackholeClient) PublishAuditLog

func (client *BlackholeClient) PublishAuditLog(auditLogBuilder *AuditLogBuilder) error

func (*BlackholeClient) PublishSync

func (client *BlackholeClient) PublishSync(publishBuidler *PublishBuilder) error

func (*BlackholeClient) Register

func (client *BlackholeClient) Register(subscribeBuilder *SubscribeBuilder) error

type BrokerConfig

type BrokerConfig struct {

	// Disable auto commit on every consumer polls when the AutoCommitInterval has stepped in.
	// It's recommended to enable auto commit as manual commit per message is much slower.
	// Default: false (auto commit is enabled)
	DisableAutoCommit bool

	// Interval between auto commits. This will only take effect when auto commit is enabled.
	// Assigning zero value will be overridden by the default value.
	// Default: 1 second
	AutoCommitInterval time.Duration

	// Enable committing the message offset right after consumer polls and before the message is processed.
	// Otherwise, the message offset will be committed after it is processed. When auto commit is enabled,
	// it will store the offset to be committed by auto-committer later.
	// Default: false
	CommitBeforeProcessing bool

	// The maximum time duration the client may use to deliver a message, including retries
	// Assigning zero value will be overridden by the default value.
	// Default: 60 seconds
	PublishTimeout time.Duration

	// BaseConfig is a map to store key-value configuration of a broker.
	// It will override other configs that have been set using other BrokerConfig options.
	// Only Kafka broker is supported.
	// 		List of supported Kafka configuration: https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
	BaseConfig map[string]interface{}

	StrictValidation bool
	CACertFile       string
	DialTimeout      time.Duration
	SecurityConfig   *SecurityConfig
	MetricsRegistry  prometheus.Registerer // optional registry to report metrics to prometheus (used for kafka stats)
}

BrokerConfig is custom configuration for message broker
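
The "zero value is overridden by the default" rule in the field comments above can be sketched as follows. The brokerConfig type and withDefaults function are hypothetical stand-ins covering only two of the fields; the default values are the ones stated in the comments (1 second and 60 seconds).

```go
package main

import (
	"fmt"
	"time"
)

// brokerConfig is a hypothetical stand-in holding two of the documented
// BrokerConfig fields.
type brokerConfig struct {
	AutoCommitInterval time.Duration
	PublishTimeout     time.Duration
}

// withDefaults returns a copy of c in which zero-valued durations are
// replaced by the defaults named in the field comments.
func withDefaults(c brokerConfig) brokerConfig {
	if c.AutoCommitInterval == 0 {
		c.AutoCommitInterval = time.Second
	}
	if c.PublishTimeout == 0 {
		c.PublishTimeout = 60 * time.Second
	}
	return c
}

func main() {
	// Only PublishTimeout is set explicitly; AutoCommitInterval is defaulted.
	cfg := withDefaults(brokerConfig{PublishTimeout: 5 * time.Second})
	fmt.Println(cfg.AutoCommitInterval, cfg.PublishTimeout)
}
```

One consequence of this convention is that a caller cannot ask for a zero interval or timeout, because zero is indistinguishable from "not set".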

type BrokerMetadata added in v4.1.0

type BrokerMetadata struct {
	ID   int32
	Host string
	Port int
}

type Client

type Client interface {
	Publish(publishBuilder *PublishBuilder) error
	PublishSync(publishBuilder *PublishBuilder) error
	Register(subscribeBuilder *SubscribeBuilder) error
	PublishAuditLog(auditLogBuilder *AuditLogBuilder) error
	GetMetadata(topic string, timeout time.Duration) (*Metadata, error)
}

Client is an interface for event stream functionality

func NewClient

func NewClient(prefix, stream string, brokers []string, config ...*BrokerConfig) (Client, error)

type Event

type Event struct {
	ID               string                 `json:"id,omitempty"`
	EventName        string                 `json:"name,omitempty"`
	Namespace        string                 `json:"namespace,omitempty"`
	ParentNamespace  string                 `json:"parentNamespace,omitempty"`
	UnionNamespace   string                 `json:"unionNamespace,omitempty"`
	ClientID         string                 `json:"clientId,omitempty"`
	TraceID          string                 `json:"traceId,omitempty"`
	SpanContext      string                 `json:"spanContext,omitempty"`
	UserID           string                 `json:"userId,omitempty"`
	SessionID        string                 `json:"sessionId,omitempty"`
	Timestamp        string                 `json:"timestamp,omitempty"`
	Version          int                    `json:"version,omitempty"`
	EventID          int                    `json:"event_id,omitempty"`
	EventType        int                    `json:"event_type,omitempty"`
	EventLevel       int                    `json:"event_level,omitempty"`
	ServiceName      string                 `json:"service,omitempty"`
	ClientIDs        []string               `json:"client_ids,omitempty"`
	TargetUserIDs    []string               `json:"target_user_ids,omitempty"`
	TargetNamespace  string                 `json:"target_namespace,omitempty"`
	Privacy          bool                   `json:"privacy,omitempty"`
	Topic            string                 `json:"topic,omitempty"`
	AdditionalFields map[string]interface{} `json:"additional_fields,omitempty"`
	Payload          map[string]interface{} `json:"payload,omitempty"`

	Partition int    `json:",omitempty"`
	Offset    int64  `json:",omitempty"`
	Key       string `json:",omitempty"`
}

Event defines the structure of event

func ConstructEvent

func ConstructEvent(publishBuilder *PublishBuilder) (*kafka.Message, *Event, error)

ConstructEvent construct event message

type KafkaClient

type KafkaClient struct {

	// mutex to avoid runtime races to access subscribers map
	ReadersLock sync.RWMutex
	// contains filtered or unexported fields
}

KafkaClient wraps client's functionality for Kafka

func (*KafkaClient) GetMetadata added in v4.1.0

func (client *KafkaClient) GetMetadata(topic string, timeout time.Duration) (*Metadata, error)

func (*KafkaClient) GetReaderStats added in v4.1.3

func (client *KafkaClient) GetReaderStats() statistics.Stats

GetReaderStats returns the latest internal statistics of brokers, topics, and partitions of consumers. The stats values are refreshed at a fixed interval which can be configured by setting the `statistics.interval.ms` config

func (*KafkaClient) GetWriterStats added in v4.1.3

func (client *KafkaClient) GetWriterStats() statistics.Stats

GetWriterStats returns the latest internal statistics of brokers, topics, and partitions of producers. The stats values are refreshed at a fixed interval which can be configured by setting the `statistics.interval.ms` config

func (*KafkaClient) Publish

func (client *KafkaClient) Publish(publishBuilder *PublishBuilder) error

Publish send event to single or multiple topic with exponential backoff retry

func (*KafkaClient) PublishAuditLog

func (client *KafkaClient) PublishAuditLog(auditLogBuilder *AuditLogBuilder) error

PublishAuditLog send an audit log message

func (*KafkaClient) PublishSync

func (client *KafkaClient) PublishSync(publishBuilder *PublishBuilder) error

PublishSync send an event synchronously (blocking)

func (*KafkaClient) Register

func (client *KafkaClient) Register(subscribeBuilder *SubscribeBuilder) error

Register registers callback function and then subscribe topic

type Metadata added in v4.1.0

type Metadata struct {
	Brokers []BrokerMetadata
}

type PublishBuilder

type PublishBuilder struct {
	// contains filtered or unexported fields
}

PublishBuilder defines the structure of message which is sent through message broker

func NewPublish

func NewPublish() *PublishBuilder

NewPublish create new PublishBuilder instance

func (*PublishBuilder) AdditionalFields

func (p *PublishBuilder) AdditionalFields(additionalFields map[string]interface{}) *PublishBuilder

AdditionalFields set AdditionalFields of publisher event

func (*PublishBuilder) ClientID

func (p *PublishBuilder) ClientID(clientID string) *PublishBuilder

ClientID set clientID of publisher event

func (*PublishBuilder) ClientIDs

func (p *PublishBuilder) ClientIDs(clientIDs []string) *PublishBuilder

ClientIDs set clientIDs of publisher event

func (*PublishBuilder) Context

func (p *PublishBuilder) Context(ctx context.Context) *PublishBuilder

Context define client context when publish event. default: context.Background()

func (*PublishBuilder) EventID

func (p *PublishBuilder) EventID(eventID int) *PublishBuilder

EventID set eventID of publisher event

func (*PublishBuilder) EventLevel

func (p *PublishBuilder) EventLevel(eventLevel int) *PublishBuilder

EventLevel set eventLevel of publisher event

func (*PublishBuilder) EventName

func (p *PublishBuilder) EventName(eventName string) *PublishBuilder

EventName set name of published event

func (*PublishBuilder) EventType

func (p *PublishBuilder) EventType(eventType int) *PublishBuilder

EventType set eventType of publisher event

func (*PublishBuilder) ID added in v4.2.0

ID set ID of publisher event

func (*PublishBuilder) Key

func (p *PublishBuilder) Key(key string) *PublishBuilder

Key is a message key that used to determine the partition of the topic if client require strong order for the events
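
Why a message key helps with ordering can be shown with a simplified sketch: if the partition is derived from a hash of the key, all events with the same key land on the same partition and keep their relative order. The function below is hypothetical and uses CRC-32 modulo the partition count purely for illustration; the partitioner actually used is determined by the underlying Kafka client and its configuration.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// partitionFor maps a key to a partition index in [0, numPartitions).
// It returns -1 when numPartitions is not positive. This is a simplified
// illustration, not the partitioner used by the Kafka client.
func partitionFor(key string, numPartitions int) int {
	if numPartitions <= 0 {
		return -1
	}
	return int(crc32.ChecksumIEEE([]byte(key)) % uint32(numPartitions))
}

func main() {
	// The same key always yields the same partition for a fixed count.
	for _, key := range []string{"user-123", "user-123", "user-456"} {
		fmt.Printf("key %q -> partition %d\n", key, partitionFor(key, 6))
	}
}
```

Note that the mapping changes when the number of partitions changes, so ordering guarantees based on keys only hold while the partition count stays fixed.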

func (*PublishBuilder) Namespace

func (p *PublishBuilder) Namespace(namespace string) *PublishBuilder

Namespace set namespace of published event

func (*PublishBuilder) ParentNamespace

func (p *PublishBuilder) ParentNamespace(parentNamespace string) *PublishBuilder

func (*PublishBuilder) Payload

func (p *PublishBuilder) Payload(payload map[string]interface{}) *PublishBuilder

Payload is a event payload that will be published

func (*PublishBuilder) Privacy

func (p *PublishBuilder) Privacy(privacy bool) *PublishBuilder

Privacy set privacy of publisher event

func (*PublishBuilder) ServiceName

func (p *PublishBuilder) ServiceName(serviceName string) *PublishBuilder

ServiceName set serviceName of publisher event

func (*PublishBuilder) SessionID

func (p *PublishBuilder) SessionID(sessionID string) *PublishBuilder

SessionID set sessionID of publisher event

func (*PublishBuilder) SpanContext

func (p *PublishBuilder) SpanContext(spanID string) *PublishBuilder

SpanContext set jaeger spanContext of publisher event

func (*PublishBuilder) TargetNamespace

func (p *PublishBuilder) TargetNamespace(targetNamespace string) *PublishBuilder

TargetNamespace set targetNamespace of publisher event

func (*PublishBuilder) TargetUserIDs

func (p *PublishBuilder) TargetUserIDs(targetUserIDs []string) *PublishBuilder

TargetUserIDs set targetUserIDs of publisher event

func (*PublishBuilder) Timeout deprecated

func (p *PublishBuilder) Timeout(timeout time.Duration) *PublishBuilder

Timeout is an upper bound on the time to report success or failure after a call to send() returns. The value of this config should be greater than or equal to the sum of request.timeout.ms and linger.ms.

Deprecated: This config is deprecated. It will only take effect for the first publisher of the client. Configure PublishTimeout from the BrokerConfig instead.

Default value: 60000 ms

func (*PublishBuilder) Topic

func (p *PublishBuilder) Topic(topic string) *PublishBuilder

Topic set channel / topic name

func (*PublishBuilder) TraceID

func (p *PublishBuilder) TraceID(traceID string) *PublishBuilder

TraceID set traceID of publisher event

func (*PublishBuilder) UnionNamespace

func (p *PublishBuilder) UnionNamespace(unionNamespace string) *PublishBuilder

Parent namespace for AGS Starter, leave it empty for AGS Premium

func (*PublishBuilder) UserID

func (p *PublishBuilder) UserID(userID string) *PublishBuilder

UserID set userID of publisher event

func (*PublishBuilder) Version

func (p *PublishBuilder) Version(version int) *PublishBuilder

Version set event schema version

type PublishErrorCallbackFunc

type PublishErrorCallbackFunc func(message []byte, err error)

type SecurityConfig

type SecurityConfig struct {
	AuthenticationType string
	SASLUsername       string
	SASLPassword       string
}

SecurityConfig contains security configuration for message broker

type StdoutClient

type StdoutClient struct {
	// contains filtered or unexported fields
}

StdoutClient satisfies the publisher for mocking

func (*StdoutClient) GetMetadata added in v4.1.0

func (client *StdoutClient) GetMetadata(_ string, _ time.Duration) (*Metadata, error)

func (*StdoutClient) Publish

func (client *StdoutClient) Publish(publishBuilder *PublishBuilder) error

Publish print event to console

func (*StdoutClient) PublishAuditLog

func (client *StdoutClient) PublishAuditLog(auditLogBuilder *AuditLogBuilder) error

func (*StdoutClient) PublishSync

func (client *StdoutClient) PublishSync(publishBuilder *PublishBuilder) error

func (*StdoutClient) Register

func (client *StdoutClient) Register(subscribeBuilder *SubscribeBuilder) error

Register print event to console

type SubscribeBuilder

type SubscribeBuilder struct {
	// contains filtered or unexported fields
}

SubscribeBuilder defines the structure of message which is sent through message broker

func NewSubscribe

func NewSubscribe() *SubscribeBuilder

NewSubscribe create new SubscribeBuilder instance

func (*SubscribeBuilder) AsyncCommitMessage

func (s *SubscribeBuilder) AsyncCommitMessage(async bool) *SubscribeBuilder

AsyncCommitMessage to asynchronously commit message offset. This setting will be overridden by AutoCommitInterval on BrokerConfig

func (*SubscribeBuilder) Callback

func (s *SubscribeBuilder) Callback(
	callback func(ctx context.Context, event *Event, err error) error,
) *SubscribeBuilder

Callback to do when the event received

func (*SubscribeBuilder) CallbackRaw

func (s *SubscribeBuilder) CallbackRaw(
	f func(ctx context.Context, msgValue []byte, err error) error,
) *SubscribeBuilder

CallbackRaw callback that receives the undecoded payload

func (*SubscribeBuilder) Context

Context define client context when subscribe event. default: context.Background()

func (*SubscribeBuilder) EventName

func (s *SubscribeBuilder) EventName(eventName string) *SubscribeBuilder

EventName set event name that will be subscribed

func (*SubscribeBuilder) GroupID

func (s *SubscribeBuilder) GroupID(groupID string) *SubscribeBuilder

GroupID set subscriber groupID. A random groupID will be generated by default.

func (*SubscribeBuilder) GroupInstanceID

func (s *SubscribeBuilder) GroupInstanceID(groupInstanceID string) *SubscribeBuilder

GroupInstanceID set subscriber group instance ID

func (*SubscribeBuilder) Offset

func (s *SubscribeBuilder) Offset(offset int64) *SubscribeBuilder

Offset set Offset of the event to start

func (*SubscribeBuilder) SendErrorDLQ

func (s *SubscribeBuilder) SendErrorDLQ(dlq bool) *SubscribeBuilder

SendErrorDLQ to send error message to DLQ topic. DLQ topic: 'topic' + -dlq

func (*SubscribeBuilder) Slug

func (s *SubscribeBuilder) Slug() string

Slug is a string describing a unique subscriber (topic, eventName, groupID)

func (*SubscribeBuilder) Topic

func (s *SubscribeBuilder) Topic(topic string) *SubscribeBuilder

Topic set topic that will be subscribe
