fix #309. Don't relog the delete queries
parent
2cb3124a03
commit
4b974ba92f
|
@ -293,10 +293,14 @@ func (self *ShardData) logAndHandleDropSeriesQuery(querySpec *parser.QuerySpec,
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *ShardData) LogAndHandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan *p.Response, runLocalOnly bool) error {
|
func (self *ShardData) LogAndHandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan *p.Response, runLocalOnly bool) error {
|
||||||
requestNumber, err := self.wal.AssignSequenceNumbersAndLog(request, self)
|
_, err := self.wal.AssignSequenceNumbersAndLog(request, self)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
return self.HandleDestructiveQuery(querySpec, request, response, runLocalOnly)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *ShardData) HandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan *p.Response, runLocalOnly bool) error {
|
||||||
var localResponses chan *p.Response
|
var localResponses chan *p.Response
|
||||||
if self.localShard != nil {
|
if self.localShard != nil {
|
||||||
localResponses = make(chan *p.Response, 1)
|
localResponses = make(chan *p.Response, 1)
|
||||||
|
@ -323,7 +327,7 @@ func (self *ShardData) LogAndHandleDestructiveQuery(querySpec *parser.QuerySpec,
|
||||||
for {
|
for {
|
||||||
res := <-responseChan
|
res := <-responseChan
|
||||||
if *res.Type == endStreamResponse {
|
if *res.Type == endStreamResponse {
|
||||||
self.wal.Commit(requestNumber, self.clusterServers[i].Id)
|
self.wal.Commit(request.GetRequestNumber(), self.clusterServers[i].Id)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
response <- res
|
response <- res
|
||||||
|
@ -335,7 +339,7 @@ func (self *ShardData) LogAndHandleDestructiveQuery(querySpec *parser.QuerySpec,
|
||||||
for {
|
for {
|
||||||
res := <-localResponses
|
res := <-localResponses
|
||||||
if *res.Type == endStreamResponse {
|
if *res.Type == endStreamResponse {
|
||||||
self.wal.Commit(requestNumber, self.localServerId)
|
self.wal.Commit(request.GetRequestNumber(), self.localServerId)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
response <- res
|
response <- res
|
||||||
|
|
|
@ -84,7 +84,7 @@ func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn
|
||||||
|
|
||||||
responseChan := make(chan *protocol.Response)
|
responseChan := make(chan *protocol.Response)
|
||||||
if querySpec.IsDestructiveQuery() {
|
if querySpec.IsDestructiveQuery() {
|
||||||
go shard.LogAndHandleDestructiveQuery(querySpec, request, responseChan, true)
|
go shard.HandleDestructiveQuery(querySpec, request, responseChan, true)
|
||||||
} else {
|
} else {
|
||||||
go shard.Query(querySpec, responseChan)
|
go shard.Query(querySpec, responseChan)
|
||||||
}
|
}
|
||||||
|
|
|
@ -718,6 +718,10 @@ func (self *ServerSuite) TestRelogging(c *C) {
|
||||||
|
|
||||||
time.Sleep(time.Second) // wait for data to get replicated
|
time.Sleep(time.Second) // wait for data to get replicated
|
||||||
|
|
||||||
|
self.serverProcesses[0].Query("full_rep", "delete from test_relogging", false, c)
|
||||||
|
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
|
||||||
for _, server := range self.serverProcesses[1:] {
|
for _, server := range self.serverProcesses[1:] {
|
||||||
err := server.doesWalExist()
|
err := server.doesWalExist()
|
||||||
c.Assert(os.IsNotExist(err), Equals, true)
|
c.Assert(os.IsNotExist(err), Equals, true)
|
||||||
|
|
Loading…
Reference in New Issue