package tsm1

// Compactions are the process of creating read-optimized TSM files.
// The files are created by converting write-optimized WAL entries
// to read-optimized TSM format.  They can also be created from existing
// TSM files when there are tombstone records that neeed to be removed, points
// that were overwritten by later writes and need to updated, or multiple
// smaller TSM files need to be merged to reduce file counts and improve
// compression ratios.
//
// The compaction process is stream-oriented using multiple readers and
// iterators.  The resulting stream is written sorted and chunked to allow for
// one-pass writing of a new TSM file.

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"math"
	"os"
	"path/filepath"
	"runtime"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/influxdata/influxdb/kit/tracing"
	"github.com/influxdata/influxdb/pkg/limiter"
	"github.com/influxdata/influxdb/tsdb"
)

const maxTSMFileSize = uint32(2048 * 1024 * 1024) // 2GB

const (
	// CompactionTempExtension is the extension used for temporary files created during compaction.
	CompactionTempExtension = "tmp"

	// TSMFileExtension is the extension used for TSM files.
	TSMFileExtension = "tsm"

	// TSSFileExtension is the extension used for TSM stats files.
	TSSFileExtension = "tss"
)

var (
	errMaxFileExceeded     = fmt.Errorf("max file exceeded")
	errSnapshotsDisabled   = fmt.Errorf("snapshots disabled")
	errCompactionsDisabled = fmt.Errorf("compactions disabled")
)

type errCompactionInProgress struct {
	err error
}

// Error returns the string representation of the error, to satisfy the error interface.
func (e errCompactionInProgress) Error() string {
	if e.err != nil {
		return fmt.Sprintf("compaction in progress: %s", e.err)
	}
	return "compaction in progress"
}

type errCompactionAborted struct {
	err error
}

func (e errCompactionAborted) Error() string {
	if e.err != nil {
		return fmt.Sprintf("compaction aborted: %s", e.err)
	}
	return "compaction aborted"
}

// CompactionGroup represents a list of files eligible to be compacted together.
type CompactionGroup []string

// CompactionPlanner determines what TSM files and WAL segments to include in a
// given compaction run.
type CompactionPlanner interface {
	Plan(lastWrite time.Time) []CompactionGroup
	PlanLevel(level int) []CompactionGroup
	PlanOptimize() []CompactionGroup
	Release(group []CompactionGroup)
	FullyCompacted() bool

	// ForceFull causes the planner to return a full compaction plan the next
	// time Plan() is called if there are files that could be compacted.
	ForceFull()

	SetFileStore(fs *FileStore)
}

// DefaultPlanner implements CompactionPlanner using a strategy to roll up
// multiple generations of TSM files into larger files in stages.  It attempts
// to minimize the number of TSM files on disk while rolling up a bounder number
// of files.
type DefaultPlanner struct {
	FileStore fileStore

	// compactFullWriteColdDuration specifies the length of time after
	// which if no writes have been committed to the WAL, the engine will
	// do a full compaction of the TSM files in this shard. This duration
	// should always be greater than the CacheFlushWriteColdDuraion
	compactFullWriteColdDuration time.Duration

	// lastPlanCheck is the last time Plan was called
	lastPlanCheck time.Time

	mu sync.RWMutex
	// lastFindGenerations is the last time findGenerations was run
	lastFindGenerations time.Time

	// lastGenerations is the last set of generations found by findGenerations
	lastGenerations tsmGenerations

	// forceFull causes the next full plan requests to plan any files
	// that may need to be compacted.  Normally, these files are skipped and scheduled
	// infrequently as the plans are more expensive to run.
	forceFull bool

	// filesInUse is the set of files that have been returned as part of a plan and might
	// be being compacted.  Two plans should not return the same file at any given time.
	filesInUse map[string]struct{}
}

type fileStore interface {
	Stats() []FileStat
	LastModified() time.Time
	BlockCount(path string, idx int) int
	ParseFileName(path string) (int, int, error)
}

func NewDefaultPlanner(fs fileStore, writeColdDuration time.Duration) *DefaultPlanner {
	return &DefaultPlanner{
		FileStore:                    fs,
		compactFullWriteColdDuration: writeColdDuration,
		filesInUse:                   make(map[string]struct{}),
	}
}

// tsmGeneration represents the TSM files within a generation.
// 000001-01.tsm, 000001-02.tsm would be in the same generation
// 000001 each with different sequence numbers.
type tsmGeneration struct {
	id            int
	files         []FileStat
	parseFileName ParseFileNameFunc
}

func newTsmGeneration(id int, parseFileNameFunc ParseFileNameFunc) *tsmGeneration {
	return &tsmGeneration{
		id:            id,
		parseFileName: parseFileNameFunc,
	}
}

// size returns the total size of the files in the generation.
func (t *tsmGeneration) size() uint64 {
	var n uint64
	for _, f := range t.files {
		n += uint64(f.Size)
	}
	return n
}

// compactionLevel returns the level of the files in this generation.
func (t *tsmGeneration) level() int {
	// Level 0 is always created from the result of a cache compaction.  It generates
	// 1 file with a sequence num of 1.  Level 2 is generated by compacting multiple
	// level 1 files.  Level 3 is generate by compacting multiple level 2 files.  Level
	// 4 is for anything else.
	_, seq, _ := t.parseFileName(t.files[0].Path)
	if seq < 4 {
		return seq
	}

	return 4
}

// count returns the number of files in the generation.
func (t *tsmGeneration) count() int {
	return len(t.files)
}

// hasTombstones returns true if there are keys removed for any of the files.
func (t *tsmGeneration) hasTombstones() bool {
	for _, f := range t.files {
		if f.HasTombstone {
			return true
		}
	}
	return false
}

func (c *DefaultPlanner) SetFileStore(fs *FileStore) {
	c.FileStore = fs
}

func (c *DefaultPlanner) ParseFileName(path string) (int, int, error) {
	return c.FileStore.ParseFileName(path)
}

// FullyCompacted returns true if the shard is fully compacted.
func (c *DefaultPlanner) FullyCompacted() bool {
	gens := c.findGenerations(false)
	return len(gens) <= 1 && !gens.hasTombstones()
}

// ForceFull causes the planner to return a full compaction plan the next time
// a plan is requested.  When ForceFull is called, level and optimize plans will
// not return plans until a full plan is requested and released.
func (c *DefaultPlanner) ForceFull() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.forceFull = true
}

