@@ -1,76 +1,139 @@
-use super::{time_seria, Schema, SingleAliasedSource};
+use super::{time_series, Schema, SingleAliasedSource};
 use crate::planner::sql_templates::PlanSqlTemplates;
 use crate::planner::{BaseJoinCondition, BaseMember, VisitorContext};
 use cubenativeutils::CubeError;
 
 use std::rc::Rc;
 
 pub struct RollingWindowJoinCondition {
-    tailing_interval: Option<String>,
+    data_source: String,
+    time_series_source: String,
+    trailing_interval: Option<String>,
     leading_interval: Option<String>,
     offset: String,
-    is_from_start_to_end: bool,
-    time_dimension: Vec<Rc<BaseMember>>,
+    time_dimension: Rc<dyn BaseMember>,
 }
 
 impl RollingWindowJoinCondition {
     pub fn new(
+        data_source: String,
+        time_series_source: String,
         trailing_interval: Option<String>,
         leading_interval: Option<String>,
        offset: String,
-        is_from_start_to_end: bool,
-        dimensions: Vec<Rc<BaseMember>>,
+        time_dimension: Rc<dyn BaseMember>,
     ) -> Self {
         Self {
-            tailing_interval,
+            data_source,
+            time_series_source,
+            trailing_interval,
             leading_interval,
             offset,
-            is_from_start_to_end,
             time_dimension,
         }
     }
 
+    /*
+     *
+        offset = offset || 'end';
+        return this.timeDimensions.map(
+          d => [d, (dateFrom, dateTo, dateField, dimensionDateFrom, dimensionDateTo, isFromStartToEnd) => {
+            // dateFrom based window
+            const conditions = [];
+            if (trailingInterval !== 'unbounded') {
+              const startDate = isFromStartToEnd || offset === 'start' ? dateFrom : dateTo;
+              const trailingStart = trailingInterval ? this.subtractInterval(startDate, trailingInterval) : startDate;
+              const sign = offset === 'start' ? '>=' : '>';
+              conditions.push(`${dateField} ${sign} ${trailingStart}`);
+            }
+            if (leadingInterval !== 'unbounded') {
+              const endDate = isFromStartToEnd || offset === 'end' ? dateTo : dateFrom;
+              const leadingEnd = leadingInterval ? this.addInterval(endDate, leadingInterval) : endDate;
+              const sign = offset === 'end' ? '<=' : '<';
+              conditions.push(`${dateField} ${sign} ${leadingEnd}`);
+            }
+            return conditions.length ? conditions.join(' AND ') : '1 = 1';
+          }]
+        );
+    */
     pub fn to_sql(
         &self,
         templates: &PlanSqlTemplates,
         context: Rc<VisitorContext>,
         schema: Rc<Schema>,
     ) -> Result<String, CubeError> {
-        let result = if self.dimensions.is_empty() {
-            format!("1 = 1")
+        let mut conditions = vec![];
+        /* let date_column_alias = if let Some(column) = schema.find_column_for_member(&self.time_dimension.full_name(), &None) {
+            templates.column_reference(&source, &column.alias.clone())
         } else {
-            let conditions = vec![];
-            self.dimensions
-                .iter()
-                .map(|dim| -> Result<String, CubeError> {
-                    if let Some(trailing_interval) = self.trailing_interval {
-                        if tailing_interval == "unbounded" {
-                            let seria_column = "date_from",
-                        }
-                    }
+            dimension.to_sql(context.clone(), schema.clone())
+        } */
+        let date_column_alias =
+            self.resolve_time_column_alias(templates, context.clone(), schema.clone())?;
+        if let Some(trailing_interval) = &self.trailing_interval {
+            if trailing_interval != "unbounded" {
+                let start_date = if self.offset == "start" {
+                    templates
+                        .column_reference(&Some(self.time_series_source.clone()), "date_from")?
+                } else {
+                    templates.column_reference(&Some(self.time_series_source.clone()), "date_to")?
+                };
 
+                let trailing_start = if let Some(trailing_interval) = &self.trailing_interval {
+                    format!("{start_date} - interval '{trailing_interval}'")
+                } else {
+                    start_date
+                };
 
-                })
-                .collect::<Result<Vec<_>, _>>()?
-                .join(" AND ")
+                let sign = if self.offset == "start" { ">=" } else { ">" };
+
+                conditions.push(format!("{date_column_alias} {sign} {trailing_start}"));
+            }
+        }
+
+        if let Some(leading_interval) = &self.leading_interval {
+            if leading_interval != "unbounded" {
+                let end_date = if self.offset == "end" {
+                    templates.column_reference(&Some(self.time_series_source.clone()), "date_to")?
+                } else {
+                    templates
+                        .column_reference(&Some(self.time_series_source.clone()), "date_from")?
+                };
+
+                let leading_end = if let Some(leading_interval) = &self.leading_interval {
+                    format!("{end_date} + interval '{leading_interval}'")
+                } else {
+                    end_date
+                };
+
+                let sign = if self.offset == "end" { "<=" } else { "<" };
+
+                conditions.push(format!("{date_column_alias} {sign} {leading_end}"));
+            }
+        }
+
+        let result = if conditions.is_empty() {
+            templates.always_true()?
+        } else {
+            conditions.join(" AND ")
         };
         Ok(result)
     }
 
-    fn resolve_member_alias(
+    fn resolve_time_column_alias(
         &self,
         templates: &PlanSqlTemplates,
         context: Rc<VisitorContext>,
-        source: &String,
-        dimension: &Rc<dyn BaseMember>,
         schema: Rc<Schema>,
     ) -> Result<String, CubeError> {
-        let schema = schema.extract_source_schema(source);
-        let source = Some(source.clone());
-        if let Some(column) = schema.find_column_for_member(&dimension.full_name(), &source) {
+        let schema = schema.extract_source_schema(&self.data_source);
+        let source = Some(self.data_source.clone());
+        if let Some(column) =
+            schema.find_column_for_member(&self.time_dimension.full_name(), &source)
+        {
             templates.column_reference(&source, &column.alias.clone())
         } else {
-            dimension.to_sql(context.clone(), schema.clone())
+            self.time_dimension.to_sql(context.clone(), schema.clone())
         }
     }
 }
@@ -162,6 +225,7 @@ impl DimensionJoinCondition {
 pub enum JoinCondition {
     DimensionJoinCondition(DimensionJoinCondition),
     BaseJoinCondition(Rc<dyn BaseJoinCondition>),
+    RollingWindowJoinCondition(RollingWindowJoinCondition),
 }
 
 impl JoinCondition {
@@ -179,6 +243,24 @@ impl JoinCondition {
         ))
     }
 
+    pub fn new_rolling_join(
+        data_source: String,
+        time_series_source: String,
+        trailing_interval: Option<String>,
+        leading_interval: Option<String>,
+        offset: String,
+        time_dimension: Rc<dyn BaseMember>,
+    ) -> Self {
+        Self::RollingWindowJoinCondition(RollingWindowJoinCondition::new(
+            data_source,
+            time_series_source,
+            trailing_interval,
+            leading_interval,
+            offset,
+            time_dimension,
+        ))
+    }
+
     pub fn new_base_join(base: Rc<dyn BaseJoinCondition>) -> Self {
         Self::BaseJoinCondition(base)
     }
@@ -192,6 +274,9 @@ impl JoinCondition {
         match &self {
             JoinCondition::DimensionJoinCondition(cond) => cond.to_sql(templates, context, schema),
             JoinCondition::BaseJoinCondition(cond) => cond.to_sql(context, schema),
+            JoinCondition::RollingWindowJoinCondition(cond) => {
+                cond.to_sql(templates, context, schema)
+            }
         }
     }
 }
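
For orientation only (not part of the commit): a minimal sketch of how the new constructor might be wired up and the rough shape of the SQL the condition produces. The "orders" and "time_series" aliases, the time_dim member, and the rendered column names are assumptions for illustration; only the new_rolling_join signature comes from the diff above.

    // Hypothetical call site; `time_dim: Rc<dyn BaseMember>` is assumed to be resolved elsewhere.
    let condition = JoinCondition::new_rolling_join(
        "orders".to_string(),       // data_source: alias of the measure input (assumed)
        "time_series".to_string(),  // time_series_source: alias of the generated time series (assumed)
        Some("7 day".to_string()),  // trailing_interval
        Some("7 day".to_string()),  // leading_interval
        "end".to_string(),          // offset
        time_dim,
    );
    // With offset == "end", to_sql() would render roughly:
    //   <date column> > "time_series"."date_to" - interval '7 day'
    //     AND <date column> <= "time_series"."date_to" + interval '7 day'
    // where <date column> is whatever resolve_time_column_alias() resolves for time_dim.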