-
-
Notifications
You must be signed in to change notification settings - Fork 300
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'master' into pre-release
- Loading branch information
Showing
16 changed files
with
215 additions
and
124 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
from typing import List | ||
|
||
from django.core.management.base import BaseCommand | ||
|
||
from app.models import Print, PrinterEvent, GCodeFile, models, PrintShotFeedback | ||
from lib.url_signing import new_signed_url | ||
from lib.utils import printProgressBar | ||
|
||
|
||
class Command(BaseCommand): | ||
help = '(re-)signs all media URLs. Must be run once after updating, and any time the Django SECRET_KEY is rotated' | ||
|
||
@staticmethod | ||
def _resign_urls_on_model(obj: models.Model, url_fields: List[str]): | ||
changed = False | ||
total_rows = len(obj.objects.all()) | ||
print(f"Resigning {obj.__name__} URLs ({total_rows} rows)...") | ||
for idx, row in enumerate(obj.objects.all()): | ||
for url_field in url_fields: | ||
url = getattr(row, url_field) | ||
if url: | ||
setattr(row, url_field, new_signed_url(url)) | ||
changed = True | ||
if changed: | ||
row.save() | ||
if idx % 20 == 0: | ||
printProgressBar(idx + 1, total_rows) | ||
printProgressBar(1, 1) | ||
|
||
def resign_urls(self): | ||
self._resign_urls_on_model( | ||
obj=GCodeFile, # type: ignore | ||
url_fields=['url', 'thumbnail1_url', 'thumbnail2_url', 'thumbnail3_url'] | ||
) | ||
self._resign_urls_on_model( | ||
obj=Print, # type: ignore | ||
url_fields=['video_url', 'tagged_video_url', 'poster_url', 'prediction_json_url'] | ||
) | ||
self._resign_urls_on_model( | ||
obj=PrinterEvent, # type: ignore | ||
url_fields=['image_url'] | ||
) | ||
self._resign_urls_on_model( | ||
obj=PrintShotFeedback, # type: ignore | ||
url_fields=['image_url'] | ||
) | ||
|
||
def handle(self, *args, **options): | ||
self.resign_urls() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
import base64 | ||
import hashlib | ||
from datetime import timedelta | ||
|
||
from django.utils import timezone | ||
from urllib.parse import urlparse, parse_qs, urlunparse, ParseResult | ||
from dataclasses import dataclass, field, InitVar | ||
from typing import Optional, List, Union | ||
import hmac | ||
from django.conf import settings | ||
import logging | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
def calculate_hmac_digest(path: str): | ||
"""Returns base64 encoded digest given a 'path'""" | ||
digest = hmac.digest( | ||
key=settings.SECRET_KEY.encode(), | ||
msg=path.encode(), | ||
digest=hashlib.sha256 | ||
) | ||
return base64.urlsafe_b64encode(str(digest).encode()).decode() | ||
|
||
|
||
def new_signed_url(url_str: str) -> str: | ||
""" | ||
Signs a URL based on a given | ||
Signature is appended in the form of URL parameters in the query string. Note that | ||
the entire query string will be replaced (everything after '?' in the URL). | ||
""" | ||
parsed_url = urlparse(url_str) | ||
digest = calculate_hmac_digest(parsed_url.path) | ||
signed_url = parsed_url._replace(query=f"digest={digest}") | ||
return urlunparse(signed_url) | ||
|
||
|
||
@dataclass | ||
class HmacSignedUrl: | ||
"""This dataclass provides functions to check the validity of an HMAC signed url""" | ||
url_str: InitVar[str] | ||
|
||
# Calculated fields are set during __post_init__ | ||
path: str = field(init=False) | ||
supplied_digest: str = field(init=False) | ||
|
||
# Internal fields (don't show in repr) | ||
_parsed_url: ParseResult = field(init=False, repr=False) | ||
_url_params: dict = field(init=False, repr=False) | ||
|
||
def __post_init__(self, url_str: str): | ||
self._parsed_url = urlparse(url_str) | ||
self._url_params = parse_qs(self._parsed_url.query) | ||
self.path = self._parsed_url.path | ||
self.supplied_digest = self._get_single_url_param('digest', None) | ||
if self.supplied_digest is None: | ||
raise ValueError("Must supply a 'digest' parameter to check authorization") | ||
|
||
def _get_single_url_param(self, key: str, default=None) -> str: | ||
""" | ||
Returns first URL parameter value by key name, or (default) if empty. | ||
This function is necessary because parse_qs() returns a dictionary of lists | ||
since urls can contain duplicate query parameters. | ||
""" | ||
vals: List = self._url_params.get(key, []) | ||
return vals[0] if vals else default | ||
|
||
def is_authorized(self) -> bool: | ||
"""Returns True if the supplied digest matches the calculated digest, else False""" | ||
calculated_digest = calculate_hmac_digest(path=self.path) | ||
return hmac.compare_digest(self.supplied_digest, calculated_digest) |
Oops, something went wrong.