Make drop database close and release resources

Drop database did not close any open shard files or close
any topic reader/heartbeats.  In the tests, we create and drop new
databases during each test run so these were open files and connection
slowed things down and consumed a lot of RAM as the tests progressed.
pull/2353/head
Jason Wilder 2015-04-21 11:17:34 -06:00
parent 25a43a8f11
commit 38628e540b
4 changed files with 76 additions and 7 deletions

View File

@ -45,7 +45,7 @@ func NewClient(c Config) (*Client, error) {
url: c.URL,
username: c.Username,
password: c.Password,
httpClient: &http.Client{},
httpClient: http.DefaultClient,
userAgent: c.UserAgent,
}
if client.userAgent == "" {

View File

@ -34,11 +34,11 @@ const (
// Client represents a client for the broker's HTTP API.
type Client struct {
mu sync.Mutex
path string // config file path
conns []*Conn // all connections opened by client
url url.URL // current known leader URL
urls []url.URL // list of available broker URLs
dataURL url.URL // URL of the client's data node
path string // config file path
conns map[uint64]*Conn // all connections opened by client
url url.URL // current known leader URL
urls []url.URL // list of available broker URLs
dataURL url.URL // URL of the client's data node
opened bool
@ -61,6 +61,7 @@ func NewClient(dataURL url.URL) *Client {
ReconnectTimeout: DefaultReconnectTimeout,
PingInterval: DefaultPingInterval,
dataURL: dataURL,
conns: map[uint64]*Conn{},
}
return c
}
@ -353,12 +354,31 @@ func (c *Client) Conn(topicID uint64) *Conn {
conn := NewConn(topicID, &c.dataURL)
conn.SetURL(c.url)
if _, ok := c.conns[topicID]; ok {
panic(fmt.Sprintf("connection for topic %d already exists", topicID))
}
// Add to list of client connections.
c.conns = append(c.conns, conn)
c.conns[topicID] = conn
return conn
}
// CloseConn closes the connection to the broker for a given topic
func (c *Client) CloseConn(topicID uint64) error {
c.mu.Lock()
defer c.mu.Unlock()
if conn, ok := c.conns[topicID]; ok && conn != nil {
if err := conn.Close(); err != nil {
return err
}
delete(c.conns, topicID)
}
return nil
}
// pinger periodically pings the broker to check that it is alive.
func (c *Client) pinger(closing chan struct{}) {
defer c.wg.Done()

View File

@ -1015,8 +1015,38 @@ func (s *Server) applyDropDatabase(m *messaging.Message) (err error) {
// Remove from metastore.
err = s.meta.mustUpdate(m.Index, func(tx *metatx) error { return tx.dropDatabase(c.Name) })
db := s.databases[c.Name]
for _, rp := range db.policies {
for _, sg := range rp.shardGroups {
for _, sh := range sg.Shards {
// if we have this shard locally, close and remove it
if sh.store != nil {
// close topic readers/heartbeaters/etc. connections
err := s.client.CloseConn(sh.ID)
if err != nil {
panic(err)
}
err = sh.close()
if err != nil {
panic(err)
}
err = os.Remove(s.shardPath(sh.ID))
if err != nil {
panic(err)
}
}
delete(s.shards, sh.ID)
}
}
}
// Delete the database entry.
delete(s.databases, c.Name)
return
}
@ -3518,6 +3548,7 @@ type MessagingClient interface {
// Conn returns an open, streaming connection to a topic.
Conn(topicID uint64) MessagingConn
CloseConn(topicID uint64) error
}
type messagingClient struct {

View File

@ -90,6 +90,24 @@ func (c *MessagingClient) Conn(topicID uint64) influxdb.MessagingConn {
return c.ConnFunc(topicID)
}
func (c *MessagingClient) CloseConn(topicID uint64) error {
c.mu.Lock()
defer c.mu.Unlock()
conns := []*MessagingConn{}
for _, conn := range c.conns {
if conn.topicID == topicID {
if err := conn.Close(); err != nil {
return err
}
continue
}
conns = append(conns, conn)
}
c.conns = conns
return nil
}
// DefaultConnFunc returns a connection for a specific topic.
func (c *MessagingClient) DefaultConnFunc(topicID uint64) influxdb.MessagingConn {
c.mu.Lock()