kafka

package
v1.1.62 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 8, 2024 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func RandStringBytes

func RandStringBytes(n int) string

Types

type Client

type Client struct {
	Backoff backoff.BackOff
	// contains filtered or unexported fields
}

func (*Client) Close

func (k *Client) Close() error

func (*Client) CreateTopic

func (k *Client) CreateTopic(topic string, numPart int) error

func (*Client) IsReaderConnected

func (k *Client) IsReaderConnected() bool

func (*Client) IsWriters

func (k *Client) IsWriters() bool

func (*Client) ListTopics

func (k *Client) ListTopics() []kafka.Topic

func (*Client) Listen

func (k *Client) Listen(f HandlerFunc) error

Listen consumes messages in manual-commit mode: the handler needs to call msg.Commit() when processing is done. This mode is recommended.

func (*Client) ListenTopic added in v1.1.47

func (k *Client) ListenTopic(topic string, f HandlerFunc) error

func (*Client) NewConsumer

func (k *Client) NewConsumer()

func (*Client) NewPublisher

func (k *Client) NewPublisher() error

func (*Client) Publish

func (k *Client) Publish(ctx context.Context, topic string, event Event) error

func (*Client) PublishWithTracer added in v1.1.16

func (k *Client) PublishWithTracer(ctx context.Context, topic string, event Event) error

type Event added in v1.1.35

type Event struct {
	EventID   string
	EventType EventType
	Version   uint64
	Data      []byte
	Metadata  []byte
	Timestamp time.Time
}

Event is an internal representation of an event, returned when the Aggregate uses NewEvent to create a new event. The events loaded from the DB are represented by each DB's internal event type, which implements Event.

func NewEvent added in v1.1.35

func NewEvent(eventType EventType, data []byte, metadata ...[]byte) *Event

NewEvent creates a new event with the given eventType, data and optional metadata. The eventID is generated automatically, and the version is set to 0.

func (*Event) GetData added in v1.1.35

func (e *Event) GetData() []byte

func (*Event) GetEventID added in v1.1.35

func (e *Event) GetEventID() string

func (*Event) GetEventType added in v1.1.35

func (e *Event) GetEventType() EventType

func (*Event) GetJsonData added in v1.1.35

func (e *Event) GetJsonData(data any) error

GetJsonData unmarshals the JSON data attached to the Event into data.

func (*Event) GetJsonMetadata added in v1.1.35

func (e *Event) GetJsonMetadata(metaData any) error

GetJsonMetadata unmarshals the app-specific metadata, serialized as JSON, attached to the Event into metaData.

func (*Event) GetMetadata added in v1.1.35

func (e *Event) GetMetadata() []byte

func (*Event) GetTimestamp added in v1.1.35

func (e *Event) GetTimestamp() time.Time

func (*Event) GetVersion added in v1.1.35

func (e *Event) GetVersion() uint64

func (*Event) SetMetadata added in v1.1.35

func (e *Event) SetMetadata(metaData any) error

func (*Event) String added in v1.1.35

func (e *Event) String() string

type EventType added in v1.1.35

type EventType string

EventType is the type of any event, used as its unique identifier.

type HandlerFunc added in v1.1.30

type HandlerFunc func(context.Context, *Message) error

type IClient

type IClient interface {
	Listen(f HandlerFunc) error
	ListenTopic(topic string, f HandlerFunc) error
	NewConsumer()
	IsWriters() bool
	Close() error

	NewPublisher() error
	Publish(ctx context.Context, topic string, msg Event) error
	PublishWithTracer(ctx context.Context, topic string, msg Event) error

	IsReaderConnected() bool

	CreateTopic(topic string, numPart int) error
	// contains filtered or unexported methods
}

func NewKafkaClient

func NewKafkaClient(cfg *common_utils.BaseConfig) IClient

type Message

type Message struct {
	Offset        int64  `json:"offset,omitempty"`
	Partition     int    `json:"partition,omitempty"`
	Topic         string `json:"topic,omitempty"`
	Key           string `json:"key,omitempty"`
	Body          []byte `json:"body,omitempty"`
	Timestamp     int64  `json:"timestamp,omitempty"`
	ConsumerGroup string `json:"consumer_group,omitempty"`
	Retry         int    `json:"retry,omitempty"`
	Commit        func() error
	MoveToDLQ     func() error
	Headers       map[string]string
}

Message defines the message type used to encode/decode a sarama message.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL