rollback bolt tx on mapper open error
This commit fixes `SelectMapper.Open()` so that it rolls back its read transaction when opening fails partway through. Previously, an error left the transaction open indefinitely, which in turn caused mmap resizes to hang.
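The shape of the fix is the usual Bolt discipline of releasing a read-only transaction on every error path: run the fallible setup inside a closure and roll back if it returns an error. A minimal sketch of that pattern (a hypothetical `open` helper against the boltdb API, not the InfluxDB code itself):

package txdemo

import "github.com/boltdb/bolt"

// open begins a read-only transaction, runs all setup that may fail
// inside a closure, and rolls the transaction back if the closure
// returns an error, so the tx can never be leaked.
func open(db *bolt.DB) (*bolt.Tx, error) {
    tx, err := db.Begin(false) // false = read-only
    if err != nil {
        return nil, err
    }
    if err := func() error {
        // ... setup work that may fail goes here ...
        return nil
    }(); err != nil {
        tx.Rollback() // a leaked read tx pins the old mapping and blocks mmap growth
        return nil, err
    }
    return tx, nil
}

Bolt can only remap and grow its data file once no read transactions reference the old mapping, which is why a leaked read-only transaction eventually hangs writers.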
parent ca94ad8ef4
commit fd9be63b4e
tsdb/mapper.go
@@ -105,8 +105,6 @@ func (lm *SelectMapper) Open() error {
        return nil
    }

    var err error

    // Get a read-only transaction.
    tx, err := lm.shard.engine.Begin(false)
    if err != nil {
@@ -114,185 +112,192 @@ func (lm *SelectMapper) Open() error {
    }
    lm.tx = tx

    if err := func() error {
        if s, ok := lm.stmt.(*influxql.SelectStatement); ok {
            stmt, err := lm.rewriteSelectStatement(s)
            if err != nil {
                return err
            }
            lm.selectStmt = stmt
            lm.rawMode = (s.IsRawQuery && !s.HasDistinct()) || s.IsSimpleDerivative()
        } else {
            return lm.openMeta()
        }

        // Set all time-related parameters on the mapper.
        lm.queryTMin, lm.queryTMax = influxql.TimeRangeAsEpochNano(lm.selectStmt.Condition)

        if !lm.rawMode {
            if err := lm.initializeMapFunctions(); err != nil {
                return err
            }

            // For GROUP BY time queries, limit the number of data points returned by the limit and offset.
            d, err := lm.selectStmt.GroupByInterval()
            if err != nil {
                return err
            }
            lm.intervalSize = d.Nanoseconds()
            if lm.queryTMin == 0 || lm.intervalSize == 0 {
                lm.numIntervals = 1
                lm.intervalSize = lm.queryTMax - lm.queryTMin
            } else {
                intervalTop := lm.queryTMax/lm.intervalSize*lm.intervalSize + lm.intervalSize
                intervalBottom := lm.queryTMin / lm.intervalSize * lm.intervalSize
                lm.numIntervals = int((intervalTop - intervalBottom) / lm.intervalSize)
            }

            if lm.selectStmt.Limit > 0 || lm.selectStmt.Offset > 0 {
                // Ensure that the offset isn't higher than the number of points we'd get.
                if lm.selectStmt.Offset > lm.numIntervals {
                    return nil
                }

                // Take the lesser of either the pre-computed number of GROUP BY buckets that
                // will be in the result or the limit passed in by the user.
                if lm.selectStmt.Limit < lm.numIntervals {
                    lm.numIntervals = lm.selectStmt.Limit
                }
            }

            // If we are exceeding our MaxGroupByPoints, error out.
            if lm.numIntervals > MaxGroupByPoints {
                return errors.New("too many points in the group by interval. maybe you forgot to specify a where time clause?")
            }

            // Ensure that the start time for the results is on the start of the window.
            lm.queryTMinWindow = lm.queryTMin
            if lm.intervalSize > 0 && lm.numIntervals > 1 {
                lm.queryTMinWindow = lm.queryTMinWindow / lm.intervalSize * lm.intervalSize
            }
        }

        selectFields := newStringSet()
        selectTags := newStringSet()
        whereFields := newStringSet()

        // Create the TagSet cursors for the Mapper.
        for _, src := range lm.selectStmt.Sources {
            mm, ok := src.(*influxql.Measurement)
            if !ok {
                return fmt.Errorf("invalid source type: %#v", src)
            }

            m := lm.shard.index.Measurement(mm.Name)
            if m == nil {
                // This shard has never received data for the measurement. No Mapper
                // required.
                return nil
            }

            // Validate that any GROUP BY is not a field for this measurement.
            if err := m.ValidateGroupBy(lm.selectStmt); err != nil {
                return err
            }

            // Create tagset cursors and determine various field types within SELECT statement.
            tsf, err := createTagSetsAndFields(m, lm.selectStmt)
            if err != nil {
                return err
            }
            tagSets := tsf.tagSets
            selectFields.add(tsf.selectFields...)
            selectTags.add(tsf.selectTags...)
            whereFields.add(tsf.whereFields...)

            // If we only have tags in our select clause we just return.
            if len(selectFields) == 0 && len(selectTags) > 0 {
                return fmt.Errorf("statement must have at least one field in select clause")
            }

            // SLIMIT and SOFFSET the unique series.
            if lm.selectStmt.SLimit > 0 || lm.selectStmt.SOffset > 0 {
                if lm.selectStmt.SOffset > len(tagSets) {
                    tagSets = nil
                } else {
                    if lm.selectStmt.SOffset+lm.selectStmt.SLimit > len(tagSets) {
                        lm.selectStmt.SLimit = len(tagSets) - lm.selectStmt.SOffset
                    }

                    tagSets = tagSets[lm.selectStmt.SOffset : lm.selectStmt.SOffset+lm.selectStmt.SLimit]
                }
            }

            // For aggregate functions, we iterate the cursors in forward order but return the
            // time bucket results in reverse order. This simplifies the aggregate code in that
            // it does not need to handle forward and reverse semantics. For raw queries, we do
            // need to iterate in reverse order if using ORDER BY time DESC.
            direction := Forward
            if lm.rawMode {
                direction = lm.timeDirection()
            }

            // Create all cursors for reading the data from this shard.
            for _, t := range tagSets {
                cursors := []*seriesCursor{}

                for i, key := range t.SeriesKeys {
                    c := lm.tx.Cursor(key, direction)
                    if c == nil {
                        // No data exists for this key.
                        continue
                    }
                    seriesTags := lm.shard.index.TagsForSeries(key)
                    cm := newSeriesCursor(c, t.Filters[i], seriesTags)
                    cursors = append(cursors, cm)
                }

                tsc := newTagSetCursor(m.Name, t.Tags, cursors, lm.shard.FieldCodec(m.Name))
                if lm.rawMode {
                    tsc.pointHeap = newPointHeap()
                    // Prime the buffers.
                    for i := 0; i < len(tsc.cursors); i++ {
                        var k int64
                        var v []byte
                        if direction.Forward() {
                            k, v = tsc.cursors[i].SeekTo(lm.queryTMin)
                        } else {
                            k, v = tsc.cursors[i].SeekTo(lm.queryTMax)
                        }

                        if k == -1 {
                            k, v = tsc.cursors[i].Next()
                        }

                        if k == -1 {
                            continue
                        }
                        p := &pointHeapItem{
                            timestamp: k,
                            value:     v,
                            cursor:    tsc.cursors[i],
                        }
                        heap.Push(tsc.pointHeap, p)
                    }
                }
                lm.cursors = append(lm.cursors, tsc)
            }
            sort.Sort(tagSetCursors(lm.cursors))
        }

        lm.selectFields = selectFields.list()
        lm.selectTags = selectTags.list()
        lm.whereFields = whereFields.list()

        // If the query does not aggregate, then at least 1 SELECT field should be present.
        if lm.rawMode && len(lm.selectFields) == 0 {
            // None of the SELECT fields exist in this data. Wipe out all tagset cursors.
            lm.cursors = nil
        }

        return nil
    }(); err != nil {
        lm.tx.Rollback()
        return err
    }

    return nil
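As an aside on the GROUP BY bucket arithmetic in the hunk above: intervalTop rounds queryTMax up to the next interval boundary and intervalBottom rounds queryTMin down, so the bucket count covers the full query range. A standalone sketch of the same math (hypothetical numIntervals helper, not part of this commit):

package main

import "fmt"

// numIntervals mirrors the bucket math above: round tMax up and tMin
// down to interval boundaries, then count the buckets between them.
func numIntervals(tMin, tMax, interval int64) int64 {
    intervalTop := tMax/interval*interval + interval
    intervalBottom := tMin / interval * interval
    return (intervalTop - intervalBottom) / interval
}

func main() {
    // A [0ns, 10ns] range bucketed into 3ns intervals spans 4 buckets:
    // [0,3), [3,6), [6,9), [9,12).
    fmt.Println(numIntervals(0, 10, 3)) // 4
}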