Skip to content

Spark

zenml.integrations.spark special

The Spark integration module to enable distributed processing for steps.

SparkIntegration (Integration)

Definition of Spark integration for ZenML.

Source code in zenml/integrations/spark/__init__.py
class SparkIntegration(Integration):
    """Spark integration for ZenML.

    Declares the pinned `pyspark` requirement, imports the Spark
    materializers module on activation and lists the step operator flavors
    shipped with the integration.
    """

    NAME = SPARK
    REQUIREMENTS = ["pyspark==3.2.1"]

    @classmethod
    def activate(cls) -> None:
        """Activate the integration by importing the Spark materializers."""
        # The imported name is not used here; `noqa` silences the
        # unused-import lint warning.
        from zenml.integrations.spark import materializers  # noqa

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """List the stack component flavors of the Spark integration.

        Returns:
            A list holding the Kubernetes Spark step operator flavor class.
        """
        # Imported lazily, inside the method, exactly when flavors are
        # requested.
        from zenml.integrations.spark.flavors import (
            KubernetesSparkStepOperatorFlavor,
        )

        spark_flavors: List[Type[Flavor]] = [KubernetesSparkStepOperatorFlavor]
        return spark_flavors

activate() classmethod

Activating the corresponding Spark materializers.

Source code in zenml/integrations/spark/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the integration by importing the Spark materializers."""
    # The imported name is not used here; `noqa` silences the unused-import
    # lint warning.
    from zenml.integrations.spark import materializers  # noqa

flavors() classmethod

Declare the stack component flavors for the Spark integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

The flavor wrapper for the step operator flavor

Source code in zenml/integrations/spark/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """List the stack component flavors of the Spark integration.

    Returns:
        A list holding the Kubernetes Spark step operator flavor class.
    """
    # Imported lazily, inside the method, exactly when flavors are requested.
    from zenml.integrations.spark.flavors import (
        KubernetesSparkStepOperatorFlavor,
    )

    spark_flavors: List[Type[Flavor]] = [KubernetesSparkStepOperatorFlavor]
    return spark_flavors

flavors special

Spark integration flavors.

spark_on_kubernetes_step_operator_flavor

Spark on Kubernetes step operator flavor.

KubernetesSparkStepOperatorConfig (SparkStepOperatorConfig) pydantic-model

Config for the Kubernetes Spark step operator.

Attributes:

Name Type Description
namespace Optional[str]

the namespace under which the driver and executor pods will run.

service_account Optional[str]

the service account that will be used by various Spark components (to create and watch the pods).

Source code in zenml/integrations/spark/flavors/spark_on_kubernetes_step_operator_flavor.py
class KubernetesSparkStepOperatorConfig(SparkStepOperatorConfig):
    """Config for the Kubernetes Spark step operator.

    Extends the Spark step operator config with two optional,
    Kubernetes-specific fields. Both default to `None`.

    Attributes:
        namespace: The namespace under which the driver and executor pods
            will run.
        service_account: The service account that will be used by various
            Spark components (to create and watch the pods).
    """

    namespace: Optional[str] = None
    service_account: Optional[str] = None
KubernetesSparkStepOperatorFlavor (SparkStepOperatorFlavor)

Flavor for the Kubernetes Spark step operator.

Source code in zenml/integrations/spark/flavors/spark_on_kubernetes_step_operator_flavor.py
class KubernetesSparkStepOperatorFlavor(SparkStepOperatorFlavor):
    """Flavor describing the Spark step operator that runs on Kubernetes."""

    @property
    def name(self) -> str:
        """Name under which this flavor is known.

        Returns:
            The value of `SPARK_KUBERNETES_STEP_OPERATOR`.
        """
        flavor_name = SPARK_KUBERNETES_STEP_OPERATOR
        return flavor_name

    @property
    def config_class(self) -> Type[KubernetesSparkStepOperatorConfig]:
        """Config class used by this flavor.

        Returns:
            The `KubernetesSparkStepOperatorConfig` class.
        """
        config_cls = KubernetesSparkStepOperatorConfig
        return config_cls

    @property
    def implementation_class(self) -> Type["KubernetesSparkStepOperator"]:
        """Step operator class implementing this flavor.

        Returns:
            The `KubernetesSparkStepOperator` class.
        """
        # Imported inside the property, so the step operator module is only
        # loaded when the implementation class is requested.
        from zenml.integrations.spark.step_operators import (
            KubernetesSparkStepOperator as operator_cls,
        )

        return operator_cls
config_class: Type[zenml.integrations.spark.flavors.spark_on_kubernetes_step_operator_flavor.KubernetesSparkStepOperatorConfig] property readonly

Returns KubernetesSparkStepOperatorConfig config class.

Returns:

Type Description
Type[zenml.integrations.spark.flavors.spark_on_kubernetes_step_operator_flavor.KubernetesSparkStepOperatorConfig]

The config class.

implementation_class: Type[KubernetesSparkStepOperator] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[KubernetesSparkStepOperator]

The implementation class.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

spark_step_operator_flavor

Spark step operator flavor.

SparkStepOperatorConfig (BaseStepOperatorConfig, SparkStepOperatorSettings) pydantic-model

Spark step operator config.

Attributes:

Name Type Description
master str

The master URL for the cluster. Spark supports several cluster managers (such as Mesos, YARN, or Kubernetes), each with its own URL scheme. This implementation supports Kubernetes as the cluster manager.

Source code in zenml/integrations/spark/flavors/spark_step_operator_flavor.py
class SparkStepOperatorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    BaseStepOperatorConfig, SparkStepOperatorSettings
):
    """Spark step operator config.

    Combines the base step operator config with the Spark step operator
    settings and adds the required cluster master URL.

    Attributes:
        master: The master URL for the cluster. Spark supports several
            cluster managers (such as Mesos, YARN, or Kubernetes), each with
            its own URL scheme. This implementation supports Kubernetes as
            the cluster manager.
    """

    master: str
SparkStepOperatorFlavor (BaseStepOperatorFlavor)

Spark step operator flavor.

Source code in zenml/integrations/spark/flavors/spark_step_operator_flavor.py
class SparkStepOperatorFlavor(BaseStepOperatorFlavor):
    """Flavor describing the base Spark step operator."""

    @property
    def name(self) -> str:
        """Name under which this flavor is known.

        Returns:
            The string `"spark"`.
        """
        flavor_name = "spark"
        return flavor_name

    @property
    def config_class(self) -> Type[SparkStepOperatorConfig]:
        """Config class used by this flavor.

        Returns:
            The `SparkStepOperatorConfig` class.
        """
        config_cls = SparkStepOperatorConfig
        return config_cls

    @property
    def implementation_class(self) -> Type["SparkStepOperator"]:
        """Step operator class implementing this flavor.

        Returns:
            The `SparkStepOperator` class.
        """
        # Imported inside the property, so the step operator module is only
        # loaded when the implementation class is requested.
        from zenml.integrations.spark.step_operators.spark_step_operator import (
            SparkStepOperator as operator_cls,
        )

        return operator_cls
config_class: Type[zenml.integrations.spark.flavors.spark_step_operator_flavor.SparkStepOperatorConfig] property readonly

Returns SparkStepOperatorConfig config class.

Returns:

Type Description
Type[zenml.integrations.spark.flavors.spark_step_operator_flavor.SparkStepOperatorConfig]

The config class.

implementation_class: Type[SparkStepOperator] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[SparkStepOperator]

The implementation class.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

SparkStepOperatorSettings (BaseSettings) pydantic-model

Spark step operator settings.

Attributes:

Name Type Description
deploy_mode str

can either be 'cluster' (default) or 'client' and it decides where the driver node of the application will run.

submit_kwargs Optional[Dict[str, Any]]

A dict (or the JSON string of a dict) used to define additional Spark parameters if required. Spark has a large number of configuration parameters, so they are not all exposed individually by the step operator.

Source code in zenml/integrations/spark/flavors/spark_step_operator_flavor.py
class SparkStepOperatorSettings(BaseSettings):
    """Settings for the Spark step operator.

    Attributes:
        deploy_mode: Either 'cluster' (the default) or 'client'; decides
            where the driver node of the application will run.
        submit_kwargs: Additional Spark parameters, given as a dict or as
            the JSON string of a dict. Spark has a large number of
            parameters, so they are not all exposed individually by the
            step operator.
    """

    deploy_mode: str = "cluster"
    submit_kwargs: Optional[Dict[str, Any]] = None

    @validator("submit_kwargs", pre=True)
    def _convert_json_string(
        cls, value: Union[None, str, Dict[str, Any]]
    ) -> Optional[Dict[str, Any]]:
        """Turn a JSON string (e.g. passed via the CLI) into a dictionary.

        Args:
            value: The raw value: `None`, a dictionary or a JSON string.

        Returns:
            `None` or a dictionary unchanged, or the dictionary decoded from
            the JSON string.

        Raises:
            TypeError: If the value is not a `str`, `Dict` or `None`.
            ValueError: If the value is an invalid json string or a json string
                that does not decode into a dictionary.
        """
        # Dictionaries and `None` need no conversion.
        if value is None or isinstance(value, Dict):
            return value

        # Anything else must be a string to be decodable.
        if not isinstance(value, str):
            raise TypeError(f"{value} is not a json string or a dictionary.")

        try:
            decoded = json.loads(value)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid json string '{value}'") from e

        # Valid JSON that is not an object (list, number, ...) is rejected.
        if isinstance(decoded, Dict):
            return decoded

        raise ValueError(
            f"Json string '{value}' did not decode into a dictionary."
        )

materializers special

Spark Materializers.

spark_dataframe_materializer

Implementation of the Spark Dataframe Materializer.

SparkDataFrameMaterializer (BaseMaterializer)

Materializer to read/write Spark dataframes.

Source code in zenml/integrations/spark/materializers/spark_dataframe_materializer.py
class SparkDataFrameMaterializer(BaseMaterializer):
    """Materializer that reads and writes Spark dataframes as parquet."""

    ASSOCIATED_TYPES = (DataFrame,)
    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact,)

    def handle_input(self, data_type: Type[Any]) -> DataFrame:
        """Load a Spark dataframe from the artifact location.

        Args:
            data_type: The type of the data to read.

        Returns:
            The dataframe read from the parquet data under the artifact URI.
        """
        super().handle_input(data_type)

        # Reuse an existing Spark session or create a new one.
        session = SparkSession.builder.getOrCreate()

        # The data lives at DEFAULT_FILEPATH below the artifact URI.
        parquet_path = os.path.join(self.artifact.uri, DEFAULT_FILEPATH)
        return session.read.parquet(parquet_path)

    def handle_return(self, df: DataFrame) -> None:
        """Store a Spark dataframe at the artifact location.

        Args:
            df: The Spark dataframe to write.
        """
        super().handle_return(df)

        # Write the dataframe as parquet at DEFAULT_FILEPATH below the
        # artifact URI.
        parquet_path = os.path.join(self.artifact.uri, DEFAULT_FILEPATH)
        df.write.parquet(parquet_path)
handle_input(self, data_type)

Reads and returns a spark dataframe.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Returns:

Type Description
DataFrame

A loaded spark dataframe.

Source code in zenml/integrations/spark/materializers/spark_dataframe_materializer.py
def handle_input(self, data_type: Type[Any]) -> DataFrame:
    """Load a Spark dataframe from the artifact location.

    Args:
        data_type: The type of the data to read.

    Returns:
        The dataframe read from the parquet data under the artifact URI.
    """
    super().handle_input(data_type)

    # Reuse an existing Spark session or create a new one.
    session = SparkSession.builder.getOrCreate()

    # The data lives at DEFAULT_FILEPATH below the artifact URI.
    parquet_path = os.path.join(self.artifact.uri, DEFAULT_FILEPATH)
    return session.read.parquet(parquet_path)
handle_return(self, df)

Writes a spark dataframe.

Parameters:

Name Type Description Default
df DataFrame

A spark dataframe object.

required
Source code in zenml/integrations/spark/materializers/spark_dataframe_materializer.py
def handle_return(self, df: DataFrame) -> None:
    """Store a Spark dataframe at the artifact location.

    Args:
        df: The Spark dataframe to write.
    """
    super().handle_return(df)

    # Write the dataframe as parquet at DEFAULT_FILEPATH below the artifact
    # URI.
    parquet_path = os.path.join(self.artifact.uri, DEFAULT_FILEPATH)
    df.write.parquet(parquet_path)

spark_model_materializer

Implementation of the Spark Model Materializer.

SparkModelMaterializer (BaseMaterializer)

Materializer to read/write Spark models.

Source code in zenml/integrations/spark/materializers/spark_model_materializer.py
class SparkModelMaterializer(BaseMaterializer):
    """Materializer that reads and writes Spark ML models."""

    ASSOCIATED_TYPES = (Transformer, Estimator, Model)
    ASSOCIATED_ARTIFACT_TYPES = (ModelArtifact,)

    def handle_input(
        self, model_type: Type[Any]
    ) -> Union[Transformer, Estimator, Model]:  # type: ignore[type-arg]
        """Load a Spark ML model from the artifact location.

        Args:
            model_type: The type of the model to read; its `load` method is
                used to restore the model.

        Returns:
            The model loaded from DEFAULT_FILEPATH below the artifact URI.
        """
        super().handle_input(model_type)
        model_path = os.path.join(self.artifact.uri, DEFAULT_FILEPATH)
        return model_type.load(model_path)  # type: ignore[no-any-return]

    def handle_return(
        self, model: Union[Transformer, Estimator, Model]  # type: ignore[type-arg]
    ) -> None:
        """Store a Spark ML model at the artifact location.

        Args:
            model: The Spark model to write.
        """
        super().handle_return(model)

        # Save the model at DEFAULT_FILEPATH below the artifact URI.
        model_path = os.path.join(self.artifact.uri, DEFAULT_FILEPATH)
        model.save(model_path)  # type: ignore[union-attr]
handle_input(self, model_type)

Reads and returns a Spark ML model.

Parameters:

Name Type Description Default
model_type Type[Any]

The type of the model to read.

required

Returns:

Type Description
Union[pyspark.ml.base.Transformer, pyspark.ml.base.Estimator, pyspark.ml.base.Model]

A loaded spark model.

Source code in zenml/integrations/spark/materializers/spark_model_materializer.py
def handle_input(
    self, model_type: Type[Any]
) -> Union[Transformer, Estimator, Model]:  # type: ignore[type-arg]
    """Load a Spark ML model from the artifact location.

    Args:
        model_type: The type of the model to read; its `load` method is used
            to restore the model.

    Returns:
        The model loaded from DEFAULT_FILEPATH below the artifact URI.
    """
    super().handle_input(model_type)
    model_path = os.path.join(self.artifact.uri, DEFAULT_FILEPATH)
    return model_type.load(model_path)  # type: ignore[no-any-return]
handle_return(self, model)

Writes a spark model.

Parameters:

Name Type Description Default
model Union[pyspark.ml.base.Transformer, pyspark.ml.base.Estimator, pyspark.ml.base.Model]

A spark model.

required
Source code in zenml/integrations/spark/materializers/spark_model_materializer.py
def handle_return(
    self, model: Union[Transformer, Estimator, Model]  # type: ignore[type-arg]
) -> None:
    """Store a Spark ML model at the artifact location.

    Args:
        model: The Spark model to write.
    """
    super().handle_return(model)

    # Save the model at DEFAULT_FILEPATH below the artifact URI.
    model_path = os.path.join(self.artifact.uri, DEFAULT_FILEPATH)
    model.save(model_path)  # type: ignore[union-attr]

step_operators special

Spark Step Operators.

kubernetes_step_operator

Implementation of the Kubernetes Spark Step Operator.

KubernetesSparkStepOperator (SparkStepOperator)

Step operator which runs Steps with Spark on Kubernetes.

Source code in zenml/integrations/spark/step_operators/kubernetes_step_operator.py
class KubernetesSparkStepOperator(SparkStepOperator):
    """Step operator which runs Steps with Spark on Kubernetes."""

    @property
    def config(self) -> KubernetesSparkStepOperatorConfig:
        """Returns the `KubernetesSparkStepOperatorConfig` config.

        Returns:
            The configuration.
        """
        return cast(KubernetesSparkStepOperatorConfig, self._config)

    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates the stack.

        Returns:
            A validator that checks that the stack contains a remote container
            registry and a remote artifact store.
        """

        def _validate_remote_components(stack: "Stack") -> Tuple[bool, str]:
            """Check that artifact store and container registry are remote.

            Args:
                stack: The stack to validate.

            Returns:
                A tuple of a validity flag and an error message (empty when
                the stack is valid).
            """
            if stack.artifact_store.config.is_local:
                return False, (
                    "The Spark step operator runs code remotely and "
                    "needs to write files into the artifact store, but the "
                    f"artifact store `{stack.artifact_store.name}` of the "
                    "active stack is local. Please ensure that your stack "
                    "contains a remote artifact store when using the Spark "
                    "step operator."
                )

            container_registry = stack.container_registry
            # The validator below lists the container registry as a required
            # component; the assert narrows the Optional type.
            assert container_registry is not None

            if container_registry.config.is_local:
                return False, (
                    "The Spark step operator runs code remotely and "
                    "needs to push/pull Docker images, but the "
                    f"container registry `{container_registry.name}` of the "
                    "active stack is local. Please ensure that your stack "
                    "contains a remote container registry when using the "
                    "Spark step operator."
                )

            return True, ""

        return StackValidator(
            required_components={StackComponentType.CONTAINER_REGISTRY},
            custom_validation_function=_validate_remote_components,
        )

    @property
    def application_path(self) -> Any:
        """Provides the application path in the corresponding docker image.

        Returns:
            The path to the application entrypoint within the docker image
        """
        return f"local://{DOCKER_IMAGE_WORKDIR}/{ENTRYPOINT_NAME}"

    def prepare_pipeline_deployment(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> None:
        """Build a Docker image and push it to the container registry.

        Nothing is built when no step of the deployment uses this step
        operator. Otherwise the step entrypoint is temporarily copied into
        the source root, the image is built and pushed, and the resulting
        image digest is stored in the `extra` config of every step that uses
        this step operator.

        Args:
            deployment: The pipeline deployment configuration.
            stack: The stack on which the pipeline will be deployed.

        Raises:
            FileExistsError: If the entrypoint file already exists.
        """
        steps_to_run = [
            step
            for step in deployment.steps.values()
            if step.config.step_operator == self.name
        ]
        if not steps_to_run:
            return

        entrypoint_path = os.path.join(get_source_root_path(), ENTRYPOINT_NAME)

        try:
            fileio.copy(LOCAL_ENTRYPOINT, entrypoint_path, overwrite=False)
        except OSError as e:
            # Chain the original error so its details stay visible in the
            # traceback as the explicit cause.
            raise FileExistsError(
                f"The Kubernetes Spark step operator needs to copy the step "
                f"entrypoint to {entrypoint_path}, however a file with this "
                f"path already exists."
            ) from e

        try:
            # Build and push the image
            docker_image_builder = PipelineDockerImageBuilder()
            image_digest = docker_image_builder.build_and_push_docker_image(
                deployment=deployment, stack=stack
            )
        finally:
            # Always remove the copied entrypoint, even if the build fails.
            fileio.remove(entrypoint_path)

        for step in steps_to_run:
            step.config.extra[SPARK_DOCKER_IMAGE_KEY] = image_digest

    def _backend_configuration(
        self,
        spark_config: SparkConf,
        step_config: "StepConfiguration",
    ) -> None:
        """Configures Spark to run on Kubernetes.

        This method reads the docker image stored in the step's `extra`
        config under `SPARK_DOCKER_IMAGE_KEY` and sets it as the container
        image. The namespace and the driver service account are set only
        when they are configured.

        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
            step_config: Configuration of the step to run.
        """
        docker_image = step_config.extra[SPARK_DOCKER_IMAGE_KEY]
        # Adjust the spark configuration
        spark_config.set("spark.kubernetes.container.image", docker_image)
        if self.config.namespace:
            spark_config.set(
                "spark.kubernetes.namespace",
                self.config.namespace,
            )
        if self.config.service_account:
            spark_config.set(
                "spark.kubernetes.authenticate.driver.serviceAccountName",
                self.config.service_account,
            )
application_path: Any property readonly

Provides the application path in the corresponding docker image.

Returns:

Type Description
Any

The path to the application entrypoint within the docker image

config: KubernetesSparkStepOperatorConfig property readonly

Returns the KubernetesSparkStepOperatorConfig config.

Returns:

Type Description
KubernetesSparkStepOperatorConfig

The configuration.

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Validates the stack.

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

A validator that checks that the stack contains a remote container registry and a remote artifact store.

prepare_pipeline_deployment(self, deployment, stack)

Build a Docker image and push it to the container registry.

Parameters:

Name Type Description Default
deployment PipelineDeployment

The pipeline deployment configuration.

required
stack Stack

The stack on which the pipeline will be deployed.

required

Exceptions:

Type Description
FileExistsError

If the entrypoint file already exists.

Source code in zenml/integrations/spark/step_operators/kubernetes_step_operator.py
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> None:
    """Build a Docker image and push it to the container registry.

    Nothing is built when no step of the deployment uses this step operator.
    Otherwise the step entrypoint is temporarily copied into the source
    root, the image is built and pushed, and the resulting image digest is
    stored in the `extra` config of every step that uses this step operator.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.

    Raises:
        FileExistsError: If the entrypoint file already exists.
    """
    steps_to_run = [
        step
        for step in deployment.steps.values()
        if step.config.step_operator == self.name
    ]
    if not steps_to_run:
        return

    entrypoint_path = os.path.join(get_source_root_path(), ENTRYPOINT_NAME)

    try:
        fileio.copy(LOCAL_ENTRYPOINT, entrypoint_path, overwrite=False)
    except OSError as e:
        # Chain the original error so its details stay visible in the
        # traceback as the explicit cause.
        raise FileExistsError(
            f"The Kubernetes Spark step operator needs to copy the step "
            f"entrypoint to {entrypoint_path}, however a file with this "
            f"path already exists."
        ) from e

    try:
        # Build and push the image
        docker_image_builder = PipelineDockerImageBuilder()
        image_digest = docker_image_builder.build_and_push_docker_image(
            deployment=deployment, stack=stack
        )
    finally:
        # Always remove the copied entrypoint, even if the build fails.
        fileio.remove(entrypoint_path)

    for step in steps_to_run:
        step.config.extra[SPARK_DOCKER_IMAGE_KEY] = image_digest

spark_entrypoint_configuration

Spark step operator entrypoint configuration.

SparkEntrypointConfiguration (StepOperatorEntrypointConfiguration)

Entrypoint configuration for the Spark step operator.

Source code in zenml/integrations/spark/step_operators/spark_entrypoint_configuration.py
class SparkEntrypointConfiguration(StepOperatorEntrypointConfiguration):
    """Entrypoint configuration for the Spark step operator."""

    def run(self) -> None:
        """Runs the entrypoint configuration.

        This prepends the directory containing the source files to the python
        path so that spark can find them.
        """
        # The parent implementation runs inside the context manager, i.e.
        # with DOCKER_IMAGE_WORKDIR prepended to the python path.
        with source_utils.prepend_python_path([DOCKER_IMAGE_WORKDIR]):
            super().run()
run(self)

Runs the entrypoint configuration.

This prepends the directory containing the source files to the python path so that spark can find them.

Source code in zenml/integrations/spark/step_operators/spark_entrypoint_configuration.py
def run(self) -> None:
    """Runs the entrypoint configuration.

    This prepends the directory containing the source files to the python
    path so that spark can find them.
    """
    # The parent implementation runs inside the context manager, i.e. with
    # DOCKER_IMAGE_WORKDIR prepended to the python path.
    with source_utils.prepend_python_path([DOCKER_IMAGE_WORKDIR]):
        super().run()

spark_step_operator

Implementation of the Spark Step Operator.

SparkStepOperator (BaseStepOperator)

Base class for all Spark-related step operators.

Source code in zenml/integrations/spark/step_operators/spark_step_operator.py
class SparkStepOperator(BaseStepOperator):
    """Base class for all Spark-related step operators.

    Collects resource, backend, IO and user-defined parameters into a
    `SparkConf` and runs the step through a `spark-submit` command.
    """

    @property
    def config(self) -> SparkStepOperatorConfig:
        """Returns the `SparkStepOperatorConfig` config.

        Returns:
            The configuration.
        """
        return cast(SparkStepOperatorConfig, self._config)

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the Spark step operator.

        Returns:
            The settings class.
        """
        return SparkStepOperatorSettings

    @property
    def application_path(self) -> Optional[str]:
        """Optional method for providing the application path.

        This is especially critical when using 'spark-submit' as it defines the
        path (to the application in the environment where Spark is running)
        which is used within the command.

        For more information on how to set this property please check:

        https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management

        Returns:
            The path to the application entrypoint
        """
        return None

    def _resource_configuration(
        self,
        spark_config: SparkConf,
        resource_settings: "ResourceSettings",
    ) -> None:
        """Configures Spark to handle the resource settings.

        This should serve as the layer between our ResourceSettings
        and Spark's own ways of configuring its resources.

        Note: This is still work-in-progress. In the future, we would like to
        enable much more than executor cores and memory with a dedicated
        ResourceSettings object.

        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
            resource_settings: the resource settings for this step
        """
        if resource_settings.cpu_count:
            spark_config.set(
                "spark.executor.cores",
                str(int(resource_settings.cpu_count)),
            )

        if resource_settings.memory:
            # TODO[LOW]: Fix the conversion of the memory unit with a new
            #   type of resource configuration.
            # The memory string is lower-cased and any "b" characters are
            # stripped from both of its ends.
            spark_config.set(
                "spark.executor.memory",
                resource_settings.memory.lower().strip("b"),
            )

    def _backend_configuration(
        self,
        spark_config: SparkConf,
        step_config: "StepConfiguration",
    ) -> None:
        """Configures Spark to handle backends like YARN, Mesos or Kubernetes.

        The base implementation does nothing; subclasses override it to add
        backend-specific parameters.

        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
            step_config: Configuration of the step to run.
        """

    def _io_configuration(self, spark_config: SparkConf) -> None:
        """Configures Spark to handle different input/output sources.

        When you work with the Spark integration, you get materializers
        such as SparkDataFrameMaterializer, SparkModelMaterializer. However, in
        many cases, these materializer work only if the environment, where
        Spark is running, is configured according to the artifact store.

        Take s3 as an example. When you want to save a dataframe to an S3
        artifact store, you need to provide configuration parameters such as,
        '"spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem" to
        Spark. This method aims to provide these configuration parameters.

        For any artifact store flavor other than S3, only a warning is
        logged.

        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters

        Raises:
            RuntimeError: when the step operator is being used with an S3
                artifact store and the artifact store does not have the
                required authentication
        """
        # Get active artifact store
        client = Client()
        artifact_store = client.active_stack.artifact_store

        from zenml.integrations.s3 import S3_ARTIFACT_STORE_FLAVOR

        # If S3, preconfigure the spark session
        if artifact_store.flavor == S3_ARTIFACT_STORE_FLAVOR:
            (
                key,
                secret,
                _,
            ) = artifact_store._get_credentials()  # type:ignore[attr-defined]
            if key and secret:
                spark_config.setAll(
                    [
                        ("spark.hadoop.fs.s3a.fast.upload", "true"),
                        (
                            "spark.hadoop.fs.s3.impl",
                            "org.apache.hadoop.fs.s3a.S3AFileSystem",
                        ),
                        (
                            "spark.hadoop.fs.AbstractFileSystem.s3.impl",
                            "org.apache.hadoop.fs.s3a.S3A",
                        ),
                        (
                            "spark.hadoop.fs.s3a.aws.credentials.provider",
                            "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
                        ),
                        ("spark.hadoop.fs.s3a.access.key", f"{key}"),
                        ("spark.hadoop.fs.s3a.secret.key", f"{secret}"),
                    ]
                )
            else:
                # A space was missing between "has" and "defined" in this
                # message.
                raise RuntimeError(
                    "When you use an Spark step operator with an S3 artifact "
                    "store, please make sure that your artifact store has "
                    "defined the required credentials namely the access key "
                    "and the secret access key."
                )
        else:
            logger.warning(
                "In most cases, the Spark step operator requires additional "
                "configuration based on the artifact store flavor you are "
                "using. That also means, that when you use this step operator "
                "with certain artifact store flavor, ZenML can take care of "
                "the pre-configuration. However, the artifact store flavor "
                f"'{artifact_store.flavor}' featured in this stack is not "
                "known to this step operator and it might require additional "
                "configuration."
            )

    def _additional_configuration(
        self, spark_config: SparkConf, settings: SparkStepOperatorSettings
    ) -> None:
        """Appends the user-defined configuration parameters.

        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
            settings: Step operator settings for the current step run.
        """
        # Add the additional parameters
        if settings.submit_kwargs:
            for k, v in settings.submit_kwargs.items():
                spark_config.set(k, v)

    def _launch_spark_job(
        self,
        spark_config: SparkConf,
        deploy_mode: str,
        entrypoint_command: List[str],
    ) -> None:
        """Generates and executes a spark-submit command.

        The command is passed to the subprocess as a list of arguments and
        is not run through a shell, so values containing spaces or shell
        metacharacters reach spark-submit unchanged.

        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
            deploy_mode: The spark deploy mode to use.
            entrypoint_command: The entrypoint command to run.

        Raises:
            RuntimeError: if no application path is defined or if the
                spark-submit fails
        """
        application_path = self.application_path
        if application_path is None:
            # The base class returns `None`; fail with a clear message
            # instead of an opaque error while building the command.
            raise RuntimeError(
                "No application path is defined for this Spark step "
                "operator, so the spark-submit command cannot be built."
            )

        # Base spark-submit command
        command = [
            "spark-submit",
            "--master",
            self.config.master,
            "--deploy-mode",
            deploy_mode,
        ]

        # Add the configuration parameters
        for key, value in spark_config.getAll():
            command += ["--conf", f"{key}={value}"]

        # Add the application path
        command.append(application_path)

        # Update the default step operator command to use the spark entrypoint
        # configuration
        original_args = SparkEntrypointConfiguration._parse_arguments(
            entrypoint_command
        )
        command += SparkEntrypointConfiguration.get_entrypoint_arguments(
            **original_args
        )

        # Execute the spark-submit without a shell
        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        stdout, stderr = process.communicate()

        if process.returncode != 0:
            raise RuntimeError(stderr)
        print(stdout)

    def launch(
        self,
        info: "StepRunInfo",
        entrypoint_command: List[str],
    ) -> None:
        """Launches a step on Spark.

        Args:
            info: Information about the step run.
            entrypoint_command: Command that executes the step.
        """
        settings = cast(SparkStepOperatorSettings, self.get_settings(info))
        # Start off with an empty configuration
        conf = SparkConf()

        # Add the resource configuration such as cores, memory.
        self._resource_configuration(
            spark_config=conf,
            resource_settings=info.config.resource_settings,
        )

        # Add the backend configuration such as namespace, docker images names.
        self._backend_configuration(spark_config=conf, step_config=info.config)

        # Add the IO configuration for the inputs and the outputs
        self._io_configuration(
            spark_config=conf,
        )

        # Add any additional configuration given by the user.
        self._additional_configuration(spark_config=conf, settings=settings)

        # Generate a spark-submit command given the configuration
        self._launch_spark_job(
            spark_config=conf,
            deploy_mode=settings.deploy_mode,
            entrypoint_command=entrypoint_command,
        )
application_path: Optional[str] property readonly

Optional method for providing the application path.

This is especially critical when using 'spark-submit' as it defines the path (to the application in the environment where Spark is running) which is used within the command.

For more information on how to set this property please check:

https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management

Returns:

Type Description
Optional[str]

The path to the application entrypoint

config: SparkStepOperatorConfig property readonly

Returns the SparkStepOperatorConfig config.

Returns:

Type Description
SparkStepOperatorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the Spark step operator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

launch(self, info, entrypoint_command)

Launches a step on Spark.

Parameters:

Name Type Description Default
info StepRunInfo

Information about the step run.

required
entrypoint_command List[str]

Command that executes the step.

required
Source code in zenml/integrations/spark/step_operators/spark_step_operator.py
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
) -> None:
    """Run a step on Spark.

    Builds a Spark configuration in four stages (resources, backend, IO,
    user-defined parameters) and then hands it to the spark-submit launcher.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
    """
    step_settings = cast(SparkStepOperatorSettings, self.get_settings(info))

    # Every stage below adds its parameters to this initially empty config.
    spark_conf = SparkConf()

    # Stage 1: resources such as cores and memory.
    self._resource_configuration(
        spark_config=spark_conf,
        resource_settings=info.config.resource_settings,
    )

    # Stage 2: backend parameters such as namespace and docker image names.
    self._backend_configuration(
        spark_config=spark_conf,
        step_config=info.config,
    )

    # Stage 3: IO parameters for the inputs and the outputs.
    self._io_configuration(spark_config=spark_conf)

    # Stage 4: additional parameters given by the user.
    self._additional_configuration(
        spark_config=spark_conf,
        settings=step_settings,
    )

    # Build and run the spark-submit command from the collected config.
    self._launch_spark_job(
        spark_config=spark_conf,
        deploy_mode=step_settings.deploy_mode,
        entrypoint_command=entrypoint_command,
    )