Introduce syntax for marking a partial response with chunking
The `partial` tag has been added to the JSON response for both a series and a result so that a client knows when more of the series or result will be sent in a future JSON chunk. This helps interactive clients that don't want to wait for all of the data before they can tell whether they are done processing the current series or the current result. Previously, the client had to guess whether the next chunk would refer to the same result or a new one, and it had to match the name and tags of two series to know whether they were the same series. Now the client just needs to check the `partial` field included with the response to know whether it should expect more.

Also fixed `max-row-limit` so that it counts rows instead of results, and so that it truncates the response when the `max-row-limit` is reached.
branch: pull/7368/head
parent b765a4b8c7
commit b4db76cee2
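For illustration, a chunked response for a single statement could look like the following (hypothetical measurement and values; the JSON layout follows the `Result` and `Row` marshalling changes below). A series carries `partial` while more of its rows remain, and a result carries `partial` while more of the result remains; the final chunk omits both since they are `omitempty`:

    {"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","value"],"values":[["2016-01-01T00:00:00Z",1]],"partial":true}],"partial":true}]}
    {"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","value"],"values":[["2016-01-01T00:00:10Z",2]]}]}]}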
@@ -6,6 +6,7 @@
 - [#7066](https://github.com/influxdata/influxdb/issues/7066): Add support for secure transmission via collectd.
 - [#7554](https://github.com/influxdata/influxdb/pull/7554): update latest dependencies with Godeps.
+- [#7368](https://github.com/influxdata/influxdb/pull/7368): Introduce syntax for marking a partial response with chunking.

 ### Bugfixes

@@ -13,6 +14,7 @@
+- [#7616](https://github.com/influxdata/influxdb/pull/7616): Fix chuid argument order in init script @ccasey

-## v1.1.0 [2016-11-14]
+## v1.1.0 [unreleased]

 ### Release Notes
@@ -19,13 +19,13 @@ func init() {
 		&Query{
 			name:    "create database should succeed",
 			command: `CREATE DATABASE db0`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			once:    true,
 		},
 		&Query{
 			name:    "create database with retention duration should succeed",
 			command: `CREATE DATABASE db0_r WITH DURATION 24h REPLICATION 2 NAME db0_r_policy`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			once:    true,
 		},
 		&Query{
@@ -51,22 +51,22 @@ func init() {
 		&Query{
 			name:    "show database should succeed",
 			command: `SHOW DATABASES`,
-			exp:     `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db0_r"]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db0_r"]]}]}]}`,
 		},
 		&Query{
 			name:    "create database should not error with existing database",
 			command: `CREATE DATABASE db0`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 		},
 		&Query{
 			name:    "create database should create non-existing database",
 			command: `CREATE DATABASE db1`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 		},
 		&Query{
 			name:    "create database with retention duration should error if retention policy is different",
 			command: `CREATE DATABASE db1 WITH DURATION 24h`,
-			exp:     `{"results":[{"error":"retention policy conflicts with an existing policy"}]}`,
+			exp:     `{"results":[{"statement_id":0,"error":"retention policy conflicts with an existing policy"}]}`,
 		},
 		&Query{
 			name:    "create database should error with bad retention duration",
@@ -76,50 +76,50 @@ func init() {
 		&Query{
 			name:    "show database should succeed",
 			command: `SHOW DATABASES`,
-			exp:     `{"results":[{"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db0_r"],["db1"]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"name":"databases","columns":["name"],"values":[["db0"],["db0_r"],["db1"]]}]}]}`,
 		},
 		&Query{
 			name:    "drop database db0 should succeed",
 			command: `DROP DATABASE db0`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			once:    true,
 		},
 		&Query{
 			name:    "drop database db0_r should succeed",
 			command: `DROP DATABASE db0_r`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			once:    true,
 		},
 		&Query{
 			name:    "drop database db1 should succeed",
 			command: `DROP DATABASE db1`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			once:    true,
 		},
 		&Query{
 			name:    "drop database should not error if it does not exists",
 			command: `DROP DATABASE db1`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 		},
 		&Query{
 			name:    "drop database should not error with non-existing database db1",
 			command: `DROP DATABASE db1`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 		},
 		&Query{
 			name:    "show database should have no results",
 			command: `SHOW DATABASES`,
-			exp:     `{"results":[{"series":[{"name":"databases","columns":["name"]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"name":"databases","columns":["name"]}]}]}`,
 		},
 		&Query{
 			name:    "create database with shard group duration should succeed",
 			command: `CREATE DATABASE db0 WITH SHARD DURATION 61m`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 		},
 		&Query{
 			name:    "create database with shard group duration and duration should succeed",
 			command: `CREATE DATABASE db1 WITH DURATION 60m SHARD DURATION 30m`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 		},
 	},
 }
@@ -134,31 +134,31 @@ func init() {
 		&Query{
 			name:    "Drop database after data write",
 			command: `DROP DATABASE db0`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			once:    true,
 		},
 		&Query{
 			name:    "Recreate database",
 			command: `CREATE DATABASE db0`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			once:    true,
 		},
 		&Query{
 			name:    "Recreate retention policy",
 			command: `CREATE RETENTION POLICY rp0 ON db0 DURATION 365d REPLICATION 1 DEFAULT`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			once:    true,
 		},
 		&Query{
 			name:    "Show measurements after recreate",
 			command: `SHOW MEASUREMENTS`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 		},
 		&Query{
 			name:    "Query data after recreate",
 			command: `SELECT * FROM cpu`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 		},
 	},
@@ -174,31 +174,31 @@ func init() {
 		&Query{
 			name:    "Query data from 1st database",
 			command: `SELECT * FROM cpu`,
-			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 		},
 		&Query{
 			name:    "Query data from 1st database with GROUP BY *",
 			command: `SELECT * FROM cpu GROUP BY *`,
-			exp:     `{"results":[{"series":[{"name":"cpu","tags":{"host":"serverA","region":"uswest"},"columns":["time","val"],"values":[["2000-01-01T00:00:00Z",23.2]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"serverA","region":"uswest"},"columns":["time","val"],"values":[["2000-01-01T00:00:00Z",23.2]]}]}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 		},
 		&Query{
 			name:    "Drop other database",
 			command: `DROP DATABASE db1`,
 			once:    true,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 		},
 		&Query{
 			name:    "Query data from 1st database and ensure it's still there",
 			command: `SELECT * FROM cpu`,
-			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 		},
 		&Query{
 			name:    "Query data from 1st database and ensure it's still there with GROUP BY *",
 			command: `SELECT * FROM cpu GROUP BY *`,
-			exp:     `{"results":[{"series":[{"name":"cpu","tags":{"host":"serverA","region":"uswest"},"columns":["time","val"],"values":[["2000-01-01T00:00:00Z",23.2]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"serverA","region":"uswest"},"columns":["time","val"],"values":[["2000-01-01T00:00:00Z",23.2]]}]}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 		},
 	},
@@ -217,32 +217,32 @@ func init() {
 		&Query{
 			name:    "Show series is present",
 			command: `SHOW SERIES`,
-			exp:     `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 		},
 		&Query{
 			name:    "Delete series",
 			command: `DELETE FROM cpu WHERE time < '2000-01-03T00:00:00Z'`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 			once:    true,
 		},
 		&Query{
 			name:    "Show series still exists",
 			command: `SHOW SERIES`,
-			exp:     `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 		},
 		&Query{
 			name:    "Make sure last point still exists",
 			command: `SELECT * FROM cpu`,
-			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-03T00:00:00Z","serverA","uswest",200]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-03T00:00:00Z","serverA","uswest",200]]}]}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 		},
 		&Query{
 			name:    "Make sure data wasn't deleted from other database.",
 			command: `SELECT * FROM cpu`,
-			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
 			params:  url.Values{"db": []string{"db1"}},
 		},
 	},
@@ -259,26 +259,26 @@ func init() {
 		&Query{
 			name:    "Show series is present",
 			command: `SHOW SERIES`,
-			exp:     `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 		},
 		&Query{
 			name:    "Drop series after data write",
 			command: `DROP SERIES FROM cpu`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 			once:    true,
 		},
 		&Query{
 			name:    "Show series is gone",
 			command: `SHOW SERIES`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 		},
 		&Query{
 			name:    "Make sure data wasn't deleted from other database.",
 			command: `SELECT * FROM cpu`,
-			exp:     `{"results":[{"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","host","region","val"],"values":[["2000-01-01T00:00:00Z","serverA","uswest",23.2]]}]}]}`,
 			params:  url.Values{"db": []string{"db1"}},
 		},
 	},
@@ -293,7 +293,7 @@ func init() {
 		&Query{
 			name:    "Show series is present again after re-write",
 			command: `SHOW SERIES`,
-			exp:     `{"results":[{"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"columns":["key"],"values":[["cpu,host=serverA,region=uswest"]]}]}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 		},
 	},
@@ -314,51 +314,51 @@ func init() {
 		&Query{
 			name:    "Show series is present",
 			command: `SHOW SERIES`,
-			exp:     `{"results":[{"series":[{"columns":["key"],"values":[["a,host=serverA,region=uswest"],["aa,host=serverA,region=uswest"],["b,host=serverA,region=uswest"],["c,host=serverA,region=uswest"]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"columns":["key"],"values":[["a,host=serverA,region=uswest"],["aa,host=serverA,region=uswest"],["b,host=serverA,region=uswest"],["c,host=serverA,region=uswest"]]}]}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 		},
 		&Query{
 			name:    "Drop series after data write",
 			command: `DROP SERIES FROM /a.*/`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 			once:    true,
 		},
 		&Query{
 			name:    "Show series is gone",
 			command: `SHOW SERIES`,
-			exp:     `{"results":[{"series":[{"columns":["key"],"values":[["b,host=serverA,region=uswest"],["c,host=serverA,region=uswest"]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"columns":["key"],"values":[["b,host=serverA,region=uswest"],["c,host=serverA,region=uswest"]]}]}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 		},
 		&Query{
 			name:    "Drop series from regex that matches no measurements",
 			command: `DROP SERIES FROM /a.*/`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 			once:    true,
 		},
 		&Query{
 			name:    "make sure DROP SERIES doesn't delete anything when regex doesn't match",
 			command: `SHOW SERIES`,
-			exp:     `{"results":[{"series":[{"columns":["key"],"values":[["b,host=serverA,region=uswest"],["c,host=serverA,region=uswest"]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"columns":["key"],"values":[["b,host=serverA,region=uswest"],["c,host=serverA,region=uswest"]]}]}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 		},
 		&Query{
 			name:    "Drop series with WHERE field should error",
 			command: `DROP SERIES FROM c WHERE val > 50.0`,
-			exp:     `{"results":[{"error":"fields not supported in WHERE clause during deletion"}]}`,
+			exp:     `{"results":[{"statement_id":0,"error":"fields not supported in WHERE clause during deletion"}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 		},
 		&Query{
 			name:    "make sure DROP SERIES with field in WHERE didn't delete data",
 			command: `SHOW SERIES`,
-			exp:     `{"results":[{"series":[{"columns":["key"],"values":[["b,host=serverA,region=uswest"],["c,host=serverA,region=uswest"]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"columns":["key"],"values":[["b,host=serverA,region=uswest"],["c,host=serverA,region=uswest"]]}]}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 		},
 		&Query{
 			name:    "Drop series with WHERE time should error",
 			command: `DROP SERIES FROM c WHERE time > now() - 1d`,
-			exp:     `{"results":[{"error":"DROP SERIES doesn't support time in WHERE clause"}]}`,
+			exp:     `{"results":[{"statement_id":0,"error":"DROP SERIES doesn't support time in WHERE clause"}]}`,
 			params:  url.Values{"db": []string{"db0"}},
 		},
 	},
@@ -370,90 +370,90 @@ func init() {
 		&Query{
 			name:    "create retention policy should succeed",
 			command: `CREATE RETENTION POLICY rp0 ON db0 DURATION 1h REPLICATION 1`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			once:    true,
 		},
 		&Query{
 			name:    "show retention policy should succeed",
 			command: `SHOW RETENTION POLICIES ON db0`,
-			exp:     `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","1h0m0s","1h0m0s",1,false]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","1h0m0s","1h0m0s",1,false]]}]}]}`,
 		},
 		&Query{
 			name:    "alter retention policy should succeed",
 			command: `ALTER RETENTION POLICY rp0 ON db0 DURATION 2h REPLICATION 3 DEFAULT`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			once:    true,
 		},
 		&Query{
 			name:    "show retention policy should have new altered information",
 			command: `SHOW RETENTION POLICIES ON db0`,
-			exp:     `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","2h0m0s","1h0m0s",3,true]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","2h0m0s","1h0m0s",3,true]]}]}]}`,
 		},
 		&Query{
 			name:    "show retention policy should still show policy",
 			command: `SHOW RETENTION POLICIES ON db0`,
-			exp:     `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","2h0m0s","1h0m0s",3,true]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","2h0m0s","1h0m0s",3,true]]}]}]}`,
 		},
 		&Query{
 			name:    "create a second non-default retention policy",
 			command: `CREATE RETENTION POLICY rp2 ON db0 DURATION 1h REPLICATION 1`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			once:    true,
 		},
 		&Query{
 			name:    "show retention policy should show both",
 			command: `SHOW RETENTION POLICIES ON db0`,
-			exp:     `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","2h0m0s","1h0m0s",3,true],["rp2","1h0m0s","1h0m0s",1,false]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","2h0m0s","1h0m0s",3,true],["rp2","1h0m0s","1h0m0s",1,false]]}]}]}`,
 		},
 		&Query{
 			name:    "dropping non-default retention policy succeed",
 			command: `DROP RETENTION POLICY rp2 ON db0`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			once:    true,
 		},
 		&Query{
 			name:    "create a third non-default retention policy",
 			command: `CREATE RETENTION POLICY rp3 ON db0 DURATION 1h REPLICATION 1 SHARD DURATION 30m`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			once:    true,
 		},
 		&Query{
 			name:    "create retention policy with default on",
 			command: `CREATE RETENTION POLICY rp3 ON db0 DURATION 1h REPLICATION 1 SHARD DURATION 30m DEFAULT`,
-			exp:     `{"results":[{"error":"retention policy conflicts with an existing policy"}]}`,
+			exp:     `{"results":[{"statement_id":0,"error":"retention policy conflicts with an existing policy"}]}`,
 			once:    true,
 		},
 		&Query{
 			name:    "show retention policy should show both with custom shard",
 			command: `SHOW RETENTION POLICIES ON db0`,
-			exp:     `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","2h0m0s","1h0m0s",3,true],["rp3","1h0m0s","30m0s",1,false]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","2h0m0s","1h0m0s",3,true],["rp3","1h0m0s","30m0s",1,false]]}]}]}`,
 		},
 		&Query{
 			name:    "dropping non-default custom shard retention policy succeed",
 			command: `DROP RETENTION POLICY rp3 ON db0`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			once:    true,
 		},
 		&Query{
 			name:    "show retention policy should show just default",
 			command: `SHOW RETENTION POLICIES ON db0`,
-			exp:     `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","2h0m0s","1h0m0s",3,true]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["rp0","2h0m0s","1h0m0s",3,true]]}]}]}`,
 		},
 		&Query{
 			name:    "Ensure retention policy with unacceptable retention cannot be created",
 			command: `CREATE RETENTION POLICY rp4 ON db0 DURATION 1s REPLICATION 1`,
-			exp:     `{"results":[{"error":"retention policy duration must be at least 1h0m0s"}]}`,
+			exp:     `{"results":[{"statement_id":0,"error":"retention policy duration must be at least 1h0m0s"}]}`,
 			once:    true,
 		},
 		&Query{
 			name:    "Check error when deleting retention policy on non-existent database",
 			command: `DROP RETENTION POLICY rp1 ON mydatabase`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 		},
 		&Query{
 			name:    "Ensure retention policy for non existing db is not created",
 			command: `CREATE RETENTION POLICY rp0 ON nodb DURATION 1h REPLICATION 1`,
-			exp:     `{"results":[{"error":"database not found: nodb"}]}`,
+			exp:     `{"results":[{"statement_id":0,"error":"database not found: nodb"}]}`,
 			once:    true,
 		},
 	},
@@ -464,13 +464,13 @@ func init() {
 		&Query{
 			name:    "create database should succeed",
 			command: `CREATE DATABASE db0`,
-			exp:     `{"results":[{}]}`,
+			exp:     `{"results":[{"statement_id":0}]}`,
 			once:    true,
 		},
 		&Query{
 			name:    "show retention policies should return auto-created policy",
 			command: `SHOW RETENTION POLICIES ON db0`,
-			exp:     `{"results":[{"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["autogen","0s","168h0m0s",1,true]]}]}]}`,
+			exp:     `{"results":[{"statement_id":0,"series":[{"columns":["name","duration","shardGroupDuration","replicaN","default"],"values":[["autogen","0s","168h0m0s",1,true]]}]}]}`,
 		},
 	},
 }
(File diff suppressed because it is too large.)
@@ -412,7 +412,7 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
 	}

 	for {
-		row, err := em.Emit()
+		row, partial, err := em.Emit()
 		if err != nil {
 			return err
 		} else if row == nil {
@@ -437,6 +437,7 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
 		result := &influxql.Result{
 			StatementID: ctx.StatementID,
 			Series:      []*models.Row{row},
+			Partial:     partial,
 		}

 		// Send results or exit if closing.
@@ -41,10 +41,10 @@ func (e *Emitter) Close() error {
 }

 // Emit returns the next row from the iterators.
-func (e *Emitter) Emit() (*models.Row, error) {
+func (e *Emitter) Emit() (*models.Row, bool, error) {
 	// Immediately end emission if there are no iterators.
 	if len(e.itrs) == 0 {
-		return nil, nil
+		return nil, false, nil
 	}

 	// Continually read from iterators until they are exhausted.
@@ -52,11 +52,11 @@ func (e *Emitter) Emit() (*models.Row, error) {
 		// Fill buffer. Return row if no more points remain.
 		t, name, tags, err := e.loadBuf()
 		if err != nil {
-			return nil, err
+			return nil, false, err
 		} else if t == ZeroTime {
 			row := e.row
 			e.row = nil
-			return row, nil
+			return row, false, nil
 		}

 		// Read next set of values from all iterators at a given time/name/tags.
@@ -65,7 +65,7 @@ func (e *Emitter) Emit() (*models.Row, error) {
 		if values == nil {
 			row := e.row
 			e.row = nil
-			return row, nil
+			return row, false, nil
 		}

 		// If there's no row yet then create one.
@@ -74,12 +74,18 @@ func (e *Emitter) Emit() (*models.Row, error) {
 		// Otherwise return existing row and add values to next emitted row.
 		if e.row == nil {
 			e.createRow(name, tags, values)
-		} else if e.row.Name == name && e.tags.Equals(&tags) && (e.chunkSize <= 0 || len(e.row.Values) < e.chunkSize) {
+		} else if e.row.Name == name && e.tags.Equals(&tags) {
+			if e.chunkSize > 0 && len(e.row.Values) >= e.chunkSize {
+				row := e.row
+				row.Partial = true
+				e.createRow(name, tags, values)
+				return row, true, nil
+			}
 			e.row.Values = append(e.row.Values, values)
 		} else {
 			row := e.row
 			e.createRow(name, tags, values)
-			return row, nil
+			return row, true, nil
 		}
 	}
 }
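Taken together, the Emitter and StatementExecutor changes above give callers a single loop over the new three-value Emit contract. A consolidated sketch (here `em`, `ctx`, and the `send` helper stand in for the surrounding executor machinery and are assumptions, not part of the diff):

    // Drain the emitter; partial reports that the returned row was cut at
    // the chunk boundary, so the same series continues in the next row.
    for {
        row, partial, err := em.Emit()
        if err != nil {
            return err
        } else if row == nil {
            break // all iterators are exhausted
        }
        send(&influxql.Result{
            StatementID: ctx.StatementID,
            Series:      []*models.Row{row},
            Partial:     partial,
        })
    }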
@@ -27,7 +27,7 @@ func TestEmitter_Emit(t *testing.T) {
 	e.Columns = []string{"col1", "col2"}

 	// Verify the cpu region=west is emitted first.
-	if row, err := e.Emit(); err != nil {
+	if row, _, err := e.Emit(); err != nil {
 		t.Fatalf("unexpected error(0): %s", err)
 	} else if !deep.Equal(row, &models.Row{
 		Name: "cpu",
@@ -42,7 +42,7 @@
 	}

 	// Verify the cpu region=north is emitted next.
-	if row, err := e.Emit(); err != nil {
+	if row, _, err := e.Emit(); err != nil {
 		t.Fatalf("unexpected error(1): %s", err)
 	} else if !deep.Equal(row, &models.Row{
 		Name: "cpu",
@@ -56,7 +56,7 @@
 	}

 	// Verify the mem series is emitted last.
-	if row, err := e.Emit(); err != nil {
+	if row, _, err := e.Emit(); err != nil {
 		t.Fatalf("unexpected error(2): %s", err)
 	} else if !deep.Equal(row, &models.Row{
 		Name: "mem",
@@ -69,7 +69,7 @@
 	}

 	// Verify EOF.
-	if row, err := e.Emit(); err != nil {
+	if row, _, err := e.Emit(); err != nil {
 		t.Fatalf("unexpected error(eof): %s", err)
 	} else if row != nil {
 		t.Fatalf("unexpected eof: %s", spew.Sdump(row))
@@ -88,7 +88,7 @@ func TestEmitter_ChunkSize(t *testing.T) {
 	e.Columns = []string{"col1"}

 	// Verify the cpu region=west is emitted first.
-	if row, err := e.Emit(); err != nil {
+	if row, _, err := e.Emit(); err != nil {
 		t.Fatalf("unexpected error(0): %s", err)
 	} else if !deep.Equal(row, &models.Row{
 		Name: "cpu",
@@ -97,12 +97,13 @@ func TestEmitter_ChunkSize(t *testing.T) {
 		Values: [][]interface{}{
 			{time.Unix(0, 0).UTC(), float64(1)},
 		},
+		Partial: true,
 	}) {
 		t.Fatalf("unexpected row(0): %s", spew.Sdump(row))
 	}

 	// Verify the cpu region=north is emitted next.
-	if row, err := e.Emit(); err != nil {
+	if row, _, err := e.Emit(); err != nil {
 		t.Fatalf("unexpected error(1): %s", err)
 	} else if !deep.Equal(row, &models.Row{
 		Name: "cpu",
@@ -116,7 +117,7 @@ func TestEmitter_ChunkSize(t *testing.T) {
 	}

 	// Verify EOF.
-	if row, err := e.Emit(); err != nil {
+	if row, _, err := e.Emit(); err != nil {
 		t.Fatalf("unexpected error(eof): %s", err)
 	} else if row != nil {
 		t.Fatalf("unexpected eof: %s", spew.Sdump(row))
@@ -58,9 +58,10 @@ func ReadOnlyWarning(stmt string) *Message {
 type Result struct {
 	// StatementID is just the statement's position in the query. It's used
 	// to combine statement results if they're being buffered in memory.
-	StatementID int `json:"-"`
+	StatementID int
 	Series      models.Rows
 	Messages    []*Message
+	Partial     bool
 	Err         error
 }

@@ -68,14 +69,18 @@ type Result struct {
 func (r *Result) MarshalJSON() ([]byte, error) {
 	// Define a struct that outputs "error" as a string.
 	var o struct {
-		Series   []*models.Row `json:"series,omitempty"`
-		Messages []*Message    `json:"messages,omitempty"`
-		Err      string        `json:"error,omitempty"`
+		StatementID int           `json:"statement_id"`
+		Series      []*models.Row `json:"series,omitempty"`
+		Messages    []*Message    `json:"messages,omitempty"`
+		Partial     bool          `json:"partial,omitempty"`
+		Err         string        `json:"error,omitempty"`
 	}

 	// Copy fields to output struct.
+	o.StatementID = r.StatementID
 	o.Series = r.Series
 	o.Messages = r.Messages
+	o.Partial = r.Partial
 	if r.Err != nil {
 		o.Err = r.Err.Error()
 	}
@@ -86,17 +91,21 @@ func (r *Result) MarshalJSON() ([]byte, error) {
 // UnmarshalJSON decodes the data into the Result struct
 func (r *Result) UnmarshalJSON(b []byte) error {
 	var o struct {
-		Series   []*models.Row `json:"series,omitempty"`
-		Messages []*Message    `json:"messages,omitempty"`
-		Err      string        `json:"error,omitempty"`
+		StatementID int           `json:"statement_id"`
+		Series      []*models.Row `json:"series,omitempty"`
+		Messages    []*Message    `json:"messages,omitempty"`
+		Partial     bool          `json:"partial,omitempty"`
+		Err         string        `json:"error,omitempty"`
 	}

 	err := json.Unmarshal(b, &o)
 	if err != nil {
 		return err
 	}
+	r.StatementID = o.StatementID
 	r.Series = o.Series
 	r.Messages = o.Messages
+	r.Partial = o.Partial
 	if o.Err != "" {
 		r.Err = errors.New(o.Err)
 	}
@@ -10,6 +10,7 @@ type Row struct {
 	Tags    map[string]string `json:"tags,omitempty"`
 	Columns []string          `json:"columns,omitempty"`
 	Values  [][]interface{}   `json:"values,omitempty"`
+	Partial bool              `json:"partial,omitempty"`
 }

 // SameSeries returns true if r contains values for the same series as o.
@@ -457,13 +457,33 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
 			continue
 		}

-		// Limit the number of rows that can be returned in a non-chunked response.
-		// This is to prevent the server from going OOM when returning a large response.
-		// If you want to return more than the default chunk size, then use chunking
-		// to process multiple blobs.
-		rows += len(r.Series)
-		if h.Config.MaxRowLimit > 0 && rows > h.Config.MaxRowLimit {
-			break
+		// Limit the number of rows that can be returned in a non-chunked
+		// response. This is to prevent the server from going OOM when
+		// returning a large response. If you want to return more than the
+		// default chunk size, then use chunking to process multiple blobs.
+		// Iterate through the series in this result to count the rows and
+		// truncate any rows we shouldn't return.
+		if h.Config.MaxRowLimit > 0 {
+			for i, series := range r.Series {
+				n := h.Config.MaxRowLimit - rows
+				if n < len(series.Values) {
+					// We have reached the maximum number of values. Truncate
+					// the values within this row.
+					series.Values = series.Values[:n]
+					// Since this was truncated, it will always be a partial return.
+					// Add this so the client knows we truncated the response.
+					series.Partial = true
+				}
+				rows += len(series.Values)
+
+				if rows >= h.Config.MaxRowLimit {
+					// Drop any remaining series since we have already reached the row limit.
+					if i < len(r.Series) {
+						r.Series = r.Series[:i+1]
+					}
+					break
+				}
+			}
 		}

 		// It's not chunked so buffer results in memory.
@@ -499,9 +519,24 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
 			r.Series = r.Series[rowsMerged:]
 			cr.Series = append(cr.Series, r.Series...)
 			cr.Messages = append(cr.Messages, r.Messages...)
+			cr.Partial = r.Partial
 		} else {
 			resp.Results = append(resp.Results, r)
 		}
+
+		// Drop out of this loop and do not process further results when we hit the row limit.
+		if h.Config.MaxRowLimit > 0 && rows >= h.Config.MaxRowLimit {
+			// If the result is marked as partial, remove that partial marking
+			// here. While the series is partial and we would normally have
+			// tried to return the rest in the next chunk, we are not using
+			// chunking and are truncating the series so we don't want to
+			// signal to the client that we plan on sending another JSON blob
+			// with another result. The series, on the other hand, still
+			// returns partial true if it was truncated or had more data to
+			// send in a future chunk.
+			r.Partial = false
+			break
+		}
 	}

 	// If it's not chunked we buffered everything in memory, so write it out
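On the client side, the new fields remove the guesswork described in the commit message. A minimal sketch of a consumer of a chunked response, assuming the body is a stream of newline-delimited JSON blobs as in TestHandler_Query_Chunked below (the struct and function names here are local illustrations, not a published client API):

    package client

    import (
        "encoding/json"
        "io"
    )

    type series struct {
        Name    string            `json:"name"`
        Tags    map[string]string `json:"tags,omitempty"`
        Columns []string          `json:"columns,omitempty"`
        Values  [][]interface{}   `json:"values,omitempty"`
        Partial bool              `json:"partial,omitempty"`
    }

    type result struct {
        StatementID int      `json:"statement_id"`
        Series      []series `json:"series,omitempty"`
        Partial     bool     `json:"partial,omitempty"`
        Err         string   `json:"error,omitempty"`
    }

    type response struct {
        Results []result `json:"results"`
    }

    // ReadChunks decodes successive JSON blobs from a chunked query
    // response. A result with partial=true continues in the next blob
    // under the same statement_id; a series with partial=true continues
    // as the next series with the same name and tags.
    func ReadChunks(r io.Reader, handle func(result)) error {
        dec := json.NewDecoder(r)
        for {
            var rsp response
            if err := dec.Decode(&rsp); err == io.EOF {
                return nil
            } else if err != nil {
                return err
            }
            for _, res := range rsp.Results {
                handle(res)
            }
        }
    }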
@@ -39,7 +39,7 @@ func TestHandler_Query(t *testing.T) {
 	h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil))
 	if w.Code != http.StatusOK {
 		t.Fatalf("unexpected status: %d", w.Code)
-	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"series":[{"name":"series0"}]},{"series":[{"name":"series1"}]}]}` {
+	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` {
 		t.Fatalf("unexpected body: %s", body)
 	}
 }
@@ -77,7 +77,7 @@ func TestHandler_Query_File(t *testing.T) {
 	h.ServeHTTP(w, r)
 	if w.Code != http.StatusOK {
 		t.Fatalf("unexpected status: %d", w.Code)
-	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"series":[{"name":"series0"}]},{"series":[{"name":"series1"}]}]}` {
+	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` {
 		t.Fatalf("unexpected body: %s", body)
 	}
 }
@@ -141,7 +141,7 @@ func TestHandler_Query_Auth(t *testing.T) {
 	h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?u=user1&p=abcd&db=foo&q=SELECT+*+FROM+bar", nil))
 	if w.Code != http.StatusOK {
 		t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String())
-	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"series":[{"name":"series0"}]},{"series":[{"name":"series1"}]}]}` {
+	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` {
 		t.Fatalf("unexpected body: %s", body)
 	}

@@ -155,7 +155,7 @@
 	h.ServeHTTP(w, req)
 	if w.Code != http.StatusOK {
 		t.Fatalf("unexpected status: %d: %s", w.Code, w.Body.String())
-	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"series":[{"name":"series0"}]},{"series":[{"name":"series1"}]}]}` {
+	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]},{"statement_id":2,"series":[{"name":"series1"}]}]}` {
 		t.Fatalf("unexpected body: %s", body)
 	}

@@ -244,7 +244,7 @@ func TestHandler_Query_MergeResults(t *testing.T) {
 	h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil))
 	if w.Code != http.StatusOK {
 		t.Fatalf("unexpected status: %d", w.Code)
-	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"series":[{"name":"series0"},{"name":"series1"}]}]}` {
+	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series0"},{"name":"series1"}]}]}` {
 		t.Fatalf("unexpected body: %s", body)
 	}
 }
@@ -262,7 +262,7 @@ func TestHandler_Query_MergeEmptyResults(t *testing.T) {
 	h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar", nil))
 	if w.Code != http.StatusOK {
 		t.Fatalf("unexpected status: %d", w.Code)
-	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"series":[{"name":"series1"}]}]}` {
+	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":1,"series":[{"name":"series1"}]}]}` {
 		t.Fatalf("unexpected body: %s", body)
 	}
 }
@@ -283,8 +283,8 @@ func TestHandler_Query_Chunked(t *testing.T) {
 	h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar&chunked=true&chunk_size=2", nil))
 	if w.Code != http.StatusOK {
 		t.Fatalf("unexpected status: %d", w.Code)
-	} else if w.Body.String() != `{"results":[{"series":[{"name":"series0"}]}]}
-{"results":[{"series":[{"name":"series1"}]}]}
+	} else if w.Body.String() != `{"results":[{"statement_id":1,"series":[{"name":"series0"}]}]}
+{"results":[{"statement_id":1,"series":[{"name":"series1"}]}]}
 ` {
 		t.Fatalf("unexpected body: %s", w.Body.String())
 	}
@@ -435,7 +435,7 @@ func TestHandler_Query_ErrResult(t *testing.T) {
 	h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SHOW+SERIES+from+bin", nil))
 	if w.Code != http.StatusOK {
 		t.Fatalf("unexpected status: %d", w.Code)
-	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"error":"measurement not found"}]}` {
+	} else if body := strings.TrimSpace(w.Body.String()); body != `{"results":[{"statement_id":0,"error":"measurement not found"}]}` {
 		t.Fatalf("unexpected body: %s", body)
 	}
 }