Skip to content

Langchain

zenml.integrations.langchain special

Initialization of the langchain integration.

LangchainIntegration (Integration)

Definition of langchain integration for ZenML.

Source code in zenml/integrations/langchain/__init__.py
class LangchainIntegration(Integration):
    """Definition of langchain integration for ZenML."""

    NAME = LANGCHAIN
    REQUIREMENTS = ["langchain>=0.0.116"]

    @classmethod
    def activate(cls) -> None:
        """Activates the integration."""
        from zenml.integrations.langchain import materializers  # noqa

activate() classmethod

Activates the integration.

Source code in zenml/integrations/langchain/__init__.py
@classmethod
def activate(cls) -> None:
    """Activates the integration."""
    from zenml.integrations.langchain import materializers  # noqa

materializers special

Initialization of the langchain materializer.

document_materializer

Implementation of ZenML's Langchain Document materializer.

LangchainDocumentMaterializer (BaseMaterializer)

Handle Langchain Document objects.

Source code in zenml/integrations/langchain/materializers/document_materializer.py
class LangchainDocumentMaterializer(BaseMaterializer):
    """Handle Langchain Document objects."""

    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA
    ASSOCIATED_TYPES = (Document,)

    def __init__(self, **kwargs: Any) -> None:
        """Initializes the Langchain Document materializer.

        Args:
            **kwargs: Keyword arguments.
        """
        super().__init__(**kwargs)
        self._pydantic_materializer = PydanticMaterializer(**kwargs)

    def load(self, data_type: Type[Document]) -> Any:
        """Reads Langchain Document from JSON.

        Args:
            data_type: The type of the data to read.

        Returns:
            The data read.
        """
        return self._pydantic_materializer.load(data_type)

    def save(self, data: Document) -> None:
        """Serialize a Langchain Document to JSON.

        Args:
            data: The data to store.
        """
        self._pydantic_materializer.save(data)

    def extract_metadata(self, data: Document) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given Langchain Document object.

        Args:
            data: The Langchain Document object to extract metadata from.

        Returns:
            The extracted metadata as a dictionary.
        """
        return self._pydantic_materializer.extract_metadata(data)
__init__(self, **kwargs) special

Initializes the Langchain Document materializer.

Parameters:

Name Type Description Default
**kwargs Any

Keyword arguments.

{}
Source code in zenml/integrations/langchain/materializers/document_materializer.py
def __init__(self, **kwargs: Any) -> None:
    """Initializes the Langchain Document materializer.

    Args:
        **kwargs: Keyword arguments.
    """
    super().__init__(**kwargs)
    self._pydantic_materializer = PydanticMaterializer(**kwargs)
extract_metadata(self, data)

Extract metadata from the given Langchain Document object.

Parameters:

Name Type Description Default
data Document

The Langchain Document object to extract metadata from.

required

Returns:

Type Description
Dict[str, 'MetadataType']

The extracted metadata as a dictionary.

Source code in zenml/integrations/langchain/materializers/document_materializer.py
def extract_metadata(self, data: Document) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given Langchain Document object.

    Args:
        data: The Langchain Document object to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.
    """
    return self._pydantic_materializer.extract_metadata(data)
load(self, data_type)

Reads Langchain Document from JSON.

Parameters:

Name Type Description Default
data_type Type[Document]

The type of the data to read.

required

Returns:

Type Description
Any

The data read.

Source code in zenml/integrations/langchain/materializers/document_materializer.py
def load(self, data_type: Type[Document]) -> Any:
    """Reads Langchain Document from JSON.

    Args:
        data_type: The type of the data to read.

    Returns:
        The data read.
    """
    return self._pydantic_materializer.load(data_type)
save(self, data)

Serialize a Langchain Document to JSON.

Parameters:

Name Type Description Default
data Document

The data to store.

required
Source code in zenml/integrations/langchain/materializers/document_materializer.py
def save(self, data: Document) -> None:
    """Serialize a Langchain Document to JSON.

    Args:
        data: The data to store.
    """
    self._pydantic_materializer.save(data)

openai_embedding_materializer

Implementation of the Langchain OpenAI embedding materializer.

LangchainOpenaiEmbeddingMaterializer (BaseMaterializer)

Handle langchain openai embedding objects.

