Update WAL to fail writes if pressure too high.

If memory grows to more than 5x the partition size threshold, the WAL will start returning write failures to clients so they can back off their write volume.

Also updated the stress script to track failed requests, print a message on the first failure, and print another when writes return to success.
pull/4066/head
Paul Dix 2015-09-09 22:41:32 -07:00
parent 5e3fa1e80f
commit 2d67a9ea22
2 changed files with 30 additions and 14 deletions
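
Since the change just returns an error to writers, clients are expected to back off when writes start failing. Below is a minimal sketch of one way a client could do that, assuming the same client package the stress script uses; the function name, retry policy, and URL are illustrative, not part of this change.

package main

import (
	"net/url"
	"time"

	"github.com/influxdb/influxdb/client"
)

// writeWithBackoff retries a batch when the server rejects writes
// (for example under WAL memory pressure), doubling the wait between
// attempts so the WAL gets time to flush and compact.
func writeWithBackoff(c *client.Client, batch client.BatchPoints) error {
	wait := 100 * time.Millisecond
	var err error
	for attempt := 0; attempt < 5; attempt++ {
		if _, err = c.Write(batch); err == nil {
			return nil
		}
		time.Sleep(wait)
		wait *= 2
	}
	return err // give up after repeated failures
}

func main() {
	u, _ := url.Parse("http://localhost:8086") // address is illustrative
	c, _ := client.NewClient(client.Config{URL: *u})
	_ = writeWithBackoff(c, client.BatchPoints{Database: "stress"})
}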

stress script

@@ -40,7 +40,9 @@ func main() {
 	var wg sync.WaitGroup
 	responseTimes := make([]int, 0)
+	failedRequests := 0
 	totalPoints := 0
+	lastSuccess := true

 	batch := &client.BatchPoints{
 		Database: *database,
@@ -63,9 +65,20 @@ func main() {
 		go func(b *client.BatchPoints, total int) {
 			st := time.Now()
 			if _, err := c.Write(*b); err != nil {
-				fmt.Println("ERROR: ", err.Error())
+				mu.Lock()
+				if lastSuccess {
+					fmt.Println("ERROR: ", err.Error())
+				}
+				failedRequests += 1
+				totalPoints -= len(b.Points)
+				lastSuccess = false
+				mu.Unlock()
 			} else {
 				mu.Lock()
+				if !lastSuccess {
+					fmt.Println("success in ", time.Since(st))
+				}
+				lastSuccess = true
 				responseTimes = append(responseTimes, int(time.Since(st).Nanoseconds()))
 				mu.Unlock()
 			}
@@ -96,6 +109,7 @@ func main() {
 	mean := total / int64(len(responseTimes))

 	fmt.Printf("Wrote %d points at average rate of %.0f\n", totalPoints, float64(totalPoints)/time.Since(startTime).Seconds())
+	fmt.Printf("%d requests failed for %d total points that didn't get posted.\n", failedRequests, failedRequests**batchSize)
 	fmt.Println("Average response time: ", time.Duration(mean))
 	fmt.Println("Slowest response times:")
 	for _, r := range responseTimes[:100] {
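
Two notes on the stress changes: failedRequests**batchSize in the new summary line parses as failedRequests * (*batchSize), so assuming batchSize is a flag pointer like database above, the line reports failed requests times the configured batch size. And because messages are printed only when lastSuccess flips, an outage produces one ERROR line and one recovery line instead of one line per failed batch.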

WAL partition

@@ -57,6 +57,12 @@ const (
 	// flushed to the index
 	MetaFlushInterval = 10 * time.Minute

+	// FailWriteMemoryThreshold will start returning errors on writes if the memory gets more
+	// than this multiple above the maximum threshold. This is set to 5 because previously
+	// the memory threshold was for 5 partitions, but when this was introduced the partition
+	// count was reduced to 1, so we know it can handle at least this much extra memory.
+	FailWriteMemoryThreshold = 5
+
 	// defaultFlushCheckInterval is how often flushes are triggered automatically by the flush criteria
 	defaultFlushCheckInterval = time.Second
 )
@@ -756,28 +762,24 @@ func (p *Partition) Close() error {
 // This method will also add the points to the in memory cache
 func (p *Partition) Write(points []tsdb.Point) error {
-	// Check if we should compact due to memory pressure and what the backoff time should be. Only
-	// run the compaction in a goroutine if it's not already running.
-	if shouldSleep, shouldCompact, sleepTime := func() (shouldSleep bool, shouldCompact bool, sleepTime time.Duration) {
+	// Check if we should compact due to memory pressure, and whether we should fail the write
+	// if we're way too far over the threshold.
+	if shouldFailWrite, shouldCompact := func() (shouldFailWrite bool, shouldCompact bool) {
 		p.mu.RLock()
 		defer p.mu.RUnlock()

 		// pause writes for a bit if we've hit the size threshold
 		if p.memorySize > p.sizeThreshold {
 			if !p.compactionRunning {
 				shouldCompact = true
+			} else if p.memorySize > p.sizeThreshold*FailWriteMemoryThreshold {
+				shouldFailWrite = true
 			}
-			shouldSleep = true
-			// sleep for an increasing amount of time based on the percentage over the memory threshold we are
-			// over := float64(p.memorySize-p.sizeThreshold) / float64(p.sizeThreshold) * float64(100)
-			sleepTime = 0 // time.Duration(2*int(over)) * time.Millisecond
 		}
 		return
-	}(); shouldSleep {
-		if shouldCompact {
-			go p.flushAndCompact(memoryFlush)
-		}
-		time.Sleep(sleepTime)
+	}(); shouldCompact {
+		go p.flushAndCompact(memoryFlush)
+	} else if shouldFailWrite {
+		return fmt.Errorf("write throughput too high. backoff and retry")
 	}

 	p.mu.Lock()
 	defer p.mu.Unlock()
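
Read in isolation, the closure above decides as follows: over the size threshold it kicks off a compaction if none is running; if one is already running and memory is still more than FailWriteMemoryThreshold times the limit, the write fails. A minimal sketch of that check, assuming the fields are plain byte counts (the real code reads them under p.mu.RLock):

package main

import "fmt"

const failWriteMemoryThreshold = 5 // mirrors FailWriteMemoryThreshold

// decide mirrors the partition's pressure check.
func decide(memorySize, sizeThreshold uint64, compactionRunning bool) (failWrite, compact bool) {
	if memorySize > sizeThreshold {
		if !compactionRunning {
			compact = true // start flushing in the background
		} else if memorySize > sizeThreshold*failWriteMemoryThreshold {
			failWrite = true // compaction can't keep up; push back on clients
		}
	}
	return
}

func main() {
	// 6 MB in memory against a 1 MB threshold while a compaction runs:
	// more than 5x over, so the write is rejected.
	fmt.Println(decide(6<<20, 1<<20, true)) // true false
}

Failing only when a compaction is already in flight means a single overshoot just triggers a flush; only a sustained overload is pushed back to clients as write errors.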