Developer Guide
This comprehensive guide covers development for both the NeuraScale Console (frontend) and Neural Engine (backend), providing practical examples and tutorials for building BCI applications.
Console Development (Frontend)
The NeuraScale Console is a Next.js application that provides a modern web interface for device management, data visualization, and experiment control.
Getting Started with Console Development
Setup Development Environment
cd console
# Install dependencies
pnpm install
# Copy environment template
cp .env.example .env.local
# Start development server
pnpm dev
The console will be available at http://localhost:3000
Project Structure
console/
├── app/ # Next.js App Router
│ ├── (auth)/ # Authentication routes
│ ├── (dashboard)/ # Dashboard routes
│ ├── api/ # API routes
│ └── layout.tsx # Root layout
├── components/ # React components
│ ├── ui/ # shadcn/ui components
│ ├── dashboard/ # Dashboard components
│ └── visualization/ # Data viz components
├── lib/ # Utilities
│ ├── firebase/ # Firebase config
│ ├── hooks/ # React hooks
│ └── utils/ # Helper functions
├── prisma/ # Database schema
└── public/ # Static assets
Key Technologies
- Framework: Next.js 14 with App Router
- UI Components: shadcn/ui + Tailwind CSS
- State Management: Zustand + React Query
- Authentication: Firebase Auth
- Database: Prisma + NeonDB
- Real-time: Socket.io client
- Charts: Recharts + D3.js
Building UI Components
Creating a Device Card Component
// components/dashboard/device-card.tsx
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"
import { Badge } from "@/components/ui/badge"
import { Button } from "@/components/ui/button"
import { Activity, Wifi, WifiOff } from "lucide-react"
interface DeviceCardProps {
device: {
id: string
name: string
type: string
status: 'connected' | 'disconnected' | 'connecting'
batteryLevel?: number
signalQuality?: number
}
onConnect: () => void
onDisconnect: () => void
}
export function DeviceCard({ device, onConnect, onDisconnect }: DeviceCardProps) {
return (
<Card>
<CardHeader className="flex flex-row items-center justify-between">
<CardTitle className="text-lg">{device.name}</CardTitle>
<Badge variant={device.status === 'connected' ? 'success' : 'secondary'}>
{device.status}
</Badge>
</CardHeader>
<CardContent>
<div className="space-y-4">
<div className="flex items-center justify-between text-sm">
<span className="text-muted-foreground">Type</span>
<span className="font-medium">{device.type}</span>
</div>
{device.batteryLevel !== undefined && (
<div className="flex items-center justify-between text-sm">
<span className="text-muted-foreground">Battery</span>
<span className="font-medium">{device.batteryLevel}%</span>
</div>
)}
{device.signalQuality !== undefined && (
<div className="flex items-center justify-between text-sm">
<span className="text-muted-foreground">Signal Quality</span>
<div className="flex items-center gap-1">
<Activity className="h-4 w-4" />
<span className="font-medium">{device.signalQuality}%</span>
</div>
</div>
)}
<Button
className="w-full"
variant={device.status === 'connected' ? 'destructive' : 'default'}
onClick={device.status === 'connected' ? onDisconnect : onConnect}
>
{device.status === 'connected' ? (
<>
<WifiOff className="mr-2 h-4 w-4" />
Disconnect
</>
) : (
<>
<Wifi className="mr-2 h-4 w-4" />
Connect
</>
)}
</Button>
</div>
</CardContent>
</Card>
)
}
Real-time Data Visualization
// components/visualization/eeg-chart.tsx
import { useEffect, useRef } from 'react'
import { Line } from 'recharts'
import { useSocket } from '@/lib/hooks/use-socket'
interface EEGChartProps {
deviceId: string
channels: number[]
}
export function EEGChart({ deviceId, channels }: EEGChartProps) {
const canvasRef = useRef<HTMLCanvasElement>(null)
const { socket, isConnected } = useSocket()
const bufferRef = useRef<number[][]>([])
useEffect(() => {
if (!socket || !isConnected) return
const handleData = (data: any) => {
if (data.deviceId !== deviceId) return
// Add data to buffer
bufferRef.current.push(data.samples)
// Keep only last 1000 samples
if (bufferRef.current.length > 1000) {
bufferRef.current.shift()
}
// Draw on canvas
drawEEG()
}
socket.on('eeg:data', handleData)
return () => {
socket.off('eeg:data', handleData)
}
}, [socket, isConnected, deviceId])
const drawEEG = () => {
const canvas = canvasRef.current
if (!canvas) return
const ctx = canvas.getContext('2d')
if (!ctx) return
// Clear canvas
ctx.clearRect(0, 0, canvas.width, canvas.height)
// Draw each channel
channels.forEach((channel, index) => {
const yOffset = (canvas.height / channels.length) * index
ctx.beginPath()
ctx.strokeStyle = `hsl(${index * 360 / channels.length}, 70%, 50%)`
ctx.lineWidth = 1
bufferRef.current.forEach((sample, x) => {
const y = yOffset + (sample[channel] * 50) // Scale amplitude
if (x === 0) {
ctx.moveTo(x, y)
} else {
ctx.lineTo(x, y)
}
})
ctx.stroke()
})
}
return (
<div className="relative w-full h-64 bg-background rounded-lg border">
<canvas
ref={canvasRef}
width={1000}
height={256}
className="w-full h-full"
/>
{!isConnected && (
<div className="absolute inset-0 flex items-center justify-center bg-background/80">
<p className="text-muted-foreground">Waiting for connection...</p>
</div>
)}
</div>
)
}
Authentication with Firebase
// lib/firebase/auth.tsx
import { createContext, useContext, useEffect, useState } from 'react'
import {
getAuth,
signInWithPopup,
GoogleAuthProvider,
onAuthStateChanged,
User
} from 'firebase/auth'
import { app } from './config'
interface AuthContextType {
user: User | null
loading: boolean
signInWithGoogle: () => Promise<void>
signOut: () => Promise<void>
}
const AuthContext = createContext<AuthContextType | null>(null)
export function AuthProvider({ children }: { children: React.ReactNode }) {
const [user, setUser] = useState<User | null>(null)
const [loading, setLoading] = useState(true)
const auth = getAuth(app)
useEffect(() => {
const unsubscribe = onAuthStateChanged(auth, (user) => {
setUser(user)
setLoading(false)
})
return unsubscribe
}, [auth])
const signInWithGoogle = async () => {
const provider = new GoogleAuthProvider()
await signInWithPopup(auth, provider)
}
const signOut = async () => {
await auth.signOut()
}
return (
<AuthContext.Provider value={{ user, loading, signInWithGoogle, signOut }}>
{children}
</AuthContext.Provider>
)
}
export const useAuth = () => {
const context = useContext(AuthContext)
if (!context) {
throw new Error('useAuth must be used within AuthProvider')
}
return context
}
Database Operations with Prisma
// app/api/experiments/route.ts
import { NextRequest, NextResponse } from 'next/server'
import { prisma } from '@/lib/prisma'
import { getAuth } from '@/lib/firebase/admin'
export async function GET(request: NextRequest) {
try {
// Verify authentication
const token = request.headers.get('authorization')?.split(' ')[1]
if (!token) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const decodedToken = await getAuth().verifyIdToken(token)
const userId = decodedToken.uid
// Get user's experiments
const experiments = await prisma.experiment.findMany({
where: { userId },
include: {
sessions: {
select: {
id: true,
name: true,
createdAt: true,
duration: true
}
}
},
orderBy: { createdAt: 'desc' }
})
return NextResponse.json(experiments)
} catch (error) {
console.error('Error fetching experiments:', error)
return NextResponse.json(
{ error: 'Internal server error' },
{ status: 500 }
)
}
}
export async function POST(request: NextRequest) {
try {
const token = request.headers.get('authorization')?.split(' ')[1]
if (!token) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const decodedToken = await getAuth().verifyIdToken(token)
const userId = decodedToken.uid
const body = await request.json()
// Create new experiment
const experiment = await prisma.experiment.create({
data: {
name: body.name,
description: body.description,
userId,
deviceType: body.deviceType,
configuration: body.configuration
}
})
return NextResponse.json(experiment, { status: 201 })
} catch (error) {
console.error('Error creating experiment:', error)
return NextResponse.json(
{ error: 'Internal server error' },
{ status: 500 }
)
}
}
Backend Development (Neural Engine)
Quick Start Integration
Python SDK
The Python SDK is the recommended way to integrate with NeuraScale for research and clinical applications.
Install the SDK
pip install neurascale-sdk
Initialize Connection
from neurascale import NeuraScaleClient
import asyncio
# Initialize client
client = NeuraScaleClient(
api_key="your-api-key",
base_url="https://api.neurascale.io"
)
# Test connection
async def test_connection():
status = await client.health.check()
print(f"NeuraScale status: {status}")
asyncio.run(test_connection())
Connect Your First Device
async def connect_device():
# List available devices
devices = await client.devices.list()
print(f"Found {len(devices)} devices")
# Connect to OpenBCI device
device = await client.devices.connect(
device_id="openbci_001",
config={
"sample_rate": 250,
"channels": [1, 2, 3, 4, 5, 6, 7, 8],
"filters": {
"notch": 60,
"bandpass": [0.5, 100]
}
}
)
print(f"Connected to {device.name}")
return device
device = asyncio.run(connect_device())
Start Real-time Streaming
async def stream_data():
# Start streaming
stream = await client.streams.start(
device_id="openbci_001",
session_name="my_first_recording"
)
# Process real-time data
async for data_packet in stream:
print(f"Received {len(data_packet.channels)} channels")
print(f"Timestamp: {data_packet.timestamp}")
# Process your data here
for channel in data_packet.channels:
print(f"Channel {channel.number}: {channel.value:.3f}µV")
# Stop after 100 packets for demo
if data_packet.sequence_number > 100:
break
# Stop streaming
await stream.stop()
asyncio.run(stream_data())
Language-Specific SDKs
Python
Python SDK - Advanced Usage
from neurascale import NeuraScaleClient
from neurascale.analysis import MotorImageryClassifier
from neurascale.preprocessing import SignalProcessor
import numpy as np
class NeuraScaleApp:
def __init__(self, api_key: str):
self.client = NeuraScaleClient(api_key=api_key)
self.processor = SignalProcessor()
self.classifier = MotorImageryClassifier()
async def setup_session(self):
"""Setup a new recording session"""
self.session = await self.client.sessions.create(
name="Motor Imagery Training",
participant_id="participant_001",
protocol="motor_imagery_4class",
metadata={
"condition": "training",
"experimenter": "researcher_01"
}
)
async def real_time_classification(self):
"""Real-time motor imagery classification"""
# Configure preprocessing
self.processor.configure(
notch_filter=60,
bandpass=[8, 30], # Focus on mu and beta bands
window_size=2.0, # 2-second windows
overlap=0.5 # 50% overlap
)
# Start streaming
stream = await self.client.streams.start(
device_id="openbci_001",
session_id=self.session.id
)
async for data_packet in stream:
# Preprocess signal
processed = self.processor.process(data_packet.raw_data)
# Extract features
features = self.processor.extract_features(
processed,
feature_types=['psd', 'csp', 'band_power']
)
# Classify movement intention
prediction = await self.classifier.predict(features)
print(f"Prediction: {prediction.label}")
print(f"Confidence: {prediction.confidence:.2f}")
# Send feedback to user interface
await self.send_feedback(prediction)
async def send_feedback(self, prediction):
"""Send real-time feedback"""
feedback = {
"timestamp": prediction.timestamp,
"direction": prediction.label,
"confidence": prediction.confidence,
"feedback_type": "visual_cursor"
}
await self.client.feedback.send(feedback)
# Usage
app = NeuraScaleApp("your-api-key")
asyncio.run(app.setup_session())
asyncio.run(app.real_time_classification())
Batch Processing Example
from neurascale.analysis import BatchProcessor
from datetime import datetime, timedelta
async def analyze_historical_data():
"""Analyze stored neural data"""
# Query data from last week
end_time = datetime.now()
start_time = end_time - timedelta(days=7)
data = await client.data.query(
session_ids=["session_123", "session_124"],
start_time=start_time,
end_time=end_time,
channels=[1, 2, 3, 4]
)
# Process in batches
processor = BatchProcessor()
results = await processor.analyze(
data=data,
analysis_type="spectral_analysis",
parameters={
"frequency_bands": {
"delta": [0.5, 4],
"theta": [4, 8],
"alpha": [8, 13],
"beta": [13, 30],
"gamma": [30, 100]
}
}
)
# Export results
await client.data.export(
results,
format="hdf5",
destination="gs://my-bucket/analysis-results/"
)
return results
Integration Patterns
Real-time Processing Pipeline
from neurascale import NeuraScaleClient
from neurascale.processing import ProcessingPipeline
import asyncio
class RealTimeProcessor:
def __init__(self, api_key: str):
self.client = NeuraScaleClient(api_key=api_key)
self.pipeline = ProcessingPipeline()
def setup_pipeline(self):
"""Configure processing pipeline"""
self.pipeline.add_stage("artifact_removal", {
"method": "ICA",
"n_components": 8
})
self.pipeline.add_stage("filtering", {
"notch": 60,
"bandpass": [0.5, 100],
"filter_type": "butterworth",
"order": 4
})
self.pipeline.add_stage("feature_extraction", {
"features": ["psd", "band_power", "hjorth"],
"window_size": 2.0,
"overlap": 0.5
})
self.pipeline.add_stage("classification", {
"model": "motor_imagery_4class",
"confidence_threshold": 0.7
})
async def process_stream(self, device_id: str):
"""Process real-time neural data stream"""
# Start streaming
stream = await self.client.streams.start(device_id)
async for data_packet in stream:
# Process through pipeline
result = await self.pipeline.process(data_packet)
# Handle results
if result.stage == "classification":
prediction = result.output
await self.handle_prediction(prediction)
# Store intermediate results if needed
if result.store_intermediate:
await self.client.data.store_processed(result)
async def handle_prediction(self, prediction):
"""Handle classification results"""
print(f"Movement detected: {prediction.label}")
print(f"Confidence: {prediction.confidence:.2f}")
# Send to external system
if prediction.confidence > 0.8:
await self.send_control_signal(prediction.label)
async def send_control_signal(self, movement: str):
"""Send control signal to external device"""
control_data = {
"command": movement,
"timestamp": time.time(),
"source": "neurascale"
}
# Send via webhook, WebSocket, or direct API call
await self.client.external.send_command(control_data)
# Usage
processor = RealTimeProcessor("your-api-key")
processor.setup_pipeline()
asyncio.run(processor.process_stream("openbci_001"))
Batch Processing for Research
from neurascale import NeuraScaleClient
from neurascale.analysis import BatchAnalyzer
import pandas as pd
import numpy as np
class ResearchAnalyzer:
def __init__(self, api_key: str):
self.client = NeuraScaleClient(api_key=api_key)
self.analyzer = BatchAnalyzer()
async def analyze_experiment(self, experiment_id: str):
"""Analyze complete experiment dataset"""
# Get all sessions from experiment
sessions = await self.client.experiments.get_sessions(experiment_id)
results = []
for session in sessions:
session_result = await self.analyze_session(session.id)
results.append(session_result)
# Combine results
combined_results = pd.concat(results, ignore_index=True)
# Perform group analysis
group_stats = self.perform_group_analysis(combined_results)
# Export results
await self.export_results(experiment_id, combined_results, group_stats)
return combined_results, group_stats
async def analyze_session(self, session_id: str):
"""Analyze single session"""
# Fetch session data
data = await self.client.data.query(
session_ids=[session_id],
preprocessing={
"remove_artifacts": True,
"apply_filters": True,
"epoch_events": True
}
)
# Configure analysis
self.analyzer.configure({
"time_frequency_analysis": {
"method": "morlet_wavelet",
"frequencies": np.logspace(np.log10(1), np.log10(100), 50),
"n_cycles": 7
},
"connectivity_analysis": {
"methods": ["coherence", "pli", "wpli"],
"frequency_bands": {
"theta": [4, 8],
"alpha": [8, 13],
"beta": [13, 30]
}
},
"event_related_analysis": {
"baseline": [-0.5, 0],
"time_window": [-1, 2],
"conditions": ["left_hand", "right_hand", "feet", "tongue"]
}
})
# Run analysis
results = await self.analyzer.analyze(data)
return results
def perform_group_analysis(self, data: pd.DataFrame):
"""Perform statistical analysis across participants"""
# Group-level statistics
group_stats = {
"participant_count": data['participant_id'].nunique(),
"total_trials": len(data),
"mean_accuracy": data['accuracy'].mean(),
"std_accuracy": data['accuracy'].std()
}
# ANOVA for condition effects
from scipy.stats import f_oneway
conditions = data['condition'].unique()
condition_groups = [data[data['condition'] == cond]['accuracy']
for cond in conditions]
f_stat, p_value = f_oneway(*condition_groups)
group_stats['anova_f'] = f_stat
group_stats['anova_p'] = p_value
return group_stats
async def export_results(self, experiment_id: str, data: pd.DataFrame, stats: dict):
"""Export results in multiple formats"""
# Export to cloud storage
await self.client.data.export(
data,
format="parquet",
destination=f"gs://research-data/{experiment_id}/results.parquet"
)
# Generate report
report = self.generate_report(data, stats)
await self.client.reports.create(
experiment_id=experiment_id,
report_type="analysis_summary",
content=report
)
# Usage for research
analyzer = ResearchAnalyzer("your-api-key")
results, stats = asyncio.run(analyzer.analyze_experiment("exp_001"))
Error Handling and Best Practices
Robust Connection Management
import asyncio
import logging
from neurascale import NeuraScaleClient, ConnectionError, DeviceError
class RobustNeuraScaleClient:
def __init__(self, api_key: str, max_retries: int = 3):
self.client = NeuraScaleClient(api_key=api_key)
self.max_retries = max_retries
self.logger = logging.getLogger(__name__)
async def connect_with_retry(self, device_id: str, config: dict):
"""Connect to device with automatic retry"""
for attempt in range(self.max_retries):
try:
device = await self.client.devices.connect(device_id, config)
self.logger.info(f"Connected to {device_id} on attempt {attempt + 1}")
return device
except ConnectionError as e:
self.logger.warning(f"Connection attempt {attempt + 1} failed: {e}")
if attempt < self.max_retries - 1:
await asyncio.sleep(2 ** attempt) # Exponential backoff
else:
raise
except DeviceError as e:
self.logger.error(f"Device error: {e}")
# Don't retry on device errors
raise
async def safe_stream_processing(self, device_id: str):
"""Stream processing with error recovery"""
while True:
try:
stream = await self.client.streams.start(device_id)
async for data_packet in stream:
await self.process_packet(data_packet)
except ConnectionError:
self.logger.warning("Stream connection lost, attempting to reconnect...")
await asyncio.sleep(1)
continue
except Exception as e:
self.logger.error(f"Unexpected error in stream processing: {e}")
await asyncio.sleep(5)
continue
async def process_packet(self, packet):
"""Process individual data packet with error handling"""
try:
# Your processing logic here
processed_data = self.preprocess(packet.data)
result = await self.analyze(processed_data)
await self.store_result(result)
except Exception as e:
self.logger.error(f"Error processing packet: {e}")
# Log bad packet for debugging
await self.log_error_packet(packet, str(e))
async def log_error_packet(self, packet, error_msg: str):
"""Log problematic packets for debugging"""
error_data = {
"timestamp": packet.timestamp,
"sequence_number": packet.sequence_number,
"error": error_msg,
"packet_data": packet.to_dict()
}
await self.client.debug.log_error(error_data)
Performance Optimization
import asyncio
from concurrent.futures import ThreadPoolExecutor
import numpy as np
class OptimizedProcessor:
def __init__(self, api_key: str):
self.client = NeuraScaleClient(api_key=api_key)
self.executor = ThreadPoolExecutor(max_workers=4)
self.buffer = collections.deque(maxlen=1000)
async def optimized_streaming(self, device_id: str):
"""Optimized streaming with concurrent processing"""
stream = await self.client.streams.start(device_id)
# Process packets concurrently
tasks = []
async for packet in stream:
# Add to buffer
self.buffer.append(packet)
# Process in background
task = asyncio.create_task(self.process_packet_async(packet))
tasks.append(task)
# Limit concurrent tasks
if len(tasks) >= 10:
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
tasks = list(pending)
async def process_packet_async(self, packet):
"""Process packet using thread pool for CPU-intensive tasks"""
# CPU-intensive processing in thread pool
loop = asyncio.get_event_loop()
processed = await loop.run_in_executor(
self.executor,
self.cpu_intensive_processing,
packet.data
)
# I/O operations in async context
await self.store_processed_data(processed)
def cpu_intensive_processing(self, data: np.ndarray) -> np.ndarray:
"""CPU-intensive signal processing"""
# Example: Complex filtering, feature extraction, etc.
from scipy import signal
# Apply multiple filters
filtered = signal.filtfilt(*signal.butter(4, [0.5, 100], fs=250), data)
# Extract features
features = self.extract_complex_features(filtered)
return features
def extract_complex_features(self, data: np.ndarray) -> np.ndarray:
"""Extract complex features from neural data"""
features = []
# Time domain features
features.extend([
np.mean(data, axis=1),
np.std(data, axis=1),
np.var(data, axis=1)
])
# Frequency domain features
freqs, psd = signal.welch(data, fs=250, axis=1)
features.append(np.mean(psd, axis=1))
return np.concatenate(features)
This developer guide provides practical examples for integrating with NeuraScale across multiple programming languages. For more advanced use cases, see our API Documentation and System Modeling sections.
Next Steps
- Get API Keys: Sign up at neurascale.io to get your API credentials
- Try Examples: Start with the quick start examples for your preferred language
- Read API Docs: Review the complete API documentation for detailed reference
- Join Community: Connect with other developers in our GitHub Discussions
- Get Support: Contact support@neurascale.io for technical assistance
Ready to build the future of brain-computer interfaces? Let’s get started!