Merge pull request #6126 from influxdata/js-6115-chunked-query-support-mid-series

Support chunking in the middle of a series in the emitter

commit eb467d8d7f
@@ -19,6 +19,7 @@
 - [#6149](https://github.com/influxdata/influxdb/pull/6149): Kill running queries when server is shutdown.
 - [#5372](https://github.com/influxdata/influxdb/pull/5372): Faster shard loading
 - [#6148](https://github.com/influxdata/influxdb/pull/6148): Build script is now compatible with Python 3. Added ability to create detached signatures for packages. Build script now uses Python logging facility for messages.
+- [#6115](https://github.com/influxdata/influxdb/issues/6115): Support chunking query results mid-series. Limit non-chunked output.

 ### Bugfixes

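For context on the new behavior: a client opts into chunking by passing chunked=true (and optionally chunk_size) to /query, and each chunk now arrives as its own newline-delimited JSON document, even when a single series spans several chunks. A minimal client sketch, assuming a server at localhost:8086 and a hypothetical "mydb" database (neither is part of this diff):

    package main

    import (
        "encoding/json"
        "fmt"
        "io"
        "log"
        "net/http"
        "net/url"
    )

    func main() {
        params := url.Values{}
        params.Set("db", "mydb") // hypothetical database
        params.Set("q", "SELECT value FROM cpu")
        params.Set("chunked", "true")
        params.Set("chunk_size", "10000")

        resp, err := http.Get("http://localhost:8086/query?" + params.Encode())
        if err != nil {
            log.Fatal(err)
        }
        defer resp.Body.Close()

        // Each chunk is a complete JSON document, so json.Decoder can
        // consume the stream one chunk at a time; the full result set is
        // never held in memory on either side.
        dec := json.NewDecoder(resp.Body)
        for {
            var chunk map[string]interface{}
            if err := dec.Decode(&chunk); err == io.EOF {
                break
            } else if err != nil {
                log.Fatal(err)
            }
            fmt.Printf("chunk: %v\n", chunk)
        }
    }

Decoding one document at a time is exactly what mid-series chunking enables: a huge series no longer has to be materialized as a single row on either end.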
@@ -481,7 +481,7 @@ func (e *QueryExecutor) executeSelectStatement(stmt *influxql.SelectStatement, c
 	}

 	// Generate a row emitter from the iterator set.
-	em := influxql.NewEmitter(itrs, stmt.TimeAscending())
+	em := influxql.NewEmitter(itrs, stmt.TimeAscending(), chunkSize)
 	em.Columns = stmt.ColumnNames()
 	em.OmitTime = stmt.OmitTime
 	defer em.Close()

@@ -212,6 +212,7 @@ reporting-disabled = false
   pprof-enabled = false
   https-enabled = false
   https-certificate = "/etc/ssl/influxdb.pem"
+  max-row-limit = 10000

 ###
 ### [[graphite]]

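Judging by the chunkSize <= 0 and rowLimit > 0 guards later in this diff, a non-positive value appears to disable the corresponding limit; for example (an assumed override, not text from this commit):

    # Raise the cap on non-chunked responses, or set 0 to disable it.
    max-row-limit = 50000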
@@ -12,6 +12,7 @@ type Emitter struct {
 	buf       []Point
 	itrs      []Iterator
 	ascending bool
+	chunkSize int

 	tags Tags
 	row  *models.Row
@@ -25,11 +26,12 @@ type Emitter struct {
 }

 // NewEmitter returns a new instance of Emitter that pulls from itrs.
-func NewEmitter(itrs []Iterator, ascending bool) *Emitter {
+func NewEmitter(itrs []Iterator, ascending bool, chunkSize int) *Emitter {
 	return &Emitter{
 		buf:       make([]Point, len(itrs)),
 		itrs:      itrs,
 		ascending: ascending,
+		chunkSize: chunkSize,
 	}
 }

@@ -65,11 +67,12 @@ func (e *Emitter) Emit() *models.Row {
 		}

 		// If there's no row yet then create one.
-		// If the name and tags match the existing row, append to that row.
+		// If the name and tags match the existing row, append to that row if
+		// the number of values doesn't exceed the chunk size.
 		// Otherwise return existing row and add values to next emitted row.
 		if e.row == nil {
 			e.createRow(name, tags, values)
-		} else if e.row.Name == name && e.tags.Equals(&tags) {
+		} else if e.row.Name == name && e.tags.Equals(&tags) && (e.chunkSize <= 0 || len(e.row.Values) < e.chunkSize) {
 			e.row.Values = append(e.row.Values, values)
 		} else {
 			row := e.row

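Restating the rule above outside the diff: a point joins the row being built only when the series (name and tags) matches and the row is still under the chunk size; otherwise the current row is returned and a new one is started for the same series. A runnable sketch of just that predicate (the function name and shape are illustrative, not this package's API):

    package main

    import "fmt"

    // appendOK mirrors the emitter's condition: append to the current row
    // only if the series matches and, when a chunk size is set (> 0), the
    // row still has room. chunkSize <= 0 means "no chunk limit".
    func appendOK(rowName, name string, sameTags bool, rowLen, chunkSize int) bool {
        return rowName == name && sameTags && (chunkSize <= 0 || rowLen < chunkSize)
    }

    func main() {
        fmt.Println(appendOK("cpu", "cpu", true, 0, 1)) // true: chunk has room
        fmt.Println(appendOK("cpu", "cpu", true, 1, 1)) // false: chunk full, row is emitted
        fmt.Println(appendOK("cpu", "cpu", true, 5, 0)) // true: no chunk limit
    }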
@@ -23,7 +23,7 @@ func TestEmitter_Emit(t *testing.T) {
 			{Name: "cpu", Tags: ParseTags("region=north"), Time: 0, Value: 4},
 			{Name: "mem", Time: 4, Value: 5},
 		}},
-	}, true)
+	}, true, 0)
 	e.Columns = []string{"col1", "col2"}

 	// Verify the cpu region=west is emitted first.

@@ -67,3 +67,44 @@ func TestEmitter_Emit(t *testing.T) {
 		t.Fatalf("unexpected eof: %s", spew.Sdump(row))
 	}
 }
+
+// Ensure the emitter will limit the chunked output from a series.
+func TestEmitter_ChunkSize(t *testing.T) {
+	// Build an emitter that pulls from one iterator with multiple points in the same series.
+	e := influxql.NewEmitter([]influxql.Iterator{
+		&FloatIterator{Points: []influxql.FloatPoint{
+			{Name: "cpu", Tags: ParseTags("region=west"), Time: 0, Value: 1},
+			{Name: "cpu", Tags: ParseTags("region=west"), Time: 1, Value: 2},
+		}},
+	}, true, 1)
+	e.Columns = []string{"col1"}
+
+	// Verify the first chunk of cpu region=west is emitted first.
+	if row := e.Emit(); !deep.Equal(row, &models.Row{
+		Name:    "cpu",
+		Tags:    map[string]string{"region": "west"},
+		Columns: []string{"col1"},
+		Values: [][]interface{}{
+			{time.Unix(0, 0).UTC(), float64(1)},
+		},
+	}) {
+		t.Fatalf("unexpected row(0): %s", spew.Sdump(row))
+	}
+
+	// Verify the second chunk of cpu region=west is emitted next.
+	if row := e.Emit(); !deep.Equal(row, &models.Row{
+		Name:    "cpu",
+		Tags:    map[string]string{"region": "west"},
+		Columns: []string{"col1"},
+		Values: [][]interface{}{
+			{time.Unix(0, 1).UTC(), float64(2)},
+		},
+	}) {
+		t.Fatalf("unexpected row(1): %s", spew.Sdump(row))
+	}
+
+	// Verify EOF.
+	if row := e.Emit(); row != nil {
+		t.Fatalf("unexpected eof: %s", spew.Sdump(row))
+	}
+}

@@ -13,6 +13,7 @@ type Config struct {
 	PprofEnabled     bool   `toml:"pprof-enabled"`
 	HTTPSEnabled     bool   `toml:"https-enabled"`
 	HTTPSCertificate string `toml:"https-certificate"`
+	MaxRowLimit      int    `toml:"max-row-limit"`
 }

 // NewConfig returns a new Config with default settings.
@@ -23,5 +24,6 @@ func NewConfig() Config {
 		LogEnabled:       true,
 		HTTPSEnabled:     false,
 		HTTPSCertificate: "/etc/ssl/influxdb.pem",
+		MaxRowLimit:      DefaultChunkSize,
 	}
 }

@@ -26,11 +26,10 @@ import (
 )

 const (
-	// DefaultChunkSize specifies the amount of data mappers will read
-	// up to, before sending results back to the engine. This is the
-	// default size in the number of values returned in a raw query.
+	// DefaultChunkSize specifies the maximum number of points that will
+	// be read before sending results back to the engine.
 	//
-	// Could be many more bytes depending on fields returned.
+	// This has no relation to the number of bytes that are returned.
 	DefaultChunkSize = 10000
 )

@@ -75,17 +74,19 @@ type Handler struct {
 	Logger         *log.Logger
 	loggingEnabled bool // Log every HTTP access.
 	WriteTrace     bool // Detailed logging of write path
+	rowLimit       int
 	statMap        *expvar.Map
 }

 // NewHandler returns a new instance of handler with routes.
-func NewHandler(requireAuthentication, loggingEnabled, writeTrace bool, statMap *expvar.Map) *Handler {
+func NewHandler(requireAuthentication, loggingEnabled, writeTrace bool, rowLimit int, statMap *expvar.Map) *Handler {
 	h := &Handler{
 		mux: pat.New(),
 		requireAuthentication: requireAuthentication,
 		Logger:         log.New(os.Stderr, "[http] ", log.LstdFlags),
 		loggingEnabled: loggingEnabled,
 		WriteTrace:     writeTrace,
+		rowLimit:       rowLimit,
 		statMap:        statMap,
 	}

@@ -284,7 +285,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
 	chunked := (q.Get("chunked") == "true")
 	chunkSize := DefaultChunkSize
 	if chunked {
-		if n, err := strconv.ParseInt(q.Get("chunk_size"), 10, 64); err == nil {
+		if n, err := strconv.ParseInt(q.Get("chunk_size"), 10, 64); err == nil && int(n) > 0 {
			chunkSize = int(n)
 		}
 	}

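With the added int(n) > 0 guard, a zero, negative, or unparseable chunk_size now falls back to DefaultChunkSize instead of slipping through as a non-positive chunk size. Illustrative inputs:

    chunk_size=2    -> chunkSize = 2
    chunk_size=0    -> chunkSize = 10000 (DefaultChunkSize)
    chunk_size=-5   -> chunkSize = 10000 (DefaultChunkSize)
    chunk_size=abc  -> chunkSize = 10000 (DefaultChunkSize)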
@@ -312,6 +313,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
 	w.WriteHeader(http.StatusOK)

 	// pull all results from the channel
+	rows := 0
 	for r := range results {
 		// Ignore nil results.
 		if r == nil {

@@ -328,11 +330,23 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.
 			n, _ := w.Write(MarshalJSON(Response{
 				Results: []*influxql.Result{r},
 			}, pretty))
+			if !pretty {
+				w.Write([]byte("\n"))
+			}
 			h.statMap.Add(statQueryRequestBytesTransmitted, int64(n))
 			w.(http.Flusher).Flush()
 			continue
 		}

+		// Limit the number of rows that can be returned in a non-chunked response.
+		// This is to prevent the server from going OOM when returning a large response.
+		// If you want to return more than the default chunk size, then use chunking
+		// to process multiple blobs.
+		rows += len(r.Series)
+		if h.rowLimit > 0 && rows > h.rowLimit {
+			break
+		}
+
 		// It's not chunked so buffer results in memory.
 		// Results for statements need to be combined together.
 		// We need to check if this new result is for the same statement as

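The comment above is the heart of the non-chunked limit: row counts accumulate across results, and once they exceed rowLimit the read loop stops, so only the rows buffered up to that point are returned. Chunked responses are unaffected because the chunked branch continues before this check is reached. A runnable sketch of the truncation semantics (series names and the limit are illustrative):

    package main

    import "fmt"

    func main() {
        // With max-row-limit = 2, a non-chunked response keeps the first
        // two series and drops the rest, mirroring the rows/rowLimit check
        // above. A limit of 0 would disable the check entirely.
        rows, rowLimit := 0, 2
        for _, series := range []string{"cpu", "mem", "disk"} {
            rows++
            if rowLimit > 0 && rows > rowLimit {
                break // "disk" is never buffered
            }
            fmt.Println("buffering series:", series)
        }
    }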
@@ -113,7 +113,9 @@ func TestHandler_Query_Chunked(t *testing.T) {
 	h.ServeHTTP(w, MustNewJSONRequest("GET", "/query?db=foo&q=SELECT+*+FROM+bar&chunked=true&chunk_size=2", nil))
 	if w.Code != http.StatusOK {
 		t.Fatalf("unexpected status: %d", w.Code)
-	} else if w.Body.String() != `{"results":[{"series":[{"name":"series0"}]}]}{"results":[{"series":[{"name":"series1"}]}]}` {
+	} else if w.Body.String() != `{"results":[{"series":[{"name":"series0"}]}]}
+{"results":[{"series":[{"name":"series1"}]}]}
+` {
 		t.Fatalf("unexpected body: %s", w.Body.String())
 	}
 }

@@ -287,7 +289,7 @@ type Handler struct {
 func NewHandler(requireAuthentication bool) *Handler {
 	statMap := influxdb.NewStatistics("httpd", "httpd", nil)
 	h := &Handler{
-		Handler: httpd.NewHandler(requireAuthentication, true, false, statMap),
+		Handler: httpd.NewHandler(requireAuthentication, true, false, 0, statMap),
 	}
 	h.Handler.MetaClient = &h.MetaClient
 	h.Handler.QueryExecutor = &h.QueryExecutor

@@ -64,6 +64,7 @@ func NewService(c Config) *Service {
 			c.AuthEnabled,
 			c.LogEnabled,
 			c.WriteTracing,
+			c.MaxRowLimit,
 			statMap,
 		),
 		Logger: log.New(os.Stderr, "[httpd] ", log.LstdFlags),