Device Integration Guide

This guide covers integration with the 30+ BCI devices that NeuraScale supports, from consumer headsets to clinical-grade electrode arrays.

Supported Device Categories

NeuraScale groups its supported brain-computer interface (BCI) devices into three categories:

Consumer BCI Devices

  • OpenBCI (Cyton, Ganglion, Cyton+Daisy)
  • Emotiv (EPOC+, Insight, EPOC X)
  • Muse (Muse 2, Muse S)
  • NeuroSky (MindWave, MindWave Mobile)

Research-Grade Systems

  • g.tec (g.USBamp, g.Nautilus, g.HIamp)
  • BrainProducts (actiCHamp, LiveAmp, QuickAmp)
  • ANT Neuro (eego™ systems)
  • BioSemi (ActiveTwo)

Clinical Arrays

  • Blackrock (Utah Array, CerePlex)
  • Plexon (OmniPlex)
  • Ripple (Grapevine)
  • Custom LSL streams

OpenBCI Integration

OpenBCI boards are listed above as consumer devices, but they are also widely used in research: the hardware is open source, comparatively inexpensive, and highly configurable.

OpenBCI Cyton (8-channel EEG)

The Cyton board is OpenBCI’s flagship 8-channel biosensing board.

Hardware Setup

  1. Assemble the Cyton Board

    • Attach the USB dongle to your computer
    • Power on the Cyton board
    • Ensure the blue LED is blinking (pairing mode)
  2. Electrode Preparation

    # Standard 10-20 electrode positions for 8 channels
    # Fp1, Fp2, C3, C4, P7, P8, O1, O2
  3. Connect Electrodes

    • Use gold cup electrodes with Ten20 paste
    • Connect ground electrode to ear lobe or mastoid
    • Verify impedances are below 5kΩ
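
The impedance check in step 3 can be scripted once readings are available. The following is a minimal sketch, assuming impedances arrive as a mapping from channel number to ohms; `ELECTRODE_MAP` and `failing_electrodes` are hypothetical names for illustration, not part of the NeuraScale API.

```python
# Hypothetical helper: the channel-to-position map and the 5 kOhm limit
# mirror the hardware setup steps above.
ELECTRODE_MAP = {1: "Fp1", 2: "Fp2", 3: "C3", 4: "C4",
                 5: "P7", 6: "P8", 7: "O1", 8: "O2"}
IMPEDANCE_LIMIT_OHMS = 5_000


def failing_electrodes(impedances_ohms, limit=IMPEDANCE_LIMIT_OHMS):
    """Return {position: impedance} for channels that are not below the limit."""
    return {
        ELECTRODE_MAP[channel]: ohms
        for channel, ohms in sorted(impedances_ohms.items())
        if ohms >= limit
    }


# Example readings in ohms (made-up values for illustration)
readings = {1: 3200.0, 2: 4100.0, 3: 12500.0, 4: 2900.0,
            5: 4800.0, 6: 5000.0, 7: 3500.0, 8: 3900.0}
bad = failing_electrodes(readings)
for position, ohms in bad.items():
    print(f"{position}: {ohms / 1000:.1f} kOhm, re-seat the electrode")
```

Keying the result by electrode position rather than channel number makes it easier to find the electrode on the cap.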

Software Integration

# The snippets in this guide use top-level await. Run them inside an
# async function (for example via asyncio.run) or an asyncio-aware REPL.
from neurascale import NeuraScaleClient
from neurascale.devices import OpenBCICyton

# Initialize client
client = NeuraScaleClient(api_key="your-api-key")

# Configure Cyton device
cyton_config = {
    "device_id": "openbci_cyton_001",
    "connection": {
        "port": "/dev/ttyUSB0",  # Linux (macOS typically /dev/cu.usbserial-*)
        # "port": "COM3",  # Windows
        "baudrate": 115200,
        "timeout": 5.0
    },
    "acquisition": {
        "sample_rate": 250,  # Hz
        "channels": [1, 2, 3, 4, 5, 6, 7, 8],
        "gain": 24,  # 24x gain
        "bias": True,
        "srb2": True
    },
    "filters": {
        "notch": 60,  # Power line noise (use 50 where mains is 50 Hz)
        "highpass": 0.5,  # Remove DC drift
        "lowpass": 100  # Anti-aliasing
    }
}

# Connect device
cyton = await client.devices.connect("openbci_cyton_001", cyton_config)
print(f"Connected to {cyton.name}")

# Start data streaming
stream = await cyton.start_stream()

async for data_packet in stream:
    print(f"Timestamp: {data_packet.timestamp}")
    print(f"Channels: {[ch.value for ch in data_packet.channels]}")

    # Process data in real time (process_eeg_data is your own coroutine)
    await process_eeg_data(data_packet)
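
Filter cutoffs only make sense below the Nyquist frequency, which is half the sample rate. The sketch below checks a `filters` dictionary shaped like the one in the Cyton configuration before it is sent to a device. `check_filters` is a hypothetical helper, and whether NeuraScale performs a similar check itself is not stated here.

```python
def check_filters(sample_rate_hz, filters):
    """Return a list of problems with the filter settings (empty if none)."""
    nyquist = sample_rate_hz / 2
    problems = []
    highpass = filters.get("highpass")
    lowpass = filters.get("lowpass")
    notch = filters.get("notch")

    if lowpass is not None and lowpass >= nyquist:
        problems.append(f"lowpass {lowpass} Hz is not below Nyquist ({nyquist} Hz)")
    if notch is not None and notch >= nyquist:
        problems.append(f"notch {notch} Hz is not below Nyquist ({nyquist} Hz)")
    if highpass is not None and lowpass is not None and highpass >= lowpass:
        problems.append(f"highpass {highpass} Hz is not below lowpass {lowpass} Hz")
    return problems


cyton_filters = {"notch": 60, "highpass": 0.5, "lowpass": 100}
print(check_filters(250, cyton_filters))  # Cyton at 250 Hz
print(check_filters(128, cyton_filters))  # same filters at 128 Hz
```

At 250 Hz the Nyquist frequency is 125 Hz, so the settings pass. At 128 Hz it is 64 Hz, so the 100 Hz low-pass is flagged.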

Advanced Configuration

# Custom channel configuration
channel_config = {
    1: {"gain": 24, "input_type": "normal", "bias": True, "srb2": True},
    2: {"gain": 24, "input_type": "normal", "bias": True, "srb2": True},
    3: {"gain": 12, "input_type": "normal", "bias": True, "srb2": False},  # Different gain
    4: {"gain": 24, "input_type": "normal", "bias": True, "srb2": True},
    5: {"gain": 24, "input_type": "shorted", "bias": False, "srb2": False},  # Test channel
    6: {"gain": 24, "input_type": "normal", "bias": True, "srb2": True},
    7: {"gain": 24, "input_type": "normal", "bias": True, "srb2": True},
    8: {"gain": 24, "input_type": "bias_meas", "bias": False, "srb2": False}  # Bias measurement
}

# Apply channel configuration
await cyton.configure_channels(channel_config)

# Enable test signals for calibration
await cyton.enable_test_signal(frequency=10, amplitude=1.0)  # 10 Hz, 1 µV

# Check impedances
impedances = await cyton.measure_impedance()
for channel, impedance in impedances.items():
    print(f"Channel {channel}: {impedance/1000:.1f}kΩ")
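
The gain setting determines how raw ADC counts map to voltage. OpenBCI documents the Cyton scale factor as 4.5 V / gain / (2^23 - 1) volts per count for its 24-bit ADS1299 front end. The sketch below applies that formula per channel. Whether NeuraScale delivers raw counts or already-scaled microvolts is not stated here, so treat the conversion as an illustration of what the gain means, not a required step.

```python
ADS1299_VREF_VOLTS = 4.5
ADS1299_MAX_COUNT = 2**23 - 1
VALID_GAINS = (1, 2, 4, 6, 8, 12, 24)


def microvolts_per_count(gain):
    """Scale factor from raw 24-bit ADC counts to microvolts for a given gain."""
    if gain not in VALID_GAINS:
        raise ValueError(f"unsupported gain {gain}; expected one of {VALID_GAINS}")
    return ADS1299_VREF_VOLTS / gain / ADS1299_MAX_COUNT * 1e6


def counts_to_microvolts(counts, gain):
    """Convert one raw sample to microvolts."""
    return counts * microvolts_per_count(gain)


for gain in (24, 12):
    print(f"gain {gain}: {microvolts_per_count(gain):.5f} uV per count")
```

Halving the gain doubles the microvolts per count, which widens the input range at the cost of resolution.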

Troubleshooting

Common Issues:

  • Device not found: Check USB permissions and drivers
  • Poor signal quality: Verify electrode contact and impedances
  • Data dropouts: Check USB cable and interference sources

# Diagnostic function
async def diagnose_cyton(device):
    # Check firmware version
    firmware = await device.get_firmware_version()
    print(f"Firmware: {firmware}")

    # Test board functionality
    board_test = await device.run_board_test()
    print(f"Board test: {'PASS' if board_test else 'FAIL'}")

    # Check signal quality
    quality = await device.assess_signal_quality(duration=10)
    for ch, q in quality.items():
        status = "GOOD" if q > 0.8 else "POOR"
        print(f"Channel {ch}: {q:.2f} ({status})")
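
Data dropouts can be quantified from the packet sequence counter. Each Cyton packet carries a one-byte sample number that wraps from 255 back to 0, so gaps in that sequence indicate lost packets. The sketch below counts them. `count_dropped` is a hypothetical helper, and it assumes you can obtain the sample numbers; whether NeuraScale exposes them on its packets is not stated here.

```python
def count_dropped(sample_numbers, modulus=256):
    """Count missing packets in a sequence of wrapping sample numbers.

    A gap of 0 (a repeated number) is ignored, because a one-byte counter
    cannot distinguish a duplicate from exactly `modulus` lost packets.
    """
    dropped = 0
    previous = None
    for number in sample_numbers:
        if previous is not None:
            gap = (number - previous) % modulus
            if gap > 1:
                dropped += gap - 1
        previous = number
    return dropped


print(count_dropped([253, 254, 255, 0, 1]))  # clean wrap-around
print(count_dropped([10, 11, 14, 15]))       # 12 and 13 are missing
```

The modulo arithmetic treats the wrap from 255 to 0 as a normal step, so only genuine gaps are counted.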

Emotiv Integration

Emotiv headsets are consumer-oriented devices that add built-in detections (performance metrics, facial expressions, and mental commands) on top of the raw EEG stream.

Emotiv EPOC+ (14-channel EEG)

Setup and Authentication

  1. Emotiv Account Setup

    • Create account at emotiv.com
    • Obtain Client ID and Client Secret
    • Purchase appropriate license
  2. Device Authentication

    from neurascale.devices import EmotivEPOCPlus

    # Configure authentication
    emotiv_auth = {
        "client_id": "your_client_id",
        "client_secret": "your_client_secret",
        "license_key": "your_license_key"
    }

    # Initialize EPOC+ device
    epoc_config = {
        "device_id": "emotiv_epoc_plus_001",
        "authentication": emotiv_auth,
        "acquisition": {
            "sample_rate": 128,  # Fixed for EPOC+
            "channels": [
                "AF3", "F7", "F3", "FC5", "T7", "P7", "O1",
                "O2", "P8", "T8", "FC6", "F4", "F8", "AF4"
            ],
            "motion_sensors": True,
            "contact_quality": True
        }
    }

    epoc = await client.devices.connect("emotiv_epoc_plus_001", epoc_config)

Contact Quality Monitoring

import asyncio

# Real-time contact quality monitoring
async def monitor_contact_quality():
    while True:
        quality = await epoc.get_contact_quality()

        print("Contact Quality:")
        for electrode, q in quality.items():
            status = "GOOD" if q > 80 else "FAIR" if q > 50 else "POOR"
            print(f"  {electrode}: {q}% ({status})")

        # Alert on poor contact
        poor_contacts = [e for e, q in quality.items() if q < 50]
        if poor_contacts:
            print(f"Warning: Poor contact on {poor_contacts}")

        await asyncio.sleep(5)

# Start monitoring
quality_task = asyncio.create_task(monitor_contact_quality())

Data Streaming with Motion

# Stream EEG and motion data
stream = await epoc.start_stream()

# Stays None until the first contact-quality update arrives
contact_quality = None

async for packet in stream:
    # EEG data (14 channels)
    eeg_data = packet.eeg_channels

    # Motion data
    gyro = packet.gyroscope  # 3-axis gyroscope
    accel = packet.accelerometer  # 3-axis accelerometer

    # Contact quality (updated every second)
    if packet.has_contact_quality:
        contact_quality = packet.contact_quality

    # Process combined data
    await process_multimodal_data({
        'eeg': eeg_data,
        'motion': {'gyro': gyro, 'accel': accel},
        'quality': contact_quality
    })

Advanced Features

# Enable advanced Emotiv features
await epoc.enable_performance_metrics()
await epoc.enable_facial_expressions()
await epoc.enable_mental_commands()

# Get performance metrics
performance = await epoc.get_performance_metrics()
print(f"Attention: {performance.attention:.2f}")
print(f"Meditation: {performance.meditation:.2f}")
print(f"Engagement: {performance.engagement:.2f}")

# Detect facial expressions
expressions = await epoc.get_facial_expressions()
if expressions.blink > 0.5:
    print("Eye blink detected")
if expressions.smile > 0.7:
    print("Smile detected")

# Mental commands (requires training)
commands = await epoc.get_mental_commands()
if commands.push > 0.8:
    print("Push command detected")

Research-Grade Systems

g.tec Integration

g.tec g.USBamp (16-channel)

from neurascale.devices import GtecUSBamp

# Configure g.USBamp
gusb_config = {
    "device_id": "gtec_usbamp_001",
    "connection": {
        "serial_number": "UA-2019.05.15",
        "driver_version": "3.20.02"
    },
    "acquisition": {
        "sample_rate": 512,  # Up to 38.4 kHz
        "channels": list(range(1, 17)),  # 16 channels
        "input_range": 500,  # µV
        "reference": "common",  # common, differential, single-ended
        "ground": "channel_16"
    },
    "filters": {
        "notch": [50, 100],  # Multiple notch filters
        "highpass": 0.1,
        "lowpass": 100,
        "bandstop": [[49, 51], [99, 101]]  # Precise frequency bands
    },
    "triggers": {
        "digital_inputs": True,
        "analog_trigger": {
            "enable": True,
            "channel": "AIN1",
            "threshold": 2.5  # Volts
        }
    }
}

gusb = await client.devices.connect("gtec_usbamp_001", gusb_config)

# High-precision streaming
stream = await gusb.start_stream()

async for packet in stream:
    # High-resolution data (24-bit ADC)
    eeg_data = packet.channels  # 16 channels
    digital_triggers = packet.digital_inputs
    analog_trigger = packet.analog_trigger

    # Process triggers
    if any(digital_triggers):
        trigger_channels = [i for i, t in enumerate(digital_triggers) if t]
        await handle_digital_triggers(trigger_channels, packet.timestamp)

    if analog_trigger > gusb_config["triggers"]["analog_trigger"]["threshold"]:
        await handle_analog_trigger(analog_trigger, packet.timestamp)
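
A plain threshold comparison fires on every packet for as long as the trigger line stays high. If each pulse should be handled once, detect the rising edge instead. The following is a minimal sketch over a list of analog trigger samples in volts; `rising_edges` and `edge_times` are hypothetical helpers, not NeuraScale functions.

```python
def rising_edges(samples, threshold=2.5):
    """Return indices where the signal goes from below to at/above threshold.

    A signal that is already high at index 0 does not count as an edge.
    """
    edges = []
    was_high = bool(samples) and samples[0] >= threshold
    for index in range(1, len(samples)):
        is_high = samples[index] >= threshold
        if is_high and not was_high:
            edges.append(index)
        was_high = is_high
    return edges


def edge_times(samples, sample_rate_hz, threshold=2.5):
    """Convert rising-edge indices to seconds from the start of the buffer."""
    return [index / sample_rate_hz for index in rising_edges(samples, threshold)]


trigger = [0.0, 0.0, 3.3, 3.3, 0.0, 3.3]
print(rising_edges(trigger))     # sample indices of the two pulses
print(edge_times(trigger, 512))  # the same pulses in seconds at 512 Hz
```

In a streaming loop the same idea needs only one remembered flag (`was_high`) carried from packet to packet.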

BrainProducts Integration

import asyncio

from neurascale.devices import BrainProductsAmp

# Configure BrainProducts actiCHamp
actichamp_config = {
    "device_id": "bp_actichamp_001",
    "connection": {
        "interface": "usb",
        "serial_number": "17010768"
    },
    "acquisition": {
        "sample_rate": 1000,
        "channels": list(range(1, 65)),  # 64 channels
        "resolution": "24bit",
        "input_range": "±100mV",
        "dc_coupling": True
    },
    "modules": {
        "aux_box": True,
        "trigger_box": True,
        "impedance_meter": True
    },
    "triggers": {
        "parallel_port": True,
        "serial_triggers": True,
        "network_triggers": True
    }
}

actichamp = await client.devices.connect("bp_actichamp_001", actichamp_config)

# Real-time impedance monitoring
async def monitor_impedances():
    while True:
        impedances = await actichamp.measure_all_impedances()

        high_impedance = []
        for ch, imp in impedances.items():
            if imp > 50000:  # 50 kΩ threshold
                high_impedance.append(f"{ch}: {imp/1000:.1f}kΩ")

        if high_impedance:
            print(f"High impedance electrodes: {', '.join(high_impedance)}")

        await asyncio.sleep(30)  # Check every 30 seconds

# Start monitoring in the background
impedance_task = asyncio.create_task(monitor_impedances())

LSL Stream Integration

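Lab Streaming Layer (LSL) is a protocol for streaming time-series data over a local network with built-in time synchronization. Any device or application that publishes an LSL stream can be integrated the same way.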

import asyncio

from neurascale.devices import LSLStream
from pylsl import resolve_streams

# Discover available LSL streams (wait_time is in seconds)
available_streams = resolve_streams(wait_time=5.0)

print("Available LSL streams:")
for stream in available_streams:
    print(f"  {stream.name()} ({stream.type()}) - {stream.channel_count()} channels")

# Configure LSL device
lsl_config = {
    "device_id": "lsl_eeg_001",
    "stream": {
        "name": "EEG_Stream",
        "type": "EEG",
        "source_id": "myuniquesourceid"
    },
    "buffering": {
        "max_buflen": 360,  # 6 minutes
        "max_chunklen": 0,  # No chunking
        "recover": True
    },
    "clock_sync": {
        "enable": True,
        "correction_interval": 5.0
    }
}

# Connect to LSL stream
lsl_device = await client.devices.connect("lsl_eeg_001", lsl_config)

# Handle clock synchronization
async def sync_clocks():
    while True:
        # Get time correction
        correction = await lsl_device.get_time_correction()
        print(f"Clock offset: {correction:.3f}s")

        if abs(correction) > 0.01:  # 10 ms threshold
            print("Warning: Large clock offset detected")

        await asyncio.sleep(30)

# Monitor the clock offset in the background
sync_task = asyncio.create_task(sync_clocks())

# Stream with timestamp correction
stream = await lsl_device.start_stream()

async for packet in stream:
    # Apply time correction
    corrected_timestamp = packet.timestamp + await lsl_device.get_time_correction()

    # Process synchronized data
    await process_synchronized_data(packet.data, corrected_timestamp)
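
A clock offset between two machines is typically estimated from a timestamped request/response exchange, as in NTP. The sketch below shows the generic NTP-style calculation for intuition only. It is not LSL's or NeuraScale's actual implementation, and `estimate_offset` and `best_offset` are hypothetical names.

```python
def estimate_offset(t0, t1, t2, t3):
    """NTP-style estimate from one request/response exchange.

    t0: local send time      t1: remote receive time
    t2: remote send time     t3: local receive time
    Returns (offset, round_trip), where offset is remote clock minus
    local clock, assuming symmetric network delay.
    """
    offset = ((t1 - t0) + (t2 - t3)) / 2
    round_trip = (t3 - t0) - (t2 - t1)
    return offset, round_trip


def best_offset(exchanges):
    """Pick the offset from the exchange with the shortest round trip,
    since short round trips leave the least room for asymmetric delay."""
    estimates = [estimate_offset(*exchange) for exchange in exchanges]
    return min(estimates, key=lambda estimate: estimate[1])[0]


# Remote clock is 0.5 s ahead of the local clock in both exchanges.
exchanges = [
    (100.0, 100.51, 100.52, 100.03),  # symmetric 10 ms delay each way
    (200.0, 200.55, 200.56, 200.07),  # 50 ms out, 10 ms back (asymmetric)
]
print(f"Estimated offset: {best_offset(exchanges):.3f} s")
```

Under this sign convention, the correction to add to a remote timestamp to express it in local time is the negative of the offset.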

Multi-Device Synchronization

Hardware Synchronization

# Synchronize multiple devices with hardware triggers
sync_config = {
    "master_device": "gtec_usbamp_001",
    "slave_devices": [
        "openbci_cyton_001",
        "emotiv_epoc_plus_001"
    ],
    "sync_method": "hardware_trigger",
    "sync_frequency": 1.0,  # 1 Hz sync pulses
    "sync_pin": "digital_out_1"
}

# Configure master device
master = await client.devices.get("gtec_usbamp_001")
await master.configure_sync_output(
    pin="digital_out_1",
    frequency=1.0,
    pulse_width=100  # milliseconds
)

# Configure slave devices
for slave_id in sync_config["slave_devices"]:
    slave = await client.devices.get(slave_id)
    await slave.configure_sync_input(
        pin="digital_in_1",
        trigger_mode="rising_edge"
    )

# Start synchronized recording (wrap the master ID in a list before joining)
session = await client.sessions.create_multi_device(
    devices=[sync_config["master_device"]] + sync_config["slave_devices"],
    sync_config=sync_config
)

await session.start_synchronized_recording()
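
Once every device has recorded the same sync pulses, the pulse timestamps can be used to map one device's clock onto the reference clock, correcting both a fixed offset and slow drift. The following is a minimal sketch using an ordinary least-squares line fit. `fit_clock_map` is a hypothetical helper, and how NeuraScale performs this alignment internally is not stated here.

```python
def fit_clock_map(device_times, reference_times):
    """Least-squares fit of reference = slope * device + intercept.

    Both lists hold timestamps (seconds) of the same sync pulses.
    """
    n = len(device_times)
    if n < 2 or n != len(reference_times):
        raise ValueError("need two or more matching pulse timestamps")
    mean_x = sum(device_times) / n
    mean_y = sum(reference_times) / n
    sxx = sum((x - mean_x) ** 2 for x in device_times)
    if sxx == 0:
        raise ValueError("device timestamps must not all be identical")
    sxy = sum((x - mean_x) * (y - mean_y)
              for x, y in zip(device_times, reference_times))
    slope = sxy / sxx
    return slope, mean_y - slope * mean_x


# 1 Hz pulses on the reference clock; the device clock starts at 10 s
# and runs 100 ppm fast (made-up values for illustration).
reference = [0.0, 1.0, 2.0, 3.0, 4.0]
device = [10.0, 11.0001, 12.0002, 13.0003, 14.0004]

slope, intercept = fit_clock_map(device, reference)
drift_ppm = (1 / slope - 1) * 1e6
print(f"Device clock drift: {drift_ppm:.1f} ppm")
print(f"Device time 12.0002 s maps to {slope * 12.0002 + intercept:.4f} s")
```

A line fit over many pulses averages out jitter in individual pulse timestamps, which a single-pulse offset cannot do.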

Software Synchronization

# Software-based synchronization using NTP
from neurascale.sync import SoftwareSync

sync_manager = SoftwareSync()

# Add devices to sync group
sync_group = await sync_manager.create_group("experiment_001")
await sync_group.add_device("openbci_cyton_001")
await sync_group.add_device("emotiv_epoc_plus_001")
await sync_group.add_device("muse_001")

# Enable NTP synchronization
await sync_group.enable_ntp_sync(
    ntp_server="pool.ntp.org",
    sync_interval=60  # seconds
)

# Start synchronized streaming
streams = await sync_group.start_synchronized_streams()

# Process synchronized data
async def process_multi_device_data():
    async for synchronized_packet in streams:
        # All devices have synchronized timestamps
        device_data = {}
        for device_id, packet in synchronized_packet.items():
            device_data[device_id] = {
                'timestamp': packet.timestamp,
                'data': packet.channels,
                'sync_quality': packet.sync_quality
            }

        # Check synchronization quality.
        # Assumption: timestamps are in seconds, as in the LSL example,
        # so convert the spread to milliseconds before comparing.
        timestamps = [data['timestamp'] for data in device_data.values()]
        max_offset_ms = (max(timestamps) - min(timestamps)) * 1000

        if max_offset_ms < 10:  # 10 ms tolerance
            await process_synchronized_data(device_data)
        else:
            print(f"Warning: Sync offset {max_offset_ms:.1f}ms")
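
Devices with different sample rates rarely produce samples at exactly the same instant, so software synchronization usually ends with pairing samples by nearest timestamp within a tolerance. The sketch below shows one way to do that with a binary search. `nearest_index` and `align` are hypothetical helpers for illustration, and timestamps here are in milliseconds.

```python
from bisect import bisect_left


def nearest_index(sorted_times, t):
    """Index of the value in sorted_times closest to t (ties go to the earlier)."""
    i = bisect_left(sorted_times, t)
    if i == 0:
        return 0
    if i == len(sorted_times):
        return len(sorted_times) - 1
    before, after = sorted_times[i - 1], sorted_times[i]
    return i if (after - t) < (t - before) else i - 1


def align(reference_times, other_times, tolerance_ms=10.0):
    """Pair each reference sample with the nearest other sample.

    Returns (reference_index, other_index) pairs; reference samples with
    no partner within the tolerance are skipped. other_times must be sorted.
    """
    pairs = []
    if not other_times:
        return pairs
    for ref_index, t in enumerate(reference_times):
        other_index = nearest_index(other_times, t)
        if abs(other_times[other_index] - t) <= tolerance_ms:
            pairs.append((ref_index, other_index))
    return pairs


print(align([0, 4, 8, 12], [1, 9, 30], tolerance_ms=2))
```

Each lookup is logarithmic in the length of `other_times`, which keeps the pairing cheap on long recordings.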

For detailed device-specific configuration options and troubleshooting, refer to the manufacturer’s documentation and our Troubleshooting Guide.

Best Practices

Device Selection Guidelines

  1. Research Applications

    • Use research-grade systems (g.tec, BrainProducts) for publication-quality data
    • Consider channel count requirements (8-256 channels)
    • Ensure adequate sampling rate (250-2000 Hz for EEG)
  2. Clinical Applications

    • Choose FDA-cleared or FDA-approved devices for clinical trials
    • Prioritize reliability and support
    • Consider infection control requirements
  3. Consumer Applications

    • Emotiv/Muse for ease of use
    • OpenBCI for customization and cost-effectiveness
    • Consider wireless connectivity needs
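
Channel count and sampling rate together determine how much raw data a device produces, which affects both wireless bandwidth and storage. The following back-of-the-envelope sketch assumes 24-bit samples and ignores packet headers and auxiliary channels. `raw_data_rate` is a hypothetical helper.

```python
def raw_data_rate(channels, sample_rate_hz, bits_per_sample=24):
    """Uncompressed EEG payload rate in bytes per second."""
    return channels * sample_rate_hz * bits_per_sample / 8


setups = {
    "8 channels at 250 Hz": (8, 250),
    "64 channels at 1000 Hz": (64, 1000),
    "256 channels at 2000 Hz": (256, 2000),
}
for label, (channels, rate) in setups.items():
    per_second = raw_data_rate(channels, rate)
    per_hour_mb = per_second * 3600 / 1e6
    print(f"{label}: {per_second / 1000:.1f} kB/s, {per_hour_mb:.1f} MB per hour")
```

The spread between the smallest and largest setup is more than two orders of magnitude, so it is worth running the numbers before choosing a device.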

Performance Optimization

# Optimize for real-time performance
optimization_config = {
    "buffer_management": {
        "buffer_size": "minimal",  # Minimize latency
        "double_buffering": True,
        "memory_mapping": True
    },
    "processing": {
        "parallel_channels": True,
        "gpu_acceleration": False,  # CPU often faster for small datasets
        "vectorization": True
    },
    "network": {
        "tcp_nodelay": True,
        "socket_buffer_size": 65536,
        "compression": False  # Trade bandwidth for latency
    }
}

# Apply optimizations
await client.configure_performance(optimization_config)

Quality Assurance

import asyncio

# Implement comprehensive quality checks
class QualityMonitor:
    def __init__(self, device):
        self.device = device
        self.quality_thresholds = {
            "impedance": 50000,  # 50 kΩ
            "signal_range": 200,  # ±200 µV
            "packet_loss": 0.01,  # 1%
            "sync_offset": 10  # 10 ms
        }

    async def check_signal_quality(self, packet):
        issues = []

        # Check impedances
        if hasattr(packet, 'impedance'):
            high_impedance = [
                ch for ch, imp in packet.impedance.items()
                if imp > self.quality_thresholds["impedance"]
            ]
            if high_impedance:
                issues.append(f"High impedance: {high_impedance}")

        # Check signal range
        for channel in packet.channels:
            if abs(channel.value) > self.quality_thresholds["signal_range"]:
                issues.append(f"Saturation on channel {channel.number}")

        # Check packet loss
        if packet.packet_loss > self.quality_thresholds["packet_loss"]:
            issues.append(f"Packet loss: {packet.packet_loss:.2%}")

        return issues

    async def handle_quality_issues(self, issues):
        # Placeholder: replace with logging, alerting, or pausing the recording
        pass

    async def monitor_continuously(self):
        stream = await self.device.start_stream()

        async for packet in stream:
            issues = await self.check_signal_quality(packet)

            if issues:
                print(f"Quality issues detected: {', '.join(issues)}")
                await self.handle_quality_issues(issues)

# Use quality monitor with any connected device, e.g. the Cyton from above
monitor = QualityMonitor(cyton)
monitor_task = asyncio.create_task(monitor.monitor_continuously())