From 5e79597556c01aa1de751f5ef6cf95f656e94fd8 Mon Sep 17 00:00:00 2001 From: Prakash Ranade Date: Mon, 16 Oct 2023 14:03:44 -0700 Subject: [PATCH] Working S3, HDFS version, Code added for ADLS and Ozone with Test cases Testing Done: 1. Tested on HDFS, S3, ADLS and Ozone upload --- apps/filebrowser/src/filebrowser/conf.py | 16 +- apps/filebrowser/src/filebrowser/urls.py | 2 + apps/filebrowser/src/filebrowser/utils.py | 50 ++++++ apps/filebrowser/src/filebrowser/views.py | 162 ++++++++++++++++-- .../filebrowser/src/filebrowser/views_test.py | 48 ++++++ desktop/core/src/desktop/api2.py | 5 +- desktop/core/src/desktop/conf.py | 10 ++ .../core/src/desktop/lib/fs/ozone/upload.py | 105 ++++++++++++ desktop/core/src/desktop/settings.py | 32 ++-- desktop/libs/aws/src/aws/s3/upload.py | 75 +++++++- desktop/libs/azure/src/azure/abfs/abfs.py | 2 +- desktop/libs/azure/src/azure/abfs/upload.py | 81 ++++++++- desktop/libs/hadoop/src/hadoop/conf.py | 4 +- desktop/libs/hadoop/src/hadoop/fs/upload.py | 137 ++++++++++++++- 14 files changed, 692 insertions(+), 37 deletions(-) create mode 100644 apps/filebrowser/src/filebrowser/utils.py diff --git a/apps/filebrowser/src/filebrowser/conf.py b/apps/filebrowser/src/filebrowser/conf.py index 7681e984071..722439ff3f2 100644 --- a/apps/filebrowser/src/filebrowser/conf.py +++ b/apps/filebrowser/src/filebrowser/conf.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os import sys from desktop.conf import ENABLE_DOWNLOAD @@ -26,7 +27,6 @@ else: from django.utils.translation import ugettext_lazy as _ - MAX_SNAPPY_DECOMPRESSION_SIZE = Config( key="max_snappy_decompression_size", help=_("Max snappy decompression size in bytes."), @@ -37,9 +37,21 @@ ARCHIVE_UPLOAD_TEMPDIR = Config( key="archive_upload_tempdir", help=_("Location on local filesystem where the uploaded archives are temporary stored."), - default=None, + default="/tmp/hue_uploads", type=str) +FILE_UPLOAD_CHUNK_SIZE = Config( + key="file_upload_chunk_size", + default=5000000, + type=int, + help=_('Configure chunk size of the chunked file uploader. 
Default chunk size is set to 5MB.')) + +CONCURRENT_MAX_CONNECTIONS = Config( + key="concurrent_max_connections", + default=5, + type=int, + help=_('Configure the maximum number of concurrent connections(chunks) for file uploads using the chunked file uploader.')) + def get_desktop_enable_download(): """Get desktop enable_download default""" return ENABLE_DOWNLOAD.get() diff --git a/apps/filebrowser/src/filebrowser/urls.py b/apps/filebrowser/src/filebrowser/urls.py index a74df475e39..71aac00a4c8 100644 --- a/apps/filebrowser/src/filebrowser/urls.py +++ b/apps/filebrowser/src/filebrowser/urls.py @@ -44,6 +44,8 @@ # POST operations re_path(r'^save$', filebrowser_views.save_file, name="filebrowser_views_save_file"), re_path(r'^upload/file$', filebrowser_views.upload_file, name='upload_file'), + re_path(r'^upload/chunks', filebrowser_views.upload_chunks, name='upload_chunks'), + re_path(r'^upload/complete', filebrowser_views.upload_complete, name='upload_complete'), re_path(r'^extract_archive', filebrowser_views.extract_archive_using_batch_job, name='extract_archive_using_batch_job'), re_path(r'^compress_files', filebrowser_views.compress_files_using_batch_job, name='compress_files_using_batch_job'), re_path(r'^trash/restore$', filebrowser_views.trash_restore, name='trash_restore'), diff --git a/apps/filebrowser/src/filebrowser/utils.py b/apps/filebrowser/src/filebrowser/utils.py new file mode 100644 index 00000000000..af63c12b91d --- /dev/null +++ b/apps/filebrowser/src/filebrowser/utils.py @@ -0,0 +1,50 @@ +# Licensed to Cloudera, Inc. under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. Cloudera, Inc. licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import io +import os + +from filebrowser.conf import ARCHIVE_UPLOAD_TEMPDIR +DEFAULT_WRITE_SIZE=1024 * 1024 * 128 + +def generate_chunks(uuid, totalparts, default_write_size=DEFAULT_WRITE_SIZE): + fp = io.BytesIO() + total = 0 + files = [os.path.join(ARCHIVE_UPLOAD_TEMPDIR.get(), f'{uuid}_{i}') for i in range(totalparts)] + for file_path in files: + with open(file_path, 'rb') as f: + while True: + # Read the file in portions, e.g., 1MB at a time + portion = f.read(1 * 1024 * 1024) + if not portion: + break + fp.write(portion) + total = total + len(portion) + # If buffer size is more than 128MB, yield the chunk + if fp.tell() >= default_write_size: + fp.seek(0) + yield fp, total + fp.close() + fp = io.BytesIO() + # Yield any remaining data in the buffer + if fp.tell() > 0: + fp.seek(0) + yield fp, total + fp.tell() + fp.close() + # chances are the chunk is zero and we never yielded + else: + fp.close() + for file_path in files: + os.remove(file_path) diff --git a/apps/filebrowser/src/filebrowser/views.py b/apps/filebrowser/src/filebrowser/views.py index ae9ed29b723..165f45f77ed 100644 --- a/apps/filebrowser/src/filebrowser/views.py +++ b/apps/filebrowser/src/filebrowser/views.py @@ -17,6 +17,12 @@ from future import standard_library standard_library.install_aliases() + +from django.views.decorators.csrf import csrf_exempt +from filebrowser.conf import ARCHIVE_UPLOAD_TEMPDIR +from django.core.files.uploadhandler import FileUploadHandler, StopUpload, StopFutureHandlers +from hadoop.conf import UPLOAD_CHUNK_SIZE + from builtins import object import errno import logging @@ -80,6 +86,11 @@ RenameFormSet, RmTreeFormSet, ChmodFormSet, ChownFormSet, CopyFormSet, RestoreFormSet,\ TrashPurgeForm, SetReplicationFactorForm +from hadoop.fs.upload import HDFSFineUploaderChunkedUpload, LocalFineUploaderChunkedUpload +from aws.s3.upload import S3FineUploaderChunkedUpload +from azure.abfs.upload import ABFSFineUploaderChunkedUpload +from desktop.lib.fs.ozone.upload import OFSFineUploaderChunkedUpload + if sys.version_info[0] > 2: import io from io import StringIO as string_io @@ -121,10 +132,28 @@ INLINE_DISPLAY_MIMETYPE_EXCEPTIONS = re.compile('image/svg\+xml') +SCHEME_PREFIXES = { + 's3a': 's3a://', + 'ofs': 'ofs://', + 'abfs': 'abfs://', + 'hdfs': 'hdfs://', + 'gs': 'gs://', + 'local': 'local://', +} + +UPLOAD_CLASSES = { + 's3a': S3FineUploaderChunkedUpload, + 'ofs': OFSFineUploaderChunkedUpload, + 'abfs': ABFSFineUploaderChunkedUpload, + 'hdfs': HDFSFineUploaderChunkedUpload, + 'local': LocalFineUploaderChunkedUpload, +} + +if not os.path.exists(ARCHIVE_UPLOAD_TEMPDIR.get()): + os.makedirs(ARCHIVE_UPLOAD_TEMPDIR.get()) logger = logging.getLogger() - class ParquetOptions(object): def __init__(self, col=None, format='json', no_headers=True, limit=-1): self.col = col @@ -147,10 +176,11 @@ def index(request): def _decode_slashes(path): # This is a fix for some installations where the path is still having the slash (/) encoded - # as %2F while the rest of the path is actually decoded. + # as %2F while the rest of the path is actually decoded. 
encoded_slash = '%2F' - if path.startswith(encoded_slash) or path.startswith('abfs:' + encoded_slash) or \ - path.startswith('s3a:' + encoded_slash) or path.startswith('gs:' + encoded_slash) or path.startswith('ofs:' + encoded_slash): + if path and path.startswith(encoded_slash) or path.startswith('abfs:' + encoded_slash) or \ + path.startswith('s3a:' + encoded_slash) or path.startswith('gs:' + encoded_slash) or \ + path.startswith('ofs:' + encoded_slash): path = path.replace(encoded_slash, '/') return path @@ -159,17 +189,19 @@ def _normalize_path(path): path = _decode_slashes(path) # Check if protocol missing / and add it back (e.g. Kubernetes ingress can strip double slash) - if path.startswith('abfs:/') and not path.startswith('abfs://'): - path = path.replace('abfs:/', 'abfs://') - if path.startswith('s3a:/') and not path.startswith('s3a://'): - path = path.replace('s3a:/', 's3a://') - if path.startswith('gs:/') and not path.startswith('gs://'): - path = path.replace('gs:/', 'gs://') - if path.startswith('ofs:/') and not path.startswith('ofs://'): - path = path.replace('ofs:/', 'ofs://') + for scheme, prefix in SCHEME_PREFIXES.items(): + single_slash_prefix = f"{scheme}:/" + if path and path.startswith(single_slash_prefix) and not path.startswith(prefix): + path = path.replace(single_slash_prefix, prefix) return path +def get_scheme(path): + path = _normalize_path(path) + for scheme, prefix in SCHEME_PREFIXES.items(): + if path.startswith(prefix): + return scheme + return 'hdfs' def download(request, path): """ @@ -1223,7 +1255,7 @@ def generic_op(form_class, request, op, parameter_names, piggyback=None, templat if next: logging.debug("Next: %s" % next) file_path_prefix = '/filebrowser/view=' - if next.startswith(file_path_prefix): + if next.startswith(file_path_prefix): decoded_file_path = next[len(file_path_prefix):] filepath_encoded_next = file_path_prefix + urllib_quote(decoded_file_path.encode('utf-8'), safe=SAFE_CHARACTERS_URI_COMPONENTS) return format_preserving_redirect(request, filepath_encoded_next) @@ -1347,7 +1379,7 @@ def bulk_copy(*args, **kwargs): ofs_skip_files += request.fs.copy(arg['src_path'], arg['dest_path'], recursive=True, owner=request.user) else: request.fs.copy(arg['src_path'], arg['dest_path'], recursive=True, owner=request.user) - + # Send skipped filenames via raising exception to let users know. if ofs_skip_files: raise PopupException("Following files were skipped due to file size limitations:" + ofs_skip_files) @@ -1418,6 +1450,108 @@ def bulk_restore(*args, **kwargs): def trash_purge(request): return generic_op(TrashPurgeForm, request, request.fs.purge_trash, [], None) +def _create_response(request, _fs, result="success", data="Success"): + return { + 'path': _fs.filepath, + 'result': result, + 'next': request.GET.get("next"), + 'success': True, + 'uuid': _fs.qquuid, + 'status': 0, + 'data': data + } + +def perform_upload(request, *args, **kwargs): + """ + Uploads a file to the specified destination. + Args: + request: The HTTP request object. + **kwargs: Arbitrary keyword arguments. + Returns: + A dictionary containing the following keys: + - path: The path of the uploaded file. + - result: The result of the upload operation. + - next: The URL to redirect to after the upload operation. + - success: A boolean indicating whether the upload operation was successful. + - uuid: The UUID of the uploaded file. + - status: The status of the upload operation. + - data: Additional data about the upload operation. 
+ """ + scheme = get_scheme(kwargs['dest']) + upload_class = UPLOAD_CLASSES.get(scheme, LocalFineUploaderChunkedUpload) + _fs = upload_class(request, **kwargs) + _fs.upload() + if scheme == 'hdfs': + result = _massage_stats(request, stat_absolute_path(_fs.filepath, request.fs.stats(_fs.filepath))) + else: + result = "success" + return _create_response(request, _fs, result=result, data="Success") + +def extract_upload_data(request, method): + data = request.POST if method == "POST" else request.GET + chunks = { + "qquuid": data.get('qquuid'), + "qqpartindex": int(data.get('qqpartindex', 0)), + "qqpartbyteoffset": int(data.get('qqpartbyteoffset', 0)), + "qqtotalfilesize": int(data.get('qqtotalfilesize', 0)), + "qqtotalparts": int(data.get('qqtotalparts', 1)), + "qqfilename": data.get('qqfilename'), + "qqchunksize": int(data.get('qqchunksize', 0)), + "dest": data.get('dest', None), + "fileFieldLabel": data.get('fileFieldLabel') + } + return chunks + +@require_http_methods(["POST"]) +def upload_chunks(request): + """ + View function to handle chunked file uploads using Fine Uploader. + Args: + - request: The HTTP request object. + Returns: + - JsonResponse: A JSON response object with the following keys: + - success: A boolean indicating whether the upload was successful. + - uuid: A string representing the unique identifier for the uploaded file. + - error: A string representing the error message if the upload failed. + """ + if request.method == "POST": + try: + for _ in request.FILES.values(): # This processes the upload. + pass + except StopUpload: + return JsonResponse({'success': False, 'error': 'Error in upload'}) + + # case where file is larger than the single chunk size + if int(request.GET.get("qqtotalparts", 0)) > 0: + return JsonResponse({'success': True, 'uuid': request.GET.get('qquuid')}) + + # case where file is smaller than the chunk size + if int(request.GET.get("qqtotalparts", 0)) == 0 and int(request.GET.get("qqtotalfilesize", 0)) <= 2000000: + chunks = extract_upload_data(request, "GET") + try: + response = perform_upload(request, **chunks) + return JsonResponse(response) + except Exception as e: + return JsonResponse({'success': False, 'error': 'Error in upload'}) + return JsonResponse({'success': False, 'error': 'Unsupported request method'}) + +@require_http_methods(["POST"]) +def upload_complete(request): + """ + View function that handles the completion of a file upload. + Args: + request (HttpRequest): The HTTP request object. + Returns: + JsonResponse: A JSON response containing the result of the upload. 
+ """ + if request.method == "POST": + chunks = extract_upload_data(request, "POST") + try: + response = perform_upload(request, **chunks) + return JsonResponse(response) + except Exception as e: + return JsonResponse({'success': False, 'error': 'Error in upload'}) + return JsonResponse({'success': False, 'error': 'Unsupported request method'}) @require_http_methods(["POST"]) def upload_file(request): diff --git a/apps/filebrowser/src/filebrowser/views_test.py b/apps/filebrowser/src/filebrowser/views_test.py index 2cef99f5e5e..d3d7dce6729 100644 --- a/apps/filebrowser/src/filebrowser/views_test.py +++ b/apps/filebrowser/src/filebrowser/views_test.py @@ -44,6 +44,7 @@ from django.urls import reverse from django.utils.encoding import smart_str from django.http import HttpResponse +from django.test import TestCase from nose.plugins.attrib import attr from nose.plugins.skip import SkipTest @@ -1530,6 +1531,53 @@ def test_has_default_permissions(self): finally: remove_from_group(self.user.username, 'has_adls') + +class UploadChunksTestCase(TestCase): + def setUp(self): + self.client = make_logged_in_client(username="test", groupname="default", recreate=True, is_superuser=False) + grant_access('test', 'test', 'filebrowser') + add_to_group('test') + self.user = User.objects.get(username="test") + self.url = '/filebrowser/upload/chunks/?dest=/tmp&fileFieldLabel=local&qquuid=123&qqfilename=test.txt&qqtotalfilesize=12' + self.filename = "test.txt" + + def test_upload_chunks_success(self): + url = '/filebrowser/upload/chunks/?dest=/tmp&fileFieldLabel=local&qquuid=123&qqfilename=test.txt&qqtotalfilesize=12' + response = self.client.post(url, {'filename': self.filename}) + self.assertEqual(response.status_code, 200) + # In Test Setup HDFS is not available, so it will fail + self.assertEqual(response.json()['success'], False) + + def test_upload_chunks_large_file(self): + # simulate a large file upload + url = '/filebrowser/upload/chunks/?dest=/tmp&fileFieldLabel=hdfs_file&qqpartindex=2&qqpartbyteoffset=4000000&' + url += 'qqchunksize=2000000&qqtotalparts=36&qqtotalfilesize=71138958&qqfilename=ym_2020.csv&qquuid=123' + response = self.client.post(url, {'filename': self.filename}) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()['success'], True) + self.assertEqual(response.json()['uuid'], '123') + + def test_upload_chunks_small_file(self): + # simulate a small file upload + url = '/filebrowser/upload/chunks/?dest=/tmp&fileFieldLabel=hdfs_file&qqtotalfilesize=48&qqfilename=ym_2020.csv&qquuid=123' + response = self.client.post(url, {'qqtotalfilesize': 1000, 'qquuid': '123'}) + self.assertEqual(response.status_code, 200) + # In Test Setup HDFS is not available, so it will fail + self.assertEqual(response.json()['success'], False) + + def test_upload_chunks_error(self): + # simulate an error in the upload + url = '/filebrowser/upload/chunks/' + try: + response = self.client.post(url) + except Exception as e: + self.assertEqual(e.status_code, 500) + self.assertEqual(e.json()['success'], False) + self.assertEqual(e.json()['error'], 'Error in upload') + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()['success'], False) + self.assertEqual(response.json()['error'], 'Error in upload') + class TestOFSAccessPermissions(object): def setUp(self): self.client = make_logged_in_client(username="test", groupname="default", recreate=True, is_superuser=False) diff --git a/desktop/core/src/desktop/api2.py b/desktop/core/src/desktop/api2.py index 
5c56eb45435..f9915587b51 100644 --- a/desktop/core/src/desktop/api2.py +++ b/desktop/core/src/desktop/api2.py @@ -46,8 +46,8 @@ from beeswax.models import Namespace from desktop import appmanager from desktop.auth.backend import is_admin -from desktop.conf import ENABLE_CONNECTORS, ENABLE_GIST_PREVIEW, CUSTOM, get_clusters, ENABLE_SHARING -from desktop.conf import ENABLE_NEW_STORAGE_BROWSER +from desktop.conf import ENABLE_CONNECTORS, ENABLE_GIST_PREVIEW, CUSTOM, get_clusters, IS_K8S_ONLY, ENABLE_SHARING +from desktop.conf import ENABLE_NEW_STORAGE_BROWSER, ENABLE_NEW_FILE_UPLOADER from desktop.lib.conf import BoundContainer, GLOBAL_CONFIG, is_anonymous from desktop.lib.django_util import JsonResponse, login_notrequired, render from desktop.lib.exceptions_renderable import PopupException @@ -100,6 +100,7 @@ def get_config(request): config['hue_config']['is_admin'] = is_admin(request.user) config['hue_config']['is_yarn_enabled'] = is_yarn() config['hue_config']['enable_new_storage_browser'] = ENABLE_NEW_STORAGE_BROWSER.get() + config['hue_config']['enable_new_file_uploader'] = ENABLE_NEW_FILE_UPLOADER.get() config['clusters'] = list(get_clusters(request.user).values()) config['documents'] = { 'types': list(Document2.objects.documents(user=request.user).order_by().values_list('type', flat=True).distinct()) diff --git a/desktop/core/src/desktop/conf.py b/desktop/core/src/desktop/conf.py index e99fd3c1b19..45aa55c8a23 100644 --- a/desktop/core/src/desktop/conf.py +++ b/desktop/core/src/desktop/conf.py @@ -1818,6 +1818,16 @@ def get_instrumentation_default(): default=False ) +def get_chunked_fileuploader(): + return ENABLE_NEW_FILE_UPLOADER.get(); + +ENABLE_NEW_FILE_UPLOADER = Config( + key="enable_new_file_uploader", + help=_("Enable new chunked file uploader."), + type=coerce_bool, + default=False +) + USE_NEW_EDITOR = Config( # To remove in Hue 4 key='', default=True, diff --git a/desktop/core/src/desktop/lib/fs/ozone/upload.py b/desktop/core/src/desktop/lib/fs/ozone/upload.py index ecc28ba5f5a..740e19b475f 100644 --- a/desktop/core/src/desktop/lib/fs/ozone/upload.py +++ b/desktop/core/src/desktop/lib/fs/ozone/upload.py @@ -14,8 +14,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import io import logging import sys +import unicodedata from django.core.files.uploadedfile import SimpleUploadedFile from django.core.files.uploadhandler import FileUploadHandler, StopFutureHandlers, StopUpload, UploadFileException @@ -29,9 +31,112 @@ else: from django.utils.translation import ugettext as _ +from desktop.lib.exceptions_renderable import PopupException +from filebrowser.utils import generate_chunks LOG = logging.getLogger() +class OFSFineUploaderChunkedUpload(object): + def __init__(self, request, *args, **kwargs): + self.qquuid = kwargs.get('qquuid') + self.qqtotalparts = kwargs.get('qqtotalparts') + self.totalfilesize = kwargs.get('qqtotalfilesize') + self.file_name = kwargs.get('qqfilename') + if self.file_name: + self.file_name = unicodedata.normalize('NFC', self.file_name) # Normalize unicode + self.chunk_size = UPLOAD_CHUNK_SIZE.get() + self.destination = kwargs.get('dest', None) # GET param avoids infinite looping + self.target_path = None + self.file = None + self._request = request + self._part_size = UPLOAD_CHUNK_SIZE.get() + + def check_access(self): + if self._is_ofs_upload(): + self._fs = self._get_ofs(self._request) + + # Verify that the path exists + try: + self._fs.stats(self.destination) + except Exception as e: + raise PopupException(_('Destination path does not exist: %s' % self.destination)) + + LOG.debug("Chunk size = %d" % UPLOAD_CHUNK_SIZE.get()) + LOG.info('Using OFSFileUploadHandler to handle file upload.') + self.target_path = self._fs.join(self.destination, self.file_name) + + def upload_chunks(self): + LOG.debug("OFSfileUploadHandler receive_data_chunk") + try: + LOG.debug("OFSFileUploadHandler uploading file part with size: %s" % self._part_size) + fp = io.BytesIO() + for i, (chunk, total) in enumerate(generate_chunks(self.qquuid, self.qqtotalparts, default_write_size=self.chunk_size), 1): + logging.debug("OFSfileUploadHandler uploading file %s, part %d, size %d, dest: %s" % (self.file_name, i, total, self.destination)) + fp.write(chunk.getvalue()) + fp.seek(0) + self._fs.create(self.target_path, data=fp.getvalue()) + fp.close() + except Exception as e: + LOG.exception('Failed to upload file to ozone at %s: %s' % (self.target_path, e)) + raise PopupException("OFSfileUploadHandler uploading file %s failed with %s" % (self.target_path, e)) + finally: + # Finish the upload + LOG.info("OFSFileUploadHandler has completed file upload to OFS, total file size is: %d." % self.totalfilesize) + LOG.debug("%s" % self._fs.stats(self.target_path)) + return True + + def upload(self): + self.check_access() + self.upload_chunks() + + def _get_ofs(self, request): + fs = get_client(fs='ofs', user=request.user.username) + if not fs: + raise PopupException(_("No OFS filesystem found.")) + return fs + + def _is_ofs_upload(self): + return self._get_scheme() and self._get_scheme().startswith('ofs') + + def _get_scheme(self): + if self.destination: + dst_parts = self.destination.split('://') + if dst_parts: + return dst_parts[0].lower() + else: + raise PopupException('Destination does not start with a valid scheme.') + else: + return None + + def file_complete(self, file_size): + if self._is_ofs_upload(): + # Finish the upload + LOG.info("OFSFileUploadHandler has completed file upload to OFS, total file size is: %d." 
% file_size) + self.file.size = file_size + LOG.debug("%s" % self._fs.stats(self.target_path)) + return self.file + else: + return None + + def _get_ofs(self, request): + fs = get_client(fs='ofs', user=request.user.username) + if not fs: + raise PopupException(_("No OFS filesystem found.")) + return fs + + def _is_ofs_upload(self): + return self._get_scheme() and self._get_scheme().startswith('ofs') + + def _get_scheme(self): + if self.destination: + dst_parts = self.destination.split('://') + if dst_parts: + return dst_parts[0].lower() + else: + raise PopupException('Destination does not start with a valid scheme.') + else: + return None + class OFSFileUploadError(UploadFileException): pass diff --git a/desktop/core/src/desktop/settings.py b/desktop/core/src/desktop/settings.py index 2d1766ddde9..9ee89937931 100644 --- a/desktop/core/src/desktop/settings.py +++ b/desktop/core/src/desktop/settings.py @@ -37,7 +37,7 @@ from aws.conf import is_enabled as is_s3_enabled from azure.conf import is_abfs_enabled -from desktop.conf import is_ofs_enabled, is_gs_enabled +from desktop.conf import is_ofs_enabled, is_gs_enabled, get_chunked_fileuploader if sys.version_info[0] > 2: from django.utils.translation import gettext_lazy as _ @@ -654,24 +654,30 @@ def is_oidc_configured(): ################################################################ # Insert our custom upload handlers -file_upload_handlers = [ +file_upload_handlers = [] +if get_chunked_fileuploader(): + file_upload_handlers = [ + 'hadoop.fs.upload.FineUploaderChunkedUploadHandler', + 'django.core.files.uploadhandler.MemoryFileUploadHandler', + 'django.core.files.uploadhandler.TemporaryFileUploadHandler', + ] +else: + file_upload_handlers = [ 'hadoop.fs.upload.HDFSfileUploadHandler', 'django.core.files.uploadhandler.MemoryFileUploadHandler', 'django.core.files.uploadhandler.TemporaryFileUploadHandler', -] - -if is_s3_enabled(): - file_upload_handlers.insert(0, 'aws.s3.upload.S3FileUploadHandler') - -if is_gs_enabled(): - file_upload_handlers.insert(0, 'desktop.lib.fs.gc.upload.GSFileUploadHandler') + ] + if is_s3_enabled(): + file_upload_handlers.insert(0, 'aws.s3.upload.S3FileUploadHandler') -if is_abfs_enabled(): - file_upload_handlers.insert(0, 'azure.abfs.upload.ABFSFileUploadHandler') + if is_gs_enabled(): + file_upload_handlers.insert(0, 'desktop.lib.fs.gc.upload.GSFileUploadHandler') -if is_ofs_enabled(): - file_upload_handlers.insert(0, 'desktop.lib.fs.ozone.upload.OFSFileUploadHandler') + if is_abfs_enabled(): + file_upload_handlers.insert(0, 'azure.abfs.upload.ABFSFileUploadHandler') + if is_ofs_enabled(): + file_upload_handlers.insert(0, 'desktop.lib.fs.ozone.upload.OFSFileUploadHandler') FILE_UPLOAD_HANDLERS = tuple(file_upload_handlers) diff --git a/desktop/libs/aws/src/aws/s3/upload.py b/desktop/libs/aws/src/aws/s3/upload.py index 8509127bd27..5faa4f458d7 100644 --- a/desktop/libs/aws/src/aws/s3/upload.py +++ b/desktop/libs/aws/src/aws/s3/upload.py @@ -21,10 +21,12 @@ See http://docs.djangoproject.com/en/1.9/topics/http/file-uploads/ """ +import io from future import standard_library standard_library.install_aliases() import logging import sys +import unicodedata if sys.version_info[0] > 2: from io import BytesIO as stream_io @@ -43,10 +45,81 @@ else: from django.utils.translation import ugettext as _ -DEFAULT_WRITE_SIZE = 1024 * 1024 * 50 # TODO: set in configuration (currently 50 MiB) +DEFAULT_WRITE_SIZE = 1024 * 1024 * 128 # TODO: set in configuration (currently 128 MiB) LOG = logging.getLogger() +from 
desktop.lib.exceptions_renderable import PopupException +from filebrowser.utils import generate_chunks + +class S3FineUploaderChunkedUpload(object): + def __init__(self, request, *args, **kwargs): + self._part_num = 1 + self._mp = None + self._request = request + self.qquuid = kwargs.get('qquuid') + self.qqtotalparts = kwargs.get('qqtotalparts') + self.totalfilesize = kwargs.get('qqtotalfilesize') + self.file_name = kwargs.get('qqfilename') + if self.file_name: + self.file_name = unicodedata.normalize('NFC', self.file_name) # Normalize unicode + self.destination = kwargs.get('dest', None) # GET param avoids infinite looping + self.file_name = kwargs.get('qqfilename') + self._fs = get_client(fs='s3a', user=self._request.user.username) + self.bucket_name, self.key_name = parse_uri(self.destination)[:2] + # Verify that the path exists + self._fs._stats(self.destination) + self._bucket = self._fs._get_bucket(self.bucket_name) + self.filepath = self._fs.join(self.key_name, self.file_name) + + def check_access(self): + if self._is_s3_upload(): + try: + # Check access permissions before attempting upload + self._check_access() + # Create a multipart upload request + LOG.debug("Initiating S3 multipart upload to target path: %s" % self.filepath) + self._mp = self._bucket.initiate_multipart_upload(self.filepath) + except (S3FileUploadError, S3FileSystemException) as e: + LOG.error("Encountered error in S3UploadHandler check_access: %s" % e) + self.request.META['upload_failed'] = e + raise PopupException("Initiating S3 multipart upload to target path: %s failed" % self.filepath) + + def upload_chunks(self): + try: + for i, (chunk, total) in enumerate(generate_chunks(self.qquuid, self.qqtotalparts, default_write_size=DEFAULT_WRITE_SIZE), 1): + LOG.debug("S3FileUploadHandler uploading file %s, part %d, size %d, dest: %s" % (self.file_name, i, total, self.destination)) + self._mp.upload_part_from_file(fp=chunk, part_num=i) + except Exception as e: + self._mp.cancel_upload() + LOG.exception('Failed to upload file to S3 at %s: %s' % (self.filepath, e)) + raise PopupException("S3FileUploadHandler uploading file %s failed with %s" % (self.filepath, e)) + finally: + # Finish the upload + LOG.info("S3FileUploadHandler has completed file upload to S3, total file size is: %d." % self.totalfilesize) + self._mp.complete_upload() + + def upload(self): + self.check_access() + self.upload_chunks() + + def _is_s3_upload(self): + return self._get_scheme() and self._get_scheme().startswith('S3') + + def _check_access(self): + if not self._fs.check_access(self.destination, permission='WRITE'): + raise S3FileSystemException('Insufficient permissions to write to S3 path "%s".' 
% self.destination) + + def _get_scheme(self): + if self.destination: + dst_parts = self.destination.split('://') + if dst_parts: + return dst_parts[0].upper() + else: + raise S3FileSystemException('Destination does not start with a valid scheme.') + else: + return None + class S3FileUploadError(UploadFileException): pass diff --git a/desktop/libs/azure/src/azure/abfs/abfs.py b/desktop/libs/azure/src/azure/abfs/abfs.py index 4baaac99656..d4c69c0c494 100644 --- a/desktop/libs/azure/src/azure/abfs/abfs.py +++ b/desktop/libs/azure/src/azure/abfs/abfs.py @@ -449,7 +449,7 @@ def _append(self, path, data, size=0, offset=0, params=None, **kwargs): params['action'] = 'append' headers = {} if size == 0 or size == '0': - headers['Content-Length'] = str(len(data)) + headers['Content-Length'] = str(len(data.getvalue())) if headers['Content-Length'] == '0': return else: diff --git a/desktop/libs/azure/src/azure/abfs/upload.py b/desktop/libs/azure/src/azure/abfs/upload.py index e0b014b5d4c..37d7eddb496 100644 --- a/desktop/libs/azure/src/azure/abfs/upload.py +++ b/desktop/libs/azure/src/azure/abfs/upload.py @@ -17,6 +17,7 @@ standard_library.install_aliases() import logging import sys +import unicodedata if sys.version_info[0] > 2: from io import StringIO as string_io @@ -26,6 +27,7 @@ from django.core.files.uploadedfile import SimpleUploadedFile from django.core.files.uploadhandler import FileUploadHandler, SkipFile, StopFutureHandlers, StopUpload, UploadFileException +from desktop.lib.exceptions_renderable import PopupException from desktop.lib.fsmanager import get_client from azure.abfs.__init__ import parse_uri from azure.abfs.abfs import ABFSFileSystemException @@ -35,10 +37,87 @@ else: from django.utils.translation import ugettext as _ -DEFAULT_WRITE_SIZE = 30 * 1000 * 1000 # TODO: set in configuration +DEFAULT_WRITE_SIZE = 100 * 1024 * 1024 # As per Azure doc, maximum blob size is 100MB LOG = logging.getLogger() +from filebrowser.utils import generate_chunks + +class ABFSFineUploaderChunkedUpload(object): + def __init__(self, request, *args, **kwargs): + self.destination = kwargs.get('dest', None) # GET param avoids infinite looping + self.target_path = None + self.file = None + self._request = request + self._part_size = DEFAULT_WRITE_SIZE + + self.qquuid = kwargs.get('qquuid') + self.qqtotalparts = kwargs.get('qqtotalparts') + self.totalfilesize = kwargs.get('qqtotalfilesize') + self.file_name = kwargs.get('qqfilename') + if self.file_name: + self.file_name = unicodedata.normalize('NFC', self.file_name) # Normalize unicode + + if self._is_abfs_upload(): + self._fs = self._get_abfs(request) + self.filesystem, self.directory = parse_uri(self.destination)[:2] + # Verify that the path exists + self._fs.stats(self.destination) + LOG.debug("Chunk size = %d" % DEFAULT_WRITE_SIZE) + + def check_access(self): + LOG.info('Using ABFSFileUploadHandler to handle file upload wit temp file %s.' 
% self.file_name) + self.target_path = self._fs.join(self.destination, self.file_name) + + try: + # Check access permissions before attempting upload + #self._check_access() #implement later + LOG.debug("Initiating ABFS upload to target path: %s" % self.target_path) + self._fs.create(self.target_path) + except (ABFSFileUploadError, ABFSFileSystemException) as e: + LOG.error("Encountered error in ABFSUploadHandler check_access: %s" % e) + self.request.META['upload_failed'] = e + raise PopupException("Initiating ABFS upload to target path: %s failed %s" % (self.target_path, e)) + + def upload_chunks(self): + try: + for i, (chunk, total) in enumerate(generate_chunks(self.qquuid, self.qqtotalparts, default_write_size=DEFAULT_WRITE_SIZE), 1): + LOG.debug("ABFSFileUploadHandler uploading file %s, part %d, size %d, dest: %s" % (self.file_name, i, total, self.destination)) + self._fs._append(self.target_path, chunk) + except Exception as e: + self._fs.remove(self.target_path) + LOG.exception('Failed to upload file to ABFS at %s: %s' % (self.target_path, e)) + raise PopupException("S3FileUploadHandler uploading file %s part: %d failed" % (self.filepath, i)) + finally: + #finish the upload + self._fs.flush(self.target_path, {'position': self.totalfilesize}) + LOG.info("ABFSFileUploadHandler has completed file upload to ABFS, total file size is: %d." % self.totalfilesize) + LOG.debug("%s" % self._fs.stats(self.target_path)) + + def upload(self): + self.check_access() + self.upload_chunks() + + def _get_abfs(self, request): + fs = get_client(fs='abfs', user=request.user.username) + + if not fs: + raise ABFSFileUploadError(_("No ABFS filesystem found")) + + return fs + + def _is_abfs_upload(self): + return self._get_scheme() and self._get_scheme().startswith('ABFS') + + def _get_scheme(self): + if self.destination: + dst_parts = self.destination.split('://') + if dst_parts: + return dst_parts[0].upper() + else: + raise ABFSFileSystemException('Destination does not start with a valid scheme.') + else: + return None class ABFSFileUploadError(UploadFileException): pass diff --git a/desktop/libs/hadoop/src/hadoop/conf.py b/desktop/libs/hadoop/src/hadoop/conf.py index a0103aaa1dc..3b2a8efbabb 100644 --- a/desktop/libs/hadoop/src/hadoop/conf.py +++ b/desktop/libs/hadoop/src/hadoop/conf.py @@ -51,9 +51,9 @@ def f(): UPLOAD_CHUNK_SIZE = Config( key="upload_chunk_size", - help="Size, in bytes, of the 'chunks' Django should store into memory and feed into the handler. Default is 64MB.", + help="Size, in bytes, of the 'chunks' Django should store into memory and feed into the handler. 
Default is 128MB.", type=int, - default=1024 * 1024 * 64) + default=1024 * 1024 * 128) def has_hdfs_enabled(): diff --git a/desktop/libs/hadoop/src/hadoop/fs/upload.py b/desktop/libs/hadoop/src/hadoop/fs/upload.py index 1d7cc768ba8..8dc04662f37 100644 --- a/desktop/libs/hadoop/src/hadoop/fs/upload.py +++ b/desktop/libs/hadoop/src/hadoop/fs/upload.py @@ -28,7 +28,10 @@ from builtins import object import errno import logging +import os +import posixpath import sys +import unicodedata import time from django.core.files.uploadhandler import FileUploadHandler, StopFutureHandlers, StopUpload, UploadFileException, SkipFile @@ -38,6 +41,9 @@ import hadoop.cluster from hadoop.conf import UPLOAD_CHUNK_SIZE from hadoop.fs.exceptions import WebHdfsException +from desktop.lib.exceptions_renderable import PopupException +from filebrowser.conf import ARCHIVE_UPLOAD_TEMPDIR +from filebrowser.utils import generate_chunks if sys.version_info[0] > 2: from django.utils.translation import gettext as _ @@ -51,6 +57,90 @@ UPLOAD_SUBDIR = 'hue-uploads' +class LocalFineUploaderChunkedUpload(object): + def __init__(self, request, *args, **kwargs): + self._request = request + self.qquuid = kwargs.get('qquuid') + self.qqtotalparts = kwargs.get('qqtotalparts') + self.totalfilesize = kwargs.get('qqtotalfilesize') + self.file_name = kwargs.get('qqfilename') + if self.file_name: + self.file_name = unicodedata.normalize('NFC', self.file_name) # Normalize unicode + local = "local:/" + if local in kwargs.get('dest', ""): + self.dest = kwargs.get('dest')[len(local):] + else: + self.dest = kwargs.get('dest') + self.file_name = kwargs.get('qqfilename') + self.filepath = request.fs.join(self.dest, self.file_name) + self._file = None + self.chunk_size = 0 + def check_access(self): + pass + def upload_chunks(self): + pass + def upload(self): + self.check_access() + self.upload_chunks() + + +class HDFSFineUploaderChunkedUpload(object): + def __init__(self, request, *args, **kwargs): + self._request = request + self.qquuid = kwargs.get('qquuid') + self.qqtotalparts = kwargs.get('qqtotalparts') + self.totalfilesize = kwargs.get('qqtotalfilesize') + self.file_name = kwargs.get('qqfilename') + if self.file_name: + self.file_name = unicodedata.normalize('NFC', self.file_name) # Normalize unicode + self.dest = kwargs.get('dest') + self.file_name = kwargs.get('qqfilename') + self.filepath = request.fs.join(self.dest, self.file_name) + self._file = None + self.chunk_size = 0 + + def check_access(self): + if self._request.fs.isdir(self.dest) and posixpath.sep in self.file_name: + raise PopupException(_('Sorry, no "%(sep)s" in the filename %(name)s.' 
% {'sep': posixpath.sep, 'name': self.file_name})) + + fs = fsmanager.get_filesystem('default') + if not fs: + logging.warning('No HDFS set for HDFS upload') + else: + fs.setuser(self._request.user.username) + self.chunk_size = fs.get_upload_chuck_size(self.dest) if self.dest else UPLOAD_CHUNK_SIZE.get() + logging.debug("Chunk size = %d" % self.chunk_size) + + def upload_chunks(self): + self._file = HDFStemporaryUploadedFile(self._request, self.file_name, self.dest) + logging.debug('Upload attempt to %s' % (self._file.get_temp_path(),)) + for i, (chunk, total) in enumerate(generate_chunks(self.qquuid, self.qqtotalparts, default_write_size=self.chunk_size), 1): + logging.debug("HDFSfileUploadHandler uploading file %s, part %d, size %d, dest: %s" % (self.file_name, i, total, self.dest)) + self._file.write(chunk) + self._file.flush() + self._file.finish_upload(self.totalfilesize) + self._file._do_cleanup = False + self._file.close() + + try: + self._request.fs.upload(file=self._file, path=self.dest, username=self._request.user.username) + except IOError as ex: + already_exists = False + try: + already_exists = self._request.fs.exists(self.dest) + except Exception: + pass + if already_exists: + msg = _('Destination %(name)s already exists.') % {'name': self.filepath} + else: + msg = _('Copy to %(name)s failed: %(error)s') % {'name': self.filepath, 'error': ex} + raise PopupException(msg) + + def upload(self): + self.check_access() + self.upload_chunks() + + class HDFSerror(UploadFileException): pass @@ -124,6 +214,50 @@ def flush(self): def close(self): self._file.close() +class FineUploaderChunkedUploadHandler(FileUploadHandler): + """ + A custom file upload handler for handling chunked uploads using FineUploader. + + Attributes: + - qquuid (str): The unique identifier for the uploaded file. + - qqpartindex (int): The index of the current chunk being uploaded. + - qqpartbyteoffset (int): The byte offset of the current chunk within the file. + - qqtotalfilesize (int): The total size of the uploaded file. + - qqtotalparts (int): The total number of chunks that make up the file. + - qqfilename (str): The name of the uploaded file. + - qqchunksize (int): The size of each chunk being uploaded. + """ + def __init__(self, request=None, *args, **kwargs): + super().__init__(request, *args, **kwargs) + # Capture FineUploader parameters from the request + self.qquuid = self.request.GET.get('qquuid', "") + self.qqpartindex = int(self.request.GET.get('qqpartindex', 0)) + self.qqpartbyteoffset = int(self.request.GET.get('qqpartbyteoffset', 0)) + self.qqtotalfilesize = int(self.request.GET.get('qqtotalfilesize', 0)) + self.qqtotalparts = int(self.request.GET.get('qqtotalparts', 1)) + self.qqfilename = self.request.GET.get('qqfilename', "") + self.qqchunksize = int(self.request.GET.get('qqchunksize', 0)) + self._starttime = time.time() + self.chunk_file_path = os.path.join(ARCHIVE_UPLOAD_TEMPDIR.get(), f'{self.qquuid}_{self.qqpartindex}') + + def receive_data_chunk(self, raw_data, start): + """ + Receives a chunk of data and writes it to a temporary file. + Args: + - raw_data (bytes): The raw data of the chunk being uploaded. + - start (int): The starting byte offset of the chunk within the file. + """ + with open(self.chunk_file_path, 'ab+') as dest: + dest.write(raw_data) + + def file_complete(self, file_size): + """ + Called when the entire file has been uploaded and all chunks have been processed. + Args: + - file_size (int): The total size of the uploaded file. 
+    """
+    elapsed = time.time() - self._starttime
+    LOG.info('Uploaded %s bytes to %s in %s seconds' % (file_size, self.chunk_file_path, elapsed))
 
 class HDFSfileUploadHandler(FileUploadHandler):
   """
@@ -176,7 +310,8 @@ def receive_data_chunk(self, raw_data, start):
     LOG.debug("HDFSfileUploadHandler receive_data_chunk")
 
     if not self._activated:
-      if self.request.META.get('PATH_INFO').startswith('/filebrowser') and self.request.META.get('PATH_INFO') != '/filebrowser/upload/archive':
+      if self.request.META.get('PATH_INFO').startswith('/filebrowser') and \
+          self.request.META.get('PATH_INFO') != '/filebrowser/upload/archive':
         raise SkipFile()
 
     return raw_data
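
Reviewer note (not part of the patch): a minimal client-side sketch of the two-step flow the new endpoints expect, pieced together from the upload/chunks and upload/complete routes, extract_upload_data() and the test cases. It assumes enable_new_file_uploader=true so FineUploaderChunkedUploadHandler is installed; the host URL, the session/CSRF handling and the 'qqfile' multipart field name are illustrative assumptions, not values defined by this patch.

import os
import uuid

import requests  # any HTTP client works; requests is assumed for brevity

HUE = "http://localhost:8888"   # assumption: local Hue instance
DEST = "/user/demo"             # assumption: an existing destination directory
LOCAL_FILE = "ym_2020.csv"      # assumption: local file to upload
CHUNK = 2 * 1000 * 1000         # mirrors the 2MB qqchunksize used in the tests

session = requests.Session()    # authentication and CSRF handling omitted

total_size = os.path.getsize(LOCAL_FILE)
total_parts = max(1, -(-total_size // CHUNK))  # ceiling division
qquuid = str(uuid.uuid4())

# 1) POST every part to /filebrowser/upload/chunks; the handler persists each part
#    as <ARCHIVE_UPLOAD_TEMPDIR>/<qquuid>_<qqpartindex>.
with open(LOCAL_FILE, "rb") as f:
  for index in range(total_parts):
    part = f.read(CHUNK)
    params = {
      "dest": DEST,
      "fileFieldLabel": "hdfs_file",
      "qquuid": qquuid,
      "qqfilename": os.path.basename(LOCAL_FILE),
      "qqtotalfilesize": total_size,
      "qqtotalparts": total_parts,
      "qqpartindex": index,
      "qqpartbyteoffset": index * CHUNK,
      "qqchunksize": len(part),
    }
    r = session.post(f"{HUE}/filebrowser/upload/chunks/", params=params,
                     files={"qqfile": part})  # multipart field name is an assumption
    r.raise_for_status()

# 2) POST to /filebrowser/upload/complete so the server reassembles the parts and
#    writes the file to the destination filesystem (HDFS, S3, ABFS, OFS or local).
r = session.post(f"{HUE}/filebrowser/upload/complete/", data={
  "dest": DEST,
  "fileFieldLabel": "hdfs_file",
  "qquuid": qquuid,
  "qqfilename": os.path.basename(LOCAL_FILE),
  "qqtotalfilesize": total_size,
  "qqtotalparts": total_parts,
})
print(r.json())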
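
Reviewer note (not part of the patch): the contract between FineUploaderChunkedUploadHandler and generate_chunks() is a naming convention in ARCHIVE_UPLOAD_TEMPDIR — part i of upload qquuid is written to <tempdir>/<qquuid>_<i>, and the finalize step streams the parts back in index order, yielding buffers of up to default_write_size bytes and deleting the part files when done. The standalone sketch below mirrors that behaviour with a throwaway temp dir and hypothetical names (reassemble, demo-uuid); it is a simplified stand-in for the real generate_chunks(), not a copy of it.

import io
import os
import tempfile

def reassemble(tempdir, upload_id, total_parts, buffer_limit=128 * 1024 * 1024):
  """Stream <upload_id>_<i> part files back as in-memory buffers, largest first-in order."""
  buf, total = io.BytesIO(), 0
  paths = [os.path.join(tempdir, f"{upload_id}_{i}") for i in range(total_parts)]
  for path in paths:
    with open(path, "rb") as part:
      while True:
        data = part.read(1024 * 1024)   # read each part in 1MB portions
        if not data:
          break
        buf.write(data)
        total += len(data)
        if buf.tell() >= buffer_limit:  # hand a full buffer to the caller
          buf.seek(0)
          yield buf, total
          buf = io.BytesIO()
  if buf.tell() > 0:                    # flush whatever is left
    buf.seek(0)
    yield buf, total
  for path in paths:                    # same cleanup the patch performs
    os.remove(path)

# Demo: fake three 1MB parts the upload handler would have written, then reassemble
# them with a 2MB buffer limit; expect one 2MB buffer followed by one 1MB buffer.
tempdir = tempfile.mkdtemp()
for i in range(3):
  with open(os.path.join(tempdir, f"demo-uuid_{i}"), "wb") as f:
    f.write(bytes([i]) * 1024 * 1024)

for buffer, bytes_so_far in reassemble(tempdir, "demo-uuid", 3, buffer_limit=2 * 1024 * 1024):
  print(len(buffer.getvalue()), bytes_so_far)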