Merge pull request #8893 from benbjohnson/handle-nil-measurement-iterator

Handle nil MeasurementIterator.
pull/9022/head
Ben Johnson 2017-10-26 11:26:06 -06:00 committed by GitHub
commit da46ac54e8
5 changed files with 75 additions and 41 deletions

View File

@ -43,6 +43,7 @@
- [#8995](https://github.com/influxdata/influxdb/pull/8995): Sort & validate TSI key value insertion.
- [#8968](https://github.com/influxdata/influxdb/issues/8968): Make client errors more helpful on downstream errs. Thanks @darkliquid!
- [#8984](https://github.com/influxdata/influxdb/pull/8984): EXACT and estimated CARDINALITY queries.
- [#8893](https://github.com/influxdata/influxdb/pull/8893): Handle nil MeasurementIterator.
### Bugfixes

View File

@ -229,19 +229,20 @@ func (cmd *Command) printMeasurements(fs *tsi1.FileSet) error {
fmt.Fprintln(tw, "Measurement\t")
// Iterate over each series.
itr := fs.MeasurementIterator()
for e := itr.Next(); e != nil; e = itr.Next() {
if cmd.measurementFilter != nil && !cmd.measurementFilter.Match(e.Name()) {
continue
}
if itr := fs.MeasurementIterator(); itr != nil {
for e := itr.Next(); e != nil; e = itr.Next() {
if cmd.measurementFilter != nil && !cmd.measurementFilter.Match(e.Name()) {
continue
}
fmt.Fprintf(tw, "%s\t%v\n", e.Name(), deletedString(e.Deleted()))
if err := tw.Flush(); err != nil {
return err
}
fmt.Fprintf(tw, "%s\t%v\n", e.Name(), deletedString(e.Deleted()))
if err := tw.Flush(); err != nil {
return err
}
if err := cmd.printTagKeys(fs, e.Name()); err != nil {
return err
if err := cmd.printTagKeys(fs, e.Name()); err != nil {
return err
}
}
}
@ -364,21 +365,23 @@ func (cmd *Command) printIndexFileSummary(f *tsi1.IndexFile) error {
var measurementN, measurementSeriesN, measurementSeriesSize uint64
var keyN uint64
var valueN, valueSeriesN, valueSeriesSize uint64
mitr := f.MeasurementIterator()
for me, _ := mitr.Next().(*tsi1.MeasurementBlockElem); me != nil; me, _ = mitr.Next().(*tsi1.MeasurementBlockElem) {
kitr := f.TagKeyIterator(me.Name())
for ke, _ := kitr.Next().(*tsi1.TagBlockKeyElem); ke != nil; ke, _ = kitr.Next().(*tsi1.TagBlockKeyElem) {
vitr := f.TagValueIterator(me.Name(), ke.Key())
for ve, _ := vitr.Next().(*tsi1.TagBlockValueElem); ve != nil; ve, _ = vitr.Next().(*tsi1.TagBlockValueElem) {
valueN++
valueSeriesN += uint64(ve.SeriesN())
valueSeriesSize += uint64(len(ve.SeriesData()))
if mitr := f.MeasurementIterator(); mitr != nil {
for me, _ := mitr.Next().(*tsi1.MeasurementBlockElem); me != nil; me, _ = mitr.Next().(*tsi1.MeasurementBlockElem) {
kitr := f.TagKeyIterator(me.Name())
for ke, _ := kitr.Next().(*tsi1.TagBlockKeyElem); ke != nil; ke, _ = kitr.Next().(*tsi1.TagBlockKeyElem) {
vitr := f.TagValueIterator(me.Name(), ke.Key())
for ve, _ := vitr.Next().(*tsi1.TagBlockValueElem); ve != nil; ve, _ = vitr.Next().(*tsi1.TagBlockValueElem) {
valueN++
valueSeriesN += uint64(ve.SeriesN())
valueSeriesSize += uint64(len(ve.SeriesData()))
}
keyN++
}
keyN++
measurementN++
measurementSeriesN += uint64(me.SeriesN())
measurementSeriesSize += uint64(len(me.SeriesData()))
}
measurementN++
measurementSeriesN += uint64(me.SeriesN())
measurementSeriesSize += uint64(len(me.SeriesData()))
}
// Write stats.

View File

@ -528,9 +528,13 @@ func (fs *FileSet) MeasurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
return fs.measurementNamesByExpr(expr)
}
itr := fs.MeasurementIterator()
if itr == nil {
return nil, nil
}
// Iterate over all measurements if no condition exists.
var names [][]byte
itr := fs.MeasurementIterator()
for e := itr.Next(); e != nil; e = itr.Next() {
names = append(names, e.Name())
}
@ -605,8 +609,12 @@ func (fs *FileSet) measurementNamesByExpr(expr influxql.Expr) ([][]byte, error)
// measurementNamesByNameFilter returns matching measurement names in sorted order.
func (fs *FileSet) measurementNamesByNameFilter(op influxql.Token, val string, regex *regexp.Regexp) [][]byte {
var names [][]byte
itr := fs.MeasurementIterator()
if itr == nil {
return nil
}
var names [][]byte
for e := itr.Next(); e != nil; e = itr.Next() {
var matched bool
switch op {
@ -632,6 +640,10 @@ func (fs *FileSet) measurementNamesByTagFilter(op influxql.Token, key, val strin
var names [][]byte
mitr := fs.MeasurementIterator()
if mitr == nil {
return nil
}
for me := mitr.Next(); me != nil; me = mitr.Next() {
// If the operator is non-regex, only check the specified value.
var tagMatch bool

View File

@ -415,6 +415,10 @@ func (i *Index) MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error) {
defer fs.Release()
itr := fs.MeasurementIterator()
if itr == nil {
return nil, nil
}
var a [][]byte
for e := itr.Next(); e != nil; e = itr.Next() {
if re.Match(e.Name()) {
@ -1200,6 +1204,10 @@ func (itr *seriesPointIterator) Next() (*query.FloatPoint, error) {
// Create new series iterator, if necessary.
// Exit if there are no measurements remaining.
if itr.sitr == nil {
if itr.mitr == nil {
return nil, nil
}
m := itr.mitr.Next()
if m == nil {
return nil, nil

View File

@ -51,6 +51,10 @@ func (p IndexFiles) Files() []File {
// MeasurementNames returns a sorted list of all measurement names for all files.
func (p *IndexFiles) MeasurementNames() [][]byte {
itr := p.MeasurementIterator()
if itr == nil {
return nil
}
var names [][]byte
for e := itr.Next(); e != nil; e = itr.Next() {
names = append(names, bytesutil.Clone(e.Name()))
@ -219,6 +223,10 @@ func (p IndexFiles) writeSeriesBlockTo(w io.Writer, m, k uint64, info *indexComp
func (p IndexFiles) writeTagsetsTo(w io.Writer, info *indexCompactInfo, n *int64) error {
mitr := p.MeasurementIterator()
if mitr == nil {
return nil
}
for m := mitr.Next(); m != nil; m = mitr.Next() {
if err := p.writeTagsetTo(w, m.Name(), info, n); err != nil {
return err
@ -290,24 +298,26 @@ func (p IndexFiles) writeMeasurementBlockTo(w io.Writer, info *indexCompactInfo,
// Add measurement data & compute sketches.
mitr := p.MeasurementIterator()
for m := mitr.Next(); m != nil; m = mitr.Next() {
name := m.Name()
if mitr != nil {
for m := mitr.Next(); m != nil; m = mitr.Next() {
name := m.Name()
// Look-up series ids.
itr := p.MeasurementSeriesIterator(name)
var seriesIDs []uint32
for e := itr.Next(); e != nil; e = itr.Next() {
seriesID, _ := info.sblk.Offset(e.Name(), e.Tags(), seriesKey[:0])
if seriesID == 0 {
panic(fmt.Sprintf("expected series id: %s %s", e.Name(), e.Tags().String()))
// Look-up series ids.
itr := p.MeasurementSeriesIterator(name)
var seriesIDs []uint32
for e := itr.Next(); e != nil; e = itr.Next() {
seriesID, _ := info.sblk.Offset(e.Name(), e.Tags(), seriesKey[:0])
if seriesID == 0 {
panic(fmt.Sprintf("expected series id: %s %s", e.Name(), e.Tags().String()))
}
seriesIDs = append(seriesIDs, seriesID)
}
seriesIDs = append(seriesIDs, seriesID)
}
sort.Sort(uint32Slice(seriesIDs))
sort.Sort(uint32Slice(seriesIDs))
// Add measurement to writer.
pos := info.tagSets[string(name)]
mw.Add(name, m.Deleted(), pos.offset, pos.size, seriesIDs)
// Add measurement to writer.
pos := info.tagSets[string(name)]
mw.Add(name, m.Deleted(), pos.offset, pos.size, seriesIDs)
}
}
// Flush data to writer.