diff --git a/README.md b/README.md index 60b52ea..b809db5 100644 --- a/README.md +++ b/README.md @@ -1,174 +1,118 @@ -[![DOI](https://zenodo.org/badge/80209610.svg)](https://zenodo.org/badge/latestdoi/80209610) - -# Muse LSL - -A Python package for streaming, visualizing, and recording EEG data from the Muse 2016 headband. - -![Blinks](blinks.png) - -## Requirements - -The code relies on [pygatt](https://github.com/peplin/pygatt) or [BlueMuse](https://github.com/kowalej/BlueMuse/tree/master/Dist) for BLE communication and works differently on different operating systems. - -- Windows: On Windows 10, we recommend installing [BlueMuse](https://github.com/kowalej/BlueMuse/tree/master/Dist) and using its GUI to discover and connect to Muse devices. Alternatively, if you have a BLED112 dongle you can try Muse LSL's bgapi backend (`muselsl stream --backend bgapi`). -- Mac: On Mac, a **BLED112 dongle is required**. The bgapi backend is required and will be used by default when running Muse LSL from the command line -- Linux: No dongle required. However, you may need to run a command to enable root-level access to bluetooth hardware (see [Common Issues](#linux)). The pygatt backend is required and will be used by default from the command line. and make sure to read the - -**Compatible with Python 2.7 and Python 3.x** - -**Only compatible with Muse 2 and Muse 2016** - -_Note: if you run into any issues, first check out out [Common Issues](#common-issues) and then the [Issues](https://github.com/alexandrebarachant/muse-lsl/issues) section of this repository_ - -## Getting Started - -### Installation - -Install Muse LSL with pip - - pip install muselsl - -### Setting Up a Stream - -On Windows 10, we recommend using the [BlueMuse](https://github.com/kowalej/BlueMuse/tree/master/Dist) GUI to set up an LSL stream. On Mac and Linux, the easiest way to get Muse data is to use Muse LSL directly from the command line. Use the `-h` flag to get a comprehensive list of all commands and options. - -To print a list of available muses: - $ muselsl list -To begin an LSL stream from the first available Muse: +# Muse LSL - Experiment - $ muselsl stream +This repo is forked from [https://github.com/alexandrebarachant/muse-lsl](https://github.com/alexandrebarachant/muse-lsl) [1]. -To connect to a specific Muse you can pass the name of the device as an argument. Device names can be found on the inside of the left earpiece (e.g. Muse-41D2): +The original code was modified and extended to allow streaming and recording of two or more muses simultaneously. +We used two `2019 Muse 2 headsets` for developement and testing. - $ muselsl stream --name YOUR_DEVICE_NAME +## Installation -You can also directly pass the MAC address of your Muse. This provides the benefit of bypassing the device discovery step and can make connecting to devices quicker and more reliable: +Install this repository as a pip package with - $ muselsl stream --address YOUR_DEVICE_ADDRESS + pip3 install . -### Working with Streaming Data +or just install the dependencies using: -Once an LSL stream is created, you have access to the following commands. + pip3 install -r requirements.txt -*Note: the process running the `stream` command must be kept alive in order to maintain the LSL stream. These following commands should be run in another terminal or second process* +If you *did not* install the repo as a pip package, you need to replace `muselsl` with +`python3 -m muselsl` in all following examples. -To view data: +## Hardware Setup - $ muselsl view +We tested different hardware setups for multiple streams. +The most reliable setup, which allowed streaming of all data-streams from two muses was the following: -If the visualization freezes or is laggy, you can also try the alternate version 2 of the viewer. *Note: this will require the additional [vispy](https://github.com/vispy/vispy) and [mne](https://github.com/mne-tools/mne-python) dependencies* +- Two Muses +- Two Laptops (We used a MacBook with `BLED112 dongle` and a linux machine with integrated Bluetooth.) +- Wireless (or wired) network with peer to peer traffic allowed (HPI network might not work, we made a WIFI hotspot using an android phone) - $ muselsl view --version 2 +One of the laptops acts as a *stream source*, while the other *records* its own stream and the stream from the stream source, which is sent over +the network. -To record EEG data into a CSV: +## Stream Source - $ muselsl record --duration 60 +The stream source is started before the recorder and must run until after the recorder has stopped. +If the stream source crashes it should be restarted, the recorder can recover only if the stream source is restarted, otherwise the recorder will +crash, when attempting to stop the recording. -*Note: this command will also save data from any LSL stream containing 'Markers' data, such as from the stimulus presentation scripts in [EEG Notebooks](https://github.com/neurotechx/eeg-notebooks)* +The stream source is started with the following command: -Alternatively, you can record data directly without using LSL through the following command: + muselsl stream -pcg -a - $ muselsl record_direct --duration 60 +Where `` must be replaced with the mac-address of one of the muses. +To find the mac addresses of your muses, you can turn on your muses and run -_Note: direct recording does not allow 'Markers' data to be recorded_ + muselsl list -## Running Experiments +to get a list of the available headsets. +The last four letters (two bytes) of the MAC address are printed on each Muse headset. -Muse LSL was designed so that the Muse could be used to run a number of classic EEG experiments, including the [P300 event-related potential](http://alexandre.barachant.org/blog/2017/02/05/P300-with-muse.html) and the SSVEP and SSAEP evoked potentials. +In our setup one stream source is started on the *stream source* machine and one is started +for the other muse, on the *recording* machine. -The code to perform these experiments is still available, but is now maintained in the [EEG Notebooks](https://github.com/neurotechx/eeg-notebooks) repository by the [NeuroTechX](https://neurotechx.com) community. -## Usage as a Library +## View -If you want to integrate Muse LSL into your own Python project, you can import and use its functions as you would any Python library. Examples are available in the `examples` folder: +Before starting the recording, make sure all streams are coming through, and are of decent quality. +This can be done using the viewer. -```Python -from muselsl import stream, list_muses + muselsl view -v 2 -a -t -muses = list_muses() -stream(muses[0]['address']) +Where data-type can be any of: +- PPG +- EEG +- ACC +- GYRO -# Note: Streaming is synchronous, so code here will not execute until after the stream has been closed -print('Stream has ended') -``` +You can zoom in and out by holding your left mouse button and moving your mouse up or down. -## Alternate Data Types +> It is also a good idea, to at least monitor the EEG streams of all participant during the +> experiment, to be able to detect artifacts early and fix them (eg. loose electrodes, muscle noise, etc.). -In addition to EEG, the Muse also provides data from an accelerometer, gyroscope, and, in the case of the Muse 2, a photoplethysmography (PPG) sensor. These data types can be enabled via command line arguments or by passing the correct parameters to the `stream` function. Once enabled, PPG, accelerometer, and gyroscope data will streamed in their own separate LSL streams named "PPG", "ACC", and "GYRO", respectively. +## Recorder -To stream data from all sensors in a Muse 2 from the command line: +When the stream source is running, the recorder script can be started with the following command: - muselsl stream --ppg --acc --gyro + muselsl record -d -n [-i ] -As a library function: +The script will ask for the `MUSE-ID` of each participant, which is written on each muse device. +The `MUSE-ID` are the last two bytes (four letters) of the mac address. -```Python -from muselsl import stream, list_muses +After all IDs are set, the recording starts automatically. +During the recording, timestamps can be saved, by typing any text as a label and pressing enter. +These timestamps can be used to mark when different parts of the experiment begin, or when the experiment has ended. -muses = list_muses() -stream(muses[0]['address'], ppg_enabled=True, acc_enabled=True, gyro_enabled=True) -``` +The recording will continue, until it is stopped by pressing `CTRL` `+` `C`. -To record data from an alternate data source: +All recordings are saved in the specified directory. +If no `trial-id` is provided, the current timestamp is used instead to create a new subdirectory, +in which all data is stored. +This directory contains the recorded markers, which are stored in `markers.csv` and a subdirectory for each participant. - muselsl record --type ACC +Each participant directory contains the following files: -*Note: The record process will only record from one data type at a time. However, multiple terminals or processes can be used to record from multiple data types simultaneously* -## What is LSL? +| File | Description | +|------|-------------| +|ACC.csv | Data from the Accelerometer | +|EEG.csv| EEG Data| +|GYRO.csv | Gyroscope Data | +|PPG.csv | Heart rate measured with PPG | -Lab Streaming Layer or LSL is a system designed to unify the collection of time series data for research experiments. It has become standard in the field of EEG-based brain-computer interfaces for its ability to make seperate streams of data available on a network with time synchronization and near real-time access. For more information, check out this [lecture from Modern Brain-Computer Interface Design](https://www.youtube.com/watch?v=Y1at7yrcFW0) or the [LSL repository](https://github.com/sccn/labstreaminglayer) +## Other Setups -## Common Issues +We tried using Raspberry Pi's as stream source, however +even the newest model 4 with a `BLED112 dongle` +it was not able to stream reliably for more than 10 minutes. +It might be possible, though to use Raspberry Pi's, if not all data +streams are enabled (eg. if only recording ppg). +See [this github issue](https://github.com/alexandrebarachant/muse-lsl/issues/55) for more information. -### Mac and Windows - -1. Connection issues with BLED112 dongle: - -- You may need to use the `--interface` argument to provide the appropriate COM port value for the BLED112 device. The default value is COM9. To setup or view the device's COM port go to your OS's system settings - -### Linux - -1. `pygatt.exceptions.BLEError: Unexpected error when scanning: Set scan parameters failed: Operation not permitted` (Linux) - -- This is an issue with pygatt requiring root privileges to run a scan. Make sure you [have `libcap` installed](https://askubuntu.com/questions/347788/how-can-i-install-libpcap-header-files-on-ubuntu-12-04) and run `` sudo setcap 'cap_net_raw,cap_net_admin+eip' `which hcitool` `` - -2. `pygatt.exceptions.BLEError: No characteristic found matching 273e0003-4c4d-454d-96be-f03bac821358` (Linux) - -- There is a problem with the most recent version of pygatt. Work around this by downgrading to 3.1.1: `pip install pygatt==3.1.1` - -3. `pygatt.exceptions.BLEError: No BLE adapter found` (Linux) - -- Make sure your computer's Bluetooth is turned on. - -4. `pygatt.exceptions.BLEError: Unexpected error when scanning: Set scan parameters failed: Connection timed out` (Linux) - -- This seems to be due to a OS-level Bluetooth crash. Try turning your computer's bluetooth off and on again - -5. `'RuntimeError: could not create stream outlet'` (Linux) - -- This appears to be due to Linux-specific issues with the newest version of pylsl. Ensure that you have pylsl 1.10.5 installed in the environment in which you are trying to run Muse LSL - -## Citing muse-lsl - -``` -@misc{muse-lsl, - author = {Alexandre Barachant and - Dano Morrison and - Hubert Banville and - Jason Kowaleski and - Uri Shaked and - Sylvain Chevallier and - Juan Jesús Torre Tresols}, - title = {muse-lsl}, - month = may, - year = 2019, - doi = {10.5281/zenodo.3228861}, - url = {https://doi.org/10.5281/zenodo.3228861} -} -``` +[1] > Alexandre Barachant, Dano Morrison, Hubert Banville, Jason Kowaleski, Uri Shaked, Sylvain Chevallier, & Juan Jesús Torre Tresols. (2019, May 25). muse-lsl (Version v2.0.2). Zenodo. http://doi.org/10.5281/zenodo.3228861 +[![DOI](https://zenodo.org/badge/80209610.svg)](https://zenodo.org/badge/latestdoi/80209610) diff --git a/muselsl/__init__.py b/muselsl/__init__.py index a0e925e..5e15f66 100644 --- a/muselsl/__init__.py +++ b/muselsl/__init__.py @@ -1,4 +1,5 @@ from .stream import stream, list_muses -from .record import record, record_direct +from .record import record from .view import view +from .replay import replay __version__ = "1.0.0" diff --git a/muselsl/__main__.py b/muselsl/__main__.py index c394339..7d22b39 100644 --- a/muselsl/__main__.py +++ b/muselsl/__main__.py @@ -29,18 +29,14 @@ def main(): -f --figure Window size. -v --version Viewer version (1 or 2) - 1 is the default stable version, 2 is in development (and takes no arguments). -b --backend Matplotlib backend to use. Default: TkAgg - - record Record EEG data from an LSL stream. - -d --duration Duration of the recording in seconds. + record Recording an experiment. + -d --directory Root-directory to store recorded data in. + -n --participants The number of participants in this run. + -i --trial-id The id of this trial. Data is stored in a subdirectory with this name. + If no id is provided, the current timestamp is used instead. + + replay Replay data from a recorded CSV file into a new LSL stream. -f --filename Name of the recording file. - -dj --dejitter Whether to apply dejitter correction to timestamps. - -t --type Data type to record from. Either EEG, PPG, ACC, or GYRO - - record_direct Record data directly from Muse headset (no LSL). - -a --address Device MAC address. - -n --name Device name (e.g. Muse-41D2). - -b --backend BLE backend to use. can be auto, bluemuse, gatt or bgapi. - -i --interface The interface to use, 'hci0' for gatt or a com port for bgapi. ''') parser.add_argument('command', help='Command to run.') diff --git a/muselsl/cli.py b/muselsl/cli.py index 793b444..ecfcb1c 100644 --- a/muselsl/cli.py +++ b/muselsl/cli.py @@ -51,53 +51,31 @@ def stream(self): args.interface, args.name, args.ppg, args.acc, args.gyro, args.disable_eeg) def record(self): - parser = argparse.ArgumentParser( - description='Record data from an LSL stream.') - parser.add_argument("-d", "--duration", - dest="duration", type=int, default=60, - help="Duration of the recording in seconds.") - parser.add_argument("-f", "--filename", - dest="filename", type=str, default=None, - help="Name of the recording file.") - parser.add_argument("-dj", "--dejitter", - dest="dejitter", type=bool, default=True, - help="Whether to apply dejitter correction to timestamps.") - parser.add_argument("-t", "--type", type=str, default="EEG", - help="Data type to record from. Either EEG, PPG, ACC, or GYRO.") + from .run_experiment import ExperimentalRun - args = parser.parse_args(sys.argv[2:]) - from . import record - record(args.duration, args.filename, args.dejitter, args.type) - - def record_direct(self): parser = argparse.ArgumentParser( - description='Record directly from Muse without LSL.') - parser.add_argument("-a", "--address", - dest="address", type=str, default=None, - help="Device MAC address.") - parser.add_argument("-n", "--name", - dest="name", type=str, default=None, - help="Name of the device.") - parser.add_argument("-b", "--backend", - dest="backend", type=str, default="auto", - help="BLE backend to use. Can be auto, bluemuse, gatt or bgapi.") - parser.add_argument("-i", "--interface", - dest="interface", type=str, default=None, - help="The interface to use, 'hci0' for gatt or a com port for bgapi.") - parser.add_argument("-d", "--duration", - dest="duration", type=int, default=60, - help="Duration of the recording in seconds.") - parser.add_argument("-f", "--filename", - dest="filename", type=str, default=None, - help="Name of the recording file.") + description='Start recording of an experiment.') + parser.add_argument("-d", "--directory", + dest="data_root", type=str, required=True, + help="Root-directory to store recorded data in.") + parser.add_argument("-n", "--participants", + dest="num_participants", type=int, required=True, + help="The number of participants in this run.") + parser.add_argument("-i", "--trial-id", + dest="trial_id", type=str, default=None, + help="The id of this trial. Data is stored in a subdirectory with this name." + + "If no id is provided, the current timestamp is used instead.") args = parser.parse_args(sys.argv[2:]) - from . import record_direct - record_direct(args.address, args.backend, - args.interface, args.name, args.duration, args.filename) + + experiment = ExperimentalRun(data_root=args.data_root, + num_participants=args.num_participants, trial_id=args.trial_id) + experiment.start() def view(self): parser = argparse.ArgumentParser( - description='View EEG data from an LSL stream.') + description='View data from an LSL stream.') + parser.add_argument("-t", "--type", type=str, default="EEG", dest="data_type", + help="Data type to view. Either EEG, PPG, ACC, or GYRO.") parser.add_argument("-w", "--window", dest="window", type=float, default=5., help="Window length to display in seconds.") @@ -116,7 +94,22 @@ def view(self): parser.add_argument("-b", "--backend", dest="backend", type=str, default='TkAgg', help="Matplotlib backend to use. Default: %(default)s") + parser.add_argument("-a", "--address", + dest="source_id", type=str, default=None, + help="Device MAC address.") args = parser.parse_args(sys.argv[2:]) from . import view view(args.window, args.scale, args.refresh, - args.figure, args.version, args.backend) + args.figure, args.version, args.backend, args.data_type, args.source_id) + + def replay(self): + parser = argparse.ArgumentParser( + description='Replay data from a recorded CSV file into a new LSL stream.') + parser.add_argument("-f", "--filename", + dest="filename", type=str, default=None, + help="Name of the recording file.") + # parser.add_argument("-t", "--type", type=str, default="EEG", + # help="Data type to record from. Either EEG, PPG, ACC, or GYRO.") + args = parser.parse_args(sys.argv[2:]) + from . import replay + replay(args.filename) diff --git a/muselsl/constants.py b/muselsl/constants.py index b4228da..b47387f 100644 --- a/muselsl/constants.py +++ b/muselsl/constants.py @@ -1,3 +1,5 @@ +from enum import Enum + MUSE_NB_EEG_CHANNELS = 5 MUSE_SAMPLING_EEG_RATE = 256 LSL_EEG_CHUNK = 12 @@ -38,3 +40,15 @@ VIEW_SUBSAMPLE = 2 VIEW_BUFFER = 12 + + +class StreamProcMessage(Enum): + Started = 0 + Aborting = 1 + + +class ChunkLength(Enum): + ACC = LSL_ACC_CHUNK + EEG = LSL_EEG_CHUNK + GYRO = LSL_GYRO_CHUNK + PPG = LSL_PPG_CHUNK diff --git a/muselsl/muse.py b/muselsl/muse.py index 6e502dd..5b28da8 100644 --- a/muselsl/muse.py +++ b/muselsl/muse.py @@ -47,6 +47,14 @@ def __init__(self, address, callback_eeg=None, callback_control=None, self.backend = helper.resolve_backend(backend) + def __exit__(self, exc_type, exc_value, traceback): + if hasattr(self, "device"): + self.stop() + self.disconnect() + + def __enter__(self): + return self + def connect(self, interface=None, backend='auto'): """Connect to the device""" try: diff --git a/muselsl/record.py b/muselsl/record.py index 5001c50..8738511 100644 --- a/muselsl/record.py +++ b/muselsl/record.py @@ -1,174 +1,65 @@ +import csv +from pathlib import Path import numpy as np -import pandas as pd -import os from pylsl import StreamInlet, resolve_byprop -from sklearn.linear_model import LinearRegression -from time import time, sleep, strftime, gmtime -from .stream import find_muse -from .muse import Muse -from . constants import LSL_SCAN_TIMEOUT, LSL_EEG_CHUNK, LSL_PPG_CHUNK, LSL_ACC_CHUNK, LSL_GYRO_CHUNK +from .constants import LSL_SCAN_TIMEOUT, ChunkLength -# Records a fixed duration of EEG data from an LSL stream into a CSV file +def record(filename, data_source="EEG", abort=None, source_id=None): + """Records a fixed duration of EEG data from an LSL stream into a CSV file""" -def record(duration, filename=None, dejitter=False, data_source="EEG"): - chunk_length = LSL_EEG_CHUNK - if data_source == "PPG": - chunk_length = LSL_PPG_CHUNK - if data_source == "ACC": - chunk_length = LSL_ACC_CHUNK - if data_source == "GYRO": - chunk_length = LSL_GYRO_CHUNK + float_precision = 3 - if not filename: - filename = os.path.join(os.getcwd( - ), "%s_recording_%s.csv" % (data_source, strftime('%Y-%m-%d-%H.%M.%S', gmtime()))) + directory = Path(filename).parent + directory.mkdir(parents=True, exist_ok=True) - print("Looking for a %s stream..." % (data_source)) + csv_file = Path(filename).open("w") + writer = csv.writer(csv_file, lineterminator="\n") + + chunk_length = ChunkLength[data_source].value + + print(f"Looking for a {data_source} stream...") streams = resolve_byprop('type', data_source, timeout=LSL_SCAN_TIMEOUT) if len(streams) == 0: - print("Can't find %s stream." % (data_source)) + print(f"Can't find {data_source} stream.") return - print("Started acquiring data.") - inlet = StreamInlet(streams[0], max_chunklen=chunk_length) - # eeg_time_correction = inlet.time_correction() - - print("Looking for a Markers stream...") - marker_streams = resolve_byprop( - 'name', 'Markers', timeout=LSL_SCAN_TIMEOUT) - - if marker_streams: - inlet_marker = StreamInlet(marker_streams[0]) - else: - inlet_marker = False - print("Can't find Markers stream.") + if source_id is not None: + streams = [s for s in streams if s.source_id() == 'Muse%s' % source_id] + assert len( + streams) == 1, f"Expected to find exactly one stream with source_id: {'Muse%s' % source_id}, but found {len(streams)}" + inlet = StreamInlet(streams[0], max_chunklen=chunk_length) info = inlet.info() description = info.desc() - Nchan = info.channel_count() - ch = description.child('channels').first_child() ch_names = [ch.child_value('label')] - for i in range(1, Nchan): + for i in range(1, info.channel_count()): ch = ch.next_sibling() ch_names.append(ch.child_value('label')) - res = [] - timestamps = [] - markers = [] - t_init = time() - time_correction = inlet.time_correction() - print('Start recording at time t=%.3f' % t_init) - print('Time correction: ', time_correction) - while (time() - t_init) < duration: + columns = ['timestamps'] + ch_names + writer.writerow(columns) + + first_iteration = True + + while not abort.is_set(): try: data, timestamp = inlet.pull_chunk(timeout=1.0, max_samples=chunk_length) - + ts = np.array(timestamp) + inlet.time_correction() if timestamp: - res.append(data) - timestamps.extend(timestamp) - if inlet_marker: - marker, timestamp = inlet_marker.pull_sample(timeout=0.0) - if timestamp: - markers.append([marker, timestamp]) - except KeyboardInterrupt: - break - - time_correction = inlet.time_correction() - print('Time correction: ', time_correction) - - res = np.concatenate(res, axis=0) - timestamps = np.array(timestamps) + time_correction - - if dejitter: - y = timestamps - X = np.atleast_2d(np.arange(0, len(y))).T - lr = LinearRegression() - lr.fit(X, y) - timestamps = lr.predict(X) - - res = np.c_[timestamps, res] - data = pd.DataFrame(data=res, columns=['timestamps'] + ch_names) - - if inlet_marker: - n_markers = len(markers[0][0]) - for ii in range(n_markers): - data['Marker%d' % ii] = 0 - # process markers: - for marker in markers: - # find index of markers - ix = np.argmin(np.abs(marker[1] - timestamps)) - for ii in range(n_markers): - data.loc[ix, 'Marker%d' % ii] = marker[0][ii] + tmp = np.c_[ts, data] + formatted = [[f"{x:.{float_precision}f}" for x in row] for row in tmp] + writer.writerows(formatted) - directory = os.path.dirname(filename) - if not os.path.exists(directory): - os.makedirs(directory) - - data.to_csv(filename, float_format='%.3f', index=False) - - print('Done - wrote file: ' + filename + '.') - -# Rercord directly from a Muse without the use of LSL - - -def record_direct(duration, address, filename=None, backend='auto', interface=None, name=None): - if backend == 'bluemuse': - raise(NotImplementedError( - 'Direct record not supported with BlueMuse backend. Use record after starting stream instead.')) - - if not address: - found_muse = find_muse(name) - if not found_muse: - print('Muse could not be found') - return - else: - address = found_muse['address'] - name = found_muse['name'] - print('Connecting to %s : %s...' % - (name if name else 'Muse', address)) - - if not filename: - filename = os.path.join(os.getcwd(), ("recording_%s.csv" % - strftime("%Y-%m-%d-%H.%M.%S", gmtime()))) - - eeg_samples = [] - timestamps = [] - - def save_eeg(new_samples, new_timestamps): - eeg_samples.append(new_samples) - timestamps.append(new_timestamps) - - muse = Muse(address, save_eeg) - muse.connect() - muse.start() - - t_init = time() - print('Start recording at time t=%.3f' % t_init) - - while (time() - t_init) < duration: - try: - sleep(1) + if first_iteration: + first_iteration = False + print(f"[Success] Started recording of {data_source}") except KeyboardInterrupt: + print("KeyboardInterrupt") break - muse.stop() - muse.disconnect() - - timestamps = np.concatenate(timestamps) - eeg_samples = np.concatenate(eeg_samples, 1).T - recording = pd.DataFrame(data=eeg_samples, - columns=['TP9', 'AF7', 'AF8', 'TP10', 'Right AUX']) - - recording['timestamps'] = timestamps - - directory = os.path.dirname(filename) - if not os.path.exists(directory): - os.makedirs(directory) - - recording.to_csv(filename, float_format='%.3f') - print('Done - wrote file: ' + filename + '.') + print(f"[Success] Stopped recording of {data_source}") \ No newline at end of file diff --git a/muselsl/replay.py b/muselsl/replay.py new file mode 100644 index 0000000..8ddf671 --- /dev/null +++ b/muselsl/replay.py @@ -0,0 +1,70 @@ +from time import time, sleep + +import pandas as pd +import numpy as np +from numpy import NaN +from pylsl import StreamInfo, StreamOutlet +from functools import partial +import pygatt +import subprocess +from sys import platform +from . import helper +from .muse import Muse +from .constants import MUSE_SCAN_TIMEOUT, AUTO_DISCONNECT_DELAY, \ + MUSE_NB_EEG_CHANNELS, MUSE_SAMPLING_EEG_RATE, LSL_EEG_CHUNK, \ + MUSE_NB_PPG_CHANNELS, MUSE_SAMPLING_PPG_RATE, LSL_PPG_CHUNK, \ + MUSE_NB_ACC_CHANNELS, MUSE_SAMPLING_ACC_RATE, LSL_ACC_CHUNK, \ + MUSE_NB_GYRO_CHANNELS, MUSE_SAMPLING_GYRO_RATE, LSL_GYRO_CHUNK + + + +def replay(filename, acc_enabled=False, gyro_enabled=False): + delay_time = 1/MUSE_SAMPLING_EEG_RATE + + # load and prepare data from csv + timestamp_csv = "TimeStamp" + eeg_channels_csv = ["RAW_TP9", "RAW_AF7", "RAW_AF8", "RAW_TP10"] + acc_channels_csv = ["Accelerometer_X", "Accelerometer_Y", "Accelerometer_Z"] + gyro_channels_csv = ["Gyro_X", "Gyro_Y", "Gyro_Z"] + + selected_csv_columns = [] + + data = pd.read_csv(filename) + data = drop_events(data) + data = data.set_index(timestamp_csv) + + selected_csv_columns += eeg_channels_csv + eeg_info = StreamInfo('Muse', 'EEG', MUSE_NB_EEG_CHANNELS, MUSE_SAMPLING_EEG_RATE, 'float32', + 'Muse') + eeg_info.desc().append_child_value("manufacturer", "Muse") + eeg_channels = eeg_info.desc().append_child("channels") + + for c in ['TP9', 'AF7', 'AF8', 'TP10']: + eeg_channels.append_child("channel") \ + .append_child_value("label", c) \ + .append_child_value("unit", "microvolts") \ + .append_child_value("type", "EEG") + + eeg_outlet = StreamOutlet(eeg_info, LSL_EEG_CHUNK) + + # convert everything to floats + data = data[selected_csv_columns].astype(float) + + print(f"Starting replay of {filename} with {len(data)} values at {MUSE_SAMPLING_EEG_RATE}Hz.") + print(f"This should take {len(data)/ MUSE_SAMPLING_EEG_RATE:.1f} seconds.") + for x in data.itertuples(): + timestamp, *data = tuple(x) + # convert timestamp to unix-time + timestamp = pd.to_datetime(timestamp).timestamp() + eeg_outlet.push_sample(data, timestamp) + sleep(delay_time) + + print("Replay finished.") + + +def drop_events(dataframe: "pd.DataFrame") -> "pd.DataFrame": + return dataframe[dataframe["Elements"].isna()] + + +if __name__ == "__main__": + replay("data/mindMonitor_2021-06-11--13-39-08.csv") diff --git a/muselsl/run_experiment.py b/muselsl/run_experiment.py new file mode 100644 index 0000000..ffcca2d --- /dev/null +++ b/muselsl/run_experiment.py @@ -0,0 +1,174 @@ +from .stream import list_muses +import subprocess +from multiprocessing import Process, Pipe, Queue, Event +from datetime import datetime +from pathlib import Path +from time import time, sleep +from .constants import StreamProcMessage +import csv + +muse_macs = { + "4EB2": "00:55:DA:B7:4E:B2" + , "528C": "00:55:DA:B5:52:8C" + , "528F": "00:55:DA:B5:52:8F" +} + +NUM_PARTICIPANTS_PER_TRIAL = 1 + +ALL_DATATYPES = ["EEG", "PPG", "ACC", "GYRO"] + +muselsl_cli = ["python3", "-m", "muselsl.__main__"] + + +class SubProcess: + def __init__(self, target, kwargs, abort_event, name=None): + self.process = Process(target=target, kwargs=kwargs, name=name) + self.abort_event = abort_event + self.name = name + + def start(self): + self.process.start() + + def abort(self): + self.abort_event.set() + + def join(self): + return self.process.join() + + +class Participant: + def __init__(self, experimental_run, participant_id, muse_id=None, name=None): + self.streaming_queue_rx = Queue() + self.streaming_queue_tx = Queue() + self.experimental_run = experimental_run + self.participant_id = participant_id + self.muse_id = muse_id + self.name = name + self.muse_mac = muse_macs.get(self.muse_id, None) + self.streaming_proc = None + self.recording_procs = {} + + def get_input(self): + # self.name = input("Name: ") + while self.muse_mac is None: + self.muse_id = input(f"[{self.participant_id}]Muse-ID: MUSE-").upper() + self.muse_mac = muse_macs.get(self.muse_id, None) + print(f"[{self.participant_id}]Using muse with mac: {self.muse_mac}") + + def start_streaming(self): + from . import stream + self.streaming_proc = Process(target=stream, name="streaming_proc", kwargs={ + "address": self.muse_mac, "ppg_enabled": True, + "acc_enabled": True, "gyro_enabled": True + , "message_set": self.streaming_queue_rx + , "message_get": self.streaming_queue_tx}) + self.streaming_proc.start() + while self.streaming_queue_rx.empty(): + sleep(0.5) + return self.streaming_queue_rx.get() == StreamProcMessage.Started + + def stop_streaming(self, join=True): + self.streaming_queue_tx.put(StreamProcMessage.Aborting) + if join: + self.streaming_proc.join() + + def _start_recording(self, filename, dejitter, data_type): + from . import record + e = Event() + p = SubProcess(target=record, abort_event=e, name=f"{data_type}_recording_proc" + , kwargs={ + "filename": filename + , "data_source": data_type + , "abort": e + , "source_id": self.muse_mac + }) + self.recording_procs[data_type] = p + p.start() + + def _stop_recording(self, data_type, join=True): + p = self.recording_procs.pop(data_type, None) + p.abort() + if join: + p.join() + + def record_all(self, dejitter, data_types): + data_path = Path(self.experimental_run.data_root) / Path(f"part{self.participant_id}") + for t in data_types: + self._start_recording(data_path / f"{t}.csv", dejitter, t) + + def stop_all_recordings(self, join=True): + for p in self.recording_procs.values(): + p.abort() + print(f"abborting {len(self.recording_procs.values())} processes") + if join: + for p in self.recording_procs.values(): + p.join() + + self.recording_procs = {} + + +class ExperimentalRun: + def __init__(self, data_root, num_participants, trial_id=None): + self.num_participants = num_participants + self.participants = [] + if trial_id is None: + trial_id = datetime.now().isoformat() + self.trial_id = trial_id + self.data_root = Path(data_root) / Path(f"trial{self.trial_id}") + self.data_root.mkdir(parents=True, exist_ok=False) + self.markers_path = self.data_root / "markers.csv" + + markers_file = self.markers_path.open("w") + fieldnames = ['timestamp', 'tag'] + self.markers = csv.DictWriter(markers_file, fieldnames) + self.markers.writeheader() + + def add_marker(self, tag): + self.markers.writerow({"timestamp": datetime.now().isoformat(), "tag": tag}) + + def start(self): + for i in range(self.num_participants): + tmp = Participant(self, i) + tmp.get_input() + self.participants.append(tmp) + + # for p in self.participants: + # success = False + # while not success: + # _ = input("Press any key to try again ...") + # success = p.start_streaming() + # print(f"{'Success' if success else 'Failed'} Started streaming of Part. {p.participant_id}") + + for p in self.participants: + p.record_all(dejitter=True, data_types=ALL_DATATYPES) + + print("all recording") + self.add_marker("recording_started") + while True: + try: + # for p in self.participants: + # if p.streaming_queue_rx.qsize() and p.streaming_queue_rx.get() == StreamProcMessage.Aborting: + # print("something went wrong") + # break + tag = input("Marker:") + self.add_marker(tag) + # sleep(1) + except KeyboardInterrupt: + self.add_marker("KeyboardInterrupt") + break + + for p in self.participants: + print("stopping recordings") + p.stop_all_recordings() + # print("stopping stream") + # p.stop_streaming() + + print("recordings stopped") + self.add_marker("recordings stopped") + # for p in self.participants: + # p.streaming_proc.join() + + +if __name__ == '__main__': + run = ExperimentalRun(data_root="recordings", num_participants=NUM_PARTICIPANTS_PER_TRIAL) + run.start() diff --git a/muselsl/stream.py b/muselsl/stream.py index b2907a1..9839cf8 100644 --- a/muselsl/stream.py +++ b/muselsl/stream.py @@ -10,7 +10,7 @@ MUSE_NB_EEG_CHANNELS, MUSE_SAMPLING_EEG_RATE, LSL_EEG_CHUNK, \ MUSE_NB_PPG_CHANNELS, MUSE_SAMPLING_PPG_RATE, LSL_PPG_CHUNK, \ MUSE_NB_ACC_CHANNELS, MUSE_SAMPLING_ACC_RATE, LSL_ACC_CHUNK, \ - MUSE_NB_GYRO_CHANNELS, MUSE_SAMPLING_GYRO_RATE, LSL_GYRO_CHUNK + MUSE_NB_GYRO_CHANNELS, MUSE_SAMPLING_GYRO_RATE, LSL_GYRO_CHUNK, StreamProcMessage # Returns a list of available Muse devices. @@ -59,12 +59,13 @@ def find_muse(name=None): # Begins LSL stream(s) from a Muse with a given address with data sources determined by arguments -def stream(address, backend='auto', interface=None, name=None, ppg_enabled=False, acc_enabled=False, gyro_enabled=False, eeg_disabled=False,): +def stream(address, backend='auto', interface=None, name=None, ppg_enabled=False, acc_enabled=False, gyro_enabled=False, eeg_disabled=False,message_get=None,message_set=None): bluemuse = backend == 'bluemuse' if not bluemuse: if not address: found_muse = find_muse(name) if not found_muse: + message_set.put(StreamProcMessage.Aborting) return else: address = found_muse['address'] @@ -137,42 +138,50 @@ def push(data, timestamps, outlet): if all(f is None for f in [push_eeg, push_ppg, push_acc, push_gyro]): print('Stream initiation failed: At least one data source must be enabled.') + message_set.put(StreamProcMessage.Aborting) return muse = Muse(address=address, callback_eeg=push_eeg, callback_ppg=push_ppg, callback_acc=push_acc, callback_gyro=push_gyro, backend=backend, interface=interface, name=name) - - if(bluemuse): - muse.connect() - if not address and not name: - print('Targeting first device BlueMuse discovers...') - else: - print('Targeting device: ' - + ':'.join(filter(None, [name, address])) + '...') - print('\n*BlueMuse will auto connect and stream when the device is found. \n*You can also use the BlueMuse interface to manage your stream(s).') - muse.start() - return - - didConnect = muse.connect() - - if(didConnect): - print('Connected.') - muse.start() - - eeg_string = " EEG" if not eeg_disabled else "" - ppg_string = " PPG" if ppg_enabled else "" - acc_string = " ACC" if acc_enabled else "" - gyro_string = " GYRO" if gyro_enabled else "" - - print("Streaming%s%s%s%s..." % - (eeg_string, ppg_string, acc_string, gyro_string)) - - while time() - muse.last_timestamp < AUTO_DISCONNECT_DELAY: - try: - sleep(1) - except KeyboardInterrupt: - muse.stop() - muse.disconnect() - break - - print('Disconnected.') + with muse: + if(bluemuse): + muse.connect() + if not address and not name: + print('Targeting first device BlueMuse discovers...') + else: + print('Targeting device: ' + + ':'.join(filter(None, [name, address])) + '...') + print('\n*BlueMuse will auto connect and stream when the device is found. \n*You can also use the BlueMuse interface to manage your stream(s).') + muse.start() + return + + didConnect = muse.connect() + + if(didConnect): + print('Connected.') + muse.start() + + eeg_string = " EEG" if not eeg_disabled else "" + ppg_string = " PPG" if ppg_enabled else "" + acc_string = " ACC" if acc_enabled else "" + gyro_string = " GYRO" if gyro_enabled else "" + + print("Streaming%s%s%s%s..." % + (eeg_string, ppg_string, acc_string, gyro_string)) + + message_set.put(StreamProcMessage.Started) + + def is_done(): + if message_get is not None: + return message_get.get() == StreamProcMessage.Aborting if message_get.qsize() else False + else: + return (time() - muse.last_timestamp >= AUTO_DISCONNECT_DELAY) + + while not is_done(): + try: + sleep(1) + except KeyboardInterrupt: + break + + print('Disconnected.') + message_set.put(StreamProcMessage.Aborting) \ No newline at end of file diff --git a/muselsl/view.py b/muselsl/view.py index 9cae138..ae3eebd 100644 --- a/muselsl/view.py +++ b/muselsl/view.py @@ -1,8 +1,8 @@ -def view(window=5, scale=100, refresh=0.2, figure="15x6", version=1, backend='TkAgg'): +def view(window=5, scale=100, refresh=0.2, figure="15x6", version=1, backend='TkAgg',data_type="EEG",source_id=None): if version == 2: from . import viewer_v2 - viewer_v2.view() + viewer_v2.view(data_type,source_id) else: from . import viewer_v1 viewer_v1.view(window, scale, refresh, figure, backend) diff --git a/muselsl/viewer_v2.py b/muselsl/viewer_v2.py index 7449b66..5058bef 100644 --- a/muselsl/viewer_v2.py +++ b/muselsl/viewer_v2.py @@ -76,22 +76,27 @@ """ -def view(): - print("Looking for an EEG stream...") - streams = resolve_byprop('type', 'EEG', timeout=LSL_SCAN_TIMEOUT) +def view(data_type, source_id = None): + print(f"Looking for an {data_type} stream...") + streams = resolve_byprop('type', data_type, timeout=LSL_SCAN_TIMEOUT) if len(streams) == 0: - raise(RuntimeError("Can't find EEG stream.")) + raise(RuntimeError(f"Can't find {data_type} stream.")) + + if source_id is not None: + streams = [s for s in streams if s.source_id() == 'Muse%s' % source_id] + assert len(streams) == 1, f"Expected to find exaclty one stream with source_id: {'Muse%s' % source_id}, but found {len(streams)}" + print("Start acquiring data.") inlet = StreamInlet(streams[0], max_chunklen=LSL_EEG_CHUNK) - Canvas(inlet) + Canvas(inlet,data_type=data_type) app.run() class Canvas(app.Canvas): - def __init__(self, lsl_inlet, scale=500, filt=True): - app.Canvas.__init__(self, title='EEG - Use your wheel to zoom!', + def __init__(self, lsl_inlet, data_type, scale=500, filt=True): + app.Canvas.__init__(self, title=f'{data_type} - Use your wheel to zoom!', keys='interactive') self.inlet = lsl_inlet @@ -162,9 +167,10 @@ def __init__(self, lsl_inlet, scale=500, filt=True): self.data_f = np.zeros((n_samples, self.n_chans)) self.data = np.zeros((n_samples, self.n_chans)) - - self.bf = create_filter(self.data_f.T, self.sfreq, 3, 40., + + self.bf = create_filter(self.data_f.T, self.sfreq, 3, min(40., self.sfreq / 2 - .1), method='fir') + print(f"sample_freq: {self.sfreq}") zi = lfilter_zi(self.bf, self.af) self.filt_state = np.tile(zi, (self.n_chans, 1)).transpose() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..5ab3b20 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,9 @@ +bitstring +pygatt +pandas +scikit-learn +numpy +seaborn +pexpect +pylsl==1.10.5; "linux" in sys.platform +pylsl; "linux" not in sys.platform \ No newline at end of file diff --git a/setup-pi.sh b/setup-pi.sh new file mode 100644 index 0000000..864083e --- /dev/null +++ b/setup-pi.sh @@ -0,0 +1,7 @@ +sudo apt-get update +sudo apt-get install -y python3-pip libatlas-base-dev git +sudo pip3 install muselsl +wget https://github.com/sirexeclp/muse-lsl/releases/download/1/liblsl32.so +sudo cp liblsl32.so /usr/local/lib/python3.7/dist-packages/pylsl/liblsl32.so +#git clone https://github.com/sirexeclp/muse-lsl.git +#cd muse-lsl && git checkout feature/experiment diff --git a/setup.py b/setup.py index 24daa6a..fc924a7 100644 --- a/setup.py +++ b/setup.py @@ -1,3 +1,5 @@ +from pathlib import Path + from setuptools import setup, find_packages from shutil import copyfile import os @@ -38,16 +40,7 @@ def copy_docs(): zip_safe=False, long_description=long_description, long_description_content_type='text/markdown', - install_requires=[ - "bitstring", - "pygatt", - "pandas", - "scikit-learn", - "numpy", - "seaborn", - "pexpect", - ] + - (["pylsl==1.10.5"] if os.sys.platform.startswith("linux") else ["pylsl"]), + install_requires=Path("requirements.txt").read_text().splitlines(), extras_require={"Viewer V2": ["mne", "vispy"]}, classifiers=[ # How mature is this project? Common values are