cdc

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 10, 2022 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (

	// CDCLogTag is a tag to use for logging.
	CDCLogTag = "CDC"
)
View Source
const (
	// OutboxVersion specifies the schema version
	// used to store data in outbox table.
	OutboxVersion = 1
)

Variables

This section is empty.

Functions

This section is empty.

Types

type AsyncTxParser

type AsyncTxParser struct {
	VulnDBClient vulndb.Client
	JobsRunner   *api.JobsRunner
	// contains filtered or unexported fields
}

AsyncTxParser implements a CDC log parser to handle distributed transactions for VulnDB and other API asynchronous jobs.

func NewAsyncTxParser

func NewAsyncTxParser(vulnDBClient vulndb.Client, jobsRunner *api.JobsRunner, logger log.Logger) *AsyncTxParser

NewAsyncTxParser builds a new CDC log parser to handle distributed transactions for VulnDB and other API asynchronous jobs.

func (*AsyncTxParser) Parse

func (p *AsyncTxParser) Parse(log []Event) (nParsed uint)

Parse parses the log sequentially, processing each event based on its action, and returns the number of events that have been processed correctly. If a non-permanent error occurs while processing an event, log processing is stopped. If a permanent error occurs while processing an event, or the event has reached the maximum number of processing attempts, that event is discarded and counted as if it had been processed.

type BrokerProxy

type BrokerProxy struct {
	// contains filtered or unexported fields
}

BrokerProxy is a proxy applied to the storage component which acts as a broker following Change Data Capture pattern.

func NewBrokerProxy

func NewBrokerProxy(logger log.Logger, db DB, store api.VulcanitoStore,
	parser Parser) *BrokerProxy

NewBrokerProxy builds a new CDC broker proxy around VulcanitoStore.

func (*BrokerProxy) Close

func (b *BrokerProxy) Close() error

func (*BrokerProxy) CountAssetsInGroups

func (b *BrokerProxy) CountAssetsInGroups(teamID string, groupIDs []string) (int, error)

func (*BrokerProxy) CreateAsset

func (b *BrokerProxy) CreateAsset(asset api.Asset, groups []api.Group) (*api.Asset, error)

func (*BrokerProxy) CreateAssetAnnotations

func (b *BrokerProxy) CreateAssetAnnotations(teamID string, assetID string, annotations []*api.AssetAnnotation) ([]*api.AssetAnnotation, error)

func (*BrokerProxy) CreateAssets

func (b *BrokerProxy) CreateAssets(assets []api.Asset, groups []api.Group, annotations []*api.AssetAnnotation) ([]api.Asset, error)

func (*BrokerProxy) CreateChecktypeSetting

func (b *BrokerProxy) CreateChecktypeSetting(setting api.ChecktypeSetting) (*api.ChecktypeSetting, error)

func (*BrokerProxy) CreateFindingOverwrite

func (b *BrokerProxy) CreateFindingOverwrite(findingOverwrite api.FindingOverwrite) error

func (*BrokerProxy) CreateGroup

func (b *BrokerProxy) CreateGroup(group api.Group) (*api.Group, error)

func (*BrokerProxy) CreatePolicy

func (b *BrokerProxy) CreatePolicy(policy api.Policy) (*api.Policy, error)

func (*BrokerProxy) CreateProgram

func (b *BrokerProxy) CreateProgram(program api.Program, teamID string) (*api.Program, error)

func (*BrokerProxy) CreateTeam

func (b *BrokerProxy) CreateTeam(team api.Team, ownerEmail string) (*api.Team, error)

func (*BrokerProxy) CreateTeamMember

func (b *BrokerProxy) CreateTeamMember(teamMember api.UserTeam) (*api.UserTeam, error)

func (*BrokerProxy) CreateUser

func (b *BrokerProxy) CreateUser(user api.User) (*api.User, error)

func (*BrokerProxy) CreateUserIfNotExists

func (b *BrokerProxy) CreateUserIfNotExists(userData saml.UserData) error

func (*BrokerProxy) DeleteAllAssets

func (b *BrokerProxy) DeleteAllAssets(teamID string) error

func (*BrokerProxy) DeleteAsset

func (b *BrokerProxy) DeleteAsset(asset api.Asset) error

func (*BrokerProxy) DeleteAssetAnnotations

func (b *BrokerProxy) DeleteAssetAnnotations(teamID string, assetID string, annotations []*api.AssetAnnotation) error

func (*BrokerProxy) DeleteChecktypeSetting

func (b *BrokerProxy) DeleteChecktypeSetting(checktypeSettingID string) error

func (*BrokerProxy) DeleteGroup

func (b *BrokerProxy) DeleteGroup(group api.Group) error

func (*BrokerProxy) DeletePolicy

func (b *BrokerProxy) DeletePolicy(policy api.Policy) error

func (*BrokerProxy) DeleteProgram

func (b *BrokerProxy) DeleteProgram(program api.Program, teamID string) error

func (*BrokerProxy) DeleteProgramMetadata

func (b *BrokerProxy) DeleteProgramMetadata(program string) error

func (*BrokerProxy) DeleteTeam

func (b *BrokerProxy) DeleteTeam(teamID string) error

func (*BrokerProxy) DeleteTeamMember

func (b *BrokerProxy) DeleteTeamMember(teamID string, userID string) error

func (*BrokerProxy) DeleteUserByID

func (b *BrokerProxy) DeleteUserByID(userID string) error

func (*BrokerProxy) DisjoinAssetsInGroups

func (b *BrokerProxy) DisjoinAssetsInGroups(teamID, inGroupID string, notInGroupIDs []string) ([]*api.Asset, error)

func (*BrokerProxy) FindAsset

func (b *BrokerProxy) FindAsset(teamID, assetID string) (*api.Asset, error)

func (*BrokerProxy) FindChecktypeSetting

func (b *BrokerProxy) FindChecktypeSetting(checktypeSettingID string) (*api.ChecktypeSetting, error)

func (*BrokerProxy) FindGlobalProgramMetadata

func (b *BrokerProxy) FindGlobalProgramMetadata(programID string, teamID string) (*api.GlobalProgramsMetadata, error)

func (*BrokerProxy) FindGroup

func (b *BrokerProxy) FindGroup(group api.Group) (*api.Group, error)

func (*BrokerProxy) FindGroupInfo

func (b *BrokerProxy) FindGroupInfo(group api.Group) (*api.Group, error)

func (*BrokerProxy) FindJob

func (b *BrokerProxy) FindJob(jobID string) (*api.Job, error)

func (*BrokerProxy) FindPolicy

func (b *BrokerProxy) FindPolicy(policyID string) (*api.Policy, error)

func (*BrokerProxy) FindProgram

func (b *BrokerProxy) FindProgram(programID string, teamID string) (*api.Program, error)

func (*BrokerProxy) FindTeam

func (b *BrokerProxy) FindTeam(teamID string) (*api.Team, error)

func (*BrokerProxy) FindTeamByIDForUser

func (b *BrokerProxy) FindTeamByIDForUser(ID, userID string) (*api.UserTeam, error)

func (*BrokerProxy) FindTeamByName

func (b *BrokerProxy) FindTeamByName(name string) (*api.Team, error)

func (*BrokerProxy) FindTeamByProgram

func (b *BrokerProxy) FindTeamByProgram(programID string) (*api.Team, error)

func (*BrokerProxy) FindTeamByTag

func (b *BrokerProxy) FindTeamByTag(tag string) (*api.Team, error)

func (*BrokerProxy) FindTeamMember

func (b *BrokerProxy) FindTeamMember(teamID string, userID string) (*api.UserTeam, error)

func (*BrokerProxy) FindTeamsByUser

func (b *BrokerProxy) FindTeamsByUser(userID string) ([]*api.Team, error)

func (*BrokerProxy) FindUserByEmail

func (b *BrokerProxy) FindUserByEmail(email string) (*api.User, error)

func (*BrokerProxy) FindUserByID

func (b *BrokerProxy) FindUserByID(userID string) (*api.User, error)

func (*BrokerProxy) GetAssetType

func (b *BrokerProxy) GetAssetType(assetTypeName string) (*api.AssetType, error)

func (*BrokerProxy) GroupAsset

func (b *BrokerProxy) GroupAsset(assetsGroup api.AssetGroup, teamID string) (*api.AssetGroup, error)

func (*BrokerProxy) Healthcheck

func (b *BrokerProxy) Healthcheck() error

func (*BrokerProxy) ListAssetAnnotations

func (b *BrokerProxy) ListAssetAnnotations(teamID string, assetID string) ([]*api.AssetAnnotation, error)

Asset Annotations

func (*BrokerProxy) ListAssetGroup

func (b *BrokerProxy) ListAssetGroup(assetGroup api.AssetGroup, teamID string) ([]*api.AssetGroup, error)

func (*BrokerProxy) ListAssets

func (b *BrokerProxy) ListAssets(teamID string, asset api.Asset) ([]*api.Asset, error)

func (*BrokerProxy) ListChecktypeSetting

func (b *BrokerProxy) ListChecktypeSetting(policyID string) ([]*api.ChecktypeSetting, error)

