Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 75 additions & 68 deletions scrapers/SARJ-LLC/SARJ-LLC.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import sys
import re
from typing import Any
from urllib.parse import urlparse, urlencode

# to import from a parent directory we need to add that directory to the system path
Expand All @@ -26,7 +27,8 @@
print("If you have pip (normally installed with python), run this command in a terminal (cmd): pip install requests", file=sys.stderr)
sys.exit()

def scrape_url(url, scrape_type):

def scrape_url(url: str, scrape_type: str) -> dict[str, Any] | None:
parsed = urlparse(url)

*_, date, name = parsed.path.split('/')
Expand All @@ -44,7 +46,7 @@ def scrape_url(url, scrape_type):
return scraped


def query(fragment, query_type):
def query(fragment: dict[str, Any], query_type: str) -> dict[str, Any] | None:
res = None
if query_type in ('scene', 'gallery'):
name = re.sub(r'\W', '_', fragment['title']).upper()
Expand All @@ -57,59 +59,72 @@ def query(fragment, query_type):
res = scraper('https://metartnetwork.com', date, name)
return res

def search(s_type, name):

def capitalize_word(match: re.Match[str]) -> str:
    """Return the matched word capitalized (first letter upper, rest lower).

    Used as a re.sub replacement callback. The original chained
    .lower().capitalize(); str.capitalize() already lowercases every
    character after the first, so the extra .lower() was redundant.
    """
    return match.group(0).capitalize()


def transform_name_part(part: str) -> str:
    """Turn a URL-style name fragment into display form.

    Underscores and hyphens become spaces, then every word is
    capitalized (first letter upper, rest lower).
    """
    spaced = re.sub('[_-]', ' ', part)
    return re.sub(r'\w\S*', lambda m: m.group(0).lower().capitalize(), spaced)


def search(s_type: str, name: str) -> list[dict[str, Any]]:
search_type = {
'scene': 'MOVIE',
'gallery': 'GALLERY',
'performer': 'model'
}[s_type]
page = 1
page_size = 30
args = {
args: dict[str, str | int] = {
'searchPhrase': name,
'pageSize': page_size,
'sortBy': 'relevance'
}

if s_type == 'performer':
def map_result(result):
def map_result(result: dict[str, Any]) -> dict[str, Any]:
    """Map a raw performer search hit to a {name, url} fragment."""
    hit = result['item']
    return {
        'name': hit['name'].strip(),
        'url': f"https://www.metartnetwork.com{hit['path']}",
    }
elif s_type == 'scene':
def map_result(result):
def map_result(result: dict[str, Any]) -> dict[str, Any]:
    """Map a raw scene search hit to a scene scraper fragment."""
    hit = result['item']
    # Cover image needs the studio's own domain; fall back to "" when
    # the site UUID is unknown to the studios table.
    site = get_studio(hit['siteUUID'])
    cover = f"https://www.{site[1]}{hit['thumbnailCoverPath']}" if site else ""
    published = hit['publishedAt']
    return {
    'title': hit['name'].strip(),
    'url': f"https://www.metartnetwork.com{hit['path']}",
    'date': published[0:published.find('T')],
    'performers': [{'name': m['name'].strip()} for m in hit['models']],
    'image': cover,
    }
else:
return []

results = []
results: list[dict[str, Any]] = []

log.info(f"Searching for {s_type} '{name}'")
while True:
args['page'] = page
response = fetch("https://metartnetwork.com", "search-results", args)

if response is None:
break

results += list(
map(
map_result,
filter(
lambda r: r['type'] == search_type,
response['items']
)
)
)
# Filter results by type and map them
filtered_results = [
map_result(r) for r in response['items']
if r['type'] == search_type
]
results += filtered_results

if page * page_size > response['total'] or len(response['items']) == 0:
break
Expand All @@ -119,7 +134,7 @@ def map_result(result):
return results


def fetch(base_url, fetch_type, arguments):
def fetch(base_url: str, fetch_type: str, arguments: dict[str, str | int]) -> dict[str, Any] | None:
url = f"{base_url}/api/{fetch_type}?{urlencode(arguments)}"
log.debug(f"Fetching URL {url}")
try:
Expand All @@ -134,25 +149,16 @@ def fetch(base_url, fetch_type, arguments):
log.info(f"Fetching URL {url} resulted in error status: {response.status_code}")
return None

data = response.json()
data: dict[str, Any] = response.json()
return data


def scrape_model(base_url, name):
transformed_name = str.join(
' ',
list(
map(
lambda p:
re.sub(
'[_-]',
' ',
re.sub(r'\w\S*', lambda m: m.group(0).lower().capitalize(), p),
),
name.split('-')
)
)
)
def scrape_model(base_url: str, name: str) -> dict[str, Any] | None:
# Transform the name by splitting on hyphens and processing each part
name_parts = name.split('-')
transformed_parts = [transform_name_part(part) for part in name_parts]
transformed_name = ' '.join(transformed_parts)

log.info(f"Scraping model '{name}' as '{transformed_name}'")
data = fetch(base_url, 'model', {'name': transformed_name, 'order': 'DATE', 'direction': 'DESC'})
if data is None:
Expand All @@ -161,10 +167,10 @@ def scrape_model(base_url, name):
return map_model(base_url, data)


def map_media(data, studio, base_url):
urls = []
studio_code = data["UUID"]
studio_name = {'Name': ""}
def map_media(data: dict[str, Any], studio: tuple[str, str] | None, base_url: str) -> dict[str, Any]:
urls: list[str] = []
studio_code = data["UUID"].strip()
studio_name: dict[str, str] = {'Name': ""}

# Sites that never put directors in the "crew" section
# this eliminates picking up "still photographer" as an additional director
Expand All @@ -173,39 +179,39 @@ def map_media(data, studio, base_url):
if studio is not None:
studio_url = studio[1]
urls = [f"https://www.{studio_url}{data['path']}"]
studio_name = {'Name': studio[0]}
studio_name = {'Name': studio[0].strip()}

director = None
directors = []
director: str | None = None
directors: list[str] = []

# director seems to be included in `photographers` and `crew` section
if data.get("photographers"):
for director in data['photographers']:
directors.append(director.get('name').strip())
if data.get('crew') and studio_name["Name"] not in directors_not_in_crew:
# some sites only use the `photographers` section for director
# some sites only use the `photographers` section for director
for crew in data['crew']:
if crew.get('role') == "Still Photographer":
for crew_name in crew.get('names'):
name = crew_name.strip()
if name not in directors:
directors.append(name)
director = ", ".join(directors)
director = ", ".join(directors).strip()

return {
'Title': data['name'],
'Details': data['description'],
'Title': data['name'].strip(),
'Details': data.get('description', "").strip() or None,
'URLs': urls,
'Date': data['publishedAt'][0:data['publishedAt'].find('T')],
'Tags': list(map(lambda t: {'Name': t}, data['tags'])),
'Performers': list(map(lambda m: map_model(base_url, m), data['models'])),
'Tags': [{'Name': t.strip()} for t in data['tags']],
'Performers': [map_model(base_url, m) for m in data['models']],
'Studio': studio_name,
'Code': studio_code,
"Director": director
}


def scrape_movie(base_url, date, name):
def scrape_movie(base_url: str, date: str, name: str) -> dict[str, Any] | None:
log.info(f"Scraping movie '{name}' released on {date}")
data = fetch(base_url, 'movie', {'name': name, 'date': date})
if data is None:
Expand Down Expand Up @@ -234,7 +240,7 @@ def scrape_movie(base_url, date, name):
return res


def scrape_gallery(base_url, date, name):
def scrape_gallery(base_url: str, date: str, name: str) -> dict[str, Any] | None:
log.info(f"Scraping gallery '{name}' released on {date}")
data = fetch(base_url, 'gallery', {'name': name, 'date': date})
if data is None:
Expand All @@ -249,14 +255,15 @@ def scrape_gallery(base_url, date, name):
return map_media(data, studio, base_url)


def map_model(base_url, model):
tags = list(map(lambda t: {'Name': t}, model['tags']))
def map_model(base_url: str, model: dict[str, Any]) -> dict[str, Any]:
# Convert tags list to dictionary format
tags: list[dict[str, str]] = [{'Name': t.strip()} for t in model['tags']]

def add_tag(key, tag_format):
def add_tag(key: str, tag_format: str) -> None:
nonlocal tags
if key in model and model[key] != "":
tags.append({
'Name': tag_format.format(model[key])
'Name': tag_format.format(model[key]).strip()
})

add_tag('hair', '{} hair')
Expand All @@ -270,23 +277,23 @@ def add_tag(key, tag_format):
country_name = None

return {
'Name': model.get("name"),
'Gender': model.get("gender" or "").upper(),
'Name': model.get("name", "").strip(),
'Gender': model.get("gender", "").upper().strip(),
'URL': f"{base_url}{model.get('path')}",
'Ethnicity': model.get("ethnicity"),
'Ethnicity': model.get("ethnicity", "").strip() or None,
'Country': country_name,
'Height': str(model.get("height")),
'Weight': str(model.get("weight")),
'Measurements': model.get("size"),
'Details': model.get("biography"),
'hair_color': model.get("hair" or "").capitalize(),
'eye_color': model.get("eyes" or "").capitalize(),
'Height': str(model.get("height", "")).strip(),
'Weight': str(model.get("weight", "")).strip(),
'Measurements': model.get("size", "").strip() or None,
'Details': model.get("biography", "").strip() or None,
'hair_color': model.get("hair", "").capitalize().strip(),
'eye_color': model.get("eyes", "").capitalize().strip(),
'Image': f"https://cdn.metartnetwork.com/{model.get('siteUUID')}{model.get('headshotImagePath')}",
'Tags': tags
}


studios = {
studios: dict[str, tuple[str, str]] = {
'2163551D11D0439686AD9D291C8DFD71': ('ALS Scan', 'alsscan.com'),
'D0E7E33329311E3BB6E0800200C93255': ('Domai', 'domai.com'),
'FDA021004E3411DF98790800200C9A66': ('Erotic Beauty', 'eroticbeauty.com'),
Expand All @@ -306,7 +313,7 @@ def add_tag(key, tag_format):
}


def validate_url(url):
def validate_url(url: str | None) -> bool:
if url is None or not re.match('^https?://', url):
return False

Expand All @@ -320,7 +327,7 @@ def validate_url(url):
return False


def get_studio(site_uuid):
def get_studio(site_uuid: str) -> tuple[str, str] | None:
    """Look up the (studio name, domain) pair for a site UUID.

    Returns None for UUIDs not present in the module-level `studios`
    table. dict.get performs a single lookup instead of the original
    `in` test followed by indexing.
    """
    return studios.get(site_uuid)


Expand Down