Merge pull request #3697 from influxdb/chunking_10k

Merge same-series data if not chunking
Philip O'Toole 2015-08-18 13:23:10 -07:00
commit 28a6b1f3fd
4 changed files with 72 additions and 0 deletions

@@ -56,6 +56,7 @@ There are breaking changes in this release. Please see the *Features* section be
- [#3673](https://github.com/influxdb/influxdb/pull/3673): Improve query performance by removing unnecessary tagset sorting.
- [#3676](https://github.com/influxdb/influxdb/pull/3676): Improve query performance by memoizing mapper output keys.
- [#3687](https://github.com/influxdb/influxdb/issues/3687): Fix panic: runtime error: makeslice: len out of range in hinted handoff
- [#3697](https://github.com/influxdb/influxdb/issues/3697): Correctly merge non-chunked results for same series. Fix issue #3242.
## v0.9.2 [2015-07-24]
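
As a rough illustration of the user-visible effect of #3697 (a hedged sketch, not part of this PR): querying more than 10,000 points without chunking used to return the same series split across several entries, and should now come back as a single merged series. The snippet below assumes a local InfluxDB instance on the default port 8086 with the db0/cpu data written by the test further down; the string-counting check is only illustrative.

package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"strings"
)

func main() {
	// Build a non-chunked query against the HTTP /query endpoint.
	params := url.Values{}
	params.Set("db", "db0")
	params.Set("q", "SELECT value FROM cpu")

	resp, err := http.Get("http://localhost:8086/query?" + params.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := ioutil.ReadAll(resp.Body)

	// After the fix, a merged response should contain a single "cpu" series
	// entry even though the mapper produced the points in 10,000-point chunks.
	fmt.Println("series entries:", strings.Count(string(body), `"name":"cpu"`))
}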

@@ -3025,6 +3025,57 @@ func TestServer_Query_Fill(t *testing.T) {
	}
}

func TestServer_Query_Chunk(t *testing.T) {
	t.Parallel()
	s := OpenServer(NewConfig(), "")
	defer s.Close()

	if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
		t.Fatal(err)
	}
	if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
		t.Fatal(err)
	}

	writes := make([]string, 10001) // 10,000 is the default chunking size, even when no chunking requested.
	expectedValues := make([]string, len(writes))
	for i := 0; i < len(writes); i++ {
		writes[i] = fmt.Sprintf(`cpu value=%d %d`, i, time.Unix(0, int64(i)).UnixNano())
		expectedValues[i] = fmt.Sprintf(`["%s",%d]`, time.Unix(0, int64(i)).UTC().Format(time.RFC3339Nano), i)
	}
	expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[%s]}]}]}`, strings.Join(expectedValues, ","))

	test := NewTest("db0", "rp0")
	test.write = strings.Join(writes, "\n")

	test.addQueries([]*Query{
		&Query{
			name:    "SELECT all values, no chunking",
			command: `SELECT value FROM cpu`,
			exp:     expected,
			params:  url.Values{"db": []string{"db0"}},
		},
	}...)

	for i, query := range test.queries {
		if i == 0 {
			if err := test.init(s); err != nil {
				t.Fatalf("test init failed: %s", err)
			}
		}
		if query.skip {
			t.Logf("SKIP:: %s", query.name)
			continue
		}
		if err := query.Execute(s); err != nil {
			t.Error(query.Error(err))
		} else if !query.success() {
			t.Error(query.failureMessage())
		}
	}
}

func TestServer_Query_DropAndRecreateMeasurement(t *testing.T) {
	t.Parallel()
	s := OpenServer(NewConfig(), "")

@@ -31,6 +31,11 @@ type Row struct {
	Err error `json:"err,omitempty"`
}

// SameSeries returns true if r contains values for the same series as o.
func (r *Row) SameSeries(o *Row) bool {
	return r.tagsHash() == o.tagsHash() && r.Name == o.Name
}

// tagsHash returns a hash of tag key/value pairs.
func (r *Row) tagsHash() uint64 {
	h := fnv.New64a()

@@ -286,6 +286,21 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
			resp.Results = append(resp.Results, r)
		} else if resp.Results[l-1].StatementID == r.StatementID {
			cr := resp.Results[l-1]

			lastSeries := cr.Series[len(cr.Series)-1]
			rowsMerged := 0
			for _, row := range r.Series {
				if !lastSeries.SameSeries(row) {
					// Next row is for a different series than last.
					break
				}

				// Values are for the same series, so append them.
				lastSeries.Values = append(lastSeries.Values, row.Values...)
				rowsMerged++
			}

			// Append remaining rows as new rows.
			r.Series = r.Series[rowsMerged:]
			cr.Series = append(cr.Series, r.Series...)
		} else {
			resp.Results = append(resp.Results, r)
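
Taken together, the handler change folds the leading rows of each incoming result into the previous result's last series when they describe the same series, and appends whatever is left as new rows; the early break relies on same-series rows arriving first, which holds when mapper output for a statement is ordered by series. A minimal, self-contained sketch of that merge step (toy row type and hypothetical merge helper, not the real influxql.Row or handler code):

package main

import "fmt"

type row struct {
	Name   string
	Values [][]interface{}
}

func merge(prev, next []*row) []*row {
	last := prev[len(prev)-1]
	merged := 0
	for _, r := range next {
		if r.Name != last.Name { // the real code also compares a hash of the tag set
			break
		}
		// Same series: append the chunk's values to the existing entry.
		last.Values = append(last.Values, r.Values...)
		merged++
	}
	// Any remaining rows belong to other series; keep them as separate entries.
	return append(prev, next[merged:]...)
}

func main() {
	a := []*row{{Name: "cpu", Values: [][]interface{}{{"t1", 1}}}}
	b := []*row{
		{Name: "cpu", Values: [][]interface{}{{"t2", 2}}},
		{Name: "mem", Values: [][]interface{}{{"t3", 3}}},
	}
	for _, r := range merge(a, b) {
		fmt.Println(r.Name, r.Values) // cpu [[t1 1] [t2 2]], then mem [[t3 3]]
	}
}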