Skip to content

[IMP] papermuncher: Refactor and update the Python bindings #66

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 0 additions & 141 deletions meta/bindings/python/papermuncher.py

This file was deleted.

3 changes: 3 additions & 0 deletions meta/bindings/python/papermuncher/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .render import render, to_pdf, to_image
from .binding import paper_muncher
from .types import Environment
134 changes: 134 additions & 0 deletions meta/bindings/python/papermuncher/_internal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
"""Module with internal functions for handling streams and URLs."""

import asyncio
import logging
import os.path

from io import BytesIO
from functools import wraps
from contextlib import asynccontextmanager
from urllib.parse import urlparse

from typing import Iterable, Optional, overload, Union
from .types import Environment, Streamable

_logger = logging.getLogger(__name__)


@overload
def to_stream(
instance : Union[str, bytes, BytesIO],
environment : Optional[Environment] = None,
) -> Iterable[BytesIO]:
...

@overload
def to_stream(
instance : Streamable,
environment : Optional[Environment] = None,
) -> Iterable[Streamable]:
...

@asynccontextmanager
async def to_stream(
instance : Union[Streamable, str, bytes],
environment : Optional[Environment] = None,
) -> Iterable[Streamable]:
"""
Convert various types of input (str, bytes, Streamable) to a stream.
This function handles different types of input and returns a streamable
object. It can also handle URLs and file paths, converting them to
appropriate streamable objects.
Args:
instance (Union[Streamable, str, bytes]): The input to convert to a
stream.
environment (Optional[Environment]): An optional environment object
for handling URLs (this arg is also used to know if PM is piped).
Yields:
Iterable[Streamable]: A streamable object.
"""
def has_all_attrs(obj, attrs):
return all(hasattr(obj, attr) for attr in attrs)

try:
if has_all_attrs(instance, ['read', 'readline']):
yield instance
else:
if isinstance(instance, bytes):
future_stream = instance
elif isinstance(instance, str):
if environment is not None and os.path.isfile(instance):
with open(instance, 'rb') as file_stream:
yield file_stream
future_stream = instance.encode('utf-8')
elif hasattr('__bytes__', instance):
future_stream = bytes(instance)
elif hasattr('__str__', instance):
future_stream = str(instance).encode('utf-8')
else:
raise TypeError(f"Unsupported type: {type(instance)}")

url = urlparse(future_stream)
if environment is not None and url.scheme and url.netloc:
yield await environment.get_asset(future_stream)
else:
stream = BytesIO(future_stream)
stream.seek(0)
yield stream

except Exception as e:
_logger.error("Error converting to stream: %s", e)
raise
else:
if 'stream' in locals():
stream.close()


def with_pmoptions_init(cls):
"""Decorator to override __init__ to support PMOptions passthrough."""
original_init = cls.__init__

@wraps(original_init)
def __init__(self, **kwargs):
# If any value is a PMOptions, use it to populate fields
option_like = next((v for v in kwargs.values() if isinstance(v, cls)), None)
if option_like:
original_init(self, **option_like.__dict__)
else:
original_init(self, **kwargs)

cls.__init__ = __init__
return cls


class SyncStreamWrapper(Streamable):
"""
A synchronous wrapper for an asynchronous stream.
This class allows synchronous code to interact with an
asynchronous stream by providing synchronous methods for reading
and iterating over the stream.
"""

def __init__(self, async_stream: asyncio.StreamReader):
self.async_stream = async_stream

def read(self, size=-1) -> bytes:
return asyncio.run(self._read(size))

async def _read(self, size=-1) -> bytes:
return await self.async_stream.read(size)

def readline(self) -> bytes:
return asyncio.run(self._readline())

async def _readline(self) -> bytes:
return await self.async_stream.readline()

def __iter__(self):
return self

def __next__(self):
chunk = asyncio.run(self.async_stream.read(4096))
if not chunk:
raise StopIteration
return chunk
Loading
Loading