Cloud Blog: AI/ML-ready Apache Spark with Dataproc

Source URL: https://cloud.google.com/blog/products/data-analytics/dataproc-features-enable-aiml-ready-apache-spark/
Source: Cloud Blog
Title: AI/ML-ready Apache Spark with Dataproc

Feedly Summary: Apache Spark is the cornerstone for large-scale data processing, model training, and inference for AI/ML workloads. Yet, the complexities of environment configuration, dependency management, and MLOps integration can slow you down. To accelerate your AI/ML journey, Dataproc now delivers powerful, ML-ready capabilities for Spark. Available on both Dataproc on Compute Engine clusters and Google Cloud Serverless for Apache Spark, these enhancements are engineered to streamline development and operations, reducing setup overhead and simplifying workflows. This allows data scientists and engineers to dedicate more time to building and deploying impactful models rather than wrestling with infrastructure.
Let’s explore what’s new and how to start using these innovations today.
AI/ML-capable runtimes
Getting a Spark environment ready for ML, especially with GPU acceleration, used to involve custom scripts and manual configuration. Dataproc now streamlines this with ML Runtimes: a specialized Dataproc on Compute Engine image version, starting from 2.3 for Ubuntu-based images, designed to accelerate ML workloads. It ships with pre-packaged GPU drivers (NVIDIA driver, CUDA, cuDNN, NCCL) and common ML libraries such as PyTorch, XGBoost, tokenizers, and transformers, significantly cutting down cluster provisioning and setup time.
Google Cloud Serverless for Apache Spark also benefits from runtimes with pre-installed ML libraries, bringing the same ease of use to a serverless environment. These runtimes likewise include libraries such as XGBoost, PyTorch, tokenizers, and transformers.
“At Snap we use Spark on Dataproc for a variety of analytics and ML workloads including running GPU accelerated Spark Rapids, and model training and inference with PyTorch. The new Dataproc 2.3 ML runtime has been really helpful — reducing our cluster startup latency by 75% and eliminating toil for our ML Platform developers to build and manage environments.”- Prudhvi Vatala, Sr. Manager, Snap Inc.
It’s easy to create a Dataproc on Compute Engine cluster, specifying the ML image version and the required GPU accelerators for your master and workers:

gcloud dataproc clusters create <cluster-name> \
    --project=<your-project-id> \
    --region=<your-region> \
    --image-version=2.3-ml-ubuntu \
    --master-machine-type=g2-standard-4 \
    --master-accelerator=type=nvidia-l4,count=1 \
    --worker-machine-type=g2-standard-8 \
    --worker-accelerator=type=nvidia-l4,count=1

Additionally, Serverless Spark sessions (now generally available) support GPUs and come similarly packaged with GPU drivers and common ML libraries.
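For example, here’s a minimal sketch of submitting a GPU-accelerated serverless batch workload with gcloud. The accelerator property name and value are assumptions based on the Serverless for Apache Spark GPU documentation; verify them against the current docs for your runtime version.

# Sketch: GPU-accelerated serverless batch (accelerator property is an assumption)
gcloud dataproc batches submit pyspark gs://<your-bucket>/train.py \
    --project=<your-project-id> \
    --region=<your-region> \
    --properties=spark.dataproc.executor.resource.accelerator.type=l4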
Develop Spark applications in Colab or your favorite IDE
Now, you can develop and run Spark applications using Colab Enterprise notebooks in BigQuery Studio or via integrated development environments (IDEs) like VS Code and Jupyter.
BigQuery Colab Enterprise notebooks are available within BigQuery Studio with native support for Spark application development. With Colab Enterprise notebooks, you can create Serverless Spark sessions using Spark Connect and work with your tables in BigLake metastore.
Colab notebooks provide advanced features such as gen AI code assistance and error explanation, with error correction coming soon. They also support observability for your Spark jobs and management of Spark sessions. In addition, Colab notebooks let you mix BigQuery SQL with Spark code in a single notebook and interoperate on the resulting tables. Once your code is ready, you can schedule notebooks via the built-in scheduling functionality or use BigQuery Pipelines for more complicated DAGs.
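As a sketch of this workflow, the snippet below creates a serverless Spark session over Spark Connect and reads a BigQuery table. It assumes the dataproc-spark-connect client library is available in the notebook runtime, that project and region defaults come from the environment, and the table and column names are placeholders.

# Minimal sketch: serverless Spark session over Spark Connect in a notebook
from google.cloud.dataproc_spark_connect import DataprocSparkSession

# Project and region are assumed to be picked up from the notebook environment
spark = DataprocSparkSession.builder.getOrCreate()

# Read a BigQuery table (placeholder name) and run a Spark transformation
df = (
    spark.read.format("bigquery")
    .option("table", "your_project.your_dataset.your_table")
    .load()
)
df.groupBy("some_column").count().show()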

You can also use IDEs such as Visual Studio Code or JupyterLab for Spark application development. JupyterLab users can install the Dataproc JupyterLab plugin to simplify interactive development with serverless Spark sessions and to streamline the creation and submission of Serverless batch jobs. The plugin comes preinstalled in Vertex AI Workbench, so you can be productive in minutes.

On VS Code, you can use the Cloud Code extension, which supports development against a range of Google Cloud services. After configuring the extension, you can browse BigQuery datasets and tables, browse and manage your Dataproc compute resources (clusters, serverless interactive sessions, and batches), create Spark notebooks from available templates or start developing on your own, and then schedule your workloads, all from VS Code. This choice of development tooling lets you pick the one that best suits your workflow without sacrificing access to the power of Dataproc Spark.
Distributed training and inference with GPU support
Dataproc’s ML runtimes are built to run distributed training and inference, leveraging frameworks like XGBoost, TensorFlow, and PyTorch, all pre-configured for GPU utilization. For example, for distributed training with XGBoost on Spark, you can leverage the pre-installed xgboost.spark library. By setting num_workers to distribute the task across Spark executors and device="cuda" to enable GPU training, you can train your models on multiple GPUs, significantly speeding up training on large datasets. Here’s an example of how to configure the XGBoost classifier for distributed GPU training on your Spark cluster:

from xgboost.spark import SparkXGBClassifier
from pyspark.sql import SparkSession

# Configure the XGBoost classifier for distributed GPU training
xgb_classifier = SparkXGBClassifier(
    featuresCol="features",
    labelCol="label",
    num_workers=spark.sparkContext.defaultParallelism,
    device="cuda",  # Enable GPU training
    # Other XGBoost parameters like max_depth etc.
    max_depth=6,
)

# Train the model
xgb_model = xgb_classifier.fit(train_df)

# Model persistence and prediction
xgb_model.save("path/to/your/xgboost_spark_model")
predictions = xgb_model.transform(test_df)
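The snippet above assumes train_df already contains a "features" vector column and a "label" column. If you are starting from raw numeric columns, a standard way to produce that layout is Spark ML’s VectorAssembler; the input column names below are illustrative.

from pyspark.ml.feature import VectorAssembler

# Combine raw numeric columns (illustrative names) into the single
# "features" vector column that SparkXGBClassifier expects
assembler = VectorAssembler(
    inputCols=["feature_a", "feature_b", "feature_c"],
    outputCol="features",
)
train_df = assembler.transform(raw_train_df).select("features", "label")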

Interactive environment customization for Spark Connect
When working interactively with Spark, such as in a Colab notebook using Spark Connect, ensuring Python library consistency between your client and the Spark cluster is crucial. Dataproc simplifies adding PyPI packages dynamically to a Spark session by extending the addArtifacts method. You can now specify the list of packages to install, upgrade, or downgrade using pip’s version-specifier syntax (the same as pip install). This instructs the Spark Connect server to install each package and its dependencies, making them available to workers for your UDFs and other code.

# Installs textdistance (specific version) and random2 (latest) on the cluster.
# UDFs using textdistance and random2 can now run on worker nodes.
spark.addArtifacts("textdistance==4.6.1", "random2", pypi=True)
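Once installed, the packages can be imported inside UDFs that run on the workers. Here’s a small illustrative sketch; the column names and the choice of textdistance algorithm are hypothetical.

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

@udf(returnType=DoubleType())
def name_similarity(a: str, b: str) -> float:
    # Imported inside the UDF so resolution happens on the executors,
    # where addArtifacts installed the package
    import textdistance
    return float(textdistance.jaro_winkler.normalized_similarity(a or "", b or ""))

df = df.withColumn("similarity", name_similarity("name_1", "name_2"))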

In addition, you can customize your Spark environment on Dataproc with initialization actions and custom images.
MLOps via Vertex AI
Dataproc works with Vertex AI, Google Cloud’s unified platform for AI and ML, to improve MLOps for your AI/ML workflows with Spark. Using the Vertex AI SDK directly within your Dataproc Spark code enables experiment tracking and model management, allowing you to:

Track experiments: Log parameters, metrics, and artifacts from your Dataproc Spark training jobs to Vertex AI Experiments. This lets you compare runs, visualize results, and reproduce experiments reliably.

Register models: Once training is complete, register your trained models into the Vertex AI Model Registry. This provides a central repository for model versioning, staging, and governance, simplifying the path to deployment.

# Code snippet for Dataproc Spark on GCE. Some details omitted for brevity.

from google.cloud import aiplatform
from google.cloud import storage
from xgboost.spark import SparkXGBClassifier

# --- Initialize Vertex AI SDK ---
aiplatform.init(project=PROJECT_ID, location=REGION, experiment=EXPERIMENT_NAME)

# Start a run to log experiment metrics
aiplatform.start_run(run=RUN_NAME)

xgb_spark_estimator = SparkXGBClassifier(
    featuresCol="features",
    labelCol="label",
    # Add other XGBoost parameters needed for training
)

# Train the model
trained_spark_model = xgb_spark_estimator.fit(train_df)

# Register the model
# 1. Get the underlying XGBoost model and save it
native_booster = trained_spark_model.get_booster()
native_booster.save_model(local_path)

# Log relevant metrics manually in Vertex AI Experiments
metrics = {metric_name: metric_value}
aiplatform.log_metrics(metrics)

# 2. Upload to GCS
destination_gcs_object_name = f"{GCS_MODEL_ARTIFACT_DIR_NAME}/{MODEL_FILENAME}"
storage.Client(project=PROJECT_ID).bucket(GCS_BUCKET_NAME).blob(
    destination_gcs_object_name
).upload_from_filename(local_path)

# 3. Register to Vertex AI Model Registry
PRE_BUILT_SERVING_CONTAINER_IMAGE_URI = (
    "us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.2-1:latest"
)

registered_model = aiplatform.Model.upload(
    display_name=MODEL_DISPLAY_NAME,
    artifact_uri=GCS_ARTIFACT_DIRECTORY_URI,
    serving_container_image_uri=PRE_BUILT_SERVING_CONTAINER_IMAGE_URI,
    description="Spark XGBoost model",
    sync=True,  # Wait for the model to be uploaded and registered
)

This integration makes your AI/ML workloads on Spark more manageable, reproducible, and deployable, in line with your organization’s wider MLOps strategy.
Deploy to production
Move from interactive development to production easily.
When using BigQuery Colab notebooks for development, you get Git support to version-control your code and run it through your CI/CD flow. You can also schedule your Spark notebook using BigQuery’s built-in pipeline feature, which lets you create single scheduled notebooks or more complicated DAGs that chain multiple notebooks or queries. You can run these pipelines with your user account, or with a service account for production pipelines.
BigQuery Pipelines let you compose your flow into discrete tasks, so you can mix Apache Spark on Dataproc and BigQuery execution. Consider a pipeline in which the first task ingests raw data via a BigQuery query, and the data is then transformed by Apache Spark via a notebook task containing the pertinent Spark transform steps. Finally, the graph splits into two parallel tasks: a notebook that produces a report based on the output of the previous task, and a final query that cleans up the initially ingested data.

When using an IDE, you can achieve a similar flow by using its Git client to version your Spark code. You can also create and deploy pipelines using Cloud Composer, Google Cloud’s managed Apache Airflow offering, running jobs on your existing Dataproc clusters, on ephemeral job clusters, or as Serverless batch jobs.

# The following code illustrates how to schedule serverless batch jobs
# with Cloud Composer (Airflow).

# Import statements and configuration omitted for brevity.

# Define the full job payload for DataprocCreateBatchOperator
with models.DAG(
    "dataproc_batch_operators",  # The DAG ID shown in the Airflow UI
    default_args=default_args,
    # The interval with which to schedule the DAG; override to match your needs
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="create-xgboost-batch",
    )

AI/ML-ready Apache Spark
With Dataproc, building AI/ML workloads with Apache Spark is easier. By providing pre-configured ML Runtimes with GPU support, simplifying Python dependency management for interactive sessions via Spark Connect, enabling development from your preferred IDE, and offering seamless integration with Vertex AI for MLOps, Dataproc accelerates the entire ML lifecycle, from exploration and training to robust, production-ready Spark ML pipelines. Explore the Dataproc documentation today to start leveraging these capabilities.

AI Summary and Description: Yes

Summary: The text provides an overview of the new features and improvements in Google Cloud’s Dataproc for enhancing AI/ML workloads. This includes the introduction of ML Runtimes that streamline environment setup, support for GPU acceleration, and integrations with popular tools, significantly improving the efficiency of data scientists and engineers in deploying models.

Detailed Description: The content outlines various advancements in Google Cloud Dataproc that affect AI/ML operations. Here are the key points covered in the passage:

– **Enhanced ML Capabilities**: Dataproc now supports specialized ML Runtimes that come pre-packaged with essential ML libraries and GPU drivers, improving the speed of setting up ML environments.

– **Environment Configuration**: Simplifies the process for users who previously had to engage in custom scripts and manual setup, reducing cluster provisioning time substantially.

– **Integration with Google Cloud Services**: In addition to Dataproc on Compute Engine, features also extend to Serverless Spark sessions, which make it easier to run ML applications without worrying about infrastructure.

– **Development Enhancements**:
– Users can now develop Spark applications directly within Colab Enterprise notebooks or popular IDEs like VS Code and Jupyter.
– The notebooks provide features such as gen AI code assistance, error explanation, and observability, which are crucial for maintaining efficient workflows.

– **Distributed Training and Inference with GPU Support**: Dataproc’s ML runtimes allow the use of frameworks such as XGBoost and TensorFlow for distributed tasks, which can leverage multiple GPUs for faster processing of large datasets.

– **MLOps with Vertex AI**: Dataproc’s compatibility with Vertex AI provides capabilities for model management, experiment tracking, and smoother transitions from development to production environments. This integration emphasizes the importance of model governance and versioning.

– **Scheduling and CI/CD**: Users get Git support for version control within their development environments, and can take advantage of features like BigQuery Pipelines and Cloud Composer for orchestrating complex workflows involving both Spark and BigQuery.

In conclusion, these advancements make it easier for data scientists and AI professionals to build, deploy, and manage ML workloads at scale, making the entire process more efficient and integrated with modern cloud-based development practices. This significantly streamlines the ML lifecycle from experimentation to production, indicating a strong emphasis on enhancing productivity in AI/ML endeavors.