Update WAL to fail writes if pressure too high.
If memory grows to more than 5x the partition size threshold, the WAL will start returning write failures to clients, which lets them back off their write volume. Also updates the stress script to track failed requests and to print a message on failure and again when writes return to success.
pull/4066/head
parent 5e3fa1e80f
commit 2d67a9ea22
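The point of failing writes instead of silently blocking is that clients can see the pressure and back off. A minimal client-side sketch of that behavior, assuming only that the write call (for example the stress tool's c.Write) returns an error while the WAL is shedding load; retryWrite and its parameters are illustrative, not part of this change:

package main

import (
	"errors"
	"fmt"
	"time"
)

// retryWrite retries a failing write with exponential backoff so the client
// naturally reduces its write volume while the server is under pressure.
func retryWrite(write func() error, maxRetries int) error {
	delay := 100 * time.Millisecond
	for i := 0; i < maxRetries; i++ {
		if err := write(); err == nil {
			return nil
		}
		time.Sleep(delay)
		delay *= 2 // wait a little longer after each failure
	}
	return fmt.Errorf("write still failing after %d attempts", maxRetries)
}

func main() {
	attempts := 0
	// Stand-in for a real batch write: fails twice, then succeeds.
	write := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("write throughput too high. backoff and retry")
		}
		return nil
	}
	fmt.Println(retryWrite(write, 5), "after", attempts, "attempts")
}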
@@ -40,7 +40,9 @@ func main() {
 	var wg sync.WaitGroup
 	responseTimes := make([]int, 0)
 
+	failedRequests := 0
 	totalPoints := 0
+	lastSuccess := true
 
 	batch := &client.BatchPoints{
 		Database: *database,
@@ -63,9 +65,20 @@ func main() {
 		go func(b *client.BatchPoints, total int) {
 			st := time.Now()
 			if _, err := c.Write(*b); err != nil {
-				fmt.Println("ERROR: ", err.Error())
+				mu.Lock()
+				if lastSuccess {
+					fmt.Println("ERROR: ", err.Error())
+				}
+				failedRequests += 1
+				totalPoints -= len(b.Points)
+				lastSuccess = false
+				mu.Unlock()
 			} else {
 				mu.Lock()
+				if !lastSuccess {
+					fmt.Println("success in ", time.Since(st))
+				}
+				lastSuccess = true
 				responseTimes = append(responseTimes, int(time.Since(st).Nanoseconds()))
 				mu.Unlock()
 			}
@@ -96,6 +109,7 @@ func main() {
 	mean := total / int64(len(responseTimes))
 
 	fmt.Printf("Wrote %d points at average rate of %.0f\n", totalPoints, float64(totalPoints)/time.Since(startTime).Seconds())
+	fmt.Printf("%d requests failed for %d total points that didn't get posted.\n", failedRequests, failedRequests**batchSize)
 	fmt.Println("Average response time: ", time.Duration(mean))
 	fmt.Println("Slowest response times:")
 	for _, r := range responseTimes[:100] {
@@ -57,6 +57,12 @@ const (
 	// flushed to the index
 	MetaFlushInterval = 10 * time.Minute
 
+	// FailWriteMemoryThreshold will start returning errors on writes if the memory gets more
+	// than this multiple above the maximum threshold. This is set to 5 because previously
+	// the memory threshold was for 5 partitions, but when this was introduced the partition
+	// count was reduced to 1, so we know it can handle at least this much extra memory.
+	FailWriteMemoryThreshold = 5
+
 	// defaultFlushCheckInterval is how often flushes are triggered automatically by the flush criteria
 	defaultFlushCheckInterval = time.Second
 )
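Concretely, the write-failure cutoff is the partition's size threshold multiplied by FailWriteMemoryThreshold. A small, self-contained illustration of the check (the 50 MB threshold below is an arbitrary example value, not the WAL's default):

package main

import "fmt"

const FailWriteMemoryThreshold = 5

func main() {
	sizeThreshold := int64(50 * 1024 * 1024) // example partition size threshold: 50 MB
	memorySize := int64(300 * 1024 * 1024)   // example in-memory cache size: 300 MB

	// Mirrors the checks in Partition.Write below: compaction is requested once
	// memorySize exceeds the threshold, and writes are rejected once it exceeds
	// FailWriteMemoryThreshold times the threshold (here 250 MB).
	fmt.Println("compact:", memorySize > sizeThreshold)                              // true
	fmt.Println("fail writes:", memorySize > sizeThreshold*FailWriteMemoryThreshold) // true
}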
@@ -756,28 +762,24 @@ func (p *Partition) Close() error {
 // This method will also add the points to the in memory cache
 func (p *Partition) Write(points []tsdb.Point) error {
 
-	// Check if we should compact due to memory pressure and what the backoff time should be. Only
-	// run the compaction in a goroutine if it's not already running.
-	if shouldSleep, shouldCompact, sleepTime := func() (shouldSleep bool, shouldCompact bool, sleepTime time.Duration) {
+	// Check if we should compact due to memory pressure and if we should fail the write if
+	// we're way too far over the threshold.
+	if shouldFailWrite, shouldCompact := func() (shouldFailWrite bool, shouldCompact bool) {
 		p.mu.RLock()
 		defer p.mu.RUnlock()
 		// pause writes for a bit if we've hit the size threshold
 		if p.memorySize > p.sizeThreshold {
 			if !p.compactionRunning {
 				shouldCompact = true
+			} else if p.memorySize > p.sizeThreshold*FailWriteMemoryThreshold {
+				shouldFailWrite = true
 			}
-			shouldSleep = true
-
-			// sleep for an increasing amount of time based on the percentage over the memory threshold we are
-			// over := float64(p.memorySize-p.sizeThreshold) / float64(p.sizeThreshold) * float64(100)
-			sleepTime = 0 // time.Duration(2*int(over)) * time.Millisecond
 		}
 		return
-	}(); shouldSleep {
-		if shouldCompact {
-			go p.flushAndCompact(memoryFlush)
-		}
-		time.Sleep(sleepTime)
+	}(); shouldCompact {
+		go p.flushAndCompact(memoryFlush)
+	} else if shouldFailWrite {
+		return fmt.Errorf("write throughput too high. backoff and retry")
 	}
 	p.mu.Lock()
 	defer p.mu.Unlock()
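The rewritten check keeps the read lock scoped to an immediately invoked closure: the flags are computed under p.mu.RLock, and the compaction goroutine or the error return happens only after the lock is released. A stripped-down, self-contained sketch of that pattern (the store type and its fields are stand-ins for the partition, not the real implementation):

package main

import (
	"fmt"
	"sync"
)

// store stands in for the WAL partition: a size threshold, the current
// in-memory size, and a flag saying whether a compaction is already running.
type store struct {
	mu            sync.RWMutex
	memorySize    int64
	sizeThreshold int64
	compacting    bool
}

// write decides under a read lock whether to compact or reject the write,
// then acts on that decision after the lock has been released.
func (s *store) write() error {
	if shouldFail, shouldCompact := func() (shouldFail, shouldCompact bool) {
		s.mu.RLock()
		defer s.mu.RUnlock() // released as soon as the closure returns
		if s.memorySize > s.sizeThreshold {
			if !s.compacting {
				shouldCompact = true
			} else if s.memorySize > s.sizeThreshold*5 {
				shouldFail = true
			}
		}
		return
	}(); shouldCompact {
		go func() { /* kick off a flush/compaction without holding the lock */ }()
	} else if shouldFail {
		return fmt.Errorf("write throughput too high. backoff and retry")
	}
	// ... append the points under the full write lock ...
	return nil
}

func main() {
	s := &store{memorySize: 6, sizeThreshold: 1, compacting: true}
	fmt.Println(s.write()) // memory is over 5x the threshold while compacting, so the write is rejected
}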