fix #360. Store and recover continuous queries on startup
parent
c8f88c5bb5
commit
1772fe9e7c
|
|
@ -265,6 +265,17 @@ func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string)
|
|||
self.continuousQueriesLock.Lock()
|
||||
defer self.continuousQueriesLock.Unlock()
|
||||
|
||||
maxId := uint32(0)
|
||||
for _, query := range self.continuousQueries[db] {
|
||||
if query.Id > maxId {
|
||||
maxId = query.Id
|
||||
}
|
||||
}
|
||||
|
||||
return self.addContinuousQuery(db, &ContinuousQuery{maxId, query})
|
||||
}
|
||||
|
||||
func (self *ClusterConfiguration) addContinuousQuery(db string, query *ContinuousQuery) error {
|
||||
if self.continuousQueries == nil {
|
||||
self.continuousQueries = map[string][]*ContinuousQuery{}
|
||||
}
|
||||
|
|
@ -273,26 +284,17 @@ func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string)
|
|||
self.ParsedContinuousQueries = map[string]map[uint32]*parser.SelectQuery{}
|
||||
}
|
||||
|
||||
maxId := uint32(0)
|
||||
for _, query := range self.continuousQueries[db] {
|
||||
if query.Id > maxId {
|
||||
maxId = query.Id
|
||||
}
|
||||
}
|
||||
|
||||
selectQuery, err := parser.ParseSelectQuery(query)
|
||||
selectQuery, err := parser.ParseSelectQuery(query.Query)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to parse continuous query: %s", query)
|
||||
}
|
||||
|
||||
queryId := maxId + 1
|
||||
if self.ParsedContinuousQueries[db] == nil {
|
||||
self.ParsedContinuousQueries[db] = map[uint32]*parser.SelectQuery{queryId: selectQuery}
|
||||
self.ParsedContinuousQueries[db] = map[uint32]*parser.SelectQuery{query.Id: selectQuery}
|
||||
} else {
|
||||
self.ParsedContinuousQueries[db][queryId] = selectQuery
|
||||
self.ParsedContinuousQueries[db][query.Id] = selectQuery
|
||||
}
|
||||
self.continuousQueries[db] = append(self.continuousQueries[db], &ContinuousQuery{queryId, query})
|
||||
|
||||
self.continuousQueries[db] = append(self.continuousQueries[db], query)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -415,23 +417,25 @@ func (self *ClusterConfiguration) SaveClusterAdmin(u *ClusterAdmin) {
|
|||
}
|
||||
|
||||
type SavedConfiguration struct {
|
||||
Databases map[string]uint8
|
||||
Admins map[string]*ClusterAdmin
|
||||
DbUsers map[string]map[string]*DbUser
|
||||
Servers []*ClusterServer
|
||||
ShortTermShards []*NewShardData
|
||||
LongTermShards []*NewShardData
|
||||
Databases map[string]uint8
|
||||
Admins map[string]*ClusterAdmin
|
||||
DbUsers map[string]map[string]*DbUser
|
||||
Servers []*ClusterServer
|
||||
ShortTermShards []*NewShardData
|
||||
LongTermShards []*NewShardData
|
||||
ContinuousQueries map[string][]*ContinuousQuery
|
||||
}
|
||||
|
||||
func (self *ClusterConfiguration) Save() ([]byte, error) {
|
||||
log.Debug("Dumping the cluster configuration")
|
||||
data := &SavedConfiguration{
|
||||
Databases: self.DatabaseReplicationFactors,
|
||||
Admins: self.clusterAdmins,
|
||||
DbUsers: self.dbUsers,
|
||||
Servers: self.servers,
|
||||
ShortTermShards: self.convertShardsToNewShardData(self.shortTermShards),
|
||||
LongTermShards: self.convertShardsToNewShardData(self.longTermShards),
|
||||
Databases: self.DatabaseReplicationFactors,
|
||||
Admins: self.clusterAdmins,
|
||||
DbUsers: self.dbUsers,
|
||||
Servers: self.servers,
|
||||
ContinuousQueries: self.continuousQueries,
|
||||
ShortTermShards: self.convertShardsToNewShardData(self.shortTermShards),
|
||||
LongTermShards: self.convertShardsToNewShardData(self.longTermShards),
|
||||
}
|
||||
|
||||
b := bytes.NewBuffer(nil)
|
||||
|
|
@ -527,6 +531,12 @@ func (self *ClusterConfiguration) Recovery(b []byte) error {
|
|||
self.shardsById[s.id] = shard
|
||||
}
|
||||
|
||||
for db, queries := range data.ContinuousQueries {
|
||||
for _, query := range queries {
|
||||
self.addContinuousQuery(db, query)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package integration
|
||||
|
||||
import (
|
||||
h "api/http"
|
||||
"bytes"
|
||||
"checkers"
|
||||
. "common"
|
||||
|
|
@ -396,6 +397,44 @@ func (self *IntegrationSuite) TestDataResurrectionAfterRestart(c *C) {
|
|||
c.Assert(series, HasLen, 0)
|
||||
}
|
||||
|
||||
// issue #360
|
||||
func (self *IntegrationSuite) TestContinuousQueriesAfterCompaction(c *C) {
|
||||
resp, err := http.Post("http://localhost:8086/db/db1/continuous_queries?u=root&p=root", "application/json",
|
||||
bytes.NewBufferString(`{"query": "select * from foo into bar"}`))
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(resp.StatusCode, Equals, http.StatusOK)
|
||||
resp, err = http.Get("http://localhost:8086/db/db1/continuous_queries?u=root&p=root")
|
||||
c.Assert(err, IsNil)
|
||||
defer resp.Body.Close()
|
||||
c.Assert(resp.StatusCode, Equals, http.StatusOK)
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
c.Assert(err, IsNil)
|
||||
queries := []*h.ContinuousQuery{}
|
||||
err = json.Unmarshal(body, &queries)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(queries, HasLen, 1)
|
||||
|
||||
resp, err = http.Post("http://localhost:8086/raft/force_compaction?u=root&p=root", "", nil)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(resp.StatusCode, Equals, http.StatusOK)
|
||||
|
||||
self.server.stop()
|
||||
time.Sleep(time.Second)
|
||||
c.Assert(self.server.start(), IsNil)
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
resp, err = http.Get("http://localhost:8086/db/db1/continuous_queries?u=root&p=root")
|
||||
c.Assert(err, IsNil)
|
||||
defer resp.Body.Close()
|
||||
c.Assert(resp.StatusCode, Equals, http.StatusOK)
|
||||
body, err = ioutil.ReadAll(resp.Body)
|
||||
c.Assert(err, IsNil)
|
||||
queries = []*h.ContinuousQuery{}
|
||||
err = json.Unmarshal(body, &queries)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(queries, HasLen, 1)
|
||||
}
|
||||
|
||||
func (self *IntegrationSuite) TestExplainsWithPassthroughAndLimit(c *C) {
|
||||
points := []string{}
|
||||
for i := 0; i < 101; i++ {
|
||||
|
|
|
|||
Loading…
Reference in New Issue