@@ -20,14 +20,20 @@ import (
20
20
21
21
// columnEvaluator holds the input-column to output-column mappings that are
// used to move plain "Column" expressions from an input chunk to an output chunk.
type columnEvaluator struct {
	// inputIdxToOutputIdxes maps an input column index to every output column
	// index that the input column must be projected to.
	inputIdxToOutputIdxes map[int][]int

	// mergedInputIdxToOutputIdxes is the merged form of inputIdxToOutputIdxes.
	// It can only be determined at runtime, once the input chunk has been seen,
	// and stays nil until then.
	mergedInputIdxToOutputIdxes map[int][]int
}
24
26
25
27
// run evaluates "Column" expressions.
26
28
// NOTE: It should be called after all the other expressions are evaluated
27
29
//
28
30
// since it will change the content of the input Chunk.
29
31
func (e * columnEvaluator ) run (ctx EvalContext , input , output * chunk.Chunk ) error {
30
- for inputIdx , outputIdxes := range e .inputIdxToOutputIdxes {
32
+ // mergedInputIdxToOutputIdxes can only be determined at runtime, once we have seen the input chunk structure.
33
+ if e .mergedInputIdxToOutputIdxes == nil {
34
+ e .mergeInputIdxToOutputIdxes (input , e .inputIdxToOutputIdxes )
35
+ }
36
+ for inputIdx , outputIdxes := range e .mergedInputIdxToOutputIdxes {
31
37
if err := output .SwapColumn (outputIdxes [0 ], input , inputIdx ); err != nil {
32
38
return err
33
39
}
@@ -38,6 +44,93 @@ func (e *columnEvaluator) run(ctx EvalContext, input, output *chunk.Chunk) error
38
44
return nil
39
45
}
40
46
47
+ // mergeInputIdxToOutputIdxes tries to merge separate inputIdxToOutputIdxes items together when we find
48
+ // there is some column-ref inside the input chunk.
49
+ // imagine a case:
50
+ //
51
+ // scan: a(addr: ???)
52
+ //
53
+ // _________________________// \\
54
+ //
55
+ // proj1: a1(addr:0xe) a2(addr:0xe)
56
+ //
57
+ // _________________// \\ // \\
58
+ //
59
+ // proj2: a3(addr:0xe) a4(addr:0xe) a3(addr:0xe) a4(addr:0xe)
60
+ //
61
+ // when we are done with proj1 we can output a chunk with two columns inside [a1, a2]. since these two
62
+ // are derived from the single column a from scan, they have already been made column refs with the same addr
63
+ // in projection1. say the addr is 0xe for both a1 and a2 here.
64
+ //
65
+ // when we start the projection2, we have two separate <inputIdx,[]outputIdxes> items here, like
66
+ // <0, [0,1]> means project input chunk's 0th column twice, put them in 0th and 1st of output chunk.
67
+ // <1, [2,3]> means project input chunk's 1st column twice, put them in 2nd and 3rd of output chunk.
68
+ //
69
+ // since we do have swap column logic inside projection for each <inputIdx,[]outputIdxes>, after the
70
+ // <0, [0,1]> is applied, the input chunk a1 and a2's address will be swapped as a fake column. like
71
+ //
72
+ // proj1: a1(addr:invalid) a2(addr:invalid)
73
+ //
74
+ // ___________________// \\ // \\
75
+ //
76
+ // proj2: a3(addr:0xe) a4(addr:0xe) a3(addr:???) a4(addr:???)
77
+ //
78
+ // then when we start the projection for second <1, [2,3]>, the targeted column a2 addr has already been
79
+ // swapped as an invalid one. so swapping between a2 <-> a3 and a4 is not safe anymore.
80
+ //
81
+ // keypoint: we should identify the original column ref from input chunk, and merge current inputIdxToOutputIdxes
82
+ // as soon as possible. since input chunk's 0 and 1 is column referred. the final inputIdxToOutputIdxes should
83
+ // be like: <0, [0,1,2,3]>.
84
+ func (e * columnEvaluator ) mergeInputIdxToOutputIdxes (input * chunk.Chunk , inputIdxToOutputIdxes map [int ][]int ) {
85
+ // step1: we should detect the self column-ref inside input chunk.
86
+ selfRef := make (map [int ][]int )
87
+ flag := make ([]bool , input .NumCols ())
88
+ for i := 0 ; i < input .NumCols (); i ++ {
89
+ if flag [i ] {
90
+ continue
91
+ }
92
+ for j := i ; j < input .NumCols (); j ++ {
93
+ if input .Column (i ) == input .Column (j ) {
94
+ // mark referred column to avoid successive detection.
95
+ flag [j ] = true
96
+ selfRef [i ] = append (selfRef [i ], j )
97
+ }
98
+ }
99
+ }
100
+ // step2: leverage this self column-refs to merge inputIdxToOutputIdxes if possible.
101
+ rootIdxToOutputIdxes := make (map [int ][]int )
102
+ var (
103
+ find bool
104
+ root int
105
+ )
106
+ for inputIdx , outputIdxes := range inputIdxToOutputIdxes {
107
+ find = false
108
+ root = inputIdx
109
+ for k , v := range selfRef {
110
+ for _ , one := range v {
111
+ // if current inputIdx is in the value set of one selfRef. output the Root.
112
+ if inputIdx == one {
113
+ find = true
114
+ root = k
115
+ break
116
+ }
117
+ }
118
+ if find {
119
+ break
120
+ }
121
+ }
122
+ // if we found, root should be redirected to real root of inputIdx (link the real root)
123
+ // if we didn't, root should be the original inputIdx as it was. (inputIdx itself is a root)
124
+ if _ , ok := rootIdxToOutputIdxes [root ]; ok {
125
+ rootIdxToOutputIdxes [root ] = append (rootIdxToOutputIdxes [root ], outputIdxes ... )
126
+ } else {
127
+ // if not find in any value set, it means input idx itself is a root.
128
+ rootIdxToOutputIdxes [root ] = append (rootIdxToOutputIdxes [root ], outputIdxes ... )
129
+ }
130
+ }
131
+ e .mergedInputIdxToOutputIdxes = rootIdxToOutputIdxes
132
+ }
133
+
41
134
type defaultEvaluator struct {
42
135
outputIdxes []int
43
136
exprs []Expression
0 commit comments