Documentation ¶
Overview ¶
oplog is a package for writing operational log (oplog) entries for the purpose of replication and verification of the data stored in the Boundary RDBMS. Please see README.md in this directory for more information.
Index ¶
- Constants
- Variables
- type AnyOperation
- func (*AnyOperation) Descriptor() ([]byte, []int)deprecated
- func (x *AnyOperation) GetFieldMask() *field_mask.FieldMask
- func (x *AnyOperation) GetNullMask() *field_mask.FieldMask
- func (x *AnyOperation) GetOperationType() OpType
- func (x *AnyOperation) GetTypeName() string
- func (x *AnyOperation) GetValue() []byte
- func (*AnyOperation) ProtoMessage()
- func (x *AnyOperation) ProtoReflect() protoreflect.Message
- func (x *AnyOperation) Reset()
- func (x *AnyOperation) String() string
- type Entry
- func (e *Entry) DecryptData(ctx context.Context) error
- func (e *Entry) EncryptData(ctx context.Context) error
- func (e *Entry) Replay(ctx context.Context, tx Writer, types *TypeCatalog, tableSuffix string) error
- func (e *Entry) UnmarshalData(types *TypeCatalog) ([]Message, error)
- func (e *Entry) Write(ctx context.Context, tx Writer, ticket *store.Ticket) error
- func (e *Entry) WriteEntryWith(ctx context.Context, tx Writer, ticket *store.Ticket, msgs ...*Message) error
- type GormTicketer
- type GormWriter
- type Message
- type Metadata
- type OpType
- type Option
- type Options
- type Queue
- type ReplayableMessage
- type Ticketer
- type Type
- type TypeCatalog
- type Writer
Constants ¶
const DefaultAggregateName = "global"
const Version = "v1"
Version of oplog entries (among other things, it's used to manage upgrade compatibility when replicating)
Variables ¶
var ( OpType_name = map[int32]string{ 0: "OP_TYPE_UNSPECIFIED", 1: "OP_TYPE_CREATE", 2: "OP_TYPE_UPDATE", 3: "OP_TYPE_DELETE", } OpType_value = map[string]int32{ "OP_TYPE_UNSPECIFIED": 0, "OP_TYPE_CREATE": 1, "OP_TYPE_UPDATE": 2, "OP_TYPE_DELETE": 3, } )
Enum value maps for OpType.
var ( ErrTicketNotFound = errors.New("ticket not found") ErrTicketAlreadyRedeemed = errors.New("ticket already redeemed") ErrTicketRedeeming = errors.New("error trying to redeem ticket") )
var File_controller_storage_oplog_v1_any_operation_proto protoreflect.FileDescriptor
Functions ¶
This section is empty.
Types ¶
type AnyOperation ¶
type AnyOperation struct { // type_name defines type of operation. TypeName string `protobuf:"bytes,1,opt,name=type_name,json=typeName,proto3" json:"type_name,omitempty"` // value are the bytes of a marshalled proto buff. Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` // operation_type defines the type of database operation. OperationType OpType `` /* 141-byte string literal not displayed */ // field_mask is the mask of fields to update. FieldMask *field_mask.FieldMask `protobuf:"bytes,4,opt,name=field_mask,json=fieldMask,proto3" json:"field_mask,omitempty"` // null_mask is the mask of fields to set to null. NullMask *field_mask.FieldMask `protobuf:"bytes,5,opt,name=null_mask,json=nullMask,proto3" json:"null_mask,omitempty"` // contains filtered or unexported fields }
AnyOperation provides a message for anything and the type of operation it represents.
func (*AnyOperation) Descriptor
deprecated
func (*AnyOperation) Descriptor() ([]byte, []int)
Deprecated: Use AnyOperation.ProtoReflect.Descriptor instead.
func (*AnyOperation) GetFieldMask ¶
func (x *AnyOperation) GetFieldMask() *field_mask.FieldMask
func (*AnyOperation) GetNullMask ¶
func (x *AnyOperation) GetNullMask() *field_mask.FieldMask
func (*AnyOperation) GetOperationType ¶
func (x *AnyOperation) GetOperationType() OpType
func (*AnyOperation) GetTypeName ¶
func (x *AnyOperation) GetTypeName() string
func (*AnyOperation) GetValue ¶
func (x *AnyOperation) GetValue() []byte
func (*AnyOperation) ProtoMessage ¶
func (*AnyOperation) ProtoMessage()
func (*AnyOperation) ProtoReflect ¶
func (x *AnyOperation) ProtoReflect() protoreflect.Message
func (*AnyOperation) Reset ¶
func (x *AnyOperation) Reset()
func (*AnyOperation) String ¶
func (x *AnyOperation) String() string
type Entry ¶
Entry represents an oplog entry
func NewEntry ¶
func NewEntry(aggregateName string, metadata Metadata, cipherer wrapping.Wrapper, ticketer Ticketer) (*Entry, error)
NewEntry creates a new Entry
func (*Entry) DecryptData ¶
DecryptData will decrypt the entry's data using its Cipherer (wrapping.Wrapper)
func (*Entry) EncryptData ¶
EncryptData will encrypt the entry's data using its Cipherer (wrapping.Wrapper)
func (*Entry) Replay ¶
func (e *Entry) Replay(ctx context.Context, tx Writer, types *TypeCatalog, tableSuffix string) error
Replay provides the ability to replay an entry. You must initialize any new tables ending with the tableSuffix before calling Replay; otherwise you'll get a "table doesn't exist" error.
func (*Entry) UnmarshalData ¶
func (e *Entry) UnmarshalData(types *TypeCatalog) ([]Message, error)
UnmarshalData unmarshals the data attribute from []byte (treated as a FIFO QueueBuffer) to a []Message, using the TypeCatalog to resolve each message's type.
type GormTicketer ¶
type GormTicketer struct {
// contains filtered or unexported fields
}
GormTicketer uses a gorm DB connection for ticket storage
func NewGormTicketer ¶
func NewGormTicketer(tx *gorm.DB, opt ...Option) (*GormTicketer, error)
NewGormTicketer creates a new ticketer that uses gorm for storage
func (*GormTicketer) GetTicket ¶
func (ticketer *GormTicketer) GetTicket(aggregateName string) (*store.Ticket, error)
GetTicket returns a ticket for the specified name. You MUST GetTicket in the same transaction that you're using to write to the database tables. Names allow us to shard tickets around domain root names.
type GormWriter ¶
GormWriter uses a gorm DB connection for writing
func (*GormWriter) Create ¶
func (w *GormWriter) Create(i interface{}) error
Create an object in storage
func (*GormWriter) Delete ¶
func (w *GormWriter) Delete(i interface{}) error
Delete an object in storage
func (*GormWriter) Update ¶
func (w *GormWriter) Update(i interface{}, fieldMaskPaths, setToNullPaths []string) error
Update the entry using the fieldMaskPaths and setToNullPaths, which are Paths from field_mask.proto. fieldMaskPaths and setToNullPaths cannot intersect, and they cannot both be zero length.
type Message ¶
type Message struct { proto.Message TypeName string OpType OpType FieldMaskPaths []string SetToNullPaths []string }
Message wraps a proto.Message and adds an operation type (Create, Update, Delete)
type OpType ¶
type OpType int32
OpType provides the type of database operation the Any message represents (create, update, delete)
const ( // OP_TYPE_UNSPECIFIED defines an unspecified operation. OpType_OP_TYPE_UNSPECIFIED OpType = 0 // OP_TYPE_CREATE defines a create operation. OpType_OP_TYPE_CREATE OpType = 1 // OP_TYPE_UPDATE defines an update operation. OpType_OP_TYPE_UPDATE OpType = 2 // OP_TYPE_DELETE defines a delete operation. OpType_OP_TYPE_DELETE OpType = 3 )
func (OpType) Descriptor ¶
func (OpType) Descriptor() protoreflect.EnumDescriptor
func (OpType) EnumDescriptor
deprecated
func (OpType) Number ¶
func (x OpType) Number() protoreflect.EnumNumber
func (OpType) Type ¶
func (OpType) Type() protoreflect.EnumType
type Option ¶
type Option func(Options)
Option - how Options are passed as arguments
func WithAggregateNames ¶
WithAggregateNames enables/disables the use of multiple aggregate names for Ticketers
func WithFieldMaskPaths ¶
WithFieldMaskPaths represents an optional set of symbolic field paths (for example: "f.a", "f.b.d") used to specify a subset of fields that should be updated. (see google.golang.org/genproto/protobuf/field_mask)
func WithSetToNullPaths ¶
WithSetToNullPaths represents an optional set of symbolic field paths (for example: "f.a", "f.b.d") used to specify a subset of fields that should be set to null. (see google.golang.org/genproto/protobuf/field_mask)
type Queue ¶
type Queue struct { // Buffer for the queue bytes.Buffer // Catalog provides a TypeCatalog for the types added to the queue Catalog *TypeCatalog // contains filtered or unexported fields }
Queue provides a FIFO queue
func (*Queue) Add ¶
Add message to queue. typeName defines the type of message added to the queue and allows the msg to be removed using a TypeCatalog with a corresponding typeName entry. OpType defines the msg's operation (create, update, delete). If OpType == OpType_OP_TYPE_UPDATE, the WithFieldMaskPaths() and WithSetToNullPaths() options are supported.
type ReplayableMessage ¶
ReplayableMessage defines an interface for messages that can be replayed from the oplog entries. We need to be able to replay into different table names.
type Ticketer ¶
type Ticketer interface { // GetTicket returns a ticket for the specified name. You MUST GetTicket in the same transaction // that you're using to write to the database tables. Names allow us to shard tickets around domain root names. // Before getting a ticket you must insert it with its name into the oplog_ticket table. This is done via a // db migrations script. Requiring this insert as part of migrations ensures that the tickets are initialized in // a separate transaction from when a client calls GetTicket(aggregateName) which is critical for the optimized locking // pattern to work properly GetTicket(aggregateName string) (*store.Ticket, error) // Redeem ticket will attempt to redeem the ticket and ensure it's serialized with other tickets using the same // aggregate name Redeem(ticket *store.Ticket) error }
Ticketer provides an interface to storage for Tickets, so you can easily substitute your own ticketer
type Type ¶
type Type struct { // Interface of the type Interface interface{} // Name for the interface Name string }
Type provides the ability to associate an interface with a Type.Name, which decouples the interface from its reflection type string, so you can refactor the type name without breaking the catalog
type TypeCatalog ¶
TypeCatalog is an abstraction for dealing with oplog data and their underlying types
func NewTypeCatalog ¶
func NewTypeCatalog(withTypes ...Type) (*TypeCatalog, error)
NewTypeCatalog creates a catalog with the types you pass in
func (TypeCatalog) Get ¶
func (t TypeCatalog) Get(typeName string) (interface{}, error)
Get retrieves the interface via a name
func (*TypeCatalog) GetTypeName ¶
func (t *TypeCatalog) GetTypeName(i interface{}) (string, error)
GetTypeName returns the interface's name from the catalog
func (TypeCatalog) Set ¶
func (t TypeCatalog) Set(i interface{}, typeName string) error
Set creates an entry in the catalog for the interface
type Writer ¶
type Writer interface { // Create the entry Create(interface{}) error // Update the entry using the fieldMaskPaths and setToNullPaths, which are // Paths from field_mask.proto. fieldMaskPaths is required. setToNullPaths // is optional. fieldMaskPaths and setToNullPaths cannot intersect and both // cannot be zero len. Update(entry interface{}, fieldMaskPaths, setToNullPaths []string) error // Delete the entry Delete(interface{}) error // contains filtered or unexported methods }
Writer interface for Entries
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
oplog_test | Package oplog_test provides some gorm helper funcs for testing oplog database integrations |
store | Package store provides storage types/behavior for the oplog |