Spark
zenml.integrations.spark
special
The Spark integration module to enable distributed processing for steps.
SparkIntegration (Integration)
Definition of Spark integration for ZenML.
Source code in zenml/integrations/spark/__init__.py
class SparkIntegration(Integration):
    """ZenML integration that enables distributed step processing on Spark."""

    NAME = SPARK
    REQUIREMENTS = ["pyspark==3.2.1"]

    @classmethod
    def activate(cls) -> None:
        """Import the Spark materializers module to activate them."""
        # The imported name is unused on purpose; the import itself is
        # what this hook is for.
        from zenml.integrations.spark import materializers  # noqa

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """List the stack component flavors of the Spark integration.

        Returns:
            A list holding the Kubernetes Spark step operator flavor class.
        """
        # Imported lazily, inside the method, as in the rest of this module.
        from zenml.integrations.spark.flavors import (
            KubernetesSparkStepOperatorFlavor,
        )

        spark_flavors: List[Type[Flavor]] = [KubernetesSparkStepOperatorFlavor]
        return spark_flavors
activate()
classmethod
Activating the corresponding Spark materializers.
Source code in zenml/integrations/spark/__init__.py
@classmethod
def activate(cls) -> None:
    """Import the Spark materializers module to activate them."""
    # The imported name is unused on purpose; the import itself is what
    # this hook is for.
    from zenml.integrations.spark import materializers  # noqa
flavors()
classmethod
Declare the stack component flavors for the Spark integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
The flavor wrapper for the step operator flavor |
Source code in zenml/integrations/spark/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """List the stack component flavors of the Spark integration.

    Returns:
        A list holding the Kubernetes Spark step operator flavor class.
    """
    # Imported lazily, inside the method, as in the rest of this module.
    from zenml.integrations.spark.flavors import (
        KubernetesSparkStepOperatorFlavor,
    )

    spark_flavors: List[Type[Flavor]] = [KubernetesSparkStepOperatorFlavor]
    return spark_flavors
flavors
special
Spark integration flavors.
spark_on_kubernetes_step_operator_flavor
Spark on Kubernetes step operator flavor.
KubernetesSparkStepOperatorConfig (SparkStepOperatorConfig)
pydantic-model
Config for the Kubernetes Spark step operator.
Attributes:
Name | Type | Description |
---|---|---|
namespace |
Optional[str] |
the namespace under which the driver and executor pods will run. |
service_account |
Optional[str] |
the service account that will be used by various Spark components (to create and watch the pods). |
Source code in zenml/integrations/spark/flavors/spark_on_kubernetes_step_operator_flavor.py
class KubernetesSparkStepOperatorConfig(SparkStepOperatorConfig):
    """Configuration of the Spark step operator that targets Kubernetes.

    Both attributes are optional and default to `None`.

    Attributes:
        namespace: Kubernetes namespace in which the driver and executor
            pods will run.
        service_account: service account used by the various Spark
            components to create and watch the pods.
    """

    namespace: Optional[str] = None
    service_account: Optional[str] = None
KubernetesSparkStepOperatorFlavor (SparkStepOperatorFlavor)
Flavor for the Kubernetes Spark step operator.
Source code in zenml/integrations/spark/flavors/spark_on_kubernetes_step_operator_flavor.py
class KubernetesSparkStepOperatorFlavor(SparkStepOperatorFlavor):
    """Flavor describing the Spark step operator that runs on Kubernetes."""

    @property
    def name(self) -> str:
        """Identifier of this flavor.

        Returns:
            The value of `SPARK_KUBERNETES_STEP_OPERATOR`.
        """
        flavor_name = SPARK_KUBERNETES_STEP_OPERATOR
        return flavor_name

    @property
    def config_class(self) -> Type[KubernetesSparkStepOperatorConfig]:
        """Configuration class that belongs to this flavor.

        Returns:
            The `KubernetesSparkStepOperatorConfig` class.
        """
        return KubernetesSparkStepOperatorConfig

    @property
    def implementation_class(self) -> Type["KubernetesSparkStepOperator"]:
        """Step operator class that implements this flavor.

        Returns:
            The `KubernetesSparkStepOperator` class.
        """
        # Resolved lazily: the implementation is only imported on access.
        from zenml.integrations.spark.step_operators import (
            KubernetesSparkStepOperator as operator_cls,
        )

        return operator_cls
