|
1 | | -from datetime import datetime, timedelta |
| 1 | +from datetime import datetime, timedelta, timezone |
2 | 2 | from typing import Any |
3 | 3 |
|
4 | 4 | from flask import Flask, url_for |
@@ -641,6 +641,64 @@ def test_get_all_possible_role_request_approvers(app: Flask, mocker: MockerFixtu |
641 | 641 | assert users[2] in approvers |
642 | 642 |
|
643 | 643 |
|
| 644 | +def test_role_request_approvers_tagged( |
| 645 | + app: Flask, |
| 646 | + db: SQLAlchemy, |
| 647 | + role_group: RoleGroup, |
| 648 | + okta_group: OktaGroup, |
| 649 | + user: OktaUser, |
| 650 | + tag: Tag, |
| 651 | + mocker: MockerFixture, |
| 652 | +) -> None: |
| 653 | + access_owner = OktaUser.query.filter(OktaUser.email == app.config["CURRENT_OKTA_USER_EMAIL"]).first() |
| 654 | + |
| 655 | + user2 = OktaUserFactory.create() |
| 656 | + user3 = OktaUserFactory.create() |
| 657 | + db.session.add(user) |
| 658 | + db.session.add(user2) |
| 659 | + db.session.add(user3) |
| 660 | + db.session.add(okta_group) |
| 661 | + db.session.commit() |
| 662 | + |
| 663 | + # Add role group that user owns |
| 664 | + db.session.add(role_group) |
| 665 | + db.session.commit() |
| 666 | + |
| 667 | + ModifyGroupUsers(group=role_group, members_to_add=[user2.id], owners_to_add=[user.id], sync_to_okta=False).execute() |
| 668 | + ModifyGroupUsers(group=okta_group, owners_to_add=[user2.id, user3.id], sync_to_okta=False).execute() |
| 669 | + |
| 670 | + # Add tag |
| 671 | + tag.constraints = { |
| 672 | + Tag.DISALLOW_SELF_ADD_MEMBERSHIP_CONSTRAINT_KEY: True, |
| 673 | + } |
| 674 | + db.session.add(tag) |
| 675 | + db.session.commit() |
| 676 | + |
| 677 | + db.session.add(OktaGroupTagMap(group_id=okta_group.id, tag_id=tag.id)) |
| 678 | + db.session.commit() |
| 679 | + |
| 680 | + hook = get_notification_hook() |
| 681 | + request_created_notification_spy = mocker.patch.object(hook, "access_role_request_created") |
| 682 | + request_completed_notification_spy = mocker.patch.object(hook, "access_role_request_completed") |
| 683 | + |
| 684 | + role_request = CreateRoleRequest( |
| 685 | + requester_user=user, |
| 686 | + requester_role=role_group, |
| 687 | + requested_group=okta_group, |
| 688 | + request_reason="test reason", |
| 689 | + ).execute() |
| 690 | + |
| 691 | + assert role_request is not None |
| 692 | + assert request_created_notification_spy.call_count == 1 |
| 693 | + _, kwargs = request_created_notification_spy.call_args |
| 694 | + assert kwargs["role_request"] == role_request |
| 695 | + assert kwargs["role"] == role_group |
| 696 | + assert kwargs["group"] == okta_group |
| 697 | + assert kwargs["requester"] == user |
| 698 | + assert len(kwargs["approvers"]) == 1 |
| 699 | + assert user3 in kwargs["approvers"] |
| 700 | + |
| 701 | + |
644 | 702 | def test_resolve_app_role_request_notification( |
645 | 703 | app: Flask, |
646 | 704 | db: SQLAlchemy, |
@@ -952,6 +1010,166 @@ def test_auto_resolve_create_role_request_with_time_limit_constraint_tag( |
952 | 1010 | assert tag in kwargs["group_tags"] |
953 | 1011 |
|
954 | 1012 |
|
| 1013 | +def test_time_limit_constraint_tag( |
| 1014 | + app: Flask, |
| 1015 | + db: SQLAlchemy, |
| 1016 | + access_app: App, |
| 1017 | + role_group: RoleGroup, |
| 1018 | + app_group: AppGroup, |
| 1019 | + user: OktaUser, |
| 1020 | + tag: Tag, |
| 1021 | + mocker: MockerFixture, |
| 1022 | +) -> None: |
| 1023 | + access_owner = OktaUser.query.filter(OktaUser.email == app.config["CURRENT_OKTA_USER_EMAIL"]).first() |
| 1024 | + |
| 1025 | + # Add User |
| 1026 | + db.session.add(user) |
| 1027 | + db.session.commit() |
| 1028 | + |
| 1029 | + # Add app group that no one owns |
| 1030 | + app_group.app_id = access_app.id |
| 1031 | + app_group.is_owner = False |
| 1032 | + db.session.add(app_group) |
| 1033 | + |
| 1034 | + db.session.commit() |
| 1035 | + |
| 1036 | + # Add role group that user owns |
| 1037 | + db.session.add(role_group) |
| 1038 | + db.session.commit() |
| 1039 | + |
| 1040 | + ModifyGroupUsers(group=role_group, members_to_add=[user.id], owners_to_add=[user.id], sync_to_okta=False).execute() |
| 1041 | + |
| 1042 | + # Add tag |
| 1043 | + tag.constraints = { |
| 1044 | + Tag.MEMBER_TIME_LIMIT_CONSTRAINT_KEY: THREE_DAYS_IN_SECONDS, |
| 1045 | + } |
| 1046 | + db.session.add(tag) |
| 1047 | + db.session.commit() |
| 1048 | + |
| 1049 | + db.session.add(OktaGroupTagMap(group_id=app_group.id, tag_id=tag.id)) |
| 1050 | + db.session.commit() |
| 1051 | + |
| 1052 | + hook = get_notification_hook() |
| 1053 | + request_created_notification_spy = mocker.patch.object(hook, "access_role_request_created") |
| 1054 | + |
| 1055 | + # Make request for more than time limit |
| 1056 | + role_request = CreateRoleRequest( |
| 1057 | + requester_user=user, |
| 1058 | + requester_role=role_group, |
| 1059 | + requested_group=app_group, |
| 1060 | + request_reason="test reason", |
| 1061 | + ).execute() |
| 1062 | + |
| 1063 | + assert role_request is not None |
| 1064 | + assert request_created_notification_spy.call_count == 1 |
| 1065 | + _, kwargs = request_created_notification_spy.call_args |
| 1066 | + assert kwargs["role_request"] == role_request |
| 1067 | + # Ending time is None (more than time limit) |
| 1068 | + assert kwargs["role_request"].request_ending_at == None |
| 1069 | + assert kwargs["role"] == role_group |
| 1070 | + assert kwargs["group"] == app_group |
| 1071 | + assert kwargs["requester"] == user |
| 1072 | + |
| 1073 | + # Try to approve for more than time limit |
| 1074 | + ApproveRoleRequest( |
| 1075 | + role_request=role_request, |
| 1076 | + approver_user=access_owner, |
| 1077 | + ending_at=datetime.now()+timedelta(seconds=SEVEN_DAYS_IN_SECONDS) |
| 1078 | + ).execute() |
| 1079 | + |
| 1080 | + # Should only be approved for time limit if approved time is higher |
| 1081 | + approval = RoleGroupMap.query.filter(RoleGroupMap.role_group == role_group).first() |
| 1082 | + # Make sure approval time is correct (could be a couple milliseconds off from calculated which is okay) |
| 1083 | + approval_time = approval.ended_at.replace(tzinfo=timezone.utc) |
| 1084 | + expected_approval_time = datetime.now(timezone.utc) + timedelta(seconds=THREE_DAYS_IN_SECONDS) |
| 1085 | + assert expected_approval_time > approval_time |
| 1086 | + assert expected_approval_time - approval_time < timedelta(minutes=1) |
| 1087 | + |
| 1088 | + |
| 1089 | +def test_owner_cant_add_self_constraint_tag( |
| 1090 | + app: Flask, |
| 1091 | + db: SQLAlchemy, |
| 1092 | + access_app: App, |
| 1093 | + role_group: RoleGroup, |
| 1094 | + app_group: AppGroup, |
| 1095 | + user: OktaUser, |
| 1096 | + tag: Tag, |
| 1097 | + mocker: MockerFixture, |
| 1098 | +) -> None: |
| 1099 | + access_owner = OktaUser.query.filter(OktaUser.email == app.config["CURRENT_OKTA_USER_EMAIL"]).first() |
| 1100 | + |
| 1101 | + # Add Users |
| 1102 | + user2 = OktaUserFactory.create() |
| 1103 | + db.session.add(user) |
| 1104 | + db.session.add(user2) |
| 1105 | + db.session.commit() |
| 1106 | + |
| 1107 | + # Add app group that no one owns |
| 1108 | + app_group.app_id = access_app.id |
| 1109 | + app_group.is_owner = False |
| 1110 | + db.session.add(app_group) |
| 1111 | + |
| 1112 | + db.session.commit() |
| 1113 | + |
| 1114 | + # Add role group that user owns |
| 1115 | + db.session.add(role_group) |
| 1116 | + db.session.commit() |
| 1117 | + |
| 1118 | + ModifyGroupUsers(group=role_group, members_to_add=[user.id, user2.id], owners_to_add=[user.id], sync_to_okta=False).execute() |
| 1119 | + ModifyGroupUsers(group=app_group, owners_to_add=[user2.id], sync_to_okta=False).execute() |
| 1120 | + |
| 1121 | + # Add tag |
| 1122 | + tag.constraints = { |
| 1123 | + Tag.DISALLOW_SELF_ADD_MEMBERSHIP_CONSTRAINT_KEY: True, |
| 1124 | + } |
| 1125 | + db.session.add(tag) |
| 1126 | + db.session.commit() |
| 1127 | + |
| 1128 | + db.session.add(OktaGroupTagMap(group_id=app_group.id, tag_id=tag.id)) |
| 1129 | + db.session.commit() |
| 1130 | + |
| 1131 | + hook = get_notification_hook() |
| 1132 | + request_created_notification_spy = mocker.patch.object(hook, "access_role_request_created") |
| 1133 | + request_completed_notification_spy = mocker.patch.object(hook, "access_role_request_completed") |
| 1134 | + add_membership_spy = mocker.patch.object(okta, "async_add_user_to_group") |
| 1135 | + |
| 1136 | + role_request = CreateRoleRequest( |
| 1137 | + requester_user=user, |
| 1138 | + requester_role=role_group, |
| 1139 | + requested_group=app_group, |
| 1140 | + request_reason="test reason", |
| 1141 | + ).execute() |
| 1142 | + |
| 1143 | + assert role_request is not None |
| 1144 | + assert request_created_notification_spy.call_count == 1 |
| 1145 | + _, kwargs = request_created_notification_spy.call_args |
| 1146 | + assert kwargs["role_request"] == role_request |
| 1147 | + assert kwargs["role"] == role_group |
| 1148 | + assert kwargs["group"] == app_group |
| 1149 | + assert kwargs["requester"] == user |
| 1150 | + |
| 1151 | + # user2 owns the group and is a member of the requester role, should fail |
| 1152 | + should_fail = ApproveRoleRequest( |
| 1153 | + role_request=role_request, |
| 1154 | + approver_user=user2, |
| 1155 | + ).execute() |
| 1156 | + assert should_fail.status == AccessRequestStatus.PENDING |
| 1157 | + |
| 1158 | + should_pass = ApproveRoleRequest( |
| 1159 | + role_request=role_request, |
| 1160 | + approver_user=access_owner, |
| 1161 | + ).execute() |
| 1162 | + assert should_pass.status == AccessRequestStatus.APPROVED |
| 1163 | + |
| 1164 | + assert add_membership_spy.call_count == 2 |
| 1165 | + assert request_completed_notification_spy.call_count == 1 |
| 1166 | + _, kwargs = request_completed_notification_spy.call_args |
| 1167 | + assert kwargs["role_request"] == should_pass |
| 1168 | + assert kwargs["role"] == role_group |
| 1169 | + assert kwargs["group"] == app_group |
| 1170 | + assert kwargs["requester"] == user |
| 1171 | + |
| 1172 | + |
955 | 1173 | def test_role_request_approval_via_direct_add( |
956 | 1174 | client: FlaskClient, |
957 | 1175 | app: Flask, |
|
0 commit comments