diff --git a/tsdb/engine/pd1/pd1.go b/tsdb/engine/pd1/pd1.go index 02708d7b45..b16561776b 100644 --- a/tsdb/engine/pd1/pd1.go +++ b/tsdb/engine/pd1/pd1.go @@ -58,6 +58,8 @@ const ( // MAP_POPULATE is for the mmap syscall. For some reason this isn't defined in golang's syscall MAP_POPULATE = 0x8000 + + magicNumber uint32 = 0x16D116D1 ) // Ensure Engine implements the interface. @@ -308,6 +310,8 @@ func (e *Engine) WriteAndCompact(pointsByKey map[string]Values, measurementField return nil } +// rewriteFile will read in the old data file, if provided and merge the values +// in the passed map into a new data file func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) error { // we need the values in sorted order so that we can merge them into the // new file as we read the old file @@ -351,31 +355,15 @@ func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) erro return err } - // write the header of the file and keep track of the current file position - currentPosition := uint32(4) - // series count - if _, err := f.Write(u32tob(uint32(len(ids)))); err != nil { + // write the magic number + if _, err := f.Write(u32tob(magicNumber)); err != nil { f.Close() return err } - // empty min time and max time - currentPosition += 16 - if _, err := f.Write([]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}); err != nil { - f.Close() - return nil - } - - // write the series ids and empty starting positions - for _, id := range ids { - if _, err := f.Write(append(u64tob(id), []byte{0x00, 0x00, 0x00, 0x00}...)); err != nil { - f.Close() - return err - } - currentPosition += 12 - } // now combine the old file data with the new values, keeping track of // their positions + currentPosition := uint32(4) newPositions := make([]uint32, len(ids)) buf := make([]byte, DefaultMaxPointsPerBlock*20) for i, id := range ids { @@ -487,25 +475,31 @@ func (e *Engine) rewriteFile(oldDF *dataFile, 
valuesByID map[uint64]Values) erro } } - // write out the times and positions - if _, err := f.Seek(4, 0); err != nil { - f.Close() - return err + // write the file index, starting with the series ids and their positions + for i, id := range ids { + if _, err := f.Write(u64tob(id)); err != nil { + f.Close() + return err + } + if _, err := f.Write(u32tob(newPositions[i])); err != nil { + f.Close() + return err + } } + + // write the min time, max time if _, err := f.Write(append(u64tob(uint64(minTime)), u64tob(uint64(maxTime))...)); err != nil { f.Close() return err } - for _, pos := range newPositions { - if _, err := f.Seek(8, 1); err != nil { - f.Close() - return err - } - if _, err := f.Write(u32tob(pos)); err != nil { - return err - } + // series count + if _, err := f.Write(u32tob(uint32(len(ids)))); err != nil { + f.Close() + return err } + + // sync it and seek back to the beginning to hand off to the mmap if err := f.Sync(); err != nil { return err } @@ -888,23 +882,24 @@ func (d *dataFile) close() error { } func (d *dataFile) MinTime() int64 { - return int64(btou64(d.mmap[4:12])) + return int64(btou64(d.mmap[d.size-20 : d.size-12])) } func (d *dataFile) MaxTime() int64 { - return int64(btou64(d.mmap[12:20])) + return int64(btou64(d.mmap[d.size-12 : d.size-4])) } func (d *dataFile) SeriesCount() uint32 { - return btou32(d.mmap[:4]) + return btou32(d.mmap[d.size-4:]) } func (d *dataFile) IDToPosition() map[uint64]uint32 { count := int(d.SeriesCount()) m := make(map[uint64]uint32) + indexStart := d.size - uint32(count*12+20) for i := 0; i < count; i++ { - offset := 20 + (i * 12) + offset := indexStart + uint32(i*12) id := btou64(d.mmap[offset : offset+8]) pos := btou32(d.mmap[offset+8 : offset+12]) m[id] = pos @@ -917,15 +912,17 @@ func (d *dataFile) IDToPosition() map[uint64]uint32 { // first block for the given ID. If zero is returned the ID doesn't // have any data in this file. 
func (d *dataFile) StartingPositionForID(id uint64) uint32 { - seriesCount := d.SeriesCount() - min := 0 - max := int(seriesCount) + seriesCount := d.SeriesCount() + indexStart := d.size - uint32(seriesCount*12+20) + + min := uint32(0) + max := uint32(seriesCount) for min < max { mid := (max-min)/2 + min - offset := mid*seriesHeaderSize + fileHeaderSize + offset := mid*seriesHeaderSize + indexStart checkID := btou64(d.mmap[offset : offset+8]) if checkID == id { @@ -1066,6 +1063,7 @@ func (c *cursor) Next() (key, value []byte) { startingPos := f.StartingPositionForID(c.id) if startingPos == 0 { + c.filesPos++ continue } c.f = f @@ -1119,7 +1117,7 @@ func btou32(b []byte) uint32 { func hashSeriesField(key string) uint64 { h := fnv.New64a() h.Write([]byte(key)) - return h.Sum64() + return h.Sum64() % 100 } // seriesFieldKey combine a series key and field name for a unique string to be hashed to a numeric ID