config_class: Type[zenml.integrations.spark.flavors.spark_on_kubernetes_step_operator_flavor.KubernetesSparkStepOperatorConfig]
property
readonly
Returns the `KubernetesSparkStepOperatorConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.spark.flavors.spark_on_kubernetes_step_operator_flavor.KubernetesSparkStepOperatorConfig] |
The config class. |
implementation_class: Type[KubernetesSparkStepOperator]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[KubernetesSparkStepOperator] |
The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
spark_step_operator_flavor
Spark step operator flavor.
SparkStepOperatorConfig (BaseStepOperatorConfig, SparkStepOperatorSettings)
pydantic-model
Spark step operator config.
Attributes:
Name | Type | Description |
---|---|---|
master |
str |
The master URL for the cluster. Spark supports several cluster managers, such as Mesos, YARN, or Kubernetes, each with its own URL scheme. At present, this implementation supports Kubernetes as a cluster manager. |
Source code in zenml/integrations/spark/flavors/spark_step_operator_flavor.py
class SparkStepOperatorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    BaseStepOperatorConfig, SparkStepOperatorSettings
):
    """Configuration shared by the Spark step operators.

    Attributes:
        master: the master URL for the cluster. Spark supports several
            cluster managers (such as Mesos, YARN or Kubernetes), each with
            its own URL scheme. At present, this implementation supports
            Kubernetes as a cluster manager.
    """

    master: str
SparkStepOperatorFlavor (BaseStepOperatorFlavor)
Spark step operator flavor.
Source code in zenml/integrations/spark/flavors/spark_step_operator_flavor.py
class SparkStepOperatorFlavor(BaseStepOperatorFlavor):
    """Flavor describing the generic Spark step operator."""

    @property
    def name(self) -> str:
        """Identifier of this flavor.

        Returns:
            The literal string `"spark"`.
        """
        flavor_name = "spark"
        return flavor_name

    @property
    def config_class(self) -> Type[SparkStepOperatorConfig]:
        """Configuration class that belongs to this flavor.

        Returns:
            The `SparkStepOperatorConfig` class.
        """
        return SparkStepOperatorConfig

    @property
    def implementation_class(self) -> Type["SparkStepOperator"]:
        """Step operator class that implements this flavor.

        Returns:
            The `SparkStepOperator` class.
        """
        # Resolved lazily: the implementation is only imported on access.
        from zenml.integrations.spark.step_operators.spark_step_operator import (
            SparkStepOperator as operator_cls,
        )

        return operator_cls
config_class: Type[zenml.integrations.spark.flavors.spark_step_operator_flavor.SparkStepOperatorConfig]
property
readonly
Returns the `SparkStepOperatorConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.spark.flavors.spark_step_operator_flavor.SparkStepOperatorConfig] |
The config class. |
implementation_class: Type[SparkStepOperator]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[SparkStepOperator] |
The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
SparkStepOperatorSettings (BaseSettings)
pydantic-model
Spark step operator settings.
Attributes:
Name | Type | Description |
---|---|---|
deploy_mode |
str |
can either be 'cluster' (default) or 'client' and it decides where the driver node of the application will run. |
submit_kwargs |
Optional[Dict[str, Any]] |
A dictionary (or the JSON string of a dictionary) that defines additional parameters if required. Spark has a large number of parameters, so not all of them are exposed individually by the step operator. |
Source code in zenml/integrations/spark/flavors/spark_step_operator_flavor.py
class SparkStepOperatorSettings(BaseSettings):
    """Settings that can be supplied for the Spark step operator.

    Attributes:
        deploy_mode: either 'cluster' (the default) or 'client'; decides
            where the driver node of the application will run.
        submit_kwargs: additional Spark parameters, given as a dictionary
            or as the JSON string of a dictionary. Spark has a large number
            of parameters, so they are not all exposed individually.
    """

    deploy_mode: str = "cluster"
    submit_kwargs: Optional[Dict[str, Any]] = None

    @validator("submit_kwargs", pre=True)
    def _convert_json_string(
        cls, value: Union[None, str, Dict[str, Any]]
    ) -> Optional[Dict[str, Any]]:
        """Turn a JSON string (e.g. passed via the CLI) into a dictionary.

        Args:
            value: The raw value: `None`, a dictionary or a JSON string.

        Returns:
            `None` or a dictionary unchanged, or the dictionary decoded
            from the JSON string.

        Raises:
            TypeError: If the value is not a `str`, `Dict` or `None`.
            ValueError: If the value is an invalid json string or a json
                string that does not decode into a dictionary.
        """
        # Values that need no conversion are passed straight through.
        if value is None or isinstance(value, dict):
            return value

        if not isinstance(value, str):
            raise TypeError(f"{value} is not a json string or a dictionary.")

        try:
            decoded = json.loads(value)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid json string '{value}'") from e

        # Valid JSON can still be a list, number, etc.; only objects count.
        if isinstance(decoded, dict):
            return decoded
        raise ValueError(
            f"Json string '{value}' did not decode into a dictionary."
        )
