Skip to content
alex [dot] kramer [at] g_m_a_i_l [dot] com edited this page Dec 13, 2022 · 74 revisions

Run Jupyter server for remote access (LAN)

jupyter notebook --ip [LOCAL HOST IP GOES HERE] --port 8888
jupyter notebook --ip `iip` --port 8888

Virtual environments

Create and activate/deactivate virtual environment:

python3 -m venv /path/to/new/virtual/env # E.g. /git/repo/root/.venv/venv_name
source /git/repo/root/.venv/venv_name/bin/activate
deactivate

Freeze requirements:

pip3 freeze > requirements.txt

Install requirements:

pip3 install -r requirements.txt

Disable pylint warnings

Place a comment inside a block to disable a specific warning for that block

# pylint: disable=C0321

Place at the top of the file to disable for entire file

# pylint: disable=C
# pylint: disable=W

Flush/unbuffer print

# Single print statement: flush=True applies to this one call only
print(..., flush=True)

# Entire module: rebind the name `print` to a partial that always passes
# flush=True, so every later print(...) call in this module flushes
from functools import partial
print = partial(print, flush=True)

All Python processes (set this environment variable before launching Python):

PYTHONUNBUFFERED=TRUE

Type hinting base class

from typing import Dict, Type

# Placeholder base class for the classes stored in the map below
class Base:
    pass

# Example subclass of Base
class A(Base):
    pass

# Type[Base] annotates values that are classes (Base or one of its
# subclasses), not instances of those classes
my_map: Dict[str, Type[Base]] = {
    'a': A
}

Serve local directory

python -m http.server [PORT]

Stop urllib3 from spewing warnings

# Calls urllib3's disable_warnings() through the copy exposed by `requests`;
# `requests` must already be imported (the import is not part of this snippet)
requests.packages.urllib3.disable_warnings()

Dump IPython history to file

from datetime import datetime
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
%history -g -f /tmp/ipyhist.$timestamp

Print exception with stack trace

import traceback

try:
    # Placeholder for the code that may fail
    raise RuntimeError('something went wrong')
except Exception:
    # format_exc() returns the full report (stack trace plus exception type
    # and message) as a string. traceback.print_tb() returns None, so wrapping
    # it in print() would only add a stray "None" line and would leave out the
    # exception type and message.
    print(traceback.format_exc())

Command line app skeleton

import argparse

def main(
    string_positional_arg,
    int_positional_arg,
    no_arg_flag,
    flag_with_default,
    flag_without_default,
    list_positional_arg
):
    """Print every received argument as one ``name=value`` line.

    The lines appear in parameter order and are emitted with a single
    ``print`` call. Nothing is returned.
    """
    report_lines = [
        f'string_positional_arg={string_positional_arg}',
        f'int_positional_arg={int_positional_arg}',
        f'no_arg_flag={no_arg_flag}',
        f'flag_with_default={flag_with_default}',
        f'flag_without_default={flag_without_default}',
        f'list_positional_arg={list_positional_arg}',
    ]
    print('\n'.join(report_lines))

if __name__ == '__main__':
    # sys.exit is the supported way to end a program. The bare `exit` helper is
    # added by the `site` module for interactive sessions and is missing when
    # Python runs without it (e.g. `python -S`).
    import sys

    # Declare the command line: two required positionals, three optional
    # flags, then a trailing list positional that collects whatever is left.
    parser = argparse.ArgumentParser(description='Command description help text.')
    parser.add_argument('string_positional_arg', help='String positional arg help text')
    parser.add_argument('int_positional_arg', type=int, help='Int positional arg help text')
    # store_true: the flag is False unless it is present on the command line
    parser.add_argument('-no_arg_flag', action='store_true', help="This flag doesn't take an argument")
    parser.add_argument('--flag_with_default', help='This flag has a default value', default='Flag #1 default value')
    parser.add_argument('--flag_without_default', help='This flag has no default value')
    # nargs='*' gathers zero or more remaining positionals into a list
    parser.add_argument('list_positional_arg', nargs='*', help='List positional arg help text')
    args = parser.parse_args()

    # The return value of main() becomes the process exit status
    # (None is treated as success, i.e. status 0).
    sys.exit(
        main(
            args.string_positional_arg,
            args.int_positional_arg,
            args.no_arg_flag,
            args.flag_with_default,
            args.flag_without_default,
            args.list_positional_arg
        )
    )

Unittest skeleton

import unittest

