fix #360. Store and recover continuous queries on startup

pull/370/head
John Shahid 2014-03-28 13:17:36 -04:00
parent c8f88c5bb5
commit 1772fe9e7c
2 changed files with 74 additions and 25 deletions

View File

@ -265,6 +265,17 @@ func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string)
self.continuousQueriesLock.Lock()
defer self.continuousQueriesLock.Unlock()
maxId := uint32(0)
for _, query := range self.continuousQueries[db] {
if query.Id > maxId {
maxId = query.Id
}
}
return self.addContinuousQuery(db, &ContinuousQuery{maxId, query})
}
func (self *ClusterConfiguration) addContinuousQuery(db string, query *ContinuousQuery) error {
if self.continuousQueries == nil {
self.continuousQueries = map[string][]*ContinuousQuery{}
}
@ -273,26 +284,17 @@ func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string)
self.ParsedContinuousQueries = map[string]map[uint32]*parser.SelectQuery{}
}
maxId := uint32(0)
for _, query := range self.continuousQueries[db] {
if query.Id > maxId {
maxId = query.Id
}
}
selectQuery, err := parser.ParseSelectQuery(query)
selectQuery, err := parser.ParseSelectQuery(query.Query)
if err != nil {
return fmt.Errorf("Failed to parse continuous query: %s", query)
}
queryId := maxId + 1
if self.ParsedContinuousQueries[db] == nil {
self.ParsedContinuousQueries[db] = map[uint32]*parser.SelectQuery{queryId: selectQuery}
self.ParsedContinuousQueries[db] = map[uint32]*parser.SelectQuery{query.Id: selectQuery}
} else {
self.ParsedContinuousQueries[db][queryId] = selectQuery
self.ParsedContinuousQueries[db][query.Id] = selectQuery
}
self.continuousQueries[db] = append(self.continuousQueries[db], &ContinuousQuery{queryId, query})
self.continuousQueries[db] = append(self.continuousQueries[db], query)
return nil
}
@ -415,23 +417,25 @@ func (self *ClusterConfiguration) SaveClusterAdmin(u *ClusterAdmin) {
}
type SavedConfiguration struct {
Databases map[string]uint8
Admins map[string]*ClusterAdmin
DbUsers map[string]map[string]*DbUser
Servers []*ClusterServer
ShortTermShards []*NewShardData
LongTermShards []*NewShardData
Databases map[string]uint8
Admins map[string]*ClusterAdmin
DbUsers map[string]map[string]*DbUser
Servers []*ClusterServer
ShortTermShards []*NewShardData
LongTermShards []*NewShardData
ContinuousQueries map[string][]*ContinuousQuery
}
func (self *ClusterConfiguration) Save() ([]byte, error) {
log.Debug("Dumping the cluster configuration")
data := &SavedConfiguration{
Databases: self.DatabaseReplicationFactors,
Admins: self.clusterAdmins,
DbUsers: self.dbUsers,
Servers: self.servers,
ShortTermShards: self.convertShardsToNewShardData(self.shortTermShards),
LongTermShards: self.convertShardsToNewShardData(self.longTermShards),
Databases: self.DatabaseReplicationFactors,
Admins: self.clusterAdmins,
DbUsers: self.dbUsers,
Servers: self.servers,
ContinuousQueries: self.continuousQueries,
ShortTermShards: self.convertShardsToNewShardData(self.shortTermShards),
LongTermShards: self.convertShardsToNewShardData(self.longTermShards),
}
b := bytes.NewBuffer(nil)
@ -527,6 +531,12 @@ func (self *ClusterConfiguration) Recovery(b []byte) error {
self.shardsById[s.id] = shard
}
for db, queries := range data.ContinuousQueries {
for _, query := range queries {
self.addContinuousQuery(db, query)
}
}
return nil
}

View File

@ -1,6 +1,7 @@
package integration
import (
h "api/http"
"bytes"
"checkers"
. "common"
@ -396,6 +397,44 @@ func (self *IntegrationSuite) TestDataResurrectionAfterRestart(c *C) {
c.Assert(series, HasLen, 0)
}
// issue #360
func (self *IntegrationSuite) TestContinuousQueriesAfterCompaction(c *C) {
resp, err := http.Post("http://localhost:8086/db/db1/continuous_queries?u=root&p=root", "application/json",
bytes.NewBufferString(`{"query": "select * from foo into bar"}`))
c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, http.StatusOK)
resp, err = http.Get("http://localhost:8086/db/db1/continuous_queries?u=root&p=root")
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
body, err := ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
queries := []*h.ContinuousQuery{}
err = json.Unmarshal(body, &queries)
c.Assert(err, IsNil)
c.Assert(queries, HasLen, 1)
resp, err = http.Post("http://localhost:8086/raft/force_compaction?u=root&p=root", "", nil)
c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, http.StatusOK)
self.server.stop()
time.Sleep(time.Second)
c.Assert(self.server.start(), IsNil)
time.Sleep(5 * time.Second)
resp, err = http.Get("http://localhost:8086/db/db1/continuous_queries?u=root&p=root")
c.Assert(err, IsNil)
defer resp.Body.Close()
c.Assert(resp.StatusCode, Equals, http.StatusOK)
body, err = ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
queries = []*h.ContinuousQuery{}
err = json.Unmarshal(body, &queries)
c.Assert(err, IsNil)
c.Assert(queries, HasLen, 1)
}
func (self *IntegrationSuite) TestExplainsWithPassthroughAndLimit(c *C) {
points := []string{}
for i := 0; i < 101; i++ {