materializers
special
Spark Materializers.
spark_dataframe_materializer
Implementation of the Spark Dataframe Materializer.
SparkDataFrameMaterializer (BaseMaterializer)
Materializer to read/write Spark dataframes.
Source code in zenml/integrations/spark/materializers/spark_dataframe_materializer.py
class SparkDataFrameMaterializer(BaseMaterializer):
    """Materializer that loads and stores Spark dataframes as parquet."""

    ASSOCIATED_TYPES = (DataFrame,)
    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact,)

    def handle_input(self, data_type: Type[Any]) -> DataFrame:
        """Load a Spark dataframe from the artifact location.

        Args:
            data_type: The type of the data to read.

        Returns:
            The dataframe read from the parquet data under the artifact URI.
        """
        super().handle_input(data_type)

        # Reuse the active Spark session or start a new one.
        session = SparkSession.builder.getOrCreate()

        parquet_path = os.path.join(self.artifact.uri, DEFAULT_FILEPATH)
        loaded_frame = session.read.parquet(parquet_path)
        return loaded_frame

    def handle_return(self, df: DataFrame) -> None:
        """Store a Spark dataframe at the artifact location.

        Args:
            df: The Spark dataframe to write.
        """
        super().handle_return(df)

        # The dataframe is written in parquet format under the artifact URI.
        parquet_path = os.path.join(self.artifact.uri, DEFAULT_FILEPATH)
        writer = df.write
        writer.parquet(parquet_path)
handle_input(self, data_type)
Reads and returns a spark dataframe.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
The type of the data to read. |
required |
Returns:
Type | Description |
---|---|
DataFrame |
A loaded spark dataframe. |
Source code in zenml/integrations/spark/materializers/spark_dataframe_materializer.py
def handle_input(self, data_type: Type[Any]) -> DataFrame:
    """Load a Spark dataframe from the artifact location.

    Args:
        data_type: The type of the data to read.

    Returns:
        The dataframe read from the parquet data under the artifact URI.
    """
    super().handle_input(data_type)

    # Reuse the active Spark session or start a new one.
    session = SparkSession.builder.getOrCreate()

    parquet_path = os.path.join(self.artifact.uri, DEFAULT_FILEPATH)
    loaded_frame = session.read.parquet(parquet_path)
    return loaded_frame
handle_return(self, df)
Writes a spark dataframe.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df |
DataFrame |
A spark dataframe object. |
required |
Source code in zenml/integrations/spark/materializers/spark_dataframe_materializer.py
def handle_return(self, df: DataFrame) -> None:
    """Store a Spark dataframe at the artifact location.

    Args:
        df: The Spark dataframe to write.
    """
    super().handle_return(df)

    # The dataframe is written in parquet format under the artifact URI.
    parquet_path = os.path.join(self.artifact.uri, DEFAULT_FILEPATH)
    writer = df.write
    writer.parquet(parquet_path)
spark_model_materializer
Implementation of the Spark Model Materializer.
SparkModelMaterializer (BaseMaterializer)
Materializer to read/write Spark models.
Source code in zenml/integrations/spark/materializers/spark_model_materializer.py
class SparkModelMaterializer(BaseMaterializer):
    """Materializer that loads and stores Spark ML models."""

    ASSOCIATED_TYPES = (Transformer, Estimator, Model)
    ASSOCIATED_ARTIFACT_TYPES = (ModelArtifact,)

    def handle_input(
        self, model_type: Type[Any]
    ) -> Union[Transformer, Estimator, Model]:  # type: ignore[type-arg]
        """Load a Spark ML model from the artifact location.

        Args:
            model_type: The type of the model to read; its `load` method
                is used to restore the model.

        Returns:
            The model loaded from the artifact URI.
        """
        super().handle_input(model_type)

        model_path = os.path.join(self.artifact.uri, DEFAULT_FILEPATH)
        loaded_model = model_type.load(model_path)
        return loaded_model  # type: ignore[no-any-return]

    def handle_return(
        self, model: Union[Transformer, Estimator, Model]  # type: ignore[type-arg]
    ) -> None:
        """Store a Spark ML model at the artifact location.

        Args:
            model: The Spark model to write.
        """
        super().handle_return(model)

        # Persist the model under the artifact URI via its own `save`.
        model_path = os.path.join(self.artifact.uri, DEFAULT_FILEPATH)
        model.save(model_path)  # type: ignore[union-attr]
