actions

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 25, 2023 License: Apache-2.0 Imports: 36 Imported by: 0

Documentation

Index

Constants

View Source
// Log output naming constants:
//   - LogOutputExtension is the ".log" suffix.
//   - LogOutputLocation is the "_lakefs/actions/log" location string.
//
// NOTE(review): presumably combined by FormatHookOutputPath and
// FormatRunManifestOutputPath; their bodies are not visible here — confirm.
const (
	LogOutputExtension = ".log"
	LogOutputLocation  = "_lakefs/actions/log"
)
View Source
// HeadersPropertyKey is the property key "headers".
// NOTE(review): presumably the key looked up in an ActionHook's Properties to
// populate Webhook.Headers — confirm against the webhook constructor.
const (
	HeadersPropertyKey = "headers"
)
View Source
// PartitionKey is the partition name "actions".
// NOTE(review): presumably the kv.Store partition under which actions data is
// kept — confirm against the KV store implementation.
const (
	PartitionKey = "actions"
)

Variables

View Source
// Sentinel errors for an invalid action and for an invalid event parameter.
// Match them with errors.Is rather than by comparing message text.
var (
	ErrInvalidAction         = errors.New("invalid action")
	ErrInvalidEventParameter = errors.New("invalid event parameter")
)
View Source
// Sentinel errors:
//   - ErrNotFound: "not found".
//   - ErrNilValue: "nil value".
//   - ErrIfExprNotBool: a hook's 'if' expression did not evaluate to a boolean.
//
// Match them with errors.Is rather than by comparing message text.
var (
	ErrNotFound      = errors.New("not found")
	ErrNilValue      = errors.New("nil value")
	ErrIfExprNotBool = errors.New("hook 'if' expression should evaluate to a boolean")
)
View Source
// ErrParamConflict is the sentinel error for conflicting parameters.
var ErrParamConflict = errors.New("parameters conflict")
View Source
// ErrUnknownHookType is the sentinel error for an unknown hook type.
// NOTE(review): presumably returned by NewHook when ActionHook.Type is not one
// of the HookType constants — confirm.
var ErrUnknownHookType = errors.New("unknown hook type")
View Source
// File_actions_proto is a protoreflect.FileDescriptor.
// NOTE(review): by its name this looks like the protoc-generated descriptor
// for actions.proto — confirm; generated code should not be edited by hand.
var File_actions_proto protoreflect.FileDescriptor

Functions

func DescendArgs added in v0.87.0

func DescendArgs(args interface{}) (interface{}, error)

func FormatHookOutputPath

func FormatHookOutputPath(runID, hookRunID string) string

func FormatRunManifestOutputPath

func FormatRunManifestOutputPath(runID string) string

func LuaRun added in v0.87.0

func LuaRun(l *lua.State, code, name string) error

func NewHookRunID

func NewHookRunID(actionIdx, hookIdx int) string

func RunByBranchPath added in v0.67.0

func RunByBranchPath(repoID, branchID, runID string) []byte

func RunByCommitPath added in v0.67.0

func RunByCommitPath(repoID, commitID, runID string) []byte

func RunPath added in v0.67.0

func RunPath(repoID, runID string) []byte

func TasksPath added in v0.67.0

func TasksPath(repoID, runID string) string

Types

type Action

// Action is the YAML-decodable definition of an action. It carries a name and
// description, an `on` map from graveler.EventType to *ActionOn, and the list
// of hooks (ActionHook) declared under `hooks`.
type Action struct {
	Name        string                           `yaml:"name"`
	Description string                           `yaml:"description"`
	On          map[graveler.EventType]*ActionOn `yaml:"on"`
	Hooks       []ActionHook                     `yaml:"hooks"`
}

func LoadActions

func LoadActions(ctx context.Context, source Source, record graveler.HookRecord) ([]*Action, error)

func MatchedActions

func MatchedActions(actions []*Action, spec MatchSpec) ([]*Action, error)

func ParseAction

func ParseAction(data []byte) (*Action, error)

ParseAction is a helper function that reads, parses, and validates an Action from the given byte slice.

func (*Action) Match

func (a *Action) Match(spec MatchSpec) (bool, error)

func (*Action) Validate

func (a *Action) Validate() error

type ActionHook

// ActionHook is the YAML-decodable definition of a single hook inside an
// Action: its id, HookType, description, an `if` expression string, and a
// free-form Properties map.
// Per ErrIfExprNotBool's message, the `if` expression should evaluate to a
// boolean.
type ActionHook struct {
	ID          string     `yaml:"id"`
	Type        HookType   `yaml:"type"`
	Description string     `yaml:"description"`
	If          string     `yaml:"if"`
	Properties  Properties `yaml:"properties"`
}

type ActionOn

// ActionOn is the value type of Action.On; it holds the string list decoded
// from the YAML key `branches`.
// NOTE(review): presumably these entries are matched against
// MatchSpec.BranchID in Action.Match — confirm the matching rules there.
type ActionOn struct {
	Branches []string `yaml:"branches"`
}

type Airflow added in v0.47.0

// Airflow is a hook type that embeds HookBase and declares a Run method with
// the signature required by the Hook interface.
// Password is a SecureString; the other credentials and identifiers are plain
// strings. DAGConf is a free-form map and Timeout is a time.Duration.
// NOTE(review): the meaning of WaitForDAG is not visible here; by name it
// presumably makes Run wait for the DAG run to finish — confirm in Run.
type Airflow struct {
	HookBase
	URL        string
	DagID      string
	Username   string
	Password   SecureString
	DAGConf    map[string]interface{}
	Timeout    time.Duration
	WaitForDAG bool
}

func (*Airflow) Run added in v0.47.0

func (a *Airflow) Run(ctx context.Context, record graveler.HookRecord, buf *bytes.Buffer) error

type Config added in v0.99.0

// Config holds the package's settings: a top-level Enabled flag and a nested
// Lua section with a NetHTTPEnabled flag. It is passed to NewService and to
// the hook constructors, and stored in HookBase.Config.
// NOTE(review): presumably Lua.NetHTTPEnabled gates HTTP access from Lua
// hooks — confirm where the flag is read.
type Config struct {
	Enabled bool
	Lua     struct {
		NetHTTPEnabled bool
	}
}

type DagRunReq added in v0.47.0

// DagRunReq is a JSON-serializable DAG run request. Both fields are tagged
// omitempty, so they are left out of the encoded JSON when empty.
// NOTE(review): presumably the request body sent by Airflow.Run — confirm.
type DagRunReq struct {
	// DagRunID Run ID. This together with DAG_ID are a unique key.
	DagRunID string `json:"dag_run_id,omitempty"`
	// Conf JSON object describing additional configuration parameters.
	Conf map[string]interface{} `json:"conf,omitempty"`
}

type DecreasingIDGenerator added in v0.67.0

// DecreasingIDGenerator creates IDs that are decreasing with time.
// It is a field-less struct; its NewRunID method has the signature required by
// IDGenerator.
type DecreasingIDGenerator struct{}

DecreasingIDGenerator creates IDs that are decreasing with time

func (*DecreasingIDGenerator) NewRunID added in v0.67.0

func (gen *DecreasingIDGenerator) NewRunID() string

type EventInfo added in v0.47.0

// EventInfo is a JSON-serializable description of an event: event type and
// time, action name, hook ID and repository ID, followed by optional details.
// The fields from BranchID onward are tagged omitempty and are left out of the
// encoded JSON when empty. All fields are strings except CommitMetadata,
// which is a map[string]string.
type EventInfo struct {
	EventType      string            `json:"event_type"`
	EventTime      string            `json:"event_time"`
	ActionName     string            `json:"action_name"`
	HookID         string            `json:"hook_id"`
	RepositoryID   string            `json:"repository_id"`
	BranchID       string            `json:"branch_id,omitempty"`
	SourceRef      string            `json:"source_ref,omitempty"`
	TagID          string            `json:"tag_id,omitempty"`
	CommitID       string            `json:"commit_id,omitempty"`
	CommitMessage  string            `json:"commit_message,omitempty"`
	Committer      string            `json:"committer,omitempty"`
	CommitMetadata map[string]string `json:"commit_metadata,omitempty"`
}

type Hook

// Hook is the interface every runnable hook implements: a single Run method
// that receives the context, the graveler.HookRecord, and a *bytes.Buffer,
// and returns an error.
// Airflow, LuaHook and Webhook each declare a Run method with this signature.
// NOTE(review): presumably buf collects the hook's output for logging —
// confirm against the Run implementations.
type Hook interface {
	Run(ctx context.Context, record graveler.HookRecord, buf *bytes.Buffer) error
}

Hook is the abstraction of the basic user-configured runnable building block.

func NewAirflowHook added in v0.47.0

func NewAirflowHook(h ActionHook, action *Action, cfg Config, endpoint *http.Server) (Hook, error)

func NewHook

func NewHook(hook ActionHook, action *Action, cfg Config, server *http.Server) (Hook, error)

func NewLuaHook added in v0.87.0

func NewLuaHook(h ActionHook, action *Action, cfg Config, e *http.Server) (Hook, error)

func NewWebhook

func NewWebhook(h ActionHook, action *Action, cfg Config, e *http.Server) (Hook, error)

type HookBase added in v0.47.0

// HookBase holds the fields shared by the concrete hook types; it is embedded
// in Airflow, LuaHook and Webhook. It carries the hook ID, the owning action's
// name, the Config, and an *http.Server endpoint.
type HookBase struct {
	ID         string
	ActionName string
	Config     Config
	Endpoint   *http.Server
}

type HookOutputWriter

// HookOutputWriter bundles an OutputWriter with identifying strings: storage
// namespace, run ID, hook run ID, action name and hook ID.
// Its OutputWrite method takes only (ctx, reader, size), whereas
// OutputWriter.OutputWrite also needs a storage namespace and a name.
// NOTE(review): presumably those two arguments are derived from the fields
// here (cf. FormatHookOutputPath) — confirm in the method body.
type HookOutputWriter struct {
	StorageNamespace string
	RunID            string
	HookRunID        string
	ActionName       string
	HookID           string
	Writer           OutputWriter
}

func (*HookOutputWriter) OutputWrite

func (h *HookOutputWriter) OutputWrite(ctx context.Context, reader io.Reader, size int64) error

type HookType

// HookType is the string-typed kind of a hook; it is the type of
// ActionHook.Type, decoded from the YAML key `type`.
// The declared values are "webhook", "airflow" and "lua".
type HookType string
const (
	HookTypeWebhook HookType = "webhook"
	HookTypeAirflow HookType = "airflow"
	HookTypeLua     HookType = "lua"
)

type IDGenerator added in v0.67.0

// IDGenerator is the interface for producing run IDs; NewService accepts one
// as its idGen argument. DecreasingIDGenerator and IncreasingIDGenerator both
// declare a matching NewRunID method.
type IDGenerator interface {
	// NewRunID creates IDs for Runs.
	NewRunID() string
}

type IncreasingIDGenerator added in v0.67.0

// IncreasingIDGenerator creates IDs that are increasing with time.
// It is a field-less struct; its NewRunID method has the signature required by
// IDGenerator.
type IncreasingIDGenerator struct{}

IncreasingIDGenerator creates IDs that are increasing with time

func (*IncreasingIDGenerator) NewRunID added in v0.67.0

func (gen *IncreasingIDGenerator) NewRunID() string

type KVRunResultIterator added in v0.67.0

type KVRunResultIterator struct {
	// contains filtered or unexported fields
}

func NewKVRunResultIterator added in v0.67.0

func NewKVRunResultIterator(ctx context.Context, store kv.Store, repositoryID, branchID, commitID, after string) (*KVRunResultIterator, error)

