package tsm1

import (
	"sort"

	"github.com/influxdata/influxdb/tsdb"
)

{{range .}}

// merge combines the next set of blocks into merged blocks.
func (k *tsmBatchKeyIterator) merge{{.Name}}() {
	// No blocks left, or pending merged values, we're done
	if len(k.blocks) == 0 && len(k.merged) == 0 && k.merged{{.Name}}Values.Len() == 0 {
		return
	}

	sort.Stable(k.blocks)

	dedup := k.merged{{.Name}}Values.Len() != 0
	if len(k.blocks) > 0 && !dedup {
		// If the first block has tombstones or was only partially read, we may need to dedup
		dedup = len(k.blocks[0].tombstones) > 0 || k.blocks[0].partiallyRead()

		// Quickly scan each block to see if any overlap with the prior block; if they overlap then
		// we need to dedup as there may be duplicate points now
		for i := 1; !dedup && i < len(k.blocks); i++ {
			dedup = k.blocks[i].partiallyRead() ||
				k.blocks[i].overlapsTimeRange(k.blocks[i-1].minTime, k.blocks[i-1].maxTime) ||
				len(k.blocks[i].tombstones) > 0
		}
	}

	k.merged = k.combine{{.Name}}(dedup)
}

// combine returns a new set of blocks using the current blocks in the buffers. If dedup
// is true, all the blocks will be decoded, deduplicated and sorted in order. If dedup is false,
// only blocks that are smaller than the chunk size will be decoded and combined.
func (k *tsmBatchKeyIterator) combine{{.Name}}(dedup bool) blocks {
	if dedup {
		for k.merged{{.Name}}Values.Len() < k.size && len(k.blocks) > 0 {
			// Skip over any blocks that have already been fully read
			for len(k.blocks) > 0 && k.blocks[0].read() {
				k.blocks = k.blocks[1:]
			}

			if len(k.blocks) == 0 {
				break
			}
			first := k.blocks[0]
			minTime := first.minTime
			maxTime := first.maxTime

			// Adjust the time window to account for overlapping blocks: widen minTime to the
			// earliest overlapping start, and narrow maxTime to the earliest overlapping end
			// that is still after minTime.
			for i := 0; i < len(k.blocks); i++ {
				if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() {
					if k.blocks[i].minTime < minTime {
						minTime = k.blocks[i].minTime
					}
					if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime {
						maxTime = k.blocks[i].maxTime
					}
				}
			}

			// We have some overlapping blocks, so decode all of them, append in order and then dedup
			for i := 0; i < len(k.blocks); i++ {
				if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() {
					continue
				}

				var v tsdb.{{.Name}}Array
				var err error
				if err = Decode{{.Name}}ArrayBlock(k.blocks[i].b, &v); err != nil {
					k.handleDecodeError(err, "{{.name}}")
					return nil
				}

				// Invariant: v.MaxTime() == k.blocks[i].maxTime
				if k.blocks[i].maxTime != v.MaxTime() {
					if maxTime == k.blocks[i].maxTime {
						maxTime = v.MaxTime()
					}
					k.blocks[i].maxTime = v.MaxTime()
				}

				// Remove values we already read
				v.Exclude(k.blocks[i].readMin, k.blocks[i].readMax)

				// Keep only the values that fall within the overlapping window
				v.Include(minTime, maxTime)
				if v.Len() > 0 {
					// Record that we read a subset of the block
					k.blocks[i].markRead(v.MinTime(), v.MaxTime())
				}

				// Apply each tombstone to the block
				for _, ts := range k.blocks[i].tombstones {
					v.Exclude(ts.Min, ts.Max)
				}

				k.merged{{.Name}}Values.Merge(&v)
			}
		}

		// Since we combined multiple blocks, we could have more values than we should put into
		// a single block. We need to chunk them up into groups and re-encode them.
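		// For example (illustrative numbers, not taken from this code): with
		// k.size = 1000 and 2,400 merged points buffered, the call below cuts
		// one full 1,000-point block and leaves the remaining 1,400 points
		// buffered in merged{{.Name}}Values to be drained on later passes.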
		return k.chunk{{.Name}}(nil)
	}

	var i int

	for ; i < len(k.blocks); i++ {
		// skip this block if its values were already read
		if k.blocks[i].read() {
			continue
		}

		// if this block is already full, just add it as is
		count, err := BlockCount(k.blocks[i].b)
		if err != nil {
			k.AppendError(err)
			continue
		}
		if count < k.size {
			break
		}
		k.merged = append(k.merged, k.blocks[i])
	}

	if k.fast {
		for i < len(k.blocks) {
			// skip this block if its values were already read
			if k.blocks[i].read() {
				i++
				continue
			}

			k.merged = append(k.merged, k.blocks[i])
			i++
		}
	}

	// if we only have 1 block left, just append it as is and avoid decoding/re-encoding
	if i == len(k.blocks)-1 {
		if !k.blocks[i].read() {
			k.merged = append(k.merged, k.blocks[i])
		}
		i++
	}

	// The remaining blocks can be combined, and we know that they do not overlap, so
	// we can just decode each, append the values and re-encode.
	for i < len(k.blocks) && k.merged{{.Name}}Values.Len() < k.size {
		if k.blocks[i].read() {
			i++
			continue
		}

		var v tsdb.{{.Name}}Array
		if err := Decode{{.Name}}ArrayBlock(k.blocks[i].b, &v); err != nil {
			k.handleDecodeError(err, "{{.name}}")
			return nil
		}

		// Invariant: v.MaxTime() == k.blocks[i].maxTime
		if k.blocks[i].maxTime != v.MaxTime() {
			k.blocks[i].maxTime = v.MaxTime()
		}

		// Apply each tombstone to the block
		for _, ts := range k.blocks[i].tombstones {
			v.Exclude(ts.Min, ts.Max)
		}

		k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)

		k.merged{{.Name}}Values.Merge(&v)
		i++
	}

	k.blocks = k.blocks[i:]

	return k.chunk{{.Name}}(k.merged)
}

// chunk re-encodes the buffered merged values into blocks of at most k.size
// points, appending them to dst. When more than k.size values are buffered,
// one full block is cut and the remainder stays buffered for the next call;
// otherwise any remaining values are flushed into a final block.
func (k *tsmBatchKeyIterator) chunk{{.Name}}(dst blocks) blocks {
	if k.merged{{.Name}}Values.Len() > k.size {
		var values tsdb.{{.Name}}Array
		values.Timestamps = k.merged{{.Name}}Values.Timestamps[:k.size]
		minTime, maxTime := values.Timestamps[0], values.Timestamps[len(values.Timestamps)-1]
		values.Values = k.merged{{.Name}}Values.Values[:k.size]

		cb, err := Encode{{.Name}}ArrayBlock(&values, nil) // TODO(edd): pool this buffer
		if err != nil {
			k.handleEncodeError(err, "{{.name}}")
			return nil
		}

		dst = append(dst, &block{
			minTime: minTime,
			maxTime: maxTime,
			key:     k.key,
			b:       cb,
		})

		k.merged{{.Name}}Values.Timestamps = k.merged{{.Name}}Values.Timestamps[k.size:]
		k.merged{{.Name}}Values.Values = k.merged{{.Name}}Values.Values[k.size:]
		return dst
	}

	// Re-encode the remaining values into the last block
	if k.merged{{.Name}}Values.Len() > 0 {
		minTime, maxTime := k.merged{{.Name}}Values.Timestamps[0], k.merged{{.Name}}Values.Timestamps[len(k.merged{{.Name}}Values.Timestamps)-1]

		cb, err := Encode{{.Name}}ArrayBlock(k.merged{{.Name}}Values, nil) // TODO(edd): pool this buffer
		if err != nil {
			k.handleEncodeError(err, "{{.name}}")
			return nil
		}

		dst = append(dst, &block{
			minTime: minTime,
			maxTime: maxTime,
			key:     k.key,
			b:       cb,
		})
		k.merged{{.Name}}Values.Timestamps = k.merged{{.Name}}Values.Timestamps[:0]
		k.merged{{.Name}}Values.Values = k.merged{{.Name}}Values.Values[:0]
	}
	return dst
}
{{ end }}
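
// The generated merge/combine/chunk methods form one pipeline per value type:
// merge sorts the buffered blocks and decides whether deduplication is needed,
// combine decodes and merges the values, and chunk re-encodes the result into
// blocks of at most k.size points. A rough sketch of how a caller might drain
// one batch, using the Float instantiation as an example (the driver below is
// hypothetical; the writer and its WriteBlock call are assumptions, not part
// of this file):
//
//	k.mergeFloat() // populates k.merged via combineFloat and chunkFloat
//	for _, blk := range k.merged {
//		// hand each re-encoded block to some sink, e.g. a TSM writer
//		_ = w.WriteBlock(blk.key, blk.minTime, blk.maxTime, blk.b)
//	}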