Documentation ¶
Index ¶
- Constants
- func ES_Version(client *elastic.Client, conf *config.JSON) string
- func ES_close()
- func ES_init(conf *config.JSON) *elastic.Client
- func ExecuteWithRetry(operation func(*elastic.BulkService, context.Context) (bool, error), ...) (success bool, err error)
- func GenBody(settings, mappings string, dynamic bool) string
- func GenMappings(dstDynamic, typeName string, isGreaterOrEqualThan7 bool, conf *config.JSON) string
- func GetAlias(conf *config.JSON) string
- func GetBatchSize(conf *config.JSON) int64
- func GetDeleteBy(conf *config.JSON) string
- func GetDiscoveryFilter(conf *config.JSON) string
- func GetDstDynamic(conf *config.JSON) string
- func GetDynamic(conf *config.JSON) bool
- func GetESVersion(conf *config.JSON) int64
- func GetEndpoint(conf *config.JSON) string
- func GetFieldDelimiter(conf *config.JSON) string
- func GetIncludeSettings(conf *config.JSON) []string
- func GetIndexName(conf *config.JSON) string
- func GetMasterTimeout(conf *config.JSON) string
- func GetPassword(conf *config.JSON) string
- func GetRetryTimes(conf *config.JSON) int64
- func GetSettings(conf *config.JSON) map[string]*encoding.JSON
- func GetSleepTimeInMilliSecond(conf *config.JSON) int64
- func GetSplitter(conf *config.JSON) string
- func GetTimeout(conf *config.JSON) int64
- func GetTryInterval(conf *config.JSON) int64
- func GetTrySize(conf *config.JSON) int64
- func GetTypeName(conf *config.JSON) string
- func GetUnifiedVersion(conf *config.JSON) int64
- func GetUrlParams(conf *config.JSON) map[string]interface{}
- func GetUsername(conf *config.JSON) string
- func GetVersioning(conf *config.JSON) bool
- func HasID(conf *config.JSON) bool
- func HasPrimaryKeyInfo(conf *config.JSON) bool
- func IsCompression(conf *config.JSON) bool
- func IsDiscovery(conf *config.JSON) bool
- func IsEnableNullUpdate(conf *config.JSON) bool
- func IsGreaterOrEqualThan7(conf *config.JSON, client *elastic.Client) bool
- func IsHasId(conf *config.JSON) bool
- func IsHighSpeedMode(conf *config.JSON) bool
- func IsIgnoreParseError(conf *config.JSON) bool
- func IsIgnoreWriteError(conf *config.JSON) bool
- func IsMultiThread(conf *config.JSON) bool
- func IsNeedCleanAlias(conf *config.JSON) bool
- func IsTruncate(conf *config.JSON) bool
- func NewWriterFromString(plugin string) (wr writer.Writer, err error)
- func ParseDeleteCondition(conf *config.JSON) []map[string]interface{}
- func RegistPlugin()
- type ActionType
- type Body
- type ElasticSearchFieldType
- type EsColumn
- type Job
- func (j *Job) Destroy(ctx context.Context) (err error)
- func (j *Job) Init(ctx context.Context) (err error)
- func (j *Job) Post(ctx context.Context) (err error)
- func (j *Job) Prepare(ctx context.Context) (err error)
- func (j *Job) Split(ctx context.Context, number int) (confs []*config.JSON, err error)
- type Maker
- type PartitionColumn
- type PrimaryKeyInfo
- type Task
- func (w *Task) Destroy(ctx context.Context) error
- func (t *Task) DoBatchInsert(writerBuffer []element.Record) error
- func (t *Task) Init(ctx context.Context) (err error)
- func (t *Task) IsDeleteRecord(record element.Record) bool
- func (t *Task) StartWrite(ctx context.Context, receiver plugin.RecordReceiver) (err error)
- type Writer
Constants ¶
View Source
const (
WRITE_COLUMNS = "write_columns"
)
Variables ¶
This section is empty.
Functions ¶
func ES_Version ¶
func ExecuteWithRetry ¶
func ExecuteWithRetry(operation func(*elastic.BulkService, context.Context) (bool, error), bulkRequest *elastic.BulkService, ctx context.Context, maxRetries int, retryInterval time.Duration) (success bool, err error)
ExecuteWithRetry 是重试函数,接受一个操作函数、批量请求、上下文、最大重试次数以及重试间隔。
func GenMappings ¶
func GetBatchSize ¶
func GetDeleteBy ¶
func GetDiscoveryFilter ¶
func GetDstDynamic ¶
func GetDynamic ¶
func GetESVersion ¶
func GetEndpoint ¶
func GetFieldDelimiter ¶
func GetIncludeSettings ¶
func GetIndexName ¶
func GetMasterTimeout ¶
func GetPassword ¶
func GetRetryTimes ¶
func GetSplitter ¶
func GetTimeout ¶
func GetTryInterval ¶
func GetTrySize ¶
func GetTypeName ¶
func GetUnifiedVersion ¶
func GetUrlParams ¶
func GetUsername ¶
func GetVersioning ¶
func HasPrimaryKeyInfo ¶
func IsCompression ¶
func IsDiscovery ¶
func IsEnableNullUpdate ¶
func IsGreaterOrEqualThan7 ¶
func IsHighSpeedMode ¶
func IsIgnoreParseError ¶
func IsIgnoreWriteError ¶
func IsMultiThread ¶
func IsNeedCleanAlias ¶
func IsTruncate ¶
func NewWriterFromString ¶
NewWriterFromString creates a writer from the given plugin string.
func ParseDeleteCondition ¶
func RegistPlugin ¶
func RegistPlugin()
Types ¶
type ActionType ¶
type ActionType int
定义ActionType类型
const ( UNKNOW ActionType = iota INDEX CREATE DELETE UPDATE )
定义ActionType常量
func GetActionType ¶
func GetActionType(conf *config.JSON) ActionType
type Body ¶
type Body struct { Settings interface{} `json:"settings"` Mappings interface{} `json:"mappings"` }
type ElasticSearchFieldType ¶
type ElasticSearchFieldType int
ElasticSearchFieldType 枚举类型
const ( ID ElasticSearchFieldType = iota PARENT ROUTING VERSION STRING TEXT KEYWORD LONG INTEGER SHORT BYTE DOUBLE FLOAT DATE BOOLEAN BINARY INTEGER_RANGE FLOAT_RANGE LONG_RANGE DOUBLE_RANGE DATE_RANGE GEO_POINT GEO_SHAPE IP IP_RANGE COMPLETION TOKEN_COUNT OBJECT NESTED )
func GetESFieldType ¶
func GetESFieldType(typeStr string) ElasticSearchFieldType
GetESFieldType 根据传入的字符串类型返回对应的 ElasticSearchFieldType。
func (ElasticSearchFieldType) String ¶
func (f ElasticSearchFieldType) String() string
String 方法用于返回ElasticSearchFieldType的字符串表示
type EsColumn ¶
type EsColumn struct { Name string Type string Timezone string Format string DstFormat string Array bool DstArray bool JsonArray bool Origin bool CombineFields []string CombineFieldsValueSeparator string }
func DefaultColumn ¶
func DefaultColumn() *EsColumn
func GetColumnList ¶
func GetWriteColumns ¶
func GetcombinedIdColumn ¶
type Job ¶
type Job struct { *plugin.BaseJob RetryTimes int64 SleepTimeInMilliSecond int64 // contains filtered or unexported fields }
Job
type PartitionColumn ¶
func GetEsPartitionColumn ¶
func GetEsPartitionColumn(conf *config.JSON) []PartitionColumn
type PrimaryKeyInfo ¶
func GetPrimaryKeyInfo ¶
func GetPrimaryKeyInfo(conf *config.JSON) *PrimaryKeyInfo
type Task ¶
type Task struct { *writer.BaseTask IndexName string TypeName string BatchSize int64 EnableRedundantColumn bool ColumnList []EsColumn CombinedIdColumn *EsColumn Splitter string TypeList []string PrimaryKeyInfo *PrimaryKeyInfo ColNameToIndexMap map[string]int EsPartitionColumn []PartitionColumn DeleteByConditions []map[string]interface{} Client *elastic.Client ActionType string EnableWriteNull bool IsGreaterOrEqualThan7 bool RetryTimes int64 SleepTimeInMilliSecond int64 UrlParams map[string]interface{} FieldDelimiter string // contains filtered or unexported fields }
Task
func (*Task) StartWrite ¶
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer is the Elasticsearch writer plugin.
func (*Writer) ResourcesConfig ¶
ResourcesConfig provides the plugin resource configuration.
Click to show internal directories.
Click to hide internal directories.