Cloud Blog: Connect Spark data pipelines to Gemini and other AI models with Dataproc ML library

Source URL: https://cloud.google.com/blog/products/data-analytics/gemini-and-vertex-ai-for-spark-with-dataproc-ml-library/
Source: Cloud Blog
Title: Connect Spark data pipelines to Gemini and other AI models with Dataproc ML library

Feedly Summary: Many data science teams rely on Apache Spark running on Dataproc managed clusters for powerful, large-scale data preparation. As these teams look to connect their data pipelines directly to machine learning models, there's a clear opportunity to simplify the integration. But running inference on a Spark DataFrame using a model from Vertex AI typically requires custom development, making it complex to build a single, end-to-end workflow.

To solve this problem, we are developing a new open-source Python library designed to simplify AI/ML inference for Dataproc. This library connects your Apache Spark jobs to popular ML frameworks and Vertex AI features, starting with model inference. Because the library is open source, you can use it directly in your application code with full transparency into its operation.

How it works

Dataproc ML is built to feel familiar to Spark users, following a SparkML-style builder pattern: you configure the model you want to use, then call .transform() on your DataFrame. Let's look at a few common inference use cases.

1. Apply Gemini models to your Spark data

You can apply generative AI models, like Gemini, to columns in your Spark DataFrame. This is useful for tasks like classification, extraction, or summarization at scale. In this example, we take a DataFrame with "city" and "country" columns and use Gemini to create a new column by providing a simple prompt.

You can test in your local environment by installing from PyPI:

```sh
pip install dataproc-ml
```

To deploy/test at scale, create a Dataproc version 2.3-ml cluster:

```sh
gcloud dataproc clusters create my-ml-cluster \
    --project="YOUR_PROJECT_ID" \
    --region="YOUR_REGION" \
    --image-version=2.3-ml-ubuntu \
    --properties='dataproc:pip.packages=dataproc-ml==0.1'
```

Copy this example to a file gemini_spark.py.

```py
from pyspark.sql import SparkSession
from google.cloud.dataproc_ml.inference import GenAiModelHandler

spark = SparkSession.builder.getOrCreate()

# Create a sample DataFrame
df = spark.createDataFrame([
    ("London", "UK"),
    ("Bengaluru", "India"),
    ("Paris", "France"),
    ("Tokyo", "Japan")
], ["city", "country"])

# Configure the model handler. It uses gemini-2.5-flash by default.
genai_handler = GenAiModelHandler().prompt(
    "Write a short, one-line rhyming poem about the experience of visiting {city} in {country}."
)

# Apply the model, which will output to a new `predictions` column
genai_handler.transform(df).show(truncate=False)

# Output
# +---------+-------+----------------------------------------------+
# |city     |country|predictions                                   |
# +---------+-------+----------------------------------------------+
# |London   |UK     |Big Ben's loud chime, a magical time!         |
# |Bengaluru|India  |Bengaluru's green, a vibrant tech scene.      |
# |Paris    |France |In Paris, I fell for romance at first glance. |
# |Tokyo    |Japan  |In Tokyo's vibrant pace, a smile upon my face.|
# +---------+-------+----------------------------------------------+
```

The handler is flexible and supports customized options, as explained in the documentation:

```py
from google.cloud.dataproc_ml.inference import GenAiModelHandler
from vertexai.generative_models import GenerationConfig

# Configure the model handler with a specific model, output column,
# and generation settings
genai_handler = (
    GenAiModelHandler()
    .prompt("Write a short, one-line rhyming poem about the experience of visiting {city} in {country}.")
    .model("gemini-2.5-pro")
    .output_col("city_poem")
    .generation_config(GenerationConfig(temperature=0.7))
)
```

Submit this job to your Dataproc cluster:

```sh
gcloud dataproc jobs submit pyspark gemini_spark.py \
    --cluster=my-ml-cluster \
    --region="YOUR_REGION"
```

2. Run inference with PyTorch and TensorFlow models

In addition to calling Gemini endpoints, the library also allows you to run inference with model files loaded directly from Google Cloud Storage. You can use the PyTorchModelHandler (and a similar handler for TensorFlow) to load your model weights, define a pre-processor, and run inference directly on your worker nodes. This is useful when you want to run batch inference at scale without managing a separate model serving endpoint.

```py
import io

import pandas as pd
from PIL import Image
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, FloatType
from torchvision.models import ResNet50_Weights

from google.cloud.dataproc_ml.inference import PyTorchModelHandler

spark = SparkSession.builder.getOrCreate()

# Get weights and transforms for the model
weights = ResNet50_Weights.DEFAULT

image_df = spark.read.format("binaryFile").load(
    "gs://cloud-samples-data/generative-ai/image/"
)

def vectorized_preprocessor(image_bytes_series: pd.Series) -> pd.Series:
    """Applies ResNet50 transforms to a series of image bytes."""
    return image_bytes_series.apply(
        lambda b: weights.transforms()(Image.open(io.BytesIO(b)).convert("RGB"))
    )

pytorch_handler = (
    PyTorchModelHandler()
    .model_path("gs://<bucket>/resnet50_full_model.pt")
    .input_cols("content")
    .pre_processor(vectorized_preprocessor)  # must match the function defined above
    .set_return_type(ArrayType(FloatType()))
)

pytorch_handler.transform(image_df).show()
```
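
The handler above loads a fully serialized model file from Cloud Storage. As a minimal sketch (our assumption, not from the original post), one way to produce such a file with torchvision before copying it to your own bucket is:

```py
# Minimal sketch (an assumption, not part of the original post): create the
# fully serialized ResNet50 file that PyTorchModelHandler.model_path() loads.
import torch
from torchvision.models import resnet50, ResNet50_Weights

model = resnet50(weights=ResNet50_Weights.DEFAULT)
model.eval()

# Save the whole module (not just the state_dict), matching the
# "full model" file name referenced above.
torch.save(model, "resnet50_full_model.pt")

# Then upload it to your bucket, e.g.:
#   gsutil cp resnet50_full_model.pt gs://<bucket>/resnet50_full_model.pt
```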

Built for performance

This library isn't just a simple wrapper. It's designed for running inference on large Dataproc clusters and includes several inference optimizations:

- Vectorized data transfer: We use pandas_udf to efficiently move data between Spark and the Python worker processes (see the illustrative sketch at the end of this post).
- Connection re-use: Connections to the endpoint are re-used across partitions to reduce overhead.
- Retry logic: The library automatically handles errors like HTTP 429 (resource exhausted) with exponential backoff and retries.

Get started

You can start using the library today by checking out the open-source repository and reading our documentation.

Looking ahead, we plan to add the following features in the coming months:

- Spark Connect support: This would also allow using the above functionality within BigQuery Studio notebooks.
- Vertex AI integrations: To ease inference, we plan to add more ModelHandlers to:
  - Directly call a Vertex model endpoint for online inference
  - Refer to Vertex models and localize them to Spark workers
  - Refer to models hosted in Vertex Model Garden, including embedding models
- More optimizations: Auto-repartition input DataFrames to improve inference runtime
- Third-party integrations: Refer to open-source models on Hugging Face

We are actively working on including this library by default in Dataproc on Google Compute Engine ML images and Google Cloud Serverless for Apache Spark runtimes.

We look forward to seeing what you build! Have feedback or feature requests to further simplify your AI/ML experience on Spark? Reach us at dataproc-feedback@google.com.
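
As referenced above, here is an illustrative sketch of the general pattern behind the vectorized-transfer and retry optimizations. This is a minimal sketch of the technique, not the library's actual internals; call_model_endpoint is a hypothetical stand-in for a real client call.

```py
# Illustrative sketch only (an assumption, not the library's internals):
# pairing a pandas_udf for vectorized Spark-to-Python data transfer with
# exponential-backoff retries around a model endpoint call.
import random
import time

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType


def call_model_endpoint(prompt: str) -> str:
    """Hypothetical stand-in; replace with a real model client call."""
    return f"response for: {prompt}"


def call_with_backoff(prompt: str, max_retries: int = 5) -> str:
    """Retries transient 'resource exhausted' errors with jittered backoff."""
    for attempt in range(max_retries):
        try:
            return call_model_endpoint(prompt)
        except Exception as e:  # a real client surfaces a typed 429 error
            if "429" not in str(e) or attempt == max_retries - 1:
                raise
            time.sleep((2 ** attempt) + random.random())


@pandas_udf(StringType())
def predict(prompts: pd.Series) -> pd.Series:
    # Each batch arrives as an Arrow-backed pandas Series, so per-row
    # Python overhead is amortized and a client connection can be
    # re-used across the whole partition.
    return prompts.apply(call_with_backoff)
```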

AI Summary and Description: Yes

Summary: The text outlines the development of an open-source Python library designed to facilitate machine learning inference within Apache Spark environments using Google Cloud’s Dataproc service. This library aims to streamline the integration of data processing and machine learning models, particularly leveraging generative AI models. Its relevance lies in simplifying workflows for data science teams, making it a valuable resource for those in AI, cloud, and infrastructure security.

Detailed Description:

The text discusses a new open-source library aimed at connecting data processing tasks carried out with Apache Spark to machine learning models, specifically those provided by Google Cloud’s Vertex AI. Here are the key points:

– **Integration of AI/ML Inference**: The library allows data science teams to easily connect their data pipelines to machine learning models, aiming to reduce the complexity typically involved in such integrations.

– **Apache Spark Compatibility**: The library is designed to feel familiar to existing Spark users, using a SparkML-style builder pattern, which helps facilitate adoption by data science teams already familiar with Spark.

– **Use Cases**:
– **Generative AI**: The ability to apply generative AI models (like Gemini) to Spark DataFrames for tasks such as classification, extraction, and summarization at scale.
– **Batch Inference**: Users can run inference on large datasets without needing to manage separate model serving endpoints, optimizing workflow efficiency.

– **Performance Optimizations**:
– **Vectorized Data Transfer**: Utilizing pandas_udf for efficient data movement.
– **Connection Reuse**: Reducing overhead by reusing connections across Spark partitions.
– **Built-in Retry Logic**: Automatically handling transient errors, such as HTTP 429 responses, to improve resilience during inference calls.

– **Future Enhancements**: The library aims to include features such as support for Spark Connect, more Vertex AI integrations, and third-party model references, expanding its utility and functionality.

– **Open-Source Aspect**: The open-source nature of the library suggests transparency and community involvement in its development, potentially leading to broader adoption and collaborative enhancements.

Given the increasing demand for streamlined workflows in AI/ML applications, this library represents a significant step forward in reducing barriers for data science teams; simpler, more transparent integration also makes it easier to maintain security and compliance when handling potentially sensitive data during model inference.