165 lines
4.2 KiB
Go
165 lines
4.2 KiB
Go
package write
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
"github.com/influxdata/platform"
|
|
)
|
|
|
|
// Defaults used by Batcher when the corresponding field is left at zero.
const (
	// DefaultMaxBytes is the default flush threshold of 500KB; this is
	// typically 250 to 500 lines.
	DefaultMaxBytes = 500 * 1000

	// DefaultInterval is the default time between flushes: 10 seconds.
	DefaultInterval = time.Second * 10
)
|
|
|
|
// Compile-time check that *Batcher satisfies platform.WriteService, i.e. a
// Batcher is itself a write service that batches for another write service.
var _ platform.WriteService = (*Batcher)(nil)
|
|
|
|
// Batcher batches line protocol for sends to output.
|
|
type Batcher struct {
|
|
MaxFlushBytes int // MaxFlushBytes is the maximum number of bytes to buffer before flushing
|
|
MaxFlushInterval time.Duration // MaxFlushInterval is the maximum amount of time to wait before flushing
|
|
Service platform.WriteService // Service receives batches flushed from Batcher.
|
|
}
|
|
|
|
// Write reads r in batches and sends to the output.
|
|
func (b *Batcher) Write(ctx context.Context, org, bucket platform.ID, r io.Reader) error {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
if b.Service == nil {
|
|
return fmt.Errorf("destination write service required")
|
|
}
|
|
|
|
lines := make(chan []byte)
|
|
|
|
writeErrC := make(chan error)
|
|
go b.write(ctx, org, bucket, lines, writeErrC)
|
|
|
|
readErrC := make(chan error)
|
|
go b.read(ctx, r, lines, readErrC)
|
|
|
|
// loop is needed in the case that the read finishes without an
|
|
// error, but, the write has yet to complete.
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case err := <-readErrC:
|
|
// only exit if the read has an error
|
|
// read will have closed the lines channel signaling write to exit
|
|
if err != nil {
|
|
return err
|
|
}
|
|
case err := <-writeErrC:
|
|
// if write finishes, exit immediately. reads may block forever
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// read will close the line channel when there is no more data, or an error occurs.
|
|
// it is possible for an io.Reader to block forever; Write's context can be
|
|
// used to cancel, but, it's possible there will be dangling read go routines.
|
|
func (b *Batcher) read(ctx context.Context, r io.Reader, lines chan<- []byte, errC chan<- error) {
|
|
scanner := bufio.NewScanner(r)
|
|
scanner.Split(ScanLines)
|
|
for scanner.Scan() {
|
|
// exit early if the context is done
|
|
select {
|
|
case lines <- scanner.Bytes():
|
|
case <-ctx.Done():
|
|
close(lines)
|
|
errC <- ctx.Err()
|
|
return
|
|
}
|
|
}
|
|
close(lines)
|
|
errC <- scanner.Err()
|
|
}
|
|
|
|
// finishes when the lines channel is closed or context is done.
|
|
// if an error occurs while writing data to the write service, the error is send in the
|
|
// errC channel and the function returns.
|
|
func (b *Batcher) write(ctx context.Context, org, bucket platform.ID, lines <-chan []byte, errC chan<- error) {
|
|
flushInterval := b.MaxFlushInterval
|
|
if flushInterval == 0 {
|
|
flushInterval = DefaultInterval
|
|
}
|
|
|
|
maxBytes := b.MaxFlushBytes
|
|
if maxBytes == 0 {
|
|
maxBytes = DefaultMaxBytes
|
|
}
|
|
|
|
timer := time.NewTimer(flushInterval)
|
|
defer func() { _ = timer.Stop() }()
|
|
|
|
buf := make([]byte, 0, maxBytes)
|
|
r := bytes.NewReader(buf)
|
|
|
|
var line []byte
|
|
var more = true
|
|
// if read closes the channel normally, exit the loop
|
|
for more {
|
|
select {
|
|
case line, more = <-lines:
|
|
if more {
|
|
buf = append(buf, line...)
|
|
}
|
|
// write if we exceed the max lines OR read routine has finished
|
|
if len(buf) >= maxBytes || (!more && len(buf) > 0) {
|
|
r.Reset(buf)
|
|
timer.Reset(flushInterval)
|
|
if err := b.Service.Write(ctx, org, bucket, r); err != nil {
|
|
errC <- err
|
|
return
|
|
}
|
|
buf = buf[:0]
|
|
}
|
|
case <-timer.C:
|
|
if len(buf) > 0 {
|
|
r.Reset(buf)
|
|
timer.Reset(flushInterval)
|
|
if err := b.Service.Write(ctx, org, bucket, r); err != nil {
|
|
errC <- err
|
|
return
|
|
}
|
|
buf = buf[:0]
|
|
}
|
|
case <-ctx.Done():
|
|
errC <- ctx.Err()
|
|
return
|
|
}
|
|
}
|
|
|
|
errC <- nil
|
|
}
|
|
|
|
// ScanLines is a bufio.SplitFunc for bufio.Scanner.Split that splits line
// protocol into lines. Unlike bufio.ScanLines, the returned token keeps its
// trailing newline. A final line without a newline is returned once atEOF is
// set; otherwise more data is requested by returning a zero advance and a
// nil token. The returned error is always nil.
func ScanLines(data []byte, atEOF bool) (advance int, token []byte, err error) {
	newline := bytes.IndexByte(data, '\n')
	switch {
	case newline >= 0:
		// A full newline-terminated line is available; include the newline.
		end := newline + 1
		return end, data[:end], nil
	case atEOF && len(data) > 0:
		// At EOF with a final, non-terminated line: return it as is.
		return len(data), data, nil
	default:
		// Either nothing is left at EOF or the line is incomplete: request
		// more data.
		return 0, nil, nil
	}
}
|