Update WAL to fail writes if memory pressure is too high.

If memory usage grows to 5x the partition size threshold, the WAL will start returning write failures to clients, allowing them to back off their write volume.
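For a client, the intended response is to slow down and retry rather than treat the error as fatal. Below is a minimal sketch of such a backoff loop. It is not part of this commit; the retryWrite helper, the attempt limit, and the backoff values are illustrative assumptions, and it assumes the repository's client package with the two-value Write signature used in the stress script below.

package example

import (
    "fmt"
    "time"

    "github.com/influxdb/influxdb/client"
)

// retryWrite is a hypothetical helper: when a write is rejected, for example
// because the WAL is shedding load under memory pressure, it sleeps with an
// exponentially growing delay and retries instead of hammering the server.
func retryWrite(c *client.Client, bp client.BatchPoints) error {
    backoff := 100 * time.Millisecond
    for attempt := 0; attempt < 5; attempt++ {
        if _, err := c.Write(bp); err == nil {
            return nil
        }
        time.Sleep(backoff) // give the WAL time to flush and compact
        backoff *= 2        // back off harder after each consecutive failure
    }
    return fmt.Errorf("write still failing after repeated backoff")
}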

Also updated the stress script to track failed requests, print a message when writes start failing, and print another when they return to success.
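The script only prints on state transitions, so a long run of failures does not flood the console. The following condensed sketch shows that pattern on its own; the transitionLogger type and its method names are illustrative, not the script's.

package example

import (
    "fmt"
    "sync"
    "time"
)

// transitionLogger mirrors the stress script's approach: report an error only
// when a run of successes ends, and report recovery only when a run of
// failures ends, so output stays quiet under steady load.
type transitionLogger struct {
    mu          sync.Mutex
    lastSuccess bool
}

// newTransitionLogger starts in the "success" state, as the script does, so
// the first successful write is not reported as a recovery.
func newTransitionLogger() *transitionLogger {
    return &transitionLogger{lastSuccess: true}
}

func (t *transitionLogger) record(err error, elapsed time.Duration) {
    t.mu.Lock()
    defer t.mu.Unlock()
    if err != nil {
        if t.lastSuccess {
            fmt.Println("ERROR: ", err.Error()) // first failure after a run of successes
        }
        t.lastSuccess = false
        return
    }
    if !t.lastSuccess {
        fmt.Println("success in ", elapsed) // first success after a run of failures
    }
    t.lastSuccess = true
}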
pull/4066/head
Paul Dix 2015-09-09 22:41:32 -07:00
parent 5e3fa1e80f
commit 2d67a9ea22
2 changed files with 30 additions and 14 deletions


@@ -40,7 +40,9 @@ func main() {
    var wg sync.WaitGroup
    responseTimes := make([]int, 0)
    failedRequests := 0
    totalPoints := 0
    lastSuccess := true
    batch := &client.BatchPoints{
        Database: *database,
@@ -63,9 +65,20 @@ func main() {
    go func(b *client.BatchPoints, total int) {
        st := time.Now()
        if _, err := c.Write(*b); err != nil {
            fmt.Println("ERROR: ", err.Error())
            mu.Lock()
            if lastSuccess {
                fmt.Println("ERROR: ", err.Error())
            }
            failedRequests += 1
            totalPoints -= len(b.Points)
            lastSuccess = false
            mu.Unlock()
        } else {
            mu.Lock()
            if !lastSuccess {
                fmt.Println("success in ", time.Since(st))
            }
            lastSuccess = true
            responseTimes = append(responseTimes, int(time.Since(st).Nanoseconds()))
            mu.Unlock()
        }
@@ -96,6 +109,7 @@ func main() {
    mean := total / int64(len(responseTimes))
    fmt.Printf("Wrote %d points at average rate of %.0f\n", totalPoints, float64(totalPoints)/time.Since(startTime).Seconds())
    fmt.Printf("%d requests failed for %d total points that didn't get posted.\n", failedRequests, failedRequests**batchSize)
    fmt.Println("Average response time: ", time.Duration(mean))
    fmt.Println("Slowest response times:")
    for _, r := range responseTimes[:100] {


@@ -57,6 +57,12 @@ const (
    // flushed to the index
    MetaFlushInterval = 10 * time.Minute

    // FailWriteMemoryThreshold will start returning errors on writes if the memory gets more
    // than this multiple above the maximum threshold. This is set to 5 because previously
    // the memory threshold was for 5 partitions, but when this was introduced the partition
    // count was reduced to 1, so we know that it can handle at least this much extra memory.
    FailWriteMemoryThreshold = 5

    // defaultFlushCheckInterval is how often flushes are triggered automatically by the flush criteria
    defaultFlushCheckInterval = time.Second
)
@@ -756,28 +762,24 @@ func (p *Partition) Close() error {
// This method will also add the points to the in memory cache
func (p *Partition) Write(points []tsdb.Point) error {
    // Check if we should compact due to memory pressure and what the backoff time should be. Only
    // run the compaction in a goroutine if it's not already running.
    if shouldSleep, shouldCompact, sleepTime := func() (shouldSleep bool, shouldCompact bool, sleepTime time.Duration) {
    // Check if we should compact due to memory pressure and if we should fail the write if
    // we're way too far over the threshold.
    if shouldFailWrite, shouldCompact := func() (shouldFailWrite bool, shouldCompact bool) {
        p.mu.RLock()
        defer p.mu.RUnlock()
        // pause writes for a bit if we've hit the size threshold
        if p.memorySize > p.sizeThreshold {
            if !p.compactionRunning {
                shouldCompact = true
            } else if p.memorySize > p.sizeThreshold*FailWriteMemoryThreshold {
                shouldFailWrite = true
            }
            shouldSleep = true
            // sleep for an increasing amount of time based on the percentage over the memory threshold we are
            // over := float64(p.memorySize-p.sizeThreshold) / float64(p.sizeThreshold) * float64(100)
            sleepTime = 0 // time.Duration(2*int(over)) * time.Millisecond
        }
        return
    }(); shouldSleep {
        if shouldCompact {
            go p.flushAndCompact(memoryFlush)
        }
        time.Sleep(sleepTime)
    }(); shouldCompact {
        go p.flushAndCompact(memoryFlush)
    } else if shouldFailWrite {
        return fmt.Errorf("write throughput too high. backoff and retry")
    }
    p.mu.Lock()
    defer p.mu.Unlock()