generated from jacobtomlinson/python-container-action
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathmain.py
269 lines (251 loc) · 12.4 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
import json
import sys
from actions_toolkit import core as actions_toolkit
from repo_manager.gh.branch_protections import check_repo_branch_protections
from repo_manager.gh.branch_protections import update_branch_protection
from repo_manager.gh.files import copy_file
from repo_manager.gh.files import delete_file
from repo_manager.gh.files import move_file
from repo_manager.gh.files import RemoteSrcNotFoundError
from repo_manager.gh.labels import check_repo_labels
from repo_manager.gh.labels import update_label
from repo_manager.gh.secrets import check_repo_secrets
from repo_manager.gh.secrets import create_secret
from repo_manager.gh.secrets import delete_secret
from repo_manager.gh.settings import check_repo_settings
from repo_manager.gh.settings import update_settings
from repo_manager.schemas import load_config
from repo_manager.utils import get_inputs
from yaml import YAMLError
from pydantic import ValidationError
from repo_manager.gh import GithubException, UnknownObjectException
def main(): # noqa: C901
try:
inputs = get_inputs()
# actions toolkit has very broad exceptions :(
except Exception as exc:
actions_toolkit.set_failed(f"Unable to collect inputs {exc}")
actions_toolkit.debug(f"Loading config from {inputs['settings_file']}")
try:
config = load_config(inputs["settings_file"])
except FileNotFoundError:
actions_toolkit.set_failed(f"{inputs['settings_file']} does not exist or is not readable")
except YAMLError as exc:
actions_toolkit.set_failed(f"Unable to read {inputs['settings_file']} - {exc}")
except ValidationError as exc:
actions_toolkit.set_failed(f"{inputs['settings_file']} is invalid - {exc}")
actions_toolkit.debug(f"Inputs: {inputs}")
if inputs["action"] == "validate":
actions_toolkit.set_output("result", f"Validated {inputs['settings_file']}")
actions_toolkit.debug(json_diff := json.dumps({}))
actions_toolkit.set_output("diff", json_diff)
sys.exit(0)
actions_toolkit.info(f"Config from {inputs['settings_file']} validated.")
check_result = True
diffs = {}
for check, to_check in {
check_repo_settings: ("settings", config.settings),
check_repo_secrets: ("secrets", config.secrets),
check_repo_labels: ("labels", config.labels),
check_repo_branch_protections: (
"branch_protections",
config.branch_protections,
),
}.items():
check_name, to_check = to_check
if to_check is not None:
this_check, this_diffs = check(inputs["repo_object"], to_check)
check_result &= this_check
if this_diffs is not None:
diffs[check_name] = this_diffs
actions_toolkit.debug(json_diff := json.dumps({}))
actions_toolkit.set_output("diff", json_diff)
if inputs["action"] == "check":
if not check_result:
actions_toolkit.set_output("result", "Check failed, diff detected")
actions_toolkit.set_failed("Diff detected")
actions_toolkit.set_output("result", "Check passed")
sys.exit(0)
if inputs["action"] == "apply":
errors = []
# Because we cannot diff secrets, just apply it every time
if config.secrets is not None:
for secret in config.secrets:
if secret.exists:
try:
create_secret(
inputs["repo_object"], secret.key, secret.expected_value, secret.type == "dependabot"
)
actions_toolkit.info(f"Set {secret.key} to expected value")
except Exception as exc: # this should be tighter
errors.append(
{
"type": "secret-update",
"key": secret.key,
"error": f"{exc}",
}
)
else:
try:
delete_secret(inputs["repo_object"], secret.key, secret.type == "dependabot")
actions_toolkit.info(f"Deleted {secret.key}")
except Exception as exc: # this should be tighter
errors.append(
{
"type": "secret-delete",
"key": secret.key,
"error": f"{exc}",
}
)
labels_diff = diffs.get("labels", None)
if labels_diff is not None:
for label_name in labels_diff["extra"]:
try:
this_label = inputs["repo_object"].get_label(label_name)
this_label.delete()
actions_toolkit.info(f"Deleted {label_name}")
except Exception as exc: # this should be tighter
errors.append({"type": "label-delete", "name": label_name, "error": f"{exc}"})
for label_name in labels_diff["missing"]:
label_object = config.labels_dict[label_name]
if label_object.name != label_object.expected_name:
update_label(inputs["repo_object"], label_object)
actions_toolkit.info(f"Renamed {label_object.name} to {label_object.expected_name}")
else:
try:
inputs["repo_object"].create_label(
label_object.expected_name,
label_object.color_no_hash,
label_object.description,
)
actions_toolkit.info(f"Created label {label_name}")
except Exception as exc: # this should be tighter
errors.append(
{
"type": "label-create",
"name": label_name,
"error": f"{exc}",
}
)
for label_name in labels_diff["diffs"].keys():
update_label(inputs["repo_object"], config.labels_dict[label_name])
actions_toolkit.info(f"Updated label {label_name}")
bp_diff = diffs.get("branch_protections", None)
if bp_diff is not None:
# delete branch protection
for branch_name in bp_diff["extra"]:
try:
this_branch = inputs["repo_object"].get_branch(branch_name)
this_branch.remove_protection()
except GithubException as ghexc:
if ghexc.status != 404:
# a 404 on a delete is fine, means it isnt protected
errors.append(
{
"type": "bp-delete",
"name": branch_name,
"error": f"{ghexc}",
}
)
except Exception as exc: # this should be tighter
errors.append({"type": "bp-delete", "name": branch_name, "error": f"{exc}"})
# update or create branch protection
for branch_name in bp_diff["missing"] + list(bp_diff["diffs"].keys()):
try:
bp_config = config.branch_protections_dict[branch_name]
if bp_config.protection is not None:
update_branch_protection(inputs["repo_object"], branch_name, bp_config.protection)
actions_toolkit.info(f"Updated branch proection for {branch_name}")
else:
actions_toolkit.warning(f"Branch protection config for {branch_name} is empty")
except GithubException as ghexc:
if ghexc.status == 404:
actions_toolkit.info(
f"Can't Update branch protection for {branch_name} because the branch does not exist"
)
else:
errors.append(
{
"type": "bp-update",
"name": branch_name,
"error": f"{ghexc}",
}
)
except Exception as exc: # this should be tighter
errors.append({"type": "bp-update", "name": branch_name, "error": f"{exc}"})
if config.settings is not None:
try:
update_settings(inputs["repo_object"], config.settings)
actions_toolkit.info("Synced Settings")
except Exception as exc:
errors.append({"type": "settings-update", "error": f"{exc}"})
commits = []
if config.files is not None:
for file_config in config.files:
# delete files
if not file_config.exists:
try:
commits.append(delete_file(inputs["repo_object"], file_config))
actions_toolkit.info(f"Deleted {str(file_config.dest_file)}")
except UnknownObjectException:
target_branch = (
file_config.target_branch
if file_config.target_branch is not None
else inputs["repo_object"].default_branch
)
actions_toolkit.warning(
f"{str(file_config.dest_file)} does not exist in "
+ f"{target_branch}"
+ " branch. Because this is a delete, not failing run"
)
except Exception as exc:
errors.append({"type": "file-delete", "file": str(file_config.dest_file), "error": f"{exc}"})
elif file_config.move:
try:
copy_commit, delete_commit = move_file(inputs["repo_object"], file_config)
commits.append(copy_commit)
commits.append(delete_commit)
actions_toolkit.info(f"Moved {str(file_config.src_file)} to {str(file_config.dest_file)}")
except RemoteSrcNotFoundError:
target_branch = (
file_config.target_branch
if file_config.target_branch is not None
else inputs["repo_object"].default_branch
)
actions_toolkit.warning(
f"{str(file_config.src_file)} does not exist in "
+ f"{target_branch}"
+ " branch. Because this is a move, not failing run"
)
except Exception as exc:
errors.append(
{
"type": "file-move",
"src_file": str(file_config.src_file),
"dest_file": str(file_config.dest_file),
"error": f"{exc}",
}
)
else:
try:
commits.append(copy_file(inputs["repo_object"], file_config))
actions_toolkit.info(
f"Copied{' remote ' if file_config.remote_src else ' '}{str(file_config.src_file)}"
+ f" to {str(file_config.dest_file)}"
)
except Exception as exc:
errors.append(
{
"type": "file-copy",
"src_file": str(file_config.src_file),
"dest_file": str(file_config.dest_file),
"error": f"{exc}",
}
)
actions_toolkit.info("Commit SHAs: " + ",".join(commits))
if len(errors) > 0:
actions_toolkit.error(json.dumps(errors))
actions_toolkit.set_failed("Errors during apply")
actions_toolkit.set_output("result", "Apply successful")
if __name__ == "__main__":
main()