influxdb/task/backend/storetest/logstoretest.go

410 lines
11 KiB
Go

package storetest
import (
"context"
"fmt"
"reflect"
"testing"
"time"
"github.com/google/go-cmp/cmp"
platform "github.com/influxdata/influxdb"
pcontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/task/backend"
platformtesting "github.com/influxdata/influxdb/testing"
)
type CreateRunStoreFunc func(*testing.T) (backend.LogWriter, backend.LogReader)
type DestroyRunStoreFunc func(*testing.T, backend.LogWriter, backend.LogReader)
func NewRunStoreTest(name string, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) func(*testing.T) {
return func(t *testing.T) {
t.Run(name, func(t *testing.T) {
t.Parallel()
t.Run("UpdateRunState", func(t *testing.T) {
t.Parallel()
updateRunState(t, crf, drf)
})
t.Run("RunLog", func(t *testing.T) {
t.Parallel()
runLogTest(t, crf, drf)
})
t.Run("ListRuns", func(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test in short mode.")
}
t.Parallel()
listRunsTest(t, crf, drf)
})
t.Run("FindRunByID", func(t *testing.T) {
t.Parallel()
findRunByIDTest(t, crf, drf)
})
t.Run("ListLogs", func(t *testing.T) {
t.Parallel()
listLogsTest(t, crf, drf)
})
})
}
}
func updateRunState(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) {
writer, reader := crf(t)
defer drf(t, writer, reader)
now := time.Now().UTC()
task := &backend.StoreTask{
ID: platformtesting.MustIDBase16("ab01ab01ab01ab01"),
Org: platformtesting.MustIDBase16("ab01ab01ab01ab05"),
}
scheduledFor := now.Add(-3 * time.Second)
run := platform.Run{
ID: platformtesting.MustIDBase16("2c20766972747573"),
TaskID: task.ID,
Status: "started",
ScheduledFor: scheduledFor.Format(time.RFC3339),
}
rlb := backend.RunLogBase{
Task: task,
RunID: run.ID,
RunScheduledFor: scheduledFor.Unix(),
}
ctx := pcontext.SetAuthorizer(context.Background(), makeNewAuthorization())
startAt := now.Add(-2 * time.Second)
if err := writer.UpdateRunState(ctx, rlb, startAt, backend.RunStarted); err != nil {
t.Fatal(err)
}
run.StartedAt = startAt.Format(time.RFC3339Nano)
run.Status = "started"
returnedRun, err := reader.FindRunByID(ctx, task.Org, run.ID)
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(run, *returnedRun); diff != "" {
t.Fatalf("unexpected run found: -want/+got: %s", diff)
}
endAt := now.Add(-1 * time.Second)
if err := writer.UpdateRunState(ctx, rlb, endAt, backend.RunSuccess); err != nil {
t.Fatal(err)
}
run.FinishedAt = endAt.Format(time.RFC3339Nano)
run.Status = "success"
returnedRun, err = reader.FindRunByID(ctx, task.Org, run.ID)
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(run, *returnedRun); diff != "" {
t.Fatalf("unexpected run found: -want/+got: %s", diff)
}
}
func runLogTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) {
writer, reader := crf(t)
defer drf(t, writer, reader)
task := &backend.StoreTask{
ID: platformtesting.MustIDBase16("ab01ab01ab01ab01"),
Org: platformtesting.MustIDBase16("ab01ab01ab01ab05"),
}
sf := time.Now().UTC()
sa := sf.Add(-10 * time.Second)
run := platform.Run{
ID: platformtesting.MustIDBase16("2c20766972747573"),
TaskID: task.ID,
Status: "started",
ScheduledFor: sf.Format(time.RFC3339),
StartedAt: sa.Format(time.RFC3339Nano),
}
rlb := backend.RunLogBase{
Task: task,
RunID: run.ID,
RunScheduledFor: sf.Unix(),
}
ctx := pcontext.SetAuthorizer(context.Background(), makeNewAuthorization())
if err := writer.UpdateRunState(ctx, rlb, sa, backend.RunStarted); err != nil {
t.Fatal(err)
}
if err := writer.AddRunLog(ctx, rlb, sa.Add(time.Second), "first"); err != nil {
t.Fatal(err)
}
if err := writer.AddRunLog(ctx, rlb, sa.Add(2*time.Second), "second"); err != nil {
t.Fatal(err)
}
if err := writer.AddRunLog(ctx, rlb, sa.Add(3*time.Second), "third"); err != nil {
t.Fatal(err)
}
run.Log = platform.Log(fmt.Sprintf(
"%s: first\n%s: second\n%s: third",
sa.Add(time.Second).Format(time.RFC3339Nano),
sa.Add(2*time.Second).Format(time.RFC3339Nano),
sa.Add(3*time.Second).Format(time.RFC3339Nano),
))
returnedRun, err := reader.FindRunByID(ctx, task.Org, run.ID)
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(run, *returnedRun); diff != "" {
t.Fatalf("unexpected run found: -want/+got: %s", diff)
}
}
func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) {
writer, reader := crf(t)
defer drf(t, writer, reader)
task := &backend.StoreTask{
ID: platformtesting.MustIDBase16("ab01ab01ab01ab01"),
Org: platformtesting.MustIDBase16("ab01ab01ab01ab05"),
}
ctx := pcontext.SetAuthorizer(context.Background(), makeNewAuthorization())
if _, err := reader.ListRuns(ctx, platform.RunFilter{Task: &task.ID}); err == nil {
t.Fatal("failed to error on bad id")
}
now := time.Now().UTC()
const nRuns = 150
runs := make([]platform.Run, nRuns)
for i := 0; i < len(runs); i++ {
// Scheduled for times ascending with IDs.
scheduledFor := now.Add(time.Duration(-2*(nRuns-i)) * time.Second)
id := platform.ID(i + 1)
runs[i] = platform.Run{
ID: id,
Status: "started",
ScheduledFor: scheduledFor.Format(time.RFC3339),
}
rlb := backend.RunLogBase{
Task: task,
RunID: runs[i].ID,
RunScheduledFor: scheduledFor.Unix(),
}
err := writer.UpdateRunState(ctx, rlb, scheduledFor.Add(time.Second), backend.RunStarted)
if err != nil {
t.Fatal(err)
}
}
if _, err := reader.ListRuns(ctx, platform.RunFilter{}); err == nil {
t.Fatal("failed to error without any filter")
}
listRuns, err := reader.ListRuns(ctx, platform.RunFilter{
Task: &task.ID,
Org: &task.Org,
Limit: 2 * nRuns,
})
if err != nil {
t.Fatal(err)
}
if len(listRuns) != len(runs) {
t.Fatalf("retrieved: %d, expected: %d", len(listRuns), len(runs))
}
const afterIDIdx = 20
listRuns, err = reader.ListRuns(ctx, platform.RunFilter{
Task: &task.ID,
Org: &task.Org,
After: &runs[afterIDIdx].ID,
Limit: 2 * nRuns,
})
if err != nil {
t.Fatal(err)
}
if len(listRuns) != len(runs)-(afterIDIdx+1) {
t.Fatalf("retrieved: %d, expected: %d", len(listRuns), len(runs)-(afterIDIdx+1))
}
listRuns, err = reader.ListRuns(ctx, platform.RunFilter{
Task: &task.ID,
Org: &task.Org,
Limit: 30,
})
if err != nil {
t.Fatal(err)
}
if len(listRuns) != 30 {
t.Fatalf("retrieved: %d, expected: %d", len(listRuns), 30)
}
const afterTimeIdx = 34
scheduledFor, _ := time.Parse(time.RFC3339, runs[afterTimeIdx].ScheduledFor)
listRuns, err = reader.ListRuns(ctx, platform.RunFilter{
Task: &task.ID,
Org: &task.Org,
AfterTime: scheduledFor.Format(time.RFC3339),
Limit: 2 * nRuns,
})
if err != nil {
t.Fatal(err)
}
if len(listRuns) != len(runs)-(afterTimeIdx+1) {
t.Fatalf("retrieved: %d, expected: %d", len(listRuns), len(runs)-(afterTimeIdx+1))
}
const beforeTimeIdx = 34
scheduledFor, _ = time.Parse(time.RFC3339, runs[beforeTimeIdx].ScheduledFor)
listRuns, err = reader.ListRuns(ctx, platform.RunFilter{
Task: &task.ID,
Org: &task.Org,
BeforeTime: scheduledFor.Add(time.Millisecond).Format(time.RFC3339),
})
if err != nil {
t.Fatal(err)
}
if len(listRuns) != beforeTimeIdx {
t.Fatalf("retrieved: %d, expected: %d", len(listRuns), beforeTimeIdx)
}
}
func findRunByIDTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) {
writer, reader := crf(t)
defer drf(t, writer, reader)
if _, err := reader.FindRunByID(context.Background(), platform.InvalidID(), platform.InvalidID()); err == nil {
t.Fatal("failed to error with bad id")
}
task := &backend.StoreTask{
ID: platformtesting.MustIDBase16("ab01ab01ab01ab01"),
Org: platformtesting.MustIDBase16("ab01ab01ab01ab05"),
}
sf := time.Now().UTC().Add(-10 * time.Second)
sa := sf.Add(time.Second)
run := platform.Run{
ID: platformtesting.MustIDBase16("2c20766972747573"),
TaskID: task.ID,
Status: "started",
ScheduledFor: sf.Format(time.RFC3339),
StartedAt: sa.Format(time.RFC3339Nano),
}
rlb := backend.RunLogBase{
Task: task,
RunID: run.ID,
RunScheduledFor: sf.Unix(),
}
ctx := pcontext.SetAuthorizer(context.Background(), makeNewAuthorization())
if err := writer.UpdateRunState(ctx, rlb, sa, backend.RunStarted); err != nil {
t.Fatal(err)
}
returnedRun, err := reader.FindRunByID(ctx, task.Org, run.ID)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(run, *returnedRun) {
t.Fatalf("expected:\n%#v, got: \n%#v", run, *returnedRun)
}
returnedRun.Log = "cows"
rr2, err := reader.FindRunByID(ctx, task.Org, run.ID)
if err != nil {
t.Fatal(err)
}
if reflect.DeepEqual(returnedRun, rr2) {
t.Fatalf("updateing returned run modified RunStore data")
}
}
func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) {
writer, reader := crf(t)
defer drf(t, writer, reader)
task := &backend.StoreTask{
ID: platformtesting.MustIDBase16("ab01ab01ab01ab01"),
Org: platformtesting.MustIDBase16("ab01ab01ab01ab05"),
}
ctx := pcontext.SetAuthorizer(context.Background(), makeNewAuthorization())
if _, err := reader.ListLogs(ctx, platform.LogFilter{}); err == nil {
t.Fatal("failed to error with no filter")
}
if _, err := reader.ListLogs(ctx, platform.LogFilter{Run: &task.ID}); err == nil {
t.Fatal("failed to error with a non-run-ID")
}
now := time.Now().UTC()
const nRuns = 20
runs := make([]platform.Run, nRuns)
for i := 0; i < len(runs); i++ {
sf := now.Add(time.Duration(i-nRuns) * time.Second)
id := platform.ID(i + 1)
runs[i] = platform.Run{
ID: id,
Status: "started",
ScheduledFor: sf.UTC().Format(time.RFC3339),
}
rlb := backend.RunLogBase{
Task: task,
RunID: runs[i].ID,
RunScheduledFor: sf.Unix(),
}
err := writer.UpdateRunState(ctx, rlb, sf.Add(time.Millisecond), backend.RunStarted)
if err != nil {
t.Fatal(err)
}
writer.AddRunLog(ctx, rlb, sf.Add(2*time.Millisecond), fmt.Sprintf("log%d", i))
}
const targetRun = 4
logs, err := reader.ListLogs(ctx, platform.LogFilter{Run: &runs[targetRun].ID, Org: &task.Org})
if err != nil {
t.Fatal(err)
}
if len(logs) != 1 {
t.Fatalf("expected 1 log, got %d", len(logs))
}
fmtTimelog := now.Add(time.Duration(targetRun-nRuns)*time.Second + 2*time.Millisecond).Format(time.RFC3339Nano)
if fmtTimelog+": log4" != string(logs[0]) {
t.Fatalf("expected: %q, got: %q", fmtTimelog+": log4", string(logs[0]))
}
logs, err = reader.ListLogs(ctx, platform.LogFilter{Task: &task.ID, Org: &task.Org})
if err != nil {
t.Fatal(err)
}
if len(logs) != len(runs) {
t.Fatal("not all logs retrieved")
}
}
func makeNewAuthorization() *platform.Authorization {
return &platform.Authorization{
ID: platformtesting.MustIDBase16("ab01ab01ab01ab01"),
UserID: platformtesting.MustIDBase16("ab01ab01ab01ab01"),
OrgID: platformtesting.MustIDBase16("ab01ab01ab01ab05"),
Permissions: platform.OperPermissions(),
}
}