Merge pull request #3761 from influxdb/pd-fix-wal-query-during-flush
Fix bug querying data from WAL while compacting.

commit 01f4dc6c95
@@ -996,7 +996,13 @@ func (p *Partition) flushAndCompact(flush flushType) error {

 	startTime := time.Now()
 	if p.log.EnableLogging {
-		p.log.logger.Printf("compacting %d series from partition %d\n", len(c.seriesToFlush), p.id)
+		ftype := "idle"
+		if flush == thresholdFlush {
+			ftype = "threshold"
+		} else if flush == memoryFlush {
+			ftype = "memory"
+		}
+		p.log.logger.Printf("Flush due to %s. Compacting %d series from partition %d\n", ftype, len(c.seriesToFlush), p.id)
 	}

 	// write the data to the index first
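For readers outside this file: the new log line names the trigger that caused the flush. A minimal sketch of the flushType values the branch distinguishes; only thresholdFlush and memoryFlush appear literally in the hunk, so the type definition, the idleFlush name, and the helper below are assumptions:

package wal

// Hypothetical reconstruction of the flush triggers referenced above;
// only thresholdFlush and memoryFlush are named in this diff.
type flushType int

const (
	idleFlush      flushType = iota // assumed zero value; logged as "idle"
	thresholdFlush                  // a series crossed the flush threshold; logged as "threshold"
	memoryFlush                     // the WAL hit its memory limit; logged as "memory"
)

// flushTypeName mirrors the if/else chain in the hunk above.
func flushTypeName(f flushType) string {
	switch f {
	case thresholdFlush:
		return "threshold"
	case memoryFlush:
		return "memory"
	default:
		return "idle"
	}
}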
@@ -1289,7 +1295,7 @@ func (p *Partition) cursor(key string) *cursor {

 	entry := p.cache[key]
 	if entry == nil {
-		return &cursor{}
+		entry = &cacheEntry{}
 	}

 	// if we're in the middle of a flush, combine the previous cache
@@ -1300,7 +1306,8 @@ func (p *Partition) cursor(key string) *cursor {
 		copy(c, fc)
 		c = append(c, entry.points...)

-		return &cursor{cache: tsdb.DedupeEntries(c)}
+		dedupe := tsdb.DedupeEntries(c)
+		return &cursor{cache: dedupe}
 	}
 }

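These two hunks carry the actual fix. During a flush, a key's points are moved out of the live cache while they are written to the index; the old code returned an empty cursor as soon as p.cache had no entry, so that in-flight data was invisible to queries. Substituting an empty cacheEntry instead lets control fall through to the merge with the flush cache below (the second hunk only splits the dedupe-and-return into two statements). A self-contained sketch of the repaired control flow; the flushCache field name and the surrounding guard are assumptions, since their definitions sit outside this diff:

package main

import "fmt"

// Stand-ins for the WAL partition types in this diff; the real
// definitions live elsewhere in the tsdb package.
type cacheEntry struct{ points [][]byte }
type cursor struct{ cache [][]byte }

type partition struct {
	cache      map[string]*cacheEntry
	flushCache map[string][][]byte // assumed name for the mid-flush map
}

// cursorFor mirrors the patched logic: a nil cache entry no longer
// short-circuits, so points parked in the flush cache stay queryable.
func (p *partition) cursorFor(key string) *cursor {
	entry := p.cache[key]
	if entry == nil {
		entry = &cacheEntry{} // was: return &cursor{}, which dropped flush data
	}
	if fc, ok := p.flushCache[key]; ok {
		c := make([][]byte, len(fc), len(fc)+len(entry.points))
		copy(c, fc)
		c = append(c, entry.points...)
		return &cursor{cache: c} // the real code dedupes via tsdb.DedupeEntries
	}
	return &cursor{cache: entry.points}
}

func main() {
	// Simulate a compaction in progress: the key's points have been moved
	// out of the live cache and into the flush cache.
	p := &partition{
		cache:      map[string]*cacheEntry{},
		flushCache: map[string][][]byte{"cpu,host=A": {[]byte("v1")}},
	}
	fmt.Println("points visible mid-flush:", len(p.cursorFor("cpu,host=A").cache))
}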
@@ -772,6 +772,59 @@ func TestWAL_Compact_Recovery(t *testing.T) {
 	}
 }

+func TestWAL_QueryDuringCompaction(t *testing.T) {
+	log := openTestWAL()
+	log.partitionCount = 1
+	defer log.Close()
+	defer os.RemoveAll(log.path)
+
+	var points []map[string][][]byte
+	finishCompaction := make(chan struct{})
+	log.Index = &testIndexWriter{fn: func(pointsByKey map[string][][]byte, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error {
+		points = append(points, pointsByKey)
+		finishCompaction <- struct{}{}
+		return nil
+	}}
+
+	if err := log.Open(); err != nil {
+		t.Fatalf("couldn't open wal: %s", err.Error())
+	}
+
+	codec := tsdb.NewFieldCodec(map[string]*tsdb.Field{
+		"value": {
+			ID:   uint8(1),
+			Name: "value",
+			Type: influxql.Float,
+		},
+	})
+
+	// write a point so the cursor has data to seek to
+	p1 := parsePoint("cpu,host=A value=23.2 1", codec)
+	if err := log.WritePoints([]tsdb.Point{p1}, nil, nil); err != nil {
+		t.Fatalf("failed to write points: %s", err.Error())
+	}
+
+	verify := func() {
+		c := log.Cursor("cpu,host=A")
+		k, v := c.Seek(inttob(1))
+		// ensure the series is there and the point is in order
+		if bytes.Compare(v, p1.Data()) != 0 {
+			<-finishCompaction
+			t.Fatalf("expected to seek to first point but got key and value: %v %v", k, v)
+		}
+	}
+
+	verify()
+	go func() {
+		log.Flush()
+	}()
+
+	time.Sleep(100 * time.Millisecond)
+	verify()
+	<-finishCompaction
+	verify()
+}
+
 // test that partitions get compacted and flushed when number of series hits compaction threshold
 // test that partitions get compacted and flushed when a single series hits the compaction threshold
 // test that writes slow down when the partition size threshold is hit
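A note on the test's synchronization: the fake index writer blocks on an unbuffered send to finishCompaction, so the Flush goroutine stays parked mid-compaction until the test receives from the channel, which guarantees the second verify() really runs while the flush is in progress (the 100ms sleep only gives the goroutine time to start). A stripped-down sketch of that channel-gating pattern with hypothetical names, independent of the WAL types:

package main

import (
	"fmt"
	"time"
)

func main() {
	finish := make(chan struct{})

	// Stand-in for log.Flush(): the "index write" blocks on an unbuffered
	// send, just like the test's fake index writer does.
	go func() {
		fmt.Println("flush started: cache handed to the flush cache")
		finish <- struct{}{} // parked here until the other side receives
		fmt.Println("flush finished")
	}()

	time.Sleep(100 * time.Millisecond) // give the goroutine time to start
	fmt.Println("query runs here, guaranteed mid-flush")
	<-finish                          // release the flush, like <-finishCompaction
	time.Sleep(10 * time.Millisecond) // let the goroutine print before exit
}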