|
1 | | -from typing import Any |
| 1 | +from typing import TYPE_CHECKING, Any |
2 | 2 |
|
3 | 3 | from fastapi import Request |
4 | 4 | from sqlalchemy import Alias, ColumnElement, Table, and_, or_ |
|
10 | 10 | from backend.common.exception import errors |
11 | 11 | from backend.core.conf import settings |
12 | 12 | from backend.utils.dynamic_import import get_all_models |
| 13 | +from backend.utils.timezone import timezone |
| 14 | + |
| 15 | +if TYPE_CHECKING: |
| 16 | + from backend.app.admin.model import DataRule |
13 | 17 |
|
14 | 18 |
|
15 | 19 | class RequestPermission: |
@@ -69,80 +73,102 @@ def filter_data_permission( # noqa: C901 |
69 | 73 |
|
70 | 74 | # 角色未启用数据权限过滤 |
71 | 75 | for role in request.user.roles: |
72 | | - if not role.is_filter_scopes: |
| 76 | + if role.status and not role.is_filter_scopes: |
73 | 77 | return or_(1 == 1) |
74 | 78 |
|
75 | 79 | # 获取数据规则 |
76 | | - data_rules = set() |
| 80 | + data_rules: set[DataRule] = set() |
77 | 81 | for role in request.user.roles: |
| 82 | + if not role.status: |
| 83 | + continue |
78 | 84 | for scope in role.scopes: |
79 | 85 | if scope.status: |
80 | | - data_rules.update(scope.rules) |
| 86 | + data_rules.update(rule for rule in scope.rules if rule is not None) |
81 | 87 |
|
82 | 88 | if not data_rules: |
83 | 89 | return or_(1 == 1) |
84 | 90 |
|
85 | | - # 获取目标模型 |
86 | | - model_map = ( |
| 91 | + # 目标模型 |
| 92 | + target_model_map = ( |
87 | 93 | {getattr(model, '__name__', str(model)): model for model in models} if models else get_data_permission_models() |
88 | 94 | ) |
89 | 95 |
|
| 96 | + # 字段模板变量映射 |
| 97 | + column_template_resolvers = { |
| 98 | + var['key']: var['key'].strip('_') for var in settings.DATA_PERMISSION_COLUMN_TEMPLATE_VARIABLES |
| 99 | + } |
| 100 | + |
| 101 | + # 模板变量解析映射 |
| 102 | + template_variable_keys = {var['key'] for var in settings.DATA_PERMISSION_TEMPLATE_VARIABLES} |
| 103 | + template_resolvers = { |
| 104 | + '${user_id}': request.user.id, |
| 105 | + '${dept_id}': request.user.dept_id, |
| 106 | + '${now}': timezone.now, |
| 107 | + } |
| 108 | + |
90 | 109 | where_and_list = [] |
91 | 110 | where_or_list = [] |
92 | 111 |
|
93 | 112 | for data_rule in data_rules: |
94 | | - target_model = model_map.get(data_rule.model) |
95 | | - if target_model is None: |
96 | | - continue |
97 | | - |
98 | | - table = target_model if isinstance(target_model, Table) else target_model.__table__ |
99 | | - rule_column = data_rule.column |
100 | | - if rule_column not in table.columns.keys(): |
101 | | - continue |
102 | | - if rule_column in settings.DATA_PERMISSION_COLUMN_EXCLUDE: |
103 | | - continue |
104 | | - |
105 | | - # 构建过滤条件 |
106 | | - column_obj = ( |
107 | | - getattr(target_model, rule_column) if not isinstance(target_model, Table) else table.columns[rule_column] |
108 | | - ) |
109 | | - column_type = table.columns[rule_column].type.python_type |
110 | | - |
111 | | - def cast_value(value: Any) -> Any: |
112 | | - """类型转换""" |
113 | | - try: |
114 | | - return column_type(value) if column_type is not str else value |
115 | | - except (ValueError, TypeError): |
116 | | - return value |
117 | | - |
118 | | - condition = None |
119 | | - match data_rule.expression: |
120 | | - case RoleDataRuleExpressionType.eq: |
121 | | - condition = column_obj == cast_value(data_rule.value) |
122 | | - case RoleDataRuleExpressionType.ne: |
123 | | - condition = column_obj != cast_value(data_rule.value) |
124 | | - case RoleDataRuleExpressionType.gt: |
125 | | - condition = column_obj > cast_value(data_rule.value) |
126 | | - case RoleDataRuleExpressionType.ge: |
127 | | - condition = column_obj >= cast_value(data_rule.value) |
128 | | - case RoleDataRuleExpressionType.lt: |
129 | | - condition = column_obj < cast_value(data_rule.value) |
130 | | - case RoleDataRuleExpressionType.le: |
131 | | - condition = column_obj <= cast_value(data_rule.value) |
132 | | - case RoleDataRuleExpressionType.in_: |
133 | | - values = [cast_value(v.strip()) for v in data_rule.value.split(',')] |
134 | | - condition = column_obj.in_(values) |
135 | | - case RoleDataRuleExpressionType.not_in: |
136 | | - values = [cast_value(v.strip()) for v in data_rule.value.split(',')] |
137 | | - condition = column_obj.not_in(values) |
138 | | - |
139 | | - # 根据运算符添加到对应列表 |
140 | | - if condition is not None: |
141 | | - match data_rule.operator: |
142 | | - case RoleDataRuleOperatorType.AND: |
143 | | - where_and_list.append(condition) |
144 | | - case RoleDataRuleOperatorType.OR: |
145 | | - where_or_list.append(condition) |
| 113 | + if data_rule.model == '__ALL__': |
| 114 | + target_models = list(target_model_map.values()) |
| 115 | + else: |
| 116 | + target_model = target_model_map.get(data_rule.model) |
| 117 | + target_models = [target_model] if target_model is not None else [] |
| 118 | + |
| 119 | + for target_model in target_models: |
| 120 | + table = target_model if isinstance(target_model, Table) else target_model.__table__ |
| 121 | + rule_column = column_template_resolvers.get(data_rule.column, data_rule.column) |
| 122 | + if rule_column not in table.columns.keys(): |
| 123 | + continue |
| 124 | + if rule_column in settings.DATA_PERMISSION_COLUMN_EXCLUDE: |
| 125 | + continue |
| 126 | + |
| 127 | + # 构建过滤条件 |
| 128 | + column_obj = ( |
| 129 | + getattr(target_model, rule_column) |
| 130 | + if not isinstance(target_model, Table) |
| 131 | + else table.columns[rule_column] |
| 132 | + ) |
| 133 | + column_type = table.columns[rule_column].type.python_type |
| 134 | + |
| 135 | + def cast_value(value: Any, _column_type: type = column_type) -> Any: |
| 136 | + """类型转换""" |
| 137 | + try: |
| 138 | + if value in template_variable_keys: |
| 139 | + return _column_type(template_resolvers[value]) |
| 140 | + return _column_type(value) if _column_type is not str else value |
| 141 | + except (ValueError, TypeError): |
| 142 | + return value |
| 143 | + |
| 144 | + condition = None |
| 145 | + match data_rule.expression: |
| 146 | + case RoleDataRuleExpressionType.eq: |
| 147 | + condition = column_obj == cast_value(data_rule.value) |
| 148 | + case RoleDataRuleExpressionType.ne: |
| 149 | + condition = column_obj != cast_value(data_rule.value) |
| 150 | + case RoleDataRuleExpressionType.gt: |
| 151 | + condition = column_obj > cast_value(data_rule.value) |
| 152 | + case RoleDataRuleExpressionType.ge: |
| 153 | + condition = column_obj >= cast_value(data_rule.value) |
| 154 | + case RoleDataRuleExpressionType.lt: |
| 155 | + condition = column_obj < cast_value(data_rule.value) |
| 156 | + case RoleDataRuleExpressionType.le: |
| 157 | + condition = column_obj <= cast_value(data_rule.value) |
| 158 | + case RoleDataRuleExpressionType.in_: |
| 159 | + values = [cast_value(v.strip()) for v in data_rule.value.split(',')] |
| 160 | + condition = column_obj.in_(values) |
| 161 | + case RoleDataRuleExpressionType.not_in: |
| 162 | + values = [cast_value(v.strip()) for v in data_rule.value.split(',')] |
| 163 | + condition = column_obj.not_in(values) |
| 164 | + |
| 165 | + # 根据运算符添加到对应列表 |
| 166 | + if condition is not None: |
| 167 | + match data_rule.operator: |
| 168 | + case RoleDataRuleOperatorType.AND: |
| 169 | + where_and_list.append(condition) |
| 170 | + case RoleDataRuleOperatorType.OR: |
| 171 | + where_or_list.append(condition) |
146 | 172 |
|
147 | 173 | # 组合所有条件 |
148 | 174 | where_list = [] |
|
0 commit comments