func (*BrokerProxy) ListFindingOverwrites

func (b *BrokerProxy) ListFindingOverwrites(findingID string) ([]*api.FindingOverwrite, error)

func (*BrokerProxy) ListGroups

func (b *BrokerProxy) ListGroups(teamID, groupName string) ([]*api.Group, error)

func (*BrokerProxy) ListPolicies

func (b *BrokerProxy) ListPolicies(teamID string) ([]*api.Policy, error)

func (*BrokerProxy) ListPrograms

func (b *BrokerProxy) ListPrograms(teamID string) ([]*api.Program, error)

func (*BrokerProxy) ListRecipients

func (b *BrokerProxy) ListRecipients(teamID string) ([]*api.Recipient, error)

func (*BrokerProxy) ListTeams

func (b *BrokerProxy) ListTeams() ([]*api.Team, error)

func (*BrokerProxy) ListUsers

func (b *BrokerProxy) ListUsers() ([]*api.User, error)

func (*BrokerProxy) MergeAssets

func (b *BrokerProxy) MergeAssets(mergeOps api.AssetMergeOperations) error

func (*BrokerProxy) MergeAssetsAsync

func (b *BrokerProxy) MergeAssetsAsync(teamID string, assets []api.Asset, groupName string) (*api.Job, error)

func (*BrokerProxy) NotFoundError

func (b *BrokerProxy) NotFoundError(err error) bool

func (*BrokerProxy) PutAssetAnnotations

func (b *BrokerProxy) PutAssetAnnotations(teamID string, assetID string, annotations []*api.AssetAnnotation) ([]*api.AssetAnnotation, error)

func (*BrokerProxy) UngroupAssets

func (b *BrokerProxy) UngroupAssets(assetGroup api.AssetGroup, teamID string) error

func (*BrokerProxy) UpdateAsset

func (b *BrokerProxy) UpdateAsset(asset api.Asset) (*api.Asset, error)

func (*BrokerProxy) UpdateAssetAnnotations

func (b *BrokerProxy) UpdateAssetAnnotations(teamID string, assetID string, annotations []*api.AssetAnnotation) ([]*api.AssetAnnotation, error)

func (*BrokerProxy) UpdateChecktypeSetting

func (b *BrokerProxy) UpdateChecktypeSetting(checktypeSetting api.ChecktypeSetting) (*api.ChecktypeSetting, error)

func (*BrokerProxy) UpdateGroup

func (b *BrokerProxy) UpdateGroup(group api.Group) (*api.Group, error)

func (*BrokerProxy) UpdateJob

func (b *BrokerProxy) UpdateJob(job api.Job) (*api.Job, error)

func (*BrokerProxy) UpdatePolicy

func (b *BrokerProxy) UpdatePolicy(policy api.Policy) (*api.Policy, error)

func (*BrokerProxy) UpdateProgram

func (b *BrokerProxy) UpdateProgram(program api.Program, teamID string) (*api.Program, error)

func (*BrokerProxy) UpdateRecipients

func (b *BrokerProxy) UpdateRecipients(teamID string, emails []string) error

func (*BrokerProxy) UpdateTeam

func (b *BrokerProxy) UpdateTeam(team api.Team) (*api.Team, error)

func (*BrokerProxy) UpdateTeamMember

func (b *BrokerProxy) UpdateTeamMember(teamMember api.UserTeam) (*api.UserTeam, error)

func (*BrokerProxy) UpdateUser

func (b *BrokerProxy) UpdateUser(user api.User) (*api.User, error)

func (*BrokerProxy) UpsertGlobalProgramMetadata

func (b *BrokerProxy) UpsertGlobalProgramMetadata(teamID, program string, defaultAutosend bool, defaultDisabled bool, defaultCron string, autosend *bool, disabled *bool, cron *string) error

type DB

type DB interface {
	GetLog() ([]Event, error)
	FailedEvent(event Event) error
	CleanEvent(event Event) error
	CleanLog(nEntries uint) error
	TryGetLock(id uint32) (*Lock, error)
	ReleaseLock(l *Lock) error
}

DB represents a database handle to perform CDC related operations synchronized across different instances.

type Event

type Event interface {
	ID() string
	Action() string
	Version() int
	Data() []byte
	ReadCount() int
}

Event represents an event retrieved from CDC log.

  • ID returns the event identifier.
  • Action returns the action associated with a CDC event.
  • Version returns the schema version for data.
  • Data returns the data associated with the event.
  • ReadCount returns the number of times the event has been read.

type Lock

type Lock struct {
	Acquired bool
	Tx       *sql.Tx
}

Lock represents an advisory lock.

type OpCreateAssetDTO

type OpCreateAssetDTO struct {
	Asset api.Asset `json:"asset"`
}

OpCreateAssetDTO represents the data to store as part of CDC log for a CreateAsset operation.

type OpDeleteAllAssetsDTO

type OpDeleteAllAssetsDTO struct {
	Team api.Team `json:"team"`
}

OpDeleteAllAssetsDTO represents the data to store as part of CDC log for a DeleteAllAssets operation.

type OpDeleteAssetDTO

type OpDeleteAssetDTO struct {
	Asset api.Asset `json:"asset"`
	// DupAssets is the number of assets
	// which have the same identifier in
	// the same team as Asset
	DupAssets int `json:"duplicates"`
}

OpDeleteAssetDTO represents the data to store as part of CDC log for a DeleteAsset operation.

type OpDeleteTeamDTO

type OpDeleteTeamDTO struct {
	Team api.Team `json:"team"`
}

OpDeleteTeamDTO represents the data to store as part of CDC log for a DeleteTeam operation.

type OpFindingOverwriteDTO

type OpFindingOverwriteDTO struct {
	FindingOverwrite api.FindingOverwrite `json:"finding_overwrite"`
}

OpFindingOverwriteDTO represents the data to store as part of CDC log for a FindingOverwrite operation.

type OpMergeDiscoveredAssetsDTO

type OpMergeDiscoveredAssetsDTO struct {
	TeamID    string      `json:"team_id"`
	Assets    []api.Asset `json:"assets"`
	GroupName string      `json:"group_name"`
	JobID     string      `json:"job_id"`
}

OpMergeDiscoveredAssetsDTO represents the data to store as part of CDC log for a MergeDiscoveredAssets operation.

type OpUpdateAssetDTO

type OpUpdateAssetDTO struct {
	OldAsset api.Asset `json:"old_asset"`
	NewAsset api.Asset `json:"new_asset"`
	// DupAssets is the number of assets
	// which have the same identifier as
	// OldAsset for the same team.
	DupAssets int `json:"duplicates"`
}

OpUpdateAssetDTO represents the data to store as part of CDC log for an UpdateAsset operation.

type Outbox

type Outbox struct {
	Identifier string `gorm:"column:id"`
	Operation  string
	SchemaVer  int    `gorm:"column:version"`
	DTO        []byte `gorm:"column:data"`
	Retries    int
	CreatedAt  time.Time
	UpdatedAt  time.Time
}

Outbox represents an entry in the outbox table.

func (Outbox) Action

func (o Outbox) Action() string

func (Outbox) Data

func (o Outbox) Data() []byte

func (Outbox) ID

func (o Outbox) ID() string

func (Outbox) ReadCount

func (o Outbox) ReadCount() int

func (Outbox) TableName

func (o Outbox) TableName() string

func (Outbox) Version

func (o Outbox) Version() int

type PQDB

type PQDB struct {
	// contains filtered or unexported fields
}

PQDB represents the PostgreSQL implementation of the DB handle to retrieve data from an outbox table. See the outbox pattern: https://microservices.io/patterns/data/transactional-outbox.html

func NewPQDB

func NewPQDB(conStr, dbTable string) (*PQDB, error)

NewPQDB creates a new PostgreSQL DB handle for CDC related operations.

func (*PQDB) CleanEvent

func (p *PQDB) CleanEvent(event Event) error

CleanEvent deletes the given event from outbox table.

func (*PQDB) CleanLog

func (p *PQDB) CleanLog(nEntries uint) error

CleanLog deletes the oldest nEntries from the outbox table.

func (*PQDB) FailedEvent

func (p *PQDB) FailedEvent(event Event) error

FailedEvent increments the given event retries in DB.

func (*PQDB) GetLog

func (p *PQDB) GetLog() ([]Event, error)

GetLog retrieves the log entries from the outbox table ordered by creation time.

func (*PQDB) ReleaseLock

func (p *PQDB) ReleaseLock(l *Lock) error

ReleaseLock releases the input lock.

func (*PQDB) TryGetLock

func (p *PQDB) TryGetLock(id uint32) (*Lock, error)

TryGetLock tries to acquire the CDC advisory lock from the DB. If no error is returned, the lock should be released by calling the ReleaseLock method, even if it was not acquired.

type Parser

type Parser interface {
	// Parse should parse the log events sequentially from the beginning
	// of the slice and return the number of events that have been processed
	// correctly. If processing an event fails, the parser should stop
	// processing and return the count of events parsed so far.
	Parse(log []Event) (nParsed uint)
}

Parser defines a CDC log parser.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL