Skip to content
6 changes: 6 additions & 0 deletions .changeset/fast-rocks-help.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@e2b/python-sdk': minor
'e2b': minor
---

added option to connect to a running pty session
67 changes: 66 additions & 1 deletion packages/js-sdk/src/sandbox/commands/pty.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,21 @@ export interface PtyCreateOpts
cwd?: string
}

/**
* Options for connecting to a command.
*/
export type PtyConnectOpts = Pick<PtyCreateOpts, 'onData' | 'timeoutMs'> &
Pick<ConnectionOpts, 'requestTimeoutMs'>

/**
* Module for interacting with PTYs (pseudo-terminals) in the sandbox.
*/
export class Pty {
private readonly rpc: Client<typeof ProcessService>
private readonly envdVersion: string

private readonly defaultPtyConnectionTimeout = 60_000 // 60 seconds

constructor(
private readonly transport: Transport,
private readonly connectionConfig: ConnectionConfig,
Expand Down Expand Up @@ -118,7 +126,7 @@ export class Pty {
[KEEPALIVE_PING_HEADER]: KEEPALIVE_PING_INTERVAL_SEC.toString(),
},
signal: controller.signal,
timeoutMs: opts?.timeoutMs ?? 60_000,
timeoutMs: opts?.timeoutMs ?? this.defaultPtyConnectionTimeout,
}
)

Expand All @@ -141,6 +149,63 @@ export class Pty {
}
}

/**
* Connect to a running PTY.
*
* @param pid process ID of the PTY to connect to. You can get the list of running PTYs using {@link Commands.list}.
* @param opts connection options.
*
* @returns handle to interact with the PTY.
*/
async connect(pid: number, opts?: PtyConnectOpts): Promise<CommandHandle> {
const requestTimeoutMs =
opts?.requestTimeoutMs ?? this.connectionConfig.requestTimeoutMs

const controller = new AbortController()

const reqTimeout = requestTimeoutMs
? setTimeout(() => {
controller.abort()
}, requestTimeoutMs)
: undefined

const events = this.rpc.connect(
{
process: {
selector: {
case: 'pid',
value: pid,
},
},
},
{
signal: controller.signal,
headers: {
[KEEPALIVE_PING_HEADER]: KEEPALIVE_PING_INTERVAL_SEC.toString(),
},
timeoutMs: opts?.timeoutMs ?? this.defaultPtyConnectionTimeout,
}
)

try {
const pid = await handleProcessStartEvent(events)

clearTimeout(reqTimeout)

return new CommandHandle(
pid,
() => controller.abort(),
() => this.kill(pid),
events,
undefined,
undefined,
opts?.onData
)
} catch (err) {
throw handleRpcError(err)
}
}

/**
* Send input to a PTY.
*
Expand Down
48 changes: 48 additions & 0 deletions packages/js-sdk/tests/sandbox/pty/ptyConnect.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { sandboxTest } from '../../setup'
import { assert } from 'vitest'

sandboxTest('pty connect/reconnect', async ({ sandbox }) => {
let output1 = ''
let output2 = ''
const decoder = new TextDecoder()

// First, create a terminal and disconnect the onData handler
const terminal = await sandbox.pty.create({
cols: 80,
rows: 24,
onData: (data: Uint8Array) => {
output1 += decoder.decode(data)
},
envs: { FOO: 'bar' },
})

await sandbox.pty.sendInput(
terminal.pid,
new Uint8Array(Buffer.from('echo $FOO\n'))
)

// Give time for the command output in the first connection
await new Promise((r) => setTimeout(r, 300))

await terminal.disconnect()

// Now connect again, with a new onData handler
const reconnectHandle = await sandbox.pty.connect(terminal.pid, {
onData: (data: Uint8Array) => {
output2 += decoder.decode(data)
},
})

await sandbox.pty.sendInput(
terminal.pid,
new Uint8Array(Buffer.from('echo $FOO\nexit\n'))
)

await reconnectHandle.wait()

assert.equal(terminal.pid, reconnectHandle.pid)
assert.equal(reconnectHandle.exitCode, 0)

assert.include(output1, 'bar')
assert.include(output2, 'bar')
})
47 changes: 47 additions & 0 deletions packages/python-sdk/e2b/sandbox_async/commands/pty.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,53 @@ async def create(
except Exception as e:
raise handle_rpc_exception(e)

async def connect(
self,
pid: int,
on_data: OutputHandler[PtyOutput],
timeout: Optional[float] = 60,
request_timeout: Optional[float] = None,
) -> AsyncCommandHandle:
"""
Connect to a running PTY.

:param pid: Process ID of the PTY to connect to. You can get the list of running PTYs using `sandbox.pty.list()`.
:param on_data: Callback to handle PTY data
:param timeout: Timeout for the PTY connection in **seconds**. Using `0` will not limit the connection time
:param request_timeout: Timeout for the request in **seconds**

:return: Handle to interact with the PTY
"""
events = self._rpc.aconnect(
process_pb2.ConnectRequest(
process=process_pb2.ProcessSelector(pid=pid),
),
timeout=timeout,
request_timeout=self._connection_config.get_request_timeout(
request_timeout
),
headers={
KEEPALIVE_PING_HEADER: str(KEEPALIVE_PING_INTERVAL_SEC),
},
)

try:
start_event = await events.__anext__()

if not start_event.HasField("event"):
raise SandboxException(
f"Failed to connect to process: expected start event, got {start_event}"
)

return AsyncCommandHandle(
pid=start_event.event.start.pid,
handle_kill=lambda: self.kill(start_event.event.start.pid),
events=events,
on_pty=on_data,
)
except Exception as e:
raise handle_rpc_exception(e)

async def resize(
self,
pid: int,
Expand Down
44 changes: 44 additions & 0 deletions packages/python-sdk/e2b/sandbox_sync/commands/pty.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,50 @@ def create(
except Exception as e:
raise handle_rpc_exception(e)

def connect(
self,
pid: int,
timeout: Optional[float] = 60,
request_timeout: Optional[float] = None,
) -> CommandHandle:
"""
Connect to a running PTY.

:param pid: Process ID of the PTY to connect to. You can get the list of running PTYs using `sandbox.pty.list()`.
:param timeout: Timeout for the PTY connection in **seconds**. Using `0` will not limit the connection time
:param request_timeout: Timeout for the request in **seconds**

:return: Handle to interact with the PTY
"""
events = self._rpc.connect(
process_pb2.ConnectRequest(
process=process_pb2.ProcessSelector(pid=pid),
),
headers={
KEEPALIVE_PING_HEADER: str(KEEPALIVE_PING_INTERVAL_SEC),
},
timeout=timeout,
request_timeout=self._connection_config.get_request_timeout(
request_timeout
),
)

try:
start_event = events.__next__()

if not start_event.HasField("event"):
raise SandboxException(
f"Failed to connect to process: expected start event, got {start_event}"
)

return CommandHandle(
pid=start_event.event.start.pid,
handle_kill=lambda: self.kill(start_event.event.start.pid),
events=events,
)
except Exception as e:
raise handle_rpc_exception(e)

def resize(
self,
pid: int,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import asyncio

from e2b import AsyncSandbox
from e2b.sandbox.commands.command_handle import PtySize


async def test_connect_to_pty(async_sandbox: AsyncSandbox):
output1 = []
output2 = []

def append_data(data: list, x: bytes):
data.append(x.decode("utf-8"))

# First, create a terminal and disconnect the on_data handler
terminal = await async_sandbox.pty.create(
PtySize(80, 24),
on_data=lambda x: append_data(output1, x),
envs={"FOO": "bar"},
)

await async_sandbox.pty.send_stdin(terminal.pid, b"echo $FOO\n")

# Give time for the command output in the first connection
await asyncio.sleep(0.3)

await terminal.disconnect()

# Now connect again, with a new on_data handler
reconnect_handle = await async_sandbox.pty.connect(
terminal.pid, on_data=lambda x: append_data(output2, x)
)

await async_sandbox.pty.send_stdin(terminal.pid, b"echo $FOO\nexit\n")

await reconnect_handle.wait()

assert terminal.pid == reconnect_handle.pid
assert reconnect_handle.exit_code == 0

assert "bar" in "".join(output1)
assert "bar" in "".join(output2)
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import pytest

from e2b.sandbox.commands.command_handle import PtySize


@pytest.mark.skip_debug()
def test_connect_to_pty(sandbox_factory):
sandbox = sandbox_factory(timeout=100)
output = []

def append_data(data: list, x: bytes):
data.append(x.decode("utf-8"))

terminal = sandbox.pty.create(PtySize(80, 24), envs={"FOO": "bar"})

sandbox.pty.send_stdin(terminal.pid, b"echo $FOO\n")

terminal.disconnect()

# Now connect again, with a new on_pty handler
reconnect_handle = sandbox.pty.connect(terminal.pid)

sandbox.pty.send_stdin(terminal.pid, b"echo $FOO\nexit\n")

result = reconnect_handle.wait(on_pty=lambda x: append_data(output, x))

assert terminal.pid == reconnect_handle.pid
assert result.exit_code == 0

assert "bar" in "".join(output)