persistence

package
v3.2.1+incompatible Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 21, 2020 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var DatabaseProviders = DatabaseProviderRegistry{}

Functions

func Register

func Register(name string, db AgbotDatabase)

func ValidateStateTransition

func ValidateStateTransition(mod *Agreement, update *Agreement)

This code is running in a database transaction. Within the tx, the current record is read and then updated according to the updates within the input update record. It is critical to check for correct data transitions within the tx .

func ValidateWUStateTransition

func ValidateWUStateTransition(mod *WorkloadUsage, update *WorkloadUsage)

This code is running in a database transaction. Within the tx, the current record is read and then updated according to the updates within the input update record. It is critical to check for correct data transitions within the tx .

Types

type AFilter

type AFilter func(Agreement) bool

Filters used by the caller to control what comes back from the database.

func ArchivedAFilter

func ArchivedAFilter() AFilter

func DevPolAFilter

func DevPolAFilter(deviceId string, policyName string) AFilter

func IdAFilter

func IdAFilter(id string) AFilter

func UnarchivedAFilter

func UnarchivedAFilter() AFilter

type AgbotDatabase

type AgbotDatabase interface {

	// Database related functions
	Initialize(cfg *config.HorizonConfig) error
	Close()

	// Database partition related functions.
	FindPartitions() ([]string, error)
	ClaimPartition(timeout uint64) (string, error)
	HeartbeatPartition() error
	QuiescePartition() error
	GetPartitionOwner(id string) (string, error)
	MovePartition(timeout uint64) error

	// Persistent agreement related functions
	FindAgreements(filters []AFilter, protocol string) ([]Agreement, error)
	FindSingleAgreementByAgreementId(agreementid string, protocol string, filters []AFilter) (*Agreement, error)
	FindSingleAgreementByAgreementIdAllProtocols(agreementid string, protocols []string, filters []AFilter) (*Agreement, error)

	GetAgreementCount(partition string) (int64, int64, error)

	SingleAgreementUpdate(agreementid string, protocol string, fn func(Agreement) *Agreement) (*Agreement, error)

	AgreementAttempt(agreementid string, org string, deviceid string, policyName string, bcType string, bcName string, bcOrg string, agreementProto string, pattern string, serviceId []string, nhPolicy policy.NodeHealth) error
	AgreementFinalized(agreementid string, protocol string) (*Agreement, error)
	AgreementUpdate(agreementid string, proposal string, policy string, dvPolicy policy.DataVerification, defaultCheckRate uint64, hash string, sig string, protocol string, agreementProtoVersion int) (*Agreement, error)
	AgreementMade(agreementId string, counterParty string, signature string, protocol string, hapartners []string, bcType string, bcName string, bcOrg string) (*Agreement, error)
	AgreementBlockchainUpdate(agreementId string, consumerSig string, hash string, counterParty string, signature string, protocol string) (*Agreement, error)
	AgreementBlockchainUpdateAck(agreementId string, protocol string) (*Agreement, error)
	AgreementTimedout(agreementid string, protocol string) (*Agreement, error)

	DataNotification(agreementid string, protocol string) (*Agreement, error)
	DataVerified(agreementid string, protocol string) (*Agreement, error)
	DataNotVerified(agreementid string, protocol string) (*Agreement, error)
	MeteringNotification(agreementid string, protocol string, mn string) (*Agreement, error)

	DeleteAgreement(pk string, protocol string) error
	ArchiveAgreement(agreementid string, protocol string, reason uint, desc string) (*Agreement, error)

	// Workoad usage related functions
	NewWorkloadUsage(deviceId string, hapartners []string, policy string, policyName string, priority int, retryDurationS int, verifiedDurationS int, reqsNotMet bool, agid string) error
	FindSingleWorkloadUsageByDeviceAndPolicyName(deviceid string, policyName string) (*WorkloadUsage, error)
	FindWorkloadUsages(filters []WUFilter) ([]WorkloadUsage, error)

	GetWorkloadUsagesCount(partition string) (int64, error)

	SingleWorkloadUsageUpdate(deviceid string, policyName string, fn func(WorkloadUsage) *WorkloadUsage) (*WorkloadUsage, error)

	UpdatePendingUpgrade(deviceid string, policyName string) (*WorkloadUsage, error)
	UpdatePriority(deviceid string, policyName string, priority int, retryDurationS int, verifiedDurationS int, agid string) (*WorkloadUsage, error)
	UpdateRetryCount(deviceid string, policyName string, retryCount int, agid string) (*WorkloadUsage, error)
	UpdatePolicy(deviceid string, policyName string, pol string) (*WorkloadUsage, error)
	UpdateWUAgreementId(deviceid string, policyName string, agid string, protocol string) (*WorkloadUsage, error)
	DisableRollbackChecking(deviceid string, policyName string) (*WorkloadUsage, error)

	DeleteWorkloadUsage(deviceid string, policyName string) error
}

