diff --git a/tsdb/engine/tsm1/cache_test.go b/tsdb/engine/tsm1/cache_test.go index d5283024d6..21512704bf 100644 --- a/tsdb/engine/tsm1/cache_test.go +++ b/tsdb/engine/tsm1/cache_test.go @@ -354,7 +354,7 @@ func mustMarshalEntry(entry WALEntry) (WalEntryType, []byte) { b, err := entry.Encode(bytes) if err != nil { - panic("error encoding") + panic(fmt.Sprintf("error encoding: %v", err)) } return entry.Type(), snappy.Encode(b, b) diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index 29c4a7e0a6..6d5d91ce1c 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -366,6 +366,13 @@ func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) { for k, v := range w.Values { + // Make sure we have enough space in our buf before copying. If not, + // grow the buf. + if len(dst[:n])+2+len(k)+len(v)*8+4 > len(dst) { + grow := make([]byte, len(dst)*2) + dst = append(dst, grow...) + } + switch v[0].Value().(type) { case float64: dst[n] = float64EntryType @@ -380,19 +387,19 @@ func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) { } n++ - // Make sure we have enough space in our buf before copying. If not, - // grow the buf. - if len(k)+2+len(v)*8+4 > len(dst)-n { - grow := make([]byte, len(dst)*2) - dst = append(dst, grow...) - } - n += copy(dst[n:], u16tob(uint16(len(k)))) n += copy(dst[n:], []byte(k)) n += copy(dst[n:], u32tob(uint32(len(v)))) for _, vv := range v { + + // Grow our slice if needed + if len(dst[:n])+16 > len(dst) { + grow := make([]byte, len(dst)*2) + dst = append(dst, grow...) + } + n += copy(dst[n:], u64tob(uint64(vv.Time().UnixNano()))) switch t := vv.Value().(type) { case float64: diff --git a/tsdb/engine/tsm1/wal_test.go b/tsdb/engine/tsm1/wal_test.go index bea473a654..ad6d70b9f4 100644 --- a/tsdb/engine/tsm1/wal_test.go +++ b/tsdb/engine/tsm1/wal_test.go @@ -70,6 +70,62 @@ func TestWALWriter_WritePoints_Single(t *testing.T) { } } +func TestWALWriter_WritePoints_LargeBatch(t *testing.T) { + dir := MustTempDir() + defer os.RemoveAll(dir) + f := MustTempFile(dir) + w := tsm1.NewWALSegmentWriter(f) + + var points []tsm1.Value + for i := 0; i < 100000; i++ { + points = append(points, tsm1.NewValue(time.Unix(int64(i), 0), int64(1))) + } + + values := map[string][]tsm1.Value{ + "cpu,host=A,server=01,foo=bar,tag=really-long#!~#float": points, + "mem,host=A,server=01,foo=bar,tag=really-long#!~#float": points, + } + + entry := &tsm1.WriteWALEntry{ + Values: values, + } + + if err := w.Write(mustMarshalEntry(entry)); err != nil { + fatal(t, "write points", err) + } + + if _, err := f.Seek(0, os.SEEK_SET); err != nil { + fatal(t, "seek", err) + } + + r := tsm1.NewWALSegmentReader(f) + + if !r.Next() { + t.Fatalf("expected next, got false") + } + + we, err := r.Read() + if err != nil { + fatal(t, "read entry", err) + } + + e, ok := we.(*tsm1.WriteWALEntry) + if !ok { + t.Fatalf("expected WriteWALEntry: got %#v", e) + } + + for k, v := range e.Values { + for i, vv := range v { + if got, exp := vv.String(), values[k][i].String(); got != exp { + t.Fatalf("points mismatch: got %v, exp %v", got, exp) + } + } + } + + if n := r.Count(); n != MustReadFileSize(f) { + t.Fatalf("wrong count of bytes read, got %d, exp %d", n, MustReadFileSize(f)) + } +} func TestWALWriter_WritePoints_Multiple(t *testing.T) { dir := MustTempDir() defer os.RemoveAll(dir) @@ -490,7 +546,7 @@ func mustMarshalEntry(entry tsm1.WALEntry) (tsm1.WalEntryType, []byte) { b, err := entry.Encode(bytes) if err != nil { - panic("error encoding") + panic(fmt.Sprintf("error encoding: %v", err)) } return entry.Type(), snappy.Encode(b, b)