Documentation ¶
Index ¶
- Variables
- func NewMessage(batchMsg *TransportMessage) streams.Message
- func NewMessages(batch *TransportMessageBatch) []streams.Message
- func SetTransactionContext[T any](ctx context.Context, tx TransactionContext[T]) context.Context
- type TransactionContext
- type TransactionContextKeyType
- type TransportMessage
- func (*TransportMessage) Descriptor() ([]byte, []int)deprecated
- func (x *TransportMessage) GetContentType() string
- func (x *TransportMessage) GetData() []byte
- func (x *TransportMessage) GetHeaders() map[string]string
- func (x *TransportMessage) GetMessageId() string
- func (x *TransportMessage) GetStreamKey() string
- func (x *TransportMessage) GetStreamName() string
- func (x *TransportMessage) GetTime() *timestamppb.Timestamp
- func (*TransportMessage) ProtoMessage()
- func (x *TransportMessage) ProtoReflect() protoreflect.Message
- func (x *TransportMessage) Reset()
- func (x *TransportMessage) String() string
- type TransportMessageBatch
- func (*TransportMessageBatch) Descriptor() ([]byte, []int)deprecated
- func (x *TransportMessageBatch) GetMessages() []*TransportMessage
- func (*TransportMessageBatch) ProtoMessage()
- func (x *TransportMessageBatch) ProtoReflect() protoreflect.Message
- func (x *TransportMessageBatch) Reset()
- func (x *TransportMessageBatch) String() string
Constants ¶
This section is empty.
Variables ¶
var (
ErrTransactionContextNotFound = errors.New("streams: transaction context not found")
)
var File_message_proto protoreflect.FileDescriptor
Functions ¶
func NewMessage ¶
func NewMessage(batchMsg *TransportMessage) streams.Message
NewMessage allocates a streams.Message from a TransportMessage.
func NewMessages ¶
func NewMessages(batch *TransportMessageBatch) []streams.Message
NewMessages allocates a streams.Message slice from TransportMessageBatch.Messages.
func SetTransactionContext ¶
SetTransactionContext allocates a transaction context using ctx as parent.
A transaction context is a mechanism used by multiple system layers to share one single transaction. Thus, each operation inside the same context will be part of the transaction itself.
Example:
txCtx := SetTransactionContext(context.TODO(), TransactionContext[*sql.Tx]{TransactionID: "123", Tx: mySqlTx})
repository.DoOp(txCtx) // this will be part of the transaction as well
in repository.DoOp(ctx context.Context)
tx, err := GetTransactionContext[*sql.Tx](ctx)
tx.Exec()...
Types ¶
type TransactionContext ¶
A TransactionContext is a mechanism used by multiple system layers to share one single transaction. Thus, each operation inside the same context will be part of the transaction itself.
func GetTransactionContext ¶
func GetTransactionContext[T any](ctx context.Context) (TransactionContext[T], error)
GetTransactionContext retrieves a transaction context from ctx.
A transaction context is a mechanism used by multiple system layers to share one single transaction. Thus, each operation inside the same context will be part of the transaction itself.
Example:
txCtx := SetTransactionContext(context.TODO(), TransactionContext[*sql.Tx]{TransactionID: "123", Tx: mySqlTx})
repository.DoOp(txCtx) // this will be part of the transaction as well
in repository.DoOp(ctx context.Context)
txCtx, err := GetTransactionContext[*sql.Tx](ctx)
txCtx.Tx.Exec()...
type TransactionContextKeyType ¶
type TransactionContextKeyType string
TransactionContextKeyType custom type used by transaction contexts.
const TransactionContextKey TransactionContextKeyType = "streams.tx_context"
TransactionContextKey context key used by a transaction context.
type TransportMessage ¶
type TransportMessage struct { MessageId string `protobuf:"bytes,1,opt,name=messageId,proto3" json:"messageId,omitempty"` StreamName string `protobuf:"bytes,2,opt,name=streamName,proto3" json:"streamName,omitempty"` StreamKey string `protobuf:"bytes,3,opt,name=streamKey,proto3" json:"streamKey,omitempty"` Headers map[string]string `` /* 155-byte string literal not displayed */ ContentType string `protobuf:"bytes,5,opt,name=contentType,proto3" json:"contentType,omitempty"` Data []byte `protobuf:"bytes,6,opt,name=data,proto3" json:"data,omitempty"` Time *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=time,proto3" json:"time,omitempty"` // contains filtered or unexported fields }
A TransportMessage is a unit of information used by `streams` package to pass information between internal systems such as proxy agents.
func NewTransportMessage ¶
func NewTransportMessage(msg streams.Message) *TransportMessage
NewTransportMessage allocates a TransportMessage from a streams.Message.
func (*TransportMessage) Descriptor
deprecated
func (*TransportMessage) Descriptor() ([]byte, []int)
Deprecated: Use TransportMessage.ProtoReflect.Descriptor instead.
func (*TransportMessage) GetContentType ¶
func (x *TransportMessage) GetContentType() string
func (*TransportMessage) GetData ¶
func (x *TransportMessage) GetData() []byte
func (*TransportMessage) GetHeaders ¶
func (x *TransportMessage) GetHeaders() map[string]string
func (*TransportMessage) GetMessageId ¶
func (x *TransportMessage) GetMessageId() string
func (*TransportMessage) GetStreamKey ¶
func (x *TransportMessage) GetStreamKey() string
func (*TransportMessage) GetStreamName ¶
func (x *TransportMessage) GetStreamName() string
func (*TransportMessage) GetTime ¶
func (x *TransportMessage) GetTime() *timestamppb.Timestamp
func (*TransportMessage) ProtoMessage ¶
func (*TransportMessage) ProtoMessage()
func (*TransportMessage) ProtoReflect ¶
func (x *TransportMessage) ProtoReflect() protoreflect.Message
func (*TransportMessage) Reset ¶
func (x *TransportMessage) Reset()
func (*TransportMessage) String ¶
func (x *TransportMessage) String() string
type TransportMessageBatch ¶
type TransportMessageBatch struct { Messages []*TransportMessage `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"` // contains filtered or unexported fields }
A TransportMessageBatch is an aggregate of TransportMessage(s) used by `streams` package to pass information between internal systems such as proxy agents.
func NewTransportMessageBatch ¶
func NewTransportMessageBatch(msgs []streams.Message) *TransportMessageBatch
NewTransportMessageBatch allocates a TransportMessageBatch for each of streams.Message(s).
func (*TransportMessageBatch) Descriptor
deprecated
func (*TransportMessageBatch) Descriptor() ([]byte, []int)
Deprecated: Use TransportMessageBatch.ProtoReflect.Descriptor instead.
func (*TransportMessageBatch) GetMessages ¶
func (x *TransportMessageBatch) GetMessages() []*TransportMessage
func (*TransportMessageBatch) ProtoMessage ¶
func (*TransportMessageBatch) ProtoMessage()
func (*TransportMessageBatch) ProtoReflect ¶
func (x *TransportMessageBatch) ProtoReflect() protoreflect.Message
func (*TransportMessageBatch) Reset ¶
func (x *TransportMessageBatch) Reset()
func (*TransportMessageBatch) String ¶
func (x *TransportMessageBatch) String() string