Merge pull request #6601 from influxdata/js-6559-http-connection-limits
Teach the http service how to enforce connection limitspull/6623/head
commit
51b3f46970
|
@ -8,6 +8,7 @@
|
|||
|
||||
- [#3541](https://github.com/influxdata/influxdb/issues/3451): Update SHOW FIELD KEYS to return the field type with the field key.
|
||||
- [#6609](https://github.com/influxdata/influxdb/pull/6609): Add support for JWT token authentication.
|
||||
- [#6559](https://github.com/influxdata/influxdb/issues/6559): Teach the http service how to enforce connection limits.
|
||||
|
||||
### Bugfixes
|
||||
|
||||
|
|
|
@ -5,16 +5,17 @@ const DefaultBindAddress = ":8086"
|
|||
|
||||
// Config represents a configuration for a HTTP service.
|
||||
type Config struct {
|
||||
Enabled bool `toml:"enabled"`
|
||||
BindAddress string `toml:"bind-address"`
|
||||
AuthEnabled bool `toml:"auth-enabled"`
|
||||
LogEnabled bool `toml:"log-enabled"`
|
||||
WriteTracing bool `toml:"write-tracing"`
|
||||
PprofEnabled bool `toml:"pprof-enabled"`
|
||||
HTTPSEnabled bool `toml:"https-enabled"`
|
||||
HTTPSCertificate string `toml:"https-certificate"`
|
||||
MaxRowLimit int `toml:"max-row-limit"`
|
||||
SharedSecret string `toml:"shared-secret"`
|
||||
Enabled bool `toml:"enabled"`
|
||||
BindAddress string `toml:"bind-address"`
|
||||
AuthEnabled bool `toml:"auth-enabled"`
|
||||
LogEnabled bool `toml:"log-enabled"`
|
||||
WriteTracing bool `toml:"write-tracing"`
|
||||
PprofEnabled bool `toml:"pprof-enabled"`
|
||||
HTTPSEnabled bool `toml:"https-enabled"`
|
||||
HTTPSCertificate string `toml:"https-certificate"`
|
||||
MaxRowLimit int `toml:"max-row-limit"`
|
||||
MaxConnectionLimit int `toml:"max-connection-limit"`
|
||||
SharedSecret string `toml:"shared-secret"`
|
||||
}
|
||||
|
||||
// NewConfig returns a new Config with default settings.
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
package httpd
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// LimitListener returns a Listener that accepts at most n simultaneous
|
||||
// connections from the provided Listener and will drop extra connections.
|
||||
func LimitListener(l net.Listener, n int) net.Listener {
|
||||
return &limitListener{Listener: l, sem: make(chan struct{}, n)}
|
||||
}
|
||||
|
||||
// limitListener is a listener that limits the number of active connections
|
||||
// at any given time.
|
||||
type limitListener struct {
|
||||
net.Listener
|
||||
sem chan struct{}
|
||||
}
|
||||
|
||||
func (l *limitListener) release() {
|
||||
<-l.sem
|
||||
}
|
||||
|
||||
func (l *limitListener) Accept() (net.Conn, error) {
|
||||
for {
|
||||
c, err := l.Listener.Accept()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
select {
|
||||
case l.sem <- struct{}{}:
|
||||
return &limitListenerConn{Conn: c, release: l.release}, nil
|
||||
default:
|
||||
c.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type limitListenerConn struct {
|
||||
net.Conn
|
||||
releaseOnce sync.Once
|
||||
release func()
|
||||
}
|
||||
|
||||
func (l *limitListenerConn) Close() error {
|
||||
err := l.Conn.Close()
|
||||
l.releaseOnce.Do(l.release)
|
||||
return err
|
||||
}
|
|
@ -0,0 +1,96 @@
|
|||
package httpd_test
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/influxdata/influxdb/services/httpd"
|
||||
)
|
||||
|
||||
type fakeListener struct {
|
||||
AcceptFn func() (net.Conn, error)
|
||||
}
|
||||
|
||||
func (l *fakeListener) Accept() (net.Conn, error) {
|
||||
if l.AcceptFn != nil {
|
||||
return l.AcceptFn()
|
||||
}
|
||||
return &fakeConn{}, nil
|
||||
}
|
||||
|
||||
func (*fakeListener) Close() error { return nil }
|
||||
func (*fakeListener) Addr() net.Addr { return nil }
|
||||
|
||||
type fakeConn struct {
|
||||
closed bool
|
||||
}
|
||||
|
||||
func (*fakeConn) Read([]byte) (int, error) { return 0, io.EOF }
|
||||
func (*fakeConn) Write(b []byte) (int, error) { return len(b), nil }
|
||||
func (c *fakeConn) Close() error {
|
||||
c.closed = true
|
||||
return nil
|
||||
}
|
||||
func (*fakeConn) LocalAddr() net.Addr { return nil }
|
||||
func (*fakeConn) RemoteAddr() net.Addr { return nil }
|
||||
func (*fakeConn) SetDeadline(time.Time) error { return nil }
|
||||
func (*fakeConn) SetReadDeadline(time.Time) error { return nil }
|
||||
func (*fakeConn) SetWriteDeadline(time.Time) error { return nil }
|
||||
|
||||
func TestLimitListener(t *testing.T) {
|
||||
conns := make(chan net.Conn, 2)
|
||||
l := httpd.LimitListener(&fakeListener{
|
||||
AcceptFn: func() (net.Conn, error) {
|
||||
select {
|
||||
case c := <-conns:
|
||||
if c != nil {
|
||||
return c, nil
|
||||
}
|
||||
default:
|
||||
}
|
||||
return nil, io.EOF
|
||||
},
|
||||
}, 1)
|
||||
c1, c2 := &fakeConn{}, &fakeConn{}
|
||||
conns <- c1
|
||||
conns <- c2
|
||||
|
||||
var c net.Conn
|
||||
var err error
|
||||
if c, err = l.Accept(); err != nil {
|
||||
t.Fatalf("expected accept to succeed: %s", err)
|
||||
}
|
||||
|
||||
if _, err = l.Accept(); err != io.EOF {
|
||||
t.Fatalf("expected eof, got %s", err)
|
||||
} else if !c2.closed {
|
||||
t.Fatalf("expected connection to be automatically closed")
|
||||
}
|
||||
c.Close()
|
||||
|
||||
conns <- &fakeConn{}
|
||||
if _, err = l.Accept(); err != nil {
|
||||
t.Fatalf("expeced accept to succeed: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkLimitListener(b *testing.B) {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(b.N)
|
||||
|
||||
l := httpd.LimitListener(&fakeListener{}, b.N)
|
||||
for i := 0; i < b.N; i++ {
|
||||
go func() {
|
||||
c, err := l.Accept()
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
c.Close()
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
|
@ -40,6 +40,7 @@ type Service struct {
|
|||
addr string
|
||||
https bool
|
||||
cert string
|
||||
limit int
|
||||
err chan error
|
||||
|
||||
Handler *Handler
|
||||
|
@ -60,6 +61,7 @@ func NewService(c Config) *Service {
|
|||
addr: c.BindAddress,
|
||||
https: c.HTTPSEnabled,
|
||||
cert: c.HTTPSCertificate,
|
||||
limit: c.MaxConnectionLimit,
|
||||
err: make(chan error),
|
||||
Handler: NewHandler(c, statMap),
|
||||
Logger: log.New(os.Stderr, "[httpd] ", log.LstdFlags),
|
||||
|
@ -99,6 +101,11 @@ func (s *Service) Open() error {
|
|||
s.ln = listener
|
||||
}
|
||||
|
||||
// Enforce a connection limit if one has been given.
|
||||
if s.limit > 0 {
|
||||
s.ln = LimitListener(s.ln, s.limit)
|
||||
}
|
||||
|
||||
// wait for the listeners to start
|
||||
timeout := time.Now().Add(time.Second)
|
||||
for {
|
||||
|
|
Loading…
Reference in New Issue