diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index f5aaaf5b2c..51cda41095 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -105,14 +105,14 @@ func (s *Node) Close() error { } } - if s.Broker != nil { - if err := s.Broker.Close(); err != nil { + if s.raftLog != nil { + if err := s.raftLog.Close(); err != nil { return err } } - if s.raftLog != nil { - if err := s.raftLog.Close(); err != nil { + if s.Broker != nil { + if err := s.Broker.Close(); err != nil { return err } } diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index a996c9e98e..7ab6d3a07c 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -1435,7 +1435,8 @@ func Test3NodeServer(t *testing.T) { } func Test3NodeServerFailover(t *testing.T) { - testName := "3-node server integration" + t.Parallel() + testName := "3-node server failover integration" if testing.Short() { t.Skip(fmt.Sprintf("skipping '%s'", testName)) @@ -1458,7 +1459,6 @@ func Test3NodeServerFailover(t *testing.T) { // ensure that all queries work if there are more nodes in a cluster than the replication factor func Test3NodeClusterPartiallyReplicated(t *testing.T) { - t.Skip("Skipping due to instability") t.Parallel() testName := "3-node server integration partial replication" if testing.Short() { diff --git a/tx.go b/tx.go index 8cabd168bc..d6db631616 100644 --- a/tx.go +++ b/tx.go @@ -142,60 +142,62 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri // create mappers for each shard we need to hit for _, sg := range shardGroups { - if len(sg.Shards) != 1 { // we'll only have more than 1 shard in a group when RF < # servers in cluster - // TODO: implement distributed queries. - panic("distributed queries not implemented yet and there are too many shards in this group") + + shards := map[*Shard][]uint64{} + for _, sid := range t.SeriesIDs { + shard := sg.ShardBySeriesID(sid) + shards[shard] = append(shards[shard], sid) } - shard := sg.Shards[0] + for shard, _ := range shards { + var mapper influxql.Mapper - var mapper influxql.Mapper + // create either a remote or local mapper for this shard + if shard.store == nil { + nodes := tx.server.DataNodesByID(shard.DataNodeIDs) + if len(nodes) == 0 { + return nil, ErrShardNotFound + } - // create either a remote or local mapper for this shard - if shard.store == nil { - nodes := tx.server.DataNodesByID(shard.DataNodeIDs) - if len(nodes) == 0 { - return nil, ErrShardNotFound + balancer := NewDataNodeBalancer(nodes) + + mapper = &RemoteMapper{ + dataNodes: balancer, + Database: mm.Database, + MeasurementName: m.Name, + TMin: tmin.UnixNano(), + TMax: tmax.UnixNano(), + SeriesIDs: t.SeriesIDs, + ShardID: shard.ID, + WhereFields: whereFields, + SelectFields: selectFields, + SelectTags: selectTags, + Limit: stmt.Limit, + Offset: stmt.Offset, + Interval: interval, + } + mapper.(*RemoteMapper).SetFilters(t.Filters) + } else { + mapper = &LocalMapper{ + seriesIDs: t.SeriesIDs, + db: shard.store, + job: job, + decoder: NewFieldCodec(m), + filters: t.Filters, + whereFields: whereFields, + selectFields: selectFields, + selectTags: selectTags, + tmax: tmax.UnixNano(), + interval: interval, + // multiple mappers may need to be merged together to get the results + // for a raw query. So each mapper will have to read at least the + // limit plus the offset in data points to ensure we've hit our mark + limit: uint64(stmt.Limit) + uint64(stmt.Offset), + } } - balancer := NewDataNodeBalancer(nodes) - - mapper = &RemoteMapper{ - dataNodes: balancer, - Database: mm.Database, - MeasurementName: m.Name, - TMin: tmin.UnixNano(), - TMax: tmax.UnixNano(), - SeriesIDs: t.SeriesIDs, - ShardID: shard.ID, - WhereFields: whereFields, - SelectFields: selectFields, - SelectTags: selectTags, - Limit: stmt.Limit, - Offset: stmt.Offset, - Interval: interval, - } - mapper.(*RemoteMapper).SetFilters(t.Filters) - } else { - mapper = &LocalMapper{ - seriesIDs: t.SeriesIDs, - db: shard.store, - job: job, - decoder: NewFieldCodec(m), - filters: t.Filters, - whereFields: whereFields, - selectFields: selectFields, - selectTags: selectTags, - tmax: tmax.UnixNano(), - interval: interval, - // multiple mappers may need to be merged together to get the results - // for a raw query. So each mapper will have to read at least the - // limit plus the offset in data points to ensure we've hit our mark - limit: uint64(stmt.Limit) + uint64(stmt.Offset), - } + mappers = append(mappers, mapper) } - - mappers = append(mappers, mapper) } job.Mappers = mappers