From 881be7f3e3a103f2fa8e55fa36504f1309b59dad Mon Sep 17 00:00:00 2001
From: XuanYang-cn <xuan.yang@zilliz.com>
Date: Wed, 9 Dec 2020 20:07:27 +0800
Subject: [PATCH] Impl minimum runnable writenode and writenode flowgraph

Signed-off-by: XuanYang-cn <xuan.yang@zilliz.com>
---
 .devcontainer.json                            |   2 +-
 .env                                          |   2 +-
 .github/workflows/code-checker.yaml           |  16 +-
 .github/workflows/main.yaml                   |  14 +-
 .jenkins/modules/Build/Build.groovy           |   8 +-
 Makefile                                      |   9 +-
 build/builder.sh                              |   4 +-
 build/ci/jenkins/Jenkinsfile                  |   1 +
 build/ci/jenkins/pod/build-env.yaml           |   4 +-
 build/docker/env/cpu/ubuntu18.04/Dockerfile   |   3 +-
 configs/advanced/channel.yaml                 |   2 +-
 configs/advanced/write_node.yaml              |  41 ++
 docker-compose.yml                            |   3 +-
 internal/storage/binlog_reader.go             |   8 +-
 internal/storage/cwrapper/CMakeLists.txt      |   2 +
 internal/storage/data_codec.go                | 684 ++++++++++++++++++
 internal/storage/data_codec_test.go           | 227 ++++++
 internal/storage/event_reader.go              |   3 +-
 internal/writenode/data_sync_service.go       |  80 ++
 internal/writenode/data_sync_service_test.go  | 143 ++++
 .../writenode/flow_graph_filter_dm_node.go    |  64 ++
 .../flow_graph_insert_buffer_node.go          |  61 ++
 internal/writenode/flow_graph_message.go      | 102 +++
 .../flow_graph_msg_stream_input_node.go       |  39 +
 internal/writenode/flow_graph_node.go         |   9 +
 .../writenode/flow_graph_service_time_node.go |  46 ++
 internal/writenode/param_table.go             | 236 ++++++
 internal/writenode/param_table_test.go        | 112 +++
 internal/writenode/type_def.go                |  15 +
 internal/writenode/write_node.go              |  53 ++
 scripts/run_go_unittest.sh                    |   2 +-
 31 files changed, 1964 insertions(+), 31 deletions(-)
 create mode 100644 configs/advanced/write_node.yaml
 create mode 100644 internal/storage/data_codec.go
 create mode 100644 internal/storage/data_codec_test.go
 create mode 100644 internal/writenode/data_sync_service.go
 create mode 100644 internal/writenode/data_sync_service_test.go
 create mode 100644 internal/writenode/flow_graph_filter_dm_node.go
 create mode 100644 internal/writenode/flow_graph_insert_buffer_node.go
 create mode 100644 internal/writenode/flow_graph_message.go
 create mode 100644 internal/writenode/flow_graph_msg_stream_input_node.go
 create mode 100644 internal/writenode/flow_graph_node.go
 create mode 100644 internal/writenode/flow_graph_service_time_node.go
 create mode 100644 internal/writenode/param_table.go
 create mode 100644 internal/writenode/param_table_test.go
 create mode 100644 internal/writenode/type_def.go
 create mode 100644 internal/writenode/write_node.go

diff --git a/.devcontainer.json b/.devcontainer.json
index 57928bf15c..efc26969c3 100644
--- a/.devcontainer.json
+++ b/.devcontainer.json
@@ -1,6 +1,6 @@
 {
     "name": "Milvus Distributed Dev Container Definition",
-    "image": "milvusdb/milvus-distributed-dev:amd64-ubuntu18.04-20201120-092740",
+    "image": "milvusdb/milvus-distributed-dev:amd64-ubuntu18.04-20201209-104246",
     "runArgs": [ "--cap-add=SYS_PTRACE", "--security-opt", "seccomp=unconfined" ],
     "workspaceFolder": "/go/src/github.com/zilliztech/milvus-distributed",
     "workspaceMount": "source=${localWorkspaceFolder},target=/go/src/github.com/zilliztech/milvus-distributed,type=bind,consistency=cached",
diff --git a/.env b/.env
index 7f4e042a2a..0ce6e0ff20 100644
--- a/.env
+++ b/.env
@@ -1,7 +1,7 @@
 REPO=milvusdb/milvus-distributed-dev
 ARCH=amd64
 UBUNTU=18.04
-DATE_VERSION=20201202-085131
+DATE_VERSION=20201209-104246
 LATEST_DATE_VERSION=latest
 MINIO_ADDRESS=minio:9000
 PULSAR_ADDRESS=pulsar://pulsar:6650
diff --git a/.github/workflows/code-checker.yaml b/.github/workflows/code-checker.yaml
index 9b5a330a86..725fbaf597 100644
--- a/.github/workflows/code-checker.yaml
+++ b/.github/workflows/code-checker.yaml
@@ -42,12 +42,18 @@ jobs:
     steps:
       - name: Checkout
         uses: actions/checkout@v2
-      - name: Cache Docker Volumes
+      - name: Cache CCache Volumes
         uses: actions/cache@v1
         with:
-          path: .docker
-          key: ubuntu${{ matrix.ubuntu }}-${{ hashFiles('internal/core/**') }}
-          restore-keys: ubuntu${{ matrix.ubuntu }}-
+          path: .docker/amd64-ubuntu${{ matrix.ubuntu }}-ccache
+          key: ubuntu${{ matrix.ubuntu }}-ccache-${{ hashFiles('internal/core/**') }}
+          restore-keys: ubuntu${{ matrix.ubuntu }}-ccache-
+      - name: Cache Go Mod Volumes
+        uses: actions/cache@v1
+        with:
+          path: .docker/amd64-ubuntu${{ matrix.ubuntu }}-go-mod
+          key: ubuntu${{ matrix.ubuntu }}-go-mod-${{ hashFiles('**/go.sum') }}
+          restore-keys: ubuntu${{ matrix.ubuntu }}-go-mod-
       - name: Dockerfile Lint
         uses: reviewdog/action-hadolint@v1
         with:
@@ -55,5 +61,7 @@ jobs:
           reporter: github-pr-check # Default is github-pr-check
           hadolint_ignore: DL3008
       - name: Code Check
+        env:
+          CHECK_BUILDER: "1"
         run: |
           ./build/builder.sh /bin/bash -c "make check-proto-product && make verifiers"
diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml
index d7a516ab79..342110b301 100644
--- a/.github/workflows/main.yaml
+++ b/.github/workflows/main.yaml
@@ -42,12 +42,18 @@ jobs:
     steps:
       - name: Checkout
         uses: actions/checkout@v2
-      - name: Cache Docker Volumes
+      - name: Cache CCache Volumes
         uses: actions/cache@v1
         with:
-          path: .docker
-          key: ubuntu${{ matrix.ubuntu }}-${{ hashFiles('internal/core/**') }}
-          restore-keys: ubuntu${{ matrix.ubuntu }}-
+          path: .docker/amd64-ubuntu${{ matrix.ubuntu }}-ccache
+          key: ubuntu${{ matrix.ubuntu }}-ccache-${{ hashFiles('internal/core/**') }}
+          restore-keys: ubuntu${{ matrix.ubuntu }}-ccache-
+      - name: Cache Go Mod Volumes
+        uses: actions/cache@v1
+        with:
+          path: .docker/amd64-ubuntu${{ matrix.ubuntu }}-go-mod
+          key: ubuntu${{ matrix.ubuntu }}-go-mod-${{ hashFiles('**/go.sum') }}
+          restore-keys: ubuntu${{ matrix.ubuntu }}-go-mod-
       - name: Start Service
         shell: bash
         run: |
diff --git a/.jenkins/modules/Build/Build.groovy b/.jenkins/modules/Build/Build.groovy
index dc5e5db542..1965559256 100644
--- a/.jenkins/modules/Build/Build.groovy
+++ b/.jenkins/modules/Build/Build.groovy
@@ -1,7 +1,8 @@
-timeout(time: 10, unit: 'MINUTES') {
+timeout(time: 20, unit: 'MINUTES') {
     dir ("scripts") {
-        sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./check_cache.sh -l $CCACHE_ARTFACTORY_URL --cache_dir=\$CCACHE_DIR -f ccache-\$OS_NAME-\$BUILD_ENV_IMAGE_ID.tar.gz || echo \"ccache artfactory files not found!\"'
-        sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./check_cache.sh -l $THIRDPARTY_ARTFACTORY_URL --cache_dir=$CUSTOM_THIRDPARTY_PATH -f thirdparty-download.tar.gz || echo \"thirdparty artfactory files not found!\"'
+        sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./check_cache.sh -l $CCACHE_ARTFACTORY_URL --cache_dir=\$CCACHE_DIR -f ccache-\$OS_NAME-\$BUILD_ENV_IMAGE_ID.tar.gz || echo \"Ccache artifactory files not found!\"'
+        sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./check_cache.sh -l $THIRDPARTY_ARTFACTORY_URL --cache_dir=$CUSTOM_THIRDPARTY_PATH -f thirdparty-download.tar.gz || echo \"Thirdparty artifactory files not found!\"'
+        sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./check_cache.sh -l $GO_MOD_ARTFACTORY_URL --cache_dir=\$GOPATH/pkg/mod -f milvus-distributed-go-mod-cache.tar.gz || echo \"Go mod artifactory files not found!\"'
     }
 
     sh '. ./scripts/before-install.sh && make install'
@@ -10,6 +11,7 @@ timeout(time: 10, unit: 'MINUTES') {
         withCredentials([usernamePassword(credentialsId: "${env.JFROG_CREDENTIALS_ID}", usernameVariable: 'USERNAME', passwordVariable: 'PASSWORD')]) {
             sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./update_cache.sh -l $CCACHE_ARTFACTORY_URL --cache_dir=\$CCACHE_DIR -f ccache-\$OS_NAME-\$BUILD_ENV_IMAGE_ID.tar.gz -u ${USERNAME} -p ${PASSWORD}'
             sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./update_cache.sh -l $THIRDPARTY_ARTFACTORY_URL --cache_dir=$CUSTOM_THIRDPARTY_PATH -f thirdparty-download.tar.gz -u ${USERNAME} -p ${PASSWORD}'
+            sh '. ./before-install.sh && unset http_proxy && unset https_proxy && ./update_cache.sh -l $GO_MOD_ARTFACTORY_URL --cache_dir=\$GOPATH/pkg/mod -f milvus-distributed-go-mod-cache.tar.gz -u ${USERNAME} -p ${PASSWORD}'
         }
     }
 }
diff --git a/Makefile b/Makefile
index 75232a2044..1f2d9ae7ce 100644
--- a/Makefile
+++ b/Makefile
@@ -21,11 +21,18 @@ all: build-cpp build-go
 get-build-deps:
 	@(env bash $(PWD)/scripts/install_deps.sh)
 
+getdeps:
+	@mkdir -p ${GOPATH}/bin
+	@which golangci-lint 1>/dev/null || (echo "Installing golangci-lint" && curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(GOPATH)/bin v1.27.0)
+	@which ruleguard 1>/dev/null || (echo "Installing ruleguard" && GO111MODULE=off go get github.com/quasilyte/go-ruleguard/...)
+
 cppcheck:
 	@(env bash ${PWD}/scripts/core_build.sh -l)
 
 generated-proto-go:export protoc:=${PWD}/cmake_build/thirdparty/protobuf/protobuf-build/protoc
 generated-proto-go: build-cpp
+	@mkdir -p ${GOPATH}/bin
+	@which protoc-gen-go 1>/dev/null || (echo "Installing protoc-gen-go" && go get github.com/golang/protobuf/protoc-gen-go@v1.3.2)
 	@(env bash $(PWD)/scripts/proto_gen_go.sh)
 
 check-proto-product: generated-proto-go
@@ -51,7 +58,7 @@ ruleguard:
 	@${GOPATH}/bin/ruleguard -rules ruleguard.rules.go ./cmd/...
 	@${GOPATH}/bin/ruleguard -rules ruleguard.rules.go ./tests/go/...
 
-verifiers: cppcheck fmt lint ruleguard
+verifiers: getdeps cppcheck fmt lint ruleguard
 
 # Builds various components locally.
 build-go:
diff --git a/build/builder.sh b/build/builder.sh
index cc8eacccdd..b0d4ff4660 100755
--- a/build/builder.sh
+++ b/build/builder.sh
@@ -14,7 +14,6 @@ if [ "${1-}" = "pull" ]; then
 fi
 
 if [ "${1-}" = "gdbserver" ]; then
-    mkdir -p "${DOCKER_VOLUME_DIRECTORY:-.docker}/amd64-ubuntu18.04-gdbserver-cache"
     mkdir -p "${DOCKER_VOLUME_DIRECTORY:-.docker}/amd64-ubuntu18.04-gdbserver-home"
     chmod -R 777 "${DOCKER_VOLUME_DIRECTORY:-.docker}"
 
@@ -45,7 +44,8 @@ gid=$(id -g)
 [ "$uid" -lt 500 ] && uid=501
 [ "$gid" -lt 500 ] && gid=$uid
 
-mkdir -p "${DOCKER_VOLUME_DIRECTORY:-.docker}/amd64-ubuntu18.04-cache"
+mkdir -p "${DOCKER_VOLUME_DIRECTORY:-.docker}/amd64-ubuntu18.04-ccache"
+mkdir -p "${DOCKER_VOLUME_DIRECTORY:-.docker}/amd64-ubuntu18.04-go-mod"
 chmod -R 777 "${DOCKER_VOLUME_DIRECTORY:-.docker}"
 
 docker-compose pull --ignore-pull-failures ubuntu
diff --git a/build/ci/jenkins/Jenkinsfile b/build/ci/jenkins/Jenkinsfile
index ed21ffee9e..3bae427d1b 100644
--- a/build/ci/jenkins/Jenkinsfile
+++ b/build/ci/jenkins/Jenkinsfile
@@ -40,6 +40,7 @@ pipeline {
                 CCACHE_ARTFACTORY_URL = "${JFROG_ARTFACTORY_URL}/milvus-distributed/ccache"
                 THIRDPARTY_ARTFACTORY_URL = "${JFROG_ARTFACTORY_URL}/milvus-distributed/thirdparty"
                 CUSTOM_THIRDPARTY_PATH = "${WORKSPACE}/3rdparty_download"
+                GO_MOD_ARTFACTORY_URL = "${JFROG_ARTFACTORY_URL}/milvus-distributed/go-mod"
             }
             steps {
                 container('build-env') {
diff --git a/build/ci/jenkins/pod/build-env.yaml b/build/ci/jenkins/pod/build-env.yaml
index ba8d5cfe97..7fdfc193bd 100644
--- a/build/ci/jenkins/pod/build-env.yaml
+++ b/build/ci/jenkins/pod/build-env.yaml
@@ -11,12 +11,12 @@ spec:
     runAsGroup: 2000
   containers:
   - name: build-env
-    image: milvusdb/milvus-distributed-dev:amd64-ubuntu18.04-20201124-101232
+    image: milvusdb/milvus-distributed-dev:amd64-ubuntu18.04-20201209-104246
     env:
     - name: OS_NAME
       value: "ubuntu18.04"
     - name: BUILD_ENV_IMAGE_ID
-      value: "f0f52760fde8758793f5a68c39ba20298c812e754de337ba4cc7fd8edf4ae7a9"
+      value: "ae09110abf11fc58e2bcd2393c4cd071f0c5cfe2b65ab1a75f9510f3b32921bc"
     command:
     - cat
     tty: true
diff --git a/build/docker/env/cpu/ubuntu18.04/Dockerfile b/build/docker/env/cpu/ubuntu18.04/Dockerfile
index 95ff5f7495..2a9a42e9be 100644
--- a/build/docker/env/cpu/ubuntu18.04/Dockerfile
+++ b/build/docker/env/cpu/ubuntu18.04/Dockerfile
@@ -38,8 +38,7 @@ ENV GOROOT /usr/local/go
 ENV GO111MODULE on
 ENV PATH $GOPATH/bin:$GOROOT/bin:$PATH
 RUN mkdir -p /usr/local/go && wget -qO- "https://golang.org/dl/go1.15.2.linux-amd64.tar.gz" | tar --strip-components=1 -xz -C /usr/local/go && \
-    mkdir -p "$GOPATH/src" "$GOPATH/src/github.com/zilliztech" "$GOPATH/bin" && \
-    go get github.com/golang/protobuf/protoc-gen-go@v1.3.2 && \
+    mkdir -p "$GOPATH/src" "$GOPATH/bin" && \
     curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b ${GOPATH}/bin v1.27.0 && \
     export GO111MODULE=off && go get github.com/quasilyte/go-ruleguard/... && \
     chmod -R 777 "$GOPATH" && chmod -R a+w $(go env GOTOOLDIR)
diff --git a/configs/advanced/channel.yaml b/configs/advanced/channel.yaml
index ed62dd977e..8116601e97 100644
--- a/configs/advanced/channel.yaml
+++ b/configs/advanced/channel.yaml
@@ -37,4 +37,4 @@ msgChannel:
     dataDefinition: [0,1]
     k2s: [0, 1]
     search: [0, 1]
-    searchResult: [0, 1]
\ No newline at end of file
+    searchResult: [0, 1]
diff --git a/configs/advanced/write_node.yaml b/configs/advanced/write_node.yaml
new file mode 100644
index 0000000000..6bf4289c46
--- /dev/null
+++ b/configs/advanced/write_node.yaml
@@ -0,0 +1,41 @@
+
+# Copyright (C) 2019-2020 Zilliz. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing permissions and limitations under the License.
+
+writeNode:
+  stats:
+    publishInterval: 1000 # milliseconds
+
+  dataSync:
+    flowGraph:
+      maxQueueLength: 1024
+      maxParallelism: 1024
+
+  msgStream:
+    insert:
+      #streamBufSize: 1024 # msgPack chan buffer size
+      recvBufSize: 1024 # msgPack chan buffer size
+      pulsarBufSize: 1024 # pulsar chan buffer size
+
+    delete:
+      #streamBufSize: 1024 # msgPack chan buffer size
+      recvBufSize: 1024 # msgPack chan buffer size
+      pulsarBufSize: 1024 # pulsar chan buffer size
+
+    search:
+      recvBufSize: 512
+      pulsarBufSize: 512
+
+    searchResult:
+      recvBufSize: 64
+
+    stats:
+      recvBufSize: 64
diff --git a/docker-compose.yml b/docker-compose.yml
index fc13efcd09..308055ed59 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -24,7 +24,8 @@ services:
       MINIO_ADDRESS: ${MINIO_ADDRESS}
     volumes: &ubuntu-volumes
       - .:/go/src/github.com/zilliztech/milvus-distributed:delegated
-      - ${DOCKER_VOLUME_DIRECTORY:-.docker}/${ARCH}-ubuntu${UBUNTU}-cache:/ccache:delegated
+      - ${DOCKER_VOLUME_DIRECTORY:-.docker}/${ARCH}-ubuntu${UBUNTU}-ccache:/ccache:delegated
+      - ${DOCKER_VOLUME_DIRECTORY:-.docker}/${ARCH}-ubuntu${UBUNTU}-go-mod:/go/pkg/mod:delegated
     working_dir: "/go/src/github.com/zilliztech/milvus-distributed"
     command: &ubuntu-command >
       /bin/bash -c "
diff --git a/internal/storage/binlog_reader.go b/internal/storage/binlog_reader.go
index b9c95bece8..9099850073 100644
--- a/internal/storage/binlog_reader.go
+++ b/internal/storage/binlog_reader.go
@@ -28,14 +28,8 @@ func (reader *BinlogReader) NextEventReader() (*EventReader, error) {
 			return nil, err
 		}
 		reader.currentEventReader = nil
-		if reader.currentOffset >= int32(reader.buffer.Len()) {
-			return nil, nil
-		}
-		// skip remaining bytes of this event
-		remaining := int(reader.currentOffset) - (reader.bufferLength - reader.buffer.Len())
-		reader.buffer.Next(remaining)
 	}
-	if reader.currentOffset >= int32(reader.buffer.Len()) {
+	if reader.buffer.Len() <= 0 {
 		return nil, nil
 	}
 	eventReader, err := newEventReader(reader.descriptorEvent.PayloadDataType, reader.buffer)
diff --git a/internal/storage/cwrapper/CMakeLists.txt b/internal/storage/cwrapper/CMakeLists.txt
index 934e805405..6806f8bb7b 100644
--- a/internal/storage/cwrapper/CMakeLists.txt
+++ b/internal/storage/cwrapper/CMakeLists.txt
@@ -37,6 +37,8 @@ macro( build_arrow )
         "-DARROW_BUILD_UTILITIES=OFF"
         "-DARROW_PARQUET=ON"
         "-DPARQUET_BUILD_SHARED=OFF"
+        "-DThrift_SOURCE=BUNDLED"
+        "-Dutf8proc_SOURCE=BUNDLED"
         "-DARROW_S3=OFF"
         "-DCMAKE_VERBOSE_MAKEFILE=ON"
         "-DCMAKE_INSTALL_PREFIX=${CMAKE_CURRENT_BINARY_DIR}"
diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go
new file mode 100644
index 0000000000..0a5ab101d7
--- /dev/null
+++ b/internal/storage/data_codec.go
@@ -0,0 +1,684 @@
+package storage
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+
+	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
+	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
+)
+
+const (
+	TsField      int = 1
+	RequestField int = 100
+)
+
+type (
+	UniqueID  = typeutil.UniqueID
+	Timestamp = typeutil.Timestamp
+)
+
+type Blob struct {
+	key   string
+	value []byte
+}
+
+type Base struct {
+	Version  int
+	CommitID int
+	TenantID UniqueID
+	Schema   *etcdpb.CollectionMeta
+}
+
+type FieldData interface{}
+
+type BoolFieldData struct {
+	NumRows int
+	data    []bool
+}
+type Int8FieldData struct {
+	NumRows int
+	data    []int8
+}
+type Int16FieldData struct {
+	NumRows int
+	data    []int16
+}
+type Int32FieldData struct {
+	NumRows int
+	data    []int32
+}
+type Int64FieldData struct {
+	NumRows int
+	data    []int64
+}
+type FloatFieldData struct {
+	NumRows int
+	data    []float32
+}
+type DoubleFieldData struct {
+	NumRows int
+	data    []float64
+}
+type StringFieldData struct {
+	NumRows int
+	data    []string
+}
+type BinaryVectorFieldData struct {
+	NumRows int
+	data    []byte
+	dim     int
+}
+type FloatVectorFieldData struct {
+	NumRows int
+	data    []float32
+	dim     int
+}
+
+// TODO: more types of FieldData
+
+// system field id:
+// 0: unique row id
+// 1: timestamp
+// 100: first user field id
+// 101: second user field id
+// 102: ...
+
+// example row_schema: {float_field, int_field, float_vector_field, string_field}
+// Data {<0, row_id>, <1, timestamp>, <100, float_field>, <101, int_field>, <102, float_vector_field>, <103, string_field>}
+type InsertData struct {
+	Data map[int]FieldData // field id to field data
+}
+
+// Blob key example:
+// ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx}
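+// e.g. tenant 1, collection 2, partition 3, segment 4, field 100, log index 0 gives:
+//   "1/insert_log/2/3/4/100/0"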
+type InsertCodec struct {
+	Base
+	readerCloseFunc []func() error
+}
+
+func (insertCodec *InsertCodec) Serialize(logIdx int, partitionID UniqueID, segmentID UniqueID, data *InsertData, ts []Timestamp) ([]*Blob, error) {
+	var blobs []*Blob
+	var writer *InsertBinlogWriter
+	var err error
+
+	for fieldID, value := range data.Data {
+		writer = nil // reset so an unhandled field type fails the nil check below
+		switch singleData := value.(type) {
+		case BoolFieldData:
+			writer, err = NewInsertBinlogWriter(schemapb.DataType_BOOL)
+			if err != nil {
+				return nil, err
+			}
+			eventWriter, err := writer.NextInsertEventWriter()
+			if err != nil {
+				return nil, err
+			}
+			eventWriter.SetStartTimestamp(ts[0])
+			eventWriter.SetEndTimestamp(ts[len(ts)-1])
+			err = eventWriter.AddBoolToPayload(singleData.data)
+			if err != nil {
+				return nil, err
+			}
+		case Int8FieldData:
+			writer, err = NewInsertBinlogWriter(schemapb.DataType_INT8)
+			if err != nil {
+				return nil, err
+			}
+			eventWriter, err := writer.NextInsertEventWriter()
+			if err != nil {
+				return nil, err
+			}
+			eventWriter.SetStartTimestamp(ts[0])
+			eventWriter.SetEndTimestamp(ts[len(ts)-1])
+			err = eventWriter.AddInt8ToPayload(singleData.data)
+			if err != nil {
+				return nil, err
+			}
+		case Int16FieldData:
+			writer, err = NewInsertBinlogWriter(schemapb.DataType_INT16)
+			if err != nil {
+				return nil, err
+			}
+			eventWriter, err := writer.NextInsertEventWriter()
+			if err != nil {
+				return nil, err
+			}
+			eventWriter.SetStartTimestamp(ts[0])
+			eventWriter.SetEndTimestamp(ts[len(ts)-1])
+			err = eventWriter.AddInt16ToPayload(singleData.data)
+			if err != nil {
+				return nil, err
+			}
+		case Int32FieldData:
+			writer, err = NewInsertBinlogWriter(schemapb.DataType_INT32)
+			if err != nil {
+				return nil, err
+			}
+			eventWriter, err := writer.NextInsertEventWriter()
+			if err != nil {
+				return nil, err
+			}
+			eventWriter.SetStartTimestamp(ts[0])
+			eventWriter.SetEndTimestamp(ts[len(ts)-1])
+			err = eventWriter.AddInt32ToPayload(singleData.data)
+			if err != nil {
+				return nil, err
+			}
+		case Int64FieldData:
+			writer, err = NewInsertBinlogWriter(schemapb.DataType_INT64)
+			if err != nil {
+				return nil, err
+			}
+			eventWriter, err := writer.NextInsertEventWriter()
+			if err != nil {
+				return nil, err
+			}
+			eventWriter.SetStartTimestamp(ts[0])
+			eventWriter.SetEndTimestamp(ts[len(ts)-1])
+			err = eventWriter.AddInt64ToPayload(singleData.data)
+			if err != nil {
+				return nil, err
+			}
+		case FloatFieldData:
+			writer, err = NewInsertBinlogWriter(schemapb.DataType_FLOAT)
+			if err != nil {
+				return nil, err
+			}
+			eventWriter, err := writer.NextInsertEventWriter()
+			if err != nil {
+				return nil, err
+			}
+			eventWriter.SetStartTimestamp(ts[0])
+			eventWriter.SetEndTimestamp(ts[len(ts)-1])
+			err = eventWriter.AddFloatToPayload(singleData.data)
+			if err != nil {
+				return nil, err
+			}
+		case DoubleFieldData:
+			writer, err = NewInsertBinlogWriter(schemapb.DataType_DOUBLE)
+			if err != nil {
+				return nil, err
+			}
+			eventWriter, err := writer.NextInsertEventWriter()
+			if err != nil {
+				return nil, err
+			}
+			eventWriter.SetStartTimestamp(ts[0])
+			eventWriter.SetEndTimestamp(ts[len(ts)-1])
+			err = eventWriter.AddDoubleToPayload(singleData.data)
+			if err != nil {
+				return nil, err
+			}
+		case StringFieldData:
+			writer, err = NewInsertBinlogWriter(schemapb.DataType_STRING)
+			if err != nil {
+				return nil, err
+			}
+			eventWriter, err := writer.NextInsertEventWriter()
+			if err != nil {
+				return nil, err
+			}
+			eventWriter.SetStartTimestamp(ts[0])
+			eventWriter.SetEndTimestamp(ts[len(ts)-1])
+			for _, singleString := range singleData.data {
+				err = eventWriter.AddOneStringToPayload(singleString)
+				if err != nil {
+					return nil, err
+				}
+			}
+		case BinaryVectorFieldData:
+			writer, err = NewInsertBinlogWriter(schemapb.DataType_VECTOR_BINARY)
+			if err != nil {
+				return nil, err
+			}
+			eventWriter, err := writer.NextInsertEventWriter()
+			if err != nil {
+				return nil, err
+			}
+			eventWriter.SetStartTimestamp(ts[0])
+			eventWriter.SetEndTimestamp(ts[len(ts)-1])
+			err = eventWriter.AddBinaryVectorToPayload(singleData.data, singleData.dim)
+			if err != nil {
+				return nil, err
+			}
+		case FloatVectorFieldData:
+			writer, err = NewInsertBinlogWriter(schemapb.DataType_VECTOR_FLOAT)
+			if err != nil {
+				return nil, err
+			}
+			eventWriter, err := writer.NextInsertEventWriter()
+			if err != nil {
+				return nil, err
+			}
+			eventWriter.SetStartTimestamp(ts[0])
+			eventWriter.SetEndTimestamp(ts[len(ts)-1])
+			err = eventWriter.AddFloatVectorToPayload(singleData.data, singleData.dim)
+			if err != nil {
+				return nil, err
+			}
+		}
+		if writer == nil {
+			return nil, fmt.Errorf("binlog writer is nil")
+		}
+		writer.CollectionID = insertCodec.Schema.ID
+		writer.PartitionID = partitionID
+		writer.SegmentID = segmentID
+		writer.SetStartTimeStamp(ts[0])
+		writer.SetEndTimeStamp(ts[len(ts)-1])
+
+		err := writer.Close()
+		if err != nil {
+			return nil, err
+		}
+
+		buffer := writer.GetBuffer()
+		blobKey := fmt.Sprintf("%d/insert_log/%d/%d/%d/%d/%d",
+			insertCodec.TenantID, insertCodec.Schema.ID, partitionID, segmentID, fieldID, logIdx)
+		blobs = append(blobs, &Blob{
+			key:   blobKey,
+			value: buffer,
+		})
+
+	}
+	return blobs, nil
+
+}
+func (insertCodec *InsertCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *InsertData, err error) {
+	if len(blobs) == 0 {
+		return -1, -1, nil, fmt.Errorf("blobs is empty")
+	}
+	readerClose := func(reader *BinlogReader) func() error {
+		return func() error { return reader.Close() }
+	}
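+	// key layout: ${tenant}/insert_log/${collection_id}/${partition_id}/${segment_id}/${field_id}/${log_idx},
+	// so index 3 is the partition ID, 4 the segment ID, and 5 the field ID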
+	pID, _ := strconv.ParseInt(strings.Split(blobs[0].key, "/")[3], 10, 64)
+	sID, _ := strconv.ParseInt(strings.Split(blobs[0].key, "/")[4], 10, 64)
+	var resultData InsertData
+	resultData.Data = make(map[int]FieldData)
+	for _, blob := range blobs {
+		fieldID, err := strconv.Atoi(strings.Split(blob.key, "/")[5])
+
+		if err != nil {
+			return -1, -1, nil, err
+		}
+		dataType := insertCodec.Schema.Schema.Fields[fieldID].GetDataType()
+
+		switch dataType {
+		case schemapb.DataType_BOOL:
+			binlogReader, err := NewBinlogReader(blob.value)
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			var boolFieldData BoolFieldData
+			eventReader, err := binlogReader.NextEventReader()
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			boolFieldData.data, err = eventReader.GetBoolFromPayload()
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			boolFieldData.NumRows = len(boolFieldData.data)
+			resultData.Data[fieldID] = boolFieldData
+			insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
+		case schemapb.DataType_INT8:
+			binlogReader, err := NewBinlogReader(blob.value)
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			var int8FieldData Int8FieldData
+			eventReader, err := binlogReader.NextEventReader()
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			int8FieldData.data, err = eventReader.GetInt8FromPayload()
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			int8FieldData.NumRows = len(int8FieldData.data)
+			resultData.Data[fieldID] = int8FieldData
+			insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
+		case schemapb.DataType_INT16:
+			binlogReader, err := NewBinlogReader(blob.value)
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			var int16FieldData Int16FieldData
+			eventReader, err := binlogReader.NextEventReader()
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			int16FieldData.data, err = eventReader.GetInt16FromPayload()
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			int16FieldData.NumRows = len(int16FieldData.data)
+			resultData.Data[fieldID] = int16FieldData
+			insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
+		case schemapb.DataType_INT32:
+			binlogReader, err := NewBinlogReader(blob.value)
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			var int32FieldData Int32FieldData
+			eventReader, err := binlogReader.NextEventReader()
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			int32FieldData.data, err = eventReader.GetInt32FromPayload()
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			int32FieldData.NumRows = len(int32FieldData.data)
+			resultData.Data[fieldID] = int32FieldData
+			insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
+		case schemapb.DataType_INT64:
+			binlogReader, err := NewBinlogReader(blob.value)
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			var int64FieldData Int64FieldData
+			eventReader, err := binlogReader.NextEventReader()
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			int64FieldData.data, err = eventReader.GetInt64FromPayload()
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			int64FieldData.NumRows = len(int64FieldData.data)
+			resultData.Data[fieldID] = int64FieldData
+			insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
+		case schemapb.DataType_FLOAT:
+			binlogReader, err := NewBinlogReader(blob.value)
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			var floatFieldData FloatFieldData
+			eventReader, err := binlogReader.NextEventReader()
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			floatFieldData.data, err = eventReader.GetFloatFromPayload()
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			floatFieldData.NumRows = len(floatFieldData.data)
+			resultData.Data[fieldID] = floatFieldData
+			insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
+		case schemapb.DataType_DOUBLE:
+			binlogReader, err := NewBinlogReader(blob.value)
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			var doubleFieldData DoubleFieldData
+			eventReader, err := binlogReader.NextEventReader()
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			doubleFieldData.data, err = eventReader.GetDoubleFromPayload()
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			doubleFieldData.NumRows = len(doubleFieldData.data)
+			resultData.Data[fieldID] = doubleFieldData
+			insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
+		case schemapb.DataType_STRING:
+			binlogReader, err := NewBinlogReader(blob.value)
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			var stringFieldData StringFieldData
+			eventReader, err := binlogReader.NextEventReader()
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			length, err := eventReader.GetPayloadLengthFromReader()
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			stringFieldData.NumRows = length
+			for i := 0; i < length; i++ {
+				singleString, err := eventReader.GetOneStringFromPayload(i)
+				if err != nil {
+					return -1, -1, nil, err
+				}
+				stringFieldData.data = append(stringFieldData.data, singleString)
+			}
+			resultData.Data[fieldID] = stringFieldData
+			insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
+		case schemapb.DataType_VECTOR_BINARY:
+			binlogReader, err := NewBinlogReader(blob.value)
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			var binaryVectorFieldData BinaryVectorFieldData
+			eventReader, err := binlogReader.NextEventReader()
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			binaryVectorFieldData.data, binaryVectorFieldData.dim, err = eventReader.GetBinaryVectorFromPayload()
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			binaryVectorFieldData.NumRows = len(binaryVectorFieldData.data)
+			resultData.Data[fieldID] = binaryVectorFieldData
+			insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
+		case schemapb.DataType_VECTOR_FLOAT:
+			binlogReader, err := NewBinlogReader(blob.value)
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			var floatVectorFieldData FloatVectorFieldData
+			eventReader, err := binlogReader.NextEventReader()
+			if err != nil {
+				return -1, -1, nil, err
+			}
+			floatVectorFieldData.data, floatVectorFieldData.dim, err = eventReader.GetFloatVectorFromPayload()
+			if err != nil {
+				return -1, -1, nil, err
+			}
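+			// each row stores dim float32 values, so row count = total values / dim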
+			floatVectorFieldData.NumRows = len(floatVectorFieldData.data) / floatVectorFieldData.dim
+			resultData.Data[fieldID] = floatVectorFieldData
+			insertCodec.readerCloseFunc = append(insertCodec.readerCloseFunc, readerClose(binlogReader))
+		}
+	}
+
+	return pID, sID, &resultData, nil
+}
+
+func (insertCodec *InsertCodec) Close() error {
+	for _, closeFunc := range insertCodec.readerCloseFunc {
+		err := closeFunc()
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Blob key example:
+// ${tenant}/data_definition_log/${collection_id}/${field_type}/${log_idx}
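+// ${field_type} is TsField (1) for the timestamp log and RequestField (100) for the request log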
+type DataDefinitionCodec struct {
+	Base
+	readerCloseFunc []func() error
+}
+
+func (dataDefinitionCodec *DataDefinitionCodec) Serialize(logIdx int, ts []Timestamp, ddRequests []string, eventTypes []EventTypeCode) ([]*Blob, error) {
+	writer, err := NewDDLBinlogWriter(schemapb.DataType_STRING)
+	if err != nil {
+		return nil, err
+	}
+
+	var blobs []*Blob
+
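+	// DDL data is serialized into two blobs: a string payload holding the requests
+	// and an int64 payload holding their timestamps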
+	for pos, req := range ddRequests {
+		switch eventTypes[pos] {
+		case CreateCollectionEventType:
+			eventWriter, err := writer.NextCreateCollectionEventWriter()
+			if err != nil {
+				return nil, err
+			}
+			err = eventWriter.AddOneStringToPayload(req)
+			if err != nil {
+				return nil, err
+			}
+			eventWriter.SetStartTimestamp(ts[pos])
+			eventWriter.SetEndTimestamp(ts[pos])
+		case DropCollectionEventType:
+			eventWriter, err := writer.NextDropCollectionEventWriter()
+			if err != nil {
+				return nil, err
+			}
+			err = eventWriter.AddOneStringToPayload(req)
+			if err != nil {
+				return nil, err
+			}
+			eventWriter.SetStartTimestamp(ts[pos])
+			eventWriter.SetEndTimestamp(ts[pos])
+		case CreatePartitionEventType:
+			eventWriter, err := writer.NextCreatePartitionEventWriter()
+			if err != nil {
+				return nil, err
+			}
+			err = eventWriter.AddOneStringToPayload(req)
+			if err != nil {
+				return nil, err
+			}
+			eventWriter.SetStartTimestamp(ts[pos])
+			eventWriter.SetEndTimestamp(ts[pos])
+		case DropPartitionEventType:
+			eventWriter, err := writer.NextDropPartitionEventWriter()
+			if err != nil {
+				return nil, err
+			}
+			err = eventWriter.AddOneStringToPayload(req)
+			if err != nil {
+				return nil, err
+			}
+			eventWriter.SetStartTimestamp(ts[pos])
+			eventWriter.SetEndTimestamp(ts[pos])
+		}
+	}
+	err = writer.Close()
+	if err != nil {
+		return nil, err
+	}
+	buffer := writer.GetBuffer()
+	blobKey := fmt.Sprintf("%d/data_definition_log/%d/%d/%d",
+		dataDefinitionCodec.TenantID, dataDefinitionCodec.Schema.ID, RequestField, logIdx)
+	blobs = append(blobs, &Blob{
+		key:   blobKey,
+		value: buffer,
+	})
+
+	writer, err = NewDDLBinlogWriter(schemapb.DataType_INT64)
+	if err != nil {
+		return nil, err
+	}
+
+	eventWriter, err := writer.NextCreateCollectionEventWriter()
+	if err != nil {
+		return nil, err
+	}
+	var int64Ts []int64
+	for _, singleTs := range ts {
+		int64Ts = append(int64Ts, int64(singleTs))
+	}
+	err = eventWriter.AddInt64ToPayload(int64Ts)
+	if err != nil {
+		return nil, err
+	}
+	err = writer.Close()
+	if err != nil {
+		return nil, err
+	}
+	buffer = writer.GetBuffer()
+	blobKey = fmt.Sprintf("%d/data_definition_log/%d/%d/%d",
+		dataDefinitionCodec.TenantID, dataDefinitionCodec.Schema.ID, TsField, logIdx)
+	blobs = append(blobs, &Blob{
+		key:   blobKey,
+		value: buffer,
+	})
+
+	return blobs, nil
+
+}
+
+func (dataDefinitionCodec *DataDefinitionCodec) Deserialize(blobs []*Blob) (ts []Timestamp, ddRequests []string, err error) {
+	if len(blobs) == 0 {
+		return nil, nil, fmt.Errorf("blobs is empty")
+	}
+	readerClose := func(reader *BinlogReader) func() error {
+		return func() error { return reader.Close() }
+	}
+	var requestsStrings []string
+	var resultTs []Timestamp
+	for _, blob := range blobs {
+		fieldID, err := strconv.Atoi(strings.Split(blob.key, "/")[3])
+
+		if err != nil {
+			return nil, nil, err
+		}
+		switch fieldID {
+		case TsField:
+			binlogReader, err := NewBinlogReader(blob.value)
+			if err != nil {
+				return nil, nil, err
+			}
+			eventReader, err := binlogReader.NextEventReader()
+			if err != nil {
+				return nil, nil, err
+			}
+			int64Ts, err := eventReader.GetInt64FromPayload()
+			if err != nil {
+				return nil, nil, err
+			}
+			for _, singleTs := range int64Ts {
+				resultTs = append(resultTs, Timestamp(singleTs))
+			}
+			dataDefinitionCodec.readerCloseFunc = append(dataDefinitionCodec.readerCloseFunc, readerClose(binlogReader))
+		case RequestField:
+			binlogReader, err := NewBinlogReader(blob.value)
+			if err != nil {
+				return nil, nil, err
+			}
+			eventReader, err := binlogReader.NextEventReader()
+			if err != nil {
+				return nil, nil, err
+			}
+			for eventReader != nil {
+				length, err := eventReader.GetPayloadLengthFromReader()
+				if err != nil {
+					return nil, nil, err
+				}
+				for i := 0; i < length; i++ {
+					singleString, err := eventReader.GetOneStringFromPayload(i)
+					if err != nil {
+						return nil, nil, err
+					}
+					requestsStrings = append(requestsStrings, singleString)
+				}
+				eventReader, err = binlogReader.NextEventReader()
+				if err != nil {
+					return nil, nil, err
+				}
+			}
+			dataDefinitionCodec.readerCloseFunc = append(dataDefinitionCodec.readerCloseFunc, readerClose(binlogReader))
+		}
+
+	}
+
+	return resultTs, requestsStrings, nil
+}
+
+func (dataDefinitionCodec *DataDefinitionCodec) Close() error {
+	for _, closeFunc := range dataDefinitionCodec.readerCloseFunc {
+		err := closeFunc()
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go
new file mode 100644
index 0000000000..0216876691
--- /dev/null
+++ b/internal/storage/data_codec_test.go
@@ -0,0 +1,227 @@
+package storage
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/zilliztech/milvus-distributed/internal/proto/etcdpb"
+	"github.com/zilliztech/milvus-distributed/internal/proto/schemapb"
+)
+
+func TestInsertCodecWriter(t *testing.T) {
+	base := Base{
+		Version:  1,
+		CommitID: 1,
+		TenantID: 1,
+		Schema: &etcdpb.CollectionMeta{
+			ID:            1,
+			CreateTime:    1,
+			SegmentIDs:    []int64{0, 1},
+			PartitionTags: []string{"partition_0", "partition_1"},
+			Schema: &schemapb.CollectionSchema{
+				Name:        "schema",
+				Description: "schema",
+				AutoID:      true,
+				Fields: []*schemapb.FieldSchema{
+					{
+						Name:         "field_bool",
+						IsPrimaryKey: false,
+						Description:  "description_1",
+						DataType:     schemapb.DataType_BOOL,
+					},
+					{
+						Name:         "field_int8",
+						IsPrimaryKey: false,
+						Description:  "description_1",
+						DataType:     schemapb.DataType_INT8,
+					},
+					{
+						Name:         "field_int16",
+						IsPrimaryKey: false,
+						Description:  "description_1",
+						DataType:     schemapb.DataType_INT16,
+					},
+					{
+						Name:         "field_int32",
+						IsPrimaryKey: false,
+						Description:  "description_1",
+						DataType:     schemapb.DataType_INT32,
+					},
+					{
+						Name:         "field_int64",
+						IsPrimaryKey: false,
+						Description:  "description_1",
+						DataType:     schemapb.DataType_INT64,
+					},
+					{
+						Name:         "field_float",
+						IsPrimaryKey: false,
+						Description:  "description_1",
+						DataType:     schemapb.DataType_FLOAT,
+					},
+					{
+						Name:         "field_double",
+						IsPrimaryKey: false,
+						Description:  "description_1",
+						DataType:     schemapb.DataType_DOUBLE,
+					},
+					{
+						Name:         "field_string",
+						IsPrimaryKey: false,
+						Description:  "description_1",
+						DataType:     schemapb.DataType_STRING,
+					},
+					{
+						Name:         "field_binary_vector",
+						IsPrimaryKey: false,
+						Description:  "description_1",
+						DataType:     schemapb.DataType_VECTOR_BINARY,
+					},
+					{
+						Name:         "field_float_vector",
+						IsPrimaryKey: false,
+						Description:  "description_1",
+						DataType:     schemapb.DataType_VECTOR_FLOAT,
+					},
+				},
+			},
+		},
+	}
+	insertCodec := &InsertCodec{
+		base,
+		make([]func() error, 0),
+	}
+	insertData := &InsertData{
+		Data: map[int]FieldData{
+			0: BoolFieldData{
+				NumRows: 2,
+				data:    []bool{true, false},
+			},
+			1: Int8FieldData{
+				NumRows: 2,
+				data:    []int8{1, 2},
+			},
+			2: Int16FieldData{
+				NumRows: 2,
+				data:    []int16{1, 2},
+			},
+			3: Int32FieldData{
+				NumRows: 2,
+				data:    []int32{1, 2},
+			},
+			4: Int64FieldData{
+				NumRows: 2,
+				data:    []int64{1, 2},
+			},
+			5: FloatFieldData{
+				NumRows: 2,
+				data:    []float32{1, 2},
+			},
+			6: DoubleFieldData{
+				NumRows: 2,
+				data:    []float64{1, 2},
+			},
+			7: StringFieldData{
+				NumRows: 2,
+				data:    []string{"1", "2"},
+			},
+			8: BinaryVectorFieldData{
+				NumRows: 8,
+				data:    []byte{0, 255, 0, 1, 0, 1, 0, 1},
+				dim:     8,
+			},
+			9: FloatVectorFieldData{
+				NumRows: 1,
+				data:    []float32{0, 1, 2, 3, 4, 5, 6, 7},
+				dim:     8,
+			},
+		},
+	}
+	blobs, err := insertCodec.Serialize(1, 1, 1, insertData, []Timestamp{0, 1})
+	assert.Nil(t, err)
+	partitionID, segmentID, resultData, err := insertCodec.Deserialize(blobs)
+	assert.Nil(t, err)
+	assert.Equal(t, partitionID, int64(1))
+	assert.Equal(t, segmentID, int64(1))
+	assert.Equal(t, insertData, resultData)
+	assert.Nil(t, insertCodec.Close())
+}
+func TestDDCodecWriter(t *testing.T) {
+	base := Base{
+		Version:  1,
+		CommitID: 1,
+		TenantID: 1,
+		Schema: &etcdpb.CollectionMeta{
+			ID:            1,
+			CreateTime:    1,
+			SegmentIDs:    []int64{0, 1},
+			PartitionTags: []string{"partition_0", "partition_1"},
+			Schema: &schemapb.CollectionSchema{
+				Name:        "schema",
+				Description: "schema",
+				AutoID:      true,
+				Fields: []*schemapb.FieldSchema{
+					{
+						Name:         "field_1",
+						IsPrimaryKey: false,
+						Description:  "description_1",
+						DataType:     schemapb.DataType_INT32,
+					},
+					{
+						Name:         "field_2",
+						IsPrimaryKey: false,
+						Description:  "description_1",
+						DataType:     schemapb.DataType_INT64,
+					},
+					{
+						Name:         "field_3",
+						IsPrimaryKey: false,
+						Description:  "description_1",
+						DataType:     schemapb.DataType_STRING,
+					},
+					{
+						Name:         "field_3",
+						IsPrimaryKey: false,
+						Description:  "description_1",
+						DataType:     schemapb.DataType_STRING,
+					},
+					{
+						Name:         "field_3",
+						IsPrimaryKey: false,
+						Description:  "description_1",
+						DataType:     schemapb.DataType_STRING,
+					},
+				},
+			},
+		},
+	}
+	dataDefinitionCodec := &DataDefinitionCodec{
+		base,
+		make([]func() error, 0),
+	}
+	ts := []Timestamp{
+		1,
+		2,
+		3,
+		4,
+	}
+	ddRequests := []string{
+		"CreateCollection",
+		"DropCollection",
+		"CreatePartition",
+		"DropPartition",
+	}
+	eventTypeCodes := []EventTypeCode{
+		CreateCollectionEventType,
+		DropCollectionEventType,
+		CreatePartitionEventType,
+		DropPartitionEventType,
+	}
+	blobs, err := dataDefinitionCodec.Serialize(1, ts, ddRequests, eventTypeCodes)
+	assert.Nil(t, err)
+	resultTs, resultRequests, err := dataDefinitionCodec.Deserialize(blobs)
+	assert.Nil(t, err)
+	assert.Equal(t, resultTs, ts)
+	assert.Equal(t, resultRequests, ddRequests)
+	assert.Nil(t, dataDefinitionCodec.Close())
+}
diff --git a/internal/storage/event_reader.go b/internal/storage/event_reader.go
index 7bc9b93588..41a76d58cd 100644
--- a/internal/storage/event_reader.go
+++ b/internal/storage/event_reader.go
@@ -90,7 +90,8 @@ func newEventReader(datatype schemapb.DataType, buffer *bytes.Buffer) (*EventRea
 		return nil, err
 	}
 
-	payloadReader, err := NewPayloadReader(datatype, buffer.Bytes())
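+	// the payload occupies the rest of this event: total event length minus the
+	// header size and the fixed-size part of the event data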
+	payloadBuffer := buffer.Next(int(reader.EventLength - reader.eventHeader.GetMemoryUsageInBytes() - reader.GetEventDataFixPartSize()))
+	payloadReader, err := NewPayloadReader(datatype, payloadBuffer)
 	if err != nil {
 		return nil, err
 	}
diff --git a/internal/writenode/data_sync_service.go b/internal/writenode/data_sync_service.go
new file mode 100644
index 0000000000..4c5e8303b7
--- /dev/null
+++ b/internal/writenode/data_sync_service.go
@@ -0,0 +1,80 @@
+package writenode
+
+import (
+	"context"
+	"log"
+
+	"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
+)
+
+type dataSyncService struct {
+	ctx context.Context
+	fg  *flowgraph.TimeTickedFlowGraph
+}
+
+func newDataSyncService(ctx context.Context) *dataSyncService {
+
+	return &dataSyncService{
+		ctx: ctx,
+		fg:  nil,
+	}
+}
+
+func (dsService *dataSyncService) start() {
+	dsService.initNodes()
+	dsService.fg.Start()
+}
+
+func (dsService *dataSyncService) close() {
+	if dsService.fg != nil {
+		dsService.fg.Close()
+	}
+}
+
+func (dsService *dataSyncService) initNodes() {
+	// TODO: add delete pipeline support
+
+	dsService.fg = flowgraph.NewTimeTickedFlowGraph(dsService.ctx)
+
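+	// pipeline: dmStreamNode -> filterDmNode -> writeNode -> serviceTimeNode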
+	var dmStreamNode Node = newDmInputNode(dsService.ctx)
+	var filterDmNode Node = newFilteredDmNode()
+	var writeNode Node = newWriteNode()
+	var serviceTimeNode Node = newServiceTimeNode()
+
+	dsService.fg.AddNode(&dmStreamNode)
+	dsService.fg.AddNode(&filterDmNode)
+	dsService.fg.AddNode(&writeNode)
+	dsService.fg.AddNode(&serviceTimeNode)
+
+	var err = dsService.fg.SetEdges(dmStreamNode.Name(),
+		[]string{},
+		[]string{filterDmNode.Name()},
+	)
+	if err != nil {
+		log.Fatal("set edges failed in node:", dmStreamNode.Name())
+	}
+
+	err = dsService.fg.SetEdges(filterDmNode.Name(),
+		[]string{dmStreamNode.Name()},
+		[]string{writeNode.Name()},
+	)
+	if err != nil {
+		log.Fatal("set edges failed in node:", filterDmNode.Name())
+	}
+
+	err = dsService.fg.SetEdges(writeNode.Name(),
+		[]string{filterDmNode.Name()},
+		[]string{serviceTimeNode.Name()},
+	)
+	if err != nil {
+		log.Fatal("set edges failed in node:", writeNode.Name())
+	}
+
+	err = dsService.fg.SetEdges(serviceTimeNode.Name(),
+		[]string{writeNode.Name()},
+		[]string{},
+	)
+	if err != nil {
+		log.Fatal("set edges failed in node:", serviceTimeNode.Name())
+	}
+}
diff --git a/internal/writenode/data_sync_service_test.go b/internal/writenode/data_sync_service_test.go
new file mode 100644
index 0000000000..095699f1a9
--- /dev/null
+++ b/internal/writenode/data_sync_service_test.go
@@ -0,0 +1,143 @@
+package writenode
+
+import (
+	"context"
+	"encoding/binary"
+	"math"
+	"math/rand"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
+	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
+	internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
+)
+
+// NOTE: start pulsar before test
+func TestDataSyncService_Start(t *testing.T) {
+	Params.Init()
+	const ctxTimeInMillisecond = 200
+	const closeWithDeadline = true
+	var ctx context.Context
+
+	if closeWithDeadline {
+		var cancel context.CancelFunc
+		d := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
+		ctx, cancel = context.WithDeadline(context.Background(), d)
+		defer cancel()
+	} else {
+		ctx = context.Background()
+	}
+
+	// init write node
+	pulsarURL, _ := Params.pulsarAddress()
+	node := NewWriteNode(ctx, 0)
+
+	// test data generate
+	const DIM = 16
+	const N = 10
+
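+	// one row of raw data: DIM little-endian float32 values followed by one uint32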
+	var vec = [DIM]float32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
+	var rawData []byte
+	for _, ele := range vec {
+		buf := make([]byte, 4)
+		binary.LittleEndian.PutUint32(buf, math.Float32bits(ele))
+		rawData = append(rawData, buf...)
+	}
+	bs := make([]byte, 4)
+	binary.LittleEndian.PutUint32(bs, 1)
+	rawData = append(rawData, bs...)
+	var records []*commonpb.Blob
+	for i := 0; i < N; i++ {
+		blob := &commonpb.Blob{
+			Value: rawData,
+		}
+		records = append(records, blob)
+	}
+
+	timeRange := TimeRange{
+		timestampMin: 0,
+		timestampMax: math.MaxUint64,
+	}
+
+	// messages generate
+	const MSGLENGTH = 10
+	insertMessages := make([]msgstream.TsMsg, 0)
+	for i := 0; i < MSGLENGTH; i++ {
+		randt := rand.Intn(MSGLENGTH)
+		// randt := i
+		var msg msgstream.TsMsg = &msgstream.InsertMsg{
+			BaseMsg: msgstream.BaseMsg{
+				HashValues: []uint32{
+					uint32(i), uint32(i),
+				},
+			},
+			InsertRequest: internalPb.InsertRequest{
+				MsgType:        internalPb.MsgType_kInsert,
+				ReqID:          int64(0),
+				CollectionName: "collection0",
+				PartitionTag:   "default",
+				SegmentID:      int64(0),
+				ChannelID:      int64(0),
+				ProxyID:        int64(0),
+				Timestamps:     []uint64{uint64(randt + 1000), uint64(randt + 1000)},
+				RowIDs:         []int64{int64(i), int64(i)},
+				RowData: []*commonpb.Blob{
+					{Value: rawData},
+					{Value: rawData},
+				},
+			},
+		}
+		insertMessages = append(insertMessages, msg)
+	}
+
+	msgPack := msgstream.MsgPack{
+		BeginTs: timeRange.timestampMin,
+		EndTs:   timeRange.timestampMax,
+		Msgs:    insertMessages,
+	}
+
+	// generate timeTick
+	timeTickMsgPack := msgstream.MsgPack{}
+
+	timeTickMsg := &msgstream.TimeTickMsg{
+		BaseMsg: msgstream.BaseMsg{
+			BeginTimestamp: 0,
+			EndTimestamp:   0,
+			HashValues:     []uint32{0},
+		},
+		TimeTickMsg: internalPb.TimeTickMsg{
+			MsgType:   internalPb.MsgType_kTimeTick,
+			PeerID:    UniqueID(0),
+			Timestamp: math.MaxUint64,
+		},
+	}
+	timeTickMsgPack.Msgs = append(timeTickMsgPack.Msgs, timeTickMsg)
+
+	// pulsar produce
+	const receiveBufSize = 1024
+	producerChannels := Params.insertChannelNames()
+
+	insertStream := msgstream.NewPulsarMsgStream(ctx, receiveBufSize)
+	insertStream.SetPulsarClient(pulsarURL)
+	insertStream.CreatePulsarProducers(producerChannels)
+
+	var insertMsgStream msgstream.MsgStream = insertStream
+	insertMsgStream.Start()
+
+	err := insertMsgStream.Produce(&msgPack)
+	assert.NoError(t, err)
+
+	err = insertMsgStream.Broadcast(&timeTickMsgPack)
+	assert.NoError(t, err)
+
+	// dataSync
+	node.dataSyncService = newDataSyncService(node.ctx)
+	go node.dataSyncService.start()
+
+	node.Close()
+
+	<-ctx.Done()
+}
diff --git a/internal/writenode/flow_graph_filter_dm_node.go b/internal/writenode/flow_graph_filter_dm_node.go
new file mode 100644
index 0000000000..23533f07e2
--- /dev/null
+++ b/internal/writenode/flow_graph_filter_dm_node.go
@@ -0,0 +1,64 @@
+package writenode
+
+import (
+	"log"
+
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
+	internalPb "github.com/zilliztech/milvus-distributed/internal/proto/internalpb"
+)
+
+type filterDmNode struct {
+	BaseNode
+}
+
+func (fdmNode *filterDmNode) Name() string {
+	return "fdmNode"
+}
+
+func (fdmNode *filterDmNode) Operate(in []*Msg) []*Msg {
+
+	if len(in) != 1 {
+		log.Println("Invalid operate message input in filterDmNode, input length = ", len(in))
+		// TODO: add error handling
+	}
+
+	msMsg, ok := (*in[0]).(*MsgStreamMsg)
+	if !ok {
+		log.Println("type assertion failed for MsgStreamMsg")
+		// TODO: add error handling
+	}
+
+	var iMsg = insertMsg{
+		insertMessages: make([]*msgstream.InsertMsg, 0),
+		timeRange: TimeRange{
+			timestampMin: msMsg.TimestampMin(),
+			timestampMax: msMsg.TimestampMax(),
+		},
+	}
+	for _, msg := range msMsg.TsMessages() {
+		switch msg.Type() {
+		case internalPb.MsgType_kInsert:
+			iMsg.insertMessages = append(iMsg.insertMessages, msg.(*msgstream.InsertMsg))
+		// case internalPb.MsgType_kDelete:
+		// dmMsg.deleteMessages = append(dmMsg.deleteMessages, (*msg).(*msgstream.DeleteTask))
+		default:
+			log.Println("Non supporting message type:", msg.Type())
+		}
+	}
+
+	var res Msg = &iMsg
+	return []*Msg{&res}
+}
+
+func newFilteredDmNode() *filterDmNode {
+	maxQueueLength := Params.flowGraphMaxQueueLength()
+	maxParallelism := Params.flowGraphMaxParallelism()
+
+	baseNode := BaseNode{}
+	baseNode.SetMaxQueueLength(maxQueueLength)
+	baseNode.SetMaxParallelism(maxParallelism)
+
+	return &filterDmNode{
+		BaseNode: baseNode,
+	}
+}
diff --git a/internal/writenode/flow_graph_insert_buffer_node.go b/internal/writenode/flow_graph_insert_buffer_node.go
new file mode 100644
index 0000000000..aa7731802e
--- /dev/null
+++ b/internal/writenode/flow_graph_insert_buffer_node.go
@@ -0,0 +1,61 @@
+package writenode
+
+import (
+	"log"
+)
+
+type (
+	writeNode struct {
+		BaseNode
+	}
+)
+
+func (wNode *writeNode) Name() string {
+	return "wNode"
+}
+
+func (wNode *writeNode) Operate(in []*Msg) []*Msg {
+	log.Println("=========== WriteNode Operating")
+
+	if len(in) != 1 {
+		log.Println("Invalid operate message input in writetNode, input length = ", len(in))
+		// TODO: add error handling
+	}
+
+	iMsg, ok := (*in[0]).(*insertMsg)
+	if !ok {
+		log.Println("type assertion failed for insertMsg")
+		// TODO: add error handling
+	}
+
+	log.Println("=========== insertMsg length:", len(iMsg.insertMessages))
+	for _, task := range iMsg.insertMessages {
+		if len(task.RowIDs) != len(task.Timestamps) || len(task.RowIDs) != len(task.RowData) {
+			log.Println("Error, misaligned messages detected")
+			continue
+		}
+		log.Println("Timestamp: ", task.Timestamps[0])
+		log.Printf("t(%d) : %v ", task.Timestamps[0], task.RowData[0])
+	}
+
+	var res Msg = &serviceTimeMsg{
+		timeRange: iMsg.timeRange,
+	}
+
+	// TODO
+	return []*Msg{&res}
+
+}
+
+func newWriteNode() *writeNode {
+	maxQueueLength := Params.flowGraphMaxQueueLength()
+	maxParallelism := Params.flowGraphMaxParallelism()
+
+	baseNode := BaseNode{}
+	baseNode.SetMaxQueueLength(maxQueueLength)
+	baseNode.SetMaxParallelism(maxParallelism)
+
+	return &writeNode{
+		BaseNode: baseNode,
+	}
+}
diff --git a/internal/writenode/flow_graph_message.go b/internal/writenode/flow_graph_message.go
new file mode 100644
index 0000000000..e6941150b8
--- /dev/null
+++ b/internal/writenode/flow_graph_message.go
@@ -0,0 +1,102 @@
+package writenode
+
+import (
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
+	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
+	"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
+)
+
+type (
+	Msg          = flowgraph.Msg
+	MsgStreamMsg = flowgraph.MsgStreamMsg
+	SegmentID    = UniqueID
+)
+
+type (
+	key2SegMsg struct {
+		tsMessages []msgstream.TsMsg
+		timeRange  TimeRange
+	}
+
+	schemaUpdateMsg struct {
+		timeRange TimeRange
+	}
+
+	insertMsg struct {
+		insertMessages []*msgstream.InsertMsg
+		timeRange      TimeRange
+	}
+
+	deleteMsg struct {
+		deleteMessages []*msgstream.DeleteMsg
+		timeRange      TimeRange
+	}
+
+	serviceTimeMsg struct {
+		timeRange TimeRange
+	}
+
+	InsertData struct {
+		insertIDs        map[SegmentID][]UniqueID
+		insertTimestamps map[SegmentID][]Timestamp
+		insertRecords    map[SegmentID][]*commonpb.Blob
+		insertOffset     map[SegmentID]int64
+	}
+
+	DeleteData struct {
+		deleteIDs        map[SegmentID][]UniqueID
+		deleteTimestamps map[SegmentID][]Timestamp
+		deleteOffset     map[SegmentID]int64
+	}
+
+	DeleteRecord struct {
+		entityID  UniqueID
+		timestamp Timestamp
+		segmentID UniqueID
+	}
+
+	DeletePreprocessData struct {
+		deleteRecords []*DeleteRecord
+		count         int32
+	}
+)
+
+func (ksMsg *key2SegMsg) TimeTick() Timestamp {
+	return ksMsg.timeRange.timestampMax
+}
+
+func (ksMsg *key2SegMsg) DownStreamNodeIdx() int {
+	return 0
+}
+
+func (suMsg *schemaUpdateMsg) TimeTick() Timestamp {
+	return suMsg.timeRange.timestampMax
+}
+
+func (suMsg *schemaUpdateMsg) DownStreamNodeIdx() int {
+	return 0
+}
+
+func (iMsg *insertMsg) TimeTick() Timestamp {
+	return iMsg.timeRange.timestampMax
+}
+
+func (iMsg *insertMsg) DownStreamNodeIdx() int {
+	return 0
+}
+
+func (dMsg *deleteMsg) TimeTick() Timestamp {
+	return dMsg.timeRange.timestampMax
+}
+
+func (dMsg *deleteMsg) DownStreamNodeIdx() int {
+	return 0
+}
+
+func (stMsg *serviceTimeMsg) TimeTick() Timestamp {
+	return stMsg.timeRange.timestampMax
+}
+
+func (stMsg *serviceTimeMsg) DownStreamNodeIdx() int {
+	return 0
+}
diff --git a/internal/writenode/flow_graph_msg_stream_input_node.go b/internal/writenode/flow_graph_msg_stream_input_node.go
new file mode 100644
index 0000000000..0ec6980367
--- /dev/null
+++ b/internal/writenode/flow_graph_msg_stream_input_node.go
@@ -0,0 +1,39 @@
+package writenode
+
+import (
+	"context"
+	"log"
+
+	"github.com/zilliztech/milvus-distributed/internal/msgstream"
+	"github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
+)
+
+func newDmInputNode(ctx context.Context) *flowgraph.InputNode {
+	receiveBufSize := Params.insertReceiveBufSize()
+	pulsarBufSize := Params.insertPulsarBufSize()
+
+	msgStreamURL, err := Params.pulsarAddress()
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	consumeChannels := Params.insertChannelNames()
+	consumeSubName := Params.msgChannelSubName()
+
+	insertStream := msgstream.NewPulsarTtMsgStream(ctx, receiveBufSize)
+
+	// TODO: could panic on nil pointer
+	insertStream.SetPulsarClient(msgStreamURL)
+	unmarshalDispatcher := msgstream.NewUnmarshalDispatcher()
+
+	// TODO: this may panic on a nil pointer
+	insertStream.CreatePulsarConsumers(consumeChannels, consumeSubName, unmarshalDispatcher, pulsarBufSize)
+
+	var stream msgstream.MsgStream = insertStream
+
+	maxQueueLength := Params.flowGraphMaxQueueLength()
+	maxParallelism := Params.flowGraphMaxParallelism()
+
+	node := flowgraph.NewInputNode(&stream, "dmInputNode", maxQueueLength, maxParallelism)
+	return node
+}
diff --git a/internal/writenode/flow_graph_node.go b/internal/writenode/flow_graph_node.go
new file mode 100644
index 0000000000..a857eeb423
--- /dev/null
+++ b/internal/writenode/flow_graph_node.go
@@ -0,0 +1,9 @@
+package writenode
+
+import "github.com/zilliztech/milvus-distributed/internal/util/flowgraph"
+
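+// Aliases of the flowgraph node types so the writenode graph can be built
+// without spelling out the flowgraph package in every declaration.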
+type (
+	Node      = flowgraph.Node
+	BaseNode  = flowgraph.BaseNode
+	InputNode = flowgraph.InputNode
+)
diff --git a/internal/writenode/flow_graph_service_time_node.go b/internal/writenode/flow_graph_service_time_node.go
new file mode 100644
index 0000000000..ee6b1869a2
--- /dev/null
+++ b/internal/writenode/flow_graph_service_time_node.go
@@ -0,0 +1,46 @@
+package writenode
+
+import (
+	"log"
+)
+
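+// serviceTimeNode is the terminal node of the flowgraph; it will publish the
+// service time (tSafe) once that mechanism is ported to writenode, see the
+// commented-out update in Operate.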
+type serviceTimeNode struct {
+	BaseNode
+}
+
+func (stNode *serviceTimeNode) Name() string {
+	return "stNode"
+}
+
+func (stNode *serviceTimeNode) Operate(in []*Msg) []*Msg {
+
+	if len(in) != 1 {
+		log.Println("Invalid operate message input in serviceTimeNode, input length = ", len(in))
+		// TODO: add error handling
+	}
+
+	// once tSafe is wired up this becomes: serviceTimeMsg, ok := (*in[0]).(*serviceTimeMsg)
+	_, ok := (*in[0]).(*serviceTimeMsg)
+	if !ok {
+		log.Println("type assertion failed for serviceTimeMsg")
+		// TODO: add error handling
+	}
+
+	// TODO: update the service time (tSafe) once the writenode replica is
+	// implemented, e.g.:
+	// (*(*stNode.replica).getTSafe()).set(serviceTimeMsg.timeRange.timestampMax)
+	return nil
+}
+
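+// newServiceTimeNode constructs a serviceTimeNode with the queue length and
+// parallelism limits taken from the flowgraph config.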
+func newServiceTimeNode() *serviceTimeNode {
+	maxQueueLength := Params.flowGraphMaxQueueLength()
+	maxParallelism := Params.flowGraphMaxParallelism()
+
+	baseNode := BaseNode{}
+	baseNode.SetMaxQueueLength(maxQueueLength)
+	baseNode.SetMaxParallelism(maxParallelism)
+
+	return &serviceTimeNode{
+		BaseNode: baseNode,
+	}
+}
diff --git a/internal/writenode/param_table.go b/internal/writenode/param_table.go
new file mode 100644
index 0000000000..e88f5931c5
--- /dev/null
+++ b/internal/writenode/param_table.go
@@ -0,0 +1,236 @@
+package writenode
+
+import (
+	"log"
+	"os"
+	"strconv"
+
+	"github.com/zilliztech/milvus-distributed/internal/util/paramtable"
+)
+
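+// ParamTable exposes the writenode configuration on top of
+// paramtable.BaseTable; Params below is the package-wide instance.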
+type ParamTable struct {
+	paramtable.BaseTable
+}
+
+var Params ParamTable
+
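+// Init loads advanced/write_node.yaml and resolves this node's ID: the
+// WRITE_NODE_ID environment variable takes precedence, otherwise the first
+// entry of the configured write node ID list is used, falling back to "0".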
+func (p *ParamTable) Init() {
+	p.BaseTable.Init()
+	err := p.LoadYaml("advanced/write_node.yaml")
+	if err != nil {
+		panic(err)
+	}
+
+	writeNodeIDStr := os.Getenv("WRITE_NODE_ID")
+	if writeNodeIDStr == "" {
+		writeNodeIDList := p.WriteNodeIDList()
+		if len(writeNodeIDList) <= 0 {
+			writeNodeIDStr = "0"
+		} else {
+			writeNodeIDStr = strconv.Itoa(int(writeNodeIDList[0]))
+		}
+	}
+	p.Save("_writeNodeID", writeNodeIDStr)
+}
+
+func (p *ParamTable) pulsarAddress() (string, error) {
+	url, err := p.Load("_PulsarAddress")
+	if err != nil {
+		return "", err
+	}
+	return url, nil
+}
+
+func (p *ParamTable) WriteNodeID() UniqueID {
+	writeNodeID, err := p.Load("_writeNodeID")
+	if err != nil {
+		panic(err)
+	}
+	id, err := strconv.Atoi(writeNodeID)
+	if err != nil {
+		panic(err)
+	}
+	return UniqueID(id)
+}
+
+func (p *ParamTable) insertChannelRange() []int {
+	insertChannelRange, err := p.Load("msgChannel.channelRange.insert")
+	if err != nil {
+		panic(err)
+	}
+
+	return paramtable.ConvertRangeToIntRange(insertChannelRange, ",")
+}
+
+// advanced params
+// stats
+func (p *ParamTable) statsPublishInterval() int {
+	return p.ParseInt("writeNode.stats.publishInterval")
+}
+
+// dataSync:
+func (p *ParamTable) flowGraphMaxQueueLength() int32 {
+	return p.ParseInt32("writeNode.dataSync.flowGraph.maxQueueLength")
+}
+
+func (p *ParamTable) flowGraphMaxParallelism() int32 {
+	return p.ParseInt32("writeNode.dataSync.flowGraph.maxParallelism")
+}
+
+// msgStream
+func (p *ParamTable) insertReceiveBufSize() int64 {
+	return p.ParseInt64("writeNode.msgStream.insert.recvBufSize")
+}
+
+func (p *ParamTable) insertPulsarBufSize() int64 {
+	return p.ParseInt64("writeNode.msgStream.insert.pulsarBufSize")
+}
+
+func (p *ParamTable) searchReceiveBufSize() int64 {
+	return p.ParseInt64("writeNode.msgStream.search.recvBufSize")
+}
+
+func (p *ParamTable) searchPulsarBufSize() int64 {
+	return p.ParseInt64("writeNode.msgStream.search.pulsarBufSize")
+}
+
+func (p *ParamTable) searchResultReceiveBufSize() int64 {
+	return p.ParseInt64("writeNode.msgStream.searchResult.recvBufSize")
+}
+
+func (p *ParamTable) statsReceiveBufSize() int64 {
+	return p.ParseInt64("writeNode.msgStream.stats.recvBufSize")
+}
+
+func (p *ParamTable) etcdAddress() string {
+	etcdAddress, err := p.Load("_EtcdAddress")
+	if err != nil {
+		panic(err)
+	}
+	return etcdAddress
+}
+
+func (p *ParamTable) metaRootPath() string {
+	rootPath, err := p.Load("etcd.rootPath")
+	if err != nil {
+		panic(err)
+	}
+	subPath, err := p.Load("etcd.metaSubPath")
+	if err != nil {
+		panic(err)
+	}
+	return rootPath + "/" + subPath
+}
+
+func (p *ParamTable) gracefulTime() int64 {
+	gracefulTime, err := p.Load("writeNode.gracefulTime")
+	if err != nil {
+		panic(err)
+	}
+	time, err := strconv.Atoi(gracefulTime)
+	if err != nil {
+		panic(err)
+	}
+	return int64(time)
+}
+
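+// insertChannelNames expands the configured channel range into concrete
+// channel names and returns only this node's share: channels are split evenly
+// across the configured write nodes, indexed by this node's position in the
+// ID list. For example, with channelRange "0,2" (insert0, insert1) and a
+// single configured write node, that node consumes both channels. Note that
+// this assumes the node count evenly divides the channel count.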
+func (p *ParamTable) insertChannelNames() []string {
+	prefix, err := p.Load("msgChannel.chanNamePrefix.insert")
+	if err != nil {
+		panic(err)
+	}
+	channelRange, err := p.Load("msgChannel.channelRange.insert")
+	if err != nil {
+		panic(err)
+	}
+
+	channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",")
+
+	var ret []string
+	for _, ID := range channelIDs {
+		ret = append(ret, prefix+strconv.Itoa(ID))
+	}
+	sep := len(channelIDs) / p.writeNodeNum()
+	index := p.sliceIndex()
+	if index == -1 {
+		panic("writeNodeID not Match with Config")
+	}
+	start := index * sep
+	return ret[start : start+sep]
+}
+
+func (p *ParamTable) searchChannelNames() []string {
+	prefix, err := p.Load("msgChannel.chanNamePrefix.search")
+	if err != nil {
+		panic(err)
+	}
+	channelRange, err := p.Load("msgChannel.channelRange.search")
+	if err != nil {
+		panic(err)
+	}
+
+	channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",")
+
+	var ret []string
+	for _, ID := range channelIDs {
+		ret = append(ret, prefix+strconv.Itoa(ID))
+	}
+	return ret
+}
+
+func (p *ParamTable) searchResultChannelNames() []string {
+	prefix, err := p.Load("msgChannel.chanNamePrefix.searchResult")
+	if err != nil {
+		panic(err)
+	}
+
+	prefix += "-"
+	channelRange, err := p.Load("msgChannel.channelRange.searchResult")
+	if err != nil {
+		panic(err)
+	}
+
+	channelIDs := paramtable.ConvertRangeToIntSlice(channelRange, ",")
+
+	var ret []string
+	for _, ID := range channelIDs {
+		ret = append(ret, prefix+strconv.Itoa(ID))
+	}
+	return ret
+}
+
+func (p *ParamTable) msgChannelSubName() string {
+	// TODO: subName = namePrefix + "-" + writeNodeID, writeNodeID is assigned by master
+	name, err := p.Load("msgChannel.subNamePrefix.writeNodeSubNamePrefix")
+	if err != nil {
+		panic(err)
+	}
+	writeNodeIDStr, err := p.Load("_writeNodeID")
+	if err != nil {
+		panic(err)
+	}
+	return name + "-" + writeNodeIDStr
+}
+
+func (p *ParamTable) writeNodeTimeTickChannelName() string {
+	channels, err := p.Load("msgChannel.chanNamePrefix.writeNodeTimeTick")
+	if err != nil {
+		panic(err)
+	}
+	return channels
+}
+
+func (p *ParamTable) sliceIndex() int {
+	writeNodeID := p.WriteNodeID()
+	writeNodeIDList := p.WriteNodeIDList()
+	for i := 0; i < len(writeNodeIDList); i++ {
+		if writeNodeID == writeNodeIDList[i] {
+			return i
+		}
+	}
+	return -1
+}
+
+func (p *ParamTable) writeNodeNum() int {
+	return len(p.WriteNodeIDList())
+}
diff --git a/internal/writenode/param_table_test.go b/internal/writenode/param_table_test.go
new file mode 100644
index 0000000000..0702ec1a32
--- /dev/null
+++ b/internal/writenode/param_table_test.go
@@ -0,0 +1,112 @@
+package writenode
+
+import (
+	"strings"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestParamTable_WriteNode(t *testing.T) {
+
+	Params.Init()
+
+	t.Run("Test PulsarAddress", func(t *testing.T) {
+		address, err := Params.pulsarAddress()
+		assert.NoError(t, err)
+		split := strings.Split(address, ":")
+		assert.Equal(t, split[0], "pulsar")
+		assert.Equal(t, split[len(split)-1], "6650")
+	})
+
+	t.Run("Test WriteNodeID", func(t *testing.T) {
+		id := Params.WriteNodeID()
+		assert.Equal(t, id, UniqueID(3))
+	})
+
+	t.Run("Test insertChannelRange", func(t *testing.T) {
+		channelRange := Params.insertChannelRange()
+		assert.Equal(t, len(channelRange), 2)
+		assert.Equal(t, channelRange[0], 0)
+		assert.Equal(t, channelRange[1], 2)
+	})
+
+	t.Run("Test statsServiceTimeInterval", func(t *testing.T) {
+		interval := Params.statsPublishInterval()
+		assert.Equal(t, interval, 1000)
+	})
+
+	t.Run("Test statsMsgStreamReceiveBufSize", func(t *testing.T) {
+		bufSize := Params.statsReceiveBufSize()
+		assert.Equal(t, bufSize, int64(64))
+	})
+
+	t.Run("Test insertMsgStreamReceiveBufSize", func(t *testing.T) {
+		bufSize := Params.insertReceiveBufSize()
+		assert.Equal(t, bufSize, int64(1024))
+	})
+
+	t.Run("Test searchMsgStreamReceiveBufSize", func(t *testing.T) {
+		bufSize := Params.searchReceiveBufSize()
+		assert.Equal(t, bufSize, int64(512))
+	})
+
+	t.Run("Test searchResultMsgStreamReceiveBufSize", func(t *testing.T) {
+		bufSize := Params.searchResultReceiveBufSize()
+		assert.Equal(t, bufSize, int64(64))
+	})
+
+	t.Run("Test searchPulsarBufSize", func(t *testing.T) {
+		bufSize := Params.searchPulsarBufSize()
+		assert.Equal(t, bufSize, int64(512))
+	})
+
+	t.Run("Test insertPulsarBufSize", func(t *testing.T) {
+		bufSize := Params.insertPulsarBufSize()
+		assert.Equal(t, bufSize, int64(1024))
+	})
+
+	t.Run("Test flowGraphMaxQueueLength", func(t *testing.T) {
+		length := Params.flowGraphMaxQueueLength()
+		assert.Equal(t, length, int32(1024))
+	})
+
+	t.Run("Test flowGraphMaxParallelism", func(t *testing.T) {
+		maxParallelism := Params.flowGraphMaxParallelism()
+		assert.Equal(t, maxParallelism, int32(1024))
+	})
+
+	t.Run("Test insertChannelNames", func(t *testing.T) {
+		names := Params.insertChannelNames()
+		assert.Equal(t, len(names), 2)
+		assert.Equal(t, names[0], "insert0")
+		assert.Equal(t, names[1], "insert1")
+	})
+
+	t.Run("Test searchChannelNames", func(t *testing.T) {
+		names := Params.searchChannelNames()
+		assert.Equal(t, len(names), 1)
+		assert.Equal(t, names[0], "search0")
+	})
+
+	t.Run("Test searchResultChannelName", func(t *testing.T) {
+		names := Params.searchResultChannelNames()
+		assert.Equal(t, len(names), 1)
+		assert.Equal(t, names[0], "searchResult-0")
+	})
+
+	t.Run("Test msgChannelSubName", func(t *testing.T) {
+		name := Params.msgChannelSubName()
+		assert.Equal(t, name, "writeNode-3")
+	})
+
+	t.Run("Test timeTickChannelName", func(t *testing.T) {
+		name := Params.writeNodeTimeTickChannelName()
+		assert.Equal(t, name, "writeNodeTimeTick")
+	})
+
+	t.Run("Test metaRootPath", func(t *testing.T) {
+		path := Params.metaRootPath()
+		assert.Equal(t, path, "by-dev/meta")
+	})
+}
diff --git a/internal/writenode/type_def.go b/internal/writenode/type_def.go
new file mode 100644
index 0000000000..c87222578e
--- /dev/null
+++ b/internal/writenode/type_def.go
@@ -0,0 +1,15 @@
+package writenode
+
+import "github.com/zilliztech/milvus-distributed/internal/util/typeutil"
+
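+// Aliases of the shared typeutil ID/timestamp types and the DSL string type,
+// plus TimeRange, the [timestampMin, timestampMax] window carried by the
+// flowgraph messages.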
+type (
+	UniqueID      = typeutil.UniqueID
+	Timestamp     = typeutil.Timestamp
+	IntPrimaryKey = typeutil.IntPrimaryKey
+	DSL           = string
+
+	TimeRange struct {
+		timestampMin Timestamp
+		timestampMax Timestamp
+	}
+)
diff --git a/internal/writenode/write_node.go b/internal/writenode/write_node.go
new file mode 100644
index 0000000000..1542467c8f
--- /dev/null
+++ b/internal/writenode/write_node.go
@@ -0,0 +1,53 @@
+package writenode
+
+import (
+	"context"
+)
+
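+// WriteNode is the top-level write node instance. It currently owns only the
+// data sync service; the commented-out services in Start are still to be
+// ported.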
+type WriteNode struct {
+	ctx context.Context
+
+	WriteNodeID uint64
+
+	dataSyncService *dataSyncService
+}
+
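+// NewWriteNode creates a WriteNode bound to ctx; no services run until Start
+// is called. A minimal usage sketch (assuming Params.Init has been called
+// first):
+//
+//	ctx, cancel := context.WithCancel(context.Background())
+//	node := NewWriteNode(ctx, 0)
+//	node.Start()
+//	// ... later, shut down:
+//	cancel()
+//	node.Close()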
+func NewWriteNode(ctx context.Context, writeNodeID uint64) *WriteNode {
+
+	return &WriteNode{
+		ctx: ctx,
+
+		WriteNodeID: writeNodeID,
+
+		dataSyncService: nil,
+	}
+}
+
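+// Start creates and launches the data sync service; the search, meta and
+// stats services remain stubbed out for now.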
+func (node *WriteNode) Start() {
+	node.dataSyncService = newDataSyncService(node.ctx)
+	// node.searchService = newSearchService(node.ctx)
+	// node.metaService = newMetaService(node.ctx)
+	// node.statsService = newStatsService(node.ctx)
+
+	go node.dataSyncService.start()
+	// go node.searchService.start()
+	// go node.metaService.start()
+	// node.statsService.start()
+}
+
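+// Close blocks until the node's context is cancelled, then shuts down any
+// running services.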
+func (node *WriteNode) Close() {
+	<-node.ctx.Done()
+	// free collectionReplica
+	// (*node.replica).freeAll()
+
+	// close services
+	if node.dataSyncService != nil {
+		node.dataSyncService.close()
+	}
+	// if node.searchService != nil {
+	//     (*node.searchService).close()
+	// }
+	// if node.statsService != nil {
+	//     (*node.statsService).close()
+	// }
+}
diff --git a/scripts/run_go_unittest.sh b/scripts/run_go_unittest.sh
index a22617e263..b77186c72c 100755
--- a/scripts/run_go_unittest.sh
+++ b/scripts/run_go_unittest.sh
@@ -13,5 +13,5 @@ SCRIPTS_DIR="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
 # ignore Minio,S3 unittes
 MILVUS_DIR="${SCRIPTS_DIR}/../internal/"
 echo $MILVUS_DIR
-go test -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/storage" "${MILVUS_DIR}/proxy/..." -failfast
+go test -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." "${MILVUS_DIR}/storage" "${MILVUS_DIR}/proxy/..." "${MILVUS_DIR}/writenode/..." -failfast
 #go test -cover "${MILVUS_DIR}/kv/..." "${MILVUS_DIR}/msgstream/..." "${MILVUS_DIR}/master/..." "${MILVUS_DIR}/querynode/..." -failfast