handle_input(self, model_type)
Reads and returns a Spark ML model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_type |
Type[Any] |
The type of the model to read. |
required |
Returns:
Type | Description |
---|---|
Union[pyspark.ml.base.Transformer, pyspark.ml.base.Estimator, pyspark.ml.base.Model] |
A loaded spark model. |
Source code in zenml/integrations/spark/materializers/spark_model_materializer.py
def handle_input(
    self, model_type: Type[Any]
) -> Union[Transformer, Estimator, Model]:  # type: ignore[type-arg]
    """Load a Spark ML model from the artifact location.

    Args:
        model_type: The type of the model to read; its `load` method is
            used to restore the model.

    Returns:
        The model loaded from the artifact URI.
    """
    super().handle_input(model_type)

    model_path = os.path.join(self.artifact.uri, DEFAULT_FILEPATH)
    loaded_model = model_type.load(model_path)
    return loaded_model  # type: ignore[no-any-return]
handle_return(self, model)
Writes a spark model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
Union[pyspark.ml.base.Transformer, pyspark.ml.base.Estimator, pyspark.ml.base.Model] |
A spark model. |
required |
Source code in zenml/integrations/spark/materializers/spark_model_materializer.py
def handle_return(
    self, model: Union[Transformer, Estimator, Model]  # type: ignore[type-arg]
) -> None:
    """Store a Spark ML model at the artifact location.

    Args:
        model: The Spark model to write.
    """
    super().handle_return(model)

    # Persist the model under the artifact URI via its own `save`.
    model_path = os.path.join(self.artifact.uri, DEFAULT_FILEPATH)
    model.save(model_path)  # type: ignore[union-attr]
