2019-10-17 09:37:03 +00:00
|
|
|
package backend
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/influxdata/influxdb"
|
|
|
|
"github.com/influxdata/influxdb/models"
|
|
|
|
"github.com/influxdata/influxdb/storage"
|
|
|
|
"github.com/influxdata/influxdb/tsdb"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
)
|
|
|
|
|
|
|
|
// StoragePointsWriterRecorder is an implementation of RunRecorder which
|
|
|
|
// writes runs via an implementation of storage PointsWriter
|
|
|
|
type StoragePointsWriterRecorder struct {
|
|
|
|
pw storage.PointsWriter
|
|
|
|
|
|
|
|
logger *zap.Logger
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewStoragePointsWriterRecorder configures and returns a new *StoragePointsWriterRecorder
|
|
|
|
func NewStoragePointsWriterRecorder(pw storage.PointsWriter, logger *zap.Logger) *StoragePointsWriterRecorder {
|
|
|
|
return &StoragePointsWriterRecorder{pw, logger}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Record formats the provided run as a models.Point and writes the resulting
|
|
|
|
// point to an underlying storage.PointsWriter
|
|
|
|
func (s *StoragePointsWriterRecorder) Record(ctx context.Context, orgID influxdb.ID, org string, bucketID influxdb.ID, bucket string, run *influxdb.Run) error {
|
|
|
|
tags := models.NewTags(map[string]string{
|
|
|
|
statusTag: run.Status,
|
|
|
|
taskIDTag: run.TaskID.String(),
|
|
|
|
})
|
|
|
|
|
|
|
|
// log an error if we have incomplete data on finish
|
|
|
|
if !run.ID.Valid() ||
|
2019-10-18 00:23:45 +00:00
|
|
|
run.ScheduledFor.IsZero() ||
|
|
|
|
run.StartedAt.IsZero() ||
|
|
|
|
run.FinishedAt.IsZero() ||
|
2019-10-17 09:37:03 +00:00
|
|
|
run.Status == "" {
|
|
|
|
s.logger.Error("Run missing critical fields", zap.String("run", fmt.Sprintf("%+v", run)), zap.String("runID", run.ID.String()))
|
|
|
|
}
|
|
|
|
|
|
|
|
fields := map[string]interface{}{}
|
|
|
|
fields[runIDField] = run.ID.String()
|
2019-10-18 00:23:45 +00:00
|
|
|
fields[startedAtField] = run.StartedAt.Format(time.RFC3339Nano)
|
|
|
|
fields[finishedAtField] = run.FinishedAt.Format(time.RFC3339Nano)
|
|
|
|
fields[scheduledForField] = run.ScheduledFor.Format(time.RFC3339)
|
|
|
|
fields[requestedAtField] = run.RequestedAt.Format(time.RFC3339)
|
2019-10-17 09:37:03 +00:00
|
|
|
|
2019-10-18 00:23:45 +00:00
|
|
|
startedAt := run.StartedAt
|
|
|
|
if startedAt.IsZero() {
|
|
|
|
startedAt = time.Now().UTC()
|
2019-10-17 09:37:03 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
logBytes, err := json.Marshal(run.Log)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
fields[logField] = string(logBytes)
|
|
|
|
|
|
|
|
point, err := models.NewPoint("runs", tags, fields, startedAt)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// use the tsdb explode points to convert to the new style.
|
|
|
|
// We could split this on our own but its quite possible this could change.
|
|
|
|
points, err := tsdb.ExplodePoints(orgID, bucketID, models.Points{point})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return s.pw.WritePoints(ctx, points)
|
|
|
|
}
|