Skip to content

Connectors

BaseConnector

Bases: Generic[T], ABC

Abstract base class for all connectors in the pipeline.

This class should be subclassed to create specific connectors. Subclasses must implement the input and output methods.

Source code in healthchain/io/base.py
class BaseConnector(Generic[T], ABC):
    """
    Abstract base class for all connectors in the pipeline.

    This class should be subclassed to create specific connectors.
    Subclasses must implement the input and output methods.
    """

    @abstractmethod
    def input(self, data: DataContainer[T]) -> DataContainer[T]:
        """
        Convert input data to the pipeline's internal format.

        Args:
            data (DataContainer[T]): The input data to be converted.

        Returns:
            DataContainer[T]: The converted data.
        """
        pass

    @abstractmethod
    def output(self, data: DataContainer[T]) -> DataContainer[T]:
        """
        Convert pipeline's internal format to output data.

        Args:
            data (DataContainer[T]): The data to be converted for output.

        Returns:
            DataContainer[T]: The converted output data.
        """
        pass

input(data) abstractmethod

Convert input data to the pipeline's internal format.

PARAMETER DESCRIPTION
data

The input data to be converted.

TYPE: DataContainer[T]

RETURNS DESCRIPTION
DataContainer[T]

DataContainer[T]: The converted data.

Source code in healthchain/io/base.py
@abstractmethod
def input(self, data: DataContainer[T]) -> DataContainer[T]:
    """
    Convert input data to the pipeline's internal format.

    Args:
        data (DataContainer[T]): The input data to be converted.

    Returns:
        DataContainer[T]: The converted data.
    """
    pass

output(data) abstractmethod

Convert pipeline's internal format to output data.

PARAMETER DESCRIPTION
data

The data to be converted for output.

TYPE: DataContainer[T]

RETURNS DESCRIPTION
DataContainer[T]

DataContainer[T]: The converted output data.

Source code in healthchain/io/base.py
@abstractmethod
def output(self, data: DataContainer[T]) -> DataContainer[T]:
    """
    Convert pipeline's internal format to output data.

    Args:
        data (DataContainer[T]): The data to be converted for output.

    Returns:
        DataContainer[T]: The converted output data.
    """
    pass

CdaConnector

Bases: BaseConnector

CDAConnector class for handling CDA (Clinical Document Architecture) documents.

This connector is responsible for parsing CDA documents, extracting relevant clinical data, and updating the document with new information. It serves as both an input and output connector in the pipeline.

ATTRIBUTE DESCRIPTION
overwrite

Flag to determine if existing data should be overwritten when updating the CDA document.

TYPE: bool

cda_doc

The parsed CDA document.

TYPE: CdaAnnotator

METHOD DESCRIPTION
input

Parses the input CDA document and extracts clinical data.

output

Updates the CDA document with new data and returns the response.

