Domain: Healthcare Systems — HIPAA, EHR, HL7/FHIR
Healthcare IT is one of the most complex domains in software engineering: many of its systems are life-critical, and all of them must comply with a long list of strict data-protection regulations. A bug here does not just crash a web page; it can endanger a patient's life or violate HIPAA, where civil penalties have historically reached $50,000 per violation with an annual cap of $1.5M per violation category (the amounts are adjusted for inflation each year).
This section describes how healthcare systems such as Epic, Cerner, Allscripts, and the HIS (Hospital Information System) platforms used in Vietnam actually work, along with the interoperability, compliance, and security challenges that senior engineers face.
1. Healthcare IT Landscape: Context and Overall Architecture
1.1 Main Components of a Healthcare System
┌────────────────────────────────────────────────────────────┐
│                       Patient Portal                       │
│                (Web/Mobile, Patient-facing)                │
└──────────────────────────────┬─────────────────────────────┘
                               │ HTTPS/FHIR
                               ▼
┌────────────────────────────────────────────────────────────┐
│                     API Gateway / HIE                      │
│       (Health Information Exchange/Interoperability)       │
└──────┬───────────────┬───────────────┬──────────────┬──────┘
       │               │               │              │
       ▼               ▼               ▼              ▼
   ┌────────┐     ┌─────────┐      ┌────────┐    ┌──────────┐
   │  EHR   │     │  PACS   │      │  LIS   │    │   RIS    │
   │(Epic,  │     │(Imaging)│      │ (Lab)  │    │(Radiology│
   │Cerner) │     │         │      │        │    │  Info)   │
   └───┬────┘     └────┬────┘      └───┬────┘    └────┬─────┘
       │               │               │              │
       └───────────────┴───────┬───────┴──────────────┘
                               │
                               ▼
                     ┌────────────────────┐
                     │ Integration Engine │
                     │ (Mirth, Rhapsody)  │
                     │    HL7 v2/FHIR     │
                     └────────────────────┘
Key Components:
- EHR (Electronic Health Record): Stores the complete electronic medical record
- PACS (Picture Archiving and Communication System): Stores and manages medical images (X-ray, CT, MRI)
- LIS (Laboratory Information System): Manages lab test results
- RIS (Radiology Information System): Manages imaging appointments and diagnostic reports
- HIE (Health Information Exchange): Exchanges data between healthcare organizations
- Integration Engine: Middleware that transforms and routes messages (HL7/FHIR)
1.2 Why Is Healthcare IT Hard?
Regulatory Complexity:
┌──────────────────────────────────────────────────────────────┐
│ HIPAA (US) — Privacy + Security + Breach Notification │
│ HITECH Act — EHR incentives + increased penalties │
│ GDPR (EU) — Right to erasure (conflicts with EHR retention!) │
│ FDA regulations — Medical device software (SaMD) │
│ State laws — California CMIA, Texas HB 300 │
└──────────────────────────────────────────────────────────────┘
Technical Challenges:
- Legacy Systems: Most hospitals still run HL7 v2, a standard that dates back to the late 1980s
- Vendor Lock-in: Epic and Cerner hold 50%+ of the US market and expose proprietary APIs
- Data Silos: Each department runs its own system, which makes integration a nightmare
- High Availability: 99.99% uptime requirement (about 4.4 minutes of downtime per month)
- Latency Sensitivity: Clinical decision support needs a response in under 200 ms
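The availability figure above is simple arithmetic, and it helps to make the downtime budget explicit when negotiating an SLA. The helper below is a minimal sketch; the function name and the 30-day default period are assumptions for illustration, not part of any standard.

```python
def downtime_budget_minutes(availability_pct: float, period_days: float = 30.0) -> float:
    """Return the allowed downtime in minutes for an availability target."""
    if not 0.0 <= availability_pct <= 100.0:
        raise ValueError("availability_pct must be between 0 and 100")
    total_minutes = period_days * 24 * 60
    return total_minutes * (1.0 - availability_pct / 100.0)


# Compare a few common targets over a 30-day month
for target in (99.9, 99.99, 99.999):
    budget = downtime_budget_minutes(target)
    print(f"{target}% uptime allows {budget:.2f} minutes of downtime")
```

Over a 30-day month the 99.99% target leaves roughly four and a half minutes, which is less than most manual failover procedures take, so failover usually has to be automated.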
Engineering Trade-offs:
Security ←→ Usability
(many auth steps) ←→ (doctors need fast access in an emergency)
Compliance ←→ Innovation Speed
(audit every change) ←→ (ship hotfixes quickly)
De-identification ←→ Clinical Utility
(remove PII cho research) ←→ (contextual info needed)
2. HIPAA Compliance: Privacy and Security
2.1 What Is HIPAA, and Why Must Engineers Know It?
HIPAA (Health Insurance Portability and Accountability Act) has three main rules:
- Privacy Rule: Defines who may access PHI (Protected Health Information)
- Security Rule: Sets the technical requirements for protecting ePHI (electronic PHI)
- Breach Notification Rule: Affected individuals must be notified without unreasonable delay, and no later than 60 days after a breach is discovered
PHI (Protected Health Information) is health information tied to any of these 18 identifiers:
 1. Names                            10. Account numbers
 2. Geographic subdivisions          11. Certificate/license numbers
    (smaller than a state)           12. Vehicle identifiers (VIN, plate)
 3. Dates (birth, death, admit)      13. Device identifiers/serial numbers
 4. Phone numbers                    14. URLs
 5. Fax numbers                      15. IP addresses
 6. Email addresses                  16. Biometric identifiers (fingerprint)
 7. SSN                              17. Full-face photos
 8. MRN (Medical Record Number)      18. Any other unique identifying
 9. Health plan beneficiary no.          number, characteristic, or code
Important: even a patient's IP address counts as PHI under HIPAA when it is tied to health information!
2.2 Security Rule — Technical Requirements
The HIPAA Security Rule groups its 18 standards into 3 categories of safeguards:
A. Administrative Safeguards
✓ Security Management Process
- Risk Analysis (annual assessment)
- Risk Management (mitigation plan)
- Sanction Policy (employee violations)
- Information System Activity Review (audit logs)
✓ Workforce Security
- Authorization/Supervision
- Workforce Clearance (background check)
- Termination Procedures (revoke access immediately)
✓ Access Management
- Isolate healthcare clearinghouse functions
- Access Authorization (role-based access)
- Access Establishment/Modification
B. Physical Safeguards
✓ Facility Access Controls
- Badge systems, biometric access
- Visitor logs, escort procedures
- Secure disposal (shred PHI, wipe drives)
✓ Workstation Security
- Auto-lock after 5 min idle
- Privacy screens in public areas
- No unauthorized USB devices
✓ Device and Media Controls
- Encrypt all portable devices (laptops, USB)
- Serial number tracking
- Secure decommissioning (NIST 800-88 wipe standards)
C. Technical Safeguards (engineer's focus!)
✓ Access Control
- Unique user identification (no shared accounts!)
- Emergency access procedure (break-glass)
- Auto logoff (session timeout)
- Encryption and decryption
✓ Audit Controls
- Log access to PHI (who, what, when, where)
- Detect unauthorized access attempts
- Retain logs 6 years minimum
✓ Integrity Controls
- Detect unauthorized alterations
- Digital signatures for critical data
✓ Transmission Security
- TLS 1.3 for data in transit
- No unencrypted email of PHI
- VPN for remote access
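The Audit Controls and Integrity Controls items above can be combined in a tamper-evident audit log, where each entry carries the hash of the previous one. The sketch below uses only the standard library; the field names (`user_id`, `patient_id`, `action`, `source_ip`) and the in-memory list are assumptions for illustration, and a real system would write to append-only storage.

```python
import hashlib
import json
from datetime import datetime, timezone
from typing import Dict, List

GENESIS_HASH = "0" * 64  # placeholder "previous hash" for the first entry


def _entry_hash(body: Dict[str, str], prev_hash: str) -> str:
    """Hash the entry body together with the previous entry's hash."""
    payload = json.dumps(body, sort_keys=True) + prev_hash
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def append_audit_entry(log: List[Dict[str, str]], user_id: str, patient_id: str,
                       action: str, source_ip: str) -> Dict[str, str]:
    """Record who did what, to which record, when, and from where."""
    body = {
        "user_id": user_id,
        "patient_id": patient_id,
        "action": action,
        "source_ip": source_ip,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    prev_hash = log[-1]["hash"] if log else GENESIS_HASH
    entry = {**body, "prev_hash": prev_hash, "hash": _entry_hash(body, prev_hash)}
    log.append(entry)
    return entry


def verify_audit_log(log: List[Dict[str, str]]) -> bool:
    """Return False if any entry was altered, removed, or reordered."""
    prev_hash = GENESIS_HASH
    for entry in log:
        body = {k: v for k, v in entry.items() if k not in ("hash", "prev_hash")}
        if entry["prev_hash"] != prev_hash:
            return False
        if entry["hash"] != _entry_hash(body, prev_hash):
            return False
        prev_hash = entry["hash"]
    return True
```

Editing or deleting any earlier entry changes every later hash, so a periodic call to `verify_audit_log` detects silent alterations. It does not stop an attacker who can rewrite the whole chain, which is why the latest hash is usually also stored somewhere the application cannot modify.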
2.3 Encryption Requirements — Technical Implementation
At Rest Encryption:
import os

from cryptography.hazmat.primitives.ciphers.aead import AESGCM


class PHIEncryption:
    """
    Application-layer encryption for PHI at rest.
    Uses AES-256-GCM, a FIPS-approved algorithm (FIPS 140 validation
    applies to the crypto module you deploy, not to the algorithm).
    """

    def __init__(self, master_key: bytes, key_version: str = 'v1'):
        """
        master_key should come from KMS (AWS KMS, Azure Key Vault, HSM)
        and be rotated every 90 days per security policy.
        """
        if len(master_key) != 32:
            raise ValueError("AES-256-GCM requires a 32-byte key")
        self.master_key = master_key
        self.key_version = key_version  # for key rotation tracking

    def encrypt_phi(self, plaintext: str, patient_id: str) -> dict:
        """
        Encrypt PHI with authenticated encryption.
        Returns: dict with ciphertext (GCM tag appended), nonce, aad,
        key_version, and algorithm.
        """
        # Generate unique nonce (96-bit for GCM); never reuse one with the same key
        nonce = os.urandom(12)
        # Use patient_id as additional authenticated data (AAD)
        # This binds ciphertext to specific patient
        aad = patient_id.encode('utf-8')
        # Encrypt with AES-256-GCM
        aesgcm = AESGCM(self.master_key)
        ciphertext = aesgcm.encrypt(nonce, plaintext.encode('utf-8'), aad)
        return {
            'ciphertext': ciphertext.hex(),
            'nonce': nonce.hex(),
            'aad': patient_id,
            'key_version': self.key_version,
            'algorithm': 'AES-256-GCM'
        }

    def decrypt_phi(self, encrypted_data: dict) -> str:
        """
        Decrypt PHI and verify authenticity.
        Raises: InvalidTag if the ciphertext or AAD was tampered with.
        """
        nonce = bytes.fromhex(encrypted_data['nonce'])
        ciphertext = bytes.fromhex(encrypted_data['ciphertext'])
        aad = encrypted_data['aad'].encode('utf-8')
        aesgcm = AESGCM(self.master_key)
        plaintext = aesgcm.decrypt(nonce, ciphertext, aad)
        return plaintext.decode('utf-8')
# Database schema for encrypted PHI
"""
CREATE TABLE patient_records (
    patient_id UUID PRIMARY KEY,
    encrypted_ssn BYTEA NOT NULL, -- encrypted at application layer
    ssn_nonce BYTEA NOT NULL, -- GCM needs a distinct nonce per encrypted value
    encrypted_diagnosis BYTEA NOT NULL,
    diagnosis_nonce BYTEA NOT NULL,
    key_version VARCHAR(10) NOT NULL,
    created_at TIMESTAMPTZ NOT NULL,
    -- Non-PHI fields can be plaintext for indexing
    facility_id UUID NOT NULL,
    record_type VARCHAR(50) NOT NULL,
    -- Audit metadata
    created_by UUID NOT NULL,
    last_accessed_at TIMESTAMPTZ,
    last_accessed_by UUID
);
-- Transparent Data Encryption (TDE) at database level
-- Encrypts the entire database file with a separate key.
-- PostgreSQL has no built-in TDE switch; use encrypted volumes or a
-- distribution that provides TDE. On SQL Server the statement is:
--   ALTER DATABASE healthcare_db SET ENCRYPTION ON;
"""
In Transit Encryption:
# NGINX config for HIPAA-oriented TLS
server {
    listen 443 ssl http2;
    server_name api.hospital.com;
    # TLS 1.3 preferred; TLS 1.2 still accepted for older clients
    ssl_protocols TLSv1.2 TLSv1.3;
    # ssl_ciphers only governs TLS 1.2 and below; TLS 1.3 suites are
    # negotiated separately by OpenSSL. AES-GCM suites are FIPS-approved;
    # ChaCha20-Poly1305 is not, so it is left out here.
    ssl_ciphers 'ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256';
    ssl_prefer_server_ciphers on;
    # HSTS: force HTTPS for 2 years
    add_header Strict-Transport-Security "max-age=63072000; includeSubDomains; preload" always;
    # No Public-Key-Pins header: HPKP is deprecated and ignored by current
    # browsers. Pin certificates inside client apps if pinning is required.
    # Security headers
    add_header X-Frame-Options "DENY" always;
    add_header X-Content-Type-Options "nosniff" always;
    add_header Content-Security-Policy "default-src 'self'" always;
    # Disable TLS session tickets (forward secrecy)
    ssl_session_tickets off;
    location /api/ {
        proxy_pass http://backend:8080;
        # Log PHI access for audit (keep PHI out of URLs, since the
        # combined format records the full request line)
        access_log /var/log/nginx/phi-access.log combined;
    }
}
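The server-side policy only helps if clients refuse weaker connections as well. The sketch below shows one way a Python service calling the API could enforce the same floor with the standard `ssl` module; the function name is hypothetical and the hostname in the usage note is the one from the config above.

```python
import ssl


def make_phi_client_context() -> ssl.SSLContext:
    """Build a client TLS context that verifies certificates and rejects TLS < 1.2."""
    # create_default_context() enables certificate and hostname verification
    ctx = ssl.create_default_context()
    # Refuse TLS 1.0/1.1 even if the server would offer them
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    return ctx
```

The context can then be passed to `http.client.HTTPSConnection("api.hospital.com", context=make_phi_client_context())` or to any library that accepts an `SSLContext`.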
2.4 Access Control — Role-Based Access Control (RBAC)
-- RBAC schema for healthcare system
CREATE TABLE roles (
role_id UUID PRIMARY KEY,
role_name VARCHAR(100) NOT NULL UNIQUE,
description TEXT,
-- HIPAA: Document minimum necessary principle
data_access_scope VARCHAR(50) NOT NULL, -- 'NONE', 'OWN_PATIENTS', 'DEPARTMENT', 'FACILITY', 'ALL'
CONSTRAINT chk_scope CHECK (data_access_scope IN (
'NONE', -- No identified PHI (e.g., researchers on de-identified data)
'OWN_PATIENTS', -- Primary care provider
'DEPARTMENT', -- ED doctors see all ED patients
'FACILITY', -- Hospital administrator
'ALL' -- System admin (requires justification)
))
);
-- Pre-defined roles
INSERT INTO roles VALUES
('...', 'PHYSICIAN', 'Licensed physician', 'OWN_PATIENTS'),
('...', 'NURSE', 'Registered nurse', 'OWN_PATIENTS'),
('...', 'ED_PHYSICIAN', 'Emergency dept physician', 'DEPARTMENT'),
('...', 'RADIOLOGIST', 'Radiologist', 'DEPARTMENT'),
('...', 'LAB_TECH', 'Lab technician', 'DEPARTMENT'),
('...', 'BILLING_STAFF', 'Billing department', 'FACILITY'),
('...', 'RESEARCHER', 'Research staff (de-identified data only)', 'NONE'),
('...', 'SYSTEM_ADMIN', 'IT administrator', 'ALL');
CREATE TABLE permissions (
permission_id UUID PRIMARY KEY,
resource_type VARCHAR(100) NOT NULL, -- 'PATIENT_RECORD', 'LAB_RESULT', etc.
action VARCHAR(50) NOT NULL, -- 'READ', 'WRITE', 'DELETE'
conditions JSONB, -- context-based access control
CONSTRAINT chk_action CHECK (action IN ('READ', 'WRITE', 'DELETE', 'EXPORT'))
);
CREATE TABLE role_permissions (
role_id UUID REFERENCES roles(role_id),
permission_id UUID REFERENCES permissions(permission_id),
granted_at TIMESTAMPTZ DEFAULT now(),
granted_by UUID NOT NULL, -- who granted this permission
justification TEXT NOT NULL, -- HIPAA: document why access needed
PRIMARY KEY (role_id, permission_id)
);
CREATE TABLE user_roles (
user_id UUID NOT NULL,
role_id UUID REFERENCES roles(role_id),
assigned_at TIMESTAMPTZ DEFAULT now(),
assigned_by UUID NOT NULL,
expires_at TIMESTAMPTZ, -- temporary access (e.g., locum doctors)
department_id UUID, -- scope limitation
PRIMARY KEY (user_id, role_id)
);
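The `data_access_scope` column above only records the intended scope; the application still has to enforce it on every read. A minimal sketch of that check follows. The `UserContext` and `PatientContext` types are hypothetical stand-ins for whatever the session and patient lookups return, and unknown scopes are denied by default.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class UserContext:
    user_id: str
    data_access_scope: str  # mirrors roles.data_access_scope
    department_id: str
    facility_id: str
    own_patient_ids: frozenset = frozenset()  # patients assigned to this user


@dataclass(frozen=True)
class PatientContext:
    patient_id: str
    department_id: str
    facility_id: str


def scope_allows(user: UserContext, patient: PatientContext) -> bool:
    """Apply the minimum-necessary scope before any permission check."""
    scope = user.data_access_scope
    if scope == "ALL":
        return True
    if scope == "FACILITY":
        return user.facility_id == patient.facility_id
    if scope == "DEPARTMENT":
        return (user.facility_id == patient.facility_id
                and user.department_id == patient.department_id)
    if scope == "OWN_PATIENTS":
        return patient.patient_id in user.own_patient_ids
    return False  # 'NONE' or an unrecognized scope: deny by default
```

A scope check like this would run before the `permissions` lookup, so that a role with `READ` on `PATIENT_RECORD` still cannot read records outside its scope.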
Emergency Access (Break-Glass):
import logging
from datetime import datetime, timedelta, timezone


class BreakGlassAccess:
    """
    Emergency access mechanism for life-threatening situations.
    HIPAA allows emergency access to PHI, but requires:
    1. Detailed audit logging
    2. Post-access review
    3. Justification documentation
    The helper methods called below (log_break_glass_event,
    send_alert_to_security_team, create_temporary_token, and so on)
    are assumed to be implemented elsewhere.
    """

    def grant_emergency_access(
        self,
        user_id: str,
        patient_id: str,
        justification: str,
        requesting_officer: str
    ) -> str:
        """
        Grant temporary elevated access in emergency.
        Returns: access_token valid for 1 hour
        """
        # Log the break-glass event IMMEDIATELY
        audit_id = self.log_break_glass_event(
            user_id=user_id,
            patient_id=patient_id,
            justification=justification,
            requesting_officer=requesting_officer
        )
        # Alert security team in real-time
        self.send_alert_to_security_team(audit_id)
        # Grant temporary access
        access_token = self.create_temporary_token(
            user_id=user_id,
            patient_id=patient_id,
            expires_in=timedelta(hours=1),
            access_level='EMERGENCY_FULL'
        )
        # Schedule automatic review (timezone-aware deadline)
        self.schedule_post_access_review(
            audit_id=audit_id,
            review_deadline=datetime.now(timezone.utc) + timedelta(hours=24)
        )
        logging.critical(
            "BREAK-GLASS ACCESS GRANTED: user=%s patient=%s officer=%s audit_id=%s",
            user_id, patient_id, requesting_officer, audit_id
        )
        return access_token

    def post_access_review(self, audit_id: str, reviewer_id: str, verdict: str):
        """
        Mandatory review within 24h after break-glass access.
        Verdict options:
        - JUSTIFIED: Emergency was legitimate
        - UNJUSTIFIED: Inappropriate access → HR referral
        - REQUIRES_INVESTIGATION: Escalate to compliance
        """
        if verdict == 'UNJUSTIFIED':
            self.trigger_hr_investigation(audit_id)
            self.send_breach_notification_if_needed(audit_id)
2.5 Business Associate Agreement (BAA)
When you work with third-party services (AWS, Google Cloud, Twilio, etc.), you MUST have a signed BAA (Business Associate Agreement) in place before they handle PHI.
Common mistake:
# ❌ HIPAA VIOLATION: PHI sent to a vendor with no signed BAA
import openai

def get_diagnosis_suggestion(symptoms: str) -> str:
    # Sending PHI to third-party without BAA
    # (legacy pre-1.0 openai SDK call, shown only as the anti-pattern)
    response = openai.ChatCompletion.create(
        model="gpt-4",
        messages=[{"role": "user", "content": f"Patient symptoms: {symptoms}"}]
    )
    return response.choices[0].message.content

# ✅ CORRECT: use a BAA-covered service or de-identify first
def get_diagnosis_suggestion_hipaa_compliant(symptoms: str) -> str:
    # De-identify before sending to third-party
    de_identified = remove_phi_from_text(symptoms)
    # Or use HIPAA-compliant AI service with signed BAA
    response = compliant_ai_service.analyze(de_identified)
    return response
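Services where a BAA is available (always verify current terms with the vendor):
- AWS (S3, RDS, Lambda, etc.): accept the BAA in AWS Artifact; only HIPAA-eligible services may handle PHI
- Google Cloud (GCP): accept the BAA in the admin console
- Azure: the BAA is offered through Microsoft's product terms for in-scope services
- Twilio: Enterprise plan with HIPAA compliance
- ⚠️ OpenAI: BAA availability depends on the product and plan; never send PHI without a signed agreement
- ❌ Google Analytics: no BAA; many other analytics tools offer one only on specific plans, or not at all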
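The `remove_phi_from_text` helper used in the compliant example is not defined in this section. The sketch below shows one possible shape for it, assuming regex redaction of a few pattern-like identifiers (email, SSN, IPv4, US phone numbers, dates). It is an illustration only: regexes cannot find names, addresses, or free-text identifiers, so this alone does not meet the HIPAA Safe Harbor de-identification standard.

```python
import re

# Order matters: more specific patterns run before more general ones.
_PHI_PATTERNS = [
    ("EMAIL", re.compile(r"\b[\w.+-]+@[\w-]+(?:\.[\w-]+)+\b")),
    ("SSN", re.compile(r"\b\d{3}-\d{2}-\d{4}\b")),
    ("IP", re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")),
    ("PHONE", re.compile(r"\(?\b\d{3}\)?[-. ]\d{3}[-. ]\d{4}\b")),
    ("DATE", re.compile(r"\b\d{4}-\d{2}-\d{2}\b|\b\d{1,2}/\d{1,2}/\d{2,4}\b")),
]


def remove_phi_from_text(text: str) -> str:
    """Replace pattern-like identifiers with a bracketed label."""
    for label, pattern in _PHI_PATTERNS:
        text = pattern.sub(f"[{label}]", text)
    return text


note = ("Patient (SSN 123-45-6789, jane.doe@example.com) seen 2026-04-17 "
        "from 10.0.0.5, call 617-555-0123.")
print(remove_phi_from_text(note))
```

In practice a pass like this would sit in front of a dedicated de-identification tool or an expert-determination process rather than replace it.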
3. EHR Architecture — Electronic Health Records
3.1 Data Model — Clinical Data Representation
An EHR is not just "a database that stores patient info"; it is a complex domain model with temporal data, versioning, and rich relationships.
-- Core patient demographics
CREATE TABLE patients (
patient_id UUID PRIMARY KEY,
mrn VARCHAR(20) UNIQUE NOT NULL, -- Medical Record Number
encrypted_ssn BYTEA,
encrypted_first_name BYTEA NOT NULL,
encrypted_last_name BYTEA NOT NULL,
date_of_birth DATE NOT NULL,
gender VARCHAR(20),
encrypted_address BYTEA,
encrypted_phone BYTEA,
-- Clinical identifiers
blood_type VARCHAR(5), -- 'A+', 'O-', etc.
organ_donor BOOLEAN,
-- Metadata
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL,
is_deceased BOOLEAN DEFAULT FALSE,
deceased_date DATE,
-- Audit trail
created_by UUID NOT NULL,
facility_id UUID NOT NULL
);
-- Encounters — Each hospital visit or appointment
CREATE TABLE encounters (
encounter_id UUID PRIMARY KEY,
patient_id UUID NOT NULL REFERENCES patients(patient_id),
encounter_type VARCHAR(50) NOT NULL, -- 'INPATIENT', 'OUTPATIENT', 'EMERGENCY'
admission_time TIMESTAMPTZ NOT NULL,
discharge_time TIMESTAMPTZ,
chief_complaint TEXT, -- Why patient came in
-- Care team
attending_physician UUID NOT NULL,
admitting_physician UUID,
consulting_physicians UUID[],
primary_nurse UUID,
-- Location tracking
department_id UUID NOT NULL,
room_number VARCHAR(20),
bed_number VARCHAR(20),
-- Clinical status
acuity_level INTEGER, -- 1-5, ESI (Emergency Severity Index)
isolation_required BOOLEAN DEFAULT FALSE,
isolation_type VARCHAR(50), -- 'CONTACT', 'DROPLET', 'AIRBORNE'
-- Disposition
discharge_disposition VARCHAR(50), -- 'HOME', 'TRANSFERRED', 'ADMITTED', 'DECEASED'
CONSTRAINT chk_encounter_type CHECK (encounter_type IN (
'INPATIENT', 'OUTPATIENT', 'EMERGENCY', 'OBSERVATION', 'TELEMEDICINE'
))
);
-- Clinical observations (vitals, measurements)
CREATE TABLE observations (
observation_id UUID PRIMARY KEY,
encounter_id UUID NOT NULL REFERENCES encounters(encounter_id),
patient_id UUID NOT NULL REFERENCES patients(patient_id),
-- LOINC code (Logical Observation Identifiers Names and Codes)
loinc_code VARCHAR(20) NOT NULL, -- e.g., '8480-6' for Systolic BP
observation_type VARCHAR(100) NOT NULL,
-- Value (polymorphic — can be numeric, text, or coded)
value_numeric DECIMAL(10,2),
value_text TEXT,
value_code VARCHAR(50),
value_unit VARCHAR(20), -- 'mmHg', 'mg/dL', 'bpm'
-- Reference ranges
reference_low DECIMAL(10,2),
reference_high DECIMAL(10,2),
is_abnormal BOOLEAN,
-- Time and performer
observed_at TIMESTAMPTZ NOT NULL,
observed_by UUID NOT NULL,
-- Status
status VARCHAR(20) NOT NULL, -- 'PRELIMINARY', 'FINAL', 'AMENDED', 'CANCELLED'
CONSTRAINT chk_status CHECK (status IN ('PRELIMINARY', 'FINAL', 'AMENDED', 'CANCELLED'))
);
-- Example LOINC codes:
-- 8480-6: Systolic blood pressure
-- 8462-4: Diastolic blood pressure
-- 8867-4: Heart rate
-- 2339-0: Glucose [Mass/volume] in Blood
-- 718-7: Hemoglobin [Mass/volume] in Blood
-- Medications (prescriptions and administrations)
CREATE TABLE medication_orders (
order_id UUID PRIMARY KEY,
encounter_id UUID NOT NULL REFERENCES encounters(encounter_id),
patient_id UUID NOT NULL REFERENCES patients(patient_id),
-- Drug identification (RxNorm codes)
rxnorm_code VARCHAR(20) NOT NULL,
medication_name VARCHAR(200) NOT NULL,
generic_name VARCHAR(200),
-- Dosage
dose DECIMAL(10,2) NOT NULL,
dose_unit VARCHAR(20) NOT NULL, -- 'mg', 'mL', 'tablets'
route VARCHAR(50) NOT NULL, -- 'ORAL', 'IV', 'IM'
frequency VARCHAR(50) NOT NULL, -- 'BID', 'TID', 'Q6H'
-- Schedule
start_date TIMESTAMPTZ NOT NULL,
end_date TIMESTAMPTZ,
is_prn BOOLEAN DEFAULT FALSE, -- "as needed"
prn_reason TEXT,
-- Safety
indication TEXT NOT NULL,
allergy_checked BOOLEAN NOT NULL DEFAULT FALSE,
interaction_checked BOOLEAN NOT NULL DEFAULT FALSE,
-- Prescriber
ordered_by UUID NOT NULL,
ordered_at TIMESTAMPTZ NOT NULL,
-- Status
status VARCHAR(20) NOT NULL,
discontinue_reason TEXT,
CONSTRAINT chk_route CHECK (route IN (
'ORAL', 'IV', 'IM', 'SC', 'TOPICAL', 'INHALATION', 'RECTAL', 'OPHTHALMIC'
)),
CONSTRAINT chk_status CHECK (status IN (
'ACTIVE', 'COMPLETED', 'DISCONTINUED', 'ON_HOLD', 'CANCELLED'
))
);
CREATE TABLE medication_administrations (
administration_id UUID PRIMARY KEY,
order_id UUID NOT NULL REFERENCES medication_orders(order_id),
patient_id UUID NOT NULL REFERENCES patients(patient_id),
-- What was given
dose_given DECIMAL(10,2) NOT NULL,
dose_unit VARCHAR(20) NOT NULL,
route_used VARCHAR(50) NOT NULL,
-- When and by whom
administered_at TIMESTAMPTZ NOT NULL,
administered_by UUID NOT NULL,
-- Barcode verification (5 rights check)
patient_barcode_scanned BOOLEAN NOT NULL,
medication_barcode_scanned BOOLEAN NOT NULL,
-- Outcome
status VARCHAR(20) NOT NULL, -- 'GIVEN', 'REFUSED', 'HELD'
refusal_reason TEXT,
adverse_reaction TEXT,
CONSTRAINT chk_status CHECK (status IN ('GIVEN', 'REFUSED', 'HELD', 'WASTED'))
);
-- Allergies and adverse reactions
CREATE TABLE allergies (
allergy_id UUID PRIMARY KEY,
patient_id UUID NOT NULL REFERENCES patients(patient_id),
-- Allergen identification
allergen_type VARCHAR(50) NOT NULL, -- 'MEDICATION', 'FOOD', 'ENVIRONMENT'
allergen_code VARCHAR(20), -- RxNorm or SNOMED code
allergen_name VARCHAR(200) NOT NULL,
-- Reaction
reaction TEXT NOT NULL,
severity VARCHAR(20) NOT NULL, -- 'MILD', 'MODERATE', 'SEVERE', 'ANAPHYLAXIS'
-- Onset and verification
onset_date DATE,
verified_by UUID,
verification_status VARCHAR(20) NOT NULL, -- 'CONFIRMED', 'SUSPECTED', 'REFUTED'
-- Metadata
recorded_at TIMESTAMPTZ NOT NULL,
recorded_by UUID NOT NULL,
is_active BOOLEAN DEFAULT TRUE,
CONSTRAINT chk_allergen_type CHECK (allergen_type IN (
'MEDICATION', 'FOOD', 'ENVIRONMENT', 'OTHER'
)),
CONSTRAINT chk_severity CHECK (severity IN (
'MILD', 'MODERATE', 'SEVERE', 'ANAPHYLAXIS'
))
);
-- Diagnoses (ICD-10 coded)
CREATE TABLE diagnoses (
diagnosis_id UUID PRIMARY KEY,
encounter_id UUID NOT NULL REFERENCES encounters(encounter_id),
patient_id UUID NOT NULL REFERENCES patients(patient_id),
-- ICD-10 code (International Classification of Diseases)
icd10_code VARCHAR(10) NOT NULL,
diagnosis_name VARCHAR(200) NOT NULL,
-- Type and status
diagnosis_type VARCHAR(20) NOT NULL, -- 'ADMITTING', 'WORKING', 'FINAL'
is_primary BOOLEAN DEFAULT FALSE,
-- Time
diagnosed_at TIMESTAMPTZ NOT NULL,
diagnosed_by UUID NOT NULL,
resolved_at TIMESTAMPTZ,
-- Severity and status
severity VARCHAR(20),
status VARCHAR(20) NOT NULL, -- 'ACTIVE', 'RESOLVED', 'INACTIVE'
CONSTRAINT chk_type CHECK (diagnosis_type IN (
'ADMITTING', 'WORKING', 'FINAL', 'DIFFERENTIAL'
))
);
-- Procedures (CPT coded for billing)
CREATE TABLE procedures (
procedure_id UUID PRIMARY KEY,
encounter_id UUID NOT NULL REFERENCES encounters(encounter_id),
patient_id UUID NOT NULL REFERENCES patients(patient_id),
-- CPT code (Current Procedural Terminology)
cpt_code VARCHAR(10) NOT NULL,
procedure_name VARCHAR(200) NOT NULL,
-- Scheduling
scheduled_time TIMESTAMPTZ,
start_time TIMESTAMPTZ,
end_time TIMESTAMPTZ,
-- Performers
primary_surgeon UUID,
assisting_surgeons UUID[],
anesthesiologist UUID,
-- Location
operating_room VARCHAR(20),
-- Status
status VARCHAR(20) NOT NULL,
cancellation_reason TEXT,
-- Documentation
procedure_note TEXT,
complications TEXT,
CONSTRAINT chk_status CHECK (status IN (
'SCHEDULED', 'IN_PROGRESS', 'COMPLETED', 'CANCELLED', 'POSTPONED'
))
);
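The `observations` table stores `is_abnormal` next to the reference range, so something has to derive the flag when a value is written. The sketch below shows one way to do that in application code; the `Observation` class is a hypothetical in-memory mirror of a few columns, and the range used in the example is illustrative rather than clinical guidance.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Optional


@dataclass
class Observation:
    loinc_code: str
    value_numeric: Optional[Decimal]
    value_unit: str
    reference_low: Optional[Decimal] = None
    reference_high: Optional[Decimal] = None

    @property
    def is_abnormal(self) -> Optional[bool]:
        """True/False when a numeric value and a range exist, else None."""
        if self.value_numeric is None:
            return None  # text or coded result: no numeric comparison
        if self.reference_low is None and self.reference_high is None:
            return None  # no range recorded, so the flag is unknown
        if self.reference_low is not None and self.value_numeric < self.reference_low:
            return True
        if self.reference_high is not None and self.value_numeric > self.reference_high:
            return True
        return False


# Systolic blood pressure (LOINC 8480-6) with an illustrative range
systolic = Observation("8480-6", Decimal("150"), "mmHg",
                       reference_low=Decimal("90"), reference_high=Decimal("120"))
print(systolic.is_abnormal)
```

Returning `None` instead of `False` when no range exists keeps "unknown" distinct from "normal", which matches the nullable `is_abnormal BOOLEAN` column.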
3.2 Temporal Data and Versioning
Clinical data changes over time, and its history MUST be preserved (a medical-legal requirement).
-- Problem list (chronic conditions) with versioning
CREATE TABLE problem_list (
    problem_id UUID NOT NULL, -- stable across all versions of a problem
    patient_id UUID NOT NULL REFERENCES patients(patient_id),
    version INTEGER NOT NULL, -- version counter
    -- Clinical info
    snomed_code VARCHAR(20) NOT NULL, -- SNOMED CT code
    problem_name VARCHAR(200) NOT NULL,
    -- Time bounds
    onset_date DATE,
    resolved_date DATE,
    valid_from TIMESTAMPTZ NOT NULL, -- when this version became current
    valid_to TIMESTAMPTZ, -- NULL = current version
    -- Status
    status VARCHAR(20) NOT NULL,
    severity VARCHAR(20),
    -- Provenance
    recorded_by UUID NOT NULL,
    updated_by UUID,
    update_reason TEXT,
    -- One row per version, so the key is (problem_id, version)
    PRIMARY KEY (problem_id, version),
    CONSTRAINT chk_status CHECK (status IN (
        'ACTIVE', 'CHRONIC', 'INTERMITTENT', 'RESOLVED', 'INACTIVE'
    ))
);
-- Ensure only one current version per problem.
-- PostgreSQL expresses this as a partial unique index; a table-level
-- UNIQUE constraint cannot carry a WHERE clause.
CREATE UNIQUE INDEX unique_current_version
    ON problem_list (problem_id)
    WHERE valid_to IS NULL;
-- Query current problems for a patient
CREATE VIEW current_problems AS
SELECT * FROM problem_list WHERE valid_to IS NULL;
-- Slowly Changing Dimension (SCD Type 2) pattern
-- Example: Patient changes address
CREATE TABLE patient_addresses_history (
address_id UUID PRIMARY KEY,
patient_id UUID NOT NULL REFERENCES patients(patient_id),
encrypted_street BYTEA NOT NULL,
encrypted_city BYTEA NOT NULL,
encrypted_state BYTEA NOT NULL,
encrypted_zip BYTEA NOT NULL,
-- Temporal columns
valid_from TIMESTAMPTZ NOT NULL,
valid_to TIMESTAMPTZ, -- NULL = current address
is_current BOOLEAN GENERATED ALWAYS AS (valid_to IS NULL) STORED,
-- Audit
updated_by UUID NOT NULL,
update_reason VARCHAR(100)
);
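The versioning pattern in these tables comes down to one rule: never update a row in place; close the current version and append a new one. The sketch below illustrates that rule in memory. The `ProblemVersion` class and `update_problem` function are hypothetical, and a real implementation would perform both steps inside a single database transaction.

```python
from dataclasses import dataclass, replace
from datetime import datetime, timezone
from typing import List, Optional


@dataclass(frozen=True)
class ProblemVersion:
    problem_id: str
    version: int
    status: str
    valid_from: datetime
    valid_to: Optional[datetime] = None  # None marks the current version


def update_problem(history: List[ProblemVersion], problem_id: str,
                   new_status: str, now: datetime) -> List[ProblemVersion]:
    """Close the current version and append a new one; old rows are kept."""
    current = [v for v in history
               if v.problem_id == problem_id and v.valid_to is None]
    if len(current) != 1:
        raise ValueError("expected exactly one current version")
    old = current[0]
    closed = replace(old, valid_to=now)
    new = ProblemVersion(problem_id, old.version + 1, new_status, valid_from=now)
    return [closed if v is old else v for v in history] + [new]


t0 = datetime(2026, 1, 1, tzinfo=timezone.utc)
t1 = datetime(2026, 4, 17, tzinfo=timezone.utc)
history = [ProblemVersion("prob-1", 1, "ACTIVE", valid_from=t0)]
history = update_problem(history, "prob-1", "RESOLVED", now=t1)
```

After the update the list holds two rows: version 1 with `valid_to` set to `t1`, and version 2 as the only row with `valid_to` still `None`, which is what the `current_problems` view would return.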
3.3 Clinical Decision Support (CDS)
from typing import List, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class Alert:
    severity: str  # 'INFO', 'WARNING', 'CRITICAL'
    message: str
    recommendation: str
    evidence: str
    override_allowed: bool


class ClinicalDecisionSupport:
    """
    Real-time clinical alerts and treatment recommendations.
    Common CDS rules:
    1. Drug-drug interactions
    2. Drug-allergy checking
    3. Duplicate therapy detection
    4. Dosage range checking
    5. Lab value monitoring
    6. Clinical guidelines compliance
    Helpers not shown here (drug_interaction_db, check_cross_allergy,
    check_duplicate_therapy, check_pregnancy_safety,
    get_patient_demographics, calculate_egfr, get_renal_adjusted_dose,
    ckd_stage) are assumed to be provided elsewhere.
    """

    def check_medication_order(
        self,
        patient_id: str,
        medication: dict,
        current_medications: List[dict],
        allergies: List[dict],
        lab_results: List[dict]
    ) -> List[Alert]:
        """
        Run all medication safety checks before order is finalized.
        """
        alerts = []
        # 1. Allergy check (highest priority)
        allergy_alert = self.check_allergies(medication, allergies)
        if allergy_alert:
            alerts.append(allergy_alert)
        # 2. Drug-drug interactions
        interaction_alerts = self.check_interactions(medication, current_medications)
        alerts.extend(interaction_alerts)
        # 3. Duplicate therapy
        duplicate_alert = self.check_duplicate_therapy(medication, current_medications)
        if duplicate_alert:
            alerts.append(duplicate_alert)
        # 4. Renal dosing adjustment
        renal_alert = self.check_renal_dosing(patient_id, medication, lab_results)
        if renal_alert:
            alerts.append(renal_alert)
        # 5. Pregnancy safety check
        pregnancy_alert = self.check_pregnancy_safety(patient_id, medication)
        if pregnancy_alert:
            alerts.append(pregnancy_alert)
        return alerts

    def check_allergies(self, medication: dict, allergies: List[dict]) -> Optional[Alert]:
        """
        Check if patient has allergy to ordered medication or drug class.
        A direct match always wins over a cross-allergy warning.
        """
        med_rxnorm = medication['rxnorm_code']
        active = [a for a in allergies if a['is_active']]
        # Direct match (checked for every allergy before any cross-allergy)
        for allergy in active:
            if allergy['allergen_code'] == med_rxnorm:
                return Alert(
                    severity='CRITICAL',
                    message=f"⚠️ ALLERGY ALERT: Patient allergic to {medication['name']}",
                    recommendation="DO NOT ADMINISTER. Consider alternative medication.",
                    evidence=f"Documented reaction: {allergy['reaction']} ({allergy['severity']})",
                    override_allowed=False  # Hard stop
                )
        # Cross-allergy check (e.g., Penicillin → Cephalosporins)
        for allergy in active:
            if self.check_cross_allergy(med_rxnorm, allergy['allergen_code']):
                return Alert(
                    severity='WARNING',
                    message=f"Cross-allergy risk: Patient allergic to {allergy['allergen_name']}",
                    recommendation="Use with caution. Consider desensitization protocol.",
                    evidence="Known cross-reactivity between drug classes",
                    override_allowed=True  # Provider can override with justification
                )
        return None

    def check_interactions(
        self,
        new_med: dict,
        current_meds: List[dict]
    ) -> List[Alert]:
        """
        Check for drug-drug interactions.
        Uses FDA interaction tables and clinical databases.
        """
        alerts = []
        for med in current_meds:
            if med['status'] != 'ACTIVE':
                continue
            # Query interaction database (simplified)
            interaction = self.drug_interaction_db.query(
                drug1=new_med['rxnorm_code'],
                drug2=med['rxnorm_code']
            )
            if not interaction:
                continue
            if interaction['severity'] == 'SEVERE':
                alerts.append(Alert(
                    severity='CRITICAL',
                    message=f"⚠️ SEVERE INTERACTION: {new_med['name']} + {med['name']}",
                    recommendation=interaction['recommendation'],
                    evidence=interaction['mechanism'],
                    override_allowed=True
                ))
            elif interaction['severity'] == 'MODERATE':
                alerts.append(Alert(
                    severity='WARNING',
                    message=f"Moderate interaction: {new_med['name']} + {med['name']}",
                    recommendation=interaction['recommendation'],
                    evidence=interaction['mechanism'],
                    override_allowed=True
                ))
        return alerts

    def check_renal_dosing(
        self,
        patient_id: str,
        medication: dict,
        lab_results: List[dict]
    ) -> Optional[Alert]:
        """
        Adjust dosage based on kidney function (eGFR).
        """
        # Only renally cleared drugs need this check
        if not medication.get('requires_renal_adjustment'):
            return None
        # Find most recent serum creatinine (LOINC 2160-0)
        creatinine_results = [lab for lab in lab_results if lab['loinc_code'] == '2160-0']
        if not creatinine_results:
            return Alert(
                severity='WARNING',
                message="No recent creatinine found",
                recommendation="Order serum creatinine before administering renally-cleared drug",
                evidence="Unable to assess renal function",
                override_allowed=True
            )
        recent_cr = max(creatinine_results, key=lambda lab: lab['observed_at'])
        # eGFR needs age and sex; the demographics lookup is an assumed helper
        demographics = self.get_patient_demographics(patient_id)
        egfr = self.calculate_egfr(
            recent_cr['value_numeric'], demographics['age'], demographics['sex']
        )
        # Check if dose adjustment needed
        recommended_dose = self.get_renal_adjusted_dose(medication['rxnorm_code'], egfr)
        if recommended_dose != medication['dose']:
            return Alert(
                severity='WARNING',
                message=f"Renal dosing adjustment recommended (eGFR: {egfr} mL/min/1.73 m²)",
                recommendation=f"Reduce dose to {recommended_dose} {medication['dose_unit']}",
                evidence=f"Based on eGFR {egfr} mL/min/1.73 m² (Stage {self.ckd_stage(egfr)} CKD)",
                override_allowed=True
            )
        return None
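The renal check relies on `calculate_egfr` and `ckd_stage`, which this section does not define. One plausible implementation is the 2021 CKD-EPI creatinine equation (the race-free version), sketched below with serum creatinine in mg/dL. The function names are hypothetical, and the staging helper returns KDIGO GFR categories, which is only part of a CKD diagnosis.

```python
def calculate_egfr_ckd_epi_2021(creatinine_mg_dl: float, age_years: float, sex: str) -> float:
    """Estimate GFR in mL/min/1.73 m^2 using the 2021 CKD-EPI creatinine equation."""
    if creatinine_mg_dl <= 0 or age_years <= 0:
        raise ValueError("creatinine and age must be positive")
    sex = sex.upper()
    if sex == "F":
        kappa, alpha, sex_factor = 0.7, -0.241, 1.012
    elif sex == "M":
        kappa, alpha, sex_factor = 0.9, -0.302, 1.0
    else:
        raise ValueError("sex must be 'M' or 'F' for this equation")
    ratio = creatinine_mg_dl / kappa
    return (142.0
            * min(ratio, 1.0) ** alpha
            * max(ratio, 1.0) ** -1.200
            * 0.9938 ** age_years
            * sex_factor)


def gfr_category(egfr: float) -> str:
    """Map an eGFR value to its KDIGO GFR category (G1 to G5)."""
    if egfr >= 90:
        return "G1"
    if egfr >= 60:
        return "G2"
    if egfr >= 45:
        return "G3a"
    if egfr >= 30:
        return "G3b"
    if egfr >= 15:
        return "G4"
    return "G5"
```

Any formula used for dosing decisions should be validated against the institution's lab reporting, since many drug labels still state adjustments in terms of Cockcroft-Gault creatinine clearance rather than eGFR.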
4. HL7 v2 — Legacy Messaging Standard
HL7 v2 (Health Level Seven version 2) is the most widely used messaging standard in healthcare IT, but it dates from the late 1980s and has plenty of quirks.
4.1 HL7 Message Structure
MSH|^~\&|SENDING_APP|SENDING_FACILITY|RECEIVING_APP|RECEIVING_FACILITY|20260417120000||ADT^A01|MSG00001|P|2.5
EVN|A01|20260417120000
PID|1||MRN123456^^^HOSPITAL^MR||Doe^John^A||19800101|M|||123 Main St^^Boston^MA^02101^USA||||||||123-45-6789
PV1|1|I|ICU^201^01^HOSPITAL||||DOC123^Smith^Jane|||ICU|||||||DOC123|Emergency|V123456||||||||||||||||||||HOSPITAL|||||20260417080000
Segment structure:
MSH = Message Header
EVN = Event Type
PID = Patient Identification
PV1 = Patient Visit
Anatomy of an HL7 message:
Segment|Field1|Field2^Component1^Component2|...
Delimiters:
| = Field separator
^ = Component separator (within field)
& = Subcomponent separator
~ = Field repeat separator
\ = Escape character
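The delimiters nest in a fixed order: a field splits into repetitions, each repetition into components, and each component into subcomponents. The sketch below applies that order to a single field. The `split_field` name is hypothetical, and escape sequences such as `\F\` are deliberately not decoded here.

```python
from typing import List


def split_field(field: str, encoding_chars: str = "^~\\&") -> List[List[List[str]]]:
    """Split one field into repetitions, then components, then subcomponents."""
    # MSH-2 order: component, repetition, escape, subcomponent
    component_sep, repeat_sep, _escape, sub_sep = encoding_chars[:4]
    return [
        [component.split(sub_sep) for component in repetition.split(component_sep)]
        for repetition in field.split(repeat_sep)
    ]


# A PID-3 value with two repetitions; the second uses a subcomponent
parsed = split_field("MRN123456^^^HOSPITAL^MR~999^^^SSA&2.16^SS")
first_repetition = parsed[0]
mrn = first_repetition[0][0]          # component 1, subcomponent 1
id_type = first_repetition[4][0]      # component 5 (identifier type code)
```

Reading the separators from MSH-2 instead of hard-coding them matters in practice, because a sending system is allowed to declare different encoding characters.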
4.2 Common HL7 Message Types
ADT (Admit, Discharge, Transfer):
A01 = Patient admission
A02 = Patient transfer
A03 = Patient discharge
A04 = Patient registration
A08 = Update patient information
A11 = Cancel patient admit
A31 = Update person information
ORM/ORR (Order Message and Response):
ORM^O01 = Order message
ORR^O02 = Order response
ORU (Observation Result):
R01 = Unsolicited observation message (lab results)
DFT (Detailed Financial Transaction):
P03 = Post detail financial transaction
SIU (Scheduling Information Unsolicited):
S12 = New appointment booking
S13 = Appointment rescheduling
S15 = Appointment cancellation
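An interface engine typically routes each message on the pair in MSH-9 (message code and trigger event). The sketch below shows that dispatch step with a plain dictionary. The handler names are placeholders for illustration, not part of any real engine.

```python
from typing import Dict, Tuple


def parse_message_type(msh_9: str) -> Tuple[str, str]:
    """Split MSH-9 (e.g. 'ADT^A01') into message code and trigger event."""
    parts = msh_9.split("^")
    code = parts[0]
    trigger = parts[1] if len(parts) > 1 else ""
    return code, trigger


# Hypothetical routing table keyed by (message code, trigger event)
HANDLERS: Dict[Tuple[str, str], str] = {
    ("ADT", "A01"): "admit_patient",
    ("ADT", "A03"): "discharge_patient",
    ("ADT", "A08"): "update_patient",
    ("ORU", "R01"): "store_lab_result",
    ("SIU", "S12"): "book_appointment",
}


def route(msh_9: str) -> str:
    """Return the handler name for a message type, or 'unhandled'."""
    return HANDLERS.get(parse_message_type(msh_9), "unhandled")
```

MSH-9 may carry a third component naming the message structure (for example `ORU^R01^ORU_R01`); the parser above ignores it, so both forms route the same way. Unknown types should be acknowledged with an error rather than dropped silently.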
4.3 HL7 Message Parsing and Generation
import re
from typing import Dict, List, Optional
from dataclasses import dataclass
from datetime import datetime


@dataclass
class HL7Segment:
    segment_type: str
    fields: List[str]

    def get_field(self, index: int) -> str:
        """Fields are 1-indexed in HL7; index 0 returns the segment type."""
        if index == 0:
            return self.segment_type
        try:
            return self.fields[index - 1]
        except IndexError:
            return ""

    def get_component(self, field_index: int, component_index: int) -> str:
        """
        Extract component from composite field.
        Note: component_index is 0-based here, unlike HL7's own 1-based
        component numbering (PID-5.1 is component_index 0).
        """
        field = self.get_field(field_index)
        components = field.split('^')
        try:
            return components[component_index]
        except IndexError:
            return ""


class HL7Message:
    """
    HL7 v2.x message parser and generator.
    """

    def __init__(self, message_text: Optional[str] = None):
        self.segments: List[HL7Segment] = []
        self.field_separator = '|'
        self.encoding_chars = '^~\\&'
        if message_text:
            self.parse(message_text)

    def parse(self, message_text: str):
        """Parse HL7 message into segments."""
        # HL7 v2 terminates segments with a carriage return; accept \n too
        lines = re.split(r'\r\n|\r|\n', message_text.strip())
        for line in lines:
            if not line.strip():
                continue
            # MSH segment is special: the field separator is the 4th character
            if line.startswith('MSH'):
                segment_type = 'MSH'
                self.field_separator = line[3]
                # MSH-1 is the separator itself and MSH-2 the encoding
                # characters, so prepend the separator to keep get_field()
                # aligned with HL7 numbering (MSH-3 = sending application)
                fields = [self.field_separator] + line[4:].split(self.field_separator)
                self.encoding_chars = fields[1]
            else:
                parts = line.split(self.field_separator)
                segment_type = parts[0]
                fields = parts[1:]
            self.segments.append(HL7Segment(segment_type, fields))

    def get_segment(self, segment_type: str, occurrence: int = 0) -> Optional[HL7Segment]:
        """Get segment by type (e.g., 'PID', 'PV1'); None if absent."""
        segments = [s for s in self.segments if s.segment_type == segment_type]
        try:
            return segments[occurrence]
        except IndexError:
            return None

    def extract_patient_info(self) -> Optional[Dict]:
        """Extract patient demographics from PID segment."""
        pid = self.get_segment('PID')
        if not pid:
            return None
        # PID-2: Patient ID (deprecated, use PID-3)
        # PID-3: Patient Identifier List (MRN)
        patient_id_field = pid.get_field(3)
        mrn = patient_id_field.split('^')[0]
        # PID-5: Patient Name (Last^First^Middle)
        name_field = pid.get_field(5)
        name_parts = name_field.split('^')
        last_name = name_parts[0] if len(name_parts) > 0 else ""
        first_name = name_parts[1] if len(name_parts) > 1 else ""
        middle_name = name_parts[2] if len(name_parts) > 2 else ""
        # PID-7: Date of Birth (YYYYMMDD, optionally followed by a time)
        dob_str = pid.get_field(7)
        dob = datetime.strptime(dob_str[:8], '%Y%m%d').date() if dob_str else None
        # PID-8: Sex
        sex = pid.get_field(8)
        # PID-11: Patient Address
        address_field = pid.get_field(11)
        address_parts = address_field.split('^')
        street = address_parts[0] if len(address_parts) > 0 else ""
        city = address_parts[2] if len(address_parts) > 2 else ""
        state = address_parts[3] if len(address_parts) > 3 else ""
        zip_code = address_parts[4] if len(address_parts) > 4 else ""
        # PID-13: Phone Number
        phone = pid.get_field(13)
        # PID-19: SSN
        ssn = pid.get_field(19)
        return {
            'mrn': mrn,
            'first_name': first_name,
            'last_name': last_name,
            'middle_name': middle_name,
            'date_of_birth': dob,
            'sex': sex,
            'address': {
                'street': street,
                'city': city,
                'state': state,
                'zip': zip_code
            },
            'phone': phone,
            'ssn': ssn
        }

    def extract_lab_results(self) -> List[Dict]:
        """Extract lab results from OBX segments in ORU message."""
        results = []
        obx_segments = [s for s in self.segments if s.segment_type == 'OBX']
        for obx in obx_segments:
            # OBX-2: Value Type ('NM' = Numeric, 'ST' = String, 'TX' = Text)
            value_type = obx.get_field(2)
            # OBX-3: Observation Identifier (LOINC code)
            obs_id_field = obx.get_field(3)
            obs_id_parts = obs_id_field.split('^')
            loinc_code = obs_id_parts[0]
            test_name = obs_id_parts[1] if len(obs_id_parts) > 1 else ""
            # OBX-5: Observation Value
            value = obx.get_field(5)
            # OBX-6: Units
            units = obx.get_field(6)
            # OBX-7: Reference Range
            reference_range = obx.get_field(7)
            # OBX-8: Abnormal Flags ('H' = High, 'L' = Low, 'N' = Normal)
            abnormal_flag = obx.get_field(8)
            # OBX-11: Observation Result Status
            # 'F' = Final, 'P' = Preliminary, 'C' = Corrected
            status = obx.get_field(11)
            # OBX-14: Date/Time of Observation
            observed_datetime = obx.get_field(14)
            results.append({
                'loinc_code': loinc_code,
                'test_name': test_name,
                'value': value,
                'value_type': value_type,
                'units': units,
                'reference_range': reference_range,
                'is_abnormal': abnormal_flag in ['H', 'L', 'HH', 'LL'],
                'abnormal_flag': abnormal_flag,
                'status': status,
                'observed_at': observed_datetime
            })
        return results

    def generate_ack(self, original_message: 'HL7Message', ack_code: str = 'AA') -> str:
        """
        Generate ACK (acknowledgment) message.
        ack_code:
        AA = Application Accept
        AE = Application Error
        AR = Application Reject
        """
        msh = original_message.get_segment('MSH')
# Swap sender/receiver
sending_app = msh.get_field(3)
sending_facility = msh.get_field(4)
receiving_app = msh.get_field(5)
receiving_facility = msh.get_field(6)
message_control_id = msh.get_field(10)
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
ack_message = (
f"MSH|^~\\&|{receiving_app}|{receiving_facility}|{sending_app}|{sending_facility}|{timestamp}||ACK|ACK{message_control_id}|P|2.5\r"
f"MSA|{ack_code}|{message_control_id}\r"
)
return ack_message
# Example usage
hl7_adt_a01 = """
MSH|^~\\&|HIS|HOSPITAL|LAB|LABSYS|20260417120000||ADT^A01|MSG00001|P|2.5
EVN|A01|20260417120000
PID|1||MRN123456^^^HOSPITAL^MR||Nguyen^Van^A||19850315|M|||45 Le Loi St^^HCMC^SGN^700000^VN||||||||123456789
PV1|1|I|ICU^201^01||||DOC001^Tran^Thi^Bich|||ICU|||||||DOC001|Emergency||||||||||||||||||||HOSPITAL|||||20260417080000
"""
parser = HL7Message(hl7_adt_a01)
patient = parser.extract_patient_info()
print(f"Patient: {patient['first_name']} {patient['last_name']}, MRN: {patient['mrn']}")
# Generate ACK
ack = parser.generate_ack(parser, 'AA')
print(ack)
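The MSH special case is easier to see in isolation. The following is a minimal sketch, not part of the parser above: `split_segment` is a hypothetical helper that returns a list in which index `n` holds field `n`, so MSH-9 and PID-3 are read with the same numbering the HL7 specification uses.

```python
from typing import List

def split_segment(line: str, sep: str = "|") -> List[str]:
    """Return a list where result[n] is field n (HL7 field numbers are 1-based)."""
    if line.startswith("MSH"):
        sep = line[3]                    # MSH-1 is the separator character itself
        rest = line[4:].split(sep)       # rest[0] is MSH-2 (the encoding characters)
        return ["MSH", sep] + rest
    return line.split(sep)               # index 0 holds the segment name

msh = split_segment("MSH|^~\\&|HIS|HOSPITAL|LAB|LABSYS|20260417120000||ADT^A01|MSG00001|P|2.5")
pid = split_segment("PID|1||MRN123456^^^HOSPITAL^MR||Nguyen^Van^A")

print(msh[3], msh[9], msh[10])   # sending application, message type, control ID
print(pid[3].split("^")[0])      # MRN taken from the first component of PID-3
```

Because the separator is counted as MSH-1, a naive `line.split('|')` on the MSH segment yields indexes that are off by one compared with every other segment.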
4.4 Integration Engine (Mirth Connect)
// Mirth Connect JavaScript transformer
// Convert HL7 ADT^A01 to FHIR Patient resource
// Input: HL7 ADT message
// Output: FHIR Patient JSON
var pid = msg['PID'];
var pv1 = msg['PV1'];
// Extract patient demographics
var mrn = pid['PID.3']['PID.3.1'].toString();
var lastName = pid['PID.5']['PID.5.1'].toString();
var firstName = pid['PID.5']['PID.5.2'].toString();
var dob = pid['PID.7']['PID.7.1'].toString(); // YYYYMMDD
var gender = pid['PID.8']['PID.8.1'].toString(); // M/F
// Transform gender code
var fhirGender = gender === 'M' ? 'male' : gender === 'F' ? 'female' : 'unknown';
// Format date of birth (YYYYMMDD → YYYY-MM-DD)
var formattedDob = dob.substring(0, 4) + '-' + dob.substring(4, 6) + '-' + dob.substring(6, 8);
// Build FHIR Patient resource
var fhirPatient = {
"resourceType": "Patient",
"identifier": [
{
"system": "urn:oid:2.16.840.1.113883.19.5",
"value": mrn,
"type": {
"coding": [
{
"system": "http://terminology.hl7.org/CodeSystem/v2-0203",
"code": "MR",
"display": "Medical Record Number"
}
]
}
}
],
"name": [
{
"use": "official",
"family": lastName,
"given": [firstName]
}
],
"gender": fhirGender,
"birthDate": formattedDob,
"active": true
};
// Output transformed message
return JSON.stringify(fhirPatient, null, 2);
5. FHIR — Fast Healthcare Interoperability Resources
FHIR (pronounced "fire") is the modern HL7 standard designed to succeed HL7 v2. It uses RESTful APIs, JSON/XML, and a resource-based model.
5.1 FHIR Core Concepts
Resource: the basic unit of data (Patient, Observation, Medication, etc.)
{
"resourceType": "Patient",
"id": "example",
"meta": {
"versionId": "1",
"lastUpdated": "2026-04-17T10:30:00Z"
},
"identifier": [
{
"system": "http://hospital.org/mrn",
"value": "MRN123456"
}
],
"active": true,
"name": [
{
"use": "official",
"family": "Nguyen",
"given": ["Van", "A"]
}
],
"gender": "male",
"birthDate": "1985-03-15",
"address": [
{
"use": "home",
"line": ["45 Le Loi Street"],
"city": "Ho Chi Minh City",
"state": "SGN",
"postalCode": "700000",
"country": "VN"
}
],
"telecom": [
{
"system": "phone",
"value": "+84-28-1234-5678",
"use": "home"
},
{
"system": "email",
"value": "patient@example.com",
"use": "home"
}
]
}
Common FHIR Resources:
- Patient: Demographics and administrative information
- Practitioner: Healthcare professional (doctor, nurse, etc.)
- Observation: Measurements and simple assertions (vitals, lab tests)
- Condition: Clinical conditions, problems, diagnoses
- Procedure: Performed procedures
- MedicationRequest: Prescription or order
- MedicationAdministration: Record of medication given
- Encounter: Interaction between patient and healthcare provider
- AllergyIntolerance: Allergy or adverse reaction risk
- DiagnosticReport: Lab report, radiology report
- DocumentReference: Clinical document (CDA, PDF)
5.2 FHIR RESTful API
# Create (POST)
POST /fhir/Patient HTTP/1.1
Host: api.hospital.com
Content-Type: application/fhir+json
Authorization: Bearer <token>
{
"resourceType": "Patient",
"name": [{"family": "Nguyen", "given": ["Van", "A"]}],
"gender": "male",
"birthDate": "1985-03-15"
}
# Read (GET)
GET /fhir/Patient/123 HTTP/1.1
# Update (PUT)
PUT /fhir/Patient/123 HTTP/1.1
Content-Type: application/fhir+json
{
"resourceType": "Patient",
"id": "123",
...
}
# Delete (DELETE)
DELETE /fhir/Patient/123 HTTP/1.1
# Search (GET with query params)
GET /fhir/Patient?family=Nguyen&birthdate=1985-03-15 HTTP/1.1
# Search with POST (for complex queries)
POST /fhir/Patient/_search HTTP/1.1
Content-Type: application/x-www-form-urlencoded
family=Nguyen&birthdate=ge1980-01-01
FHIR Search Parameters:
# Common search parameters
_id=123 # by ID
_lastUpdated=gt2026-01-01 # updated after date
_tag=http://example.org|vip # by tag
_profile=http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient # by profile
# Resource-specific parameters (Patient)
family=Nguyen # family name
given=Van # given name
birthdate=1985-03-15 # exact birthdate
birthdate=ge1980 # greater than or equal
gender=male # gender
identifier=MRN123456 # identifier value
# Modifiers
family:exact=Nguyen # exact match (case sensitive)
name:contains=nguyen # substring match
# Prefix on a date value (see the prefix list below)
birthdate=le2000-12-31 # less than or equal
# Reverse chaining (_has)
_has:Observation:patient:code=http://loinc.org|8480-6
# Patients that have at least one systolic blood pressure observation
# Prefixes for numbers and dates
eq = equal
ne = not equal
gt = greater than
lt = less than
ge = greater than or equal
le = less than or equal
sa = starts after
eb = ends before
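The parameters above can be combined in one query. The sketch below builds such a URL with the standard library only; the server address and the helper name `build_search_url` are assumptions for illustration. In FHIR search, repeating a parameter combines the criteria with AND, while a comma inside one value means OR.

```python
from typing import List, Tuple
from urllib.parse import urlencode

def build_search_url(base_url: str, resource_type: str,
                     criteria: List[Tuple[str, str]]) -> str:
    """Build a FHIR search URL; criteria is a list of pairs so a parameter can repeat."""
    # Keep ':' and ',' readable; they carry meaning in FHIR search (modifier, OR list)
    query = urlencode(criteria, safe=":,")
    return f"{base_url.rstrip('/')}/{resource_type}?{query}"

url = build_search_url(
    "https://fhir.example.org/fhir",        # hypothetical server
    "Patient",
    [
        ("family:exact", "Nguyen"),         # modifier on the parameter name
        ("birthdate", "ge1980-01-01"),      # same parameter twice: AND
        ("birthdate", "le2000-12-31"),
        ("gender", "male,female"),          # comma-separated values: OR
    ],
)
print(url)
```

A list of pairs is used instead of a dictionary because a dictionary cannot hold the same key twice, which is exactly what a date range needs.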
5.3 FHIR Client Implementation
import requests
from typing import List, Dict, Optional
from datetime import datetime
import json
class FHIRClient:
"""
FHIR R4 client implementation.
Supports SMART on FHIR authentication.
"""
def __init__(self, base_url: str, access_token: Optional[str] = None):
self.base_url = base_url.rstrip('/')
self.access_token = access_token
self.session = requests.Session()
# The FHIR media types apply whether or not a token is supplied
self.session.headers.update({
'Accept': 'application/fhir+json',
'Content-Type': 'application/fhir+json'
})
if access_token:
self.session.headers['Authorization'] = f'Bearer {access_token}'
def create(self, resource: Dict) -> Dict:
"""Create a new resource (POST)."""
resource_type = resource['resourceType']
url = f"{self.base_url}/{resource_type}"
response = self.session.post(url, json=resource)
response.raise_for_status()
return response.json()
def read(self, resource_type: str, resource_id: str) -> Dict:
"""Read a resource by ID (GET)."""
url = f"{self.base_url}/{resource_type}/{resource_id}"
response = self.session.get(url)
response.raise_for_status()
return response.json()
def update(self, resource: Dict) -> Dict:
"""Update an existing resource (PUT)."""
resource_type = resource['resourceType']
resource_id = resource['id']
url = f"{self.base_url}/{resource_type}/{resource_id}"
response = self.session.put(url, json=resource)
response.raise_for_status()
return response.json()
def delete(self, resource_type: str, resource_id: str) -> bool:
"""Delete a resource (DELETE)."""
url = f"{self.base_url}/{resource_type}/{resource_id}"
response = self.session.delete(url)
response.raise_for_status()
return response.status_code in (200, 202, 204) # a successful delete may answer 200, 202 or 204
def search(self, resource_type: str, params: Dict) -> List[Dict]:
"""
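Search for resources with query parameters.
Returns the matching resources from the first page of the result Bundle only;
larger result sets are paged through the Bundle's "next" link.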
"""
url = f"{self.base_url}/{resource_type}"
response = self.session.get(url, params=params)
response.raise_for_status()
bundle = response.json()
# Extract resources from Bundle
resources = []
if bundle.get('entry'):
resources = [entry['resource'] for entry in bundle['entry']]
return resources
def search_by_patient(self, resource_type: str, patient_id: str) -> List[Dict]:
"""Search for resources associated with a patient."""
return self.search(resource_type, {'patient': patient_id})
def get_patient_observations(
self,
patient_id: str,
code: Optional[str] = None,
date_range: Optional[tuple] = None
) -> List[Dict]:
"""
Get observations for a patient with optional filtering.
Args:
patient_id: Patient resource ID
code: LOINC code (e.g., '8480-6' for systolic BP)
date_range: Tuple of (start_date, end_date)
"""
params = {'patient': patient_id}
if code:
params['code'] = f"http://loinc.org|{code}"
if date_range:
start, end = date_range
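# A list makes requests emit date=ge...&date=le...; a literal '&' inside one value would be percent-encoded
params['date'] = [f"ge{start}", f"le{end}"]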
# Sort by date descending
params['_sort'] = '-date'
return self.search('Observation', params)
def create_observation(
self,
patient_id: str,
loinc_code: str,
display: str,
value: float,
unit: str,
performer_id: str
) -> Dict:
"""
Create a new observation (e.g., vital sign, lab result).
"""
observation = {
"resourceType": "Observation",
"status": "final",
"category": [{
"coding": [{
"system": "http://terminology.hl7.org/CodeSystem/observation-category",
"code": "vital-signs",
"display": "Vital Signs"
}]
}],
"code": {
"coding": [{
"system": "http://loinc.org",
"code": loinc_code,
"display": display
}]
},
"subject": {
"reference": f"Patient/{patient_id}"
},
"performer": [{
"reference": f"Practitioner/{performer_id}"
}],
"effectiveDateTime": datetime.now().astimezone().isoformat(), # FHIR requires a UTC offset when a time is present
"valueQuantity": {
"value": value,
"unit": unit,
"system": "http://unitsofmeasure.org",
"code": unit
}
}
return self.create(observation)
# Example usage
fhir_client = FHIRClient(
base_url='https://api.hospital.com/fhir',
access_token='your_access_token_here'
)
# Create a blood pressure observation
bp_obs = fhir_client.create_observation(
patient_id='123',
loinc_code='8480-6',
display='Systolic blood pressure',
value=120,
unit='mm[Hg]', # UCUM code for mmHg; create_observation() reuses it for both "unit" and "code"
performer_id='doctor-456'
)
# Search for all observations for a patient
observations = fhir_client.get_patient_observations(
patient_id='123',
date_range=('2026-01-01', '2026-04-17')
)
print(f"Found {len(observations)} observations")
for obs in observations:
code = obs['code']['coding'][0]['display']
value = obs.get('valueQuantity', {}).get('value', 'N/A')
unit = obs.get('valueQuantity', {}).get('unit', '')
print(f" {code}: {value} {unit}")
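FHIR servers usually return search results in pages, and each page is a Bundle whose `link` array may carry a `next` URL. The sketch below shows one way to walk all pages. The helper names are hypothetical, and `fetch` is any callable that turns a URL into a Bundle dictionary (with the client above, something like `lambda url: fhir_client.session.get(url).json()`). An in-memory dictionary stands in for the server so the example runs without a network.

```python
from typing import Callable, Dict, Iterator, Optional

def next_page_url(bundle: Dict) -> Optional[str]:
    """Return the URL of the next page, or None on the last page."""
    for link in bundle.get("link", []):
        if link.get("relation") == "next":
            return link.get("url")
    return None

def iter_resources(first_bundle: Dict, fetch: Callable[[str], Dict]) -> Iterator[Dict]:
    """Yield every resource across all pages of a search result."""
    bundle: Optional[Dict] = first_bundle
    while bundle is not None:
        for entry in bundle.get("entry", []):
            if "resource" in entry:
                yield entry["resource"]
        url = next_page_url(bundle)
        bundle = fetch(url) if url else None

# In-memory stand-in for a paged server response
pages = {
    "page2": {"resourceType": "Bundle", "entry": [{"resource": {"id": "obs-3"}}]},
}
first = {
    "resourceType": "Bundle",
    "link": [{"relation": "next", "url": "page2"}],
    "entry": [{"resource": {"id": "obs-1"}}, {"resource": {"id": "obs-2"}}],
}
ids = [r["id"] for r in iter_resources(first, pages.__getitem__)]
print(ids)
```

Using a generator keeps memory flat for large result sets and lets the caller stop early without fetching the remaining pages.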
5.4 FHIR Bundles — Transaction and Batch Operations
import uuid
from typing import Dict, List

import requests

def create_patient_with_observations(
    patient_data: Dict,
    observations: List[Dict]
) -> Dict:
    """
    Use a FHIR transaction Bundle to create a patient and related observations atomically.
    """
    # A urn:uuid fullUrl needs a real UUID; the server swaps it for the assigned Patient ID
    patient_ref = f"urn:uuid:{uuid.uuid4()}"
    bundle = {
        "resourceType": "Bundle",
        "type": "transaction",
        "entry": []
    }
    # Add patient as first entry
    bundle['entry'].append({
        "fullUrl": patient_ref,
        "resource": patient_data,
        "request": {
            "method": "POST",
            "url": "Patient"
        }
    })
    # Add observations referencing the temporary patient ID
    for obs in observations:
        # Copy so the caller's dictionaries are not modified
        entry_resource = {**obs, "subject": {"reference": patient_ref}}
        bundle['entry'].append({
            "fullUrl": f"urn:uuid:{uuid.uuid4()}",
            "resource": entry_resource,
            "request": {
                "method": "POST",
                "url": "Observation"
            }
        })
    # POST bundle to the server base URL (all-or-nothing transaction)
    response = requests.post(
        'https://api.hospital.com/fhir',
        json=bundle,
        headers={
            'Content-Type': 'application/fhir+json',
            'Authorization': 'Bearer <token>'
        }
    )
    # Surface a rejected transaction instead of returning the error payload as if it had succeeded
    response.raise_for_status()
    return response.json()
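A successful transaction returns a Bundle of type `transaction-response` whose entries appear in the same order as the request entries, each with a `response.status` and usually a `response.location`. The following sketch extracts the server-assigned references; the function name and the sample payload are illustrative assumptions, not output from a real server.

```python
from typing import Dict, List, Tuple

def created_references(response_bundle: Dict) -> List[Tuple[str, str]]:
    """Return (status, 'Type/id') for each entry of a transaction-response Bundle."""
    results = []
    for entry in response_bundle.get("entry", []):
        response = entry.get("response", {})
        location = response.get("location", "")
        # Drop the version suffix: "Patient/123/_history/1" -> "Patient/123"
        reference = location.split("/_history/")[0]
        results.append((response.get("status", ""), reference))
    return results

# Hand-written sample shaped like a transaction-response
sample_response = {
    "resourceType": "Bundle",
    "type": "transaction-response",
    "entry": [
        {"response": {"status": "201 Created", "location": "Patient/123/_history/1"}},
        {"response": {"status": "201 Created", "location": "Observation/456/_history/1"}},
    ],
}
for status, reference in created_references(sample_response):
    print(status, reference)
```

Since the order is preserved, the first tuple corresponds to the Patient entry and can be stored as the permanent reference for later calls.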
6. Data Security — Encryption, Access Control, De-identification
6.1 De-identification for Research
HIPAA Safe Harbor method — remove 18 identifiers:
import re
from typing import Dict, List
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
class PHIDeidentifier:
"""
De-identify PHI for research purposes.
Two methods:
1. Safe Harbor: Remove 18 HIPAA identifiers
2. Expert Determination: Statistical guarantee of re-identification risk < threshold
"""
def __init__(self):
self.analyzer = AnalyzerEngine()
self.anonymizer = AnonymizerEngine()
def remove_identifiers_safe_harbor(self, text: str) -> str:
"""
Remove all 18 HIPAA Safe Harbor identifiers.
"""
# Name detection
text = self._redact_names(text)
# Dates (except year)
text = self._generalize_dates(text)
# Phone numbers
text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE]', text)
# Email addresses
text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL]', text)
# SSN
text = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]', text)
# MRN (assuming format MRN######)
text = re.sub(r'\bMRN\d+\b', '[MRN]', text, flags=re.IGNORECASE)
# IP addresses
text = re.sub(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', '[IP]', text)
# URLs
text = re.sub(r'https?://[^\s]+', '[URL]', text)
# Geographic subdivisions smaller than state (zip codes)
text = self._generalize_zip_codes(text)
return text
def _generalize_dates(self, text: str) -> str:
"""
Replace specific dates with just year (except for those > 89 years old).
"""
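# Matches only numeric M/D/YYYY and M-D-YYYY dates; ISO 8601 or spelled-out months need extra patterns
date_pattern = r'\b(\d{1,2}[-/]\d{1,2}[-/])(\d{4})\b'
def replace_date(match):
year = match.group(2)
# Years that reveal an age over 89 must be redacted as well.
# Derive the cutoff from the current year instead of hard-coding 2026 - 89 = 1937.
from datetime import date # local import: this snippet's header does not import datetime
if int(year) < date.today().year - 89:
return '[DATE]'
return year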
return re.sub(date_pattern, replace_date, text)
def _generalize_zip_codes(self, text: str) -> str:
"""
Replace zip codes with first 3 digits only (unless population < 20,000).
"""
def replace_zip(match):
zip_code = match.group()
# In production, check census data for population
# If population < 20,000, return '000'
return zip_code[:3] + '00'
return re.sub(r'\b\d{5}(?:-\d{4})?\b', replace_zip, text)
def _redact_names(self, text: str) -> str:
"""Use NER model to detect and redact names."""
analyzer_results = self.analyzer.analyze(
text=text,
entities=["PERSON"],
language='en'
)
anonymized_text = self.anonymizer.anonymize(
text=text,
analyzer_results=analyzer_results
)
return anonymized_text.text
def k_anonymize(self, dataframe, quasi_identifiers: List[str], k: int = 5):
"""
K-anonymization: Ensure each combination of quasi-identifiers
appears at least k times in dataset.
Args:
dataframe: pandas DataFrame with patient data
quasi_identifiers: columns that could re-identify (age, zip, gender)
k: minimum group size
"""
# Group by quasi-identifiers
grouped = dataframe.groupby(quasi_identifiers)
# Suppress (drop) every group smaller than k; generalizing values instead is not implemented here
filtered = grouped.filter(lambda x: len(x) >= k)
return filtered
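Suppression alone can discard much of a dataset, so quasi-identifiers are often generalized first and k is checked afterwards. The sketch below uses plain dictionaries instead of pandas; the column names (`age`, `zip3`, `sex`) and the 10-year age band are assumptions chosen for illustration.

```python
from collections import Counter
from typing import Dict, List, Sequence

def smallest_group(rows: List[Dict], quasi_identifiers: Sequence[str]) -> int:
    """Size of the smallest equivalence class; the dataset is k-anonymous for any k up to this."""
    counts = Counter(tuple(row[q] for q in quasi_identifiers) for row in rows)
    return min(counts.values()) if counts else 0

def generalize_age(row: Dict, width: int = 10) -> Dict:
    """Replace an exact age with a band such as '30-39'."""
    low = (row["age"] // width) * width
    return {**row, "age": f"{low}-{low + width - 1}"}

rows = [
    {"age": 31, "zip3": "700", "sex": "M"},
    {"age": 34, "zip3": "700", "sex": "M"},
    {"age": 38, "zip3": "700", "sex": "M"},
    {"age": 52, "zip3": "700", "sex": "F"},
    {"age": 57, "zip3": "700", "sex": "F"},
]
qi = ["age", "zip3", "sex"]
before = smallest_group(rows, qi)
after = smallest_group([generalize_age(r) for r in rows], qi)
print(before, after)
```

With exact ages every row is unique; after banding, the smallest group grows, which shows how generalization trades precision for a higher k without dropping any rows.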
6.2 Audit Logging — Comprehensive Tracking
-- Audit log schema (immutable, append-only)
CREATE TABLE phi_access_log (
log_id BIGSERIAL PRIMARY KEY,
event_id UUID UNIQUE NOT NULL DEFAULT gen_random_uuid(),
-- Who
user_id UUID NOT NULL,
user_type VARCHAR(50) NOT NULL, -- 'PHYSICIAN', 'NURSE', 'ADMIN', 'SYSTEM'
session_id UUID NOT NULL,
-- What
action VARCHAR(50) NOT NULL, -- 'VIEW', 'CREATE', 'UPDATE', 'DELETE', 'EXPORT'
resource_type VARCHAR(100) NOT NULL, -- 'PATIENT_RECORD', 'LAB_RESULT', etc.
resource_id UUID NOT NULL,
patient_id UUID NOT NULL, -- Denormalized for fast patient audit queries
-- When
timestamp TIMESTAMPTZ NOT NULL DEFAULT now(),
-- Where
ip_address INET NOT NULL,
user_agent TEXT,
facility_id UUID,
department_id UUID,
workstation_id VARCHAR(100),
-- Why (optional but recommended)
access_reason TEXT,
is_break_glass BOOLEAN DEFAULT FALSE,
-- What changed (for UPDATE/DELETE)
old_value JSONB,
new_value JSONB,
changed_fields TEXT[],
-- Result
was_successful BOOLEAN NOT NULL,
failure_reason TEXT,
-- Context
correlation_id UUID, -- Link related actions (e.g., create encounter + observations)
parent_log_id BIGINT, -- For nested operations
CONSTRAINT chk_action CHECK (action IN (
'VIEW', 'CREATE', 'UPDATE', 'DELETE', 'EXPORT',
'SEARCH', 'PRINT', 'EMAIL', 'FAX', 'DOWNLOAD'
))
);
-- Indexes for common queries
CREATE INDEX idx_phi_access_log_patient ON phi_access_log(patient_id, timestamp DESC);
CREATE INDEX idx_phi_access_log_user ON phi_access_log(user_id, timestamp DESC);
CREATE INDEX idx_phi_access_log_timestamp ON phi_access_log(timestamp DESC);
CREATE INDEX idx_phi_access_log_break_glass ON phi_access_log(is_break_glass) WHERE is_break_glass = TRUE;
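-- Partitioning for performance (partition by month).
-- This requires phi_access_log to be declared with PARTITION BY RANGE (timestamp); PostgreSQL then
-- requires the primary key and unique constraints to include the partition key, e.g. PRIMARY KEY (log_id, timestamp).
CREATE TABLE phi_access_log_2026_04 PARTITION OF phi_access_log
FOR VALUES FROM ('2026-04-01') TO ('2026-05-01');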
-- Retention policy: Keep audit logs for 6 years minimum (HIPAA requirement)
CREATE OR REPLACE FUNCTION archive_old_audit_logs() RETURNS void AS $$
BEGIN
-- Move logs older than 6 years to archive storage
INSERT INTO phi_access_log_archive
SELECT * FROM phi_access_log
WHERE timestamp < now() - INTERVAL '6 years';
DELETE FROM phi_access_log
WHERE timestamp < now() - INTERVAL '6 years';
END;
$$ LANGUAGE plpgsql;
Audit logging middleware:
from functools import wraps
from datetime import datetime
from flask import request, g, jsonify
import logging
import json
def audit_phi_access(action: str, resource_type: str):
"""
Decorator for auditing PHI access in Flask routes.
"""
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# Capture context before executing function
user_id = g.current_user.id
session_id = g.session_id
ip_address = request.remote_addr
user_agent = request.user_agent.string
timestamp = datetime.now()
# Extract resource_id from route params
resource_id = kwargs.get('id') or kwargs.get('patient_id')
# Execute the actual function
try:
result = f(*args, **kwargs)
was_successful = True
failure_reason = None
except Exception as e:
was_successful = False
failure_reason = str(e)
raise
finally:
# Log to audit table (asynchronous to avoid blocking)
audit_entry = {
'user_id': user_id,
'session_id': session_id,
'action': action,
'resource_type': resource_type,
'resource_id': resource_id,
'timestamp': timestamp.isoformat(),
'ip_address': ip_address,
'user_agent': user_agent,
'was_successful': was_successful,
'failure_reason': failure_reason
}
# Send to audit service (async queue)
audit_queue.send_message(json.dumps(audit_entry))
# Also log to structured logger for SIEM
logging.info(
"PHI_ACCESS",
extra=audit_entry
)
return result
return decorated_function
return decorator
# Usage
@app.route('/api/patients/<patient_id>/records', methods=['GET'])
@audit_phi_access(action='VIEW', resource_type='PATIENT_RECORD')
def get_patient_records(patient_id):
# Your logic here
# Bind patient_id as a parameter; interpolating it into the SQL string allows SQL injection
records = db.query("SELECT * FROM records WHERE patient_id = %s", [patient_id])
return jsonify(records)
6.3 Breach Detection and Response
from typing import List
from datetime import datetime, timedelta
class BreachDetector:
"""
Detect potential HIPAA breaches from audit logs.
Suspicious patterns:
1. Unusual access volume
2. Access outside normal working hours
3. Access to VIP/celebrity patients
4. Bulk exports
5. Access from unusual locations
6. Multiple failed access attempts
"""
def detect_unusual_access_volume(self, user_id: str, threshold: int = 100):
"""
Alert if user accesses more than threshold patients in 24 hours.
"""
query = """
SELECT COUNT(DISTINCT patient_id) as patient_count
FROM phi_access_log
WHERE user_id = %s
AND timestamp > now() - INTERVAL '24 hours'
AND was_successful = TRUE
"""
result = db.query(query, [user_id])
patient_count = result[0]['patient_count']
if patient_count > threshold:
self.raise_alert(
severity='HIGH',
message=f"User {user_id} accessed {patient_count} patients in 24h",
recommendation="Investigate potential data exfiltration"
)
def detect_vip_access(self, user_id: str, patient_id: str) -> bool:
"""
Check if user has legitimate reason to access VIP patient.
"""
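# Check if patient is VIP (db.query returns a list of rows, which is truthy even when is_vip is FALSE)
rows = db.query("SELECT is_vip FROM patients WHERE patient_id = %s", [patient_id])
is_vip = bool(rows) and rows[0]['is_vip']
if not is_vip:
return True # Not VIP, no special handling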
# Check if user is on patient's care team
is_on_care_team = db.query("""
SELECT 1 FROM care_team
WHERE patient_id = %s AND user_id = %s
""", [patient_id, user_id])
if not is_on_care_team:
self.raise_alert(
severity='CRITICAL',
message=f"Unauthorized access to VIP patient {patient_id}",
recommendation="Immediate investigation required"
)
return False
return True
def detect_after_hours_access(self, user_id: str):
"""
Alert on access outside expected working hours.
"""
query = """
SELECT *
FROM phi_access_log
WHERE user_id = %s
AND timestamp > now() - INTERVAL '7 days'
AND (
EXTRACT(HOUR FROM timestamp) < 6
OR EXTRACT(HOUR FROM timestamp) > 22
)
AND was_successful = TRUE
"""
after_hours_logs = db.query(query, [user_id])
if len(after_hours_logs) > 10:
self.raise_alert(
severity='MEDIUM',
message=f"User {user_id} has {len(after_hours_logs)} after-hours accesses",
recommendation="Verify if access was legitimate"
)
def mandatory_breach_notification(self, affected_patients: List[str]):
"""
Trigger the breach notification process; the size of the breach decides who is notified and when.
HIPAA Breach Notification Rule:
- Always notify affected individuals without unreasonable delay, within 60 days of discovery
- 500 or more individuals: notify HHS (Department of Health and Human Services) within the same 60 days
- More than 500 residents of one state or jurisdiction: also notify prominent media outlets
- Fewer than 500 individuals: log the breach and report it to HHS within 60 days after the end of the calendar year
"""
num_affected = len(affected_patients)
# Individual notice is required regardless of breach size
self.notify_affected_individuals(affected_patients)
if num_affected >= 500:
# Major breach: HHS (and possibly the media) must be notified without waiting for the annual report
self.notify_hhs(affected_patients)
self.notify_media_if_required(affected_patients)
else:
# Smaller breach: log it and include it in the annual submission to HHS
self.log_minor_breach(affected_patients)
7. Compliance Monitoring — Continuous Compliance
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
class ComplianceStatus(Enum):
COMPLIANT = "compliant"
NON_COMPLIANT = "non_compliant"
AT_RISK = "at_risk"
UNKNOWN = "unknown"
@dataclass
class ComplianceCheck:
check_id: str
category: str # 'TECHNICAL', 'ADMINISTRATIVE', 'PHYSICAL'
description: str
status: ComplianceStatus
evidence: str
last_verified: datetime
next_verification_due: datetime
responsible_party: str
class HIPAAComplianceMonitor:
"""
Automated compliance monitoring system.
Continuously verifies HIPAA controls are in place.
"""
def run_all_checks(self) -> Dict[str, List[ComplianceCheck]]:
"""
Run all compliance checks and return results.
"""
results = {
'TECHNICAL': [],
'ADMINISTRATIVE': [],
'PHYSICAL': []
}
# Technical safeguards
results['TECHNICAL'].extend([
self.check_encryption_at_rest(),
self.check_encryption_in_transit(),
self.check_access_controls(),
self.check_audit_logging(),
self.check_session_timeouts(),
self.check_password_policies(),
self.check_mfa_enabled(),
self.check_vulnerability_scanning()
])
# Administrative safeguards
results['ADMINISTRATIVE'].extend([
self.check_risk_assessment_current(),
self.check_workforce_training_current(),
self.check_baa_in_place(),
self.check_incident_response_plan(),
self.check_disaster_recovery_plan()
])
# Physical safeguards
results['PHYSICAL'].extend([
self.check_facility_access_controls(),
self.check_workstation_security(),
self.check_device_encryption()
])
return results
def check_encryption_at_rest(self) -> ComplianceCheck:
"""Verify all PHI is encrypted at rest."""
# Check database encryption status
db_encrypted = self.verify_database_encryption()
# Check file storage encryption (S3, etc.)
storage_encrypted = self.verify_storage_encryption()
# Check backup encryption
backup_encrypted = self.verify_backup_encryption()
all_encrypted = db_encrypted and storage_encrypted and backup_encrypted
return ComplianceCheck(
check_id='TECH-001',
category='TECHNICAL',
description='All PHI encrypted at rest with AES-256',
status=ComplianceStatus.COMPLIANT if all_encrypted else ComplianceStatus.NON_COMPLIANT,
evidence=f"DB: {db_encrypted}, Storage: {storage_encrypted}, Backup: {backup_encrypted}",
last_verified=datetime.now(),
next_verification_due=datetime.now() + timedelta(days=30),
responsible_party='Security Team'
)
def check_audit_logging(self) -> ComplianceCheck:
"""Verify audit logs are being collected and retained."""
# Check if audit logging is enabled
logging_enabled = self.verify_audit_logging_enabled()
# Check if logs are retained for 6 years
retention_ok = self.verify_log_retention_policy()
# Check if logs are tamper-proof (write-once storage)
tamper_proof = self.verify_audit_log_integrity()
# Check if logs are being monitored
monitoring_active = self.verify_log_monitoring_active()
compliant = all([logging_enabled, retention_ok, tamper_proof, monitoring_active])
return ComplianceCheck(
check_id='TECH-004',
category='TECHNICAL',
description='Comprehensive audit logging of PHI access',
status=ComplianceStatus.COMPLIANT if compliant else ComplianceStatus.NON_COMPLIANT,
evidence=f"Logging: {logging_enabled}, Retention: {retention_ok}, Tamper-proof: {tamper_proof}, Monitoring: {monitoring_active}",
last_verified=datetime.now(),
next_verification_due=datetime.now() + timedelta(days=7),
responsible_party='Engineering Team'
)
def check_workforce_training_current(self) -> ComplianceCheck:
"""Verify all workforce members have current HIPAA training."""
query = """
SELECT
COUNT(*) as total_employees,
COUNT(CASE WHEN last_training_date > now() - INTERVAL '1 year' THEN 1 END) as trained
FROM employees
WHERE has_phi_access = TRUE
"""
result = db.query(query)[0]
total = result['total_employees']
trained = result['trained']
compliance_rate = (trained / total * 100) if total > 0 else 0
# Require 100% compliance; compare the counts directly rather than a float percentage
compliant = total > 0 and trained == total
return ComplianceCheck(
check_id='ADMIN-002',
category='ADMINISTRATIVE',
description='Annual HIPAA training for all workforce members',
status=ComplianceStatus.COMPLIANT if compliant else ComplianceStatus.NON_COMPLIANT,
evidence=f"{trained}/{total} employees trained ({compliance_rate:.1f}%)",
last_verified=datetime.now(),
next_verification_due=datetime.now() + timedelta(days=30),
responsible_party='HR & Compliance'
)
HIPAA Compliance Checklist (Engineering Focus):
## Technical Safeguards Checklist
### Access Control (164.312(a)(1))
- [ ] Unique user identification (no shared accounts)
- [ ] Emergency access procedure (break-glass with audit)
- [ ] Automatic logoff (session timeout after inactivity)
- [ ] Encryption and decryption (PHI encrypted at rest and in transit)
### Audit Controls (164.312(b))
- [ ] Log all PHI access (who, what, when, where, why)
- [ ] Audit logs retained for 6 years minimum
- [ ] Audit logs tamper-proof (append-only)
- [ ] Regular audit log review (automated + manual)
### Integrity (164.312(c)(1))
- [ ] Detect unauthorized alterations to PHI
- [ ] Data integrity verification (checksums, digital signatures)
- [ ] Version control for clinical documents
### Transmission Security (164.312(e)(1))
- [ ] TLS 1.3 (or TLS 1.2 minimum) for all PHI transmission
- [ ] No unencrypted email of PHI
- [ ] VPN for remote access
- [ ] Encrypt all portable media (USB drives, laptops)
### Additional Technical Controls
- [ ] Multi-factor authentication (MFA) for remote access
- [ ] Role-based access control (RBAC) implemented
- [ ] Principle of least privilege enforced
- [ ] Regular vulnerability scanning
- [ ] Penetration testing annually
- [ ] Patch management process (apply security patches within 30 days)
- [ ] Antivirus/antimalware on all endpoints
- [ ] DLP (Data Loss Prevention) to prevent PHI exfiltration
- [ ] Network segmentation (separate PHI systems)
- [ ] IDS/IPS (Intrusion Detection/Prevention)
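The "tamper-proof (append-only)" and "data integrity verification" items can be approximated in application code with a hash chain: each audit entry stores the hash of the previous one, so changing any old entry invalidates every hash after it. This is a minimal sketch under stated assumptions (SHA-256, JSON-serializable entries, an in-memory list, and hypothetical function names). It complements write-once storage and database permissions rather than replacing them.

```python
import hashlib
import json
from typing import Dict, List

GENESIS = "0" * 64  # placeholder "previous hash" for the first entry

def entry_hash(prev_hash: str, entry: Dict) -> str:
    """Hash the previous hash together with a canonical JSON form of the entry."""
    payload = json.dumps(entry, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + payload).encode("utf-8")).hexdigest()

def append_entry(chain: List[Dict], entry: Dict) -> None:
    prev_hash = chain[-1]["hash"] if chain else GENESIS
    chain.append({"entry": entry, "prev_hash": prev_hash,
                  "hash": entry_hash(prev_hash, entry)})

def verify_chain(chain: List[Dict]) -> bool:
    """Recompute every hash; any edited, removed, or reordered entry breaks the chain."""
    prev_hash = GENESIS
    for record in chain:
        if record["prev_hash"] != prev_hash:
            return False
        if record["hash"] != entry_hash(prev_hash, record["entry"]):
            return False
        prev_hash = record["hash"]
    return True

log: List[Dict] = []
append_entry(log, {"user_id": "u1", "action": "VIEW", "patient_id": "p1"})
append_entry(log, {"user_id": "u2", "action": "EXPORT", "patient_id": "p1"})
intact = verify_chain(log)

log[0]["entry"]["action"] = "SEARCH"   # simulate tampering with an old entry
tampered = verify_chain(log)
print(intact, tampered)
```

An attacker who can rewrite the whole chain can still recompute all hashes, so in practice the latest hash is periodically anchored somewhere the application cannot modify.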
8. Interoperability Challenges
8.1 Data Mapping — HL7 to FHIR
import uuid
from typing import Dict
class HL7toFHIRMapper:
"""
Map HL7 v2 messages to FHIR resources.
Challenges:
- HL7 v2 is flexible (too flexible) — many local variations
- Field meanings vary by implementation
- No universal identifier system
- Timezone handling
"""
def map_adt_to_bundle(self, hl7_message: HL7Message) -> Dict:
"""
Convert ADT^A01 (patient admission) to FHIR Bundle with:
- Patient resource
- Encounter resource
- Condition resources (problems)
"""
msh = hl7_message.get_segment('MSH')
pid = hl7_message.get_segment('PID')
pv1 = hl7_message.get_segment('PV1')
bundle = {
"resourceType": "Bundle",
"type": "transaction",
"entry": []
}
# Map Patient
patient = self.map_patient(pid)
bundle['entry'].append({
"fullUrl": f"urn:uuid:{patient['id']}",
"resource": patient,
"request": {"method": "POST", "url": "Patient"}
})
# Map Encounter
encounter = self.map_encounter(pv1, patient['id'])
bundle['entry'].append({
"fullUrl": f"urn:uuid:{encounter['id']}",
"resource": encounter,
"request": {"method": "POST", "url": "Encounter"}
})
return bundle
def map_patient(self, pid: HL7Segment) -> Dict:
"""Map PID segment to FHIR Patient resource."""
# PID-3: Patient Identifier List
mrn = pid.get_component(3, 0)
mrn_system = pid.get_component(3, 3) or "local"
# PID-5: Patient Name
family = pid.get_component(5, 0)
given = pid.get_component(5, 1)
# PID-7: Date of Birth (YYYYMMDD)
dob_str = pid.get_field(7)
dob = f"{dob_str[0:4]}-{dob_str[4:6]}-{dob_str[6:8]}" if dob_str else None
# PID-8: Administrative Sex
gender_map = {'M': 'male', 'F': 'female', 'O': 'other', 'U': 'unknown'}
gender = gender_map.get(pid.get_field(8), 'unknown')
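return {
"resourceType": "Patient",
"id": str(uuid.uuid4()),
"identifier": [{
# urn:oid: is valid only when PID-3.4 carries a numeric OID; map namespace IDs such as "HOSPITAL" to a URI first
"system": f"urn:oid:{mrn_system}",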
"value": mrn,
"type": {
"coding": [{
"system": "http://terminology.hl7.org/CodeSystem/v2-0203",
"code": "MR"
}]
}
}],
"name": [{
"use": "official",
"family": family,
"given": [given]
}],
"gender": gender,
"birthDate": dob
}
def map_encounter(self, pv1: HL7Segment, patient_id: str) -> Dict:
"""Map PV1 segment to FHIR Encounter resource."""
# PV1-2: Patient Class (I=Inpatient, O=Outpatient, E=Emergency)
class_map = {
'I': 'IMP', # Inpatient encounter
'O': 'AMB', # Ambulatory
'E': 'EMER', # Emergency
}
patient_class = class_map.get(pv1.get_field(2), 'AMB')
# PV1-3: Assigned Patient Location
location = pv1.get_field(3)
room = pv1.get_component(3, 1)
bed = pv1.get_component(3, 2)
# PV1-7: Attending Doctor
attending_id = pv1.get_component(7, 0)
# PV1-44: Admit Date/Time
admit_datetime = pv1.get_field(44)
return {
"resourceType": "Encounter",
"id": str(uuid.uuid4()),
"status": "in-progress",
"class": {
"system": "http://terminology.hl7.org/CodeSystem/v3-ActCode",
"code": patient_class
},
"subject": {
"reference": f"urn:uuid:{patient_id}"
},
"participant": [{
"type": [{
"coding": [{
"system": "http://terminology.hl7.org/CodeSystem/v3-ParticipationType",
"code": "ATND",
"display": "attender"
}]
}],
"individual": {
"reference": f"Practitioner/{attending_id}"
}
}],
"period": {
"start": self.parse_hl7_datetime(admit_datetime)
},
"location": [{
"location": {
"display": f"Room {room}, Bed {bed}"
}
}]
}
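    def parse_hl7_datetime(self, hl7_datetime: str) -> Optional[str]:  # Optional comes from typing
        """
        Convert HL7 datetime format to ISO 8601.
        HL7: YYYYMMDD[HHmm[ss[.SSSS]]][+/-ZZZZ]
        ISO: YYYY-MM-DDTHH:mm:ss±HH:MM
        Returns None for empty input or input shorter than a full date.
        """
        if not hl7_datetime or len(hl7_datetime) < 8:
            return None
        # Split off an optional trailing UTC offset (+ZZZZ or -ZZZZ)
        value, offset = hl7_datetime, ""
        for sign in "+-":
            if sign in value:
                value, _, zone = value.partition(sign)
                offset = f"{sign}{zone[0:2]}:{zone[2:4]}"
        value = value.split(".")[0]  # drop fractional seconds
        # Parse components; missing time parts default to "00"
        year, month, day = value[0:4], value[4:6], value[6:8]
        hour = value[8:10] if len(value) >= 10 else "00"
        minute = value[10:12] if len(value) >= 12 else "00"
        second = value[12:14] if len(value) >= 14 else "00"
        # Without an explicit offset the value is emitted as UTC ("Z"), as before.
        # That is an assumption about the sending system; HL7 v2 timestamps
        # without an offset are usually in the sender's local time.
        return f"{year}-{month}-{day}T{hour}:{minute}:{second}{offset or 'Z'}"
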
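The `urn:uuid:` reference that `map_encounter` puts in `subject` only resolves when the Patient and Encounter are sent together in one FHIR Bundle, where each entry's `fullUrl` carries the same `urn:uuid:` value. The sketch below shows that wrapping step under the assumption that the resource dictionaries have the shape produced by the mapper above. `build_transaction_bundle` is a hypothetical helper name, not part of any library, and the two UUIDs are made-up sample values.

```python
from typing import Dict, List


def build_transaction_bundle(resources: List[Dict]) -> Dict:
    """Wrap mapped resources in a FHIR transaction Bundle (hypothetical helper)."""
    entries = []
    for resource in resources:
        entries.append({
            # fullUrl must match the urn:uuid: value used in references
            "fullUrl": f"urn:uuid:{resource['id']}",
            "resource": resource,
            # POST asks the server to create the resource and assign its own id
            "request": {"method": "POST", "url": resource["resourceType"]},
        })
    return {"resourceType": "Bundle", "type": "transaction", "entry": entries}


# Sample resources with made-up ids, trimmed to the fields that matter here
patient = {"resourceType": "Patient", "id": "0f8c3d1e-5b7a-4c2d-9e6f-1a2b3c4d5e6f"}
encounter = {
    "resourceType": "Encounter",
    "id": "7d4e2a90-3c1b-4f8e-a5d6-9b8c7a6f5e4d",
    "subject": {"reference": f"urn:uuid:{patient['id']}"},
}
bundle = build_transaction_bundle([patient, encounter])
```

When a server processes the transaction it replaces the `urn:uuid:` references with the ids it assigned. The literal `Practitioner/{attending_id}` reference is different: it has to point at a Practitioner that already exists on the target server.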
8.2 Vendor Integration — Epic, Cerner APIs
import requests
from typing import Dict, Optional
from urllib.parse import urlencode


class EpicFHIRClient:
    """
    Epic FHIR API client (Epic is the largest EHR vendor in the US).
    Epic uses SMART on FHIR for OAuth 2.0 authorization.
    base_url must be the FHIR R4 base URL (on Epic, typically a path ending in
    /api/FHIR/R4): SMART discovery, the "aud" parameter, and resource reads
    are all relative to it.
    """

    def __init__(self, base_url: str, client_id: str, client_secret: str, timeout: float = 10.0):
        self.base_url = base_url.rstrip('/')
        self.client_id = client_id
        self.client_secret = client_secret
        self.timeout = timeout  # seconds; requests has no default timeout
        self.access_token: Optional[str] = None
        self._smart_config: Optional[Dict] = None

    def _get_smart_config(self) -> Dict:
        """Discover OAuth endpoints from .well-known/smart-configuration (cached)."""
        if self._smart_config is None:
            response = requests.get(
                f"{self.base_url}/.well-known/smart-configuration",
                headers={'Accept': 'application/json'},
                timeout=self.timeout
            )
            response.raise_for_status()
            self._smart_config = response.json()
        return self._smart_config

    def get_authorization_url(self, redirect_uri: str, state: str) -> str:
        """
        Step 1: Build the authorization URL for the OAuth 2.0 flow.
        The user is redirected to the Epic login page.
        """
        authorize_url = self._get_smart_config()['authorization_endpoint']
        params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'redirect_uri': redirect_uri,
            'scope': 'patient/Patient.read patient/Observation.read launch/patient',
            'state': state,
            'aud': self.base_url
        }
        # urlencode escapes the spaces and slashes in scope, redirect_uri and aud
        return f"{authorize_url}?{urlencode(params)}"

    def exchange_code_for_token(self, code: str, redirect_uri: str) -> Dict:
        """
        Step 2: Exchange authorization code for access token.
        """
        token_url = self._get_smart_config()['token_endpoint']
        data = {
            'grant_type': 'authorization_code',
            'code': code,
            'redirect_uri': redirect_uri,
            'client_id': self.client_id,
            'client_secret': self.client_secret
        }
        response = requests.post(token_url, data=data, timeout=self.timeout)
        response.raise_for_status()
        token_response = response.json()
        self.access_token = token_response['access_token']
        return token_response

    def get_patient(self, patient_id: str) -> Dict:
        """Get Patient resource from Epic."""
        if not self.access_token:
            raise RuntimeError("No access token; call exchange_code_for_token first")
        headers = {
            'Authorization': f'Bearer {self.access_token}',
            'Accept': 'application/fhir+json'
        }
        response = requests.get(
            f"{self.base_url}/Patient/{patient_id}",
            headers=headers,
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()


class CernerFHIRClient:
    """
    Cerner FHIR API client (Cerner, now Oracle Health, is the second largest EHR vendor).
    Cerner also uses SMART on FHIR, similar to Epic.
    """
    # Implementation similar to EpicFHIRClient
    pass

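The `state` value passed to `get_authorization_url` should be unpredictable so that the callback can be tied to the request that started it. SMART App Launch also describes PKCE for the authorization code flow. The sketch below generates both using only the standard library. `new_state_and_pkce` is a hypothetical helper name, and whether a particular Epic or Cerner deployment requires PKCE is an assumption to check against the vendor documentation.

```python
import base64
import hashlib
import secrets
from typing import Dict


def new_state_and_pkce() -> Dict[str, str]:
    """Generate an OAuth state value and a PKCE verifier/challenge pair (S256)."""
    state = secrets.token_urlsafe(32)
    # 64 random bytes encode to 86 URL-safe characters (allowed range: 43-128)
    code_verifier = secrets.token_urlsafe(64)
    digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
    # The challenge is base64url(SHA-256(verifier)) with the '=' padding removed
    code_challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
    return {
        "state": state,
        "code_verifier": code_verifier,
        "code_challenge": code_challenge,
        "code_challenge_method": "S256",
    }
```

In this scheme `code_challenge` and `code_challenge_method` are added to the authorization request parameters, and `code_verifier` is sent with the token request. The application keeps `state` and `code_verifier` in the user's server-side session between the two steps.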
9. Interview Questions — Design and Trade-offs
Q1: Design a HIPAA-compliant EHR system
Requirements:
- Support 1000+ concurrent users
- Store patient demographics, encounters, medications, lab results
- Comply with HIPAA Security Rule
- 99.9% uptime SLA
- Sub-200ms API response time
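To make the availability target concrete, the allowed downtime can be worked out directly. This is a small sketch, assuming a 365-day year and a 30-day month. `downtime_budget_minutes` is an illustrative helper name.

```python
def downtime_budget_minutes(availability_percent: float, period_hours: float) -> float:
    """Minutes of downtime allowed in a period for a given availability target."""
    return period_hours * 60 * (1 - availability_percent / 100)


yearly = downtime_budget_minutes(99.9, 365 * 24)   # about 525.6 minutes (~8.76 hours)
monthly = downtime_budget_minutes(99.9, 30 * 24)   # about 43.2 minutes
```

A budget of roughly 43 minutes per month leaves little room for manual recovery, which is one reason the design leans on automated failover.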
Solution Outline:
Architecture:
┌─────────────────────────────────────────────────────────────┐
│                Load Balancer (AWS ALB) + WAF                │
│                - TLS termination                            │
│                - DDoS protection                            │
└──────────────┬──────────────────────────────────────────────┘
               │
    ┌──────────┴──────────┐
    │                     │
┌───▼────┐          ┌─────▼────┐
│API GW 1│          │ API GW 2 │ (Auto-scaling)
└───┬────┘          └─────┬────┘
    │                     │
    └──────────┬──────────┘
               │
    ┌──────────▼──────────────────────────────────┐
    │  Application Servers (Kubernetes)           │
    │  - PHI encryption/decryption                │
    │  - RBAC enforcement                         │
    │  - Audit logging                            │
    └──────────┬──────────────┬───────────────────┘
               │              │
       ┌───────▼─────┐   ┌────▼──────────┐
       │  Primary    │   │  Read         │
       │  DB (RDS)   │◄──┤  Replicas     │
       │  (PHI)      │   │               │
       └─────────────┘   └───────────────┘
               │
       ┌───────▼──────────┐
       │  Audit DB        │
       │  (PostgreSQL)    │
       │  (append-only)   │
       └──────────────────┘
Key Design Decisions:
- Database Encryption: TDE (Transparent Data Encryption) + application-layer encryption
- Access Control: JWT tokens with short expiry (15 min), RBAC with attribute-based conditions
- Audit Logging: Asynchronous logging to separate database, streamed to SIEM
- High Availability: Multi-AZ deployment, automated failover, regular DR drills
- Performance: Redis cache for read-heavy data (metadata, code tables), connection pooling
- Compliance: Automated compliance checks, regular penetration testing, SOC 2 attestation (useful evidence for customers, although HIPAA itself does not require it)
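"RBAC with attribute-based conditions" can be read as a two-stage check: the role decides which actions are possible at all, and an attribute such as care-team membership decides whether they apply to this particular patient. The sketch below is illustrative only. The role names, permission names, the `User` shape, and the `break_glass` emergency override are all assumptions, not taken from a specific product.

```python
from dataclasses import dataclass, field
from typing import Dict, Set

# Role -> permissions (the RBAC part); illustrative values only
ROLE_PERMISSIONS: Dict[str, Set[str]] = {
    "physician": {"read_chart", "write_orders"},
    "nurse": {"read_chart"},
    "billing": {"read_billing"},
}


@dataclass
class User:
    user_id: str
    role: str
    care_team_patient_ids: Set[str] = field(default_factory=set)


def can_access(user: User, permission: str, patient_id: str,
               break_glass: bool = False) -> bool:
    """Allow only if the role grants the permission AND an attribute condition holds."""
    if permission not in ROLE_PERMISSIONS.get(user.role, set()):
        return False  # RBAC: the role never has this permission
    if patient_id in user.care_team_patient_ids:
        return True   # ABAC: the user is on this patient's care team
    # Emergency override; every use should be audited and reviewed afterwards
    return break_glass
```

Keeping the role table small and putting the patient-specific rule in one function makes the decision easy to log alongside each PHI access.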
Trade-offs:
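- Security vs Performance: Encryption adds latency (roughly 5-10 ms per request as a rule of thumb; measure it for your own workload), which is accepted for compliance
- Consistency vs Availability: Use strong consistency for writes, eventual consistency for reads from replicas; reads that must see the latest write (for example, a medication order just entered) go to the primary
- Cost vs Compliance: An HSM or managed KMS for key management adds cost. HIPAA does not mandate a specific technology (encryption is an "addressable" specification in the Security Rule), but keeping keys separate from the data they protect is hard to justify skipping in a risk analysis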
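An append-only audit database is easier to trust when tampering can be detected. One common technique is hash chaining, where each entry stores a hash of its own content plus the previous entry's hash. The sketch below keeps the chain in a Python list for clarity. The entry layout and function names are assumptions, and a real system would persist the rows in the audit database.

```python
import hashlib
import json
from typing import Dict, List

GENESIS_HASH = "0" * 64  # placeholder "previous hash" for the first entry


def _entry_hash(prev_hash: str, event: Dict) -> str:
    # sort_keys makes the serialization deterministic for the same event
    payload = json.dumps(event, sort_keys=True)
    return hashlib.sha256((prev_hash + payload).encode("utf-8")).hexdigest()


def append_audit_entry(chain: List[Dict], event: Dict) -> Dict:
    """Append an event, linking it to the hash of the previous entry."""
    prev_hash = chain[-1]["entry_hash"] if chain else GENESIS_HASH
    entry = {
        "event": event,
        "prev_hash": prev_hash,
        "entry_hash": _entry_hash(prev_hash, event),
    }
    chain.append(entry)
    return entry


def verify_chain(chain: List[Dict]) -> bool:
    """Recompute every hash; any edited, removed, or reordered entry breaks the chain."""
    prev_hash = GENESIS_HASH
    for entry in chain:
        if entry["prev_hash"] != prev_hash:
            return False
        if entry["entry_hash"] != _entry_hash(prev_hash, entry["event"]):
            return False
        prev_hash = entry["entry_hash"]
    return True
```

A chain like this is tamper-evident rather than tamper-proof: someone able to rewrite the whole table could recompute every hash. Periodically exporting the latest hash to a separate system, such as the SIEM, narrows that gap.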
Q2: How would you handle merging two duplicate patient records?
Challenge: Detect that Patient A and Patient B are actually the same person, then merge all PHI while maintaining an audit trail.
Solution:
def merge_patients(keep_patient_id: str, merge_patient_id: str, operator_id: str) -> bool:
    """
    Merge duplicate patient records while maintaining complete audit trail.
    Steps:
    1. Verify operator has authority to merge
    2. Lock both patient records inside one transaction
    3. Merge demographics from merge_patient into keep_patient
    4. Reassign all linked records to keep_patient
    5. Mark merge_patient as merged (DON'T delete: this maintains referential integrity)
    6. Create merge audit record
    7. Notify all systems of merge (after the transaction commits)
    """
    if keep_patient_id == merge_patient_id:
        raise ValueError("Cannot merge a patient record into itself")
    # 1. Authorization check
    if not has_permission(operator_id, 'PATIENT_MERGE'):
        raise PermissionError("User not authorized to merge patients")
    # 2. Begin transaction
    with db.transaction():
        # Lock both patients in a consistent (sorted) order so that two
        # concurrent merges of the same pair cannot deadlock
        locked = {}
        for patient_id in sorted([keep_patient_id, merge_patient_id]):
            rows = db.query("SELECT * FROM patients WHERE patient_id = %s FOR UPDATE", [patient_id])
            if not rows:
                raise LookupError("Patient record not found")
            locked[patient_id] = rows[0]
        keep_patient, merge_patient = locked[keep_patient_id], locked[merge_patient_id]
        if merge_patient['is_merged']:
            raise ValueError("Source record has already been merged")
        # 3. Merge demographics (prefer most complete record).
        # Assumes merge_demographics_fields returns {column_name: value} with
        # column names from a fixed, trusted list (never from user input).
        merged_demographics = merge_demographics_fields(keep_patient, merge_patient)
        if merged_demographics:
            set_clause = ", ".join(f"{column} = %s" for column in merged_demographics)
            db.execute(
                f"UPDATE patients SET {set_clause} WHERE patient_id = %s",
                [*merged_demographics.values(), keep_patient_id]
            )
        # 4. Reassign all linked records
        tables = [
            'encounters', 'observations', 'medication_orders',
            'diagnoses', 'procedures', 'allergies', 'lab_results'
        ]
        for table in tables:
            db.execute(f"""
                UPDATE {table}
                SET patient_id = %s,
                    updated_at = now(),
                    updated_by = %s,
                    update_reason = 'PATIENT_MERGE'
                WHERE patient_id = %s
            """, [keep_patient_id, operator_id, merge_patient_id])
        # 5. Mark merge_patient as merged (DON'T delete)
        db.execute("""
            UPDATE patients
            SET is_merged = TRUE,
                merged_into_patient_id = %s,
                merged_at = now(),
                merged_by = %s
            WHERE patient_id = %s
        """, [keep_patient_id, operator_id, merge_patient_id])
        # 6. Create merge audit record
        db.execute("""
            INSERT INTO patient_merge_history
            (keep_patient_id, merge_patient_id, merged_by, merged_at, reason)
            VALUES (%s, %s, %s, now(), %s)
        """, [keep_patient_id, merge_patient_id, operator_id, "Duplicate record"])
    # 7. Publish event to all integrated systems, only after the commit,
    # so that consumers never see a merge that was later rolled back
    event_bus.publish('PatientMerged', {
        'keep_patient_id': keep_patient_id,
        'merge_patient_id': merge_patient_id
    })
    return True

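The merge function covers the second half of the challenge. The first half, detecting that two records are probably the same person, is usually handled by scoring candidate pairs on demographic fields. The sketch below is a minimal illustration, assuming records are plain dictionaries with `family`, `given`, `dob`, and optional `phone` keys. The weights are made-up values, not a validated matching model.

```python
from difflib import SequenceMatcher
from typing import Dict, Optional


def name_similarity(a: str, b: str) -> float:
    """Similarity in [0, 1] between two names, ignoring case and outer spaces."""
    return SequenceMatcher(None, a.strip().lower(), b.strip().lower()).ratio()


def duplicate_score(p1: Dict[str, Optional[str]], p2: Dict[str, Optional[str]]) -> float:
    """Weighted score in [0, 1]; higher means more likely the same person."""
    score = 0.0
    # Exact date of birth match carries the most weight (illustrative weight)
    if p1["dob"] == p2["dob"]:
        score += 0.4
    score += 0.3 * name_similarity(p1["family"], p2["family"])
    score += 0.2 * name_similarity(p1["given"], p2["given"])
    # Phone only counts when both records actually have one
    if p1.get("phone") and p1.get("phone") == p2.get("phone"):
        score += 0.1
    return score
```

Pairs above a chosen threshold would normally go to a human review queue rather than straight into `merge_patients`, because a wrong merge mixes two people's clinical histories and is hard to undo.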
Q3: Design data retention policy balancing HIPAA requirements with GDPR "right to be forgotten"
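Conflict:
- HIPAA and US state law: HIPAA itself requires keeping compliance documentation (policies, authorizations, and similar records) for 6 years; how long the medical records themselves must be kept is set by state law, commonly 6 years or longer. The example below uses 6 years as the retention period.
- GDPR: Patients have a "right to erasure" of personal data (Article 17), with exemptions where retention is a legal obligation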
Solution:
from datetime import datetime, timedelta


class DataRetentionPolicy:
    """
    Implement compliant data retention balancing record-retention law and GDPR.
    Strategy:
    1. Legal retention takes precedence while the retention period is running
    2. After retention period expires, support GDPR erasure
    3. De-identify rather than delete (preserves research value)
    4. Maintain minimal audit trail even after erasure
    """
    # Example value; the real period depends on the jurisdiction
    RETENTION_YEARS = 6

    def handle_erasure_request(self, patient_id: str, request_date: datetime) -> str:
        """
        Process GDPR erasure request.
        Returns: Status ('DEFERRED' or 'COMPLETED')
        """
        # 1. Check if retention period has expired
        last_encounter = db.query("""
            SELECT MAX(discharge_time) as last_encounter
            FROM encounters
            WHERE patient_id = %s
        """, [patient_id])[0]['last_encounter']
        # MAX() is NULL when the patient has no encounters; this example then
        # assumes no retention clock is running
        if last_encounter is not None:
            # 366 days per year rounds up, so leap days never end retention early
            retention_expiry = last_encounter + timedelta(days=self.RETENTION_YEARS * 366)
            if datetime.now() < retention_expiry:
                # Still within retention period: DEFER erasure
                return self.defer_erasure(patient_id, retention_expiry)
        # 2. Retention period expired: proceed with de-identification
        return self.de_identify_patient(patient_id)

    def defer_erasure(self, patient_id: str, defer_until: datetime) -> str:
        """
        Log erasure request but defer until retention period expires.
        """
        db.execute("""
            INSERT INTO pending_erasure_requests
            (patient_id, requested_at, deferred_until, reason)
            VALUES (%s, now(), %s, 'HIPAA_RETENTION')
        """, [patient_id, defer_until])
        # Notify patient
        send_notification(patient_id, f"""
            Your data erasure request has been received.
            Under applicable record-retention requirements, medical records must be retained until {defer_until.strftime('%Y-%m-%d')}.
            Your data will be automatically erased after this date unless required by law.
        """)
        return 'DEFERRED'

    def de_identify_patient(self, patient_id: str) -> str:
        """
        De-identify patient data (based on the HIPAA Safe Harbor method).
        The clinical data is retained while direct identifiers are removed.
        Whether the result counts as anonymous under GDPR needs legal review,
        and full Safe Harbor also covers ages over 89 and small-area ZIP codes,
        which are not handled here.
        """
        with db.transaction():
            # De-identify patient record
            db.execute("""
                UPDATE patients
                SET
                    encrypted_first_name = %s,
                    encrypted_last_name = %s,
                    date_of_birth = date_trunc('year', date_of_birth), -- Keep year only
                    encrypted_ssn = NULL,
                    encrypted_address = NULL,
                    encrypted_phone = NULL,
                    is_de_identified = TRUE,
                    de_identified_at = now(),
                    original_patient_id = %s
                WHERE patient_id = %s
            """, [b'[REDACTED]', b'[REDACTED]', patient_id, patient_id])
            # De-identify audit logs (remove IP, user agent).
            # Note: this rewrites audit rows, so it only works if the audit
            # store permits redaction of these two columns.
            db.execute("""
                UPDATE phi_access_log
                SET
                    ip_address = '0.0.0.0',
                    user_agent = '[REDACTED]'
                WHERE patient_id = %s
            """, [patient_id])
            # Log the de-identification
            db.execute("""
                INSERT INTO de_identification_log
                (patient_id, de_identified_at, method)
                VALUES (%s, now(), 'SAFE_HARBOR')
            """, [patient_id])
        return 'COMPLETED'

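Safe Harbor has two rules that are easy to miss when only names and contact details are removed. Ages over 89 must be collapsed into a single "90 or older" category, and ZIP codes may keep only their first three digits, which become `000` for sparsely populated areas. The sketch below shows both rules. The function name, the output keys, and the caller-supplied `restricted_zip3` set (three-digit prefixes covering 20,000 people or fewer, derived from census data) are assumptions for illustration.

```python
from datetime import date
from typing import Dict, Optional, Set


def safe_harbor_generalize(birth_date: date, zip_code: str, today: date,
                           restricted_zip3: Set[str]) -> Dict[str, Optional[str]]:
    """Generalize date of birth and ZIP code along Safe Harbor lines."""
    # Age in completed years: subtract one if the birthday has not happened yet
    had_birthday = (today.month, today.day) >= (birth_date.month, birth_date.day)
    age = today.year - birth_date.year - (0 if had_birthday else 1)

    if age > 89:
        # Ages over 89 collapse into one bucket and the birth year is withheld
        birth_year, age_group = None, "90+"
    else:
        birth_year, age_group = str(birth_date.year), str(age)

    zip3 = zip_code[:3]
    if zip3 in restricted_zip3:
        zip3 = "000"  # prefix covers too few people to be released
    return {"birth_year": birth_year, "age_group": age_group, "zip3": zip3}
```

For example, a patient born on 1980-05-12 with ZIP 94110 keeps birth year 1980 and prefix 941, while a patient born in 1930 is reported only as "90+" with no birth year.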
Summary
Healthcare IT is a distinctive domain that demands a combination of:
- Deep technical knowledge: encryption, access control, audit logging
- Strict legal compliance: HIPAA fines can reach $50K per violation
- High reliability: life-critical systems cannot be allowed to fail
- Interoperability: integration with legacy systems (HL7 v2) and modern APIs (FHIR)
Key Takeaways for Senior Engineers:
- Security First: every design decision must weigh its impact on security and compliance
- Audit Everything: every access to PHI must be fully logged in a tamper-proof way
- Encrypt Everywhere: at rest, in transit, and in memory where possible
- Defense in Depth: never rely on a single security layer
- Prepare for Breaches: have an incident response plan and a breach notification process
- Understand Regulations: HIPAA is not just a "checkbox"; you have to understand the spirit of the law
- Balance Trade-offs: security vs usability, compliance cost vs risk, performance vs audit overhead
Healthcare IT is not for the faint of heart: a bug can endanger lives, and a security hole can lead to a breach with fines in the millions of dollars. It is also a domain with real impact on people's lives, and a chance for senior engineers to show they can design systems that are complex, secure, and compliant.