Merge pull request #1503 from influxdb/top_level_timestamps

Fix batched writes
pull/1509/head
Philip O'Toole 2015-02-04 08:25:46 -08:00
commit b424c2fe0c
5 changed files with 169 additions and 126 deletions

View File

@ -241,7 +241,7 @@ func (p *Point) UnmarshalJSON(b []byte) error {
var epoch struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Timestamp int64 `json:"timestamp"`
Timestamp *int64 `json:"timestamp"`
Precision string `json:"precision"`
Values map[string]interface{} `json:"values"`
}
@ -251,10 +251,14 @@ func (p *Point) UnmarshalJSON(b []byte) error {
if err = json.Unmarshal(b, &epoch); err != nil {
return err
}
// Convert from epoch to time.Time
ts, err := EpochToTime(epoch.Timestamp, epoch.Precision)
if err != nil {
return err
// Convert from epoch to time.Time, but only if Timestamp
// was actually set.
var ts time.Time
if epoch.Timestamp != nil {
ts, err = EpochToTime(*epoch.Timestamp, epoch.Precision)
if err != nil {
return err
}
}
p.Name = epoch.Name
p.Tags = epoch.Tags

View File

@ -16,7 +16,6 @@ import (
"github.com/bmizerany/pat"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/influxql"
)
@ -141,73 +140,9 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ
httpResults(w, results, pretty)
}
// BatchWrite is used to send batch write data to the http /write endpoint
type BatchWrite struct {
Points []client.Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp time.Time `json:"timestamp"`
Precision string `json:"precision"`
}
// UnmarshalJSON decodes the data into the batchWrite struct
func (br *BatchWrite) UnmarshalJSON(b []byte) error {
var normal struct {
Points []client.Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp time.Time `json:"timestamp"`
Precision string `json:"precision"`
}
var epoch struct {
Points []client.Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp int64 `json:"timestamp"`
Precision string `json:"precision"`
}
if err := func() error {
var err error
if err = json.Unmarshal(b, &epoch); err != nil {
return err
}
// Convert from epoch to time.Time
ts, err := client.EpochToTime(epoch.Timestamp, epoch.Precision)
if err != nil {
return err
}
br.Points = epoch.Points
br.Database = epoch.Database
br.RetentionPolicy = epoch.RetentionPolicy
br.Tags = epoch.Tags
br.Timestamp = ts
br.Precision = epoch.Precision
return nil
}(); err == nil {
return nil
}
if err := json.Unmarshal(b, &normal); err != nil {
return err
}
normal.Timestamp = client.SetPrecision(normal.Timestamp, normal.Precision)
br.Points = normal.Points
br.Database = normal.Database
br.RetentionPolicy = normal.RetentionPolicy
br.Tags = normal.Tags
br.Timestamp = normal.Timestamp
br.Precision = normal.Precision
return nil
}
// serveWrite receives incoming series data and writes it to the database.
func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influxdb.User) {
var br BatchWrite
var bp influxdb.BatchPoints
dec := json.NewDecoder(r.Body)
@ -218,58 +153,39 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influ
return
}
for {
if err := dec.Decode(&br); err != nil {
if err.Error() == "EOF" {
w.WriteHeader(http.StatusOK)
return
}
writeError(influxdb.Result{Err: err}, http.StatusInternalServerError)
if err := dec.Decode(&bp); err != nil {
if err.Error() == "EOF" {
w.WriteHeader(http.StatusOK)
return
}
writeError(influxdb.Result{Err: err}, http.StatusInternalServerError)
return
}
if br.Database == "" {
writeError(influxdb.Result{Err: fmt.Errorf("database is required")}, http.StatusInternalServerError)
return
}
if bp.Database == "" {
writeError(influxdb.Result{Err: fmt.Errorf("database is required")}, http.StatusInternalServerError)
return
}
if !h.server.DatabaseExists(br.Database) {
writeError(influxdb.Result{Err: fmt.Errorf("database not found: %q", br.Database)}, http.StatusNotFound)
return
}
if !h.server.DatabaseExists(bp.Database) {
writeError(influxdb.Result{Err: fmt.Errorf("database not found: %q", bp.Database)}, http.StatusNotFound)
return
}
if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, br.Database) {
writeError(influxdb.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, br.Database)}, http.StatusUnauthorized)
return
}
if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, bp.Database) {
writeError(influxdb.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, bp.Database)}, http.StatusUnauthorized)
return
}
for _, p := range br.Points {
if p.Timestamp.Time().IsZero() {
p.Timestamp = client.Timestamp(br.Timestamp)
}
if p.Precision == "" && br.Precision != "" {
p.Precision = br.Precision
}
p.Timestamp = client.Timestamp(client.SetPrecision(p.Timestamp.Time(), p.Precision))
if len(br.Tags) > 0 {
for k := range br.Tags {
if p.Tags[k] == "" {
p.Tags[k] = br.Tags[k]
}
}
}
// Need to convert from a client.Point to a influxdb.Point
iPoint := influxdb.Point{
Name: p.Name,
Tags: p.Tags,
Timestamp: p.Timestamp.Time(),
Values: p.Values,
}
if _, err := h.server.WriteSeries(br.Database, br.RetentionPolicy, []influxdb.Point{iPoint}); err != nil {
writeError(influxdb.Result{Err: err}, http.StatusInternalServerError)
return
}
}
points, err := influxdb.NormalizeBatchPoints(bp)
if err != nil {
writeError(influxdb.Result{Err: err}, http.StatusInternalServerError)
return
}
if _, err := h.server.WriteSeries(bp.Database, bp.RetentionPolicy, points); err != nil {
writeError(influxdb.Result{Err: err}, http.StatusInternalServerError)
return
}
}

