store

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 1, 2024 License: AGPL-3.0 Imports: 17 Imported by: 2

Documentation

Index

Constants

View Source
const (
	// ErrFieldRequired is an error message if a value is missing
	ErrFieldRequired = "this field is required"
)

Variables

View Source
var (
	// ErrNotInitialized will be returned if the
	// database pool is accessed before initializing using
	// Connect.
	ErrNotInitialized = errors.New("store not initialized, pool not ready")

	// ErrMaxConnsUnconfigured will be returned if the
	// maximum connections are zero.
	ErrMaxConnsUnconfigured = errors.New("MaxConns not configured")
)
View Source
var (
	ErrNoBackend        = errors.New("no backend associated with meeting")
	ErrDeadlineRequired = errors.New("the operation requires a dealine")
)

MeetingStateErrors

View Source
var (
	ErrFrontendRequired = errors.New("meeting requires a frontend state")
)

Errors

View Source
var (
	// ErrNoConnectionInConfig will occur when the connection
	// is not available in the context.
	ErrNoConnectionInConfig = errors.New("connection missing in context")
)

Errors

View Source
var (
	// ErrRecordingsStorageUnconfigured will be returned, when
	// the environment variables for the published and unpublished
	// recording path are missing.
	ErrRecordingsStorageUnconfigured = errors.New(
		"environment for " + config.EnvRecordingsPublishedPath + " or " +
			config.EnvRecordingsUnpublishedPath + " is not set")
)
View Source
var (
	// ReMatchParamSQLUnsafe will match anything
	// NOT a-z, A-Z, 0-9, '_' and '-'.
	ReMatchParamSQLUnsafe = regexp.MustCompile(`[^a-zA-Z0-9_-]`)
)

Functions

func Acquire

func Acquire(ctx context.Context) (*pgxpool.Conn, error)

Acquire tries to get a database connection from the pool

func AssertDatabaseVersion

func AssertDatabaseVersion(pool *pgxpool.Pool, version int) error

AssertDatabaseVersion tests if the current version of the database is equal to a required version

func Connect

func Connect(opts *ConnectOpts) error

Connect initializes the connection pool and checks the schema version of the database.

func ConnectTest

func ConnectTest(ctx context.Context) error

ConnectTest connects to the pgx db pool. It uses the b3scale defaults if the environment variable is not set.

func ConnectionFromContext

func ConnectionFromContext(ctx context.Context) *pgxpool.Conn

ConnectionFromContext will retrieve the connection.

func ContextWithConnection

func ContextWithConnection(
	ctx context.Context,
	conn *pgxpool.Conn,
) context.Context

ContextWithConnection will return a child context with a value for connection.

func CountCommandsError

func CountCommandsError(ctx context.Context, tx pgx.Tx) (int, error)

CountCommandsError returns the number of commands in the queue that are in the error state.

func CountCommandsRequested

func CountCommandsRequested(ctx context.Context, tx pgx.Tx) (int, error)

CountCommandsRequested returns the number of unprocessed commands in the queue.

func CountCommandsSuccess

func CountCommandsSuccess(ctx context.Context, tx pgx.Tx) (int, error)

CountCommandsSuccess returns the number of successfully processed commands in the queue.

func CountCommandsWithState

func CountCommandsWithState(
	ctx context.Context,
	tx pgx.Tx,
	state string,
) (int, error)

CountCommandsWithState retrieves the number of commands in the queue with a given state, e.g. requested, error, etc.

func DeleteMeetingStateByID

func DeleteMeetingStateByID(
	ctx context.Context,
	tx pgx.Tx,
	id string,
) error

DeleteMeetingStateByID will remove a meeting state. It will succeed, even if no such meeting was present. TODO: merge with DeleteMeetingStateByInternalID

func DeleteMeetingStateByInternalID

func DeleteMeetingStateByInternalID(
	ctx context.Context,
	tx pgx.Tx,
	id string,
) error

DeleteMeetingStateByInternalID will remove a meeting state. It will succeed, even if no such meeting was present.

func DeleteOrphanMeetings