Source code in zenml/integrations/langchain/materializers/openai_embedding_materializer.py
class LangchainOpenaiEmbeddingMaterializer(BaseMaterializer):
    """Handle langchain openai embedding objects."""

    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.MODEL
    ASSOCIATED_TYPES = (OpenAIEmbeddings,)

    def load(self, data_type: Type[OpenAIEmbeddings]) -> OpenAIEmbeddings:
        """Reads an OpenAI embedding from a pickle file.

        Args:
            data_type: The type of the vector store.

        Returns:
            The vector store.
        """
        super().load(data_type)

        # validate langchain package version
        langchain_version_filepath = os.path.join(
            self.uri, DEFAULT_LANGCHAIN_VERSION_FILENAME
        )
        source_langchain_version = read_file_contents_as_string(
            langchain_version_filepath
        )
        try:
            package_version = pkg_resources.get_distribution(
                LANGCHAIN_PACKAGE_NAME
            ).version
        except pkg_resources.DistributionNotFound:
            logger.warn(
                f"'{LANGCHAIN_PACKAGE_NAME}' package is not installed."
            )
            package_version = "not installed"
        if source_langchain_version != package_version:
            logger.warn(
                f"Your `OpenAIEmbedding` was materialized with {source_langchain_version} "
                f"but you are currently using {package_version}. "
                f"This might cause unexpected behavior. Attempting to load."
            )

        # validate python version
        python_version_filepath = os.path.join(
            self.uri, DEFAULT_PYTHON_VERSION_FILENAME
        )
        source_python_version = read_file_contents_as_string(
            python_version_filepath
        )
        current_python_version = Environment().python_version()
        if source_python_version != current_python_version:
            logger.warn(
                f"Your `OpenAIEmbedding` was materialized with {source_python_version} "
                f"but you are currently using {current_python_version}. "
                f"This might cause unexpected behavior. Attempting to load."
            )

        pickle_filepath = os.path.join(self.uri, DEFAULT_PICKLE_FILENAME)
        with fileio.open(pickle_filepath, "rb") as fid:
            embedding = pickle.load(fid)
        return cast(OpenAIEmbeddings, embedding)

    def save(self, embedding: OpenAIEmbeddings) -> None:
        """Save an OpenAI embedding as a pickle file.

        Args:
            embedding: The embedding to save.
        """
        super().save(embedding)

        pickle_filepath = os.path.join(self.uri, DEFAULT_PICKLE_FILENAME)
        with fileio.open(pickle_filepath, "wb") as fid:
            pickle.dump(embedding, fid)

        # save python version for validation on loading
        python_version_filepath = os.path.join(
            self.uri, DEFAULT_PYTHON_VERSION_FILENAME
        )
        current_python_version = Environment().python_version()
        write_file_contents_as_string(
            python_version_filepath, current_python_version
        )

        # save langchain package version foa
        try:
            package_version = pkg_resources.get_distribution(
                LANGCHAIN_PACKAGE_NAME
            ).version
        except pkg_resources.DistributionNotFound:
            logger.warn(
                f"'{LANGCHAIN_PACKAGE_NAME}' package is not installed."
            )
            package_version = "not installed"
        langchain_version_filepath = os.path.join(
            self.uri, DEFAULT_LANGCHAIN_VERSION_FILENAME
        )
        write_file_contents_as_string(
            langchain_version_filepath, package_version
        )
load(self, data_type)

Reads an OpenAI embedding from a pickle file.

Parameters:

Name Type Description Default
data_type Type[langchain.embeddings.openai.OpenAIEmbeddings]

The type of the vector store.

required

Returns:

Type Description
OpenAIEmbeddings

The vector store.

Source code in zenml/integrations/langchain/materializers/openai_embedding_materializer.py
def load(self, data_type: Type[OpenAIEmbeddings]) -> OpenAIEmbeddings:
    """Reads an OpenAI embedding from a pickle file.

    Args:
        data_type: The type of the vector store.

    Returns:
        The vector store.
    """
    super().load(data_type)

    # validate langchain package version
    langchain_version_filepath = os.path.join(
        self.uri, DEFAULT_LANGCHAIN_VERSION_FILENAME
    )
    source_langchain_version = read_file_contents_as_string(
        langchain_version_filepath
    )
    try:
        package_version = pkg_resources.get_distribution(
            LANGCHAIN_PACKAGE_NAME
        ).version
    except pkg_resources.DistributionNotFound:
        logger.warn(
            f"'{LANGCHAIN_PACKAGE_NAME}' package is not installed."
        )
        package_version = "not installed"
    if source_langchain_version != package_version:
        logger.warn(
            f"Your `OpenAIEmbedding` was materialized with {source_langchain_version} "
            f"but you are currently using {package_version}. "
            f"This might cause unexpected behavior. Attempting to load."
        )

    # validate python version
    python_version_filepath = os.path.join(
        self.uri, DEFAULT_PYTHON_VERSION_FILENAME
    )
    source_python_version = read_file_contents_as_string(
        python_version_filepath
    )
    current_python_version = Environment().python_version()
    if source_python_version != current_python_version:
        logger.warn(
            f"Your `OpenAIEmbedding` was materialized with {source_python_version} "
            f"but you are currently using {current_python_version}. "
            f"This might cause unexpected behavior. Attempting to load."
        )

    pickle_filepath = os.path.join(self.uri, DEFAULT_PICKLE_FILENAME)
    with fileio.open(pickle_filepath, "rb") as fid:
        embedding = pickle.load(fid)
    return cast(OpenAIEmbeddings, embedding)