View File

@ -12,6 +12,7 @@ import (
"os"
"reflect"
"strings"
"sync"
"testing"
"time"
@ -87,13 +88,13 @@ func TestBatchWrite_UnmarshalEpoch(t *testing.T) {
t.Logf("testing %q\n", test.name)
data := []byte(fmt.Sprintf(`{"timestamp": %d, "precision":"%s"}`, test.epoch, test.precision))
t.Logf("json: %s", string(data))
var br httpd.BatchWrite
err := json.Unmarshal(data, &br)
var bp influxdb.BatchPoints
err := json.Unmarshal(data, &bp)
if err != nil {
t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err)
t.Fatalf("unexpected error. expected: %v, actual: %v", nil, err)
}
if !br.Timestamp.Equal(test.expected) {
t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, br.Timestamp)
if !bp.Timestamp.Equal(test.expected) {
t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, bp.Timestamp)
}
}
}
@ -125,13 +126,13 @@ func TestBatchWrite_UnmarshalRFC(t *testing.T) {
ts := test.now.Format(test.rfc)
data := []byte(fmt.Sprintf(`{"timestamp": %q}`, ts))
t.Logf("json: %s", string(data))
var br httpd.BatchWrite
err := json.Unmarshal(data, &br)
var bp influxdb.BatchPoints
err := json.Unmarshal(data, &bp)
if err != nil {
t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err)
}
if !br.Timestamp.Equal(test.expected) {
t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, br.Timestamp)
if !bp.Timestamp.Equal(test.expected) {
t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, bp.Timestamp)
}
}
}
@ -1750,6 +1751,7 @@ func OpenUninitializedServer(client influxdb.MessagingClient) *Server {
type MessagingClient struct {
index uint64
c chan *messaging.Message
mu sync.Mutex // Ensure all publishing is serialized.
PublishFunc func(*messaging.Message) (uint64, error)
CreateReplicaFunc func(replicaID uint64) error
@ -1772,6 +1774,8 @@ func NewMessagingClient() *MessagingClient {
// Publish attaches an autoincrementing index to the message.
// This function also execute's the client's PublishFunc mock function.
func (c *MessagingClient) Publish(m *messaging.Message) (uint64, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.index++
m.Index = c.index
return c.PublishFunc(m)

View File

@ -5,6 +5,9 @@ import (
"errors"
"fmt"
"os"
"time"
"github.com/influxdb/influxdb/client"
)
var (
@ -88,9 +91,15 @@ var (
// ErrInvalidQuery is returned when executing an unknown query type.
ErrInvalidQuery = errors.New("invalid query")
// ErrMeasurementNameRequired is returned when a point does not contain a name.
ErrMeasurementNameRequired = errors.New("measurement name required")
// ErrMeasurementNotFound is returned when a measurement does not exist.
ErrMeasurementNotFound = errors.New("measurement not found")
// ErrValuesRequired is returned when a point does not any values
ErrValuesRequired = errors.New("values required")
// ErrFieldOverflow is returned when too many fields are created on a measurement.
ErrFieldOverflow = errors.New("field overflow")
@ -109,6 +118,108 @@ var (
ErrInvalidGrantRevoke = errors.New("invalid privilege requested")
)
// BatchPoints is used to send batched data in a single write.
type BatchPoints struct {
Points []client.Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp time.Time `json:"timestamp"`
Precision string `json:"precision"`
}
// UnmarshalJSON decodes the data into the BatchPoints struct
func (bp *BatchPoints) UnmarshalJSON(b []byte) error {
var normal struct {
Points []client.Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp time.Time `json:"timestamp"`
Precision string `json:"precision"`
}
var epoch struct {
Points []client.Point `json:"points"`
Database string `json:"database"`
RetentionPolicy string `json:"retentionPolicy"`
Tags map[string]string `json:"tags"`
Timestamp *int64 `json:"timestamp"`
Precision string `json:"precision"`
}
if err := func() error {
var err error
if err = json.Unmarshal(b, &epoch); err != nil {
return err
}
// Convert from epoch to time.Time
var ts time.Time
if epoch.Timestamp != nil {
ts, err = client.EpochToTime(*epoch.Timestamp, epoch.Precision)
if err != nil {
return err
}
}
bp.Points = epoch.Points
bp.Database = epoch.Database
bp.RetentionPolicy = epoch.RetentionPolicy
bp.Tags = epoch.Tags
bp.Timestamp = ts
bp.Precision = epoch.Precision
return nil
}(); err == nil {
return nil
}
if err := json.Unmarshal(b, &normal); err != nil {
return err
}
normal.Timestamp = client.SetPrecision(normal.Timestamp, normal.Precision)
bp.Points = normal.Points
bp.Database = normal.Database
bp.RetentionPolicy = normal.RetentionPolicy
bp.Tags = normal.Tags
bp.Timestamp = normal.Timestamp
bp.Precision = normal.Precision
return nil
}
// NormalizeBatchPoints returns a slice of Points, created by populating individual
// points within the batch, which do not have timestamps or tags, with the top-level
// values.
func NormalizeBatchPoints(bp BatchPoints) ([]Point, error) {
points := []Point{}
for _, p := range bp.Points {
if p.Timestamp.Time().IsZero() {
p.Timestamp = client.Timestamp(bp.Timestamp)
}
if p.Precision == "" && bp.Precision != "" {
p.Precision = bp.Precision
}
p.Timestamp = client.Timestamp(client.SetPrecision(p.Timestamp.Time(), p.Precision))
if len(bp.Tags) > 0 {
if p.Tags == nil {
p.Tags = make(map[string]string)
}
for k := range bp.Tags {
if p.Tags[k] == "" {
p.Tags[k] = bp.Tags[k]
}
}
}
// Need to convert from a client.Point to a influxdb.Point
points = append(points, Point{
Name: p.Name,
Tags: p.Tags,
Timestamp: p.Timestamp.Time(),
Values: p.Values,
})
}
return points, nil
}
// ErrAuthorize represents an authorization error.
type ErrAuthorize struct {
text string

View File

@ -1413,6 +1413,14 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
func (s *Server) writePoint(database, retentionPolicy string, point *Point) (uint64, error) {
name, tags, timestamp, values := point.Name, point.Tags, point.Timestamp, point.Values
// Sanity-check the data point.
if name == "" {
return 0, ErrMeasurementNameRequired
}
if len(values) == 0 {
return 0, ErrValuesRequired
}
// Find the id for the series and tagset
seriesID, err := s.createSeriesIfNotExists(database, name, tags)
if err != nil {