Source code for dbutils_batch_query.utils.file_utils

# ====================================================================
# Author: William Muntean
# Copyright (C) 2025 William Muntean. All rights reserved.
#
# Licensed under the MIT License;
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://opensource.org/licenses/MIT
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ====================================================================

"""
=======================================================
Databricks File Utilities
=======================================================

This module provides utilities to download files and directories from
Databricks volumes, upload them to volumes, and delete them from volumes.

The file operations are performed in the following stages:

1. **Initialization**:
   - Instantiate ``WorkspaceClient`` and prepare paths.
2. **Operation**:
   - For download/upload/delete, handle single items or recurse through directories.
3. **Completion**:
   - Print progress messages and finalize the operation.

.. Note::
   - Environment variables ``DATABRICKS_TOKEN`` and ``DATABRICKS_HOST`` must be set.

.. Important::
   - Deletes are irreversible; use with caution.
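
Example Usage
=============
A minimal sketch of how these utilities are typically combined; the workspace
URL, token, and volume paths below are illustrative placeholders, not values
defined by this package:

.. code-block:: python

   import os

   from dbutils_batch_query.utils.file_utils import (
       download_from_databricks,
       upload_to_databricks,
   )

   # WorkspaceClient reads authentication from these environment variables.
   os.environ["DATABRICKS_HOST"] = "https://<workspace>.cloud.databricks.com"
   os.environ["DATABRICKS_TOKEN"] = "<personal-access-token>"

   # Upload a local folder, then pull a single result file back down.
   upload_to_databricks("/Volumes/my_catalog/my_schema/my_volume/data", "./local_data")
   download_from_databricks(
       "/Volumes/my_catalog/my_schema/my_volume/data/report.csv",
       "./results",
   )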

.. currentmodule:: dbutils_batch_query.utils.file_utils

Functions
=========
.. autosummary::
   :toctree: generated/
   :nosignatures:
   :template: function_name_only.rst

   download_from_databricks
   upload_to_databricks
   delete_from_databricks

Standalone Execution
=====================
This module is not intended to be executed as a standalone script.
"""

__author__ = "William Muntean"
__email__ = "williamjmuntean@gmail.com"
__license__ = "MIT"
__maintainer__ = "William Muntean"
__date__ = "2025-05-09"

import shutil
from pathlib import Path

from databricks.sdk import WorkspaceClient


def download_from_databricks(
    remote_path: str,
    local_path: str | Path,
) -> None:
    """
    Download content from a Databricks volume to a local directory.

    This function can download either a single file or an entire folder
    (including all nested files and subdirectories) from a specified remote
    path to a local destination.

    Parameters
    ----------
    remote_path : str
        Path to the file or folder within the volume to download.
    local_path : str or Path
        Local directory path where files will be saved.

    Returns
    -------
    None
        This function does not return any value but prints progress
        information to standard output.

    .. Note::
       - Progress is printed to standard output during download.
       - Existing files at the destination will be overwritten without confirmation.
       - For single file downloads, the file is saved in ``local_path`` under its original name.

    .. Warning::
       - The function will fail if environment variables ``DATABRICKS_TOKEN``
         and ``DATABRICKS_HOST`` are not set.

    Examples
    --------
    >>> # Download a directory
    >>> download_from_databricks(
    ...     "data/reports",
    ...     "./local_data",
    ... )
    Downloaded: /path/to/local_data/file1.csv
    Downloaded: /path/to/local_data/subdir/file2.csv

    >>> # Download a single file
    >>> download_from_databricks(
    ...     "data/reports/report.csv",
    ...     "./local_data",
    ... )
    Downloaded: /path/to/local_data/report.csv
    """
    # Construct absolute paths for source and destination
    remote_path = Path(remote_path)
    local_path = Path(local_path).resolve()

    # Ensure the local save path exists
    local_path.mkdir(parents=True, exist_ok=True)

    client = WorkspaceClient()

    # Check if remote_path is a single file
    try:
        # Attempt to get file metadata - this fails if the path is a directory
        client.files.get_metadata(remote_path.as_posix())
        local_file_path = local_path / remote_path.name
        download_response = client.files.download(remote_path.as_posix())
        with local_file_path.open("wb") as local_file:
            shutil.copyfileobj(download_response.contents, local_file)
        print(f"Downloaded: {local_file_path}")
        return
    except Exception as e:
        # Re-raise unless the failure simply means the path is a directory
        if str(e) != "Not Found":
            raise

    def recursive_download(current_remote_path: Path, current_local_path: Path) -> None:
        # List contents of the current remote directory
        entries = client.files.list_directory_contents(current_remote_path.as_posix())

        for entry in entries:
            remote_entry_path = current_remote_path / entry.path
            local_entry_path = current_local_path / Path(entry.path).name

            if entry.is_directory:
                # Create local directory and process its contents
                local_entry_path.mkdir(parents=True, exist_ok=True)
                recursive_download(remote_entry_path, local_entry_path)
            else:
                # Download file content and save to local path
                download_response = client.files.download(remote_entry_path.as_posix())
                with local_entry_path.open("wb") as local_file:
                    shutil.copyfileobj(download_response.contents, local_file)
                print(f"Downloaded: {local_entry_path}")

    # Begin recursive download process
    recursive_download(remote_path, local_path)
def upload_to_databricks(
    remote_path: str,
    local_path: str | Path,
) -> None:
    """
    Upload a local file or directory to a Databricks volume.

    This function handles uploading either a single file or an entire
    directory structure to a Databricks volume. It preserves directory
    hierarchies when uploading folders.

    Parameters
    ----------
    remote_path : str
        Directory path within the volume under which file(s) will be uploaded.
    local_path : str or Path
        Local file or directory path to upload.

    Returns
    -------
    None
        This function does not return any value but prints progress
        information to standard output.

    .. Note::
       - Progress is printed to standard output during upload.
       - Existing files at the destination will be overwritten without confirmation.
       - For single file uploads, the file is placed inside ``remote_path`` under its original name.
       - For directory uploads, ``remote_path`` is the target directory.

    .. Warning::
       - The function will fail if environment variables ``DATABRICKS_TOKEN``
         and ``DATABRICKS_HOST`` are not set.

    Examples
    --------
    >>> # Upload a single file
    >>> upload_to_databricks(
    ...     "/Volumes/my_catalog/my_schema/my_volume/data/reports",
    ...     "./local_data/report.csv",
    ... )
    Uploaded: ./local_data/report.csv to /Volumes/my_catalog/my_schema/my_volume/data/reports/report.csv

    >>> # Upload a directory
    >>> upload_to_databricks(
    ...     "/Volumes/my_catalog/my_schema/my_volume/data/reports",
    ...     "./local_data",
    ... )
    Uploaded: ./local_data/file1.csv to /Volumes/my_catalog/my_schema/my_volume/data/reports/file1.csv
    Uploaded: ./local_data/subdir/file2.csv to /Volumes/my_catalog/my_schema/my_volume/data/reports/subdir/file2.csv
    """
    # Convert paths to Path objects
    local_path = Path(local_path).resolve()
    remote_path = Path(remote_path)

    # Check if local source path exists
    if not local_path.exists():
        raise FileNotFoundError(f"Local source path does not exist: {local_path}")

    client = WorkspaceClient()

    # Handle single file upload
    if local_path.is_file():
        # Ensure the remote directory exists
        try:
            client.files.get_directory_metadata(remote_path.as_posix())
        except Exception:
            # Directory doesn't exist, create it including all parents
            client.files.create_directory(remote_path.as_posix())

        remote_item_path = remote_path / local_path.name

        # Upload the file
        with local_path.open("rb") as local_file:
            client.files.upload(
                remote_item_path.as_posix(), contents=local_file, overwrite=True
            )
        print(f"Uploaded: {local_path} to {remote_item_path}")
        return

    # Handle directory upload with recursive function
    def recursive_upload(current_local_path: Path, current_remote_path: Path) -> None:
        # Ensure remote directory exists
        try:
            client.files.get_directory_metadata(current_remote_path.as_posix())
        except Exception:
            # Directory doesn't exist, create it
            client.files.create_directory(current_remote_path.as_posix())

        # Iterate through local directory contents
        for local_item in current_local_path.iterdir():
            remote_item_path = current_remote_path / local_item.name

            if local_item.is_dir():
                # Create remote directory and process its contents
                recursive_upload(local_item, remote_item_path)
            else:
                # Upload the file
                with local_item.open("rb") as local_file:
                    client.files.upload(
                        remote_item_path.as_posix(), contents=local_file, overwrite=True
                    )
                print(f"Uploaded: {local_item} to {remote_item_path}")

    # Begin recursive upload process
    recursive_upload(local_path, remote_path)
def delete_from_databricks(remote_path: str) -> None:
    """
    Delete a file or directory from a Databricks volume.

    Parameters
    ----------
    remote_path : str
        Path within the volume to delete. Can point to a single file or a
        directory.

    Returns
    -------
    None
        This function does not return any value but prints progress
        information to standard output.

    .. Warning::
       - Deletions are irreversible; ensure the correct path is specified.

    Examples
    --------
    >>> delete_from_databricks("data/reports/report.csv")
    Deleted: data/reports/report.csv

    >>> delete_from_databricks("data/old_reports")
    Deleted directory: data/old_reports
    """
    client = WorkspaceClient()

    # Try deleting the path as a file first
    try:
        client.files.delete(remote_path)
        print(f"Deleted: {remote_path}")
        return
    except Exception:
        # Fall back to deleting the path as a directory; any failure propagates
        client.files.delete_directory(remote_path)
        print(f"Deleted directory: {remote_path}")