Handle case where IDs file is out of sync with tsm1 file or missing

Avoids a panic if a series ID exists in the tsm file but not in the IDs file.
Also handles the case were we don't have an ids file and just a tsm file.
pull/4445/head
Jason Wilder 2015-10-13 17:10:08 -06:00
parent ff9a3072ab
commit ad35dcea38
1 changed files with 32 additions and 3 deletions

View File

@ -212,6 +212,8 @@ func readIndex(f *os.File) *tsmIndex {
}
func dumpTsm1(path string) {
var errors []error
f, err := os.Open(path)
if err != nil {
println(err.Error())
@ -266,12 +268,32 @@ func dumpTsm1(path string) {
key := invIds[block.id]
split := strings.Split(key, "#!~#")
// We dont' know know if we have fields so use an informative default
var measurement, field string = "UNKNOWN", "UNKNOWN"
// We read some IDs from the ids file
if len(invIds) > 0 {
// Change the default to error until we know we have a valid key
measurement = "ERR"
field = "ERR"
// Possible corruption? Try to read as much as we can and point to the problem.
if key == "" {
errors = append(errors, fmt.Errorf("index pos %d, field id: %d, missing key for id.", i, block.id))
} else if len(split) < 2 {
errors = append(errors, fmt.Errorf("index pos %d, field id: %d, key corrupt: got '%v'", i, block.id, key))
} else {
measurement = split[0]
field = split[1]
}
}
fmt.Fprintln(tw, " "+strings.Join([]string{
strconv.FormatInt(int64(i), 10),
strconv.FormatUint(block.id, 10),
strconv.FormatInt(int64(block.offset), 10),
split[0],
split[1],
measurement,
field,
}, "\t"))
}
@ -375,5 +397,12 @@ func dumpTsm1(path string) {
fmt.Printf(" Per block: %0.2f bytes/point\n", float64(blockSize)/float64(pointCount))
fmt.Printf(" Total: %0.2f bytes/point\n", float64(stat.Size())/float64(pointCount))
println()
if len(errors) > 0 {
println()
fmt.Printf("Errors (%d):\n", len(errors))
for _, err := range errors {
fmt.Printf(" * %v\n", err)
}
println()
}
}