feat(cmd/influx): support reading GZIP data in `influx write` (#20763)

Co-authored-by: davidby-influx <dbyrne@influxdata.com>
Daniel Moran 2021-02-19 12:31:09 -05:00 committed by GitHub
parent f7516e31fe
commit 58ae51d2d0
3 changed files with 352 additions and 52 deletions

CHANGELOG.md

@@ -6,6 +6,7 @@
1. [20621](https://github.com/influxdata/influxdb/pull/20621): Add Swift client library to the data loading section of the UI.
1. [20307](https://github.com/influxdata/influxdb/pull/20307): Add `influx task retry-failed` command to rerun failed runs.
1. [20759](https://github.com/influxdata/influxdb/pull/20759): Add additional properties for Mosaic Graph.
1. [20763](https://github.com/influxdata/influxdb/pull/20763): Add `--compression` option to `influx write` to support GZIP inputs.
### Bug Fixes

cmd/influx/write.go

@@ -1,6 +1,7 @@
package main
import (
"compress/gzip"
"context"
"encoding/csv"
"fmt"
@@ -26,6 +27,8 @@ import (
const (
inputFormatCsv = "csv"
inputFormatLineProtocol = "lp"
inputCompressionNone = "none"
inputCompressionGzip = "gzip"
)
type buildWriteSvcFn func(builder *writeFlagsBuilder) platform.WriteService
@@ -52,6 +55,7 @@ type writeFlagsBuilder struct {
Encoding string
ErrorsFile string
RateLimit string
Compression string
}
func newWriteFlagsBuilder(svcFn buildWriteSvcFn, f *globalFlags, opt genericCLIOpts) *writeFlagsBuilder {
@@ -127,6 +131,7 @@ func (b *writeFlagsBuilder) cmd() *cobra.Command {
cmd.PersistentFlags().StringVar(&b.Encoding, "encoding", "UTF-8", "Character encoding of input files or stdin")
cmd.PersistentFlags().StringVar(&b.ErrorsFile, "errors-file", "", "The path to the file to write rejected rows to")
cmd.PersistentFlags().StringVar(&b.RateLimit, "rate-limit", "", "Throttles write, examples: \"5 MB / 5 min\" , \"17kBs\". \"\" (default) disables throttling.")
cmd.PersistentFlags().StringVar(&b.Compression, "compression", "", "Input compression, either 'none' or 'gzip'. Defaults to 'none' unless an input has a '.gz' extension")
cmdDryRun := b.newCmd("dryrun", b.writeDryrunE, false)
cmdDryRun.Args = cobra.MaximumNArgs(1)
@@ -159,6 +164,10 @@ func (b *writeFlagsBuilder) createLineReader(ctx context.Context, cmd *cobra.Com
if len(b.Format) > 0 && b.Format != inputFormatLineProtocol && b.Format != inputFormatCsv {
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("unsupported input format: %s", b.Format)
}
// validate input compression
if len(b.Compression) > 0 && b.Compression != inputCompressionNone && b.Compression != inputCompressionGzip {
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("unsupported input compression: %s", b.Compression)
}
// validate and setup decoding of files/stdin if encoding is supplied
decode, err := csv2lp.CreateDecoder(b.Encoding)
@@ -166,6 +175,21 @@ func (b *writeFlagsBuilder) createLineReader(ctx context.Context, cmd *cobra.Com
return nil, csv2lp.MultiCloser(closers...), err
}
// utility to manage common steps used to decode / decompress input sources,
// while tracking resources that must be cleaned up after reading.
addReader := func(r io.Reader, name string, compressed bool) error {
if compressed {
rcz, err := gzip.NewReader(r)
if err != nil {
return fmt.Errorf("failed to decompress %s: %w", name, err)
}
closers = append(closers, rcz)
r = rcz
}
readers = append(readers, decode(r), strings.NewReader("\n"))
return nil
}
// prepend header lines
if len(b.Headers) > 0 {
for _, header := range b.Headers {
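The `addReader` closure above centralizes the decompress-then-decode step and records every gzip reader so it can be closed after the write finishes. Below is a minimal, self-contained sketch of that pattern (hypothetical names, separate from the influx codebase): wrap an `io.Reader` with `gzip.NewReader`, hand back the closer the caller must eventually close, and keep reading from the wrapped reader.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"io/ioutil"
	"log"
)

// wrapGzip optionally wraps r with a gzip decompressor and returns the
// closer that the caller must close once reading is finished.
func wrapGzip(r io.Reader, compressed bool) (io.Reader, io.Closer, error) {
	if !compressed {
		return r, ioutil.NopCloser(r), nil
	}
	zr, err := gzip.NewReader(r)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to decompress input: %w", err)
	}
	return zr, zr, nil
}

func main() {
	// Build a gzip-compressed payload in memory.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	zw.Write([]byte("m,f=1 v=42"))
	zw.Close() // flush the gzip trailer

	r, closer, err := wrapGzip(&buf, true)
	if err != nil {
		log.Fatal(err)
	}
	defer closer.Close()

	line, _ := ioutil.ReadAll(r)
	fmt.Println(string(line)) // m,f=1 v=42
}
```

Note that `gzip.Reader.Close` does not close the underlying reader, which is why the diff tracks both the file handle and the gzip reader in `closers`.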
@@ -184,10 +208,19 @@ func (b *writeFlagsBuilder) createLineReader(ctx context.Context, cmd *cobra.Com
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to open %q: %v", file, err)
}
closers = append(closers, f)
readers = append(readers, decode(f), strings.NewReader("\n"))
if len(b.Format) == 0 && strings.HasSuffix(file, ".csv") {
fname := file
compressed := b.Compression == "gzip" || (len(b.Compression) == 0 && strings.HasSuffix(fname, ".gz"))
if compressed {
fname = strings.TrimSuffix(fname, ".gz")
}
if len(b.Format) == 0 && strings.HasSuffix(fname, ".csv") {
b.Format = inputFormatCsv
}
if err = addReader(f, file, compressed); err != nil {
return nil, csv2lp.MultiCloser(closers...), err
}
}
}
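For files, compression and format are inferred from the name when the flags are unset: `--compression gzip` always wins, a bare `.gz` suffix implies gzip, and the suffix is stripped before the `.csv` check so `data.csv.gz` is still parsed as CSV (the format check in the real command only runs when `--format` is empty). A small sketch of the same decision, using a hypothetical helper name:

```go
package main

import (
	"fmt"
	"strings"
)

// detectFileInput mirrors the suffix handling in the hunk above: an explicit
// --compression value wins; otherwise a trailing ".gz" implies gzip, and the
// ".gz" is stripped before checking for ".csv" so that "data.csv.gz" is still
// detected as CSV input.
func detectFileInput(file, compressionFlag string) (compressed, isCSV bool) {
	compressed = compressionFlag == "gzip" ||
		(compressionFlag == "" && strings.HasSuffix(file, ".gz"))
	name := file
	if compressed {
		name = strings.TrimSuffix(name, ".gz")
	}
	isCSV = strings.HasSuffix(name, ".csv")
	return compressed, isCSV
}

func main() {
	for _, f := range []string{"data.csv", "data.csv.gz", "data.lp.gz"} {
		compressed, isCSV := detectFileInput(f, "")
		fmt.Printf("%-12s compressed=%-5v csv=%v\n", f, compressed, isCSV)
	}
}
```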
@@ -203,6 +236,7 @@ func (b *writeFlagsBuilder) createLineReader(ctx context.Context, cmd *cobra.Com
if err != nil {
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to open %q: %v", addr, err)
}
req.Header.Set("Accept-Encoding", "gzip")
resp, err := client.Do(req)
if err != nil {
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to open %q: %v", addr, err)
@@ -211,11 +245,21 @@ func (b *writeFlagsBuilder) createLineReader(ctx context.Context, cmd *cobra.Com
if resp.StatusCode/100 != 2 {
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to open %q: response status_code=%d", addr, resp.StatusCode)
}
readers = append(readers, decode(resp.Body), strings.NewReader("\n"))
compressed := b.Compression == "gzip" ||
resp.Header.Get("Content-Encoding") == "gzip" ||
(len(b.Compression) == 0 && strings.HasSuffix(u.Path, ".gz"))
if compressed {
u.Path = strings.TrimSuffix(u.Path, ".gz")
}
if len(b.Format) == 0 &&
(strings.HasSuffix(u.Path, ".csv") || strings.HasPrefix(resp.Header.Get("Content-Type"), "text/csv")) {
b.Format = inputFormatCsv
}
if err = addReader(resp.Body, addr, compressed); err != nil {
return nil, csv2lp.MultiCloser(closers...), err
}
}
}
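For URLs, the request sets `Accept-Encoding: gzip` explicitly. When a caller sets that header itself, Go's HTTP client does not transparently decompress the response, so the body arrives still compressed and the code decides from `--compression`, the `Content-Encoding` response header, or a `.gz` suffix on the URL path. A self-contained sketch of that behavior (hypothetical helper, not the CLI code), using an in-process test server:

```go
package main

import (
	"compress/gzip"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net/http"
	"net/http/httptest"
)

// fetchBody shows the same idea as the URL branch above: because
// Accept-Encoding is set manually, net/http does not transparently
// decompress the response, so the body must be wrapped with gzip.NewReader
// whenever Content-Encoding reports gzip.
func fetchBody(url string) (io.Reader, func(), error) {
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, nil, err
	}
	req.Header.Set("Accept-Encoding", "gzip")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, nil, err
	}
	body, cleanup := io.Reader(resp.Body), func() { resp.Body.Close() }
	if resp.Header.Get("Content-Encoding") == "gzip" {
		zr, err := gzip.NewReader(resp.Body)
		if err != nil {
			resp.Body.Close()
			return nil, nil, err
		}
		body, cleanup = zr, func() { zr.Close(); resp.Body.Close() }
	}
	return body, cleanup, nil
}

func main() {
	// Serve a gzip-compressed line-protocol payload for the demo.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Encoding", "gzip")
		zw := gzip.NewWriter(w)
		defer zw.Close()
		zw.Write([]byte("m v=1"))
	}))
	defer srv.Close()

	body, cleanup, err := fetchBody(srv.URL + "/data.lp")
	if err != nil {
		log.Fatal(err)
	}
	defer cleanup()
	data, _ := ioutil.ReadAll(body)
	fmt.Println(string(data)) // m v=1
}
```

Setting the header manually presumably keeps the raw compressed bytes available so every source, compressed or not, flows through the same `addReader` path.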
@@ -224,13 +268,19 @@ func (b *writeFlagsBuilder) createLineReader(ctx context.Context, cmd *cobra.Com
case len(args) == 0:
// use also stdIn if it is not a terminal
if !isCharacterDevice(cmd.InOrStdin()) {
readers = append(readers, decode(cmd.InOrStdin()))
if err = addReader(cmd.InOrStdin(), "stdin", b.Compression == "gzip"); err != nil {
return nil, csv2lp.MultiCloser(closers...), err
}
}
case args[0] == "-":
// "-" also means stdin
readers = append(readers, decode(cmd.InOrStdin()))
if err = addReader(cmd.InOrStdin(), "stdin", b.Compression == "gzip"); err != nil {
return nil, csv2lp.MultiCloser(closers...), err
}
default:
readers = append(readers, strings.NewReader(args[0]))
if err = addReader(strings.NewReader(args[0]), "arg 0", b.Compression == "gzip"); err != nil {
return nil, csv2lp.MultiCloser(closers...), err
}
}
// skipHeader lines when set
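Stdin and literal arguments carry no file name to inspect, so gzip is applied only when `--compression gzip` is given explicitly. A minimal sketch of that case, with a hypothetical `-gzip` flag standing in for the real option:

```go
package main

import (
	"compress/gzip"
	"flag"
	"io"
	"log"
	"os"
)

func main() {
	// With no file name available, compression can only come from a flag.
	gz := flag.Bool("gzip", false, "treat stdin as gzip-compressed")
	flag.Parse()

	var in io.Reader = os.Stdin
	if *gz {
		zr, err := gzip.NewReader(in)
		if err != nil {
			log.Fatalf("failed to decompress stdin: %v", err)
		}
		defer zr.Close()
		in = zr
	}
	// Echo the (decompressed) input back out.
	if _, err := io.Copy(os.Stdout, in); err != nil {
		log.Fatal(err)
	}
}
```

Something like `gzip -c data.lp | go run main.go -gzip` would exercise the compressed path in this sketch.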

cmd/influx/write_test.go

@@ -3,6 +3,7 @@ package main
import (
"bufio"
"bytes"
"compress/gzip"
"context"
"errors"
"fmt"
@@ -18,7 +19,6 @@ import (
"github.com/influxdata/influxdb/v2"
"github.com/influxdata/influxdb/v2/mock"
"github.com/influxdata/influxdb/v2/pkg/csv2lp"
"github.com/spf13/viper"
"github.com/stretchr/testify/require"
)
@@ -57,19 +57,25 @@ func readLines(reader io.Reader) []string {
return retVal
}
func createTempFile(suffix string, contents []byte) string {
func createTempFile(t *testing.T, suffix string, contents []byte, compress bool) string {
t.Helper()
file, err := ioutil.TempFile("", "influx_writeTest*."+suffix)
file.Close() // Close immediately, since we need only a file name
if err != nil {
log.Fatal(err)
return "unknown.file"
}
require.NoError(t, err)
defer file.Close()
fileName := file.Name()
tempFiles = append(tempFiles, fileName)
err = ioutil.WriteFile(fileName, contents, os.ModePerm)
if err != nil {
log.Fatal(err)
var writer io.Writer = file
if compress {
gzipWriter := gzip.NewWriter(writer)
defer gzipWriter.Close()
writer = gzipWriter
}
_, err = writer.Write(contents)
require.NoError(t, err)
return fileName
}
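The rewritten `createTempFile` writes through a `gzip.Writer` when `compress` is true; since deferred calls run last-in-first-out, the gzip writer is closed (flushing the gzip trailer) before the file itself. A standalone sketch of the same round trip, with hypothetical names:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io/ioutil"
	"log"
	"os"
)

func main() {
	contents := []byte("m,t=a v=1")

	f, err := ioutil.TempFile("", "example*.lp.gz")
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove(f.Name())

	zw := gzip.NewWriter(f)
	if _, err := zw.Write(contents); err != nil {
		log.Fatal(err)
	}
	zw.Close() // flush the gzip trailer before closing the file
	f.Close()

	// Read the file back through a gzip reader to confirm the payload survived.
	raw, err := ioutil.ReadFile(f.Name())
	if err != nil {
		log.Fatal(err)
	}
	zr, err := gzip.NewReader(bytes.NewReader(raw))
	if err != nil {
		log.Fatal(err)
	}
	defer zr.Close()
	got, _ := ioutil.ReadAll(zr)
	fmt.Println(bytes.Equal(got, contents)) // true
}
```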
@@ -93,9 +99,27 @@ func Test_writeFlags_dump(t *testing.T) {
// are combined and transformed to provide a reader of protocol lines
func Test_writeFlags_createLineReader(t *testing.T) {
defer removeTempFiles()
fileContents := "_measurement,b,c,d\nf1,f2,f3,f4"
csvFile1 := createTempFile("csv", []byte(fileContents))
stdInContents := "i,j,_measurement,k\nstdin1,stdin2,stdin3,stdin4"
gzipStdin := func(uncompressed string) io.Reader {
contents := &bytes.Buffer{}
writer := gzip.NewWriter(contents)
_, err := writer.Write([]byte(uncompressed))
require.NoError(t, err)
require.NoError(t, writer.Close())
return contents
}
lpContents := "f1 b=f2,c=f3,d=f4"
lpFile := createTempFile(t, "txt", []byte(lpContents), false)
gzipLpFile := createTempFile(t, "txt.gz", []byte(lpContents), true)
gzipLpFileNoExt := createTempFile(t, "lp", []byte(lpContents), true)
stdInLpContents := "stdin3 i=stdin1,j=stdin2,k=stdin4"
csvContents := "_measurement,b,c,d\nf1,f2,f3,f4"
csvFile := createTempFile(t, "csv", []byte(csvContents), false)
gzipCsvFile := createTempFile(t, "csv.gz", []byte(csvContents), true)
gzipCsvFileNoExt := createTempFile(t, "csv", []byte(csvContents), true)
stdInCsvContents := "i,j,_measurement,k\nstdin1,stdin2,stdin3,stdin4"
// use a test HTTP server to provide CSV data
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
@@ -104,9 +128,20 @@ func Test_writeFlags_createLineReader(t *testing.T) {
if contentType := query.Get("Content-Type"); contentType != "" {
rw.Header().Set("Content-Type", contentType)
}
if encoding := query.Get("encoding"); encoding != "" {
rw.Header().Set("Content-Encoding", encoding)
}
compress := query.Get("compress") != ""
rw.WriteHeader(http.StatusOK)
if data := query.Get("data"); data != "" {
rw.Write([]byte(data))
var writer io.Writer = rw
if compress {
gzw := gzip.NewWriter(writer)
defer gzw.Close()
writer = gzw
}
_, err := writer.Write([]byte(data))
require.NoError(t, err)
}
}))
defer server.Close()
@@ -120,24 +155,191 @@ func Test_writeFlags_createLineReader(t *testing.T) {
// output
firstLineCorrection int // 0 unless shifted by prepended headers or skipped rows
lines []string
// lpData indicates that the data is already line protocol
lpData bool
}{
{
name: "read data from LP file",
flags: writeFlagsBuilder{
Files: []string{lpFile},
},
firstLineCorrection: 0,
lines: []string{
lpContents,
},
},
{
name: "read data from LP file using non-UTF encoding",
flags: writeFlagsBuilder{
Files: []string{lpFile},
Encoding: "ISO_8859-1",
},
firstLineCorrection: 0,
lines: []string{
lpContents,
},
},
{
name: "read compressed LP data from file",
flags: writeFlagsBuilder{
Files: []string{gzipLpFileNoExt},
Compression: inputCompressionGzip,
},
firstLineCorrection: 0,
lines: []string{
lpContents,
},
},
{
name: "read compressed data from LP file using non-UTF encoding",
flags: writeFlagsBuilder{
Files: []string{gzipLpFileNoExt},
Compression: inputCompressionGzip,
Encoding: "ISO_8859-1",
},
firstLineCorrection: 0,
lines: []string{
lpContents,
},
},
{
name: "read compressed LP data from file ending in .gz",
flags: writeFlagsBuilder{
Files: []string{gzipLpFile},
},
firstLineCorrection: 0,
lines: []string{
lpContents,
},
},
{
name: "read compressed and uncompressed LP data from file in the same call",
flags: writeFlagsBuilder{
Files: []string{gzipLpFile, lpFile},
},
firstLineCorrection: 0,
lines: []string{
lpContents,
lpContents,
},
},
{
name: "read LP data from stdin",
flags: writeFlagsBuilder{},
stdIn: strings.NewReader(stdInLpContents),
lines: []string{
stdInLpContents,
},
},
{
name: "read compressed LP data from stdin",
flags: writeFlagsBuilder{
Compression: inputCompressionGzip,
},
stdIn: gzipStdin(stdInLpContents),
lines: []string{
stdInLpContents,
},
},
{
name: "read LP data from stdin using '-' argument",
flags: writeFlagsBuilder{},
stdIn: strings.NewReader(stdInLpContents),
arguments: []string{"-"},
lines: []string{
stdInLpContents,
},
},
{
name: "read compressed LP data from stdin using '-' argument",
flags: writeFlagsBuilder{
Compression: inputCompressionGzip,
},
stdIn: gzipStdin(stdInLpContents),
arguments: []string{"-"},
lines: []string{
stdInLpContents,
},
},
{
name: "read LP data from 1st argument",
flags: writeFlagsBuilder{},
arguments: []string{stdInLpContents},
lines: []string{
stdInLpContents,
},
},
{
name: "read LP data from URL",
flags: writeFlagsBuilder{
URLs: []string{fmt.Sprintf("%s/a?data=%s", server.URL, url.QueryEscape(lpContents))},
},
lines: []string{
lpContents,
},
},
{
name: "read compressed LP data from URL",
flags: writeFlagsBuilder{
URLs: []string{fmt.Sprintf("%s/a?data=%s&compress=true", server.URL, url.QueryEscape(lpContents))},
Compression: inputCompressionGzip,
},
lines: []string{
lpContents,
},
},
{
name: "read compressed LP data from URL ending in .gz",
flags: writeFlagsBuilder{
URLs: []string{fmt.Sprintf("%s/a.gz?data=%s&compress=true", server.URL, url.QueryEscape(lpContents))},
},
lines: []string{
lpContents,
},
},
{
name: "read compressed LP data from URL with gzip encoding",
flags: writeFlagsBuilder{
URLs: []string{fmt.Sprintf("%s/a?data=%s&compress=true&encoding=gzip", server.URL, url.QueryEscape(lpContents))},
},
lines: []string{
lpContents,
},
},
{
name: "read data from CSV file + transform to line protocol",
flags: writeFlagsBuilder{
Files: []string{csvFile1},
Files: []string{csvFile},
},
firstLineCorrection: 0, // no changes
lines: []string{
"f1 b=f2,c=f3,d=f4",
lpContents,
},
},
{
name: "read compressed CSV data from file + transform to line protocol",
flags: writeFlagsBuilder{
Files: []string{gzipCsvFileNoExt},
Compression: inputCompressionGzip,
},
firstLineCorrection: 0,
lines: []string{
lpContents,
},
},
{
name: "read compressed CSV data from file ending in .csv.gz + transform to line protocol",
flags: writeFlagsBuilder{
Files: []string{gzipCsvFile},
},
firstLineCorrection: 0,
lines: []string{
lpContents,
},
},
{
name: "read CSV data from --header and --file + transform to line protocol",
flags: writeFlagsBuilder{
Headers: []string{"x,_measurement,y,z"},
Files: []string{csvFile1},
Files: []string{csvFile},
},
firstLineCorrection: -1, // shifted back by header line
lines: []string{
@@ -151,7 +353,7 @@ func Test_writeFlags_createLineReader(t *testing.T) {
Headers: []string{"x,_measurement,y,z"},
SkipHeader: 1,
},
arguments: []string{"@" + csvFile1},
arguments: []string{"@" + csvFile},
firstLineCorrection: 0, // shifted (-1) back by header line, forward (+1) by skipHeader
lines: []string{
"f2 x=f1,y=f3,z=f4",
@@ -163,7 +365,7 @@ func Test_writeFlags_createLineReader(t *testing.T) {
Headers: []string{"x,_measurement,y,z"},
SkipHeader: 1,
},
arguments: []string{"@" + csvFile1},
arguments: []string{"@" + csvFile},
firstLineCorrection: 0, // shifted (-1) back by header line, forward (+1) by skipHeader
lines: []string{
"f2 x=f1,y=f3,z=f4",
@@ -174,9 +376,20 @@ func Test_writeFlags_createLineReader(t *testing.T) {
flags: writeFlagsBuilder{
Format: inputFormatCsv,
},
stdIn: strings.NewReader(stdInContents),
stdIn: strings.NewReader(stdInCsvContents),
lines: []string{
"stdin3 i=stdin1,j=stdin2,k=stdin4",
stdInLpContents,
},
},
{
name: "read compressed CSV data from stdin + transform to line protocol",
flags: writeFlagsBuilder{
Format: inputFormatCsv,
Compression: inputCompressionGzip,
},
stdIn: gzipStdin(stdInCsvContents),
lines: []string{
stdInLpContents,
},
},
{
@@ -184,10 +397,22 @@ func Test_writeFlags_createLineReader(t *testing.T) {
flags: writeFlagsBuilder{
Format: inputFormatCsv,
},
stdIn: strings.NewReader(stdInContents),
stdIn: strings.NewReader(stdInCsvContents),
arguments: []string{"-"},
lines: []string{
"stdin3 i=stdin1,j=stdin2,k=stdin4",
stdInLpContents,
},
},
{
name: "read compressed CSV data from stdin using '-' argument + transform to line protocol",
flags: writeFlagsBuilder{
Format: inputFormatCsv,
Compression: inputCompressionGzip,
},
stdIn: gzipStdin(stdInCsvContents),
arguments: []string{"-"},
lines: []string{
stdInLpContents,
},
},
{
@@ -195,24 +420,52 @@ func Test_writeFlags_createLineReader(t *testing.T) {
flags: writeFlagsBuilder{
Format: inputFormatCsv,
},
arguments: []string{stdInContents},
arguments: []string{stdInCsvContents},
lines: []string{
"stdin3 i=stdin1,j=stdin2,k=stdin4",
stdInLpContents,
},
},
{
name: "read data from .csv URL + transform to line protocol",
flags: writeFlagsBuilder{
URLs: []string{(server.URL + "/a.csv?data=" + url.QueryEscape(fileContents))},
URLs: []string{fmt.Sprintf("%s/a.csv?data=%s", server.URL, url.QueryEscape(csvContents))},
},
lines: []string{
"f1 b=f2,c=f3,d=f4",
lpContents,
},
},
{
name: "read compressed CSV data from URL + transform to line protocol",
flags: writeFlagsBuilder{
URLs: []string{fmt.Sprintf("%s/a.csv?data=%s&compress=true", server.URL, url.QueryEscape(csvContents))},
Compression: inputCompressionGzip,
},
lines: []string{
lpContents,
},
},
{
name: "read compressed CSV data from URL ending in .csv.gz + transform to line protocol",
flags: writeFlagsBuilder{
URLs: []string{fmt.Sprintf("%s/a.csv.gz?data=%s&compress=true", server.URL, url.QueryEscape(csvContents))},
},
lines: []string{
lpContents,
},
},
{
name: "read compressed CSV data from URL with gzip encoding + transform to line protocol",
flags: writeFlagsBuilder{
URLs: []string{fmt.Sprintf("%s/a.csv?data=%s&compress=true&encoding=gzip", server.URL, url.QueryEscape(csvContents))},
},
lines: []string{
lpContents,
},
},
{
name: "read data from .csv URL + change header line + transform to line protocol",
flags: writeFlagsBuilder{
URLs: []string{(server.URL + "/a.csv?data=" + url.QueryEscape(fileContents))},
URLs: []string{fmt.Sprintf("%s/a.csv?data=%s", server.URL, url.QueryEscape(csvContents))},
Headers: []string{"k,j,_measurement,i"},
SkipHeader: 1,
},
@@ -221,30 +474,31 @@ func Test_writeFlags_createLineReader(t *testing.T) {
},
},
{
name: "read data from having text/csv URL resource + transform to line protocol",
name: "read data from URL with text/csv Content-Type + transform to line protocol",
flags: writeFlagsBuilder{
URLs: []string{(server.URL + "/a?Content-Type=text/csv&data=" + url.QueryEscape(fileContents))},
URLs: []string{fmt.Sprintf("%s/a?Content-Type=text/csv&data=%s", server.URL, url.QueryEscape(csvContents))},
},
lines: []string{
"f1 b=f2,c=f3,d=f4",
lpContents,
},
},
{
name: "read line protocol data from URL",
name: "read compressed data from URL with text/csv Content-Type and gzip Content-Encoding + transform to line protocol",
flags: writeFlagsBuilder{
URLs: []string{(server.URL + "/a?data=" + url.QueryEscape(fileContents))},
URLs: []string{fmt.Sprintf("%s/a?Content-Type=text/csv&data=%s&compress=true&encoding=gzip", server.URL, url.QueryEscape(csvContents))},
},
lines: []string{
lpContents,
},
lines: strings.Split(fileContents, "\n"),
lpData: true,
},
{
name: "read data from CSV file + transform to line protocol + throttle read to 1MB/min",
flags: writeFlagsBuilder{
Files: []string{csvFile1},
Files: []string{csvFile},
RateLimit: "1MBs",
},
lines: []string{
"f1 b=f2,c=f3,d=f4",
lpContents,
},
},
}
@@ -257,11 +511,6 @@ func Test_writeFlags_createLineReader(t *testing.T) {
defer closer.Close()
require.Nil(t, err)
require.NotNil(t, reader)
if !test.lpData && len(test.flags.RateLimit) == 0 {
csvToLineReader, ok := reader.(*csv2lp.CsvToLineReader)
require.True(t, ok)
require.Equal(t, csvToLineReader.LineNumber, test.firstLineCorrection)
}
lines := readLines(reader)
require.Equal(t, test.lines, lines)
})
@@ -271,7 +520,7 @@ func Test_writeFlags_createLineReader(t *testing.T) {
// Test_writeFlags_createLineReader_errors tests input validation
func Test_writeFlags_createLineReader_errors(t *testing.T) {
defer removeTempFiles()
csvFile1 := createTempFile("csv", []byte("_measurement,b,c,d\nf1,f2,f3,f4"))
csvFile1 := createTempFile(t, "csv", []byte("_measurement,b,c,d\nf1,f2,f3,f4"), false)
// use a test HTTP server to serve errors
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusInternalServerError)
@@ -500,7 +749,7 @@ func Test_writeRunE(t *testing.T) {
// Test_writeFlags_errorsFile tests that rejected rows are written to errors file
func Test_writeFlags_errorsFile(t *testing.T) {
defer removeTempFiles()
errorsFile := createTempFile("errors", []byte{})
errorsFile := createTempFile(t, "errors", []byte{}, false)
stdInContents := "_measurement,a|long:strict\nm,1\nm,1.1"
out := bytes.Buffer{}
command := cmdWrite(&globalFlags{}, genericCLIOpts{in: strings.NewReader(stdInContents), w: bufio.NewWriter(&out), viper: viper.New()})