# Skeleton test case: a fixture is built in setUp, used by the test method and
# released in tearDown. `Widget` is not defined in this snippet and must be
# supplied by the code under test.
class Test(unittest.TestCase):
    def setUp(self):
        # Creates the fixture object used by the test methods
        self.widget = Widget('The widget')

    def test_something(self):
        self.widget.doSomething()
        # NOTE(review): 2 + 2 != 5, so this assertion always fails; presumably
        # a deliberate placeholder to be replaced with a real expectation
        self.assertEqual(2 + 2, 5)

    def tearDown(self):
        # Releases the fixture created in setUp
        self.widget.dispose()

if __name__ == '__main__':
    unittest.main()

Flask skeleton

'''
$.ajax({
    url: "http://127.0.0.1:81/hello",
    type: "POST",
    contentType: "application/json",
    data: JSON.stringify({"inputVar": 1}),
    success: function( data ) {
        alert( "success: " + data );
    }
});

curl -X POST http://127.0.0.1:81/hello -d '{"foo": "bar"}'
'''

from flask import Flask, request
from flask_cors import CORS

app = Flask(__name__)
CORS(app) # Necessary if you want to hit the API from browser-based javascript

@app.route('/hello', methods=['GET', 'POST'])
def helloWorld():
    # Registered for GET and POST on /hello: prints the raw request body,
    # then replies with a fixed greeting string
    print(f'GOT BODY: {request.get_data()}')
    return "🌎 Hello, cross-origin-world! 🌏"

if __name__ == '__main__':
    app.run(host='0.0.0.0', port= 81)

Parse timestamp

https://strftime.org/

from datetime import datetime

# strptime's %f directive accepts at most six fractional-second digits
# (microseconds); a seventh digit makes the whole parse fail with ValueError,
# so the sample timestamp carries exactly six.
# %z parses the "-0400" UTC offset, which makes the result timezone-aware.
dt = datetime.strptime('28/Jul/1995:13:32:22.123456 -0400', '%d/%b/%Y:%H:%M:%S.%f %z')

Function name logging wrapper

def logName(function):
  """Decorator that prints the wrapped function's name before each call.

  Arguments and the return value pass through unchanged. The wrapper keeps
  the wrapped function's metadata (``__name__``, ``__doc__``, ...) so that
  introspection and stacked decorators still see the original function.
  """
  from functools import wraps

  @wraps(function)
  def func_wrapper(*args, **kwargs):
    print(f'🤡 =====> Inside {function.__name__}')
    return function(*args, **kwargs)
  return func_wrapper

@logName
def foo():
  # ...
  # A body made only of comments is a SyntaxError, so keep a placeholder
  # statement until real code is added.
  pass

Invoke python3 pip module

pip3 install [module]
python3 -m [module]

# e.g.:
pip3 install pylint
python3 -m pylint somefile.py

Run script in debugger

python -m pdb script.py

Get original markdown from reddit post

Convert the post permalink to its JSON endpoint (drop the title slug and append .json):
https://www.reddit.com/r/geopolitics/comments/5bgwfj/culminating_analysis_of/
becomes
https://www.reddit.com/r/geopolitics/comments/5bgwfj.json

Retrieve, extract, convert:

# Python 3 version: the original used the Python 2-only HTMLParser module and
# the print statement.
import html
import requests

# Get JSON from endpoint
response = requests.get("https://www.reddit.com/r/geopolitics/comments/5bgwfj.json")
# Stop on an HTTP error instead of failing later while indexing an error payload
response.raise_for_status()
# Named `listing` rather than `json` so the stdlib module name is not shadowed
listing = response.json()

# Get selftext from JSON
selftext = listing[0]['data']['children'][0]['data']['selftext']

# HTML decode selftext
selftext = html.unescape(selftext)

# Unescape line breaks
selftext = selftext.replace("\\n", "\n")

# No unicode unescaping step is needed: response.json() already yields str

print(selftext)

Format UNIX epoch timestamp

import datetime

epoch_time = 1500000000
# Build a timezone-aware UTC datetime. datetime.utcfromtimestamp() is
# deprecated since Python 3.12 and returns a naive object that is easy to
# misread as local time; the formatted output below is the same either way.
date = datetime.datetime.fromtimestamp(epoch_time, tz=datetime.timezone.utc)

date.strftime('%b %d %Y %H:%M')
# Jul 14 2017 02:40

date.strftime('%Y%m%d%H%M%S')
# 20170714024000

Sigmoid compression

import numpy

