Connectors

BaseConnector

Bases: Generic[T], ABC

Abstract base class for all connectors in the pipeline.

This class should be subclassed to create specific connectors. Subclasses must implement the input and output methods.
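
As a rough illustration of the subclassing contract described above, the sketch below defines a toy connector. It is self-contained: `DataContainer` and `BaseConnector` are redefined locally as simplified stand-ins (the real `DataContainer` is assumed to wrap a `data` payload), and `WhitespaceConnector` is a hypothetical name that does not exist in HealthChain.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


@dataclass
class DataContainer(Generic[T]):
    """Simplified stand-in for HealthChain's DataContainer (assumption)."""

    data: T


class BaseConnector(Generic[T], ABC):
    """Local copy of the abstract interface documented on this page."""

    @abstractmethod
    def input(self, data: DataContainer[T]) -> DataContainer[T]: ...

    @abstractmethod
    def output(self, data: DataContainer[T]) -> DataContainer[T]: ...


class WhitespaceConnector(BaseConnector[str]):
    """Hypothetical connector: trims text on input, appends a newline on output."""

    def input(self, data: DataContainer[str]) -> DataContainer[str]:
        return DataContainer(data=data.data.strip())

    def output(self, data: DataContainer[str]) -> DataContainer[str]:
        return DataContainer(data=data.data + "\n")


connector = WhitespaceConnector()
internal = connector.input(DataContainer(data="  chest pain  "))
external = connector.output(internal)
```

Because both methods are marked with `abstractmethod`, instantiating `BaseConnector` directly, or a subclass that omits either method, raises `TypeError`.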

Source code in healthchain/io/base.py
class BaseConnector(Generic[T], ABC):
    """
    Abstract base class for all connectors in the pipeline.

    This class should be subclassed to create specific connectors.
    Subclasses must implement the input and output methods.
    """

    @abstractmethod
    def input(self, data: DataContainer[T]) -> DataContainer[T]:
        """
        Convert input data to the pipeline's internal format.

        Args:
            data (DataContainer[T]): The input data to be converted.

        Returns:
            DataContainer[T]: The converted data.
        """
        pass

    @abstractmethod
    def output(self, data: DataContainer[T]) -> DataContainer[T]:
        """
        Convert pipeline's internal format to output data.

        Args:
            data (DataContainer[T]): The data to be converted for output.

        Returns:
            DataContainer[T]: The converted output data.
        """
        pass

input(data) abstractmethod

Convert input data to the pipeline's internal format.

PARAMETER DESCRIPTION
data

The input data to be converted.

TYPE: DataContainer[T]

RETURNS DESCRIPTION
DataContainer[T]

The converted data.

Source code in healthchain/io/base.py
@abstractmethod
def input(self, data: DataContainer[T]) -> DataContainer[T]:
    """
    Convert input data to the pipeline's internal format.

    Args:
        data (DataContainer[T]): The input data to be converted.

    Returns:
        DataContainer[T]: The converted data.
    """
    pass

output(data) abstractmethod

Convert pipeline's internal format to output data.

PARAMETER DESCRIPTION
data

The data to be converted for output.

TYPE: DataContainer[T]

RETURNS DESCRIPTION
DataContainer[T]

The converted output data.

Source code in healthchain/io/base.py
@abstractmethod
def output(self, data: DataContainer[T]) -> DataContainer[T]:
    """
    Convert pipeline's internal format to output data.

    Args:
        data (DataContainer[T]): The data to be converted for output.

    Returns:
        DataContainer[T]: The converted output data.
    """
    pass

CdaConnector

Bases: BaseConnector

CdaConnector class for handling CDA (Clinical Document Architecture) documents.

This connector is responsible for parsing CDA documents, extracting relevant clinical data, and updating the document with new information. It serves as both an input and output connector in the pipeline.

ATTRIBUTE DESCRIPTION
overwrite

Flag to determine if existing data should be overwritten when updating the CDA document.

TYPE: bool

cda_doc

The parsed CDA document.

TYPE: CdaAnnotator

METHOD DESCRIPTION
input

Parses the input CDA document and extracts clinical data.

output

Updates the CDA document with new data and returns the response.

Source code in healthchain/io/cdaconnector.py
class CdaConnector(BaseConnector):
    """
    CdaConnector class for handling CDA (Clinical Document Architecture) documents.

    This connector is responsible for parsing CDA documents, extracting relevant
    clinical data, and updating the document with new information. It serves as
    both an input and output connector in the pipeline.

    Attributes:
        overwrite (bool): Flag to determine if existing data should be overwritten
                          when updating the CDA document.
        cda_doc (CdaAnnotator): The parsed CDA document.

    Methods:
        input: Parses the input CDA document and extracts clinical data.
        output: Updates the CDA document with new data and returns the response.
    """

    def __init__(self, overwrite: bool = False):
        self.overwrite = overwrite
        self.cda_doc = None

    def input(self, cda_request: CdaRequest) -> Document:
        """
        Parse the input CDA document and extract clinical data into a HealthChain Document object.

        This method takes a CdaRequest object as input, parses it, and extracts clinical data into a
        FHIR Bundle. It creates two DocumentReference resources:
        1. The original CDA XML document
        2. The extracted note text from the CDA document

        The note text document is linked to the original CDA document through a relationship.
        Continuity of Care Document data (problems, medications, allergies) are also extracted into FHIR resources
        and added to the bundle.

        Args:
            cda_request (CdaRequest): Request object containing the CDA XML document to process.

        Returns:
            Document: A Document object containing:
                - The extracted note text as the document data
                - A FHIR Bundle with:
                    - DocumentReference for the original CDA XML
                    - DocumentReference for the extracted note text
                    - Extracted clinical data as FHIR resources (Condition,
                      MedicationStatement, AllergyIntolerance)

        Note:
            The note text is extracted from the CDA document's note section. If the note
            is a dictionary of sections, they are joined with spaces. If no valid note
            is found, an empty string is used.
        """
        self.cda_doc = CdaAnnotator.from_xml(cda_request.document)

        # Create a FHIR DocumentReference for the original CDA document
        cda_document_reference = create_document_reference(
            data=cda_request.document,
            content_type="text/xml",
            description="Original CDA Document processed by HealthChain",
            attachment_title="Original CDA document in XML format",
        )

        # TODO: Temporary fix for the note section, this might be more of a concern for the Annotator class
        if isinstance(self.cda_doc.note, dict):
            note_text = " ".join(str(value) for value in self.cda_doc.note.values())
        elif isinstance(self.cda_doc.note, str):
            note_text = self.cda_doc.note
        else:
            log.warning("Note section is not a string or dictionary")
            note_text = ""

        # Create a FHIR DocumentReference for the note text
        note_document_reference = create_document_reference(
            data=note_text,
            content_type="text/plain",
            description="Text from note section of related CDA document extracted by HealthChain",
            attachment_title="Note text from the related CDA document",
        )

        doc = Document(data=note_text)

        # Create FHIR Bundle and add documents
        doc.fhir.bundle = create_bundle()
        doc.fhir.add_document_reference(cda_document_reference)
        doc.fhir.add_document_reference(
            note_document_reference, parent_id=cda_document_reference.id
        )

        # Set lists with the correct FHIR resources
        doc.fhir.problem_list = self.cda_doc.problem_list
        doc.fhir.medication_list = self.cda_doc.medication_list
        doc.fhir.allergy_list = self.cda_doc.allergy_list

        # Set the category for each problem in the problem list
        for condition in doc.fhir.problem_list:
            set_problem_list_item_category(condition)

        return doc

    def output(self, document: Document) -> CdaResponse:
        """
        Update the CDA document with new data and return the response.

        This method takes a Document object containing updated clinical data,
        updates the CDA document with this new information, and returns a
        CdaResponse object with the updated CDA document.

        Args:
            document (Document): A Document object containing the updated
                                 clinical data (problems, allergies, medications).

        Returns:
            CdaResponse: A response object containing the updated CDA document.

        Note:
            The method updates the CDA document with new problems, allergies,
            and medications if they are present in the input Document object.
            The update behavior (overwrite or append) is determined by the
            `overwrite` attribute of the CdaConnector instance.
        """
        # Update the CDA document with the results from FHIR Bundle
        if document.fhir.problem_list:
            log.debug(
                f"Updating CDA document with {len(document.fhir.problem_list)} problem(s)."
            )
            self.cda_doc.add_to_problem_list(
                document.fhir.problem_list, overwrite=self.overwrite
            )

        # Update allergies
        if document.fhir.allergy_list:
            log.debug(
                f"Updating CDA document with {len(document.fhir.allergy_list)} allergy(ies)."
            )
            self.cda_doc.add_to_allergy_list(
                document.fhir.allergy_list, overwrite=self.overwrite
            )

        # Update medications
        if document.fhir.medication_list:
            log.debug(
                f"Updating CDA document with {len(document.fhir.medication_list)} medication(s)."
            )
            self.cda_doc.add_to_medication_list(
                document.fhir.medication_list, overwrite=self.overwrite
            )

        # Export the updated CDA document
        response_document = self.cda_doc.export()

        return CdaResponse(document=response_document)

input(cda_request)

Parse the input CDA document and extract clinical data into a HealthChain Document object.

This method takes a CdaRequest object as input, parses it, and extracts clinical data into a FHIR Bundle. It creates two DocumentReference resources:

1. The original CDA XML document
2. The extracted note text from the CDA document

The note text document is linked to the original CDA document through a relationship. Continuity of Care Document data (problems, medications, allergies) are also extracted into FHIR resources and added to the bundle.

PARAMETER DESCRIPTION
cda_request

Request object containing the CDA XML document to process.

TYPE: CdaRequest

RETURNS DESCRIPTION
Document

A Document object containing:

- The extracted note text as the document data
- A FHIR Bundle with:
    - DocumentReference for the original CDA XML
    - DocumentReference for the extracted note text
    - Extracted clinical data as FHIR resources (Condition, MedicationStatement, AllergyIntolerance)

TYPE: Document

Note

The note text is extracted from the CDA document's note section. If the note is a dictionary of sections, they are joined with spaces. If no valid note is found, an empty string is used.
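
The three cases in this note can be sketched as a standalone function. `normalize_note` is a hypothetical helper name used only for illustration; the branching mirrors the logic in the source listing for `input`.

```python
import logging
from typing import Any

log = logging.getLogger(__name__)


def normalize_note(note: Any) -> str:
    """Reduce a parsed note section to a single string (illustrative helper)."""
    if isinstance(note, dict):
        # Section values are stringified and joined with single spaces
        return " ".join(str(value) for value in note.values())
    if isinstance(note, str):
        return note
    # Anything else (including None) is treated as "no valid note"
    log.warning("Note section is not a string or dictionary")
    return ""


sectioned = normalize_note({"history": "Cough for 3 days.", "plan": "Rest and fluids."})
plain = normalize_note("Cough for 3 days.")
missing = normalize_note(None)
```

Note that only the dictionary values are kept, so section names such as `history` do not appear in the resulting text.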

Source code in healthchain/io/cdaconnector.py
def input(self, cda_request: CdaRequest) -> Document:
    """
    Parse the input CDA document and extract clinical data into a HealthChain Document object.

    This method takes a CdaRequest object as input, parses it, and extracts clinical data into a
    FHIR Bundle. It creates two DocumentReference resources:
    1. The original CDA XML document
    2. The extracted note text from the CDA document

    The note text document is linked to the original CDA document through a relationship.
    Continuity of Care Document data (problems, medications, allergies) are also extracted into FHIR resources
    and added to the bundle.

    Args:
        cda_request (CdaRequest): Request object containing the CDA XML document to process.

    Returns:
        Document: A Document object containing:
            - The extracted note text as the document data
            - A FHIR Bundle with:
                - DocumentReference for the original CDA XML
                - DocumentReference for the extracted note text
                - Extracted clinical data as FHIR resources (Condition,
                  MedicationStatement, AllergyIntolerance)

    Note:
        The note text is extracted from the CDA document's note section. If the note
        is a dictionary of sections, they are joined with spaces. If no valid note
        is found, an empty string is used.
    """
    self.cda_doc = CdaAnnotator.from_xml(cda_request.document)

    # Create a FHIR DocumentReference for the original CDA document
    cda_document_reference = create_document_reference(
        data=cda_request.document,
        content_type="text/xml",
        description="Original CDA Document processed by HealthChain",
        attachment_title="Original CDA document in XML format",
    )

    # TODO: Temporary fix for the note section, this might be more of a concern for the Annotator class
    if isinstance(self.cda_doc.note, dict):
        note_text = " ".join(str(value) for value in self.cda_doc.note.values())
    elif isinstance(self.cda_doc.note, str):
        note_text = self.cda_doc.note
    else:
        log.warning("Note section is not a string or dictionary")
        note_text = ""

    # Create a FHIR DocumentReference for the note text
    note_document_reference = create_document_reference(
        data=note_text,
        content_type="text/plain",
        description="Text from note section of related CDA document extracted by HealthChain",
        attachment_title="Note text from the related CDA document",
    )

    doc = Document(data=note_text)

    # Create FHIR Bundle and add documents
    doc.fhir.bundle = create_bundle()
    doc.fhir.add_document_reference(cda_document_reference)
    doc.fhir.add_document_reference(
        note_document_reference, parent_id=cda_document_reference.id
    )

    # Set lists with the correct FHIR resources
    doc.fhir.problem_list = self.cda_doc.problem_list
    doc.fhir.medication_list = self.cda_doc.medication_list
    doc.fhir.allergy_list = self.cda_doc.allergy_list

    # Set the category for each problem in the problem list
    for condition in doc.fhir.problem_list:
        set_problem_list_item_category(condition)

    return doc

output(document)

Update the CDA document with new data and return the response.

This method takes a Document object containing updated clinical data, updates the CDA document with this new information, and returns a CdaResponse object with the updated CDA document.

PARAMETER DESCRIPTION
document

A Document object containing the updated clinical data (problems, allergies, medications).

TYPE: Document

RETURNS DESCRIPTION
CdaResponse

A response object containing the updated CDA document.

TYPE: CdaResponse

Note

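The method updates the CDA document with new problems, allergies, and medications if they are present in the input Document object. The update behavior (overwrite or append) is determined by the overwrite attribute of the CdaConnector instance. The update is applied to the CDA document that input parsed and stored on the connector, so input must be called on the same instance before output.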
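
The overwrite-or-append choice can be pictured with plain lists. This is only a sketch of the semantics implied by the note: `update_entries` is a hypothetical function, and the actual behavior of the `add_to_*_list` methods on the annotator is not shown on this page and may differ in detail.

```python
from typing import List


def update_entries(
    existing: List[str], new: List[str], overwrite: bool = False
) -> List[str]:
    """Illustrative merge: replace the existing entries or append to them."""
    if not new:
        # Mirrors output(): an empty list leaves the section untouched
        return list(existing)
    if overwrite:
        return list(new)
    return list(existing) + list(new)


appended = update_entries(["asthma"], ["hypertension"], overwrite=False)
replaced = update_entries(["asthma"], ["hypertension"], overwrite=True)
untouched = update_entries(["asthma"], [], overwrite=True)
```

One consequence of the emptiness check is that passing an empty list with `overwrite=True` does not clear a section in this sketch; the existing entries are kept.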

Source code in healthchain/io/cdaconnector.py
def output(self, document: Document) -> CdaResponse:
    """
    Update the CDA document with new data and return the response.

    This method takes a Document object containing updated clinical data,
    updates the CDA document with this new information, and returns a
    CdaResponse object with the updated CDA document.

    Args:
        document (Document): A Document object containing the updated
                             clinical data (problems, allergies, medications).

    Returns:
        CdaResponse: A response object containing the updated CDA document.

    Note:
        The method updates the CDA document with new problems, allergies,
        and medications if they are present in the input Document object.
        The update behavior (overwrite or append) is determined by the
        `overwrite` attribute of the CdaConnector instance.
    """
    # Update the CDA document with the results from FHIR Bundle
    if document.fhir.problem_list:
        log.debug(
            f"Updating CDA document with {len(document.fhir.problem_list)} problem(s)."
        )
        self.cda_doc.add_to_problem_list(
            document.fhir.problem_list, overwrite=self.overwrite
        )

    # Update allergies
    if document.fhir.allergy_list:
        log.debug(
            f"Updating CDA document with {len(document.fhir.allergy_list)} allergy(ies)."
        )
        self.cda_doc.add_to_allergy_list(
            document.fhir.allergy_list, overwrite=self.overwrite
        )

    # Update medications
    if document.fhir.medication_list:
        log.debug(
            f"Updating CDA document with {len(document.fhir.medication_list)} medication(s)."
        )
        self.cda_doc.add_to_medication_list(
            document.fhir.medication_list, overwrite=self.overwrite
        )

    # Export the updated CDA document
    response_document = self.cda_doc.export()

    return CdaResponse(document=response_document)

CdsFhirConnector

Bases: BaseConnector

CdsFhirConnector class for handling FHIR (Fast Healthcare Interoperability Resources) documents for CDS Hooks.

This connector facilitates the conversion between CDSRequest objects and Document objects, as well as the creation of CDSResponse objects from processed Documents.

ATTRIBUTE DESCRIPTION
hook_name

The name of the CDS Hook being used.

TYPE: str

Source code in healthchain/io/cdsfhirconnector.py
class CdsFhirConnector(BaseConnector):
    """
    CdsFhirConnector class for handling FHIR (Fast Healthcare Interoperability Resources) documents
    for CDS Hooks.

    This connector facilitates the conversion between CDSRequest objects and Document objects,
    as well as the creation of CDSResponse objects from processed Documents.

    Attributes:
        hook_name (str): The name of the CDS Hook being used.
    """

    def __init__(self, hook_name: str):
        self.hook_name = hook_name

    def input(
        self, cds_request: CDSRequest, prefetch_document_key: Optional[str] = "document"
    ) -> Document:
        """
        Converts a CDSRequest object into a Document object.

        Takes a CDSRequest containing FHIR resources and extracts them into a Document object.
        The Document will contain all prefetched FHIR resources in its fhir.prefetch_resources.
        If a DocumentReference resource is provided via prefetch_document_key, its text content
        will be extracted into Document.data. For multiple attachments, the text content will be
        concatenated with newlines.

        Args:
            cds_request (CDSRequest): The CDSRequest containing FHIR resources in its prefetch
                and/or a FHIR server URL.
            prefetch_document_key (str, optional): Key in the prefetch data containing a
                DocumentReference resource whose text content should be extracted.
                Defaults to "document".

        Returns:
            Document: A Document object containing:
                - All prefetched FHIR resources in fhir.prefetch_resources
                - Any text content from the DocumentReference in data (empty string if none found)
                - For multiple attachments, text content is concatenated with newlines

        Raises:
            ValueError: If neither prefetch nor fhirServer is provided in cds_request
            ValueError: If the prefetch data is invalid or cannot be processed
            NotImplementedError: If fhirServer is provided (FHIR server support not implemented)
        """
        if cds_request.prefetch is None and cds_request.fhirServer is None:
            raise ValueError(
                "Either prefetch or fhirServer must be provided to extract FHIR data!"
            )

        if cds_request.fhirServer is not None:
            raise NotImplementedError("FHIR server is not implemented yet!")

        # Create an empty Document object
        doc = Document(data="")

        # Validate the prefetch data
        validated_prefetch = Prefetch(prefetch=cds_request.prefetch)

        # Set the prefetch resources
        doc.fhir.prefetch_resources = validated_prefetch.prefetch

        # Extract text content from DocumentReference resource if provided
        document_resource = validated_prefetch.prefetch.get(prefetch_document_key)

        if not document_resource:
            log.warning(
                f"No DocumentReference resource found in prefetch data with key {prefetch_document_key}"
            )
        elif isinstance(document_resource, DocumentReference):
            try:
                attachments = read_content_attachment(
                    document_resource, include_data=True
                )
                for attachment in attachments:
                    if len(attachments) > 1:
                        doc.data += attachment.get("data", "") + "\n"
                    else:
                        doc.data += attachment.get("data", "")
            except Exception as e:
                log.warning(f"Error extracting text from DocumentReference: {e}")

        return doc

    def output(self, document: Document) -> CDSResponse:
        """
        Convert Document to CDSResponse.

        This method takes a Document object containing CDS cards and actions,
        and converts them into a CDSResponse object that follows the CDS Hooks
        specification.

        Args:
            document (Document): The Document object containing CDS results.

        Returns:
            CDSResponse: A response object containing CDS cards and optional system actions.
                         If no cards are found in the Document, an empty list of cards is returned.
        """
        if document.cds.cards is None:
            log.warning("No CDS cards found in Document, returning empty list of cards")
            return CDSResponse(cards=[])

        return CDSResponse(cards=document.cds.cards, systemActions=document.cds.actions)

input(cds_request, prefetch_document_key='document')

Converts a CDSRequest object into a Document object.

Takes a CDSRequest containing FHIR resources and extracts them into a Document object. The Document will contain all prefetched FHIR resources in its fhir.prefetch_resources. If a DocumentReference resource is provided via prefetch_document_key, its text content will be extracted into Document.data. For multiple attachments, the text content will be concatenated with newlines.

PARAMETER DESCRIPTION
cds_request

The CDSRequest containing FHIR resources in its prefetch and/or a FHIR server URL.

TYPE: CDSRequest

prefetch_document_key

Key in the prefetch data containing a DocumentReference resource whose text content should be extracted. Defaults to "document".

TYPE: str DEFAULT: 'document'

RETURNS DESCRIPTION
Document

A Document object containing:

- All prefetched FHIR resources in fhir.prefetch_resources
- Any text content from the DocumentReference in data (empty string if none found)
- For multiple attachments, text content is concatenated with newlines

TYPE: Document
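
To make the concatenation rule concrete, the sketch below reproduces the attachment loop from the source listing on plain dictionaries. `join_attachment_text` is a hypothetical name; the only assumption about the attachments is that each is a dict with an optional `"data"` key, as the loop in `input` suggests.

```python
from typing import Dict, List


def join_attachment_text(attachments: List[Dict[str, str]]) -> str:
    """Concatenate attachment text the same way the input loop does."""
    text = ""
    for attachment in attachments:
        if len(attachments) > 1:
            # Each of several attachments is followed by a newline
            text += attachment.get("data", "") + "\n"
        else:
            # A single attachment is used as-is
            text += attachment.get("data", "")
    return text


single = join_attachment_text([{"data": "Note A"}])
multiple = join_attachment_text([{"data": "Note A"}, {"data": "Note B"}])
```

With more than one attachment, the result ends with a trailing newline (`"Note A\nNote B\n"`), which callers comparing text exactly may want to strip.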

RAISES DESCRIPTION
ValueError

If neither prefetch nor fhirServer is provided in cds_request

ValueError

If the prefetch data is invalid or cannot be processed

NotImplementedError

If fhirServer is provided (FHIR server support not implemented)
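
The order of the first two checks matters, and a small sketch may help. `FakeCdsRequest` is a hypothetical stand-in exposing only the `prefetch` and `fhirServer` fields that the checks read; the conditions and messages are copied from the source listing.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class FakeCdsRequest:
    """Hypothetical stand-in for CDSRequest with only the fields checked here."""

    prefetch: Optional[Dict[str, Any]] = None
    fhirServer: Optional[str] = None


def check_request(request: FakeCdsRequest) -> None:
    """Apply the same guards, in the same order, as the input method."""
    if request.prefetch is None and request.fhirServer is None:
        raise ValueError(
            "Either prefetch or fhirServer must be provided to extract FHIR data!"
        )
    if request.fhirServer is not None:
        raise NotImplementedError("FHIR server is not implemented yet!")


def outcome(request: FakeCdsRequest) -> str:
    """Return the name of the exception raised, or 'ok' if none."""
    try:
        check_request(request)
    except (ValueError, NotImplementedError) as exc:
        return type(exc).__name__
    return "ok"


neither = outcome(FakeCdsRequest())
both = outcome(FakeCdsRequest(prefetch={}, fhirServer="https://example.org/fhir"))
prefetch_only = outcome(FakeCdsRequest(prefetch={}))
```

As the `both` case shows, a request that sets `fhirServer` is rejected with `NotImplementedError` even when prefetch data is also present.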

Source code in healthchain/io/cdsfhirconnector.py
def input(
    self, cds_request: CDSRequest, prefetch_document_key: Optional[str] = "document"
) -> Document:
    """
    Converts a CDSRequest object into a Document object.

    Takes a CDSRequest containing FHIR resources and extracts them into a Document object.
    The Document will contain all prefetched FHIR resources in its fhir.prefetch_resources.
    If a DocumentReference resource is provided via prefetch_document_key, its text content
    will be extracted into Document.data. For multiple attachments, the text content will be
    concatenated with newlines.

    Args:
        cds_request (CDSRequest): The CDSRequest containing FHIR resources in its prefetch
            and/or a FHIR server URL.
        prefetch_document_key (str, optional): Key in the prefetch data containing a
            DocumentReference resource whose text content should be extracted.
            Defaults to "document".

    Returns:
        Document: A Document object containing:
            - All prefetched FHIR resources in fhir.prefetch_resources
            - Any text content from the DocumentReference in data (empty string if none found)
            - For multiple attachments, text content is concatenated with newlines

    Raises:
        ValueError: If neither prefetch nor fhirServer is provided in cds_request
        ValueError: If the prefetch data is invalid or cannot be processed
        NotImplementedError: If fhirServer is provided (FHIR server support not implemented)
    """
    if cds_request.prefetch is None and cds_request.fhirServer is None:
        raise ValueError(
            "Either prefetch or fhirServer must be provided to extract FHIR data!"
        )

    if cds_request.fhirServer is not None:
        raise NotImplementedError("FHIR server is not implemented yet!")

    # Create an empty Document object
    doc = Document(data="")

    # Validate the prefetch data
    validated_prefetch = Prefetch(prefetch=cds_request.prefetch)

    # Set the prefetch resources
    doc.fhir.prefetch_resources = validated_prefetch.prefetch

    # Extract text content from DocumentReference resource if provided
    document_resource = validated_prefetch.prefetch.get(prefetch_document_key)

    if not document_resource:
        log.warning(
            f"No DocumentReference resource found in prefetch data with key {prefetch_document_key}"
        )
    elif isinstance(document_resource, DocumentReference):
        try:
            attachments = read_content_attachment(
                document_resource, include_data=True
            )
            for attachment in attachments:
                if len(attachments) > 1:
                    doc.data += attachment.get("data", "") + "\n"
                else:
                    doc.data += attachment.get("data", "")
        except Exception as e:
            log.warning(f"Error extracting text from DocumentReference: {e}")

    return doc

output(document)

Convert Document to CDSResponse.

This method takes a Document object containing CDS cards and actions, and converts them into a CDSResponse object that follows the CDS Hooks specification.

PARAMETER DESCRIPTION
document

The Document object containing CDS results.

TYPE: Document

RETURNS DESCRIPTION
CDSResponse

A response object containing CDS cards and optional system actions. If no cards are found in the Document, an empty list of cards is returned.

TYPE: CDSResponse
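
The fallback can be sketched with a stand-in response type. `FakeResponse` and `build_response` are hypothetical names; the branching follows the source listing for `output`.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FakeResponse:
    """Hypothetical stand-in for CDSResponse."""

    cards: List[dict] = field(default_factory=list)
    systemActions: Optional[List[dict]] = None


def build_response(
    cards: Optional[List[dict]], actions: Optional[List[dict]] = None
) -> FakeResponse:
    """Mirror output(): fall back to an empty card list when cards is None."""
    if cards is None:
        # Actions are not forwarded on this path
        return FakeResponse(cards=[])
    return FakeResponse(cards=cards, systemActions=actions)


no_cards = build_response(None, actions=[{"type": "create"}])
with_cards = build_response([{"summary": "Check dosage"}], actions=[{"type": "create"}])
```

In this sketch, as in the listing, system actions are only passed through when `cards` is not `None`; an empty list of cards still counts as present.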

Source code in healthchain/io/cdsfhirconnector.py
def output(self, document: Document) -> CDSResponse:
    """
    Convert Document to CDSResponse.

    This method takes a Document object containing CDS cards and actions,
    and converts them into a CDSResponse object that follows the CDS Hooks
    specification.

    Args:
        document (Document): The Document object containing CDS results.

    Returns:
        CDSResponse: A response object containing CDS cards and optional system actions.
                     If no cards are found in the Document, an empty list of cards is returned.
    """
    if document.cds.cards is None:
        log.warning("No CDS cards found in Document, returning empty list of cards")
        return CDSResponse(cards=[])

    return CDSResponse(cards=document.cds.cards, systemActions=document.cds.actions)