diff --git a/src/cluster/shard.go b/src/cluster/shard.go index 5efc805220..2b923cff7f 100644 --- a/src/cluster/shard.go +++ b/src/cluster/shard.go @@ -293,10 +293,14 @@ func (self *ShardData) logAndHandleDropSeriesQuery(querySpec *parser.QuerySpec, } func (self *ShardData) LogAndHandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan *p.Response, runLocalOnly bool) error { - requestNumber, err := self.wal.AssignSequenceNumbersAndLog(request, self) + _, err := self.wal.AssignSequenceNumbersAndLog(request, self) if err != nil { return err } + return self.HandleDestructiveQuery(querySpec, request, response, runLocalOnly) +} + +func (self *ShardData) HandleDestructiveQuery(querySpec *parser.QuerySpec, request *p.Request, response chan *p.Response, runLocalOnly bool) error { var localResponses chan *p.Response if self.localShard != nil { localResponses = make(chan *p.Response, 1) @@ -323,7 +327,7 @@ func (self *ShardData) LogAndHandleDestructiveQuery(querySpec *parser.QuerySpec, for { res := <-responseChan if *res.Type == endStreamResponse { - self.wal.Commit(requestNumber, self.clusterServers[i].Id) + self.wal.Commit(request.GetRequestNumber(), self.clusterServers[i].Id) break } response <- res @@ -335,7 +339,7 @@ func (self *ShardData) LogAndHandleDestructiveQuery(querySpec *parser.QuerySpec, for { res := <-localResponses if *res.Type == endStreamResponse { - self.wal.Commit(requestNumber, self.localServerId) + self.wal.Commit(request.GetRequestNumber(), self.localServerId) break } response <- res diff --git a/src/coordinator/protobuf_request_handler.go b/src/coordinator/protobuf_request_handler.go index c12b9f0d0d..b682909904 100644 --- a/src/coordinator/protobuf_request_handler.go +++ b/src/coordinator/protobuf_request_handler.go @@ -84,7 +84,7 @@ func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn responseChan := make(chan *protocol.Response) if querySpec.IsDestructiveQuery() { - go shard.LogAndHandleDestructiveQuery(querySpec, request, responseChan, true) + go shard.HandleDestructiveQuery(querySpec, request, responseChan, true) } else { go shard.Query(querySpec, responseChan) } diff --git a/src/integration/server_test.go b/src/integration/server_test.go index 36840e8ad5..4265a07be1 100644 --- a/src/integration/server_test.go +++ b/src/integration/server_test.go @@ -718,6 +718,10 @@ func (self *ServerSuite) TestRelogging(c *C) { time.Sleep(time.Second) // wait for data to get replicated + self.serverProcesses[0].Query("full_rep", "delete from test_relogging", false, c) + + time.Sleep(time.Second) + for _, server := range self.serverProcesses[1:] { err := server.doesWalExist() c.Assert(os.IsNotExist(err), Equals, true)