Documentation ¶
Index ¶
- Constants
- Variables
- func CreateKafkaTopic() string
- func DaemonPodLogContains(ctx context.Context, kubeClient kubernetes.Interface, ...) (bool, error)
- func DeleteKafkaTopic(topic string) string
- func Exec(name string, args ...string) (string, error)
- func ExpectKafkaTopicCount(topic string, total int, timeout time.Duration)
- func GetKafkaCount(topic string, count int) int
- func GetMsgCountContains(pipelineName, sinkName, targetStr string) int
- func HTTPExpect(t require.TestingT, baseURL string) *httpexpect.Expect
- func InvokeE2EAPI(format string, args ...interface{}) string
- func InvokeE2EAPIPOST(format string, body string, args ...interface{}) string
- func PodPortForward(config *rest.Config, namespace, podName string, localPort, remotePort int, ...) error
- func PodsLogContains(ctx context.Context, kubeClient kubernetes.Interface, namespace, regex string, ...) bool
- func PodsLogNotContains(ctx context.Context, kubeClient kubernetes.Interface, namespace, regex string, ...) bool
- func PumpKafkaTopic(topic string, n int, opts ...interface{})
- func PumpNatsSubject(subject string, n int, opts ...interface{})
- func RedisContains(ctx context.Context, pipelineName, sinkName, targetStr string, ...) bool
- func RedisNotContains(ctx context.Context, pipelineName, sinkName, targetStr string, ...) bool
- func SendMessage(topic string, key string, message string)
- func SendMessageTo(podIp string, vertexName string, r HttpPostRequest)
- func ValidateMessage(topic string, key string, position int)
- func VertexPodLogContains(ctx context.Context, kubeClient kubernetes.Interface, ...) (bool, error)
- func VertexPodLogNotContains(ctx context.Context, kubeClient kubernetes.Interface, ...) (bool, error)
- func WaitForDaemonPodsRunning(kubeClient kubernetes.Interface, namespace, pipelineName string, ...) error
- func WaitForISBSvcReady(ctx context.Context, isbSvcClient flowpkg.InterStepBufferServiceInterface, ...) error
- func WaitForISBSvcStatefulSetReady(ctx context.Context, kubeClient kubernetes.Interface, ...) error
- func WaitForPipelineRunning(ctx context.Context, pipelineClient flowpkg.PipelineInterface, ...) error
- func WaitForVertexPodRunning(kubeClient kubernetes.Interface, vertexClient flowpkg.VertexInterface, ...) error
- func WaitForVertexPodScalingTo(kubeClient kubernetes.Interface, vertexClient flowpkg.VertexInterface, ...) error
- type CheckFunc
- type E2ESuite
- type Expect
- func (t *Expect) DaemonPodLogContains(pipelineName, regex string, opts ...PodLogCheckOption) *Expect
- func (t *Expect) DaemonPodsRunning() *Expect
- func (t *Expect) ISBSvcDeleted(timeout time.Duration) *Expect
- func (t *Expect) SinkContains(sinkName string, targetStr string, opts ...SinkCheckOption) *Expect
- func (t *Expect) SinkNotContains(sinkName string, targetStr string) *Expect
- func (t *Expect) VertexPodLogContains(vertexName, regex string, opts ...PodLogCheckOption) *Expect
- func (t *Expect) VertexPodLogNotContains(vertexName, regex string, opts ...PodLogCheckOption) *Expect
- func (t *Expect) VertexPodsRunning() *Expect
- func (t *Expect) VertexSizeScaledTo(v string, size int) *Expect
- func (t *Expect) When() *When
- type Given
- type HttpPostRequest
- type PodLogCheckOption
- type SinkCheckOption
- type When
- func (w *When) And(block func()) *When
- func (w *When) CreateISBSvc() *When
- func (w *When) CreatePipelineAndWait() *When
- func (w *When) DaemonPodPortForward(pipelineName string, localPort, remotePort int) *When
- func (w *When) DeleteISBSvc() *When
- func (w *When) DeletePipelineAndWait() *When
- func (w *When) Exec(name string, args []string, block func(t *testing.T, output string, err error)) *When
- func (w *When) Expect() *Expect
- func (w *When) Given() *Given
- func (w *When) SendMessageTo(pipelineName string, vertexName string, req HttpPostRequest) *When
- func (w *When) TerminateAllPodPortForwards() *When
- func (w *When) VertexPodPortForward(vertexName string, localPort, remotePort int) *When
- func (w *When) Wait(timeout time.Duration) *When
- func (w *When) WaitForISBSvcReady() *When
Constants ¶
const ( Namespace = "numaflow-system" Label = "numaflow-e2e" LabelValue = "true" ISBSvcName = "numaflow-e2e" LogSourceVertexStarted = "Start processing source messages" SinkVertexStarted = "Start processing sink messages" LogUDFVertexStarted = "Start processing udf messages" LogReduceUDFVertexStarted = "Start processing reduce udf messages" LogDaemonStarted = "Daemon server started successfully" )
Variables ¶
var CheckPodKillSucceeded = func(t *testing.T, output string, err error) { assert.Contains(t, output, "deleted") assert.NoError(t, err) }
Functions ¶
func CreateKafkaTopic ¶
func CreateKafkaTopic() string
func DaemonPodLogContains ¶
func DaemonPodLogContains(ctx context.Context, kubeClient kubernetes.Interface, namespace, pipelineName, regex string, opts ...PodLogCheckOption) (bool, error)
func DeleteKafkaTopic ¶
func ExpectKafkaTopicCount ¶
func GetKafkaCount ¶
func GetMsgCountContains ¶
GetMsgCountContains returns number of occurrences of the targetStr in redis that are written by pipelineName, sinkName.
func HTTPExpect ¶
func InvokeE2EAPI ¶
func InvokeE2EAPIPOST ¶
func PodPortForward ¶
func PodsLogContains ¶
func PodsLogContains(ctx context.Context, kubeClient kubernetes.Interface, namespace, regex string, podList *corev1.PodList, opts ...PodLogCheckOption) bool
func PodsLogNotContains ¶
func PodsLogNotContains(ctx context.Context, kubeClient kubernetes.Interface, namespace, regex string, podList *corev1.PodList, opts ...PodLogCheckOption) bool
func PumpKafkaTopic ¶
func PumpNatsSubject ¶
func RedisContains ¶
func RedisContains(ctx context.Context, pipelineName, sinkName, targetStr string, opts ...SinkCheckOption) bool
RedisContains verifies that there are targetStr in redis written by pipelineName, sinkName.
func RedisNotContains ¶
func RedisNotContains(ctx context.Context, pipelineName, sinkName, targetStr string, opts ...SinkCheckOption) bool
RedisNotContains verifies that there is no occurrence of targetStr in redis that is written by pipelineName, sinkName.
func SendMessage ¶
func SendMessageTo ¶
func SendMessageTo(podIp string, vertexName string, r HttpPostRequest)
SendMessageTo sends a http post request to a pod in http source vertex.
func ValidateMessage ¶
func VertexPodLogContains ¶
func VertexPodLogContains(ctx context.Context, kubeClient kubernetes.Interface, namespace, pipelineName, vertexName, regex string, opts ...PodLogCheckOption) (bool, error)
func VertexPodLogNotContains ¶
func VertexPodLogNotContains(ctx context.Context, kubeClient kubernetes.Interface, namespace, pipelineName, vertexName, regex string, opts ...PodLogCheckOption) (bool, error)
func WaitForISBSvcReady ¶
func WaitForPipelineRunning ¶
func WaitForVertexPodRunning ¶
func WaitForVertexPodRunning(kubeClient kubernetes.Interface, vertexClient flowpkg.VertexInterface, namespace, pipelineName, vertexName string, timeout time.Duration) error
func WaitForVertexPodScalingTo ¶
func WaitForVertexPodScalingTo(kubeClient kubernetes.Interface, vertexClient flowpkg.VertexInterface, namespace, pipelineName, vertexName string, timeout time.Duration, size int) error
Types ¶
type E2ESuite ¶
func (*E2ESuite) CheckError ¶
func (*E2ESuite) GetNumaflowServerPodName ¶
func (*E2ESuite) SetupSuite ¶
func (s *E2ESuite) SetupSuite()
func (*E2ESuite) StartPortForward ¶
func (*E2ESuite) TearDownSuite ¶
func (s *E2ESuite) TearDownSuite()
type Expect ¶
type Expect struct {
// contains filtered or unexported fields
}
func (*Expect) DaemonPodLogContains ¶
func (t *Expect) DaemonPodLogContains(pipelineName, regex string, opts ...PodLogCheckOption) *Expect
func (*Expect) DaemonPodsRunning ¶
func (*Expect) SinkContains ¶
func (t *Expect) SinkContains(sinkName string, targetStr string, opts ...SinkCheckOption) *Expect
func (*Expect) SinkNotContains ¶
func (*Expect) VertexPodLogContains ¶
func (t *Expect) VertexPodLogContains(vertexName, regex string, opts ...PodLogCheckOption) *Expect
func (*Expect) VertexPodLogNotContains ¶
func (t *Expect) VertexPodLogNotContains(vertexName, regex string, opts ...PodLogCheckOption) *Expect
func (*Expect) VertexPodsRunning ¶
type Given ¶
type Given struct {
// contains filtered or unexported fields
}
func (*Given) ISBSvc ¶
creates an ISBSvc based on the parameter, this may be:
1. A file name if it starts with "@" 2. Raw YAML.
type HttpPostRequest ¶
func NewHttpPostRequest ¶
func NewHttpPostRequest() HttpPostRequest
NewHttpPostRequest constructor for HttpPostRequest
func (HttpPostRequest) WithBody ¶
func (b HttpPostRequest) WithBody(body []byte) HttpPostRequest
func (HttpPostRequest) WithHeader ¶
func (b HttpPostRequest) WithHeader(k, v string) HttpPostRequest
type PodLogCheckOption ¶
type PodLogCheckOption func(*podLogCheckOptions)
func PodLogCheckOptionWithContainer ¶
func PodLogCheckOptionWithContainer(c string) PodLogCheckOption
func PodLogCheckOptionWithCount ¶
func PodLogCheckOptionWithCount(c int) PodLogCheckOption
func PodLogCheckOptionWithTimeout ¶
func PodLogCheckOptionWithTimeout(t time.Duration) PodLogCheckOption
type SinkCheckOption ¶
type SinkCheckOption func(*redisCheckOptions)
func WithContainCount ¶
func WithContainCount(c int) SinkCheckOption
WithContainCount updates the redisCheckOptions to specify count. The count is the expected number of matches for the check.
func WithTimeout ¶
func WithTimeout(t time.Duration) SinkCheckOption
WithTimeout updates the redisCheckOptions to specify timeout. The timeout specifies how long the redis check will wait for expected data to be ready in redis.
type When ¶
type When struct {
// contains filtered or unexported fields
}
func (*When) CreateISBSvc ¶
func (*When) CreatePipelineAndWait ¶
func (*When) DaemonPodPortForward ¶
func (*When) DeleteISBSvc ¶
func (*When) DeletePipelineAndWait ¶
func (*When) SendMessageTo ¶
func (w *When) SendMessageTo(pipelineName string, vertexName string, req HttpPostRequest) *When
SendMessageTo sends msg to one of the pods in http source vertex.