diff --git a/cmd/influx-tools/importer/importer.go b/cmd/influx-tools/importer/importer.go index b266a1789a..c11dab14f2 100644 --- a/cmd/influx-tools/importer/importer.go +++ b/cmd/influx-tools/importer/importer.go @@ -9,6 +9,7 @@ import ( "time" "github.com/influxdata/influxdb/cmd/influx-tools/internal/errlist" + "github.com/influxdata/influxdb/cmd/influx-tools/internal/shard" "github.com/influxdata/influxdb/cmd/influx-tools/server" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/tsdb" @@ -26,7 +27,7 @@ type importer struct { log *zap.Logger skipShard bool currentShard uint64 - sh *shardWriter + sh *shard.Writer sfile *tsdb.SeriesFile sw *seriesWriter buildTsi bool @@ -146,7 +147,7 @@ func (i *importer) StartShardGroup(start int64, end int64) error { } i.skipShard = false - i.sh = newShardWriter(shardID, shardsPath) + i.sh = shard.NewWriter(shardID, shardsPath) i.currentShard = shardID i.startSeriesFile() @@ -171,9 +172,9 @@ func (i *importer) Write(key []byte, values tsm1.Values) error { return errors.New("importer not currently writing a shard") } i.sh.Write(key, values) - if i.sh.err != nil { + if i.sh.Err() != nil { el := errlist.NewErrorList() - el.Add(i.sh.err) + el.Add(i.sh.Err()) el.Add(i.CloseShardGroup()) el.Add(i.removeShardGroup(i.rpi.Name, i.currentShard)) i.sh = nil @@ -191,8 +192,8 @@ func (i *importer) CloseShardGroup() error { el := errlist.NewErrorList() el.Add(i.closeSeriesFile()) i.sh.Close() - if i.sh.err != nil { - el.Add(i.sh.err) + if i.sh.Err() != nil { + el.Add(i.sh.Err()) } i.sh = nil return el.Err() @@ -209,9 +210,9 @@ func (i *importer) startSeriesFile() error { var err error if i.buildTsi { - i.sw, err = newTSI1SeriesWriter(i.sfile, i.db, dataPath, shardPath, int(i.sh.id)) + i.sw, err = newTSI1SeriesWriter(i.sfile, i.db, dataPath, shardPath, int(i.sh.ShardID())) } else { - i.sw, err = newInMemSeriesWriter(i.sfile, i.db, dataPath, shardPath, int(i.sh.id), i.seriesBuf) + i.sw, err = newInMemSeriesWriter(i.sfile, i.db, dataPath, shardPath, int(i.sh.ShardID()), i.seriesBuf) } if err != nil { diff --git a/cmd/influx-tools/importer/shard_writer.go b/cmd/influx-tools/importer/shard_writer.go deleted file mode 100644 index 1896117972..0000000000 --- a/cmd/influx-tools/importer/shard_writer.go +++ /dev/null @@ -1,96 +0,0 @@ -package importer - -import ( - "fmt" - "os" - "path/filepath" - "strconv" - - "github.com/influxdata/influxdb/cmd/influx-tools/internal/errlist" - "github.com/influxdata/influxdb/tsdb/engine/tsm1" -) - -const ( - maxTSMFileSize = uint32(2048 * 1024 * 1024) // 2GB -) - -type shardWriter struct { - w tsm1.TSMWriter - id uint64 - path string - gen, seq int - err error -} - -func newShardWriter(id uint64, path string) *shardWriter { - t := &shardWriter{id: id, path: path, gen: 1, seq: 1} - return t -} - -func (t *shardWriter) Write(key []byte, values tsm1.Values) { - if t.err != nil { - return - } - - if t.w == nil { - t.nextTSM() - } - - if t.w.Size() > maxTSMFileSize { - t.closeTSM() - t.nextTSM() - } - - if err := t.w.Write(key, values); err != nil { - if err == tsm1.ErrMaxBlocksExceeded { - t.closeTSM() - t.nextTSM() - } else { - t.err = err - } - } -} - -func (t *shardWriter) Close() { - if t.w != nil { - t.closeTSM() - } -} - -func (t *shardWriter) Err() error { return t.err } - -func (t *shardWriter) nextTSM() { - fileName := filepath.Join(t.path, strconv.Itoa(int(t.id)), fmt.Sprintf("%09d-%09d.%s", t.gen, t.seq, tsm1.TSMFileExtension)) - t.seq++ - - fd, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666) - if err != nil { - t.err = err - return - } - - // Create the writer for the new TSM file. - t.w, err = tsm1.NewTSMWriter(fd) - if err != nil { - t.err = err - return - } -} - -func (t *shardWriter) closeTSM() { - el := errlist.NewErrorList() - if err := t.w.WriteIndex(); err != nil && err != tsm1.ErrNoValues { - el.Add(err) - } - - if err := t.w.Close(); err != nil { - el.Add(err) - } - - err := el.Err() - if err != nil { - t.err = err - } - - t.w = nil -} diff --git a/cmd/influx-tools/internal/shard/writer.go b/cmd/influx-tools/internal/shard/writer.go new file mode 100644 index 0000000000..d61b2605d8 --- /dev/null +++ b/cmd/influx-tools/internal/shard/writer.go @@ -0,0 +1,142 @@ +package shard + +import ( + "fmt" + "os" + "path/filepath" + "strconv" + + "github.com/influxdata/influxdb/cmd/influx-tools/internal/errlist" + "github.com/influxdata/influxdb/tsdb/engine/tsm1" +) + +const ( + maxTSMFileSize = uint32(2048 * 1024 * 1024) // 2GB +) + +type Writer struct { + tw tsm1.TSMWriter + id uint64 + path string + ext string + files []string + gen, seq int + err error + auto bool +} + +type option func(w *Writer) + +// Generation specifies the generation number of the tsm files. +func Generation(gen int) option { + return func(w *Writer) { + w.gen = gen + } +} + +// Sequence specifies the starting sequence number of the tsm files. +func Sequence(seq int) option { + return func(w *Writer) { + w.seq = seq + } +} + +// Temporary configures the writer to create tsm.tmp files. +func Temporary() option { + return func(w *Writer) { + w.ext = tsm1.TSMFileExtension + "." + tsm1.TmpTSMFileExtension + } +} + +// AutoNumber will read the existing TSM file names and use generation + 1 +func AutoNumber() option { + return func(w *Writer) { + w.auto = true + } +} + +func NewWriter(id uint64, path string, opts ...option) *Writer { + w := &Writer{id: id, path: path, gen: 1, seq: 1, ext: tsm1.TSMFileExtension} + + for _, opt := range opts { + opt(w) + } + + return w +} + +func (w *Writer) Write(key []byte, values tsm1.Values) { + if w.err != nil { + return + } + + if w.tw == nil { + w.nextTSM() + } + + if w.tw.Size() > maxTSMFileSize { + w.closeTSM() + w.nextTSM() + } + + if err := w.tw.Write(key, values); err != nil { + if err == tsm1.ErrMaxBlocksExceeded { + w.closeTSM() + w.nextTSM() + } else { + w.err = err + } + } +} + +// Close closes the writer. +func (w *Writer) Close() { + if w.tw != nil { + w.closeTSM() + } +} + +// ShardID returns the shard number of the writer. +func (w *Writer) ShardID() uint64 { return w.id } + +func (w *Writer) Err() error { return w.err } + +// Files returns the full paths of all the files written by the Writer. +func (w *Writer) Files() []string { return w.files } + +func (w *Writer) nextTSM() { + fileName := filepath.Join(w.path, strconv.Itoa(int(w.id)), fmt.Sprintf("%09d-%09d.%s", w.gen, w.seq, w.ext)) + w.files = append(w.files, fileName) + w.seq++ + + fd, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + w.err = err + return + } + + // Create the writer for the new TSM file. + w.tw, err = tsm1.NewTSMWriter(fd) + if err != nil { + w.err = err + return + } +} + +func (w *Writer) closeTSM() { + el := errlist.NewErrorList() + if err := w.tw.WriteIndex(); err != nil && err != tsm1.ErrNoValues { + el.Add(err) + } + + if err := w.tw.Close(); err != nil { + el.Add(err) + } + + err := el.Err() + if err != nil { + w.err = err + } + + w.tw = nil +}