From a237c0e686e4144c7c86217b92f20e1cb80b7060 Mon Sep 17 00:00:00 2001
From: sam80180 <sam80180@gmail.com>
Date: Fri, 31 May 2024 01:44:45 +0800
Subject: [PATCH 1/2] fix: add options for setting MJPEG ports

---
 tidevice3/cli/runwda.py | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/tidevice3/cli/runwda.py b/tidevice3/cli/runwda.py
index 57194f0..bfef47f 100644
--- a/tidevice3/cli/runwda.py
+++ b/tidevice3/cli/runwda.py
@@ -37,8 +37,10 @@ def guess_wda_bundle_id(service_provider: LockdownClient) -> typing.Optional[str
 @click.option('--bundle-id', default=None, help="WebDriverAgent bundle id")
 @click.option("--src-port", default=8100, help="WebDriverAgent listen port")
 @click.option("--dst-port", default=8100, help="local listen port")
+@click.option("--mjpeg-src-port", default=9100, help="MJPEG listen port")
+@click.option("--mjpeg-dst-port", default=9100, help="MJPEG local listen port")
 @pass_rsd
-def cli_runwda(service_provider: LockdownClient, bundle_id: str, src_port: int, dst_port: int):
+def cli_runwda(service_provider: LockdownClient, bundle_id: str, src_port: int, dst_port: int, mjpeg_src_port: int, mjpeg_dst_port: int):
     """run WebDriverAgent"""
     if not bundle_id:
         bundle_id = guess_wda_bundle_id(service_provider)
@@ -49,16 +51,22 @@ def tcp_forwarder():
         logger.info("forwarder started, listen on %s", dst_port)
         forwarder = UsbmuxTcpForwarder(service_provider.udid, dst_port, src_port)
         forwarder.start()
+
+    def mjpeg_forwarder():
+        logger.info("MJPEG forwarder started, listening on :%s", mjpeg_dst_port)
+        forwarder = UsbmuxTcpForwarder(service_provider.udid, mjpeg_dst_port, mjpeg_src_port)
+        forwarder.start()
     
     def xcuitest():
-        XCUITestService(service_provider).run(bundle_id)
+        XCUITestService(service_provider).run(bundle_id, {"MJPEG_SERVER_PORT": mjpeg_src_port, "USE_PORT": src_port})
 
+    thread0 = threading.Thread(target=mjpeg_forwarder, daemon=True)
     thread1 = threading.Thread(target=tcp_forwarder, daemon=True)
     thread2 = threading.Thread(target=xcuitest, daemon=True)
+    thread0.start()
     thread1.start()
     thread2.start()
 
-    while thread1.is_alive() and thread2.is_alive():
+    while thread0.is_alive() and thread1.is_alive() and thread2.is_alive():
         time.sleep(0.1)
     logger.info("Program exited")
-    
\ No newline at end of file

From 4dd545a7a41aa28c5424a8051253d294df261ba5 Mon Sep 17 00:00:00 2001
From: sam80180 <sam80180@gmail.com>
Date: Sat, 29 Jun 2024 02:16:54 +0800
Subject: [PATCH 2/2] fix: add an option for connecting to different tunnel
 server port

---
 tidevice3/api.py            |  4 +--
 tidevice3/cli/cli_common.py | 62 +++++++++++++++++++++++++++++++++++--
 tidevice3/cli/tunneld.py    | 12 ++++---
 3 files changed, 69 insertions(+), 9 deletions(-)

diff --git a/tidevice3/api.py b/tidevice3/api.py
index 0c3fb7a..88962c1 100644
--- a/tidevice3/api.py
+++ b/tidevice3/api.py
@@ -77,13 +77,13 @@ def list_devices(
 
 DEFAULT_TIMEOUT = 60
 
-def connect_service_provider(udid: Optional[str], force_usbmux: bool = False, usbmux_address: Optional[str] = None) -> LockdownServiceProvider:
+def connect_service_provider(udid: Optional[str], force_usbmux: bool = False, usbmux_address: Optional[str] = None, tunneld_port: Optional[int] = 5555) -> LockdownServiceProvider:
     """Connect to device and return LockdownServiceProvider"""
     lockdown = create_using_usbmux(serial=udid, usbmux_address=usbmux_address)
     if force_usbmux:
         return lockdown
     if lockdown.product_version >= "17":
-        return connect_remote_service_discovery_service(lockdown.udid)
+        return connect_remote_service_discovery_service(lockdown.udid, "http://localhost:%d" % (tunneld_port))
     return lockdown
 
 
diff --git a/tidevice3/cli/cli_common.py b/tidevice3/cli/cli_common.py
index 3f1bcbb..e279e00 100644
--- a/tidevice3/cli/cli_common.py
+++ b/tidevice3/cli/cli_common.py
@@ -6,6 +6,7 @@
 from __future__ import annotations
 
 import collections
+import logging;
 from functools import update_wrapper
 
 import click
@@ -14,6 +15,9 @@
 from tidevice3.api import connect_service_provider
 
 
+logger = logging.getLogger(__name__);
+
+
 class OrderedGroup(click.Group):
     def __init__(self, name=None, commands=None, *args, **attrs):
         super(OrderedGroup, self).__init__(name, commands, *args, **attrs)
@@ -24,14 +28,67 @@ def list_commands(self, ctx):
         return self.commands
 
 
+class DeprecatedOption(click.Option): # https://stackoverflow.com/a/50402799/12857692
+    def __init__(self, *args, **kwargs):
+        self.deprecated = kwargs.pop("deprecated", False);
+        self.preferred = kwargs.pop("preferred", None);
+        super(DeprecatedOption, self).__init__(*args, **kwargs);
+    # end __init__()
+# end class
+
+
+class CommandWithDeprecatedOptions(click.Command):
+    def make_parser(self, ctx): # Hook 'make_parser()' and during processing check the name used to invoke the option to see if it is preferred
+        parser = super().make_parser(ctx);
+
+        # get the parser options
+        options = set(parser._short_opt.values());
+        options |= set(parser._long_opt.values());
+        for option in options:
+            if not isinstance(option.obj, DeprecatedOption):
+                continue;
+            # end if
+
+            def make_process(an_option): # construct a closure to the parser option processor
+                orig_process = an_option.process;
+                opt_deprecated = getattr(an_option.obj, "deprecated", None);
+                assert opt_deprecated is not None, "Expected `deprecated` value for `{}`".format(an_option.obj.name);
+                opt_preferred = getattr(an_option.obj, "preferred", None);
+                opt_name = getattr(an_option.obj, "name", None);
+
+                def process(value, state): # only called if the option is set
+                    if opt_deprecated:
+                        vv = ["--"+opt_name];
+                        msg = "The '%s' option is deprecated";
+                        if opt_preferred is not None:
+                            msg = msg+", use '%s' instead";
+                            vv.append(opt_preferred);
+                        # end if
+                        logger.warning(msg % tuple(vv));
+                    # end if
+                    return orig_process(value, state);
+                # end process()
+
+                return process;
+            # end make_process()
+
+            option.process = make_process(option);
+        # end for
+        return parser;
+    # end make_parser()
+# end class
+
+
 @click.group(cls=OrderedGroup, context_settings=dict(help_option_names=["-h", "--help"]))
 @click.option("-u", "--udid", default=None, help="udid of device")
+@click.option("tunneld_port", "--tunneld-port", default=5555, help="tunneld listen port")
 @click.option("usbmux_address", "--usbmux", help=USBMUX_OPTION_HELP)
 @click.pass_context
-def cli(ctx: click.Context, udid: str, usbmux_address: str):
+def cli(ctx: click.Context, udid: str, usbmux_address: str, tunneld_port: int):
     ctx.ensure_object(dict)
     ctx.obj['udid'] = udid
     ctx.obj['usbmux_address'] = usbmux_address
+    ctx.obj["tunneld_port"] = tunneld_port;
 
 
 def pass_service_provider(func):
@@ -50,7 +107,8 @@ def pass_rsd(func):
     def new_func(ctx, *args, **kwargs):
         udid = ctx.obj['udid']
         usbmux_address = ctx.obj['usbmux_address']
-        service_provider = connect_service_provider(udid=udid, usbmux_address=usbmux_address)
+        tunneld_port = ctx.obj["tunneld_port"];
+        service_provider = connect_service_provider(udid=udid, usbmux_address=usbmux_address, tunneld_port=tunneld_port)
         with service_provider:
             return ctx.invoke(func, service_provider, *args, **kwargs)
     return update_wrapper(new_func, func)
diff --git a/tidevice3/cli/tunneld.py b/tidevice3/cli/tunneld.py
index 02bf211..848cff4 100644
--- a/tidevice3/cli/tunneld.py
+++ b/tidevice3/cli/tunneld.py
@@ -21,7 +21,7 @@
 from pymobiledevice3.exceptions import MuxException
 from pymobiledevice3.osu.os_utils import OsUtils
 
-from tidevice3.cli.cli_common import cli
+from tidevice3.cli.cli_common import cli, CommandWithDeprecatedOptions, DeprecatedOption;
 from tidevice3.cli.list import list_devices
 from tidevice3.utils.common import threadsafe_function
 
@@ -161,15 +161,16 @@ def run_forever(self):
             time.sleep(1)
 
 
-@cli.command(context_settings={"show_default": True})
+@cli.command(context_settings={"show_default": True}, cls=CommandWithDeprecatedOptions)
 @click.option(
     "--pmd3-path",
     "pmd3_path",
     help="pymobiledevice3 cli path",
     default=None,
 )
-@click.option("--port", "port", help="listen port", default=5555)
-def tunneld(pmd3_path: str, port: int):
+@click.option("--port", "port", help="listen port", default=5555, cls=DeprecatedOption, deprecated=True, preferred="--tunneld-port")
+@click.pass_context
+def tunneld(ctx: click.Context, pmd3_path: str, port: int):
     """start server for iOS >= 17 auto start-tunnel, function like pymobiledevice3 remote tunneld"""
     if not os_utils.is_admin:
         logger.error("Please run as root(Mac) or administrator(Windows)")
@@ -197,7 +198,8 @@ def shutdown():
         target=manager.run_forever, daemon=True, name="device_manager"
     ).start()
     try:
-        uvicorn.run(app, host="0.0.0.0", port=port)
+        tunneld_port = ctx.obj["tunneld_port"];
+        uvicorn.run(app, host="0.0.0.0", port=(tunneld_port if tunneld_port!=5555 else port))
     finally:
         logger.info("Shutting down...")
         manager.shutdown()