cluster

package
v0.0.0-...-7662171 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 1, 2021 License: MIT Imports: 13 Imported by: 0

Documentation

Index

Constants

View Source
const (
	RegularlySaveHandlerType = "regularly_save"
	CopyUniqueHandlerType    = "copy_unique"
	BlockDeleteHandlerType   = "block_delete"
	BlockUnloadHandlerType   = "block_unload"
	BlockMarkHandlerType     = "block_mark"
)
View Source
const (
	ClusterFileName = "cluster.json"
)
View Source
const (
	CtxStopTimeName = "ctx_stop_time"
)
View Source
const HtmlCodeInternalError = 500
View Source
const HtmlCodeOk = 200
View Source
const SimpleClusterObjectCreateDuration = time.Second * 10

Variables

View Source
var ClusterNameSplitter = "/"
View Source
var DefaultEcLoadGenerator map[string]EcLoadGenerator
View Source
var DefaultEcNewGenerator map[string]EcNewGenerator
View Source
var Errors map[int]string = map[int]string{}/* 159 elements not displayed */

Errors - error codes and their descriptions

View Source
var SimpleQueueType = "simple_queue"

SimpleQueueType - simple queue type

Functions

func CompressErrorJson

func CompressErrorJson(serviceResponceBase ServiceResponce, err *mft.Error) (body []byte)

func GenerateError

func GenerateError(key int, a ...interface{}) *mft.Error

GenerateError -

func GenerateErrorE

func GenerateErrorE(key int, err error, a ...interface{}) *mft.Error

GenerateErrorE -

func GenerateErrorForClusterUser

func GenerateErrorForClusterUser(user cn.CapUser, key int, a ...interface{}) *mft.Error

GenerateErrorForClusterUser - variant of GenerateError that additionally takes the cluster user

func GenerateErrorForClusterUserE

func GenerateErrorForClusterUserE(user cn.CapUser, key int, err error, a ...interface{}) *mft.Error

GenerateErrorForClusterUserE - variant of GenerateErrorE that additionally takes the cluster user

func GetUserName

func GetUserName(user cn.CapUser) string

func LoadClusterData

func LoadClusterData(timeout time.Duration,
	storageGenerator *storage.Generator,
	mountName string, relativePath string) (data []byte, err *mft.Error)

func OnChangeFuncGenerate

func OnChangeFuncGenerate(timeout time.Duration,
	storageGenerator *storage.Generator,
	mountName string, relativePath string) (f func(sc *SimpleCluster) (err *mft.Error), err *mft.Error)

func SimppleQueueLoadGenerator

func SimppleQueueLoadGenerator(ctx context.Context, storageGenerator *storage.Generator,
	queueDescription *QueueLoadDescription, idGenerator *mft.G) (queue.Queue, *mft.Error)

func UnmarshalInnerObjectAndFindHandler

func UnmarshalInnerObjectAndFindHandler(ctx context.Context,
	cluster Cluster, request *RequestBody, v interface{}) (handler Handler, responce *ResponceBody, ok bool)

Types

type AdditionalCallFuncInClusterFunc

type AdditionalCallFuncInClusterFunc func(ctx context.Context,
	cluster Cluster, request *RequestBody) (responce *ResponceBody, ok bool)

type BlockDeleteHandler

type BlockDeleteHandler struct {
	Cluster      Cluster
	QueueName    string
	Interval     time.Duration
	WaitMark     time.Duration
	WaitDelete   time.Duration
	UserName     string
	HDescription *HandlerLoadDescription
	StorageTime  time.Duration
	LimitDelete  int
	// contains filtered or unexported fields
}

func (*BlockDeleteHandler) GetName

func (rsh *BlockDeleteHandler) GetName() string

func (*BlockDeleteHandler) IsStarted

func (rsh *BlockDeleteHandler) IsStarted(ctx context.Context) (isStarted bool, err *mft.Error)

func (*BlockDeleteHandler) LastComplete

func (rsh *BlockDeleteHandler) LastComplete(ctx context.Context) (time.Time, *mft.Error)

func (*BlockDeleteHandler) LastError

func (rsh *BlockDeleteHandler) LastError(ctx context.Context) (err *mft.Error)

func (*BlockDeleteHandler) Start

func (rsh *BlockDeleteHandler) Start(ctx context.Context) (err *mft.Error)

func (*BlockDeleteHandler) Stop

func (rsh *BlockDeleteHandler) Stop(ctx context.Context) (err *mft.Error)

type BlockDeleteHandlerParams

type BlockDeleteHandlerParams struct {
	// Interval - interval between calls
	Interval time.Duration `json:"interval"`
	// WaitMark - timeout for marking a block as deleted
	WaitMark time.Duration `json:"wait_mark"`
	// WaitDelete - timeout for deleting a block
	WaitDelete time.Duration `json:"wait_delete"`
	// StorageTime - how long data is stored in the queue. For example 3*24*time.Hour
	StorageTime time.Duration `json:"storage_time"`
	// LimitDelete - limit of blocks to delete per iteration
	LimitDelete int `json:"limit_delete"`
}

func (BlockDeleteHandlerParams) ToJson

type BlockMarkHandler

type BlockMarkHandler struct {
	Cluster           Cluster
	QueueName         string
	Interval          time.Duration
	WaitMark          time.Duration
	WaitUpdate        time.Duration
	UserName          string
	HDescription      *HandlerLoadDescription
	Conditions        []MarkCondition
	LimitUpdateBlocks int
	// contains filtered or unexported fields
}

func (*BlockMarkHandler) GetName

func (rsh *BlockMarkHandler) GetName() string

func (*BlockMarkHandler) IsStarted

func (rsh *BlockMarkHandler) IsStarted(ctx context.Context) (isStarted bool, err *mft.Error)

func (*BlockMarkHandler) LastComplete

func (rsh *BlockMarkHandler) LastComplete(ctx context.Context) (time.Time, *mft.Error)

func (*BlockMarkHandler) LastError

func (rsh *BlockMarkHandler) LastError(ctx context.Context) (err *mft.Error)

func (*BlockMarkHandler) Start

func (rsh *BlockMarkHandler) Start(ctx context.Context) (err *mft.Error)

func (*BlockMarkHandler) Stop

func (rsh *BlockMarkHandler) Stop(ctx context.Context) (err *mft.Error)

type BlockMarkHandlerParams

type BlockMarkHandlerParams struct {
	// Interval - interval between calls
	Interval time.Duration `json:"interval"`
	// WaitMark - timeout for setting a new mark on a block
	WaitMark time.Duration `json:"wait_mark"`
	// WaitUpdate - timeout for updating a mark (moving a block between marks)
	WaitUpdate time.Duration `json:"wait_update_mark"`
	// Conditions - list of conditions for setting marks
	Conditions []MarkCondition `json:"conditions"`
	// LimitUpdateBlocks - limit on the number of blocks updated
	LimitUpdateBlocks int `json:"limit_update_block"`
}

func (BlockMarkHandlerParams) ToJson

type BlockUnloadHandler

type BlockUnloadHandler struct {
	Cluster             Cluster
	QueueName           string
	Interval            time.Duration
	Wait                time.Duration
	UserName            string
	HDescription        *HandlerLoadDescription
	StorageMemoryTime   time.Duration
	StorageLastLoadTime time.Duration
	// contains filtered or unexported fields
}

func (*BlockUnloadHandler) GetName

func (rsh *BlockUnloadHandler) GetName() string

func (*BlockUnloadHandler) IsStarted

func (rsh *BlockUnloadHandler) IsStarted(ctx context.Context) (isStarted bool, err *mft.Error)

func (*BlockUnloadHandler) LastComplete

func (rsh *BlockUnloadHandler) LastComplete(ctx context.Context) (time.Time, *mft.Error)

func (*BlockUnloadHandler) LastError

func (rsh *BlockUnloadHandler) LastError(ctx context.Context) (err *mft.Error)

func (*BlockUnloadHandler) Start

func (rsh *BlockUnloadHandler) Start(ctx context.Context) (err *mft.Error)

func (*BlockUnloadHandler) Stop

func (rsh *BlockUnloadHandler) Stop(ctx context.Context) (err *mft.Error)

type BlockUnloadHandlerParams

type BlockUnloadHandlerParams struct {
	// Interval - interval between calls
	Interval time.Duration `json:"interval"`
	// Wait - wait save timeout
	Wait time.Duration `json:"wait"`
	// StorageMemoryTime - timeout, counted from creation, after which a block is removed from memory
	StorageMemoryTime time.Duration `json:"storage_memory_time"`
	// StorageLastLoadTime - timeout, counted from last use, after which a block is removed from memory
	StorageLastLoadTime time.Duration `json:"storage_last_load_time"`
}

func (BlockUnloadHandlerParams) ToJson

type CheckAuthFunc

type CheckAuthFunc func(ctx context.Context, serviceRequest *ServiceRequest) (ok bool, failResponce ResponceBody)

type CheckPermissionRequest

type CheckPermissionRequest struct {
	ObjectType string `json:"object_type"`
	Action     string `json:"action"`
	ObjectName string `json:"object_name"`
}

type Cluster

type Cluster interface {
	GetName(ctx context.Context, user cn.CapUser) (name string, err *mft.Error)
	SetName(ctx context.Context, user cn.CapUser, name string) (err *mft.Error)

	ThrowError(err *mft.Error) bool

	AddQueue(ctx context.Context, user cn.CapUser, queueDescription QueueDescription) (err *mft.Error)
	DropQueue(ctx context.Context, user cn.CapUser, name string) (err *mft.Error)
	GetQueueDescription(ctx context.Context, user cn.CapUser, name string) (queueDescription QueueDescription, err *mft.Error)
	GetQueuesList(ctx context.Context, user cn.CapUser) (names []string, err *mft.Error)

	GetQueue(ctx context.Context, user cn.CapUser, name string) (queue queue.Queue, exists bool, err *mft.Error)

	AddExternalCluster(ctx context.Context, user cn.CapUser, clusterParams ExternalClusterDescription) (err *mft.Error)
	DropExternalCluster(ctx context.Context, user cn.CapUser, name string) (err *mft.Error)
	GetExternalClusterDescription(ctx context.Context, user cn.CapUser, name string) (clusterParams ExternalClusterDescription, err *mft.Error)
	GetExternalClustersList(ctx context.Context, user cn.CapUser) (names []string, err *mft.Error)

	// GetExternalCluster - gets cluster. Use '/' to separate names.
	GetExternalCluster(ctx context.Context, user cn.CapUser, name string) (cluster Cluster, exists bool, err *mft.Error)

	AddHandler(ctx context.Context, user cn.CapUser, handlerParams HandlerDescription) (err *mft.Error)
	DropHandler(ctx context.Context, user cn.CapUser, name string) (err *mft.Error)
	GetHandlerDescription(ctx context.Context, user cn.CapUser, name string) (handlerParams HandlerDescription, err *mft.Error)
	GetHandlersList(ctx context.Context, user cn.CapUser) (names []string, err *mft.Error)
	GetHandler(ctx context.Context, user cn.CapUser, name string) (handler Handler, exists bool, err *mft.Error)

	CheckPermission(ctx context.Context, user cn.CapUser, objectType string, action string, objectName string) (allowed bool, err *mft.Error)

	GetFullStruct(ctx context.Context, user cn.CapUser) (data json.RawMessage, err *mft.Error)
	LoadFullStruct(ctx context.Context, user cn.CapUser, data json.RawMessage) (err *mft.Error)

	SetValueInternal(string, string) (err *mft.Error)
	GetValueInternal(string) (string, bool)

	OnChange() (err *mft.Error)

	Ping(ctx context.Context, user cn.CapUser) (err *mft.Error)
	GetNextId(ctx context.Context, user cn.CapUser) (id int64, err *mft.Error)
	GetNextIds(ctx context.Context, user cn.CapUser, cnt int) (ids []int64, err *mft.Error)
}

Cluster - cluster for queues

type ClusterService

type ClusterService struct {
	Unmarshal UnmarshalFunc
	Marshal   MarshalFunc

	CheckAuth CheckAuthFunc
	Cluster   Cluster

	// ResponceDuration - Actual duration = ServiceRequest.WaitDuration - ResponceDuration
	ResponceDuration time.Duration
}

func ClusterServiceJsonCreate

func ClusterServiceJsonCreate(checkAuth CheckAuthFunc, cluster Cluster, compressor *compress.Generator) (sc *ClusterService)

func (*ClusterService) Call

func (sc *ClusterService) Call(prepareCtx context.Context,
	contentType string, bodyIn []byte, addFunc []AdditionalCallFuncInClusterFunc,
) (bodyOut []byte, outContentType string, htmlCode int)

type CopyUniqueHandler

type CopyUniqueHandler struct {
	Cluster        Cluster
	SrcQueueName   string
	DstQueueName   string
	Interval       time.Duration
	ActiveInterval time.Duration
	Wait           time.Duration
	UserName       string
	HDescription   *HandlerLoadDescription

	SaveModeSrc    cn.SaveMode
	SaveModeDst    cn.SaveMode
	SubscriberName string
	CntLimit       int
	DoSaveDst      bool
	Segments       *segment.Segments
	// contains filtered or unexported fields
}

func (*CopyUniqueHandler) GetName

func (rsh *CopyUniqueHandler) GetName() string

func (*CopyUniqueHandler) IsStarted

func (rsh *CopyUniqueHandler) IsStarted(ctx context.Context) (isStarted bool, err *mft.Error)

func (*CopyUniqueHandler) LastComplete

func (rsh *CopyUniqueHandler) LastComplete(ctx context.Context) (time.Time, *mft.Error)

func (*CopyUniqueHandler) LastError

func (rsh *CopyUniqueHandler) LastError(ctx context.Context) (err *mft.Error)

func (*CopyUniqueHandler) Start

func (rsh *CopyUniqueHandler) Start(ctx context.Context) (err *mft.Error)

func (*CopyUniqueHandler) Stop

func (rsh *CopyUniqueHandler) Stop(ctx context.Context) (err *mft.Error)

type CopyUniqueHandlerParams

type CopyUniqueHandlerParams struct {
	// Interval - interval between calls
	Interval time.Duration `json:"interval"`
	// Wait - wait save timeout
	Wait time.Duration `json:"wait"`

	SaveModeSrc    cn.SaveMode       `json:"src_save_mode"`
	SaveModeDst    cn.SaveMode       `json:"dst_save_mode"`
	SubscriberName string            `json:"subscribe_name"`
	CntLimit       int               `json:"cnt_limit"`
	DoSaveDst      bool              `json:"do_save_dst"`
	Segments       *segment.Segments `json:"segments"`
}

func (CopyUniqueHandlerParams) ToJson

type EcLoadGenerator

type EcLoadGenerator func(
	ctx context.Context,
	compressor *compress.Generator,
	ecDescription *ExternalClusterLoadDescription,
	idGenerator *mft.G,
	encryptData *EncryptData,
) (Cluster, *mft.Error)

type EcNewGenerator

type EcNewGenerator func(
	ctx context.Context,
	compressor *compress.Generator,
	ecDescription ExternalClusterDescription,
	idGenerator *mft.G,
	encryptData *EncryptData,
) (*ExternalClusterLoadDescription, *mft.Error)

type EncryptData

type EncryptData struct {
	EncryptAlg string `json:"enc_alg"`
	EncryptKey []byte `json:"enc_key"`
	DecryptAlg string `json:"dec_alg"`
	DecryptKey []byte `json:"dec_key"`
}

EncryptData - info for encrypt and decrypt data

type ExternalAbstractCluster

type ExternalAbstractCluster struct {
	CallTimeout time.Duration
	CallFunc    func(ctx context.Context, request *RequestBody) (responce *ResponceBody)
	// ThrowErrorFunc - if nil, a panic occurs
	ThrowErrorFunc func(err *mft.Error) bool
}

func (*ExternalAbstractCluster) AddExternalCluster

func (eac *ExternalAbstractCluster) AddExternalCluster(ctx context.Context, user cn.CapUser, clusterParams ExternalClusterDescription) (err *mft.Error)

func (*ExternalAbstractCluster) AddHandler

func (eac *ExternalAbstractCluster) AddHandler(ctx context.Context, user cn.CapUser, handlerParams HandlerDescription) (err *mft.Error)

func (*ExternalAbstractCluster) AddQueue

func (eac *ExternalAbstractCluster) AddQueue(ctx context.Context, user cn.CapUser, queueDescription QueueDescription) (err *mft.Error)

func (*ExternalAbstractCluster) Call

func (eac *ExternalAbstractCluster) Call(request *RequestBody) (responce *ResponceBody)

func (*ExternalAbstractCluster) CheckPermission

func (eac *ExternalAbstractCluster) CheckPermission(ctx context.Context, user cn.CapUser, objectType string, action string, objectName string) (allowed bool, err *mft.Error)

func (*ExternalAbstractCluster) DropExternalCluster

func (eac *ExternalAbstractCluster) DropExternalCluster(ctx context.Context, user cn.CapUser, name string) (err *mft.Error)

func (*ExternalAbstractCluster) DropHandler

func (eac *ExternalAbstractCluster) DropHandler(ctx context.Context, user cn.CapUser, name string) (err *mft.Error)

func (*ExternalAbstractCluster) DropQueue

func (eac *ExternalAbstractCluster) DropQueue(ctx context.Context, user cn.CapUser, name string) (err *mft.Error)

func (*ExternalAbstractCluster) GetExternalCluster

func (eac *ExternalAbstractCluster) GetExternalCluster(ctx context.Context, user cn.CapUser, name string) (cluster Cluster, exists bool, err *mft.Error)

func (*ExternalAbstractCluster) GetExternalClusterDescription

func (eac *ExternalAbstractCluster) GetExternalClusterDescription(ctx context.Context, user cn.CapUser, name string) (clusterParams ExternalClusterDescription, err *mft.Error)

func (*ExternalAbstractCluster) GetExternalClustersList

func (eac *ExternalAbstractCluster) GetExternalClustersList(ctx context.Context, user cn.CapUser) (names []string, err *mft.Error)

func (*ExternalAbstractCluster) GetFullStruct

func (eac *ExternalAbstractCluster) GetFullStruct(ctx context.Context, user cn.CapUser) (data json.RawMessage, err *mft.Error)

func (*ExternalAbstractCluster) GetHandler

func (eac *ExternalAbstractCluster) GetHandler(ctx context.Context, user cn.CapUser, name string) (handler Handler, exists bool, err *mft.Error)

func (*ExternalAbstractCluster) GetHandlerDescription

func (eac *ExternalAbstractCluster) GetHandlerDescription(ctx context.Context, user cn.CapUser, name string) (handlerParams HandlerDescription, err *mft.Error)

func (*ExternalAbstractCluster) GetHandlersList

func (eac *ExternalAbstractCluster) GetHandlersList(ctx context.Context, user cn.CapUser) (names []string, err *mft.Error)

func (*ExternalAbstractCluster) GetName

func (eac *ExternalAbstractCluster) GetName(ctx context.Context, user cn.CapUser) (name string, err *mft.Error)

func (*ExternalAbstractCluster) GetNextId

func (eac *ExternalAbstractCluster) GetNextId(ctx context.Context, user cn.CapUser) (id int64, err *mft.Error)

func (*ExternalAbstractCluster) GetNextIds

func (eac *ExternalAbstractCluster) GetNextIds(ctx context.Context, user cn.CapUser, cnt int) (ids []int64, err *mft.Error)

func (*ExternalAbstractCluster) GetQueue

func (eac *ExternalAbstractCluster) GetQueue(ctx context.Context, user cn.CapUser, name string) (queue queue.Queue, exists bool, err *mft.Error)

func (*ExternalAbstractCluster) GetQueueDescription

func (eac *ExternalAbstractCluster) GetQueueDescription(ctx context.Context, user cn.CapUser, name string) (queueDescription QueueDescription, err *mft.Error)

func (*ExternalAbstractCluster) GetQueuesList

func (eac *ExternalAbstractCluster) GetQueuesList(ctx context.Context, user cn.CapUser) (names []string, err *mft.Error)

func (*ExternalAbstractCluster) GetValueInternal

func (eac *ExternalAbstractCluster) GetValueInternal(string) (string, bool)

func (*ExternalAbstractCluster) LoadFullStruct

func (eac *ExternalAbstractCluster) LoadFullStruct(ctx context.Context, user cn.CapUser, data json.RawMessage) (err *mft.Error)

func (*ExternalAbstractCluster) OnChange

func (eac *ExternalAbstractCluster) OnChange() (err *mft.Error)

func (*ExternalAbstractCluster) Ping

func (eac *ExternalAbstractCluster) Ping(ctx context.Context, user cn.CapUser) (err *mft.Error)

func (*ExternalAbstractCluster) SetName

func (eac *ExternalAbstractCluster) SetName(ctx context.Context, user cn.CapUser, name string) (err *mft.Error)

func (*ExternalAbstractCluster) SetValueInternal

func (eac *ExternalAbstractCluster) SetValueInternal(string, string) (err *mft.Error)

func (*ExternalAbstractCluster) ThrowError

func (eac *ExternalAbstractCluster) ThrowError(err *mft.Error) bool

type ExternalAbstractHandler

type ExternalAbstractHandler struct {
	HandlerName string
	User        cn.CapUser
	CallFunc    func(ctx context.Context, request *RequestBody) (responce *ResponceBody)
}

func (*ExternalAbstractHandler) IsStarted

func (eah *ExternalAbstractHandler) IsStarted(ctx context.Context) (isStarted bool, err *mft.Error)

func (*ExternalAbstractHandler) LastComplete

func (eah *ExternalAbstractHandler) LastComplete(ctx context.Context) (lastComplete time.Time, err *mft.Error)

func (*ExternalAbstractHandler) LastError

func (eah *ExternalAbstractHandler) LastError(ctx context.Context) (err *mft.Error)

func (*ExternalAbstractHandler) MarshalRequestMust

func (eah *ExternalAbstractHandler) MarshalRequestMust(action string, v interface{}) *RequestBody

func (*ExternalAbstractHandler) Start

func (eah *ExternalAbstractHandler) Start(ctx context.Context) (err *mft.Error)

func (*ExternalAbstractHandler) Stop

func (eah *ExternalAbstractHandler) Stop(ctx context.Context) (err *mft.Error)

type ExternalAbstractQueue

type ExternalAbstractQueue struct {
	QueueName string
	User      cn.CapUser
	CallFunc  func(ctx context.Context, request *RequestBody) (responce *ResponceBody)
}

func (*ExternalAbstractQueue) Add

func (eac *ExternalAbstractQueue) Add(ctx context.Context, user cn.CapUser, message []byte,
	externalID int64, externalDt int64, source string, segment int64,
	saveMode cn.SaveMode) (id int64, err *mft.Error)

func (*ExternalAbstractQueue) AddList

func (eac *ExternalAbstractQueue) AddList(ctx context.Context, user cn.CapUser, messages []queue.Message,
	saveMode cn.SaveMode) (ids []int64, err *mft.Error)

func (*ExternalAbstractQueue) AddUnique

func (eac *ExternalAbstractQueue) AddUnique(ctx context.Context, user cn.CapUser, message []byte,
	externalID int64, externalDt int64, source string, segment int64,
	saveMode cn.SaveMode) (id int64, err *mft.Error)

func (*ExternalAbstractQueue) AddUniqueList

func (eac *ExternalAbstractQueue) AddUniqueList(ctx context.Context, user cn.CapUser, messages []queue.Message,
	saveMode cn.SaveMode) (ids []int64, err *mft.Error)

func (*ExternalAbstractQueue) Get

func (eac *ExternalAbstractQueue) Get(ctx context.Context, user cn.CapUser, idStart int64, cntLimit int) (messages []*queue.MessageWithMeta, err *mft.Error)

func (*ExternalAbstractQueue) GetSegment

func (eac *ExternalAbstractQueue) GetSegment(ctx context.Context, user cn.CapUser, idStart int64, cntLimit int,
	segments *segment.Segments,
) (messages []*queue.MessageWithMeta, lastId int64, err *mft.Error)

func (*ExternalAbstractQueue) MarshalRequestMust

func (eac *ExternalAbstractQueue) MarshalRequestMust(user cn.CapUser, action string, v interface{}) *RequestBody

func (*ExternalAbstractQueue) SaveAll

func (eac *ExternalAbstractQueue) SaveAll(ctx context.Context, user cn.CapUser) (err *mft.Error)

func (*ExternalAbstractQueue) SubscriberAddReplicaMember

func (eac *ExternalAbstractQueue) SubscriberAddReplicaMember(ctx context.Context, user cn.CapUser,
	subscriber string) (err *mft.Error)

func (*ExternalAbstractQueue) SubscriberGetLastRead

func (eac *ExternalAbstractQueue) SubscriberGetLastRead(ctx context.Context, user cn.CapUser,
	subscriber string) (id int64, err *mft.Error)

func (*ExternalAbstractQueue) SubscriberGetReplicaCount

func (eac *ExternalAbstractQueue) SubscriberGetReplicaCount(ctx context.Context, user cn.CapUser,
	id int64) (cnt int, err *mft.Error)

func (*ExternalAbstractQueue) SubscriberRemoveReplicaMember

func (eac *ExternalAbstractQueue) SubscriberRemoveReplicaMember(ctx context.Context, user cn.CapUser,
	subscriber string) (err *mft.Error)

func (*ExternalAbstractQueue) SubscriberSetLastRead

func (eac *ExternalAbstractQueue) SubscriberSetLastRead(ctx context.Context, user cn.CapUser,
	subscriber string, id int64, saveMode cn.SaveMode) (err *mft.Error)

type ExternalClusterDescription

type ExternalClusterDescription struct {
	Name   string          `json:"name"`
	Type   string          `json:"type"`
	Params json.RawMessage `json:"params"`
}

ExternalClusterDescription description of external cluster

type ExternalClusterGenerator

type ExternalClusterGenerator struct {
	// contains filtered or unexported fields
}

func ExternalClusterGeneratorCreate

func ExternalClusterGeneratorCreate() *ExternalClusterGenerator

func (*ExternalClusterGenerator) AddGenerator

func (ecg *ExternalClusterGenerator) AddGenerator(
	name string,
	ecNewGenerator EcNewGenerator,
	ecLoadGenerator EcLoadGenerator,
)

func (*ExternalClusterGenerator) GetGenerator

func (ecg *ExternalClusterGenerator) GetGenerator(
	name string) (
	ecNewGenerator EcNewGenerator,
	ecLoadGenerator EcLoadGenerator,
	ok bool,
)

type ExternalClusterLoadDescription

type ExternalClusterLoadDescription struct {
	Name    string          `json:"name"`
	Type    string          `json:"type"`
	Params  json.RawMessage `json:"params"`
	Cluster Cluster         `json:"-"`
}

ExternalClusterLoadDescription description of external cluster for load

func (*ExternalClusterLoadDescription) ExternalClusterDescription

func (qld *ExternalClusterLoadDescription) ExternalClusterDescription() ExternalClusterDescription

type HLoadGenerator

type HLoadGenerator func(
	ctx context.Context,
	cluster Cluster,
	hDescription *HandlerLoadDescription,
	idGenerator *mft.G,
) (Handler, *mft.Error)

type HNewGenerator

type HNewGenerator func(
	ctx context.Context,
	cluster Cluster,
	hDescription HandlerDescription,
	idGenerator *mft.G,
) (*HandlerLoadDescription, *mft.Error)

type Handler

type Handler interface {
	Start(ctx context.Context) (err *mft.Error)
	Stop(ctx context.Context) (err *mft.Error)
	LastComplete(ctx context.Context) (lastComplete time.Time, err *mft.Error)
	LastError(ctx context.Context) (err *mft.Error)
	IsStarted(ctx context.Context) (isStarted bool, err *mft.Error)
}

Handler - handler

func BlockDeleteLoadGenerator

func BlockDeleteLoadGenerator(
	ctx context.Context,
	cluster Cluster,
	hDescription *HandlerLoadDescription,
	idGenerator *mft.G,
) (Handler, *mft.Error)

func BlockMarkLoadGenerator

func BlockMarkLoadGenerator(
	ctx context.Context,
	cluster Cluster,
	hDescription *HandlerLoadDescription,
	idGenerator *mft.G,
) (Handler, *mft.Error)

func BlockUnloadLoadGenerator

func BlockUnloadLoadGenerator(
	ctx context.Context,
	cluster Cluster,
	hDescription *HandlerLoadDescription,
	idGenerator *mft.G,
) (Handler, *mft.Error)

func CopyUniqueLoadGenerator

func CopyUniqueLoadGenerator(
	ctx context.Context,
	cluster Cluster,
	hDescription *HandlerLoadDescription,
	idGenerator *mft.G,
) (Handler, *mft.Error)

func RegularlySaveLoadGenerator

func RegularlySaveLoadGenerator(
	ctx context.Context,
	cluster Cluster,
	hDescription *HandlerLoadDescription,
	idGenerator *mft.G,
) (Handler, *mft.Error)

type HandlerDescription

type HandlerDescription struct {
	Name       string          `json:"name"`
	UserName   string          `json:"user_name"`
	Type       string          `json:"type"`
	QueueNames []string        `json:"queue_names"`
	Params     json.RawMessage `json:"params"`
}

HandlerDescription description of handler

type HandlerGenerator

type HandlerGenerator struct {
	// contains filtered or unexported fields
}

func HandlerGeneratorCreate

func HandlerGeneratorCreate() *HandlerGenerator

func (*HandlerGenerator) AddGenerator

func (ecg *HandlerGenerator) AddGenerator(
	name string,
	hNewGenerator HNewGenerator,
	hLoadGenerator HLoadGenerator,
)

func (*HandlerGenerator) GetGenerator

func (ecg *HandlerGenerator) GetGenerator(
	name string) (
	hNewGenerator HNewGenerator,
	hLoadGenerator HLoadGenerator,
	ok bool,
)

type HandlerLoadDescription

type HandlerLoadDescription struct {
	Name       string          `json:"name"`
	UserName   string          `json:"user_name"`
	Type       string          `json:"type"`
	Params     json.RawMessage `json:"params"`
	QueueNames []string        `json:"queue_names"`
	Start      bool            `json:"start"`
	Handler    Handler         `json:"-"`
}

HandlerLoadDescription description of handler for load

func BlockDeleteNewGenerator

func BlockDeleteNewGenerator(
	ctx context.Context,
	cluster Cluster,
	hDescription HandlerDescription,
	idGenerator *mft.G,
) (*HandlerLoadDescription, *mft.Error)

func BlockMarkNewGenerator

func BlockMarkNewGenerator(
	ctx context.Context,
	cluster Cluster,
	hDescription HandlerDescription,
	idGenerator *mft.G,
) (*HandlerLoadDescription, *mft.Error)

func BlockUnloadNewGenerator

func BlockUnloadNewGenerator(
	ctx context.Context,
	cluster Cluster,
	hDescription HandlerDescription,
	idGenerator *mft.G,
) (*HandlerLoadDescription, *mft.Error)

func CopyUniqueNewGenerator

func CopyUniqueNewGenerator(
	ctx context.Context,
	cluster Cluster,
	hDescription HandlerDescription,
	idGenerator *mft.G,
) (*HandlerLoadDescription, *mft.Error)

func RegularlySaveNewGenerator

func RegularlySaveNewGenerator(
	ctx context.Context,
	cluster Cluster,
	hDescription HandlerDescription,
	idGenerator *mft.G,
) (*HandlerLoadDescription, *mft.Error)

func (*HandlerLoadDescription) HandlerDescription

func (qld *HandlerLoadDescription) HandlerDescription() HandlerDescription

type MarkCondition

type MarkCondition struct {
	// Mark - mark of block
	Mark string `json:"mark"`
	// FromTime - from time age like 10 min (FromTime < ToTime)
	FromTime time.Duration `json:"from_time"`
	// ToTime - to time age like 20 min (ToTime > FromTime)
	ToTime time.Duration `json:"to_time"`
}

type MarshalFunc

type MarshalFunc func(ctx context.Context, contentType string, serviceResponce ServiceResponce) (body []byte, outContentType string, htmlCode int)

type QueueAddListRequest

type QueueAddListRequest struct {
	Messages []queue.Message `json:"msgs"`
	SaveMode cn.SaveMode     `json:"sm"`
}

type QueueAddRequest

type QueueAddRequest struct {
	Message  queue.Message `json:"msg"`
	SaveMode cn.SaveMode   `json:"sm"`
}

type QueueDescription

type QueueDescription struct {
	Name         string          `json:"name"`
	Type         string          `json:"type"`
	CreateOnLoad bool            `json:"create_on_load"`
	Owner        string          `json:"owner"`
	Params       json.RawMessage `json:"params"`
}

QueueDescription description of queue

func (QueueDescription) GetName

func (qd QueueDescription) GetName() string

type QueueGenerator

type QueueGenerator struct {
	// contains filtered or unexported fields
}

QueueGenerator - queue generator

func QueueGeneratorCreate

func QueueGeneratorCreate() *QueueGenerator

func (*QueueGenerator) AddGenerator

func (qg *QueueGenerator) AddGenerator(
	name string,
	qNewGenerator func(ctx context.Context, storageGenerator *storage.Generator,
		queueDescription QueueDescription, idGenerator *mft.G) (*QueueLoadDescription, *mft.Error),
	qLoadGenerator func(ctx context.Context, storageGenerator *storage.Generator,
		queueDescription *QueueLoadDescription, idGenerator *mft.G) (queue.Queue, *mft.Error),
)

func (*QueueGenerator) GetGenerator

func (qg *QueueGenerator) GetGenerator(
	name string) (
	qNewGenerator func(ctx context.Context, storageGenerator *storage.Generator,
		queueDescription QueueDescription, idGenerator *mft.G) (*QueueLoadDescription, *mft.Error),
	qLoadGenerator func(ctx context.Context, storageGenerator *storage.Generator,
		queueDescription *QueueLoadDescription, idGenerator *mft.G) (queue.Queue, *mft.Error),
	ok bool,
)

type QueueGetRequest

type QueueGetRequest struct {
	IdStart  int64 `json:"id_start"`
	CntLimit int   `json:"cnt_limit"`
}

type QueueGetSegmentRequest

type QueueGetSegmentRequest struct {
	IdStart  int64             `json:"id_start"`
	CntLimit int               `json:"cnt_limit"`
	Segments *segment.Segments `json:"segments"`
}

type QueueGetSegmentResponce

type QueueGetSegmentResponce struct {
	Messages []*queue.MessageWithMeta `json:"msgs"`
	LastId   int64                    `json:"last_id"`
}

type QueueLoadDescription

type QueueLoadDescription struct {
	Name         string          `json:"name"`
	Type         string          `json:"type"`
	CreateOnLoad bool            `json:"create_on_load"`
	RelativePath string          `json:"relative_path"`
	Params       json.RawMessage `json:"params"`
	Queue        queue.Queue     `json:"-"`
	Owner        string          `json:"owner"`
}

QueueLoadDescription description of queue for load

func SimppleQueueNewGenerator

func SimppleQueueNewGenerator(ctx context.Context, storageGenerator *storage.Generator,
	queueDescription QueueDescription, idGenerator *mft.G) (qd *QueueLoadDescription, err *mft.Error)

func (*QueueLoadDescription) GetName

func (qld *QueueLoadDescription) GetName() string

func (*QueueLoadDescription) QueueDescription

func (qld *QueueLoadDescription) QueueDescription() QueueDescription

type QueueSubscriberSetLastReadRequest

type QueueSubscriberSetLastReadRequest struct {
	Subscriber string      `json:"sbscr"`
	Id         int64       `json:"id"`
	SaveMode   cn.SaveMode `json:"sm"`
}

type RegularlySaveHandler

type RegularlySaveHandler struct {
	Cluster      Cluster
	QueueName    string
	Interval     time.Duration
	Wait         time.Duration
	UserName     string
	HDescription *HandlerLoadDescription
	// contains filtered or unexported fields
}

func (*RegularlySaveHandler) GetName

func (rsh *RegularlySaveHandler) GetName() string

func (*RegularlySaveHandler) IsStarted

func (rsh *RegularlySaveHandler) IsStarted(ctx context.Context) (isStarted bool, err *mft.Error)

func (*RegularlySaveHandler) LastComplete

func (rsh *RegularlySaveHandler) LastComplete(ctx context.Context) (time.Time, *mft.Error)

func (*RegularlySaveHandler) LastError

func (rsh *RegularlySaveHandler) LastError(ctx context.Context) (err *mft.Error)

func (*RegularlySaveHandler) Start

func (rsh *RegularlySaveHandler) Start(ctx context.Context) (err *mft.Error)

func (*RegularlySaveHandler) Stop

func (rsh *RegularlySaveHandler) Stop(ctx context.Context) (err *mft.Error)

type RegularlySaveHandlerParams

// RegularlySaveHandlerParams holds the JSON-serializable parameters of a
// regularly-save handler.
type RegularlySaveHandlerParams struct {
	// Interval is the interval between calls.
	Interval time.Duration `json:"interval"`
	// Wait is the timeout for waiting on a save.
	Wait time.Duration `json:"wait"`
}

func (RegularlySaveHandlerParams) ToJson

type RequestBody

// RequestBody is a JSON-serializable request: a name, a user, an action and
// an optional raw JSON body and object name.
type RequestBody struct {
	Name       string          `json:"name"`
	User       string          `json:"user"`
	Action     string          `json:"action"`
	// Body is kept as raw JSON and is omitted from the output when empty.
	Body       json.RawMessage `json:"body,omitempty"`
	// ObjectName is omitted from the output when empty.
	ObjectName string          `json:"object_name,omitempty"`
}

