Documentation ¶
Index ¶
- Constants
- Variables
- func CleanupFunc(ctx context.Context, action *Workflow) error
- func DeleteFunc(ctx context.Context, action *Workflow) error
- func ResetFunc(ctx context.Context, action *Workflow) error
- func SetupFunc(ctx context.Context, action *Workflow) error
- func WithTimeout(fn ActionFunc, timeout time.Duration) func(ctx context.Context, wf *Workflow) error
- type Action
- type ActionFunc
- type OnError
- type SchemaManager
- func (tm *SchemaManager) Client() api.Client
- func (tm *SchemaManager) CreateOrUpdateTable(ctx context.Context, schema *kschema.Schema) error
- func (tm *SchemaManager) DeleteTable(ctx context.Context, schema *kschema.Schema) error
- func (tm *SchemaManager) DeleteTopic(ctx context.Context, name string) error
- func (tm *SchemaManager) GetSchema(ctx context.Context, table kschema.TableTopic) (kschema.FieldSchema, error)
- func (tm *SchemaManager) PurgeTable(ctx context.Context, schema *kschema.Schema) error
- func (tm *SchemaManager) ResetTable(ctx context.Context, schema *kschema.Schema) error
- func (tm *SchemaManager) Setup(ctx context.Context) error
- func (tm *SchemaManager) Validate() error
- type Workflow
- func (a *Workflow) AddStep(cmd ...Action)
- func (a *Workflow) Client() api.Client
- func (a *Workflow) Close() error
- func (a *Workflow) DeferStep(cmd ...Action)
- func (a *Workflow) GetFunc(name string) (ActionFunc, error)
- func (a *Workflow) KeyFile() *config.KeyFile
- func (a *Workflow) Run(ctx context.Context, onError OnError) (result error)
- func (a *Workflow) SchemaManager() *SchemaManager
- func (a *Workflow) ServerAddress() string
- func (a *Workflow) SetFunc(name string, fn ActionFunc) error
- func (wf *Workflow) SetProgram(steps []string) error
Constants ¶
View Source
const TearDownDuration = time.Minute * 1
Variables ¶
View Source
var ( ErrorWriterNotDefined = errors.New("Writer not defined") ErrorEmptyTopic = errors.New("SchemaManager.Topic not set") )
View Source
var ( Setup = Action{Name: "setup", Func: SetupFunc, Help: "setup metadata topics"} Reset = Action{Name: "reset", Func: ResetFunc, Help: "set table schema(s) to the empty schema"} Delete = Action{Name: "delete", Func: DeleteFunc, Help: "delete table topic(s)"} Purge = Action{Name: "purge", Func: CleanupFunc, Help: "reset and delete"} Destroy = Action{ Name: "destroy", Func: CleanupFunc, Help: "run reset and delete for ALL tables and delete the metadata topics", } )
Functions ¶
func WithTimeout ¶
Types ¶
type ActionFunc ¶
ActionFunc defines the minimal interface to implement a custom Command.Func.
Example Usage:
	cmd := kstoreContext.AddCommand(kstore.Command{
		Name: "demo",
		Func: kstore.ActionFunc(examples.RunTopicManagement).WithTimeout(*timeout),
	})
	action.AddCommand(ctx, cmd)
type SchemaManager ¶
type SchemaManager struct {
// contains filtered or unexported fields
}
func NewSchemaManager ¶
func NewSchemaManager(schemasTopic string, client api.Client) *SchemaManager
func (*SchemaManager) Client ¶
func (tm *SchemaManager) Client() api.Client
func (*SchemaManager) CreateOrUpdateTable ¶
CreateOrUpdateTable creates a table topic (if needed) and updates the table schema.
func (*SchemaManager) DeleteTable ¶
DeleteTable resets the table schema and deletes the table topic.
func (*SchemaManager) DeleteTopic ¶
func (tm *SchemaManager) DeleteTopic(ctx context.Context, name string) error
DeleteTopic deletes a topic.
func (*SchemaManager) GetSchema ¶
func (tm *SchemaManager) GetSchema(ctx context.Context, table kschema.TableTopic) (kschema.FieldSchema, error)
GetSchema returns the stored schema for the given TableTopic.
func (*SchemaManager) PurgeTable ¶
PurgeTable clears the Schema for the given table and deletes the table topic.
func (*SchemaManager) ResetTable ¶
ResetTable clears the Schema for the given topic.
func (*SchemaManager) Setup ¶
func (tm *SchemaManager) Setup(ctx context.Context) error
Setup must be called to initialize the SchemaManager and set up the TablesInfo topic in Kafka.
func (*SchemaManager) Validate ¶
func (tm *SchemaManager) Validate() error
Validate checks the SchemaManager setup for correctness.
type Workflow ¶
type Workflow struct { DryRun bool // contains filtered or unexported fields }
func NewWorkflow ¶
func (*Workflow) SchemaManager ¶
func (a *Workflow) SchemaManager() *SchemaManager
func (*Workflow) ServerAddress ¶
func (*Workflow) SetProgram ¶
Click to show internal directories.
Click to hide internal directories.