Device Integration Guide
This comprehensive guide covers integration with 30+ BCI devices supported by NeuraScale, from consumer-grade headsets to clinical-grade arrays.
Supported Device Categories
NeuraScale supports a wide range of brain-computer interface devices across different categories:
Consumer BCI Devices
- OpenBCI (Cyton, Ganglion, Cyton+Daisy)
- Emotiv (EPOC+, Insight, EPOC X)
- Muse (Muse 2, Muse S)
- NeuroSky (MindWave, MindWave Mobile)
Research-Grade Systems
- g.tec (g.USBamp, g.Nautilus, g.HIamp)
- BrainProducts (actiCHamp, LiveAmp, QuickAmp)
- ANT Neuro (eego™ systems)
- BioSemi (ActiveTwo)
Clinical Arrays
- Blackrock (Utah Array, CerePlex)
- Plexon (OmniPlex)
- Ripple (Grapevine)
- Custom LSL streams
OpenBCI Integration
OpenBCI devices are among the most popular open-source BCI systems, offering excellent value and flexibility; although listed above with consumer devices, they are also widely used in research.
Cyton Board
OpenBCI Cyton (8-channel EEG)
The Cyton board is OpenBCI’s flagship 8-channel biosensing board.
Hardware Setup
1. Assemble the Cyton Board
- Attach the USB dongle to your computer
- Power on the Cyton board
- Ensure the blue LED is blinking (pairing mode)
2. Electrode Preparation
# Standard 10-20 electrode positions for 8 channels
# Fp1, Fp2, C3, C4, P7, P8, O1, O2
3. Connect Electrodes
- Use gold cup electrodes with Ten20 paste
- Connect ground electrode to ear lobe or mastoid
- Verify impedances are below 5kΩ
Software Integration
from neurascale import NeuraScaleClient
from neurascale.devices import OpenBCICyton
# Initialize client
client = NeuraScaleClient(api_key="your-api-key")
# Configure Cyton device
cyton_config = {
"device_id": "openbci_cyton_001",
"connection": {
"port": "/dev/ttyUSB0", # Linux/macOS
# "port": "COM3", # Windows
"baudrate": 115200,
"timeout": 5.0
},
"acquisition": {
"sample_rate": 250, # Hz
"channels": [1, 2, 3, 4, 5, 6, 7, 8],
"gain": 24, # 24x gain
"bias": True,
"srb2": True
},
"filters": {
"notch": 60, # Power line noise
"highpass": 0.5, # Remove DC drift
"lowpass": 100 # Anti-aliasing
}
}
# Connect device
cyton = await client.devices.connect("openbci_cyton_001", cyton_config)
print(f"Connected to {cyton.name}")
# Start data streaming
stream = await cyton.start_stream()
async for data_packet in stream:
print(f"Timestamp: {data_packet.timestamp}")
print(f"Channels: {[ch.value for ch in data_packet.channels]}")
# Process data in real-time
await process_eeg_data(data_packet)
Advanced Configuration
# Custom channel configuration
channel_config = {
1: {"gain": 24, "input_type": "normal", "bias": True, "srb2": True},
2: {"gain": 24, "input_type": "normal", "bias": True, "srb2": True},
3: {"gain": 12, "input_type": "normal", "bias": True, "srb2": False}, # Different gain
4: {"gain": 24, "input_type": "normal", "bias": True, "srb2": True},
5: {"gain": 24, "input_type": "shorted", "bias": False, "srb2": False}, # Test channel
6: {"gain": 24, "input_type": "normal", "bias": True, "srb2": True},
7: {"gain": 24, "input_type": "normal", "bias": True, "srb2": True},
8: {"gain": 24, "input_type": "bias_meas", "bias": False, "srb2": False} # Bias measurement
}
# Apply channel configuration
await cyton.configure_channels(channel_config)
# Enable test signals for calibration
await cyton.enable_test_signal(frequency=10, amplitude=1.0) # 10Hz, 1µV
# Check impedances
impedances = await cyton.measure_impedance()
for channel, impedance in impedances.items():
print(f"Channel {channel}: {impedance/1000:.1f}kΩ")
Troubleshooting
Common Issues:
- Device not found: Check USB permissions and drivers
- Poor signal quality: Verify electrode contact and impedances
- Data dropouts: Check USB cable and interference sources
# Diagnostic function
async def diagnose_cyton(device):
    """Print a firmware, board-test and per-channel signal-quality report.

    Awaits ``get_firmware_version()``, ``run_board_test()`` and
    ``assess_signal_quality(duration=10)`` on ``device``, in that order, and
    prints one line per result. A channel is labelled ``GOOD`` when its
    quality score is above 0.8 and ``POOR`` otherwise.

    Args:
        device: Object exposing the three coroutines named above.
    """
    print(f"Firmware: {await device.get_firmware_version()}")

    passed = await device.run_board_test()
    verdict = "PASS" if passed else "FAIL"
    print(f"Board test: {verdict}")

    scores = await device.assess_signal_quality(duration=10)
    for channel, score in scores.items():
        if score > 0.8:
            label = "GOOD"
        else:
            label = "POOR"
        print(f"Channel {channel}: {score:.2f} ({label})")
