db_common

package
v0.22.0-dev.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 4, 2024 License: AGPL-3.0 Imports: 27 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrServiceInRecoveryMode = errors.New("service is in recovery mode")
View Source
// Functions is the list of SQL function definitions declared by this package.
// Each entry supplies the function name, its parameters (parameter name ->
// SQL type), its return type, its language and its body.
var Functions = []SQLFunction{
	// glob rewrites a glob-style pattern: every '*' becomes '%' and every
	// '?' becomes '_' (presumably so the result can be used as a LIKE pattern).
	{
		Name:     "glob",
		Params:   map[string]string{"input_glob": "text"},
		Returns:  "text",
		Language: "plpgsql",
		Body: `
declare
	output_pattern text;
begin
	output_pattern = replace(input_glob, '*', '%');
	output_pattern = replace(output_pattern, '?', '_');
	return output_pattern;
end;
`,
	},
	// Cache set function: accepts 'on', 'off' or 'clear' and records the
	// request by inserting a row into steampipe_internal.steampipe_settings
	// ('cache' = 'true'/'false', or 'cache_clear_time' with an empty value).
	// Any other command raises an exception.
	{
		Name:     constants.FunctionCacheSet,
		Params:   map[string]string{"command": "text"},
		Returns:  "void",
		Language: "plpgsql",
		Body: `
begin
	IF command = 'on' THEN
		INSERT INTO steampipe_internal.steampipe_settings("name","value") VALUES ('cache','true');
	ELSIF command = 'off' THEN
		INSERT INTO steampipe_internal.steampipe_settings("name","value") VALUES ('cache','false');
	ELSIF command = 'clear' THEN
		INSERT INTO steampipe_internal.steampipe_settings("name","value") VALUES ('cache_clear_time','');
	ELSE
		RAISE EXCEPTION 'Unknown value % for set_cache - valid values are on, off and clear.', $1;
	END IF;
end;
`,
	},
	// Connection cache clear function: inserts a 'connection_cache_clear'
	// row into steampipe_internal.steampipe_settings whose value is the
	// connection name passed in.
	{
		Name:     constants.FunctionConnectionCacheClear,
		Params:   map[string]string{"connection": "text"},
		Returns:  "void",
		Language: "plpgsql",
		Body: `
begin
		INSERT INTO steampipe_internal.steampipe_settings("name","value") VALUES ('connection_cache_clear',connection);
end;
`,
	},
	// Cache TTL function: inserts a 'cache_ttl' row into
	// steampipe_internal.steampipe_settings whose value is the given
	// duration (an int; the unit is not stated here - TODO confirm).
	{
		Name:     constants.FunctionCacheSetTtl,
		Params:   map[string]string{"duration": "int"},
		Returns:  "void",
		Language: "plpgsql",
		Body: `
begin
	INSERT INTO steampipe_internal.steampipe_settings("name","value") VALUES ('cache_ttl',duration);
end;
`,
	},
}

Functions is a list of SQLFunction objects that are installed in the db 'steampipe_internal' schema at startup

Functions

func AddRootCertToConfig added in v0.17.0

func AddRootCertToConfig(config *pgconn.Config, certLocation string) error

func AddSearchPathPrefix added in v0.20.0

func AddSearchPathPrefix(searchPathPrefix []string, searchPath []string) []string

func BuildSearchPathResult added in v0.20.0

func BuildSearchPathResult(searchPathString string) ([]string, error)

func CacheClear added in v0.20.0

func CacheClear(ctx context.Context, connection *pgx.Conn) error

CacheClear resets the max time on the cache; anything below this is not accepted

func CanSetCacheTtl added in v0.21.0

func CanSetCacheTtl(ss *ServerSettings, newTtl int) (bool, string)

func EnsureInternalSchemaSuffix added in v0.20.0

func EnsureInternalSchemaSuffix(searchPath []string) []string

func ExecuteQuery

func ExecuteQuery(ctx context.Context, client Client, queryString string, args ...any) (*queryresult.ResultStreamer, error)

ExecuteQuery executes a single query. If shutdownAfterCompletion is true, shutdown the client after completion

func ExecuteSystemClientCall added in v0.21.0

func ExecuteSystemClientCall(ctx context.Context, conn *pgx.Conn, executor SystemClientExecutor) error

ExecuteSystemClientCall creates a transaction and sets the application_name to the one used by the system client, executes the callback and sets the application name back to the client app name

func GetCommentsQueryForPlugin added in v0.20.0

func GetCommentsQueryForPlugin(connectionName string, p map[string]*proto.TableSchema) string

func GetDeleteConnectionQuery added in v0.20.0

func GetDeleteConnectionQuery(name string) string

func GetMissingSchemaFromIsRelationNotFoundError added in v0.21.0

func GetMissingSchemaFromIsRelationNotFoundError(err error) (string, string, bool)

func GetUpdateConnectionQuery added in v0.20.0

func GetUpdateConnectionQuery(connectionName, pluginSchemaName string) string

func GetUserSearchPath added in v0.20.0

func GetUserSearchPath(ctx context.Context, conn *pgx.Conn) ([]string, error)

func IsClientAppName added in v0.21.0

func IsClientAppName(appName string) bool

func IsClientSystemAppName added in v0.21.0

func IsClientSystemAppName(appName string) bool

func IsRelationNotFoundError added in v0.21.0

func IsRelationNotFoundError(err error) bool

func IsSchemaNameValid added in v0.20.0

func IsSchemaNameValid(name string) (bool, string)

IsSchemaNameValid verifies that the given string is a valid pgsql schema name

func IsServiceAppName added in v0.21.0

func IsServiceAppName(appName string) bool

func LoadForeignSchemaNames added in v0.20.0

func LoadForeignSchemaNames(ctx context.Context, conn *pgx.Conn) ([]string, error)

func MaxDbConnections added in v0.20.0

func MaxDbConnections() int

func PgEscapeName

func PgEscapeName(name string) string

PgEscapeName escapes strings which will be used for Postgres object identifiers (table names, column names, schema names)

func PgEscapeSearchPath

func PgEscapeSearchPath(searchPath []string) []string

PgEscapeSearchPath applies postgres escaping to the search path and removes whitespace

func PgEscapeString

func PgEscapeString(str string) string

PgEscapeString escapes strings which are to be inserted. It uses a custom escape tag to avoid any chance of a clash with the escaped text. See https://medium.com/@lnishada/postgres-dollar-quoting-6d23e4f186ec

func SetCacheEnabled added in v0.20.0

func SetCacheEnabled(ctx context.Context, enabled bool, connection *pgx.Conn) error

SetCacheEnabled enables/disables the cache

func SetCacheTtl added in v0.20.0

func SetCacheTtl(ctx context.Context, duration time.Duration, connection *pgx.Conn) error

SetCacheTtl sets the cache ttl on the client

func ValidateClientCacheEnabled added in v0.21.0

func ValidateClientCacheEnabled(c Client) *error_helpers.ErrorAndWarnings

func ValidateClientCacheSettings added in v0.21.0

func ValidateClientCacheSettings(c Client) *error_helpers.ErrorAndWarnings

func ValidateClientCacheTtl added in v0.21.0

func ValidateClientCacheTtl(c Client) *error_helpers.ErrorAndWarnings

func WaitForConnection

func WaitForConnection(ctx context.Context, connStr string, options ...WaitOption) (conn *pgx.Conn, err error)

func WaitForConnectionPing added in v0.19.0

func WaitForConnectionPing(ctx context.Context, connection *pgx.Conn, waitOptions ...WaitOption) (err error)

WaitForConnectionPing PINGs the DB - retrying after a backoff of constants.ServicePingInterval - but only for constants.DBConnectionTimeout. It returns the error from the database if the dbClient does not respond successfully after a timeout

func WaitForPool added in v0.17.0

func WaitForPool(ctx context.Context, db *pgxpool.Pool, waitOptions ...WaitOption) (err error)

WaitForPool waits for the db to start accepting connections. It returns an error if the dbClient does not start within a stipulated time.

func WaitForRecovery added in v0.19.0

func WaitForRecovery(ctx context.Context, connection *pgx.Conn, waitOptions ...WaitOption) (err error)

WaitForRecovery returns an error (ErrServiceInRecoveryMode) if the service stays in recovery mode for more than constants.DBRecoveryWaitTimeout

Types

type AcquireSessionResult

// AcquireSessionResult is the value returned by Client.AcquireSession: the
// acquired session together with embedded error and warning information.
type AcquireSessionResult struct {
	// Session is the acquired database session.
	Session *DatabaseSession
	// ErrorAndWarnings is embedded, so its fields and methods are promoted
	// onto AcquireSessionResult.
	error_helpers.ErrorAndWarnings
}

type Client

// Client is the interface for a database client. Its methods cover closing
// the client, search path handling, acquiring connections and sessions,
// executing queries, resetting pools, loading schema metadata, reading
// server settings and registering a notification listener.
type Client interface {
	Close(context.Context) error
	LoadUserSearchPath(context.Context) error

	SetRequiredSessionSearchPath(context.Context) error
	GetRequiredSessionSearchPath() []string
	GetCustomSearchPath() []string

	// acquire a management database connection - must be closed
	AcquireManagementConnection(context.Context) (*pgxpool.Conn, error)
	// acquire a query execution session (with search path and cache options set) - must be closed
	AcquireSession(context.Context) *AcquireSessionResult

	// execute a query; the Sync variant returns a SyncQueryResult, the other a Result
	ExecuteSync(context.Context, string, ...any) (*queryresult.SyncQueryResult, error)
	Execute(context.Context, string, ...any) (*queryresult.Result, error)

	// as above, but using the DatabaseSession supplied by the caller
	ExecuteSyncInSession(context.Context, *DatabaseSession, string, ...any) (*queryresult.SyncQueryResult, error)
	ExecuteInSession(context.Context, *DatabaseSession, func(), string, ...any) (*queryresult.Result, error)

	ResetPools(context.Context)
	GetSchemaFromDB(context.Context) (*SchemaMetadata, error)

	ServerSettings() *ServerSettings
	RegisterNotificationListener(f func(notification *pgconn.Notification))
}

type ColumnSchema added in v0.20.0

// ColumnSchema contains the details of a single column in a table.
// All fields are plain values; NotNull is a flag and the rest are strings.
type ColumnSchema struct {
	ID          string
	Name        string
	NotNull     bool
	Type        string
	Default     string
	Description string
}

ColumnSchema contains the details of a single column in a table

type DatabaseSession

// DatabaseSession wraps a raw database connection together with per-session
// state. Only BackendPid is included in the JSON encoding; the other fields
// are tagged `json:"-"`.
type DatabaseSession struct {
	// presumably the postgres backend process id of the connection - TODO confirm
	BackendPid uint32   `json:"backend_pid"`
	// the search path stored for this session
	SearchPath []string `json:"-"`

	// this gets rewritten, since the database/sql gives back a new instance every time
	Connection *pgxpool.Conn `json:"-"`

	// the id of the last scan metadata retrieved
	ScanMetadataMaxId int64 `json:"-"`
}

DatabaseSession wraps the raw database connection. The purpose is to be able:

  • to store the current search path of the connection without having to make a database round-trip
  • To store the last scan_metadata id used on this connection

func NewDBSession

func NewDBSession(backendPid uint32) *DatabaseSession

func (*DatabaseSession) Close

func (s *DatabaseSession) Close(waitForCleanup bool)

type InitResult

// InitResult collects the outcome of an initialisation step: an error,
// warnings and messages, plus optional hooks for displaying them.
type InitResult struct {
	Error    error
	Warnings []string
	Messages []string

	// allow overriding of the display functions
	DisplayMessage func(ctx context.Context, m string)
	DisplayWarning func(ctx context.Context, w string)
}

func (*InitResult) AddMessage

func (r *InitResult) AddMessage(message string)

func (*InitResult) AddWarnings

func (r *InitResult) AddWarnings(warnings ...string)

func (*InitResult) DisplayMessages

func (r *InitResult) DisplayMessages()

func (*InitResult) HasMessages

func (r *InitResult) HasMessages() bool

type NotificationListener added in v0.21.0

type NotificationListener struct {
	// contains filtered or unexported fields
}

func NewNotificationListener added in v0.21.0

func NewNotificationListener(ctx context.Context, conn *pgx.Conn) (*NotificationListener, error)

func (*NotificationListener) RegisterListener added in v0.21.0

func (c *NotificationListener) RegisterListener(onNotification func(*pgconn.Notification))

func (*NotificationListener) Stop added in v0.21.0

func (c *NotificationListener) Stop(ctx context.Context)

type QueryWithArgs added in v0.20.0

// QueryWithArgs pairs a query string with its arguments.
type QueryWithArgs struct {
	// Query is the query text.
	Query string
	// Args holds the arguments accompanying Query (presumably positional
	// placeholders - TODO confirm against callers).
	Args  []any
}

type SQLFunction added in v0.20.0

// SQLFunction describes a SQL function definition.
type SQLFunction struct {
	// Name is the function name.
	Name     string
	// Params maps parameter name to its SQL type (e.g. "input_glob": "text").
	Params   map[string]string
	// Returns is the SQL return type (e.g. "text", "void").
	Returns  string
	// Body is the source text of the function body.
	Body     string
	// Language is the language of Body (e.g. "plpgsql").
	Language string
}

SQLFunction is a struct for an sqlFunc

type SchemaMetadata added in v0.20.0

// SchemaMetadata is a struct to represent the schema of the database.
type SchemaMetadata struct {
	// map schemaName -> (map tableName -> TableSchema)
	Schemas map[string]map[string]TableSchema
	// the name of the temporary schema
	TemporarySchemaName string
}

SchemaMetadata is a struct to represent the schema of the database

func LoadSchemaMetadata added in v0.21.0

func LoadSchemaMetadata(ctx context.Context, conn *pgx.Conn, query string) (*SchemaMetadata, error)

func NewSchemaMetadata added in v0.20.0

func NewSchemaMetadata() *SchemaMetadata

func (*SchemaMetadata) GetSchemas added in v0.20.0

func (m *SchemaMetadata) GetSchemas() []string

GetSchemas returns all foreign schema names

func (*SchemaMetadata) GetTablesInSchema added in v0.20.0

func (m *SchemaMetadata) GetTablesInSchema(schemaName string) map[string]struct{}

GetTablesInSchema returns a lookup of all foreign tables in a given foreign schema

type ServerSettings added in v0.21.0

// ServerSettings holds server-level settings. Each field carries a `db` tag
// naming its column (presumably used when scanning a settings row - TODO
// confirm against the loader).
type ServerSettings struct {
	StartTime        time.Time `db:"start_time"`
	SteampipeVersion string    `db:"steampipe_version"`
	FdwVersion       string    `db:"fdw_version"`
	// units of CacheMaxTtl are not stated here - TODO confirm
	CacheMaxTtl      int       `db:"cache_max_ttl"`
	// maximum cache size, in MB going by the field name
	CacheMaxSizeMb   int       `db:"cache_max_size_mb"`
	CacheEnabled     bool      `db:"cache_enabled"`
}

type SystemClientExecutor added in v0.21.0

type SystemClientExecutor func(context.Context, pgx.Tx) error

SystemClientExecutor is the executor function that is called within a transaction. Make sure that by the time the executor finishes execution, the connection is freed; otherwise we will get a `conn is busy` error

type TableSchema added in v0.20.0

// TableSchema contains the details of a single table in the schema.
type TableSchema struct {
	// map columnName -> columnSchema
	Columns     map[string]ColumnSchema
	Name        string
	// presumably the schema-qualified table name - TODO confirm
	FullName    string
	Schema      string
	Description string
}

TableSchema contains the details of a single table in the schema

type WaitOption added in v0.19.0

type WaitOption func(w *waitConfig)

func WithRetryInterval added in v0.19.0

func WithRetryInterval(d time.Duration) WaitOption

func WithTimeout added in v0.19.0

func WithTimeout(d time.Duration) WaitOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL