Write TSM data directly to writer

We were buffering up the data to write into byte slices to reduce
IO calls but at larger sizes, this causes memory to spike.  The TSMWriter
was switched to use a bufio.Writer internally so this byte slice buffering
is unnecessary and costly now.
pull/5279/head
Jason Wilder 2016-01-05 10:21:41 -07:00
parent d2889ecd6a
commit ee54a1e791
2 changed files with 77 additions and 43 deletions

View File

@ -982,6 +982,30 @@ func (a *indexEntries) MarshalBinary() (b []byte, err error) {
return b, nil
}
func (a *indexEntries) Write(w io.Writer) error {
for _, entry := range a.entries {
_, err := w.Write(u64tob(uint64(entry.MinTime.UnixNano())))
if err != nil {
return err
}
_, err = w.Write(u64tob(uint64(entry.MaxTime.UnixNano())))
if err != nil {
return err
}
_, err = w.Write(u64tob(uint64(entry.Offset)))
if err != nil {
return err
}
_, err = w.Write(u32tob(entry.Size))
if err != nil {
return err
}
}
return nil
}
func readKey(b []byte) (n int, key []byte, err error) {
// 2 byte size of key
n, size := 2, int(btou16(b[:2]))

View File

@ -182,6 +182,9 @@ type TSMIndex interface {
// UnmarshalBinary populates an index from an encoded byte slice
// representation of an index.
UnmarshalBinary(b []byte) error
// Write writes the index contents to a writer
Write(w io.Writer) error
}
// IndexEntry is the index information for a given block in a TSM file.
@ -338,7 +341,7 @@ func (d *directIndex) Key(idx int) (string, []*IndexEntry) {
}
func (d *directIndex) KeyCount() int {
return len(d.Keys())
return len(d.blocks)
}
func (d *directIndex) KeyRange() (string, string) {
@ -380,20 +383,6 @@ func (d *directIndex) addEntries(key string, entries *indexEntries) {
}
func (d *directIndex) Write(w io.Writer) error {
b, err := d.MarshalBinary()
if err != nil {
return fmt.Errorf("write: marshal error: %v", err)
}
// Write out the index bytes
_, err = w.Write(b)
if err != nil {
return fmt.Errorf("write: writer error: %v", err)
}
return nil
}
func (d *directIndex) MarshalBinary() ([]byte, error) {
d.mu.RLock()
defer d.mu.RUnlock()
@ -404,37 +393,53 @@ func (d *directIndex) MarshalBinary() ([]byte, error) {
}
sort.Strings(keys)
// Buffer to build up the index and write in bulk
var b []byte
// For each key, individual entries are sorted by time
for _, key := range keys {
entries := d.blocks[key]
if entries.Len() > maxIndexEntries {
return nil, fmt.Errorf("key '%s' exceeds max index entries: %d > %d",
return fmt.Errorf("key '%s' exceeds max index entries: %d > %d",
key, entries.Len(), maxIndexEntries)
}
sort.Sort(entries)
// Append the key length and key
b = append(b, u16tob(uint16(len(key)))...)
b = append(b, key...)
_, err := w.Write(u16tob(uint16(len(key))))
if err != nil {
return fmt.Errorf("write: writer key length error: %v", err)
}
_, err = w.Write([]byte(key))
if err != nil {
return fmt.Errorf("write: writer key error: %v", err)
}
// Append the block type
b = append(b, entries.Type)
_, err = w.Write([]byte{entries.Type})
if err != nil {
return fmt.Errorf("write: writer key type error: %v", err)
}
// Append the index block count
b = append(b, u16tob(uint16(entries.Len()))...)
_, err = w.Write(u16tob(uint16(entries.Len())))
if err != nil {
return fmt.Errorf("write: writer block count error: %v", err)
}
// Append each index entry for all blocks for this key
eb, err := entries.MarshalBinary()
if err != nil {
return nil, err
if err = entries.Write(w); err != nil {
return fmt.Errorf("write: writer entries error: %v", err)
}
b = append(b, eb...)
}
return b, nil
return nil
}
func (d *directIndex) MarshalBinary() ([]byte, error) {
var b bytes.Buffer
if err := d.Write(&b); err != nil {
return nil, err
}
return b.Bytes(), nil
}
func (d *directIndex) UnmarshalBinary(b []byte) error {
@ -543,20 +548,28 @@ func (t *tsmWriter) WriteBlock(key string, minTime, maxTime time.Time, block []b
checksum := crc32.ChecksumIEEE(block)
n, err := t.w.Write(append(u32tob(checksum), block...))
_, err := t.w.Write(u32tob(checksum))
if err != nil {
return err
}
n, err := t.w.Write(block)
if err != nil {
return err
}
// 4 byte checksum + len of block
blockSize := 4 + n
blockType, err := BlockType(block)
if err != nil {
return err
}
// Record this block in index
t.index.Add(key, blockType, minTime, maxTime, t.n, uint32(n))
t.index.Add(key, blockType, minTime, maxTime, t.n, uint32(blockSize))
// Increment file position pointer
t.n += int64(n)
// Increment file position pointer (checksum + block len)
t.n += int64(blockSize)
return nil
}
@ -566,23 +579,20 @@ func (t *tsmWriter) WriteBlock(key string, minTime, maxTime time.Time, block []b
func (t *tsmWriter) WriteIndex() error {
indexPos := t.n
// Generate the index bytes
b, err := t.index.MarshalBinary()
if err != nil {
return err
}
// Don't write an index if we don't actually have any blocks in the file.
if len(b) == 0 {
if t.index.KeyCount() == 0 {
return ErrNoValues
}
// Write the index followed by index position
_, err = t.w.Write(append(b, u64tob(uint64(indexPos))...))
if err != nil {
// Write the index
if err := t.index.Write(t.w); err != nil {
return err
}
// Write the index index position
_, err := t.w.Write(u64tob(uint64(indexPos)))
if err != nil {
return err
}
return nil
}