Source code in healthchain/io/cdaconnector.py
class CdaConnector(BaseConnector):
    """
    CDAConnector class for handling CDA (Clinical Document Architecture) documents.

    This connector is responsible for parsing CDA documents, extracting relevant
    clinical data, and updating the document with new information. It serves as
    both an input and output connector in the pipeline.

    Attributes:
        overwrite (bool): Flag to determine if existing data should be overwritten
                          when updating the CDA document.
        cda_doc (CdaAnnotator): The parsed CDA document.

    Methods:
        input: Parses the input CDA document and extracts clinical data.
        output: Updates the CDA document with new data and returns the response.
    """

    def __init__(self, overwrite: bool = False):
        self.overwrite = overwrite
        self.cda_doc = None

    def input(self, in_data: CdaRequest) -> Document:
        """
        Parse the input CDA document and extract clinical data.

        This method takes a CdaRequest object containing the CDA document as input,
        parses it using the CdaAnnotator, and extracts relevant clinical data.
        The extracted data is then used to create a CcdData object and a healthchain
        Document object, which is returned.

        Args:
            in_data (CdaRequest): The input request containing the CDA document.

        Returns:
            Document: A Document object containing the extracted clinical data
                      and the original note text.

        """
        self.cda_doc = CdaAnnotator.from_xml(in_data.document)

        # TODO: Temporary fix for the note section, this might be more of a concern for the Annotator class
        if isinstance(self.cda_doc.note, dict):
            note_text = " ".join(str(value) for value in self.cda_doc.note.values())
        elif isinstance(self.cda_doc.note, str):
            note_text = self.cda_doc.note
        else:
            log.warning("Note section is not a string or dictionary")
            note_text = ""

        ccd_data = CcdData(
            concepts=ConceptLists(
                problems=self.cda_doc.problem_list,
                medications=self.cda_doc.medication_list,
                allergies=self.cda_doc.allergy_list,
            ),
            note=note_text,
        )

        doc = Document(data=ccd_data.note)
        doc.hl7.set_ccd_data(ccd_data)

        return doc

    def output(self, out_data: Document) -> CdaResponse:
        """
        Update the CDA document with new data and return the response.

        This method takes a Document object containing updated clinical data,
        updates the CDA document with this new information, and returns a
        CdaResponse object with the updated CDA document.

        Args:
            out_data (Document): A Document object containing the updated
                                 clinical data (problems, allergies, medications).

        Returns:
            CdaResponse: A response object containing the updated CDA document.

        Note:
            The method updates the CDA document with new problems, allergies,
            and medications if they are present in the input Document object.
            The update behavior (overwrite or append) is determined by the
            `overwrite` attribute of the CdaConnector instance.
        """
        # TODO: check what to do with overwrite
        updated_ccd_data = out_data.generate_ccd(overwrite=self.overwrite)

        # Update the CDA document with the results

        if updated_ccd_data.concepts.problems:
            log.debug(
                f"Updating CDA document with {len(updated_ccd_data.concepts.problems)} problem(s)."
            )
            self.cda_doc.add_to_problem_list(
                updated_ccd_data.concepts.problems, overwrite=self.overwrite
            )
        if updated_ccd_data.concepts.allergies:
            log.debug(
                f"Updating CDA document with {len(updated_ccd_data.concepts.allergies)} allergy(ies)."
            )
            self.cda_doc.add_to_allergy_list(
                updated_ccd_data.concepts.allergies, overwrite=self.overwrite
            )
        if updated_ccd_data.concepts.medications:
            log.debug(
                f"Updating CDA document with {len(updated_ccd_data.concepts.medications)} medication(s)."
            )
            self.cda_doc.add_to_medication_list(
                updated_ccd_data.concepts.medications, overwrite=self.overwrite
            )

        # Export the updated CDA document
        response_document = self.cda_doc.export()

        return CdaResponse(document=response_document)

input(in_data)

Parse the input CDA document and extract clinical data.

This method takes a CdaRequest object containing the CDA document as input, parses it using the CdaAnnotator, and extracts relevant clinical data. The extracted data is then used to create a CcdData object and a healthchain Document object, which is returned.

PARAMETER DESCRIPTION
in_data

The input request containing the CDA document.

TYPE: CdaRequest

RETURNS DESCRIPTION
Document

A Document object containing the extracted clinical data and the original note text.

TYPE: Document

Source code in healthchain/io/cdaconnector.py
def input(self, in_data: CdaRequest) -> Document:
    """
    Parse the input CDA document and extract clinical data.

    This method takes a CdaRequest object containing the CDA document as input,
    parses it using the CdaAnnotator, and extracts relevant clinical data.
    The extracted data is then used to create a CcdData object and a healthchain
    Document object, which is returned.

    Args:
        in_data (CdaRequest): The input request containing the CDA document.

    Returns:
        Document: A Document object containing the extracted clinical data
                  and the original note text.

    """
    self.cda_doc = CdaAnnotator.from_xml(in_data.document)

    # TODO: Temporary fix for the note section, this might be more of a concern for the Annotator class
    if isinstance(self.cda_doc.note, dict):
        note_text = " ".join(str(value) for value in self.cda_doc.note.values())
    elif isinstance(self.cda_doc.note, str):
        note_text = self.cda_doc.note
    else:
        log.warning("Note section is not a string or dictionary")
        note_text = ""

    ccd_data = CcdData(
        concepts=ConceptLists(
            problems=self.cda_doc.problem_list,
            medications=self.cda_doc.medication_list,
            allergies=self.cda_doc.allergy_list,
        ),
        note=note_text,
    )

    doc = Document(data=ccd_data.note)
    doc.hl7.set_ccd_data(ccd_data)

    return doc

output(out_data)

Update the CDA document with new data and return the response.

This method takes a Document object containing updated clinical data, updates the CDA document with this new information, and returns a CdaResponse object with the updated CDA document.

PARAMETER DESCRIPTION
out_data

A Document object containing the updated clinical data (problems, allergies, medications).

TYPE: Document

RETURNS DESCRIPTION
CdaResponse

A response object containing the updated CDA document.

TYPE: CdaResponse

Note

The method updates the CDA document with new problems, allergies, and medications if they are present in the input Document object. The update behavior (overwrite or append) is determined by the overwrite attribute of the CdaConnector instance.

Source code in healthchain/io/cdaconnector.py
def output(self, out_data: Document) -> CdaResponse:
    """
    Update the CDA document with new data and return the response.

    This method takes a Document object containing updated clinical data,
    updates the CDA document with this new information, and returns a
    CdaResponse object with the updated CDA document.

    Args:
        out_data (Document): A Document object containing the updated
                             clinical data (problems, allergies, medications).

    Returns:
        CdaResponse: A response object containing the updated CDA document.

    Note:
        The method updates the CDA document with new problems, allergies,
        and medications if they are present in the input Document object.
        The update behavior (overwrite or append) is determined by the
        `overwrite` attribute of the CdaConnector instance.
    """
    # TODO: check what to do with overwrite
    updated_ccd_data = out_data.generate_ccd(overwrite=self.overwrite)

    # Update the CDA document with the results

    if updated_ccd_data.concepts.problems:
        log.debug(
            f"Updating CDA document with {len(updated_ccd_data.concepts.problems)} problem(s)."
        )
        self.cda_doc.add_to_problem_list(
            updated_ccd_data.concepts.problems, overwrite=self.overwrite
        )
    if updated_ccd_data.concepts.allergies:
        log.debug(
            f"Updating CDA document with {len(updated_ccd_data.concepts.allergies)} allergy(ies)."
        )
        self.cda_doc.add_to_allergy_list(
            updated_ccd_data.concepts.allergies, overwrite=self.overwrite
        )
    if updated_ccd_data.concepts.medications:
        log.debug(
            f"Updating CDA document with {len(updated_ccd_data.concepts.medications)} medication(s)."
        )
        self.cda_doc.add_to_medication_list(
            updated_ccd_data.concepts.medications, overwrite=self.overwrite
        )

    # Export the updated CDA document
    response_document = self.cda_doc.export()

    return CdaResponse(document=response_document)

CdsFhirConnector

Bases: BaseConnector

CdsFhirConnector class for handling FHIR (Fast Healthcare Interoperability Resources) documents for CDS Hooks.

This connector facilitates the conversion between CDSRequest objects and Document objects, as well as the creation of CDSResponse objects from processed Documents.

ATTRIBUTE DESCRIPTION
hook_name

The name of the CDS Hook being used.

TYPE: str

Source code in healthchain/io/cdsfhirconnector.py
class CdsFhirConnector(BaseConnector):
    """
    CdsFhirConnector class for handling FHIR (Fast Healthcare Interoperability Resources) documents
    for CDS Hooks.

    This connector facilitates the conversion between CDSRequest objects and Document objects,
    as well as the creation of CDSResponse objects from processed Documents.

    Attributes:
        hook_name (str): The name of the CDS Hook being used.
    """

    def __init__(self, hook_name: str):
        self.hook_name = hook_name

    def input(self, in_data: CDSRequest) -> Document:
        """
        Converts a CDSRequest object into a Document object containing FHIR resources.

        This method takes a CDSRequest object as input, extracts the context and prefetch data,
        and creates a CdsFhirData object. It then returns a Document object containing the FHIR data
        and any extracted text content from DocumentReference resources.

        Args:
            in_data (CDSRequest): The input CDSRequest object containing context and prefetch data.

        Returns:
            Document: A Document object with the following attributes:
                - data: Either a string representation of the prefetch data, or if a DocumentReference
                  is present, the text content from that resource
                - hl7: Contains the CdsFhirData object with context and prefetch data

        Raises:
            ValueError: If neither prefetch nor fhirServer is provided in the input data
            NotImplementedError: If fhirServer is provided (not yet implemented)
            ValueError: If the provided prefetch data is invalid

        Note:
            The method currently only supports prefetch data and does not handle FHIR server interactions.
            When a DocumentReference resource is present in the prefetch data, its text content will be
            extracted and stored as the Document's data field.
        """
        if in_data.prefetch is None and in_data.fhirServer is None:
            raise ValueError(
                "Either prefetch or fhirServer must be provided to extract FHIR data!"
            )

        if in_data.fhirServer is not None:
            raise NotImplementedError("FHIR server is not implemented yet!")

        try:
            cds_fhir_data = CdsFhirData.create(
                context=in_data.context.model_dump(), prefetch=in_data.prefetch
            )
        except Exception as e:
            raise ValueError("Invalid prefetch data provided: {e}!") from e

        doc = Document(data=str(cds_fhir_data.model_dump_prefetch()))

        # Extract text from DocumentReference resources if present
        for entry in cds_fhir_data.prefetch.entry_field:
            if entry.resource_field.resourceType == "DocumentReference":
                doc.data = entry.resource_field.text_field.div_field

        doc.hl7.set_fhir_data(cds_fhir_data)

        return doc

    def output(self, out_data: Document) -> CDSResponse:
        """
        Generates a CDSResponse object from a processed Document object.

        This method takes a Document object that has been processed and potentially
        contains CDS cards and system actions. It creates and returns a CDSResponse
        object based on the contents of the Document.

        Args:
            out_data (Document): A Document object potentially containing CDS cards
                                 and system actions.

        Returns:
            CDSResponse: A response object containing CDS cards and optional system actions.
                         If no cards are found in the Document, an empty list of cards is returned.

        Note:
            - If out_data.cds_cards is None, a warning is logged and an empty list of cards is returned.
            - System actions (out_data.cds_actions) are included in the response if present.
        """
        if out_data.cds.get_cards() is None:
            log.warning("No CDS cards found in Document, returning empty list of cards")
            return CDSResponse(cards=[])

        return CDSResponse(
            cards=out_data.cds.get_cards(), systemActions=out_data.cds.get_actions()
        )

input(in_data)

Converts a CDSRequest object into a Document object containing FHIR resources.

This method takes a CDSRequest object as input, extracts the context and prefetch data, and creates a CdsFhirData object. It then returns a Document object containing the FHIR data and any extracted text content from DocumentReference resources.

PARAMETER DESCRIPTION
in_data

The input CDSRequest object containing context and prefetch data.

TYPE: CDSRequest

RETURNS DESCRIPTION
Document

A Document object with the following attributes: - data: Either a string representation of the prefetch data, or if a DocumentReference is present, the text content from that resource - hl7: Contains the CdsFhirData object with context and prefetch data

TYPE: Document

RAISES DESCRIPTION
ValueError

If neither prefetch nor fhirServer is provided in the input data

NotImplementedError

If fhirServer is provided (not yet implemented)

ValueError

If the provided prefetch data is invalid

Note

The method currently only supports prefetch data and does not handle FHIR server interactions. When a DocumentReference resource is present in the prefetch data, its text content will be extracted and stored as the Document's data field.

Source code in healthchain/io/cdsfhirconnector.py
def input(self, in_data: CDSRequest) -> Document:
    """
    Converts a CDSRequest object into a Document object containing FHIR resources.

    This method takes a CDSRequest object as input, extracts the context and prefetch data,
    and creates a CdsFhirData object. It then returns a Document object containing the FHIR data
    and any extracted text content from DocumentReference resources.

    Args:
        in_data (CDSRequest): The input CDSRequest object containing context and prefetch data.

    Returns:
        Document: A Document object with the following attributes:
            - data: Either a string representation of the prefetch data, or if a DocumentReference
              is present, the text content from that resource
            - hl7: Contains the CdsFhirData object with context and prefetch data

    Raises:
        ValueError: If neither prefetch nor fhirServer is provided in the input data
        NotImplementedError: If fhirServer is provided (not yet implemented)
        ValueError: If the provided prefetch data is invalid

    Note:
        The method currently only supports prefetch data and does not handle FHIR server interactions.
        When a DocumentReference resource is present in the prefetch data, its text content will be
        extracted and stored as the Document's data field.
    """
    if in_data.prefetch is None and in_data.fhirServer is None:
        raise ValueError(
            "Either prefetch or fhirServer must be provided to extract FHIR data!"
        )

    if in_data.fhirServer is not None:
        raise NotImplementedError("FHIR server is not implemented yet!")

    try:
        cds_fhir_data = CdsFhirData.create(
            context=in_data.context.model_dump(), prefetch=in_data.prefetch
        )
    except Exception as e:
        raise ValueError("Invalid prefetch data provided: {e}!") from e

    doc = Document(data=str(cds_fhir_data.model_dump_prefetch()))

    # Extract text from DocumentReference resources if present
    for entry in cds_fhir_data.prefetch.entry_field:
        if entry.resource_field.resourceType == "DocumentReference":
            doc.data = entry.resource_field.text_field.div_field

    doc.hl7.set_fhir_data(cds_fhir_data)

    return doc

output(out_data)

Generates a CDSResponse object from a processed Document object.

This method takes a Document object that has been processed and potentially contains CDS cards and system actions. It creates and returns a CDSResponse object based on the contents of the Document.

PARAMETER DESCRIPTION
out_data

A Document object potentially containing CDS cards and system actions.

TYPE: Document

RETURNS DESCRIPTION
CDSResponse

A response object containing CDS cards and optional system actions. If no cards are found in the Document, an empty list of cards is returned.

TYPE: CDSResponse

Note
  • If out_data.cds_cards is None, a warning is logged and an empty list of cards is returned.
  • System actions (out_data.cds_actions) are included in the response if present.
Source code in healthchain/io/cdsfhirconnector.py
def output(self, out_data: Document) -> CDSResponse:
    """
    Generates a CDSResponse object from a processed Document object.

    This method takes a Document object that has been processed and potentially
    contains CDS cards and system actions. It creates and returns a CDSResponse
    object based on the contents of the Document.

    Args:
        out_data (Document): A Document object potentially containing CDS cards
                             and system actions.

    Returns:
        CDSResponse: A response object containing CDS cards and optional system actions.
                     If no cards are found in the Document, an empty list of cards is returned.

    Note:
        - If out_data.cds_cards is None, a warning is logged and an empty list of cards is returned.
        - System actions (out_data.cds_actions) are included in the response if present.
    """
    if out_data.cds.get_cards() is None:
        log.warning("No CDS cards found in Document, returning empty list of cards")
        return CDSResponse(cards=[])

    return CDSResponse(
        cards=out_data.cds.get_cards(), systemActions=out_data.cds.get_actions()
    )