
Performance Tuning

This guide covers performance optimization techniques for NeuraScale across all components.

Overview

Performance optimization in NeuraScale focuses on:

  • Real-time processing: Sub-10ms latency for neural signal analysis
  • Scalability: Handle thousands of concurrent sessions
  • Resource efficiency: Optimize compute and memory usage
  • Cost optimization: Balance performance with cloud costs

Neural Engine Optimization

Signal Processing Pipeline

# neural-engine/src/core/performance/batch_processor.py
from typing import List, Dict, Any
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import asyncio


class BatchSignalProcessor:
    """Optimized batch processing for neural signals"""

    def __init__(self, batch_size: int = 1000, num_workers: int = 4):
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.executor = ThreadPoolExecutor(max_workers=num_workers)

    async def process_batch(self, signals: List[np.ndarray]) -> List[Dict[str, Any]]:
        """Process signals in optimized batches"""
        # Batch signals for efficient processing
        batches = [
            signals[i:i + self.batch_size]
            for i in range(0, len(signals), self.batch_size)
        ]

        # Process batches in parallel
        loop = asyncio.get_event_loop()
        futures = [
            loop.run_in_executor(self.executor, self._process_batch_sync, batch)
            for batch in batches
        ]
        results = await asyncio.gather(*futures)

        return [item for batch in results for item in batch]

    def _process_batch_sync(self, batch: List[np.ndarray]) -> List[Dict[str, Any]]:
        """Synchronous batch processing with vectorization"""
        # Stack signals for vectorized operations
        stacked = np.stack(batch)

        # Vectorized FFT computation
        fft_results = np.fft.rfft(stacked, axis=1)
        power_spectrum = np.abs(fft_results) ** 2

        # Vectorized feature extraction
        features = {
            'mean_power': np.mean(power_spectrum, axis=1),
            'peak_frequency': np.argmax(power_spectrum, axis=1),
            'spectral_entropy': self._compute_spectral_entropy_vectorized(power_spectrum)
        }

        return [
            {
                'features': {k: v[i] for k, v in features.items()},
                'timestamp': np.datetime64('now')
            }
            for i in range(len(batch))
        ]

    def _compute_spectral_entropy_vectorized(self, power_spectrum: np.ndarray) -> np.ndarray:
        """Vectorized spectral entropy computation"""
        # Normalize power spectrum
        norm_ps = power_spectrum / np.sum(power_spectrum, axis=1, keepdims=True)
        # Compute entropy
        return -np.sum(norm_ps * np.log2(norm_ps + 1e-10), axis=1)
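
A minimal usage sketch for the batch processor above. The signal shapes, batch size, and worker count are illustrative placeholders, not values from the production pipeline.

# Hypothetical usage of BatchSignalProcessor with synthetic signals
import asyncio

import numpy as np


async def main():
    processor = BatchSignalProcessor(batch_size=500, num_workers=4)

    # 2,000 synthetic single-channel signals, 1,024 samples each
    signals = [np.random.randn(1024).astype(np.float32) for _ in range(2000)]

    results = await processor.process_batch(signals)
    print(len(results), sorted(results[0]['features'].keys()))


asyncio.run(main())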

Memory Optimization

Memory optimization is crucial for handling large-scale neural data processing.

# neural-engine/src/core/performance/memory_optimizer.py
import gc
from typing import Iterator

import numpy as np
import psutil


class MemoryOptimizer:
    """Memory-efficient data handling for neural signals"""

    def __init__(self, memory_limit_gb: float = 8.0):
        self.memory_limit_bytes = memory_limit_gb * 1024 * 1024 * 1024

    def load_data_chunked(
        self,
        file_path: str,
        chunk_size: int = 10000
    ) -> Iterator[np.ndarray]:
        """Load large datasets in memory-efficient chunks"""
        # Use memory mapping for large files
        data = np.memmap(file_path, dtype='float32', mode='r')
        total_samples = len(data)

        for i in range(0, total_samples, chunk_size):
            # Check memory usage
            if self._get_memory_usage() > 0.8 * self.memory_limit_bytes:
                gc.collect()  # Force garbage collection

            # Yield chunk
            yield data[i:i + chunk_size].copy()

    def optimize_array_dtype(self, data: np.ndarray) -> np.ndarray:
        """Optimize array dtype based on data range"""
        data_min, data_max = data.min(), data.max()

        # Choose optimal dtype
        if data_min >= 0 and data_max <= 255:
            return data.astype(np.uint8)
        elif data_min >= -128 and data_max <= 127:
            return data.astype(np.int8)
        elif data_min >= -32768 and data_max <= 32767:
            return data.astype(np.int16)
        elif data_min >= 0 and data_max <= 65535:
            return data.astype(np.uint16)
        else:
            # Use float32 instead of float64 when possible
            if np.abs(data).max() < 3.4e38:
                return data.astype(np.float32)
            return data

    def _get_memory_usage(self) -> int:
        """Get current process memory usage in bytes"""
        process = psutil.Process()
        return process.memory_info().rss
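
A short usage sketch for the chunked loader and dtype optimizer. The file path and the downstream process() call are placeholders for this guide, not real pipeline code.

# Hypothetical usage of MemoryOptimizer; the .dat path and process() are placeholders
optimizer = MemoryOptimizer(memory_limit_gb=8.0)

for chunk in optimizer.load_data_chunked("/data/raw_session.dat", chunk_size=50_000):
    # Shrink each chunk to the smallest dtype that still covers its value range
    compact = optimizer.optimize_array_dtype(chunk)
    process(compact)  # application-specific downstream step (not defined here)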

Database Performance

Cloud SQL Optimization

# neural-engine/src/infrastructure/database/connection_pool.py
from typing import Any, List, Optional, Tuple

import asyncpg
from sqlalchemy import create_engine, pool
from sqlalchemy.orm import sessionmaker


class DatabaseConnectionPool:
    """Optimized connection pooling for Cloud SQL"""

    def __init__(
        self,
        connection_string: str,
        pool_size: int = 20,
        max_overflow: int = 10,
        pool_timeout: int = 30
    ):
        self.connection_string = connection_string

        # Create engine with optimized pool settings
        self.engine = create_engine(
            connection_string,
            poolclass=pool.QueuePool,
            pool_size=pool_size,
            max_overflow=max_overflow,
            pool_timeout=pool_timeout,
            pool_pre_ping=True,  # Verify connections before use
            pool_recycle=3600,   # Recycle connections after 1 hour
            echo_pool=False,
            connect_args={
                # Disable JIT for consistent query latency and tag the connection
                "options": "-c jit=off",
                "application_name": "neurascale_neural_engine",
                # TCP keepalives so idle Cloud SQL connections are not dropped
                "keepalives": 1,
                "keepalives_idle": 30,
                "keepalives_interval": 10,
                "keepalives_count": 5
            }
        )

        self.SessionLocal = sessionmaker(
            autocommit=False,
            autoflush=False,
            bind=self.engine
        )

        # Async connection pool (initialized lazily)
        self.async_pool: Optional[asyncpg.Pool] = None

    async def init_async_pool(self):
        """Initialize async connection pool"""
        self.async_pool = await asyncpg.create_pool(
            self.connection_string,
            min_size=10,
            max_size=20,
            max_queries=50000,
            max_inactive_connection_lifetime=300,
            command_timeout=60
        )

    async def execute_batch_async(self, queries: List[Tuple[str, List[Any]]]):
        """Execute multiple queries efficiently in a single transaction"""
        async with self.async_pool.acquire() as conn:
            async with conn.transaction():
                for query, params in queries:
                    await conn.execute(query, *params)
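
A usage sketch for the pool above. The DSN and the sessions table are illustrative and assume an asyncpg-compatible PostgreSQL instance is reachable.

# Hypothetical usage of DatabaseConnectionPool; DSN and table are illustrative
import asyncio


async def main():
    db = DatabaseConnectionPool("postgresql://user:pass@10.0.0.5:5432/neurascale")
    await db.init_async_pool()

    # Batch related writes into a single transaction and round trip
    await db.execute_batch_async([
        ("INSERT INTO sessions (id, status) VALUES ($1, $2)", ["s-001", "active"]),
        ("INSERT INTO sessions (id, status) VALUES ($1, $2)", ["s-002", "active"]),
    ])


asyncio.run(main())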

BigQuery Optimization

# neural-engine/src/infrastructure/bigquery/optimizer.py
from typing import List

from google.cloud import bigquery


class BigQueryOptimizer:
    """BigQuery optimization strategies"""

    def __init__(self, client: bigquery.Client):
        self.client = client

    def create_clustered_table(
        self,
        dataset_id: str,
        table_id: str,
        schema: List[bigquery.SchemaField],
        partition_field: str,
        clustering_fields: List[str]
    ):
        """Create optimized clustered and partitioned table"""
        table_ref = f"{self.client.project}.{dataset_id}.{table_id}"
        table = bigquery.Table(table_ref, schema=schema)

        # Configure partitioning
        table.time_partitioning = bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field=partition_field,
            expiration_ms=90 * 24 * 60 * 60 * 1000  # 90 days
        )

        # Configure clustering
        table.clustering_fields = clustering_fields

        # Create table
        table = self.client.create_table(table)

        # Set up scheduled query for incremental updates
        self._create_scheduled_refresh(dataset_id, table_id)

    def optimize_query(self, query: str) -> str:
        """Optimize BigQuery SQL query"""
        optimizations = [
            # Use approximate aggregation functions
            ("COUNT(DISTINCT", "APPROX_COUNT_DISTINCT("),
            ("PERCENTILE_CONT(", "APPROX_QUANTILES("),
            # Limit data scanned
            ("SELECT *", "SELECT /* specific columns */"),
            # Use partitioning
            ("WHERE timestamp", "WHERE DATE(timestamp)")
        ]

        optimized = query
        for old, new in optimizations:
            if old in optimized:
                optimized = optimized.replace(old, new)

        return optimized
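
A quick sketch of optimize_query on an illustrative query. The dataset name and SQL are made up for this guide, and constructing bigquery.Client() assumes application-default credentials are available.

# Hypothetical optimize_query example; dataset and SQL are illustrative
from google.cloud import bigquery

optimizer = BigQueryOptimizer(bigquery.Client())

raw_sql = """
SELECT session_id, COUNT(DISTINCT device_id) AS devices
FROM `neural_analytics.session_metrics`
WHERE timestamp >= '2024-01-01'
GROUP BY session_id
"""

print(optimizer.optimize_query(raw_sql))
# COUNT(DISTINCT ...) becomes APPROX_COUNT_DISTINCT(...), and the timestamp
# predicate is rewritten to DATE(timestamp) so partition pruning can apply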

API Performance

FastAPI Optimization

# neural-engine/src/api/performance/fastapi_optimizer.py
import asyncio
from datetime import datetime
from typing import List, Optional

import uvloop
from fastapi import FastAPI, Query, Request
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import ORJSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

# Use uvloop for better async performance
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())


class CacheMiddleware(BaseHTTPMiddleware):
    """Redis-based caching middleware"""

    def __init__(self, app):
        super().__init__(app)
        self.redis_client = None  # Initialize Redis connection here

    async def dispatch(self, request: Request, call_next):
        # Generate cache key
        cache_key = f"{request.method}:{request.url.path}:{request.url.query}"

        # Try to get from cache (get_cached / set_cache wrap the Redis client)
        cached = await self.get_cached(cache_key)
        if cached:
            return ORJSONResponse(content=cached)

        # Process request
        response = await call_next(request)

        # Cache successful responses
        if response.status_code == 200:
            await self.set_cache(cache_key, response)

        return response


class OptimizedFastAPI(FastAPI):
    """FastAPI with performance optimizations"""

    def __init__(self, **kwargs):
        # Use ORJSONResponse for faster JSON serialization
        kwargs['default_response_class'] = ORJSONResponse
        super().__init__(**kwargs)

        # Add middleware for caching
        self.add_middleware(CacheMiddleware)
        # Add middleware for compression (Starlette's built-in gzip middleware)
        self.add_middleware(GZipMiddleware, minimum_size=1000)


app = OptimizedFastAPI()


# Optimized endpoint example
@app.get("/api/v1/sessions/{session_id}/metrics")
async def get_session_metrics(
    session_id: str,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    metric_types: Optional[List[str]] = Query(None)
):
    """Optimized metrics endpoint with caching and pagination"""
    # Use connection pooling (db_pool is the shared asyncpg pool)
    async with db_pool.acquire() as conn:
        # Build optimized query
        query = """
            SELECT timestamp, metric_type, value
            FROM metrics
            WHERE session_id = $1
              AND ($2::timestamptz IS NULL OR timestamp >= $2)
              AND ($3::timestamptz IS NULL OR timestamp <= $3)
              AND ($4::text[] IS NULL OR metric_type = ANY($4))
            ORDER BY timestamp DESC
            LIMIT 1000
        """

        # Execute query
        rows = await conn.fetch(
            query, session_id, start_time, end_time, metric_types
        )

        # Use orjson for fast serialization
        return ORJSONResponse(
            content={"metrics": [dict(row) for row in rows]}
        )

Frontend Performance

React Optimization

// frontend/src/hooks/useOptimizedData.tsx
import { useMemo, useCallback, useRef } from 'react';
import { useVirtualizer } from '@tanstack/react-virtual';

export function useOptimizedSignalDisplay(data: number[][]) {
  // Memoize expensive calculations
  const processedData = useMemo(() => {
    return data.map(channel => {
      // Downsample for display if needed
      if (channel.length > 10000) {
        return downsample(channel, 10000);
      }
      return channel;
    });
  }, [data]);

  // Use virtualization for large datasets
  const parentRef = useRef<HTMLDivElement>(null);
  const virtualizer = useVirtualizer({
    count: processedData.length,
    getScrollElement: () => parentRef.current,
    estimateSize: () => 100,
    overscan: 5
  });

  // Optimize render with WebGL
  const renderSignal = useCallback((channelData: number[]) => {
    // Use WebGL for efficient rendering
    return <WebGLSignalRenderer data={channelData} />;
  }, []);

  return { processedData, virtualizer, parentRef, renderSignal };
}

// Efficient downsampling that preserves signal peaks
function downsample(data: number[], targetLength: number): number[] {
  const factor = Math.floor(data.length / targetLength);
  const result: number[] = [];

  for (let i = 0; i < data.length; i += factor) {
    // Keep the min/max pair from each chunk so peaks survive downsampling
    const chunk = data.slice(i, i + factor);
    result.push(Math.min(...chunk), Math.max(...chunk));
  }

  return result;
}

Infrastructure Optimization

Kubernetes Resource Optimization

# neural-engine/kubernetes/optimized-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: neural-engine-optimized
spec:
  replicas: 3
  selector:
    matchLabels:
      app: neural-engine
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  template:
    metadata:
      labels:
        app: neural-engine
    spec:
      containers:
        - name: neural-engine
          image: gcr.io/neurascale/neural-engine:latest
          resources:
            requests:
              memory: "2Gi"
              cpu: "1000m"
            limits:
              memory: "4Gi"
              cpu: "2000m"
          env:
            - name: WORKERS
              value: "4"
            - name: PYTHONUNBUFFERED
              value: "1"
            - name: OMP_NUM_THREADS
              value: "2"
            # JIT compilation settings
            - name: NUMBA_CACHE_DIR
              value: "/tmp/numba_cache"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
      # Node affinity for GPU nodes
      affinity:
        nodeAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              preference:
                matchExpressions:
                  - key: accelerator
                    operator: In
                    values:
                      - nvidia-tesla-t4
      # Topology spread for high availability
      topologySpreadConstraints:
        - maxSkew: 1
          topologyKey: kubernetes.io/hostname
          whenUnsatisfiable: DoNotSchedule
          labelSelector:
            matchLabels:
              app: neural-engine

Auto-scaling Configuration

# neural-engine/kubernetes/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: neural-engine-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: neural-engine
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    - type: Pods
      pods:
        metric:
          name: neural_processing_queue_depth
        target:
          type: AverageValue
          averageValue: "100"
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - type: Percent
          value: 10
          periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - type: Percent
          value: 100
          periodSeconds: 30
        - type: Pods
          value: 4
          periodSeconds: 60

Monitoring Performance

Performance Metrics

# neural-engine/src/monitoring/performance_metrics.py
import asyncio
import time
from functools import wraps

from prometheus_client import Counter, Histogram, Gauge

# Define metrics
request_latency = Histogram(
    'neural_request_latency_seconds',
    'Request latency in seconds',
    ['method', 'endpoint']
)

processing_time = Histogram(
    'neural_processing_time_seconds',
    'Signal processing time in seconds',
    ['operation', 'signal_type']
)

memory_usage = Gauge(
    'neural_memory_usage_bytes',
    'Memory usage in bytes',
    ['component']
)

gpu_utilization = Gauge(
    'neural_gpu_utilization_percent',
    'GPU utilization percentage',
    ['device']
)


def track_performance(operation: str):
    """Decorator to track performance metrics"""
    def decorator(func):
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = await func(*args, **kwargs)
                return result
            finally:
                duration = time.time() - start_time
                processing_time.labels(
                    operation=operation,
                    signal_type=kwargs.get('signal_type', 'unknown')
                ).observe(duration)

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
                return result
            finally:
                duration = time.time() - start_time
                processing_time.labels(
                    operation=operation,
                    signal_type=kwargs.get('signal_type', 'unknown')
                ).observe(duration)

        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
    return decorator
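
A brief sketch of the decorator in use. bandpass_filter is a stand-in function invented for this example, and signal_type must be passed as a keyword argument for the label to be recorded.

# Hypothetical use of track_performance; bandpass_filter is a placeholder
import numpy as np


@track_performance("bandpass_filter")
def bandpass_filter(samples: np.ndarray, signal_type: str = "unknown") -> np.ndarray:
    # Stand-in for the real filtering step
    return samples - samples.mean()


filtered = bandpass_filter(np.random.randn(30_000), signal_type="eeg")
# Each call records an observation in neural_processing_time_seconds
# with labels operation="bandpass_filter", signal_type="eeg"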

Best Practices

Key Optimization Strategies

  • Profile First: Always profile before optimizing. Use tools like cProfile, py-spy, and Chrome DevTools (see the sketch after this list).
  • Cache Aggressively: Implement caching at multiple levels: CDN, API, database, and application.
  • Batch Operations: Process data in batches to reduce overhead and improve throughput.
  • Async Everything: Use async/await for I/O operations to maximize concurrency.
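
As referenced in the first item, here is a minimal profiling sketch using the standard library's cProfile; run_pipeline is a placeholder for whatever hot path you want to measure.

# Minimal cProfile sketch; run_pipeline is a placeholder for your hot path
import cProfile
import pstats


def run_pipeline():
    ...  # e.g. run BatchSignalProcessor over a recorded session


profiler = cProfile.Profile()
profiler.enable()
run_pipeline()
profiler.disable()

# Show the 15 most expensive calls by cumulative time
pstats.Stats(profiler).sort_stats("cumulative").print_stats(15)

# For an already running process, py-spy needs no code changes:
#   py-spy top --pid <PID>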

Performance Checklist

  • Enable connection pooling for all database connections
  • Implement proper indexing strategy (see the index sketch after this checklist)
  • Use CDN for static assets
  • Enable gzip compression
  • Implement API response caching
  • Use WebSocket for real-time data
  • Optimize Docker images (multi-stage builds)
  • Configure auto-scaling policies
  • Monitor and alert on performance metrics
  • Regular performance testing in CI/CD
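
As noted in the indexing item above, a minimal sketch of an index that matches the metrics query from the FastAPI section; the index name and DSN are illustrative assumptions.

# Hypothetical composite index for the metrics query shown earlier;
# index name and DSN are illustrative
import asyncio

import asyncpg


async def create_indexes(dsn: str):
    conn = await asyncpg.connect(dsn)
    # Matches WHERE session_id = $1 ... ORDER BY timestamp DESC LIMIT 1000
    await conn.execute("""
        CREATE INDEX IF NOT EXISTS idx_metrics_session_time
        ON metrics (session_id, timestamp DESC)
    """)
    await conn.close()


asyncio.run(create_indexes("postgresql://user:pass@localhost:5432/neurascale"))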

Following these optimization strategies can improve NeuraScale’s performance by 10x or more, enabling real-time processing of thousands of concurrent neural sessions.
