Documentation ¶
Index ¶
- Constants
- Variables
- func Run(ctx context.Context, sjkr SQSJkr, level string) error
- func SpawnWorker(sjkr SQSJkr, wid int, js <-chan Job, s *Stats)
- type AccountSection
- type Config
- func (c *Config) SetAWSAccount(id, profile, region string)
- func (c *Config) SetConcurrentNum(num int)
- func (c *Config) SetKickerConfig(num int, trigger string)
- func (c *Config) SetSQSQueue(qname string)
- func (c *Config) SetStatsPort(port int) error
- func (c *Config) SetStatsSocket(sock string) error
- func (c *Config) SetTriggerCommand(trigger string)
- func (c *Config) Validate() error
- type DefaultJob
- type DefaultSQSJkr
- func (sjkr *DefaultSQSJkr) Config() *Config
- func (sjkr *DefaultSQSJkr) JobStream() chan Job
- func (sjkr *DefaultSQSJkr) Locker() lock.Locker
- func (sjkr *DefaultSQSJkr) Run(ctx context.Context) error
- func (sjkr *DefaultSQSJkr) SetLocker(l lock.Locker)
- func (sjkr *DefaultSQSJkr) SetThrottler(t throttle.Throttler)
- func (sjkr *DefaultSQSJkr) Throttler() throttle.Throttler
- type Duration
- type Job
- type KickerSection
- type LogLevel
- type Logger
- type MessageBody
- type SQSJkr
- type SQSSection
- type Stats
- type StatsItem
- type Worker
Constants ¶
const ( DefaultMaxCocurrentNum = 20 VisibilityTimeout = 30 WaitTimeSec = 10 MaxRetrieveMessageNum = 10 JobRetryInterval = time.Second * 5 ApplicationJSON = "application/json" DefaultStatsPort = 8061 )
Default Const
Variables ¶
var (
ErrOverLifeTime = errors.New("over life time")
)
var TrapSignals = []os.Signal{ syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM, }
TrapSignals list
Functions ¶
Types ¶
type AccountSection ¶
type AccountSection struct { Profile string `toml:"profile"` ID string `toml:"id"` Region string `toml:"region"` }
AccountSection is aws account information
type Config ¶
type Config struct { Account AccountSection `toml:"account"` Kicker KickerSection `toml:"kicker"` SQS SQSSection `toml:"sqs"` }
Config is the sqsjkr config
func LoadConfig ¶
LoadConfig loads config file by config file path
func (*Config) SetAWSAccount ¶
SetAWSAccount set aws account
func (*Config) SetConcurrentNum ¶
SetConcurrentNum set number of concurrent to execute job
func (*Config) SetKickerConfig ¶
SetKickerConfig set kicker config
func (*Config) SetSQSQueue ¶
SetSQSQueue set sqs queue name
func (*Config) SetStatsPort ¶
SetStatsPort set stats api port number
func (*Config) SetStatsSocket ¶
SetStatsSocket set unix domain socket path
func (*Config) SetTriggerCommand ¶
SetTriggerCommand set trigger command
type DefaultJob ¶
type DefaultJob struct {
// contains filtered or unexported fields
}
DefaultJob is created by one SQS message's body
func (*DefaultJob) Execute ¶
func (j *DefaultJob) Execute(lkr lock.Locker) ([]byte, error)
Execute executes command
func (DefaultJob) JobID ¶
func (j DefaultJob) JobID() string
JobID return job's id which is unique (sqs message id).
func (*DefaultJob) String ¶ added in v0.5.1
func (j *DefaultJob) String() string
type DefaultSQSJkr ¶
type DefaultSQSJkr struct { SQS *sqs.SQS RetentionPeriod time.Duration // contains filtered or unexported fields }
DefaultSQSJkr default
func (*DefaultSQSJkr) Config ¶
func (sjkr *DefaultSQSJkr) Config() *Config
Config return SQSJkr config
func (*DefaultSQSJkr) JobStream ¶
func (sjkr *DefaultSQSJkr) JobStream() chan Job
JobStream return Job chan.
func (*DefaultSQSJkr) Locker ¶
func (sjkr *DefaultSQSJkr) Locker() lock.Locker
Locker return DefaultSQSJkr's Locker
func (*DefaultSQSJkr) Run ¶
func (sjkr *DefaultSQSJkr) Run(ctx context.Context) error
Run sqsjkr daemon
func (*DefaultSQSJkr) SetLocker ¶
func (sjkr *DefaultSQSJkr) SetLocker(l lock.Locker)
SetLocker set DefaultSQSJkr's Locker
func (*DefaultSQSJkr) SetThrottler ¶
func (sjkr *DefaultSQSJkr) SetThrottler(t throttle.Throttler)
SetThrottler set DefaultSQSJkr's Throttler
func (*DefaultSQSJkr) Throttler ¶
func (sjkr *DefaultSQSJkr) Throttler() throttle.Throttler
Throttler return DefaultSQSJkr's Throttler
type Duration ¶
Duration struct
func (*Duration) UnmarshalJSON ¶
UnmarshalJSON Duration field to decode json
type Job ¶
type Job interface { Execute(lock.Locker) ([]byte, error) JobID() string EventID() string Command() string String() string }
Job is sqsjkr job struct
type KickerSection ¶
type KickerSection struct { MaxConcurrentNum int `toml:"max_concurrent_num"` Trigger string `toml:"life_time_trigger"` StatsPort int `toml:"stats_port"` StatsSocket string `toml:"stats_socket"` }
KickerSection is the config of command kicker
type Logger ¶
Logger is sqsjkr logger struct
type MessageBody ¶
type MessageBody struct { Command string `json:"command"` Environments map[string]string `json:"envs"` EventID string `json:"event_id"` LifeTime Duration `json:"life_time"` LockID string `json:"lock_id"` AbortIfLocked bool `json:"abort_if_locked"` DisableLifeTimeTrigger bool `json:"disable_life_time_trigger"` }
MessageBody for decoding json
func (MessageBody) String ¶ added in v0.3.2
func (m MessageBody) String() string
type SQSJkr ¶
type SQSJkr interface { Run(context.Context) error Config() *Config JobStream() chan Job Locker() lock.Locker Throttler() throttle.Throttler SetLocker(locker lock.Locker) SetThrottler(throttle throttle.Throttler) }
SQSJkr interfaces
type SQSSection ¶
type SQSSection struct {
QueueName string `toml:"queue_name"`
}
SQSSection is the AWS SQS configure
type Stats ¶ added in v0.5.0
type Stats struct { Workers struct { Busy int64 `json:"busy"` Idle int64 `json:"idle"` } `json:"workers"` Invocations struct { Succeeded int64 `json:"succeeded"` Failed int64 `json:"failed"` Errored int64 `json:"errored"` } `json:"invocations"` // contains filtered or unexported fields }
Stats represents stats.
type StatsItem ¶
type StatsItem struct { IdleWorkerNum uint32 `json:"idle_worker"` BusyWorkerNum uint32 `json:"busy_worker"` }
StatsItem struct