work_schedule

package
v0.0.0-...-8122643 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 5, 2024 License: Apache-2.0 Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config[T Work] struct {
	WorkBuilder WorkBuilder[T]
	WorkRunner  WorkRunner[T]
	WorkInvoker WorkInvoker[T]
}

type PoolWorkNotificationHandler

type PoolWorkNotificationHandler[T Work] struct {
	pubsub_subscriber.SubscriberClientBase
	// contains filtered or unexported fields
}

func (*PoolWorkNotificationHandler[T]) Handle

func (p *PoolWorkNotificationHandler[T]) Handle(ctx op_context.Context, msg *PubsubWork[T]) error

type PoolWorkPublisher

type PoolWorkPublisher[T Work] struct {
	// contains filtered or unexported fields
}

func NewPoolWorkPublisher

func NewPoolWorkPublisher[T Work](pubsub pool_pubsub.PoolPubsub, topicName string) *PoolWorkPublisher[T]

func (*PoolWorkPublisher[T]) InvokeWork

func (p *PoolWorkPublisher[T]) InvokeWork(ctx op_context.Context, work T, postMode PostMode, tenancy ...multitenancy.Tenancy) error

type PoolWorkSubscriber

type PoolWorkSubscriber[T Work] struct {
	// contains filtered or unexported fields
}

func NewPoolSubscriber

func NewPoolSubscriber[T Work](tenancies multitenancy.Multitenancy, controller *WorkSchedule[T], name string) *PoolWorkSubscriber[T]

func (*PoolWorkSubscriber[T]) Init

func (p *PoolWorkSubscriber[T]) Init(ctx op_context.Context, pubsub pool_pubsub.PoolPubsub, topicName string) error

type PostMode

type PostMode int
const (
	SCHEDULE PostMode = 0
	DIRECT   PostMode = 1
	QUEUED   PostMode = 2
)

func Mode

func Mode(m string) PostMode

type PubsubTopic

type PubsubTopic[T Work] struct {
	*pubsub_subscriber.TopicBase[*PubsubWork[T]]
}

type PubsubWork

type PubsubWork[T Work] struct {
	Work    T        `json:"work"`
	Mode    PostMode `json:"mode"`
	Tenancy string   `json:"tenancy"`
}

func MakePubsubWork

func MakePubsubWork[T Work]() *PubsubWork[T]

func NewPubsubWork

func NewPubsubWork[T Work](work T, postMode PostMode, tenancy ...multitenancy.Tenancy) *PubsubWork[T]

type Work

type Work interface {
	common.Object

	GetReferenceType() string
	SetReferenceType(string)

	GetReferenceId() string
	SetReferenceId(string)

	GetNextTime() time.Time
	SetNextTime(time.Time)
	ResetNextTime()

	GetDelay() int
	SetDelay(int)

	SetLock(cache.Lock)
	GetLock() cache.Lock

	SetNoDb(enable bool)
	IsNoDb() bool
}

type WorkBase

type WorkBase struct {
	common.ObjectBase
	ReferenceId   string    `json:"reference_id" gorm:"index;index:,unique,composite:ref"`
	ReferenceType string    `json:"reference_type" gorm:"index;index:,unique,composite:ref"`
	NextTime      time.Time `json:"next_time" gorm:"index"`
	NextTimeSet   bool      `json:"next_time_set" gorm:"index;default:false"`
	// contains filtered or unexported fields
}

func (*WorkBase) GetDelay

func (w *WorkBase) GetDelay() int

func (*WorkBase) GetLock

func (w *WorkBase) GetLock() cache.Lock

func (*WorkBase) GetNextTime

func (w *WorkBase) GetNextTime() time.Time

func (*WorkBase) GetReferenceId

func (w *WorkBase) GetReferenceId() string

func (*WorkBase) GetReferenceType

func (w *WorkBase) GetReferenceType() string

func (*WorkBase) IsNoDb

func (w *WorkBase) IsNoDb() bool

func (*WorkBase) ResetNextTime

func (w *WorkBase) ResetNextTime()

func (*WorkBase) SetDelay

func (w *WorkBase) SetDelay(delay int)

func (*WorkBase) SetLock

func (w *WorkBase) SetLock(lock cache.Lock)

func (*WorkBase) SetNextTime

func (w *WorkBase) SetNextTime(nextTime time.Time)

func (*WorkBase) SetNoDb

func (w *WorkBase) SetNoDb(enable bool)

func (*WorkBase) SetReferenceId

func (w *WorkBase) SetReferenceId(referenceId string)

func (*WorkBase) SetReferenceType

func (w *WorkBase) SetReferenceType(referenceType string)

type WorkBuilder

type WorkBuilder[T Work] func() T

type WorkInTenancyBase

type WorkInTenancyBase struct {
	WorkBase
	TenancyId string `json:"tenancy_id" gorm:"index"`
}

func (*WorkInTenancyBase) GetTenancyId

func (w *WorkInTenancyBase) GetTenancyId() string

func (*WorkInTenancyBase) SetTenancy

func (w *WorkInTenancyBase) SetTenancy(tenancy multitenancy.Tenancy)

type WorkInvoker

type WorkInvoker[T Work] func(ctx op_context.Context, work T, postMode PostMode, tenancy ...multitenancy.Tenancy) error

type WorkRunner

type WorkRunner[T Work] interface {
	Run(ctx op_context.Context, work T) (bool, error)
}

type WorkSchedule

type WorkSchedule[T Work] struct {
	WorkScheduleConfig
	app_context.WithAppBase
	crud.WithCRUDBase
	background_worker.JobRunnerBase

	WorkSchedulerBase[T]
	// contains filtered or unexported fields
}

func NewWorkSchedule

func NewWorkSchedule[T Work](name string, config Config[T], cruds ...crud.CRUD) *WorkSchedule[T]

func (*WorkSchedule[T]) AcquireWork

func (s *WorkSchedule[T]) AcquireWork(ctx op_context.Context, work T) error

func (*WorkSchedule[T]) Config

func (s *WorkSchedule[T]) Config() interface{}

func (*WorkSchedule[T]) DoWork

func (s *WorkSchedule[T]) DoWork(ctx op_context.Context, work T) error

func (*WorkSchedule[T]) Init

func (s *WorkSchedule[T]) Init(app app_context.Context, configPath ...string) error

func (*WorkSchedule[T]) InvokeWork

func (s *WorkSchedule[T]) InvokeWork(ctx op_context.Context, work T, postMode PostMode, tenancy ...multitenancy.Tenancy) error

func (*WorkSchedule[T]) PostWork

func (s *WorkSchedule[T]) PostWork(ctx op_context.Context, work T, postMode PostMode, tenancy ...multitenancy.Tenancy) error

func (*WorkSchedule[T]) ProcessWorks

func (s *WorkSchedule[T]) ProcessWorks()

func (*WorkSchedule[T]) ReleaseWork

func (s *WorkSchedule[T]) ReleaseWork(ctx op_context.Context, work T) error

func (*WorkSchedule[T]) RemoveWork

func (s *WorkSchedule[T]) RemoveWork(ctx op_context.Context, referenceId string, referenceType string) error

func (*WorkSchedule[T]) RunJob

func (s *WorkSchedule[T]) RunJob()

func (*WorkSchedule[T]) SetNextWorkTime

func (s *WorkSchedule[T]) SetNextWorkTime(work T, reset ...bool)

func (*WorkSchedule[T]) SetOverrideDb

func (s *WorkSchedule[T]) SetOverrideDb(db db.DB)

func (*WorkSchedule[T]) SetRunner

func (s *WorkSchedule[T]) SetRunner(runner WorkRunner[T])

func (*WorkSchedule[T]) StopJob

func (s *WorkSchedule[T]) StopJob()

func (*WorkSchedule[T]) UpdateWorkNextTime

func (s *WorkSchedule[T]) UpdateWorkNextTime(ctx op_context.Context, work T, tenancy ...multitenancy.Tenancy) error

type WorkScheduleConfig

type WorkScheduleConfig struct {
	PARALLEL_JOBS               int `default:"8"`
	BUCKET_SIZE                 int `default:"32"`
	INVOKATION_INTERVAL_SECONDS int `default:"300"`
	HOLD_WORK_SECONDS           int `default:"900"`
	LOCK_TTL_SECONDS            int `default:"300"`
	PERIOD                      int `default:"5"`
	LOG_EMPTY_WORKS             bool
}

type WorkScheduler

type WorkScheduler[T Work] interface {
	NewWork(referenceId string, referenceType string) T
	AcquireWork(ctx op_context.Context, work T) error
	ReleaseWork(ctx op_context.Context, work T) error
	PostWork(ctx op_context.Context, work T, postMode PostMode, tenancy ...multitenancy.Tenancy) error
	RemoveWork(ctx op_context.Context, referenceId string, referenceType string) error
}

type WorkSchedulerBase

type WorkSchedulerBase[T Work] struct {
	// contains filtered or unexported fields
}

func (*WorkSchedulerBase[T]) AcquireWork

func (s *WorkSchedulerBase[T]) AcquireWork(ctx op_context.Context, work T) error

func (*WorkSchedulerBase[T]) Construct

func (s *WorkSchedulerBase[T]) Construct(workBuilder WorkBuilder[T])

func (*WorkSchedulerBase[T]) NewWork

func (s *WorkSchedulerBase[T]) NewWork(referenceId string, referenceType string) T

func (*WorkSchedulerBase[T]) PostWork

func (s *WorkSchedulerBase[T]) PostWork(ctx op_context.Context, work T, postMode PostMode, tenancy ...multitenancy.Tenancy) error

func (*WorkSchedulerBase[T]) ReleaseWork

func (s *WorkSchedulerBase[T]) ReleaseWork(ctx op_context.Context, work T) error

func (*WorkSchedulerBase[T]) RemoveWork

func (s *WorkSchedulerBase[T]) RemoveWork(ctx op_context.Context, referenceId string) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL