Merge pull request #5060 from influxdb/jw-drop-db
Cancel writing TSM files when engine closespull/5069/head
commit
992aea7bd3
|
@ -242,7 +242,8 @@ func (c *DefaultPlanner) findGenerations() tsmGenerations {
|
|||
// Compactor merges multiple TSM files into new files or
|
||||
// writes a Cache into 1 or more TSM files
|
||||
type Compactor struct {
|
||||
Dir string
|
||||
Dir string
|
||||
Cancel chan struct{}
|
||||
|
||||
FileStore interface {
|
||||
NextGeneration() int
|
||||
|
@ -313,6 +314,7 @@ func (c *Compactor) Clone() *Compactor {
|
|||
return &Compactor{
|
||||
Dir: c.Dir,
|
||||
FileStore: c.FileStore,
|
||||
Cancel: c.Cancel,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -377,6 +379,12 @@ func (c *Compactor) write(path string, iter KeyIterator) error {
|
|||
defer w.Close()
|
||||
|
||||
for iter.Next() {
|
||||
select {
|
||||
case <-c.Cancel:
|
||||
return fmt.Errorf("compaction aborted")
|
||||
default:
|
||||
}
|
||||
|
||||
// Each call to read returns the next sorted key (or the prior one if there are
|
||||
// more values to write). The size of values will be less than or equal to our
|
||||
// chunk size (1000)
|
||||
|
|
|
@ -108,6 +108,7 @@ func (e *DevEngine) Format() tsdb.EngineFormat {
|
|||
// Open opens and initializes the engine.
|
||||
func (e *DevEngine) Open() error {
|
||||
e.done = make(chan struct{})
|
||||
e.Compactor.Cancel = e.done
|
||||
|
||||
if err := os.MkdirAll(e.path, 0777); err != nil {
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue