Skip to content

Commit

Permalink
#8 - Preliminary testing with user/list endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Russell-Newton committed Jan 8, 2023
1 parent 6c8158f commit a9e5392
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 34 deletions.
89 changes: 70 additions & 19 deletions src/tiktokapipy/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
Generic,
Iterator,
List,
Optional,
Tuple,
Type,
TypeVar,
Expand Down Expand Up @@ -102,6 +103,17 @@ def fetch(self, idx: int) -> Video:
return self._api.video(video_link(self._light_models[idx].id))


class LightUserListIter(LightIter[LightUser, User]):
"""
Utility class to lazy-load users retrieved as a class:`.User`'s follower/following list so they aren't all
loaded at once.
:autodoc-skip:
"""

def fetch(self, idx: int) -> User:
return self._api.user(self._light_models[idx].unique_id)


class LightChallengeIter(LightIter[LightChallenge, Challenge]):
"""
Utility class to lazy-load challenges retrieved under a :class:`.Video` loaded at once.
Expand Down Expand Up @@ -211,17 +223,26 @@ def context(self):
raise TikTokAPIError("TikTokAPI must be used as a context manager")
return self._context

@property
def _light_videos_iter_type(self) -> Type[DeferredIterator[Video]]:
return LightVideoIter
def _light_videos_iter(
self, models: List[LightVideo]
) -> DeferredIterator[LightVideo, Video]:
return LightVideoIter(models, self)

@property
def _light_challenge_iter_type(self) -> Type[DeferredIterator[Challenge]]:
return LightChallengeIter
# def _light_user_list_iter(self, scene: int, sec_uid: str) -> DeferredIterator[LightUser, User]:
# return LightUserListIter(scene, sec_uid, self)

@property
def _light_user_getter_type(self):
return LightUserGetter
def _light_user_list_iter(
self, models: List[LightUser]
) -> DeferredIterator[LightUser, User]:
return LightUserListIter(models, self)

def _light_challenge_iter(
self, models: List[LightChallenge]
) -> DeferredIterator[LightChallenge, Challenge]:
return LightChallengeIter(models, self)

def _light_user_getter(self, user: str):
return LightUserGetter(user, self)

@property
def _challenge_response_type(self):
Expand Down Expand Up @@ -267,7 +288,12 @@ def user(self, user: Union[int, str], video_limit: int = 0) -> User:
"""
link = user_link(user)
response, api_extras = self._scrape_data(link, self._user_response_type)
return self._extract_user_from_response(response, api_extras, video_limit)
user = self._extract_user_from_response(response, api_extras, video_limit)

user.following = self._grab_user_list(21, user.sec_uid)
user.followers = self._grab_user_list(67, user.sec_uid)

return user

def video(self, link: str) -> Video:
"""
Expand Down Expand Up @@ -405,7 +431,34 @@ def _create_video_iter(
videos += extra.item_list
if video_limit > 0:
videos = videos[:video_limit]
return self._light_videos_iter_type(videos, self)
return self._light_videos_iter(videos)

def _grab_user_list(
self, scene: int, sec_uid: str
) -> Optional[DeferredIterator[LightUser, User]]:
min_cursor = 0
out_list = []
try:
while min_cursor != -1:
list_request = self.context.request.fetch(
f"https://www.tiktok.com/api/user/list/"
f"?minCursor={min_cursor}&scene={scene}&count=200&secUid={sec_uid}"
)
# print(list_request)
response = APIResponse.parse_obj(list_request.json())
if response.status_code == 10222:
raise TikTokAPIError("The requested user list is set to private.")

user_list = response.user_list
if user_list:
out_list.extend([item.user for item in user_list])

min_cursor = response.min_cursor

return self._light_user_list_iter(out_list)
except (playwright.sync_api.Error, json.JSONDecodeError, TikTokAPIError) as e:
warnings.warn(f"Was unable to grab user list from scene {scene}:\n{e}")
return None

def _extract_video_from_response(
self,
Expand All @@ -429,30 +482,28 @@ def _extract_video_from_response(
comments += extra.comments
for comment in comments:
if isinstance(comment.user, LightUser):
comment.author = self._light_user_getter_type(
comment.user.unique_id, self
)
comment.author = self._light_user_getter(comment.user.unique_id)
else:
comment.author = self._light_user_getter_type(comment.user, self)
comment.author = self._light_user_getter(comment.user)

video.comments = comments
if not video.comments:
warnings.warn(
"Was unable to collect comments.\nA second attempt might work."
)
if isinstance(video.author, LightUser):
video.creator = self._light_user_getter_type(video.author.unique_id, self)
video.creator = self._light_user_getter(video.author.unique_id)
else:
video.creator = self._light_user_getter_type(video.author, self)
video.creator = self._light_user_getter(video.author)

video.tags = self._create_challenge_iter(video)

return video

def _create_challenge_iter(self, video: Video):
if not video.challenges:
return self._light_challenge_iter_type([], self)
return self._light_challenge_iter_type(video.challenges, self)
return self._light_challenge_iter([])
return self._light_challenge_iter(video.challenges)

def _scroll_page_down(self, page: Page):
page.evaluate(
Expand Down
74 changes: 62 additions & 12 deletions src/tiktokapipy/async_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

import json
import traceback
import warnings
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Callable, Generic, List, Tuple, Type
from typing import TYPE_CHECKING, Callable, Generic, List, Optional, Tuple, Type

if TYPE_CHECKING:
from _typeshed import SupportsLessThan
Expand All @@ -26,7 +27,7 @@
from tiktokapipy.models import AsyncDeferredIterator
from tiktokapipy.models.challenge import Challenge, LightChallenge, challenge_link
from tiktokapipy.models.raw_data import APIResponse
from tiktokapipy.models.user import User, user_link
from tiktokapipy.models.user import LightUser, User, user_link
from tiktokapipy.models.video import LightVideo, Video, video_link


Expand Down Expand Up @@ -75,6 +76,17 @@ async def fetch(self, idx: int) -> Video:
return await self._api.video(video_link(self._light_models[idx].id))


class AsyncLightUserIter(AsyncLightIter[LightUser, User]):
"""
Utility class to lazy-load users retrieved as a class:`.User`'s follower/following list so they aren't all
loaded at once.
:autodoc-skip:
"""

async def fetch(self, idx: int) -> User:
return await self._api.user(self._light_models[idx].unique_id)


class AsyncLightChallengeIter(AsyncLightIter[LightChallenge, Challenge]):
"""
Utility class to lazy-load challenges retrieved under a :class:`.Video` loaded at once.
Expand Down Expand Up @@ -120,17 +132,23 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.browser.close()
await self.playwright.stop()

@property
def _light_videos_iter_type(self) -> Type[AsyncDeferredIterator[Video]]:
return AsyncLightVideoIter
def _light_videos_iter(
self, models: List[LightVideo]
) -> AsyncDeferredIterator[LightVideo, Video]:
return AsyncLightVideoIter(models, self)

def _light_user_list_iter(
self, models: List[LightUser]
) -> AsyncDeferredIterator[LightUser, User]:
return AsyncLightUserIter(models, self)

@property
def _light_challenge_iter_type(self) -> Type[AsyncDeferredIterator[Challenge]]:
return AsyncLightChallengeIter
def _light_challenge_iter(
self, models: List[LightChallenge]
) -> AsyncDeferredIterator[LightChallenge, Challenge]:
return AsyncLightChallengeIter(models, self)

@property
def _light_user_getter_type(self):
return AsyncLightUserGetter
def _light_user_getter(self, user: str):
return AsyncLightUserGetter(user, self)

async def _scrape_data(
self, link: str, data_model: Type[_DataModelT]
Expand Down Expand Up @@ -198,12 +216,44 @@ async def challenge(self, challenge_name: str, video_limit: int = 0) -> Challeng
async def user(self, user: str, video_limit: int = 0) -> User:
link = user_link(user)
response, api_extras = await self._scrape_data(link, self._user_response_type)
return self._extract_user_from_response(response, api_extras, video_limit)
user = self._extract_user_from_response(response, api_extras, video_limit)

user.following = await self._grab_user_list(21, user.sec_uid)
user.followers = await self._grab_user_list(67, user.sec_uid)

return user

async def video(self, link: str) -> Video:
response, api_extras = await self._scrape_data(link, self._video_response_type)
return self._extract_video_from_response(response, api_extras)

async def _grab_user_list(
self, scene: int, sec_uid: str
) -> Optional[AsyncDeferredIterator[LightUser, User]]:
min_cursor = 0
out_list = []
try:
while min_cursor != -1:
list_request = await self.context.request.fetch(
f"https://us.tiktok.com/api/user/list/"
f"?minCursor={min_cursor}&scene={scene}&count=200&secUid={sec_uid}"
)
print(list_request)
response = APIResponse.parse_obj(list_request.json())
if response.status_code == 10222:
raise TikTokAPIError("The requested user list is set to private.")

user_list = response.user_list
if user_list:
out_list.extend([item.user for item in user_list])

min_cursor = response.min_cursor

return self._light_user_list_iter(out_list)
except (playwright.sync_api.Error, json.JSONDecodeError, TikTokAPIError) as e:
warnings.warn(f"Was unable to grab user list from scene {scene}:\n{e}")
return None

async def _scroll_page_down(self, page: Page):
await page.evaluate(
"""
Expand Down
11 changes: 9 additions & 2 deletions src/tiktokapipy/models/raw_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from tiktokapipy.models import CamelCaseModel, TitleCaseModel
from tiktokapipy.models.challenge import Challenge, ChallengeStats
from tiktokapipy.models.comment import Comment
from tiktokapipy.models.user import User, UserStats
from tiktokapipy.models.user import LightUser, User, UserStats
from tiktokapipy.models.video import LightVideo, Video


Expand All @@ -20,6 +20,10 @@ class UserModule(CamelCaseModel):
stats: Dict[str, UserStats]


class UserListItem(CamelCaseModel):
user: LightUser


class ChallengeInfo(CamelCaseModel):
""":autodoc-skip:"""

Expand All @@ -44,11 +48,14 @@ class APIResponse(CamelCaseModel):

status_code: int = 0
cursor: Optional[int]
has_more: Union[bool, int]
has_more: Optional[Union[bool, int]]
min_cursor: Optional[int]
max_cursor: Optional[int]

total: Optional[int]
comments: Optional[List[Comment]]
item_list: Optional[List[LightVideo]]
user_list: Optional[List[UserListItem]]


class PrimaryResponseType(TitleCaseModel):
Expand Down
16 changes: 15 additions & 1 deletion src/tiktokapipy/models/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class LightUser(CamelCaseModel):

unique_id: str
"""The User's unique user"""
sec_uid: str


class User(LightUser):
Expand All @@ -44,7 +45,6 @@ class User(LightUser):
########################
# Security information #
########################
sec_uid: str
private_account: Optional[bool]
verified: Optional[bool]
# secret: Optional[bool]
Expand Down Expand Up @@ -88,6 +88,20 @@ class User(LightUser):
]
]
"""Set on return from API. Can be iterated over to load :class:`.Video`s."""
following: Optional[
Union[
DeferredIterator[LightUser, User],
AsyncDeferredIterator[LightUser, User],
]
]
"""Set on return from API. Can be iterated over to load Users this User is following."""
followers: Optional[
Union[
DeferredIterator[LightUser, User],
AsyncDeferredIterator[LightUser, User],
]
]
"""Set on return from API. Can be iterated over to load Users following this User."""


from tiktokapipy.models.video import LightVideo, Video # noqa E402
Expand Down

0 comments on commit a9e5392

Please sign in to comment.