1
+ import datetime
1
2
import os
2
3
import os .path
3
4
import string
4
5
import subprocess
5
6
import sys
6
- import time
7
- from typing import Any , Dict
7
+ from dataclasses import dataclass , field
8
+ from pathlib import Path
9
+ from typing import Any , Dict , Optional
8
10
9
11
import yaml
10
12
11
-
12
- def sleep () -> None :
13
- """Sleep forever seconds"""
14
- while True :
15
- time .sleep (1 )
16
-
17
- def get_secrets () -> Dict [str , Any ]:
13
+ @dataclass
14
+ class FileInfo :
15
+ """Info about a specific file we use here"""
16
+ path : Path
17
+ modified : Optional [datetime .datetime ] = None
18
+
19
+ @classmethod
20
+ def from_env (cls , var : str ) -> Optional ['FileInfo' ]:
21
+ """Loads file info from given enviroment variable"""
22
+ name = os .environ .get (var )
23
+ if not name :
24
+ return None
25
+
26
+ file = cls (path = Path (name ).resolve ())
27
+ if file .exists :
28
+ file .modified = datetime .datetime .fromtimestamp (
29
+ file .path .stat ().st_mtime
30
+ )
31
+
32
+ return file
33
+
34
+ @property
35
+ def exists (self ) -> bool :
36
+ """Does this file exist?"""
37
+ return self .path .is_file ()
38
+
39
+ def __str__ (self ) -> str :
40
+ return str (self .path )
41
+
42
+ @dataclass
43
+ class Files :
44
+ """Info about all of our necessary files"""
45
+ keyfile : Optional [FileInfo ] = field (
46
+ default_factory = lambda : FileInfo .from_env ('DCSM_KEYFILE' ))
47
+ secrets : Optional [FileInfo ] = field (
48
+ default_factory = lambda : FileInfo .from_env ('DCSM_SECRETS_FILE' ))
49
+ source : Optional [FileInfo ] = field (
50
+ default_factory = lambda : FileInfo .from_env ('DCSM_SOURCE_FILE' ))
51
+
52
+ def get_secrets (files : Files ) -> Dict [str , Any ]:
18
53
"""Return the secrets as a dictionary"""
19
- keyfile = os .environ .get ('DCSM_KEYFILE' )
20
- secrets_file = os .environ .get ('DCSM_SECRETS_FILE' )
54
+ assert files .keyfile
21
55
22
- if not keyfile or not secrets_file :
23
- raise ValueError ("DCSM_KEYFILE and DCSM_SECRETS_FILE are required" )
24
-
25
- keyfile , secrets_file = os .path .abspath (keyfile ), os .path .abspath (secrets_file )
26
- if not os .path .exists (keyfile ):
27
- raise ValueError (f'DCSM_KEYFILE { keyfile } does not exist' )
28
- if not os .path .exists (secrets_file ):
29
- raise ValueError (f'DCSM_SECRETS_FILE { secrets_file } does not exist' )
56
+ if not files .secrets :
57
+ raise ValueError ("variable DCSM_SECRETS_FILE is required" )
58
+ if not files .secrets .exists :
59
+ raise ValueError (f'DCSM_SECRETS_FILE { files .secrets } does not exist' )
30
60
31
61
process = subprocess .run (
32
- ['age' , '--decrypt' , '--identity' , keyfile , secrets_file ],
62
+ ['age' , '--decrypt' , '--identity' , files . keyfile . path , files . secrets . path ],
33
63
env = {},
34
64
capture_output = True ,
35
65
)
36
66
if process .returncode != 0 :
37
- raise ValueError (f'age failed: { process .stderr .decode ("utf-8" )} ' )
67
+ raise ValueError (f'age decryption failed: { process .stderr .decode ("utf-8" )} ' )
38
68
39
69
output = process .stdout .decode ('utf-8' )
40
70
secrets : Dict [str , Any ] = yaml .safe_load (output )
@@ -68,9 +98,72 @@ def process_dir(dirname: str, secrets: Dict[str, Any]) -> int:
68
98
69
99
return processed
70
100
71
- def run () -> None :
101
+ def encrypt (files : Files ) -> None :
102
+ """Encrypt the source file into the secrets file"""
103
+ assert files .keyfile
104
+
105
+ if not files .secrets :
106
+ raise ValueError ("variable DCSM_SECRETS_FILE is required" )
107
+
108
+ if not files .source :
109
+ raise ValueError ("variable DCSM_SOURCE_FILE is required" )
110
+ if not files .source .exists :
111
+ raise ValueError (f'DCSM_SOURCE_FILE { files .source } does not exist' )
112
+
113
+ source_is_newer = False
114
+ if not files .secrets .exists :
115
+ source_is_newer = True
116
+ elif files .source .modified and files .secrets .modified :
117
+ source_is_newer = files .source .modified > files .secrets .modified
118
+
119
+ if not source_is_newer :
120
+ raise ValueError ('encrypted secrets are newer than secrets source; will not overwrite' )
121
+
122
+ process = subprocess .run (
123
+ ['age' , '--encrypt' , '--identity' , files .keyfile .path , '--output' , files .secrets .path , files .source .path ],
124
+ env = {},
125
+ capture_output = True ,
126
+ )
127
+ if process .returncode != 0 :
128
+ raise ValueError (f'age encryption failed: { process .stderr .decode ("utf-8" )} ' )
129
+
130
+ print (f"successfully encrypted source file { files .source .path } => { files .secrets .path } " )
131
+
132
+ def decrypt (files : Files ) -> None :
133
+ """Decrypt the secrets file back out to the source file"""
134
+ assert files .keyfile
135
+
136
+ if not files .source :
137
+ raise ValueError ("variable DCSM_SOURCE_FILE is required" )
138
+
139
+ if not files .secrets :
140
+ raise ValueError ("variable DCSM_SECRETS_FILE is required" )
141
+ if not files .secrets .exists :
142
+ raise ValueError (f'DCSM_SECRETS_FILE { files .source } does not exist' )
143
+
144
+ secrets_newer = False
145
+ if not files .source .exists :
146
+ secrets_newer = True
147
+ elif files .source .modified and files .secrets .modified :
148
+ secrets_newer = files .secrets .modified > files .source .modified
149
+
150
+ if not secrets_newer :
151
+ raise ValueError ('secret source file is newer than encrypted secrets file; will not overwrite' )
152
+
153
+ process = subprocess .run (
154
+ ['age' , '--decrypt' , '--identity' , files .keyfile .path , '--output' , files .source .path , files .secrets .path ],
155
+ env = {},
156
+ capture_output = True ,
157
+ )
158
+ if process .returncode != 0 :
159
+ raise ValueError (f'age decryption failed: { process .stderr .decode ("utf-8" )} ' )
160
+
161
+ print (f"successfully decrypted secrets file { files .secrets .path } -> { files .source .path } " )
162
+ print ("don't forget to re-encrypt and remove the source file!" )
163
+
164
+ def run (files : Files ) -> None :
72
165
"""Process all template files"""
73
- secrets = get_secrets ()
166
+ secrets = get_secrets (files )
74
167
75
168
processed = 0
76
169
for key , dirname in os .environ .items ():
@@ -86,22 +179,28 @@ def run() -> None:
86
179
87
180
def main () -> None :
88
181
"""DCSN entry point"""
182
+ usage = "Usage: dcsn <run|encrypt|decrypt>"
89
183
try :
90
184
task = sys .argv [1 ]
91
185
except IndexError :
92
- print ("Usage: dcsn <sleep|decrypt>" )
186
+ print (usage )
93
187
sys .exit (1 )
94
188
95
- if task == "sleep" :
96
- try :
97
- sleep ()
98
- except KeyboardInterrupt :
99
- print ("Exiting..." )
100
- sys .exit (0 )
189
+ # we always need the keyfile no matter what we're doing
190
+ files = Files ()
191
+ if not files .keyfile :
192
+ raise ValueError ("variable DCSM_KEYFILE is required" )
193
+ if not files .keyfile .exists :
194
+ raise ValueError (f'DCSM_KEYFILE { files .keyfile } does not exist' )
195
+
196
+ if task == "run" :
197
+ run (files )
198
+ elif task == "encrypt" :
199
+ encrypt (files )
101
200
elif task == "decrypt" :
102
- run ( )
201
+ decrypt ( files )
103
202
else :
104
- print ("Usage: dcsn <sleep|decrypt>" )
203
+ print (usage )
105
204
sys .exit (1 )
106
205
107
206
if __name__ == "__main__" :
0 commit comments