influxdb/gather/storage.go

37 lines
790 B
Go
Raw Normal View History

2018-08-21 20:16:15 +00:00
package gather
import (
"encoding/json"
"fmt"
"github.com/influxdata/platform/nats"
2018-09-07 15:45:28 +00:00
"go.uber.org/zap"
2018-08-21 20:16:15 +00:00
)
2018-09-07 15:45:28 +00:00
// Storage stores the metrics of a time based.
2018-08-21 20:16:15 +00:00
type Storage interface {
//Subscriber nats.Subscriber
Record([]Metrics) error
}
2018-09-07 15:45:28 +00:00
// StorageHandler implements nats.Handler interface.
type StorageHandler struct {
Storage Storage
Logger *zap.Logger
2018-08-21 20:16:15 +00:00
}
2018-09-07 15:45:28 +00:00
// Process consumes job queue, and use storage to record.
func (h *StorageHandler) Process(s nats.Subscription, m nats.Message) {
2018-08-21 20:16:15 +00:00
defer m.Ack()
ms := make([]Metrics, 0)
err := json.Unmarshal(m.Data(), &ms)
if err != nil {
2018-09-07 15:45:28 +00:00
h.Logger.Error(fmt.Sprintf("storage handler process err: %v", err))
return
2018-08-21 20:16:15 +00:00
}
2018-09-07 15:45:28 +00:00
err = h.Storage.Record(ms)
2018-08-21 20:16:15 +00:00
if err != nil {
2018-09-07 15:45:28 +00:00
h.Logger.Error(fmt.Sprintf("storage handler store err: %v", err))
2018-08-21 20:16:15 +00:00
}
}