save(self, embedding)

Save an OpenAI embedding as a pickle file.

Parameters:

Name Type Description Default
embedding OpenAIEmbeddings

The embedding to save.

required
Source code in zenml/integrations/langchain/materializers/openai_embedding_materializer.py
def save(self, embedding: OpenAIEmbeddings) -> None:
    """Save an OpenAI embedding as a pickle file.

    Args:
        embedding: The embedding to save.
    """
    super().save(embedding)

    pickle_filepath = os.path.join(self.uri, DEFAULT_PICKLE_FILENAME)
    with fileio.open(pickle_filepath, "wb") as fid:
        pickle.dump(embedding, fid)

    # save python version for validation on loading
    python_version_filepath = os.path.join(
        self.uri, DEFAULT_PYTHON_VERSION_FILENAME
    )
    current_python_version = Environment().python_version()
    write_file_contents_as_string(
        python_version_filepath, current_python_version
    )

    # save langchain package version foa
    try:
        package_version = pkg_resources.get_distribution(
            LANGCHAIN_PACKAGE_NAME
        ).version
    except pkg_resources.DistributionNotFound:
        logger.warn(
            f"'{LANGCHAIN_PACKAGE_NAME}' package is not installed."
        )
        package_version = "not installed"
    langchain_version_filepath = os.path.join(
        self.uri, DEFAULT_LANGCHAIN_VERSION_FILENAME
    )
    write_file_contents_as_string(
        langchain_version_filepath, package_version
    )

vector_store_materializer

Implementation of the langchain vector store materializer.

LangchainVectorStoreMaterializer (BaseMaterializer)

Handle langchain vector store objects.

Source code in zenml/integrations/langchain/materializers/vector_store_materializer.py
class LangchainVectorStoreMaterializer(BaseMaterializer):
    """Handle langchain vector store objects."""

    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA
    ASSOCIATED_TYPES = (VectorStore,)

    def load(self, data_type: Type[VectorStore]) -> VectorStore:
        """Reads a langchain vector store from a pickle file.

        Args:
            data_type: The type of the vector store.

        Returns:
            The vector store.
        """
        super().load(data_type)

        # validate langchain package version
        langchain_version_filepath = os.path.join(
            self.uri, DEFAULT_LANGCHAIN_VERSION_FILENAME
        )
        source_langchain_version = read_file_contents_as_string(
            langchain_version_filepath
        )
        try:
            package_version = pkg_resources.get_distribution(
                LANGCHAIN_PACKAGE_NAME
            ).version
        except pkg_resources.DistributionNotFound:
            logger.warn(
                f"'{LANGCHAIN_PACKAGE_NAME}' package is not installed."
            )
            package_version = "not installed"
        if source_langchain_version != package_version:
            logger.warn(
                f"Your `VectorStore` was materialized with {source_langchain_version} "
                f"but you are currently using {package_version}. "
                f"This might cause unexpected behavior. Attempting to load."
            )

        # validate python version
        python_version_filepath = os.path.join(
            self.uri, DEFAULT_PYTHON_VERSION_FILENAME
        )
        source_python_version = read_file_contents_as_string(
            python_version_filepath
        )
        current_python_version = Environment().python_version()
        if source_python_version != current_python_version:
            logger.warn(
                f"Your `VectorStore` was materialized with {source_python_version} "
                f"but you are currently using {current_python_version}. "
                f"This might cause unexpected behavior. Attempting to load."
            )

        pickle_filepath = os.path.join(self.uri, DEFAULT_PICKLE_FILENAME)
        with fileio.open(pickle_filepath, "rb") as fid:
            vector_store = pickle.load(fid)
        return cast(VectorStore, vector_store)

    def save(self, vector_store: VectorStore) -> None:
        """Save a langchain vector store as a pickle file.

        Args:
            vector_store: The vector store to save.
        """
        super().save(vector_store)

        pickle_filepath = os.path.join(self.uri, DEFAULT_PICKLE_FILENAME)
        with fileio.open(pickle_filepath, "wb") as fid:
            pickle.dump(vector_store, fid)

        # save python version for validation on loading
        python_version_filepath = os.path.join(
            self.uri, DEFAULT_PYTHON_VERSION_FILENAME
        )
        current_python_version = Environment().python_version()
        write_file_contents_as_string(
            python_version_filepath, current_python_version
        )

        # save langchain package version foa
        try:
            package_version = pkg_resources.get_distribution(
                LANGCHAIN_PACKAGE_NAME
            ).version
        except pkg_resources.DistributionNotFound:
            logger.warn(
                f"'{LANGCHAIN_PACKAGE_NAME}' package is not installed."
            )
            package_version = "not installed"
        langchain_version_filepath = os.path.join(
            self.uri, DEFAULT_LANGCHAIN_VERSION_FILENAME
        )
        write_file_contents_as_string(
            langchain_version_filepath, package_version
        )
load(self, data_type)

Reads a langchain vector store from a pickle file.

Parameters:

Name Type Description Default
data_type Type[langchain.vectorstores.base.VectorStore]

The type of the vector store.

required

Returns:

Type Description
VectorStore

The vector store.

Source code in zenml/integrations/langchain/materializers/vector_store_materializer.py
def load(self, data_type: Type[VectorStore]) -> VectorStore:
    """Reads a langchain vector store from a pickle file.

    Args:
        data_type: The type of the vector store.

    Returns:
        The vector store.
    """
    super().load(data_type)

    # validate langchain package version
    langchain_version_filepath = os.path.join(
        self.uri, DEFAULT_LANGCHAIN_VERSION_FILENAME
    )
    source_langchain_version = read_file_contents_as_string(
        langchain_version_filepath
    )
    try:
        package_version = pkg_resources.get_distribution(
            LANGCHAIN_PACKAGE_NAME
        ).version
    except pkg_resources.DistributionNotFound:
        logger.warn(
            f"'{LANGCHAIN_PACKAGE_NAME}' package is not installed."
        )
        package_version = "not installed"
    if source_langchain_version != package_version:
        logger.warn(
            f"Your `VectorStore` was materialized with {source_langchain_version} "
            f"but you are currently using {package_version}. "
            f"This might cause unexpected behavior. Attempting to load."
        )

    # validate python version
    python_version_filepath = os.path.join(
        self.uri, DEFAULT_PYTHON_VERSION_FILENAME
    )
    source_python_version = read_file_contents_as_string(
        python_version_filepath
    )
    current_python_version = Environment().python_version()
    if source_python_version != current_python_version:
        logger.warn(
            f"Your `VectorStore` was materialized with {source_python_version} "
            f"but you are currently using {current_python_version}. "
            f"This might cause unexpected behavior. Attempting to load."
        )

    pickle_filepath = os.path.join(self.uri, DEFAULT_PICKLE_FILENAME)
    with fileio.open(pickle_filepath, "rb") as fid:
        vector_store = pickle.load(fid)
    return cast(VectorStore, vector_store)
save(self, vector_store)

Save a langchain vector store as a pickle file.

Parameters:

Name Type Description Default
vector_store VectorStore

The vector store to save.

required
Source code in zenml/integrations/langchain/materializers/vector_store_materializer.py
def save(self, vector_store: VectorStore) -> None:
    """Save a langchain vector store as a pickle file.

    Args:
        vector_store: The vector store to save.
    """
    super().save(vector_store)

    pickle_filepath = os.path.join(self.uri, DEFAULT_PICKLE_FILENAME)
    with fileio.open(pickle_filepath, "wb") as fid:
        pickle.dump(vector_store, fid)

    # save python version for validation on loading
    python_version_filepath = os.path.join(
        self.uri, DEFAULT_PYTHON_VERSION_FILENAME
    )
    current_python_version = Environment().python_version()
    write_file_contents_as_string(
        python_version_filepath, current_python_version
    )

    # save langchain package version foa
    try:
        package_version = pkg_resources.get_distribution(
            LANGCHAIN_PACKAGE_NAME
        ).version
    except pkg_resources.DistributionNotFound:
        logger.warn(
            f"'{LANGCHAIN_PACKAGE_NAME}' package is not installed."
        )
        package_version = "not installed"
    langchain_version_filepath = os.path.join(
        self.uri, DEFAULT_LANGCHAIN_VERSION_FILENAME
    )
    write_file_contents_as_string(
        langchain_version_filepath, package_version
    )