This repository was archived by the owner on Jan 10, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 361
/
Copy pathadb_commands.py
264 lines (216 loc) · 8.73 KB
/
adb_commands.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
# Copyright 2014 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A libusb1-based ADB reimplementation.
ADB was giving us trouble with its client/server architecture, which is great
for users and developers, but not so great for reliable scripting. This will
allow us to more easily catch errors as Python exceptions instead of checking
random exit codes, and all the other great benefits from not going through
subprocess and a network socket.
All timeouts are in milliseconds.
"""
import cStringIO
import os
import socket
import stat
from M2Crypto import RSA
import adb_protocol
import common
import filesync_protocol
# From adb.h
CLASS = 0xFF
SUBCLASS = 0x42
PROTOCOL = 0x01
# pylint: disable=invalid-name
DeviceIsAvailable = common.InterfaceMatcher(CLASS, SUBCLASS, PROTOCOL)
class M2CryptoSigner(adb_protocol.AuthSigner):
"""AuthSigner using M2Crypto."""
def __init__(self, rsa_key_path):
with open(rsa_key_path + '.pub') as rsa_pub_file:
self.public_key = rsa_pub_file.read()
self.rsa_key = RSA.load_key(rsa_key_path)
def Sign(self, data):
return self.rsa_key.sign(data, 'sha1')
def GetPublicKey(self):
return self.public_key
class AdbCommands(object):
"""Exposes adb-like methods for use.
Some methods are more-pythonic and/or have more options.
"""
protocol_handler = adb_protocol.AdbMessage
filesync_handler = filesync_protocol.FilesyncProtocol
@classmethod
def ConnectDevice(
cls, port_path=None, serial=None, default_timeout_ms=None, **kwargs):
"""Convenience function to get an adb device from usb path or serial.
Args:
port_path: The filename of usb port to use.
serial: The serial number of the device to use.
default_timeout_ms: The default timeout in milliseconds to use.
If serial specifies a TCP address:port, then a TCP connection is
used instead of a USB connection.
"""
if serial and ':' in serial:
handle = common.TcpHandle(serial)
else:
handle = common.UsbHandle.FindAndOpen(
DeviceIsAvailable, port_path=port_path, serial=serial,
timeout_ms=default_timeout_ms)
return cls.Connect(handle, **kwargs)
def __init__(self, handle, device_state):
self.handle = handle
self._device_state = device_state
def Close(self):
self.handle.Close()
@classmethod
def Connect(cls, usb, banner=None, **kwargs):
"""Connect to the device.
Args:
usb: UsbHandle or TcpHandle instance to use.
banner: See protocol_handler.Connect.
**kwargs: See protocol_handler.Connect for kwargs. Includes rsa_keys,
and auth_timeout_ms.
Returns:
An instance of this class if the device connected successfully.
"""
if not banner:
banner = socket.gethostname()
device_state = cls.protocol_handler.Connect(usb, banner=banner, **kwargs)
# Remove banner and colons after device state (state::banner)
device_state = device_state.split(':')[0]
return cls(usb, device_state)
@classmethod
def Devices(cls):
"""Get a generator of UsbHandle for devices available."""
return common.UsbHandle.FindDevices(DeviceIsAvailable)
def GetState(self):
return self._device_state
def Install(self, apk_path, destination_dir=None, timeout_ms=None):
"""Install apk to device.
Doesn't support verifier file, instead allows destination directory to be
overridden.
Arguments:
apk_path: Local path to apk to install.
destination_dir: Optional destination directory. Use /system/app/ for
persistent applications.
timeout_ms: Expected timeout for pushing and installing.
Returns:
The pm install output.
"""
if not destination_dir:
destination_dir = '/data/local/tmp/'
basename = os.path.basename(apk_path)
destination_path = destination_dir + basename
self.Push(apk_path, destination_path, timeout_ms=timeout_ms)
return self.Shell('pm install -r "%s"' % destination_path,
timeout_ms=timeout_ms)
def Push(self, source_file, device_filename, mtime='0', timeout_ms=None):
"""Push source_file to file on device.
Arguments:
source_file: Either a filename or file-like object to push to the device.
device_filename: The filename on the device to write to.
mtime: Optional, modification time to set on the file.
timeout_ms: Expected timeout for any part of the push.
"""
connection = self.protocol_handler.Open(
self.handle, destination='sync:',
timeout_ms=timeout_ms)
if isinstance(source_file, basestring):
source_file = open(source_file)
self.filesync_handler.Push(connection, source_file, device_filename,
mtime=int(mtime))
connection.Close()
def Pull(self, device_filename, dest_file=None, timeout_ms=None):
"""Pull file from device.
Arguments:
device_filename: The filename on the device to pull.
dest_file: If set, a filename or writable file-like object.
timeout_ms: Expected timeout for any part of the pull.
Returns:
The file data if dest_file is not set.
"""
filemode, _, _= self.Stat(device_filename)
if stat.S_ISDIR(filemode):
if dest_file is None:
raise ValueError("Must specify dest_file when pulling a directory")
file_list = self.List(device_filename)
if not os.path.exists(dest_file):
os.makedirs(dest_file)
for device_file in file_list:
if device_file.filename not in (".", ".."):
self.Pull(
os.path.join(device_filename, device_file.filename),
os.path.join(dest_file, device_file.filename) if dest_file else None
)
else:
if stat.S_ISLNK(filemode) or stat.S_ISSOCK(filemode):
return
if isinstance(dest_file, basestring):
dest_file = open(dest_file, 'w')
elif not dest_file:
dest_file = cStringIO.StringIO()
connection = self.protocol_handler.Open(
self.handle, destination='sync:',
timeout_ms=timeout_ms)
self.filesync_handler.Pull(connection, device_filename, dest_file)
connection.Close()
# An empty call to cStringIO.StringIO returns an instance of
# cStringIO.OutputType.
if isinstance(dest_file, cStringIO.OutputType):
return dest_file.getvalue()
def Stat(self, device_filename):
"""Get a file's stat() information."""
connection = self.protocol_handler.Open(self.handle, destination='sync:')
mode, size, mtime = self.filesync_handler.Stat(
connection, device_filename)
connection.Close()
return mode, size, mtime
def List(self, device_path):
"""Return a directory listing of the given path."""
connection = self.protocol_handler.Open(self.handle, destination='sync:')
listing = self.filesync_handler.List(connection, device_path)
connection.Close()
return listing
def Reboot(self, destination=''):
"""Reboot device, specify 'bootloader' for fastboot."""
self.protocol_handler.Open(self.handle, 'reboot:%s' % destination)
def RebootBootloader(self):
"""Reboot device into fastboot."""
self.Reboot('bootloader')
def Remount(self):
"""Remount / as read-write."""
return self.protocol_handler.Command(self.handle, service='remount')
def Root(self):
"""Restart adbd as root on device."""
return self.protocol_handler.Command(self.handle, service='root')
def Shell(self, command, timeout_ms=None):
"""Run command on the device, returning the output."""
return self.protocol_handler.Command(
self.handle, service='shell', command=command,
timeout_ms=timeout_ms)
def StreamingShell(self, command, timeout_ms=None):
"""Run command on the device, yielding each line of output.
Args:
command: the command to run on the target.
timeout_ms: Maximum time to allow the command to run.
Yields:
The responses from the shell command.
"""
return self.protocol_handler.StreamingCommand(
self.handle, service='shell', command=command,
timeout_ms=timeout_ms)
def Logcat(self, options, timeout_ms=None):
"""Run 'shell logcat' and stream the output to stdout."""
return self.protocol_handler.StreamingCommand(
self.handle, service='shell', command='logcat %s' % options,
timeout_ms=timeout_ms)