
Developer Guide

This comprehensive guide covers development for both the NeuraScale Console (frontend) and Neural Engine (backend), providing practical examples and tutorials for building BCI applications.

Console Development (Frontend)

The NeuraScale Console is a Next.js application that provides a modern web interface for device management, data visualization, and experiment control.

Getting Started with Console Development

Setup Development Environment

cd console

# Install dependencies
pnpm install

# Copy environment template
cp .env.example .env.local

# Start development server
pnpm dev

The console will be available at http://localhost:3000.

Project Structure

console/
├── app/                    # Next.js App Router
│   ├── (auth)/             # Authentication routes
│   ├── (dashboard)/        # Dashboard routes
│   ├── api/                # API routes
│   └── layout.tsx          # Root layout
├── components/             # React components
│   ├── ui/                 # shadcn/ui components
│   ├── dashboard/          # Dashboard components
│   └── visualization/      # Data viz components
├── lib/                    # Utilities
│   ├── firebase/           # Firebase config
│   ├── hooks/              # React hooks
│   └── utils/              # Helper functions
├── prisma/                 # Database schema
└── public/                 # Static assets

Key Technologies

  • Framework: Next.js 14 with App Router
  • UI Components: shadcn/ui + Tailwind CSS
  • State Management: Zustand + React Query
  • Authentication: Firebase Auth
  • Database: Prisma + NeonDB
  • Real-time: Socket.io client (see the hook sketch after this list)
  • Charts: Recharts + D3.js
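
The EEG chart example later in this guide imports a useSocket hook from @/lib/hooks/use-socket, which is not shown in this guide. Below is a minimal sketch of what such a hook could look like, assuming socket.io-client and a hypothetical NEXT_PUBLIC_WS_URL environment variable; the hook in the actual Console codebase may differ.

// lib/hooks/use-socket.ts (illustrative sketch -- the real hook may differ)
import { useEffect, useState } from 'react'
import { io, Socket } from 'socket.io-client'

export function useSocket() {
  const [socket, setSocket] = useState<Socket | null>(null)
  const [isConnected, setIsConnected] = useState(false)

  useEffect(() => {
    // NEXT_PUBLIC_WS_URL is a placeholder; use whatever the project's env defines
    const s = io(process.env.NEXT_PUBLIC_WS_URL ?? 'http://localhost:8000')

    s.on('connect', () => setIsConnected(true))
    s.on('disconnect', () => setIsConnected(false))
    setSocket(s)

    return () => {
      s.disconnect()
    }
  }, [])

  return { socket, isConnected }
}

Components then read socket and isConnected from this hook, as the EEGChart component below does.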

Building UI Components

Creating a Device Card Component

// components/dashboard/device-card.tsx
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"
import { Badge } from "@/components/ui/badge"
import { Button } from "@/components/ui/button"
import { Activity, Wifi, WifiOff } from "lucide-react"

interface DeviceCardProps {
  device: {
    id: string
    name: string
    type: string
    status: 'connected' | 'disconnected' | 'connecting'
    batteryLevel?: number
    signalQuality?: number
  }
  onConnect: () => void
  onDisconnect: () => void
}

export function DeviceCard({ device, onConnect, onDisconnect }: DeviceCardProps) {
  return (
    <Card>
      <CardHeader className="flex flex-row items-center justify-between">
        <CardTitle className="text-lg">{device.name}</CardTitle>
        <Badge variant={device.status === 'connected' ? 'success' : 'secondary'}>
          {device.status}
        </Badge>
      </CardHeader>
      <CardContent>
        <div className="space-y-4">
          <div className="flex items-center justify-between text-sm">
            <span className="text-muted-foreground">Type</span>
            <span className="font-medium">{device.type}</span>
          </div>
          {device.batteryLevel !== undefined && (
            <div className="flex items-center justify-between text-sm">
              <span className="text-muted-foreground">Battery</span>
              <span className="font-medium">{device.batteryLevel}%</span>
            </div>
          )}
          {device.signalQuality !== undefined && (
            <div className="flex items-center justify-between text-sm">
              <span className="text-muted-foreground">Signal Quality</span>
              <div className="flex items-center gap-1">
                <Activity className="h-4 w-4" />
                <span className="font-medium">{device.signalQuality}%</span>
              </div>
            </div>
          )}
          <Button
            className="w-full"
            variant={device.status === 'connected' ? 'destructive' : 'default'}
            onClick={device.status === 'connected' ? onDisconnect : onConnect}
          >
            {device.status === 'connected' ? (
              <>
                <WifiOff className="mr-2 h-4 w-4" />
                Disconnect
              </>
            ) : (
              <>
                <Wifi className="mr-2 h-4 w-4" />
                Connect
              </>
            )}
          </Button>
        </div>
      </CardContent>
    </Card>
  )
}
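
As a usage sketch, a dashboard page can render one DeviceCard per device. The device list and handlers below are placeholders for illustration only; in the Console they would come from your device store or API.

// Illustrative only: the device list and handlers are placeholders
"use client"

import { DeviceCard } from "@/components/dashboard/device-card"

const devices = [
  { id: "dev_1", name: "OpenBCI Cyton", type: "EEG", status: "disconnected" as const },
]

export default function DevicesPage() {
  return (
    <div className="grid gap-4 md:grid-cols-3">
      {devices.map((device) => (
        <DeviceCard
          key={device.id}
          device={device}
          onConnect={() => console.log("connect", device.id)}
          onDisconnect={() => console.log("disconnect", device.id)}
        />
      ))}
    </div>
  )
}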

Real-time Data Visualization

// components/visualization/eeg-chart.tsx
import { useEffect, useRef } from 'react'
import { useSocket } from '@/lib/hooks/use-socket'

interface EEGChartProps {
  deviceId: string
  channels: number[]
}

export function EEGChart({ deviceId, channels }: EEGChartProps) {
  const canvasRef = useRef<HTMLCanvasElement>(null)
  const { socket, isConnected } = useSocket()
  const bufferRef = useRef<number[][]>([])

  useEffect(() => {
    if (!socket || !isConnected) return

    const handleData = (data: any) => {
      if (data.deviceId !== deviceId) return

      // Add data to buffer
      bufferRef.current.push(data.samples)

      // Keep only last 1000 samples
      if (bufferRef.current.length > 1000) {
        bufferRef.current.shift()
      }

      // Draw on canvas
      drawEEG()
    }

    socket.on('eeg:data', handleData)

    return () => {
      socket.off('eeg:data', handleData)
    }
  }, [socket, isConnected, deviceId])

  const drawEEG = () => {
    const canvas = canvasRef.current
    if (!canvas) return

    const ctx = canvas.getContext('2d')
    if (!ctx) return

    // Clear canvas
    ctx.clearRect(0, 0, canvas.width, canvas.height)

    // Draw each channel
    channels.forEach((channel, index) => {
      const yOffset = (canvas.height / channels.length) * index

      ctx.beginPath()
      ctx.strokeStyle = `hsl(${index * 360 / channels.length}, 70%, 50%)`
      ctx.lineWidth = 1

      bufferRef.current.forEach((sample, x) => {
        const y = yOffset + (sample[channel] * 50) // Scale amplitude
        if (x === 0) {
          ctx.moveTo(x, y)
        } else {
          ctx.lineTo(x, y)
        }
      })

      ctx.stroke()
    })
  }

  return (
    <div className="relative w-full h-64 bg-background rounded-lg border">
      <canvas
        ref={canvasRef}
        width={1000}
        height={256}
        className="w-full h-full"
      />
      {!isConnected && (
        <div className="absolute inset-0 flex items-center justify-center bg-background/80">
          <p className="text-muted-foreground">Waiting for connection...</p>
        </div>
      )}
    </div>
  )
}

Authentication with Firebase

// lib/firebase/auth.tsx
import { createContext, useContext, useEffect, useState } from 'react'
import {
  getAuth,
  signInWithPopup,
  GoogleAuthProvider,
  onAuthStateChanged,
  User
} from 'firebase/auth'
import { app } from './config'

interface AuthContextType {
  user: User | null
  loading: boolean
  signInWithGoogle: () => Promise<void>
  signOut: () => Promise<void>
}

const AuthContext = createContext<AuthContextType | null>(null)

export function AuthProvider({ children }: { children: React.ReactNode }) {
  const [user, setUser] = useState<User | null>(null)
  const [loading, setLoading] = useState(true)
  const auth = getAuth(app)

  useEffect(() => {
    const unsubscribe = onAuthStateChanged(auth, (user) => {
      setUser(user)
      setLoading(false)
    })
    return unsubscribe
  }, [auth])

  const signInWithGoogle = async () => {
    const provider = new GoogleAuthProvider()
    await signInWithPopup(auth, provider)
  }

  const signOut = async () => {
    await auth.signOut()
  }

  return (
    <AuthContext.Provider value={{ user, loading, signInWithGoogle, signOut }}>
      {children}
    </AuthContext.Provider>
  )
}

export const useAuth = () => {
  const context = useContext(AuthContext)
  if (!context) {
    throw new Error('useAuth must be used within AuthProvider')
  }
  return context
}
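
The provider above imports app from ./config, which is not listed in this guide. Below is a minimal sketch of lib/firebase/config.ts, assuming the standard Firebase web configuration is exposed through NEXT_PUBLIC_* environment variables; check the Console's .env.example for the actual variable names.

// lib/firebase/config.ts (sketch -- variable names are assumptions)
import { initializeApp, getApps, getApp } from 'firebase/app'

const firebaseConfig = {
  apiKey: process.env.NEXT_PUBLIC_FIREBASE_API_KEY,
  authDomain: process.env.NEXT_PUBLIC_FIREBASE_AUTH_DOMAIN,
  projectId: process.env.NEXT_PUBLIC_FIREBASE_PROJECT_ID,
  appId: process.env.NEXT_PUBLIC_FIREBASE_APP_ID,
}

// Reuse the existing app across hot reloads instead of initializing twice
export const app = getApps().length ? getApp() : initializeApp(firebaseConfig)

Wrap AuthProvider around the application (for example in app/layout.tsx) so that useAuth is available to every route.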

Database Operations with Prisma

// app/api/experiments/route.ts
import { NextRequest, NextResponse } from 'next/server'
import { prisma } from '@/lib/prisma'
import { getAuth } from '@/lib/firebase/admin'

export async function GET(request: NextRequest) {
  try {
    // Verify authentication
    const token = request.headers.get('authorization')?.split(' ')[1]
    if (!token) {
      return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
    }

    const decodedToken = await getAuth().verifyIdToken(token)
    const userId = decodedToken.uid

    // Get user's experiments
    const experiments = await prisma.experiment.findMany({
      where: { userId },
      include: {
        sessions: {
          select: {
            id: true,
            name: true,
            createdAt: true,
            duration: true
          }
        }
      },
      orderBy: { createdAt: 'desc' }
    })

    return NextResponse.json(experiments)
  } catch (error) {
    console.error('Error fetching experiments:', error)
    return NextResponse.json(
      { error: 'Internal server error' },
      { status: 500 }
    )
  }
}

export async function POST(request: NextRequest) {
  try {
    const token = request.headers.get('authorization')?.split(' ')[1]
    if (!token) {
      return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
    }

    const decodedToken = await getAuth().verifyIdToken(token)
    const userId = decodedToken.uid

    const body = await request.json()

    // Create new experiment
    const experiment = await prisma.experiment.create({
      data: {
        name: body.name,
        description: body.description,
        userId,
        deviceType: body.deviceType,
        configuration: body.configuration
      }
    })

    return NextResponse.json(experiment, { status: 201 })
  } catch (error) {
    console.error('Error creating experiment:', error)
    return NextResponse.json(
      { error: 'Internal server error' },
      { status: 500 }
    )
  }
}
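
On the client side, this route expects the user's Firebase ID token in the Authorization header. Below is a hedged sketch of how the Console could fetch experiments with React Query and the useAuth hook shown earlier; the hook name and query key are illustrative, not part of the Console codebase.

// lib/hooks/use-experiments.ts (illustrative sketch)
import { useQuery } from '@tanstack/react-query'
import { useAuth } from '@/lib/firebase/auth'

export function useExperiments() {
  const { user } = useAuth()

  return useQuery({
    queryKey: ['experiments', user?.uid],
    enabled: !!user,
    queryFn: async () => {
      // getIdToken() returns a short-lived Firebase ID token for the signed-in user
      const token = await user!.getIdToken()
      const res = await fetch('/api/experiments', {
        headers: { Authorization: `Bearer ${token}` },
      })
      if (!res.ok) throw new Error('Failed to fetch experiments')
      return res.json()
    },
  })
}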

Backend Development (Neural Engine)

Quick Start Integration

Python SDK

The Python SDK is the recommended way to integrate with NeuraScale for research and clinical applications.

Install the SDK

pip install neurascale-sdk

Initialize Connection

from neurascale import NeuraScaleClient
import asyncio

# Initialize client
client = NeuraScaleClient(
    api_key="your-api-key",
    base_url="https://api.neurascale.io"
)

# Test connection
async def test_connection():
    status = await client.health.check()
    print(f"NeuraScale status: {status}")

asyncio.run(test_connection())

Connect Your First Device

async def connect_device():
    # List available devices
    devices = await client.devices.list()
    print(f"Found {len(devices)} devices")

    # Connect to OpenBCI device
    device = await client.devices.connect(
        device_id="openbci_001",
        config={
            "sample_rate": 250,
            "channels": [1, 2, 3, 4, 5, 6, 7, 8],
            "filters": {
                "notch": 60,
                "bandpass": [0.5, 100]
            }
        }
    )

    print(f"Connected to {device.name}")
    return device

device = asyncio.run(connect_device())

Start Real-time Streaming

async def stream_data():
    # Start streaming
    stream = await client.streams.start(
        device_id="openbci_001",
        session_name="my_first_recording"
    )

    # Process real-time data
    async for data_packet in stream:
        print(f"Received {len(data_packet.channels)} channels")
        print(f"Timestamp: {data_packet.timestamp}")

        # Process your data here
        for channel in data_packet.channels:
            print(f"Channel {channel.number}: {channel.value:.3f}µV")

        # Stop after 100 packets for demo
        if data_packet.sequence_number > 100:
            break

    # Stop streaming
    await stream.stop()

asyncio.run(stream_data())

Language-Specific SDKs

Python SDK - Advanced Usage

from neurascale import NeuraScaleClient
from neurascale.analysis import MotorImageryClassifier
from neurascale.preprocessing import SignalProcessor
import asyncio
import numpy as np

class NeuraScaleApp:
    def __init__(self, api_key: str):
        self.client = NeuraScaleClient(api_key=api_key)
        self.processor = SignalProcessor()
        self.classifier = MotorImageryClassifier()

    async def setup_session(self):
        """Setup a new recording session"""
        self.session = await self.client.sessions.create(
            name="Motor Imagery Training",
            participant_id="participant_001",
            protocol="motor_imagery_4class",
            metadata={
                "condition": "training",
                "experimenter": "researcher_01"
            }
        )

    async def real_time_classification(self):
        """Real-time motor imagery classification"""
        # Configure preprocessing
        self.processor.configure(
            notch_filter=60,
            bandpass=[8, 30],   # Focus on mu and beta bands
            window_size=2.0,    # 2-second windows
            overlap=0.5         # 50% overlap
        )

        # Start streaming
        stream = await self.client.streams.start(
            device_id="openbci_001",
            session_id=self.session.id
        )

        async for data_packet in stream:
            # Preprocess signal
            processed = self.processor.process(data_packet.raw_data)

            # Extract features
            features = self.processor.extract_features(
                processed,
                feature_types=['psd', 'csp', 'band_power']
            )

            # Classify movement intention
            prediction = await self.classifier.predict(features)

            print(f"Prediction: {prediction.label}")
            print(f"Confidence: {prediction.confidence:.2f}")

            # Send feedback to user interface
            await self.send_feedback(prediction)

    async def send_feedback(self, prediction):
        """Send real-time feedback"""
        feedback = {
            "timestamp": prediction.timestamp,
            "direction": prediction.label,
            "confidence": prediction.confidence,
            "feedback_type": "visual_cursor"
        }
        await self.client.feedback.send(feedback)

# Usage
app = NeuraScaleApp("your-api-key")
asyncio.run(app.setup_session())
asyncio.run(app.real_time_classification())

Batch Processing Example

from neurascale.analysis import BatchProcessor
from datetime import datetime, timedelta

async def analyze_historical_data():
    """Analyze stored neural data"""
    # Query data from last week
    end_time = datetime.now()
    start_time = end_time - timedelta(days=7)

    data = await client.data.query(
        session_ids=["session_123", "session_124"],
        start_time=start_time,
        end_time=end_time,
        channels=[1, 2, 3, 4]
    )

    # Process in batches
    processor = BatchProcessor()
    results = await processor.analyze(
        data=data,
        analysis_type="spectral_analysis",
        parameters={
            "frequency_bands": {
                "delta": [0.5, 4],
                "theta": [4, 8],
                "alpha": [8, 13],
                "beta": [13, 30],
                "gamma": [30, 100]
            }
        }
    )

    # Export results
    await client.data.export(
        results,
        format="hdf5",
        destination="gs://my-bucket/analysis-results/"
    )

    return results

Integration Patterns

Real-time Processing Pipeline

from neurascale import NeuraScaleClient
from neurascale.processing import ProcessingPipeline
import asyncio
import time

class RealTimeProcessor:
    def __init__(self, api_key: str):
        self.client = NeuraScaleClient(api_key=api_key)
        self.pipeline = ProcessingPipeline()

    def setup_pipeline(self):
        """Configure processing pipeline"""
        self.pipeline.add_stage("artifact_removal", {
            "method": "ICA",
            "n_components": 8
        })

        self.pipeline.add_stage("filtering", {
            "notch": 60,
            "bandpass": [0.5, 100],
            "filter_type": "butterworth",
            "order": 4
        })

        self.pipeline.add_stage("feature_extraction", {
            "features": ["psd", "band_power", "hjorth"],
            "window_size": 2.0,
            "overlap": 0.5
        })

        self.pipeline.add_stage("classification", {
            "model": "motor_imagery_4class",
            "confidence_threshold": 0.7
        })

    async def process_stream(self, device_id: str):
        """Process real-time neural data stream"""
        # Start streaming
        stream = await self.client.streams.start(device_id)

        async for data_packet in stream:
            # Process through pipeline
            result = await self.pipeline.process(data_packet)

            # Handle results
            if result.stage == "classification":
                prediction = result.output
                await self.handle_prediction(prediction)

            # Store intermediate results if needed
            if result.store_intermediate:
                await self.client.data.store_processed(result)

    async def handle_prediction(self, prediction):
        """Handle classification results"""
        print(f"Movement detected: {prediction.label}")
        print(f"Confidence: {prediction.confidence:.2f}")

        # Send to external system
        if prediction.confidence > 0.8:
            await self.send_control_signal(prediction.label)

    async def send_control_signal(self, movement: str):
        """Send control signal to external device"""
        control_data = {
            "command": movement,
            "timestamp": time.time(),
            "source": "neurascale"
        }

        # Send via webhook, WebSocket, or direct API call
        await self.client.external.send_command(control_data)

# Usage
processor = RealTimeProcessor("your-api-key")
processor.setup_pipeline()
asyncio.run(processor.process_stream("openbci_001"))

Batch Processing for Research

from neurascale import NeuraScaleClient
from neurascale.analysis import BatchAnalyzer
import asyncio
import pandas as pd
import numpy as np

class ResearchAnalyzer:
    def __init__(self, api_key: str):
        self.client = NeuraScaleClient(api_key=api_key)
        self.analyzer = BatchAnalyzer()

    async def analyze_experiment(self, experiment_id: str):
        """Analyze complete experiment dataset"""
        # Get all sessions from experiment
        sessions = await self.client.experiments.get_sessions(experiment_id)

        results = []
        for session in sessions:
            session_result = await self.analyze_session(session.id)
            results.append(session_result)

        # Combine results
        combined_results = pd.concat(results, ignore_index=True)

        # Perform group analysis
        group_stats = self.perform_group_analysis(combined_results)

        # Export results
        await self.export_results(experiment_id, combined_results, group_stats)

        return combined_results, group_stats

    async def analyze_session(self, session_id: str):
        """Analyze single session"""
        # Fetch session data
        data = await self.client.data.query(
            session_ids=[session_id],
            preprocessing={
                "remove_artifacts": True,
                "apply_filters": True,
                "epoch_events": True
            }
        )

        # Configure analysis
        self.analyzer.configure({
            "time_frequency_analysis": {
                "method": "morlet_wavelet",
                "frequencies": np.logspace(np.log10(1), np.log10(100), 50),
                "n_cycles": 7
            },
            "connectivity_analysis": {
                "methods": ["coherence", "pli", "wpli"],
                "frequency_bands": {
                    "theta": [4, 8],
                    "alpha": [8, 13],
                    "beta": [13, 30]
                }
            },
            "event_related_analysis": {
                "baseline": [-0.5, 0],
                "time_window": [-1, 2],
                "conditions": ["left_hand", "right_hand", "feet", "tongue"]
            }
        })

        # Run analysis
        results = await self.analyzer.analyze(data)
        return results

    def perform_group_analysis(self, data: pd.DataFrame):
        """Perform statistical analysis across participants"""
        # Group-level statistics
        group_stats = {
            "participant_count": data['participant_id'].nunique(),
            "total_trials": len(data),
            "mean_accuracy": data['accuracy'].mean(),
            "std_accuracy": data['accuracy'].std()
        }

        # ANOVA for condition effects
        from scipy.stats import f_oneway
        conditions = data['condition'].unique()
        condition_groups = [
            data[data['condition'] == cond]['accuracy']
            for cond in conditions
        ]
        f_stat, p_value = f_oneway(*condition_groups)

        group_stats['anova_f'] = f_stat
        group_stats['anova_p'] = p_value

        return group_stats

    async def export_results(self, experiment_id: str, data: pd.DataFrame, stats: dict):
        """Export results in multiple formats"""
        # Export to cloud storage
        await self.client.data.export(
            data,
            format="parquet",
            destination=f"gs://research-data/{experiment_id}/results.parquet"
        )

        # Generate report
        report = self.generate_report(data, stats)
        await self.client.reports.create(
            experiment_id=experiment_id,
            report_type="analysis_summary",
            content=report
        )

    def generate_report(self, data: pd.DataFrame, stats: dict) -> dict:
        """Build a report payload (minimal placeholder; adapt to your workflow)"""
        return {
            "group_statistics": stats,
            "n_observations": len(data)
        }

# Usage for research
analyzer = ResearchAnalyzer("your-api-key")
results, stats = asyncio.run(analyzer.analyze_experiment("exp_001"))

Error Handling and Best Practices

Robust Connection Management

import asyncio
import logging
from neurascale import NeuraScaleClient, ConnectionError, DeviceError

class RobustNeuraScaleClient:
    def __init__(self, api_key: str, max_retries: int = 3):
        self.client = NeuraScaleClient(api_key=api_key)
        self.max_retries = max_retries
        self.logger = logging.getLogger(__name__)

    async def connect_with_retry(self, device_id: str, config: dict):
        """Connect to device with automatic retry"""
        for attempt in range(self.max_retries):
            try:
                device = await self.client.devices.connect(device_id, config)
                self.logger.info(f"Connected to {device_id} on attempt {attempt + 1}")
                return device
            except ConnectionError as e:
                self.logger.warning(f"Connection attempt {attempt + 1} failed: {e}")
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff
                else:
                    raise
            except DeviceError as e:
                self.logger.error(f"Device error: {e}")
                # Don't retry on device errors
                raise

    async def safe_stream_processing(self, device_id: str):
        """Stream processing with error recovery"""
        while True:
            try:
                stream = await self.client.streams.start(device_id)
                async for data_packet in stream:
                    await self.process_packet(data_packet)
            except ConnectionError:
                self.logger.warning("Stream connection lost, attempting to reconnect...")
                await asyncio.sleep(1)
                continue
            except Exception as e:
                self.logger.error(f"Unexpected error in stream processing: {e}")
                await asyncio.sleep(5)
                continue

    async def process_packet(self, packet):
        """Process individual data packet with error handling"""
        try:
            # Your processing logic here
            processed_data = self.preprocess(packet.data)
            result = await self.analyze(processed_data)
            await self.store_result(result)
        except Exception as e:
            self.logger.error(f"Error processing packet: {e}")
            # Log bad packet for debugging
            await self.log_error_packet(packet, str(e))

    async def log_error_packet(self, packet, error_msg: str):
        """Log problematic packets for debugging"""
        error_data = {
            "timestamp": packet.timestamp,
            "sequence_number": packet.sequence_number,
            "error": error_msg,
            "packet_data": packet.to_dict()
        }
        await self.client.debug.log_error(error_data)

Performance Optimization

import asyncio
import collections
from concurrent.futures import ThreadPoolExecutor

import numpy as np
from scipy import signal
from neurascale import NeuraScaleClient

class OptimizedProcessor:
    def __init__(self, api_key: str):
        self.client = NeuraScaleClient(api_key=api_key)
        self.executor = ThreadPoolExecutor(max_workers=4)
        self.buffer = collections.deque(maxlen=1000)

    async def optimized_streaming(self, device_id: str):
        """Optimized streaming with concurrent processing"""
        stream = await self.client.streams.start(device_id)

        # Process packets concurrently
        tasks = []
        async for packet in stream:
            # Add to buffer
            self.buffer.append(packet)

            # Process in background
            task = asyncio.create_task(self.process_packet_async(packet))
            tasks.append(task)

            # Limit concurrent tasks
            if len(tasks) >= 10:
                done, pending = await asyncio.wait(
                    tasks,
                    return_when=asyncio.FIRST_COMPLETED
                )
                tasks = list(pending)

    async def process_packet_async(self, packet):
        """Process packet using thread pool for CPU-intensive tasks"""
        # CPU-intensive processing in thread pool
        loop = asyncio.get_event_loop()
        processed = await loop.run_in_executor(
            self.executor,
            self.cpu_intensive_processing,
            packet.data
        )

        # I/O operations in async context
        await self.store_processed_data(processed)

    def cpu_intensive_processing(self, data: np.ndarray) -> np.ndarray:
        """CPU-intensive signal processing"""
        # Example: Complex filtering, feature extraction, etc.
        # Apply a 0.5-100 Hz band-pass filter
        b, a = signal.butter(4, [0.5, 100], btype="bandpass", fs=250)
        filtered = signal.filtfilt(b, a, data)

        # Extract features
        features = self.extract_complex_features(filtered)
        return features

    def extract_complex_features(self, data: np.ndarray) -> np.ndarray:
        """Extract complex features from neural data"""
        features = []

        # Time domain features
        features.extend([
            np.mean(data, axis=1),
            np.std(data, axis=1),
            np.var(data, axis=1)
        ])

        # Frequency domain features
        freqs, psd = signal.welch(data, fs=250, axis=1)
        features.append(np.mean(psd, axis=1))

        return np.concatenate(features)

This developer guide provides practical examples for integrating with NeuraScale across multiple programming languages. For more advanced use cases, see our API Documentation and System Modeling sections.

Next Steps

  1. Get API Keys: Sign up at neurascale.io to get your API credentials
  2. Try Examples: Start with the quick start examples for your preferred language
  3. Read API Docs: Review the complete API documentation for detailed reference
  4. Join Community: Connect with other developers in our GitHub Discussions 
  5. Get Support: Contact support@neurascale.io for technical assistance

Ready to build the future of brain-computer interfaces? Let’s get started!
