-
Notifications
You must be signed in to change notification settings - Fork 7
/
disk.py
321 lines (267 loc) · 10 KB
/
disk.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
#! /usr/bin/env python3
# -*- coding: utf-8; py-indent-offset: 4 -*-
#
# Author: Linuxfabrik GmbH, Zurich, Switzerland
# Contact: info (at) linuxfabrik (dot) ch
# https://www.linuxfabrik.ch/
# License: The Unlicense, see LICENSE file.
# https://github.com/Linuxfabrik/monitoring-plugins/blob/main/CONTRIBUTING.rst
"""Offers file and disk related functions, like getting a list of
partitions, grepping a file, etc.
"""
__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
__version__ = '2024071701'
import csv
import os
import re
import tempfile
from . import shell
def file_exists(path, allow_empty=False):
    """Check whether `path` exists and, unless told otherwise, is not empty.

    Parameters
    ----------
    path : str
        The path to check.
    allow_empty : bool, optional
        If true, mere existence of `path` is sufficient. If false (the
        default), `path` must additionally have a size other than zero bytes.

    Returns
    -------
    bool
        `False` if `path` does not exist, or if it is zero bytes in size and
        `allow_empty` is not set. `True` otherwise.

    >>> file_exists('/etc/hostname')
    True
    >>> file_exists('/does/not/exist', allow_empty=True)
    False
    """
    if not os.path.exists(path):
        return False
    # existence alone is enough when empty files are acceptable; otherwise
    # the size decides (the size is only queried when it is really needed)
    return bool(allow_empty) or os.path.getsize(path) != 0
def get_cwd():
    """Return the current working directory of the running process.

    This is a thin wrapper around `os.getcwd()`.

    >>> get_cwd()
    '/home/user'
    """
    cwd = os.getcwd()
    return cwd
def bd2dmd(device):
    """Translate a block device into its device mapper device path ("block
    device to device mapper device").

    The name is read from `/sys/class/block/<device>/dm/name`, so - unlike
    `dmsetup ls` - no sudo permissions are needed.

    Parameters
    ----------
    device : str
        Block device name or path. Only the basename is used, so `dm-0` and
        `/dev/dm-0` are equivalent.

    Returns
    -------
    str
        `/dev/mapper/<name>` if the sysfs entry could be read, is not empty
        and the resulting path is a symlink. An empty string otherwise.

    >>> bd2dmd('dm-0')
    '/dev/mapper/rl_rocky8-root'
    >>> bd2dmd('sda')  # not a dm device
    ''
    """
    name = os.path.basename(device)
    ok, content = read_file('/sys/class/block/{}/dm/name'.format(name))
    # unreadable sysfs entry (not a dm device) or an empty name: give up
    if not (ok and content):
        return ''
    mapper_path = '/dev/mapper/{}'.format(content.strip())
    # only report the path if it is a symlink
    return mapper_path if os.path.islink(mapper_path) else ''
def get_real_disks():
    """Return the mounted `/dev/...` devices listed in `/proc/mounts`.

    Each device appears once in the result. If it is mounted more than once,
    all its mount points are collected in `mp`, separated by a single space.

    Returns
    -------
    list
        A list of dicts of the form
        `{'bd': '<block device name>', 'dmd': '<dm device name>', 'mp': '<mountpoints>'}`.
        An empty list if `/proc/mounts` could not be read.

    >>> get_real_disks()
    [{'bd': '/dev/dm-0', 'dmd': '/dev/mapper/rl-root', 'mp': '/ /home'}]
    """
    ok, mounts = read_file('/proc/mounts')
    if not ok:
        return []

    found = {}
    for entry in mounts.splitlines():
        # only entries backed by a device node are of interest
        if not entry.startswith('/dev/'):
            continue
        fields = entry.split(' ')
        source = fields[0]
        if source.startswith('/dev/mapper/'):
            # mounted via its device mapper path: look up the block device
            mapper_dev = source
            block_dev = udevadm(mapper_dev, 'DEVNAME')
        else:
            # mounted via the block device: look up the device mapper name
            block_dev = source
            dm_name = udevadm(block_dev, 'DM_NAME')
            mapper_dev = '/dev/mapper/{}'.format(dm_name) if dm_name else dm_name
        mountpoint = fields[1]
        if block_dev in found:
            # disk already listed, append the additional mount point
            found[block_dev]['mp'] += ' {}'.format(mountpoint)
        else:
            found[block_dev] = {'bd': block_dev, 'dmd': mapper_dev, 'mp': mountpoint}
    return list(found.values())
