1
- from logging import getLogger
2
-
3
1
import dbus
4
- from _dbus_bindings import PROPERTIES_IFACE
5
-
6
- logger = getLogger ("yunohost.storage" )
2
+ from yunohost .utils .system import binary_to_human
7
3
8
4
9
5
__all__ = ["info" , "list" ]
13
9
UDISK_DRIVE_IFC = "org.freedesktop.UDisks2.Drive"
14
10
15
11
16
- def _humaize (byte_size ):
17
- suffixes = "kMGTPEZYRQ"
18
-
19
- byte_size = float (byte_size )
20
-
21
- if byte_size < 1024 :
22
- return f"{ byte_size } B"
23
-
24
- for i , s in enumerate (suffixes , start = 2 ):
25
- unit = 1024 ** i
26
- if byte_size <= unit :
27
- return f"{ (1024 * (byte_size / unit )):.1f} { s } B"
28
-
29
-
30
- def _query_udisks ():
12
+ def _query_udisks () -> list [tuple [str , dict ]]:
31
13
bus = dbus .SystemBus ()
32
14
manager = dbus .Interface (
33
15
bus .get_object ("org.freedesktop.UDisks2" , "/org/freedesktop/UDisks2" ),
34
16
"org.freedesktop.DBus.ObjectManager" ,
35
17
)
36
18
37
- for name , dev in manager .GetManagedObjects ().items ():
38
- if name .startswith (UDISK_DRIVE_PATH ):
39
- yield name .removeprefix (UDISK_DRIVE_PATH ), dev [UDISK_DRIVE_IFC ]
19
+ return sorted (
20
+ (
21
+ (name .removeprefix (UDISK_DRIVE_PATH ), dev [UDISK_DRIVE_IFC ])
22
+ for name , dev in manager .GetManagedObjects ().items ()
23
+ if name .startswith (UDISK_DRIVE_PATH )
24
+ ),
25
+ key = lambda item : item [1 ]["SortKey" ],
26
+ )
40
27
41
28
42
29
def _disk_infos (name : str , drive : dict , human_readable = False ):
@@ -45,7 +32,7 @@ def _disk_infos(name: str, drive: dict, human_readable=False):
45
32
"model" : drive ["Model" ],
46
33
"serial" : drive ["Serial" ],
47
34
"removable" : bool (drive ["MediaRemovable" ]),
48
- "size" : _humaize (drive ["Size" ]) if human_readable else drive ["Size" ],
35
+ "size" : binary_to_human (drive ["Size" ]) if human_readable else drive ["Size" ],
49
36
}
50
37
51
38
if connection_bus := drive ["ConnectionBus" ]:
@@ -55,7 +42,7 @@ def _disk_infos(name: str, drive: dict, human_readable=False):
55
42
result .update (
56
43
{
57
44
"type" : "HDD" ,
58
- "rpm" : "Unknown" ,
45
+ "rpm" : "Unknown" if human_readable else None ,
59
46
}
60
47
)
61
48
elif rotation_rate == 0 :
@@ -75,15 +62,12 @@ def list(with_info=False, human_readable=False):
75
62
if not with_info :
76
63
return [name for name , _ in _query_udisks ()]
77
64
78
- result = {}
65
+ result = []
79
66
80
67
for name , drive in _query_udisks ():
81
- result [ name ] = _disk_infos (name , drive , human_readable )
68
+ result . append ( _disk_infos (name , drive , human_readable ) )
82
69
83
- if human_readable and not result :
84
- return "No external media found"
85
-
86
- return result
70
+ return {"disks" : result }
87
71
88
72
89
73
def info (name , human_readable = False ):
@@ -92,7 +76,7 @@ def info(name, human_readable=False):
92
76
bus .get_object (
93
77
"org.freedesktop.UDisks2" , f"/org/freedesktop/UDisks2/drives/{ name } "
94
78
),
95
- PROPERTIES_IFACE ,
79
+ dbus . PROPERTIES_IFACE ,
96
80
).GetAll ("org.freedesktop.UDisks2.Drive" )
97
81
98
82
return _disk_infos (name , drive , human_readable )
0 commit comments