Documentation
¶
Index ¶
- Constants
- Variables
- type Dummy
- func (d *Dummy) AddEvent(namespace, id, status string, entity *Event) error
- func (d *Dummy) AppendTaskLog(taskID string, now time.Time, system, message, level string) error
- func (d *Dummy) Close() error
- func (d *Dummy) CreateTask(sourceID, collection string, task *Task, createdAt time.Time) error
- func (d *Dummy) DeleteSignature(sourceID, collection string) error
- func (d *Dummy) GetAllTaskIDs(sourceID, collection string, descendingOrder bool) ([]string, error)
- func (d *Dummy) GetAllTasks(sourceID, collection string, from, to time.Time, limit int) ([]Task, error)
- func (d *Dummy) GetAllTasksForInitialHeartbeat(runningStatus, scheduledStatus string, lastActivityThreshold time.Duration) ([]string, error)
- func (d *Dummy) GetAllTasksHeartBeat() (map[string]string, error)
- func (d *Dummy) GetEvents(namespace, id, status string, limit int) ([]Event, error)
- func (d *Dummy) GetEventsWithGranularity(namespace, status, eventType string, ids []string, start, end time.Time, ...) ([]EventsPerTime, error)
- func (d *Dummy) GetLastTask(sourceID, collection string, offset int) (*Task, error)
- func (d *Dummy) GetOrCreateClusterID() string
- func (d *Dummy) GetProjectDestinationIDs(projectID string) ([]string, error)
- func (d *Dummy) GetProjectPushSourceIDs(projectID string) ([]string, error)
- func (d *Dummy) GetProjectSourceIDs(projectID string) ([]string, error)
- func (d *Dummy) GetSignature(sourceID, collection, interval string) (string, error)
- func (d *Dummy) GetTask(taskID string) (*Task, error)
- func (d *Dummy) GetTaskLogs(taskID string, from, to time.Time) ([]TaskLogRecord, error)
- func (d *Dummy) GetTotalEvents(namespace, id, status string) (int, error)
- func (d *Dummy) IncrementEventsCount(id, namespace, eventType, status string, now time.Time, value int64) error
- func (d *Dummy) PollTask() (*Task, error)
- func (d *Dummy) PushTask(task *Task) error
- func (d *Dummy) RemoveTaskFromHeartBeat(taskID string) error
- func (d *Dummy) RemoveTasks(sourceID, collection string, taskIDs ...string) (int, error)
- func (d *Dummy) SaveSignature(sourceID, collection, interval, signature string) error
- func (d *Dummy) TaskHeartBeat(taskID string) error
- func (d *Dummy) TrimEvents(namespace, id, status string, capacity int) error
- func (d *Dummy) Type() string
- func (d *Dummy) UpdateFinishedTask(taskID, status string) error
- func (d *Dummy) UpdateStartedTask(taskID, status string) error
- type ErrorMetrics
- type Event
- type EventsPerTime
- type Granularity
- type Options
- type Redis
- func (r *Redis) AddEvent(namespace, id, status string, entity *Event) error
- func (r *Redis) AppendTaskLog(taskID string, now time.Time, system, message, level string) error
- func (r *Redis) Close() error
- func (r *Redis) CreateTask(sourceID, collection string, task *Task, createdAt time.Time) error
- func (r *Redis) DeleteSignature(sourceID, collection string) error
- func (r *Redis) GetAllTaskIDs(sourceID, collection string, descendingOrder bool) ([]string, error)
- func (r *Redis) GetAllTasks(sourceID, collection string, start, end time.Time, limit int) ([]Task, error)
- func (r *Redis) GetAllTasksForInitialHeartbeat(runningStatus, scheduledStatus string, lastActivityThreshold time.Duration) ([]string, error)
- func (r *Redis) GetAllTasksHeartBeat() (map[string]string, error)
- func (r *Redis) GetEvents(namespace, id, status string, limit int) ([]Event, error)
- func (r *Redis) GetEventsWithGranularity(namespace, status, eventType string, ids []string, start, end time.Time, ...) ([]EventsPerTime, error)
- func (r *Redis) GetLastTask(sourceID, collection string, offset int) (*Task, error)
- func (r *Redis) GetOrCreateClusterID() string
- func (r *Redis) GetProjectDestinationIDs(projectID string) ([]string, error)
- func (r *Redis) GetProjectPushSourceIDs(projectID string) ([]string, error)
- func (r *Redis) GetProjectSourceIDs(projectID string) ([]string, error)
- func (r *Redis) GetSignature(sourceID, collection, interval string) (string, error)
- func (r *Redis) GetTask(taskID string) (*Task, error)
- func (r *Redis) GetTaskLogs(taskID string, start, end time.Time) ([]TaskLogRecord, error)
- func (r *Redis) GetTotalEvents(namespace, id, status string) (int, error)
- func (r *Redis) IncrementEventsCount(id, namespace, eventType, status string, now time.Time, value int64) error
- func (r *Redis) PollTask() (*Task, error)
- func (r *Redis) PushTask(task *Task) error
- func (r *Redis) RemoveTaskFromHeartBeat(taskID string) error
- func (r *Redis) RemoveTasks(sourceID, collection string, taskIDs ...string) (int, error)
- func (r *Redis) SaveSignature(sourceID, collection, interval, signature string) error
- func (r *Redis) TaskHeartBeat(taskID string) error
- func (r *Redis) TrimEvents(namespace, id, status string, capacity int) error
- func (r *Redis) Type() string
- func (r *Redis) UpdateFinishedTask(taskID, status string) error
- func (r *Redis) UpdateStartedTask(taskID, status string) error
- type RedisPool
- type RedisPoolFactory
- type Storage
- type Task
- type TaskLogRecord
Constants ¶
const ( DestinationNamespace = "destination" SourceNamespace = "source" //536-issue DEPRECATED //instead of this name - all sources will be in SourceNamespace and for push/pull events different keys will be selected PushSourceNamespace = "push_source" PushEventType = "push" PullEventType = "pull" SuccessStatus = "success" ErrorStatus = "errors" SkipStatus = "skip" ConfigPrefix = "config#" SystemKey = "system" EventsTokenNamespace = "token" EventsDestinationNamespace = "destination" EventsErrorStatus = "error" EventsPureStatus = "" )
const ( DummyType = "Dummy" RedisType = "Redis" )
Variables ¶
var DefaultOptions = Options{ DefaultDialConnectTimeout: 10 * time.Second, DefaultDialReadTimeout: 10 * time.Second, DefaultDialWriteTimeout: 10 * time.Second, MaxIdle: 10, MaxActive: 600, IdleTimeout: 240 * time.Second, PingTimeout: 30 * time.Second, }
DefaultOptions for Redis Pool
var (
ErrTaskNotFound = errors.New("Sync task wasn't found")
)
Functions ¶
This section is empty.
Types ¶
type Dummy ¶
type Dummy struct { }
func (*Dummy) AppendTaskLog ¶
func (*Dummy) CreateTask ¶
func (*Dummy) DeleteSignature ¶
func (*Dummy) GetAllTaskIDs ¶
func (*Dummy) GetAllTasks ¶
func (*Dummy) GetAllTasksForInitialHeartbeat ¶
func (*Dummy) GetAllTasksHeartBeat ¶
func (*Dummy) GetEventsWithGranularity ¶
func (d *Dummy) GetEventsWithGranularity(namespace, status, eventType string, ids []string, start, end time.Time, granularity Granularity) ([]EventsPerTime, error)
func (*Dummy) GetLastTask ¶
func (*Dummy) GetOrCreateClusterID ¶
func (*Dummy) GetProjectDestinationIDs ¶
func (*Dummy) GetProjectPushSourceIDs ¶
func (*Dummy) GetProjectSourceIDs ¶
func (*Dummy) GetSignature ¶
func (*Dummy) GetTaskLogs ¶
func (*Dummy) GetTotalEvents ¶
func (*Dummy) IncrementEventsCount ¶
func (*Dummy) RemoveTaskFromHeartBeat ¶
func (*Dummy) RemoveTasks ¶
func (*Dummy) SaveSignature ¶
func (*Dummy) TaskHeartBeat ¶
func (*Dummy) TrimEvents ¶
func (*Dummy) UpdateFinishedTask ¶
func (*Dummy) UpdateStartedTask ¶
type ErrorMetrics ¶
type ErrorMetrics struct {
// contains filtered or unexported fields
}
func NewErrorMetrics ¶
func NewErrorMetrics(metricFunc func(string)) *ErrorMetrics
func (*ErrorMetrics) NoticeError ¶
func (em *ErrorMetrics) NoticeError(err error)
type Event ¶
type Event struct { Malformed string `json:"malformed,omitempty" redis:"malformed"` Original string `json:"original,omitempty" redis:"original"` Success string `json:"success,omitempty" redis:"success"` Error string `json:"error,omitempty" redis:"error"` Skip string `json:"skip,omitempty" redis:"skip"` Timestamp string `json:"timestamp,omitempty" redis:"timestamp"` UID string `json:"uid,omitempty" redis:"uid"` DestinationID string `json:"destination_id,omitempty" redis:"destination_id"` TokenID string `json:"token_id,omitempty" redis:"token_id"` }
type EventsPerTime ¶
type Granularity ¶
type Granularity string
Granularity is used for gathering statistics
const ( UNKNOWN Granularity = "" DAY Granularity = "day" HOUR Granularity = "hour" )
func GranularityFromString ¶
func GranularityFromString(value string) (Granularity, error)
func (Granularity) String ¶
func (g Granularity) String() string
type Options ¶
type Options struct { DefaultDialConnectTimeout time.Duration DefaultDialReadTimeout time.Duration DefaultDialWriteTimeout time.Duration MaxIdle int MaxActive int IdleTimeout time.Duration PingTimeout time.Duration }
Options for Redis Pool
type Redis ¶
type Redis struct {
// contains filtered or unexported fields
}
func (*Redis) AppendTaskLog ¶
AppendTaskLog appends log record into task logs sorted set
func (*Redis) CreateTask ¶
CreateTask saves task into Redis and add Task ID in index
func (*Redis) DeleteSignature ¶
DeleteSignature deletes source collection signature from Redis
func (*Redis) GetAllTaskIDs ¶
GetAllTaskIDs returns all source's tasks ids by collection
func (*Redis) GetAllTasks ¶
func (r *Redis) GetAllTasks(sourceID, collection string, start, end time.Time, limit int) ([]Task, error)
GetAllTasks returns all source's tasks by collection and time criteria
func (*Redis) GetAllTasksForInitialHeartbeat ¶
func (r *Redis) GetAllTasksForInitialHeartbeat(runningStatus, scheduledStatus string, lastActivityThreshold time.Duration) ([]string, error)
GetAllTasksForInitialHeartbeat returns all task IDs where: 1. task is RUNNING and last log time more than last activity threshold 2. task is SCHEDULED and task creation time more than last activity threshold
func (*Redis) GetAllTasksHeartBeat ¶
GetAllTasksHeartBeat returns map with taskID-last heartbeat timestamp pairs
func (*Redis) GetEventsWithGranularity ¶
func (r *Redis) GetEventsWithGranularity(namespace, status, eventType string, ids []string, start, end time.Time, granularity Granularity) ([]EventsPerTime, error)
GetEventsWithGranularity returns events amount with time criteria by granularity, status and sources/destination ids
func (*Redis) GetLastTask ¶
GetLastTask returns last sync task
func (*Redis) GetOrCreateClusterID ¶
GetOrCreateClusterID returns clusterID from Redis or save input one
func (*Redis) GetProjectDestinationIDs ¶
GetProjectDestinationIDs returns project's destination ids
func (*Redis) GetProjectPushSourceIDs ¶
GetProjectPushSourceIDs returns project's pushed sources ids (api keys)
func (*Redis) GetProjectSourceIDs ¶
GetProjectSourceIDs returns project's sources ids
func (*Redis) GetSignature ¶
GetSignature returns sync interval signature from Redis
func (*Redis) GetTaskLogs ¶
GetTaskLogs returns task logs with time criteria
func (*Redis) GetTotalEvents ¶
GetTotalEvents returns total of cached events
func (*Redis) IncrementEventsCount ¶
func (r *Redis) IncrementEventsCount(id, namespace, eventType, status string, now time.Time, value int64) error
IncrementEventsCount increment events counter namespaces: [destination, source] eventType: [push, pull] status: [success, error, skip]
func (*Redis) RemoveTaskFromHeartBeat ¶
RemoveTaskFromHeartBeat removes taskID current timestamp from heartbeat key
func (*Redis) RemoveTasks ¶
RemoveTasks tasks with provided taskIds from specified source's collections. All task logs removed as well
func (*Redis) SaveSignature ¶
SaveSignature saves sync interval signature in Redis
func (*Redis) TaskHeartBeat ¶
TaskHeartBeat sets current timestamp into heartbeat key
func (*Redis) TrimEvents ¶
TrimEvents keeps only last capacity events in Redis list key with trim function
func (*Redis) UpdateFinishedTask ¶
UpdateFinishedTask updates only status and finished_at field in the task
func (*Redis) UpdateStartedTask ¶
UpdateStartedTask updates only status and started_at field in the task
type RedisPool ¶
type RedisPool struct {
// contains filtered or unexported fields
}
RedisPool is a wrapper for keeping redis.Pool and sentinel.Sentinel and close them
func (*RedisPool) GetContext ¶
type RedisPoolFactory ¶
type RedisPoolFactory struct {
// contains filtered or unexported fields
}
RedisPoolFactory is a factory for creating RedisPool supports creating RedisPool from URLs: redis://, rediss://, sentinel:// and from config parameters like host,port, etc
func NewRedisPoolFactory ¶
func NewRedisPoolFactory(host string, port int, password string, database int, tlsSkipVerify bool, sentinelMasterMame string) *RedisPoolFactory
NewRedisPoolFactory returns filled RedisPoolFactory and removes quotes in host
func (*RedisPoolFactory) CheckAndSetDefaultPort ¶
func (rpf *RedisPoolFactory) CheckAndSetDefaultPort() (int, bool)
CheckAndSetDefaultPort checks if port isn't set - put defaultRedisPort, if sentinel mode put defaultSentinelPort
func (*RedisPoolFactory) Create ¶
func (rpf *RedisPoolFactory) Create() (*RedisPool, error)
Create returns configured RedisPool or err if ping failed host might be URLS: 1. redis://:password@host:port 2. rediss://:password@host:port 3. sentinel://master_name:password@node1:port,node2:port 4. plain host
func (*RedisPoolFactory) Details ¶
func (rpf *RedisPoolFactory) Details() string
Details returns host:port or host if host is a URL with sentinel information
func (*RedisPoolFactory) GetOptions ¶
func (rpf *RedisPoolFactory) GetOptions() Options
func (*RedisPoolFactory) WithOptions ¶
func (rpf *RedisPoolFactory) WithOptions(options Options) *RedisPoolFactory
WithOptions overrides options
type Storage ¶
type Storage interface { io.Closer //** Sources ** //signatures GetSignature(sourceID, collection, interval string) (string, error) SaveSignature(sourceID, collection, interval, signature string) error DeleteSignature(sourceID, collection string) error //** Counters ** //events counters IncrementEventsCount(id, namespace, eventType, status string, now time.Time, value int64) error GetProjectSourceIDs(projectID string) ([]string, error) GetProjectDestinationIDs(projectID string) ([]string, error) //536-issue DEPRECATED instead of it all project sources will be selected GetProjectPushSourceIDs(projectID string) ([]string, error) GetEventsWithGranularity(namespace, status, eventType string, ids []string, start, end time.Time, granularity Granularity) ([]EventsPerTime, error) //** Events Cache ** //events caching AddEvent(namespace, id, status string, entity *Event) error TrimEvents(namespace, id, status string, capacity int) error GetEvents(namespace, id, status string, limit int) ([]Event, error) GetTotalEvents(namespace, id, status string) (int, error) // ** Sync Tasks ** CreateTask(sourceID, collection string, task *Task, createdAt time.Time) error GetAllTasks(sourceID, collection string, start, end time.Time, limit int) ([]Task, error) GetLastTask(sourceID, collection string, offset int) (*Task, error) GetTask(taskID string) (*Task, error) GetAllTaskIDs(sourceID, collection string, descendingOrder bool) ([]string, error) RemoveTasks(sourceID, collection string, taskIDs ...string) (int, error) UpdateStartedTask(taskID, status string) error UpdateFinishedTask(taskID, status string) error //heartbeat TaskHeartBeat(taskID string) error RemoveTaskFromHeartBeat(taskID string) error GetAllTasksHeartBeat() (map[string]string, error) GetAllTasksForInitialHeartbeat(runningStatus, scheduledStatus string, lastActivityThreshold time.Duration) ([]string, error) //task logs AppendTaskLog(taskID string, now time.Time, system, message, level string) error GetTaskLogs(taskID string, start, end time.Time) ([]TaskLogRecord, error) //task queue PushTask(task *Task) error PollTask() (*Task, error) //system GetOrCreateClusterID() string Type() string }
type Task ¶
type Task struct { ID string `json:"id,omitempty" redis:"id"` SourceType string `json:"source_type" redis:"source_type"` Source string `json:"source,omitempty" redis:"source"` Collection string `json:"collection,omitempty" redis:"collection"` Priority int64 `json:"priority,omitempty" redis:"priority"` CreatedAt string `json:"created_at,omitempty" redis:"created_at"` StartedAt string `json:"started_at,omitempty" redis:"started_at"` FinishedAt string `json:"finished_at,omitempty" redis:"finished_at"` Status string `json:"status,omitempty" redis:"status"` }
Task is a Redis entity some fields are updated using names in Storage (like status updating)
type TaskLogRecord ¶
type TaskLogRecord struct { Time string `json:"time,omitempty" redis:"time"` System string `json:"system,omitempty" redis:"system"` Message string `json:"message,omitempty" redis:"message"` Level string `json:"level,omitempty" redis:"level"` }
TaskLogRecord is a Redis entity
func (*TaskLogRecord) Marshal ¶
func (tlr *TaskLogRecord) Marshal() string
Marshal returns serialized JSON object string