NewKVRunResultIterator returns a new iterator over actions run results. 'after' determines the runID from which the scan should start; it is used for pagination.

func (*KVRunResultIterator) Close added in v0.67.0

func (i *KVRunResultIterator) Close()

func (*KVRunResultIterator) Err added in v0.67.0

func (i *KVRunResultIterator) Err() error

func (*KVRunResultIterator) Next added in v0.67.0

func (i *KVRunResultIterator) Next() bool

func (*KVRunResultIterator) Value added in v0.67.0

func (i *KVRunResultIterator) Value() *RunResult

type KVTaskResultIterator added in v0.67.0

type KVTaskResultIterator struct {
	// contains filtered or unexported fields
}

func NewKVTaskResultIterator added in v0.67.0

func NewKVTaskResultIterator(ctx context.Context, store kv.Store, repositoryID, runID, after string) (*KVTaskResultIterator, error)

NewKVTaskResultIterator returns a new iterator over the actions task results of a specific run. 'after' determines the hook run ID from which the scan should start; it is used for pagination.

func (*KVTaskResultIterator) Close added in v0.67.0

func (i *KVTaskResultIterator) Close()

func (*KVTaskResultIterator) Err added in v0.67.0

func (i *KVTaskResultIterator) Err() error

func (*KVTaskResultIterator) Next added in v0.67.0

func (i *KVTaskResultIterator) Next() bool

func (*KVTaskResultIterator) Value added in v0.67.0

func (i *KVTaskResultIterator) Value() *TaskResult

type LuaHook added in v0.87.0

// LuaHook is a hook type that embeds HookBase and declares a Run method with
// the signature required by the Hook interface. It carries a Script string, a
// ScriptPath string, and a free-form Args map.
// NOTE(review): presumably Script is inline Lua code and ScriptPath points to
// a stored script, with only one of them expected to be set — confirm in
// NewLuaHook.
type LuaHook struct {
	HookBase
	Script     string
	ScriptPath string
	Args       map[string]interface{}
}

func (*LuaHook) Run added in v0.87.0

func (h *LuaHook) Run(ctx context.Context, record graveler.HookRecord, buf *bytes.Buffer) error

type MatchSpec

// MatchSpec is the argument of Action.Match and MatchedActions: a pair of
// graveler.EventType and graveler.BranchID to match actions against.
type MatchSpec struct {
	EventType graveler.EventType
	BranchID  graveler.BranchID
}

type NewHookFunc

// NewHookFunc is the constructor signature for hooks: it takes the ActionHook
// definition, its owning *Action, the Config and an *http.Server, and returns
// a Hook or an error. NewHook, NewWebhook, NewAirflowHook and NewLuaHook all
// have this signature.
type NewHookFunc func(ActionHook, *Action, Config, *http.Server) (Hook, error)

type OutputWriter

// OutputWriter is the interface for writing output: OutputWrite receives a
// storage namespace, a name, and an io.Reader together with a size.
// NOTE(review): presumably size is the number of bytes available from
// reader — confirm against the implementations.
type OutputWriter interface {
	OutputWrite(ctx context.Context, storageNamespace, name string, reader io.Reader, size int64) error
}

type Properties added in v0.47.0

// Properties is a free-form string-keyed map; it is the type of
// ActionHook.Properties, decoded from the YAML key `properties`.
type Properties map[string]interface{}

type RunManifest

// RunManifest is the JSON-serializable record of a run: the RunResult under
// "run" and the TaskResult list under "hooks", which is omitted from the
// encoded JSON when empty. Store.UpdateCommitID returns a *RunManifest.
type RunManifest struct {
	Run      RunResult    `json:"run"`
	HooksRun []TaskResult `json:"hooks,omitempty"`
}

type RunResult

// RunResult describes one run: its IDs (run, branch, commit), source ref,
// event type, start and end times, and whether it passed. Every field has
// both `db` and `json` tags; only commit_id is omitempty in JSON.
// RunResultFromProto builds a *RunResult from a *RunResultData.
type RunResult struct {
	RunID     string    `db:"run_id" json:"run_id"`
	BranchID  string    `db:"branch_id" json:"branch_id"`
	SourceRef string    `db:"source_ref" json:"source_ref"`
	EventType string    `db:"event_type" json:"event_type"`
	CommitID  string    `db:"commit_id" json:"commit_id,omitempty"`
	StartTime time.Time `db:"start_time" json:"start_time"`
	EndTime   time.Time `db:"end_time" json:"end_time"`
	Passed    bool      `db:"passed" json:"passed"`
}

func RunResultFromProto added in v0.67.0

func RunResultFromProto(pb *RunResultData) *RunResult

type RunResultData added in v0.67.0

type RunResultData struct {
	RunId     string                 `protobuf:"bytes,1,opt,name=run_id,json=runId,proto3" json:"run_id,omitempty"`
	BranchId  string                 `protobuf:"bytes,2,opt,name=branch_id,json=branchId,proto3" json:"branch_id,omitempty"`
	CommitId  string                 `protobuf:"bytes,3,opt,name=commit_id,json=commitId,proto3" json:"commit_id,omitempty"`
	SourceRef string                 `protobuf:"bytes,4,opt,name=source_ref,json=sourceRef,proto3" json:"source_ref,omitempty"`
	EventType string                 `protobuf:"bytes,5,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"`
	StartTime *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"`
	EndTime   *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=end_time,json=endTime,proto3" json:"end_time,omitempty"`
	Passed    bool                   `protobuf:"varint,8,opt,name=passed,proto3" json:"passed,omitempty"`
	// contains filtered or unexported fields
}

RunResultData is the protobuf message data model for the RunResult struct.

func (*RunResultData) Descriptor deprecated added in v0.67.0

func (*RunResultData) Descriptor() ([]byte, []int)

Deprecated: Use RunResultData.ProtoReflect.Descriptor instead.

func (*RunResultData) GetBranchId added in v0.67.0

func (x *RunResultData) GetBranchId() string

func (*RunResultData) GetCommitId added in v0.67.0

func (x *RunResultData) GetCommitId() string

func (*RunResultData) GetEndTime added in v0.67.0

func (x *RunResultData) GetEndTime() *timestamppb.Timestamp

func (*RunResultData) GetEventType added in v0.67.0

func (x *RunResultData) GetEventType() string

func (*RunResultData) GetPassed added in v0.67.0

func (x *RunResultData) GetPassed() bool

func (*RunResultData) GetRunId added in v0.67.0

func (x *RunResultData) GetRunId() string

func (*RunResultData) GetSourceRef added in v0.67.0

func (x *RunResultData) GetSourceRef() string

func (*RunResultData) GetStartTime added in v0.67.0

func (x *RunResultData) GetStartTime() *timestamppb.Timestamp

func (*RunResultData) ProtoMessage added in v0.67.0

func (*RunResultData) ProtoMessage()

func (*RunResultData) ProtoReflect added in v0.67.0

func (x *RunResultData) ProtoReflect() protoreflect.Message

func (*RunResultData) Reset added in v0.67.0

func (x *RunResultData) Reset()

func (*RunResultData) String added in v0.67.0

func (x *RunResultData) String() string

type RunResultIterator

// RunResultIterator iterates over *RunResult values; it is the return type of
// ListRunResults on both Service and Store. KVRunResultIterator declares all
// four methods.
// NOTE(review): the calling protocol is not visible here; presumably callers
// loop on Next, read Value, check Err after the loop, and Close when done —
// confirm against the implementation.
type RunResultIterator interface {
	Next() bool
	Value() *RunResult
	Err() error
	Close()
}

type SecureString added in v0.48.0

type SecureString struct {
	// contains filtered or unexported fields
}

SecureString is a string that may be populated from an environment variable. If constructed with a string of the form {{ ENV.EXAMPLE_VARIABLE }}, the value is populated from EXAMPLE_VARIABLE and is considered a secret. Otherwise the value is taken from the string as-is, and is not considered a secret.

func NewSecureString added in v0.48.0

func NewSecureString(s string) (SecureString, error)

NewSecureString creates a new SecureString, reading env var if needed.

func (SecureString) String added in v0.48.0

func (s SecureString) String() string

String returns the string for non-secrets, or asterisks otherwise.

type Service

// Service is the package's service interface. It embeds graveler.HooksHandler
// and adds Stop, Run (for a graveler.HookRecord), UpdateCommitID, and the
// read operations GetRunResult, GetTaskResult, ListRunResults and
// ListRunTaskResults. The `after` arguments of the List methods are string
// cursors; the iterator constructors describe them as used for pagination.
// StoreService is documented as an implementation of this interface.
type Service interface {
	Stop()
	Run(ctx context.Context, record graveler.HookRecord) error
	UpdateCommitID(ctx context.Context, repositoryID string, storageNamespace string, runID string, commitID string) error
	GetRunResult(ctx context.Context, repositoryID string, runID string) (*RunResult, error)
	GetTaskResult(ctx context.Context, repositoryID string, runID string, hookRunID string) (*TaskResult, error)
	ListRunResults(ctx context.Context, repositoryID string, branchID, commitID string, after string) (RunResultIterator, error)
	ListRunTaskResults(ctx context.Context, repositoryID string, runID string, after string) (TaskResultIterator, error)
	graveler.HooksHandler
}

type Source

// Source is the interface LoadActions and NewService take for obtaining
// action data for a graveler.HookRecord: List returns a slice of strings, and
// Load returns the raw bytes for a given name.
// NOTE(review): presumably the strings returned by List are the names to pass
// to Load, and the bytes are what ParseAction consumes — confirm in
// LoadActions.
type Source interface {
	List(ctx context.Context, record graveler.HookRecord) ([]string, error)
	Load(ctx context.Context, record graveler.HookRecord, name string) ([]byte, error)
}

type Store added in v0.67.0

type Store interface {
	// UpdateCommitID will update an already stored run with the commit results
	UpdateCommitID(ctx context.Context, repositoryID string, runID string, commitID string) (*RunManifest, error)

	GetRunResult(ctx context.Context, repositoryID string, runID string) (*RunResult, error)
	GetTaskResult(ctx context.Context, repositoryID string, runID string, hookRunID string) (*TaskResult, error)
	ListRunResults(ctx context.Context, repositoryID string, branchID, commitID string, after string) (RunResultIterator, error)
	ListRunTaskResults(ctx context.Context, repositoryID string, runID string, after string) (TaskResultIterator, error)
	// contains filtered or unexported methods
}

Store is an abstraction over our datasource (key-value store) that provides actions operations

func NewActionsKVStore added in v0.67.0

func NewActionsKVStore(store kv.Store) Store

type StoreService added in v0.67.0

type StoreService struct {
	Store Store

	Source Source
	Writer OutputWriter
	// contains filtered or unexported fields
}

StoreService is an implementation of actions.Service that saves the run data to the blockstore and to the actions.Store (which is a fancy name for a DB - kv style or postgres directly)

func NewService

func NewService(ctx context.Context, store Store, source Source, writer OutputWriter, idGen IDGenerator, stats stats.Collector, cfg Config) *StoreService

func (*StoreService) GetRunResult added in v0.67.0

func (s *StoreService) GetRunResult(ctx context.Context, repositoryID string, runID string) (*RunResult, error)

func (*StoreService) GetTaskResult added in v0.67.0

func (s *StoreService) GetTaskResult(ctx context.Context, repositoryID string, runID string, hookRunID string) (*TaskResult, error)

func (*StoreService) ListRunResults added in v0.67.0

func (s *StoreService) ListRunResults(ctx context.Context, repositoryID string, branchID, commitID string, after string) (RunResultIterator, error)

func (*StoreService) ListRunTaskResults added in v0.67.0

func (s *StoreService) ListRunTaskResults(ctx context.Context, repositoryID string, runID string, after string) (TaskResultIterator, error)

func (*StoreService) NewRunID added in v0.67.0

func (s *StoreService) NewRunID() string

func (*StoreService) PostCommitHook added in v0.67.0

func (s *StoreService) PostCommitHook(ctx context.Context, record graveler.HookRecord) error

func (*StoreService) PostCreateBranchHook added in v0.67.0

func (s *StoreService) PostCreateBranchHook(ctx context.Context, record graveler.HookRecord)

func (*StoreService) PostCreateTagHook added in v0.67.0

func (s *StoreService) PostCreateTagHook(ctx context.Context, record graveler.HookRecord)

func (*StoreService) PostDeleteBranchHook added in v0.67.0

func (s *StoreService) PostDeleteBranchHook(ctx context.Context, record graveler.HookRecord)

func (*StoreService) PostDeleteTagHook added in v0.67.0

func (s *StoreService) PostDeleteTagHook(ctx context.Context, record graveler.HookRecord)

func (*StoreService) PostMergeHook added in v0.67.0

func (s *StoreService) PostMergeHook(ctx context.Context, record graveler.HookRecord) error

func (*StoreService) PreCommitHook added in v0.67.0

func (s *StoreService) PreCommitHook(ctx context.Context, record graveler.HookRecord) error

func (*StoreService) PreCreateBranchHook added in v0.67.0

func (s *StoreService) PreCreateBranchHook(ctx context.Context, record graveler.HookRecord) error

func (*StoreService) PreCreateTagHook added in v0.67.0

func (s *StoreService) PreCreateTagHook(ctx context.Context, record graveler.HookRecord) error

func (*StoreService) PreDeleteBranchHook added in v0.67.0

func (s *StoreService) PreDeleteBranchHook(ctx context.Context, record graveler.HookRecord) error

func (*StoreService) PreDeleteTagHook added in v0.67.0

func (s *StoreService) PreDeleteTagHook(ctx context.Context, record graveler.HookRecord) error

func (*StoreService) PreMergeHook added in v0.67.0

func (s *StoreService) PreMergeHook(ctx context.Context, record graveler.HookRecord) error

func (*StoreService) Run added in v0.67.0

func (s *StoreService) Run(ctx context.Context, record graveler.HookRecord) error

Run loads and runs actions based on the event information.

func (*StoreService) SetEndpoint added in v0.87.0

func (s *StoreService) SetEndpoint(h *http.Server)

func (*StoreService) Stop added in v0.67.0

func (s *StoreService) Stop()

func (*StoreService) UpdateCommitID added in v0.67.0

func (s *StoreService) UpdateCommitID(ctx context.Context, repositoryID string, storageNamespace string, runID string, commitID string) error

UpdateCommitID assumes the record is a post event; it uses the PreRunID to update the commit_id and saves the run manifest again.

type Task

// Task groups what is needed to run one hook and record its outcome: the run
// and hook-run IDs, the owning *Action, the hook ID and Hook value, the `If`
// expression string, an error, and start/end times.
// NOTE(review): when Err, StartTime and EndTime are populated is not visible
// here; presumably they are filled in by the service as the hook runs —
// confirm in StoreService.Run.
type Task struct {
	RunID     string
	HookRunID string
	Action    *Action
	HookID    string
	Hook      Hook
	If        string
	Err       error
	StartTime time.Time
	EndTime   time.Time
}

type TaskResult

// TaskResult describes the outcome of one hook run: run ID, hook run ID, hook
// ID, action name, start and end times, and whether it passed. Every field
// has both `db` and `json` tags. It also declares a LogPath method that
// returns a string.
type TaskResult struct {
	RunID      string    `db:"run_id" json:"run_id"`
	HookRunID  string    `db:"hook_run_id" json:"hook_run_id"`
	HookID     string    `db:"hook_id" json:"hook_id"`
	ActionName string    `db:"action_name" json:"action_name"`
	StartTime  time.Time `db:"start_time" json:"start_time"`
	EndTime    time.Time `db:"end_time" json:"end_time"`
	Passed     bool      `db:"passed" json:"passed"`
}

func (*TaskResult) LogPath

func (r *TaskResult) LogPath() string

type TaskResultData added in v0.67.0

type TaskResultData struct {
	RunId      string                 `protobuf:"bytes,1,opt,name=run_id,json=runId,proto3" json:"run_id,omitempty"`
	HookRunId  string                 `protobuf:"bytes,2,opt,name=hook_run_id,json=hookRunId,proto3" json:"hook_run_id,omitempty"`
	HookId     string                 `protobuf:"bytes,3,opt,name=hook_id,json=hookId,proto3" json:"hook_id,omitempty"`
	ActionName string                 `protobuf:"bytes,4,opt,name=action_name,json=actionName,proto3" json:"action_name,omitempty"`
	StartTime  *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"`
	EndTime    *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=end_time,json=endTime,proto3" json:"end_time,omitempty"`
	Passed     bool                   `protobuf:"varint,9,opt,name=passed,proto3" json:"passed,omitempty"`
	// contains filtered or unexported fields
}

TaskResultData is the protobuf message data model for the TaskResult struct.

func (*TaskResultData) Descriptor deprecated added in v0.67.0

func (*TaskResultData) Descriptor() ([]byte, []int)

Deprecated: Use TaskResultData.ProtoReflect.Descriptor instead.

func (*TaskResultData) GetActionName added in v0.67.0

func (x *TaskResultData) GetActionName() string

func (*TaskResultData) GetEndTime added in v0.67.0

func (x *TaskResultData) GetEndTime() *timestamppb.Timestamp

func (*TaskResultData) GetHookId added in v0.67.0

func (x *TaskResultData) GetHookId() string

func (*TaskResultData) GetHookRunId added in v0.67.0

func (x *TaskResultData) GetHookRunId() string

func (*TaskResultData) GetPassed added in v0.67.0

func (x *TaskResultData) GetPassed() bool

func (*TaskResultData) GetRunId added in v0.67.0

func (x *TaskResultData) GetRunId() string

func (*TaskResultData) GetStartTime added in v0.67.0

func (x *TaskResultData) GetStartTime() *timestamppb.Timestamp

func (*TaskResultData) ProtoMessage added in v0.67.0

func (*TaskResultData) ProtoMessage()

func (*TaskResultData) ProtoReflect added in v0.67.0

func (x *TaskResultData) ProtoReflect() protoreflect.Message

func (*TaskResultData) Reset added in v0.67.0

func (x *TaskResultData) Reset()

func (*TaskResultData) String added in v0.67.0

func (x *TaskResultData) String() string

type TaskResultIterator

// TaskResultIterator iterates over *TaskResult values; it is the return type
// of ListRunTaskResults on both Service and Store. KVTaskResultIterator
// declares all four methods.
// NOTE(review): the calling protocol is not visible here; presumably callers
// loop on Next, read Value, check Err after the loop, and Close when done —
// confirm against the implementation.
type TaskResultIterator interface {
	Next() bool
	Value() *TaskResult
	Err() error
	Close()
}

type Webhook

// Webhook is a hook type that embeds HookBase and declares a Run method with
// the signature required by the Hook interface. It carries a target URL, a
// Timeout, and query parameters and headers whose values are SecureString
// (query parameters allow several values per key).
// NOTE(review): presumably Headers is populated from the HeadersPropertyKey
// entry of the hook's Properties — confirm in NewWebhook.
type Webhook struct {
	HookBase
	URL         string
	Timeout     time.Duration
	QueryParams map[string][]SecureString
	Headers     map[string]SecureString
}

func (*Webhook) Run

func (w *Webhook) Run(ctx context.Context, record graveler.HookRecord, buf *bytes.Buffer) (err error)

Directories

Path Synopsis
lua
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL