FHIR Gateway
The FHIR Gateway provides a unified interface for connecting to multiple FHIR servers with automatic authentication, connection pooling, error handling, and simplified CRUD operations. It comes in two variants:
FHIRGateway
- Synchronous FHIR client (httpx.Client
)AsyncFHIRGateway
- Asynchronous FHIR client (httpx.AsyncClient
)
Both handle the complexity of managing multiple FHIR clients and provide a consistent API across different healthcare systems.
Sync vs Async: When to Choose What
Choose sync for: Getting started, interaction with legacy systems, simpler debugging - safe for most use cases
Choose async for: High-throughput scenarios, concurrent requests, modern applications
Key difference: Async version includes connection pooling and the modify()
context manager for automatic resource saving.
Basic Usage
Synchronous Gateway
from healthchain.gateway import FHIRGateway
from fhir.resources.patient import Patient
gateway = FHIRGateway()
# Connect to FHIR server
gateway.add_source(
"my_fhir_server",
"fhir://fhir.example.com/api/FHIR/R4/?client_id=your_app&client_secret=secret&token_url=https://fhir.example.com/oauth2/token"
)
with gateway:
# FHIR operations
patient = gateway.read(Patient, "123", "my_fhir_server")
print(f"Patient: {patient.name[0].family}")
Asynchronous Gateway
import asyncio
from healthchain.gateway import AsyncFHIRGateway
from fhir.resources.patient import Patient
gateway = AsyncFHIRGateway()
# Connect to FHIR server
gateway.add_source(
"my_fhir_server",
"fhir://fhir.example.com/api/FHIR/R4/?client_id=your_app&client_secret=secret&token_url=https://fhir.example.com/oauth2/token"
)
async with gateway:
# Fetch multiple resources concurrently
tasks = [
gateway.read(Patient, "123", "my_fhir_server"),
gateway.read(Patient, "456", "my_fhir_server"),
gateway.read(Patient, "789", "my_fhir_server")
]
patients = await asyncio.gather(*tasks)
for patient in patients:
print(f"Patient: {patient.name[0].family}")
Adding Sources 🏥
The gateway currently supports adding sources with OAuth2 authentication flow.
# Epic Sandbox (JWT assertion)
gateway.add_source(
"epic",
(
"fhir://fhir.epic.com/interconnect-fhir-oauth/api/FHIR/R4/"
"?client_id=your_app"
"&client_secret_path=keys/private.pem"
"&token_url=https://fhir.epic.com/interconnect-fhir-oauth/oauth2/token"
"&use_jwt_assertion=true"
)
)
# Medplum (Client Credentials)
gateway.add_source(
"medplum",
(
"fhir://api.medplum.com/fhir/R4/"
"?client_id=your_app"
"&client_secret=secret"
"&token_url=https://api.medplum.com/oauth2/token"
"&scope=openid"
)
)
For more information on configuring specific FHIR servers
Epic FHIR API:
- Epic on FHIR Documentation
- Epic OAuth2 Setup
- Test Patients in Epic Sandbox
- Useful Epic Sandbox Setup Guide
Medplum FHIR API:
General Resources:
Connection String Format
Connection strings use the fhir://
scheme with query parameters:
Required Parameters:
client_id
: OAuth2 client IDtoken_url
: OAuth2 token endpoint
Optional Parameters:
client_secret
: OAuth2 client secret (for client credentials flow)client_secret_path
: Path to private key file (for JWT assertion)scope
: OAuth2 scope (default: "system/*.read system/*.write
")use_jwt_assertion
: Use JWT assertion flow (default: false)audience
: Token audience (for some servers)
FHIR Operations 🔥
Note
These examples assume you have already created and configured your gateway as shown in the Basic Usage section above.
Create Resources
from fhir.resources.patient import Patient
from fhir.resources.humanname import HumanName
# Create a new patient
patient = Patient(
name=[HumanName(family="Smith", given=["John"])],
gender="male",
birthDate="1990-01-01"
)
created_patient = gateway.create(resource=patient, source="medplum")
print(f"Created patient with ID: {created_patient.id}")
Read Resources
from fhir.resources.patient import Patient
# Read a specific patient (Derrick Lin, Epic Sandbox)
patient = gateway.read(
resource_type=Patient,
fhir_id="eq081-VQEgP8drUUqCWzHfw3",
source="epic"
)
Update Resources
from fhir.resources.patient import Patient
# Read, modify, and update (async)
patient = await gateway.read(Patient, "123", "medplum")
patient.name[0].family = "Johnson"
updated_patient = await gateway.update(patient, "medplum")
# Using async context manager - automatically saves on exit
async with gateway.modify(Patient, "123", "medplum") as patient:
patient.active = True
patient.name[0].given = ["Jane"]
# Automatic save on exit
Delete Resources
from fhir.resources.patient import Patient
# Delete a patient
success = gateway.delete(Patient, "123", "medplum")
if success:
print("Patient deleted successfully")
Search Operations
Basic Search
from fhir.resources.patient import Patient
from fhir.resources.bundle import Bundle
# Search by name
search_params = {"family": "Smith", "given": "John"}
results: Bundle = gateway.search(Patient, search_params, "epic")
for entry in results.entry:
patient = entry.resource
print(f"Found: {patient.name[0].family}, {patient.name[0].given[0]}")
Advanced Search
from fhir.resources.patient import Patient
# Complex search with multiple parameters
search_params = {
"birthdate": "1990-01-01",
"gender": "male",
"address-city": "Boston",
"_count": 50,
"_sort": "family"
}
results = gateway.search(Patient, search_params, "epic")
print(f"Found {len(results.entry)} patients")
Transform Handlers 🤖
Transform handlers allow you to create custom API endpoints that process and enhance FHIR resources with additional logic, AI insights, or data transformations before returning them to clients. These handlers run before the response is sent, enabling real-time data enrichment and processing.
from fhir.resources.patient import Patient
from fhir.resources.observation import Observation
@fhir_gateway.transform(Patient)
def get_enhanced_patient_summary(id: str, source: str = None) -> Patient:
"""Create enhanced patient summary with AI insights"""
# Read the patient
patient = fhir_gateway.read(Patient, id, source)
# Get lab results
lab_results = fhir_gateway.search(
resource_type=Observation,
search_params={"patient": id, "category": "laboratory"},
source=source
)
insights = nlp_pipeline.process(patient, lab_results)
# Add AI summary
patient.extension = patient.extension or []
patient.extension.append({
"url": "http://healthchain.org/fhir/summary",
"valueString": insights.summary
})
# Update the patient
fhir_gateway.update(patient, source)
return patient
# The handler is automatically called via HTTP endpoint:
# GET /fhir/transform/Patient/123?source=epic
from fhir.resources.patient import Patient
from fhir.resources.observation import Observation
@fhir_gateway.transform(Patient)
async def get_enhanced_patient_summary(id: str, source: str = None) -> Patient:
# Use the context manager to modify the patient
async with fhir_gateway.modify(Patient, id, source=source) as patient:
# Get lab results
lab_results = await fhir_gateway.search(
resource_type=Observation,
search_params={"patient": id, "category": "laboratory"},
source=source
)
insights = nlp_pipeline.process(patient, lab_results)
# Add AI summary
patient.extension = patient.extension or []
patient.extension.append({
"url": "http://healthchain.org/fhir/summary",
"valueString": insights.summary
})
return patient
# The handler is automatically called via HTTP endpoint:
# GET /fhir/transform/Patient/123?source=epic
Aggregate Handlers 🔗
Aggregate handlers allow you to combine data from multiple FHIR sources into a single resource. This is useful for creating unified views across different EHR systems or consolidating patient data from various healthcare providers.
from fhir.resources.observation import Observation
from fhir.resources.bundle import Bundle
@gateway.aggregate(Observation)
def aggregate_vitals(patient_id: str, sources: list = None) -> Bundle:
"""Aggregate vital signs from multiple sources."""
sources = sources or ["epic", "cerner"]
all_observations = []
for source in sources:
try:
results = gateway.search(
Observation,
{"patient": patient_id, "category": "vital-signs"},
source
)
processed_observations = process_observations(results)
all_observations.append(processed_observations)
except Exception as e:
print(f"Could not get vitals from {source}: {e}")
return Bundle(type="searchset", entry=[{"resource": obs} for obs in all_observations])
# The handler is automatically called via HTTP endpoint:
# GET /fhir/aggregate/Observation?patient_id=123&sources=epic&sources=cerner
from fhir.resources.observation import Observation
from fhir.resources.bundle import Bundle
@gateway.aggregate(Observation)
async def aggregate_vitals(patient_id: str, sources: list = None) -> Bundle:
"""Aggregate vital signs from multiple sources."""
sources = sources or ["epic", "cerner"]
all_observations = []
for source in sources:
try:
results = await gateway.search(
Observation,
{"patient": patient_id, "category": "vital-signs"},
source
)
processed_observations = process_observations(results)
all_observations.append(processed_observations)
except Exception as e:
print(f"Could not get vitals from {source}: {e}")
return Bundle(type="searchset", entry=[{"resource": obs} for obs in all_observations])
# The handler is automatically called via HTTP endpoint:
# GET /fhir/aggregate/Observation?patient_id=123&sources=epic&sources=cerner
Server Capabilities
- GET
/fhir/metadata
- Returns FHIR-styleCapabilityStatement
of transform and aggregate endpoints - GET
/fhir/status
- Returns Gateway status and connection health
Connection Pool Management (Async Only)
When you add a connection to a FHIR server, the async gateway will automatically add it to a connection pool to manage connections to FHIR servers.
Pool Configuration
# Create gateway with optimized connection settings
gateway = AsyncFHIRGateway(
max_connections=100, # Total connections across all sources
max_keepalive_connections=20, # Keep-alive connections per source
keepalive_expiry=30.0, # Keep connections alive for 30 seconds
)
# Add multiple sources - they share the connection pool
gateway.add_source("epic", "fhir://epic.org/...")
gateway.add_source("cerner", "fhir://cerner.org/...")
gateway.add_source("medplum", "fhir://medplum.com/...")
stats = gateway.get_pool_status()
print(stats)