diff --git a/cmd/influx_tsm/converter.go b/cmd/influx_tsm/converter.go index e4aaeff781..8ca4407d62 100644 --- a/cmd/influx_tsm/converter.go +++ b/cmd/influx_tsm/converter.go @@ -27,7 +27,7 @@ type KeyIterator interface { type Converter struct { path string maxTSMFileSize uint32 - generation int + sequence int } // NewConverter returns a new instance of the Converter. @@ -45,21 +45,21 @@ func (c *Converter) Process(iter KeyIterator) error { return err } - w, err := c.nextTSMWriter() - if err != nil { - return err - } - atomic.AddUint64(&TsmFilesCreated, 1) - defer w.Close() - // Iterate until no more data remains. + var w tsm1.TSMWriter for iter.Next() { k, v, err := iter.Read() if err != nil { return err } - scrubbed := scrubValues(v) + + if w == nil { + w, err = c.nextTSMWriter() + if err != nil { + return err + } + } if err := w.Write(k, scrubbed); err != nil { return err } @@ -74,26 +74,26 @@ func (c *Converter) Process(iter KeyIterator) error { if err := w.Close(); err != nil { return err } - - w, err = c.nextTSMWriter() - if err != nil { - return err - } - atomic.AddUint64(&TsmFilesCreated, 1) + w = nil } } - // All done! - if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues { - return err + if w != nil { + if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues { + return err + } + if err := w.Close(); err != nil { + return err + } } + return nil } // nextTSMWriter returns the next TSMWriter for the Converter. func (c *Converter) nextTSMWriter() (tsm1.TSMWriter, error) { - c.generation++ - fileName := filepath.Join(c.path, fmt.Sprintf("%09d-%09d.%s", c.generation, 0, tsm1.TSMFileExtension)) + c.sequence++ + fileName := filepath.Join(c.path, fmt.Sprintf("%09d-%09d.%s", 1, c.sequence, tsm1.TSMFileExtension)) fd, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666) if err != nil { @@ -106,6 +106,7 @@ func (c *Converter) nextTSMWriter() (tsm1.TSMWriter, error) { return nil, err } + atomic.AddUint64(&TsmFilesCreated, 1) return w, nil }