diff --git a/cluster/points_writer.go b/cluster/points_writer.go index ca1f7624bb..f7616a1674 100644 --- a/cluster/points_writer.go +++ b/cluster/points_writer.go @@ -181,7 +181,7 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) // WritePointsInto is a copy of WritePoints that uses a tsdb structure instead of // a cluster structure for information. This is to avoid a circular dependency func (w *PointsWriter) WritePointsInto(p *IntoWriteRequest) error { - return w.WritePoints(p.Database, p.RetentionPolicy, models.ConsistencyLevelAny, p.Points) + return w.WritePoints(p.Database, p.RetentionPolicy, models.ConsistencyLevelOne, p.Points) } // WritePoints writes across multiple local and remote data nodes according the consistency level. diff --git a/cluster/points_writer_test.go b/cluster/points_writer_test.go index 7af8869f8b..5126118acf 100644 --- a/cluster/points_writer_test.go +++ b/cluster/points_writer_test.go @@ -234,7 +234,7 @@ func TestPointsWriter_WritePoints(t *testing.T) { c.Open() defer c.Close() - err := c.WritePoints(pr.Database, pr.RetentionPolicy, models.ConsistencyLevelAny, pr.Points) + err := c.WritePoints(pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points) if err == nil && test.expErr != nil { t.Errorf("PointsWriter.WritePoints(): '%s' error: got %v, exp %v", test.name, err, test.expErr) } diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 02d9c87de1..f47beba859 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -455,8 +455,20 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *meta. return } + // Determine required consistency level. + level := r.URL.Query().Get("consistency") + consistency := models.ConsistencyLevelOne + if level != "" { + var err error + consistency, err = models.ParseConsistencyLevel(level) + if err != nil { + resultError(w, influxql.Result{Err: err}, http.StatusBadRequest) + return + } + } + // Write points. - if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), models.ConsistencyLevelAny, points); influxdb.IsClientError(err) { + if err := h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, points); influxdb.IsClientError(err) { h.statMap.Add(statPointsWrittenFail, int64(len(points))) resultError(w, influxql.Result{Err: err}, http.StatusBadRequest) return