// PlanLevel returns a set of TSM files to rewrite for a specific level.
func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
	// If a full plan has been requested, don't plan any levels which will prevent
	// the full plan from acquiring them.
	c.mu.RLock()
	if c.forceFull {
		c.mu.RUnlock()
		return nil
	}
	c.mu.RUnlock()

	// Determine the generations from all files on disk.  We need to treat
	// a generation conceptually as a single file even though it may be
	// split across several files in sequence.
	generations := c.findGenerations(true)

	// If there is only one generation and no tombstones, then there's nothing to
	// do.
	if len(generations) <= 1 && !generations.hasTombstones() {
		return nil
	}

	// Group each generation by level such that two adjacent generations in the same
	// level become part of the same group.
	var currentGen tsmGenerations
	var groups []tsmGenerations
	for i := 0; i < len(generations); i++ {
		cur := generations[i]

		// See if this generation is orphan'd which would prevent it from being further
		// compacted until a final full compactin runs.
		if i < len(generations)-1 {
			if cur.level() < generations[i+1].level() {
				currentGen = append(currentGen, cur)
				continue
			}
		}

		if len(currentGen) == 0 || currentGen.level() == cur.level() {
			currentGen = append(currentGen, cur)
			continue
		}
		groups = append(groups, currentGen)

		currentGen = tsmGenerations{}
		currentGen = append(currentGen, cur)
	}

	if len(currentGen) > 0 {
		groups = append(groups, currentGen)
	}

	// Remove any groups in the wrong level
	var levelGroups []tsmGenerations
	for _, cur := range groups {
		if cur.level() == level {
			levelGroups = append(levelGroups, cur)
		}
	}

	minGenerations := 4
	if level == 1 {
		minGenerations = 8
	}

	var cGroups []CompactionGroup
	for _, group := range levelGroups {
		for _, chunk := range group.chunk(minGenerations) {
			var cGroup CompactionGroup
			var hasTombstones bool
			for _, gen := range chunk {
				if gen.hasTombstones() {
					hasTombstones = true
				}
				for _, file := range gen.files {
					cGroup = append(cGroup, file.Path)
				}
			}

			if len(chunk) < minGenerations && !hasTombstones {
				continue
			}

			cGroups = append(cGroups, cGroup)
		}
	}

	if !c.acquire(cGroups) {
		return nil
	}

	return cGroups
}

// PlanOptimize returns all TSM files if they are in different generations in order
// to optimize the index across TSM files.  Each returned compaction group can be
// compacted concurrently.
func (c *DefaultPlanner) PlanOptimize() []CompactionGroup {
	// If a full plan has been requested, don't plan any levels which will prevent
	// the full plan from acquiring them.
	c.mu.RLock()
	if c.forceFull {
		c.mu.RUnlock()
		return nil
	}
	c.mu.RUnlock()

	// Determine the generations from all files on disk.  We need to treat
	// a generation conceptually as a single file even though it may be
	// split across several files in sequence.
	generations := c.findGenerations(true)

	// If there is only one generation and no tombstones, then there's nothing to
	// do.
	if len(generations) <= 1 && !generations.hasTombstones() {
		return nil
	}

	// Group each generation by level such that two adjacent generations in the same
	// level become part of the same group.
	var currentGen tsmGenerations
	var groups []tsmGenerations
	for i := 0; i < len(generations); i++ {
		cur := generations[i]

		// Skip the file if it's over the max size and contains a full block and it does not have any tombstones
		if cur.count() > 2 && cur.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(cur.files[0].Path, 1) == MaxPointsPerBlock && !cur.hasTombstones() {
			continue
		}

		// See if this generation is orphan'd which would prevent it from being further
		// compacted until a final full compactin runs.
		if i < len(generations)-1 {
			if cur.level() < generations[i+1].level() {
				currentGen = append(currentGen, cur)
				continue
			}
		}

		if len(currentGen) == 0 || currentGen.level() == cur.level() {
			currentGen = append(currentGen, cur)
			continue
		}
		groups = append(groups, currentGen)

		currentGen = tsmGenerations{}
		currentGen = append(currentGen, cur)
	}

	if len(currentGen) > 0 {
		groups = append(groups, currentGen)
	}

	// Only optimize level 4 files since using lower-levels will collide
	// with the level planners
	var levelGroups []tsmGenerations
	for _, cur := range groups {
		if cur.level() == 4 {
			levelGroups = append(levelGroups, cur)
		}
	}

	var cGroups []CompactionGroup
	for _, group := range levelGroups {
		// Skip the group if it's not worthwhile to optimize it
		if len(group) < 4 && !group.hasTombstones() {
			continue
		}

		var cGroup CompactionGroup
		for _, gen := range group {
			for _, file := range gen.files {
				cGroup = append(cGroup, file.Path)
			}
		}

		cGroups = append(cGroups, cGroup)
	}

	if !c.acquire(cGroups) {
		return nil
	}

	return cGroups
}

// Plan returns a set of TSM files to rewrite for level 4 or higher.  The planning returns
// multiple groups if possible to allow compactions to run concurrently.
func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
	generations := c.findGenerations(true)

	c.mu.RLock()
	forceFull := c.forceFull
	c.mu.RUnlock()

	// first check if we should be doing a full compaction because nothing has been written in a long time
	if forceFull || c.compactFullWriteColdDuration > 0 && time.Since(lastWrite) > c.compactFullWriteColdDuration && len(generations) > 1 {

		// Reset the full schedule if we planned because of it.
		if forceFull {
			c.mu.Lock()
			c.forceFull = false
			c.mu.Unlock()
		}

		var tsmFiles []string
		var genCount int
		for i, group := range generations {
			var skip bool

			// Skip the file if it's over the max size and contains a full block and it does not have any tombstones
			if len(generations) > 2 && group.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(group.files[0].Path, 1) == MaxPointsPerBlock && !group.hasTombstones() {
				skip = true
			}

			// We need to look at the level of the next file because it may need to be combined with this generation
			// but won't get picked up on it's own if this generation is skipped.  This allows the most recently
			// created files to get picked up by the full compaction planner and avoids having a few less optimally
			// compressed files.
			if i < len(generations)-1 {
				if generations[i+1].level() <= 3 {
					skip = false
				}
			}

			if skip {
				continue
			}

			for _, f := range group.files {
				tsmFiles = append(tsmFiles, f.Path)
			}
			genCount += 1
		}
		sort.Strings(tsmFiles)

		// Make sure we have more than 1 file and more than 1 generation
		if len(tsmFiles) <= 1 || genCount <= 1 {
			return nil
		}

		group := []CompactionGroup{tsmFiles}
		if !c.acquire(group) {
			return nil
		}
		return group
	}

	// don't plan if nothing has changed in the filestore
	if c.lastPlanCheck.After(c.FileStore.LastModified()) && !generations.hasTombstones() {
		return nil
	}

	c.lastPlanCheck = time.Now()

	// If there is only one generation, return early to avoid re-compacting the same file
	// over and over again.
	if len(generations) <= 1 && !generations.hasTombstones() {
		return nil
	}

	// Need to find the ending point for level 4 files.  They will be the oldest files. We scan
	// each generation in descending break once we see a file less than 4.
	end := 0
	start := 0
	for i, g := range generations {
		if g.level() <= 3 {
			break
		}
		end = i + 1
	}

	// As compactions run, the oldest files get bigger.  We don't want to re-compact them during
	// this planning if they are maxed out so skip over any we see.
	var hasTombstones bool
	for i, g := range generations[:end] {
		if g.hasTombstones() {
			hasTombstones = true
		}

		if hasTombstones {
			continue
		}

		// Skip the file if it's over the max size and contains a full block or the generation is split
		// over multiple files.  In the latter case, that would mean the data in the file spilled over
		// the 2GB limit.
		if g.size() > uint64(maxTSMFileSize) && c.FileStore.BlockCount(g.files[0].Path, 1) == MaxPointsPerBlock {
			start = i + 1
		}

		// This is an edge case that can happen after multiple compactions run.  The files at the beginning
		// can become larger faster than ones after them.  We want to skip those really big ones and just
		// compact the smaller ones until they are closer in size.
		if i > 0 {
			if g.size()*2 < generations[i-1].size() {
				start = i
				break
			}
		}
	}

	// step is how may files to compact in a group.  We want to clamp it at 4 but also stil
	// return groups smaller than 4.
	step := 4
	if step > end {
		step = end
	}

	// slice off the generations that we'll examine
	generations = generations[start:end]

	// Loop through the generations in groups of size step and see if we can compact all (or
	// some of them as group)
	groups := []tsmGenerations{}
	for i := 0; i < len(generations); i += step {
		var skipGroup bool
		startIndex := i

		for j := i; j < i+step && j < len(generations); j++ {
			gen := generations[j]
			lvl := gen.level()

			// Skip compacting this group if there happens to be any lower level files in the
			// middle.  These will get picked up by the level compactors.
			if lvl <= 3 {
				skipGroup = true
				break
			}

			// Skip the file if it's over the max size and it contains a full block
			if gen.size() >= uint64(maxTSMFileSize) && c.FileStore.BlockCount(gen.files[0].Path, 1) == MaxPointsPerBlock && !gen.hasTombstones() {
				startIndex++
				continue
			}
		}

		if skipGroup {
			continue
		}

		endIndex := i + step
		if endIndex > len(generations) {
			endIndex = len(generations)
		}
		if endIndex-startIndex > 0 {
			groups = append(groups, generations[startIndex:endIndex])
		}
	}

	if len(groups) == 0 {
		return nil
	}

	// With the groups, we need to evaluate whether the group as a whole can be compacted
	compactable := []tsmGenerations{}
	for _, group := range groups {
		//if we don't have enough generations to compact, skip it
		if len(group) < 4 && !group.hasTombstones() {
			continue
		}
		compactable = append(compactable, group)
	}

	// All the files to be compacted must be compacted in order.  We need to convert each
	// group to the actual set of files in that group to be compacted.
	var tsmFiles []CompactionGroup
	for _, c := range compactable {
		var cGroup CompactionGroup
		for _, group := range c {
			for _, f := range group.files {
				cGroup = append(cGroup, f.Path)
			}
		}
		sort.Strings(cGroup)
		tsmFiles = append(tsmFiles, cGroup)
	}

	if !c.acquire(tsmFiles) {
		return nil
	}
	return tsmFiles
}

// findGenerations groups all the TSM files by generation based
// on their filename, then returns the generations in descending order (newest first).
// If skipInUse is true, tsm files that are part of an existing compaction plan
// are not returned.
func (c *DefaultPlanner) findGenerations(skipInUse bool) tsmGenerations {
	c.mu.Lock()
	defer c.mu.Unlock()

	last := c.lastFindGenerations
	lastGen := c.lastGenerations

	if !last.IsZero() && c.FileStore.LastModified().Equal(last) {
		return lastGen
	}

	genTime := c.FileStore.LastModified()
	tsmStats := c.FileStore.Stats()
	generations := make(map[int]*tsmGeneration, len(tsmStats))
	for _, f := range tsmStats {
		gen, _, _ := c.ParseFileName(f.Path)

		// Skip any files that are assigned to a current compaction plan
		if _, ok := c.filesInUse[f.Path]; skipInUse && ok {
			continue
		}

		group := generations[gen]
		if group == nil {
			group = newTsmGeneration(gen, c.ParseFileName)
			generations[gen] = group
		}
		group.files = append(group.files, f)
	}

	orderedGenerations := make(tsmGenerations, 0, len(generations))
	for _, g := range generations {
		orderedGenerations = append(orderedGenerations, g)
	}
	if !orderedGenerations.IsSorted() {
		sort.Sort(orderedGenerations)
	}

	c.lastFindGenerations = genTime
	c.lastGenerations = orderedGenerations

	return orderedGenerations
}

func (c *DefaultPlanner) acquire(groups []CompactionGroup) bool {
	c.mu.Lock()
	defer c.mu.Unlock()

	// See if the new files are already in use
	for _, g := range groups {
		for _, f := range g {
			if _, ok := c.filesInUse[f]; ok {
				return false
			}
		}
	}

	// Mark all the new files in use
	for _, g := range groups {
		for _, f := range g {
			c.filesInUse[f] = struct{}{}
		}
	}
	return true
}

// Release removes the files reference in each compaction group allowing new plans
// to be able to use them.
func (c *DefaultPlanner) Release(groups []CompactionGroup) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, g := range groups {
		for _, f := range g {
			delete(c.filesInUse, f)
		}
	}
}

// Compactor merges multiple TSM files into new files or
// writes a Cache into 1 or more TSM files.
type Compactor struct {
	Dir  string
	Size int

	FileStore interface {
		NextGeneration() int
		TSMReader(path string) *TSMReader
	}

	// RateLimit is the limit for disk writes for all concurrent compactions.
	RateLimit limiter.Rate

	formatFileName FormatFileNameFunc
	parseFileName  ParseFileNameFunc

	mu                 sync.RWMutex
	snapshotsEnabled   bool
	compactionsEnabled bool

	// lastSnapshotDuration is the amount of time the last snapshot took to complete.
	lastSnapshotDuration time.Duration

	snapshotLatencies *latencies

	// The channel to signal that any in progress snapshots should be aborted.
	snapshotsInterrupt chan struct{}
	// The channel to signal that any in progress level compactions should be aborted.
	compactionsInterrupt chan struct{}

	files map[string]struct{}
}

// NewCompactor returns a new instance of Compactor.
func NewCompactor() *Compactor {
	return &Compactor{
		formatFileName: DefaultFormatFileName,
		parseFileName:  DefaultParseFileName,
	}
}

func (c *Compactor) WithFormatFileNameFunc(formatFileNameFunc FormatFileNameFunc) {
	c.formatFileName = formatFileNameFunc
}

func (c *Compactor) WithParseFileNameFunc(parseFileNameFunc ParseFileNameFunc) {
	c.parseFileName = parseFileNameFunc
}

// Open initializes the Compactor.
func (c *Compactor) Open() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.snapshotsEnabled || c.compactionsEnabled {
		return
	}

	c.snapshotsEnabled = true
	c.compactionsEnabled = true
	c.snapshotsInterrupt = make(chan struct{})
	c.compactionsInterrupt = make(chan struct{})
	c.snapshotLatencies = &latencies{values: make([]time.Duration, 4)}

	c.files = make(map[string]struct{})
}

// Close disables the Compactor.
func (c *Compactor) Close() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !(c.snapshotsEnabled || c.compactionsEnabled) {
		return
	}
	c.snapshotsEnabled = false
	c.compactionsEnabled = false
	if c.compactionsInterrupt != nil {
		close(c.compactionsInterrupt)
	}
	if c.snapshotsInterrupt != nil {
		close(c.snapshotsInterrupt)
	}
}

// DisableSnapshots disables the compactor from performing snapshots.
func (c *Compactor) DisableSnapshots() {
	c.mu.Lock()
	c.snapshotsEnabled = false
	if c.snapshotsInterrupt != nil {
		close(c.snapshotsInterrupt)
		c.snapshotsInterrupt = nil
	}
	c.mu.Unlock()
}

// EnableSnapshots allows the compactor to perform snapshots.
func (c *Compactor) EnableSnapshots() {
	c.mu.Lock()
	c.snapshotsEnabled = true
	if c.snapshotsInterrupt == nil {
		c.snapshotsInterrupt = make(chan struct{})
	}
	c.mu.Unlock()
}

// DisableSnapshots disables the compactor from performing compactions.
func (c *Compactor) DisableCompactions() {
	c.mu.Lock()
	c.compactionsEnabled = false
	if c.compactionsInterrupt != nil {
		close(c.compactionsInterrupt)
		c.compactionsInterrupt = nil
	}
	c.mu.Unlock()
}

// EnableCompactions allows the compactor to perform compactions.
func (c *Compactor) EnableCompactions() {
	c.mu.Lock()
	c.compactionsEnabled = true
	if c.compactionsInterrupt == nil {
		c.compactionsInterrupt = make(chan struct{})
	}
	c.mu.Unlock()
}

// WriteSnapshot writes a Cache snapshot to one or more new TSM files.
func (c *Compactor) WriteSnapshot(ctx context.Context, cache *Cache) ([]string, error) {
	span, _ := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	c.mu.RLock()
	enabled := c.snapshotsEnabled
	intC := c.snapshotsInterrupt
	c.mu.RUnlock()

	if !enabled {
		return nil, errSnapshotsDisabled
	}

	start := time.Now()
	card := cache.Count()

	// Enable throttling if we have lower cardinality or snapshots are going fast.
	throttle := card < 3e6 && c.snapshotLatencies.avg() < 15*time.Second

	// Write snapshost concurrently if cardinality is relatively high.
	concurrency := card / 2e6
	if concurrency < 1 {
		concurrency = 1
	}

	// Special case very high cardinality, use max concurrency and don't throttle writes.
	if card >= 3e6 {
		concurrency = 4
		throttle = false
	}

	splits := cache.Split(concurrency)

	type res struct {
		files []string
		err   error
	}

	resC := make(chan res, concurrency)
	for i := 0; i < concurrency; i++ {
		go func(sp *Cache) {
			iter := NewCacheKeyIterator(sp, MaxPointsPerBlock, intC)
			files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, nil, iter, throttle)
			resC <- res{files: files, err: err}

		}(splits[i])
	}

	var err error
	files := make([]string, 0, concurrency)
	for i := 0; i < concurrency; i++ {
		result := <-resC
		if result.err != nil {
			err = result.err
		}
		files = append(files, result.files...)
	}

	dur := time.Since(start).Truncate(time.Second)

	c.mu.Lock()

	// See if we were disabled while writing a snapshot
	enabled = c.snapshotsEnabled
	c.lastSnapshotDuration = dur
	c.snapshotLatencies.add(time.Since(start))
	c.mu.Unlock()

	if !enabled {
		return nil, errSnapshotsDisabled
	}

	return files, err
}

// compact writes multiple smaller TSM files into 1 or more larger files.
func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
	size := c.Size
	if size <= 0 {
		size = MaxPointsPerBlock
	}

	c.mu.RLock()
	intC := c.compactionsInterrupt
	c.mu.RUnlock()

	// The new compacted files need to added to the max generation in the
	// set.  We need to find that max generation as well as the max sequence
	// number to ensure we write to the next unique location.
	var maxGeneration, maxSequence int
	for _, f := range tsmFiles {
		gen, seq, err := c.parseFileName(f)
		if err != nil {
			return nil, err
		}

		if gen > maxGeneration {
			maxGeneration = gen
			maxSequence = seq
		}

		if gen == maxGeneration && seq > maxSequence {
			maxSequence = seq
		}
	}

	// For each TSM file, create a TSM reader
	var trs []*TSMReader
	for _, file := range tsmFiles {
		select {
		case <-intC:
			return nil, errCompactionAborted{}
		default:
		}

		tr := c.FileStore.TSMReader(file)
		if tr == nil {
			// This would be a bug if this occurred as tsmFiles passed in should only be
			// assigned to one compaction at any one time.  A nil tr would mean the file
			// doesn't exist.
			return nil, errCompactionAborted{fmt.Errorf("bad plan: %s", file)}
		}
		defer tr.Unref() // inform that we're done with this reader when this method returns.
		trs = append(trs, tr)
	}

	if len(trs) == 0 {
		return nil, nil
	}

	tsm, err := NewTSMBatchKeyIterator(size, fast, intC, trs...)
	if err != nil {
		return nil, err
	}

	return c.writeNewFiles(maxGeneration, maxSequence, tsmFiles, tsm, true)
}

// CompactFull writes multiple smaller TSM files into 1 or more larger files.
func (c *Compactor) CompactFull(tsmFiles []string) ([]string, error) {
	c.mu.RLock()
	enabled := c.compactionsEnabled
	c.mu.RUnlock()

	if !enabled {
		return nil, errCompactionsDisabled
	}

	if !c.add(tsmFiles) {
		return nil, errCompactionInProgress{}
	}
	defer c.remove(tsmFiles)

	files, err := c.compact(false, tsmFiles)

	// See if we were disabled while writing a snapshot
	c.mu.RLock()
	enabled = c.compactionsEnabled
	c.mu.RUnlock()

	if !enabled {
		if err := c.removeTmpFiles(files); err != nil {
			return nil, err
		}
		return nil, errCompactionsDisabled
	}

	return files, err
}

// CompactFast writes multiple smaller TSM files into 1 or more larger files.
func (c *Compactor) CompactFast(tsmFiles []string) ([]string, error) {
	c.mu.RLock()
	enabled := c.compactionsEnabled
	c.mu.RUnlock()

	if !enabled {
		return nil, errCompactionsDisabled
	}

	if !c.add(tsmFiles) {
		return nil, errCompactionInProgress{}
	}
	defer c.remove(tsmFiles)

	files, err := c.compact(true, tsmFiles)

	// See if we were disabled while writing a snapshot
	c.mu.RLock()
	enabled = c.compactionsEnabled
	c.mu.RUnlock()

	if !enabled {
		if err := c.removeTmpFiles(files); err != nil {
			return nil, err
		}
		return nil, errCompactionsDisabled
	}

	return files, err

}

// removeTmpFiles is responsible for cleaning up a compaction that
// was started, but then abandoned before the temporary files were dealt with.
func (c *Compactor) removeTmpFiles(files []string) error {
	for _, f := range files {
		if err := os.Remove(f); err != nil {
			return fmt.Errorf("error removing temp compaction file: %v", err)
		}
	}
	return nil
}

// writeNewFiles writes from the iterator into new TSM files, rotating
// to a new file once it has reached the max TSM file size.
func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool) ([]string, error) {
	// These are the new TSM files written
	var files []string

	for {
		sequence++

		// New TSM files are written to a temp file and renamed when fully completed.
		fileName := filepath.Join(c.Dir, c.formatFileName(generation, sequence)+"."+TSMFileExtension+"."+TmpTSMFileExtension)
		statsFileName := StatsFilename(fileName)

		// Write as much as possible to this file
		err := c.write(fileName, iter, throttle)

		// We've hit the max file limit and there is more to write.  Create a new file
		// and continue.
		if err == errMaxFileExceeded || err == ErrMaxBlocksExceeded {
			files = append(files, fileName)
			continue
		} else if err == ErrNoValues {
			// If the file only contained tombstoned entries, then it would be a 0 length
			// file that we can drop.
			if err := os.RemoveAll(fileName); err != nil {
				return nil, err
			} else if err := os.RemoveAll(statsFileName); err != nil && !os.IsNotExist(err) {
				return nil, err
			}
			break
		} else if _, ok := err.(errCompactionInProgress); ok {
			// Don't clean up the file as another compaction is using it.  This should not happen as the
			// planner keeps track of which files are assigned to compaction plans now.
			return nil, err
		} else if err != nil {
			// Remove any tmp files we already completed
			for _, f := range files {
				if err := os.RemoveAll(f); err != nil {
					return nil, err
				} else if err := os.RemoveAll(StatsFilename(f)); err != nil && !os.IsNotExist(err) {
					return nil, err
				}
			}
			// We hit an error and didn't finish the compaction.  Remove the temp file and abort.
			if err := os.RemoveAll(fileName); err != nil {
				return nil, err
			} else if err := os.RemoveAll(statsFileName); err != nil && !os.IsNotExist(err) {
				return nil, err
			}
			return nil, err
		}

		files = append(files, fileName)
		break
	}

	return files, nil
}

func (c *Compactor) write(path string, iter KeyIterator, throttle bool) (err error) {
	fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_EXCL, 0666)
	if err != nil {
		return errCompactionInProgress{err: err}
	}

	// syncingWriter ensures that whatever we wrap the above file descriptor in
	// it will always be able to be synced by the tsm writer, since it does
	// type assertions to attempt to sync.
	type syncingWriter interface {
		io.Writer
		Sync() error
	}

	// Create the write for the new TSM file.
	var (
		w           TSMWriter
		limitWriter syncingWriter = fd
	)

	if c.RateLimit != nil && throttle {
		limitWriter = limiter.NewWriterWithRate(fd, c.RateLimit)
	}

	// Use a disk based TSM buffer if it looks like we might create a big index
	// in memory.
	if iter.EstimatedIndexSize() > 64*1024*1024 {
		w, err = NewTSMWriterWithDiskBuffer(limitWriter)
		if err != nil {
			return err
		}
	} else {
		w, err = NewTSMWriter(limitWriter)
		if err != nil {
			return err
		}
	}

	defer func() {
		closeErr := w.Close()
		if err == nil {
			err = closeErr
		}

		// Check for errors where we should not remove the file
		_, inProgress := err.(errCompactionInProgress)
		maxBlocks := err == ErrMaxBlocksExceeded
		maxFileSize := err == errMaxFileExceeded
		if inProgress || maxBlocks || maxFileSize {
			return
		}

		if err != nil {
			w.Remove()
		}
	}()

	for iter.Next() {
		c.mu.RLock()
		enabled := c.snapshotsEnabled || c.compactionsEnabled
		c.mu.RUnlock()

		if !enabled {
			return errCompactionAborted{}
		}
		// Each call to read returns the next sorted key (or the prior one if there are
		// more values to write).  The size of values will be less than or equal to our
		// chunk size (1000)
		key, minTime, maxTime, block, err := iter.Read()
		if err != nil {
			return err
		}

		if minTime > maxTime {
			return fmt.Errorf("invalid index entry for block. min=%d, max=%d", minTime, maxTime)
		}

		// Write the key and value
		if err := w.WriteBlock(key, minTime, maxTime, block); err == ErrMaxBlocksExceeded {
			if err := w.WriteIndex(); err != nil {
				return err
			}
			return err
		} else if err != nil {
			return err
		}

		// If we have a max file size configured and we're over it, close out the file
		// and return the error.
		if w.Size() > maxTSMFileSize {
			if err := w.WriteIndex(); err != nil {
				return err
			}

			return errMaxFileExceeded
		}
	}

	// Were there any errors encountered during iteration?
	if err := iter.Err(); err != nil {
		return err
	}

	// We're all done.  Close out the file.
	if err := w.WriteIndex(); err != nil {
		return err
	}
	return nil
}

func (c *Compactor) add(files []string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()

	// See if the new files are already in use
	for _, f := range files {
		if _, ok := c.files[f]; ok {
			return false
		}
	}

	// Mark all the new files in use
	for _, f := range files {
		c.files[f] = struct{}{}
	}
	return true
}

func (c *Compactor) remove(files []string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, f := range files {
		delete(c.files, f)
	}
}

// KeyIterator allows iteration over set of keys and values in sorted order.
type KeyIterator interface {
	// Next returns true if there are any values remaining in the iterator.
	Next() bool

	// Read returns the key, time range, and raw data for the next block,
	// or any error that occurred.
	Read() (key []byte, minTime int64, maxTime int64, data []byte, err error)

	// Close closes the iterator.
	Close() error

	// Err returns any errors encountered during iteration.
	Err() error

	// EstimatedIndexSize returns the estimated size of the index that would
	// be required to store all the series and entries in the KeyIterator.
	EstimatedIndexSize() int
}

// tsmKeyIterator implements the KeyIterator for set of TSMReaders.  Iteration produces
// keys in sorted order and the values between the keys sorted and deduped.  If any of
// the readers have associated tombstone entries, they are returned as part of iteration.
type tsmKeyIterator struct {
	// readers is the set of readers it produce a sorted key run with
	readers []*TSMReader

	// values is the temporary buffers for each key that is returned by a reader
	values map[string][]Value

	// pos is the current key postion within the corresponding readers slice.  A value of
	// pos[0] = 1, means the reader[0] is currently at key 1 in its ordered index.
	pos []int

	// err is any error we received while iterating values.
	err error

	// indicates whether the iterator should choose a faster merging strategy over a more
	// optimally compressed one.  If fast is true, multiple blocks will just be added as is
	// and not combined.  In some cases, a slower path will need to be utilized even when
	// fast is true to prevent overlapping blocks of time for the same key.
	// If false, the blocks will be decoded and duplicated (if needed) and
	// then chunked into the maximally sized blocks.
	fast bool

	// size is the maximum number of values to encode in a single block
	size int

	// key is the current key lowest key across all readers that has not be fully exhausted
	// of values.
	key []byte
	typ byte

	iterators []*BlockIterator
	blocks    blocks

	buf []blocks

	// mergeValues are decoded blocks that have been combined
	mergedFloatValues    FloatValues
	mergedIntegerValues  IntegerValues
	mergedUnsignedValues UnsignedValues
	mergedBooleanValues  BooleanValues
	mergedStringValues   StringValues

	// merged are encoded blocks that have been combined or used as is
	// without decode
	merged    blocks
	interrupt chan struct{}
}

type block struct {
	key              []byte
	minTime, maxTime int64
	typ              byte
	b                []byte
	tombstones       []TimeRange

	// readMin, readMax are the timestamps range of values have been
	// read and encoded from this block.
	readMin, readMax int64
}

func (b *block) overlapsTimeRange(min, max int64) bool {
	return b.minTime <= max && b.maxTime >= min
}

func (b *block) read() bool {
	return b.readMin <= b.minTime && b.readMax >= b.maxTime
}

func (b *block) markRead(min, max int64) {
	if min < b.readMin {
		b.readMin = min
	}

	if max > b.readMax {
		b.readMax = max
	}
}

func (b *block) partiallyRead() bool {
	// If readMin and readMax are still the initial values, nothing has been read.
	if b.readMin == int64(math.MaxInt64) && b.readMax == int64(math.MinInt64) {
		return false
	}
	return b.readMin != b.minTime || b.readMax != b.maxTime
}

type blocks []*block

func (a blocks) Len() int { return len(a) }

func (a blocks) Less(i, j int) bool {
	cmp := bytes.Compare(a[i].key, a[j].key)
	if cmp == 0 {
		return a[i].minTime < a[j].minTime && a[i].maxTime < a[j].minTime
	}
	return cmp < 0
}

func (a blocks) Swap(i, j int) { a[i], a[j] = a[j], a[i] }

// NewTSMKeyIterator returns a new TSM key iterator from readers.
// size indicates the maximum number of values to encode in a single block.
func NewTSMKeyIterator(size int, fast bool, interrupt chan struct{}, readers ...*TSMReader) (KeyIterator, error) {
	var iter []*BlockIterator
	for _, r := range readers {
		iter = append(iter, r.BlockIterator())
	}

	return &tsmKeyIterator{
		readers:   readers,
		values:    map[string][]Value{},
		pos:       make([]int, len(readers)),
		size:      size,
		iterators: iter,
		fast:      fast,
		buf:       make([]blocks, len(iter)),
		interrupt: interrupt,
	}, nil
}

func (k *tsmKeyIterator) hasMergedValues() bool {
	return len(k.mergedFloatValues) > 0 ||
		len(k.mergedIntegerValues) > 0 ||
		len(k.mergedUnsignedValues) > 0 ||
		len(k.mergedStringValues) > 0 ||
		len(k.mergedBooleanValues) > 0
}

func (k *tsmKeyIterator) EstimatedIndexSize() int {
	var size uint32
	for _, r := range k.readers {
		size += r.IndexSize()
	}
	return int(size) / len(k.readers)
}

// Next returns true if there are any values remaining in the iterator.
func (k *tsmKeyIterator) Next() bool {
RETRY:
	// Any merged blocks pending?
	if len(k.merged) > 0 {
		k.merged = k.merged[1:]
		if len(k.merged) > 0 {
			return true
		}
	}

	// Any merged values pending?
	if k.hasMergedValues() {
		k.merge()
		if len(k.merged) > 0 || k.hasMergedValues() {
			return true
		}
	}

	// If we still have blocks from the last read, merge them
	if len(k.blocks) > 0 {
		k.merge()
		if len(k.merged) > 0 || k.hasMergedValues() {
			return true
		}
	}

	// Read the next block from each TSM iterator
	for i, v := range k.buf {
		if len(v) == 0 {
			iter := k.iterators[i]
			if iter.Next() {
				key, minTime, maxTime, typ, _, b, err := iter.Read()
				if err != nil {
					k.err = err
				}

				var blk *block
				if cap(k.buf[i]) > len(k.buf[i]) {
					k.buf[i] = k.buf[i][:len(k.buf[i])+1]
					blk = k.buf[i][len(k.buf[i])-1]
					if blk == nil {
						blk = &block{}
						k.buf[i][len(k.buf[i])-1] = blk
					}
				} else {
					blk = &block{}
					k.buf[i] = append(k.buf[i], blk)
				}
				blk.minTime = minTime
				blk.maxTime = maxTime
				blk.key = key
				blk.typ = typ
				blk.b = b
				blk.readMin = math.MaxInt64
				blk.readMax = math.MinInt64

				// This block may have ranges of time removed from it that would
				// reduce the block min and max time.
				blk.tombstones = iter.r.TombstoneRange(key, blk.tombstones[:0])

				blockKey := key
				for bytes.Equal(iter.PeekNext(), blockKey) {
					iter.Next()
					key, minTime, maxTime, typ, _, b, err := iter.Read()
					if err != nil {
						k.err = err
					}

					var blk *block
					if cap(k.buf[i]) > len(k.buf[i]) {
						k.buf[i] = k.buf[i][:len(k.buf[i])+1]
						blk = k.buf[i][len(k.buf[i])-1]
						if blk == nil {
							blk = &block{}
							k.buf[i][len(k.buf[i])-1] = blk
						}
					} else {
						blk = &block{}
						k.buf[i] = append(k.buf[i], blk)
					}

					blk.minTime = minTime
					blk.maxTime = maxTime
					blk.key = key
					blk.typ = typ
					blk.b = b
					blk.readMin = math.MaxInt64
					blk.readMax = math.MinInt64
					blk.tombstones = iter.r.TombstoneRange(key, blk.tombstones[:0])
				}
			}

			if iter.Err() != nil {
				k.err = iter.Err()
			}
		}
	}

	// Each reader could have a different key that it's currently at, need to find
	// the next smallest one to keep the sort ordering.
	var minKey []byte
	var minType byte
	for _, b := range k.buf {
		// block could be nil if the iterator has been exhausted for that file
		if len(b) == 0 {
			continue
		}
		if len(minKey) == 0 || bytes.Compare(b[0].key, minKey) < 0 {
			minKey = b[0].key
			minType = b[0].typ
		}
	}
	k.key = minKey
	k.typ = minType

	// Now we need to find all blocks that match the min key so we can combine and dedupe
	// the blocks if necessary
	for i, b := range k.buf {
		if len(b) == 0 {
			continue
		}
		if bytes.Equal(b[0].key, k.key) {
			k.blocks = append(k.blocks, b...)
			k.buf[i] = k.buf[i][:0]
		}
	}

	if len(k.blocks) == 0 {
		return false
	}

	k.merge()

	// After merging all the values for this key, we might not have any.  (e.g. they were all deleted
	// through many tombstones).  In this case, move on to the next key instead of ending iteration.
	if len(k.merged) == 0 {
		goto RETRY
	}

	return len(k.merged) > 0
}

// merge combines the next set of blocks into merged blocks.
func (k *tsmKeyIterator) merge() {
	switch k.typ {
	case BlockFloat64:
		k.mergeFloat()
	case BlockInteger:
		k.mergeInteger()
	case BlockUnsigned:
		k.mergeUnsigned()
	case BlockBoolean:
		k.mergeBoolean()
	case BlockString:
		k.mergeString()
	default:
		k.err = fmt.Errorf("unknown block type: %v", k.typ)
	}
}

func (k *tsmKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
	// See if compactions were disabled while we were running.
	select {
	case <-k.interrupt:
		return nil, 0, 0, nil, errCompactionAborted{}
	default:
	}

	if len(k.merged) == 0 {
		return nil, 0, 0, nil, k.err
	}

	block := k.merged[0]
	return block.key, block.minTime, block.maxTime, block.b, k.err
}

func (k *tsmKeyIterator) Close() error {
	k.values = nil
	k.pos = nil
	k.iterators = nil
	for _, r := range k.readers {
		if err := r.Close(); err != nil {
			return err
		}
	}
	return nil
}

// Error returns any errors encountered during iteration.
func (k *tsmKeyIterator) Err() error {
	return k.err
}

