Add WriteBlock to TSMWriter
parent
d99c1f944e
commit
4a3037814f
|
@ -34,7 +34,6 @@ type TSMReader struct {
|
|||
type BlockIterator struct {
|
||||
r *TSMReader
|
||||
keys []string
|
||||
typ byte
|
||||
key string
|
||||
entries []*IndexEntry
|
||||
err error
|
||||
|
@ -53,23 +52,22 @@ func (b *BlockIterator) Next() bool {
|
|||
b.key = b.keys[0]
|
||||
b.keys = b.keys[1:]
|
||||
b.entries = b.r.Entries(b.key)
|
||||
b.typ, b.err = b.r.Type(b.key)
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (b *BlockIterator) Read() (string, byte, *IndexEntry, []byte, error) {
|
||||
func (b *BlockIterator) Read() (string, *IndexEntry, []byte, error) {
|
||||
if b.err != nil {
|
||||
return "", 0, nil, nil, b.err
|
||||
return "", nil, nil, b.err
|
||||
}
|
||||
|
||||
buf, err := b.r.readBytes(b.entries[0], nil)
|
||||
if err != nil {
|
||||
return "", 0, nil, nil, err
|
||||
return "", nil, nil, err
|
||||
}
|
||||
return b.key, b.typ, b.entries[0], buf, err
|
||||
return b.key, b.entries[0], buf, err
|
||||
}
|
||||
|
||||
// blockAccessor abstracts a method of accessing blocks from a
|
||||
|
|
|
@ -573,7 +573,7 @@ func TestBlockIterator_Single(t *testing.T) {
|
|||
var count int
|
||||
iter := r.BlockIterator()
|
||||
for iter.Next() {
|
||||
key, typ, index, buf, err := iter.Read()
|
||||
key, index, buf, err := iter.Read()
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating iterator: %v", err)
|
||||
|
@ -583,10 +583,6 @@ func TestBlockIterator_Single(t *testing.T) {
|
|||
t.Fatalf("key mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
|
||||
if got, exp := typ, tsm1.BlockInt64; got != exp {
|
||||
t.Fatalf("type mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
|
||||
if got, exp := index.MinTime, time.Unix(0, 0); got != exp {
|
||||
t.Fatalf("min time mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
|
@ -640,7 +636,7 @@ func TestBlockIterator_Sorted(t *testing.T) {
|
|||
iter := r.BlockIterator()
|
||||
var lastKey string
|
||||
for iter.Next() {
|
||||
key, _, index, buf, err := iter.Read()
|
||||
key, index, buf, err := iter.Read()
|
||||
|
||||
if key < lastKey {
|
||||
t.Fatalf("keys not sorted: got %v, last %v", key, lastKey)
|
||||
|
|
|
@ -110,6 +110,13 @@ type TSMWriter interface {
|
|||
// used as the minimum and maximum values for the index entry.
|
||||
Write(key string, values Values) error
|
||||
|
||||
// WriteBlock writes a new block for key containing the bytes in block. WriteBlock appends
|
||||
// blocks in the order that the WriteBlock function is called. The caller is
|
||||
// responsible for ensuring keys and blocks are sorted appropriately, and that the
|
||||
// block and index information is correct for the block. The minTime and maxTime
|
||||
// timestamp values are used as the minimum and maximum values for the index entry.
|
||||
WriteBlock(key string, minTime, maxTime time.Time, block []byte) error
|
||||
|
||||
// WriteIndex finishes the TSM write streams and writes the index.
|
||||
WriteIndex() error
|
||||
|
||||
|
@ -462,14 +469,21 @@ func NewTSMWriter(w io.Writer) (TSMWriter, error) {
|
|||
return &tsmWriter{w: w, index: index}, nil
|
||||
}
|
||||
|
||||
func (t *tsmWriter) writeHeader() error {
|
||||
n, err := t.w.Write(append(u32tob(MagicNumber), Version))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
t.n = int64(n)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *tsmWriter) Write(key string, values Values) error {
|
||||
// Write header only after we have some data to write.
|
||||
if t.n == 0 {
|
||||
n, err := t.w.Write(append(u32tob(MagicNumber), Version))
|
||||
if err != nil {
|
||||
if err := t.writeHeader(); err != nil {
|
||||
return err
|
||||
}
|
||||
t.n = int64(n)
|
||||
}
|
||||
|
||||
block, err := values.Encode(nil)
|
||||
|
@ -496,6 +510,32 @@ func (t *tsmWriter) Write(key string, values Values) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (t *tsmWriter) WriteBlock(key string, minTime, maxTime time.Time, block []byte) error {
|
||||
// Write header only after we have some data to write.
|
||||
if t.n == 0 {
|
||||
if err := t.writeHeader(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
n, err := t.w.Write(block)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
blockType, err := BlockType(block[4:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Record this block in index
|
||||
t.index.Add(key, blockType, minTime, maxTime, t.n, uint32(n))
|
||||
|
||||
// Increment file position pointer
|
||||
t.n += int64(n)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WriteIndex writes the index section of the file. If there are no index entries to write,
|
||||
// this returns ErrNoValues
|
||||
func (t *tsmWriter) WriteIndex() error {
|
||||
|
|
|
@ -353,3 +353,88 @@ func TestTSMWriter_Read_Multiple(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestTSMWriter_WriteBlock_Multiple(t *testing.T) {
|
||||
// Write a new TSM file
|
||||
var b bytes.Buffer
|
||||
w, err := tsm1.NewTSMWriter(&b)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating writer: %v", err)
|
||||
}
|
||||
|
||||
var data = []struct {
|
||||
key string
|
||||
values []tsm1.Value
|
||||
}{
|
||||
{"cpu", []tsm1.Value{tsm1.NewValue(time.Unix(0, 0), 1.0)}},
|
||||
{"mem", []tsm1.Value{tsm1.NewValue(time.Unix(1, 0), 2.0)}},
|
||||
}
|
||||
|
||||
for _, d := range data {
|
||||
if err := w.Write(d.key, d.values); err != nil {
|
||||
t.Fatalf("unexpected error writing: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := w.WriteIndex(); err != nil {
|
||||
t.Fatalf("unexpected error closing: %v", err)
|
||||
}
|
||||
|
||||
if got, exp := len(b.Bytes()), 5; got < exp {
|
||||
t.Fatalf("file size mismatch: got %v, exp %v", got, exp)
|
||||
}
|
||||
if got := binary.BigEndian.Uint32(b.Bytes()[0:4]); got != tsm1.MagicNumber {
|
||||
t.Fatalf("magic number mismatch: got %v, exp %v", got, tsm1.MagicNumber)
|
||||
}
|
||||
|
||||
// Create reader for that file
|
||||
r, err := tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error created reader: %v", err)
|
||||
}
|
||||
|
||||
// Using the reader, write a new file using WriteBlocks
|
||||
b.Reset()
|
||||
w, err = tsm1.NewTSMWriter(&b)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error creating writer: %v", err)
|
||||
}
|
||||
|
||||
iter := r.BlockIterator()
|
||||
for iter.Next() {
|
||||
key, entry, b, err := iter.Read()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error reading block: %v", err)
|
||||
}
|
||||
if err := w.WriteBlock(key, entry.MinTime, entry.MaxTime, b); err != nil {
|
||||
t.Fatalf("unexpected error writing block: %v", err)
|
||||
}
|
||||
}
|
||||
if err := w.WriteIndex(); err != nil {
|
||||
t.Fatalf("unexpected error closing: %v", err)
|
||||
}
|
||||
|
||||
// Now create a reader to verify the written blocks matches the originally
|
||||
// written file using Write
|
||||
r, err = tsm1.NewTSMReader(bytes.NewReader(b.Bytes()))
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error created reader: %v", err)
|
||||
}
|
||||
|
||||
for _, d := range data {
|
||||
readValues, err := r.ReadAll(d.key)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error readin: %v", err)
|
||||
}
|
||||
|
||||
if exp := len(d.values); exp != len(readValues) {
|
||||
t.Fatalf("read values length mismatch: got %v, exp %v", len(readValues), exp)
|
||||
}
|
||||
|
||||
for i, v := range d.values {
|
||||
if v.Value() != readValues[i].Value() {
|
||||
t.Fatalf("read value mismatch(%d): got %v, exp %d", i, readValues[i].Value(), v.Value())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue