Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: include stream in db, rework blacklisting #575

Merged
merged 1 commit into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/program/db/db_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,15 @@ def _get_item_from_db(session, item: MediaItem):
match type:
case "movie":
r = session.execute(select(Movie).where(MediaItem.imdb_id==item.imdb_id).options(joinedload("*"))).unique().scalar_one()
r.set("streams", item.get("streams", {}))
return r
case "show":
r = session.execute(select(Show).where(MediaItem.imdb_id==item.imdb_id).options(joinedload("*"))).unique().scalar_one()
r.set("streams", item.get("streams", {}))
return r
case "season":
r = session.execute(select(Season).where(Season._id==item._id).options(joinedload("*"))).unique().scalar_one()
r.set("streams", item.get("streams", {}))
return r
case "episode":
r = session.execute(select(Episode).where(Episode._id==item._id).options(joinedload("*"))).unique().scalar_one()
r.set("streams", item.get("streams", {}))
return r
case _:
logger.error(f"_get_item_from_db Failed to create item from type: {type}")
Expand Down
8 changes: 7 additions & 1 deletion src/program/downloaders/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,10 @@ def validate(self):
def run(self, item: MediaItem):
for service in self.services.values():
if service.initialized:
return service.run(item)
downloaded = service.run(item)
if not downloaded:
if item.type == "show":
yield [season for season in item.seasons]
elif item.type == "season":
yield [episode for episode in item.episodes]
yield item
30 changes: 10 additions & 20 deletions src/program/downloaders/alldebrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,26 +98,14 @@ def validate(self) -> bool:
logger.exception(f"Failed to validate All-Debrid settings: {e}")
return False

def run(self, item: MediaItem) -> Generator[MediaItem, None, None]:
def run(self, item: MediaItem) -> bool:
"""Download media item from all-debrid.com"""
if (item.file and item.folder):
yield None
return
if not self.is_cached(item):
if isinstance(item, Season):
res = [e for e in item.episodes]
yield res
return
if isinstance(item, Show):
res = [s for s in item.seasons]
yield res
return
yield None
return
if not self._is_downloaded(item):
return_value = False
if self.is_cached(item) and not self._is_downloaded(item):
self._download_item(item)
return_value = True
self.log_item(item)
yield item
return return_value

@staticmethod
def log_item(item: MediaItem) -> None:
Expand Down Expand Up @@ -165,7 +153,7 @@ def _chunked(lst: List, n: int) -> Generator[List, None, None]:
logger.log("DEBRID", f"Processing {len(item.streams)} streams for {item.log_string}")

processed_stream_hashes = set()
filtered_streams = [hash for hash in item.streams if hash and hash not in processed_stream_hashes]
filtered_streams = [stream.infohash for stream in item.streams if stream.infohash and stream.infohash not in processed_stream_hashes]
if not filtered_streams:
logger.log("NOT_FOUND", f"No streams found from filtering: {item.log_string}")
return False
Expand All @@ -182,7 +170,6 @@ def _chunked(lst: List, n: int) -> Generator[List, None, None]:
except Exception as e:
logger.error(f"Error checking cache for streams: {str(e)}", exc_info=True)

item.set("streams", {})
logger.log("NOT_FOUND", f"No wanted cached streams found for {item.log_string} out of {len(filtered_streams)}")
return False

Expand All @@ -203,7 +190,10 @@ def _evaluate_stream_response(self, data, processed_stream_hashes, item):

processed_stream_hashes.add(stream_hash)
if self._process_providers(item, magnet, stream_hash):
return True
return True
else:
stream = next(stream for stream in item.streams if stream.infohash == stream_hash)
stream.blacklisted = True
return False

def _process_providers(self, item: MediaItem, magnet: dict, stream_hash: str) -> bool:
Expand Down
30 changes: 9 additions & 21 deletions src/program/downloaders/realdebrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,27 +100,14 @@ def validate(self) -> bool:
logger.error("Couldn't parse user data response from Real-Debrid.")
return False


def run(self, item: MediaItem) -> Generator[MediaItem, None, None]:
def run(self, item: MediaItem) -> bool:
"""Download media item from real-debrid.com"""
if (item.file and item.folder):
yield None
return
if not self.is_cached(item):
if isinstance(item, Season):
res = [e for e in item.episodes]
yield res
return
if isinstance(item, Show):
res = [s for s in item.seasons]
yield res
return
yield None
return
if not self._is_downloaded(item):
return_value = False
if self.is_cached(item) and not self._is_downloaded(item):
self._download_item(item)
return_value = True
self.log_item(item)
yield item
return return_value

@staticmethod
def log_item(item: MediaItem) -> None:
Expand Down Expand Up @@ -153,7 +140,7 @@ def _chunked(lst: List, n: int) -> Generator[List, None, None]:
logger.log("DEBRID", f"Processing {len(item.streams)} streams for {item.log_string}")

processed_stream_hashes = set()
filtered_streams = [hash for hash in item.streams if hash and hash not in processed_stream_hashes]
filtered_streams = [stream.infohash for stream in item.streams if stream.infohash and stream.infohash not in processed_stream_hashes]
if not filtered_streams:
logger.log("NOT_FOUND", f"No streams found from filtering: {item.log_string}")
return False
Expand All @@ -164,13 +151,11 @@ def _chunked(lst: List, n: int) -> Generator[List, None, None]:
response = get(f"{RD_BASE_URL}/torrents/instantAvailability/{streams}/", additional_headers=self.auth_headers, proxies=self.proxy, response_type=dict, specific_rate_limiter=self.torrents_rate_limiter, overall_rate_limiter=self.overall_rate_limiter)
if response.is_ok and response.data and isinstance(response.data, dict):
if self._evaluate_stream_response(response.data, processed_stream_hashes, item):
item.set("streams", {})
return True
except Exception as e:
logger.exception(f"Error checking cache for streams: {str(e)}", exc_info=True)
continue

item.set("streams", {})
logger.log("NOT_FOUND", f"No wanted cached streams found for {item.log_string} out of {len(filtered_streams)}")
return False

Expand All @@ -185,6 +170,9 @@ def _evaluate_stream_response(self, data, processed_stream_hashes, item):
if self._process_providers(item, provider_list, stream_hash):
logger.debug(f"Finished processing providers - selecting {stream_hash} for downloading")
return True
else:
stream = next(stream for stream in item.streams if stream.infohash == stream_hash)
stream.blacklisted = True
return False

def _process_providers(self, item: MediaItem, provider_list: dict, stream_hash: str) -> bool:
Expand Down
19 changes: 12 additions & 7 deletions src/program/downloaders/torbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ def validate(self) -> bool:
logger.exception(f"Failed to validate Torbox settings: {e}")
return False

def run(self, item: MediaItem) -> Generator[MediaItem, None, None]:
def run(self, item: MediaItem) -> bool:
"""Download media item from torbox.app"""
cached_hashes = self.get_torrent_cached([hash for hash in item.streams])
return_value = False
cached_hashes = self.get_torrent_cached([stream.infohash for stream in item.streams])
if cached_hashes:
for cache in cached_hashes.values():
item.active_stream = cache
Expand All @@ -82,15 +83,19 @@ def run(self, item: MediaItem) -> Generator[MediaItem, None, None]:
{"hash": cache["hash"], "files": cache["files"], "id": None},
)
self.download(item)
return_value = True
break
else:
stream = next(stream for stream in item.streams if stream.infohash == cache["hash"])
stream.blacklisted = True
else:
logger.log("DEBRID", f"Item is not cached: {item.log_string}")
for hash in item.streams:
for stream in item.streams:
logger.log(
"DEBUG", f"Blacklisting hash ({hash}) for item: {item.log_string}"
"DEBUG", f"Blacklisting hash ({stream.infohash}) for item: {item.log_string}"
)
item.streams = {}
yield item
stream.blacklisted = True
return return_value

def find_required_files(self, item, container):

Expand Down Expand Up @@ -267,7 +272,7 @@ def get_torrent_cached(self, hash_list):
return response.data["data"]

def create_torrent(self, hash) -> int:
magnet_url = f"magnet:?xt=urn:btih:{hash}"
magnet_url = f"magnet:?xt=urn:btih:{hash}&dn=&tr="
response = post(
f"{self.base_url}/torrents/createtorrent",
data={"magnet": magnet_url, "seed": 1, "allow_zip": False},
Expand Down
49 changes: 8 additions & 41 deletions src/program/media/item.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from program.media.state import States
from RTN import Torrent, parse
from sqlalchemy import orm
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.orm import Mapped, mapped_column, relationship, collections
from .stream import Stream

# from RTN.patterns import extract_episodes
from utils.logger import logger
Expand All @@ -26,6 +27,7 @@ class MediaItem(db.Model):
scraped_at: Mapped[Optional[datetime]] = mapped_column(sqlalchemy.DateTime, nullable=True)
scraped_times: Mapped[Optional[int]] = mapped_column(sqlalchemy.Integer, default=0)
active_stream: Mapped[Optional[dict[str, str]]] = mapped_column(sqlalchemy.JSON, nullable=True)
streams: Mapped[List[Stream]] = relationship("Stream", back_populates="parent", cascade="all, delete-orphan")
symlinked: Mapped[Optional[bool]] = mapped_column(sqlalchemy.Boolean, default=False)
symlinked_at: Mapped[Optional[datetime]] = mapped_column(sqlalchemy.DateTime, nullable=True)
symlinked_times: Mapped[Optional[int]] = mapped_column(sqlalchemy.Integer, default=0)
Expand Down Expand Up @@ -54,18 +56,7 @@ class MediaItem(db.Model):
"polymorphic_on":"type",
"with_polymorphic":"*",
}
@orm.reconstructor
def init_on_load(self):
self.streams: Optional[dict[str, Torrent]] = {}
def __init__(self, item: dict) -> None:
# id: Mapped[int] = mapped_column(primary_key=True)
# name: Mapped[str] = mapped_column(String(30))
# fullname: Mapped[Optional[str]]
# addresses: Mapped[List["Address"]] = relationship(lazy=False,
# back_populates="user", cascade="all, delete-orphan"
# )
# user_id: Mapped[int] = mapped_column(sqlalchemy.ForeignKey("user_account.id"))
# user: Mapped["User"] = relationship(lazy=False, back_populates="addresses")
self.requested_at = item.get("requested_at", datetime.now())
self.requested_by = item.get("requested_by")

Expand All @@ -74,7 +65,7 @@ def __init__(self, item: dict) -> None:
self.scraped_at = None
self.scraped_times = 0
self.active_stream = item.get("active_stream", {})
self.streams: Optional[dict[str, Torrent]] = {}
self.streams: Optional[list[Stream]] = []

self.symlinked = False
self.symlinked_at = None
Expand Down Expand Up @@ -168,20 +159,9 @@ def copy_other_media_attr(self, other):
self.overseerr_id = getattr(other, "overseerr_id", None)

def is_scraped(self):
return len(self.streams) > 0

def is_checked_for_availability(self):
"""Check if item has been checked for availability."""
if self.streams:
return all(
stream.get("cached", None) is not None
for stream in self.streams.values()
)
return False

def has_complete_metadata(self) -> bool:
"""Check if the item has complete metadata."""
return self.title is not None and self.aired_at is not None
return (len(self.streams) > 0
and
all(stream.blacklisted == False for stream in self.streams))

def to_dict(self):
"""Convert item to dictionary (API response)"""
Expand Down Expand Up @@ -289,9 +269,6 @@ class Movie(MediaItem):
"polymorphic_identity": "movie",
"polymorphic_load": "inline",
}
@orm.reconstructor
def init_on_load(self):
self.streams: Optional[dict[str, Torrent]] = {}

def __init__(self, item):
self.type = "movie"
Expand All @@ -312,10 +289,6 @@ class Season(MediaItem):
parent_id: Mapped[int] = mapped_column(sqlalchemy.ForeignKey("Show._id"), use_existing_column=True)
parent: Mapped["Show"] = relationship(lazy=False, back_populates="seasons", foreign_keys="Season.parent_id")
episodes: Mapped[List["Episode"]] = relationship(lazy=False, back_populates="parent", single_parent=True, cascade="all, delete-orphan", foreign_keys="Episode.parent_id")
@orm.reconstructor
def init_on_load(self):
self.streams: Optional[dict[str, Torrent]] = {}

__mapper_args__ = {
"polymorphic_identity": "season",
"polymorphic_load": "inline",
Expand Down Expand Up @@ -408,9 +381,6 @@ class Episode(MediaItem):
_id: Mapped[int] = mapped_column(sqlalchemy.ForeignKey("MediaItem._id"), primary_key=True)
parent_id: Mapped[int] = mapped_column(sqlalchemy.ForeignKey("Season._id"), use_existing_column=True)
parent: Mapped["Season"] = relationship(lazy=False, back_populates="episodes", foreign_keys="Episode.parent_id")
@orm.reconstructor
def init_on_load(self):
self.streams: Optional[dict[str, Torrent]] = {}

__mapper_args__ = {
"polymorphic_identity": "episode",
Expand Down Expand Up @@ -464,10 +434,7 @@ class Show(MediaItem):
__tablename__ = "Show"
_id: Mapped[int] = mapped_column(sqlalchemy.ForeignKey("MediaItem._id"), primary_key=True)
seasons: Mapped[List["Season"]] = relationship(lazy=False, back_populates="parent", single_parent=True, cascade="all, delete-orphan", foreign_keys="Season.parent_id")
@orm.reconstructor
def init_on_load(self):
self.streams: Optional[dict[str, Torrent]] = {}


__mapper_args__ = {
"polymorphic_identity": "show",
"polymorphic_load": "inline",
Expand Down
34 changes: 34 additions & 0 deletions src/program/media/stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from typing import Optional
from RTN import Torrent
from program.db.db import db
import sqlalchemy
from sqlalchemy.orm import Mapped, mapped_column, relationship


class Stream(db.Model):
__tablename__ = "Stream"
_id: Mapped[int] = mapped_column(primary_key=True)
infohash: Mapped[str] = mapped_column(sqlalchemy.String, nullable=False)
raw_title: Mapped[str] = mapped_column(sqlalchemy.String, nullable=False)
parsed_title: Mapped[str] = mapped_column(sqlalchemy.String, nullable=False)
rank: Mapped[int] = mapped_column(sqlalchemy.Integer, nullable=False)
lev_ratio: Mapped[float] = mapped_column(sqlalchemy.Float, nullable=False)
blacklisted: Mapped[bool] = mapped_column(sqlalchemy.Boolean, nullable=False)

parent_id: Mapped[int] = mapped_column(sqlalchemy.ForeignKey("MediaItem._id"))
parent = relationship("MediaItem", back_populates="streams", cascade="all, delete-orphan", single_parent=True)

def __init__(self, torrent: Torrent):
self.raw_title = torrent.raw_title
self.infohash = torrent.infohash
self.parsed_title = torrent.data.parsed_title
self.rank = torrent.rank
self.lev_ratio = torrent.lev_ratio
self.blacklisted = False

def __hash__(self):
return self.infohash

def __eq__(self, other):
return isinstance(other, Stream) and self.infohash == other.infohash

Loading
Loading