Skip to content
This repository was archived by the owner on Jan 10, 2023. It is now read-only.

Fastboot Boot support, Android 9 fix, Fixed PycryptodomeAuthSigner's sign method #152

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion adb/adb_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,9 @@ def Push(self, source_file, device_filename, mtime='0', timeout_ms=None, progres
device_filename: Destination on the device to write to.
mtime: Optional, modification time to set on the file.
timeout_ms: Expected timeout for any part of the push.
st_mode: stat mode for filename
progress_callback: callback method that accepts filename, bytes_written and total_bytes,
total_bytes will be -1 for file-like objects
st_mode: stat mode for filename
"""

if isinstance(source_file, str):
Expand Down
8 changes: 6 additions & 2 deletions adb/adb_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import struct
import time
import re
from io import BytesIO
from adb import usb_exceptions

Expand Down Expand Up @@ -323,7 +324,7 @@ def Connect(cls, usb, banner=b'notadb', rsa_keys=None, auth_timeout_ms=100):
'Unknown AUTH response: %s %s %s' % (arg0, arg1, banner))

# Do not mangle the banner property here by converting it to a string
signed_token = rsa_key.Sign(banner)
signed_token = rsa_key.Sign(banner) + b'\0'
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very important for Android 9 due to a change in struct apacket's 'data' field in adb.h

msg = cls(
command=b'AUTH', arg0=AUTH_SIGNATURE, arg1=0, data=signed_token)
msg.Send(usb)
Expand Down Expand Up @@ -551,8 +552,11 @@ def InteractiveShellCommand(cls, conn, cmd=None, strip_cmd=True, delim=None, str
stdout = stdout.split(b'\r\r\n')[1]

# Strip delim if requested
# TODO: Handling stripping partial delims here - not a deal breaker the way we're handling it now
if delim and strip_delim:
prefix_exp = re.compile(r'(?P<prefix>\d{1,3}\|)'+delim.decode('utf-8', errors='ignore'))
match = re.match(prefix_exp, stdout.decode('utf-8', errors='ignore'))
if match:
stdout = stdout.replace(str(match.group('prefix') + delim).encode('utf-8'), b'')
stdout = stdout.replace(delim, b'')

stdout = stdout.rstrip()
Expand Down
3 changes: 2 additions & 1 deletion adb/common_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import re
import sys
import types
import traceback

from adb import usb_exceptions

Expand Down Expand Up @@ -158,7 +159,7 @@ def StartCli(args, adb_commands, extra=None, **device_kwargs):
try:
return _RunMethod(dev, args, extra or {})
except Exception as e: # pylint: disable=broad-except
sys.stdout.write(str(e))
sys.stdout.write(traceback.format_exc())
return 1
finally:
dev.Close()
71 changes: 51 additions & 20 deletions adb/fastboot.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import logging
import os
import struct
from io import BytesIO, StringIO

from adb import common
from adb import usb_exceptions
Expand Down Expand Up @@ -99,9 +100,9 @@ def HandleSimpleResponses(
info_cb: Optional callback for text sent from the bootloader.

Returns:
OKAY packet's message.
OKAY packet's message
"""
return self._AcceptResponses(b'OKAY', info_cb, timeout_ms=timeout_ms)
return self._AcceptResponses(b'OKAY', info_cb, timeout_ms=timeout_ms)[0]

def HandleDataSending(self, source_file, source_len,
info_cb=DEFAULT_MESSAGE_CALLBACK,
Expand All @@ -123,9 +124,9 @@ def HandleDataSending(self, source_file, source_len,
FastbootInvalidResponse: Fastboot responded with an unknown packet type.

Returns:
OKAY packet's message.
tuple (OKAY packet's message, List of preceding Fastboot Messages)
"""
accepted_size = self._AcceptResponses(
accepted_size, _msgs = self._AcceptResponses(
b'DATA', info_cb, timeout_ms=timeout_ms)

accepted_size = binascii.unhexlify(accepted_size[:8])
Expand All @@ -151,24 +152,33 @@ def _AcceptResponses(self, expected_header, info_cb, timeout_ms=None):
FastbootInvalidResponse: Fastboot responded with an unknown packet type.

Returns:
OKAY packet's message.
tuple (OKAY packet's message, List of preceding Fastboot Messages)
"""

messages = []

while True:
response = self.usb.BulkRead(64, timeout_ms=timeout_ms)
header = bytes(response[:4])
remaining = bytes(response[4:])

if header == b'INFO':
info_cb(FastbootMessage(remaining, header))
fbm = FastbootMessage(remaining, header)
messages.append(fbm)
info_cb(fbm)
elif header in self.FINAL_HEADERS:
if header != expected_header:
raise FastbootStateMismatch(
'Expected %s, got %s', expected_header, header)
if header == b'OKAY':
info_cb(FastbootMessage(remaining, header))
return remaining
fbm = FastbootMessage(remaining, header)
messages.append(fbm)
info_cb(fbm)
return remaining, messages
elif header == b'FAIL':
info_cb(FastbootMessage(remaining, header))
fbm = FastbootMessage(remaining, header)
messages.append(fbm)
info_cb(fbm)
raise FastbootRemoteFailure('FAIL: %s', remaining)
else:
raise FastbootInvalidResponse(
Expand All @@ -188,6 +198,7 @@ def _HandleProgress(self, total, progress_callback):

def _Write(self, data, length, progress_callback=None):
"""Sends the data to the device, tracking progress with the callback."""
progress = None
if progress_callback:
progress = self._HandleProgress(length, progress_callback)
next(progress)
Expand Down Expand Up @@ -310,20 +321,24 @@ def Download(self, source_file, source_len=0,
Returns:
Response to a download request, normally nothing.
"""

if isinstance(source_file, str):
source_file_path = str(source_file)
source_len = os.stat(source_file).st_size
source_file = open(source_file)

with source_file:
if source_len == 0:
# Fall back to storing it all in memory :(
data = source_file.read()
source_file = io.BytesIO(data.encode('utf8'))
source_len = len(data)
with open(source_file_path, 'rb') as fh:
source_file = BytesIO(fh.read())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we still need the data.encode('utf8') or was that a bug in the first place?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By data.encode('utf-8') do you mean the BytesIO creation?

The behavior here was improved so that a caller to Download() can either pass in a file-like object directly or a string file path. In the event they pass in a string, we have to convert it into a file like object before it's passed into HandleDataSending(). This statement was previously leaving a file handle open, so we read the contents and put it into BytesIO so that it has file-like properties.

If this isn't what you meant then let me know!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see what you're referring too. The diff below shows us running data.encode('utf-8') on the file contents. This was a bug because the images that are Downloaded are almost always binary/outside the utf-8 space and would fail.


if not source_len:
if isinstance(source_file, StringIO):
source_file.seek(0, os.SEEK_END)
source_len = source_file.tell()
source_file.seek(0)
else:
source_len = len(source_file)

self._protocol.SendCommand(b'download', b'%08x' % source_len)
return self._protocol.HandleDataSending(
source_file, source_len, info_cb, progress_callback=progress_callback)
self._protocol.SendCommand(b'download', b'%08x' % source_len)
return self._protocol.HandleDataSending(
source_file, source_len, info_cb, progress_callback=progress_callback)[0]

def Flash(self, partition, timeout_ms=0, info_cb=DEFAULT_MESSAGE_CALLBACK):
"""Flashes the last downloaded file to the given partition.
Expand Down Expand Up @@ -396,3 +411,19 @@ def Reboot(self, target_mode=b'', timeout_ms=None):
def RebootBootloader(self, timeout_ms=None):
"""Reboots into the bootloader, usually equiv to Reboot('bootloader')."""
return self._SimpleCommand(b'reboot-bootloader', timeout_ms=timeout_ms)

def Boot(self, source_file):
"""Fastboot boot image by sending image from local file system then issuing the boot command

Args:
source_file: String file path to the image to send and boot

Returns:
None
"""

if not os.path.exists(source_file):
raise ValueError("source_file must exist")

self.Download(source_file)
self._SimpleCommand(b'boot')
3 changes: 3 additions & 0 deletions adb/fastboot_debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ def main():
subparsers, parents, fastboot.FastbootCommands.Oem)
common_cli.MakeSubparser(
subparsers, parents, fastboot.FastbootCommands.Reboot)
common_cli.MakeSubparser(
subparsers, parents, fastboot.FastbootCommands.Boot,
{'source_file': 'Image file on the host to push and boot'})

if len(sys.argv) == 1:
parser.print_help()
Expand Down
46 changes: 43 additions & 3 deletions adb/sign_pycryptodome.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from adb import adb_protocol

from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15
from Crypto.Util import number


class PycryptodomeAuthSigner(adb_protocol.AuthSigner):
Expand All @@ -18,8 +18,48 @@ def __init__(self, rsa_key_path=None):
self.rsa_key = RSA.import_key(rsa_priv_file.read())

def Sign(self, data):
h = SHA256.new(data)
return pkcs1_15.new(self.rsa_key).sign(h)
# Prepend precomputed ASN1 hash code for SHA1
data = b'\x30\x21\x30\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14' + data
pkcs = pkcs1_15.new(self.rsa_key)

# See 8.2.1 in RFC3447
modBits = number.size(pkcs._key.n)
k = pkcs1_15.ceil_div(modBits,8) # Convert from bits to bytes

# Step 2a (OS2IP)
em_int = pkcs1_15.bytes_to_long(PycryptodomeAuthSigner._pad_for_signing(data, k))
# Step 2b (RSASP1)
m_int = pkcs._key._decrypt(em_int)
# Step 2c (I2OSP)
signature = pkcs1_15.long_to_bytes(m_int, k)

return signature

def GetPublicKey(self):
return self.public_key

@staticmethod
def _pad_for_signing(message, target_length):
"""Pads the message for signing, returning the padded message.

The padding is always a repetition of FF bytes.

Function from python-rsa to replace _EMSA_PKCS1_V1_5_ENCODE's for our use case

:return: 00 01 PADDING 00 MESSAGE

"""

max_msglength = target_length - 11
msglength = len(message)

if msglength > max_msglength:
raise OverflowError('%i bytes needed for message, but there is only'
' space for %i' % (msglength, max_msglength))

padding_length = target_length - msglength - 3

return b''.join([b'\x00\x01',
padding_length * b'\xff',
b'\x00',
message])