Skip to content

Commit

Permalink
Merge pull request #205 from antoinevg/antoinevg/bulk_speed_test
Browse files Browse the repository at this point in the history
applets: add a bi-directional bulk_speed_test applet
  • Loading branch information
miek authored Oct 10, 2023
2 parents e0f9fea + 86572fd commit ca18ec1
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 25 deletions.
61 changes: 46 additions & 15 deletions applets/bulk_in_speed_test.py → applets/bulk_speed_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@

import usb1

from luna.gateware.applets.speed_test import USBInSpeedTestDevice, USBInSuperSpeedTestDevice, BULK_ENDPOINT_NUMBER
from luna.gateware.applets.speed_test import (
USBSpeedTestDevice,
USBInSuperSpeedTestDevice,
BULK_ENDPOINT_NUMBER,
VENDOR_ID,
PRODUCT_ID,
)

from luna import top_level_cli, configure_default_logging

Expand All @@ -26,9 +32,13 @@
TRANSFER_QUEUE_DEPTH = 16


def run_speed_test():
def run_speed_test(direction=usb1.ENDPOINT_IN):
""" Runs a simple speed test, and reports throughput. """

if os.getenv('LUNA_SUPERSPEED') and not direction == usb1.ENDPOINT_IN:
logging.error("The SuperSpeed test device does not currently support OUT transfers.")
sys.exit(1)

total_data_exchanged = 0
failed_out = False

Expand Down Expand Up @@ -73,7 +83,7 @@ def _transfer_completed(transfer: usb1.USBTransfer):
with usb1.USBContext() as context:

# Grab a reference to our device...
device = context.openByVendorIDAndProductID(0x16d0, 0x0f3b)
device = context.openByVendorIDAndProductID(VENDOR_ID, PRODUCT_ID)

# ... and claim its bulk interface.
device.claimInterface(0)
Expand All @@ -84,7 +94,13 @@ def _transfer_completed(transfer: usb1.USBTransfer):

# Allocate the transfer...
transfer = device.getTransfer()
transfer.setBulk(0x80 | BULK_ENDPOINT_NUMBER, TEST_TRANSFER_SIZE, callback=_transfer_completed, timeout=1000)
endpoint = direction | BULK_ENDPOINT_NUMBER

if direction == usb1.ENDPOINT_IN:
transfer.setBulk(endpoint, TEST_TRANSFER_SIZE, callback=_transfer_completed, timeout=1000)
else:
out_test_data = bytearray([x % 256 for x in range(512)])
transfer.setBulk(endpoint, out_test_data, callback=_transfer_completed, timeout=1000)

# ... and store it.
active_transfers.append(transfer)
Expand All @@ -108,7 +124,10 @@ def _transfer_completed(transfer: usb1.USBTransfer):
# Cancel all of our active transfers.
for transfer in active_transfers:
if transfer.isSubmitted():
transfer.cancel()
try:
transfer.cancel()
except:
pass

# If we failed out; indicate it.
if (failed_out):
Expand All @@ -122,25 +141,37 @@ def _transfer_completed(transfer: usb1.USBTransfer):

if __name__ == "__main__":

# If our environment is suggesting we rerun tests, do so.
configure_default_logging()

# If our environment is suggesting we rerun tests without rebuilding, do so.
if os.getenv('LUNA_RERUN_TEST'):
configure_default_logging()
logging.info("Running speed test without rebuilding...")
run_speed_test()

# Otherwise, build and run our tests.
# Otherwise, rebuild.
else:
# Selectively create our device to be either USB3 or USB2 based on the
# SuperSpeed variable.
if os.getenv('LUNA_SUPERSPEED'):
device = top_level_cli(USBInSuperSpeedTestDevice)
else:
device = top_level_cli(USBInSpeedTestDevice,
device = top_level_cli(USBSpeedTestDevice,
fs_only=bool(os.getenv('LUNA_FULL_ONLY')))

logging.info("Giving the device time to connect...")
time.sleep(5)

# Give the device a moment to connect.
if device is not None:
logging.info(f"Starting bulk in speed test.")
run_speed_test()
logging.info("Giving the device time to connect...")
time.sleep(5)

# Run our Bulk IN test.
logging.info(f"Starting Bulk IN speed test.")
run_speed_test(direction=usb1.ENDPOINT_IN)

# Run our Bulk OUT speed test.
#
# Note: The SuperSpeed test device does not yet support an OUT speed test.
if os.getenv('LUNA_SUPERSPEED'):
logging.info("Skipping Bulk OUT speed test.")
logging.info("This test is not yet supported on SuperSpeed devices.")
else:
logging.info(f"Starting Bulk OUT speed test.")
run_speed_test(direction=usb1.ENDPOINT_OUT)
58 changes: 48 additions & 10 deletions luna/gateware/applets/speed_test.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
from amaranth import *
from usb_protocol.emitters import DeviceDescriptorCollection, SuperSpeedDeviceDescriptorCollection

from luna.usb2 import USBDevice, USBStreamInEndpoint
from luna.usb2 import USBDevice, USBStreamInEndpoint, USBStreamOutEndpoint
from luna.usb3 import USBSuperSpeedDevice, SuperSpeedStreamInEndpoint

from luna.gateware.platform import NullPin

VENDOR_ID = 0x16d0
PRODUCT_ID = 0x0f3b

BULK_ENDPOINT_NUMBER = 1


class USBInSpeedTestDevice(Elaboratable):
""" Simple device that sends data to the host as fast as hardware can. """
class USBSpeedTestDevice(Elaboratable):
""" Simple device that exchanges data with the host as fast as the hardware can. """

def __init__(self, fs_only=False, phy=None):
self.fs_only = fs_only
Expand All @@ -34,7 +36,7 @@ def create_descriptors(self):
d.idProduct = PRODUCT_ID

d.iManufacturer = "LUNA"
d.iProduct = "IN speed test"
d.iProduct = "speed test"
d.iSerialNumber = "no serial"

d.bNumConfigurations = 1
Expand All @@ -46,10 +48,16 @@ def create_descriptors(self):
with c.InterfaceDescriptor() as i:
i.bInterfaceNumber = 0

# Bulk IN to host (tx, from our side)
with i.EndpointDescriptor() as e:
e.bEndpointAddress = 0x80 | BULK_ENDPOINT_NUMBER
e.wMaxPacketSize = self.max_bulk_packet_size

# Bulk OUT to host (rx, from our side)
with i.EndpointDescriptor() as e:
e.bEndpointAddress = BULK_ENDPOINT_NUMBER
e.wMaxPacketSize = self.max_bulk_packet_size


return descriptors

Expand All @@ -76,19 +84,49 @@ def elaborate(self, platform):
descriptors = self.create_descriptors()
usb.add_standard_control_endpoint(descriptors)

# Add a stream endpoint to our device.
stream_ep = USBStreamInEndpoint(

# Add a stream endpoint to our device for Bulk IN transfers.
stream_in_ep = USBStreamInEndpoint(
endpoint_number=BULK_ENDPOINT_NUMBER,
max_packet_size=self.max_bulk_packet_size
)
usb.add_endpoint(stream_ep)
usb.add_endpoint(stream_in_ep)

# Create a simple data source that increments whenever the
# endpoint is accepting data.
counter = Signal(8)
with m.If(stream_in_ep.stream.ready):
m.d.sync += counter.eq(counter + 1)

# Send entirely zeroes, as fast as we can.
# Send our IN data stream, as fast as we can.
m.d.comb += [
stream_ep.stream.valid .eq(1),
stream_ep.stream.payload .eq(0)
stream_in_ep.stream.valid .eq(1),
stream_in_ep.stream.payload .eq(counter)
]


# Add a stream endpoint to our device for Bulk OUT transfers.
stream_out_ep = USBStreamOutEndpoint(
endpoint_number=BULK_ENDPOINT_NUMBER,
max_packet_size=self.max_bulk_packet_size
)
usb.add_endpoint(stream_out_ep)

# Always accept data as it comes in.
m.d.comb += stream_out_ep.stream.ready.eq(1)

# Receive our OUT data stream, as fast as we can and output
# the received data to our User I/O and LEDS
leds = Cat(platform.request_optional("led", i, default=NullPin()).o for i in range(6))
pmod_a = platform.request_optional("user_pmod", 0, default=NullPin(8))
with m.If(stream_out_ep.stream.valid):
m.d.comb += [
leds.eq(stream_out_ep.stream.payload[2:8]),
pmod_a.o.eq(stream_out_ep.stream.payload),
pmod_a.oe.eq(1),
]


# Connect our device as a high speed device by default.
m.d.comb += [
usb.connect .eq(1),
Expand Down

0 comments on commit ca18ec1

Please sign in to comment.