Fix #316. Make shard hold access denied responses until all response channels are emptied out.
parent
7ba33479ae
commit
2a780d3edb
|
@ -90,10 +90,11 @@ const (
|
|||
)
|
||||
|
||||
var (
|
||||
queryResponse = p.Response_QUERY
|
||||
endStreamResponse = p.Response_END_STREAM
|
||||
queryRequest = p.Request_QUERY
|
||||
dropDatabaseRequest = p.Request_DROP_DATABASE
|
||||
queryResponse = p.Response_QUERY
|
||||
endStreamResponse = p.Response_END_STREAM
|
||||
accessDeniedResponse = p.Response_ACCESS_DENIED
|
||||
queryRequest = p.Request_QUERY
|
||||
dropDatabaseRequest = p.Request_DROP_DATABASE
|
||||
)
|
||||
|
||||
type LocalShardDb interface {
|
||||
|
@ -376,6 +377,7 @@ func (self *ShardData) HandleDestructiveQuery(querySpec *parser.QuerySpec, reque
|
|||
responseCahnnels = append(responseCahnnels, responses...)
|
||||
}
|
||||
|
||||
accessDenied := false
|
||||
for idx, channel := range responseCahnnels {
|
||||
serverId := serverIds[idx]
|
||||
log.Debug("Waiting for response to %s from %d", request.GetDescription(), serverId)
|
||||
|
@ -386,11 +388,22 @@ func (self *ShardData) HandleDestructiveQuery(querySpec *parser.QuerySpec, reque
|
|||
self.wal.Commit(request.GetRequestNumber(), serverId)
|
||||
break
|
||||
}
|
||||
response <- res
|
||||
|
||||
// don't send the access denied response until the end so the readers don't close out before the other responses.
|
||||
// See https://github.com/influxdb/influxdb/issues/316 for more info.
|
||||
if *res.Type != accessDeniedResponse {
|
||||
response <- res
|
||||
} else {
|
||||
accessDenied = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
response <- &p.Response{Type: &endStreamResponse}
|
||||
if accessDenied {
|
||||
response <- &p.Response{Type: &accessDeniedResponse}
|
||||
} else {
|
||||
response <- &p.Response{Type: &endStreamResponse}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue