fix the memory leak on the wait API

The pollForIndex method starts a goroutine to check the latest index
periodically. But when the HTTP request finishs, the loop is still there
and the goroutine never exits.

This leads to a memory leak and goroutine leak after each request to the
"/wait/:index" API.

The problem can be reproduced by ```ab -c 10 -t 30 http://127.0.0.1:8086/wait/1```.
After several times of benchmark you'll find that the memory usage grows markedly.

Signed-off-by: Shijiang Wei <mountkin@gmail.com>
pull/1862/head
Shijiang Wei 2015-03-06 16:54:12 +08:00
parent 20b964b496
commit 6d5ca0e4ef
1 changed files with 51 additions and 19 deletions

View File

@ -308,36 +308,68 @@ func (h *Handler) serveWait(w http.ResponseWriter, r *http.Request) {
} else { } else {
d = time.Duration(timeout) * time.Millisecond d = time.Duration(timeout) * time.Millisecond
} }
err := h.pollForIndex(index, d) poller := &indexPoller{
if err != nil { h: h,
w.WriteHeader(http.StatusRequestTimeout) quit: make(chan bool),
return }
if notify, ok := w.(http.CloseNotifier); ok {
go func(poller *indexPoller) {
<-notify.CloseNotify()
poller.Quit()
}(poller)
}
err := poller.PollForIndex(index, d)
switch err {
case errPollTimedOut:
w.WriteHeader(http.StatusRequestTimeout)
case nil:
w.Write([]byte(fmt.Sprintf("%d", h.server.Index())))
} }
w.Write([]byte(fmt.Sprintf("%d", h.server.Index())))
} }
// pollForIndex will poll until either the index is met or it times out type indexPoller struct {
// timeout is in milliseconds h *Handler
func (h *Handler) pollForIndex(index uint64, timeout time.Duration) error { quit chan bool
done := make(chan struct{}) }
var (
errPollTimedOut = errors.New("timed out")
errAborted = errors.New("aborted")
)
func (p *indexPoller) PollForIndex(index uint64, timeout time.Duration) error {
aborted := make(chan bool)
done := make(chan bool)
go func() { go func() {
for { for {
if h.server.Index() >= index { select {
done <- struct{}{} case <-p.quit:
aborted <- true
return
default:
if p.h.server.Index() >= index {
done <- true
return
}
time.Sleep(10 * time.Millisecond)
} }
time.Sleep(10 * time.Millisecond)
} }
}() }()
for { select {
select { case <-time.After(timeout):
case <-done: return errPollTimedOut
return nil case <-aborted:
case <-time.After(timeout): return errAborted
return fmt.Errorf("timed out") case <-done:
} return nil
} }
return nil
}
func (p *indexPoller) Quit() {
p.quit <- true
} }
// serveDataNodes returns a list of all data nodes in the cluster. // serveDataNodes returns a list of all data nodes in the cluster.