From 642d6eba8507fd718ff689a00e65331f8f5f0dd4 Mon Sep 17 00:00:00 2001 From: Dimitrij Denissenko <dimitrij.denissenko@blacksquaremedia.com> Date: Thu, 30 Jul 2015 15:24:51 +0100 Subject: [PATCH] Batch CQ writes to avoid timeouts --- services/continuous_querier/service.go | 48 +++++++++++++++----------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go index 5f296277fa..4b6c99d2b3 100644 --- a/services/continuous_querier/service.go +++ b/services/continuous_querier/service.go @@ -287,6 +287,7 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error { } // Read all rows from the result channel. + points := make([]tsdb.Point, 0, 100) for result := range ch { if result.Err != nil { return result.Err @@ -294,19 +295,19 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error { for _, row := range result.Series { // Convert the result row to points. - points, err := s.convertRowToPoints(cq.intoMeasurement(), row) + part, err := s.convertRowToPoints(cq.intoMeasurement(), row) if err != nil { log.Println(err) continue } - if len(points) == 0 { + if len(part) == 0 { continue } // If the points have any nil values, can't write. // This happens if the CQ is created and running before data is written to the measurement. - for _, p := range points { + for _, p := range part { fields := p.Fields() for _, v := range fields { if v == nil { @@ -314,27 +315,32 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error { } } } - - // Create a write request for the points. - req := &cluster.WritePointsRequest{ - Database: cq.intoDB(), - RetentionPolicy: cq.intoRP(), - ConsistencyLevel: cluster.ConsistencyLevelAny, - Points: points, - } - - // Write the request. - if err := s.PointsWriter.WritePoints(req); err != nil { - s.Logger.Println(err) - return err - } - - if s.loggingEnabled { - s.Logger.Printf("wrote %d point(s) to %s.%s.%s", len(points), cq.intoDB(), cq.intoRP(), cq.Info.Name) - } + points = append(points, part...) } } + if len(points) == 0 { + return nil + } + + // Create a write request for the points. + req := &cluster.WritePointsRequest{ + Database: cq.intoDB(), + RetentionPolicy: cq.intoRP(), + ConsistencyLevel: cluster.ConsistencyLevelAny, + Points: points, + } + + // Write the request. + if err := s.PointsWriter.WritePoints(req); err != nil { + s.Logger.Println(err) + return err + } + + if s.loggingEnabled { + s.Logger.Printf("wrote %d point(s) to %s.%s.%s", len(points), cq.intoDB(), cq.intoRP(), cq.Info.Name) + } + return nil }