-
Notifications
You must be signed in to change notification settings - Fork 67
Expand file tree
/
Copy pathexecutor.py
More file actions
131 lines (97 loc) · 4.98 KB
/
executor.py
File metadata and controls
131 lines (97 loc) · 4.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# (C) 2022 GoodData Corporation
from __future__ import annotations
from collections.abc import Generator
from typing import Any, ClassVar, NamedTuple
from gooddata_sdk import GoodDataSdk
import gooddata_fdw.column_validation as col_val
from gooddata_fdw import column_utils
from gooddata_fdw.environment import ColumnDefinition, Qual
from gooddata_fdw.filter import extract_filters_from_quals
from gooddata_fdw.options import ServerOptions, TableOptions
from gooddata_fdw.result_reader import InsightTableResultReader, TableResultReader
class InitData(NamedTuple):
    """Immutable bundle of the inputs an executor is constructed from.

    Executors in this module read ``sdk`` and ``columns`` directly and use
    ``table_options`` (``workspace``, ``insight``, ``compute``) both to decide
    whether they apply and to configure themselves.
    """

    # SDK client; executors call ``sdk.visualizations`` and ``sdk.tables`` on it.
    sdk: GoodDataSdk
    # Server-level options; carried along but not read by the code visible in this module.
    server_options: ServerOptions
    # Table-level options; ``workspace``, ``insight`` and ``compute`` are read by executors.
    table_options: TableOptions
    # Column definitions of the foreign table, keyed by column name.
    columns: dict[str, ColumnDefinition]
class Executor:
    """Common base of the table executors.

    Stores the SDK client, the table's column definitions and the validators
    to run against those definitions. Subclasses override ``can_react`` and
    ``execute``.
    """

    def __init__(self, inputs: InitData, column_validators: list[col_val.ColumnValidator]) -> None:
        """Remember the validators plus the SDK client and columns taken from ``inputs``."""
        self._column_validators = column_validators
        self._table_columns = inputs.columns
        self._sdk = inputs.sdk

    @classmethod
    def can_react(cls, inputs: InitData) -> bool:
        """Tell whether this executor can serve ``inputs``; the base class never can."""
        return False

    def validate_columns_def(self) -> None:
        """Run every configured validator against every column definition.

        Columns are visited in dict order; for each column the validators run
        in list order. Whatever a validator raises propagates to the caller.
        """
        for name, definition in self._table_columns.items():
            for check in self._column_validators:
                check.validate(name, definition)

    def execute(
        self, quals: list[Qual], columns: list[str], sort_keys: list[Any] | None = None
    ) -> Generator[dict[str, Any], None, None]:
        """Produce the table rows; must be overridden by subclasses.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()
class InsightExecutor(Executor):
    """Executor that serves a table backed by a stored insight (visualization).

    Selected when ``table_options.insight`` is set.
    """

    _COLUMN_VALIDATORS: ClassVar[list[col_val.ColumnValidator]] = [
        col_val.LocalIdOptionValidator(),
        col_val.IdOptionValidator(mandatory=False),
    ]

    def __init__(self, inputs: InitData) -> None:
        """Store the workspace and insight identifiers taken from ``inputs``.

        Raises:
            ValueError: if ``inputs.table_options.insight`` is ``None``.
        """
        # The base class already stores ``inputs.columns`` as ``self._table_columns``.
        super().__init__(inputs, self._COLUMN_VALIDATORS)
        insight = inputs.table_options.insight
        if insight is None:
            # Explicit check rather than ``assert``: asserts are stripped under ``python -O``,
            # which would let a ``None`` insight through to the SDK call in ``execute``.
            raise ValueError("InsightExecutor requires the 'insight' table option to be set")
        self._workspace = inputs.table_options.workspace
        self._insight = insight

    @classmethod
    def can_react(cls, inputs: InitData) -> bool:
        """Return ``True`` when the table options name an insight."""
        return inputs.table_options.insight is not None

    def execute(
        self, quals: list[Qual], columns: list[str], sort_keys: list[Any] | None = None
    ) -> Generator[dict[str, Any], None, None]:
        """Fetch the insight, build its table and return the rows read from it.

        ``quals`` and ``sort_keys`` are accepted for interface compatibility but are
        not used here; ``columns`` is handed to the result reader.
        """
        results_reader = InsightTableResultReader(self._table_columns, columns)
        insight = self._sdk.visualizations.get_visualization(self._workspace, self._insight)
        table = self._sdk.tables.for_visualization(self._workspace, insight)
        return results_reader.read_all_rows(table)
class ComputeExecutor(Executor):
    """Executor that computes a table from the columns requested by the query.

    Selected when ``table_options.compute`` is set.
    """

    # Shared by all instances, hence ClassVar (same convention as InsightExecutor).
    _COLUMN_VALIDATORS: ClassVar[list[col_val.ColumnValidator]] = [col_val.IdOptionValidator(mandatory=True)]

    def __init__(self, inputs: InitData) -> None:
        """Store the workspace and create the result reader over the table's columns."""
        super().__init__(inputs, self._COLUMN_VALIDATORS)
        self._workspace = inputs.table_options.workspace
        self._results_reader = TableResultReader(self._table_columns)

    @classmethod
    def can_react(cls, inputs: InitData) -> bool:
        """Return ``True`` when the table options carry a ``compute`` setting."""
        return inputs.table_options.compute is not None

    def execute(
        self, quals: list[Qual], columns: list[str], sort_keys: list[Any] | None = None
    ) -> Generator[dict[str, Any], None, None]:
        """Compute a table for the requested ``columns`` and return its rows.

        The requested columns are first validated against the table definition,
        then converted to computable items. Filters are derived from ``quals``.
        ``sort_keys`` is accepted for interface compatibility but not used here.
        """
        col_val.validate_columns_in_table_def(self._table_columns, columns)
        items = [column_utils.table_col_as_computable(self._table_columns[col_name]) for col_name in columns]
        # TODO: push down more filters that are included in quals
        filters = extract_filters_from_quals(quals, self._table_columns)
        table = self._sdk.tables.for_items(self._workspace, items, filters)
        return self._results_reader.read_all_rows(table)
class CustomExecutor(Executor):
    """Fallback executor that computes a table from all defined columns.

    ``can_react`` accepts any input, so this executor applies whenever it is
    asked.
    """

    # Shared by all instances, hence ClassVar (same convention as InsightExecutor).
    _COLUMN_VALIDATORS: ClassVar[list[col_val.ColumnValidator]] = [col_val.IdOptionValidator(mandatory=True)]

    def __init__(self, inputs: InitData) -> None:
        """Store the workspace and create the result reader over the table's columns."""
        super().__init__(inputs, self._COLUMN_VALIDATORS)
        self._workspace = inputs.table_options.workspace
        self._results_reader = TableResultReader(self._table_columns)

    @classmethod
    def can_react(cls, inputs: InitData) -> bool:
        """Accept any input."""
        return True

    def execute(
        self, quals: list[Qual], columns: list[str], sort_keys: list[Any] | None = None
    ) -> Generator[dict[str, Any], None, None]:
        """Compute a table over every column of the table definition and return its rows.

        The ``columns`` argument is not consulted: items are built from all
        defined table columns. Filters are derived from ``quals``. ``sort_keys``
        is accepted for interface compatibility but not used here.
        """
        items = [column_utils.table_col_as_computable(col) for col in self._table_columns.values()]
        # TODO: push down more filters that are included in quals
        filters = extract_filters_from_quals(quals, self._table_columns)
        table = self._sdk.tables.for_items(self._workspace, items, filters)
        return self._results_reader.read_all_rows(table)
class ExecutorFactory:
    """Pick and instantiate the executor that matches the given ``InitData``."""

    # Order is important - the first executor whose can_react() accepts the InitData is used.
    _SUPPORTED_EXECUTORS = [InsightExecutor, ComputeExecutor, CustomExecutor]
    # Former, misspelled name; kept as an alias so existing references keep resolving.
    _SUPPOERTED_EXECUTORS = _SUPPORTED_EXECUTORS

    @classmethod
    def create(cls, inputs: InitData) -> Executor:
        """Return a new instance of the first supported executor that accepts ``inputs``.

        Raises:
            ValueError: if no executor in the list accepts ``inputs``. With the
                default list this is not reached, because ``CustomExecutor.can_react``
                always returns ``True``.
        """
        for executor in cls._SUPPORTED_EXECUTORS:
            if executor.can_react(inputs):
                return executor(inputs)
        raise ValueError(f"No executor supports initial data {inputs!s}")