diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go index d987fbbbb3..4fe8e1298c 100644 --- a/tsdb/engine/tsm1/reader.go +++ b/tsdb/engine/tsm1/reader.go @@ -34,7 +34,6 @@ type TSMReader struct { type BlockIterator struct { r *TSMReader keys []string - typ byte key string entries []*IndexEntry err error @@ -53,23 +52,22 @@ func (b *BlockIterator) Next() bool { b.key = b.keys[0] b.keys = b.keys[1:] b.entries = b.r.Entries(b.key) - b.typ, b.err = b.r.Type(b.key) return true } return false } -func (b *BlockIterator) Read() (string, byte, *IndexEntry, []byte, error) { +func (b *BlockIterator) Read() (string, *IndexEntry, []byte, error) { if b.err != nil { - return "", 0, nil, nil, b.err + return "", nil, nil, b.err } buf, err := b.r.readBytes(b.entries[0], nil) if err != nil { - return "", 0, nil, nil, err + return "", nil, nil, err } - return b.key, b.typ, b.entries[0], buf, err + return b.key, b.entries[0], buf, err } // blockAccessor abstracts a method of accessing blocks from a diff --git a/tsdb/engine/tsm1/reader_test.go b/tsdb/engine/tsm1/reader_test.go index d7a42be541..14e5d940e2 100644 --- a/tsdb/engine/tsm1/reader_test.go +++ b/tsdb/engine/tsm1/reader_test.go @@ -573,7 +573,7 @@ func TestBlockIterator_Single(t *testing.T) { var count int iter := r.BlockIterator() for iter.Next() { - key, typ, index, buf, err := iter.Read() + key, index, buf, err := iter.Read() if err != nil { t.Fatalf("unexpected error creating iterator: %v", err) @@ -583,10 +583,6 @@ func TestBlockIterator_Single(t *testing.T) { t.Fatalf("key mismatch: got %v, exp %v", got, exp) } - if got, exp := typ, tsm1.BlockInt64; got != exp { - t.Fatalf("type mismatch: got %v, exp %v", got, exp) - } - if got, exp := index.MinTime, time.Unix(0, 0); got != exp { t.Fatalf("min time mismatch: got %v, exp %v", got, exp) } @@ -640,7 +636,7 @@ func TestBlockIterator_Sorted(t *testing.T) { iter := r.BlockIterator() var lastKey string for iter.Next() { - key, _, index, buf, err := iter.Read() + key, index, buf, err := iter.Read() if key < lastKey { t.Fatalf("keys not sorted: got %v, last %v", key, lastKey) diff --git a/tsdb/engine/tsm1/writer.go b/tsdb/engine/tsm1/writer.go index d3b0eceeb6..4df0e1159b 100644 --- a/tsdb/engine/tsm1/writer.go +++ b/tsdb/engine/tsm1/writer.go @@ -110,6 +110,13 @@ type TSMWriter interface { // used as the minimum and maximum values for the index entry. Write(key string, values Values) error + // WriteBlock writes a new block for key containing the bytes in block. WriteBlock appends + // blocks in the order that the WriteBlock function is called. The caller is + // responsible for ensuring keys and blocks are sorted appropriately, and that the + // block and index information is correct for the block. The minTime and maxTime + // timestamp values are used as the minimum and maximum values for the index entry. + WriteBlock(key string, minTime, maxTime time.Time, block []byte) error + // WriteIndex finishes the TSM write streams and writes the index. WriteIndex() error @@ -462,14 +469,21 @@ func NewTSMWriter(w io.Writer) (TSMWriter, error) { return &tsmWriter{w: w, index: index}, nil } +func (t *tsmWriter) writeHeader() error { + n, err := t.w.Write(append(u32tob(MagicNumber), Version)) + if err != nil { + return err + } + t.n = int64(n) + return nil +} + func (t *tsmWriter) Write(key string, values Values) error { // Write header only after we have some data to write. if t.n == 0 { - n, err := t.w.Write(append(u32tob(MagicNumber), Version)) - if err != nil { + if err := t.writeHeader(); err != nil { return err } - t.n = int64(n) } block, err := values.Encode(nil) @@ -496,6 +510,32 @@ func (t *tsmWriter) Write(key string, values Values) error { return nil } +func (t *tsmWriter) WriteBlock(key string, minTime, maxTime time.Time, block []byte) error { + // Write header only after we have some data to write. + if t.n == 0 { + if err := t.writeHeader(); err != nil { + return err + } + } + + n, err := t.w.Write(block) + if err != nil { + return err + } + + blockType, err := BlockType(block[4:]) + if err != nil { + return err + } + // Record this block in index + t.index.Add(key, blockType, minTime, maxTime, t.n, uint32(n)) + + // Increment file position pointer + t.n += int64(n) + + return nil +} + // WriteIndex writes the index section of the file. If there are no index entries to write, // this returns ErrNoValues func (t *tsmWriter) WriteIndex() error { diff --git a/tsdb/engine/tsm1/writer_test.go b/tsdb/engine/tsm1/writer_test.go index 98a35bffba..812011c4ed 100644 --- a/tsdb/engine/tsm1/writer_test.go +++ b/tsdb/engine/tsm1/writer_test.go @@ -353,3 +353,88 @@ func TestTSMWriter_Read_Multiple(t *testing.T) { } } } + +func TestTSMWriter_WriteBlock_Multiple(t *testing.T) { + // Write a new TSM file + var b bytes.Buffer + w, err := tsm1.NewTSMWriter(&b) + if err != nil { + t.Fatalf("unexpected error creating writer: %v", err) + } + + var data = []struct { + key string + values []tsm1.Value + }{ + {"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(0, 0), 1.0)}}, + {"mem", []tsm1.Value{tsm1.NewValue(time.Unix(1, 0), 2.0)}}, + } + + for _, d := range data { + if err := w.Write(d.key, d.values); err != nil { + t.Fatalf("unexpected error writing: %v", err) + } + } + + if err := w.WriteIndex(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + + if got, exp := len(b.Bytes()), 5; got < exp { + t.Fatalf("file size mismatch: got %v, exp %v", got, exp) + } + if got := binary.BigEndian.Uint32(b.Bytes()[0:4]); got != tsm1.MagicNumber { + t.Fatalf("magic number mismatch: got %v, exp %v", got, tsm1.MagicNumber) + } + + // Create reader for that file + r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes())) + if err != nil { + t.Fatalf("unexpected error created reader: %v", err) + } + + // Using the reader, write a new file using WriteBlocks + b.Reset() + w, err = tsm1.NewTSMWriter(&b) + if err != nil { + t.Fatalf("unexpected error creating writer: %v", err) + } + + iter := r.BlockIterator() + for iter.Next() { + key, entry, b, err := iter.Read() + if err != nil { + t.Fatalf("unexpected error reading block: %v", err) + } + if err := w.WriteBlock(key, entry.MinTime, entry.MaxTime, b); err != nil { + t.Fatalf("unexpected error writing block: %v", err) + } + } + if err := w.WriteIndex(); err != nil { + t.Fatalf("unexpected error closing: %v", err) + } + + // Now create a reader to verify the written blocks matches the originally + // written file using Write + r, err = tsm1.NewTSMReader(bytes.NewReader(b.Bytes())) + if err != nil { + t.Fatalf("unexpected error created reader: %v", err) + } + + for _, d := range data { + readValues, err := r.ReadAll(d.key) + if err != nil { + t.Fatalf("unexpected error readin: %v", err) + } + + if exp := len(d.values); exp != len(readValues) { + t.Fatalf("read values length mismatch: got %v, exp %v", len(readValues), exp) + } + + for i, v := range d.values { + if v.Value() != readValues[i].Value() { + t.Fatalf("read value mismatch(%d): got %v, exp %d", i, readValues[i].Value(), v.Value()) + } + } + } +}