diff --git a/tsdb/engine/tsm1/compact.gen.go b/tsdb/engine/tsm1/compact.gen.go index f1cf6ee6e0..c7890e7fcd 100644 --- a/tsdb/engine/tsm1/compact.gen.go +++ b/tsdb/engine/tsm1/compact.gen.go @@ -12,1026 +12,6 @@ import ( "github.com/influxdata/influxdb/v2/tsdb" ) -// merge combines the next set of blocks into merged blocks. -func (k *tsmKeyIterator) mergeFloat() { - // No blocks left, or pending merged values, we're done - if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.mergedFloatValues) == 0 { - return - } - - sort.Stable(k.blocks) - - dedup := len(k.mergedFloatValues) != 0 - if len(k.blocks) > 0 && !dedup { - // If we have more than one block or any partially tombstoned blocks, we many need to dedup - dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead() - - // Quickly scan each block to see if any overlap with the prior block, if they overlap then - // we need to dedup as there may be duplicate points now - for i := 1; !dedup && i < len(k.blocks); i++ { - dedup = k.blocks[i].partiallyRead() || - k.blocks[i].overlapsTimeRange(k.blocks[i-1].minTime, k.blocks[i-1].maxTime) || - len(k.blocks[i].tombstones) > 0 - } - - } - - k.merged = k.combineFloat(dedup) -} - -// combine returns a new set of blocks using the current blocks in the buffers. If dedup -// is true, all the blocks will be decoded, dedup and sorted in in order. If dedup is false, -// only blocks that are smaller than the chunk size will be decoded and combined. -func (k *tsmKeyIterator) combineFloat(dedup bool) blocks { - if dedup { - for len(k.mergedFloatValues) < k.size && len(k.blocks) > 0 { - for len(k.blocks) > 0 && k.blocks[0].read() { - k.blocks = k.blocks[1:] - } - - if len(k.blocks) == 0 { - break - } - first := k.blocks[0] - minTime := first.minTime - maxTime := first.maxTime - - // Adjust the min time to the start of any overlapping blocks. - for i := 0; i < len(k.blocks); i++ { - if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() { - if k.blocks[i].minTime < minTime { - minTime = k.blocks[i].minTime - } - if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime { - maxTime = k.blocks[i].maxTime - } - } - } - - // We have some overlapping blocks so decode all, append in order and then dedup - for i := 0; i < len(k.blocks); i++ { - if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() { - continue - } - - v, err := DecodeFloatBlock(k.blocks[i].b, &[]FloatValue{}) - if err != nil { - k.AppendError(err) - return nil - } - - // Remove values we already read - v = FloatValues(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax) - - // Filter out only the values for overlapping block - v = FloatValues(v).Include(minTime, maxTime) - if len(v) > 0 { - // Record that we read a subset of the block - k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano()) - } - - // Apply each tombstone to the block - for _, ts := range k.blocks[i].tombstones { - v = FloatValues(v).Exclude(ts.Min, ts.Max) - } - - k.mergedFloatValues = k.mergedFloatValues.Merge(v) - } - } - - // Since we combined multiple blocks, we could have more values than we should put into - // a single block. We need to chunk them up into groups and re-encode them. - return k.chunkFloat(nil) - } else { - var i int - - for ; i < len(k.blocks); i++ { - // skip this block if it's values were already read - if k.blocks[i].read() { - continue - } - - // If this block is already full, just add it as is - count, err := BlockCount(k.blocks[i].b) - if err != nil { - // accumulate all errors to tsmKeyIterator.err - k.AppendError(err) - continue - } - - if count < k.size { - break - } - - k.merged = append(k.merged, k.blocks[i]) - } - - if k.fast { - for i < len(k.blocks) { - // skip this block if it's values were already read - if k.blocks[i].read() { - i++ - continue - } - - k.merged = append(k.merged, k.blocks[i]) - i++ - } - } - - // If we only have 1 blocks left, just append it as is and avoid decoding/recoding - if i == len(k.blocks)-1 { - if !k.blocks[i].read() { - k.merged = append(k.merged, k.blocks[i]) - } - i++ - } - - // The remaining blocks can be combined and we know that they do not overlap and - // so we can just append each, sort and re-encode. - for i < len(k.blocks) && len(k.mergedFloatValues) < k.size { - if k.blocks[i].read() { - i++ - continue - } - - v, err := DecodeFloatBlock(k.blocks[i].b, &[]FloatValue{}) - if err != nil { - k.AppendError(err) - return nil - } - - // Apply each tombstone to the block - for _, ts := range k.blocks[i].tombstones { - v = FloatValues(v).Exclude(ts.Min, ts.Max) - } - - k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime) - - k.mergedFloatValues = k.mergedFloatValues.Merge(v) - i++ - } - - k.blocks = k.blocks[i:] - - return k.chunkFloat(k.merged) - } -} - -func (k *tsmKeyIterator) chunkFloat(dst blocks) blocks { - if len(k.mergedFloatValues) > k.size { - values := k.mergedFloatValues[:k.size] - cb, err := FloatValues(values).Encode(nil) - if err != nil { - k.AppendError(err) - return nil - } - - dst = append(dst, &block{ - minTime: values[0].UnixNano(), - maxTime: values[len(values)-1].UnixNano(), - key: k.key, - b: cb, - }) - k.mergedFloatValues = k.mergedFloatValues[k.size:] - return dst - } - - // Re-encode the remaining values into the last block - if len(k.mergedFloatValues) > 0 { - cb, err := FloatValues(k.mergedFloatValues).Encode(nil) - if err != nil { - k.AppendError(err) - return nil - } - - dst = append(dst, &block{ - minTime: k.mergedFloatValues[0].UnixNano(), - maxTime: k.mergedFloatValues[len(k.mergedFloatValues)-1].UnixNano(), - key: k.key, - b: cb, - }) - k.mergedFloatValues = k.mergedFloatValues[:0] - } - return dst -} - -// merge combines the next set of blocks into merged blocks. -func (k *tsmKeyIterator) mergeInteger() { - // No blocks left, or pending merged values, we're done - if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.mergedIntegerValues) == 0 { - return - } - - sort.Stable(k.blocks) - - dedup := len(k.mergedIntegerValues) != 0 - if len(k.blocks) > 0 && !dedup { - // If we have more than one block or any partially tombstoned blocks, we many need to dedup - dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead() - - // Quickly scan each block to see if any overlap with the prior block, if they overlap then - // we need to dedup as there may be duplicate points now - for i := 1; !dedup && i < len(k.blocks); i++ { - dedup = k.blocks[i].partiallyRead() || - k.blocks[i].overlapsTimeRange(k.blocks[i-1].minTime, k.blocks[i-1].maxTime) || - len(k.blocks[i].tombstones) > 0 - } - - } - - k.merged = k.combineInteger(dedup) -} - -// combine returns a new set of blocks using the current blocks in the buffers. If dedup -// is true, all the blocks will be decoded, dedup and sorted in in order. If dedup is false, -// only blocks that are smaller than the chunk size will be decoded and combined. -func (k *tsmKeyIterator) combineInteger(dedup bool) blocks { - if dedup { - for len(k.mergedIntegerValues) < k.size && len(k.blocks) > 0 { - for len(k.blocks) > 0 && k.blocks[0].read() { - k.blocks = k.blocks[1:] - } - - if len(k.blocks) == 0 { - break - } - first := k.blocks[0] - minTime := first.minTime - maxTime := first.maxTime - - // Adjust the min time to the start of any overlapping blocks. - for i := 0; i < len(k.blocks); i++ { - if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() { - if k.blocks[i].minTime < minTime { - minTime = k.blocks[i].minTime - } - if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime { - maxTime = k.blocks[i].maxTime - } - } - } - - // We have some overlapping blocks so decode all, append in order and then dedup - for i := 0; i < len(k.blocks); i++ { - if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() { - continue - } - - v, err := DecodeIntegerBlock(k.blocks[i].b, &[]IntegerValue{}) - if err != nil { - k.AppendError(err) - return nil - } - - // Remove values we already read - v = IntegerValues(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax) - - // Filter out only the values for overlapping block - v = IntegerValues(v).Include(minTime, maxTime) - if len(v) > 0 { - // Record that we read a subset of the block - k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano()) - } - - // Apply each tombstone to the block - for _, ts := range k.blocks[i].tombstones { - v = IntegerValues(v).Exclude(ts.Min, ts.Max) - } - - k.mergedIntegerValues = k.mergedIntegerValues.Merge(v) - } - } - - // Since we combined multiple blocks, we could have more values than we should put into - // a single block. We need to chunk them up into groups and re-encode them. - return k.chunkInteger(nil) - } else { - var i int - - for ; i < len(k.blocks); i++ { - // skip this block if it's values were already read - if k.blocks[i].read() { - continue - } - - // If this block is already full, just add it as is - count, err := BlockCount(k.blocks[i].b) - if err != nil { - // accumulate all errors to tsmKeyIterator.err - k.AppendError(err) - continue - } - - if count < k.size { - break - } - - k.merged = append(k.merged, k.blocks[i]) - } - - if k.fast { - for i < len(k.blocks) { - // skip this block if it's values were already read - if k.blocks[i].read() { - i++ - continue - } - - k.merged = append(k.merged, k.blocks[i]) - i++ - } - } - - // If we only have 1 blocks left, just append it as is and avoid decoding/recoding - if i == len(k.blocks)-1 { - if !k.blocks[i].read() { - k.merged = append(k.merged, k.blocks[i]) - } - i++ - } - - // The remaining blocks can be combined and we know that they do not overlap and - // so we can just append each, sort and re-encode. - for i < len(k.blocks) && len(k.mergedIntegerValues) < k.size { - if k.blocks[i].read() { - i++ - continue - } - - v, err := DecodeIntegerBlock(k.blocks[i].b, &[]IntegerValue{}) - if err != nil { - k.AppendError(err) - return nil - } - - // Apply each tombstone to the block - for _, ts := range k.blocks[i].tombstones { - v = IntegerValues(v).Exclude(ts.Min, ts.Max) - } - - k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime) - - k.mergedIntegerValues = k.mergedIntegerValues.Merge(v) - i++ - } - - k.blocks = k.blocks[i:] - - return k.chunkInteger(k.merged) - } -} - -func (k *tsmKeyIterator) chunkInteger(dst blocks) blocks { - if len(k.mergedIntegerValues) > k.size { - values := k.mergedIntegerValues[:k.size] - cb, err := IntegerValues(values).Encode(nil) - if err != nil { - k.AppendError(err) - return nil - } - - dst = append(dst, &block{ - minTime: values[0].UnixNano(), - maxTime: values[len(values)-1].UnixNano(), - key: k.key, - b: cb, - }) - k.mergedIntegerValues = k.mergedIntegerValues[k.size:] - return dst - } - - // Re-encode the remaining values into the last block - if len(k.mergedIntegerValues) > 0 { - cb, err := IntegerValues(k.mergedIntegerValues).Encode(nil) - if err != nil { - k.AppendError(err) - return nil - } - - dst = append(dst, &block{ - minTime: k.mergedIntegerValues[0].UnixNano(), - maxTime: k.mergedIntegerValues[len(k.mergedIntegerValues)-1].UnixNano(), - key: k.key, - b: cb, - }) - k.mergedIntegerValues = k.mergedIntegerValues[:0] - } - return dst -} - -// merge combines the next set of blocks into merged blocks. -func (k *tsmKeyIterator) mergeUnsigned() { - // No blocks left, or pending merged values, we're done - if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.mergedUnsignedValues) == 0 { - return - } - - sort.Stable(k.blocks) - - dedup := len(k.mergedUnsignedValues) != 0 - if len(k.blocks) > 0 && !dedup { - // If we have more than one block or any partially tombstoned blocks, we many need to dedup - dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead() - - // Quickly scan each block to see if any overlap with the prior block, if they overlap then - // we need to dedup as there may be duplicate points now - for i := 1; !dedup && i < len(k.blocks); i++ { - dedup = k.blocks[i].partiallyRead() || - k.blocks[i].overlapsTimeRange(k.blocks[i-1].minTime, k.blocks[i-1].maxTime) || - len(k.blocks[i].tombstones) > 0 - } - - } - - k.merged = k.combineUnsigned(dedup) -} - -// combine returns a new set of blocks using the current blocks in the buffers. If dedup -// is true, all the blocks will be decoded, dedup and sorted in in order. If dedup is false, -// only blocks that are smaller than the chunk size will be decoded and combined. -func (k *tsmKeyIterator) combineUnsigned(dedup bool) blocks { - if dedup { - for len(k.mergedUnsignedValues) < k.size && len(k.blocks) > 0 { - for len(k.blocks) > 0 && k.blocks[0].read() { - k.blocks = k.blocks[1:] - } - - if len(k.blocks) == 0 { - break - } - first := k.blocks[0] - minTime := first.minTime - maxTime := first.maxTime - - // Adjust the min time to the start of any overlapping blocks. - for i := 0; i < len(k.blocks); i++ { - if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() { - if k.blocks[i].minTime < minTime { - minTime = k.blocks[i].minTime - } - if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime { - maxTime = k.blocks[i].maxTime - } - } - } - - // We have some overlapping blocks so decode all, append in order and then dedup - for i := 0; i < len(k.blocks); i++ { - if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() { - continue - } - - v, err := DecodeUnsignedBlock(k.blocks[i].b, &[]UnsignedValue{}) - if err != nil { - k.AppendError(err) - return nil - } - - // Remove values we already read - v = UnsignedValues(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax) - - // Filter out only the values for overlapping block - v = UnsignedValues(v).Include(minTime, maxTime) - if len(v) > 0 { - // Record that we read a subset of the block - k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano()) - } - - // Apply each tombstone to the block - for _, ts := range k.blocks[i].tombstones { - v = UnsignedValues(v).Exclude(ts.Min, ts.Max) - } - - k.mergedUnsignedValues = k.mergedUnsignedValues.Merge(v) - } - } - - // Since we combined multiple blocks, we could have more values than we should put into - // a single block. We need to chunk them up into groups and re-encode them. - return k.chunkUnsigned(nil) - } else { - var i int - - for ; i < len(k.blocks); i++ { - // skip this block if it's values were already read - if k.blocks[i].read() { - continue - } - - // If this block is already full, just add it as is - count, err := BlockCount(k.blocks[i].b) - if err != nil { - // accumulate all errors to tsmKeyIterator.err - k.AppendError(err) - continue - } - - if count < k.size { - break - } - - k.merged = append(k.merged, k.blocks[i]) - } - - if k.fast { - for i < len(k.blocks) { - // skip this block if it's values were already read - if k.blocks[i].read() { - i++ - continue - } - - k.merged = append(k.merged, k.blocks[i]) - i++ - } - } - - // If we only have 1 blocks left, just append it as is and avoid decoding/recoding - if i == len(k.blocks)-1 { - if !k.blocks[i].read() { - k.merged = append(k.merged, k.blocks[i]) - } - i++ - } - - // The remaining blocks can be combined and we know that they do not overlap and - // so we can just append each, sort and re-encode. - for i < len(k.blocks) && len(k.mergedUnsignedValues) < k.size { - if k.blocks[i].read() { - i++ - continue - } - - v, err := DecodeUnsignedBlock(k.blocks[i].b, &[]UnsignedValue{}) - if err != nil { - k.AppendError(err) - return nil - } - - // Apply each tombstone to the block - for _, ts := range k.blocks[i].tombstones { - v = UnsignedValues(v).Exclude(ts.Min, ts.Max) - } - - k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime) - - k.mergedUnsignedValues = k.mergedUnsignedValues.Merge(v) - i++ - } - - k.blocks = k.blocks[i:] - - return k.chunkUnsigned(k.merged) - } -} - -func (k *tsmKeyIterator) chunkUnsigned(dst blocks) blocks { - if len(k.mergedUnsignedValues) > k.size { - values := k.mergedUnsignedValues[:k.size] - cb, err := UnsignedValues(values).Encode(nil) - if err != nil { - k.AppendError(err) - return nil - } - - dst = append(dst, &block{ - minTime: values[0].UnixNano(), - maxTime: values[len(values)-1].UnixNano(), - key: k.key, - b: cb, - }) - k.mergedUnsignedValues = k.mergedUnsignedValues[k.size:] - return dst - } - - // Re-encode the remaining values into the last block - if len(k.mergedUnsignedValues) > 0 { - cb, err := UnsignedValues(k.mergedUnsignedValues).Encode(nil) - if err != nil { - k.AppendError(err) - return nil - } - - dst = append(dst, &block{ - minTime: k.mergedUnsignedValues[0].UnixNano(), - maxTime: k.mergedUnsignedValues[len(k.mergedUnsignedValues)-1].UnixNano(), - key: k.key, - b: cb, - }) - k.mergedUnsignedValues = k.mergedUnsignedValues[:0] - } - return dst -} - -// merge combines the next set of blocks into merged blocks. -func (k *tsmKeyIterator) mergeString() { - // No blocks left, or pending merged values, we're done - if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.mergedStringValues) == 0 { - return - } - - sort.Stable(k.blocks) - - dedup := len(k.mergedStringValues) != 0 - if len(k.blocks) > 0 && !dedup { - // If we have more than one block or any partially tombstoned blocks, we many need to dedup - dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead() - - // Quickly scan each block to see if any overlap with the prior block, if they overlap then - // we need to dedup as there may be duplicate points now - for i := 1; !dedup && i < len(k.blocks); i++ { - dedup = k.blocks[i].partiallyRead() || - k.blocks[i].overlapsTimeRange(k.blocks[i-1].minTime, k.blocks[i-1].maxTime) || - len(k.blocks[i].tombstones) > 0 - } - - } - - k.merged = k.combineString(dedup) -} - -// combine returns a new set of blocks using the current blocks in the buffers. If dedup -// is true, all the blocks will be decoded, dedup and sorted in in order. If dedup is false, -// only blocks that are smaller than the chunk size will be decoded and combined. -func (k *tsmKeyIterator) combineString(dedup bool) blocks { - if dedup { - for len(k.mergedStringValues) < k.size && len(k.blocks) > 0 { - for len(k.blocks) > 0 && k.blocks[0].read() { - k.blocks = k.blocks[1:] - } - - if len(k.blocks) == 0 { - break - } - first := k.blocks[0] - minTime := first.minTime - maxTime := first.maxTime - - // Adjust the min time to the start of any overlapping blocks. - for i := 0; i < len(k.blocks); i++ { - if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() { - if k.blocks[i].minTime < minTime { - minTime = k.blocks[i].minTime - } - if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime { - maxTime = k.blocks[i].maxTime - } - } - } - - // We have some overlapping blocks so decode all, append in order and then dedup - for i := 0; i < len(k.blocks); i++ { - if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() { - continue - } - - v, err := DecodeStringBlock(k.blocks[i].b, &[]StringValue{}) - if err != nil { - k.AppendError(err) - return nil - } - - // Remove values we already read - v = StringValues(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax) - - // Filter out only the values for overlapping block - v = StringValues(v).Include(minTime, maxTime) - if len(v) > 0 { - // Record that we read a subset of the block - k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano()) - } - - // Apply each tombstone to the block - for _, ts := range k.blocks[i].tombstones { - v = StringValues(v).Exclude(ts.Min, ts.Max) - } - - k.mergedStringValues = k.mergedStringValues.Merge(v) - } - } - - // Since we combined multiple blocks, we could have more values than we should put into - // a single block. We need to chunk them up into groups and re-encode them. - return k.chunkString(nil) - } else { - var i int - - for ; i < len(k.blocks); i++ { - // skip this block if it's values were already read - if k.blocks[i].read() { - continue - } - - // If this block is already full, just add it as is - count, err := BlockCount(k.blocks[i].b) - if err != nil { - // accumulate all errors to tsmKeyIterator.err - k.AppendError(err) - continue - } - - if count < k.size { - break - } - - k.merged = append(k.merged, k.blocks[i]) - } - - if k.fast { - for i < len(k.blocks) { - // skip this block if it's values were already read - if k.blocks[i].read() { - i++ - continue - } - - k.merged = append(k.merged, k.blocks[i]) - i++ - } - } - - // If we only have 1 blocks left, just append it as is and avoid decoding/recoding - if i == len(k.blocks)-1 { - if !k.blocks[i].read() { - k.merged = append(k.merged, k.blocks[i]) - } - i++ - } - - // The remaining blocks can be combined and we know that they do not overlap and - // so we can just append each, sort and re-encode. - for i < len(k.blocks) && len(k.mergedStringValues) < k.size { - if k.blocks[i].read() { - i++ - continue - } - - v, err := DecodeStringBlock(k.blocks[i].b, &[]StringValue{}) - if err != nil { - k.AppendError(err) - return nil - } - - // Apply each tombstone to the block - for _, ts := range k.blocks[i].tombstones { - v = StringValues(v).Exclude(ts.Min, ts.Max) - } - - k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime) - - k.mergedStringValues = k.mergedStringValues.Merge(v) - i++ - } - - k.blocks = k.blocks[i:] - - return k.chunkString(k.merged) - } -} - -func (k *tsmKeyIterator) chunkString(dst blocks) blocks { - if len(k.mergedStringValues) > k.size { - values := k.mergedStringValues[:k.size] - cb, err := StringValues(values).Encode(nil) - if err != nil { - k.AppendError(err) - return nil - } - - dst = append(dst, &block{ - minTime: values[0].UnixNano(), - maxTime: values[len(values)-1].UnixNano(), - key: k.key, - b: cb, - }) - k.mergedStringValues = k.mergedStringValues[k.size:] - return dst - } - - // Re-encode the remaining values into the last block - if len(k.mergedStringValues) > 0 { - cb, err := StringValues(k.mergedStringValues).Encode(nil) - if err != nil { - k.AppendError(err) - return nil - } - - dst = append(dst, &block{ - minTime: k.mergedStringValues[0].UnixNano(), - maxTime: k.mergedStringValues[len(k.mergedStringValues)-1].UnixNano(), - key: k.key, - b: cb, - }) - k.mergedStringValues = k.mergedStringValues[:0] - } - return dst -} - -// merge combines the next set of blocks into merged blocks. -func (k *tsmKeyIterator) mergeBoolean() { - // No blocks left, or pending merged values, we're done - if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.mergedBooleanValues) == 0 { - return - } - - sort.Stable(k.blocks) - - dedup := len(k.mergedBooleanValues) != 0 - if len(k.blocks) > 0 && !dedup { - // If we have more than one block or any partially tombstoned blocks, we many need to dedup - dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead() - - // Quickly scan each block to see if any overlap with the prior block, if they overlap then - // we need to dedup as there may be duplicate points now - for i := 1; !dedup && i < len(k.blocks); i++ { - dedup = k.blocks[i].partiallyRead() || - k.blocks[i].overlapsTimeRange(k.blocks[i-1].minTime, k.blocks[i-1].maxTime) || - len(k.blocks[i].tombstones) > 0 - } - - } - - k.merged = k.combineBoolean(dedup) -} - -// combine returns a new set of blocks using the current blocks in the buffers. If dedup -// is true, all the blocks will be decoded, dedup and sorted in in order. If dedup is false, -// only blocks that are smaller than the chunk size will be decoded and combined. -func (k *tsmKeyIterator) combineBoolean(dedup bool) blocks { - if dedup { - for len(k.mergedBooleanValues) < k.size && len(k.blocks) > 0 { - for len(k.blocks) > 0 && k.blocks[0].read() { - k.blocks = k.blocks[1:] - } - - if len(k.blocks) == 0 { - break - } - first := k.blocks[0] - minTime := first.minTime - maxTime := first.maxTime - - // Adjust the min time to the start of any overlapping blocks. - for i := 0; i < len(k.blocks); i++ { - if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() { - if k.blocks[i].minTime < minTime { - minTime = k.blocks[i].minTime - } - if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime { - maxTime = k.blocks[i].maxTime - } - } - } - - // We have some overlapping blocks so decode all, append in order and then dedup - for i := 0; i < len(k.blocks); i++ { - if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() { - continue - } - - v, err := DecodeBooleanBlock(k.blocks[i].b, &[]BooleanValue{}) - if err != nil { - k.AppendError(err) - return nil - } - - // Remove values we already read - v = BooleanValues(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax) - - // Filter out only the values for overlapping block - v = BooleanValues(v).Include(minTime, maxTime) - if len(v) > 0 { - // Record that we read a subset of the block - k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano()) - } - - // Apply each tombstone to the block - for _, ts := range k.blocks[i].tombstones { - v = BooleanValues(v).Exclude(ts.Min, ts.Max) - } - - k.mergedBooleanValues = k.mergedBooleanValues.Merge(v) - } - } - - // Since we combined multiple blocks, we could have more values than we should put into - // a single block. We need to chunk them up into groups and re-encode them. - return k.chunkBoolean(nil) - } else { - var i int - - for ; i < len(k.blocks); i++ { - // skip this block if it's values were already read - if k.blocks[i].read() { - continue - } - - // If this block is already full, just add it as is - count, err := BlockCount(k.blocks[i].b) - if err != nil { - // accumulate all errors to tsmKeyIterator.err - k.AppendError(err) - continue - } - - if count < k.size { - break - } - - k.merged = append(k.merged, k.blocks[i]) - } - - if k.fast { - for i < len(k.blocks) { - // skip this block if it's values were already read - if k.blocks[i].read() { - i++ - continue - } - - k.merged = append(k.merged, k.blocks[i]) - i++ - } - } - - // If we only have 1 blocks left, just append it as is and avoid decoding/recoding - if i == len(k.blocks)-1 { - if !k.blocks[i].read() { - k.merged = append(k.merged, k.blocks[i]) - } - i++ - } - - // The remaining blocks can be combined and we know that they do not overlap and - // so we can just append each, sort and re-encode. - for i < len(k.blocks) && len(k.mergedBooleanValues) < k.size { - if k.blocks[i].read() { - i++ - continue - } - - v, err := DecodeBooleanBlock(k.blocks[i].b, &[]BooleanValue{}) - if err != nil { - k.AppendError(err) - return nil - } - - // Apply each tombstone to the block - for _, ts := range k.blocks[i].tombstones { - v = BooleanValues(v).Exclude(ts.Min, ts.Max) - } - - k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime) - - k.mergedBooleanValues = k.mergedBooleanValues.Merge(v) - i++ - } - - k.blocks = k.blocks[i:] - - return k.chunkBoolean(k.merged) - } -} - -func (k *tsmKeyIterator) chunkBoolean(dst blocks) blocks { - if len(k.mergedBooleanValues) > k.size { - values := k.mergedBooleanValues[:k.size] - cb, err := BooleanValues(values).Encode(nil) - if err != nil { - k.AppendError(err) - return nil - } - - dst = append(dst, &block{ - minTime: values[0].UnixNano(), - maxTime: values[len(values)-1].UnixNano(), - key: k.key, - b: cb, - }) - k.mergedBooleanValues = k.mergedBooleanValues[k.size:] - return dst - } - - // Re-encode the remaining values into the last block - if len(k.mergedBooleanValues) > 0 { - cb, err := BooleanValues(k.mergedBooleanValues).Encode(nil) - if err != nil { - k.AppendError(err) - return nil - } - - dst = append(dst, &block{ - minTime: k.mergedBooleanValues[0].UnixNano(), - maxTime: k.mergedBooleanValues[len(k.mergedBooleanValues)-1].UnixNano(), - key: k.key, - b: cb, - }) - k.mergedBooleanValues = k.mergedBooleanValues[:0] - } - return dst -} - // merge combines the next set of blocks into merged blocks. func (k *tsmBatchKeyIterator) mergeFloat() { // No blocks left, or pending merged values, we're done diff --git a/tsdb/engine/tsm1/compact.gen.go.tmpl b/tsdb/engine/tsm1/compact.gen.go.tmpl index c9852474be..cde652c139 100644 --- a/tsdb/engine/tsm1/compact.gen.go.tmpl +++ b/tsdb/engine/tsm1/compact.gen.go.tmpl @@ -6,214 +6,6 @@ import ( "github.com/influxdata/influxdb/v2/tsdb" ) -{{range .}} - -// merge combines the next set of blocks into merged blocks. -func (k *tsmKeyIterator) merge{{.Name}}() { - // No blocks left, or pending merged values, we're done - if len(k.blocks) == 0 && len(k.merged) == 0 && len(k.merged{{.Name}}Values) == 0 { - return - } - - sort.Stable(k.blocks) - - dedup := len(k.merged{{.Name}}Values) != 0 - if len(k.blocks) > 0 && !dedup { - // If we have more than one block or any partially tombstoned blocks, we many need to dedup - dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead() - - // Quickly scan each block to see if any overlap with the prior block, if they overlap then - // we need to dedup as there may be duplicate points now - for i := 1; !dedup && i < len(k.blocks); i++ { - dedup = k.blocks[i].partiallyRead() || - k.blocks[i].overlapsTimeRange(k.blocks[i-1].minTime, k.blocks[i-1].maxTime) || - len(k.blocks[i].tombstones) > 0 - } - - } - - k.merged = k.combine{{.Name}}(dedup) -} - -// combine returns a new set of blocks using the current blocks in the buffers. If dedup -// is true, all the blocks will be decoded, dedup and sorted in in order. If dedup is false, -// only blocks that are smaller than the chunk size will be decoded and combined. -func (k *tsmKeyIterator) combine{{.Name}}(dedup bool) blocks { - if dedup { - for len(k.merged{{.Name}}Values) < k.size && len(k.blocks) > 0 { - for len(k.blocks) > 0 && k.blocks[0].read() { - k.blocks = k.blocks[1:] - } - - if len(k.blocks) == 0 { - break - } - first := k.blocks[0] - minTime := first.minTime - maxTime := first.maxTime - - // Adjust the min time to the start of any overlapping blocks. - for i := 0; i < len(k.blocks); i++ { - if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() { - if k.blocks[i].minTime < minTime { - minTime = k.blocks[i].minTime - } - if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime { - maxTime = k.blocks[i].maxTime - } - } - } - - // We have some overlapping blocks so decode all, append in order and then dedup - for i := 0; i < len(k.blocks); i++ { - if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() { - continue - } - - v, err := Decode{{.Name}}Block(k.blocks[i].b, &[]{{.Name}}Value{}) - if err != nil { - k.AppendError(err) - return nil - } - - // Remove values we already read - v = {{.Name}}Values(v).Exclude(k.blocks[i].readMin, k.blocks[i].readMax) - - // Filter out only the values for overlapping block - v = {{.Name}}Values(v).Include(minTime, maxTime) - if len(v) > 0 { - // Record that we read a subset of the block - k.blocks[i].markRead(v[0].UnixNano(), v[len(v)-1].UnixNano()) - } - - // Apply each tombstone to the block - for _, ts := range k.blocks[i].tombstones { - v = {{.Name}}Values(v).Exclude(ts.Min, ts.Max) - } - - k.merged{{.Name}}Values = k.merged{{.Name}}Values.Merge(v) - } - } - - // Since we combined multiple blocks, we could have more values than we should put into - // a single block. We need to chunk them up into groups and re-encode them. - return k.chunk{{.Name}}(nil) - } else { - var i int - - for ; i < len(k.blocks); i++ { - // skip this block if it's values were already read - if k.blocks[i].read() { - continue - } - - // If this block is already full, just add it as is - count, err := BlockCount(k.blocks[i].b) - if err != nil { - // accumulate all errors to tsmKeyIterator.err - k.AppendError(err) - continue - } - - if count < k.size { - break - } - - k.merged = append(k.merged, k.blocks[i]) - } - - if k.fast { - for i < len(k.blocks) { - // skip this block if it's values were already read - if k.blocks[i].read() { - i++ - continue - } - - k.merged = append(k.merged, k.blocks[i]) - i++ - } - } - - // If we only have 1 blocks left, just append it as is and avoid decoding/recoding - if i == len(k.blocks)-1 { - if !k.blocks[i].read() { - k.merged = append(k.merged, k.blocks[i]) - } - i++ - } - - // The remaining blocks can be combined and we know that they do not overlap and - // so we can just append each, sort and re-encode. - for i < len(k.blocks) && len(k.merged{{.Name}}Values) < k.size { - if k.blocks[i].read() { - i++ - continue - } - - v, err := Decode{{.Name}}Block(k.blocks[i].b, &[]{{.Name}}Value{}) - if err != nil { - k.AppendError(err) - return nil - } - - // Apply each tombstone to the block - for _, ts := range k.blocks[i].tombstones { - v = {{.Name}}Values(v).Exclude(ts.Min, ts.Max) - } - - k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime) - - k.merged{{.Name}}Values = k.merged{{.Name}}Values.Merge(v) - i++ - } - - k.blocks = k.blocks[i:] - - return k.chunk{{.Name}}(k.merged) - } -} - -func (k *tsmKeyIterator) chunk{{.Name}}(dst blocks) blocks { - if len(k.merged{{.Name}}Values) > k.size { - values := k.merged{{.Name}}Values[:k.size] - cb, err := {{.Name}}Values(values).Encode(nil) - if err != nil { - k.AppendError(err) - return nil - } - - dst = append(dst, &block{ - minTime: values[0].UnixNano(), - maxTime: values[len(values)-1].UnixNano(), - key: k.key, - b: cb, - }) - k.merged{{.Name}}Values = k.merged{{.Name}}Values[k.size:] - return dst - } - - // Re-encode the remaining values into the last block - if len(k.merged{{.Name}}Values) > 0 { - cb, err := {{.Name}}Values(k.merged{{.Name}}Values).Encode(nil) - if err != nil { - k.AppendError(err) - return nil - } - - dst = append(dst, &block{ - minTime: k.merged{{.Name}}Values[0].UnixNano(), - maxTime: k.merged{{.Name}}Values[len(k.merged{{.Name}}Values)-1].UnixNano(), - key: k.key, - b: cb, - }) - k.merged{{.Name}}Values = k.merged{{.Name}}Values[:0] - } - return dst -} - -{{ end }} - {{range .}} // merge combines the next set of blocks into merged blocks. func (k *tsmBatchKeyIterator) merge{{.Name}}() { diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go index b441156bc1..19520f9fb1 100644 --- a/tsdb/engine/tsm1/compact.go +++ b/tsdb/engine/tsm1/compact.go @@ -1266,61 +1266,6 @@ func (t TSMErrors) Error() string { return strings.Join(e, ", ") } -// tsmKeyIterator implements the KeyIterator for set of TSMReaders. Iteration produces -// keys in sorted order and the values between the keys sorted and deduped. If any of -// the readers have associated tombstone entries, they are returned as part of iteration. -type tsmKeyIterator struct { - // readers is the set of readers it produce a sorted key run with - readers []*TSMReader - - // values is the temporary buffers for each key that is returned by a reader - values map[string][]Value - - // pos is the current key position within the corresponding readers slice. A value of - // pos[0] = 1, means the reader[0] is currently at key 1 in its ordered index. - pos []int - - // TSMError wraps any error we received while iterating values. - errs TSMErrors - - // indicates whether the iterator should choose a faster merging strategy over a more - // optimally compressed one. If fast is true, multiple blocks will just be added as is - // and not combined. In some cases, a slower path will need to be utilized even when - // fast is true to prevent overlapping blocks of time for the same key. - // If false, the blocks will be decoded and duplicated (if needed) and - // then chunked into the maximally sized blocks. - fast bool - - // size is the maximum number of values to encode in a single block - size int - - // key is the current key lowest key across all readers that has not be fully exhausted - // of values. - key []byte - typ byte - - iterators []*BlockIterator - blocks blocks - - buf []blocks - - // mergeValues are decoded blocks that have been combined - mergedFloatValues FloatValues - mergedIntegerValues IntegerValues - mergedUnsignedValues UnsignedValues - mergedBooleanValues BooleanValues - mergedStringValues StringValues - - // merged are encoded blocks that have been combined or used as is - // without decode - merged blocks - interrupt chan struct{} -} - -func (t *tsmKeyIterator) AppendError(err error) { - t.errs = append(t.errs, err) -} - type block struct { key []byte minTime, maxTime int64 @@ -1373,242 +1318,6 @@ func (a blocks) Less(i, j int) bool { func (a blocks) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -// NewTSMKeyIterator returns a new TSM key iterator from readers. -// size indicates the maximum number of values to encode in a single block. -func NewTSMKeyIterator(size int, fast bool, interrupt chan struct{}, readers ...*TSMReader) (KeyIterator, error) { - var iter []*BlockIterator - for _, r := range readers { - iter = append(iter, r.BlockIterator()) - } - - return &tsmKeyIterator{ - readers: readers, - values: map[string][]Value{}, - pos: make([]int, len(readers)), - size: size, - iterators: iter, - fast: fast, - buf: make([]blocks, len(iter)), - interrupt: interrupt, - }, nil -} - -func (k *tsmKeyIterator) hasMergedValues() bool { - return len(k.mergedFloatValues) > 0 || - len(k.mergedIntegerValues) > 0 || - len(k.mergedUnsignedValues) > 0 || - len(k.mergedStringValues) > 0 || - len(k.mergedBooleanValues) > 0 -} - -func (k *tsmKeyIterator) EstimatedIndexSize() int { - var size uint32 - for _, r := range k.readers { - size += r.IndexSize() - } - return int(size) / len(k.readers) -} - -// Next returns true if there are any values remaining in the iterator. -func (k *tsmKeyIterator) Next() bool { -RETRY: - // Any merged blocks pending? - if len(k.merged) > 0 { - k.merged = k.merged[1:] - if len(k.merged) > 0 { - return true - } - } - - // Any merged values pending? - if k.hasMergedValues() { - k.merge() - if len(k.merged) > 0 || k.hasMergedValues() { - return true - } - } - - // If we still have blocks from the last read, merge them - if len(k.blocks) > 0 { - k.merge() - if len(k.merged) > 0 || k.hasMergedValues() { - return true - } - } - - // Read the next block from each TSM iterator - for i, v := range k.buf { - if len(v) == 0 { - iter := k.iterators[i] - if iter.Next() { - key, minTime, maxTime, typ, _, b, err := iter.Read() - if err != nil { - k.AppendError(err) - } - - // This block may have ranges of time removed from it that would - // reduce the block min and max time. - tombstones := iter.r.TombstoneRange(key) - - var blk *block - if cap(k.buf[i]) > len(k.buf[i]) { - k.buf[i] = k.buf[i][:len(k.buf[i])+1] - blk = k.buf[i][len(k.buf[i])-1] - if blk == nil { - blk = &block{} - k.buf[i][len(k.buf[i])-1] = blk - } - } else { - blk = &block{} - k.buf[i] = append(k.buf[i], blk) - } - blk.minTime = minTime - blk.maxTime = maxTime - blk.key = key - blk.typ = typ - blk.b = b - blk.tombstones = tombstones - blk.readMin = math.MaxInt64 - blk.readMax = math.MinInt64 - - blockKey := key - for bytes.Equal(iter.PeekNext(), blockKey) { - iter.Next() - key, minTime, maxTime, typ, _, b, err := iter.Read() - if err != nil { - k.AppendError(err) - } - - tombstones := iter.r.TombstoneRange(key) - - var blk *block - if cap(k.buf[i]) > len(k.buf[i]) { - k.buf[i] = k.buf[i][:len(k.buf[i])+1] - blk = k.buf[i][len(k.buf[i])-1] - if blk == nil { - blk = &block{} - k.buf[i][len(k.buf[i])-1] = blk - } - } else { - blk = &block{} - k.buf[i] = append(k.buf[i], blk) - } - - blk.minTime = minTime - blk.maxTime = maxTime - blk.key = key - blk.typ = typ - blk.b = b - blk.tombstones = tombstones - blk.readMin = math.MaxInt64 - blk.readMax = math.MinInt64 - } - } - - if iter.Err() != nil { - k.AppendError(iter.Err()) - } - } - } - - // Each reader could have a different key that it's currently at, need to find - // the next smallest one to keep the sort ordering. - var minKey []byte - var minType byte - for _, b := range k.buf { - // block could be nil if the iterator has been exhausted for that file - if len(b) == 0 { - continue - } - if len(minKey) == 0 || bytes.Compare(b[0].key, minKey) < 0 { - minKey = b[0].key - minType = b[0].typ - } - } - k.key = minKey - k.typ = minType - - // Now we need to find all blocks that match the min key so we can combine and dedupe - // the blocks if necessary - for i, b := range k.buf { - if len(b) == 0 { - continue - } - if bytes.Equal(b[0].key, k.key) { - k.blocks = append(k.blocks, b...) - k.buf[i] = k.buf[i][:0] - } - } - - if len(k.blocks) == 0 { - return false - } - - k.merge() - - // After merging all the values for this key, we might not have any. (e.g. they were all deleted - // through many tombstones). In this case, move on to the next key instead of ending iteration. - if len(k.merged) == 0 { - goto RETRY - } - - return len(k.merged) > 0 -} - -// merge combines the next set of blocks into merged blocks. -func (k *tsmKeyIterator) merge() { - switch k.typ { - case BlockFloat64: - k.mergeFloat() - case BlockInteger: - k.mergeInteger() - case BlockUnsigned: - k.mergeUnsigned() - case BlockBoolean: - k.mergeBoolean() - case BlockString: - k.mergeString() - default: - k.AppendError(fmt.Errorf("unknown block type: %v", k.typ)) - } -} - -func (k *tsmKeyIterator) Read() ([]byte, int64, int64, []byte, error) { - // See if compactions were disabled while we were running. - select { - case <-k.interrupt: - return nil, 0, 0, nil, errCompactionAborted{} - default: - } - - if len(k.merged) == 0 { - return nil, 0, 0, nil, k.Err() - } - - block := k.merged[0] - return block.key, block.minTime, block.maxTime, block.b, k.Err() -} - -func (k *tsmKeyIterator) Close() error { - k.values = nil - k.pos = nil - k.iterators = nil - for _, r := range k.readers { - if err := r.Close(); err != nil { - return err - } - } - return nil -} - -// Error returns any errors encountered during iteration. -func (k *tsmKeyIterator) Err() error { - if len(k.errs) == 0 { - return nil - } - return k.errs -} - // tsmBatchKeyIterator implements the KeyIterator for set of TSMReaders. Iteration produces // keys in sorted order and the values between the keys sorted and deduped. If any of // the readers have associated tombstone entries, they are returned as part of iteration. diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index 9c3658c633..dfe5fd2468 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -15,7 +15,7 @@ import ( "go.uber.org/zap" ) -// Tests compacting a Cache snapshot into a single TSM file +// Tests compacting a Cache snapshot into a single TSM file func TestCompactor_Snapshot(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) @@ -1098,7 +1098,7 @@ func TestTSMKeyIterator_Single(t *testing.T) { r := MustTSMReader(dir, 1, writes) - iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r) + iter, err := newTSMKeyIterator(1, false, nil, r) if err != nil { t.Fatalf("unexpected error creating WALKeyIterator: %v", err) } @@ -1134,6 +1134,14 @@ func TestTSMKeyIterator_Single(t *testing.T) { } } +func newTSMKeyIterator(size int, fast bool, interrupt chan struct{}, readers ...*tsm1.TSMReader) (tsm1.KeyIterator, error) { + files := []string{} + for _, r := range readers { + files = append(files, r.Path()) + } + return tsm1.NewTSMBatchKeyIterator(size, fast, 0, interrupt, files, readers...) +} + // Tests that duplicate point values are merged. There is only one case // where this could happen and that is when a compaction completed and we replace // the old TSM file with a new one and we crash just before deleting the old file. @@ -1158,7 +1166,7 @@ func TestTSMKeyIterator_Duplicate(t *testing.T) { r2 := MustTSMReader(dir, 2, writes2) - iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1, r2) + iter, err := newTSMKeyIterator(1, false, nil, r1, r2) if err != nil { t.Fatalf("unexpected error creating WALKeyIterator: %v", err) } @@ -1219,7 +1227,7 @@ func TestTSMKeyIterator_MultipleKeysDeleted(t *testing.T) { r2 := MustTSMReader(dir, 2, points2) r2.Delete([][]byte{[]byte("cpu,host=A#!~#count")}) - iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1, r2) + iter, err := newTSMKeyIterator(1, false, nil, r1, r2) if err != nil { t.Fatalf("unexpected error creating WALKeyIterator: %v", err) } @@ -1300,7 +1308,7 @@ func TestTSMKeyIterator_SingleDeletes(t *testing.T) { t.Fatal(e) } - iter, err := tsm1.NewTSMKeyIterator(1, false, nil, r1) + iter, err := newTSMKeyIterator(1, false, nil, r1) if err != nil { t.Fatalf("unexpected error creating WALKeyIterator: %v", err) } @@ -1357,7 +1365,7 @@ func TestTSMKeyIterator_Abort(t *testing.T) { r := MustTSMReader(dir, 1, writes) intC := make(chan struct{}) - iter, err := tsm1.NewTSMKeyIterator(1, false, intC, r) + iter, err := newTSMKeyIterator(1, false, intC, r) if err != nil { t.Fatalf("unexpected error creating WALKeyIterator: %v", err) } diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index c6f51c08c2..0191a3b9f7 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -44,8 +44,8 @@ import ( // to support adding templated data from the command line. // This can probably be worked into the upstream tmpl // but isn't at the moment. -//go:generate go run ../../../tools/tmpl -i -data=file_store.gen.go.tmpldata file_store.gen.go.tmpl=file_store.gen.go -//go:generate go run ../../../tools/tmpl -i -d isArray=y -data=file_store.gen.go.tmpldata file_store.gen.go.tmpl=file_store_array.gen.go +//go:generate go run ../../../tools/tmpl -data=file_store.gen.go.tmpldata file_store.gen.go.tmpl=file_store.gen.go +//go:generate go run ../../../tools/tmpl -d isArray=y -data=file_store.gen.go.tmpldata file_store.gen.go.tmpl=file_store_array.gen.go //go:generate tmpl -data=@encoding.gen.go.tmpldata encoding.gen.go.tmpl //go:generate tmpl -data=@compact.gen.go.tmpldata compact.gen.go.tmpl //go:generate tmpl -data=@reader.gen.go.tmpldata reader.gen.go.tmpl diff --git a/tsdb/engine/tsm1/file_store.gen.go.tmpl b/tsdb/engine/tsm1/file_store.gen.go.tmpl index 17566429da..809937bdfb 100644 --- a/tsdb/engine/tsm1/file_store.gen.go.tmpl +++ b/tsdb/engine/tsm1/file_store.gen.go.tmpl @@ -1,10 +1,14 @@ package tsm1 +{{$isArray := .D.isArray}} +{{$isNotArray := not $isArray}} + +{{if $isArray -}} import ( "github.com/influxdata/influxdb/v2/tsdb" ) -{{$isArray := .D.isArray}} -{{$isNotArray := not $isArray}} +{{end}} + {{range .In}} {{if $isArray -}} // Read{{.Name}}ArrayBlock reads the next block as a set of {{.name}} values.