def get_tmpdir():
    """Return the name of the directory used for temporary files, always
    without trailing '/'.

    Delegates to `tempfile.gettempdir()`, which searches a standard list of
    directories to find one which the calling user can create files in.
    The list is:

    * The directory named by the TMPDIR environment variable.
    * The directory named by the TEMP environment variable.
    * The directory named by the TMP environment variable.
    * A platform-specific location:

      - On Windows, the directories C:\\TEMP, C:\\TMP, \\TEMP, and \\TMP,
        in that order.
      - On all other platforms, the directories /tmp, /var/tmp, and /usr/tmp,
        in that order.

    * As a last resort, the current working directory.

    Returns
    -------
    str
        The temporary directory, or `'/tmp'` if `tempfile.gettempdir()`
        fails with an ordinary exception.

    >>> get_tmpdir()
    '/tmp'
    >>> get_tmpdir()
    'C:\\Users\\vagrant\\AppData\\Local\\Temp\\2'
    """
    try:
        return tempfile.gettempdir()
    except Exception:
        # Best-effort fallback. `Exception` (instead of a bare `except`) lets
        # KeyboardInterrupt and SystemExit propagate.
        return '/tmp'
def grep_file(filename, pattern):
    """Like `grep` searches for `pattern` in `filename`. Returns the
    match, otherwise `False`.

    >>> success, nc_version=lib.disk.grep_file('version.php', r'\\$OC_version=array\\((.*)\\)')

    Parameters
    ----------
    filename : str
        The file.
    pattern : str
        A Python regular expression. If it contains a capturing group, the
        content of the first group is returned, otherwise the whole match.

    Returns
    -------
    tuple
        tuple[0]: bool: if successful (no I/O or file handling errors) or not
        tuple[1]: str: the string matched by `pattern`, `False` if the
        pattern did not match, or an error message if tuple[0] is `False`

    Raises
    ------
    re.error
        If `pattern` is not a valid regular expression.
    """
    try:
        with open(filename, 'r') as file:
            data = file.read()
    except IOError as e:
        return (False, 'I/O error "{}" while opening or reading {}'.format(e.strerror, filename))
    except Exception:
        # e.g. decoding errors; KeyboardInterrupt/SystemExit still propagate
        return (False, 'Unknown error opening or reading {}'.format(filename))

    match = re.search(pattern, data)
    if match is None:
        # reading the file worked, the pattern just did not match
        return (True, False)
    try:
        return (True, match.group(1))
    except IndexError:
        # the pattern has no capturing group: return the whole match
        return (True, match.group(0))
def read_csv(filename, delimiter=',', quotechar='"', newline='', as_dict=False, skip_empty_rows=False):
    """Reads a CSV file, and returns a list or a dict.

    Parameters
    ----------
    filename : str
        The CSV file to read.
    delimiter : str, optional
        Field delimiter, passed to the `csv` reader. Default: ','
    quotechar : str, optional
        Quote character, passed to the `csv` reader. Default: '"'
    newline : str, optional
        Passed to `open()`. Default: '' (as recommended by the `csv` module).
    as_dict : bool, optional
        If true, each row is a dict keyed by the column names of the first
        row (`csv.DictReader`). Otherwise each row is a list of strings.
    skip_empty_rows : bool, optional
        If true, rows whose cells are all empty or whitespace-only are
        dropped. This considers the cell values for both lists and dicts.

    Returns
    -------
    tuple
        tuple[0]: bool: if successful (no CSV, I/O or file handling errors)
        tuple[1]: list: the rows, or a str with the error message on failure
    """
    reader = None
    try:
        with open(filename, newline=newline) as csvfile:
            if as_dict:
                reader = csv.DictReader(csvfile, delimiter=delimiter, quotechar=quotechar)
            else:
                reader = csv.reader(csvfile, delimiter=delimiter, quotechar=quotechar)
            data = []
            for row in reader:
                if skip_empty_rows:
                    # for dict rows the values have to be inspected, not the
                    # keys; DictReader fills missing cells with None
                    cells = row.values() if as_dict else row
                    if all(
                        cell is None
                        or (isinstance(cell, str) and (cell == '' or cell.isspace()))
                        for cell in cells
                    ):
                        continue
                data.append(row)
        return (True, data)
    except csv.Error as e:
        # `reader` is still None if the error occurred while creating it
        line_num = reader.line_num if reader is not None else 0
        return (False, 'CSV error in file {}, line {}: {}'.format(filename, line_num, e))
    except IOError as e:
        return (False, 'I/O error "{}" while opening or reading {}'.format(e.strerror, filename))
    except Exception:
        # KeyboardInterrupt/SystemExit are deliberately not swallowed
        return (False, 'Unknown error opening or reading {}'.format(filename))
def read_env(filename, delimiter='='):
    """Reads an shell script for setting environment variables,
    and returns a dict with all of them.

    Only lines of the form `[export ]NAME<delimiter>VALUE` are considered.
    Lines starting with `#` and lines without the delimiter are ignored. Each
    line is split at the first delimiter only, so values may contain the
    delimiter themselves. The string `export ` is removed from the name, and
    all single and double quotes are removed from the value.

    Example shell script:

        export OS_AUTH_URL="https://api/v3"
        export OS_PROJECT_NAME=myproject
        # comment
        OS_PASSWORD='linuxfabrik'
        [ -z "$OS_PASSWORD" ] && read -e -p "Pass: " OS_PASSWORD
        export OS_PASSWORD

    >>> read_env(filename)
    {'OS_AUTH_URL': 'https://api/v3', 'OS_PROJECT_NAME': 'myproject', 'OS_PASSWORD': 'linuxfabrik'}

    Parameters
    ----------
    filename : str
        The shell script to read.
    delimiter : str, optional
        Separator between name and value. Default: '='

    Returns
    -------
    tuple
        tuple[0]: bool: if successful (no I/O or file handling errors) or not
        tuple[1]: dict: the variables found, or a str with the error message
    """
    try:
        with open(filename) as envfile:
            data = {}
            for line in envfile:
                # split at the first delimiter only, so that values such as
                # URLs with query strings are not truncated
                key, sep, value = line.strip().partition(delimiter)
                if not sep or key.startswith('#'):
                    # no assignment on this line, or a comment
                    continue
                data[key.replace('export ', '')] = value.replace("'", '').replace('"', '')
        return (True, data)
    except IOError as e:
        return (False, 'I/O error "{}" while opening or reading {}'.format(e.strerror, filename))
    except Exception:
        # KeyboardInterrupt/SystemExit are deliberately not swallowed
        return (False, 'Unknown error opening or reading {}'.format(filename))
def read_file(filename):
    """Reads a file in text mode and returns its whole content.

    Parameters
    ----------
    filename : str
        The file to read.

    Returns
    -------
    tuple
        tuple[0]: bool: if successful (no I/O or file handling errors) or not
        tuple[1]: str: the content of the file, or the error message

    >>> read_file('/proc/mounts')
    (True, '...')
    """
    try:
        with open(filename, 'r') as f:
            return (True, f.read())
    except IOError as e:
        return (False, 'I/O error "{}" while opening or reading {}'.format(e.strerror, filename))
    except Exception:
        # e.g. decoding errors; KeyboardInterrupt/SystemExit still propagate
        return (False, 'Unknown error opening or reading {}'.format(filename))
def rm_file(filename):
    """Deletes/Removes a file.

    Parameters
    ----------
    filename : str
        The file to delete.

    Returns
    -------
    tuple
        tuple[0]: bool: if the file was deleted or not
        tuple[1]: None on success, otherwise a str with the error message

    >>> rm_file('test.txt')
    (True, None)
    """
    try:
        os.remove(filename)
        return (True, None)
    except OSError as e:
        return (False, 'OS error "{}" while deleting {}'.format(e.strerror, filename))
    except Exception:
        # e.g. an argument of the wrong type; KeyboardInterrupt/SystemExit
        # still propagate
        return (False, 'Unknown error deleting {}'.format(filename))
def udevadm(device, _property):
    """Run `udevadm info`. To support older systems, we can't use the `--property=` parameter and
    have to return the desired property on our own.

    Parameters
    ----------
    device : str
        The device name, passed to `udevadm info --name=`.
    _property : str
        The name of the property to return, for example `DEVNAME`.

    Returns
    -------
    str
        The value of the property. An empty string if the command failed or
        the property is not part of the output.

    >>> udevadm('/dev/mapper/rl_rocky8-root', 'DEVNAME')
    '/dev/dm-0'
    >>> udevadm('/dev/dm-0', 'DM_NAME')
    'rl_rocky8-root'
    >>> udevadm('/dev/linuxfabrik', 'DEVNAME')
    ''
    """
    success, result = shell.shell_exec('/sbin/udevadm info --query=property --name={}'.format(
        device,
    ))
    if not success:
        return ''
    # the result is a 3-tuple of which only the first element (stdout) is used
    stdout, _, _ = result
    for line in stdout.strip().splitlines():
        # split at the first '=' only: values may contain '=' themselves, and
        # lines without any '=' must not abort the lookup
        key, sep, value = line.partition('=')
        if sep and key == _property:
            return value
    return ''
def walk_directory(path, exclude_pattern=r'', include_pattern=r'', relative=True):
    """Walks recursively through a directory and creates a list of files.
    If an exclude_pattern (regex) is specified, files matching this pattern
    are ignored. If an include_pattern (regex) is specified, only files matching
    this pattern are put on the list (in this particular order).

    Both patterns are applied case-insensitively with `re.match()`, i.e. they
    are anchored at the beginning of the file's path including `path`.

    Parameters
    ----------
    path : str
        The directory to walk through. A trailing '/' is added if missing.
    exclude_pattern : str, optional
        Regular expression for files to ignore. Default: '' (none)
    include_pattern : str, optional
        Regular expression for files to keep. Default: '' (all)
    relative : bool, optional
        If true (default), the leading `path` is stripped from each file.

    Returns
    -------
    list
        The files found, in the order `os.walk()` delivers them.

    >>> walk_directory('/tmp')
    ['cpu-usage.db', 'segv_output.MCiVt9']
    >>> walk_directory('/tmp', exclude_pattern='.*Temp-.*', relative=False)
    ['/tmp/cpu-usage.db', '/tmp/segv_output.MCiVt9']
    """
    if exclude_pattern:
        exclude_pattern = re.compile(exclude_pattern, re.IGNORECASE)
    if include_pattern:
        include_pattern = re.compile(include_pattern, re.IGNORECASE)
    if not path.endswith('/'):
        path += '/'

    prefix_len = len(path)
    result = []
    for current, _dirs, files in os.walk(path):
        for file in files:
            file = os.path.join(current, file)
            if exclude_pattern and exclude_pattern.match(file) is not None:
                continue
            if include_pattern and include_pattern.match(file) is None:
                continue
            if relative and file.startswith(path):
                # strip the leading root only; str.replace() would also remove
                # later occurrences of `path` inside the file's path
                result.append(file[prefix_len:])
            else:
                result.append(file)
    return result
def write_file(filename, content, append=False):
    """Writes a string to a file.

    Parameters
    ----------
    filename : str
        The file to write. It is created if it does not exist.
    content : str
        The text to write.
    append : bool, optional
        If true, `content` is appended to the file. If false (default), the
        existing content of the file is replaced.

    Returns
    -------
    tuple
        tuple[0]: bool: if the content was written or not
        tuple[1]: None on success, otherwise a str with the error message

    >>> write_file('test.txt', 'First line\\nSecond line')
    (True, None)
    """
    try:
        # the context manager closes the file, no explicit close() needed
        with open(filename, 'a' if append else 'w') as f:
            f.write(content)
        return (True, None)
    except IOError as e:
        return (False, 'I/O error "{}" while writing {}'.format(e.strerror, filename))
    except Exception:
        # e.g. TypeError for non-string content; KeyboardInterrupt/SystemExit
        # still propagate
        return (False, 'Unknown error writing {}, or content is not a string'.format(filename))