RequestBody is the body of a request.

func MarshalRequestMust

func MarshalRequestMust(user cn.CapUser, action string, v interface{}) *RequestBody

func (*RequestBody) GetName

func (rb *RequestBody) GetName() string

func (*RequestBody) UnmarshalInnerObject

func (request *RequestBody) UnmarshalInnerObject(v interface{}) (err *mft.Error)

type ResponceBody

// ResponceBody is a JSON-serializable response: a raw JSON body and an error.
type ResponceBody struct {
	// Body is kept as raw JSON, so its decoding is deferred.
	Body json.RawMessage `json:"body"`
	// Err is a pointer and has no omitempty, so a nil error is encoded as
	// JSON null under key "error".
	Err  *mft.Error      `json:"error"`
}

ResponceBody is the body of a response.

func CallFuncInCluster

func CallFuncInCluster(ctx context.Context, cluster Cluster, request *RequestBody,
	addFunc []AdditionalCallFuncInClusterFunc) (responce *ResponceBody)

func CheckAuthFuncEmpty

func CheckAuthFuncEmpty(ctx context.Context, serviceRequest *ServiceRequest) (ok bool, failResponce ResponceBody)

func MarshalResponceMust

func MarshalResponceMust(v interface{}, err *mft.Error) *ResponceBody

func UnmarshalInnerObjectAndFindQueue

func UnmarshalInnerObjectAndFindQueue(ctx context.Context,
	cluster Cluster, request *RequestBody, v interface{}) (queue queue.Queue, responce *ResponceBody, ok bool)

func (*ResponceBody) UnmarshalInnerObject

func (responce *ResponceBody) UnmarshalInnerObject(v interface{}) (err *mft.Error)

type ServiceRequest

// ServiceRequest is the JSON-serializable envelope around a RequestBody,
// adding authentication fields, timing fields and content-type / naming
// options.
type ServiceRequest struct {
	AuthentificationType string `json:"auth_type"`
	UserName             string `json:"user_name"`
	AuthentificationInfo []byte `json:"auth_info"`

	// NOTE(review): the unit/epoch of CurrentTime is not visible here —
	// confirm before relying on it.
	WaitDuration time.Duration `json:"wait"`
	CurrentTime  int64         `json:"current_time"`

	PreferContentType string `json:"prefer_content_type"`

	// NOTE(review): presumably forces the request name to be replaced —
	// by what is not visible here; confirm.
	ReplaceNameForce bool `json:"replace_name_force"`

	// Request is a pointer, so the field may be nil.
	Request *RequestBody `json:"request"`
}

func (*ServiceRequest) GetName

func (sr *ServiceRequest) GetName() string

type ServiceResponce

// ServiceResponce is the JSON-serializable envelope around a ResponceBody,
// adding start and finish time values.
type ServiceResponce struct {
	// NOTE(review): the unit/epoch of TimeStart and TimeFinish is not
	// visible here — confirm.
	TimeStart  int64 `json:"start"`
	TimeFinish int64 `json:"finish"`

	Responce ResponceBody `json:"responce"`
}

type SimpleCluster

// SimpleCluster holds a cluster's name, its queues, external clusters and
// handlers (each keyed by string), plus the generators and callbacks it works
// with. Only the fields without the `json:"-"` tag are part of its JSON form.
type SimpleCluster struct {
	Name string `json:"name"`

	ObjectCreateDuration time.Duration `json:"object_create_duration"`

	// IDGenerator is excluded from JSON.
	// NOTE(review): presumably configured from RvGeneratorPart — confirm.
	RvGeneratorPart int64  `json:"rv_generator_part"`
	IDGenerator     *mft.G `json:"-"`

	// Load descriptions of the cluster's objects, keyed by string.
	// NOTE(review): presumably the key is the object's name — confirm.
	Queues           map[string]*QueueLoadDescription           `json:"queues"`
	ExternalClusters map[string]*ExternalClusterLoadDescription `json:"ext_clusters"`
	Handlers         map[string]*HandlerLoadDescription         `json:"handlers"`

	InternalValues map[string]string `json:"internal_values"`

	// Generators and the compressor are runtime dependencies and are
	// excluded from JSON.
	QueueGenerator           *QueueGenerator           `json:"-"`
	StorageGenerator         *storage.Generator        `json:"-"`
	ExternalClusterGenerator *ExternalClusterGenerator `json:"-"`
	HandlerGenerator         *HandlerGenerator         `json:"-"`
	Compressor               *compress.Generator       `json:"-"`

	EncryptData *EncryptData `json:"-"`

	// CheckPermissionFunc is optional: when it is nil it is ignored.
	CheckPermissionFunc func(ctx context.Context, user cn.CapUser, objectType string, action string, objectName string) (allowed bool, err *mft.Error) `json:"-"`
	// ThrowErrorFunc is optional: when it is nil it is ignored.
	ThrowErrorFunc func(err *mft.Error) bool `json:"-"`

	// OnChangeFunc is the on-change event callback; it receives the cluster
	// itself.
	OnChangeFunc func(sc *SimpleCluster) (err *mft.Error) `json:"-"`
	// contains filtered or unexported fields
}

SimpleCluster is a simple cluster.

func SimpleClusterCreate

func SimpleClusterCreate(storageGenerator *storage.Generator,
	throwErrorFunc func(err *mft.Error) bool,
	onChangeFunc func(sc *SimpleCluster) (err *mft.Error),
	checkPermissionFunc func(ctx context.Context, user cn.CapUser, objectType string, action string, objectName string) (allowed bool, err *mft.Error),
	queueGenerator *QueueGenerator,
	externalClusterGenerator *ExternalClusterGenerator,
	handlerGenerator *HandlerGenerator,
	compressor *compress.Generator,
	encryptData EncryptData,
) *SimpleCluster

func (*SimpleCluster) AddExternalCluster

func (sc *SimpleCluster) AddExternalCluster(ctx context.Context, user cn.CapUser,
	clusterParams ExternalClusterDescription) (err *mft.Error)

func (*SimpleCluster) AddHandler

func (sc *SimpleCluster) AddHandler(ctx context.Context, user cn.CapUser,
	handlerParams HandlerDescription) (err *mft.Error)

func (*SimpleCluster) AddQueue

func (sc *SimpleCluster) AddQueue(ctx context.Context, user cn.CapUser, queueDescription QueueDescription) (err *mft.Error)

AddQueue adds a queue to the cluster.

func (*SimpleCluster) CheckPermission

func (sc *SimpleCluster) CheckPermission(ctx context.Context, user cn.CapUser, objectType string, action string, objectName string) (allowed bool, err *mft.Error)

func (*SimpleCluster) DropExternalCluster

func (sc *SimpleCluster) DropExternalCluster(ctx context.Context, user cn.CapUser,
	name string) (err *mft.Error)

func (*SimpleCluster) DropHandler

func (sc *SimpleCluster) DropHandler(ctx context.Context, user cn.CapUser,
	name string) (err *mft.Error)

func (*SimpleCluster) DropQueue

func (sc *SimpleCluster) DropQueue(ctx context.Context, user cn.CapUser, name string) (err *mft.Error)

func (*SimpleCluster) GetExternalCluster

func (sc *SimpleCluster) GetExternalCluster(ctx context.Context, user cn.CapUser,
	name string) (cluster Cluster, exists bool, err *mft.Error)

func (*SimpleCluster) GetExternalClusterDescription

func (sc *SimpleCluster) GetExternalClusterDescription(ctx context.Context, user cn.CapUser,
	name string) (clusterParams ExternalClusterDescription, err *mft.Error)

func (*SimpleCluster) GetExternalClustersList

func (sc *SimpleCluster) GetExternalClustersList(ctx context.Context, user cn.CapUser) (names []string, err *mft.Error)

func (*SimpleCluster) GetFullStruct

func (sc *SimpleCluster) GetFullStruct(ctx context.Context, user cn.CapUser) (data json.RawMessage, err *mft.Error)

func (*SimpleCluster) GetFullStructRaw

func (sc *SimpleCluster) GetFullStructRaw() (data json.RawMessage, err *mft.Error)

func (*SimpleCluster) GetHandler

func (sc *SimpleCluster) GetHandler(ctx context.Context, user cn.CapUser, name string) (handler Handler, exists bool, err *mft.Error)

func (*SimpleCluster) GetHandlerDescription

func (sc *SimpleCluster) GetHandlerDescription(ctx context.Context, user cn.CapUser,
	name string) (handlerParams HandlerDescription, err *mft.Error)

func (*SimpleCluster) GetHandlersList

func (sc *SimpleCluster) GetHandlersList(ctx context.Context, user cn.CapUser) (names []string, err *mft.Error)

func (*SimpleCluster) GetName

func (sc *SimpleCluster) GetName(ctx context.Context, user cn.CapUser) (name string, err *mft.Error)

GetName gets cluster name

func (*SimpleCluster) GetNextId

func (sc *SimpleCluster) GetNextId(ctx context.Context, user cn.CapUser) (id int64, err *mft.Error)

func (*SimpleCluster) GetNextIds

func (sc *SimpleCluster) GetNextIds(ctx context.Context, user cn.CapUser, cnt int) (ids []int64, err *mft.Error)

func (*SimpleCluster) GetQueue

func (sc *SimpleCluster) GetQueue(ctx context.Context, user cn.CapUser, name string) (queue queue.Queue, exists bool, err *mft.Error)

func (*SimpleCluster) GetQueueDescription

func (sc *SimpleCluster) GetQueueDescription(ctx context.Context, user cn.CapUser, name string) (queueDescription QueueDescription, err *mft.Error)

func (*SimpleCluster) GetQueuesList

func (sc *SimpleCluster) GetQueuesList(ctx context.Context, user cn.CapUser) (names []string, err *mft.Error)

func (*SimpleCluster) GetValueInternal

func (sc *SimpleCluster) GetValueInternal(name string) (value string, ok bool)

func (*SimpleCluster) LoadFullStruct

func (sc *SimpleCluster) LoadFullStruct(ctx context.Context, user cn.CapUser, data json.RawMessage) (err *mft.Error)

func (*SimpleCluster) LoadFullStructRaw

func (sc *SimpleCluster) LoadFullStructRaw(data json.RawMessage) (err *mft.Error)

func (*SimpleCluster) OnChange

func (sc *SimpleCluster) OnChange() (err *mft.Error)

func (*SimpleCluster) Ping

func (sc *SimpleCluster) Ping(ctx context.Context, user cn.CapUser) (err *mft.Error)

func (*SimpleCluster) SetName

func (sc *SimpleCluster) SetName(ctx context.Context, user cn.CapUser, name string) (err *mft.Error)

SetName sets cluster name

func (*SimpleCluster) SetValueInternal

func (sc *SimpleCluster) SetValueInternal(name string, value string) (err *mft.Error)

func (*SimpleCluster) ThrowError

func (sc *SimpleCluster) ThrowError(err *mft.Error) bool

type SimpleQueueParams

// SimpleQueueParams holds the JSON-serializable parameters of a simple queue:
// three limits, storage mount names, segments and save-mode settings.
type SimpleQueueParams struct {
	// NOTE(review): what each limit bounds (per block, per queue, ...) and
	// the unit of LenLimit are not visible here — confirm.
	CntLimit                        int               `json:"cnt_limit"`
	TimeLimit                       time.Duration     `json:"time_limit"`
	LenLimit                        int               `json:"len_limit"`
	MetaStorageMountName            string            `json:"meta_mount_name"`
	SubscriberStorageMountName      string            `json:"subscriber_mount_name"`
	// NOTE(review): presumably maps a marker to the mount name storing its
	// block data — confirm.
	MarkerBlockDataStorageMountName map[string]string `json:"marker_block_mount_name"`
	// Segments is a pointer, so the field may be nil.
	Segments                        *segment.Segments `json:"segments"`
	// Note the JSON keys end in "save_mod", not "save_mode".
	DefaultSaveMode                 cn.SaveMode       `json:"default_save_mod"`
	UseDefaultSaveModeForce         bool              `json:"use_default_save_mod_force"`
}

func (SimpleQueueParams) ToJson

func (sqp SimpleQueueParams) ToJson() json.RawMessage

type UnmarshalFunc

// UnmarshalFunc is the signature of a function that takes a context, a
// content type and a raw body and returns a ServiceRequest, an ok flag and a
// failure ResponceBody.
// NOTE(review): presumably failResponce is meaningful only when ok is false —
// confirm against the implementations.
type UnmarshalFunc func(ctx context.Context, contentType string, body []byte) (serviceRequest ServiceRequest, ok bool, failResponce ResponceBody)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL