From c88faf86c9b57859af9ec2b9836da85a327e3b02 Mon Sep 17 00:00:00 2001 From: jackjack-jj Date: Sat, 6 Feb 2021 21:57:57 +0100 Subject: [PATCH] python 3 support, fix recovery, add -d/-w shortcuts and add --tests --- pywallet.py | 898 +++++++++++++++++++++++++++++----------------------- 1 file changed, 505 insertions(+), 393 deletions(-) diff --git a/pywallet.py b/pywallet.py index 08822d8..cf24ea0 100644 --- a/pywallet.py +++ b/pywallet.py @@ -1,5 +1,6 @@ #!/usr/bin/env python #-*- coding: utf-8 -*- +from __future__ import print_function pywversion="2.2" never_update=False @@ -12,9 +13,20 @@ import sys PY3 = sys.version_info.major > 2 -try:raw_input -except:raw_input = input -beta_version = ('a' in pywversion.split('-')[0]) or ('b' in pywversion.split('-')[0]) + +import warnings +def warning_on_one_line(message, category, filename, lineno, file=None, line=None): + return '%s:%s: %s: %s\n' % (filename, lineno, category.__name__, message) +warnings.formatwarning = warning_on_one_line +if PY3: + warnings.warn("Python 3 support is still experimental, you may encounter bugs") + import _thread as thread + raw_input = input + xrange = range + long = int + unicode = str +else: + import thread missing_dep = [] @@ -31,23 +43,18 @@ pyw_path = os.path.dirname(os.path.realpath(__file__)) try: - import json + import simplejson as json except: - try: - import simplejson as json - except: - print("Json or simplejson package is needed") + import json import hmac import getpass import logging import struct -import StringIO import traceback import socket import types import string -import exceptions import hashlib import random import urllib @@ -55,7 +62,9 @@ import base64 import collections import weakref +import binascii from types import MethodType +import unittest from datetime import datetime from subprocess import * @@ -64,15 +73,56 @@ import os.path import platform +def ordsix(x): + if x.__class__ == int:return x + return ord(x) +def chrsix(x): + if not(x.__class__ in [int, long]):return x + if PY3:return bytes([x]) + return chr(x) + +def str_to_bytes(k): + if k.__class__ == str and not hasattr(k, 'decode'): + return bytes(k, 'ascii') + return k +def bytes_to_str(k): + if k.__class__ == bytes: + return k.decode() + if k.__class__ == unicode: + return bytes_to_str(k.encode()) + return k + +class Bdict(dict): + def __init__(self, *a, **kw): + super(Bdict, self).__init__(*a, **kw) + for k, v in self.copy().items(): + self[bytes_to_str(k)] = v + del self[k] + def update(self, *a, **kw): + other = self.__class__(*a, **kw) + return super(Bdict, self).update(other) + def pop(self, k, *a): + return super(Bdict, self).pop(bytes_to_str(k), *a) + def get(self, k, default=None): + return super(Bdict, self).get(bytes_to_str(k), default) + def __getitem__(self, k): + return super(Bdict, self).__getitem__(bytes_to_str(k)) + def __setitem__(self, k, v): + return super(Bdict, self).__setitem__(bytes_to_str(k), v) + def __contains__(self, k): + return super(Bdict, self).__contains__(bytes_to_str(k)) + def __repr__(self): + return '%s(%s)'%(self.__class__.__name__, super(Bdict, self).__repr__()) + max_version = 81000 -json_db = {} +json_db = Bdict({}) private_keys = [] private_hex_keys = [] passphrase = "" global_merging_message = ["",""] balance_site = 'https://blockchain.info/q/addressbalance/' -aversions = {}; +aversions = {} for i in range(256): aversions[i] = "version %d" % i; aversions[0] = 'Bitcoin'; @@ -81,23 +131,23 @@ aversions[111] = 'Testnet'; class Network(collections.namedtuple('Network', 'name p2pkh_prefix p2sh_prefix wif_prefix segwit_hrp')): - instances = [] - def __init__(self, *a, **kw): - self.__class__.instances.append(self) - super(Network, self).__init__() - def keyinfo(self, *a): - pass + instances = [] + def __init__(self, *a, **kw): + self.__class__.instances.append(self) + super(Network, self).__init__() + def keyinfo(self, *a): + pass def ethereum_keyinfo(self, keyinfo): if keyinfo.compressed:return ethpubkey = keyinfo.public_key[1:] - eth_addr = Keccak256(ethpubkey).digest()[-20:].encode('hex') + eth_addr = binascii.hexlify(Keccak256(ethpubkey).digest()[-20:]) print("Ethereum address: %s"%eth_addr) print("Ethereum B58address: %s"%public_key_to_bc_address(eth_addr, 33)) network_bitcoin = Network('Bitcoin', 0, 5, 0x80, 'bc') network_bitcoin_testnet3 = Network('Bitcoin-Testnet3', 0x6f, 0xc4, 0xef, 'tb') network_ethereum = Network('Ethereum', 0, 5, 0x80, 'eth') -network_ethereum.keyinfo = MethodType(ethereum_keyinfo, network_ethereum, Network) +network_ethereum.keyinfo = MethodType(ethereum_keyinfo, network_ethereum) network = network_bitcoin wallet_dir = "" @@ -112,8 +162,8 @@ def ethereum_keyinfo(self, keyinfo): To = 1e12 Tio = 1024 ** 4 -prekeys = ["308201130201010420".decode('hex'), "308201120201010420".decode('hex')] -postkeys = ["a081a530".decode('hex'), "81a530".decode('hex')] +prekeys = [binascii.unhexlify("308201130201010420"), binascii.unhexlify("308201120201010420")] +postkeys = [binascii.unhexlify("a081a530"), binascii.unhexlify("81a530")] KeyInfo = collections.namedtuple('KeyInfo', 'secret private_key public_key addr wif compressed') @@ -151,7 +201,7 @@ def determine_db_name(): RoundConstants=[1,32898,0x800000000000808a,0x8000000080008000,32907,2147483649,0x8000000080008081,0x8000000000008009,138,136,2147516425,2147483658,2147516555,0x800000000000008b,0x8000000000008089,0x8000000000008003,0x8000000000008002,0x8000000000000080,32778,0x800000008000000a,0x8000000080008081,0x8000000000008080,2147483649,0x8000000080008008] RotationConstants=[[0,1,62,28,27],[36,44,6,55,20],[3,10,43,25,39],[41,45,15,21,8],[18,2,61,56,14]] Masks=[(1<>bits-left;bot=(value&Masks[bits-left])<>right;bot=(value&Masks[right])< 16: raise ValueError("String ending with %r can't be PCKS7-padded" % s[-1]) return s[:-numpads] @@ -659,7 +709,7 @@ def convertString(self, string, start, end, mode): while len(ar) < end - start: ar.append(0) while i < end: - ar[j] = ord(string[i]) + ar[j] = ordsix(string[i]) j += 1 i += 1 return ar @@ -685,7 +735,7 @@ def encrypt(self, stringIn, mode, key, size, IV): # char firstRound firstRound = True if stringIn != None: - for j in range(int(math.ceil(float(len(stringIn))/16))): + for j in range(int(math.ceil(float(len(stringIn))//16))): start = j*16 end = j*16+16 if end > len(stringIn): @@ -761,11 +811,11 @@ def decrypt(self, cipherIn, originalsize, mode, key, size, IV): output = [] plaintext = [0] * 16 # the output plain text string - stringOut = '' + stringOut = b'' # char firstRound firstRound = True if cipherIn != None: - for j in range(int(math.ceil(float(len(cipherIn))/16))): + for j in range(int(math.ceil(float(len(cipherIn))//16))): start = j*16 end = j*16+16 if j*16+16 > len(cipherIn): @@ -787,7 +837,7 @@ def decrypt(self, cipherIn, originalsize, mode, key, size, IV): else: plaintext[i] = output[i] ^ ciphertext[i] for k in range(end-start): - stringOut += chr(plaintext[k]) + stringOut += chrsix(plaintext[k]) iput = ciphertext elif mode == self.modeOfOperation["OFB"]: if firstRound: @@ -805,7 +855,7 @@ def decrypt(self, cipherIn, originalsize, mode, key, size, IV): else: plaintext[i] = output[i] ^ ciphertext[i] for k in range(end-start): - stringOut += chr(plaintext[k]) + stringOut += chrsix(plaintext[k]) iput = output elif mode == self.modeOfOperation["CBC"]: output = self.aes.decrypt(ciphertext, key, size) @@ -815,12 +865,12 @@ def decrypt(self, cipherIn, originalsize, mode, key, size, IV): else: plaintext[i] = iput[i] ^ output[i] firstRound = False - if originalsize is not None and originalsize < end: + if not(originalsize is None) and originalsize < end: for k in range(originalsize-start): - stringOut += chr(plaintext[k]) + stringOut += chrsix(plaintext[k]) else: for k in range(end-start): - stringOut += chr(plaintext[k]) + stringOut += chrsix(plaintext[k]) iput = ciphertext return stringOut @@ -836,7 +886,7 @@ class Crypter_pycrypto( object ): def SetKeyFromPassphrase(self, vKeyData, vSalt, nDerivIterations, nDerivationMethod): if nDerivationMethod != 0: return 0 - data = vKeyData + vSalt + data = str_to_bytes(vKeyData) + vSalt for i in xrange(nDerivIterations): data = hashlib.sha512(data).digest() self.SetKey(data[0:32]) @@ -909,7 +959,7 @@ def __init__(self): def SetKeyFromPassphrase(self, vKeyData, vSalt, nDerivIterations, nDerivationMethod): if nDerivationMethod != 0: return 0 - data = vKeyData + vSalt + data = str_to_bytes(vKeyData) + vSalt for i in xrange(nDerivIterations): data = hashlib.sha512(data).digest() self.SetKey(data[0:32]) @@ -917,17 +967,17 @@ def SetKeyFromPassphrase(self, vKeyData, vSalt, nDerivIterations, nDerivationMet return len(data) def SetKey(self, key): - self.chKey = [ord(i) for i in key] + self.chKey = [ordsix(i) for i in key] def SetIV(self, iv): - self.chIV = [ord(i) for i in iv] + self.chIV = [ordsix(i) for i in iv] def Encrypt(self, data): mode, size, cypher = self.m.encrypt(append_PKCS7_padding(data), self.cbc, self.chKey, self.sz, self.chIV) - return ''.join(map(chr, cypher)) + return b''.join(map(chrsix, cypher)) def Decrypt(self, data): - chData = [ord(i) for i in data] + chData = [ordsix(i) for i in data] return self.m.decrypt(chData, self.sz, self.cbc, self.chKey, self.sz, self.chIV) crypter = None @@ -953,7 +1003,7 @@ def Decrypt(self, data): def bytes_to_int(bytes): result = 0 for b in bytes: - result = result * 256 + ord(b) + result = result * 256 + ordsix(b) return result def int_to_bytes(value, length = None): @@ -1050,7 +1100,7 @@ def leftmost_bit( x ): assert x > 0 result = 1 while result <= x: result = 2 * result - return result / 2 + return result // 2 e = other if self.__order: e = e % self.__order @@ -1059,13 +1109,13 @@ def leftmost_bit( x ): assert e > 0 e3 = 3 * e negative_self = Point( self.__curve, self.__x, -self.__y, self.__order ) - i = leftmost_bit( e3 ) / 2 + i = leftmost_bit( e3 ) // 2 result = self while i > 1: result = result.double() if ( e3 & i ) != 0 and ( e & i ) == 0: result = result + self if ( e3 & i ) == 0 and ( e & i ) != 0: result = result + negative_self - i = i / 2 + i = i // 2 return result def __rmul__( self, other ): @@ -1153,7 +1203,7 @@ def ser(self): else: pk='04%064x%064x' % (self.point.x(), self.point.y()) - return pk.decode('hex') + return binascii.unhexlify(pk) def get_addr(self, v=0): return public_key_to_bc_address(self.ser(), v) @@ -1161,9 +1211,9 @@ def get_addr(self, v=0): @classmethod def from_ser(cls, g, ser): if len(ser) == 33: - return cls(g, Point(g.curve(), bytes_to_int(ser[1:]), y_odd = ord(ser[0]) == 3), ord(ser[0]) < 4) + return cls(g, Point(g.curve(), bytes_to_int(ser[1:]), y_odd = ordsix(ser[0]) == 3), ordsix(ser[0]) < 4) elif len(ser) == 65: - return cls(g, Point(g.curve(), bytes_to_int(ser[1:33]), bytes_to_int(ser[33:])), ord(ser[0]) < 4) + return cls(g, Point(g.curve(), bytes_to_int(ser[1:33]), bytes_to_int(ser[33:])), ordsix(ser[0]) < 4) raise Exception("Bad public key format: %s"%repr(ser)) class Private_key( object ): @@ -1177,7 +1227,7 @@ def der( self ): 'a00706052b8104000aa14403420004' + \ '%064x' % self.public_key.point.x() + \ '%064x' % self.public_key.point.y() - return hex_der_key.decode('hex') + return binascii.unhexlify(hex_der_key) def sign( self, hash, random_k ): G = self.public_key.generator @@ -1229,7 +1279,7 @@ def i2d_ECPrivateKey(pkey, compressed=False):#, crypted=True): '%064x' % _r + \ '020101a144034200' - return key.decode('hex') + i2o_ECPublicKey(pkey, compressed) + return binascii.unhexlify(key) + i2o_ECPublicKey(pkey, compressed) def i2o_ECPublicKey(pkey, compressed=False): # public keys are 65 bytes long (520 bits) @@ -1246,7 +1296,7 @@ def i2o_ECPublicKey(pkey, compressed=False): '%064x' % pkey.pubkey.point.x() + \ '%064x' % pkey.pubkey.point.y() - return key.decode('hex') + return binascii.unhexlify(key) # bitcointools hashes and base58 implementation @@ -1264,7 +1314,7 @@ def public_key_to_bc_address(public_key, v=None): def hash_160_to_bc_address(h160, v=None): if v==None: v=network.p2pkh_prefix - vh160 = chr(v) + h160 + vh160 = chrsix(v) + h160 h = Hash(vh160) addr = vh160 + h[0:4] return b58encode(addr) @@ -1282,7 +1332,7 @@ def b58encode(v, __b58chars=__b58chars): long_value = 0 for (i, c) in enumerate(v[::-1]): - long_value += (256**i) * ord(c) + long_value += (256**i) * ordsix(c) result = '' while long_value >= __b58base: @@ -1295,7 +1345,7 @@ def b58encode(v, __b58chars=__b58chars): # leading 0-bytes in the input become leading-1s nPad = 0 for c in v: - if c == '\0': nPad += 1 + if chrsix(c) == b'\0': nPad += 1 else: break return (__b58chars[0]*nPad) + result @@ -1308,20 +1358,20 @@ def b58decode(v, length, __b58chars=__b58chars): for (i, c) in enumerate(v[::-1]): long_value += __b58chars.find(c) * (__b58base**i) - result = '' + result = b'' while long_value >= 256: div, mod = divmod(long_value, 256) - result = chr(mod) + result + result = chrsix(mod) + result long_value = div - result = chr(long_value) + result + result = chrsix(long_value) + result nPad = 0 for c in v: if c == __b58chars[0]: nPad += 1 else: break - result = chr(0)*nPad + result - if length is not None and len(result) != length: + result = chrsix(0)*nPad + result + if not(length is None) and len(result) != length: return None return result @@ -1330,9 +1380,6 @@ def b58decode(v, length, __b58chars=__b58chars): # address handling code -def long_hex(bytes): - return bytes.encode('hex_codec') - def Hash(data): return hashlib.sha256(hashlib.sha256(data).digest()).digest() @@ -1355,7 +1402,7 @@ def str_to_long(b): res = 0 pos = 1 for a in reversed(b): - res += ord(a) * pos + res += ordsix(a) * pos pos *= 256 return res @@ -1366,17 +1413,17 @@ def PrivKeyToSecret(privkey): return privkey[8:8+32] def SecretToASecret(secret, compressed=False): - prefix = chr(network.wif_prefix) + prefix = chrsix(network.wif_prefix) vchIn = prefix + secret - if compressed: vchIn += '\01' + if compressed: vchIn += b'\01' return EncodeBase58Check(vchIn) def ASecretToSecret(sec): vch = DecodeBase58Check(sec) if not vch: return False - if vch[0] != chr(network.wif_prefix): - print('Warning: adress prefix seems bad (%d vs %d)'%(ord(vch[0]), network.wif_prefix)) + if ordsix(vch[0]) != network.wif_prefix: + print('Warning: adress prefix seems bad (%d vs %d)'%(ordsix(vch[0]), network.wif_prefix)) return vch[1:] def regenerate_key(sec): @@ -1384,7 +1431,7 @@ def regenerate_key(sec): if not b: return False b = b[0:32] - secret = int('0x' + b.encode('hex'), 16) + secret = int(b'0x' + binascii.hexlify(b), 16) return EC_KEY(secret) def GetPubKey(pkey, compressed=False): @@ -1394,7 +1441,7 @@ def GetPrivKey(pkey, compressed=False): return i2d_ECPrivateKey(pkey, compressed) def GetSecret(pkey): - return ('%064x' % pkey.secret).decode('hex') + return binascii.unhexlify('%064x' % pkey.secret) def is_compressed(sec): b = ASecretToSecret(sec) @@ -1408,7 +1455,7 @@ def create_env(db_dir): return db_env def parse_CAddress(vds): - d = {'ip':'0.0.0.0','port':0,'nTime': 0} + d = Bdict({'ip':'0.0.0.0','port':0,'nTime': 0}) try: d['nVersion'] = vds.read_int32() d['nTime'] = vds.read_uint32() @@ -1424,15 +1471,15 @@ def deserialize_CAddress(d): return d['ip']+":"+str(d['port']) def parse_BlockLocator(vds): - d = { 'hashes' : [] } + d = Bdict({ 'hashes' : [] }) nHashes = vds.read_compact_size() for i in xrange(nHashes): d['hashes'].append(vds.read_bytes(32)) return d def deserialize_BlockLocator(d): - result = "Block Locator top: "+d['hashes'][0][::-1].encode('hex_codec') - return result + result = "Block Locator top: " + binascii.hexlify(d['hashes'][0][::-1]) + return result def parse_setting(setting, vds): if setting[0] == "f": # flag (boolean) settings @@ -1449,6 +1496,16 @@ def parse_setting(setting, vds): class SerializationError(Exception): """ Thrown when there's a problem deserializing or serializing """ +def overlapped_read(f, sz, overlap, maxlen=None): + buffer = b'' + stop = False + total_read = 0 + while not stop and (not maxlen or maxlen > total_read): + new_content = os.read(f, sz) + if not new_content:break + total_read += len(new_content) + buffer = buffer[-overlap:] + new_content + yield buffer def search_patterns_on_disk(device, size, inc, patternlist): # inc must be higher than 1k try: @@ -1463,20 +1520,19 @@ def search_patterns_on_disk(device, size, inc, patternlist): # inc must be hig exit(0) i = 0 - data='' + data=b'' tzero=time.time() sizetokeep=0 BlocksToInspect=dict(map(lambda x:[x,[]], patternlist)) - syst=systype() lendataloaded=None writeProgressEvery=100*Mo while i < int(size) and (lendataloaded!=0 or lendataloaded==None): - if int(i/writeProgressEvery)!=int((i+inc)/writeProgressEvery): - print("%.2f Go read"%(i/1e9)) + if int(i//writeProgressEvery)!=int((i+inc)//writeProgressEvery): + print("%.2f Go read"%(i//1e9)) try: - datakept=data[-sizetokeep:] - data = datakept+os.read(fd, inc) + datakept = data[-sizetokeep:] + data = datakept + os.read(fd, inc) lendataloaded = len(data)-len(datakept) #should be inc for text in patternlist: if text in data: @@ -1493,11 +1549,11 @@ def search_patterns_on_disk(device, size, inc, patternlist): # inc must be hig continue os.close(fd) - AllOffsets=dict(map(lambda x:[x,[]], patternlist)) + AllOffsets=dict(map(lambda x:[repr(x),[]], patternlist)) for text,blocks in BlocksToInspect.items(): for offset,data,ldk in blocks: #ldk = len(datakept) offsetslist=[offset+m.start() for m in re.finditer(text, data)] - AllOffsets[text].extend(offsetslist) + AllOffsets[repr(text)].extend(offsetslist) AllOffsets['PRFdevice']=device AllOffsets['PRFdt']=time.time()-tzero @@ -1510,7 +1566,7 @@ def multiextract(s, ll): for length in ll: r.append(s[cursor:cursor+length]) cursor+=length - if s[cursor:]!='': + if s[cursor:]!=b'': r.append(s[cursor:]) return r @@ -1529,11 +1585,12 @@ def __init__(self, ekey, salt, nditer, ndmethod, nid): self.iterations=nditer self.method=ndmethod self.id=nid + # print((ekey, salt, nditer, ndmethod, nid)) def readpartfile(fd, offset, length): #make everything 512*n because of windows... rest=offset%512 new_offset=offset-rest - big_length=512*(int((length+rest-1)/512)+1) + big_length=512*(int((length+rest-1)//512)+1) os.lseek(fd, new_offset, os.SEEK_SET) d=os.read(fd, big_length) return d[rest:rest+length] @@ -1545,7 +1602,7 @@ def recov_ckey(fd, offset): checks=[] checks.append([0, '30']) checks.append([3, '636b6579']) - if sum(map(lambda x:int(me[x[0]]!=x[1].decode('hex')), checks)): #number of false statements + if sum(map(lambda x:int(me[x[0]] != binascii.unhexlify(x[1])), checks)): #number of false statements return None return me @@ -1559,12 +1616,27 @@ def recov_mkey(fd, offset): checks.append([2, '08']) checks.append([6, '00']) checks.append([8, '090001046d6b6579']) - if sum(map(lambda x:int(me[x[0]]!=x[1].decode('hex')), checks)): #number of false statements + if sum(map(lambda x:int(me[x[0]] != binascii.unhexlify(x[1])), checks)): #number of false statements return None return me +def drop_first(e): + if hasattr(e, 'next'): + e.next() + else: + e=e[1:] + for i in e:yield i + def recov_uckey(fd, offset): + dd = readpartfile(fd, offset, 223) + r = [] + for beg in map(binascii.unhexlify, ['3081d30201010420', '308201130201010420']): + for chunk in drop_first(dd.split(beg)): + r.append(chunk[:32]) + return r and (None, None, None, None, r[0]) + +def recov_uckeyOLD(fd, offset): checks=[] d=readpartfile(fd, offset-217, 223) @@ -1585,7 +1657,7 @@ def recov_uckey(fd, offset): return None - if sum(map(lambda x:int(me[x[0]]!=x[1].decode('hex')), checks)): #number of false statements + if sum(map(lambda x:int(me[x[0]] != binascii.unhexlify(x[1])), checks)): #number of false statements return None return me @@ -1594,25 +1666,27 @@ def recov(device, passes, size=102400, inc=10240, outputdir='.'): if inc%512>0: inc-=inc%512 #inc must be 512*n on Windows... Don't ask me why... - nameToDBName={'mkey':'\x09\x00\x01\x04mkey','ckey':'\x27\x00\x01\x04ckey','key':'\x00\x01\x03key',} + nameToDBName={ + 'mkey':b'\x09\x00\x01\x04mkey', + 'ckey':b'\x27\x00\x01\x04ckey', + 'key':b'\x00\x01\x03key' + } if not device.startswith('PartialRecoveryFile:'): - r=search_patterns_on_disk(device, size, inc, map(lambda x:nameToDBName[x], ['mkey', 'ckey', 'key'])) + r=search_patterns_on_disk(device, size, inc, nameToDBName.values()) f=open(outputdir+'/pywallet_partial_recovery_%d.json'%ts(), 'w') - f.write(str(r)) + f.write(json.dumps(r)) f.close() - print("\nRead %.1f Go in %.1f minutes\n"%(r['PRFsize']/1e9, r['PRFdt']/60.0)) + print("\nRead %.1f Go in %.1f minutes\n"%(r['PRFsize']//1e9, r['PRFdt']//60.0)) else: prf=device[20:] f=open(prf, 'r') - content=f.read() + content = f.read() f.close() - cmd=("z = "+content+"") - exec cmd in locals() - r=z + r=json.loads(content) device=r['PRFdevice'] - print("\nLoaded %.1f Go from %s\n"%(r['PRFsize']/1e9, device)) + print("\nLoaded %.1f Go from %s\n"%(r['PRFsize']//1e9, device)) try: @@ -1624,12 +1698,18 @@ def recov(device, passes, size=102400, inc=10240, outputdir='.'): mkeys=[] crypters=[] - syst=systype() - for offset in r[nameToDBName['mkey']]: + for offset in r[repr(nameToDBName['mkey'])]: s=recov_mkey(fd, offset) if s==None: continue - newmkey=RecovMkey(s[1],s[3],int(s[5][::-1].encode('hex'), 16),int(s[4][::-1].encode('hex'), 16),int(s[-1][::-1].encode('hex'), 16)) + if s[-1] == b'':s=s[:-1] + newmkey=RecovMkey( + s[1], + s[3], + int(binascii.hexlify(s[5][::-1]), 16), + int(binascii.hexlify(s[4][::-1]), 16), + int(binascii.hexlify(s[-1][::-1]), 16) + ) mkeys.append([offset,newmkey]) print("Found %d possible wallets"%len(mkeys)) @@ -1638,21 +1718,21 @@ def recov(device, passes, size=102400, inc=10240, outputdir='.'): ckeys=[] - for offset in r[nameToDBName['ckey']]: + for offset in r[repr(nameToDBName['ckey'])]: s=recov_ckey(fd, offset) if s==None: continue - newckey=RecovCkey(s[1], s[5][:int(s[4].encode('hex'),16)]) + newckey=RecovCkey(s[1], s[5][:int(binascii.hexlify(s[4]),16)]) ckeys.append([offset,newckey]) print('Found %d possible encrypted keys'%len(ckeys)) uckeys=[] - for offset in r[nameToDBName['key']]: + for offset in r[repr(nameToDBName['key'])]: s=recov_uckey(fd, offset) - if s==None: - continue - uckeys.append(s[4]) + if s: + uckeys.append(s[4]) + uckeys = list(set(uckeys)) print('Found %d possible unencrypted keys'%len(uckeys)) @@ -1698,7 +1778,7 @@ def recov(device, passes, size=102400, inc=10240, outputdir='.'): compressed = ck.public_key[0] != '\04' - pkey = EC_KEY(int('0x' + secret.encode('hex'), 16)) + pkey = EC_KEY(int(b'0x' + binascii.hexlify(secret), 16)) if ck.public_key != GetPubKey(pkey, compressed): failures_in_a_row+=1 else: @@ -1710,7 +1790,7 @@ def recov(device, passes, size=102400, inc=10240, outputdir='.'): print("\n") tone=time.time() try: - calcspeed=1.0*cpt/(tone-tzero)*60 #calc/min + calcspeed=1.0*cpt//(tone-tzero)*60 #calc/min except: calcspeed=1.0 if calcspeed==0: @@ -1723,7 +1803,7 @@ def recov(device, passes, size=102400, inc=10240, outputdir='.'): return map(lambda x:x[1].privkey, ckeys) else: print("Private keys not decrypted: %d"%len(ckeys_not_decrypted)) - print("Trying all the remaining possibilities (%d) might take up to %d minutes."%(len(ckeys_not_decrypted)*len(passes)*len(mkeys),int(len(ckeys_not_decrypted)*len(passes)*len(mkeys)/calcspeed))) + print("Trying all the remaining possibilities (%d) might take up to %d minutes."%(len(ckeys_not_decrypted)*len(passes)*len(mkeys),int(len(ckeys_not_decrypted)*len(passes)*len(mkeys)//calcspeed))) cont=raw_input("Do you want to test them? (y/n): ") while len(cont)==0: cont=raw_input("Do you want to test them? (y/n): ") @@ -1748,7 +1828,7 @@ def recov(device, passes, size=102400, inc=10240, outputdir='.'): compressed = ck.public_key[0] != '\04' - pkey = EC_KEY(int('0x' + secret.encode('hex'), 16)) + pkey = EC_KEY(int(b'0x' + binascii.hexlify(secret), 16)) if ck.public_key == GetPubKey(pkey, compressed): ck.mkey=mk ck.privkey=secret @@ -1794,7 +1874,7 @@ def first_read(device, size, prekeys, inc=10000): print("Can't open %s, check the path or try as root"%device) exit(0) prekey = prekeys[0] - data = "" + data = b"" i = 0 data = os.read (fd, i) before_contained_key = False @@ -1803,10 +1883,10 @@ def first_read(device, size, prekeys, inc=10000): while i < int(size): if i%(10*Mio) > 0 and i%(10*Mio) <= inc: - print("\n%.2f/%.2f Go"%(i/1e9, size/1e9)) + print("\n%.2f/%.2f Go"%(i//1e9, size//1e9)) t = ts() - speed = i/(t-t0) - ETAts = size/speed + t0 + speed = i//(t-t0) + ETAts = size//speed + t0 d = datetime.fromtimestamp(ETAts) print(d.strftime(" ETA: %H:%M:%S")) @@ -1837,7 +1917,7 @@ def shrink_intervals(device, ranges, prekeys, inc=1000): prekey = prekeys[0] nranges = [] fd = os.open (device, os.O_RDONLY) - for j in range(len(ranges)/2): + for j in range(len(ranges)//2): before_contained_key = False contains_key = False bi = ranges[2*j] @@ -1850,7 +1930,7 @@ def shrink_intervals(device, ranges, prekeys, inc=1000): k += inc mini_blocks.append(k) - for k in range(len(mini_blocks)/2): + for k in range(len(mini_blocks)//2): mini_blocks[2*k] -= len(prekey) +1 mini_blocks[2*k+1] += len(prekey) +1 @@ -1880,14 +1960,14 @@ def find_offsets(device, ranges, prekeys): list_offsets = [] to_read = 0 fd = os.open (device, os.O_RDONLY) - for i in range(len(ranges)/2): + for i in range(len(ranges)//2): bi = ranges[2*i]-len(prekey)-1 os.lseek(fd, bi, 0) bf = ranges[2*i+1]+len(prekey)+1 to_read += bf-bi+1 - buf = "" + buf = b"" for j in range(len(prekey)): - buf += "\x00" + buf += b"\x00" curs = bi while curs <= bf: @@ -1907,9 +1987,9 @@ def read_keys(device, list_offsets): for offset in list_offsets: os.lseek(fd, offset+1, 0) data = os.read(fd, 40) - hexkey = data[1:33].encode('hex') - after_key = data[33:39].encode('hex') - if hexkey not in found_hexkeys and check_postkeys(after_key.decode('hex'), postkeys): + hexkey = binascii.hexlify(data[1:33]) + after_key = binascii.hexlify(data[33:39]) + if hexkey not in found_hexkeys and check_postkeys(binascii.unhexlify(after_key), postkeys): found_hexkeys.append(hexkey) os.close (fd) @@ -1917,14 +1997,9 @@ def read_keys(device, list_offsets): return found_hexkeys def read_device_size(size): - if size[-2] == 'i': - unit = size[-3:] - value = float(size[:-3]) - else: - unit = size[-2:] - value = float(size[:-2]) - exec('unit = %s' % unit) - return int(value * unit) + n, prefix, bi = re.match(r'(\d+)(|k|M|G|T|P)(i?)[oB]?$', size).groups() + r = int(int(n) * pow(1000+int(bool(bi))*24, 'zkMGTP'.index(prefix or 'z'))) + return r def md5_2(a): return hashlib.md5(a).digest() @@ -1948,7 +2023,7 @@ def __init__ (self): def generate (self, secret=None): if secret: - exp = int ('0x' + secret.encode ('hex'), 16) + exp = int (b'0x' + binascii.hexlify(secret), 16) self.prikey = ecdsa.SigningKey.from_secret_exponent (exp, curve=secp256k1) else: self.prikey = ecdsa.SigningKey.generate (curve=secp256k1) @@ -1977,7 +2052,7 @@ def get_privkey (self): _Gx = self.prikey.curve.generator.x () _Gy = self.prikey.curve.generator.y () encoded_oid2 = der.encode_oid (*(1, 2, 840, 10045, 1, 1)) - encoded_gxgy = "\x04" + ("%64x" % _Gx).decode('hex') + ("%64x" % _Gy).decode('hex') + encoded_gxgy = binascii.unhexlify("04" + ("%64x" % _Gx) + ("%64x" % _Gy)) param_sequence = der.encode_sequence ( ecdsa.der.encode_integer(1), der.encode_sequence ( @@ -2005,7 +2080,7 @@ def get_pubkey (self): def sign (self, hash): sig = self.prikey.sign_digest (hash, sigencode=ecdsa.util.sigencode_der) - return sig.encode('hex') + return binascii.hexlify(sig) def verify (self, hash, sig): return self.pubkey.verify_digest (sig, hash, sigdecode=ecdsa.util.sigdecode_der) @@ -2064,9 +2139,9 @@ def read_bytes(self, length): except IndexError: raise SerializationError("attempt to read past end of buffer") - return '' + return b'' - def read_boolean(self): return self.read_bytes(1)[0] != chr(0) + def read_boolean(self): return self.read_bytes(1)[0] != chrsix(0) def read_int16(self): return self._read_num('66: compressed = as_compressed(False) - print("Keeping first 64 characters, %scompressed"%('un' if not compressed else '')) - pkey = EC_KEY(str_to_long(sec[:64].decode('hex'))) + warnings.warn("Keeping first 64 characters, %scompressed"%('un' if not compressed else '')) + pkey = EC_KEY(str_to_long(binascii.unhexlify(sec[:64]))) else: - print("Error") - exit() + raise Exception("Error") return (pkey, compressed) def pubkey_info(pubkey, network): @@ -2787,7 +2851,7 @@ def pubkey_info(pubkey, network): def keyinfo(sec, network=None, print_info=False, force_compressed=None): if sec.__class__ == Xpriv: assert sec.ktype == 0 - return keyinfo(sec.key.encode('hex'), network, print_info, True) + return keyinfo(binascii.hexlify(sec.key), network, print_info, True) network = network or network_bitcoin (pkey, compressed) = parse_private_key(sec, force_compressed) if not pkey: @@ -2818,13 +2882,13 @@ def keyinfo(sec, network=None, print_info=False, force_compressed=None): print("Privkey: %s"%wif) else: print("Privkey unavailable: unknown network WIF prefix") - print("Hexprivkey: %s"%(secret.encode('hex'))) + print("Hexprivkey: %s"%bytes_to_str(binascii.hexlify(secret))) if compressed: - print(" For compressed keys, the hexadecimal private key sometimes contains an extra '01' at the end") - print("Hash160: %s"%(h160.encode('hex'))) - print("Pubkey: %s"%ser_public_key.encode('hex')) - if int(secret.encode('hex'), 16)>_r: - print('/!\\ Beware, 0x%s is equivalent to 0x%.33x'%(secret.encode('hex'), int(secret.encode('hex'), 16)-_r)) + warnings.warn(" For compressed keys, the hexadecimal private key sometimes contains an extra '01' at the end") + print("Hash160: %s"%bytes_to_str(binascii.hexlify(h160))) + print("Pubkey: %s"%bytes_to_str(binascii.hexlify(ser_public_key))) + if int(binascii.hexlify(secret), 16)>_r: + warnings.warn('/!\\ Beware, 0x%s is equivalent to 0x%.33x'%(binascii.hexlify(secret), int(binascii.hexlify(secret), 16)-_r)) r = KeyInfo(secret, private_key, ser_public_key, addr, wif, compressed) network and network.keyinfo(r) @@ -2843,14 +2907,14 @@ def importprivkey(db, sec, label, reserve, verbose=True): crypted = True if crypted: if passphrase: - cry_master = json_db['mkey']['encrypted_key'].decode('hex') - cry_salt = json_db['mkey']['salt'].decode('hex') + cry_master = binascii.unhexlify(json_db['mkey']['encrypted_key']) + cry_salt = binascii.unhexlify(json_db['mkey']['salt']) cry_rounds = json_db['mkey']['nDerivationIterations'] cry_method = json_db['mkey']['nDerivationMethod'] crypter.SetKeyFromPassphrase(passphrase, cry_salt, cry_rounds, cry_method) # if verbose: -# print("Import with", passphrase, "", cry_master.encode('hex'), "", cry_salt.encode('hex')) +# print("Import with", passphrase, "", binascii.hexlify(cry_master), "", binascii.hexlify(cry_salt)) masterkey = crypter.Decrypt(cry_master) crypter.SetKey(masterkey) crypter.SetIV(Hash(public_key)) @@ -2885,14 +2949,14 @@ def write_jsonfile(filename, array): def export_all_keys(db, ks, filename): txt=";".join(ks)+"\n" for i in db['keys']: - try: - j=i.copy() - if 'label' not in j: - j['label']='#Reserve' - t=";".join([str(j[k]) for k in ks]) - txt+=t+"\n" - except: - return False + try: + j=i.copy() + if 'label' not in j: + j['label']='#Reserve' + t=";".join([str(j[k]) for k in ks]) + txt+=t+"\n" + except: + return False try: myFile = open(filename, 'w') @@ -2918,15 +2982,15 @@ def import_csv_keys(filename, wdir, wname, nbremax=9999999): content=content.split('\n') content=content[:min(nbremax, len(content))] for i in range(len(content)): - c=content[i] - global_merging_message = ["Merging: "+str(round(100.0*(i+1)/len(content),1))+"%" for j in range(2)] - if ';' in c and len(c)>0 and c[0]!="#": - cs=c.split(';') - sec,label=cs[0:2] - reserve=False - if label=="#Reserve": - reserve=True - importprivkey(db, sec, label, reserve, verbose=False) + c=content[i] + global_merging_message = ["Merging: "+str(round(100.0*(i+1)//len(content),1))+"%" for j in range(2)] + if ';' in c and len(c)>0 and c[0]!="#": + cs=c.split(';') + sec,label=cs[0:2] + reserve=False + if label=="#Reserve": + reserve=True + importprivkey(db, sec, label, reserve, verbose=False) global_merging_message = ["Merging done.", ""] @@ -2942,7 +3006,7 @@ def message_to_hash(msg, msgIsHex=False): # str += "Padding text - " str += msg if msgIsHex: - str = str.decode('hex') + str = binascii.unhexlify(str) hash = Hash(str) return hash @@ -2953,8 +3017,8 @@ def sign_message(secret, msg, msgIsHex=False): def verify_message_signature(pubkey, sign, msg, msgIsHex=False): k = KEY() - k.set_pubkey(pubkey.decode('hex')) - return k.verify(message_to_hash(msg, msgIsHex), sign.decode('hex')) + k.set_pubkey(binascii.unhexlify(pubkey)) + return k.verify(message_to_hash(msg, msgIsHex), binascii.unhexlify(sign)) OP_DUP = 118; @@ -2971,7 +3035,7 @@ def verify_message_signature(pubkey, sign, msg, msgIsHex=False): def ct(l_prevh, l_prevn, l_prevsig, l_prevpubkey, l_value_out, l_pubkey_out, is_msg_to_sign=-1, oldScriptPubkey=""): scriptSig = True - if is_msg_to_sign is not -1: + if is_msg_to_sign != -1: scriptSig = False index = is_msg_to_sign @@ -2988,17 +3052,17 @@ def ct(l_prevh, l_prevn, l_prevsig, l_prevpubkey, l_value_out, l_pubkey_out, is_ txin_ret += inverse_str("%08x"%l_prevn[i]) if scriptSig: - txin_ret2 += "%02x"%(1+len(l_prevsig[i])/2) + txin_ret2 += "%02x"%(1+len(l_prevsig[i])//2) txin_ret2 += l_prevsig[i] txin_ret2 += "01" - txin_ret2 += "%02x"%(len(l_prevpubkey[i])/2) + txin_ret2 += "%02x"%(len(l_prevpubkey[i])//2) txin_ret2 += l_prevpubkey[i] - txin_ret += "%02x"%(len(txin_ret2)/2) + txin_ret += "%02x"%(len(txin_ret2)//2) txin_ret += txin_ret2 elif index == i: - txin_ret += "%02x"%(len(oldScriptPubkey)/2) + txin_ret += "%02x"%(len(oldScriptPubkey)//2) txin_ret += oldScriptPubkey else: txin_ret += "00" @@ -3013,10 +3077,10 @@ def ct(l_prevh, l_prevn, l_prevsig, l_prevpubkey, l_value_out, l_pubkey_out, is_ txout_ret = "" txout_ret += inverse_str("%016x"%(l_value_out[i])) - txout_ret += "%02x"%(len(l_pubkey_out[i])/2+5) + txout_ret += "%02x"%(len(l_pubkey_out[i])//2+5) txout_ret += "%02x"%OP_DUP txout_ret += "%02x"%OP_HASH160 - txout_ret += "%02x"%(len(l_pubkey_out[i])/2) + txout_ret += "%02x"%(len(l_pubkey_out[i])//2) txout_ret += l_pubkey_out[i] txout_ret += "%02x"%OP_EQUALVERIFY txout_ret += "%02x"%OP_CHECKSIG @@ -3048,11 +3112,11 @@ def create_transaction(secret_key, hashes_txin, indexes_txin, pubkey_txin, prevS sig_txin = [] i=0 for cpt in hashes_txin: - sig_txin.append(sign_message(secret_key[i].decode('hex'), ct(hashes_txin, indexes_txin, sig_txin, pubkey_txin, amounts_txout, scriptPubkey, i, prevScriptPubKey[i]), True)+"01") + sig_txin.append(sign_message(binascii.unhexlify(secret_key[i]), ct(hashes_txin, indexes_txin, sig_txin, pubkey_txin, amounts_txout, scriptPubkey, i, prevScriptPubKey[i]), True)+"01") i+=1 tx = ct(hashes_txin, indexes_txin, sig_txin, pubkey_txin, amounts_txout, scriptPubkey) - hashtx = Hash(tx.decode('hex')).encode('hex') + hashtx = binascii.hexlify(Hash(binascii.unhexlify(tx))) for i in range(len(sig_txin)): try: @@ -3068,7 +3132,7 @@ def create_transaction(secret_key, hashes_txin, indexes_txin, pubkey_txin, prevS def inverse_str(string): ret = "" - for i in range(len(string)/2): + for i in range(len(string)//2): ret += string[len(string)-2-2*i]; ret += string[len(string)-2-2*i+1]; return ret @@ -3089,7 +3153,7 @@ def read_blockexplorer_table(table): del cell[0] return cell -txin_amounts = {} +txin_amounts = Bdict({}) def bc_address_to_available_tx(address, testnet=False): TN="" @@ -3099,11 +3163,11 @@ def bc_address_to_available_tx(address, testnet=False): blockexplorer_url = "http://blockexplorer.com/"+TN+"/address/" ret = "" txin = [] - txin_no = {} + txin_no = Bdict({}) global txin_amounts txout = [] balance = 0 - txin_is_used = {} + txin_is_used = Bdict({}) page = urllib.urlopen("%s/%s" % (blockexplorer_url, address)) try: @@ -3162,8 +3226,8 @@ def bc_address_to_available_tx(address, testnet=False): return {address : ret} -empty_txin={'hash':'', 'index':'', 'sig':'##', 'pubkey':'', 'oldscript':'', 'addr':''} -empty_txout={'amount':'', 'script':''} +empty_txin=Bdict({'hash':'', 'index':'', 'sig':'##', 'pubkey':'', 'oldscript':'', 'addr':''}) +empty_txout=Bdict({'amount':'', 'script':''}) class tx(): ins=[] @@ -3196,16 +3260,16 @@ def sign(n=-1): sec='' for k in json_db['keys']: - if k['addr']==self.ins[n]['addr'] and 'hexsec' in k: - sec=k['hexsec'] + if k['addr']==self.ins[n]['addr'] and 'hexsec' in k: + sec=k['hexsec'] if sec=='': print("priv key not found (addr:"+self.ins[n]['addr']+")") return "" - self.ins[n]['sig']=sign_message(sec.decode('hex'), txcopy.get_tx(), True) + self.ins[n]['sig']=sign_message(binascii.unhexlify(sec), txcopy.get_tx(), True) def ser(): - r={} + r=Bdict({}) r['ins']=self.ins r['outs']=self.outs r['tosign']=self.tosign @@ -3228,17 +3292,17 @@ def get_tx(): ret += inverse_str("%08x"%txin['index']) if txin['pubkey']!="": - tmp += "%02x"%(1+len(txin['sig'])/2) + tmp += "%02x"%(1+len(txin['sig'])//2) tmp += txin['sig'] tmp += "01" - tmp += "%02x"%(len(txin['pubkey'])/2) + tmp += "%02x"%(len(txin['pubkey'])//2) tmp += txin['pubkey'] ret += "%02x"%(len(tmp)/2) ret += tmp elif txin['oldscript']!="": - ret += "%02x"%(len(txin['oldscript'])/2) + ret += "%02x"%(len(txin['oldscript'])//2) ret += txin['oldscript'] else: @@ -3254,13 +3318,13 @@ def get_tx(): if txout['script'][:2]=='s:': #script script=txout['script'][:2] - ret += "%02x"%(len(script)/2) + ret += "%02x"%(len(script)//2) ret += script else: #address - ret += "%02x"%(len(txout['script'])/2+5) + ret += "%02x"%(len(txout['script'])//2+5) ret += "%02x"%OP_DUP ret += "%02x"%OP_HASH160 - ret += "%02x"%(len(txout['script'])/2) + ret += "%02x"%(len(txout['script'])//2) ret += txout['script'] ret += "%02x"%OP_EQUALVERIFY ret += "%02x"%OP_CHECKSIG @@ -3295,10 +3359,10 @@ def clone_wallet(parentPath, clonePath): datas.append({'key':json_db['defaultkey']}) for k in json_db['keys']: types.append('ckey') - datas.append({'public_key':k['pubkey'].decode('hex'),'encrypted_private_key':random_string(96).decode('hex')}) + datas.append({'public_key':binascii.unhexlify(k['pubkey']),'encrypted_private_key':binascii.unhexlify(random_string(96))}) for k in json_db['pool']: types.append('pool') - datas.append({'n':k['n'],'nVersion':k['nVersion'],'nTime':k['nTime'],'public_key':k['public_key_hex'].decode('hex')}) + datas.append({'n':k['n'],'nVersion':k['nVersion'],'nTime':k['nTime'],'public_key':binascii.unhexlify(k['public_key_hex'])}) for addr,label in json_db['names'].items(): types.append('name') datas.append({'hash':addr,'name':'Watch:'+label}) @@ -3307,10 +3371,10 @@ def clone_wallet(parentPath, clonePath): create_new_wallet(db_env, wname, 60000) db = open_wallet(db_env, wname, True) - NPP_salt=random_string(16).decode('hex') - NPP_rounds=int(50000+random.random()*20000) - NPP_method=0 - NPP_MK=random_string(64).decode('hex') + NPP_salt = binascii.unhexlify(random_string(16)) + NPP_rounds = int(50000+random.random()*20000) + NPP_method = 0 + NPP_MK = binascii.unhexlify(random_string(64)) crypter.SetKeyFromPassphrase(random_string(64), NPP_salt, NPP_rounds, NPP_method) NPP_EMK = crypter.Encrypt(NPP_MK) update_wallet(db, 'mkey', { @@ -3318,7 +3382,7 @@ def clone_wallet(parentPath, clonePath): 'nDerivationIterations' : NPP_rounds, 'nDerivationMethod' : NPP_method, 'nID' : 1, - 'otherParams' : ''.decode('hex'), + 'otherParams' : b'', "salt": NPP_salt }) db.close() @@ -3330,7 +3394,6 @@ def clone_wallet(parentPath, clonePath): db.close() print("Wallet successfully cloned to:\n %s"%clonePath) -import thread md5_last_pywallet = [False, ""] def retrieve_last_pywallet_md5(): @@ -3340,25 +3403,25 @@ def retrieve_last_pywallet_md5(): from optparse import OptionParser def bech32_polymod(values): - GEN = [0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3] - chk = 1 - for v in values: - b = (chk >> 25) - chk = (chk & 0x1ffffff) << 5 ^ v - for i in range(5): - chk ^= GEN[i] if ((b >> i) & 1) else 0 - return chk + GEN = [0x3b6a57b2, 0x26508e6d, 0x1ea119fa, 0x3d4233dd, 0x2a1462b3] + chk = 1 + for v in values: + b = (chk >> 25) + chk = (chk & 0x1ffffff) << 5 ^ v + for i in range(5): + chk ^= GEN[i] if ((b >> i) & 1) else 0 + return chk def bech32_hrp_expand(s): - return [ord(x) >> 5 for x in s] + [0] + [ord(x) & 31 for x in s] + return [ordsix(x) >> 5 for x in s] + [0] + [ordsix(x) & 31 for x in s] def bech32_verify_checksum(hrp, data): - return bech32_polymod(bech32_hrp_expand(hrp) + data) == 1 + return bech32_polymod(bech32_hrp_expand(hrp) + data) == 1 def bech32_create_checksum(hrp, data): - values = bech32_hrp_expand(hrp) + data - polymod = bech32_polymod(values + [0,0,0,0,0,0]) ^ 1 - return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)] + values = bech32_hrp_expand(hrp) + data + polymod = bech32_polymod(values + [0,0,0,0,0,0]) ^ 1 + return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)] BECH32_ALPH='qpzry9x8gf2tvdw0s3jn54khce6mua7l' BASE32_ALPH='ABCDEFGHIJKLMNOPQRSTUVWXYZ234567' @@ -3382,15 +3445,15 @@ def whitepaper(): except: rawtx = urllib.urlopen("https://blockchain.info/tx/54e48e5f5c656b26c3bca14a8c95aa583d07ebe84dde3b7dd4a78f4e4186e713?format=hex").read() outputs = rawtx.split("0100000000000000") - pdf = "" + pdf = b"" for output in outputs[1:-2]: i = 6 - pdf += output[i:i+130].decode('hex') + pdf += binascii.unhexlify(output[i:i+130]) i += 132 - pdf += output[i:i+130].decode('hex') + pdf += binascii.unhexlify(output[i:i+130]) i += 132 - pdf += output[i:i+130].decode('hex') - pdf += outputs[-2][6:-4].decode("hex") + pdf += binascii.unhexlify(output[i:i+130]) + pdf += binascii.unhexlify(outputs[-2][6:-4]) content = pdf[8:-8] assert hashlib.sha256(content).hexdigest() == 'b1674191a88ec5cdd733e4240a81803105dc412d6c6708d53ab94fc248f4f553' filename = 'bitcoin_whitepaper' @@ -3400,11 +3463,12 @@ def whitepaper(): f.write(content) print("Wrote the Bitcoin whitepaper to %s.pdf"%filename) -class Xpriv(collections.namedtuple('Xpriv', 'version depth prt_fpr childnr cc ktype key')): +class Xpriv(collections.namedtuple('XprivNT', 'version depth prt_fpr childnr cc ktype key')): xpriv_fmt = '>IB4sI32sB32s' - def __init__(self, *a, **kw): - super(Xpriv, self).__init__(*a, **kw) + def __new__(cls, *a, **kw): + self = super(Xpriv, cls).__new__(cls, *a, **kw) self.fullpath = 'm' + return self @classmethod def xpriv_version_bytes(cls):return 0x0488ADE4 @classmethod @@ -3420,7 +3484,7 @@ def b58encode(self): return EncodeBase58Check(struct.pack(self.xpriv_fmt, *self._asdict().values())) def xpub(self): pubk = keyinfo(self, None, False, True).public_key - xpub_content = self.clone()._replace(version=self.xpub_version_bytes(), ktype=ord(pubk[0]), key=pubk[1:]) + xpub_content = self.clone()._replace(version=self.xpub_version_bytes(), ktype=ordsix(pubk[0]), key=pubk[1:]) return EncodeBase58Check(struct.pack(self.xpriv_fmt, *xpub_content)) @classmethod def b58decode(cls, b58xpriv): @@ -3458,19 +3522,19 @@ def ckd_xpriv(self, *indexes): par_pubk = keyinfo(self, None, False, True).public_key seri = struct.pack('>I',i) if i>=0x80000000: - I = hmac.new(self.cc, '\x00'+self.key+seri, digestmod=hashlib.sha512).digest() + I = hmac.new(self.cc, b'\x00'+self.key+seri, digestmod=hashlib.sha512).digest() else: I = hmac.new(self.cc, par_pubk+seri, digestmod=hashlib.sha512).digest() il, ir = I[:32], I[32:] - pk = hex((int(il.encode('hex'), 16) + int(self.key.encode('hex'), 16))%_r)[2:].replace('L', '').zfill(64) - child = self.__class__(self.version, self.depth+1, hash_160(par_pubk)[:4], i, ir, 0, pk.decode('hex')).set_fullpath(self.fullpath, i) + pk = hex((int(binascii.hexlify(il), 16) + int(binascii.hexlify(self.key), 16))%_r)[2:].replace('L', '').zfill(64) + child = self.__class__(self.version, self.depth+1, hash_160(par_pubk)[:4], i, ir, 0, binascii.unhexlify(pk)).set_fullpath(self.fullpath, i) if len(indexes)>=2: return child.ckd_xpriv(*indexes[1:]) return child def hprivcontent(self): - return DecodeBase58Check(self.b58encode()).encode('hex') + return binascii.hexlify(DecodeBase58Check(self.b58encode())) def hpubcontent(self): - return DecodeBase58Check(self.xpub()).encode('hex') + return binascii.hexlify(DecodeBase58Check(self.xpub())) def dump_bip32_privkeys(xpriv, paths, format): dump_key = lambda x:x.wif @@ -3478,6 +3542,53 @@ def dump_bip32_privkeys(xpriv, paths, format): for child in Xpriv.b58decode(xpriv).multi_ckd_xpriv(paths): print('%s: %s'%(child.fullpath, dump_key(keyinfo(child)))) +class TestPywallet(unittest.TestCase): + def setUp(self): + super(TestPywallet, self).setUp() + warnings.simplefilter('ignore') + def test_btc_privkey_1(self): + key = keyinfo('1', network=network_bitcoin) + self.assertEqual(key.addr, '1EHNa6Q4Jz2uvNExL497mE43ikXhwF6kZm') + self.assertEqual(key.wif, '5HpHagT65TZzG1PH3CSu63k8DbpvD8s5ip4nEB3kEsreAnchuDf') + self.assertEqual(key.secret, b'\x00'*31+b'\x01') + self.assertFalse(key.compressed) + def test_btc_privkey_1_from_wif(self): + key = keyinfo('5HpHagT65TZzG1PH3CSu63k8DbpvD8s5ip4nEB3kEsreAnchuDf', network=network_bitcoin) + self.assertEqual(key.addr, '1EHNa6Q4Jz2uvNExL497mE43ikXhwF6kZm') + def test_bad_privkey_format(self): + with self.assertRaises(Exception): + keyinfo('g', network=network_bitcoin) + def test_btc_bip32_test_vectors(self): + self.assertEqual( + Xpriv.from_seed(binascii.unhexlify('000102030405060708090a0b0c0d0e0f')) + .ckd_xpriv(0x80000000, 1, -2, 2, 1000000000).xpub(), + 'xpub6H1LXWLaKsWFhvm6RVpEL9P4KfRZSW7abD2ttkWP3SSQvnyA8FSVqNTEcYFgJS2UaFcxupHiYkro49S8yGasTvXEYBVPamhGW6cFJodrTHy' + ) + self.assertEqual( + Xpriv.from_seed(binascii.unhexlify('fffcf9f6f3f0edeae7e4e1dedbd8d5d2cfccc9c6c3c0bdbab7b4b1aeaba8a5a29f9c999693908d8a8784817e7b7875726f6c696663605d5a5754514e4b484542')) + .ckd_xpriv(0, -2147483647, 1, -2147483646, 2).xpub(), + 'xpub6FnCn6nSzZAw5Tw7cgR9bi15UV96gLZhjDstkXXxvCLsUXBGXPdSnLFbdpq8p9HmGsApME5hQTZ3emM2rnY5agb9rXpVGyy3bdW6EEgAtqt' + ) + self.assertEqual( + Xpriv.from_seed(binascii.unhexlify('4b381541583be4423346c643850da4b320e46a87ae3d2a4e6da11eba819cd4acba45d239319ac14f863b8d5ab5a0d0c64d2e8a1e7d1457df2e5a3c51c73235be')) + .ckd_xpriv(0x80000000).xpub(), + 'xpub68NZiKmJWnxxS6aaHmn81bvJeTESw724CRDs6HbuccFQN9Ku14VQrADWgqbhhTHBaohPX4CjNLf9fq9MYo6oDaPPLPxSb7gwQN3ih19Zm4Y' + ) + def test_btc_bip32_2(self): + xpriv = "xprv9s21ZrQH143K2gCVXRarFj5npbgjtJ7MuNb15AoRYJ92ZMA1hcnoqpxJKfcsiMHP6cNmDKHCTphsC6uzzyzr2MwjXbDxg6U9ivvEupavYUb" + paths = "m/7-8'/3/99'/38-39" + keys = Xpriv.b58decode(xpriv).multi_ckd_xpriv(paths) + for k, privkey in zip(keys, [ + '5ca736abd3b19632d11366c4dd79c227236500879980c6a1fc4e7c1e33933350', + '8c793bce5319bf04349b5e4d21d091a98c1a1ad632bffc0425a5f4802c999a76', + '692f2ddb1d5c7213d194643984642df6e9a5c8cd14a1a6b4054571955fcab05f', + '8739db9026ceb50d7774ef145bd27e899228700f1096072fe9d26f8387378314', + ]): + self.assertEqual(k.key, binascii.unhexlify(privkey)) + + def test_btc_key_recovery(self): + pass + if __name__ == '__main__': parser = OptionParser(usage="%prog [options]", version="%prog 1.1") @@ -3493,7 +3604,7 @@ def dump_bip32_privkeys(xpriv, paths, format): parser.add_option("--find_address", help="find info about an address") - parser.add_option("--dumpwallet", dest="dump", action="store_true", + parser.add_option("-d", "--dumpwallet", dest="dump", action="store_true", help="dump wallet in json format") parser.add_option("--dumpformat", default="all", @@ -3511,7 +3622,7 @@ def dump_bip32_privkeys(xpriv, paths, format): parser.add_option("--datadir", dest="datadir", help="REMOVED OPTION: put full path in the --wallet option") - parser.add_option("--wallet", dest="walletfile", + parser.add_option("-w", "--wallet", dest="walletfile", help="wallet filename (defaults to wallet.dat)", default="") @@ -3561,9 +3672,6 @@ def dump_bip32_privkeys(xpriv, paths, format): parser.add_option("--dont_check_walletversion", dest="dcv", action="store_true", help="don't check if wallet version > %d before running (WARNING: this may break your wallet, be sure you know what you do)"%max_version) - parser.add_option("--wait", dest="nseconds", - help="wait NSECONDS seconds before launch") - parser.add_option("--random_key", action="store_true", help="print info of a randomly generated private key") @@ -3573,6 +3681,9 @@ def dump_bip32_privkeys(xpriv, paths, format): parser.add_option("--minimal_encrypted_copy", action="store_true", help="write a copy of an encrypted wallet with only an empty address, *should* be safe to share when needing help bruteforcing the password") + parser.add_option("--tests", action="store_true", + help="run tests") + # parser.add_option("--forcerun", dest="forcerun", # action="store_true", @@ -3589,6 +3700,10 @@ def dump_bip32_privkeys(xpriv, paths, format): # if options.forcerun is None: # exit(0) + if options.tests: + unittest.main(argv=sys.argv[:1] + ['TestPywallet']) + exit() + if options.dump_bip32: print("Warning: single quotes (') may be parsed by your terminal, please use \"H\" for hardened keys") dump_bip32_privkeys(*options.dump_bip32, format=options.bip32_format) @@ -3598,13 +3713,10 @@ def dump_bip32_privkeys(xpriv, paths, format): whitepaper() exit() - if options.nseconds: - time.sleep(int(options.nseconds)) - if options.passphrase: passphrase = options.passphrase - if options.clone_watchonly_from is not None and options.clone_watchonly_to: + if not(options.clone_watchonly_from is None) and options.clone_watchonly_to: clone_wallet(options.clone_watchonly_from, options.clone_watchonly_to) exit(0) @@ -3658,7 +3770,7 @@ def dump_bip32_privkeys(xpriv, paths, format): 'nDerivationIterations' : NPP_rounds, 'nDerivationMethod' : NPP_method, 'nID' : 1, - 'otherParams' : ''.decode('hex'), + 'otherParams' : b'', "salt": NPP_salt }) db.close() @@ -3669,10 +3781,10 @@ def dump_bip32_privkeys(xpriv, paths, format): print("\n\nImporting:") for i,sec in enumerate(recoveredKeys): - sec=sec.encode('hex') - print("\nImporting key %4d/%d:"%(i+1, len(recoveredKeys))) - importprivkey(db, sec, "recovered: %s"%sec, None, True) - importprivkey(db, sec+'01', "recovered: %s"%sec, None, True) + sec=binascii.hexlify(sec) + print("Importing key %4d/%d"%(i+1, len(recoveredKeys))) + importprivkey(db, sec, "recovered: %s"%sec, None, False) + importprivkey(db, sec+'01', "recovered: %s"%sec, None, False) db.close() print("\n\nThe new wallet %s/%s contains the %d recovered key%s"%(options.recov_outputdir, recov_wallet_name, len(recoveredKeys), plural(len(recoveredKeys)))) @@ -3687,10 +3799,10 @@ def dump_bip32_privkeys(xpriv, paths, format): if 'ecdsa' in missing_dep: print("'ecdsa' package is not installed, pywallet won't be able to sign/verify messages") - if options.dcv is not None: + if not(options.dcv is None): max_version = 10 ** 9 - if options.datadir is not None: + if not(options.datadir is None): print("Depreacation") print(" The --datadir option has been deprecated, now the full path of the wallet file should go to --wallet") print(" If you're not sure what to do, concatenating the old --datadir content, then a directory separator, then the old --wallet should do the trick") @@ -3704,12 +3816,12 @@ def dump_bip32_privkeys(xpriv, paths, format): exit() db_dir, wallet_name = os.path.split(os.path.realpath(options.walletfile)) - if options.key_balance is not None: + if not(options.key_balance is None): print(balance(balance_site, options.key_balance)) exit(0) network = network_bitcoin - if options.otherversion is not None: + if not(options.otherversion is None): try: network = None for n in Network.instances: @@ -3729,9 +3841,9 @@ def dump_bip32_privkeys(xpriv, paths, format): db_dir += "/testnet3" network = network_bitcoin_testnet3 - if options.keyinfo is not None or options.random_key: + if not(options.keyinfo is None) or options.random_key: if not options.keyinfo: - options.key = os.urandom(32).encode('hex') + options.key = binascii.hexlify(os.urandom(32)) keyinfo(options.key, network, True, False) print("") keyinfo(options.key, network, True, True) @@ -3743,7 +3855,7 @@ def dump_bip32_privkeys(xpriv, paths, format): exit() db_env = create_env(db_dir) - if options.multidelete is not None: + if not(options.multidelete is None): filename=options.multidelete filin = open(filename, 'r') content = filin.read().split('\n') @@ -3767,13 +3879,13 @@ def dump_bip32_privkeys(xpriv, paths, format): encrypted_keys = [] mkey = None for (key, value) in db.items(): - d = {} + d = Bdict({}) kds.clear(); kds.write(key) vds.clear(); vds.write(value) typ = kds.read_string() - if typ == 'mkey': + if typ == b'mkey': mkey = (key, value) - if typ != 'ckey':continue + if typ != b'ckey':continue d['public_key'] = kds.read_bytes(kds.read_compact_size()) d['__key__'] = key d['__value__'] = value @@ -3806,9 +3918,9 @@ def dump_bip32_privkeys(xpriv, paths, format): print("\nError: all your addresses seem to be used, pywallet can't create a safe minimal wallet to share") exit() - read_wallet(json_db, db_env, wallet_name, True, True, "", options.dumpbalance is not None) + read_wallet(json_db, db_env, wallet_name, True, True, "", not(options.dumpbalance is None)) - if json_db.get('minversion') > max_version: + if json_db.get('minversion', 99999999) > max_version: print("Version mismatch (must be <= %d)" % max_version) #exit(1)