refactor(write): reuse timers
parent
6ce2530003
commit
ad50cb1527
|
@ -98,11 +98,14 @@ func (b *Batcher) write(ctx context.Context, org, bucket platform.ID, lines <-ch
|
|||
maxBytes = DefaultMaxBytes
|
||||
}
|
||||
|
||||
timer := time.NewTimer(flushInterval)
|
||||
defer func() { _ = timer.Stop() }()
|
||||
|
||||
buf := make([]byte, 0, maxBytes)
|
||||
r := bytes.NewReader(buf)
|
||||
|
||||
var line []byte
|
||||
var more = true
|
||||
|
||||
// if read closes the channel normally, exit the loop
|
||||
for more {
|
||||
select {
|
||||
|
@ -113,16 +116,17 @@ func (b *Batcher) write(ctx context.Context, org, bucket platform.ID, lines <-ch
|
|||
// write if we exceed the max lines OR read routine has finished
|
||||
if len(buf) >= maxBytes || (!more && len(buf) > 0) {
|
||||
r.Reset(buf)
|
||||
timer.Reset(flushInterval)
|
||||
if err := b.Service.Write(ctx, org, bucket, r); err != nil {
|
||||
errC <- err
|
||||
return
|
||||
}
|
||||
buf = buf[:0]
|
||||
}
|
||||
case <-time.After(flushInterval):
|
||||
// TODO: worry about timer garbage collection
|
||||
case <-timer.C:
|
||||
if len(buf) > 0 {
|
||||
r.Reset(buf)
|
||||
timer.Reset(flushInterval)
|
||||
if err := b.Service.Write(ctx, org, bucket, r); err != nil {
|
||||
errC <- err
|
||||
return
|
||||
|
|
Loading…
Reference in New Issue