oplog

package
v0.13.0
Published: Jun 9, 2023 License: MPL-2.0 Imports: 32 Imported by: 0

README

oplog

oplog is a package for writing operation log (oplog) entries for the purpose of replication and verification of the data stored in the Boundary RDBMS.

Usage


// The snippet is assumed to run inside a function that returns an error.
//
// You must init the ticket in its own transaction. You only need to init a
// ticket once in the database; it doesn't need to happen for every
// connection. Once it's persisted in the database, you can simply Get it.
initTx := db.Begin()
ticketer := &GormTicketer{Tx: initTx}
if err := ticketer.InitTicket("users"); err != nil {
	// the ticket was not initialized, so there is nothing to commit
	initTx.Rollback()
	return err
}
// there was no error, so commit the initialized ticket
initTx.Commit()

userCreate := oplog_test.TestUser{
	Name: loginName,
}
tx := db.Begin()
// write the user to the database
tx.Create(&userCreate)

// the ticket must be fetched in the same transaction that writes the user
ticketer = &GormTicketer{Tx: tx}

// get a ticket for writing users to the oplog
ticket, err := ticketer.GetTicket("users")
if err != nil {
	tx.Rollback()
	return err
}

// create an entry for the oplog with the entry's metadata (likely the entry's Scope)
newLogEntry := NewEntry(
	"test-users",
	[]Metadata{
		{Key: "deployment", Value: "amex"},
		{Key: "project", Value: "central-info-systems"},
	},
	cipherer, // wrapping.Wrapper
	ticketer,
)

// write an entry with N messages (variadic parameter) in the order they were sent to the database
_, err = newLogEntry.WriteEntryWith(
	context.Background(),
	&GormWriter{tx},
	ticket,
	&Message{Message: &userCreate, TypeName: "user", OpType: OpType_CREATE_OP},
)
// if there's an error writing the oplog then roll EVERYTHING back
if err != nil {
	tx.Rollback()
	return err
}
// no err means you can commit all the things.
tx.Commit()

TBD/TODO

We need to discuss and decide how Boundary is going to handle the following oplog things:

  • SQL migrations: you'll find the package's SQL migrations under ./db/schema. We need to decide how Boundary will manage migrations across the system, and we will likely need to reference this package's migrations somehow.

oplog entry

  Example Oplog Entry for the Target Aggregate      
┌────────────────────────────────────────────────┐  
│                        FIFO with []byte buffer │  
│                                                │  
│┌─Msg 4────┐┌─Msg 3────┐┌─Msg 2────┐            │  
││          ││          ││          │            │  
││  Tags    ││  Host    ││ HostSet  │            │  
││┌────────┐││┌────────┐││┌────────┐│            │  
│││        ││││        ││││        ││            │  
│││        ││││        ││││        ││            │  
│││  Tag   ││││  Host  ││││HostSet ││            │  
│││protobuf││││protobuf││││protobuf││            │  
│││        ││││        ││││        ││            │  
│││        ││││        ││││        ││            │  
││└────────┘││└────────┘││└────────┘│            │  
││┌────────┐││┌────────┐││┌────────┐│            │  
│││        ││││        ││││        ││            │  
│││typeName││││typeName││││typeName││            │  
│││  Tag   ││││  Host  ││││HostSet ││            │  
││└────────┘││└────────┘││└────────┘│            │  
││┌────────┐││┌────────┐││┌────────┐│            │  
│││        ││││        ││││        ││            │  
│││ OpType ││││ OpType ││││ OpType ││            │  
│││ Create ││││ Create ││││ Create ││            │  
││└────────┘││└────────┘││└────────┘│            │  
│└──────────┘└──────────┘└──────────┘            │  
└────────────────────────────────────────────────┘  
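
The diagram describes an entry's data as a FIFO of messages held in a []byte buffer, each message carrying a payload, a type name and an operation type. The following is a minimal sketch of that idea using only the standard library. The frame type and the length-prefixed framing are assumptions made for illustration; they are not the package's actual wire format.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// frame is a hypothetical stand-in for one queued message: the marshaled
// payload plus the type name and operation type shown in the diagram.
type frame struct {
	TypeName string
	OpType   uint32
	Payload  []byte
}

// queue is a FIFO backed by a bytes.Buffer. The framing used here is an
// assumption for this sketch only.
type queue struct {
	buf bytes.Buffer
}

// writeChunk appends a 4-byte big-endian length followed by the bytes.
func (q *queue) writeChunk(b []byte) {
	var n [4]byte
	binary.BigEndian.PutUint32(n[:], uint32(len(b)))
	q.buf.Write(n[:])
	q.buf.Write(b)
}

// readChunk removes and returns the next length-prefixed chunk.
func (q *queue) readChunk() ([]byte, error) {
	var n [4]byte
	if _, err := io.ReadFull(&q.buf, n[:]); err != nil {
		return nil, err
	}
	b := make([]byte, binary.BigEndian.Uint32(n[:]))
	if _, err := io.ReadFull(&q.buf, b); err != nil {
		return nil, err
	}
	return b, nil
}

// add appends a frame to the back of the queue.
func (q *queue) add(f frame) {
	q.writeChunk([]byte(f.TypeName))
	var op [4]byte
	binary.BigEndian.PutUint32(op[:], f.OpType)
	q.buf.Write(op[:])
	q.writeChunk(f.Payload)
}

// remove pops the frame at the front of the queue. It returns io.EOF when
// the queue is empty.
func (q *queue) remove() (frame, error) {
	name, err := q.readChunk()
	if err != nil {
		return frame{}, err
	}
	var op [4]byte
	if _, err := io.ReadFull(&q.buf, op[:]); err != nil {
		return frame{}, err
	}
	payload, err := q.readChunk()
	if err != nil {
		return frame{}, err
	}
	return frame{TypeName: string(name), OpType: binary.BigEndian.Uint32(op[:]), Payload: payload}, nil
}

func main() {
	q := &queue{}
	// Messages are queued in the order they were sent to the database.
	q.add(frame{TypeName: "HostSet", OpType: 1, Payload: []byte("hostset-bytes")})
	q.add(frame{TypeName: "Host", OpType: 1, Payload: []byte("host-bytes")})
	q.add(frame{TypeName: "Tag", OpType: 1, Payload: []byte("tag-bytes")})
	for {
		f, err := q.remove()
		if err == io.EOF {
			break
		}
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Printf("%s op=%d payload=%q\n", f.TypeName, f.OpType, f.Payload)
	}
}
```

Because frames are read back in the order they were added, replaying the buffer reproduces the original order of the database operations.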

oplog tables

                  oplog tables:                   
      as the diagram shows, we can split the      
      oplog_entries into multiple tables if       
             needed for performance.              
                                                  
┌────────────────┐                                
│┌───────────────┴┐                               
││┌───────────────┴┐            ┌────────────────┐
│││ oplog_entries  │            │  oplog_ticket  │
││├────────────────┤            ├────────────────┤
│││id              │╲           │id              │
│││aggregate_name  │──┼───────┼ │aggregate_name  │
└┤│data            │╱           │version         │
 └┤                │            │                │
  └────────────────┘            └────────────────┘
          ┼                                       
          │                                       
          │                                       
          ┼                                       
         ╱│╲                                      
 ┌────────────────┐                               
 │ oplog_metadata │                               
 ├────────────────┤                               
 │id              │                               
 │entry_id        │                               
 │key             │                               
 │value           │                               
 └────────────────┘                               

oplog optimistic locking using tickets

    Alice's                        Database                                               
  transaction                                                          Bob's transaction  
                                                                                          
     │                                 │                                      │           
     │─────────BEGIN──────────────────▶│                                      │           
     │                                 │◀───────────────BEGIN─────────────────┤           
     │                     ┌───────────┴───────────┐                          │           
     │                     │   ticket version:1    │                          │           
     │                     └───────────┬───────────┘                          │           
     │       Select oplog-ticket for   │                                      │           
     ├──────────────"Target"──────────▶│        Select oplog-ticket for       │           
     │                                 │◀──────────────"Target"───────────────┤           
     │                                 │                                      │           
     │      Write to Tables in         │                                      │           
     ├───────Target Aggregate─────────▶│                                      │           
     │                                 │         Write to Tables in           │           
     │                                 │◀─────────Target Aggregate────────────┤           
     │                                 │                                      │           
     │                                 │                                      │           
     │      Update Ticket              │                                      │           
     ├────Version = 2 where───────────▶│                                      │           
     │       Version = 1     ┌─────────┴─────────┐                            │           
     │                       │ ticket version: 2 │                            │           
     │                       └─────────┬─────────┘                            │           
     │                                 │              Update Ticket           │           
     │                                 │◀───────────Version = 2 where─────────┤           
     │                                 │               Version = 1       ┌────┴──────────┐
     │                                 │                                 │ update failed │
     ├────────────Commit──────────────▶│                                 └────┬──────────┘
     │                                 │                                      │           
     │                                 │                                      │           
     │                                 │                                      │           
     │                                 │◀────────────Rollback─────────────────┤           
     │                                 │                                      │           
     │                                 │                                      │           
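
The sequence above can be sketched in a few lines of Go. The example below is an in-memory illustration only: ticketStore is a hypothetical stand-in for the oplog_ticket table, and redeem emulates an update of the form "set version = version + 1 where version = the version I read", which is the check that makes Bob's update fail.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ticket mirrors the idea of a row in oplog_ticket: a name and a version.
type ticket struct {
	Name    string
	Version uint32
}

var errTicketAlreadyRedeemed = errors.New("ticket already redeemed")

// ticketStore is a hypothetical in-memory stand-in for the oplog_ticket table.
type ticketStore struct {
	mu       sync.Mutex
	versions map[string]uint32
}

// get returns a snapshot of the current ticket for the aggregate name.
func (s *ticketStore) get(name string) (ticket, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.versions[name]
	if !ok {
		return ticket{}, fmt.Errorf("ticket %q not initialized", name)
	}
	return ticket{Name: name, Version: v}, nil
}

// redeem succeeds only when the stored version still matches the version
// the caller read; on success it bumps the version by one.
func (s *ticketStore) redeem(t ticket) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.versions[t.Name] != t.Version {
		return errTicketAlreadyRedeemed
	}
	s.versions[t.Name] = t.Version + 1
	return nil
}

func main() {
	store := &ticketStore{versions: map[string]uint32{"Target": 1}}

	// Alice and Bob both read the ticket at version 1.
	alice, _ := store.get("Target")
	bob, _ := store.get("Target")

	// Alice redeems first and commits; Bob's redeem fails and he rolls back.
	fmt.Println("alice redeem:", store.redeem(alice))
	fmt.Println("bob redeem:", store.redeem(bob))
}
```

In the real flow the failed update is what tells the second writer to roll back its whole transaction, including its writes to the aggregate's tables.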

Documentation

Overview

Package oplog is a package for writing operational log (oplog) entries for the purpose of replication and verification of the data stored in the Boundary RDBMS. Please see README.md in this directory for more information.

Index

Constants

const DefaultAggregateName = "global"
const Version = "v2"

Version of oplog entries (among other things, it's used to manage upgrade compatibility when replicating)

v1: initial version
v2: adds the new Message.Opts

Variables

var (
	OpType_name = map[int32]string{
		0: "OP_TYPE_UNSPECIFIED",
		1: "OP_TYPE_CREATE",
		2: "OP_TYPE_UPDATE",
		3: "OP_TYPE_DELETE",
		4: "OP_TYPE_CREATE_ITEMS",
		5: "OP_TYPE_DELETE_ITEMS",
	}
	OpType_value = map[string]int32{
		"OP_TYPE_UNSPECIFIED":  0,
		"OP_TYPE_CREATE":       1,
		"OP_TYPE_UPDATE":       2,
		"OP_TYPE_DELETE":       3,
		"OP_TYPE_CREATE_ITEMS": 4,
		"OP_TYPE_DELETE_ITEMS": 5,
	}
)

Enum value maps for OpType.
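
As an illustration of how such name/value maps are typically used, the sketch below copies the two maps into local variables and adds two small lookup helpers. The helper names parseOpType and opTypeString are hypothetical and are not part of this package.

```go
package main

import "fmt"

// Local copies of the enum maps listed above, so the example is self-contained.
var opTypeName = map[int32]string{
	0: "OP_TYPE_UNSPECIFIED",
	1: "OP_TYPE_CREATE",
	2: "OP_TYPE_UPDATE",
	3: "OP_TYPE_DELETE",
	4: "OP_TYPE_CREATE_ITEMS",
	5: "OP_TYPE_DELETE_ITEMS",
}

var opTypeValue = map[string]int32{
	"OP_TYPE_UNSPECIFIED":  0,
	"OP_TYPE_CREATE":       1,
	"OP_TYPE_UPDATE":       2,
	"OP_TYPE_DELETE":       3,
	"OP_TYPE_CREATE_ITEMS": 4,
	"OP_TYPE_DELETE_ITEMS": 5,
}

// parseOpType (hypothetical helper) converts an enum name to its number.
func parseOpType(name string) (int32, error) {
	v, ok := opTypeValue[name]
	if !ok {
		return 0, fmt.Errorf("unknown op type %q", name)
	}
	return v, nil
}

// opTypeString (hypothetical helper) converts an enum number to its name,
// falling back to a numeric form for unknown values.
func opTypeString(v int32) string {
	if name, ok := opTypeName[v]; ok {
		return name
	}
	return fmt.Sprintf("OpType(%d)", v)
}

func main() {
	v, err := parseOpType("OP_TYPE_UPDATE")
	fmt.Println(v, err)
	fmt.Println(opTypeString(5))
}
```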

var File_controller_storage_oplog_v1_any_operation_proto protoreflect.FileDescriptor

Functions

func TestOplogDeleteAllEntries added in v0.11.1

func TestOplogDeleteAllEntries(t testing.TB, conn *dbw.DB)

TestOplogDeleteAllEntries allows you to delete all the entries for testing.

Types

type AnyOperation

type AnyOperation struct {

	// type_name defines type of operation.
	TypeName string `protobuf:"bytes,1,opt,name=type_name,json=typeName,proto3" json:"type_name,omitempty"`
	// value are the bytes of a marshaled proto buff.
	Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
	// operation_type defines the type of database operation.
	OperationType OpType `` /* 141-byte string literal not displayed */
	// field_mask is the mask of fields to update.
	FieldMask *fieldmaskpb.FieldMask `protobuf:"bytes,4,opt,name=field_mask,json=fieldMask,proto3" json:"field_mask,omitempty"`
	// null_mask is the mask of fields to set to null.
	NullMask *fieldmaskpb.FieldMask `protobuf:"bytes,5,opt,name=null_mask,json=nullMask,proto3" json:"null_mask,omitempty"`
	// Options for the operations (see dbw package for definition/documentation of
	// options)
	Options *OperationOptions `protobuf:"bytes,6,opt,name=options,proto3" json:"options,omitempty"`
	// contains filtered or unexported fields
}

AnyOperation provides a message for anything and the type of operation it represents.

func (*AnyOperation) Descriptor deprecated

func (*AnyOperation) Descriptor() ([]byte, []int)

Deprecated: Use AnyOperation.ProtoReflect.Descriptor instead.

func (*AnyOperation) GetFieldMask

func (x *AnyOperation) GetFieldMask() *fieldmaskpb.FieldMask

func (*AnyOperation) GetNullMask

func (x *AnyOperation) GetNullMask() *fieldmaskpb.FieldMask

func (*AnyOperation) GetOperationType

func (x *AnyOperation) GetOperationType() OpType

func (*AnyOperation) GetOptions added in v0.7.4

func (x *AnyOperation) GetOptions() *OperationOptions

func (*AnyOperation) GetTypeName

func (x *AnyOperation) GetTypeName() string

func (*AnyOperation) GetValue

func (x *AnyOperation) GetValue() []byte

func (*AnyOperation) ProtoMessage

func (*AnyOperation) ProtoMessage()

func (*AnyOperation) ProtoReflect

func (x *AnyOperation) ProtoReflect() protoreflect.Message

func (*AnyOperation) Reset

func (x *AnyOperation) Reset()

func (*AnyOperation) String

func (x *AnyOperation) String() string

type Column added in v0.7.4

type Column struct {

	// name of the column
	Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	// table name of the column
	Table string `protobuf:"bytes,2,opt,name=table,proto3" json:"table,omitempty"`
	// contains filtered or unexported fields
}

Column represents a table Column

func (*Column) Descriptor deprecated added in v0.7.4

func (*Column) Descriptor() ([]byte, []int)

Deprecated: Use Column.ProtoReflect.Descriptor instead.

func (*Column) GetName added in v0.7.4

func (x *Column) GetName() string

func (*Column) GetTable added in v0.7.4

func (x *Column) GetTable() string

func (*Column) ProtoMessage added in v0.7.4

func (*Column) ProtoMessage()

func (*Column) ProtoReflect added in v0.7.4

func (x *Column) ProtoReflect() protoreflect.Message

func (*Column) Reset added in v0.7.4

func (x *Column) Reset()

func (*Column) String added in v0.7.4

func (x *Column) String() string

type ColumnValue added in v0.7.4

type ColumnValue struct {

	// name of the column
	Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	// value of the column
	//
	// Types that are assignable to Value:
	//
	//	*ColumnValue_Raw
	//	*ColumnValue_ExprValue
	//	*ColumnValue_Column
	Value isColumnValue_Value `protobuf_oneof:"value"`
	// contains filtered or unexported fields
}

ColumnValue defines a column value

func (*ColumnValue) Descriptor deprecated added in v0.7.4

func (*ColumnValue) Descriptor() ([]byte, []int)

Deprecated: Use ColumnValue.ProtoReflect.Descriptor instead.

func (*ColumnValue) GetColumn added in v0.7.4

func (x *ColumnValue) GetColumn() *Column

func (*ColumnValue) GetExprValue added in v0.7.4

func (x *ColumnValue) GetExprValue() *ExprValue

func (*ColumnValue) GetName added in v0.7.4

func (x *ColumnValue) GetName() string

func (*ColumnValue) GetRaw added in v0.7.4

func (x *ColumnValue) GetRaw() *structpb.Value

func (*ColumnValue) GetValue added in v0.7.4

func (m *ColumnValue) GetValue() isColumnValue_Value

func (*ColumnValue) ProtoMessage added in v0.7.4

func (*ColumnValue) ProtoMessage()

func (*ColumnValue) ProtoReflect added in v0.7.4

func (x *ColumnValue) ProtoReflect() protoreflect.Message

func (*ColumnValue) Reset added in v0.7.4

func (x *ColumnValue) Reset()

func (*ColumnValue) String added in v0.7.4

func (x *ColumnValue) String() string

type ColumnValue_Column added in v0.7.4

type ColumnValue_Column struct {
	Column *Column `protobuf:"bytes,4,opt,name=column,proto3,oneof"`
}

type ColumnValue_ExprValue added in v0.7.4

type ColumnValue_ExprValue struct {
	ExprValue *ExprValue `protobuf:"bytes,3,opt,name=expr_value,json=exprValue,proto3,oneof"`
}

type ColumnValue_Raw added in v0.7.4

type ColumnValue_Raw struct {
	Raw *structpb.Value `protobuf:"bytes,2,opt,name=raw,proto3,oneof"`
}

type ColumnValues added in v0.7.4

type ColumnValues struct {

	// values are the values of the columns
	Values []*ColumnValue `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty"`
	// contains filtered or unexported fields
}

ColumnValues defines a set of column value properties

func (*ColumnValues) Descriptor deprecated added in v0.7.4

func (*ColumnValues) Descriptor() ([]byte, []int)

Deprecated: Use ColumnValues.ProtoReflect.Descriptor instead.

func (*ColumnValues) GetValues added in v0.7.4

func (x *ColumnValues) GetValues() []*ColumnValue

func (*ColumnValues) ProtoMessage added in v0.7.4

func (*ColumnValues) ProtoMessage()

func (*ColumnValues) ProtoReflect added in v0.7.4

func (x *ColumnValues) ProtoReflect() protoreflect.Message

func (*ColumnValues) Reset added in v0.7.4

func (x *ColumnValues) Reset()

func (*ColumnValues) String added in v0.7.4

func (x *ColumnValues) String() string

type Columns added in v0.7.4

type Columns struct {

	// name of the columns
	Names []string `protobuf:"bytes,1,rep,name=names,proto3" json:"names,omitempty"`
	// contains filtered or unexported fields
}

Columns defines a set of column properties

func (*Columns) Descriptor deprecated added in v0.7.4

func (*Columns) Descriptor() ([]byte, []int)

Deprecated: Use Columns.ProtoReflect.Descriptor instead.

func (*Columns) GetNames added in v0.7.4

func (x *Columns) GetNames() []string

func (*Columns) ProtoMessage added in v0.7.4

func (*Columns) ProtoMessage()

func (*Columns) ProtoReflect added in v0.7.4

func (x *Columns) ProtoReflect() protoreflect.Message

func (*Columns) Reset added in v0.7.4

func (x *Columns) Reset()

func (*Columns) String added in v0.7.4

func (x *Columns) String() string

type DbwTicketer added in v0.7.4

type DbwTicketer struct {
	// contains filtered or unexported fields
}

DbwTicketer defines a ticketer that uses the dbw pkg for database operations.

func NewTicketer added in v0.7.4

func NewTicketer(ctx context.Context, tx *dbw.DB, opt ...Option) (*DbwTicketer, error)

NewTicketer creates a new ticketer that uses dbw for storage

func (*DbwTicketer) GetTicket added in v0.7.4

func (ticketer *DbwTicketer) GetTicket(ctx context.Context, aggregateName string) (*store.Ticket, error)

GetTicket returns a ticket for the specified name. You MUST GetTicket in the same transaction that you're using to write to the database tables. Names allow us to shard tickets around domain root names.

func (*DbwTicketer) Redeem added in v0.7.4

func (ticketer *DbwTicketer) Redeem(ctx context.Context, t *store.Ticket) error

Redeem will attempt to redeem the ticket. If the ticket version has already been used, then an error is returned

type Entry

type Entry struct {
	*store.Entry
	Wrapper  wrapping.Wrapper `gorm:"-"`
	Ticketer Ticketer         `gorm:"-"`
}

Entry represents an oplog entry

func NewEntry

func NewEntry(ctx context.Context, aggregateName string, metadata Metadata, wrapper wrapping.Wrapper, ticketer Ticketer) (*Entry, error)

NewEntry creates a new Entry

func (*Entry) DecryptData

func (e *Entry) DecryptData(ctx context.Context) error

DecryptData will decrypt the entry's data using its Wrapper (wrapping.Wrapper)

func (*Entry) Replay

func (e *Entry) Replay(ctx context.Context, tx *Writer, types *TypeCatalog, tableSuffix string) error

Replay provides the ability to replay an entry. You must initialize any new tables ending with the tableSuffix before calling Replay; otherwise you'll get a "table doesn't exist" error.

func (*Entry) UnmarshalData

func (e *Entry) UnmarshalData(ctx context.Context, types *TypeCatalog) ([]Message, error)

UnmarshalData unmarshals the data attribute from []byte (treated as a FIFO QueueBuffer) into a []Message.

func (*Entry) Write

func (e *Entry) Write(ctx context.Context, tx *Writer, ticket *store.Ticket) error

Write writes the entry as is, with whatever it has for e.Data marshaled into a FIFO QueueBuffer. If Wrapper != nil, the data is encrypted using authenticated encryption.

func (*Entry) WriteEntryWith

func (e *Entry) WriteEntryWith(ctx context.Context, tx *Writer, ticket *store.Ticket, msgs ...*Message) error

WriteEntryWith writes the entry with the given messages marshaled into the entry data as a FIFO QueueBuffer. If Wrapper != nil, the data is encrypted using authenticated encryption.

type ExprValue added in v0.7.4

type ExprValue struct {

	// sql is the sql clause of the expr
	Sql string `protobuf:"bytes,1,opt,name=sql,proto3" json:"sql,omitempty"`
	// args are the sql args of the expr
	Args []*structpb.Value `protobuf:"bytes,2,rep,name=args,proto3" json:"args,omitempty"`
	// contains filtered or unexported fields
}

ExprValue defines an expr value that can be used as a column value

func (*ExprValue) Descriptor deprecated added in v0.7.4

func (*ExprValue) Descriptor() ([]byte, []int)

Deprecated: Use ExprValue.ProtoReflect.Descriptor instead.

func (*ExprValue) GetArgs added in v0.7.4

func (x *ExprValue) GetArgs() []*structpb.Value

func (*ExprValue) GetSql added in v0.7.4

func (x *ExprValue) GetSql() string

func (*ExprValue) ProtoMessage added in v0.7.4

func (*ExprValue) ProtoMessage()

func (*ExprValue) ProtoReflect added in v0.7.4

func (x *ExprValue) ProtoReflect() protoreflect.Message

func (*ExprValue) Reset added in v0.7.4

func (x *ExprValue) Reset()

func (*ExprValue) String added in v0.7.4

func (x *ExprValue) String() string

type Message

type Message struct {
	proto.Message
	TypeName       string
	OpType         OpType
	FieldMaskPaths []string
	SetToNullPaths []string
	Opts           []dbw.Option
}

Message wraps a proto.Message with some other bits like operation type, paths and options.

type Metadata

type Metadata map[string][]string

Metadata provides meta information about the Entry
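
Since Metadata is a map from a key to a list of values, a single key can carry several values. The sketch below uses a local type with the same underlying type to show the shape. The add helper and the "backup" value are assumptions for illustration; the other keys and values are taken from the README usage example.

```go
package main

import (
	"fmt"
	"sort"
)

// metadata is a local stand-in with the same underlying type as Metadata.
type metadata map[string][]string

// add (hypothetical helper) appends a value to the list stored under key.
func (m metadata) add(key, value string) {
	m[key] = append(m[key], value)
}

func main() {
	md := metadata{}
	md.add("deployment", "amex")
	md.add("deployment", "backup")
	md.add("project", "central-info-systems")

	// Map iteration order is random in Go, so sort the keys before printing.
	keys := make([]string, 0, len(md))
	for k := range md {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Println(k, md[k])
	}
}
```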

type OpType

type OpType int32

OpType provides the type of database operation the Any message represents (create, update, delete)

const (
	// OP_TYPE_UNSPECIFIED defines an unspecified operation.
	OpType_OP_TYPE_UNSPECIFIED OpType = 0
	// OP_TYPE_CREATE defines a create operation.
	OpType_OP_TYPE_CREATE OpType = 1
	// OP_TYPE_UPDATE defines an update operation.
	OpType_OP_TYPE_UPDATE OpType = 2
	// OP_TYPE_DELETE defines a delete operation.
	OpType_OP_TYPE_DELETE OpType = 3
	// OP_TYPE_CREATE_ITEMS defines a create operation for multiple items.
	OpType_OP_TYPE_CREATE_ITEMS OpType = 4
	// OP_TYPE_DELETE_ITEMS defines a delete operation for multiple items.
	OpType_OP_TYPE_DELETE_ITEMS OpType = 5
)

func (OpType) Descriptor

func (OpType) Descriptor() protoreflect.EnumDescriptor

func (OpType) Enum

func (x OpType) Enum() *OpType

func (OpType) EnumDescriptor deprecated

func (OpType) EnumDescriptor() ([]byte, []int)

Deprecated: Use OpType.Descriptor instead.

func (OpType) Number

func (x OpType) Number() protoreflect.EnumNumber

func (OpType) String

func (x OpType) String() string

func (OpType) Type

func (OpType) Type() protoreflect.EnumType

type OperationOptions added in v0.7.4

type OperationOptions struct {

	// with_version (see dbw package for docs)
	WithVersion *wrapperspb.UInt32Value `protobuf:"bytes,1,opt,name=with_version,json=withVersion,proto3" json:"with_version,omitempty"`
	// with_skip_vet_for_write (see dbw package for docs)
	WithSkipVetForWrite bool `protobuf:"varint,2,opt,name=with_skip_vet_for_write,json=withSkipVetForWrite,proto3" json:"with_skip_vet_for_write,omitempty"`
	// with_where_clause (see dbw package for docs)
	WithWhereClause string `protobuf:"bytes,3,opt,name=with_where_clause,json=withWhereClause,proto3" json:"with_where_clause,omitempty"`
	// with_where_clause_args (see dbw package for docs)
	WithWhereClauseArgs []*structpb.Value `protobuf:"bytes,4,rep,name=with_where_clause_args,json=withWhereClauseArgs,proto3" json:"with_where_clause_args,omitempty"`
	// with_on_conflict (see dbw package for docs)
	WithOnConflict *WithOnConflict `protobuf:"bytes,5,opt,name=with_on_conflict,json=withOnConflict,proto3" json:"with_on_conflict,omitempty"`
	// contains filtered or unexported fields
}

OperationOptions represents operation options which can/will affect the oplog write operation. These options are a subset of the dbw.Options. We will not try to keep the docs in sync with the dbw package, so if you need more information on what an option does, please see the dbw package docs.

func (*OperationOptions) Descriptor deprecated added in v0.7.4

func (*OperationOptions) Descriptor() ([]byte, []int)

Deprecated: Use OperationOptions.ProtoReflect.Descriptor instead.

func (*OperationOptions) GetWithOnConflict added in v0.7.4

func (x *OperationOptions) GetWithOnConflict() *WithOnConflict

func (*OperationOptions) GetWithSkipVetForWrite added in v0.7.4

func (x *OperationOptions) GetWithSkipVetForWrite() bool

func (*OperationOptions) GetWithVersion added in v0.7.4

func (x *OperationOptions) GetWithVersion() *wrapperspb.UInt32Value

func (*OperationOptions) GetWithWhereClause added in v0.7.4

func (x *OperationOptions) GetWithWhereClause() string

func (*OperationOptions) GetWithWhereClauseArgs added in v0.7.4

func (x *OperationOptions) GetWithWhereClauseArgs() []*structpb.Value

func (*OperationOptions) ProtoMessage added in v0.7.4

func (*OperationOptions) ProtoMessage()

func (*OperationOptions) ProtoReflect added in v0.7.4

func (x *OperationOptions) ProtoReflect() protoreflect.Message

func (*OperationOptions) Reset added in v0.7.4

func (x *OperationOptions) Reset()

func (*OperationOptions) String added in v0.7.4

func (x *OperationOptions) String() string

type Option

type Option func(Options)

Option - how Options are passed as arguments

func WithAggregateNames

func WithAggregateNames(enabled bool) Option

WithAggregateNames enables/disables the use of multiple aggregate names for Ticketers

func WithFieldMaskPaths

func WithFieldMaskPaths(fieldMaskPaths []string) Option

WithFieldMaskPaths represents an optional set of symbolic field paths (for example: "f.a", "f.b.d") used to specify a subset of fields that should be updated. (see google.golang.org/genproto/protobuf/field_mask)

func WithOperationOptions added in v0.7.4

func WithOperationOptions(opt ...dbw.Option) Option

WithOperationOptions represents an optional set of dbw.Options. (see the dbw package for more info on the options)

func WithSetToNullPaths

func WithSetToNullPaths(setToNullPaths []string) Option

WithSetToNullPaths represents an optional set of symbolic field paths (for example: "f.a", "f.b.d") used to specify a subset of fields that should be set to null. (see google.golang.org/genproto/protobuf/field_mask)

type Options

type Options map[string]any

Options - how options are represented

func GetOpts

func GetOpts(opt ...Option) Options

GetOpts - iterates the inbound Options and returns the resulting Options
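
The Option, Options and GetOpts signatures follow the functional options pattern. The following is a local, lowercase re-implementation of that pattern with the same shapes, meant only as a sketch: the map keys and the empty-slice defaults are assumptions and may differ from what the package actually does.

```go
package main

import "fmt"

// options and option mirror the documented shapes of Options and Option.
type options map[string]any
type option func(options)

// The keys below are hypothetical; they only need to be unique in the map.
const (
	keyFieldMaskPaths = "fieldMaskPaths"
	keySetToNullPaths = "setToNullPaths"
)

// withFieldMaskPaths records the field paths that should be updated.
func withFieldMaskPaths(paths []string) option {
	return func(o options) { o[keyFieldMaskPaths] = paths }
}

// withSetToNullPaths records the field paths that should be set to null.
func withSetToNullPaths(paths []string) option {
	return func(o options) { o[keySetToNullPaths] = paths }
}

// getOpts starts from defaults and applies every non-nil option in order,
// so later options override earlier ones.
func getOpts(opt ...option) options {
	opts := options{
		keyFieldMaskPaths: []string{},
		keySetToNullPaths: []string{},
	}
	for _, o := range opt {
		if o != nil {
			o(opts)
		}
	}
	return opts
}

func main() {
	opts := getOpts(
		withFieldMaskPaths([]string{"f.a", "f.b.d"}),
		withSetToNullPaths([]string{"f.c"}),
	)
	fmt.Println(opts[keyFieldMaskPaths], opts[keySetToNullPaths])
}
```

Callers read a value back with a type assertion, for example opts[keyFieldMaskPaths].([]string).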

type Queue

type Queue struct {
	// Buffer for the queue
	bytes.Buffer

	// Catalog provides a TypeCatalog for the types added to the queue
	Catalog *TypeCatalog
	// contains filtered or unexported fields
}

Queue provides a FIFO queue

type ReplayableMessage

type ReplayableMessage interface {
	// TableName returns the table name of the resource
	TableName() string
	// SetTableName sets the table name of the resource
	SetTableName(name string)
}

ReplayableMessage defines an interface for messages that can be replayed from the oplog entries. We need to be able to replay into different table names.
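
A minimal sketch of a type that satisfies this interface, and of how a replay could redirect it to a suffixed table, is shown below. The user type, its default table name "users" and the retarget helper are hypothetical and exist only for this example.

```go
package main

import "fmt"

// replayableMessage is a local copy of the interface documented above.
type replayableMessage interface {
	TableName() string
	SetTableName(name string)
}

// user is a hypothetical resource with an overridable table name.
type user struct {
	Name  string
	table string
}

// TableName returns the override if one was set, else the default table.
func (u *user) TableName() string {
	if u.table != "" {
		return u.table
	}
	return "users"
}

// SetTableName overrides the table name of the resource.
func (u *user) SetTableName(name string) { u.table = name }

// retarget (hypothetical helper) points a message at the table that ends
// with the given suffix, which is what a replay into new tables needs.
func retarget(m replayableMessage, suffix string) {
	m.SetTableName(m.TableName() + suffix)
}

func main() {
	u := &user{Name: "alice"}
	retarget(u, "_replay")
	fmt.Println(u.TableName())
}
```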

type Ticketer

type Ticketer interface {
	// GetTicket returns a ticket for the specified name.  You MUST GetTicket in the same transaction
	// that you're using to write to the database tables. Names allow us to shard tickets around domain root names.
	// Before getting a ticket you must insert it with its name into the oplog_ticket table.  This is done via a
	// db migrations script.  Requiring this insert as part of migrations ensures that the tickets are initialized in
	// a separate transaction from when a client calls GetTicket(aggregateName) which is critical for the optimistic locking
	// pattern to work properly.
	GetTicket(ctx context.Context, aggregateName string) (*store.Ticket, error)

	// Redeem ticket will attempt to redeem the ticket and ensure it's serialized with other tickets using the same
	// aggregate name
	Redeem(ctx context.Context, ticket *store.Ticket) error
}

Ticketer provides an interface to storage for Tickets, so you can easily substitute your own ticketer

type Type

type Type struct {
	// Interface of the type
	Interface any
	// Name for the interface
	Name string
}

Type provides the ability to associate an interface with a Type.Name, which decouples the interface from its reflection type string, so you can refactor the type name without breaking the catalog

type TypeCatalog

type TypeCatalog map[string]reflect.Type

TypeCatalog is an abstraction for dealing with oplog data and their underlying types

func NewTypeCatalog

func NewTypeCatalog(ctx context.Context, withTypes ...Type) (*TypeCatalog, error)

NewTypeCatalog creates a catalog with the types you pass in

func (TypeCatalog) Get

func (t TypeCatalog) Get(ctx context.Context, typeName string) (any, error)

Get retrieves the interface via a name

func (*TypeCatalog) GetTypeName

func (t *TypeCatalog) GetTypeName(ctx context.Context, i any) (string, error)

GetTypeName returns the interface's name from the catalog

func (TypeCatalog) Set

func (t TypeCatalog) Set(ctx context.Context, i any, typeName string) error

Set creates an entry in the catalog for the interface
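
To illustrate the idea behind a catalog keyed by stable names, here is a small stand-alone sketch built on map[string]reflect.Type. It is a local re-implementation under assumptions (no context parameter, lowercase names, a hypothetical user type) and is not the package's source; in particular, returning a fresh zero value from get is a design choice of this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// typeCatalog maps a stable name to a Go type.
type typeCatalog map[string]reflect.Type

// set registers the dynamic type of i under name.
func (c typeCatalog) set(i any, name string) error {
	if i == nil {
		return errors.New("nil interface")
	}
	if name == "" {
		return errors.New("missing type name")
	}
	if _, ok := c[name]; ok {
		return fmt.Errorf("type name %q already registered", name)
	}
	c[name] = reflect.TypeOf(i)
	return nil
}

// get returns a new zero-valued instance of the type registered under name.
func (c typeCatalog) get(name string) (any, error) {
	t, ok := c[name]
	if !ok {
		return nil, fmt.Errorf("type name %q not found", name)
	}
	if t.Kind() == reflect.Pointer {
		return reflect.New(t.Elem()).Interface(), nil
	}
	return reflect.New(t).Elem().Interface(), nil
}

// typeName looks up the name a value's type was registered under.
func (c typeCatalog) typeName(i any) (string, error) {
	t := reflect.TypeOf(i)
	for name, rt := range c {
		if rt == t {
			return name, nil
		}
	}
	return "", fmt.Errorf("type %v not registered", t)
}

// user is a hypothetical message type used only in this example.
type user struct{ Name string }

func main() {
	c := typeCatalog{}
	if err := c.set(&user{}, "user"); err != nil {
		fmt.Println("set:", err)
		return
	}
	v, err := c.get("user")
	fmt.Printf("%T %v\n", v, err)
	name, err := c.typeName(&user{})
	fmt.Println(name, err)
}
```

Because lookups go through the registered name rather than the Go type's own name, the Go type can be renamed without invalidating names already stored in oplog entries.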

type WithOnConflict added in v0.7.4

type WithOnConflict struct {

	// target defines the on conflict target
	//
	// Types that are assignable to Target:
	//
	//	*WithOnConflict_Constraint
	//	*WithOnConflict_Columns
	Target isWithOnConflict_Target `protobuf_oneof:"target"`
	// action defines the on conflict action
	//
	// Types that are assignable to Action:
	//
	//	*WithOnConflict_DoNothing
	//	*WithOnConflict_UpdateAll
	//	*WithOnConflict_ColumnValues
	Action isWithOnConflict_Action `protobuf_oneof:"action"`
	// contains filtered or unexported fields
}

WithOnConflict defines the parameters needed for a SQL "on conflict" clause

func (*WithOnConflict) Descriptor deprecated added in v0.7.4

func (*WithOnConflict) Descriptor() ([]byte, []int)

Deprecated: Use WithOnConflict.ProtoReflect.Descriptor instead.

func (*WithOnConflict) GetAction added in v0.7.4

func (m *WithOnConflict) GetAction() isWithOnConflict_Action

func (*WithOnConflict) GetColumnValues added in v0.7.4

func (x *WithOnConflict) GetColumnValues() *ColumnValues

func (*WithOnConflict) GetColumns added in v0.7.4

func (x *WithOnConflict) GetColumns() *Columns

func (*WithOnConflict) GetConstraint added in v0.7.4

func (x *WithOnConflict) GetConstraint() string

func (*WithOnConflict) GetDoNothing added in v0.7.4

func (x *WithOnConflict) GetDoNothing() bool

func (*WithOnConflict) GetTarget added in v0.7.4

func (m *WithOnConflict) GetTarget() isWithOnConflict_Target

func (*WithOnConflict) GetUpdateAll added in v0.7.4

func (x *WithOnConflict) GetUpdateAll() bool

func (*WithOnConflict) ProtoMessage added in v0.7.4

func (*WithOnConflict) ProtoMessage()

func (*WithOnConflict) ProtoReflect added in v0.7.4

func (x *WithOnConflict) ProtoReflect() protoreflect.Message

func (*WithOnConflict) Reset added in v0.7.4

func (x *WithOnConflict) Reset()

func (*WithOnConflict) String added in v0.7.4

func (x *WithOnConflict) String() string

type WithOnConflict_ColumnValues added in v0.7.4

type WithOnConflict_ColumnValues struct {
	// column_values defines on conflict action with the columns to update
	ColumnValues *ColumnValues `protobuf:"bytes,52,opt,name=column_values,json=columnValues,proto3,oneof"`
}

type WithOnConflict_Columns added in v0.7.4

type WithOnConflict_Columns struct {
	// columns are the on conflict columns
	Columns *Columns `protobuf:"bytes,11,opt,name=columns,proto3,oneof"`
}

type WithOnConflict_Constraint added in v0.7.4

type WithOnConflict_Constraint struct {
	// constraint is the on conflict constraint
	Constraint string `protobuf:"bytes,10,opt,name=constraint,proto3,oneof"`
}

type WithOnConflict_DoNothing added in v0.7.4

type WithOnConflict_DoNothing struct {
	// do_nothing defines an on conflict action of do nothing
	DoNothing bool `protobuf:"varint,50,opt,name=do_nothing,json=doNothing,proto3,oneof"`
}

type WithOnConflict_UpdateAll added in v0.7.4

type WithOnConflict_UpdateAll struct {
	// update_all defines an on conflict action of updating all the columns
	UpdateAll bool `protobuf:"varint,51,opt,name=update_all,json=updateAll,proto3,oneof"`
}

type Writer

type Writer struct {
	*dbw.DB
}

Writer provides a database writer for oplog operations

Directories

Path Synopsis
Package oplog_test provides some gorm helper funcs for testing oplog database integrations
Package store provides storage types/behavior for the oplog
