fix(tsm1): fix wal's totalOldDiskSize statistics (#20811)
parent
5fc9240dc5
commit
265c1f311e
|
@ -16,6 +16,7 @@
|
|||
1. [20754](https://github.com/influxdata/influxdb/pull/20754): Update references to docs site to use current URLs.
|
||||
1. [20773](https://github.com/influxdata/influxdb/pull/20773): Fix data race in TSM engine when inspecting tombstone stats.
|
||||
1. [20797](https://github.com/influxdata/influxdb/pull/20797): Fix data race in TSM cache. Thanks @StoneYunZhao!
|
||||
1. [20811](https://github.com/influxdata/influxdb/pull/20811): Fix TSM WAL segment size computing. Thanks @StoneYunZhao!
|
||||
1. [20798](https://github.com/influxdata/influxdb/pull/20798): Deprecate misleading `retentionPeriodHrs` key in onboarding API.
|
||||
1. [20819](https://github.com/influxdata/influxdb/pull/20819): Fix Single Stat graphs with thresholds crashing on negative values.
|
||||
1. [20809](https://github.com/influxdata/influxdb/pull/20809): Fix InfluxDB port in Flux function UI examples. Thanks @sunjincheng121!
|
||||
|
|
|
@ -242,6 +242,11 @@ func (l *WAL) Open() error {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
if l.currentSegmentWriter != nil {
|
||||
totalOldDiskSize -= int64(l.currentSegmentWriter.size)
|
||||
}
|
||||
|
||||
atomic.StoreInt64(&l.stats.OldBytes, totalOldDiskSize)
|
||||
|
||||
l.closing = make(chan struct{})
|
||||
|
@ -379,6 +384,11 @@ func (l *WAL) Remove(files []string) error {
|
|||
|
||||
totalOldDiskSize += stat.Size()
|
||||
}
|
||||
|
||||
if l.currentSegmentWriter != nil {
|
||||
totalOldDiskSize -= int64(l.currentSegmentWriter.size)
|
||||
}
|
||||
|
||||
atomic.StoreInt64(&l.stats.OldBytes, totalOldDiskSize)
|
||||
|
||||
return nil
|
||||
|
@ -565,7 +575,8 @@ func (l *WAL) newSegmentFile() error {
|
|||
if err := l.currentSegmentWriter.close(); err != nil {
|
||||
return err
|
||||
}
|
||||
atomic.StoreInt64(&l.stats.OldBytes, int64(l.currentSegmentWriter.size))
|
||||
|
||||
atomic.AddInt64(&l.stats.OldBytes, int64(l.currentSegmentWriter.size))
|
||||
}
|
||||
|
||||
fileName := filepath.Join(l.path, fmt.Sprintf("%s%05d.%s", WALFilePrefix, l.currentSegmentID, WALFileExtension))
|
||||
|
|
|
@ -6,11 +6,13 @@ import (
|
|||
"io/ioutil"
|
||||
"os"
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/snappy"
|
||||
"github.com/influxdata/influxdb/v2/pkg/slices"
|
||||
"github.com/influxdata/influxdb/v2/tsdb/engine/tsm1"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestWALWriter_WriteMulti_Single(t *testing.T) {
|
||||
|
@ -701,6 +703,93 @@ func TestWALRollSegment(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestWAL_DiskSize(t *testing.T) {
|
||||
test := func(w *tsm1.WAL, oldZero, curZero bool) {
|
||||
// get disk size by reading file
|
||||
files, err := ioutil.ReadDir(w.Path())
|
||||
require.NoError(t, err)
|
||||
|
||||
sort.Slice(files, func(i, j int) bool {
|
||||
return files[i].Name() < files[j].Name()
|
||||
})
|
||||
|
||||
var old, cur int64
|
||||
if len(files) > 0 {
|
||||
cur = files[len(files)-1].Size()
|
||||
for i := 0; i < len(files)-1; i++ {
|
||||
old += files[i].Size()
|
||||
}
|
||||
}
|
||||
|
||||
// test zero size condition
|
||||
require.False(t, oldZero && old > 0)
|
||||
require.False(t, !oldZero && old == 0)
|
||||
require.False(t, curZero && cur > 0)
|
||||
require.False(t, !curZero && cur == 0)
|
||||
|
||||
// test method DiskSizeBytes
|
||||
require.Equal(t, old+cur, w.DiskSizeBytes(), "total disk size")
|
||||
|
||||
// test Statistics
|
||||
ss := w.Statistics(nil)
|
||||
require.Equal(t, 1, len(ss))
|
||||
|
||||
m := ss[0].Values
|
||||
require.NotNil(t, m)
|
||||
|
||||
require.Equal(t, m["oldSegmentsDiskBytes"].(int64), old, "old disk size")
|
||||
require.Equal(t, m["currentSegmentDiskBytes"].(int64), cur, "current dist size")
|
||||
}
|
||||
|
||||
dir := MustTempDir()
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
w := tsm1.NewWAL(dir)
|
||||
|
||||
const segSize = 1024
|
||||
w.SegmentSize = segSize
|
||||
|
||||
// open
|
||||
require.NoError(t, w.Open())
|
||||
|
||||
test(w, true, true)
|
||||
|
||||
// write some values, the total size of these values does not exceed segSize(1024),
|
||||
// so rollSegment will not be triggered
|
||||
values := map[string][]tsm1.Value{
|
||||
"cpu,host=A#!~#value": {tsm1.NewValue(1, 1.0)},
|
||||
"cpu,host=B#!~#value": {tsm1.NewValue(1, 1.0)},
|
||||
"cpu,host=C#!~#value": {tsm1.NewValue(1, 1.0)},
|
||||
}
|
||||
|
||||
_, err := w.WriteMulti(values)
|
||||
require.NoError(t, err)
|
||||
|
||||
test(w, true, false)
|
||||
|
||||
// write some values, the total size of these values exceeds segSize(1024),
|
||||
// so rollSegment will be triggered
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err := w.WriteMulti(values)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
test(w, false, false)
|
||||
|
||||
// reopen
|
||||
require.NoError(t, w.Close())
|
||||
require.NoError(t, w.Open())
|
||||
|
||||
test(w, false, false)
|
||||
|
||||
// remove
|
||||
closedSegments, err := w.ClosedSegments()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, w.Remove(closedSegments))
|
||||
|
||||
test(w, true, false)
|
||||
}
|
||||
|
||||
func TestWriteWALSegment_UnmarshalBinary_WriteWALCorrupt(t *testing.T) {
|
||||
p1 := tsm1.NewValue(1, 1.1)
|
||||
p2 := tsm1.NewValue(1, int64(1))
|
||||
|
|
Loading…
Reference in New Issue