func InitDatabase

func InitDatabase(cfg *config.HorizonConfig) (AgbotDatabase, error)

Initialize the underlying Agbot database depending on what is configured. If the bolt DB is configured, it is used. Next, the postgresql config is checked and used if configured. If nothing is configured, an error is returned.

type Agreement

type Agreement struct {
	CurrentAgreementId             string   `json:"current_agreement_id"`              // unique
	Org                            string   `json:"org"`                               // the org in which the policy exists that was used to make this agreement
	DeviceId                       string   `json:"device_id"`                         // the device id we are working with, immutable after construction
	HAPartners                     []string `json:"ha_partners"`                       // list of HA partner device IDs
	AgreementProtocol              string   `json:"agreement_protocol"`                // immutable after construction - name of protocol in use
	AgreementProtocolVersion       int      `json:"agreement_protocol_version"`        // version of protocol in use - New in V2 protocol
	AgreementInceptionTime         uint64   `json:"agreement_inception_time"`          // immutable after construction
	AgreementCreationTime          uint64   `json:"agreement_creation_time"`           // device responds affirmatively to proposal
	AgreementFinalizedTime         uint64   `json:"agreement_finalized_time"`          // agreement is seen in the blockchain
	AgreementTimedout              uint64   `json:"agreement_timeout"`                 // agreement was not finalized before it timed out
	ProposalSig                    string   `json:"proposal_signature"`                // The signature used to create the agreement - from the producer
	Proposal                       string   `json:"proposal"`                          // JSON serialization of the proposal
	ProposalHash                   string   `json:"proposal_hash"`                     // Hash of the proposal
	ConsumerProposalSig            string   `json:"consumer_proposal_sig"`             // Consumer's signature of the proposal
	Policy                         string   `json:"policy"`                            // JSON serialization of the policy used to make the proposal
	PolicyName                     string   `json:"policy_name"`                       // The name of the policy for this agreement, policy names are unique
	CounterPartyAddress            string   `json:"counter_party_address"`             // The blockchain address of the counterparty in the agreement
	DataVerificationURL            string   `json:"data_verification_URL"`             // The URL to use to ensure that this agreement is sending data.
	DataVerificationUser           string   `json:"data_verification_user"`            // The user to use with the DataVerificationURL
	DataVerificationPW             string   `json:"data_verification_pw"`              // The pw of the data verification user
	DataVerificationCheckRate      int      `json:"data_verification_check_rate"`      // How often to check for data
	DataVerificationMissedCount    uint64   `json:"data_verification_missed_count"`    // Number of data verification misses
	DataVerificationNoDataInterval int      `json:"data_verification_nodata_interval"` // How long to wait before deciding there is no data
	DisableDataVerificationChecks  bool     `json:"disable_data_verification_checks"`  // disable data verification checks, assume data is being sent.
	DataVerifiedTime               uint64   `json:"data_verification_time"`            // The last time that data verification was successful
	DataNotificationSent           uint64   `json:"data_notification_sent"`            // The timestamp for when data notification was sent to the device
	MeteringTokens                 uint64   `json:"metering_tokens"`                   // Number of metering tokens from proposal
	MeteringPerTimeUnit            string   `json:"metering_per_time_unit"`            // The time units of tokens per, from the proposal
	MeteringNotificationInterval   int      `json:"metering_notify_interval"`          // The interval of time between metering notifications (seconds)
	MeteringNotificationSent       uint64   `json:"metering_notification_sent"`        // The last time a metering notification was sent
	MeteringNotificationMsgs       []string `json:"metering_notification_msgs"`        // The last metering messages that were sent, oldest at the end
	Archived                       bool     `json:"archived"`                          // The record is archived
	TerminatedReason               uint     `json:"terminated_reason"`                 // The reason the agreement was terminated
	TerminatedDescription          string   `json:"terminated_description"`            // The description of why the agreement was terminated
	BlockchainType                 string   `json:"blockchain_type"`                   // The name of the blockchain type that is being used (new V2 protocol)
	BlockchainName                 string   `json:"blockchain_name"`                   // The name of the blockchain being used (new V2 protocol)
	BlockchainOrg                  string   `json:"blockchain_org"`                    // The name of the blockchain org being used (new V2 protocol)
	BCUpdateAckTime                uint64   `json:"blockchain_update_ack_time"`        // The time when the producer ACked our update ot him (new V2 protocol)
	NHMissingHBInterval            int      `json:"missing_heartbeat_interval"`        // How long a heartbeat can be missing until it is considered missing (in seconds)
	NHCheckAgreementStatus         int      `json:"check_agreement_status"`            // How often to check that the node agreement entry still exists in the exchange (in seconds)
	Pattern                        string   `json:"pattern"`                           // The pattern used to make the agreement, used for pattern case only
	ServiceId                      []string `json:"service_id"`                        // All the service ids whose policy is used to make the agreement, used for policy case only
}

func AgreementBlockchainUpdate

func AgreementBlockchainUpdate(db AgbotDatabase, agreementId string, consumerSig string, hash string, counterParty string, signature string, protocol string) (*Agreement, error)

func AgreementBlockchainUpdateAck

func AgreementBlockchainUpdateAck(db AgbotDatabase, agreementId string, protocol string) (*Agreement, error)

func AgreementFinalized

func AgreementFinalized(db AgbotDatabase, agreementid string, protocol string) (*Agreement, error)

func AgreementMade

func AgreementMade(db AgbotDatabase, agreementId string, counterParty string, signature string, protocol string, hapartners []string, bcType string, bcName string, bcOrg string) (*Agreement, error)

func AgreementTimedout

func AgreementTimedout(db AgbotDatabase, agreementid string, protocol string) (*Agreement, error)

func AgreementUpdate

func AgreementUpdate(db AgbotDatabase, agreementid string, proposal string, policy string, dvPolicy policy.DataVerification, defaultCheckRate uint64, hash string, sig string, protocol string, agreementProtoVersion int) (*Agreement, error)

func ArchiveAgreement

func ArchiveAgreement(db AgbotDatabase, agreementid string, protocol string, reason uint, desc string) (*Agreement, error)

func DataNotVerified

func DataNotVerified(db AgbotDatabase, agreementid string, protocol string) (*Agreement, error)

func DataNotification

func DataNotification(db AgbotDatabase, agreementid string, protocol string) (*Agreement, error)

func DataVerified

func DataVerified(db AgbotDatabase, agreementid string, protocol string) (*Agreement, error)

func MeteringNotification

func MeteringNotification(db AgbotDatabase, agreementid string, protocol string, mn string) (*Agreement, error)

func NewAgreement

func NewAgreement(agreementid string, org string, deviceid string, policyName string, bcType string, bcName string, bcOrg string, agreementProto string, pattern string, serviceId []string, nhPolicy policy.NodeHealth) (*Agreement, error)

Factory method for agreement w/out persistence safety.

func RunFilters

func RunFilters(ag *Agreement, filters []AFilter) *Agreement

func (*Agreement) NodeHealthInUse

