Skip to content

Commit

Permalink
Working S3, HDFS version, Code added for ADLS and Ozone with Test cases
Browse files Browse the repository at this point in the history
Testing Done:
1. Tested on HDFS, S3, ADLS and Ozone upload
  • Loading branch information
ranade1 committed Dec 6, 2023
1 parent c6ee29f commit 829f3bb
Show file tree
Hide file tree
Showing 14 changed files with 720 additions and 37 deletions.
16 changes: 14 additions & 2 deletions apps/filebrowser/src/filebrowser/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys

from desktop.conf import ENABLE_DOWNLOAD
Expand All @@ -26,7 +27,6 @@
else:
from django.utils.translation import ugettext_lazy as _


MAX_SNAPPY_DECOMPRESSION_SIZE = Config(
key="max_snappy_decompression_size",
help=_("Max snappy decompression size in bytes."),
Expand All @@ -37,9 +37,21 @@
ARCHIVE_UPLOAD_TEMPDIR = Config(
key="archive_upload_tempdir",
help=_("Location on local filesystem where the uploaded archives are temporary stored."),
default=None,
default="/tmp/hue_uploads",
type=str)

FILE_UPLOAD_CHUNK_SIZE = Config(
key="file_upload_chunk_size",
default=5000000,
type=int,
help=_('Configure chunk size of the chunked file uploader. Default chunk size is set to 5MB.'))

CONCURRENT_MAX_CONNECTIONS = Config(
key="concurrent_max_connections",
default=5,
type=int,
help=_('Configure the maximum number of concurrent connections(chunks) for file uploads using the chunked file uploader.'))

def get_desktop_enable_download():
"""Get desktop enable_download default"""
return ENABLE_DOWNLOAD.get()
Expand Down
2 changes: 2 additions & 0 deletions apps/filebrowser/src/filebrowser/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
# POST operations
re_path(r'^save$', filebrowser_views.save_file, name="filebrowser_views_save_file"),
re_path(r'^upload/file$', filebrowser_views.upload_file, name='upload_file'),
re_path(r'^upload/chunks', filebrowser_views.upload_chunks, name='upload_chunks'),
re_path(r'^upload/complete', filebrowser_views.upload_complete, name='upload_complete'),
re_path(r'^extract_archive', filebrowser_views.extract_archive_using_batch_job, name='extract_archive_using_batch_job'),
re_path(r'^compress_files', filebrowser_views.compress_files_using_batch_job, name='compress_files_using_batch_job'),
re_path(r'^trash/restore$', filebrowser_views.trash_restore, name='trash_restore'),
Expand Down
64 changes: 64 additions & 0 deletions apps/filebrowser/src/filebrowser/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Licensed to Cloudera, Inc. under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. Cloudera, Inc. licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import os
import logging
LOG = logging.getLogger()

from filebrowser.conf import ARCHIVE_UPLOAD_TEMPDIR
DEFAULT_WRITE_SIZE = 1024 * 1024 * 128

def calculate_total_size(uuid, totalparts):
total = 0
files = [os.path.join(ARCHIVE_UPLOAD_TEMPDIR.get(), f'{uuid}_{i}') for i in range(totalparts)]
for file_path in files:
try:
total += os.path.getsize(file_path)
except FileNotFoundError:
LOG.error(f"calculate_total_size: The file '{file_path}' does not exist.")
except OSError as e:
LOG.error(f"calculate_total_size: For the file '{file_path}' error occurred: {e}")
return total

def generate_chunks(uuid, totalparts, default_write_size=DEFAULT_WRITE_SIZE):
fp = io.BytesIO()
total = 0
files = [os.path.join(ARCHIVE_UPLOAD_TEMPDIR.get(), f'{uuid}_{i}') for i in range(totalparts)]
for file_path in files:
with open(file_path, 'rb') as f:
while True:
# Read the file in portions, e.g., 1MB at a time
portion = f.read(1 * 1024 * 1024)
if not portion:
break
fp.write(portion)
total = total + len(portion)
# If buffer size is more than 128MB, yield the chunk
if fp.tell() >= default_write_size:
fp.seek(0)
yield fp, total
fp.close()
fp = io.BytesIO()
# Yield any remaining data in the buffer
if fp.tell() > 0:
fp.seek(0)
yield fp, total + fp.tell()
fp.close()
# chances are the chunk is zero and we never yielded
else:
fp.close()
for file_path in files:
os.remove(file_path)
159 changes: 145 additions & 14 deletions apps/filebrowser/src/filebrowser/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@

from future import standard_library
standard_library.install_aliases()

from django.views.decorators.csrf import csrf_exempt
from filebrowser.conf import ARCHIVE_UPLOAD_TEMPDIR
from django.core.files.uploadhandler import FileUploadHandler, StopUpload, StopFutureHandlers
from hadoop.conf import UPLOAD_CHUNK_SIZE

from builtins import object
import errno
import logging
Expand Down Expand Up @@ -80,6 +86,11 @@
RenameFormSet, RmTreeFormSet, ChmodFormSet, ChownFormSet, CopyFormSet, RestoreFormSet,\
TrashPurgeForm, SetReplicationFactorForm

from hadoop.fs.upload import HDFSFineUploaderChunkedUpload, LocalFineUploaderChunkedUpload
from aws.s3.upload import S3FineUploaderChunkedUpload
from azure.abfs.upload import ABFSFineUploaderChunkedUpload
from desktop.lib.fs.ozone.upload import OFSFineUploaderChunkedUpload

if sys.version_info[0] > 2:
import io
from io import StringIO as string_io
Expand Down Expand Up @@ -121,10 +132,28 @@

INLINE_DISPLAY_MIMETYPE_EXCEPTIONS = re.compile('image/svg\+xml')

SCHEME_PREFIXES = {
's3a': 's3a://',
'ofs': 'ofs://',
'abfs': 'abfs://',
'hdfs': 'hdfs://',
'gs': 'gs://',
'local': 'local://',
}

UPLOAD_CLASSES = {
's3a': S3FineUploaderChunkedUpload,
'ofs': OFSFineUploaderChunkedUpload,
'abfs': ABFSFineUploaderChunkedUpload,
'hdfs': HDFSFineUploaderChunkedUpload,
'local': LocalFineUploaderChunkedUpload,
}

if not os.path.exists(ARCHIVE_UPLOAD_TEMPDIR.get()):
os.makedirs(ARCHIVE_UPLOAD_TEMPDIR.get())

logger = logging.getLogger()


class ParquetOptions(object):
def __init__(self, col=None, format='json', no_headers=True, limit=-1):
self.col = col
Expand All @@ -147,10 +176,11 @@ def index(request):

def _decode_slashes(path):
# This is a fix for some installations where the path is still having the slash (/) encoded
# as %2F while the rest of the path is actually decoded.
# as %2F while the rest of the path is actually decoded.
encoded_slash = '%2F'
if path.startswith(encoded_slash) or path.startswith('abfs:' + encoded_slash) or \
path.startswith('s3a:' + encoded_slash) or path.startswith('gs:' + encoded_slash) or path.startswith('ofs:' + encoded_slash):
if path and path.startswith(encoded_slash) or path.startswith('abfs:' + encoded_slash) or \
path.startswith('s3a:' + encoded_slash) or path.startswith('gs:' + encoded_slash) or \
path.startswith('ofs:' + encoded_slash):
path = path.replace(encoded_slash, '/')

return path
Expand All @@ -159,17 +189,19 @@ def _normalize_path(path):
path = _decode_slashes(path)

# Check if protocol missing / and add it back (e.g. Kubernetes ingress can strip double slash)
if path.startswith('abfs:/') and not path.startswith('abfs://'):
path = path.replace('abfs:/', 'abfs://')
if path.startswith('s3a:/') and not path.startswith('s3a://'):
path = path.replace('s3a:/', 's3a://')
if path.startswith('gs:/') and not path.startswith('gs://'):
path = path.replace('gs:/', 'gs://')
if path.startswith('ofs:/') and not path.startswith('ofs://'):
path = path.replace('ofs:/', 'ofs://')
for scheme, prefix in SCHEME_PREFIXES.items():
single_slash_prefix = f"{scheme}:/"
if path and path.startswith(single_slash_prefix) and not path.startswith(prefix):
path = path.replace(single_slash_prefix, prefix)

return path

def get_scheme(path):
path = _normalize_path(path)
for scheme, prefix in SCHEME_PREFIXES.items():
if path.startswith(prefix):
return scheme
return 'hdfs'

def download(request, path):
"""
Expand Down Expand Up @@ -1223,7 +1255,7 @@ def generic_op(form_class, request, op, parameter_names, piggyback=None, templat
if next:
logging.debug("Next: %s" % next)
file_path_prefix = '/filebrowser/view='
if next.startswith(file_path_prefix):
if next.startswith(file_path_prefix):
decoded_file_path = next[len(file_path_prefix):]
filepath_encoded_next = file_path_prefix + urllib_quote(decoded_file_path.encode('utf-8'), safe=SAFE_CHARACTERS_URI_COMPONENTS)
return format_preserving_redirect(request, filepath_encoded_next)
Expand Down Expand Up @@ -1347,7 +1379,7 @@ def bulk_copy(*args, **kwargs):
ofs_skip_files += request.fs.copy(arg['src_path'], arg['dest_path'], recursive=True, owner=request.user)
else:
request.fs.copy(arg['src_path'], arg['dest_path'], recursive=True, owner=request.user)

# Send skipped filenames via raising exception to let users know.
if ofs_skip_files:
raise PopupException("Following files were skipped due to file size limitations:" + ofs_skip_files)
Expand Down Expand Up @@ -1418,6 +1450,105 @@ def bulk_restore(*args, **kwargs):
def trash_purge(request):
return generic_op(TrashPurgeForm, request, request.fs.purge_trash, [], None)

def _create_response(request, _fs, result="success", data="Success"):
return {
'path': _fs.filepath,
'result': result,
'next': request.GET.get("next"),
'success': True,
'uuid': _fs.qquuid,
'status': 0,
'data': data
}

def perform_upload(request, *args, **kwargs):
"""
Uploads a file to the specified destination.
Args:
request: The HTTP request object.
**kwargs: Arbitrary keyword arguments.
Returns:
A dictionary containing the following keys:
- path: The path of the uploaded file.
- result: The result of the upload operation.
- next: The URL to redirect to after the upload operation.
- success: A boolean indicating whether the upload operation was successful.
- uuid: The UUID of the uploaded file.
- status: The status of the upload operation.
- data: Additional data about the upload operation.
"""
scheme = get_scheme(kwargs['dest'])
upload_class = UPLOAD_CLASSES.get(scheme, LocalFineUploaderChunkedUpload)
_fs = upload_class(request, **kwargs)
_fs.upload()
if scheme == 'hdfs':
result = _massage_stats(request, stat_absolute_path(_fs.filepath, request.fs.stats(_fs.filepath)))
else:
result = "success"
return _create_response(request, _fs, result=result, data="Success")

def extract_upload_data(request, method):
data = request.POST if method == "POST" else request.GET
chunks = {
"qquuid": data.get('qquuid'),
"qqpartindex": int(data.get('qqpartindex', 0)),
"qqpartbyteoffset": int(data.get('qqpartbyteoffset', 0)),
"qqtotalfilesize": int(data.get('qqtotalfilesize', 0)),
"qqtotalparts": int(data.get('qqtotalparts', 1)),
"qqfilename": data.get('qqfilename'),
"qqchunksize": int(data.get('qqchunksize', 0)),
"dest": data.get('dest', None),
"fileFieldLabel": data.get('fileFieldLabel')
}
return chunks

@require_http_methods(["POST"])
def upload_chunks(request):
"""
View function to handle chunked file uploads using Fine Uploader.
Args:
- request: The HTTP request object.
Returns:
- JsonResponse: A JSON response object with the following keys:
- success: A boolean indicating whether the upload was successful.
- uuid: A string representing the unique identifier for the uploaded file.
- error: A string representing the error message if the upload failed.
"""
try:
for _ in request.FILES.values(): # This processes the upload.
pass
except StopUpload:
return JsonResponse({'success': False, 'error': 'Error in upload'})

# case where file is larger than the single chunk size
if int(request.GET.get("qqtotalparts", 0)) > 0:
return JsonResponse({'success': True, 'uuid': request.GET.get('qquuid')})

# case where file is smaller than the chunk size
if int(request.GET.get("qqtotalparts", 0)) == 0 and int(request.GET.get("qqtotalfilesize", 0)) <= 2000000:
chunks = extract_upload_data(request, "GET")
try:
response = perform_upload(request, **chunks)
return JsonResponse(response)
except Exception as e:
return JsonResponse({'success': False, 'error': 'Error in upload'})
return JsonResponse({'success': False, 'error': 'Unsupported request method'})

@require_http_methods(["POST"])
def upload_complete(request):
"""
View function that handles the completion of a file upload.
Args:
request (HttpRequest): The HTTP request object.
Returns:
JsonResponse: A JSON response containing the result of the upload.
"""
chunks = extract_upload_data(request, "POST")
try:
response = perform_upload(request, **chunks)
return JsonResponse(response)
except Exception as e:
return JsonResponse({'success': False, 'error': 'Error in upload'})

@require_http_methods(["POST"])
def upload_file(request):
Expand Down
48 changes: 48 additions & 0 deletions apps/filebrowser/src/filebrowser/views_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from django.urls import reverse
from django.utils.encoding import smart_str
from django.http import HttpResponse
from django.test import TestCase

from nose.plugins.attrib import attr
from nose.plugins.skip import SkipTest
Expand Down Expand Up @@ -1530,6 +1531,53 @@ def test_has_default_permissions(self):
finally:
remove_from_group(self.user.username, 'has_adls')


class UploadChunksTestCase(TestCase):
def setUp(self):
self.client = make_logged_in_client(username="test", groupname="default", recreate=True, is_superuser=False)
grant_access('test', 'test', 'filebrowser')
add_to_group('test')
self.user = User.objects.get(username="test")
self.url = '/filebrowser/upload/chunks/?dest=/tmp&fileFieldLabel=local&qquuid=123&qqfilename=test.txt&qqtotalfilesize=12'
self.filename = "test.txt"

def test_upload_chunks_success(self):
url = '/filebrowser/upload/chunks/?dest=/tmp&fileFieldLabel=local&qquuid=123&qqfilename=test.txt&qqtotalfilesize=12'
response = self.client.post(url, {'filename': self.filename})
self.assertEqual(response.status_code, 200)
# In Test Setup HDFS is not available, so it will fail
self.assertEqual(response.json()['success'], False)

def test_upload_chunks_large_file(self):
# simulate a large file upload
url = '/filebrowser/upload/chunks/?dest=/tmp&fileFieldLabel=hdfs_file&qqpartindex=2&qqpartbyteoffset=4000000&'
url += 'qqchunksize=2000000&qqtotalparts=36&qqtotalfilesize=71138958&qqfilename=ym_2020.csv&qquuid=123'
response = self.client.post(url, {'filename': self.filename})
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()['success'], True)
self.assertEqual(response.json()['uuid'], '123')

def test_upload_chunks_small_file(self):
# simulate a small file upload
url = '/filebrowser/upload/chunks/?dest=/tmp&fileFieldLabel=hdfs_file&qqtotalfilesize=48&qqfilename=ym_2020.csv&qquuid=123'
response = self.client.post(url, {'qqtotalfilesize': 1000, 'qquuid': '123'})
self.assertEqual(response.status_code, 200)
# In Test Setup HDFS is not available, so it will fail
self.assertEqual(response.json()['success'], False)

def test_upload_chunks_error(self):
# simulate an error in the upload
url = '/filebrowser/upload/chunks/'
try:
response = self.client.post(url)
except Exception as e:
self.assertEqual(e.status_code, 500)
self.assertEqual(e.json()['success'], False)
self.assertEqual(e.json()['error'], 'Error in upload')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()['success'], False)
self.assertEqual(response.json()['error'], 'Error in upload')

class TestOFSAccessPermissions(object):
def setUp(self):
self.client = make_logged_in_client(username="test", groupname="default", recreate=True, is_superuser=False)
Expand Down
Loading

0 comments on commit 829f3bb

Please sign in to comment.