Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

applets: add a bi-directional bulk_speed_test applet #205

Merged
merged 2 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 46 additions & 15 deletions applets/bulk_in_speed_test.py → applets/bulk_speed_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@

import usb1

from luna.gateware.applets.speed_test import USBInSpeedTestDevice, USBInSuperSpeedTestDevice, BULK_ENDPOINT_NUMBER
from luna.gateware.applets.speed_test import (
USBSpeedTestDevice,
USBInSuperSpeedTestDevice,
BULK_ENDPOINT_NUMBER,
VENDOR_ID,
PRODUCT_ID,
)

from luna import top_level_cli, configure_default_logging

Expand All @@ -26,9 +32,13 @@
TRANSFER_QUEUE_DEPTH = 16


def run_speed_test():
def run_speed_test(direction=usb1.ENDPOINT_IN):
""" Runs a simple speed test, and reports throughput. """

if os.getenv('LUNA_SUPERSPEED') and not direction == usb1.ENDPOINT_IN:
logging.error("The SuperSpeed test device does not currently support OUT transfers.")
sys.exit(1)

total_data_exchanged = 0
failed_out = False

Expand Down Expand Up @@ -73,7 +83,7 @@ def _transfer_completed(transfer: usb1.USBTransfer):
with usb1.USBContext() as context:

# Grab a reference to our device...
device = context.openByVendorIDAndProductID(0x16d0, 0x0f3b)
device = context.openByVendorIDAndProductID(VENDOR_ID, PRODUCT_ID)

# ... and claim its bulk interface.
device.claimInterface(0)
Expand All @@ -84,7 +94,13 @@ def _transfer_completed(transfer: usb1.USBTransfer):

# Allocate the transfer...
transfer = device.getTransfer()
transfer.setBulk(0x80 | BULK_ENDPOINT_NUMBER, TEST_TRANSFER_SIZE, callback=_transfer_completed, timeout=1000)
endpoint = direction | BULK_ENDPOINT_NUMBER

if direction == usb1.ENDPOINT_IN:
transfer.setBulk(endpoint, TEST_TRANSFER_SIZE, callback=_transfer_completed, timeout=1000)
else:
out_test_data = bytearray([x % 256 for x in range(512)])
transfer.setBulk(endpoint, out_test_data, callback=_transfer_completed, timeout=1000)

# ... and store it.
active_transfers.append(transfer)
Expand All @@ -108,7 +124,10 @@ def _transfer_completed(transfer: usb1.USBTransfer):
# Cancel all of our active transfers.
for transfer in active_transfers:
if transfer.isSubmitted():
transfer.cancel()
try:
transfer.cancel()
except:
pass

# If we failed out; indicate it.
if (failed_out):
Expand All @@ -122,25 +141,37 @@ def _transfer_completed(transfer: usb1.USBTransfer):

if __name__ == "__main__":

# If our environment is suggesting we rerun tests, do so.
configure_default_logging()

# If our environment is suggesting we rerun tests without rebuilding, do so.
if os.getenv('LUNA_RERUN_TEST'):
configure_default_logging()
logging.info("Running speed test without rebuilding...")
run_speed_test()

# Otherwise, build and run our tests.
# Otherwise, rebuild.
else:
# Selectively create our device to be either USB3 or USB2 based on the
# SuperSpeed variable.
if os.getenv('LUNA_SUPERSPEED'):
device = top_level_cli(USBInSuperSpeedTestDevice)
else:
device = top_level_cli(USBInSpeedTestDevice,
device = top_level_cli(USBSpeedTestDevice,
fs_only=bool(os.getenv('LUNA_FULL_ONLY')))

logging.info("Giving the device time to connect...")
time.sleep(5)

# Give the device a moment to connect.
if device is not None:
logging.info(f"Starting bulk in speed test.")
run_speed_test()
logging.info("Giving the device time to connect...")
time.sleep(5)

# Run our Bulk IN test.
logging.info(f"Starting Bulk IN speed test.")
run_speed_test(direction=usb1.ENDPOINT_IN)

# Run our Bulk OUT speed test.
#
# Note: The SuperSpeed test device does not yet support an OUT speed test.
if os.getenv('LUNA_SUPERSPEED'):
logging.info("Skipping Bulk OUT speed test.")
logging.info("This test is not yet supported on SuperSpeed devices.")
else:
logging.info(f"Starting Bulk OUT speed test.")
run_speed_test(direction=usb1.ENDPOINT_OUT)
58 changes: 48 additions & 10 deletions luna/gateware/applets/speed_test.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
from amaranth import *
from usb_protocol.emitters import DeviceDescriptorCollection, SuperSpeedDeviceDescriptorCollection

from luna.usb2 import USBDevice, USBStreamInEndpoint
from luna.usb2 import USBDevice, USBStreamInEndpoint, USBStreamOutEndpoint
from luna.usb3 import USBSuperSpeedDevice, SuperSpeedStreamInEndpoint

from luna.gateware.platform import NullPin

VENDOR_ID = 0x16d0
PRODUCT_ID = 0x0f3b

BULK_ENDPOINT_NUMBER = 1


class USBInSpeedTestDevice(Elaboratable):
""" Simple device that sends data to the host as fast as hardware can. """
class USBSpeedTestDevice(Elaboratable):
""" Simple device that exchanges data with the host as fast as the hardware can. """

def __init__(self, fs_only=False, phy=None):
self.fs_only = fs_only
Expand All @@ -34,7 +36,7 @@ def create_descriptors(self):
d.idProduct = PRODUCT_ID

d.iManufacturer = "LUNA"
d.iProduct = "IN speed test"
d.iProduct = "speed test"
d.iSerialNumber = "no serial"

d.bNumConfigurations = 1
Expand All @@ -46,10 +48,16 @@ def create_descriptors(self):
with c.InterfaceDescriptor() as i:
i.bInterfaceNumber = 0

# Bulk IN to host (tx, from our side)
with i.EndpointDescriptor() as e:
e.bEndpointAddress = 0x80 | BULK_ENDPOINT_NUMBER
e.wMaxPacketSize = self.max_bulk_packet_size

# Bulk OUT to host (rx, from our side)
with i.EndpointDescriptor() as e:
e.bEndpointAddress = BULK_ENDPOINT_NUMBER
e.wMaxPacketSize = self.max_bulk_packet_size


return descriptors

Expand All @@ -76,19 +84,49 @@ def elaborate(self, platform):
descriptors = self.create_descriptors()
usb.add_standard_control_endpoint(descriptors)

# Add a stream endpoint to our device.
stream_ep = USBStreamInEndpoint(

# Add a stream endpoint to our device for Bulk IN transfers.
stream_in_ep = USBStreamInEndpoint(
endpoint_number=BULK_ENDPOINT_NUMBER,
max_packet_size=self.max_bulk_packet_size
)
usb.add_endpoint(stream_ep)
usb.add_endpoint(stream_in_ep)

# Create a simple data source that increments whenever the
# endpoint is accepting data.
counter = Signal(8)
with m.If(stream_in_ep.stream.ready):
m.d.sync += counter.eq(counter + 1)

# Send entirely zeroes, as fast as we can.
# Send our IN data stream, as fast as we can.
m.d.comb += [
stream_ep.stream.valid .eq(1),
stream_ep.stream.payload .eq(0)
stream_in_ep.stream.valid .eq(1),
stream_in_ep.stream.payload .eq(counter)
]


# Add a stream endpoint to our device for Bulk OUT transfers.
stream_out_ep = USBStreamOutEndpoint(
endpoint_number=BULK_ENDPOINT_NUMBER,
max_packet_size=self.max_bulk_packet_size
)
usb.add_endpoint(stream_out_ep)

# Always accept data as it comes in.
m.d.comb += stream_out_ep.stream.ready.eq(1)

# Receive our OUT data stream, as fast as we can and output
# the received data to our User I/O and LEDS
leds = Cat(platform.request_optional("led", i, default=NullPin()).o for i in range(6))
pmod_a = platform.request_optional("user_pmod", 0, default=NullPin(8))
with m.If(stream_out_ep.stream.valid):
m.d.comb += [
leds.eq(stream_out_ep.stream.payload[2:8]),
pmod_a.o.eq(stream_out_ep.stream.payload),
pmod_a.oe.eq(1),
]


# Connect our device as a high speed device by default.
m.d.comb += [
usb.connect .eq(1),
Expand Down
Loading