Emotiv Integration
Emotiv devices offer consumer-friendly BCI solutions with advanced signal processing.
EPOC+
Emotiv EPOC+ (14-channel EEG)
Setup and Authentication
1. Emotiv Account Setup
- Create account at emotiv.com
- Obtain Client ID and Client Secret
- Purchase appropriate license
2. Device Authentication
from neurascale.devices import EmotivEPOCPlus # Configure authentication emotiv_auth = { "client_id": "your_client_id", "client_secret": "your_client_secret", "license_key": "your_license_key" } # Initialize EPOC+ device epoc_config = { "device_id": "emotiv_epoc_plus_001", "authentication": emotiv_auth, "acquisition": { "sample_rate": 128, # Fixed for EPOC+ "channels": [ "AF3", "F7", "F3", "FC5", "T7", "P7", "O1", "O2", "P8", "T8", "FC6", "F4", "F8", "AF4" ], "motion_sensors": True, "contact_quality": True } } epoc = await client.devices.connect("emotiv_epoc_plus_001", epoc_config)
Contact Quality Monitoring
# Real-time contact quality monitoring
async def monitor_contact_quality():
    """Poll the EPOC+ contact quality every 5 seconds and print a report.

    Reads from the module-level ``epoc`` device. Each electrode is labelled
    GOOD (above 80), FAIR (above 50) or POOR (50 and below), and a warning
    lists every POOR electrode. The loop has no exit condition, so it runs
    until the task is cancelled or a call raises.
    """
    while True:
        quality = await epoc.get_contact_quality()
        print("Contact Quality:")
        for electrode, q in quality.items():
            status = "GOOD" if q > 80 else "FAIR" if q > 50 else "POOR"
            print(f" {electrode}: {q}% ({status})")
        # Alert on poor contact. The cut-off is `<= 50` so that it matches the
        # POOR label above; an electrode at exactly 50% is warned about too.
        poor_contacts = [e for e, q in quality.items() if q <= 50]
        if poor_contacts:
            print(f"Warning: Poor contact on {poor_contacts}")
        await asyncio.sleep(5)

# Start monitoring
quality_task = asyncio.create_task(monitor_contact_quality())
Data Streaming with Motion
# Stream EEG and motion data
stream = await epoc.start_stream()
async for packet in stream:
# EEG data (14 channels)
eeg_data = packet.eeg_channels
# Motion data
gyro = packet.gyroscope # 3-axis gyroscope
accel = packet.accelerometer # 3-axis accelerometer
# Contact quality (updated every second)
if packet.has_contact_quality:
contact_quality = packet.contact_quality
# Process combined data
await process_multimodal_data({
'eeg': eeg_data,
'motion': {'gyro': gyro, 'accel': accel},
'quality': contact_quality
})
Advanced Features
# Enable advanced Emotiv features
await epoc.enable_performance_metrics()
await epoc.enable_facial_expressions()
await epoc.enable_mental_commands()
# Get performance metrics
performance = await epoc.get_performance_metrics()
print(f"Attention: {performance.attention:.2f}")
print(f"Meditation: {performance.meditation:.2f}")
print(f"Engagement: {performance.engagement:.2f}")
# Detect facial expressions
expressions = await epoc.get_facial_expressions()
if expressions.blink > 0.5:
print("Eye blink detected")
if expressions.smile > 0.7:
print("Smile detected")
# Mental commands (requires training)
commands = await epoc.get_mental_commands()
if commands.push > 0.8:
print("Push command detected")
Research-Grade Systems
g.tec Integration
g.USBamp
g.tec g.USBamp (16-channel)
from neurascale.devices import GtecUSBamp
# Configure g.USBamp
gusb_config = {
"device_id": "gtec_usbamp_001",
"connection": {
"serial_number": "UA-2019.05.15",
"driver_version": "3.20.02"
},
"acquisition": {
"sample_rate": 512, # Up to 38.4 kHz
"channels": list(range(1, 17)), # 16 channels
"input_range": 500, # µV
"reference": "common", # common, differential, single-ended
"ground": "channel_16"
},
"filters": {
"notch": [50, 100], # Multiple notch filters
"highpass": 0.1,
"lowpass": 100,
"bandstop": [[49, 51], [99, 101]] # Precise frequency bands
},
"triggers": {
"digital_inputs": True,
"analog_trigger": {
"enable": True,
"channel": "AIN1",
"threshold": 2.5 # Volts
}
}
}
gusb = await client.devices.connect("gtec_usbamp_001", gusb_config)
# High-precision streaming
stream = await gusb.start_stream()
async for packet in stream:
# High-resolution data (24-bit ADC)
eeg_data = packet.channels # 16 channels
digital_triggers = packet.digital_inputs
analog_trigger = packet.analog_trigger
# Process triggers
if any(digital_triggers):
trigger_channels = [i for i, t in enumerate(digital_triggers) if t]
await handle_digital_triggers(trigger_channels, packet.timestamp)
if analog_trigger > gusb_config["triggers"]["analog_trigger"]["threshold"]:
await handle_analog_trigger(analog_trigger, packet.timestamp)
BrainProducts Integration
from neurascale.devices import BrainProductsAmp
# Configure BrainProducts actiCHamp
actichamp_config = {
"device_id": "bp_actichamp_001",
"connection": {
"interface": "usb",
"serial_number": "17010768"
},
"acquisition": {
"sample_rate": 1000,
"channels": list(range(1, 65)), # 64 channels
"resolution": "24bit",
"input_range": "±100mV",
"dc_coupling": True
},
"modules": {
"aux_box": True,
"trigger_box": True,
"impedance_meter": True
},
"triggers": {
"parallel_port": True,
"serial_triggers": True,
"network_triggers": True
}
}
actichamp = await client.devices.connect("bp_actichamp_001", actichamp_config)
# Real-time impedance monitoring
async def monitor_impedances():
    """Report electrodes whose impedance exceeds 50000 every 30 seconds.

    Reads from the module-level ``actichamp`` device. Readings are divided
    by 1000 for display in kΩ. The loop has no exit condition, so it runs
    until the task is cancelled or a call raises.
    """
    limit = 50000  # 50kΩ threshold
    while True:
        readings = await actichamp.measure_all_impedances()
        flagged = [
            f"{ch}: {imp/1000:.1f}kΩ"
            for ch, imp in readings.items()
            if imp > limit
        ]
        if flagged:
            print(f"High impedance electrodes: {', '.join(flagged)}")
        await asyncio.sleep(30)  # Check every 30 seconds
LSL Stream Integration
Lab Streaming Layer (LSL) provides a unified interface for various devices.
from neurascale.devices import LSLStream
from pylsl import resolve_streams
# Discover available LSL streams.
# pylsl's resolve_streams() takes its search duration (seconds) as
# `wait_time`; `timeout` is the keyword used by resolve_byprop() and
# resolve_bypred(), and passing it here raises TypeError.
available_streams = resolve_streams(wait_time=5.0)
print("Available LSL streams:")
for stream in available_streams:
    print(f" {stream.name()} ({stream.type()}) - {stream.channel_count()} channels")
# Configure LSL device
lsl_config = {
"device_id": "lsl_eeg_001",
"stream": {
"name": "EEG_Stream",
"type": "EEG",
"source_id": "myuniquesourceid"
},
"buffering": {
"max_buflen": 360, # 6 minutes
"max_chunklen": 0, # No chunking
"recover": True
},
"clock_sync": {
"enable": True,
"correction_interval": 5.0
}
}
# Connect to LSL stream
lsl_device = await client.devices.connect("lsl_eeg_001", lsl_config)
# Handle clock synchronization
async def sync_clocks():
    """Print the LSL time correction every 30 seconds and warn on large drift.

    Reads from the module-level ``lsl_device``. A warning is printed when the
    magnitude of the correction exceeds 0.01. The loop has no exit condition,
    so it runs until the task is cancelled or a call raises.
    """
    tolerance = 0.01  # 10ms threshold
    while True:
        offset = await lsl_device.get_time_correction()
        print(f"Clock offset: {offset:.3f}s")
        drifted = abs(offset) > tolerance
        if drifted:
            print("Warning: Large clock offset detected")
        await asyncio.sleep(30)
# Stream with timestamp correction
stream = await lsl_device.start_stream()
async for packet in stream:
# Apply time correction
corrected_timestamp = packet.timestamp + await lsl_device.get_time_correction()
# Process synchronized data
await process_synchronized_data(packet.data, corrected_timestamp)
Multi-Device Synchronization
Hardware Synchronization
# Synchronize multiple devices with hardware triggers
sync_config = {
"master_device": "gtec_usbamp_001",
"slave_devices": [
"openbci_cyton_001",
"emotiv_epoc_001"
],
"sync_method": "hardware_trigger",
"sync_frequency": 1.0, # 1 Hz sync pulses
"sync_pin": "digital_out_1"
}
# Configure master device
master = await client.devices.get("gtec_usbamp_001")
await master.configure_sync_output(
pin="digital_out_1",
frequency=1.0,
pulse_width=100 # milliseconds
)
# Configure slave devices
for slave_id in sync_config["slave_devices"]:
slave = await client.devices.get(slave_id)
await slave.configure_sync_input(
pin="digital_in_1",
trigger_mode="rising_edge"
)
# Start synchronized recording
session = await client.sessions.create_multi_device(
devices=sync_config["master_device"] + sync_config["slave_devices"],
sync_config=sync_config
)
await session.start_synchronized_recording()
Software Synchronization
# Software-based synchronization using NTP
from neurascale.sync import SoftwareSync
sync_manager = SoftwareSync()
# Add devices to sync group
sync_group = await sync_manager.create_group("experiment_001")
await sync_group.add_device("openbci_cyton_001")
await sync_group.add_device("emotiv_epoc_001")
await sync_group.add_device("muse_001")
# Enable NTP synchronization
await sync_group.enable_ntp_sync(
ntp_server="pool.ntp.org",
sync_interval=60 # seconds
)
# Start synchronized streaming
streams = await sync_group.start_synchronized_streams()
# Process synchronized data
async def process_multi_device_data():
    """Forward multi-device packets whose timestamps agree closely enough.

    Iterates the module-level ``streams``. Each item is treated as a mapping
    of device id to packet. The packets are reduced to ``timestamp``/``data``/
    ``sync_quality`` dicts and passed to ``process_synchronized_data`` when
    the spread between the earliest and latest timestamp is below 10;
    otherwise a warning is printed and the item is skipped.
    """
    async for synchronized_packet in streams:
        device_data = {
            device_id: {
                'timestamp': packet.timestamp,
                'data': packet.channels,
                'sync_quality': packet.sync_quality,
            }
            for device_id, packet in synchronized_packet.items()
        }

        # Spread between the earliest and latest device timestamp.
        # NOTE(review): the comparison and the message treat this value as
        # milliseconds — confirm the unit the packets actually carry.
        timestamps = [entry['timestamp'] for entry in device_data.values()]
        max_offset = max(timestamps) - min(timestamps)

        if not max_offset < 10:  # 10ms tolerance
            print(f"Warning: Sync offset {max_offset:.1f}ms")
            continue
        await process_synchronized_data(device_data)
For detailed device-specific configuration options and troubleshooting, refer to the manufacturer’s documentation and our Troubleshooting Guide.
Best Practices
Device Selection Guidelines
1. Research Applications
- Use research-grade systems (g.tec, BrainProducts) for publication-quality data
- Consider channel count requirements (8-256 channels)
- Ensure adequate sampling rate (250-2000 Hz for EEG)
2. Clinical Applications
- Choose FDA-approved devices for clinical trials
- Prioritize reliability and support
- Consider infection control requirements
3. Consumer Applications
- Emotiv/Muse for ease of use
- OpenBCI for customization and cost-effectiveness
- Consider wireless connectivity needs
Performance Optimization
# Optimize for real-time performance
optimization_config = {
"buffer_management": {
"buffer_size": "minimal", # Minimize latency
"double_buffering": True,
"memory_mapping": True
},
"processing": {
"parallel_channels": True,
"gpu_acceleration": False, # CPU often faster for small datasets
"vectorization": True
},
"network": {
"tcp_nodelay": True,
"socket_buffer_size": 65536,
"compression": False # Trade bandwidth for latency
}
}
# Apply optimizations
await client.configure_performance(optimization_config)
Quality Assurance
# Implement comprehensive quality checks
class QualityMonitor:
    """Check streamed packets against quality thresholds.

    ``check_signal_quality`` inspects a single packet and returns a list of
    human-readable issue strings. ``monitor_continuously`` applies it to every
    packet of the device's stream, prints any issues and passes them to
    ``handle_quality_issues``.
    """

    # Default limits. "sync_offset" is stored for callers but is not consulted
    # by check_signal_quality.
    DEFAULT_THRESHOLDS = {
        "impedance": 50000,  # 50kΩ
        "signal_range": 200,  # ±200µV
        "packet_loss": 0.01,  # 1%
        "sync_offset": 10,  # 10ms
    }

    def __init__(self, device, thresholds=None):
        """Store the device and build this instance's threshold table.

        Args:
            device: Object whose ``start_stream()`` coroutine returns an
                async-iterable of packets.
            thresholds: Optional mapping of overrides merged over
                ``DEFAULT_THRESHOLDS``; omitted keys keep their defaults.
        """
        self.device = device
        self.quality_thresholds = {**self.DEFAULT_THRESHOLDS, **(thresholds or {})}

    async def check_signal_quality(self, packet):
        """Return the list of quality issues found in ``packet``.

        Checks, in order: per-electrode impedance (only when the packet has an
        ``impedance`` attribute), absolute channel value against the signal
        range, and ``packet.packet_loss`` against the loss threshold.

        Returns:
            A list of issue strings; empty when nothing exceeds a threshold.
        """
        issues = []

        # Check impedances (not every packet carries them)
        if hasattr(packet, 'impedance'):
            high_impedance = [
                ch for ch, imp in packet.impedance.items()
                if imp > self.quality_thresholds["impedance"]
            ]
            if high_impedance:
                issues.append(f"High impedance: {high_impedance}")

        # Check signal range: one issue per channel outside ±signal_range
        for channel in packet.channels:
            if abs(channel.value) > self.quality_thresholds["signal_range"]:
                issues.append(f"Saturation on channel {channel.number}")

        # Check packet loss
        if packet.packet_loss > self.quality_thresholds["packet_loss"]:
            issues.append(f"Packet loss: {packet.packet_loss:.2%}")

        return issues

    async def handle_quality_issues(self, issues):
        """Hook called with each non-empty issue list; does nothing by default.

        ``monitor_continuously`` has already printed the issues when this is
        awaited. Override it in a subclass to alert, log or pause recording.
        """

    async def monitor_continuously(self):
        """Check every packet of the device stream until the stream ends."""
        stream = await self.device.start_stream()
        async for packet in stream:
            issues = await self.check_signal_quality(packet)
            if issues:
                print(f"Quality issues detected: {', '.join(issues)}")
                await self.handle_quality_issues(issues)
# Use quality monitor
# `device` is not defined in this snippet; it must already refer to a device
# object whose start_stream() the monitor can await.
# asyncio.create_task() schedules the monitoring coroutine on the running
# event loop, so this has to execute inside one.
monitor = QualityMonitor(device)
monitor_task = asyncio.create_task(monitor.monitor_continuously())