func (a *Agreement) NodeHealthInUse() bool

func (Agreement) String

func (a Agreement) String() string

type DatabaseProviderRegistry

type DatabaseProviderRegistry map[string]AgbotDatabase

The registry is a mechanism that enables optional persistence implementations to be plugged into the runtime. The implementation registers itself with this registry when the implementation's package init() method is driven. This mechanism prevents the need for the persistence package to import each of the optional DB specific packages. The only tricky part of this is that name of each DB implementation is hard coded here and in the implementation's call to the Register() method. Sharing constants would re-introduce the package dependency that we want to avoid.

type WUFilter

type WUFilter func(WorkloadUsage) bool

func DWUFilter

func DWUFilter(deviceid string) WUFilter

func DaPWUFilter

func DaPWUFilter(deviceid string, policyName string) WUFilter

Filters

func PWUFilter

func PWUFilter(policyName string) WUFilter

type WorkloadUsage

type WorkloadUsage struct {
	Id                 uint64   `json:"record_id"`            // unique primary key for records
	DeviceId           string   `json:"device_id"`            // the device id we are working with, immutable after construction
	HAPartners         []string `json:"ha_partners"`          // list of device id(s) which are partners to this device
	PendingUpgradeTime uint64   `json:"pending_upgrade_time"` // time when this usage was marked for pending upgrade
	Policy             string   `json:"policy"`               // the policy containing the workloads we're managing
	PolicyName         string   `json:"policy_name"`          // the name of the policy containing the workloads we're managing
	Priority           int      `json:"priority"`             // the workload priority that we're working with
	RetryCount         int      `json:"retry_count"`          // The number of retries attempted so far
	RetryDurationS     int      `json:"retry_durations"`      // The number of seconds in which the specified number of retries must occur in order for the next priority workload to be attempted.
	CurrentAgreementId string   `json:"current_agreement_id"` // the agreement id currently in use
	FirstTryTime       uint64   `json:"first_try_time"`       // time when first agrement attempt was made, used to count retries per time
	LatestRetryTime    uint64   `json:"latest_retry_time"`    // time when the newest retry has occurred
	DisableRetry       bool     `json:"disable_retry"`        // when true, retry and retry durations are disbled which effectively disables workload rollback
	VerifiedDurationS  int      `json:"verified_durations"`   // the number of seconds for successful data verification before disabling workload rollback retries
	ReqsNotMet         bool     `json:"requirements_not_met"` // this workload usage record is not at the highest priority because the device did not meet the API spec requirements at one of the higher priorities
}

func DisableRollbackChecking

func DisableRollbackChecking(db AgbotDatabase, deviceid string, policyName string) (*WorkloadUsage, error)

func NewWorkloadUsage

func NewWorkloadUsage(deviceId string, hapartners []string, policy string, policyName string, priority int, retryDurationS int, verifiedDurationS int, reqsNotMet bool, agid string) (*WorkloadUsage, error)

private factory method for workloadusage w/out persistence safety:

func UpdatePendingUpgrade

func UpdatePendingUpgrade(db AgbotDatabase, deviceid string, policyName string) (*WorkloadUsage, error)

func UpdatePolicy

func UpdatePolicy(db AgbotDatabase, deviceid string, policyName string, pol string) (*WorkloadUsage, error)

func UpdatePriority

func UpdatePriority(db AgbotDatabase, deviceid string, policyName string, priority int, retryDurationS int, verifiedDurationS int, agid string) (*WorkloadUsage, error)

func UpdateRetryCount

func UpdateRetryCount(db AgbotDatabase, deviceid string, policyName string, retryCount int, agid string) (*WorkloadUsage, error)

func UpdateWUAgreementId

func UpdateWUAgreementId(db AgbotDatabase, deviceid string, policyName string, agid string) (*WorkloadUsage, error)

func (WorkloadUsage) ShortString

func (w WorkloadUsage) ShortString() string

func (WorkloadUsage) String

func (w WorkloadUsage) String() string

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL