diff --git a/cmd/influx_inspect/info.go b/cmd/influx_inspect/info.go index 381486b3ee..b730dd1cfa 100644 --- a/cmd/influx_inspect/info.go +++ b/cmd/influx_inspect/info.go @@ -101,6 +101,17 @@ func u64tob(v uint64) []byte { return b } +func btou32(b []byte) uint32 { + return binary.BigEndian.Uint32(b) +} + +// u32tob converts a uint32 into a 4-byte slice. +func u32tob(v uint32) []byte { + b := make([]byte, 4) + binary.BigEndian.PutUint32(b, v) + return b +} + // ShardIDs is a collection of uint64 values that represent shard ids. type ShardIDs []uint64 diff --git a/cmd/influx_inspect/main.go b/cmd/influx_inspect/main.go index 647376b87f..f6504cf3ad 100644 --- a/cmd/influx_inspect/main.go +++ b/cmd/influx_inspect/main.go @@ -16,7 +16,8 @@ Displays detailed information about InfluxDB data files. println(`Commands: info - displays series meta-data for all shards. Default location [$HOME/.influxdb] - dumptsm - dumps low-level details about tsm1 files.`) + dumptsm - dumps low-level details about tsm1 files. + dumptsmdev - dumps low-level details about tsm1dev files.`) println() } @@ -80,6 +81,38 @@ func main() { opts.dumpBlocks = opts.dumpBlocks || dumpAll || opts.filterKey != "" opts.dumpIndex = opts.dumpIndex || dumpAll || opts.filterKey != "" cmdDumpTsm1(opts) + case "dumptsmdev": + var dumpAll bool + opts := &tsdmDumpOpts{} + fs := flag.NewFlagSet("file", flag.ExitOnError) + fs.BoolVar(&opts.dumpIndex, "index", false, "Dump raw index data") + fs.BoolVar(&opts.dumpBlocks, "blocks", false, "Dump raw block data") + fs.BoolVar(&dumpAll, "all", false, "Dump all data. Caution: This may print a lot of information") + fs.StringVar(&opts.filterKey, "filter-key", "", "Only display index and block data matching this key substring") + + fs.Usage = func() { + println("Usage: influx_inspect dumptsmdev [options] \n\n Dumps low-level details about tsm1dev files.") + println() + println("Options:") + fs.PrintDefaults() + os.Exit(0) + } + + if err := fs.Parse(flag.Args()[1:]); err != nil { + fmt.Printf("%v", err) + os.Exit(1) + } + + if len(fs.Args()) == 0 || fs.Args()[0] == "" { + fmt.Printf("TSM file not specified\n\n") + fs.Usage() + fs.PrintDefaults() + os.Exit(1) + } + opts.path = fs.Args()[0] + opts.dumpBlocks = opts.dumpBlocks || dumpAll || opts.filterKey != "" + opts.dumpIndex = opts.dumpIndex || dumpAll || opts.filterKey != "" + cmdDumpTsm1dev(opts) default: flag.Usage() os.Exit(1) diff --git a/cmd/influx_inspect/tsm.go b/cmd/influx_inspect/tsm.go index 66788b877e..1c85273de6 100644 --- a/cmd/influx_inspect/tsm.go +++ b/cmd/influx_inspect/tsm.go @@ -441,3 +441,196 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) { println() } } + +func cmdDumpTsm1dev(opts *tsdmDumpOpts) { + var errors []error + + f, err := os.Open(opts.path) + if err != nil { + println(err.Error()) + os.Exit(1) + } + + // Get the file size + stat, err := f.Stat() + if err != nil { + println(err.Error()) + os.Exit(1) + } + b := make([]byte, 8) + + r, err := tsm1.NewTSMReader(f) + if err != nil { + println("Error opening TSM file: ", err.Error()) + os.Exit(1) + } + defer r.Close() + + minTime, maxTime := r.TimeRange() + keys := r.Keys() + + blockStats := &blockStats{} + + println("Summary:") + fmt.Printf(" File: %s\n", opts.path) + fmt.Printf(" Time Range: %s - %s\n", + minTime.UTC().Format(time.RFC3339Nano), + maxTime.UTC().Format(time.RFC3339Nano), + ) + fmt.Printf(" Duration: %s ", maxTime.Sub(minTime)) + fmt.Printf(" Series: %d ", len(keys)) + fmt.Printf(" File Size: %d\n", stat.Size()) + println() + + tw := tabwriter.NewWriter(os.Stdout, 8, 8, 1, '\t', 0) +
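// Each index key is a composite series key and field name joined by the "#!~#" separator (e.g. "cpu,host=A#!~#value"); the split below recovers the measurement/tagset and field columns printed here. +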
fmt.Fprintln(tw, " "+strings.Join([]string{"Pos", "Min Time", "Max Time", "Ofs", "Size", "Key", "Field"}, "\t")) + var pos int + for _, key := range keys { + for _, e := range r.Entries(key) { + pos++ + split := strings.Split(key, "#!~#") + + // We don't know whether the key actually has a field, so use an informative default + var measurement, field string = "UNKNOWN", "UNKNOWN" + + // Possible corruption? Try to read as much as we can and point to the problem. + measurement = split[0] + if len(split) > 1 { + field = split[1] + } + + if opts.filterKey != "" && !strings.Contains(key, opts.filterKey) { + continue + } + fmt.Fprintln(tw, " "+strings.Join([]string{ + strconv.FormatInt(int64(pos), 10), + e.MinTime.UTC().Format(time.RFC3339Nano), + e.MaxTime.UTC().Format(time.RFC3339Nano), + strconv.FormatInt(int64(e.Offset), 10), + strconv.FormatInt(int64(e.Size), 10), + measurement, + field, + }, "\t")) + } + } + + if opts.dumpIndex { + println("Index:") + tw.Flush() + println() + } + + tw = tabwriter.NewWriter(os.Stdout, 8, 8, 1, '\t', 0) + fmt.Fprintln(tw, " "+strings.Join([]string{"Blk", "Chk", "Ofs", "Len", "Type", "Min Time", "Points", "Enc [T/V]", "Len [T/V]"}, "\t")) + + // Starting at 5 because the magic number is 4 bytes + 1 byte version + i := int64(5) + var blockCount, pointCount, blockSize int64 + indexSize := r.IndexSize() + + // Start at the beginning and read every block + for _, key := range keys { + for _, e := range r.Entries(key) { + + f.Seek(int64(e.Offset), 0) + f.Read(b[:4]) + + chksum := btou32(b) + + buf := make([]byte, e.Size) + f.Read(buf) + + blockSize += int64(len(buf)) + 4 + + startTime := time.Unix(0, int64(btou64(buf[:8]))) + blockType := buf[8] + + encoded := buf[9:] + + var v []tsm1.Value + v, err := tsm1.DecodeBlock(buf, v) + if err != nil { + fmt.Printf("error: %v\n", err.Error()) + os.Exit(1) + } + + pointCount += int64(len(v)) + + // Length of the timestamp block + tsLen, j := binary.Uvarint(encoded) + + // Unpack the timestamp bytes + ts := encoded[int(j) : int(j)+int(tsLen)] + + // Unpack the value bytes + values := encoded[int(j)+int(tsLen):] + + tsEncoding := timeEnc[int(ts[0]>>4)] + vEncoding := encDescs[int(blockType+1)][values[0]>>4] + + typeDesc := blockTypes[blockType] + + blockStats.inc(0, ts[0]>>4) + blockStats.inc(int(blockType+1), values[0]>>4) + blockStats.size(len(buf)) + + if opts.filterKey != "" && !strings.Contains(key, opts.filterKey) { + i += (4 + int64(e.Size)) + blockCount++ + continue + } + + fmt.Fprintln(tw, " "+strings.Join([]string{ + strconv.FormatInt(blockCount, 10), + strconv.FormatUint(uint64(chksum), 10), + strconv.FormatInt(i, 10), + strconv.FormatInt(int64(len(buf)), 10), + typeDesc, + startTime.UTC().Format(time.RFC3339Nano), + strconv.FormatInt(int64(len(v)), 10), + fmt.Sprintf("%s/%s", tsEncoding, vEncoding), + fmt.Sprintf("%d/%d", len(ts), len(values)), + }, "\t")) + + i += (4 + int64(e.Size)) + blockCount++ + } + } + + if opts.dumpBlocks { + println("Blocks:") + tw.Flush() + println() + } + + fmt.Printf("Statistics\n") + fmt.Printf(" Blocks:\n") + fmt.Printf(" Total: %d Size: %d Min: %d Max: %d Avg: %d\n", + blockCount, blockSize, blockStats.min, blockStats.max, blockSize/blockCount) + fmt.Printf(" Index:\n") + fmt.Printf(" Total: %d Size: %d\n", blockCount, indexSize) + fmt.Printf(" Points:\n") + fmt.Printf(" Total: %d", pointCount) + println() + + println(" Encoding:") + for i, counts := range blockStats.counts { + if len(counts) == 0 { + continue + } + fmt.Printf(" %s: ", strings.Title(fieldType[i])) + for j, v := range counts { + fmt.Printf("\t%s: %d 
(%d%%) ", encDescs[i][j], v, int(float64(v)/float64(blockCount)*100)) + } + println() + } + fmt.Printf(" Compression:\n") + fmt.Printf(" Per block: %0.2f bytes/point\n", float64(blockSize)/float64(pointCount)) + fmt.Printf(" Total: %0.2f bytes/point\n", float64(stat.Size())/float64(pointCount)) + + if len(errors) > 0 { + println() + fmt.Printf("Errors (%d):\n", len(errors)) + for _, err := range errors { + fmt.Printf(" * %v\n", err) + } + println() + } +} diff --git a/tsdb/engine.go b/tsdb/engine.go index 946079028b..899e6af134 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -47,6 +47,7 @@ const ( B1Format EngineFormat = iota BZ1Format TSM1Format + TSM1DevFormat ) // NewEngineFunc creates a new engine. diff --git a/tsdb/engine/tsm1/compact.go b/tsdb/engine/tsm1/compact.go new file mode 100644 index 0000000000..cebe493cd0 --- /dev/null +++ b/tsdb/engine/tsm1/compact.go @@ -0,0 +1,298 @@ +package tsm1 + +// Compactions are the process of creating read-optimized TSM files. +// The files are created by converting write-optimized WAL entries +// to read-optimized TSM format. They can also be created from existing +// TSM files when there are tombstone records that need to be removed, points +// that were overwritten by later writes and need to be updated, or when multiple +// smaller TSM files need to be merged to reduce file counts and improve +// compression ratios. +// +// The compaction process is stream-oriented, using multiple readers and +// iterators. The resulting stream is written sorted and chunked to allow for +// one-pass writing of a new TSM file. + +import ( + "fmt" + "os" + "path/filepath" + "sort" +) + +var errMaxFileExceeded = fmt.Errorf("max file exceeded") + +// Compactor merges multiple WAL segments and TSM files into one or more +// new TSM files. +type Compactor struct { + Dir string + MaxFileSize int + currentID int + + merge *MergeIterator +} + +// Compact converts WAL segments and TSM files into new TSM files. +func (c *Compactor) Compact(walSegments []string) ([]string, error) { + var walReaders []*WALSegmentReader + + // For each segment, create a reader to iterate over each WAL entry + for _, path := range walSegments { + f, err := os.Open(path) + if err != nil { + return nil, err + } + r := NewWALSegmentReader(f) + defer r.Close() + + walReaders = append(walReaders, r) + } + + // WALKeyIterator allows all the segments to be ordered by key and + // sorted values during compaction. + walKeyIterator, err := NewWALKeyIterator(walReaders...) + if err != nil { + return nil, err + } + + // Merge iterator combines the WAL and TSM iterators (note: TSM iteration is + // not in place yet). It will also chunk the values into 1000 element blocks. + c.merge = NewMergeIterator(walKeyIterator, 1000) + defer c.merge.Close() + + // These are the new TSM files written + var files []string + + for { + // TODO: this needs to be initialized based on the existing files on disk + c.currentID++ + + // New TSM files are written to a temp file and renamed when fully completed. + fileName := filepath.Join(c.Dir, fmt.Sprintf("%07d.%s.tmp", c.currentID, Format)) + + // Write as much as possible to this file + err := c.write(fileName) + + // We've hit the max file limit and there is more to write. Create a new file + // and continue. + if err == errMaxFileExceeded { + files = append(files, fileName) + continue + } + + // We hit an error but didn't finish the compaction. Remove the temp file and abort. 
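+ // (errMaxFileExceeded was already handled above, so any error reaching this check aborts the whole compaction.)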
+ if err != nil { + os.RemoveAll(fileName) + return nil, err + } + + files = append(files, fileName) + break + } + c.merge = nil + return files, nil +} + +func (c *Compactor) write(path string) error { + fd, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return err + } + + // Create the writer for the new TSM file. + w, err := NewTSMWriter(fd) + if err != nil { + return err + } + + for c.merge.Next() { + + // Each call to read returns the next sorted key (or the prior one if there are + // more values to write). The size of values will be less than or equal to our + // chunk size (1000). + key, values, err := c.merge.Read() + if err != nil { + return err + } + + // Write the key and value + if err := w.Write(key, values); err != nil { + return err + } + + // If we have a max file size configured and we're over it, close out the file + // and return the error. + if c.MaxFileSize != 0 && w.Size() > c.MaxFileSize { + if err := w.WriteIndex(); err != nil { + return err + } + + if err := w.Close(); err != nil { + return err + } + + return errMaxFileExceeded + } + } + + // We're all done. Close out the file. + if err := w.WriteIndex(); err != nil { + return err + } + + if err := w.Close(); err != nil { + return err + } + + return nil +} + +// MergeIterator merges multiple KeyIterators while chunking each read call +// into a fixed size. Each iteration, the lowest lexicographically ordered +// key is returned with the next set of values for that key ordered by time. Values +// with identical times are overwritten by the WAL KeyIterator. +// +// Moving through the full iteration cycle will result in sorted, unique, chunks of values +// up to a max size. Each key returned will be greater than or equal to the prior +// key returned. +type MergeIterator struct { + // wal is the iterator for multiple WAL segments combined + wal KeyIterator + + // size is the maximum value of a chunk to return + size int + + // key is the current iteration series key + key string + + // walBuf is the remaining values from the last wal Read call + walBuf []Value + + // chunk is the current set of values that will be returned by Read + chunk []Value + + // err is any error returned by an underlying iterator to be returned by Read + err error +} + +func (m *MergeIterator) Next() bool { + // Prime the wal buffer if possible + if len(m.walBuf) == 0 && m.wal.Next() { + k, v, err := m.wal.Read() + m.key = k + m.err = err + m.walBuf = v + } + + // Move size elements into the current chunk and slice the same + // amount off of the wal buffer. + if m.size < len(m.walBuf) { + m.chunk = m.walBuf[:m.size] + m.walBuf = m.walBuf[m.size:] + } else { + m.chunk = m.walBuf + m.walBuf = m.walBuf[:0] + } + + return len(m.chunk) > 0 +} + +func (m *MergeIterator) Read() (string, []Value, error) { + return m.key, m.chunk, m.err +} + +func (m *MergeIterator) Close() error { + m.walBuf = nil + m.chunk = nil + return m.wal.Close() +} + +func NewMergeIterator(WAL KeyIterator, size int) *MergeIterator { + m := &MergeIterator{ + wal: WAL, + size: size, + } + return m +} + +// KeyIterator allows iteration over set of keys and values in sorted order. +type KeyIterator interface { + Next() bool + Read() (string, []Value, error) + Close() error +} + +// walKeyIterator allows WAL segments to be iterated over in sorted order. 
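+// Order holds the sorted keys and Series maps each key to its deduplicated values; +// both are built up front by NewWALKeyIterator, so Next simply walks Order.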
+type walKeyIterator struct { + k string + Order []string + Series map[string]Values +} + +func (k *walKeyIterator) Next() bool { + if len(k.Order) == 0 { + return false + } + k.k = k.Order[0] + k.Order = k.Order[1:] + return true +} + +func (k *walKeyIterator) Read() (string, []Value, error) { + return k.k, k.Series[k.k], nil +} + +func (k *walKeyIterator) Close() error { + k.Order = nil + k.Series = nil + return nil +} + +func NewWALKeyIterator(readers ...*WALSegmentReader) (KeyIterator, error) { + series := map[string]Values{} + order := []string{} + + // Iterate over each reader in order. Later readers will overwrite earlier ones if values + // overlap. + for _, r := range readers { + for r.Next() { + entry, err := r.Read() + if err != nil { + return nil, err + } + + switch t := entry.(type) { + case *WriteWALEntry: + // Each point needs to be decomposed from a time with multiple fields, to a time, value tuple + for k, v := range t.Values { + // Just append each point as we see it. Dedup and sorting happens later. + series[k] = append(series[k], v...) + } + + case *DeleteWALEntry: + // Each key is a series, measurement + tagset string + for _, k := range t.Keys { + // seriesKey is specific to a field: measurement + tagset string + sep + field name + for seriesKey := range series { + // If the delete key matches the series key exactly, drop the values we have for it + if k == seriesKey { + delete(series, seriesKey) + } + } + } + } + } + } + + // Need to create the order that we'll iterate over (sorted keys), as well as + // sort and dedup all the points for each key. + for k, v := range series { + order = append(order, k) + series[k] = v.Deduplicate() + } + sort.Strings(order) + + return &walKeyIterator{ + Series: series, + Order: order, + }, nil +} diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go new file mode 100644 index 0000000000..90818e00e9 --- /dev/null +++ b/tsdb/engine/tsm1/compact_test.go @@ -0,0 +1,878 @@ +package tsm1_test + +import ( + "fmt" + "os" + "testing" + "time" + + "github.com/influxdb/influxdb/models" + "github.com/influxdb/influxdb/tsdb/engine/tsm1" +) + +// Tests that a single WAL segment can be read and iterated over +func TestKeyIterator_WALSegment_Single(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + + v1 := tsm1.NewValue(time.Unix(1, 0), 1.1) + writes := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{v1}, + } + + entries := []tsm1.WALEntry{ + &tsm1.WriteWALEntry{ + Values: writes, + }, + } + r := MustWALSegment(dir, entries) + + iter, err := tsm1.NewWALKeyIterator(r) + if err != nil { + t.Fatalf("unexpected error creating WALKeyIterator: %v", err) + } + + var readValues bool + for iter.Next() { + key, values, err := iter.Read() + if err != nil { + t.Fatalf("unexpected error read: %v", err) + } + + if got, exp := key, "cpu,host=A#!~#value"; got != exp { + t.Fatalf("key mismatch: got %v, exp %v", got, exp) + } + + if got, exp := len(values), len(writes); got != exp { + t.Fatalf("values length mismatch: got %v, exp %v", got, exp) + } + + for _, v := range values { + readValues = true + assertValueEqual(t, v, v1) + } + } + + if !readValues { + t.Fatalf("failed to read any values") + } +} + +// Tests that duplicate point values are merged +func TestKeyIterator_WALSegment_Duplicate(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + + v1 := tsm1.NewValue(time.Unix(1, 0), int64(1)) + v2 := tsm1.NewValue(time.Unix(1, 0), int64(2)) + writes := map[string][]tsm1.Value{ + 
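// both values share time.Unix(1, 0); the later write (v2) should win after deduplication +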
"cpu,host=A#!~#value": []tsm1.Value{v1, v2}, + } + + entries := []tsm1.WALEntry{ + &tsm1.WriteWALEntry{ + Values: writes, + }, + } + + r := MustWALSegment(dir, entries) + + iter, err := tsm1.NewWALKeyIterator(r) + if err != nil { + t.Fatalf("unexpected error creating WALKeyIterator: %v", err) + } + + var readValues bool + for iter.Next() { + key, values, err := iter.Read() + if err != nil { + t.Fatalf("unexpected error read: %v", err) + } + + if got, exp := key, "cpu,host=A#!~#value"; got != exp { + t.Fatalf("key mismatch: got %v, exp %v", got, exp) + } + + if got, exp := len(values), 1; got != exp { + t.Fatalf("values length mismatch: got %v, exp %v", got, exp) + } + + readValues = true + assertValueEqual(t, values[0], v2) + } + + if !readValues { + t.Fatalf("failed to read any values") + } +} + +// // Tests that a multiple WAL segment can be read and iterated over +func TestKeyIterator_WALSegment_Multiple(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + + v1 := tsm1.NewValue(time.Unix(1, 0), int64(1)) + points1 := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{v1}, + } + + entries := []tsm1.WALEntry{ + &tsm1.WriteWALEntry{ + Values: points1, + }, + } + + r1 := MustWALSegment(dir, entries) + + v2 := tsm1.NewValue(time.Unix(2, 0), int64(2)) + points2 := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{v2}, + } + + entries = []tsm1.WALEntry{ + &tsm1.WriteWALEntry{ + Values: points2, + }, + } + + r2 := MustWALSegment(dir, entries) + + iter, err := tsm1.NewWALKeyIterator(r1, r2) + if err != nil { + t.Fatalf("unexpected error creating WALKeyIterator: %v", err) + } + + var readValues bool + for iter.Next() { + key, values, err := iter.Read() + if err != nil { + t.Fatalf("unexpected error read: %v", err) + } + + if got, exp := key, "cpu,host=A#!~#value"; got != exp { + t.Fatalf("key mismatch: got %v, exp %v", got, exp) + } + + if got, exp := len(values), 2; got != exp { + t.Fatalf("values length mismatch: got %v, exp %v", got, exp) + } + readValues = true + + assertValueEqual(t, values[0], v1) + assertValueEqual(t, values[1], v2) + } + + if !readValues { + t.Fatalf("failed to read any values") + } +} + +// // Tests that a multiple WAL segments with out of order points are +// // sorted while iterating +func TestKeyIterator_WALSegment_MultiplePointsSorted(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + + v1 := tsm1.NewValue(time.Unix(2, 0), int64(2)) + points1 := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{v1}, + } + + entries := []tsm1.WALEntry{ + &tsm1.WriteWALEntry{ + Values: points1, + }, + } + r1 := MustWALSegment(dir, entries) + + v2 := tsm1.NewValue(time.Unix(1, 0), int64(1)) + points2 := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{v2}, + } + + entries = []tsm1.WALEntry{ + &tsm1.WriteWALEntry{ + Values: points2, + }, + } + r2 := MustWALSegment(dir, entries) + + iter, err := tsm1.NewWALKeyIterator(r1, r2) + if err != nil { + t.Fatalf("unexpected error creating WALKeyIterator: %v", err) + } + + var readValues bool + for iter.Next() { + key, values, err := iter.Read() + if err != nil { + t.Fatalf("unexpected error read: %v", err) + } + + if got, exp := key, "cpu,host=A#!~#value"; got != exp { + t.Fatalf("key mismatch: got %v, exp %v", got, exp) + } + + if got, exp := len(values), 2; got != exp { + t.Fatalf("values length mismatch: got %v, exp %v", got, exp) + } + readValues = true + + assertValueEqual(t, values[0], v2) + assertValueEqual(t, values[1], v1) + } + + if !readValues { + 
t.Fatalf("failed to read any values") + } +} + +// // Tests that multiple keys are iterated over in sorted order +func TestKeyIterator_WALSegment_MultipleKeysSorted(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + + v1 := tsm1.NewValue(time.Unix(1, 0), float64(1)) + points1 := map[string][]tsm1.Value{ + "cpu,host=B#!~#value": []tsm1.Value{v1}, + } + + entries := []tsm1.WALEntry{ + &tsm1.WriteWALEntry{ + Values: points1, + }, + } + r1 := MustWALSegment(dir, entries) + + v2 := tsm1.NewValue(time.Unix(1, 0), float64(1)) + points2 := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{v2}, + } + + entries = []tsm1.WALEntry{ + &tsm1.WriteWALEntry{ + Values: points2, + }, + } + + r2 := MustWALSegment(dir, entries) + + iter, err := tsm1.NewWALKeyIterator(r1, r2) + if err != nil { + t.Fatalf("unexpected error creating WALKeyIterator: %v", err) + } + + var readValues bool + var data = []struct { + key string + value tsm1.Value + }{ + {"cpu,host=A#!~#value", v2}, + {"cpu,host=B#!~#value", v1}, + } + + for iter.Next() { + key, values, err := iter.Read() + if err != nil { + t.Fatalf("unexpected error read: %v", err) + } + + if got, exp := key, data[0].key; got != exp { + t.Fatalf("key mismatch: got %v, exp %v", got, exp) + } + + if got, exp := len(values), 1; got != exp { + t.Fatalf("values length mismatch: got %v, exp %v", got, exp) + } + readValues = true + + assertValueEqual(t, values[0], data[0].value) + data = data[1:] + } + + if !readValues { + t.Fatalf("failed to read any values") + } +} + +// // Tests that deletes after writes removes the previous written values +func TestKeyIterator_WALSegment_MultipleKeysDeleted(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + + v1 := tsm1.NewValue(time.Unix(1, 0), float64(1)) + points1 := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{v1}, + } + + entries := []tsm1.WALEntry{ + &tsm1.WriteWALEntry{ + Values: points1, + }, + } + + r1 := MustWALSegment(dir, entries) + + v2 := tsm1.NewValue(time.Unix(1, 0), float64(1)) + v3 := tsm1.NewValue(time.Unix(1, 0), float64(1)) + + points2 := map[string][]tsm1.Value{ + "cpu,host=A#!~#count": []tsm1.Value{v2}, + "cpu,host=B#!~#value": []tsm1.Value{v3}, + } + + entries = []tsm1.WALEntry{ + &tsm1.WriteWALEntry{ + Values: points2, + }, + &tsm1.DeleteWALEntry{ + Keys: []string{ + "cpu,host=A#!~#count", + "cpu,host=A#!~#value", + }, + }, + } + r2 := MustWALSegment(dir, entries) + + iter, err := tsm1.NewWALKeyIterator(r1, r2) + if err != nil { + t.Fatalf("unexpected error creating WALKeyIterator: %v", err) + } + + var readValues bool + var data = []struct { + key string + value tsm1.Value + }{ + {"cpu,host=B#!~#value", v3}, + } + + for iter.Next() { + key, values, err := iter.Read() + if err != nil { + t.Fatalf("unexpected error read: %v", err) + } + + if got, exp := key, data[0].key; got != exp { + t.Fatalf("key mismatch: got %v, exp %v", got, exp) + } + + if got, exp := len(values), 1; got != exp { + t.Fatalf("values length mismatch: got %v, exp %v", got, exp) + } + readValues = true + + assertValueEqual(t, values[0], data[0].value) + data = data[1:] + } + + if !readValues { + t.Fatalf("failed to read any values") + } +} + +// // Tests that writes, deletes followed by more writes returns the the +// // correct values. 
+func TestKeyIterator_WALSegment_WriteAfterDelete(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + + v1 := tsm1.NewValue(time.Unix(1, 0), float64(1)) + points1 := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{v1}, + } + + entries := []tsm1.WALEntry{ + &tsm1.WriteWALEntry{ + Values: points1, + }, + } + + r1 := MustWALSegment(dir, entries) + + v2 := tsm1.NewValue(time.Unix(1, 0), float64(1)) + v3 := tsm1.NewValue(time.Unix(1, 0), float64(1)) + + points2 := map[string][]tsm1.Value{ + "cpu,host=A#!~#count": []tsm1.Value{v2}, + "cpu,host=B#!~#value": []tsm1.Value{v3}, + } + + entries = []tsm1.WALEntry{ + &tsm1.DeleteWALEntry{ + Keys: []string{ + "cpu,host=A#!~#count", + "cpu,host=A#!~#value", + }, + }, + &tsm1.WriteWALEntry{ + Values: points2, + }, + } + r2 := MustWALSegment(dir, entries) + + iter, err := tsm1.NewWALKeyIterator(r1, r2) + if err != nil { + t.Fatalf("unexpected error creating WALKeyIterator: %v", err) + } + + var readValues bool + var data = []struct { + key string + value tsm1.Value + }{ + {"cpu,host=A#!~#count", v2}, + {"cpu,host=B#!~#value", v3}, + } + + for iter.Next() { + key, values, err := iter.Read() + if err != nil { + t.Fatalf("unexpected error read: %v", err) + } + + if got, exp := key, data[0].key; got != exp { + t.Fatalf("key mismatch: got %v, exp %v", got, exp) + } + + if got, exp := len(values), 1; got != exp { + t.Fatalf("values length mismatch: got %v, exp %v", got, exp) + } + readValues = true + + assertValueEqual(t, values[0], data[0].value) + data = data[1:] + } + + if !readValues { + t.Fatalf("failed to read any values") + } +} + +// Tests that the merge iterator over a wal returns points ordered correctly. +func TestMergeIterator_Single(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + + v1 := tsm1.NewValue(time.Unix(1, 0), float64(1)) + v2 := tsm1.NewValue(time.Unix(2, 0), float64(2)) + + points := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{v1, v2}, + } + + entries := []tsm1.WALEntry{ + &tsm1.WriteWALEntry{ + Values: points, + }, + } + r := MustWALSegment(dir, entries) + + iter, err := tsm1.NewWALKeyIterator(r) + if err != nil { + t.Fatalf("unexpected error creating WALKeyIterator: %v", err) + } + + // Read should return a chunk of 1 value + m := tsm1.NewMergeIterator(iter, 1) + var readValues bool + for _, p := range points { + if !m.Next() { + t.Fatalf("expected next, got false") + } + + key, values, err := m.Read() + if err != nil { + t.Fatalf("unexpected error reading: %v", err) + } + + if got, exp := key, "cpu,host=A#!~#value"; got != exp { + t.Fatalf("key mismatch: got %v, exp %v", got, exp) + } + + if got, exp := len(values), 1; got != exp { + t.Fatalf("values length mismatch: got %v, exp %v", got, exp) + } + readValues = true + + assertValueEqual(t, values[0], p[0]) + } + if !readValues { + t.Fatalf("failed to read any values") + } +} + +// Tests that the merge iterator over a wal returns points ordered by key and time. 
+func TestMergeIterator_MultipleKeys(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + + v1 := tsm1.NewValue(time.Unix(1, 0), float64(1)) + v2 := tsm1.NewValue(time.Unix(1, 0), float64(1)) + v3 := tsm1.NewValue(time.Unix(2, 0), float64(2)) + v4 := tsm1.NewValue(time.Unix(2, 0), float64(2)) + v5 := tsm1.NewValue(time.Unix(1, 0), float64(3)) // overwrites v1 + + points1 := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{v1, v3}, + "cpu,host=B#!~#value": []tsm1.Value{v2}, + } + + points2 := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{v5}, + "cpu,host=B#!~#value": []tsm1.Value{v4}, + } + + entries := []tsm1.WALEntry{ + &tsm1.WriteWALEntry{ + Values: points1, + }, + &tsm1.WriteWALEntry{ + Values: points2, + }, + } + r := MustWALSegment(dir, entries) + + iter, err := tsm1.NewWALKeyIterator(r) + if err != nil { + t.Fatalf("unexpected error creating WALKeyIterator: %v", err) + } + + m := tsm1.NewMergeIterator(iter, 2) + + var data = []struct { + key string + points []tsm1.Value + }{ + {"cpu,host=A#!~#value", []tsm1.Value{v5, v3}}, + {"cpu,host=B#!~#value", []tsm1.Value{v2, v4}}, + } + + for _, p := range data { + if !m.Next() { + t.Fatalf("expected next, got false") + } + + key, values, err := m.Read() + if err != nil { + t.Fatalf("unexpected error reading: %v", err) + } + + if got, exp := key, p.key; got != exp { + t.Fatalf("key mismatch: got %v, exp %v", got, exp) + } + + if got, exp := len(values), len(p.points); got != exp { + t.Fatalf("values length mismatch: got %v, exp %v", got, exp) + } + + for i, point := range p.points { + assertValueEqual(t, values[i], point) + } + } +} + +// Tests that the merge iterator does not pull in deleted WAL entries. +func TestMergeIterator_DeletedKeys(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + + v1 := tsm1.NewValue(time.Unix(1, 0), float64(1)) + v2 := tsm1.NewValue(time.Unix(1, 0), float64(1)) + v3 := tsm1.NewValue(time.Unix(2, 0), float64(2)) + v4 := tsm1.NewValue(time.Unix(2, 0), float64(2)) + v5 := tsm1.NewValue(time.Unix(1, 0), float64(3)) // overwrites v1 + + points1 := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{v1, v3}, + "cpu,host=B#!~#value": []tsm1.Value{v2}, + } + + points2 := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{v5}, + "cpu,host=B#!~#value": []tsm1.Value{v4}, + } + + entries := []tsm1.WALEntry{ + &tsm1.WriteWALEntry{ + Values: points1, + }, + &tsm1.WriteWALEntry{ + Values: points2, + }, + &tsm1.DeleteWALEntry{ + Keys: []string{"cpu,host=A#!~#value"}, + }, + } + + r := MustWALSegment(dir, entries) + + iter, err := tsm1.NewWALKeyIterator(r) + if err != nil { + t.Fatalf("unexpected error creating WALKeyIterator: %v", err) + } + + m := tsm1.NewMergeIterator(iter, 2) + + var data = []struct { + key string + points []tsm1.Value + }{ + {"cpu,host=B#!~#value", []tsm1.Value{v2, v4}}, + } + + for _, p := range data { + if !m.Next() { + t.Fatalf("expected next, got false") + } + + key, values, err := m.Read() + if err != nil { + t.Fatalf("unexpected error reading: %v", err) + } + + if got, exp := key, p.key; got != exp { + t.Fatalf("key mismatch: got %v, exp %v", got, exp) + } + + if got, exp := len(values), len(p.points); got != exp { + t.Fatalf("values length mismatch: got %v, exp %v", got, exp) + } + + for i, point := range p.points { + assertValueEqual(t, values[i], point) + } + } +} + +// Tests compacting a single wal segment into one tsm file +func TestCompactor_SingleWALSegment(t *testing.T) { + dir := MustTempDir() + defer 
os.RemoveAll(dir) + + v1 := tsm1.NewValue(time.Unix(1, 0), float64(1)) + v2 := tsm1.NewValue(time.Unix(1, 0), float64(1)) + v3 := tsm1.NewValue(time.Unix(2, 0), float64(2)) + + points1 := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{v1}, + "cpu,host=B#!~#value": []tsm1.Value{v2, v3}, + } + + entries := []tsm1.WALEntry{ + &tsm1.WriteWALEntry{ + Values: points1, + }, + } + f := MustTempFile(dir) + defer f.Close() + + w := tsm1.NewWALSegmentWriter(f) + for _, e := range entries { + if err := w.Write(e); err != nil { + t.Fatalf("unexpected error writing entry: %v", err) + } + } + + compactor := &tsm1.Compactor{ + Dir: dir, + } + + files, err := compactor.Compact([]string{f.Name()}) + if err != nil { + t.Fatalf("unexpected error compacting: %v", err) + } + + if got, exp := len(files), 1; got != exp { + t.Fatalf("files length mismatch: got %v, exp %v", got, exp) + } + + f, err = os.Open(files[0]) + if err != nil { + t.Fatalf("unexpected error opening tsm: %v", err) + } + r, err := tsm1.NewTSMReader(f) + if err != nil { + t.Fatalf("unexpected error creating tsm reader: %v", err) + } + + keys := r.Keys() + if got, exp := len(keys), 2; got != exp { + t.Fatalf("keys length mismatch: got %v, exp %v", got, exp) + } + + var data = []struct { + key string + points []tsm1.Value + }{ + {"cpu,host=A#!~#value", []tsm1.Value{v1}}, + {"cpu,host=B#!~#value", []tsm1.Value{v2, v3}}, + } + + for _, p := range data { + values, err := r.ReadAll(p.key) + if err != nil { + t.Fatalf("unexpected error reading: %v", err) + } + + if got, exp := len(values), len(p.points); got != exp { + t.Fatalf("values length mismatch: got %v, exp %v", got, exp) + } + + for i, point := range p.points { + assertValueEqual(t, values[i], point) + } + } +} + +// Tests compacting multiple wal segments into one tsm file +func TestCompactor_MultipleWALSegment(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + + // First WAL segment + v1 := tsm1.NewValue(time.Unix(1, 0), float64(1)) + v2 := tsm1.NewValue(time.Unix(1, 0), float64(1)) + v3 := tsm1.NewValue(time.Unix(2, 0), float64(2)) + + points1 := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{v1, v3}, + "cpu,host=B#!~#value": []tsm1.Value{v2}, + } + + entries := []tsm1.WALEntry{ + &tsm1.WriteWALEntry{ + Values: points1, + }, + } + + f1 := MustTempFile(dir) + defer f1.Close() + + w := tsm1.NewWALSegmentWriter(f1) + for _, e := range entries { + if err := w.Write(e); err != nil { + t.Fatalf("unexpected error writing entry: %v", err) + } + } + if err := w.Close(); err != nil { + t.Fatalf("unexpected error closing writer: %v", err) + } + + // Second WAL segment + v4 := tsm1.NewValue(time.Unix(2, 0), float64(2)) + v5 := tsm1.NewValue(time.Unix(3, 0), float64(1)) + v6 := tsm1.NewValue(time.Unix(4, 0), float64(1)) + + points2 := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{v5, v6}, + "cpu,host=B#!~#value": []tsm1.Value{v4}, + } + + entries = []tsm1.WALEntry{ + &tsm1.WriteWALEntry{ + Values: points2, + }, + } + + f2 := MustTempFile(dir) + defer f2.Close() + + w = tsm1.NewWALSegmentWriter(f2) + for _, e := range entries { + if err := w.Write(e); err != nil { + t.Fatalf("unexpected error writing entry: %v", err) + } + } + if err := w.Close(); err != nil { + t.Fatalf("unexpected error closing writer: %v", err) + } + + compactor := &tsm1.Compactor{ + Dir: dir, + } + + files, err := compactor.Compact([]string{f1.Name(), f2.Name()}) + if err != nil { + t.Fatalf("unexpected error compacting: %v", err) + } + + if got, exp := len(files), 
1; got != exp { + t.Fatalf("files length mismatch: got %v, exp %v", got, exp) + } + + f, err := os.Open(files[0]) + if err != nil { + t.Fatalf("unexpected error opening tsm: %v", err) + } + defer f.Close() + + r, err := tsm1.NewTSMReader(f) + if err != nil { + t.Fatalf("unexpected error creating tsm reader: %v", err) + } + defer r.Close() + + keys := r.Keys() + if got, exp := len(keys), 2; got != exp { + t.Fatalf("keys length mismatch: got %v, exp %v", got, exp) + } + + var data = []struct { + key string + points []tsm1.Value + }{ + {"cpu,host=A#!~#value", []tsm1.Value{v1, v3, v5, v6}}, + {"cpu,host=B#!~#value", []tsm1.Value{v2, v4}}, + } + + for _, p := range data { + values, err := r.ReadAll(p.key) + if err != nil { + t.Fatalf("unexpected error reading: %v", err) + } + + if got, exp := len(values), len(p.points); got != exp { + t.Fatalf("values length mismatch: got %v, exp %v", got, exp) + } + + for i, point := range p.points { + assertValueEqual(t, values[i], point) + } + } +} + +func assertValueEqual(t *testing.T, a, b tsm1.Value) { + if got, exp := a.Time(), b.Time(); !got.Equal(exp) { + t.Fatalf("time mismatch: got %v, exp %v", got, exp) + } + if got, exp := a.Value(), b.Value(); got != exp { + t.Fatalf("value mismatch: got %v, exp %v", got, exp) + } +} + +func assertEqual(t *testing.T, a tsm1.Value, b models.Point, field string) { + if got, exp := a.Time(), b.Time(); !got.Equal(exp) { + t.Fatalf("time mismatch: got %v, exp %v", got, exp) + } + if got, exp := a.Value(), b.Fields()[field]; got != exp { + t.Fatalf("value mismatch: got %v, exp %v", got, exp) + } +} + +func MustWALSegment(dir string, entries []tsm1.WALEntry) *tsm1.WALSegmentReader { + f := MustTempFile(dir) + w := tsm1.NewWALSegmentWriter(f) + + for _, e := range entries { + if err := w.Write(e); err != nil { + panic(fmt.Sprintf("write WAL entry: %v", err)) + } + } + + if _, err := f.Seek(0, os.SEEK_SET); err != nil { + panic(fmt.Sprintf("seek WAL: %v", err)) + } + + return tsm1.NewWALSegmentReader(f) +} diff --git a/tsdb/engine/tsm1/data_file.go b/tsdb/engine/tsm1/data_file.go index 86a5d33776..fef499c5ee 100644 --- a/tsdb/engine/tsm1/data_file.go +++ b/tsdb/engine/tsm1/data_file.go @@ -67,6 +67,7 @@ import ( "fmt" "hash/crc32" "io" + "math" "os" "sort" "sync" @@ -109,6 +110,9 @@ type TSMWriter interface { // Closes any underlying file resources. Close() error + + // Size returns the current size in bytes of the file + Size() int } // TSMIndex represent the index section of a TSM file. The index records all @@ -139,6 +143,9 @@ type TSMIndex interface { // Keys returns the unique set of keys in the index. Keys() []string + // Size returns the size of the current index in bytes + Size() int + // Type returns the block type of the values stored for the key. Returns one of // BlockFloat64, BlockInt64, BlockBool, BlockString. If key does not exist, // an error is returned. @@ -367,6 +374,10 @@ func (d *directIndex) UnmarshalBinary(b []byte) error { return nil } +func (d *directIndex) Size() int { + return 0 +} + // indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This // implementation can be used for indexes that may be MMAPed into memory. 
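// (Note: the Size implementations added for both index types below are stubs that return 0 for now, so the index sizes reported by dumptsmdev are not yet meaningful.)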
type indirectIndex struct { @@ -597,6 +608,10 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error { return nil } +func (d *indirectIndex) Size() int { + return 0 +} + // tsmWriter writes keys and values in the TSM format type tsmWriter struct { w io.Writer @@ -667,6 +682,10 @@ func (t *tsmWriter) Close() error { return nil } +func (t *tsmWriter) Size() int { + return int(t.n) + t.index.Size() +} + type tsmReader struct { mu sync.Mutex @@ -879,6 +898,30 @@ func (t *tsmReader) Delete(key string) error { return nil } +// TimeRange returns the min and max time across all keys in the file. +func (t *tsmReader) TimeRange() (time.Time, time.Time) { + min, max := time.Unix(0, math.MaxInt64), time.Unix(0, math.MinInt64) + for _, k := range t.index.Keys() { + for _, e := range t.index.Entries(k) { + if e.MinTime.Before(min) { + min = e.MinTime + } + if e.MaxTime.After(max) { + max = e.MaxTime + } + } + } + return min, max +} + +func (t *tsmReader) Entries(key string) []*IndexEntry { + return t.index.Entries(key) +} + +func (t *tsmReader) IndexSize() int { + return t.index.Size() +} + type indexEntries struct { Type byte entries []*IndexEntry diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go index e2db58f410..a819c2ebd9 100644 --- a/tsdb/engine/tsm1/encoding.go +++ b/tsdb/engine/tsm1/encoding.go @@ -32,6 +32,7 @@ type Value interface { UnixNano() int64 Value() interface{} Size() int + String() string } func NewValue(t time.Time, value interface{}) Value { @@ -55,6 +56,7 @@ func (e *EmptyValue) UnixNano() int64 { return tsdb.EOF } func (e *EmptyValue) Time() time.Time { return time.Unix(0, tsdb.EOF) } func (e *EmptyValue) Value() interface{} { return nil } func (e *EmptyValue) Size() int { return 0 } +func (e *EmptyValue) String() string { return "" } // Values represented a time ascending sorted collection of Value types. 
// the underlying type should be the same across all values, but the interface @@ -210,6 +212,10 @@ func (f *FloatValue) Size() int { return 16 } +func (f *FloatValue) String() string { + return fmt.Sprintf("%v %v", f.Time(), f.Value()) +} + func encodeFloatBlock(buf []byte, values []Value) ([]byte, error) { if len(values) == 0 { return nil, nil @@ -316,6 +322,10 @@ func (b *BoolValue) Value() interface{} { return b.value } +func (b *BoolValue) String() string { + return fmt.Sprintf("%v %v", b.Time(), b.Value()) +} + func encodeBoolBlock(buf []byte, values []Value) ([]byte, error) { if len(values) == 0 { return nil, nil @@ -417,7 +427,9 @@ func (v *Int64Value) Size() int { return 16 } -func (v *Int64Value) String() string { return fmt.Sprintf("%v", v.value) } +func (v *Int64Value) String() string { + return fmt.Sprintf("%v %v", v.Time(), v.Value()) +} func encodeInt64Block(buf []byte, values []Value) ([]byte, error) { tsEnc := NewTimeEncoder() @@ -508,7 +520,9 @@ func (v *StringValue) Size() int { return 8 + len(v.value) } -func (v *StringValue) String() string { return v.value } +func (v *StringValue) String() string { + return fmt.Sprintf("%v %v", v.Time(), v.Value()) +} func encodeStringBlock(buf []byte, values []Value) ([]byte, error) { tsEnc := NewTimeEncoder() diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index c32db28b3e..de52da145a 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -1,15 +1,24 @@ package tsm1 import ( + "fmt" "io" "log" "os" "sync" + "time" "github.com/influxdb/influxdb/models" "github.com/influxdb/influxdb/tsdb" ) +// minCompactionSegments is the number of WAL segments that must be +// closed in order for a compaction to run. A lower value would shorten +// compaction times and memory requirements, but produce more TSM files +// with lower compression ratios. A higher value increases compaction times +// and memory usage but produces more dense TSM files. +const minCompactionSegments = 10 + func init() { tsdb.RegisterEngine("tsm1dev", NewDevEngine) } @@ -24,7 +33,8 @@ type DevEngine struct { path string logger *log.Logger - WAL *WAL + WAL *WAL + Compactor *Compactor RotateFileSize uint32 MaxFileSize uint32 @@ -36,11 +46,16 @@ func NewDevEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engi w := NewWAL(walPath) w.LoggingEnabled = opt.Config.WALLoggingEnabled + c := &Compactor{ + Dir: path, + } + e := &DevEngine{ path: path, logger: log.New(os.Stderr, "[tsm1dev] ", log.LstdFlags), WAL: w, + Compactor: c, RotateFileSize: DefaultRotateFileSize, MaxFileSize: MaxDataFileSize, MaxPointsPerBlock: DefaultMaxPointsPerBlock, @@ -58,15 +73,21 @@ func (e *DevEngine) PerformMaintenance() { // Format returns the format type of this engine func (e *DevEngine) Format() tsdb.EngineFormat { - return tsdb.TSM1Format + return tsdb.TSM1DevFormat } // Open opens and initializes the engine. func (e *DevEngine) Open() error { + if err := os.MkdirAll(e.path, 0777); err != nil { + return err + } + if err := e.WAL.Open(); err != nil { return err } + go e.compact() + return nil } @@ -91,7 +112,15 @@ func (e *DevEngine) LoadMetadataIndex(shard *tsdb.Shard, index *tsdb.DatabaseInd // WritePoints writes metadata and point data into the engine. // Returns an error if new points are added to an existing key. 
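// Each point is decomposed into one entry per field: the key is the series key plus the field name joined by keyFieldSeparator (presumably the "#!~#" string seen throughout the tests), so a point with N fields yields N value entries.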
func (e *DevEngine) WritePoints(points []models.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error { - return e.WAL.WritePoints(points) + values := map[string][]Value{} + for _, p := range points { + for k, v := range p.Fields() { + key := fmt.Sprintf("%s%s%s", p.Key(), keyFieldSeparator, k) + values[key] = append(values[key], NewValue(p.Time(), v)) + } + } + + return e.WAL.WritePoints(values) } // DeleteSeries deletes the series from the engine. @@ -115,3 +144,55 @@ func (e *DevEngine) Begin(writable bool) (tsdb.Tx, error) { } func (e *DevEngine) WriteTo(w io.Writer) (n int64, err error) { panic("not implemented") } + +func (e *DevEngine) compact() { + for { + // Grab the closed segments that are no longer being written to + segments, err := e.WAL.ClosedSegments() + if err != nil { + e.logger.Printf("error retrieving closed WAL segments: %v", err) + time.Sleep(time.Second) + continue + } + + // NOTE: This logic is temporary. Only compact the closed segments if we + // have at least 10 of them. + n := minCompactionSegments + if len(segments) == 0 || len(segments) < n { + time.Sleep(time.Second) + continue + } + + // If we have more than 10, just compact 10 to keep compaction times bounded. + compact := segments[:n] + + start := time.Now() + files, err := e.Compactor.Compact(compact) + if err != nil { + e.logger.Printf("error compacting WAL segments: %v", err) + } + + // TODO: this is stubbed out but would be the place to replace files in the + // file store with the new compacted versions. + e.replaceFiles(files, compact) + + // TODO: if replacement succeeds, we'd update the cache with the latest checkpoint. + // e.Cache.SetCheckpoint(...) + + e.logger.Printf("compacted %d segments into %d files in %s", len(compact), len(files), time.Since(start)) + } +} + +func (e *DevEngine) replaceFiles(tsm, segments []string) { + // TODO: this is temporary, this func should replace the files in the file store + + // The new TSM files have a .tmp extension. First rename them. + for _, f := range tsm { + os.Rename(f, f[:len(f)-4]) + } + + // The segments are fully compacted, delete them. + for _, f := range segments { + os.RemoveAll(f) + } +} diff --git a/tsdb/engine/tsm1/pools.go b/tsdb/engine/tsm1/pools.go new file mode 100644 index 0000000000..42e16ba1f8 --- /dev/null +++ b/tsdb/engine/tsm1/pools.go @@ -0,0 +1,147 @@ +package tsm1 + +import "sync" + +var ( + bufPool sync.Pool + float64ValuePool sync.Pool + int64ValuePool sync.Pool + boolValuePool sync.Pool + stringValuePool sync.Pool +) + +// getBuf returns a buffer with length size from the buffer pool. +func getBuf(size int) []byte { + x := bufPool.Get() + if x == nil { + return make([]byte, size) + } + buf := x.([]byte) + if cap(buf) < size { + return make([]byte, size) + } + return buf[:size] +} + +// putBuf returns a buffer to the pool. +func putBuf(buf []byte) { + bufPool.Put(buf) +} + +// getFloat64Values returns a []Value slice with length size from the float64 value pool. +func getFloat64Values(size int) []Value { + var buf []Value + x := float64ValuePool.Get() + if x == nil { + buf = make([]Value, size) + } else { + buf = x.([]Value) + } + if cap(buf) < size { + return make([]Value, size) + } + + for i, v := range buf { + if v == nil { + buf[i] = &FloatValue{} + } + } + return buf[:size] +} + +// putFloat64Values returns a []Value slice to the float64 value pool. +func putFloat64Values(buf []Value) { + float64ValuePool.Put(buf) +} + +// getInt64Values returns a []Value slice with length size from the int64 value pool. 
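+// A sketch of the intended round-trip (putValue is defined below; nothing in this diff calls it yet): +// vals := getInt64Values(4) // pooled; reused entries are already *Int64Value +// ... fill vals ... +// putValue(vals) // returns the slice to the pool matching its element type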
+func getInt64Values(size int) []Value { + var buf []Value + x := int64ValuePool.Get() + if x == nil { + buf = make([]Value, size) + } else { + buf = x.([]Value) + } + if cap(buf) < size { + return make([]Value, size) + } + + for i, v := range buf { + if v == nil { + buf[i] = &Int64Value{} + } + } + return buf[:size] +} + +// putInt64Values returns a []Value slice to the int64 value pool. +func putInt64Values(buf []Value) { + int64ValuePool.Put(buf) +} + +// getBoolValues returns a []Value slice with length size from the bool value pool. +func getBoolValues(size int) []Value { + var buf []Value + x := boolValuePool.Get() + if x == nil { + buf = make([]Value, size) + } else { + buf = x.([]Value) + } + if cap(buf) < size { + return make([]Value, size) + } + + for i, v := range buf { + if v == nil { + buf[i] = &BoolValue{} + } + } + return buf[:size] +} + +// putStringValues returns a []Value slice to the string value pool. +func putStringValues(buf []Value) { + stringValuePool.Put(buf) +} + +// getStringValues returns a []Value slice with length size from the string value pool. +func getStringValues(size int) []Value { + var buf []Value + x := stringValuePool.Get() + if x == nil { + buf = make([]Value, size) + } else { + buf = x.([]Value) + } + if cap(buf) < size { + return make([]Value, size) + } + + for i, v := range buf { + if v == nil { + buf[i] = &StringValue{} + } + } + return buf[:size] +} + +// putBoolValues returns a []Value slice to the bool value pool. +func putBoolValues(buf []Value) { + boolValuePool.Put(buf) +} + +// putValue returns buf to the value pool matching its element type. +func putValue(buf []Value) { + if len(buf) > 0 { + switch buf[0].(type) { + case *FloatValue: + putFloat64Values(buf) + case *Int64Value: + putInt64Values(buf) + case *BoolValue: + putBoolValues(buf) + case *StringValue: + putStringValues(buf) + } + } +} diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index 5c8c967d5f..922ffd8e06 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -4,21 +4,21 @@ import ( "fmt" "io" "log" + "math" "os" "path/filepath" "sort" "strconv" "strings" "sync" - - "github.com/influxdb/influxdb/models" + "time" "github.com/golang/snappy" ) const ( - // DefaultSegmentSize of 2MB is the size at which segment files will be rolled over - DefaultSegmentSize = 2 * 1024 * 1024 + // DefaultSegmentSize of 10MB is the size at which segment files will be rolled over + DefaultSegmentSize = 10 * 1024 * 1024 // FileExtension is the file extension we expect for wal segments WALFileExtension = "wal" @@ -26,6 +26,11 @@ const ( WALFilePrefix = "_" defaultBufLen = 1024 << 10 // 1MB (sized for batches of 5000 points) + + float64EntryType = 1 + int64EntryType = 2 + boolEntryType = 3 + stringEntryType = 4 ) // walEntry is a byte written to a wal segment file that indicates what the following compressed block contains @@ -38,8 +43,6 @@ const ( var ErrWALClosed = fmt.Errorf("WAL closed") -var bufPool sync.Pool - type WAL struct { mu sync.RWMutex @@ -100,9 +103,9 @@ func (l *WAL) Open() error { return nil } -func (l *WAL) WritePoints(points []models.Point) error { +func (l *WAL) WritePoints(values map[string][]Value) error { entry := &WriteWALEntry{ - Points: points, + Values: values, } if err := l.writeToLog(entry); err != nil { @@ -114,12 +117,17 @@ func (l *WAL) ClosedSegments() ([]string, error) { l.mu.RLock() - defer l.mu.RUnlock() + var activePath string + if l.currentSegmentWriter != nil { + activePath = l.currentSegmentWriter.Path() + } // Not loading files from disk so nothing to do if l.path == "" { + l.mu.RUnlock() return nil, nil } + l.mu.RUnlock() files, err := l.segmentFileNames() if err != 
nil { @@ -129,7 +137,7 @@ func (l *WAL) ClosedSegments() ([]string, error) { var names []string for _, fn := range files { // Skip the active segment - if l.currentSegmentWriter != nil && fn == l.currentSegmentWriter.Path() { + if fn == activePath { continue } @@ -164,16 +172,20 @@ func (l *WAL) writeToLog(entry WALEntry) error { } func (l *WAL) rollSegment() error { - l.mu.Lock() - defer l.mu.Unlock() + l.mu.RLock() if l.currentSegmentWriter == nil || l.currentSegmentWriter.Size() > DefaultSegmentSize { + l.mu.RUnlock() + l.mu.Lock() + defer l.mu.Unlock() if err := l.newSegmentFile(); err != nil { // A drop database or RP call could trigger this error if writes were in-flight // when the drop statement executes. return fmt.Errorf("error opening new segment file for wal: %v", err) } + return nil } + l.mu.RUnlock() return nil } @@ -243,26 +255,77 @@ type WALEntry interface { // WriteWALEntry represents a write of points. type WriteWALEntry struct { - Points []models.Point + Values map[string][]Value } +// Encode converts the WriteWALEntry into a byte stream using dst if it +// is large enough. If dst is too small, the slice will be grown to fit the +// encoded entry. func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) { + // The entry's values are encoded as follows: + // + // For each key and slice of values, first a 1 byte type for the []Values + // slice is written. Following the type, a 2 byte key length and the key bytes + // are written. Following the key, a 4 byte count is followed by each value as + // an 8 byte time and an N byte value. The value encoding depends on the type: + // float64 and int64 use 8 bytes, bool uses 1 byte, and string is + // length-prefixed like the key. + // + // This structure is then repeated for each key and value slice. + // + // ┌────────────────────────────────────────────────────────────────────┐ + // │ WriteWALEntry │ + // ├──────┬─────────┬────────┬───────┬─────────┬─────────┬───┬──────┬───┤ + // │ Type │ Key Len │ Key │ Count │ Time │ Value │...│ Type │...│ + // │1 byte│ 2 bytes │ N bytes│4 bytes│ 8 bytes │ N bytes │ │1 byte│ │ + // └──────┴─────────┴────────┴───────┴─────────┴─────────┴───┴──────┴───┘ var n int - for _, p := range w.Points { - // Marshaling points to bytes is relatively expensive, only do it once - bytes, err := p.MarshalBinary() - if err != nil { - return nil, err + + for k, v := range w.Values { + + switch v[0].Value().(type) { + case float64: + dst[n] = float64EntryType + case int64: + dst[n] = int64EntryType + case bool: + dst[n] = boolEntryType + case string: + dst[n] = stringEntryType + default: + return nil, fmt.Errorf("unsupported value type: %#v", v[0].Value()) } + n++ // Make sure we have enough space in our buf before copying. If not, // grow the buf. - if len(bytes)+4 > len(dst)-n { - grow := make([]byte, len(bytes)*2) + if len(k)+2+len(v)*8+4 > len(dst)-n { + grow := make([]byte, len(dst)*2) + dst = append(dst, grow...) 
} - n += copy(dst[n:], u32tob(uint32(len(bytes)))) - n += copy(dst[n:], bytes) + + n += copy(dst[n:], u16tob(uint16(len(k)))) + n += copy(dst[n:], []byte(k)) + + n += copy(dst[n:], u32tob(uint32(len(v)))) + + for _, vv := range v { + n += copy(dst[n:], u64tob(uint64(vv.Time().UnixNano()))) + switch t := vv.Value().(type) { + case float64: + n += copy(dst[n:], u64tob(uint64(math.Float64bits(t)))) + case int64: + n += copy(dst[n:], u64tob(uint64(t))) + case bool: + if t { + n += copy(dst[n:], []byte{1}) + } else { + n += copy(dst[n:], []byte{0}) + } + case string: + n += copy(dst[n:], u32tob(uint32(len(t)))) + n += copy(dst[n:], []byte(t)) + } + } } return dst[:n], nil @@ -276,17 +339,76 @@ func (w *WriteWALEntry) MarshalBinary() ([]byte, error) { func (w *WriteWALEntry) UnmarshalBinary(b []byte) error { var i int - for i < len(b) { - length := int(btou32(b[i : i+4])) + typ := b[i] + i++ + + length := int(btou16(b[i : i+2])) + i += 2 + k := string(b[i : i+length]) + i += length + + nvals := int(btou32(b[i : i+4])) i += 4 - point, err := models.NewPointFromBytes(b[i : i+length]) - if err != nil { - return err + var values []Value + switch typ { + case float64EntryType: + values = getFloat64Values(nvals) + case int64EntryType: + values = getInt64Values(nvals) + case boolEntryType: + values = getBoolValues(nvals) + case stringEntryType: + values = getStringValues(nvals) + default: + return fmt.Errorf("unsupported value type: %#v", typ) } - i += length - w.Points = append(w.Points, point) + + for j := 0; j < nvals; j++ { + t := time.Unix(0, int64(btou64(b[i:i+8]))) + i += 8 + + switch typ { + case float64EntryType: + v := math.Float64frombits((btou64(b[i : i+8]))) + i += 8 + if fv, ok := values[j].(*FloatValue); ok { + fv.time = t + fv.value = v + } + case int64EntryType: + v := int64(btou64(b[i : i+8])) + i += 8 + if fv, ok := values[j].(*Int64Value); ok { + fv.time = t + fv.value = v + } + case boolEntryType: + v := b[i] + i += 1 + if fv, ok := values[j].(*BoolValue); ok { + fv.time = t + if v == 1 { + fv.value = true + } else { + fv.value = false + } + } + case stringEntryType: + length := int(btou32(b[i : i+4])) + i += 4 + v := string(b[i : i+length]) + i += length + if fv, ok := values[j].(*StringValue); ok { + fv.time = t + fv.value = v + } + default: + return fmt.Errorf("unsupported value type: %#v", typ) + } + } + w.Values[k] = values } return nil } @@ -333,8 +455,7 @@ func (w *DeleteWALEntry) Type() walEntryType { // WALSegmentWriter writes WAL segments. 
type WALSegmentWriter struct { - mu sync.RWMutex - + mu sync.RWMutex w io.WriteCloser size int } @@ -371,10 +492,12 @@ func (w *WALSegmentWriter) Write(e WALEntry) error { if _, err := w.w.Write([]byte{byte(e.Type())}); err != nil { return err } - if _, err := w.w.Write(u32tob(uint32(len(compressed)))); err != nil { + + if _, err = w.w.Write(u32tob(uint32(len(compressed)))); err != nil { return err } - if _, err := w.w.Write(compressed); err != nil { + + if _, err = w.w.Write(compressed); err != nil { return err } @@ -459,10 +582,7 @@ func (r *WALSegmentReader) Next() bool { return true } - buf := getBuf(defaultBufLen) - defer putBuf(buf) - - data, err := snappy.Decode(buf, b[:length]) + data, err := snappy.Decode(nil, b[:length]) if err != nil { r.err = err return true @@ -471,7 +591,9 @@ func (r *WALSegmentReader) Next() bool { // and marshal it and send it to the cache switch walEntryType(entryType) { case WriteWALEntryType: - r.entry = &WriteWALEntry{} + r.entry = &WriteWALEntry{ + Values: map[string][]Value{}, + } case DeleteWALEntryType: r.entry = &DeleteWALEntry{} default: @@ -494,6 +616,10 @@ func (r *WALSegmentReader) Error() error { return r.err } +func (r *WALSegmentReader) Close() error { + return r.r.Close() +} + // idFromFileName parses the segment file ID from its name func idFromFileName(name string) (int, error) { parts := strings.Split(filepath.Base(name), ".") @@ -505,21 +631,3 @@ func idFromFileName(name string) (int, error) { return int(id), err } - -// getBuf returns a buffer with length size from the buffer pool. -func getBuf(size int) []byte { - x := bufPool.Get() - if x == nil { - return make([]byte, size) - } - buf := x.([]byte) - if cap(buf) < size { - return make([]byte, size) - } - return buf[:size] -} - -// putBuf returns a buffer to the pool. 
-func putBuf(buf []byte) { - bufPool.Put(buf) -} diff --git a/tsdb/engine/tsm1/wal_test.go b/tsdb/engine/tsm1/wal_test.go index c2d5aa2b76..668715feb5 100644 --- a/tsdb/engine/tsm1/wal_test.go +++ b/tsdb/engine/tsm1/wal_test.go @@ -1,11 +1,10 @@ package tsm1_test import ( - "fmt" "os" "testing" + "time" - "github.com/influxdb/influxdb/models" "github.com/influxdb/influxdb/tsdb/engine/tsm1" ) @@ -15,14 +14,20 @@ func TestWALWriter_WritePoints_Single(t *testing.T) { f := MustTempFile(dir) w := tsm1.NewWALSegmentWriter(f) - p1 := parsePoint("cpu,host=A value=1.1 1000000000") + p1 := tsm1.NewValue(time.Unix(1, 0), 1.1) + p2 := tsm1.NewValue(time.Unix(1, 0), int64(1)) + p3 := tsm1.NewValue(time.Unix(1, 0), true) + p4 := tsm1.NewValue(time.Unix(1, 0), "string") - points := []models.Point{ - p1, + values := map[string][]tsm1.Value{ + "cpu,host=A#!~#float": []tsm1.Value{p1}, + "cpu,host=A#!~#int": []tsm1.Value{p2}, + "cpu,host=A#!~#bool": []tsm1.Value{p3}, + "cpu,host=A#!~#string": []tsm1.Value{p4}, } entry := &tsm1.WriteWALEntry{ - Points: points, + Values: values, } if err := w.Write(entry); err != nil { @@ -49,9 +54,11 @@ func TestWALWriter_WritePoints_Single(t *testing.T) { t.Fatalf("expected WriteWALEntry: got %#v", e) } - for i, p := range e.Points { - if exp, got := points[i].String(), p.String(); exp != got { - t.Fatalf("points mismatch: got %v, exp %v", got, exp) + for k, v := range e.Values { + for i, vv := range v { + if got, exp := vv.String(), values[k][i].String(); got != exp { + t.Fatalf("points mismatch: got %v, exp %v", got, exp) + } } } } @@ -62,21 +69,20 @@ func TestWALWriter_WritePoints_Multiple(t *testing.T) { f := MustTempFile(dir) w := tsm1.NewWALSegmentWriter(f) - p1 := parsePoint("cpu,host=A value=1.1 1000000000") - p2 := parsePoint("cpu,host=B value=1.1 1000000000") + p1 := tsm1.NewValue(time.Unix(1, 0), int64(1)) + p2 := tsm1.NewValue(time.Unix(1, 0), int64(2)) - exp := [][]models.Point{ - []models.Point{ - p1, - }, - []models.Point{ - p2, - }, + exp := []struct { + key string + values []tsm1.Value + }{ + {"cpu,host=A#!~#value", []tsm1.Value{p1}}, + {"cpu,host=B#!~#value", []tsm1.Value{p2}}, } - for _, e := range exp { + for _, v := range exp { entry := &tsm1.WriteWALEntry{ - Points: e, + Values: map[string][]tsm1.Value{v.key: v.values}, } if err := w.Write(entry); err != nil { @@ -106,10 +112,19 @@ func TestWALWriter_WritePoints_Multiple(t *testing.T) { t.Fatalf("expected WriteWALEntry: got %#v", e) } - points := e.Points - for i, p := range ep { - if exp, got := points[i].String(), p.String(); exp != got { - t.Fatalf("points mismatch: got %v, exp %v", got, exp) + for k, v := range e.Values { + if got, exp := k, ep.key; got != exp { + t.Fatalf("key mismatch. 
got %v, exp %v", got, exp) + } + + if got, exp := len(v), len(ep.values); got != exp { + t.Fatalf("values length mismatch: got %v, exp %v", got, exp) + } + + for i, vv := range v { + if got, exp := vv.String(), ep.values[i].String(); got != exp { + t.Fatalf("points mismatch: got %v, exp %v", got, exp) + } } } } @@ -164,19 +179,22 @@ func TestWALWriter_WritePointsDelete_Multiple(t *testing.T) { f := MustTempFile(dir) w := tsm1.NewWALSegmentWriter(f) - p1 := parsePoint("cpu,host=A value=1.1 1000000000") - - write := &tsm1.WriteWALEntry{ - Points: []models.Point{p1}, + p1 := tsm1.NewValue(time.Unix(1, 0), true) + values := map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{p1}, } - if err := w.Write(write); err != nil { + writeEntry := &tsm1.WriteWALEntry{ + Values: values, + } + + if err := w.Write(writeEntry); err != nil { fatal(t, "write points", err) } // Write the delete entry deleteEntry := &tsm1.DeleteWALEntry{ - Keys: []string{"cpu"}, + Keys: []string{"cpu,host=A#!~value"}, } if err := w.Write(deleteEntry); err != nil { @@ -205,10 +223,15 @@ func TestWALWriter_WritePointsDelete_Multiple(t *testing.T) { t.Fatalf("expected WriteWALEntry: got %#v", e) } - points := e.Points - for i, p := range write.Points { - if exp, got := points[i].String(), p.String(); exp != got { - t.Fatalf("points mismatch: got %v, exp %v", got, exp) + for k, v := range e.Values { + if got, exp := len(v), len(values[k]); got != exp { + t.Fatalf("values length mismatch: got %v, exp %v", got, exp) + } + + for i, vv := range v { + if got, exp := vv.String(), values[k][i].String(); got != exp { + t.Fatalf("points mismatch: got %v, exp %v", got, exp) + } } } @@ -254,8 +277,10 @@ func TestWAL_ClosedSegments(t *testing.T) { t.Fatalf("close segment length mismatch: got %v, exp %v", got, exp) } - if err := w.WritePoints([]models.Point{ - parsePoint("cpu,host=A value=1.1 1000000000"), + if err := w.WritePoints(map[string][]tsm1.Value{ + "cpu,host=A#!~#value": []tsm1.Value{ + tsm1.NewValue(time.Unix(1, 0), 1.1), + }, }); err != nil { t.Fatalf("error writing points: %v", err) } @@ -323,9 +348,10 @@ func TestWAL_Delete(t *testing.T) { } func BenchmarkWALSegmentWriter(b *testing.B) { - points := make([]models.Point, 5000) - for i := range points { - points[i] = parsePoint(fmt.Sprintf("cpu,host=host-%d value=1.1 1000000000", i)) + points := map[string][]tsm1.Value{} + for i := 0; i < 5000; i++ { + k := "cpu,host=A#!~#value" + points[k] = append(points[k], tsm1.NewValue(time.Unix(int64(i), 0), 1.1)) } dir := MustTempDir() @@ -335,7 +361,7 @@ func BenchmarkWALSegmentWriter(b *testing.B) { w := tsm1.NewWALSegmentWriter(f) write := &tsm1.WriteWALEntry{ - Points: points, + Values: points, } b.ResetTimer() @@ -347,9 +373,10 @@ func BenchmarkWALSegmentWriter(b *testing.B) { } func BenchmarkWALSegmentReader(b *testing.B) { - points := make([]models.Point, 5000) - for i := range points { - points[i] = parsePoint(fmt.Sprintf("cpu,host=host-%d value=1.1 1000000000", i)) + points := map[string][]tsm1.Value{} + for i := 0; i < 5000; i++ { + k := "cpu,host=A#!~#value" + points[k] = append(points[k], tsm1.NewValue(time.Unix(int64(i), 0), 1.1)) } dir := MustTempDir() @@ -359,7 +386,7 @@ func BenchmarkWALSegmentReader(b *testing.B) { w := tsm1.NewWALSegmentWriter(f) write := &tsm1.WriteWALEntry{ - Points: points, + Values: points, } for i := 0; i < 100; i++ {