Merge pull request #18477 from influxdata/chore/reuse_ctx_influx_write
chore(influx): use and pass a single context in write cmd processingpull/18485/head
commit
78b9475ab5
|
@ -101,7 +101,7 @@ func (writeFlags *writeFlagsType) dump(args []string) {
|
|||
}
|
||||
|
||||
// createLineReader uses writeFlags and cli arguments to create a reader that produces line protocol
|
||||
func (writeFlags *writeFlagsType) createLineReader(cmd *cobra.Command, args []string) (io.Reader, io.Closer, error) {
|
||||
func (writeFlags *writeFlagsType) createLineReader(ctx context.Context, cmd *cobra.Command, args []string) (io.Reader, io.Closer, error) {
|
||||
files := writeFlags.Files
|
||||
if len(args) > 0 && len(args[0]) > 1 && args[0][0] == '@' {
|
||||
// backward compatibility: @ in arg denotes a file
|
||||
|
@ -156,7 +156,7 @@ func (writeFlags *writeFlagsType) createLineReader(cmd *cobra.Command, args []st
|
|||
if err != nil {
|
||||
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to open %q: %v", addr, err)
|
||||
}
|
||||
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, addr, nil)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, addr, nil)
|
||||
if err != nil {
|
||||
return nil, csv2lp.MultiCloser(closers...), fmt.Errorf("failed to open %q: %v", addr, err)
|
||||
}
|
||||
|
@ -257,7 +257,7 @@ func fluxWriteF(cmd *cobra.Command, args []string) error {
|
|||
filter.Org = &writeFlags.org.name
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
ctx := signals.WithStandardSignals(context.Background())
|
||||
buckets, n, err := bs.FindBuckets(ctx, filter)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to retrieve buckets: %v", err)
|
||||
|
@ -275,7 +275,7 @@ func fluxWriteF(cmd *cobra.Command, args []string) error {
|
|||
bucketID, orgID := buckets[0].ID, buckets[0].OrgID
|
||||
|
||||
// create line reader
|
||||
r, closer, err := writeFlags.createLineReader(cmd, args)
|
||||
r, closer, err := writeFlags.createLineReader(ctx, cmd, args)
|
||||
if closer != nil {
|
||||
defer closer.Close()
|
||||
}
|
||||
|
@ -292,7 +292,6 @@ func fluxWriteF(cmd *cobra.Command, args []string) error {
|
|||
InsecureSkipVerify: flags.skipVerify,
|
||||
},
|
||||
}
|
||||
ctx = signals.WithStandardSignals(ctx)
|
||||
if err := s.Write(ctx, orgID, bucketID, r); err != nil && err != context.Canceled {
|
||||
return fmt.Errorf("failed to write data: %v", err)
|
||||
}
|
||||
|
@ -303,7 +302,8 @@ func fluxWriteF(cmd *cobra.Command, args []string) error {
|
|||
func fluxWriteDryrunF(cmd *cobra.Command, args []string) error {
|
||||
writeFlags.dump(args) // print flags when in Debug mode
|
||||
// create line reader
|
||||
r, closer, err := writeFlags.createLineReader(cmd, args)
|
||||
ctx := signals.WithStandardSignals(context.Background())
|
||||
r, closer, err := writeFlags.createLineReader(ctx, cmd, args)
|
||||
if closer != nil {
|
||||
defer closer.Close()
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bufio"
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
|
@ -237,7 +238,7 @@ func Test_writeFlags_createLineReader(t *testing.T) {
|
|||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
command := cmdWrite(&globalFlags{}, genericCLIOpts{in: test.stdIn})
|
||||
reader, closer, err := test.flags.createLineReader(command, test.arguments)
|
||||
reader, closer, err := test.flags.createLineReader(context.Background(), command, test.arguments)
|
||||
require.NotNil(t, closer)
|
||||
defer closer.Close()
|
||||
require.Nil(t, err)
|
||||
|
@ -316,7 +317,7 @@ func Test_writeFlags_createLineReader_errors(t *testing.T) {
|
|||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
command := cmdWrite(&globalFlags{}, genericCLIOpts{in: strings.NewReader("")})
|
||||
_, closer, err := test.flags.createLineReader(command, []string{})
|
||||
_, closer, err := test.flags.createLineReader(context.Background(), command, []string{})
|
||||
require.NotNil(t, closer)
|
||||
defer closer.Close()
|
||||
require.NotNil(t, err)
|
||||
|
|
Loading…
Reference in New Issue