Return error if number of blocks would overflow

pull/4808/head
Jason Wilder 2015-11-12 20:52:51 -07:00
parent 16c5e0a2e0
commit d8c0c26934
2 changed files with 29 additions and 4 deletions

View File

@ -79,7 +79,14 @@ const (
Version byte = 1
// Size in bytes of an index entry
indexEntrySize = 28
// Size in bytes used to store the count of index entries for a key
indexCountSize = 2
// Max number of blocks for a given key that can exist in a single file
maxIndexEntries = (1 << (indexCountSize * 8)) - 1
)
// TSMWriter writes TSM formatted key and values.
@ -229,6 +236,11 @@ func (d *directIndex) MarshalBinary() ([]byte, error) {
// For each key, individual entries are sorted by time
for _, key := range keys {
entries := d.blocks[key]
if len(entries) > maxIndexEntries {
return nil, fmt.Errorf("key '%s' exceeds max index entries: %d > %d",
key, len(entries), maxIndexEntries)
}
sort.Sort(entries)
// Append the key length and key
@ -281,11 +293,11 @@ func (d *directIndex) readKey(b []byte) (n int, key string, err error) {
func (d *directIndex) readEntries(b []byte) (n int, entries indexEntries, err error) {
// 2 byte count of index entries
n, count := 2, int(btou16(b[:2]))
n, count := indexCountSize, int(btou16(b[:2]))
for i := 0; i < count; i++ {
ie := &IndexEntry{}
if err := ie.UnmarshalBinary(b[i*indexEntrySize+2 : i*indexEntrySize+2+indexEntrySize]); err != nil {
if err := ie.UnmarshalBinary(b[i*indexEntrySize+indexCountSize : i*indexEntrySize+indexCountSize+indexEntrySize]); err != nil {
return 0, nil, fmt.Errorf("readEntries: unmarshal error: %v", err)
}
entries = append(entries, ie)
@ -470,11 +482,11 @@ func (d *indirectIndex) readKey(b []byte) (n int, key string, err error) {
func (d *indirectIndex) readEntries(b []byte) (n int, entries indexEntries, err error) {
// 2 byte count of index entries
n, count := 2, int(btou16(b[:2]))
n, count := indexCountSize, int(btou16(b[:2]))
for i := 0; i < count; i++ {
ie := &IndexEntry{}
if err := ie.UnmarshalBinary(b[i*indexEntrySize+2 : i*indexEntrySize+2+indexEntrySize]); err != nil {
if err := ie.UnmarshalBinary(b[i*indexEntrySize+indexCountSize : i*indexEntrySize+indexCountSize+indexEntrySize]); err != nil {
return 0, nil, fmt.Errorf("readEntries: unmarshal error: %v", err)
}
entries = append(entries, ie)

View File

@ -424,3 +424,16 @@ func TestIndirectIndex_Entries_NonExistent(t *testing.T) {
t.Fatalf("entries length mismatch: got %v, exp %v", got, exp)
}
}
func TestIndirectIndex_MaxBlocks(t *testing.T) {
index := tsm1.NewDirectIndex()
for i := 0; i < 1<<16; i++ {
index.Add("cpu", time.Unix(0, 0), time.Unix(1, 0), 10, 20)
}
if _, err := index.MarshalBinary(); err == nil {
t.Fatalf("expected max block count error. got nil")
} else {
println(err.Error())
}
}