-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathkey_join.go
More file actions
100 lines (83 loc) · 2.08 KB
/
key_join.go
File metadata and controls
100 lines (83 loc) · 2.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package flame
import (
"sync"
"golang.org/x/exp/constraints"
)
// KeyJoinNode is a workflow node that joins two keyed streams on their key.
// Values received on LeftInput and RightInput are grouped by key; for every
// key seen on both sides, Proc is applied to the key and the two groups, and
// the result is sent to each channel in Outputs.
type KeyJoinNode[K constraints.Ordered, X, Y, Z any] struct {
// LeftInput is the channel of left-side key/value pairs (set by ConnectLeft).
LeftInput chan KeyValue[K, X]
// RightInput is the channel of right-side key/value pairs (set by ConnectRight).
RightInput chan KeyValue[K, Y]
// Outputs holds one channel per downstream consumer (grown by GetOutput).
Outputs []chan KeyValue[K, Z]
// Proc combines all left and right values collected for a key into one result.
Proc func(K, []X, []Y) Z
}
// AddKeyJoin builds a KeyJoinNode whose join function is f, appends it to
// w.Nodes, and returns it. The node is returned with no inputs connected and
// with an empty, non-nil Outputs slice.
func AddKeyJoin[K constraints.Ordered, X, Y, Z any](w *Workflow, f func(K, []X, []Y) Z) *KeyJoinNode[K, X, Y, Z] {
	node := new(KeyJoinNode[K, X, Y, Z])
	node.Proc = f
	node.Outputs = make([]chan KeyValue[K, Z], 0)
	w.Nodes = append(w.Nodes, node)
	return node
}
// GetOutput creates a new unbuffered channel, records it in n.Outputs, and
// returns it. Every call produces a separate channel; start sends each joined
// result to all recorded channels and closes them when the join is finished.
func (n *KeyJoinNode[K, X, Y, Z]) GetOutput() chan KeyValue[K, Z] {
	out := make(chan KeyValue[K, Z])
	n.Outputs = append(n.Outputs, out)
	return out
}
// ConnectLeft calls e.GetOutput and stores the returned channel as the node's
// LeftInput, overwriting whatever was set before.
func (n *KeyJoinNode[K, X, Y, Z]) ConnectLeft(e Emitter[KeyValue[K, X]]) {
	n.LeftInput = e.GetOutput()
}
// ConnectRight calls e.GetOutput and stores the returned channel as the node's
// RightInput, overwriting whatever was set before.
func (n *KeyJoinNode[K, X, Y, Z]) ConnectRight(e Emitter[KeyValue[K, Y]]) {
	n.RightInput = e.GetOutput()
}
// start launches the join in the background and returns immediately.
//
// It registers one unit of work on wf.WaitGroup and spawns three goroutines:
// one drains LeftInput and one drains RightInput, each grouping the received
// values by key; the third waits until both inputs have been closed and then,
// for every key present on both sides, calls Proc with the key and the two
// groups and sends the result to every channel in Outputs. Keys present on
// only one side are dropped. Afterwards every output channel is closed and
// wf.WaitGroup is released.
//
// Notes:
//   - Nothing is emitted until both inputs are closed; all values are
//     buffered in memory until then.
//   - Keys are visited in map iteration order, so output order is unspecified.
//   - Proc is invoked once per (key, output) pair, not once per key.
//   - Receiving from a nil channel blocks forever, so an input that was never
//     connected keeps this node from ever finishing.
func (n *KeyJoinNode[K, X, Y, Z]) start(wf *Workflow) {
	wf.WaitGroup.Add(1)

	// inputs tracks the two collector goroutines below.
	var inputs sync.WaitGroup
	inputs.Add(2)

	// Each map is written by exactly one collector and only read after
	// inputs.Wait() returns, so no additional locking is required.
	left := map[K][]X{}
	right := map[K][]Y{}

	go func() {
		defer inputs.Done()
		for kv := range n.LeftInput {
			// A missing key yields a nil slice, which append grows as usual.
			left[kv.Key] = append(left[kv.Key], kv.Value)
		}
	}()

	go func() {
		defer inputs.Done()
		for kv := range n.RightInput {
			right[kv.Key] = append(right[kv.Key], kv.Value)
		}
	}()

	go func() {
		defer wf.WaitGroup.Done()
		inputs.Wait()

		for key, lVals := range left {
			rVals, ok := right[key]
			if !ok {
				// Inner join: skip keys that never appeared on the right.
				continue
			}
			for _, out := range n.Outputs {
				out <- KeyValue[K, Z]{Key: key, Value: n.Proc(key, lVals, rVals)}
			}
		}

		// Signal end-of-stream to every downstream consumer.
		for _, out := range n.Outputs {
			close(out)
		}
	}()
}
// KeySort is a slice of KeyValue pairs that implements sort.Interface,
// ordering its elements by ascending Key.
type KeySort[X constraints.Ordered, Y any] []KeyValue[X, Y]
// Swap exchanges the elements at positions i and j. It is part of
// sort.Interface.
func (s KeySort[X, Y]) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}
// Less reports whether the key of element i orders before the key of
// element j, using the < operator on the keys. It is part of sort.Interface.
func (s KeySort[X, Y]) Less(i, j int) bool {
	a, b := s[i].Key, s[j].Key
	return a < b
}
// Len returns the number of key/value pairs in s. It is part of
// sort.Interface.
func (s KeySort[X, Y]) Len() int {
return len(s)
}