
Connectors

BaseConnector

Bases: Generic[T], ABC

Abstract base class for all connectors in the pipeline.

This class should be subclassed to create specific connectors. Subclasses must implement the input and output methods.
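
As a rough illustration of the subclassing contract described above, the following minimal sketch mirrors the abstract interface locally so that it runs without HealthChain installed. The `DataContainer` dataclass and the `WhitespaceConnector` class are hypothetical stand-ins invented for this example. They are not part of the library.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


@dataclass
class DataContainer(Generic[T]):
    """Hypothetical stand-in for HealthChain's DataContainer."""

    data: T


class BaseConnector(Generic[T], ABC):
    """Local mirror of the abstract interface documented in this section."""

    @abstractmethod
    def input(self, data: DataContainer[T]) -> DataContainer[T]:
        ...

    @abstractmethod
    def output(self, data: DataContainer[T]) -> DataContainer[T]:
        ...


class WhitespaceConnector(BaseConnector[str]):
    """Hypothetical connector: trims text on input, appends a newline on output."""

    def input(self, data: DataContainer[str]) -> DataContainer[str]:
        return DataContainer(data=data.data.strip())

    def output(self, data: DataContainer[str]) -> DataContainer[str]:
        return DataContainer(data=data.data + "\n")


connector = WhitespaceConnector()
internal = connector.input(DataContainer(data="  chest pain  "))
external = connector.output(internal)
```

Because both methods are marked abstract, instantiating `BaseConnector` directly (or a subclass that omits either method) raises `TypeError`.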

Source code in healthchain/io/base.py
class BaseConnector(Generic[T], ABC):
    """
    Abstract base class for all connectors in the pipeline.

    This class should be subclassed to create specific connectors.
    Subclasses must implement the input and output methods.
    """

    @abstractmethod
    def input(self, data: DataContainer[T]) -> DataContainer[T]:
        """
        Convert input data to the pipeline's internal format.

        Args:
            data (DataContainer[T]): The input data to be converted.

        Returns:
            DataContainer[T]: The converted data.
        """
        pass

    @abstractmethod
    def output(self, data: DataContainer[T]) -> DataContainer[T]:
        """
        Convert pipeline's internal format to output data.

        Args:
            data (DataContainer[T]): The data to be converted for output.

        Returns:
            DataContainer[T]: The converted output data.
        """
        pass

input(data) abstractmethod

Convert input data to the pipeline's internal format.

PARAMETER DESCRIPTION
data

The input data to be converted.

TYPE: DataContainer[T]

RETURNS DESCRIPTION
DataContainer[T]

The converted data.

Source code in healthchain/io/base.py
@abstractmethod
def input(self, data: DataContainer[T]) -> DataContainer[T]:
    """
    Convert input data to the pipeline's internal format.

    Args:
        data (DataContainer[T]): The input data to be converted.

    Returns:
        DataContainer[T]: The converted data.
    """
    pass

output(data) abstractmethod

Convert pipeline's internal format to output data.

PARAMETER DESCRIPTION
data

The data to be converted for output.

TYPE: DataContainer[T]

RETURNS DESCRIPTION
DataContainer[T]

The converted output data.

Source code in healthchain/io/base.py
@abstractmethod
def output(self, data: DataContainer[T]) -> DataContainer[T]:
    """
    Convert pipeline's internal format to output data.

    Args:
        data (DataContainer[T]): The data to be converted for output.

    Returns:
        DataContainer[T]: The converted output data.
    """
    pass

CdaConnector

Bases: BaseConnector

CdaConnector class for handling CDA (Clinical Document Architecture) documents.

This connector is responsible for parsing CDA documents, extracting relevant clinical data, and updating the document with new information. It serves as both an input and output connector in the pipeline.

The connector uses the InteropEngine to convert between CDA and FHIR formats, preserving the clinical content while allowing for manipulation of the data within the HealthChain pipeline.

ATTRIBUTE DESCRIPTION
engine

The interoperability engine for CDA conversions.

TYPE: InteropEngine

original_cda

The original CDA document for use in output.

TYPE: str

note_document_reference

Reference to the note document extracted from the CDA.

TYPE: DocumentReference

METHOD DESCRIPTION
input

Parses the input CDA document and extracts clinical data.

output

Updates the CDA document with new data and returns the response.

Source code in healthchain/io/cdaconnector.py
class CdaConnector(BaseConnector):
    """
    CdaConnector class for handling CDA (Clinical Document Architecture) documents.

    This connector is responsible for parsing CDA documents, extracting relevant
    clinical data, and updating the document with new information. It serves as
    both an input and output connector in the pipeline.

    The connector uses the InteropEngine to convert between CDA and FHIR formats,
    preserving the clinical content while allowing for manipulation of the data
    within the HealthChain pipeline.

    Attributes:
        engine (InteropEngine): The interoperability engine for CDA conversions.
        original_cda (str): The original CDA document for use in output.
        note_document_reference (DocumentReference): Reference to the note document
                                                    extracted from the CDA.

    Methods:
        input: Parses the input CDA document and extracts clinical data.
        output: Updates the CDA document with new data and returns the response.
    """

    def __init__(self, config_dir: str = None):
        self.engine = create_engine(config_dir=config_dir)
        self.original_cda = None
        self.note_document_reference = None

    def input(self, cda_request: CdaRequest) -> Document:
        """
        Parse the input CDA document and extract clinical data into a HealthChain Document object.

        This method takes a CdaRequest object as input, parses it using the InteropEngine to convert
        CDA to FHIR resources, and creates a Document object with the extracted data. It creates a
        DocumentReference for the original CDA XML and extracts clinical data (problems, medications,
        allergies) into FHIR resources.

        Args:
            cda_request (CdaRequest): Request object containing the CDA XML document to process.

        Returns:
            Document: A Document object containing:
                - The extracted note text as the document data
                - FHIR resources organized into appropriate lists:
                  - problem_list: List of Condition resources
                  - medication_list: List of MedicationStatement resources
                  - allergy_list: List of AllergyIntolerance resources
                - DocumentReference resources for the original CDA and extracted notes with a parent-child relationship

        Note:
            If a DocumentReference resource is found in the converted FHIR resources,
            it is assumed to contain the note text and is stored for later use.
        """
        # Store original CDA for later use
        self.original_cda = cda_request.document

        # Convert CDA to FHIR using the InteropEngine
        fhir_resources = self.engine.to_fhir(
            self.original_cda, src_format=FormatType.CDA
        )

        # Create a FHIR DocumentReference for the original CDA document
        cda_document_reference = create_document_reference(
            data=self.original_cda,
            content_type="text/xml",
            description="Original CDA Document processed by HealthChain",
            attachment_title="Original CDA document in XML format",
        )

        # Extract any DocumentReference resources for notes
        note_text = ""
        doc = Document(data=note_text)  # Create document with empty text initially

        # Create FHIR Bundle and add documents
        doc.fhir.bundle = create_bundle()
        doc.fhir.add_document_reference(cda_document_reference)

        problem_list = []
        medication_list = []
        allergy_list = []

        for resource in fhir_resources:
            if isinstance(resource, Condition):
                problem_list.append(resource)
                set_problem_list_item_category(resource)
            elif isinstance(resource, MedicationStatement):
                medication_list.append(resource)
            elif isinstance(resource, AllergyIntolerance):
                allergy_list.append(resource)
            elif isinstance(resource, DocumentReference):
                if (
                    resource.content
                    and resource.content[0].attachment
                    and resource.content[0].attachment.data is not None
                ):
                    content = read_content_attachment(resource)
                    if content is not None:
                        note_text = content[0]["data"]
                        self.note_document_reference = resource
                    else:
                        log.warning(
                            f"No content found in DocumentReference: {resource.id}"
                        )

        doc.fhir.problem_list = problem_list
        doc.fhir.medication_list = medication_list
        doc.fhir.allergy_list = allergy_list

        # Update document text
        doc.data = note_text

        # Add the note document reference
        if self.note_document_reference is not None:
            doc.fhir.add_document_reference(
                self.note_document_reference, parent_id=cda_document_reference.id
            )

        return doc

    def output(self, document: Document) -> CdaResponse:
        """
        Convert FHIR resources back to CDA format and return the response.

        This method takes a Document object containing FHIR resources (problems,
        medications, allergies) and converts them back to CDA format using the
        InteropEngine. It combines all resources from the document's FHIR lists
        and includes the note document reference if available.

        Args:
            document (Document): A Document object containing FHIR resources
                                 in problem_list, medication_list, and allergy_list.

        Returns:
            CdaResponse: A response object containing the CDA document generated
                        from the FHIR resources.
        """
        # Collect all FHIR resources to convert to CDA
        resources = []

        if document.fhir.problem_list:
            resources.extend(document.fhir.problem_list)

        if document.fhir.allergy_list:
            resources.extend(document.fhir.allergy_list)

        if document.fhir.medication_list:
            resources.extend(document.fhir.medication_list)

        # Add the note document reference
        if self.note_document_reference is not None:
            resources.append(self.note_document_reference)

        # Convert FHIR resources to CDA using InteropEngine
        response_document = self.engine.from_fhir(resources, dest_format=FormatType.CDA)

        return CdaResponse(document=response_document)

input(cda_request)

Parse the input CDA document and extract clinical data into a HealthChain Document object.

This method takes a CdaRequest object as input, parses it using the InteropEngine to convert CDA to FHIR resources, and creates a Document object with the extracted data. It creates a DocumentReference for the original CDA XML and extracts clinical data (problems, medications, allergies) into FHIR resources.

PARAMETER DESCRIPTION
cda_request

Request object containing the CDA XML document to process.

TYPE: CdaRequest

RETURNS DESCRIPTION
Document

A Document object containing:

- The extracted note text as the document data
- FHIR resources organized into appropriate lists:
  - problem_list: List of Condition resources
  - medication_list: List of MedicationStatement resources
  - allergy_list: List of AllergyIntolerance resources
- DocumentReference resources for the original CDA and extracted notes with a parent-child relationship

TYPE: Document

Note

If a DocumentReference resource is found in the converted FHIR resources, it is assumed to contain the note text and is stored for later use.
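
The type-based sorting that `input` performs on the converted resources can be sketched in isolation. This is a simplified illustration, not the library code: the three dataclasses are hypothetical stand-ins for the FHIR resource models, `bucket_resources` is a name invented here, and the category tagging of conditions and the DocumentReference handling are left out.

```python
from dataclasses import dataclass
from typing import Dict, List


# Hypothetical stand-ins for the FHIR resource models; only the class
# identity matters for this sketch.
@dataclass
class Condition:
    id: str


@dataclass
class MedicationStatement:
    id: str


@dataclass
class AllergyIntolerance:
    id: str


def bucket_resources(resources: List[object]) -> Dict[str, List[object]]:
    """Sort converted resources into the three lists that input() fills."""
    buckets: Dict[str, List[object]] = {
        "problem_list": [],
        "medication_list": [],
        "allergy_list": [],
    }
    for resource in resources:
        if isinstance(resource, Condition):
            buckets["problem_list"].append(resource)
        elif isinstance(resource, MedicationStatement):
            buckets["medication_list"].append(resource)
        elif isinstance(resource, AllergyIntolerance):
            buckets["allergy_list"].append(resource)
        # Any other resource type is left out of the three lists.
    return buckets


buckets = bucket_resources(
    [Condition("c1"), AllergyIntolerance("a1"), MedicationStatement("m1"), Condition("c2")]
)
```

Within each list, resources keep the order in which the conversion produced them.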

Source code in healthchain/io/cdaconnector.py
def input(self, cda_request: CdaRequest) -> Document:
    """
    Parse the input CDA document and extract clinical data into a HealthChain Document object.

    This method takes a CdaRequest object as input, parses it using the InteropEngine to convert
    CDA to FHIR resources, and creates a Document object with the extracted data. It creates a
    DocumentReference for the original CDA XML and extracts clinical data (problems, medications,
    allergies) into FHIR resources.

    Args:
        cda_request (CdaRequest): Request object containing the CDA XML document to process.

    Returns:
        Document: A Document object containing:
            - The extracted note text as the document data
            - FHIR resources organized into appropriate lists:
              - problem_list: List of Condition resources
              - medication_list: List of MedicationStatement resources
              - allergy_list: List of AllergyIntolerance resources
            - DocumentReference resources for the original CDA and extracted notes with a parent-child relationship

    Note:
        If a DocumentReference resource is found in the converted FHIR resources,
        it is assumed to contain the note text and is stored for later use.
    """
    # Store original CDA for later use
    self.original_cda = cda_request.document

    # Convert CDA to FHIR using the InteropEngine
    fhir_resources = self.engine.to_fhir(
        self.original_cda, src_format=FormatType.CDA
    )

    # Create a FHIR DocumentReference for the original CDA document
    cda_document_reference = create_document_reference(
        data=self.original_cda,
        content_type="text/xml",
        description="Original CDA Document processed by HealthChain",
        attachment_title="Original CDA document in XML format",
    )

    # Extract any DocumentReference resources for notes
    note_text = ""
    doc = Document(data=note_text)  # Create document with empty text initially

    # Create FHIR Bundle and add documents
    doc.fhir.bundle = create_bundle()
    doc.fhir.add_document_reference(cda_document_reference)

    problem_list = []
    medication_list = []
    allergy_list = []

    for resource in fhir_resources:
        if isinstance(resource, Condition):
            problem_list.append(resource)
            set_problem_list_item_category(resource)
        elif isinstance(resource, MedicationStatement):
            medication_list.append(resource)
        elif isinstance(resource, AllergyIntolerance):
            allergy_list.append(resource)
        elif isinstance(resource, DocumentReference):
            if (
                resource.content
                and resource.content[0].attachment
                and resource.content[0].attachment.data is not None
            ):
                content = read_content_attachment(resource)
                if content is not None:
                    note_text = content[0]["data"]
                    self.note_document_reference = resource
                else:
                    log.warning(
                        f"No content found in DocumentReference: {resource.id}"
                    )

    doc.fhir.problem_list = problem_list
    doc.fhir.medication_list = medication_list
    doc.fhir.allergy_list = allergy_list

    # Update document text
    doc.data = note_text

    # Add the note document reference
    if self.note_document_reference is not None:
        doc.fhir.add_document_reference(
            self.note_document_reference, parent_id=cda_document_reference.id
        )

    return doc

output(document)

Convert FHIR resources back to CDA format and return the response.

This method takes a Document object containing FHIR resources (problems, medications, allergies) and converts them back to CDA format using the InteropEngine. It combines all resources from the document's FHIR lists and includes the note document reference if available.

PARAMETER DESCRIPTION
document

A Document object containing FHIR resources in problem_list, medication_list, and allergy_list.

TYPE: Document

RETURNS DESCRIPTION
CdaResponse

A response object containing the CDA document generated from the FHIR resources.

TYPE: CdaResponse
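
The order in which `output` gathers resources before handing them to the engine can be sketched as follows. This is an illustrative reduction: plain strings stand in for FHIR resources, and `collect_resources` is a hypothetical helper name, not a library function.

```python
from typing import List, Optional


def collect_resources(
    problem_list: List[str],
    allergy_list: List[str],
    medication_list: List[str],
    note_reference: Optional[str] = None,
) -> List[str]:
    """Gather resources in the order used by output(): problems, allergies,
    medications, then the stored note reference if one exists."""
    resources: List[str] = []
    if problem_list:
        resources.extend(problem_list)
    if allergy_list:
        resources.extend(allergy_list)
    if medication_list:
        resources.extend(medication_list)
    if note_reference is not None:
        resources.append(note_reference)
    return resources


ordered = collect_resources(["problem-1"], ["allergy-1"], ["medication-1"], "note-1")
without_note = collect_resources([], [], ["medication-1"])
```

Empty lists contribute nothing, and the note reference is appended last only when `input` stored one earlier.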

Source code in healthchain/io/cdaconnector.py
def output(self, document: Document) -> CdaResponse:
    """
    Convert FHIR resources back to CDA format and return the response.

    This method takes a Document object containing FHIR resources (problems,
    medications, allergies) and converts them back to CDA format using the
    InteropEngine. It combines all resources from the document's FHIR lists
    and includes the note document reference if available.

    Args:
        document (Document): A Document object containing FHIR resources
                             in problem_list, medication_list, and allergy_list.

    Returns:
        CdaResponse: A response object containing the CDA document generated
                    from the FHIR resources.
    """
    # Collect all FHIR resources to convert to CDA
    resources = []

    if document.fhir.problem_list:
        resources.extend(document.fhir.problem_list)

    if document.fhir.allergy_list:
        resources.extend(document.fhir.allergy_list)

    if document.fhir.medication_list:
        resources.extend(document.fhir.medication_list)

    # Add the note document reference
    if self.note_document_reference is not None:
        resources.append(self.note_document_reference)

    # Convert FHIR resources to CDA using InteropEngine
    response_document = self.engine.from_fhir(resources, dest_format=FormatType.CDA)

    return CdaResponse(document=response_document)

CdsFhirConnector

Bases: BaseConnector

CdsFhirConnector class for handling FHIR (Fast Healthcare Interoperability Resources) documents for CDS Hooks.

This connector facilitates the conversion between CDSRequest objects and Document objects, as well as the creation of CDSResponse objects from processed Documents.

ATTRIBUTE DESCRIPTION
hook_name

The name of the CDS Hook being used.

TYPE: str

Source code in healthchain/io/cdsfhirconnector.py
class CdsFhirConnector(BaseConnector):
    """
    CdsFhirConnector class for handling FHIR (Fast Healthcare Interoperability Resources) documents
    for CDS Hooks.

    This connector facilitates the conversion between CDSRequest objects and Document objects,
    as well as the creation of CDSResponse objects from processed Documents.

    Attributes:
        hook_name (str): The name of the CDS Hook being used.
    """

    def __init__(self, hook_name: str):
        self.hook_name = hook_name

    def input(
        self, cds_request: CDSRequest, prefetch_document_key: Optional[str] = "document"
    ) -> Document:
        """
        Converts a CDSRequest object into a Document object.

        Takes a CDSRequest containing FHIR resources and extracts them into a Document object.
        The Document will contain all prefetched FHIR resources in its fhir.prefetch_resources.
        If a DocumentReference resource is provided via prefetch_document_key, its text content
        will be extracted into Document.data. For multiple attachments, the text content will be
        concatenated with newlines.

        Args:
            cds_request (CDSRequest): The CDSRequest containing FHIR resources in its prefetch
                and/or a FHIR server URL.
            prefetch_document_key (str, optional): Key in the prefetch data containing a
                DocumentReference resource whose text content should be extracted.
                Defaults to "document".

        Returns:
            Document: A Document object containing:
                - All prefetched FHIR resources in fhir.prefetch_resources
                - Any text content from the DocumentReference in data (empty string if none found)
                - For multiple attachments, text content is concatenated with newlines

        Raises:
            ValueError: If neither prefetch nor fhirServer is provided in cds_request
            ValueError: If the prefetch data is invalid or cannot be processed
            NotImplementedError: If fhirServer is provided (FHIR server support not implemented)
        """
        if cds_request.prefetch is None and cds_request.fhirServer is None:
            raise ValueError(
                "Either prefetch or fhirServer must be provided to extract FHIR data!"
            )

        if cds_request.fhirServer is not None:
            raise NotImplementedError("FHIR server is not implemented yet!")

        # Create an empty Document object
        doc = Document(data="")

        # Validate the prefetch data
        validated_prefetch = Prefetch(prefetch=cds_request.prefetch)

        # Set the prefetch resources
        doc.fhir.prefetch_resources = validated_prefetch.prefetch

        # Extract text content from DocumentReference resource if provided
        document_resource = validated_prefetch.prefetch.get(prefetch_document_key)

        if not document_resource:
            log.warning(
                f"No DocumentReference resource found in prefetch data with key {prefetch_document_key}"
            )
        elif isinstance(document_resource, DocumentReference):
            try:
                attachments = read_content_attachment(
                    document_resource, include_data=True
                )
                for attachment in attachments:
                    if len(attachments) > 1:
                        doc.data += attachment.get("data", "") + "\n"
                    else:
                        doc.data += attachment.get("data", "")
            except Exception as e:
                log.warning(f"Error extracting text from DocumentReference: {e}")

        return doc

    def output(self, document: Document) -> CDSResponse:
        """
        Convert Document to CDSResponse.

        This method takes a Document object containing CDS cards and actions,
        and converts them into a CDSResponse object that follows the CDS Hooks
        specification.

        Args:
            document (Document): The Document object containing CDS results.

        Returns:
            CDSResponse: A response object containing CDS cards and optional system actions.
                         If no cards are found in the Document, an empty list of cards is returned.
        """
        if document.cds.cards is None:
            log.warning("No CDS cards found in Document, returning empty list of cards")
            return CDSResponse(cards=[])

        return CDSResponse(cards=document.cds.cards, systemActions=document.cds.actions)

input(cds_request, prefetch_document_key='document')

Converts a CDSRequest object into a Document object.

Takes a CDSRequest containing FHIR resources and extracts them into a Document object. The Document will contain all prefetched FHIR resources in its fhir.prefetch_resources. If a DocumentReference resource is provided via prefetch_document_key, its text content will be extracted into Document.data. For multiple attachments, the text content will be concatenated with newlines.
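
The concatenation rule can be illustrated with a small standalone sketch. It assumes each attachment is a dictionary with an optional "data" key, as the method's source suggests. `join_attachment_text` is a hypothetical helper name used only here.

```python
from typing import Dict, List


def join_attachment_text(attachments: List[Dict[str, str]]) -> str:
    """Mirror the loop in input(): a single attachment is copied as-is,
    while each of several attachments is followed by a newline."""
    text = ""
    for attachment in attachments:
        if len(attachments) > 1:
            text += attachment.get("data", "") + "\n"
        else:
            text += attachment.get("data", "")
    return text


single = join_attachment_text([{"data": "Note A"}])
multiple = join_attachment_text([{"data": "Note A"}, {"data": "Note B"}])
```

Under this rule, text built from several attachments ends with a trailing newline, while text from a single attachment does not.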

PARAMETER DESCRIPTION
cds_request

The CDSRequest containing FHIR resources in its prefetch and/or a FHIR server URL.

TYPE: CDSRequest

prefetch_document_key

Key in the prefetch data containing a DocumentReference resource whose text content should be extracted. Defaults to "document".

TYPE: str DEFAULT: 'document'

RETURNS DESCRIPTION
Document

A Document object containing:

- All prefetched FHIR resources in fhir.prefetch_resources
- Any text content from the DocumentReference in data (empty string if none found)
- For multiple attachments, text content is concatenated with newlines

TYPE: Document

RAISES DESCRIPTION
ValueError

If neither prefetch nor fhirServer is provided in cds_request

ValueError

If the prefetch data is invalid or cannot be processed

NotImplementedError

If fhirServer is provided (FHIR server support not implemented)
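
The two guard checks behind the first and last of these exceptions can be sketched on their own. `FakeCdsRequest` is a hypothetical stand-in that exposes only the two fields the checks read, `check_request` is a name invented for this example, and the server URL is a placeholder.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class FakeCdsRequest:
    """Hypothetical stand-in exposing only the fields the guards inspect."""

    prefetch: Optional[Dict[str, Any]] = None
    fhirServer: Optional[str] = None


def check_request(request: FakeCdsRequest) -> None:
    """Apply the guards in the same order as input()."""
    if request.prefetch is None and request.fhirServer is None:
        raise ValueError(
            "Either prefetch or fhirServer must be provided to extract FHIR data!"
        )
    if request.fhirServer is not None:
        raise NotImplementedError("FHIR server is not implemented yet!")


def outcome(request: FakeCdsRequest) -> str:
    """Return the name of the exception raised, or 'ok' if none."""
    try:
        check_request(request)
    except (ValueError, NotImplementedError) as exc:
        return type(exc).__name__
    return "ok"


empty = outcome(FakeCdsRequest())
prefetch_only = outcome(FakeCdsRequest(prefetch={}))
both = outcome(FakeCdsRequest(prefetch={}, fhirServer="https://fhir.example.org"))
```

One consequence of this ordering is that a request carrying both prefetch data and a fhirServer value is still rejected with `NotImplementedError`.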

Source code in healthchain/io/cdsfhirconnector.py
def input(
    self, cds_request: CDSRequest, prefetch_document_key: Optional[str] = "document"
) -> Document:
    """
    Converts a CDSRequest object into a Document object.

    Takes a CDSRequest containing FHIR resources and extracts them into a Document object.
    The Document will contain all prefetched FHIR resources in its fhir.prefetch_resources.
    If a DocumentReference resource is provided via prefetch_document_key, its text content
    will be extracted into Document.data. For multiple attachments, the text content will be
    concatenated with newlines.

    Args:
        cds_request (CDSRequest): The CDSRequest containing FHIR resources in its prefetch
            and/or a FHIR server URL.
        prefetch_document_key (str, optional): Key in the prefetch data containing a
            DocumentReference resource whose text content should be extracted.
            Defaults to "document".

    Returns:
        Document: A Document object containing:
            - All prefetched FHIR resources in fhir.prefetch_resources
            - Any text content from the DocumentReference in data (empty string if none found)
            - For multiple attachments, text content is concatenated with newlines

    Raises:
        ValueError: If neither prefetch nor fhirServer is provided in cds_request
        ValueError: If the prefetch data is invalid or cannot be processed
        NotImplementedError: If fhirServer is provided (FHIR server support not implemented)
    """
    if cds_request.prefetch is None and cds_request.fhirServer is None:
        raise ValueError(
            "Either prefetch or fhirServer must be provided to extract FHIR data!"
        )

    if cds_request.fhirServer is not None:
        raise NotImplementedError("FHIR server is not implemented yet!")

    # Create an empty Document object
    doc = Document(data="")

    # Validate the prefetch data
    validated_prefetch = Prefetch(prefetch=cds_request.prefetch)

    # Set the prefetch resources
    doc.fhir.prefetch_resources = validated_prefetch.prefetch

    # Extract text content from DocumentReference resource if provided
    document_resource = validated_prefetch.prefetch.get(prefetch_document_key)

    if not document_resource:
        log.warning(
            f"No DocumentReference resource found in prefetch data with key {prefetch_document_key}"
        )
    elif isinstance(document_resource, DocumentReference):
        try:
            attachments = read_content_attachment(
                document_resource, include_data=True
            )
            for attachment in attachments:
                if len(attachments) > 1:
                    doc.data += attachment.get("data", "") + "\n"
                else:
                    doc.data += attachment.get("data", "")
        except Exception as e:
            log.warning(f"Error extracting text from DocumentReference: {e}")

    return doc

output(document)

Convert Document to CDSResponse.

This method takes a Document object containing CDS cards and actions, and converts them into a CDSResponse object that follows the CDS Hooks specification.

PARAMETER DESCRIPTION
document

The Document object containing CDS results.

TYPE: Document

RETURNS DESCRIPTION
CDSResponse

A response object containing CDS cards and optional system actions. If no cards are found in the Document, an empty list of cards is returned.

TYPE: CDSResponse
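
The fallback behaviour can be sketched with plain dictionaries in place of the response model. `build_response` is a hypothetical helper, and the dictionary keys simply reuse the keyword names passed to `CDSResponse` in the source.

```python
from typing import Any, Dict, List, Optional


def build_response(
    cards: Optional[List[Dict[str, Any]]],
    actions: Optional[List[Dict[str, Any]]],
) -> Dict[str, Any]:
    """Mirror the branching in output() with a dict instead of CDSResponse."""
    if cards is None:
        # No cards at all: return an empty card list and omit the actions.
        return {"cards": []}
    return {"cards": cards, "systemActions": actions}


no_cards = build_response(None, [{"type": "create"}])
with_cards = build_response([{"summary": "Review allergy list"}], None)
```

Only `None` triggers the fallback. An empty list of cards is passed through together with whatever actions the document holds.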

Source code in healthchain/io/cdsfhirconnector.py
def output(self, document: Document) -> CDSResponse:
    """
    Convert Document to CDSResponse.

    This method takes a Document object containing CDS cards and actions,
    and converts them into a CDSResponse object that follows the CDS Hooks
    specification.

    Args:
        document (Document): The Document object containing CDS results.

    Returns:
        CDSResponse: A response object containing CDS cards and optional system actions.
                     If no cards are found in the Document, an empty list of cards is returned.
    """
    if document.cds.cards is None:
        log.warning("No CDS cards found in Document, returning empty list of cards")
        return CDSResponse(cards=[])

    return CDSResponse(cards=document.cds.cards, systemActions=document.cds.actions)