step_operators
special
Spark Step Operators.
kubernetes_step_operator
Implementation of the Kubernetes Spark Step Operator.
KubernetesSparkStepOperator (SparkStepOperator)
Step operator which runs Steps with Spark on Kubernetes.
Source code in zenml/integrations/spark/step_operators/kubernetes_step_operator.py
class KubernetesSparkStepOperator(SparkStepOperator):
    """Step operator that runs steps with Spark on Kubernetes."""

    @property
    def config(self) -> KubernetesSparkStepOperatorConfig:
        """Typed view on the configuration of this step operator.

        Returns:
            `self._config`, cast to `KubernetesSparkStepOperatorConfig`.
        """
        typed_config = cast(KubernetesSparkStepOperatorConfig, self._config)
        return typed_config

    @property
    def validator(self) -> Optional[StackValidator]:
        """Build the validator for stacks that use this step operator.

        Returns:
            A validator that requires a container registry in the stack and
            rejects a local artifact store or a local container registry.
        """

        def _validate_remote_components(stack: "Stack") -> Tuple[bool, str]:
            # The artifact store is checked first, then the registry.
            artifact_store = stack.artifact_store
            if artifact_store.config.is_local:
                return False, (
                    "The Spark step operator runs code remotely and "
                    "needs to write files into the artifact store, but the "
                    f"artifact store `{artifact_store.name}` of the "
                    "active stack is local. Please ensure that your stack "
                    "contains a remote artifact store when using the Spark "
                    "step operator."
                )

            container_registry = stack.container_registry
            # The registry is listed in `required_components` below; the
            # assert narrows the optional type.
            assert container_registry is not None

            if container_registry.config.is_local:
                return False, (
                    "The Spark step operator runs code remotely and "
                    "needs to push/pull Docker images, but the "
                    f"container registry `{container_registry.name}` of the "
                    "active stack is local. Please ensure that your stack "
                    "contains a remote container registry when using the "
                    "Spark step operator."
                )

            return True, ""

        stack_validator = StackValidator(
            required_components={StackComponentType.CONTAINER_REGISTRY},
            custom_validation_function=_validate_remote_components,
        )
        return stack_validator

    @property
    def application_path(self) -> Any:
        """Location of the application entrypoint inside the docker image.

        Returns:
            A `local://` URI made of `DOCKER_IMAGE_WORKDIR` and
            `ENTRYPOINT_NAME`.
        """
        return "local://{}/{}".format(DOCKER_IMAGE_WORKDIR, ENTRYPOINT_NAME)

    def prepare_pipeline_deployment(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> None:
        """Build a Docker image and push it to the container registry.

        Nothing is done when no step of the deployment uses this step
        operator. Otherwise the entrypoint file is copied into the source
        root for the duration of the build, and the resulting image digest
        is stored in the `extra` configuration of each affected step.

        Args:
            deployment: The pipeline deployment configuration.
            stack: The stack on which the pipeline will be deployed.

        Raises:
            FileExistsError: If the entrypoint file already exists.
        """
        # Collect the steps that are configured to run on this operator.
        spark_steps = []
        for candidate in deployment.steps.values():
            if candidate.config.step_operator == self.name:
                spark_steps.append(candidate)

        if not spark_steps:
            return

        entrypoint_path = os.path.join(get_source_root_path(), ENTRYPOINT_NAME)

        try:
            fileio.copy(LOCAL_ENTRYPOINT, entrypoint_path, overwrite=False)
        except OSError:
            raise FileExistsError(
                f"The Kubernetes Spark step operator needs to copy the step "
                f"entrypoint to {entrypoint_path}, however a file with this "
                f"path already exists."
            )

        try:
            # Build and push the image
            builder = PipelineDockerImageBuilder()
            image_digest = builder.build_and_push_docker_image(
                deployment=deployment, stack=stack
            )
        finally:
            # The copied entrypoint is removed whether or not the build
            # succeeded.
            fileio.remove(entrypoint_path)

        for spark_step in spark_steps:
            spark_step.config.extra[SPARK_DOCKER_IMAGE_KEY] = image_digest

    def _backend_configuration(
        self,
        spark_config: SparkConf,
        step_config: "StepConfiguration",
    ) -> None:
        """Configure Spark to run on Kubernetes.

        Sets the container image stored under `SPARK_DOCKER_IMAGE_KEY` in
        the step's `extra` configuration and, when configured, the
        namespace and the driver service account.

        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
            step_config: Configuration of the step to run.
        """
        spark_config.set(
            "spark.kubernetes.container.image",
            step_config.extra[SPARK_DOCKER_IMAGE_KEY],
        )

        # Optional settings are only forwarded when they are set.
        namespace = self.config.namespace
        if namespace:
            spark_config.set("spark.kubernetes.namespace", namespace)

        service_account = self.config.service_account
        if service_account:
            spark_config.set(
                "spark.kubernetes.authenticate.driver.serviceAccountName",
                service_account,
            )
application_path: Any
property
readonly
Provides the application path in the corresponding docker image.
Returns:
Type | Description |
---|---|
Any |
The path to the application entrypoint within the docker image |
config: KubernetesSparkStepOperatorConfig
property
readonly
Returns the `KubernetesSparkStepOperatorConfig` config.
Returns:
Type | Description |
---|---|
KubernetesSparkStepOperatorConfig |
The configuration. |
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Validates the stack.
Returns:
Type | Description |
---|---|
Optional[zenml.stack.stack_validator.StackValidator] |
A validator that checks that the stack contains a remote container registry and a remote artifact store. |
prepare_pipeline_deployment(self, deployment, stack)
Build a Docker image and push it to the container registry.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeployment |
The pipeline deployment configuration. |
required |
stack |
Stack |
The stack on which the pipeline will be deployed. |
required |
Exceptions:
Type | Description |
---|---|
FileExistsError |
If the entrypoint file already exists. |
Source code in zenml/integrations/spark/step_operators/kubernetes_step_operator.py
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> None:
    """Build a Docker image and push it to the container registry.

    Nothing is done when no step of the deployment uses this step
    operator. Otherwise the entrypoint file is copied into the source root
    for the duration of the build, and the resulting image digest is stored
    in the `extra` configuration of each affected step.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.

    Raises:
        FileExistsError: If the entrypoint file already exists.
    """
    # Collect the steps that are configured to run on this operator.
    spark_steps = []
    for candidate in deployment.steps.values():
        if candidate.config.step_operator == self.name:
            spark_steps.append(candidate)

    if not spark_steps:
        return

    entrypoint_path = os.path.join(get_source_root_path(), ENTRYPOINT_NAME)

    try:
        fileio.copy(LOCAL_ENTRYPOINT, entrypoint_path, overwrite=False)
    except OSError:
        raise FileExistsError(
            f"The Kubernetes Spark step operator needs to copy the step "
            f"entrypoint to {entrypoint_path}, however a file with this "
            f"path already exists."
        )

    try:
        # Build and push the image
        builder = PipelineDockerImageBuilder()
        image_digest = builder.build_and_push_docker_image(
            deployment=deployment, stack=stack
        )
    finally:
        # The copied entrypoint is removed whether or not the build
        # succeeded.
        fileio.remove(entrypoint_path)

    for spark_step in spark_steps:
        spark_step.config.extra[SPARK_DOCKER_IMAGE_KEY] = image_digest
spark_entrypoint_configuration
Spark step operator entrypoint configuration.
SparkEntrypointConfiguration (StepOperatorEntrypointConfiguration)
Entrypoint configuration for the Spark step operator.
Source code in zenml/integrations/spark/step_operators/spark_entrypoint_configuration.py
class SparkEntrypointConfiguration(StepOperatorEntrypointConfiguration):
    """Step operator entrypoint configuration used for Spark jobs."""

    def run(self) -> None:
        """Run the entrypoint with the image workdir on the Python path.

        The parent `run` is executed inside the
        `source_utils.prepend_python_path` context manager, which receives
        `DOCKER_IMAGE_WORKDIR` so that Spark can find the source files.
        """
        source_dirs = [DOCKER_IMAGE_WORKDIR]
        with source_utils.prepend_python_path(source_dirs):
            super().run()
run(self)
Runs the entrypoint configuration.
This prepends the directory containing the source files to the python path so that spark can find them.
Source code in zenml/integrations/spark/step_operators/spark_entrypoint_configuration.py
def run(self) -> None:
    """Run the entrypoint with the image workdir on the Python path.

    The parent `run` is executed inside the
    `source_utils.prepend_python_path` context manager, which receives
    `DOCKER_IMAGE_WORKDIR` so that Spark can find the source files.
    """
    source_dirs = [DOCKER_IMAGE_WORKDIR]
    with source_utils.prepend_python_path(source_dirs):
        super().run()
spark_step_operator
Implementation of the Spark Step Operator.
SparkStepOperator (BaseStepOperator)
Base class for all Spark-related step operators.
Source code in zenml/integrations/spark/step_operators/spark_step_operator.py
class SparkStepOperator(BaseStepOperator):
    """Base class for all Spark-related step operators."""

    @property
    def config(self) -> SparkStepOperatorConfig:
        """Returns the `SparkStepOperatorConfig` config.

        Returns:
            The configuration, i.e. `self._config` cast to
            `SparkStepOperatorConfig`.
        """
        return cast(SparkStepOperatorConfig, self._config)

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the Spark step operator.

        Returns:
            The settings class.
        """
        return SparkStepOperatorSettings

    @property
    def application_path(self) -> Optional[str]:
        """Optional method for providing the application path.

        This is especially critical when using 'spark-submit' as it defines
        the path (to the application in the environment where Spark is
        running) which is used within the command.

        For more information on how to set this property please check:

        https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management

        Returns:
            The path to the application entrypoint. This base
            implementation returns `None`; `_launch_spark_job` refuses to
            run without a path, so subclasses are expected to override it.
        """
        return None

    def _resource_configuration(
        self,
        spark_config: SparkConf,
        resource_settings: "ResourceSettings",
    ) -> None:
        """Configures Spark to handle the resource settings.

        This should serve as the layer between our ResourceSettings
        and Spark's own ways of configuring its resources.

        Note: This is still work-in-progress. In the future, we would like
        to enable much more than executor cores and memory with a dedicated
        ResourceSettings object.

        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
            resource_settings: the resource settings for this step
        """
        if resource_settings.cpu_count:
            spark_config.set(
                "spark.executor.cores",
                str(int(resource_settings.cpu_count)),
            )

        if resource_settings.memory:
            # TODO[LOW]: Fix the conversion of the memory unit with a new
            #   type of resource configuration.
            # The value is lower-cased and its leading/trailing "b"
            # characters are stripped (e.g. "4GB" becomes "4g").
            spark_config.set(
                "spark.executor.memory",
                resource_settings.memory.lower().strip("b"),
            )

    def _backend_configuration(
        self,
        spark_config: SparkConf,
        step_config: "StepConfiguration",
    ) -> None:
        """Configures Spark to handle backends like YARN, Mesos or Kubernetes.

        The base implementation does nothing; it is a hook for subclasses.

        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
            step_config: Configuration of the step to run.
        """

    def _io_configuration(self, spark_config: SparkConf) -> None:
        """Configures Spark to handle different input/output sources.

        When you work with the Spark integration, you get materializers
        such as SparkDataFrameMaterializer, SparkModelMaterializer. However,
        in many cases, these materializers work only if the environment,
        where Spark is running, is configured according to the artifact
        store.

        Take S3 as an example. When you want to save a dataframe to an S3
        artifact store, you need to provide configuration parameters such
        as "spark.hadoop.fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem"
        to Spark. This method aims to provide these configuration
        parameters. For any other artifact store flavor only a warning is
        logged.

        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters

        Raises:
            RuntimeError: when the step operator is being used with an S3
                artifact store and the artifact store does not have the
                required authentication
        """
        # Get active artifact store
        client = Client()
        artifact_store = client.active_stack.artifact_store

        from zenml.integrations.s3 import S3_ARTIFACT_STORE_FLAVOR

        # If S3, preconfigure the spark session
        if artifact_store.flavor == S3_ARTIFACT_STORE_FLAVOR:
            (
                key,
                secret,
                _,
            ) = artifact_store._get_credentials()  # type:ignore[attr-defined]
            if key and secret:
                spark_config.setAll(
                    [
                        ("spark.hadoop.fs.s3a.fast.upload", "true"),
                        (
                            "spark.hadoop.fs.s3.impl",
                            "org.apache.hadoop.fs.s3a.S3AFileSystem",
                        ),
                        (
                            "spark.hadoop.fs.AbstractFileSystem.s3.impl",
                            "org.apache.hadoop.fs.s3a.S3A",
                        ),
                        (
                            "spark.hadoop.fs.s3a.aws.credentials.provider",
                            "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
                        ),
                        ("spark.hadoop.fs.s3a.access.key", f"{key}"),
                        ("spark.hadoop.fs.s3a.secret.key", f"{secret}"),
                    ]
                )
            else:
                # Fixed: the original literals were missing a space and
                # rendered as "hasdefined".
                raise RuntimeError(
                    "When you use a Spark step operator with an S3 artifact "
                    "store, please make sure that your artifact store has "
                    "defined the required credentials namely the access key "
                    "and the secret access key."
                )
        else:
            logger.warning(
                "In most cases, the Spark step operator requires additional "
                "configuration based on the artifact store flavor you are "
                "using. That also means, that when you use this step operator "
                "with certain artifact store flavor, ZenML can take care of "
                "the pre-configuration. However, the artifact store flavor "
                f"'{artifact_store.flavor}' featured in this stack is not "
                f"known to this step operator and it might require additional "
                f"configuration."
            )

    def _additional_configuration(
        self, spark_config: SparkConf, settings: SparkStepOperatorSettings
    ) -> None:
        """Appends the user-defined configuration parameters.

        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
            settings: Step operator settings for the current step run.
        """
        # Add the additional parameters
        if settings.submit_kwargs:
            for k, v in settings.submit_kwargs.items():
                spark_config.set(k, v)

    def _launch_spark_job(
        self,
        spark_config: SparkConf,
        deploy_mode: str,
        entrypoint_command: List[str],
    ) -> None:
        """Generates and executes a spark-submit command.

        The command is passed to the operating system as an argument list
        rather than as a single shell string. Configuration values that
        contain spaces or shell metacharacters therefore reach spark-submit
        unchanged.

        Args:
            spark_config: a SparkConf object which collects all the
                configuration parameters
            deploy_mode: The spark deploy mode to use.
            entrypoint_command: The entrypoint command to run.

        Raises:
            RuntimeError: if no application path is defined, if
                spark-submit cannot be started, or if the spark-submit
                fails
        """
        import shutil

        application_path = self.application_path
        if application_path is None:
            raise RuntimeError(
                "Unable to build the spark-submit command because the step "
                "operator does not define an application path."
            )

        # Resolve the executable explicitly so platform-specific launcher
        # scripts on the PATH are still found without a shell.
        executable = shutil.which("spark-submit") or "spark-submit"

        # Base spark-submit command
        command = [
            executable,
            "--master",
            str(self.config.master),
            "--deploy-mode",
            str(deploy_mode),
        ]

        # Add the configuration parameters
        for conf_key, conf_value in spark_config.getAll():
            command += ["--conf", f"{conf_key}={conf_value}"]

        # Add the application path
        command.append(application_path)

        # Update the default step operator command to use the spark
        # entrypoint configuration
        original_args = SparkEntrypointConfiguration._parse_arguments(
            entrypoint_command
        )
        command += SparkEntrypointConfiguration.get_entrypoint_arguments(
            **original_args
        )

        # Execute the spark-submit
        try:
            process = subprocess.Popen(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
            )
        except OSError as e:
            # Without a shell, a missing executable raises here instead of
            # producing a non-zero exit code.
            raise RuntimeError(
                f"Unable to execute the spark-submit command: {e}"
            ) from e

        stdout, stderr = process.communicate()

        if process.returncode != 0:
            raise RuntimeError(stderr)
        print(stdout)

    def launch(
        self,
        info: "StepRunInfo",
        entrypoint_command: List[str],
    ) -> None:
        """Launches a step on Spark.

        Args:
            info: Information about the step run.
            entrypoint_command: Command that executes the step.
        """
        settings = cast(SparkStepOperatorSettings, self.get_settings(info))

        # Start off with an empty configuration
        conf = SparkConf()

        # Add the resource configuration such as cores, memory.
        self._resource_configuration(
            spark_config=conf,
            resource_settings=info.config.resource_settings,
        )

        # Add the backend configuration such as namespace, docker images
        # names.
        self._backend_configuration(spark_config=conf, step_config=info.config)

        # Add the IO configuration for the inputs and the outputs
        self._io_configuration(
            spark_config=conf,
        )

        # Add any additional configuration given by the user. It is applied
        # last, after the resource, backend and IO configuration.
        self._additional_configuration(spark_config=conf, settings=settings)

        # Generate a spark-submit command given the configuration
        self._launch_spark_job(
            spark_config=conf,
            deploy_mode=settings.deploy_mode,
            entrypoint_command=entrypoint_command,
        )
application_path: Optional[str]
property
readonly
Optional method for providing the application path.
This is especially critical when using 'spark-submit' as it defines the path (to the application in the environment where Spark is running) which is used within the command.
For more information on how to set this property please check:
https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management
Returns:
Type | Description |
---|---|
Optional[str] |
The path to the application entrypoint |
config: SparkStepOperatorConfig
property
readonly
Returns the `SparkStepOperatorConfig` config.
Returns:
Type | Description |
---|---|
SparkStepOperatorConfig |
The configuration. |
settings_class: Optional[Type[BaseSettings]]
property
readonly
Settings class for the Spark step operator.
Returns:
Type | Description |
---|---|
Optional[Type[BaseSettings]] |
The settings class. |
launch(self, info, entrypoint_command)
Launches a step on Spark.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
info |
StepRunInfo |
Information about the step run. |
required |
entrypoint_command |
List[str] |
Command that executes the step. |
required |
Source code in zenml/integrations/spark/step_operators/spark_step_operator.py
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
) -> None:
    """Assemble the Spark configuration for a step and submit the job.

    The configuration is built in four stages (resources, backend, IO and
    user-supplied parameters) before it is handed to `_launch_spark_job`.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.
    """
    step_settings = cast(SparkStepOperatorSettings, self.get_settings(info))
    step_config = info.config

    # Every stage below adds its parameters to this initially empty object.
    spark_conf = SparkConf()

    # Stage 1: resources such as cores and memory.
    self._resource_configuration(
        spark_config=spark_conf,
        resource_settings=step_config.resource_settings,
    )

    # Stage 2: backend specifics such as namespace and docker image names.
    self._backend_configuration(
        spark_config=spark_conf, step_config=step_config
    )

    # Stage 3: input/output configuration.
    self._io_configuration(spark_config=spark_conf)

    # Stage 4: additional parameters given by the user.
    self._additional_configuration(
        spark_config=spark_conf, settings=step_settings
    )

    # Finally, build and run the spark-submit command.
    self._launch_spark_job(
        spark_config=spark_conf,
        deploy_mode=step_settings.deploy_mode,
        entrypoint_command=entrypoint_command,
    )