Documentation ¶
Index ¶
- func RandStringBytes(n int) string
- type Client
- func (k *Client) Close() error
- func (k *Client) CreateTopic(topic string, numPart int) error
- func (k *Client) IsReaderConnected() bool
- func (k *Client) IsWriters() bool
- func (k *Client) ListTopics() []kafka.Topic
- func (k *Client) Listen(f HandlerFunc) error
- func (k *Client) ListenTopic(topic string, f HandlerFunc) error
- func (k *Client) NewConsumer()
- func (k *Client) NewPublisher() error
- func (k *Client) Publish(ctx context.Context, topic string, event Event) error
- func (k *Client) PublishWithTracer(ctx context.Context, topic string, event Event) error
- type Event
- func (e *Event) GetData() []byte
- func (e *Event) GetEventID() string
- func (e *Event) GetEventType() EventType
- func (e *Event) GetJsonData(data any) error
- func (e *Event) GetJsonMetadata(metaData any) error
- func (e *Event) GetMetadata() []byte
- func (e *Event) GetTimestamp() time.Time
- func (e *Event) GetVersion() uint64
- func (e *Event) SetMetadata(metaData any) error
- func (e *Event) String() string
- type EventType
- type HandlerFunc
- type IClient
- type Message
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func RandStringBytes ¶
Types ¶
type Client ¶
type Client struct {
	Backoff backoff.BackOff
	// contains filtered or unexported fields
}
func (*Client) IsReaderConnected ¶
func (*Client) ListTopics ¶
func (k *Client) ListTopics() []kafka.Topic
func (*Client) Listen ¶
func (k *Client) Listen(f HandlerFunc) error
Listen consumes messages in manual-commit mode: the handler must call msg.Commit() once processing is done. Manual commit is the recommended mode for this kind of processing.
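The manual-commit pattern described for Listen can be sketched as below. This is an illustration under stated assumptions, not the package's own code: the real HandlerFunc signature is not shown on this page, so the `func(msg *Message) error` shape, the trimmed-down `Message` stand-in, and the `process` helper are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Message is a local stand-in for the package's Message type; only the
// fields needed for this sketch are reproduced.
type Message struct {
	Topic  string
	Body   []byte
	Commit func() error
}

// HandlerFunc is an ASSUMED shape; the real signature is not documented here.
type HandlerFunc func(msg *Message) error

// process is a hypothetical piece of business logic.
func process(body []byte) error {
	if len(body) == 0 {
		return errors.New("empty body")
	}
	return nil
}

// handle commits the message only after processing has succeeded, so a
// failed message is left uncommitted.
func handle(msg *Message) error {
	if err := process(msg.Body); err != nil {
		return fmt.Errorf("process message from %s: %w", msg.Topic, err)
	}
	return msg.Commit()
}

func main() {
	var h HandlerFunc = handle
	msg := &Message{
		Topic:  "orders",
		Body:   []byte("payload"),
		Commit: func() error { fmt.Println("committed"); return nil },
	}
	if err := h(msg); err != nil {
		fmt.Println("handler error:", err)
	}
}
```

With the real client, a handler of this kind would presumably be passed to Listen or ListenTopic; whether an uncommitted message is redelivered depends on the client's configuration, which this page does not describe.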
func (*Client) ListenTopic ¶ added in v1.1.47
func (k *Client) ListenTopic(topic string, f HandlerFunc) error
func (*Client) NewConsumer ¶
func (k *Client) NewConsumer()
func (*Client) NewPublisher ¶
type Event ¶ added in v1.1.35
type Event struct {
	EventID   string
	EventType EventType
	Version   uint64
	Data      []byte
	Metadata  []byte
	Timestamp time.Time
}
Event is an internal representation of an event, returned when the Aggregate uses NewEvent to create a new event. Events loaded from the db are represented by each DB's internal event type, which implements Event.
func NewEvent ¶ added in v1.1.35
NewEvent creates a new event, with the given aggregateID, eventType and data. The eventID is generated automatically, and the version is set to 0.
func (*Event) GetEventID ¶ added in v1.1.35
func (*Event) GetEventType ¶ added in v1.1.35
func (*Event) GetJsonData ¶ added in v1.1.35
GetJsonData unmarshals the JSON data attached to the Event into data.
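As a rough illustration of how GetJsonData might be used, the sketch below decodes an event payload into a typed struct. The `Event` type here is a local stand-in, the body of `GetJsonData` is an assumption (a thin wrapper over `json.Unmarshal`), and `OrderCreated` and `decodeOrder` are hypothetical names introduced only for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is a local stand-in carrying only the fields used in this sketch.
type Event struct {
	EventID string
	Data    []byte
}

// GetJsonData is an ASSUMED implementation: it unmarshals e.Data into data.
func (e *Event) GetJsonData(data any) error {
	return json.Unmarshal(e.Data, data)
}

// OrderCreated is a hypothetical payload type.
type OrderCreated struct {
	OrderID string `json:"order_id"`
	Amount  int    `json:"amount"`
}

// decodeOrder decodes the event payload into an OrderCreated value.
// Note that a pointer must be passed so the decoder can fill the struct.
func decodeOrder(e *Event) (OrderCreated, error) {
	var oc OrderCreated
	err := e.GetJsonData(&oc)
	return oc, err
}

func main() {
	e := &Event{EventID: "evt-1", Data: []byte(`{"order_id":"A-42","amount":3}`)}
	oc, err := decodeOrder(e)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Printf("%+v\n", oc)
}
```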
func (*Event) GetJsonMetadata ¶ added in v1.1.35
GetJsonMetadata unmarshals the app-specific metadata, serialized as JSON, attached to the Event into metaData.
func (*Event) GetMetadata ¶ added in v1.1.35
func (*Event) GetTimestamp ¶ added in v1.1.35
func (*Event) GetVersion ¶ added in v1.1.35
func (*Event) SetMetadata ¶ added in v1.1.35
type EventType ¶ added in v1.1.35
type EventType string
EventType is the type of any event, used as its unique identifier.
type IClient ¶
type IClient interface {
	Listen(f HandlerFunc) error
	ListenTopic(topic string, f HandlerFunc) error
	NewConsumer()
	IsWriters() bool
	Close() error
	NewPublisher() error
	Publish(ctx context.Context, topic string, msg Event) error
	PublishWithTracer(ctx context.Context, topic string, msg Event) error
	IsReaderConnected() bool
	CreateTopic(topic string, numPart int) error
	// contains filtered or unexported methods
}
func NewKafkaClient ¶
func NewKafkaClient(cfg *common_utils.BaseConfig) IClient
type Message ¶
type Message struct {
	Offset        int64  `json:"offset,omitempty"`
	Partition     int    `json:"partition,omitempty"`
	Topic         string `json:"topic,omitempty"`
	Key           string `json:"key,omitempty"`
	Body          []byte `json:"body,omitempty"`
	Timestamp     int64  `json:"timestamp,omitempty"`
	ConsumerGroup string `json:"consumer_group,omitempty"`
	Retry         int    `json:"retry,omitempty"`
	Commit        func() error
	MoveToDLQ     func() error
	Headers       map[string]string
}
Message defines the message type used to encode and decode sarama messages.
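The Commit and MoveToDLQ callbacks on Message suggest a settle-or-dead-letter flow, which might look like the sketch below. Everything beyond the field names is an assumption: the page does not say how Retry is maintained or when MoveToDLQ should be called, so the `maxRetry` threshold, the `settle` helper, and the JSON validity check are hypothetical choices made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message is a local stand-in with only the fields used in this sketch.
type Message struct {
	Body      []byte
	Retry     int
	Commit    func() error
	MoveToDLQ func() error
}

// maxRetry is a hypothetical limit, not a value defined by the package.
const maxRetry = 3

// settle decides what to do with a message:
//   - valid JSON body: commit it;
//   - invalid body and the retry budget is spent: move it to the DLQ;
//   - otherwise: return an error so the caller can retry later.
func settle(msg *Message) error {
	if json.Valid(msg.Body) {
		return msg.Commit()
	}
	if msg.Retry >= maxRetry {
		return msg.MoveToDLQ()
	}
	return fmt.Errorf("invalid payload, retry %d of %d", msg.Retry, maxRetry)
}

func main() {
	msg := &Message{
		Body:      []byte("not json"),
		Retry:     maxRetry,
		Commit:    func() error { fmt.Println("commit"); return nil },
		MoveToDLQ: func() error { fmt.Println("dead-lettered"); return nil },
	}
	if err := settle(msg); err != nil {
		fmt.Println("settle error:", err)
	}
}
```

Keeping the decision in one small function makes the three outcomes easy to test with stubbed callbacks, independent of a running broker.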