func DeleteOrphanMeetings(
	ctx context.Context,
	tx pgx.Tx,
	backendID string,
	backendMeetings []string,
) (int64, error)

DeleteOrphanMeetings will remove all meetings not in a list of (internal) meeting IDs, but associated with a backend

func DeleteRecordingByID

func DeleteRecordingByID(ctx context.Context, tx pgx.Tx, recordID string) error

DeleteRecordingByID will delete a recording identified by its id.

func GetRecordingTextTracks

func GetRecordingTextTracks(
	ctx context.Context,
	tx pgx.Tx,
	recordID string,
) ([]*bbb.TextTrack, error)

GetRecordingTextTracks retrieves the text tracks from a recording.

func LookupFrontendIDByMeetingID

func LookupFrontendIDByMeetingID(
	ctx context.Context,
	tx pgx.Tx,
	meetingID string,
) (string, bool, error)

LookupFrontendIDByMeetingID queries the frontend_meetings mapping and returns the frontendID for a given meetingID. The function name might be a hint.

func NewDelete

func NewDelete() sq.DeleteBuilder

NewDelete creates a new deletion query

func NewQuery

func NewQuery() sq.SelectBuilder

NewQuery creates a new query

func NextDeadline

func NextDeadline(dt time.Duration) time.Time

NextDeadline calculates the deadline for a newly requested command

func Q

func Q() sq.SelectBuilder

Q is an alias for NewQuery

func QueryRecordingsByFrontendKey

func QueryRecordingsByFrontendKey(frontendKey string) sq.SelectBuilder

QueryRecordingsByFrontendKey creates a query selecting recordings by frontend key. The query can be extended e.g. for filtering by recordingID.

func QueueCommand

func QueueCommand(ctx context.Context, tx pgx.Tx, cmd *Command) error

QueueCommand adds a new command to the queue

func RemoveStaleFrontendMeetings

func RemoveStaleFrontendMeetings(
	ctx context.Context,
	tx pgx.Tx,
	t time.Time,
) error

RemoveStaleFrontendMeetings removes all frontend meetings older than a threshold.

func SQLSafeParam

func SQLSafeParam(p string) string

SQLSafeParam will remove any unsafe chars from a param potentially used without parameter binding.

func SetRecordingTextTracks

func SetRecordingTextTracks(
	ctx context.Context,
	tx pgx.Tx,
	recordID string,
	tracks []*bbb.TextTrack,
) error

SetRecordingTextTracks updates the text tracks attribute of a recording.

Types

type AgentHeartbeat

type AgentHeartbeat struct {
	BackendID string    `json:"backend_id"`
	Heartbeat time.Time `json:"heartbeat"`
}

AgentHeartbeat is a short api response

type AttendeesLimitSettings added in v1.2.0

type AttendeesLimitSettings struct {
	Limit int `json:"limit" doc:"Limit of overall attendees for a frontend."`
}

AttendeesLimitSettings configures an overall limit of attendees per frontend.

type BackendSettings

type BackendSettings struct {
	Tags Tags `` /* 140-byte string literal not displayed */
}

BackendSettings hold per backend runtime configuration.

type BackendState

type BackendState struct {
	ID string `json:"id"`

	NodeState  string `json:"node_state" doc:"The current state of the node." example:"ready" enum:"init,ready,error,stopped,decommissioned"`
	AdminState string `` /* 155-byte string literal not displayed */

	AgentHeartbeat time.Time `json:"agent_heartbeat" doc:"The last time we heared from the node agent."`
	AgentRef       *string   `` /* 131-byte string literal not displayed */

	LastError *string `json:"last_error" doc:"The last error that happend. For example destination host not reachable."`

	Latency        time.Duration `json:"latency" doc:"The amount of milliseconds when polling the current node state."`
	MeetingsCount  uint          `json:"meetings_count" doc:"Number of meetings on the backend."`
	AttendeesCount uint          `json:"attendees_count" doc:"Number of participants in meetings on the backend."`

	LoadFactor float64 `` /* 247-byte string literal not displayed */

	Backend *bbb.Backend `json:"bbb" api:"BackendConfig"`

	Settings BackendSettings `json:"settings"`

	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
	SyncedAt  time.Time `json:"synced_at"`
}

The BackendState is shared across b3scale instances and encapsulates the list of meetings and recordings. The backend.ID should be used as identifier.

func GetBackendState

func GetBackendState(
	ctx context.Context,
	tx pgx.Tx,
	q sq.SelectBuilder,
) (*BackendState, error)

GetBackendState tries to retrieve a single backend state.

func GetBackendStates

func GetBackendStates(
	ctx context.Context,
	tx pgx.Tx,
	q sq.SelectBuilder,
) ([]*BackendState, error)

GetBackendStates retrieves all backends.

func InitBackendState

func InitBackendState(init *BackendState) *BackendState

InitBackendState initializes a new backend state with an initial state.

func (*BackendState) ClearMeetings

func (s *BackendState) ClearMeetings(
	ctx context.Context,
	tx pgx.Tx,
) error

ClearMeetings will remove all meetings in the current state

func (*BackendState) CreateMeetingState

func (s *BackendState) CreateMeetingState(
	ctx context.Context,
	tx pgx.Tx,
	frontend *bbb.Frontend,
	meeting *bbb.Meeting,
) (*MeetingState, error)

CreateMeetingState will create a new state for the current backend state. A frontend is attached if present.

func (*BackendState) CreateOrUpdateMeetingState

func (s *BackendState) CreateOrUpdateMeetingState(
	ctx context.Context,
	tx pgx.Tx,
	meeting *bbb.Meeting,
) error

CreateOrUpdateMeetingState will try to update the meeting or will create a new meeting state if the meeting does not exist. The new meeting will not be associated with a frontend state; however, the meeting can later be claimed by a frontend.

func (*BackendState) Delete

func (s *BackendState) Delete(
	ctx context.Context,
	tx pgx.Tx,
) error

Delete will remove the backend from the store

func (*BackendState) IsAgentAlive

func (s *BackendState) IsAgentAlive() bool

IsAgentAlive checks if the heartbeat is older than the threshold

func (*BackendState) IsNodeReady

func (s *BackendState) IsNodeReady() bool

IsNodeReady checks if the agent is alive and the node state is ready

func (*BackendState) Refresh

func (s *BackendState) Refresh(
	ctx context.Context,
	tx pgx.Tx,
) error

Refresh the backend state from the database

func (*BackendState) Save

func (s *BackendState) Save(
	ctx context.Context,
	tx pgx.Tx,
) error

Save persists the backend state in the database store

func (*BackendState) UpdateAgentHeartbeat

func (s *BackendState) UpdateAgentHeartbeat(
	ctx context.Context,
	tx pgx.Tx,
) (*AgentHeartbeat, error)

UpdateAgentHeartbeat will set the attribute to the current timestamp

func (*BackendState) UpdateStatCounters

func (s *BackendState) UpdateStatCounters(
	ctx context.Context,
	tx pgx.Tx,
) error

UpdateStatCounters counts meetings and attendees and updates the properties

func (*BackendState) Validate

func (s *BackendState) Validate() ValidationError

Validate the backend state

type Command

type Command struct {
	ID  string `json:"id"`
	Seq int    `json:"seq"`

	State string `json:"state" doc:"The current state of the command." enum:"requested,success,error"`

	Action string      `json:"action" doc:"The operation to perform." enum:"end_all_meetings"`
	Params interface{} `json:"params" doc:"Key value options for the command. See example above."`
	Result interface{} `json:"result" doc:"The result of the command. as key value object."`

	Deadline  time.Time  `json:"deadline" doc:"The commands need to be processed before the deadline is reached. The deadline is optional."`
	StartedAt *time.Time `json:"started_at"`
	StoppedAt *time.Time `json:"stopped_at"`
	CreatedAt time.Time  `json:"created_at"`
	// contains filtered or unexported fields
}

A Command is a representation of an operation

func GetCommand

func GetCommand(
	ctx context.Context,
	tx pgx.Tx,
	q sq.SelectBuilder,
) (*Command, error)

GetCommand retrieves a single command selected by the given query.

func GetCommands

func GetCommands(
	ctx context.Context,
	tx pgx.Tx,
	q sq.SelectBuilder,
) ([]*Command, error)

GetCommands retrieves the current command queue. This includes locked.

func (*Command) FetchParams

func (cmd *Command) FetchParams(
	ctx context.Context,
	req interface{},
) error

FetchParams loads the parameters and decodes them

type CommandHandler

type CommandHandler func(context.Context, *Command) (interface{}, error)

CommandHandler is a callback function for handling commands. The command was successful if no error was returned.

type CommandQueue

type CommandQueue struct {
}

The CommandQueue is connected to the database and provides methods for queuing and dequeuing commands.

func NewCommandQueue

func NewCommandQueue() *CommandQueue

NewCommandQueue initializes a new command queue

func (*CommandQueue) Receive

func (q *CommandQueue) Receive(handler CommandHandler) error

Receive will await a command and will block until a command can be processed. If the handler responds with an error, the error will be returned.

type ConnectOpts

type ConnectOpts struct {
	URL      string
	MaxConns int32
	MinConns int32
}

ConnectOpts database connection options

type DefaultPresentationSettings

type DefaultPresentationSettings struct {
	URL   string `` /* 134-byte string literal not displayed */
	Force bool   `json:"force" doc:"Override any default presentation provided by the frontend."`
}

DefaultPresentationSettings configure a per frontend default presentation.

type FrontendSettings

type FrontendSettings struct {
	RequiredTags        Tags                         `` /* 131-byte string literal not displayed */
	DefaultPresentation *DefaultPresentationSettings `json:"default_presentation"`
	AttendeesLimit      *AttendeesLimitSettings      `json:"attendees_limit"`

	CreateDefaultParams  bbb.Params `` /* 220-byte string literal not displayed */
	CreateOverrideParams bbb.Params `` /* 137-byte string literal not displayed */
}

FrontendSettings hold all well known settings for a frontend.

type FrontendState

type FrontendState struct {
	ID string `json:"id"`

	Active   bool          `json:"active" doc:"When false, the frontend can not longer use the API."`
	Frontend *bbb.Frontend `json:"bbb" api:"FrontendConfig"`

	Settings FrontendSettings `json:"settings"`

	AccountRef *string `` /* 182-byte string literal not displayed */

	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}

The FrontendState holds shared information about a frontend.

func GetFrontendState

func GetFrontendState(
	ctx context.Context,
	tx pgx.Tx,
	q sq.SelectBuilder,
) (*FrontendState, error)

GetFrontendState gets a single row from the store. This may return nil without an error.

func GetFrontendStateByID added in v1.2.0

func GetFrontendStateByID(
	ctx context.Context,
	tx pgx.Tx,
	id string,
) (*FrontendState, error)

GetFrontendStateByID retrieves a single frontend state identified by the frontend ID.

func GetFrontendStateByKey added in v1.2.0

func GetFrontendStateByKey(
	ctx context.Context,
	tx pgx.Tx,
	key string,
) (*FrontendState, error)

GetFrontendStateByKey retrieves a single frontend state identified by the frontend key.

func GetFrontendStates

func GetFrontendStates(
	ctx context.Context,
	tx pgx.Tx,
	q sq.SelectBuilder,
) ([]*FrontendState, error)

GetFrontendStates retrieves all frontend states from the database.

func InitFrontendState

func InitFrontendState(init *FrontendState) *FrontendState

InitFrontendState initializes the state with a database pool and default values where required.

func (*FrontendState) Delete

func (s *FrontendState) Delete(ctx context.Context, tx pgx.Tx) error

Delete will remove a frontend state from the store

func (*FrontendState) Save

func (s *FrontendState) Save(
	ctx context.Context,
	tx pgx.Tx,
) error

Save will create or update a frontend state

func (*FrontendState) Validate

func (s *FrontendState) Validate() ValidationError

Validate checks for presence of required fields.

type MeetingState

type MeetingState struct {
	ID         string `json:"id"`
	InternalID string `json:"internal_id"`

	Meeting *bbb.Meeting `json:"meeting" api:"MeetingInfo"`

	FrontendID *string `json:"frontend_id"`

	BackendID *string `json:"backend_id"`

	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
	SyncedAt  time.Time `json:"synced_at"`
	// contains filtered or unexported fields
}

The MeetingState holds a meeting and its relations to a backend and frontend.

func AwaitMeetingState

func AwaitMeetingState(
	ctx context.Context,
	conn *pgxpool.Conn,
	q sq.SelectBuilder,
) (*MeetingState, pgx.Tx, error)

AwaitMeetingState polls the database for a meeting state until the context expires.

func GetMeetingState

func GetMeetingState(
	ctx context.Context,
	tx pgx.Tx,
	q sq.SelectBuilder,
) (*MeetingState, error)

GetMeetingState tries to retrieve a single meeting state.

func GetMeetingStateByID

func GetMeetingStateByID(
	ctx context.Context,
	tx pgx.Tx,
	id string,
) (*MeetingState, error)

GetMeetingStateByID is a convenience wrapper around GetMeetingState.

func GetMeetingStates

func GetMeetingStates(
	ctx context.Context,
	tx pgx.Tx,
	q sq.SelectBuilder,
) ([]*MeetingState, error)

GetMeetingStates retrieves all meeting states

func InitMeetingState

func InitMeetingState(
	init *MeetingState,
) *MeetingState

InitMeetingState initializes meeting state with defaults and a connection

func (*MeetingState) BindFrontendID

func (s *MeetingState) BindFrontendID(
	ctx context.Context,
	tx pgx.Tx,
	id string,
) error

BindFrontendID associates an unclaimed meeting with a frontend

func (*MeetingState) GetBackendState

func (s *MeetingState) GetBackendState(
	ctx context.Context,
	tx pgx.Tx,
) (*BackendState, error)

GetBackendState loads the backend state

func (*MeetingState) GetFrontendState

func (s *MeetingState) GetFrontendState(
	ctx context.Context,
	tx pgx.Tx,
) (*FrontendState, error)

GetFrontendState loads the frontend state for the meeting

func (*MeetingState) IsStale

func (s *MeetingState) IsStale(threshold time.Duration) bool

IsStale checks if the last sync is longer ago than a given threshold.

func (*MeetingState) MarkSynced

func (s *MeetingState) MarkSynced()

MarkSynced sets the synced at timestamp

func (*MeetingState) Refresh

func (s *MeetingState) Refresh(ctx context.Context, tx pgx.Tx) error

Refresh the meeting state from the database

func (*MeetingState) Save

func (s *MeetingState) Save(ctx context.Context, tx pgx.Tx) error

Save updates or inserts a meeting state into our cluster state.

func (*MeetingState) SetBackendID

func (s *MeetingState) SetBackendID(
	ctx context.Context,
	tx pgx.Tx,
	id string,
) error

SetBackendID associates a meeting with a backend

func (*MeetingState) UpdateFrontendMeetingMapping added in v1.2.0

func (s *MeetingState) UpdateFrontendMeetingMapping(
	ctx context.Context,
	tx pgx.Tx,
) error

UpdateFrontendMeetingMapping updates the `frontend_meetings` mapping table. This table does not hold any state, but persists the association between frontend and meetingID for identifying recordings.

func (*MeetingState) Upsert

func (s *MeetingState) Upsert(ctx context.Context, tx pgx.Tx) (string, error)

Upsert meeting state will create the meeting state or will fall back to a state update.

type RecordingState

type RecordingState struct {
	RecordID string

	Recording *bbb.Recording

	MeetingID         string
	InternalMeetingID string

	FrontendID string

	CreatedAt time.Time
	UpdatedAt time.Time
	SyncedAt  time.Time
}

RecordingState holds a recording and its relation to a meeting.

func GetRecordingState

func GetRecordingState(
	ctx context.Context,
	tx pgx.Tx,
	q sq.SelectBuilder,
) (*RecordingState, error)

GetRecordingState retrieves a single state from the database.

func GetRecordingStateByID

func GetRecordingStateByID(
	ctx context.Context,
	tx pgx.Tx,
	recordID string,
) (*RecordingState, error)

GetRecordingStateByID retrieves a single state identified by recordID.

func GetRecordingStates

func GetRecordingStates(
	ctx context.Context,
	tx pgx.Tx,
	q sq.SelectBuilder,
) ([]*RecordingState, error)

GetRecordingStates retrieves a list of recordings filtered by criteria in the select builder

func InitRecordingState

func InitRecordingState(init *RecordingState) *RecordingState

InitRecordingState initializes the state with default values where required

func NewStateFromRecording added in v1.1.0

func NewStateFromRecording(
	recording *bbb.Recording,
) *RecordingState

NewStateFromRecording will initialize a recording state with a recording.

func (*RecordingState) Delete

func (s *RecordingState) Delete(ctx context.Context, tx pgx.Tx) error

Delete will remove a recording from the database. This cascades to associated text tracks.

func (*RecordingState) DeleteFiles

func (s *RecordingState) DeleteFiles() error

DeleteFiles will remove the recording from the filesystem.

func (*RecordingState) Exists

func (s *RecordingState) Exists(ctx context.Context, tx pgx.Tx) (bool, error)

Exists checks if the recording is already present in the store.

func (*RecordingState) Merge added in v1.1.0

func (s *RecordingState) Merge(other *RecordingState) error

Merge will combine the state and also fill in missing fields

func (*RecordingState) PublishFiles

func (s *RecordingState) PublishFiles() error

PublishFiles will move the recording from the unpublished directory to the published directory.

func (*RecordingState) Save

func (s *RecordingState) Save(
	ctx context.Context,
	tx pgx.Tx,
) error

Save the recording state

func (*RecordingState) SetFrontendID

func (s *RecordingState) SetFrontendID(
	ctx context.Context,
	tx pgx.Tx,
	frontendID string,
) error

SetFrontendID will set the frontend_id attribute of the recording state.

func (*RecordingState) SetTextTracks

func (s *RecordingState) SetTextTracks(
	ctx context.Context,
	tx pgx.Tx,
	tracks []*bbb.TextTrack,
) error

SetTextTracks will persist associated text tracks without touching the rest. The recording has to be present in the database.

func (*RecordingState) UnpublishFiles

func (s *RecordingState) UnpublishFiles() error

UnpublishFiles will move the recording from the published directory to the unpublished directory.

type RecordingsStorage

type RecordingsStorage struct {
	PublishedPath   string
	UnpublishedPath string
}

RecordingsStorage handles the filesystem access when dealing with recordings.

func NewRecordingsStorageFromEnv

func NewRecordingsStorageFromEnv() (*RecordingsStorage, error)

NewRecordingsStorageFromEnv creates a new recordings storage instance and configures it through well known environment variables.

func (*RecordingsStorage) Check

func (s *RecordingsStorage) Check() error

Check will test if we can access and manipulate the recordings storage.

func (*RecordingsStorage) ListThumbnailFiles

func (s *RecordingsStorage) ListThumbnailFiles(recordID string) []string

ListThumbnailFiles retrieves all thumbnail files from presentations relative to the published path.

func (*RecordingsStorage) MakeRecordingPreview

func (s *RecordingsStorage) MakeRecordingPreview(
	recordID string,
) *bbb.Preview

MakeRecordingPreview will use the thumbnails to create previews

func (*RecordingsStorage) PublishedRecordingPath

func (s *RecordingsStorage) PublishedRecordingPath(id string) string

PublishedRecordingPath returns the joined filepath for an "id" (this will be the internal meeting id).

func (*RecordingsStorage) UnpublishedRecordingPath

func (s *RecordingsStorage) UnpublishedRecordingPath(id string) string

UnpublishedRecordingPath returns the joined filepath for an "id" (this will be the internal meeting id).

type Tags

type Tags []string

Tags are a list of strings with labels to declare for example backend capabilities

type ValidationError

type ValidationError map[string][]string

A ValidationError is a mapping between the field name (same as json field name), and a list of error strings.

func (ValidationError) Add

func (e ValidationError) Add(field, err string)

Add a validation error to the collection

func (ValidationError) Error

func (e ValidationError) Error() string

Error implements the error interface

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL