Batch CQ writes to avoid timeouts

pull/3517/head
Dimitrij Denissenko 2015-07-30 15:24:51 +01:00
parent 755f323fae
commit 642d6eba85
1 changed files with 27 additions and 21 deletions

View File

@ -287,6 +287,7 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
}
// Read all rows from the result channel.
points := make([]tsdb.Point, 0, 100)
for result := range ch {
if result.Err != nil {
return result.Err
@ -294,19 +295,19 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
for _, row := range result.Series {
// Convert the result row to points.
points, err := s.convertRowToPoints(cq.intoMeasurement(), row)
part, err := s.convertRowToPoints(cq.intoMeasurement(), row)
if err != nil {
log.Println(err)
continue
}
if len(points) == 0 {
if len(part) == 0 {
continue
}
// If the points have any nil values, can't write.
// This happens if the CQ is created and running before data is written to the measurement.
for _, p := range points {
for _, p := range part {
fields := p.Fields()
for _, v := range fields {
if v == nil {
@ -314,6 +315,13 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
}
}
}
points = append(points, part...)
}
}
if len(points) == 0 {
return nil
}
// Create a write request for the points.
req := &cluster.WritePointsRequest{
@ -332,8 +340,6 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error {
if s.loggingEnabled {
s.Logger.Printf("wrote %d point(s) to %s.%s.%s", len(points), cq.intoDB(), cq.intoRP(), cq.Info.Name)
}
}
}
return nil
}