// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
|
|
|
|
2020-11-02 08:44:54 +00:00
|
|
|
package flowgraph
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"log"
|
|
|
|
"math"
|
|
|
|
"math/rand"
|
|
|
|
"sync"
|
|
|
|
"testing"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
|
|
|
|
// ctxTimeInMillisecond is the lifetime, in milliseconds, of the context under
// which the flow-graph test in this file runs.
const ctxTimeInMillisecond = 3000
|
|
|
|
|
|
|
|
type nodeA struct {
|
2020-11-02 11:30:12 +00:00
|
|
|
BaseNode
|
2020-11-02 08:44:54 +00:00
|
|
|
a float64
|
|
|
|
}
|
|
|
|
|
|
|
|
type nodeB struct {
|
2020-11-02 11:30:12 +00:00
|
|
|
BaseNode
|
2020-11-02 08:44:54 +00:00
|
|
|
b float64
|
|
|
|
}
|
|
|
|
|
|
|
|
type nodeC struct {
|
2020-11-02 11:30:12 +00:00
|
|
|
BaseNode
|
2020-11-02 08:44:54 +00:00
|
|
|
c float64
|
|
|
|
}
|
|
|
|
|
|
|
|
type nodeD struct {
|
2020-11-02 11:30:12 +00:00
|
|
|
BaseNode
|
2020-11-02 08:44:54 +00:00
|
|
|
d float64
|
|
|
|
resChan chan float64
|
|
|
|
}
|
|
|
|
|
|
|
|
type intMsg struct {
|
|
|
|
num float64
|
|
|
|
t Timestamp
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *intMsg) TimeTick() Timestamp {
|
|
|
|
return m.t
|
|
|
|
}
|
|
|
|
|
2020-11-02 11:30:12 +00:00
|
|
|
func (m *intMsg) DownStreamNodeIdx() int {
|
2020-11-02 08:44:54 +00:00
|
|
|
return 1
|
|
|
|
}
|
|
|
|
|
2021-02-25 09:35:36 +00:00
|
|
|
func intMsg2Msg(in []*intMsg) []Msg {
|
|
|
|
out := make([]Msg, 0)
|
2020-11-02 08:44:54 +00:00
|
|
|
for _, msg := range in {
|
|
|
|
var m Msg = msg
|
2021-02-25 09:35:36 +00:00
|
|
|
out = append(out, m)
|
2020-11-02 08:44:54 +00:00
|
|
|
}
|
|
|
|
return out
|
|
|
|
}
|
|
|
|
|
2021-02-25 09:35:36 +00:00
|
|
|
func msg2IntMsg(in []Msg) []*intMsg {
|
2020-11-02 08:44:54 +00:00
|
|
|
out := make([]*intMsg, 0)
|
|
|
|
for _, msg := range in {
|
2021-02-25 09:35:36 +00:00
|
|
|
out = append(out, msg.(*intMsg))
|
2020-11-02 08:44:54 +00:00
|
|
|
}
|
|
|
|
return out
|
|
|
|
}
|
|
|
|
|
|
|
|
func (a *nodeA) Name() string {
|
|
|
|
return "NodeA"
|
|
|
|
}
|
|
|
|
|
2021-03-25 06:41:46 +00:00
|
|
|
func (a *nodeA) Operate(in []Msg) []Msg {
|
|
|
|
return append(in, in...)
|
2020-11-02 08:44:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (b *nodeB) Name() string {
|
|
|
|
return "NodeB"
|
|
|
|
}
|
|
|
|
|
2021-03-25 06:41:46 +00:00
|
|
|
func (b *nodeB) Operate(in []Msg) []Msg {
|
2020-11-02 08:44:54 +00:00
|
|
|
messages := make([]*intMsg, 0)
|
|
|
|
for _, msg := range msg2IntMsg(in) {
|
|
|
|
messages = append(messages, &intMsg{
|
|
|
|
num: math.Pow(msg.num, 2),
|
|
|
|
})
|
|
|
|
}
|
2021-03-25 06:41:46 +00:00
|
|
|
return intMsg2Msg(messages)
|
2020-11-02 08:44:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (c *nodeC) Name() string {
|
|
|
|
return "NodeC"
|
|
|
|
}
|
|
|
|
|
2021-03-25 06:41:46 +00:00
|
|
|
func (c *nodeC) Operate(in []Msg) []Msg {
|
2020-11-02 08:44:54 +00:00
|
|
|
messages := make([]*intMsg, 0)
|
|
|
|
for _, msg := range msg2IntMsg(in) {
|
|
|
|
messages = append(messages, &intMsg{
|
|
|
|
num: math.Sqrt(msg.num),
|
|
|
|
})
|
|
|
|
}
|
2021-03-25 06:41:46 +00:00
|
|
|
return intMsg2Msg(messages)
|
2020-11-02 08:44:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (d *nodeD) Name() string {
|
|
|
|
return "NodeD"
|
|
|
|
}
|
|
|
|
|
2021-03-25 06:41:46 +00:00
|
|
|
func (d *nodeD) Operate(in []Msg) []Msg {
|
2020-11-02 08:44:54 +00:00
|
|
|
messages := make([]*intMsg, 0)
|
|
|
|
outLength := len(in) / 2
|
|
|
|
inMessages := msg2IntMsg(in)
|
|
|
|
for i := 0; i < outLength; i++ {
|
|
|
|
var msg = &intMsg{
|
|
|
|
num: inMessages[i].num + inMessages[i+outLength].num,
|
|
|
|
}
|
|
|
|
messages = append(messages, msg)
|
|
|
|
}
|
|
|
|
d.d = messages[0].num
|
|
|
|
d.resChan <- d.d
|
|
|
|
fmt.Println("flow graph result:", d.d)
|
2021-03-25 06:41:46 +00:00
|
|
|
return intMsg2Msg(messages)
|
2020-11-02 08:44:54 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func sendMsgFromCmd(ctx context.Context, fg *TimeTickedFlowGraph) {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
default:
|
|
|
|
time.Sleep(time.Millisecond * time.Duration(500))
|
|
|
|
var num = float64(rand.Int() % 100)
|
|
|
|
var msg Msg = &intMsg{num: num}
|
|
|
|
a := nodeA{}
|
2021-03-25 06:41:46 +00:00
|
|
|
fg.nodeCtx[a.Name()].inputChannels[0] <- msg
|
2020-11-02 08:44:54 +00:00
|
|
|
fmt.Println("send number", num, "to node", a.Name())
|
|
|
|
res, ok := receiveResult(ctx, fg)
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
// assert result
|
2020-11-14 03:24:49 +00:00
|
|
|
expect := math.Pow(num, 2) + math.Sqrt(num)
|
2020-12-24 06:10:00 +00:00
|
|
|
resBits := math.Float64bits(res)
|
|
|
|
expBits := math.Float64bits(expect)
|
|
|
|
var diffBits uint64
|
|
|
|
if resBits >= expBits {
|
|
|
|
diffBits = resBits - expBits
|
|
|
|
} else {
|
|
|
|
diffBits = expBits - resBits
|
|
|
|
}
|
|
|
|
if diffBits > 2 {
|
2020-11-02 08:44:54 +00:00
|
|
|
panic("wrong answer")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func receiveResultFromNodeD(res *float64, fg *TimeTickedFlowGraph, wg *sync.WaitGroup) {
|
|
|
|
d := nodeD{}
|
|
|
|
node := fg.nodeCtx[d.Name()]
|
2021-02-25 09:35:36 +00:00
|
|
|
nd, ok := node.node.(*nodeD)
|
2020-11-02 08:44:54 +00:00
|
|
|
if !ok {
|
|
|
|
log.Fatal("not nodeD type")
|
|
|
|
}
|
|
|
|
*res = <-nd.resChan
|
|
|
|
wg.Done()
|
|
|
|
}
|
|
|
|
|
|
|
|
func receiveResult(ctx context.Context, fg *TimeTickedFlowGraph) (float64, bool) {
|
|
|
|
d := nodeD{}
|
|
|
|
node := fg.nodeCtx[d.Name()]
|
2021-02-25 09:35:36 +00:00
|
|
|
nd, ok := node.node.(*nodeD)
|
2020-11-02 08:44:54 +00:00
|
|
|
if !ok {
|
|
|
|
log.Fatal("not nodeD type")
|
|
|
|
}
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return 0, false
|
|
|
|
case res := <-nd.resChan:
|
|
|
|
return res, true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestTimeTickedFlowGraph_Start(t *testing.T) {
|
2020-11-18 09:32:52 +00:00
|
|
|
const MaxQueueLength = 1024
|
|
|
|
const MaxParallelism = 1024
|
|
|
|
|
2020-11-02 08:44:54 +00:00
|
|
|
duration := time.Now().Add(ctxTimeInMillisecond * time.Millisecond)
|
2020-11-12 04:04:12 +00:00
|
|
|
ctx, cancel := context.WithDeadline(context.Background(), duration)
|
|
|
|
defer cancel()
|
|
|
|
|
2020-11-02 08:44:54 +00:00
|
|
|
fg := NewTimeTickedFlowGraph(ctx)
|
|
|
|
|
|
|
|
var a Node = &nodeA{
|
2020-11-02 11:30:12 +00:00
|
|
|
BaseNode: BaseNode{
|
2020-11-09 08:27:11 +00:00
|
|
|
maxQueueLength: MaxQueueLength,
|
2020-11-02 08:44:54 +00:00
|
|
|
},
|
|
|
|
}
|
|
|
|
var b Node = &nodeB{
|
2020-11-02 11:30:12 +00:00
|
|
|
BaseNode: BaseNode{
|
2020-11-09 08:27:11 +00:00
|
|
|
maxQueueLength: MaxQueueLength,
|
2020-11-02 08:44:54 +00:00
|
|
|
},
|
|
|
|
}
|
|
|
|
var c Node = &nodeC{
|
2020-11-02 11:30:12 +00:00
|
|
|
BaseNode: BaseNode{
|
2020-11-09 08:27:11 +00:00
|
|
|
maxQueueLength: MaxQueueLength,
|
2020-11-02 08:44:54 +00:00
|
|
|
},
|
|
|
|
}
|
|
|
|
var d Node = &nodeD{
|
2020-11-02 11:30:12 +00:00
|
|
|
BaseNode: BaseNode{
|
2020-11-09 08:27:11 +00:00
|
|
|
maxQueueLength: MaxQueueLength,
|
2020-11-02 08:44:54 +00:00
|
|
|
},
|
|
|
|
resChan: make(chan float64),
|
|
|
|
}
|
|
|
|
|
2021-02-25 09:35:36 +00:00
|
|
|
fg.AddNode(a)
|
|
|
|
fg.AddNode(b)
|
|
|
|
fg.AddNode(c)
|
|
|
|
fg.AddNode(d)
|
2020-11-02 08:44:54 +00:00
|
|
|
|
|
|
|
var err = fg.SetEdges(a.Name(),
|
|
|
|
[]string{},
|
|
|
|
[]string{b.Name(), c.Name()},
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
log.Fatal("set edges failed")
|
|
|
|
}
|
|
|
|
|
|
|
|
err = fg.SetEdges(b.Name(),
|
|
|
|
[]string{a.Name()},
|
|
|
|
[]string{d.Name()},
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
log.Fatal("set edges failed")
|
|
|
|
}
|
|
|
|
|
|
|
|
err = fg.SetEdges(c.Name(),
|
|
|
|
[]string{a.Name()},
|
|
|
|
[]string{d.Name()},
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
log.Fatal("set edges failed")
|
|
|
|
}
|
|
|
|
|
|
|
|
err = fg.SetEdges(d.Name(),
|
|
|
|
[]string{b.Name(), c.Name()},
|
|
|
|
[]string{},
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
log.Fatal("set edges failed")
|
|
|
|
}
|
|
|
|
|
|
|
|
// init node A
|
|
|
|
nodeCtxA := fg.nodeCtx[a.Name()]
|
2021-03-25 06:41:46 +00:00
|
|
|
nodeCtxA.inputChannels = []chan Msg{make(chan Msg, 10)}
|
2020-11-02 08:44:54 +00:00
|
|
|
|
|
|
|
go fg.Start()
|
|
|
|
|
|
|
|
sendMsgFromCmd(ctx, fg)
|
|
|
|
}
|