Documentation ¶
Constants ¶
const (
	IDHeaderKey         = "_clinia_message_id"
	RetryCountHeaderKey = "_clinia_retry_count"
)
const (
	TopicSeparator   = "."
	TopicRetrySuffix = TopicSeparator + "retry"
)
Variables ¶
This section is empty.
Functions ¶
func WithID ¶
func WithID(id string) newMessageOption
WithID sets the ID of the message. A ksuid will be generated if no ID is provided.
func WithMetadata ¶
func WithMetadata(m MessageMetadata) newMessageOption
WithMetadata sets the metadata of the message.
Types ¶
type ConsumerGroup ¶
type ConsumerGroup string
func (ConsumerGroup) ConsumerGroup ¶
func (group ConsumerGroup) ConsumerGroup(scope string) string
ConsumerGroup returns the consumer group name qualified with the given scope. If the scope is empty, it returns the consumer group name unchanged. Use this when interacting with a concrete pub/sub backend (e.g. Kafka).
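The scoping rule described above can be sketched as follows. This is only an illustration: the method name `Scoped` is hypothetical, and joining the scope and the group name with "." is an assumption, since the documentation does not state how the two are combined.

```go
package main

import "fmt"

// ConsumerGroup mirrors the documented type for illustration only.
type ConsumerGroup string

// separator is an assumption; the real joining rule is not documented here.
const separator = "."

// Scoped (hypothetical name) prefixes the group name with scope, or returns
// the bare name when scope is empty, as the documentation describes.
func (g ConsumerGroup) Scoped(scope string) string {
	if scope == "" {
		return string(g)
	}
	return scope + separator + string(g)
}

func main() {
	g := ConsumerGroup("billing")
	fmt.Println(g.Scoped(""))
	fmt.Println(g.Scoped("prod"))
}
```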
type Message ¶
type Message struct {
	ID       string
	Metadata MessageMetadata
	Payload  []byte
}
Message intentionally has no JSON marshalling tags, because serialization is meant to go through our own kgox.DefaultMarshaler.
func NewMessage ¶
NewMessage creates a new Message with the given payload and options.
Parameters:
- payload: The payload of the message as a byte slice.
- opts: Optional parameters to customize the creation of the message. By default, a new ID is generated for the message (WithID documents this as a ksuid).
Returns:
- *Message: A pointer to the created Message.
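The option-based constructor described above follows the functional options pattern. Below is a minimal, self-contained sketch of that pattern, not the package's implementation. The names `messageOption`, `withID`, `withMetadata`, `newMessage` and `randomID` are hypothetical stand-ins, `MessageMetadata` is assumed to be a string map (its definition is not shown here), and the random hex ID is a placeholder for whatever ID the real package generates.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// MessageMetadata is assumed to be a string map; the source does not show it.
type MessageMetadata map[string]string

type Message struct {
	ID       string
	Metadata MessageMetadata
	Payload  []byte
}

// messageOption is a hypothetical stand-in for the unexported newMessageOption.
type messageOption func(*Message)

// withID overrides the generated ID; an empty id keeps the default.
func withID(id string) messageOption {
	return func(m *Message) {
		if id != "" {
			m.ID = id
		}
	}
}

// withMetadata replaces the message metadata.
func withMetadata(md MessageMetadata) messageOption {
	return func(m *Message) { m.Metadata = md }
}

// randomID is a placeholder generator (16 random bytes, hex encoded).
func randomID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

// newMessage applies defaults first, then each option in order.
func newMessage(payload []byte, opts ...messageOption) *Message {
	m := &Message{ID: randomID(), Metadata: MessageMetadata{}, Payload: payload}
	for _, opt := range opts {
		opt(m)
	}
	return m
}

func main() {
	m := newMessage(
		[]byte(`{"hello":"world"}`),
		withID("order-42"),
		withMetadata(MessageMetadata{"tenant": "acme"}),
	)
	fmt.Println(m.ID, m.Metadata["tenant"], string(m.Payload))
}
```

Because defaults are set before the options run, callers that pass no options still get a usable message with a generated ID.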
func (*Message) ExtractTraceContext ¶ added in v0.0.82
func (*Message) InjectTraceContext ¶ added in v0.0.82
type MessageMetadata ¶
type Topic ¶
type Topic string
func BaseTopicFromName ¶ added in v0.0.80
BaseTopicFromName expects the topic name to be in the format `{scope}.{topic}.{consumer-group}.retry` or `{scope}.{topic}`. If the scope is missing, the function returns an incorrect result.
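One way to parse the two formats above is sketched here. The function `baseTopic` is hypothetical and is not the package's code. It assumes that the "base" is the `{topic}` part, and that neither the scope nor the consumer group contains a ".".

```go
package main

import (
	"fmt"
	"strings"
)

const (
	topicSeparator   = "."
	topicRetrySuffix = topicSeparator + "retry"
)

// baseTopic (hypothetical) returns the {topic} part of a full topic name.
// Assumption: scope and consumer group never contain the separator.
func baseTopic(name string) string {
	// Drop the leading scope segment.
	_, rest, found := strings.Cut(name, topicSeparator)
	if !found {
		return name
	}
	// Retry topics carry a trailing "{consumer-group}.retry".
	if strings.HasSuffix(rest, topicRetrySuffix) {
		trimmed := strings.TrimSuffix(rest, topicRetrySuffix)
		if i := strings.LastIndex(trimmed, topicSeparator); i >= 0 {
			return trimmed[:i]
		}
		return trimmed
	}
	return rest
}

func main() {
	fmt.Println(baseTopic("acme.orders"))
	fmt.Println(baseTopic("acme.orders.billing.retry"))
	// Scope missing: "orders" is consumed as the scope, so the result is wrong.
	fmt.Println(baseTopic("orders.billing.retry"))
}
```

The last call shows why the scope matters in this sketch: the first segment is always treated as the scope, so a name without one loses its real topic.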
func TopicFromName ¶
func (Topic) GenerateRetryTopic ¶ added in v0.0.80
func (t Topic) GenerateRetryTopic(consumerGroup ConsumerGroup) Topic
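As a rough illustration, a retry topic name could be built from the constants declared at the top of this page. The helper `retryTopic` below is hypothetical. The `{topic}.{consumer-group}.retry` layout is an assumption inferred from the format documented for BaseTopicFromName, not confirmed behavior of GenerateRetryTopic.

```go
package main

import "fmt"

const (
	TopicSeparator   = "."
	TopicRetrySuffix = TopicSeparator + "retry"
)

type Topic string

type ConsumerGroup string

// retryTopic (hypothetical) appends the consumer group and the retry suffix.
// The resulting layout is assumed, not taken from the package source.
func retryTopic(t Topic, g ConsumerGroup) Topic {
	return Topic(string(t) + TopicSeparator + string(g) + TopicRetrySuffix)
}

func main() {
	fmt.Println(retryTopic("acme.orders", "billing"))
}
```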
type TraceContextPropagator ¶ added in v0.0.81
type TraceContextPropagator struct {
// contains filtered or unexported fields
}
TraceContextPropagator is responsible for injecting and extracting trace context into and from message metadata.
func NewTraceContextPropagator ¶ added in v0.0.81
func NewTraceContextPropagator() TraceContextPropagator
func (*TraceContextPropagator) Extract ¶ added in v0.0.81
func (t *TraceContextPropagator) Extract(ctx context.Context, metadata MessageMetadata) context.Context
Extract extracts the trace context from the message metadata and returns a new context with the extracted trace context.