// tsmBatchKeyIterator implements the KeyIterator for set of TSMReaders.  Iteration produces
// keys in sorted order and the values between the keys sorted and deduped.  If any of
// the readers have associated tombstone entries, they are returned as part of iteration.
type tsmBatchKeyIterator struct {
	// readers is the set of readers it produce a sorted key run with
	readers []*TSMReader

	// values is the temporary buffers for each key that is returned by a reader
	values map[string][]Value

	// pos is the current key postion within the corresponding readers slice.  A value of
	// pos[0] = 1, means the reader[0] is currently at key 1 in its ordered index.
	pos []int

	// err is any error we received while iterating values.
	err error

	// indicates whether the iterator should choose a faster merging strategy over a more
	// optimally compressed one.  If fast is true, multiple blocks will just be added as is
	// and not combined.  In some cases, a slower path will need to be utilized even when
	// fast is true to prevent overlapping blocks of time for the same key.
	// If false, the blocks will be decoded and duplicated (if needed) and
	// then chunked into the maximally sized blocks.
	fast bool

	// size is the maximum number of values to encode in a single block
	size int

	// key is the current key lowest key across all readers that has not be fully exhausted
	// of values.
	key []byte
	typ byte

	iterators []*BlockIterator
	blocks    blocks

	buf []blocks

	// mergeValues are decoded blocks that have been combined
	mergedFloatValues    *tsdb.FloatArray
	mergedIntegerValues  *tsdb.IntegerArray
	mergedUnsignedValues *tsdb.UnsignedArray
	mergedBooleanValues  *tsdb.BooleanArray
	mergedStringValues   *tsdb.StringArray

	// merged are encoded blocks that have been combined or used as is
	// without decode
	merged    blocks
	interrupt chan struct{}
}

// NewTSMBatchKeyIterator returns a new TSM key iterator from readers.
// size indicates the maximum number of values to encode in a single block.
func NewTSMBatchKeyIterator(size int, fast bool, interrupt chan struct{}, readers ...*TSMReader) (KeyIterator, error) {
	var iter []*BlockIterator
	for _, r := range readers {
		iter = append(iter, r.BlockIterator())
	}

	return &tsmBatchKeyIterator{
		readers:              readers,
		values:               map[string][]Value{},
		pos:                  make([]int, len(readers)),
		size:                 size,
		iterators:            iter,
		fast:                 fast,
		buf:                  make([]blocks, len(iter)),
		mergedFloatValues:    &tsdb.FloatArray{},
		mergedIntegerValues:  &tsdb.IntegerArray{},
		mergedUnsignedValues: &tsdb.UnsignedArray{},
		mergedBooleanValues:  &tsdb.BooleanArray{},
		mergedStringValues:   &tsdb.StringArray{},
		interrupt:            interrupt,
	}, nil
}

func (k *tsmBatchKeyIterator) hasMergedValues() bool {
	return k.mergedFloatValues.Len() > 0 ||
		k.mergedIntegerValues.Len() > 0 ||
		k.mergedUnsignedValues.Len() > 0 ||
		k.mergedStringValues.Len() > 0 ||
		k.mergedBooleanValues.Len() > 0
}

func (k *tsmBatchKeyIterator) EstimatedIndexSize() int {
	var size uint32
	for _, r := range k.readers {
		size += r.IndexSize()
	}
	return int(size) / len(k.readers)
}

// Next returns true if there are any values remaining in the iterator.
func (k *tsmBatchKeyIterator) Next() bool {
RETRY:
	// Any merged blocks pending?
	if len(k.merged) > 0 {
		k.merged = k.merged[1:]
		if len(k.merged) > 0 {
			return true
		}
	}

	// Any merged values pending?
	if k.hasMergedValues() {
		k.merge()
		if len(k.merged) > 0 || k.hasMergedValues() {
			return true
		}
	}

	// If we still have blocks from the last read, merge them
	if len(k.blocks) > 0 {
		k.merge()
		if len(k.merged) > 0 || k.hasMergedValues() {
			return true
		}
	}

	// Read the next block from each TSM iterator
	for i, v := range k.buf {
		if len(v) != 0 {
			continue
		}

		iter := k.iterators[i]
		if iter.Next() {
			key, minTime, maxTime, typ, _, b, err := iter.Read()
			if err != nil {
				k.err = err
			}

			var blk *block
			if cap(k.buf[i]) > len(k.buf[i]) {
				k.buf[i] = k.buf[i][:len(k.buf[i])+1]
				blk = k.buf[i][len(k.buf[i])-1]
				if blk == nil {
					blk = &block{}
					k.buf[i][len(k.buf[i])-1] = blk
				}
			} else {
				blk = &block{}
				k.buf[i] = append(k.buf[i], blk)
			}
			blk.minTime = minTime
			blk.maxTime = maxTime
			blk.key = key
			blk.typ = typ
			blk.b = b
			blk.readMin = math.MaxInt64
			blk.readMax = math.MinInt64

			// This block may have ranges of time removed from it that would
			// reduce the block min and max time.
			blk.tombstones = iter.r.TombstoneRange(key, blk.tombstones[:0])

			blockKey := key
			for bytes.Equal(iter.PeekNext(), blockKey) {
				iter.Next()
				key, minTime, maxTime, typ, _, b, err := iter.Read()
				if err != nil {
					k.err = err
				}

				var blk *block
				if cap(k.buf[i]) > len(k.buf[i]) {
					k.buf[i] = k.buf[i][:len(k.buf[i])+1]
					blk = k.buf[i][len(k.buf[i])-1]
					if blk == nil {
						blk = &block{}
						k.buf[i][len(k.buf[i])-1] = blk
					}
				} else {
					blk = &block{}
					k.buf[i] = append(k.buf[i], blk)
				}

				blk.minTime = minTime
				blk.maxTime = maxTime
				blk.key = key
				blk.typ = typ
				blk.b = b
				blk.readMin = math.MaxInt64
				blk.readMax = math.MinInt64
				blk.tombstones = iter.r.TombstoneRange(key, blk.tombstones[:0])
			}
		}

		if iter.Err() != nil {
			k.err = iter.Err()
		}
	}

	// Each reader could have a different key that it's currently at, need to find
	// the next smallest one to keep the sort ordering.
	var minKey []byte
	var minType byte
	for _, b := range k.buf {
		// block could be nil if the iterator has been exhausted for that file
		if len(b) == 0 {
			continue
		}
		if len(minKey) == 0 || bytes.Compare(b[0].key, minKey) < 0 {
			minKey = b[0].key
			minType = b[0].typ
		}
	}
	k.key = minKey
	k.typ = minType

	// Now we need to find all blocks that match the min key so we can combine and dedupe
	// the blocks if necessary
	for i, b := range k.buf {
		if len(b) == 0 {
			continue
		}
		if bytes.Equal(b[0].key, k.key) {
			k.blocks = append(k.blocks, b...)
			k.buf[i] = k.buf[i][:0]
		}
	}

	if len(k.blocks) == 0 {
		return false
	}

	k.merge()

	// After merging all the values for this key, we might not have any.  (e.g. they were all deleted
	// through many tombstones).  In this case, move on to the next key instead of ending iteration.
	if len(k.merged) == 0 {
		goto RETRY
	}

	return len(k.merged) > 0
}

// merge combines the next set of blocks into merged blocks.
func (k *tsmBatchKeyIterator) merge() {
	switch k.typ {
	case BlockFloat64:
		k.mergeFloat()
	case BlockInteger:
		k.mergeInteger()
	case BlockUnsigned:
		k.mergeUnsigned()
	case BlockBoolean:
		k.mergeBoolean()
	case BlockString:
		k.mergeString()
	default:
		k.err = fmt.Errorf("unknown block type: %v", k.typ)
	}
}

func (k *tsmBatchKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
	// See if compactions were disabled while we were running.
	select {
	case <-k.interrupt:
		return nil, 0, 0, nil, errCompactionAborted{}
	default:
	}

	if len(k.merged) == 0 {
		return nil, 0, 0, nil, k.err
	}

	block := k.merged[0]
	return block.key, block.minTime, block.maxTime, block.b, k.err
}

func (k *tsmBatchKeyIterator) Close() error {
	k.values = nil
	k.pos = nil
	k.iterators = nil
	for _, r := range k.readers {
		if err := r.Close(); err != nil {
			return err
		}
	}
	return nil
}

// Error returns any errors encountered during iteration.
func (k *tsmBatchKeyIterator) Err() error {
	return k.err
}

type cacheKeyIterator struct {
	cache *Cache
	size  int
	order [][]byte

	i         int
	blocks    [][]cacheBlock
	ready     []chan struct{}
	interrupt chan struct{}
	err       error
}

type cacheBlock struct {
	k                []byte
	minTime, maxTime int64
	b                []byte
	err              error
}

// NewCacheKeyIterator returns a new KeyIterator from a Cache.
func NewCacheKeyIterator(cache *Cache, size int, interrupt chan struct{}) KeyIterator {
	keys := cache.Keys()

	chans := make([]chan struct{}, len(keys))
	for i := 0; i < len(keys); i++ {
		chans[i] = make(chan struct{}, 1)
	}

	cki := &cacheKeyIterator{
		i:         -1,
		size:      size,
		cache:     cache,
		order:     keys,
		ready:     chans,
		blocks:    make([][]cacheBlock, len(keys)),
		interrupt: interrupt,
	}
	go cki.encode()
	return cki
}

func (c *cacheKeyIterator) EstimatedIndexSize() int {
	var n int
	for _, v := range c.order {
		n += len(v)
	}
	return n
}

func (c *cacheKeyIterator) encode() {
	concurrency := runtime.GOMAXPROCS(0)
	n := len(c.ready)

	// Divide the keyset across each CPU
	chunkSize := 1
	idx := uint64(0)

	for i := 0; i < concurrency; i++ {
		// Run one goroutine per CPU and encode a section of the key space concurrently
		go func() {
			tenc := getTimeEncoder(MaxPointsPerBlock)
			fenc := getFloatEncoder(MaxPointsPerBlock)
			benc := getBooleanEncoder(MaxPointsPerBlock)
			uenc := getUnsignedEncoder(MaxPointsPerBlock)
			senc := getStringEncoder(MaxPointsPerBlock)
			ienc := getIntegerEncoder(MaxPointsPerBlock)

			defer putTimeEncoder(tenc)
			defer putFloatEncoder(fenc)
			defer putBooleanEncoder(benc)
			defer putUnsignedEncoder(uenc)
			defer putStringEncoder(senc)
			defer putIntegerEncoder(ienc)

			for {
				i := int(atomic.AddUint64(&idx, uint64(chunkSize))) - chunkSize

				if i >= n {
					break
				}

				key := c.order[i]
				values := c.cache.values(key)

				for len(values) > 0 {

					end := len(values)
					if end > c.size {
						end = c.size
					}

					minTime, maxTime := values[0].UnixNano(), values[end-1].UnixNano()
					var b []byte
					var err error

					switch values[0].(type) {
					case FloatValue:
						b, err = encodeFloatBlockUsing(nil, values[:end], tenc, fenc)
					case IntegerValue:
						b, err = encodeIntegerBlockUsing(nil, values[:end], tenc, ienc)
					case UnsignedValue:
						b, err = encodeUnsignedBlockUsing(nil, values[:end], tenc, uenc)
					case BooleanValue:
						b, err = encodeBooleanBlockUsing(nil, values[:end], tenc, benc)
					case StringValue:
						b, err = encodeStringBlockUsing(nil, values[:end], tenc, senc)
					default:
						b, err = Values(values[:end]).Encode(nil)
					}

					values = values[end:]

					c.blocks[i] = append(c.blocks[i], cacheBlock{
						k:       key,
						minTime: minTime,
						maxTime: maxTime,
						b:       b,
						err:     err,
					})

					if err != nil {
						c.err = err
					}
				}
				// Notify this key is fully encoded
				c.ready[i] <- struct{}{}
			}
		}()
	}
}

func (c *cacheKeyIterator) Next() bool {
	if c.i >= 0 && c.i < len(c.ready) && len(c.blocks[c.i]) > 0 {
		c.blocks[c.i] = c.blocks[c.i][1:]
		if len(c.blocks[c.i]) > 0 {
			return true
		}
	}
	c.i++

	if c.i >= len(c.ready) {
		return false
	}

	<-c.ready[c.i]
	return true
}

func (c *cacheKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
	// See if snapshot compactions were disabled while we were running.
	select {
	case <-c.interrupt:
		c.err = errCompactionAborted{}
		return nil, 0, 0, nil, c.err
	default:
	}

	blk := c.blocks[c.i][0]
	return blk.k, blk.minTime, blk.maxTime, blk.b, blk.err
}

func (c *cacheKeyIterator) Close() error {
	return nil
}

func (c *cacheKeyIterator) Err() error {
	return c.err
}

type tsmGenerations []*tsmGeneration

func (a tsmGenerations) Len() int           { return len(a) }
func (a tsmGenerations) Less(i, j int) bool { return a[i].id < a[j].id }
func (a tsmGenerations) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a tsmGenerations) hasTombstones() bool {
	for _, g := range a {
		if g.hasTombstones() {
			return true
		}
	}
	return false
}

func (a tsmGenerations) level() int {
	var level int
	for _, g := range a {
		lev := g.level()
		if lev > level {
			level = lev
		}
	}
	return level
}

func (a tsmGenerations) chunk(size int) []tsmGenerations {
	var chunks []tsmGenerations
	for len(a) > 0 {
		if len(a) >= size {
			chunks = append(chunks, a[:size])
			a = a[size:]
		} else {
			chunks = append(chunks, a)
			a = a[len(a):]
		}
	}
	return chunks
}

func (a tsmGenerations) IsSorted() bool {
	if len(a) == 1 {
		return true
	}

	for i := 1; i < len(a); i++ {
		if a.Less(i, i-1) {
			return false
		}
	}
	return true
}

type latencies struct {
	i      int
	values []time.Duration
}

func (l *latencies) add(t time.Duration) {
	l.values[l.i%len(l.values)] = t
	l.i++
}

func (l *latencies) avg() time.Duration {
	var n int64
	var sum time.Duration
	for _, v := range l.values {
		if v == 0 {
			continue
		}
		sum += v
		n++
	}

	if n > 0 {
		return time.Duration(int64(sum) / n)
	}
	return time.Duration(0)
}