mirror of https://github.com/milvus-io/milvus.git
Cache go mod workflow for jenkins pipeline
Signed-off-by: quicksilver <zhifeng.zhang@zilliz.com>
pull/4973/head^2
parent 5512ce8d8a
commit 21721d2e03
@@ -1,6 +1,7 @@
 timeout(time: 20, unit: 'MINUTES') {
     dir ("scripts") {
         sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./check_cache.sh -l $CCACHE_ARTFACTORY_URL --cache_dir=\$CCACHE_DIR -f ccache-\$OS_NAME-\$BUILD_ENV_IMAGE_ID.tar.gz || echo \"Ccache artfactory files not found!\"'
         sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./check_cache.sh -l $GO_CACHE_ARTFACTORY_URL --cache_dir=\$(go env GOCACHE) -f go-cache-\$OS_NAME-\$BUILD_ENV_IMAGE_ID.tar.gz || echo \"Go cache artfactory files not found!\"'
         sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./check_cache.sh -l $THIRDPARTY_ARTFACTORY_URL --cache_dir=$CUSTOM_THIRDPARTY_PATH -f thirdparty-download.tar.gz || echo \"Thirdparty artfactory files not found!\"'
+        sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./check_cache.sh -l $GO_MOD_ARTFACTORY_URL --cache_dir=\$GOPATH/pkg/mod -f milvus-distributed-go-mod-cache.tar.gz || echo \"Go mod artfactory files not found!\"'
     }
@@ -10,6 +11,7 @@ timeout(time: 20, unit: 'MINUTES') {
     dir ("scripts") {
         withCredentials([usernamePassword(credentialsId: "${env.JFROG_CREDENTIALS_ID}", usernameVariable: 'USERNAME', passwordVariable: 'PASSWORD')]) {
             sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./update_cache.sh -l $CCACHE_ARTFACTORY_URL --cache_dir=\$CCACHE_DIR -f ccache-\$OS_NAME-\$BUILD_ENV_IMAGE_ID.tar.gz -u ${USERNAME} -p ${PASSWORD}'
             sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./update_cache.sh -l $GO_CACHE_ARTFACTORY_URL --cache_dir=\$(go env GOCACHE) -f go-cache-\$OS_NAME-\$BUILD_ENV_IMAGE_ID.tar.gz -u ${USERNAME} -p ${PASSWORD}'
             sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./update_cache.sh -l $THIRDPARTY_ARTFACTORY_URL --cache_dir=$CUSTOM_THIRDPARTY_PATH -f thirdparty-download.tar.gz -u ${USERNAME} -p ${PASSWORD}'
+            sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./update_cache.sh -l $GO_MOD_ARTFACTORY_URL --cache_dir=\$GOPATH/pkg/mod -f milvus-distributed-go-mod-cache.tar.gz -u ${USERNAME} -p ${PASSWORD}'
         }
@@ -40,6 +40,7 @@ pipeline {
         CCACHE_ARTFACTORY_URL = "${JFROG_ARTFACTORY_URL}/milvus-distributed/ccache"
         THIRDPARTY_ARTFACTORY_URL = "${JFROG_ARTFACTORY_URL}/milvus-distributed/thirdparty"
         CUSTOM_THIRDPARTY_PATH = "${WORKSPACE}/3rdparty_download"
         GO_CACHE_ARTFACTORY_URL = "${JFROG_ARTFACTORY_URL}/milvus-distributed/go-cache"
+        GO_MOD_ARTFACTORY_URL = "${JFROG_ARTFACTORY_URL}/milvus-distributed/go-mod"
     }
     steps {
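The three hunks above extend the existing ccache/go-cache/thirdparty caching pattern to Go modules: the check stage now also tries to pull milvus-distributed-go-mod-cache.tar.gz from $GO_MOD_ARTFACTORY_URL into $GOPATH/pkg/mod before the build, the update stage uploads the refreshed module cache with the JFrog credentials afterwards, and the pipeline environment gains the matching GO_MOD_ARTFACTORY_URL entry.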
@@ -28,7 +28,7 @@ type Blob struct {
 type Base struct {
 	Version  int
 	CommitID int
-	TanentID UniqueID
+	TenantID UniqueID
 	Schema   *etcdpb.CollectionMeta
 }
 
@@ -93,16 +93,17 @@ type InsertData struct {
 }
 
 // Blob key example:
-// ${tanent}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx}
+// ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx}
 type InsertCodec struct {
 	Base
 	readerCloseFunc []func() error
 }
 
-func (insertCodec *InsertCodec) Serialize(logIdx int, partitionID UniqueID, segmentID UniqueID, data *InsertData, ts []Timestamp) ([]*Blob, error) {
+func (insertCodec *InsertCodec) Serialize(logIdx int, partitionID UniqueID, segmentID UniqueID, data *InsertData) ([]*Blob, error) {
 	var blobs []*Blob
 	var writer *InsertBinlogWriter
 	var err error
+	ts := (data.Data[1]).(Int64FieldData).data
 
 	for fieldID, value := range data.Data {
 		switch singleData := value.(type) {
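With the new signature, callers no longer pass timestamps to Serialize as a separate []Timestamp argument; the codec reads them out of field 1 of the InsertData, which must therefore hold an Int64FieldData. A minimal, self-contained sketch of the new calling convention, using stand-in types (the real UniqueID, Timestamp, Int64FieldData and InsertData live in this repo's storage package, where the payload slice is the unexported field data):

package main

import "fmt"

// Stand-in types for illustration only; not the repo's definitions.
type UniqueID = int64
type Timestamp = uint64

type Int64FieldData struct {
	NumRows int
	Data    []int64
}

type InsertData struct {
	Data map[UniqueID]interface{} // fieldID -> field data
}

func main() {
	// Field 1 carries the row timestamps; Serialize now extracts them itself.
	insertData := &InsertData{Data: map[UniqueID]interface{}{
		1: Int64FieldData{NumRows: 2, Data: []int64{0, 1}},
	}}
	ts := insertData.Data[1].(Int64FieldData).Data
	fmt.Println("start:", Timestamp(ts[0]), "end:", Timestamp(ts[len(ts)-1]))
}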
@@ -115,8 +116,8 @@ func (insertCodec *InsertCodec) Serialize(logIdx int, partitionID UniqueID, segm
 			if err != nil {
 				return nil, err
 			}
-			eventWriter.SetStartTimestamp(ts[0])
-			eventWriter.SetStartTimestamp(ts[len(ts)-1])
+			eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0]))
+			eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1]))
 			err = eventWriter.AddBoolToPayload(singleData.data)
 			if err != nil {
 				return nil, err
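Note what the per-type hunks fix besides the typeutil.Timestamp conversion: the old code called SetStartTimestamp twice, so the event's end timestamp was never set on these writers; the second call now correctly becomes SetEndTimestamp. The same two-line change repeats for every payload type below.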
@@ -130,8 +131,8 @@ func (insertCodec *InsertCodec) Serialize(logIdx int, partitionID UniqueID, segm
 			if err != nil {
 				return nil, err
 			}
-			eventWriter.SetStartTimestamp(ts[0])
-			eventWriter.SetStartTimestamp(ts[len(ts)-1])
+			eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0]))
+			eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1]))
 			err = eventWriter.AddInt8ToPayload(singleData.data)
 			if err != nil {
 				return nil, err
@@ -145,8 +146,8 @@ func (insertCodec *InsertCodec) Serialize(logIdx int, partitionID UniqueID, segm
 			if err != nil {
 				return nil, err
 			}
-			eventWriter.SetStartTimestamp(ts[0])
-			eventWriter.SetStartTimestamp(ts[len(ts)-1])
+			eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0]))
+			eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1]))
 			err = eventWriter.AddInt16ToPayload(singleData.data)
 			if err != nil {
 				return nil, err
@@ -160,8 +161,8 @@ func (insertCodec *InsertCodec) Serialize(logIdx int, partitionID UniqueID, segm
 			if err != nil {
 				return nil, err
 			}
-			eventWriter.SetStartTimestamp(ts[0])
-			eventWriter.SetStartTimestamp(ts[len(ts)-1])
+			eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0]))
+			eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1]))
 			err = eventWriter.AddInt32ToPayload(singleData.data)
 			if err != nil {
 				return nil, err
@@ -175,8 +176,8 @@ func (insertCodec *InsertCodec) Serialize(logIdx int, partitionID UniqueID, segm
 			if err != nil {
 				return nil, err
 			}
-			eventWriter.SetStartTimestamp(ts[0])
-			eventWriter.SetStartTimestamp(ts[len(ts)-1])
+			eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0]))
+			eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1]))
 			err = eventWriter.AddInt64ToPayload(singleData.data)
 			if err != nil {
 				return nil, err
@@ -190,8 +191,8 @@ func (insertCodec *InsertCodec) Serialize(logIdx int, partitionID UniqueID, segm
 			if err != nil {
 				return nil, err
 			}
-			eventWriter.SetStartTimestamp(ts[0])
-			eventWriter.SetStartTimestamp(ts[len(ts)-1])
+			eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0]))
+			eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1]))
 			err = eventWriter.AddFloatToPayload(singleData.data)
 			if err != nil {
 				return nil, err
@@ -205,8 +206,8 @@ func (insertCodec *InsertCodec) Serialize(logIdx int, partitionID UniqueID, segm
 			if err != nil {
 				return nil, err
 			}
-			eventWriter.SetStartTimestamp(ts[0])
-			eventWriter.SetStartTimestamp(ts[len(ts)-1])
+			eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0]))
+			eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1]))
 			err = eventWriter.AddDoubleToPayload(singleData.data)
 			if err != nil {
 				return nil, err
@@ -220,8 +221,8 @@ func (insertCodec *InsertCodec) Serialize(logIdx int, partitionID UniqueID, segm
 			if err != nil {
 				return nil, err
 			}
-			eventWriter.SetStartTimestamp(ts[0])
-			eventWriter.SetStartTimestamp(ts[len(ts)-1])
+			eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0]))
+			eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1]))
 			for _, singleString := range singleData.data {
 				err = eventWriter.AddOneStringToPayload(singleString)
 			}
@@ -237,8 +238,8 @@ func (insertCodec *InsertCodec) Serialize(logIdx int, partitionID UniqueID, segm
 			if err != nil {
 				return nil, err
 			}
-			eventWriter.SetStartTimestamp(ts[0])
-			eventWriter.SetStartTimestamp(ts[len(ts)-1])
+			eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0]))
+			eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1]))
 			err = eventWriter.AddBinaryVectorToPayload(singleData.data, singleData.dim)
 			if err != nil {
 				return nil, err
@@ -252,8 +253,8 @@ func (insertCodec *InsertCodec) Serialize(logIdx int, partitionID UniqueID, segm
 			if err != nil {
 				return nil, err
 			}
-			eventWriter.SetStartTimestamp(ts[0])
-			eventWriter.SetStartTimestamp(ts[len(ts)-1])
+			eventWriter.SetStartTimestamp(typeutil.Timestamp(ts[0]))
+			eventWriter.SetEndTimestamp(typeutil.Timestamp(ts[len(ts)-1]))
 			err = eventWriter.AddFloatVectorToPayload(singleData.data, singleData.dim)
 			if err != nil {
 				return nil, err
@@ -265,8 +266,8 @@ func (insertCodec *InsertCodec) Serialize(logIdx int, partitionID UniqueID, segm
 	writer.CollectionID = insertCodec.Schema.ID
 	writer.PartitionID = partitionID
 	writer.SegmentID = segmentID
-	writer.SetStartTimeStamp(ts[0])
-	writer.SetEndTimeStamp(ts[len(ts)-1])
+	writer.SetStartTimeStamp(typeutil.Timestamp(ts[0]))
+	writer.SetEndTimeStamp(typeutil.Timestamp(ts[len(ts)-1]))
 
 	err := writer.Close()
 	if err != nil {
@@ -275,7 +276,7 @@ func (insertCodec *InsertCodec) Serialize(logIdx int, partitionID UniqueID, segm
 
 	buffer := writer.GetBuffer()
 	blobKey := fmt.Sprintf("%d/insert_log/%d/%d/%d/%d/%d",
-		insertCodec.TanentID, insertCodec.Schema.ID, partitionID, segmentID, fieldID, logIdx)
+		insertCodec.TenantID, insertCodec.Schema.ID, partitionID, segmentID, fieldID, logIdx)
 	blobs = append(blobs, &Blob{
 		key:   blobKey,
 		value: buffer,
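As a quick illustration of the key layout this produces, a runnable sketch with made-up IDs (hypothetical values, not from the repo):

package main

import "fmt"

func main() {
	// Hypothetical IDs; the format string mirrors the one in the codec:
	// ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx}
	var tenantID, collectionID, partitionID, segmentID, fieldID int64 = 1, 1, 1, 1, 100
	logIdx := 0
	blobKey := fmt.Sprintf("%d/insert_log/%d/%d/%d/%d/%d",
		tenantID, collectionID, partitionID, segmentID, fieldID, logIdx)
	fmt.Println(blobKey) // 1/insert_log/1/1/1/100/0
}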
@@ -499,13 +500,13 @@ func (insertCodec *InsertCodec) Close() error {
 }
 
 // Blob key example:
-// ${tanent}/data_definition_log/${collection_id}/${field_type}/${log_idx}
+// ${tenant}/data_definition_log/${collection_id}/${field_type}/${log_idx}
 type DataDefinitionCodec struct {
 	Base
 	readerCloseFunc []func() error
 }
 
-func (dataDefinitionCodec DataDefinitionCodec) Serialize(logIdx int, ts []Timestamp, ddRequests []string, eventTypes []EventTypeCode) ([]*Blob, error) {
+func (dataDefinitionCodec *DataDefinitionCodec) Serialize(logIdx int, ts []Timestamp, ddRequests []string, eventTypes []EventTypeCode) ([]*Blob, error) {
 	writer, err := NewDDLBinlogWriter(schemapb.DataType_STRING)
 	if err != nil {
 		return nil, err
@@ -567,7 +568,7 @@ func (dataDefinitionCodec DataDefinitionCodec) Serialize(logIdx int, ts []Timest
 	}
 	buffer := writer.GetBuffer()
 	blobKey := fmt.Sprintf("%d/data_definition_log/%d/%d/%d",
-		dataDefinitionCodec.TanentID, dataDefinitionCodec.Schema.ID, RequestField, logIdx)
+		dataDefinitionCodec.TenantID, dataDefinitionCodec.Schema.ID, RequestField, logIdx)
 	blobs = append(blobs, &Blob{
 		key:   blobKey,
 		value: buffer,
@@ -596,7 +597,7 @@ func (dataDefinitionCodec DataDefinitionCodec) Serialize(logIdx int, ts []Timest
 	}
 	buffer = writer.GetBuffer()
 	blobKey = fmt.Sprintf("%d/data_definition_log/%d/%d/%d",
-		dataDefinitionCodec.TanentID, dataDefinitionCodec.Schema.ID, TsField, logIdx)
+		dataDefinitionCodec.TenantID, dataDefinitionCodec.Schema.ID, TsField, logIdx)
 	blobs = append(blobs, &Blob{
 		key:   blobKey,
 		value: buffer,
@@ -606,7 +607,7 @@ func (dataDefinitionCodec DataDefinitionCodec) Serialize(logIdx int, ts []Timest
 
 }
 
-func (dataDefinitionCodec DataDefinitionCodec) Deserialize(blobs []*Blob) (ts []Timestamp, ddRequests []string, err error) {
+func (dataDefinitionCodec *DataDefinitionCodec) Deserialize(blobs []*Blob) (ts []Timestamp, ddRequests []string, err error) {
 	if len(blobs) == 0 {
 		return nil, nil, fmt.Errorf("blobs is empty")
 	}
 
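The Serialize and Deserialize receivers switch from DataDefinitionCodec to *DataDefinitionCodec. This matters because the codec records reader close functions on itself (readerCloseFunc); with a value receiver such bookkeeping happens on a copy and is discarded when the method returns. A cut-down illustration of the difference, not the repo's code:

package main

import "fmt"

type codec struct{ closers []func() error }

// value receiver: appends to a copy of c, which is thrown away
func (c codec) addValue() { c.closers = append(c.closers, func() error { return nil }) }

// pointer receiver: appends to the caller's codec
func (c *codec) addPointer() { c.closers = append(c.closers, func() error { return nil }) }

func main() {
	var c codec
	c.addValue()
	fmt.Println(len(c.closers)) // 0
	c.addPointer()
	fmt.Println(len(c.closers)) // 1
}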
@@ -12,7 +12,7 @@ func TestInsertCodecWriter(t *testing.T) {
 	base := Base{
 		Version:  1,
 		CommitID: 1,
-		TanentID: 1,
+		TenantID: 1,
 		Schema: &etcdpb.CollectionMeta{
 			ID:         1,
 			CreateTime: 1,
@@ -30,10 +30,10 @@ func TestInsertCodecWriter(t *testing.T) {
 				DataType: schemapb.DataType_BOOL,
 			},
 			{
-				Name:         "field_int8",
+				Name:         "field_int64",
 				IsPrimaryKey: false,
 				Description:  "description_1",
-				DataType:     schemapb.DataType_INT8,
+				DataType:     schemapb.DataType_INT64,
 			},
 			{
 				Name: "field_int16",
@@ -48,10 +48,10 @@ func TestInsertCodecWriter(t *testing.T) {
 				DataType: schemapb.DataType_INT32,
 			},
 			{
-				Name:         "field_int64",
+				Name:         "field_int8",
 				IsPrimaryKey: false,
 				Description:  "description_1",
-				DataType:     schemapb.DataType_INT64,
+				DataType:     schemapb.DataType_INT8,
 			},
 			{
 				Name: "field_float",
@@ -97,9 +97,9 @@ func TestInsertCodecWriter(t *testing.T) {
 			NumRows: 2,
 			data:    []bool{true, false},
 		},
-		1: Int8FieldData{
+		1: Int64FieldData{
 			NumRows: 2,
-			data:    []int8{1, 2},
+			data:    []int64{1, 2},
 		},
 		2: Int16FieldData{
 			NumRows: 2,
@@ -109,9 +109,9 @@ func TestInsertCodecWriter(t *testing.T) {
 			NumRows: 2,
 			data:    []int32{1, 2},
 		},
-		4: Int64FieldData{
+		4: Int8FieldData{
 			NumRows: 2,
-			data:    []int64{1, 2},
+			data:    []int8{1, 2},
 		},
 		5: FloatFieldData{
 			NumRows: 2,
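The two swaps above are not cosmetic: with the new Serialize, field 1 of the test's InsertData must hold an Int64FieldData because the codec now reads its timestamps from data.Data[1], so the int8 and int64 fields trade places in both the schema and the data map.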
@@ -137,7 +137,7 @@ func TestInsertCodecWriter(t *testing.T) {
 			},
 		},
 	}
-	blobs, err := insertCodec.Serialize(1, 1, 1, insertData, []Timestamp{0, 1})
+	blobs, err := insertCodec.Serialize(1, 1, 1, insertData)
 	assert.Nil(t, err)
 	partitionID, segmentID, resultData, err := insertCodec.Deserialize(blobs)
 	assert.Nil(t, err)
@@ -150,7 +150,7 @@ func TestDDCodecWriter(t *testing.T) {
 	base := Base{
 		Version:  1,
 		CommitID: 1,
-		TanentID: 1,
+		TenantID: 1,
 		Schema: &etcdpb.CollectionMeta{
 			ID:         1,
 			CreateTime: 1,