Security Best Practices
NeuraScale handles sensitive neural data and medical information, requiring robust security measures. This guide covers security best practices for deployment, development, and operations.
Security Framework Overview
NeuraScale implements defense-in-depth security with multiple layers of protection:
Security Architecture
Data Protection
Data Classification
NeuraScale implements a comprehensive data classification system:
Classification | Description | Examples | Security Requirements |
---|---|---|---|
Public | Non-sensitive information | Documentation, marketing | Standard protection |
Internal | Business information | System logs, metrics | Access control required |
Confidential | Sensitive business data | User profiles, API keys | Encryption + access control |
Restricted | Highly sensitive data | Neural data, medical records | Full encryption + audit + compliance |
Encryption at Rest
Database Encryption
Database Encryption Implementation
# Field-level encryption for sensitive data
from cryptography.fernet import Fernet
from sqlalchemy_utils import EncryptedType
from sqlalchemy_utils.types.encrypted.encrypted_type import AesEngine
class Patient(db.Model):
id = db.Column(db.String(36), primary_key=True)
# Encrypted PII fields
name = db.Column(EncryptedType(db.String(255), secret_key, AesEngine, 'pkcs5'))
email = db.Column(EncryptedType(db.String(255), secret_key, AesEngine, 'pkcs5'))
medical_record_number = db.Column(EncryptedType(db.String(100), secret_key, AesEngine, 'pkcs5'))
# Non-sensitive fields (not encrypted)
created_at = db.Column(db.DateTime)
study_id = db.Column(db.String(50)) # De-identified
# Cloud SQL encryption configuration
resource "google_sql_database_instance" "neurascale" {
name = "neurascale-prod"
database_version = "POSTGRES_15"
region = "northamerica-northeast1"
settings {
tier = "db-n1-standard-4"
# Encryption with customer-managed keys
disk_encryption_configuration {
kms_key_name = google_kms_crypto_key.sql_key.id
}
backup_configuration {
enabled = true
point_in_time_recovery_enabled = true
# Backups encrypted with same key
}
}
}
Best Practices:
- Use separate encryption keys per environment
- Rotate encryption keys quarterly
- Implement key escrow for data recovery
- Monitor key usage and access patterns
Encryption in Transit
All communication uses TLS 1.3 with perfect forward secrecy:
# TLS configuration for Cloud Load Balancer
resource "google_compute_ssl_certificate" "neurascale" {
name_prefix = "neurascale-ssl"
private_key = file("private-key.pem")
certificate = file("certificate.pem")
lifecycle {
create_before_destroy = true
}
}
resource "google_compute_target_https_proxy" "neurascale" {
name = "neurascale-https-proxy"
url_map = google_compute_url_map.neurascale.id
ssl_certificates = [google_compute_ssl_certificate.neurascale.id]
# TLS 1.3 only
ssl_policy = google_compute_ssl_policy.modern.id
}
resource "google_compute_ssl_policy" "modern" {
name = "neurascale-modern-tls"
profile = "MODERN"
min_tls_version = "TLS_1_2" # Minimum TLS 1.2
}
Access Control
Firebase Authentication
NeuraScale uses Firebase Authentication for the console, providing secure, scalable user management with support for multiple authentication providers.
Authentication Architecture
Firebase Configuration
Client SDK
Client-Side Configuration
// lib/firebase/config.ts
import { initializeApp, getApps } from 'firebase/app'
import { getAuth, connectAuthEmulator } from 'firebase/auth'
import { getAnalytics } from 'firebase/analytics'
const firebaseConfig = {
apiKey: process.env.NEXT_PUBLIC_FIREBASE_API_KEY,
authDomain: process.env.NEXT_PUBLIC_FIREBASE_AUTH_DOMAIN,
projectId: process.env.NEXT_PUBLIC_FIREBASE_PROJECT_ID,
storageBucket: process.env.NEXT_PUBLIC_FIREBASE_STORAGE_BUCKET,
messagingSenderId: process.env.NEXT_PUBLIC_FIREBASE_MESSAGING_SENDER_ID,
appId: process.env.NEXT_PUBLIC_FIREBASE_APP_ID,
measurementId: process.env.NEXT_PUBLIC_FIREBASE_MEASUREMENT_ID
}
// Initialize Firebase
const app = getApps().length ? getApps()[0] : initializeApp(firebaseConfig)
const auth = getAuth(app)
// Use emulator in development
if (process.env.NODE_ENV === 'development') {
connectAuthEmulator(auth, 'http://localhost:9099')
}
// Enable analytics in production
const analytics = typeof window !== 'undefined' &&
process.env.NODE_ENV === 'production' ?
getAnalytics(app) : null
export { app, auth, analytics }
Authentication Providers
Configure OAuth Providers
Google Authentication
import { GoogleAuthProvider, signInWithPopup } from 'firebase/auth'
const googleProvider = new GoogleAuthProvider()
googleProvider.addScope('profile')
googleProvider.addScope('email')
export async function signInWithGoogle() {
try {
const result = await signInWithPopup(auth, googleProvider)
const user = result.user
// Get ID token for API calls
const idToken = await user.getIdToken()
// Store in secure HTTP-only cookie
await fetch('/api/auth/session', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ idToken })
})
return user
} catch (error) {
console.error('Google sign-in failed:', error)
throw error
}
}
Implement Email/Password Authentication
import {
createUserWithEmailAndPassword,
signInWithEmailAndPassword,
sendEmailVerification
} from 'firebase/auth'
export async function signUpWithEmail(email: string, password: string) {
try {
// Create account
const { user } = await createUserWithEmailAndPassword(auth, email, password)
// Send verification email
await sendEmailVerification(user)
// Create user profile in database
await fetch('/api/users', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${await user.getIdToken()}`
},
body: JSON.stringify({
uid: user.uid,
email: user.email,
createdAt: new Date().toISOString()
})
})
return user
} catch (error) {
if (error.code === 'auth/email-already-in-use') {
throw new Error('Email already registered')
}
throw error
}
}
Add Multi-Factor Authentication
import {
multiFactor,
PhoneAuthProvider,
PhoneMultiFactorGenerator
} from 'firebase/auth'
export async function enrollMFA(phoneNumber: string) {
const user = auth.currentUser
if (!user) throw new Error('No authenticated user')
const multiFactorSession = await multiFactor(user).getSession()
// Send verification code
const phoneAuthCredential = PhoneAuthProvider.credential(
verificationId,
verificationCode
)
const multiFactorAssertion = PhoneMultiFactorGenerator.assertion(
phoneAuthCredential
)
// Enroll MFA
await multiFactor(user).enroll(multiFactorAssertion, 'Phone Number')
}
Traditional Authentication (Neural Engine)
For the Neural Engine API, we use JWT tokens with the following implementation:
Multi-Factor Authentication
# MFA implementation with TOTP
from pyotp import TOTP
import qrcode
class MFAService:
def setup_mfa(self, user_id: str) -> dict:
# Generate secret key
secret = pyotp.random_base32()
# Create TOTP object
totp = TOTP(secret)
# Generate QR code for authenticator app
provisioning_uri = totp.provisioning_uri(
name=user.email,
issuer_name="NeuraScale"
)
qr = qrcode.QRCode()
qr.add_data(provisioning_uri)
qr.make(fit=True)
# Store secret (encrypted)
user.mfa_secret = self.encrypt_secret(secret)
user.mfa_enabled = True
db.session.commit()
return {
"secret": secret,
"qr_code": qr.make_image(),
"backup_codes": self.generate_backup_codes(user_id)
}
def verify_mfa(self, user_id: str, token: str) -> bool:
user = User.query.get(user_id)
if not user.mfa_enabled:
return True
secret = self.decrypt_secret(user.mfa_secret)
totp = TOTP(secret)
# Allow 30-second window
return totp.verify(token, valid_window=1)
def generate_backup_codes(self, user_id: str) -> list:
"""Generate single-use backup codes"""
codes = [secrets.token_hex(4) for _ in range(10)]
# Store hashed backup codes
for code in codes:
backup_code = BackupCode(
user_id=user_id,
code_hash=bcrypt.hashpw(code.encode(), bcrypt.gensalt()),
used=False
)
db.session.add(backup_code)
db.session.commit()
return codes
JWT Token Management
# Secure JWT implementation
import jwt
from datetime import datetime, timedelta
from cryptography.hazmat.primitives import serialization
class JWTService:
def __init__(self, private_key_path: str, public_key_path: str):
# Load RSA keys
with open(private_key_path, 'rb') as f:
self.private_key = serialization.load_pem_private_key(
f.read(), password=None
)
with open(public_key_path, 'rb') as f:
self.public_key = serialization.load_pem_public_key(f.read())
def generate_tokens(self, user: User) -> dict:
now = datetime.utcnow()
# Access token (short-lived)
access_payload = {
'sub': str(user.id),
'email': user.email,
'role': user.role,
'permissions': user.get_permissions(),
'iat': now,
'exp': now + timedelta(hours=1),
'iss': 'neurascale.io',
'aud': 'neurascale-api',
'jti': str(uuid.uuid4()), # Unique token ID
'type': 'access'
}
# Refresh token (long-lived)
refresh_payload = {
'sub': str(user.id),
'iat': now,
'exp': now + timedelta(days=30),
'iss': 'neurascale.io',
'aud': 'neurascale-api',
'jti': str(uuid.uuid4()),
'type': 'refresh'
}
access_token = jwt.encode(
access_payload,
self.private_key,
algorithm='RS256'
)
refresh_token = jwt.encode(
refresh_payload,
self.private_key,
algorithm='RS256'
)
# Store refresh token in database
self.store_refresh_token(user.id, refresh_payload['jti'])
return {
'access_token': access_token,
'refresh_token': refresh_token,
'expires_in': 3600,
'token_type': 'Bearer'
}
def verify_token(self, token: str) -> dict:
try:
payload = jwt.decode(
token,
self.public_key,
algorithms=['RS256'],
audience='neurascale-api',
issuer='neurascale.io'
)
# Check if token is blacklisted
if self.is_token_blacklisted(payload['jti']):
raise jwt.InvalidTokenError("Token has been revoked")
return payload
except jwt.ExpiredSignatureError:
raise jwt.InvalidTokenError("Token has expired")
except jwt.InvalidTokenError as e:
raise jwt.InvalidTokenError(f"Invalid token: {str(e)}")
Role-Based Access Control
# RBAC implementation
from enum import Enum
from functools import wraps
class Role(Enum):
ADMIN = "admin"
CLINICIAN = "clinician"
RESEARCHER = "researcher"
TECHNICIAN = "technician"
PATIENT = "patient"
class Permission(Enum):
# Device permissions
DEVICE_READ = "device:read"
DEVICE_WRITE = "device:write"
DEVICE_STREAM = "device:stream"
# Session permissions
SESSION_CREATE = "session:create"
SESSION_READ = "session:read"
SESSION_UPDATE = "session:update"
SESSION_DELETE = "session:delete"
# Data permissions
DATA_READ = "data:read"
DATA_EXPORT = "data:export"
DATA_ANALYZE = "data:analyze"
# Admin permissions
USER_MANAGE = "user:manage"
SYSTEM_CONFIG = "system:config"
AUDIT_VIEW = "audit:view"
# Role-permission mapping
ROLE_PERMISSIONS = {
Role.ADMIN: [p for p in Permission], # All permissions
Role.CLINICIAN: [
Permission.DEVICE_READ,
Permission.DEVICE_STREAM,
Permission.SESSION_CREATE,
Permission.SESSION_READ,
Permission.SESSION_UPDATE,
Permission.DATA_READ,
Permission.DATA_ANALYZE,
],
Role.RESEARCHER: [
Permission.SESSION_READ,
Permission.DATA_READ,
Permission.DATA_EXPORT,
Permission.DATA_ANALYZE,
],
Role.TECHNICIAN: [
Permission.DEVICE_READ,
Permission.DEVICE_WRITE,
Permission.SESSION_READ,
],
Role.PATIENT: [
Permission.SESSION_READ, # Own sessions only
Permission.DATA_READ, # Own data only
],
}
def require_permission(permission: Permission):
"""Decorator to enforce permission-based access control"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# Get current user from JWT token
current_user = get_current_user()
if not current_user:
abort(401, "Authentication required")
# Check if user has required permission
user_permissions = ROLE_PERMISSIONS.get(current_user.role, [])
if permission not in user_permissions:
audit_log.log_access_denied(
user_id=current_user.id,
resource=f.__name__,
permission=permission.value,
ip_address=request.remote_addr
)
abort(403, f"Permission denied: {permission.value}")
# Log successful access
audit_log.log_access_granted(
user_id=current_user.id,
resource=f.__name__,
permission=permission.value,
ip_address=request.remote_addr
)
return f(*args, **kwargs)
return decorated_function
return decorator
# Usage example
@app.route('/api/v1/devices/<device_id>/stream/start', methods=['POST'])
@require_permission(Permission.DEVICE_STREAM)
def start_device_stream(device_id):
# Implementation here
pass
Authorization Policies
# Resource-based authorization
class ResourcePolicy:
def __init__(self, user: User):
self.user = user
def can_access_session(self, session: Session) -> bool:
"""Check if user can access specific session"""
# Admin can access all sessions
if self.user.role == Role.ADMIN:
return True
# Clinician can access sessions they created or are assigned to
if self.user.role == Role.CLINICIAN:
return (session.created_by == self.user.id or
self.user.id in session.assigned_clinicians)
# Researcher can access sessions in their studies
if self.user.role == Role.RESEARCHER:
return session.study_id in self.user.authorized_studies
# Patient can only access their own sessions
if self.user.role == Role.PATIENT:
return session.patient_id == self.user.patient_profile.id
return False
def can_export_data(self, session: Session) -> bool:
"""Check if user can export session data"""
# Must have basic session access first
if not self.can_access_session(session):
return False
# Additional restrictions for data export
if self.user.role == Role.PATIENT:
return False # Patients cannot export raw data
# Check data use agreement
if not self.user.has_signed_data_use_agreement():
return False
return True
def get_data_access_level(self, session: Session) -> str:
"""Determine level of data access for session"""
if not self.can_access_session(session):
return "none"
if self.user.role in [Role.ADMIN, Role.CLINICIAN]:
return "full" # Access to raw data and PII
if self.user.role == Role.RESEARCHER:
return "de_identified" # De-identified data only
if self.user.role == Role.PATIENT:
return "summary" # High-level summaries only
return "none"
Network Security
VPC Configuration
# Secure VPC setup
resource "google_compute_network" "neurascale_vpc" {
name = "neurascale-vpc"
auto_create_subnetworks = false
}
# Private subnet for application servers
resource "google_compute_subnetwork" "private_subnet" {
name = "neurascale-private"
ip_cidr_range = "10.0.1.0/24"
region = "northamerica-northeast1"
network = google_compute_network.neurascale_vpc.id
# Enable private Google access
private_ip_google_access = true
# Secondary range for pods and services
secondary_ip_range {
range_name = "pods"
ip_cidr_range = "10.1.0.0/16"
}
secondary_ip_range {
range_name = "services"
ip_cidr_range = "10.2.0.0/16"
}
}
# Firewall rules
resource "google_compute_firewall" "allow_internal" {
name = "neurascale-allow-internal"
network = google_compute_network.neurascale_vpc.name
allow {
protocol = "tcp"
ports = ["0-65535"]
}
source_ranges = ["10.0.0.0/8"]
}
resource "google_compute_firewall" "allow_https" {
name = "neurascale-allow-https"
network = google_compute_network.neurascale_vpc.name
allow {
protocol = "tcp"
ports = ["443"]
}
source_ranges = ["0.0.0.0/0"]
target_tags = ["https-server"]
}
# Deny all other inbound traffic
resource "google_compute_firewall" "deny_all" {
name = "neurascale-deny-all"
network = google_compute_network.neurascale_vpc.name
deny {
protocol = "all"
}
source_ranges = ["0.0.0.0/0"]
priority = 1000
}
Cloud Armor Security
# DDoS protection and Web Application Firewall
resource "google_compute_security_policy" "neurascale_policy" {
name = "neurascale-security-policy"
# Rate limiting rule
rule {
action = "throttle"
priority = "1000"
match {
versioned_expr = "SRC_IPS_V1"
config {
src_ip_ranges = ["*"]
}
}
rate_limit_options {
conform_action = "allow"
exceed_action = "deny(429)"
rate_limit_threshold {
count = 100
interval_sec = 60
}
}
}
# Block known bad IPs
rule {
action = "deny(403)"
priority = "2000"
match {
versioned_expr = "SRC_IPS_V1"
config {
src_ip_ranges = [
"192.0.2.0/24", # Example bad IP range
]
}
}
}
# OWASP ModSecurity Core Rule Set
rule {
action = "deny(403)"
priority = "3000"
match {
expr {
expression = "evaluatePreconfiguredExpr('xss-stable')"
}
}
}
rule {
action = "deny(403)"
priority = "3001"
match {
expr {
expression = "evaluatePreconfiguredExpr('sqli-stable')"
}
}
}
# Default allow rule
rule {
action = "allow"
priority = "2147483647"
match {
versioned_expr = "SRC_IPS_V1"
config {
src_ip_ranges = ["*"]
}
}
}
}
Compliance
HIPAA and SOC 2 Type II Requirements
Why Both HIPAA and SOC 2? While HIPAA compliance is mandatory for handling Protected Health Information (PHI), SOC 2 Type II certification provides additional assurance about our security controls, operational processes, and data handling practices. Most healthcare organizations require both certifications as they complement each other:
- HIPAA focuses specifically on protecting patient health information
- SOC 2 Type II validates our overall security posture, availability, processing integrity, confidentiality, and privacy controls
Together, they demonstrate a comprehensive approach to security and compliance that meets healthcare industry standards.
SOC 2 Type II Certification
NeuraScale maintains SOC 2 Type II certification covering all five trust service criteria:
Security
Security Controls
- Firewall and intrusion detection systems
- Multi-factor authentication enforcement
- Regular vulnerability assessments
- Security awareness training
- Incident response procedures
- Change management controls
HIPAA Compliance
NeuraScale implements comprehensive HIPAA safeguards:
Administrative
Administrative Safeguards
Requirement | Implementation | Evidence |
---|---|---|
Security Officer | Dedicated CISO and security team | Org chart, responsibilities |
Workforce Training | Annual security training program | Training records, certificates |
Access Management | Role-based access control | User access logs, reviews |
Incident Response | 24/7 security operations center | Incident response plan, logs |
Business Associate | HIPAA agreements with vendors | Signed BAAs on file |
Risk Assessment | Annual security risk assessments | Risk assessment reports |
# HIPAA audit logging
class HIPAAAuditLogger:
def __init__(self):
self.logger = logging.getLogger('hipaa_audit')
self.logger.setLevel(logging.INFO)
# Send to Cloud Logging with long retention
handler = google.cloud.logging.handlers.CloudLoggingHandler(
client=google.cloud.logging.Client(),
name='hipaa-audit-log'
)
self.logger.addHandler(handler)
def log_phi_access(self, user_id: str, patient_id: str,
action: str, resource: str):
"""Log access to Protected Health Information"""
audit_event = {
'event_type': 'PHI_ACCESS',
'timestamp': datetime.utcnow().isoformat(),
'user_id': user_id,
'patient_id': patient_id,
'action': action,
'resource': resource,
'ip_address': request.remote_addr,
'user_agent': request.headers.get('User-Agent'),
'session_id': session.get('session_id'),
'compliance_framework': 'HIPAA'
}
self.logger.info('PHI Access Event', extra=audit_event)
def log_data_export(self, user_id: str, export_details: dict):
"""Log data export events"""
audit_event = {
'event_type': 'DATA_EXPORT',
'timestamp': datetime.utcnow().isoformat(),
'user_id': user_id,
'export_format': export_details.get('format'),
'record_count': export_details.get('record_count'),
'date_range': export_details.get('date_range'),
'purpose': export_details.get('purpose'),
'retention_schedule': export_details.get('retention')
}
self.logger.info('Data Export Event', extra=audit_event)
Integrated Compliance Approach
Important: Healthcare organizations typically require both HIPAA compliance and SOC 2 Type II certification. Here’s how NeuraScale integrates both frameworks:
Compliance Synergies
Requirement | HIPAA Implementation | SOC 2 Control | Benefit |
---|---|---|---|
Access Control | Minimum necessary standard | CC6.1 - Logical access controls | Unified IAM system |
Encryption | §164.312(a)(2)(iv) - PHI encryption | CC6.7 - Data transmission controls | Single encryption framework |
Audit Logging | §164.312(b) - Activity logs | CC7.2 - System monitoring | Consolidated audit trail |
Risk Assessment | §164.308(a)(1) - Risk analysis | CC3.2 - Risk assessment process | Integrated risk management |
Incident Response | §164.308(a)(6) - Response procedures | CC7.3 - Incident management | Unified incident handling |
Business Continuity | §164.308(a)(7) - Contingency plan | A1.2 - Availability commitments | Single DR/BC strategy |
Vendor Management | Business Associate Agreements | CC9.2 - Vendor risk management | Consolidated vendor controls |
Training | §164.308(a)(5) - Security training | CC1.4 - Security awareness | Combined training program |
Audit and Reporting
class ComplianceReporting:
"""Unified compliance reporting for HIPAA and SOC 2"""
def generate_compliance_report(self, report_type: str, period: str):
report = {
'generated_at': datetime.utcnow(),
'period': period,
'type': report_type
}
if report_type == 'HIPAA_AUDIT':
report['sections'] = [
self.generate_phi_access_report(),
self.generate_breach_assessment(),
self.generate_baa_status(),
self.generate_risk_analysis()
]
elif report_type == 'SOC2_ATTESTATION':
report['sections'] = [
self.generate_control_effectiveness(),
self.generate_exception_report(),
self.generate_change_management(),
self.generate_availability_metrics()
]
elif report_type == 'INTEGRATED_COMPLIANCE':
# Combined report for healthcare clients
report['sections'] = [
self.generate_unified_controls(),
self.generate_combined_risks(),
self.generate_compliance_metrics(),
self.generate_attestation_summary()
]
return report
def generate_unified_controls(self):
"""Show how controls satisfy both frameworks"""
controls = []
# Example: Access control satisfies both
controls.append({
'control_id': 'AC-001',
'description': 'Role-based access control with MFA',
'hipaa_requirements': ['§164.308(a)(4)', '§164.312(a)(1)'],
'soc2_criteria': ['CC6.1', 'CC6.2', 'CC6.3'],
'effectiveness': 'Operating effectively',
'last_tested': '2024-01-15',
'evidence': ['access_logs', 'user_reviews', 'mfa_reports']
})
return controls
Key Benefits of Dual Compliance
- Enhanced Trust: Healthcare organizations see both certifications as table stakes
- Reduced Audit Burden: Many controls overlap, reducing duplicate work
- Comprehensive Coverage: HIPAA focuses on PHI, SOC 2 covers broader security
- Continuous Monitoring: SOC 2 requires ongoing control effectiveness
- Third-Party Validation: Independent auditors verify both frameworks
Data Retention and Disposal
# Automated data lifecycle management
class DataLifecycleManager:
def __init__(self):
self.retention_policies = {
'clinical_data': timedelta(days=2555), # 7 years for HIPAA
'research_data': timedelta(days=3653), # 10 years for research
'audit_logs': timedelta(days=2555), # 7 years for compliance
'system_logs': timedelta(days=90), # 90 days for operations
'backup_data': timedelta(days=2555), # 7 years for recovery
}
def apply_retention_policy(self):
"""Apply retention policies across all data types"""
for data_type, retention_period in self.retention_policies.items():
cutoff_date = datetime.utcnow() - retention_period
if data_type == 'clinical_data':
self.archive_clinical_data(cutoff_date)
elif data_type == 'audit_logs':
self.archive_audit_logs(cutoff_date)
# ... handle other data types
def secure_delete(self, file_path: str):
"""Securely delete files using DoD 5220.22-M standard"""
# Three-pass overwrite
with open(file_path, 'r+b') as f:
file_size = f.seek(0, 2)
f.seek(0)
# Pass 1: Write random data
f.write(os.urandom(file_size))
f.flush()
os.fsync(f.fileno())
# Pass 2: Write complement
f.seek(0)
f.write(bytes([0xFF] * file_size))
f.flush()
os.fsync(f.fileno())
# Pass 3: Write zeros
f.seek(0)
f.write(bytes([0x00] * file_size))
f.flush()
os.fsync(f.fileno())
# Remove file
os.remove(file_path)
# Log destruction
audit_log.log_data_destruction(
file_path=file_path,
destruction_method="DoD_5220.22-M",
timestamp=datetime.utcnow()
)
Incident Response
Detection and Monitoring
# Security monitoring and alerting
class SecurityMonitor:
def __init__(self):
self.alert_thresholds = {
'failed_logins': 5,
'suspicious_api_calls': 100,
'data_export_volume': 1000, # MB
'unusual_access_patterns': 10
}
def monitor_failed_logins(self):
"""Monitor for brute force attacks"""
# Query recent failed logins
recent_failures = db.session.query(AuditLog).filter(
AuditLog.event_type == 'LOGIN_FAILED',
AuditLog.timestamp > datetime.utcnow() - timedelta(minutes=15)
).all()
# Group by IP address
failures_by_ip = {}
for failure in recent_failures:
ip = failure.ip_address
failures_by_ip[ip] = failures_by_ip.get(ip, 0) + 1
# Alert on threshold breach
for ip, count in failures_by_ip.items():
if count >= self.alert_thresholds['failed_logins']:
self.send_security_alert(
alert_type='BRUTE_FORCE_DETECTED',
details={
'ip_address': ip,
'failed_attempts': count,
'time_window': '15 minutes'
},
severity='HIGH'
)
# Temporarily block IP
self.block_ip_address(ip, duration_minutes=60)
def detect_anomalous_access(self, user_id: str):
"""Detect unusual access patterns"""
# Get user's typical access patterns
user_profile = self.get_user_access_profile(user_id)
# Current session analysis
current_session = self.get_current_session(user_id)
anomalies = []
# Check for unusual times
if self.is_unusual_time(current_session.login_time, user_profile):
anomalies.append('unusual_time')
# Check for unusual location
if self.is_unusual_location(current_session.ip_address, user_profile):
anomalies.append('unusual_location')
# Check for unusual data access volume
if self.is_unusual_data_access(current_session, user_profile):
anomalies.append('unusual_data_volume')
if len(anomalies) >= 2: # Multiple anomalies = high risk
self.send_security_alert(
alert_type='ANOMALOUS_ACCESS_DETECTED',
details={
'user_id': user_id,
'anomalies': anomalies,
'session_id': current_session.id
},
severity='MEDIUM'
)
# Require additional authentication
self.require_step_up_auth(user_id)
def send_security_alert(self, alert_type: str, details: dict, severity: str):
"""Send security alerts to SOC team"""
alert = {
'timestamp': datetime.utcnow().isoformat(),
'alert_type': alert_type,
'severity': severity,
'details': details,
'source': 'neurascale-security-monitor'
}
# Send to Slack/PagerDuty
if severity in ['HIGH', 'CRITICAL']:
self.send_pagerduty_alert(alert)
else:
self.send_slack_alert(alert)
# Log to security incident system
self.log_security_incident(alert)
Incident Response Playbook
Immediate Actions (0-15 minutes)
-
Identify and Contain
# Isolate affected systems kubectl scale deployment neural-engine --replicas=0 # Block suspicious IP addresses gcloud compute firewall-rules create block-suspicious-ip \ --direction=INGRESS \ --action=DENY \ --source-ranges=<SUSPICIOUS_IP>
-
Preserve Evidence
# Capture logs gcloud logging read "severity>=WARNING" \ --format="value(textPayload)" > incident-logs.txt # Create disk snapshots gcloud compute disks snapshot <DISK_NAME> \ --snapshot-names=incident-evidence-$(date +%Y%m%d-%H%M)
-
Notify Stakeholders
- Security team (immediate)
- Legal/Compliance (within 1 hour)
- Affected customers (as required)
- Regulatory bodies (if PHI involved)
Secure Development
Secure Coding Practices
# Input validation and sanitization
from marshmallow import Schema, fields, validate, ValidationError
import bleach
class NeuralDataSchema(Schema):
device_id = fields.Str(
required=True,
validate=validate.Regexp(r'^[a-zA-Z0-9_-]+$'),
error_messages={'invalid': 'Device ID contains invalid characters'}
)
sample_rate = fields.Int(
required=True,
validate=validate.Range(min=1, max=10000),
error_messages={'invalid': 'Sample rate must be between 1 and 10000 Hz'}
)
channels = fields.List(
fields.Int(validate=validate.Range(min=1, max=256)),
validate=validate.Length(min=1, max=64),
required=True
)
metadata = fields.Dict(
keys=fields.Str(validate=validate.Length(max=50)),
values=fields.Str(validate=validate.Length(max=500))
)
def validate_and_sanitize_input(data: dict) -> dict:
"""Validate and sanitize all user inputs"""
schema = NeuralDataSchema()
try:
# Validate structure and types
validated_data = schema.load(data)
# Sanitize string fields
if 'metadata' in validated_data:
for key, value in validated_data['metadata'].items():
validated_data['metadata'][key] = bleach.clean(
value,
tags=[], # No HTML tags allowed
strip=True
)
return validated_data
except ValidationError as err:
raise ValueError(f"Input validation failed: {err.messages}")
# SQL injection prevention
def get_user_sessions(user_id: str, start_date: datetime, end_date: datetime):
"""Safe database query using parameterized statements"""
# Input validation
if not isinstance(user_id, str) or not user_id.strip():
raise ValueError("Invalid user_id")
if start_date >= end_date:
raise ValueError("start_date must be before end_date")
# Parameterized query (prevents SQL injection)
query = """
SELECT id, name, start_time, end_time, status
FROM sessions
WHERE user_id = %s
AND start_time >= %s
AND start_time <= %s
ORDER BY start_time DESC
LIMIT 100
"""
return db.session.execute(
text(query),
(user_id, start_date, end_date)
).fetchall()
# XSS prevention
def render_user_content(content: str) -> str:
"""Safely render user-generated content"""
# Whitelist allowed HTML tags and attributes
allowed_tags = ['p', 'br', 'strong', 'em', 'ul', 'ol', 'li']
allowed_attributes = {}
cleaned_content = bleach.clean(
content,
tags=allowed_tags,
attributes=allowed_attributes,
strip=True
)
return cleaned_content
Security Testing
# Security testing framework
import pytest
import requests
from selenium import webdriver
from zap import ZAPv2
class SecurityTestSuite:
def __init__(self):
self.base_url = "https://staging-api.neurascale.io"
self.zap = ZAPv2()
def test_authentication_bypass(self):
"""Test for authentication bypass vulnerabilities"""
protected_endpoints = [
'/api/v1/devices',
'/api/v1/sessions',
'/api/v1/users/profile'
]
for endpoint in protected_endpoints:
# Test without token
response = requests.get(f"{self.base_url}{endpoint}")
assert response.status_code == 401, f"Endpoint {endpoint} not properly protected"
# Test with invalid token
headers = {'Authorization': 'Bearer invalid_token'}
response = requests.get(f"{self.base_url}{endpoint}", headers=headers)
assert response.status_code == 401, f"Endpoint {endpoint} accepts invalid tokens"
def test_sql_injection(self):
"""Test for SQL injection vulnerabilities"""
# Common SQL injection payloads
payloads = [
"'; DROP TABLE users; --",
"' OR '1'='1",
"' UNION SELECT * FROM users --",
"admin'/*",
"'; EXEC xp_cmdshell('dir'); --"
]
for payload in payloads:
# Test in various parameters
test_data = {
'device_id': payload,
'user_id': payload,
'session_name': payload
}
response = requests.post(
f"{self.base_url}/api/v1/sessions",
json=test_data,
headers={'Authorization': 'Bearer valid_test_token'}
)
# Should return 400 (validation error) not 500 (database error)
assert response.status_code != 500, f"Potential SQL injection with payload: {payload}"
def test_xss_prevention(self):
"""Test for XSS vulnerabilities"""
xss_payloads = [
"<script>alert('XSS')</script>",
"<img src=x onerror=alert('XSS')>",
"javascript:alert('XSS')",
"<svg onload=alert('XSS')>",
"';alert('XSS');//"
]
for payload in xss_payloads:
# Test in session metadata
test_data = {
'name': f"Test Session {payload}",
'description': f"Description with {payload}",
'metadata': {'notes': payload}
}
response = requests.post(
f"{self.base_url}/api/v1/sessions",
json=test_data,
headers={'Authorization': 'Bearer valid_test_token'}
)
if response.status_code == 201:
# Verify payload was sanitized
session_data = response.json()
assert payload not in str(session_data), f"XSS payload not sanitized: {payload}"
def test_rate_limiting(self):
"""Test rate limiting implementation"""
# Rapid requests to test rate limiting
responses = []
for i in range(150): # Exceed 100 req/min limit
response = requests.get(f"{self.base_url}/api/v1/status")
responses.append(response.status_code)
# Should see 429 (Too Many Requests) responses
assert 429 in responses, "Rate limiting not properly implemented"
def run_zap_security_scan(self):
"""Run OWASP ZAP security scan"""
# Start ZAP daemon
self.zap.core.new_session()
# Spider the application
self.zap.spider.scan(self.base_url)
# Wait for spider to complete
while int(self.zap.spider.status()) < 100:
time.sleep(1)
# Active security scan
self.zap.ascan.scan(self.base_url)
# Wait for scan to complete
while int(self.zap.ascan.status()) < 100:
time.sleep(5)
# Get results
alerts = self.zap.core.alerts()
# Filter high-risk issues
high_risk_alerts = [
alert for alert in alerts
if alert['risk'] in ['High', 'Critical']
]
assert len(high_risk_alerts) == 0, f"High-risk security issues found: {high_risk_alerts}"
Security Monitoring
Real-time Alerting
# Cloud Monitoring alerting policies
resource "google_monitoring_alert_policy" "failed_login_attempts" {
display_name = "High Failed Login Attempts"
combiner = "OR"
conditions {
display_name = "Failed login rate"
condition_threshold {
filter = "resource.type=\"cloud_run_revision\" AND jsonPayload.event_type=\"LOGIN_FAILED\""
duration = "300s"
comparison = "COMPARISON_GT"
threshold_value = 10
aggregations {
alignment_period = "60s"
per_series_aligner = "ALIGN_RATE"
}
}
}
notification_channels = [
google_monitoring_notification_channel.pagerduty.name
]
alert_strategy {
auto_close = "1800s"
}
}
resource "google_monitoring_alert_policy" "data_exfiltration" {
display_name = "Unusual Data Export Volume"
combiner = "OR"
conditions {
display_name = "High data export rate"
condition_threshold {
filter = "resource.type=\"cloud_run_revision\" AND jsonPayload.event_type=\"DATA_EXPORT\""
duration = "900s"
comparison = "COMPARISON_GT"
threshold_value = 1000000000 # 1GB
aggregations {
alignment_period = "300s"
per_series_aligner = "ALIGN_SUM"
}
}
}
notification_channels = [
google_monitoring_notification_channel.security_team.name
]
}
For more security guidance, see our Security Architecture documentation and Compliance Checklist.
Next Steps
- Review Security Policies: Familiarize yourself with our security policies and procedures
- Complete Security Training: All team members must complete annual security training
- Configure Development Environment: Follow secure development setup guide
- Implement Security Controls: Apply security controls appropriate for your role
- Regular Security Reviews: Participate in quarterly security reviews
Remember: Security is everyone’s responsibility. When in doubt, ask the security team for guidance.
For security incidents or questions: security@neurascale.io