From 557f79656657f855865ae338c7b83be23b65a594 Mon Sep 17 00:00:00 2001 From: Takayuki Usui Date: Thu, 27 Aug 2015 16:47:46 +0900 Subject: [PATCH 1/2] Add an option to cmd/influx to set write consistency level This patch introduces an option in influx allowing users to select write consistency level, which has been implicitly set to 'any' by default so far. --- cmd/influx/main.go | 61 +++++++++++++++++++++++++++++++--------------- 1 file changed, 42 insertions(+), 19 deletions(-) diff --git a/cmd/influx/main.go b/cmd/influx/main.go index 24af8efa4e..25e81006d9 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -17,6 +17,7 @@ import ( "text/tabwriter" "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/importer/v8" "github.com/peterh/liner" ) @@ -36,24 +37,25 @@ const ( ) type CommandLine struct { - Client *client.Client - Line *liner.State - Host string - Port int - Username string - Password string - Database string - Ssl bool - RetentionPolicy string - Version string - Pretty bool // controls pretty print for json - Format string // controls the output format. Valid values are json, csv, or column - Execute string - ShowVersion bool - Import bool - PPS int // Controls how many points per second the import will allow via throttling - Path string - Compressed bool + Client *client.Client + Line *liner.State + Host string + Port int + Username string + Password string + Database string + Ssl bool + RetentionPolicy string + Version string + Pretty bool // controls pretty print for json + Format string // controls the output format. Valid values are json, csv, or column + WriteConsistency string + Execute string + ShowVersion bool + Import bool + PPS int // Controls how many points per second the import will allow via throttling + Path string + Compressed bool } func main() { @@ -67,6 +69,7 @@ func main() { fs.StringVar(&c.Database, "database", c.Database, "Database to connect to the server.") fs.BoolVar(&c.Ssl, "ssl", false, "Use https for connecting to cluster.") fs.StringVar(&c.Format, "format", defaultFormat, "Format specifies the format of the server responses: json, csv, or column.") + fs.StringVar(&c.WriteConsistency, "consistency", "any", "Set write consistency level: any, one, quorum, or all.") fs.BoolVar(&c.Pretty, "pretty", false, "Turns on pretty print for the json format.") fs.StringVar(&c.Execute, "execute", c.Execute, "Execute command and quit.") fs.BoolVar(&c.ShowVersion, "version", false, "Displays the InfluxDB version.") @@ -96,6 +99,8 @@ func main() { Execute command and quit. -format 'json|csv|column' Format specifies the format of the server responses: json, csv, or column. + -consistency 'any|one|quorum|all' + Set write consistency level: any, one, quorum, or all -pretty Turns on pretty print for the json format. -import @@ -244,6 +249,8 @@ func (c *CommandLine) ParseCommand(cmd string) bool { c.help() case strings.HasPrefix(lcmd, "format"): c.SetFormat(cmd) + case strings.HasPrefix(lcmd, "consistency"): + c.SetWriteConsistency(cmd) case strings.HasPrefix(lcmd, "settings"): c.Settings() case strings.HasPrefix(lcmd, "pretty"): @@ -358,6 +365,20 @@ func (c *CommandLine) SetFormat(cmd string) { } } +func (c *CommandLine) SetWriteConsistency(cmd string) { + // Remove the "consistency" keyword if it exists + cmd = strings.TrimSpace(strings.Replace(cmd, "consistency", "", -1)) + // normalize cmd + cmd = strings.ToLower(cmd) + + _, err := cluster.ParseConsistencyLevel(cmd) + if err != nil { + fmt.Printf("Unknown consistency level %q. Please use any, one, quorum, or all.\n", cmd) + return + } + c.WriteConsistency = cmd +} + // isWhitespace returns true if the rune is a space, tab, or newline. func isWhitespace(ch rune) bool { return ch == ' ' || ch == '\t' || ch == '\n' } @@ -444,7 +465,7 @@ func (c *CommandLine) Insert(stmt string) error { Database: c.Database, RetentionPolicy: c.RetentionPolicy, Precision: "n", - WriteConsistency: client.ConsistencyAny, + WriteConsistency: c.WriteConsistency, }) if err != nil { fmt.Printf("ERR: %s\n", err) @@ -641,6 +662,7 @@ func (c *CommandLine) Settings() { fmt.Fprintf(w, "Database\t%s\n", c.Database) fmt.Fprintf(w, "Pretty\t%v\n", c.Pretty) fmt.Fprintf(w, "Format\t%s\n", c.Format) + fmt.Fprintf(w, "Write Consistency\t%s\n", c.WriteConsistency) fmt.Fprintln(w) w.Flush() } @@ -652,6 +674,7 @@ func (c *CommandLine) help() { pretty toggle pretty print use set current databases format set the output format: json, csv, or column + consistency set write consistency level: any, one, quorum, or all settings output the current settings for the shell exit quit the influx shell From da8efa56e1aedd489a7d715743ea72a08724c64e Mon Sep 17 00:00:00 2001 From: Takayuki Usui Date: Thu, 27 Aug 2015 16:29:44 +0900 Subject: [PATCH 2/2] Fix writes possibly blocked with relaxed write consistency level Immediately return once the required number of writes are completed, otherwise requests running with relaxed consistency levels (e.g. any or one) would be blocked unexpectedly, for instance, waiting for dead nodes to respond. --- cluster/points_writer.go | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/cluster/points_writer.go b/cluster/points_writer.go index 75054f9dfb..8b55ab45fc 100644 --- a/cluster/points_writer.go +++ b/cluster/points_writer.go @@ -233,7 +233,11 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo } // response channel for each shard writer go routine - ch := make(chan error, len(shard.Owners)) + type AsyncWriteResult struct { + Owner meta.ShardOwner + Err error + } + ch := make(chan *AsyncWriteResult, len(shard.Owners)) for _, owner := range shard.Owners { go func(shardID uint64, owner meta.ShardOwner, points []tsdb.Point) { @@ -244,12 +248,12 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo if err == tsdb.ErrShardNotFound { err = w.TSDBStore.CreateShard(database, retentionPolicy, shardID) if err != nil { - ch <- err + ch <- &AsyncWriteResult{owner, err} return } err = w.TSDBStore.WriteToShard(shardID, points) } - ch <- err + ch <- &AsyncWriteResult{owner, err} return } @@ -262,11 +266,11 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo // be considered a successful write so send nil to the response channel // otherwise, let the original error propogate to the response channel if hherr == nil && consistency == ConsistencyLevelAny { - ch <- nil + ch <- &AsyncWriteResult{owner, nil} return } } - ch <- err + ch <- &AsyncWriteResult{owner, err} }(shard.ID, owner, points) } @@ -274,32 +278,32 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo var wrote int timeout := time.After(w.WriteTimeout) var writeError error - for _, owner := range shard.Owners { + for range shard.Owners { select { case <-w.closing: return ErrWriteFailed case <-timeout: // return timeout error to caller return ErrTimeout - case err := <-ch: + case result := <-ch: // If the write returned an error, continue to the next response - if err != nil { - w.Logger.Printf("write failed for shard %d on node %d: %v", shard.ID, owner.NodeID, err) + if result.Err != nil { + w.Logger.Printf("write failed for shard %d on node %d: %v", shard.ID, result.Owner.NodeID, result.Err) // Keep track of the first error we see to return back to the client if writeError == nil { - writeError = err + writeError = result.Err } continue } wrote += 1 - } - } - // We wrote the required consistency level - if wrote >= required { - return nil + // We wrote the required consistency level + if wrote >= required { + return nil + } + } } if wrote > 0 {