Benchmark go/cgo parquet payload readers (#16736)

Signed-off-by: Letian Jiang <letian.jiang@zilliz.com>
pull/16747/head
Letian Jiang 2022-04-29 15:59:47 +08:00 committed by GitHub
parent 7dfab0aa13
commit 5b2b917987
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 239 additions and 26 deletions

View File

@ -0,0 +1,212 @@
package storage
import (
"math/rand"
"testing"
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
// workload setting for benchmark
const (
numElements = 1000
vectorDim = 8
)
func BenchmarkPayloadReader_Bool(b *testing.B) {
w, _ := NewPayloadWriter(schemapb.DataType_Bool)
defer w.ReleasePayloadWriter()
data := make([]bool, 0, numElements)
for i := 0; i < numElements; i++ {
data = append(data, rand.Intn(2) != 0)
}
w.AddBoolToPayload(data)
w.FinishPayloadWriter()
buffer, _ := w.GetPayloadBufferFromWriter()
b.Run("cgo reader", func(b *testing.B) {
for i := 0; i < b.N; i++ {
r, _ := NewPayloadReaderCgo(schemapb.DataType_Bool, buffer)
r.GetBoolFromPayload()
r.ReleasePayloadReader()
}
})
b.Run("go reader", func(b *testing.B) {
for i := 0; i < b.N; i++ {
r, _ := NewPayloadReader(schemapb.DataType_Bool, buffer)
r.GetBoolFromPayload()
r.ReleasePayloadReader()
}
})
}
func BenchmarkPayloadReader_Int32(b *testing.B) {
w, _ := NewPayloadWriter(schemapb.DataType_Int32)
defer w.ReleasePayloadWriter()
data := make([]int32, 0, numElements)
for i := 0; i < numElements; i++ {
data = append(data, rand.Int31n(1000))
}
w.AddInt32ToPayload(data)
w.FinishPayloadWriter()
buffer, _ := w.GetPayloadBufferFromWriter()
b.Run("cgo reader", func(b *testing.B) {
for i := 0; i < b.N; i++ {
r, _ := NewPayloadReaderCgo(schemapb.DataType_Int32, buffer)
r.GetInt32FromPayload()
r.ReleasePayloadReader()
}
})
b.Run("go reader", func(b *testing.B) {
for i := 0; i < b.N; i++ {
r, _ := NewPayloadReader(schemapb.DataType_Int32, buffer)
r.GetInt32FromPayload()
r.ReleasePayloadReader()
}
})
}
func BenchmarkPayloadReader_Int64(b *testing.B) {
w, _ := NewPayloadWriter(schemapb.DataType_Int64)
defer w.ReleasePayloadWriter()
data := make([]int64, 0, numElements)
for i := 0; i < numElements; i++ {
data = append(data, rand.Int63n(1000))
}
w.AddInt64ToPayload(data)
w.FinishPayloadWriter()
buffer, _ := w.GetPayloadBufferFromWriter()
b.Run("cgo reader", func(b *testing.B) {
for i := 0; i < b.N; i++ {
r, _ := NewPayloadReaderCgo(schemapb.DataType_Int64, buffer)
r.GetInt64FromPayload()
r.ReleasePayloadReader()
}
})
b.Run("go reader", func(b *testing.B) {
for i := 0; i < b.N; i++ {
r, _ := NewPayloadReader(schemapb.DataType_Int64, buffer)
r.GetInt64FromPayload()
r.ReleasePayloadReader()
}
})
}
func BenchmarkPayloadReader_Float32(b *testing.B) {
w, _ := NewPayloadWriter(schemapb.DataType_Float)
defer w.ReleasePayloadWriter()
data := make([]float32, 0, numElements)
for i := 0; i < numElements; i++ {
data = append(data, rand.Float32())
}
w.AddFloatToPayload(data)
w.FinishPayloadWriter()
buffer, _ := w.GetPayloadBufferFromWriter()
b.Run("cgo reader", func(b *testing.B) {
for i := 0; i < b.N; i++ {
r, _ := NewPayloadReaderCgo(schemapb.DataType_Float, buffer)
r.GetFloatFromPayload()
r.ReleasePayloadReader()
}
})
b.Run("go reader", func(b *testing.B) {
for i := 0; i < b.N; i++ {
r, _ := NewPayloadReader(schemapb.DataType_Float, buffer)
r.GetFloatFromPayload()
r.ReleasePayloadReader()
}
})
}
func BenchmarkPayloadReader_Float64(b *testing.B) {
w, _ := NewPayloadWriter(schemapb.DataType_Double)
defer w.ReleasePayloadWriter()
data := make([]float64, 0, numElements)
for i := 0; i < numElements; i++ {
data = append(data, rand.Float64())
}
w.AddDoubleToPayload(data)
w.FinishPayloadWriter()
buffer, _ := w.GetPayloadBufferFromWriter()
b.Run("cgo reader", func(b *testing.B) {
for i := 0; i < b.N; i++ {
r, _ := NewPayloadReaderCgo(schemapb.DataType_Double, buffer)
r.GetDoubleFromPayload()
r.ReleasePayloadReader()
}
})
b.Run("go reader", func(b *testing.B) {
for i := 0; i < b.N; i++ {
r, _ := NewPayloadReader(schemapb.DataType_Double, buffer)
r.GetDoubleFromPayload()
r.ReleasePayloadReader()
}
})
}
func BenchmarkPayloadReader_FloatVector(b *testing.B) {
w, _ := NewPayloadWriter(schemapb.DataType_FloatVector)
defer w.ReleasePayloadWriter()
data := make([]float32, 0, numElements*vectorDim)
for i := 0; i < numElements; i++ {
data = append(data, rand.Float32())
}
w.AddFloatVectorToPayload(data, vectorDim)
w.FinishPayloadWriter()
buffer, _ := w.GetPayloadBufferFromWriter()
b.Run("cgo reader", func(b *testing.B) {
for i := 0; i < b.N; i++ {
r, _ := NewPayloadReaderCgo(schemapb.DataType_FloatVector, buffer)
r.GetFloatVectorFromPayload()
r.ReleasePayloadReader()
}
})
b.Run("go reader", func(b *testing.B) {
for i := 0; i < b.N; i++ {
r, _ := NewPayloadReader(schemapb.DataType_FloatVector, buffer)
r.GetFloatVectorFromPayload()
r.ReleasePayloadReader()
}
})
}
func BenchmarkPayloadReader_BinaryVector(b *testing.B) {
w, _ := NewPayloadWriter(schemapb.DataType_BinaryVector)
defer w.ReleasePayloadWriter()
data := make([]byte, numElements*vectorDim/8)
rand.Read(data)
err := w.AddBinaryVectorToPayload(data, vectorDim)
if err != nil {
panic(err)
}
w.FinishPayloadWriter()
buffer, _ := w.GetPayloadBufferFromWriter()
b.Run("cgo reader", func(b *testing.B) {
for i := 0; i < b.N; i++ {
r, _ := NewPayloadReaderCgo(schemapb.DataType_BinaryVector, buffer)
r.GetBinaryVectorFromPayload()
r.ReleasePayloadReader()
}
})
b.Run("go reader", func(b *testing.B) {
for i := 0; i < b.N; i++ {
r, _ := NewPayloadReader(schemapb.DataType_BinaryVector, buffer)
r.GetBinaryVectorFromPayload()
r.ReleasePayloadReader()
}
})
}

View File

@ -20,12 +20,12 @@ import (
"fmt"
"testing"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/milvus-io/milvus/internal/proto/schemapb"
)
// TODO Cgo test can not go through on mac, fix it
func TestPayload_CGO_ReaderandWriter(t *testing.T) {
t.Run("TestBool", func(t *testing.T) {
@ -48,7 +48,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
buffer, err := w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
r, err := NewPayloadReaderGgo(schemapb.DataType_Bool, buffer)
r, err := NewPayloadReaderCgo(schemapb.DataType_Bool, buffer)
require.Nil(t, err)
length, err = r.GetPayloadLengthFromReader()
assert.Nil(t, err)
@ -84,7 +84,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
buffer, err := w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
r, err := NewPayloadReaderGgo(schemapb.DataType_Int8, buffer)
r, err := NewPayloadReaderCgo(schemapb.DataType_Int8, buffer)
require.Nil(t, err)
length, err = r.GetPayloadLengthFromReader()
assert.Nil(t, err)
@ -122,7 +122,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
buffer, err := w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
r, err := NewPayloadReaderGgo(schemapb.DataType_Int16, buffer)
r, err := NewPayloadReaderCgo(schemapb.DataType_Int16, buffer)
require.Nil(t, err)
length, err = r.GetPayloadLengthFromReader()
assert.Nil(t, err)
@ -158,7 +158,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
buffer, err := w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
r, err := NewPayloadReaderGgo(schemapb.DataType_Int32, buffer)
r, err := NewPayloadReaderCgo(schemapb.DataType_Int32, buffer)
require.Nil(t, err)
length, err = r.GetPayloadLengthFromReader()
assert.Nil(t, err)
@ -195,7 +195,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
buffer, err := w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
r, err := NewPayloadReaderGgo(schemapb.DataType_Int64, buffer)
r, err := NewPayloadReaderCgo(schemapb.DataType_Int64, buffer)
require.Nil(t, err)
length, err = r.GetPayloadLengthFromReader()
assert.Nil(t, err)
@ -232,7 +232,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
buffer, err := w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
r, err := NewPayloadReaderGgo(schemapb.DataType_Float, buffer)
r, err := NewPayloadReaderCgo(schemapb.DataType_Float, buffer)
require.Nil(t, err)
length, err = r.GetPayloadLengthFromReader()
assert.Nil(t, err)
@ -269,7 +269,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
buffer, err := w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
r, err := NewPayloadReaderGgo(schemapb.DataType_Double, buffer)
r, err := NewPayloadReaderCgo(schemapb.DataType_Double, buffer)
require.Nil(t, err)
length, err = r.GetPayloadLengthFromReader()
assert.Nil(t, err)
@ -307,7 +307,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
buffer, err := w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
r, err := NewPayloadReaderGgo(schemapb.DataType_String, buffer)
r, err := NewPayloadReaderCgo(schemapb.DataType_String, buffer)
assert.Nil(t, err)
length, err = r.GetPayloadLengthFromReader()
assert.Nil(t, err)
@ -361,7 +361,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
buffer, err := w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
r, err := NewPayloadReaderGgo(schemapb.DataType_BinaryVector, buffer)
r, err := NewPayloadReaderCgo(schemapb.DataType_BinaryVector, buffer)
require.Nil(t, err)
length, err = r.GetPayloadLengthFromReader()
assert.Nil(t, err)
@ -401,7 +401,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
buffer, err := w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
r, err := NewPayloadReaderGgo(schemapb.DataType_FloatVector, buffer)
r, err := NewPayloadReaderCgo(schemapb.DataType_FloatVector, buffer)
require.Nil(t, err)
length, err = r.GetPayloadLengthFromReader()
assert.Nil(t, err)
@ -621,7 +621,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
t.Run("TestNewReadError", func(t *testing.T) {
buffer := []byte{0}
r, err := NewPayloadReaderGgo(999, buffer)
r, err := NewPayloadReaderCgo(999, buffer)
assert.NotNil(t, err)
assert.Nil(t, r)
})
@ -646,7 +646,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
buffer, err := w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
r, err := NewPayloadReaderGgo(schemapb.DataType_Bool, buffer)
r, err := NewPayloadReaderCgo(schemapb.DataType_Bool, buffer)
assert.Nil(t, err)
_, err = r.GetBoolFromPayload()
@ -670,7 +670,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
buffer, err := w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
r, err := NewPayloadReaderGgo(schemapb.DataType_Int8, buffer)
r, err := NewPayloadReaderCgo(schemapb.DataType_Int8, buffer)
assert.Nil(t, err)
_, err = r.GetInt8FromPayload()
@ -694,7 +694,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
buffer, err := w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
r, err := NewPayloadReaderGgo(schemapb.DataType_Int16, buffer)
r, err := NewPayloadReaderCgo(schemapb.DataType_Int16, buffer)
assert.Nil(t, err)
_, err = r.GetInt16FromPayload()
@ -718,7 +718,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
buffer, err := w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
r, err := NewPayloadReaderGgo(schemapb.DataType_Int32, buffer)
r, err := NewPayloadReaderCgo(schemapb.DataType_Int32, buffer)
assert.Nil(t, err)
_, err = r.GetInt32FromPayload()
@ -742,7 +742,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
buffer, err := w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
r, err := NewPayloadReaderGgo(schemapb.DataType_Int64, buffer)
r, err := NewPayloadReaderCgo(schemapb.DataType_Int64, buffer)
assert.Nil(t, err)
_, err = r.GetInt64FromPayload()
@ -766,7 +766,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
buffer, err := w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
r, err := NewPayloadReaderGgo(schemapb.DataType_Float, buffer)
r, err := NewPayloadReaderCgo(schemapb.DataType_Float, buffer)
assert.Nil(t, err)
_, err = r.GetFloatFromPayload()
@ -790,7 +790,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
buffer, err := w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
r, err := NewPayloadReaderGgo(schemapb.DataType_Double, buffer)
r, err := NewPayloadReaderCgo(schemapb.DataType_Double, buffer)
assert.Nil(t, err)
_, err = r.GetDoubleFromPayload()
@ -814,7 +814,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
buffer, err := w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
r, err := NewPayloadReaderGgo(schemapb.DataType_String, buffer)
r, err := NewPayloadReaderCgo(schemapb.DataType_String, buffer)
assert.Nil(t, err)
_, err = r.GetStringFromPayload()
@ -838,7 +838,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
buffer, err := w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
r, err := NewPayloadReaderGgo(schemapb.DataType_BinaryVector, buffer)
r, err := NewPayloadReaderCgo(schemapb.DataType_BinaryVector, buffer)
assert.Nil(t, err)
_, _, err = r.GetBinaryVectorFromPayload()
@ -862,7 +862,7 @@ func TestPayload_CGO_ReaderandWriter(t *testing.T) {
buffer, err := w.GetPayloadBufferFromWriter()
assert.Nil(t, err)
r, err := NewPayloadReaderGgo(schemapb.DataType_FloatVector, buffer)
r, err := NewPayloadReaderCgo(schemapb.DataType_FloatVector, buffer)
assert.Nil(t, err)
_, _, err = r.GetFloatVectorFromPayload()

View File

@ -13,19 +13,20 @@ import "C"
import (
"errors"
"fmt"
"unsafe"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"unsafe"
)
// PayloadReader reads data from payload
// PayloadReaderCgo reads data from payload
type PayloadReaderCgo struct {
payloadReaderPtr C.CPayloadReader
colType schemapb.DataType
}
func NewPayloadReaderGgo(colType schemapb.DataType, buf []byte) (*PayloadReaderCgo, error) {
func NewPayloadReaderCgo(colType schemapb.DataType, buf []byte) (*PayloadReaderCgo, error) {
if len(buf) == 0 {
return nil, errors.New("create Payload reader failed, buffer is empty")
}