def _s(x: float) -> float:
    """Squash ``x`` onto [0, 1] with a logistic curve centred on 0.5.

    Inputs at or below 0 map to exactly 0.0 and inputs at or above 1 map to
    exactly 1.0; values in between follow ``1 / (1 + exp(10 * (0.5 - x)))``.
    """
    if x <= 0:
        return 0.
    if x >= 1:
        return 1.
    exponent = 10 * (0.5 - x)
    return 1 / (1 + numpy.exp(exponent))

Scrape using Selenium WebDriver with a proxy

# NOTE: written against the Selenium 3 API (desired_capabilities,
# Proxy.add_to_capabilities, find_element_by_css_selector).
import json
import traceback  # needed by the except block below
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.proxy import Proxy, ProxyType

options = webdriver.ChromeOptions()
options.add_argument('headless')
options.add_argument('window-size=1200x600')

# Work on a copy: DesiredCapabilities.CHROME is a shared class-level dict and
# add_to_capabilities() below mutates whatever it is given.
capabilities = webdriver.DesiredCapabilities.CHROME.copy()

prox = Proxy()
prox.proxy_type = ProxyType.MANUAL
prox.http_proxy = '123.45.67.890:1234'
prox.socks_proxy = '123.45.67.890:1234'
prox.ssl_proxy = '123.45.67.890:1234'
prox.add_to_capabilities(capabilities)

# The capabilities must be handed to the driver, otherwise the proxy
# configured above is never applied.
driver = webdriver.Chrome(options=options, desired_capabilities=capabilities)

##### Only do this if you get an error about wrong chromedriver version #####
# from webdriver_manager.chrome import ChromeDriverManager
# manager = ChromeDriverManager().install()
# driver = webdriver.Chrome(manager, options=options, desired_capabilities=capabilities)
########################################################################

driver.set_window_size(1920, 1080)
driver.maximize_window()
driver.implicitly_wait(120)

try:
    # Ask an IP echo service which address the request came from, to confirm
    # that traffic really goes through the proxy.
    driver.get('https://api.ipify.org?format=json')
    ipJson = driver.find_element_by_css_selector('pre').text
    ip = json.loads(ipJson)['ip']
    print('WebDriver requests configured to originate from ' + ip)
except Exception as e:
    # Dump as much diagnostic state as possible, then shut the browser down.
    # On success the driver is deliberately left open for further scraping.
    print(type(e).__name__ + ' Exception: ' + str(e))
    traceback.print_exc()
    print('=====> Current URL:\n' + driver.current_url)
    print('=====> Page source:\n' + driver.page_source)
    # get_log() returns a list of entries, so it has to be converted before
    # being joined to a string.
    print('=====> HAR log:\n' + str(driver.get_log('har'))) # Useful! Will print out console log messages and failed requests, etc.
    driver.save_screenshot('/tmp/webdriver.png')
    driver.close()
    driver.quit()

Filter csv file example

NOTE: SHOULD PROBABLY USE PANDAS FOR THIS
Filter on single column:

# Keep only the rows whose first column is greater than 100000.
# `with` closes each file even if a row fails to parse; the input is read
# completely before the output file is opened.
with open('/tmp/mp3.txt') as inFile:
    filteredLines = [line for line in inFile if int(line.split(',')[0]) > 100000]

with open('bigMp3.txt', 'w') as outFile:
    # Lines already end with a line break because they were never stripped
    outFile.writelines(filteredLines)

Filter on multiple columns:

# Keep rows whose first column is greater than 100000 and whose third column
# contains 'foo'. `with` closes each file even if a row fails to parse.
with open('/tmp/mp3.txt') as inFile:
    rows = [line.rstrip('\n').split(',') for line in inFile]

filteredLines = [','.join(row) for row in rows if int(row[0]) > 100000 and 'foo' in row[2]]

with open('bigFooMp3.txt', 'w') as outFile:
    # Line breaks were stripped while splitting, so they are added back here
    # (between rows only, no trailing newline)
    outFile.write('\n'.join(filteredLines))

Filter dictionary

def filterDict(dictionary, keys):
    """Return a new dict holding only ``keys``, looked up in ``dictionary``.

    A requested key that is absent from ``dictionary`` still appears in the
    result, mapped to ``None``.
    """
    return {key: dictionary.get(key) for key in keys}

Grant browser permissions in Selenium for headless Chromedriver

# Sends the "Browser.grantPermissions" command with the origin and the two
# clipboard permissions listed below. `driver` and `page` are not defined in
# this snippet and must already exist.
driver.execute_cdp_cmd(
  "Browser.grantPermissions",
  {
    "origin": page.server_url,
    "permissions": ["clipboardReadWrite", "clipboardSanitizedWrite"]
  },
)

Scrape Spotify API

Artists

#!/usr/bin/env python3

import time
import requests
import json

def pruneDict(dictionary, keys):
    """Return a new dict restricted to ``keys``.

    Values come from ``dictionary``; a key it does not contain is still
    present in the result with the value ``None``.
    """
    return {key: dictionary.get(key) for key in keys}

##########################################

url = 'https://api.spotify.com/v1/me/following?type=artist&limit=50'

# Get auth token from website https://developer.spotify.com/console/get-following/
headers = {
    'Authorization': 'Bearer AUTH_TOKEN_GOES_HERE',
    'Cache-Control': 'no-cache'
}

artists = []
artistKeys = ['name', 'genres', 'followers', 'popularity', 'uri']
count = 0

# Fetch one page per iteration and follow the `next` link until there is none
while True:
    count += 1
    response = requests.get(url, headers=headers)
    # Fail with a clear HTTP error (e.g. expired token) rather than a KeyError
    # while indexing an error payload below
    response.raise_for_status()
    page = response.json()['artists']
    artists.extend(page['items'])

    # Stop explicitly when the `next` link is missing or empty, instead of
    # relying on a string-concatenation TypeError to end the loop
    url = page.get('next')
    if not url:
        print('BREAKING ON COUNT: #' + str(count) + ', no next page', flush=True)
        print('==========\n\n', flush=True)
        break

    print(str(count) + ': ' + url, flush=True)

    # Pause between requests
    time.sleep(1)

# Keep only the interesting fields and flatten followers to its total count
out = [pruneDict(artist, artistKeys) for artist in artists]
for artist in out:
    artist['followers'] = artist['followers']['total']

print(json.dumps(out))

IKEA curbside pickup availability monitor

Turn up volume on speakers and leave running

import time
import traceback
import os
from selenium import webdriver
from selenium.webdriver.support.ui import Select

# Chrome options: headless mode with a 1920x1080 window
options = webdriver.ChromeOptions()
options.add_argument('headless')
options.add_argument('window-size=1920x1080')

# NOTE(review): `capabilities` is not referenced again in the visible code
capabilities = webdriver.DesiredCapabilities.CHROME

##### Do this if you get an error about wrong chromedriver version #####
from webdriver_manager.chrome import ChromeDriverManager
manager = ChromeDriverManager().install()
driver = webdriver.Chrome(manager, options=options)
########################################################################

# Browser session setup: window size, short implicit wait, cleared cookies
driver.set_window_size(1920, 1080)
driver.maximize_window()
driver.implicitly_wait(3)
driver.delete_all_cookies()

# Pages visited by the monitor: a product page and the shopping cart
spoon_url = 'https://www.ikea.com/us/en/p/dragon-spoon-stainless-steel-30091763/'
cart_url = 'https://www.ikea.com/us/en/shoppingcart/'
# NOTE(review): `delivery_url` is not referenced in the visible code
delivery_url = 'https://order.ikea.com/us/en/checkout/delivery/'

try:
    # Put one item in the bag so the checkout flow can be walked through
    driver.get(spoon_url)
    time.sleep(5)
    add_button = driver.find_element_by_css_selector('button[aria-label="Add to bag"]')
    driver.execute_script('arguments[0].scrollIntoView();', add_button)
    add_button.click()
    time.sleep(5)

    attempt_count = 0

    # Repeat the checkout flow until the pickup option goes through
    while True:
        attempt_count += 1
        print(f'Attempt: {attempt_count}')

        # Cart -> checkout
        driver.get(cart_url)
        checkout_button = driver.find_element_by_css_selector('.checkout button.checkout__button')
        checkout_button.click()
        time.sleep(5)

        # Enter the zip code
        zip_input = driver.find_element_by_css_selector('#zipcode')
        zip_input.click()
        zip_input.clear()
        zip_input.send_keys('10000')
        time.sleep(1)
        zip_submit_button = driver.find_element_by_css_selector('.zipin button')
        zip_submit_button.click()
        time.sleep(10)

        # Choose the collect option and the store
        collect_button = driver.find_element_by_css_selector('.collect button.delivery__option')
        collect_button.click()
        time.sleep(1)
        store_selector = Select(driver.find_element_by_css_selector('[title="Select store where to collect your products"]'))
        store_selector.select_by_visible_text('NY, IKEA Brooklyn')
        next_button = driver.find_element_by_css_selector('.delivery__submit button')
        next_button.click()
        time.sleep(3)

        # Pickup unavailable: start the next attempt right away
        if 'Click & Collect is temporarily unavailable at this store' in driver.page_source:
            continue

        # Give the next page time to load, then check it is the order page
        time.sleep(8)
        if 'My IKEA Order' not in driver.page_source:
            continue

        # Success: close the browser and raise an audible alarm
        driver.close()
        driver.quit()
        for _ in range(20):
            print('🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨🚨')
            # Raw string: "\!" is not a valid escape sequence in a normal
            # string literal and triggers a warning on recent Python versions;
            # the command sent to the shell is unchanged
            os.system(r'say "ALERT\! ALERT\! ALERT\! Ikea pickup available\! Ikea pickup available\! Ikea pickup available\!"')
            time.sleep(5)
        break
except Exception as e:
    # Capture diagnostics (screenshot, trace, URL, page source) before quitting
    driver.save_screenshot('/tmp/webdriver_fail.png')
    print(f'🔥☠️🔥☠️🔥☠️🔥☠️🔥☠️🔥☠️🔥\n{type(e).__name__} Exception: {str(e)}')
    print('⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️ =====> STACK TRACE:')
    traceback.print_exc()
    print(f'⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️ =====> Current URL:\n{driver.current_url}')
    source_dump_path = '/tmp/webdriver_dump.html'
    print(f'⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️ =====> Dumping page source to: {source_dump_path}')
    # The with-statement closes the file; no explicit close() is needed
    with open(source_dump_path, 'w') as source_dump_file:
        source_dump_file.write(driver.page_source)

    driver.close()
    driver.quit()
    for _ in range(20):
        os.system(r'say "ABORT\! ABORT\! ABORT\! FATAL ERROR\! "')
        time.sleep(5)

Find most profitable single trade (one buy, one sell) in stock price tick stream (with visualization)

import random


def find_best_trade(prices, show=False):
    """Find the most profitable single buy-then-sell trade in a price series.

    The buy always happens strictly before the sell, and both are real ticks
    from ``prices``. When prices never rise, the least unprofitable trade is
    returned, so the profit may be zero or negative.

    Args:
        prices: Sequence of at least two prices ordered by time. Integer
            prices are expected when ``show`` is true, because each bar is
            drawn as ``'*' * price``.
        show: When true, print one line per tick: the tick index, three
            marker columns and a bar. ``H`` marks a new high since the
            current low, ``L`` a new low, ``P`` a new best profit.

    Returns:
        Tuple ``(buy_price, buy_time, sell_price, sell_time, profit)``.

    Raises:
        ValueError: If fewer than two prices are given.
    """
    if len(prices) < 2:
        raise ValueError('need at least two prices to make a trade')

    # Cheapest buy seen so far, and the highest price seen since that buy
    lo, lo_time = prices[0], 0
    hi = prices[0]

    # Start from the first possible trade: buy at tick 0, sell at tick 1
    best_lo, best_lo_time = prices[0], 0
    best_hi, best_hi_time = prices[1], 1
    best_profit = best_hi - best_lo

    for tick, price in enumerate(prices):
        is_high = tick > 0 and price > hi
        if is_high:
            hi = price

        # Evaluate selling at this tick against the cheapest EARLIER buy,
        # before this price is allowed to become the new low. This keeps every
        # reported trade real (no sale at a tick that has not happened).
        is_best = tick > 0 and price - lo > best_profit
        if is_best:
            best_profit = price - lo
            best_lo, best_lo_time = lo, lo_time
            best_hi, best_hi_time = price, tick

        is_low = price < lo
        if is_low:
            lo, lo_time = price, tick
            hi = price

        if show:
            marks = ('H' if is_high else '.') + ('L' if is_low else '.') + ('P' if is_best else '.')
            print('{:3d}'.format(tick) + ' | ' + marks + ' ' + '*' * price)

    return best_lo, best_lo_time, best_hi, best_hi_time, best_profit


# Stock tick stream
prices = [random.randint(0, 100) for x in range(100)]

best_lo, best_lo_time, best_hi, best_hi_time, best_profit = find_best_trade(prices, show=True)

print('==========')
print('buy@${}(t{}), sell@${}(t{}), profit: ${}'.format(best_lo, best_lo_time, best_hi, best_hi_time, best_profit))

LINK FOR BWOB

http://192.168.0.60:8888/?token=dabf641130d9f1501944cc07bc1967fa17a27ac812a55751

Clone this wiki locally