Update engine to put index at the end of data files
parent
750856836e
commit
38f9b29925
|
@ -58,6 +58,8 @@ const (
|
||||||
|
|
||||||
// MAP_POPULATE is for the mmap syscall. For some reason this isn't defined in golang's syscall
|
// MAP_POPULATE is for the mmap syscall. For some reason this isn't defined in golang's syscall
|
||||||
MAP_POPULATE = 0x8000
|
MAP_POPULATE = 0x8000
|
||||||
|
|
||||||
|
magicNumber uint32 = 0x16D116D1
|
||||||
)
|
)
|
||||||
|
|
||||||
// Ensure Engine implements the interface.
|
// Ensure Engine implements the interface.
|
||||||
|
@ -308,6 +310,8 @@ func (e *Engine) WriteAndCompact(pointsByKey map[string]Values, measurementField
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// rewriteFile will read in the old data file, if provided and merge the values
|
||||||
|
// in the passed map into a new data file
|
||||||
func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) error {
|
func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) error {
|
||||||
// we need the values in sorted order so that we can merge them into the
|
// we need the values in sorted order so that we can merge them into the
|
||||||
// new file as we read the old file
|
// new file as we read the old file
|
||||||
|
@ -351,31 +355,15 @@ func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) erro
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// write the header of the file and keep track of the current file position
|
// write the magic number
|
||||||
currentPosition := uint32(4)
|
if _, err := f.Write(u32tob(magicNumber)); err != nil {
|
||||||
// series count
|
|
||||||
if _, err := f.Write(u32tob(uint32(len(ids)))); err != nil {
|
|
||||||
f.Close()
|
f.Close()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// empty min time and max time
|
|
||||||
currentPosition += 16
|
|
||||||
if _, err := f.Write([]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}); err != nil {
|
|
||||||
f.Close()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// write the series ids and empty starting positions
|
|
||||||
for _, id := range ids {
|
|
||||||
if _, err := f.Write(append(u64tob(id), []byte{0x00, 0x00, 0x00, 0x00}...)); err != nil {
|
|
||||||
f.Close()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
currentPosition += 12
|
|
||||||
}
|
|
||||||
|
|
||||||
// now combine the old file data with the new values, keeping track of
|
// now combine the old file data with the new values, keeping track of
|
||||||
// their positions
|
// their positions
|
||||||
|
currentPosition := uint32(4)
|
||||||
newPositions := make([]uint32, len(ids))
|
newPositions := make([]uint32, len(ids))
|
||||||
buf := make([]byte, DefaultMaxPointsPerBlock*20)
|
buf := make([]byte, DefaultMaxPointsPerBlock*20)
|
||||||
for i, id := range ids {
|
for i, id := range ids {
|
||||||
|
@ -487,25 +475,31 @@ func (e *Engine) rewriteFile(oldDF *dataFile, valuesByID map[uint64]Values) erro
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// write out the times and positions
|
// write the file index, starting with the series ids and their positions
|
||||||
if _, err := f.Seek(4, 0); err != nil {
|
for i, id := range ids {
|
||||||
|
if _, err := f.Write(u64tob(id)); err != nil {
|
||||||
f.Close()
|
f.Close()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if _, err := f.Write(u32tob(newPositions[i])); err != nil {
|
||||||
|
f.Close()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// write the min time, max time
|
||||||
if _, err := f.Write(append(u64tob(uint64(minTime)), u64tob(uint64(maxTime))...)); err != nil {
|
if _, err := f.Write(append(u64tob(uint64(minTime)), u64tob(uint64(maxTime))...)); err != nil {
|
||||||
f.Close()
|
f.Close()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for _, pos := range newPositions {
|
|
||||||
if _, err := f.Seek(8, 1); err != nil {
|
// series count
|
||||||
|
if _, err := f.Write(u32tob(uint32(len(ids)))); err != nil {
|
||||||
f.Close()
|
f.Close()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := f.Write(u32tob(pos)); err != nil {
|
// sync it and see4k back to the beginning to hand off to the mmap
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err := f.Sync(); err != nil {
|
if err := f.Sync(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -888,23 +882,24 @@ func (d *dataFile) close() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *dataFile) MinTime() int64 {
|
func (d *dataFile) MinTime() int64 {
|
||||||
return int64(btou64(d.mmap[4:12]))
|
return int64(btou64(d.mmap[d.size-20 : d.size-12]))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *dataFile) MaxTime() int64 {
|
func (d *dataFile) MaxTime() int64 {
|
||||||
return int64(btou64(d.mmap[12:20]))
|
return int64(btou64(d.mmap[d.size-12 : d.size-4]))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *dataFile) SeriesCount() uint32 {
|
func (d *dataFile) SeriesCount() uint32 {
|
||||||
return btou32(d.mmap[:4])
|
return btou32(d.mmap[d.size-4:])
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *dataFile) IDToPosition() map[uint64]uint32 {
|
func (d *dataFile) IDToPosition() map[uint64]uint32 {
|
||||||
count := int(d.SeriesCount())
|
count := int(d.SeriesCount())
|
||||||
m := make(map[uint64]uint32)
|
m := make(map[uint64]uint32)
|
||||||
|
|
||||||
|
indexStart := d.size - uint32(count*12+20)
|
||||||
for i := 0; i < count; i++ {
|
for i := 0; i < count; i++ {
|
||||||
offset := 20 + (i * 12)
|
offset := indexStart + uint32(i*12)
|
||||||
id := btou64(d.mmap[offset : offset+8])
|
id := btou64(d.mmap[offset : offset+8])
|
||||||
pos := btou32(d.mmap[offset+8 : offset+12])
|
pos := btou32(d.mmap[offset+8 : offset+12])
|
||||||
m[id] = pos
|
m[id] = pos
|
||||||
|
@ -917,15 +912,17 @@ func (d *dataFile) IDToPosition() map[uint64]uint32 {
|
||||||
// first block for the given ID. If zero is returned the ID doesn't
|
// first block for the given ID. If zero is returned the ID doesn't
|
||||||
// have any data in this file.
|
// have any data in this file.
|
||||||
func (d *dataFile) StartingPositionForID(id uint64) uint32 {
|
func (d *dataFile) StartingPositionForID(id uint64) uint32 {
|
||||||
seriesCount := d.SeriesCount()
|
|
||||||
|
|
||||||
min := 0
|
seriesCount := d.SeriesCount()
|
||||||
max := int(seriesCount)
|
indexStart := d.size - uint32(seriesCount*12+20)
|
||||||
|
|
||||||
|
min := uint32(0)
|
||||||
|
max := uint32(seriesCount)
|
||||||
|
|
||||||
for min < max {
|
for min < max {
|
||||||
mid := (max-min)/2 + min
|
mid := (max-min)/2 + min
|
||||||
|
|
||||||
offset := mid*seriesHeaderSize + fileHeaderSize
|
offset := mid*seriesHeaderSize + indexStart
|
||||||
checkID := btou64(d.mmap[offset : offset+8])
|
checkID := btou64(d.mmap[offset : offset+8])
|
||||||
|
|
||||||
if checkID == id {
|
if checkID == id {
|
||||||
|
@ -1066,6 +1063,7 @@ func (c *cursor) Next() (key, value []byte) {
|
||||||
|
|
||||||
startingPos := f.StartingPositionForID(c.id)
|
startingPos := f.StartingPositionForID(c.id)
|
||||||
if startingPos == 0 {
|
if startingPos == 0 {
|
||||||
|
c.filesPos++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
c.f = f
|
c.f = f
|
||||||
|
@ -1119,7 +1117,7 @@ func btou32(b []byte) uint32 {
|
||||||
func hashSeriesField(key string) uint64 {
|
func hashSeriesField(key string) uint64 {
|
||||||
h := fnv.New64a()
|
h := fnv.New64a()
|
||||||
h.Write([]byte(key))
|
h.Write([]byte(key))
|
||||||
return h.Sum64()
|
return h.Sum64() % 100
|
||||||
}
|
}
|
||||||
|
|
||||||
// seriesFieldKey combine a series key and field name for a unique string to be hashed to a numeric ID
|
// seriesFieldKey combine a series key and field name for a unique string to be hashed to a numeric ID
|
||||||
|
|
Loading…
Reference in New Issue