fix(task): Remove the tag and field status duplication (#14643)
parent
25d6de0ba8
commit
e671c71e69
|
@ -24,10 +24,10 @@ const (
|
|||
startedAtField = "startedAt"
|
||||
finishedAtField = "finishedAt"
|
||||
requestedAtField = "requestedAt"
|
||||
statusField = "status"
|
||||
logField = "logs"
|
||||
|
||||
taskIDTag = "taskID"
|
||||
statusTag = "status"
|
||||
|
||||
// Fixed system bucket ID for task and run logs.
|
||||
taskSystemBucketID platform.ID = 10
|
||||
|
@ -63,7 +63,7 @@ func (as *AnalyticalStorage) FinishRun(ctx context.Context, taskID, runID influx
|
|||
|
||||
tags := models.Tags{
|
||||
models.NewTag([]byte(taskIDTag), []byte(run.TaskID.String())),
|
||||
models.NewTag([]byte(statusField), []byte(run.Status)),
|
||||
models.NewTag([]byte(statusTag), []byte(run.Status)),
|
||||
}
|
||||
|
||||
// log an error if we have incomplete data on finish
|
||||
|
@ -76,7 +76,6 @@ func (as *AnalyticalStorage) FinishRun(ctx context.Context, taskID, runID influx
|
|||
}
|
||||
|
||||
fields := map[string]interface{}{}
|
||||
fields[statusField] = run.Status
|
||||
fields[runIDField] = run.ID.String()
|
||||
fields[startedAtField] = run.StartedAt
|
||||
fields[finishedAtField] = run.FinishedAt
|
||||
|
@ -178,7 +177,7 @@ func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter influxdb.RunFi
|
|||
|> range(start: -14d)
|
||||
|> filter(fn: (r) => r._measurement == "runs" and r.taskID == %q)
|
||||
%s
|
||||
|> group(columns: ["taskID"])
|
||||
|> group(columns: ["taskID", "status"])
|
||||
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|
||||
|> limit(n:%d)
|
||||
|> sort(columns:["scheduledFor"], desc: true)
|
||||
|
@ -250,7 +249,7 @@ func (as *AnalyticalStorage) FindRunByID(ctx context.Context, taskID, runID infl
|
|||
findRunScript := fmt.Sprintf(`from(bucketID: "000000000000000a")
|
||||
|> range(start: -14d)
|
||||
|> filter(fn: (r) => r._measurement == "runs" and r.taskID == %q)
|
||||
|> group(columns: ["taskID"])
|
||||
|> group(columns: ["taskID", "status"])
|
||||
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|
||||
|> filter(fn: (r) => r.runID == %q)
|
||||
`, taskID.String(), runID.String())
|
||||
|
@ -347,7 +346,7 @@ func (re *runReader) readRuns(cr flux.ColReader) error {
|
|||
var r influxdb.Run
|
||||
for j, col := range cr.Cols() {
|
||||
switch col.Label {
|
||||
case "runID":
|
||||
case runIDField:
|
||||
if cr.Strings(j).ValueString(i) != "" {
|
||||
id, err := influxdb.IDFromString(cr.Strings(j).ValueString(i))
|
||||
if err != nil {
|
||||
|
@ -356,7 +355,7 @@ func (re *runReader) readRuns(cr flux.ColReader) error {
|
|||
}
|
||||
r.ID = *id
|
||||
}
|
||||
case "taskID":
|
||||
case taskIDTag:
|
||||
if cr.Strings(j).ValueString(i) != "" {
|
||||
id, err := influxdb.IDFromString(cr.Strings(j).ValueString(i))
|
||||
if err != nil {
|
||||
|
@ -371,7 +370,7 @@ func (re *runReader) readRuns(cr flux.ColReader) error {
|
|||
r.RequestedAt = cr.Strings(j).ValueString(i)
|
||||
case scheduledForField:
|
||||
r.ScheduledFor = cr.Strings(j).ValueString(i)
|
||||
case statusField:
|
||||
case statusTag:
|
||||
r.Status = cr.Strings(j).ValueString(i)
|
||||
case finishedAtField:
|
||||
r.FinishedAt = cr.Strings(j).ValueString(i)
|
||||
|
|
Loading…
Reference in New Issue