Performance Tuning
This guide covers performance optimization techniques for NeuraScale across all components.
Overview
Performance optimization in NeuraScale focuses on:
- Real-time processing: Sub-10ms latency for neural signal analysis
- Scalability: Handle thousands of concurrent sessions
- Resource efficiency: Optimize compute and memory usage
- Cost optimization: Balance performance with cloud costs
Neural Engine Optimization
Signal Processing Pipeline
Batch Processing
# neural-engine/src/core/performance/batch_processor.py
from typing import List, Dict, Any
import numpy as np
from concurrent.futures import ThreadPoolExecutor
import asyncio
class BatchSignalProcessor:
"""Optimized batch processing for neural signals"""
def __init__(self, batch_size: int = 1000, num_workers: int = 4):
self.batch_size = batch_size
self.num_workers = num_workers
self.executor = ThreadPoolExecutor(max_workers=num_workers)
async def process_batch(self, signals: List[np.ndarray]) -> List[Dict[str, Any]]:
"""Process signals in optimized batches"""
# Batch signals for efficient processing
batches = [
signals[i:i + self.batch_size]
for i in range(0, len(signals), self.batch_size)
]
# Process batches in parallel
        loop = asyncio.get_running_loop()  # preferred over get_event_loop() inside a coroutine
futures = [
loop.run_in_executor(self.executor, self._process_batch_sync, batch)
for batch in batches
]
results = await asyncio.gather(*futures)
return [item for batch in results for item in batch]
def _process_batch_sync(self, batch: List[np.ndarray]) -> List[Dict[str, Any]]:
"""Synchronous batch processing with vectorization"""
# Stack signals for vectorized operations
stacked = np.stack(batch)
# Vectorized FFT computation
fft_results = np.fft.rfft(stacked, axis=1)
power_spectrum = np.abs(fft_results) ** 2
# Vectorized feature extraction
features = {
'mean_power': np.mean(power_spectrum, axis=1),
            'peak_frequency': np.argmax(power_spectrum, axis=1),  # FFT bin index, not Hz
'spectral_entropy': self._compute_spectral_entropy_vectorized(power_spectrum)
}
return [
{
'features': {k: v[i] for k, v in features.items()},
'timestamp': np.datetime64('now')
}
for i in range(len(batch))
]
def _compute_spectral_entropy_vectorized(self, power_spectrum: np.ndarray) -> np.ndarray:
"""Vectorized spectral entropy computation"""
# Normalize power spectrum
norm_ps = power_spectrum / np.sum(power_spectrum, axis=1, keepdims=True)
# Compute entropy
return -np.sum(norm_ps * np.log2(norm_ps + 1e-10), axis=1)
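A minimal usage sketch of the processor above; the signal count, window length, and batch size are illustrative values, not project defaults.
# Illustrative usage of BatchSignalProcessor (shapes and counts are made up)
import asyncio
import numpy as np

async def main() -> None:
    processor = BatchSignalProcessor(batch_size=500, num_workers=4)
    # 2,000 single-channel windows of 1,024 samples each
    signals = [np.random.randn(1024).astype(np.float32) for _ in range(2000)]
    results = await processor.process_batch(signals)
    print(len(results), list(results[0]["features"].keys()))

asyncio.run(main())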
Memory Optimization
Memory optimization is crucial for handling large-scale neural data processing.
# neural-engine/src/core/performance/memory_optimizer.py
import numpy as np
from typing import Iterator, Tuple
import gc
import psutil
class MemoryOptimizer:
"""Memory-efficient data handling for neural signals"""
def __init__(self, memory_limit_gb: float = 8.0):
self.memory_limit_bytes = memory_limit_gb * 1024 * 1024 * 1024
def load_data_chunked(
self,
file_path: str,
chunk_size: int = 10000
) -> Iterator[np.ndarray]:
"""Load large datasets in memory-efficient chunks"""
# Use memory mapping for large files
data = np.memmap(file_path, dtype='float32', mode='r')
total_samples = len(data)
for i in range(0, total_samples, chunk_size):
# Check memory usage
if self._get_memory_usage() > 0.8 * self.memory_limit_bytes:
gc.collect() # Force garbage collection
# Yield chunk
yield data[i:i + chunk_size].copy()
def optimize_array_dtype(self, data: np.ndarray) -> np.ndarray:
"""Optimize array dtype based on data range"""
data_min, data_max = data.min(), data.max()
# Choose optimal dtype
if data_min >= 0 and data_max <= 255:
return data.astype(np.uint8)
elif data_min >= -128 and data_max <= 127:
return data.astype(np.int8)
elif data_min >= -32768 and data_max <= 32767:
return data.astype(np.int16)
elif data_min >= 0 and data_max <= 65535:
return data.astype(np.uint16)
else:
# Use float32 instead of float64 when possible
if np.abs(data).max() < 3.4e38:
return data.astype(np.float32)
return data
def _get_memory_usage(self) -> int:
"""Get current process memory usage in bytes"""
process = psutil.Process()
return process.memory_info().rss
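For example, a large recording can be streamed chunk by chunk and each chunk downcast before use; the file path and chunk size below are placeholders.
# Illustrative usage of MemoryOptimizer (path and chunk size are placeholders)
optimizer = MemoryOptimizer(memory_limit_gb=8.0)
total_samples = 0
for chunk in optimizer.load_data_chunked("/data/session_001.dat", chunk_size=50_000):
    chunk = optimizer.optimize_array_dtype(chunk)  # downcast to the smallest safe dtype
    total_samples += chunk.shape[0]
print(f"Streamed {total_samples} samples without loading the file into memory")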
Database Performance
Cloud SQL Optimization
Connection Pooling
# neural-engine/src/infrastructure/database/connection_pool.py
from sqlalchemy import create_engine, pool
from sqlalchemy.orm import sessionmaker
import asyncpg
from typing import Any, List, Optional, Tuple
class DatabaseConnectionPool:
"""Optimized connection pooling for Cloud SQL"""
def __init__(
self,
connection_string: str,
pool_size: int = 20,
max_overflow: int = 10,
pool_timeout: int = 30
):
        self.connection_string = connection_string  # needed later by init_async_pool
        # Create engine with optimized pool settings
self.engine = create_engine(
connection_string,
poolclass=pool.QueuePool,
pool_size=pool_size,
max_overflow=max_overflow,
pool_timeout=pool_timeout,
pool_pre_ping=True, # Verify connections before use
pool_recycle=3600, # Recycle connections after 1 hour
echo_pool=False,
            connect_args={  # driver-specific: server_settings/command_timeout are asyncpg-style, keepalives are libpq-style
"server_settings": {
"jit": "off", # Disable JIT for consistent performance
"application_name": "neurascale_neural_engine"
},
"command_timeout": 60,
"keepalives": 1,
"keepalives_idle": 30,
"keepalives_interval": 10,
"keepalives_count": 5
}
)
self.SessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=self.engine
)
# Async connection pool
self.async_pool: Optional[asyncpg.Pool] = None
async def init_async_pool(self):
"""Initialize async connection pool"""
self.async_pool = await asyncpg.create_pool(
self.connection_string,
min_size=10,
max_size=20,
max_queries=50000,
max_inactive_connection_lifetime=300,
command_timeout=60
)
async def execute_batch_async(self, queries: List[Tuple[str, List[Any]]]):
"""Execute multiple queries efficiently"""
async with self.async_pool.acquire() as conn:
async with conn.transaction():
for query, params in queries:
await conn.execute(query, *params)
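A hedged usage sketch of the pool above; the DSN, table names, and parameter values are placeholders rather than the project's actual schema.
# Illustrative usage of DatabaseConnectionPool (DSN, tables, and values are placeholders)
import asyncio

async def main() -> None:
    pool = DatabaseConnectionPool("postgresql://user:pass@10.0.0.5:5432/neurascale")
    await pool.init_async_pool()
    await pool.execute_batch_async([
        ("INSERT INTO sessions (id, started_at) VALUES ($1, NOW())", ["session-123"]),
        ("UPDATE devices SET last_seen = NOW() WHERE id = $1", ["device-42"]),
    ])

asyncio.run(main())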
BigQuery Optimization
# neural-engine/src/infrastructure/bigquery/optimizer.py
from google.cloud import bigquery
from typing import List, Dict, Any
class BigQueryOptimizer:
"""BigQuery optimization strategies"""
def __init__(self, client: bigquery.Client):
self.client = client
def create_clustered_table(
self,
dataset_id: str,
table_id: str,
schema: List[bigquery.SchemaField],
partition_field: str,
clustering_fields: List[str]
):
"""Create optimized clustered and partitioned table"""
table_ref = f"{self.client.project}.{dataset_id}.{table_id}"
table = bigquery.Table(table_ref, schema=schema)
# Configure partitioning
table.time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field=partition_field,
expiration_ms=90 * 24 * 60 * 60 * 1000 # 90 days
)
# Configure clustering
table.clustering_fields = clustering_fields
# Create table
table = self.client.create_table(table)
        # Set up scheduled query for incremental updates (helper not shown here)
self._create_scheduled_refresh(dataset_id, table_id)
def optimize_query(self, query: str) -> str:
"""Optimize BigQuery SQL query"""
optimizations = [
# Use approximate aggregation functions
("COUNT(DISTINCT", "APPROX_COUNT_DISTINCT("),
("PERCENTILE_CONT(", "APPROX_QUANTILES("),
# Limit data scanned
("SELECT *", "SELECT /* specific columns */"),
# Use partitioning
("WHERE timestamp", "WHERE DATE(timestamp)")
]
optimized = query
for old, new in optimizations:
if old in optimized:
optimized = optimized.replace(old, new)
return optimized
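Queries against a table created this way should filter on the partition column and clustering fields so BigQuery can prune data. A hedged sketch follows; the dataset, table, and column names are assumptions, and client is a bigquery.Client instance.
# Illustrative query against a clustered, partitioned table (names are assumptions)
query = """
    SELECT session_id, APPROX_COUNT_DISTINCT(channel_id) AS channels
    FROM `neurascale.neural_data.signal_metrics`
    WHERE DATE(timestamp) BETWEEN '2024-01-01' AND '2024-01-07'  -- prunes daily partitions
      AND session_id = 'session-123'                             -- narrowed by clustering
    GROUP BY session_id
"""
job_config = bigquery.QueryJobConfig(maximum_bytes_billed=10 * 1024**3)  # cap bytes scanned
rows = list(client.query(query, job_config=job_config).result())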
API Performance
FastAPI Optimization
# neural-engine/src/api/performance/fastapi_optimizer.py
from datetime import datetime
from typing import Any, Dict, List, Optional

from fastapi import FastAPI, Query, Request
from fastapi.responses import ORJSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.middleware.gzip import GZipMiddleware
import orjson
import uvloop
import asyncio
# Use uvloop for better async performance
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
class OptimizedFastAPI(FastAPI):
"""FastAPI with performance optimizations"""
def __init__(self, **kwargs):
# Use ORJSONResponse for faster JSON serialization
kwargs['default_response_class'] = ORJSONResponse
super().__init__(**kwargs)
# Add middleware for caching
self.add_middleware(CacheMiddleware)
        # Add middleware for compression (Starlette's GZipMiddleware stands in for a
        # custom CompressionMiddleware, which is not shown here)
        self.add_middleware(GZipMiddleware, minimum_size=1000)
class CacheMiddleware(BaseHTTPMiddleware):
    """Redis-based caching middleware"""
    def __init__(self, app):
        super().__init__(app)
        self.redis_client = None  # Redis connection initialized lazily
    async def dispatch(self, request: Request, call_next):
# Generate cache key
cache_key = f"{request.method}:{request.url.path}:{request.url.query}"
# Try to get from cache
cached = await self.get_cached(cache_key)
if cached:
return ORJSONResponse(content=cached)
# Process request
response = await call_next(request)
# Cache successful responses
if response.status_code == 200:
await self.set_cache(cache_key, response)
return response
# Optimized endpoint example (assumes an application instance, e.g. app = OptimizedFastAPI(),
# and an asyncpg pool available as db_pool)
@app.get("/api/v1/sessions/{session_id}/metrics")
async def get_session_metrics(
session_id: str,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
metric_types: Optional[List[str]] = Query(None)
):
"""Optimized metrics endpoint with caching and pagination"""
# Use connection pooling
async with db_pool.acquire() as conn:
# Build optimized query
query = """
SELECT timestamp, metric_type, value
FROM metrics
WHERE session_id = $1
AND ($2::timestamptz IS NULL OR timestamp >= $2)
AND ($3::timestamptz IS NULL OR timestamp <= $3)
AND ($4::text[] IS NULL OR metric_type = ANY($4))
ORDER BY timestamp DESC
LIMIT 1000
"""
# Execute query
rows = await conn.fetch(
query,
session_id,
start_time,
end_time,
metric_types
)
# Use orjson for fast serialization
return ORJSONResponse(
content={"metrics": [dict(row) for row in rows]}
)
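The get_cached and set_cache helpers used by CacheMiddleware are not shown above. One possible shape for them, assuming redis-py's asyncio client, a local Redis URL, and a 30-second TTL (all assumptions), is sketched below; the middleware would need to extract the response body into a JSON-serializable payload before caching it.
# Possible cache helpers for CacheMiddleware; the Redis URL and TTL are assumptions
import orjson
import redis.asyncio as redis

class ResponseCache:
    def __init__(self, url: str = "redis://localhost:6379/0", ttl_seconds: int = 30):
        self.client = redis.from_url(url)
        self.ttl = ttl_seconds

    async def get_cached(self, key: str):
        """Return the cached JSON payload, or None on a cache miss."""
        raw = await self.client.get(key)
        return orjson.loads(raw) if raw is not None else None

    async def set_cache(self, key: str, payload: dict) -> None:
        """Store a JSON-serializable payload with a short TTL."""
        await self.client.set(key, orjson.dumps(payload), ex=self.ttl)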
Frontend Performance
React Optimization
// frontend/src/hooks/useOptimizedData.tsx  (the JSX below requires a .tsx file)
import { useMemo, useCallback, useRef } from 'react';
import { useVirtualizer } from '@tanstack/react-virtual';
export function useOptimizedSignalDisplay(data: number[][]) {
// Memoize expensive calculations
const processedData = useMemo(() => {
return data.map(channel => {
// Downsample for display if needed
if (channel.length > 10000) {
return downsample(channel, 10000);
}
return channel;
});
}, [data]);
// Use virtualization for large datasets
const parentRef = useRef<HTMLDivElement>(null);
const virtualizer = useVirtualizer({
count: processedData.length,
getScrollElement: () => parentRef.current,
estimateSize: () => 100,
overscan: 5
});
// Optimize render with WebGL
const renderSignal = useCallback((channelData: number[]) => {
    // Use WebGL for efficient rendering (WebGLSignalRenderer is a project component,
    // assumed to be imported elsewhere)
return <WebGLSignalRenderer data={channelData} />;
}, []);
return {
processedData,
virtualizer,
parentRef,
renderSignal
};
}
// Efficient downsampling
function downsample(data: number[], targetLength: number): number[] {
const factor = Math.floor(data.length / targetLength);
const result: number[] = [];
for (let i = 0; i < data.length; i += factor) {
    // Keep min/max pairs so visual peaks survive (result has roughly 2 * targetLength points)
const chunk = data.slice(i, i + factor);
result.push(Math.min(...chunk), Math.max(...chunk));
}
return result;
}
Infrastructure Optimization
Kubernetes Resource Optimization
# neural-engine/kubernetes/optimized-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: neural-engine-optimized
spec:
  replicas: 3
  selector:
    matchLabels:
      app: neural-engine
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  template:
    metadata:
      labels:
        app: neural-engine
    spec:
      containers:
        - name: neural-engine
image: gcr.io/neurascale/neural-engine:latest
resources:
requests:
memory: "2Gi"
cpu: "1000m"
limits:
memory: "4Gi"
cpu: "2000m"
env:
- name: WORKERS
value: "4"
- name: PYTHONUNBUFFERED
value: "1"
- name: OMP_NUM_THREADS
value: "2"
# JIT compilation settings
- name: NUMBA_CACHE_DIR
value: "/tmp/numba_cache"
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /ready
port: 8000
initialDelaySeconds: 5
periodSeconds: 5
# Node affinity for GPU nodes
affinity:
nodeAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
preference:
matchExpressions:
- key: accelerator
operator: In
values:
- nvidia-tesla-t4
# Topology spread for high availability
topologySpreadConstraints:
- maxSkew: 1
topologyKey: kubernetes.io/hostname
whenUnsatisfiable: DoNotSchedule
labelSelector:
matchLabels:
app: neural-engine
Auto-scaling Configuration
# neural-engine/kubernetes/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: neural-engine-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: neural-engine
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 80
- type: Pods
pods:
metric:
name: neural_processing_queue_depth
target:
type: AverageValue
averageValue: "100"
behavior:
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 10
periodSeconds: 60
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 100
periodSeconds: 30
- type: Pods
value: 4
periodSeconds: 60
Monitoring Performance
Performance Metrics
# neural-engine/src/monitoring/performance_metrics.py
from prometheus_client import Counter, Histogram, Gauge
import asyncio
import time
from functools import wraps
# Define metrics
request_latency = Histogram(
'neural_request_latency_seconds',
'Request latency in seconds',
['method', 'endpoint']
)
processing_time = Histogram(
'neural_processing_time_seconds',
'Signal processing time in seconds',
['operation', 'signal_type']
)
memory_usage = Gauge(
'neural_memory_usage_bytes',
'Memory usage in bytes',
['component']
)
gpu_utilization = Gauge(
'neural_gpu_utilization_percent',
'GPU utilization percentage',
['device']
)
def track_performance(operation: str):
"""Decorator to track performance metrics"""
def decorator(func):
@wraps(func)
async def async_wrapper(*args, **kwargs):
start_time = time.time()
try:
result = await func(*args, **kwargs)
return result
finally:
duration = time.time() - start_time
processing_time.labels(
operation=operation,
signal_type=kwargs.get('signal_type', 'unknown')
).observe(duration)
@wraps(func)
def sync_wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
return result
finally:
duration = time.time() - start_time
processing_time.labels(
operation=operation,
signal_type=kwargs.get('signal_type', 'unknown')
).observe(duration)
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
return decorator
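A brief usage sketch for the decorator; the operation name and function are illustrative. Note that signal_type must be passed as a keyword argument for the label to be recorded.
# Illustrative use of track_performance (operation name and function are made up)
@track_performance("bandpass_filter")
async def filter_signal(data, signal_type: str = "unknown"):
    ...  # filtering logic goes here
    return data

# Call with signal_type as a keyword argument so the metric label is populated:
# await filter_signal(window, signal_type="eeg")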
Best Practices
Key Optimization Strategies
- Profile First: Always profile before optimizing. Use tools like cProfile, py-spy, and Chrome DevTools (see the sketch after this list).
- Cache Aggressively: Implement caching at multiple levels: CDN, API, database, and application.
- Batch Operations: Process data in batches to reduce overhead and improve throughput.
- Async Everything: Use async/await for I/O operations to maximize concurrency.
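A minimal cProfile sketch for the profiling step; run_processing_pipeline is a hypothetical stand-in for the code under test.
# Profile a workload and print the 20 slowest call sites by cumulative time
import cProfile
import pstats

with cProfile.Profile() as profiler:
    run_processing_pipeline()  # hypothetical function under test
stats = pstats.Stats(profiler)
stats.sort_stats(pstats.SortKey.CUMULATIVE).print_stats(20)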
Performance Checklist
- Enable connection pooling for all database connections
- Implement proper indexing strategy
- Use CDN for static assets
- Enable gzip compression
- Implement API response caching
- Use WebSocket for real-time data
- Optimize Docker images (multi-stage builds)
- Configure auto-scaling policies
- Monitor and alert on performance metrics
- Regular performance testing in CI/CD
Following these optimization strategies can improve NeuraScale’s performance by 10x or more, enabling real-time processing of thousands of concurrent neural sessions.