Commit 7cc4856
Working S3, HDFS version, Code added for ADLS and Ozone with Test cases
ranade1 committed Nov 21, 2023
1 parent d9cd203 commit 7cc4856
Showing 14 changed files with 688 additions and 37 deletions.
19 changes: 17 additions & 2 deletions apps/filebrowser/src/filebrowser/conf.py
@@ -15,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys

from desktop.conf import ENABLE_DOWNLOAD
@@ -26,7 +27,6 @@
else:
  from django.utils.translation import ugettext_lazy as _


MAX_SNAPPY_DECOMPRESSION_SIZE = Config(
  key="max_snappy_decompression_size",
  help=_("Max snappy decompression size in bytes."),
@@ -37,9 +37,24 @@
ARCHIVE_UPLOAD_TEMPDIR = Config(
  key="archive_upload_tempdir",
  help=_("Location on local filesystem where the uploaded archives are temporarily stored."),
-  default=None,
+  default="/tmp/hue_uploads",
  type=str)

if not os.path.exists(ARCHIVE_UPLOAD_TEMPDIR.get()):
  os.makedirs(ARCHIVE_UPLOAD_TEMPDIR.get())

FILE_UPLOAD_CHUNK_SIZE = Config(
  key="file_upload_chunk_size",
  default=5000000,
  type=int,
  help=_('Configure chunk size of the chunked file uploader. Default chunk size is set to 5MB.'))

CONCURRENT_MAX_CONNECTIONS = Config(
  key="concurrent_max_connections",
  default=5,
  type=int,
  help=_('Configure the maximum number of concurrent connections (chunks) for file uploads using the chunked file uploader.'))

def get_desktop_enable_download():
  """Get desktop enable_download default"""
  return ENABLE_DOWNLOAD.get()
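For context, the chunk math these settings drive looks like this; a minimal sketch in Python (plan_chunks is a hypothetical helper, not part of this commit):

import math

FILE_UPLOAD_CHUNK_SIZE_DEFAULT = 5000000  # matches the Config default above (5 MB)

def plan_chunks(total_file_size, chunk_size=FILE_UPLOAD_CHUNK_SIZE_DEFAULT):
  # Hypothetical helper: how many parts a chunked uploader would send, and the size of the last one.
  total_parts = max(1, math.ceil(total_file_size / chunk_size))
  last_chunk_size = total_file_size - (total_parts - 1) * chunk_size
  return total_parts, last_chunk_size

# e.g. the 71,138,958-byte file from the test case below, sent in 2 MB chunks:
print(plan_chunks(71138958, 2000000))  # -> (36, 1138958)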
2 changes: 2 additions & 0 deletions apps/filebrowser/src/filebrowser/urls.py
@@ -44,6 +44,8 @@
  # POST operations
  re_path(r'^save$', filebrowser_views.save_file, name="filebrowser_views_save_file"),
  re_path(r'^upload/file$', filebrowser_views.upload_file, name='upload_file'),
  re_path(r'^upload/chunks', filebrowser_views.upload_chunks, name='upload_chunks'),
  re_path(r'^upload/complete', filebrowser_views.upload_complete, name='upload_complete'),
  re_path(r'^extract_archive', filebrowser_views.extract_archive_using_batch_job, name='extract_archive_using_batch_job'),
  re_path(r'^compress_files', filebrowser_views.compress_files_using_batch_job, name='compress_files_using_batch_job'),
  re_path(r'^trash/restore$', filebrowser_views.trash_restore, name='trash_restore'),
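To make the two new routes concrete, here is a rough client-side sketch of the handshake they implement: every part is POSTed to upload/chunks, then one call to upload/complete reassembles them. This is a sketch only; the base URL, the authenticated session object, and the file field name are assumptions, while the qq* parameter names follow the Fine Uploader convention used by the views:

import math
import os

def upload_in_chunks(session, local_path, dest, uuid, chunk_size=2000000):
  # Sketch: drive Hue's chunked upload endpoints with a requests-style session.
  size = os.path.getsize(local_path)
  parts = math.ceil(size / chunk_size)
  params = {'dest': dest, 'qquuid': uuid, 'qqfilename': os.path.basename(local_path),
            'qqtotalparts': parts, 'qqtotalfilesize': size, 'qqchunksize': chunk_size}
  base = 'http://localhost:8888/filebrowser'  # assumed local Hue instance
  with open(local_path, 'rb') as f:
    for index in range(parts):
      params.update({'qqpartindex': index, 'qqpartbyteoffset': index * chunk_size})
      # upload_chunks reads the qq* values from the query string and the bytes from request.FILES
      session.post(base + '/upload/chunks', params=params,
                   files={'hdfs_file': f.read(chunk_size)})
  # upload_complete reads the same fields from POST data and stitches the parts together
  return session.post(base + '/upload/complete', data=params).json()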
49 changes: 49 additions & 0 deletions apps/filebrowser/src/filebrowser/utils.py
@@ -0,0 +1,49 @@
# Licensed to Cloudera, Inc. under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. Cloudera, Inc. licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import io
import os

from filebrowser.conf import ARCHIVE_UPLOAD_TEMPDIR

def generate_chunks(uuid, totalparts, DEFAULT_WRITE_SIZE=1024 * 1024 * 128):
  fp = io.BytesIO()
  total = 0
  files = [os.path.join(ARCHIVE_UPLOAD_TEMPDIR.get(), f'{uuid}_{i}') for i in range(totalparts)]
  for file_path in files:
    with open(file_path, 'rb') as f:
      while True:
        # Read the file in portions, e.g., 1MB at a time
        portion = f.read(1 * 1024 * 1024)
        if not portion:
          break
        fp.write(portion)
        total = total + len(portion)
        # Once the buffer reaches DEFAULT_WRITE_SIZE (128MB by default), yield the chunk
        if fp.tell() >= DEFAULT_WRITE_SIZE:
          fp.seek(0)
          yield fp, total
          fp.close()
          fp = io.BytesIO()
  # Yield any remaining data in the buffer
  if fp.tell() > 0:
    fp.seek(0)
    yield fp, total
    fp.close()
  else:
    # Nothing left to flush: the data size was an exact multiple of DEFAULT_WRITE_SIZE (or zero)
    fp.close()
  for file_path in files:
    os.remove(file_path)
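A minimal sketch of how a consumer of this generator behaves (dest_fs, the destination path, and the part count are hypothetical; the real callers are the *FineUploaderChunkedUpload classes wired up in views.py below):

# generate_chunks re-reads the spooled part files 1 MB at a time, hands back
# ~128 MB BytesIO buffers, and deletes the part files once it is exhausted.
for buffer, bytes_so_far in generate_chunks('123', totalparts=36):
  dest_fs.append('/tmp/ym_2020.csv', buffer.read())  # dest_fs: assumed filesystem handle
  print('%d bytes flushed so far' % bytes_so_far)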
158 changes: 144 additions & 14 deletions apps/filebrowser/src/filebrowser/views.py
@@ -17,6 +17,12 @@

from future import standard_library
standard_library.install_aliases()

from django.views.decorators.csrf import csrf_exempt
from filebrowser.conf import ARCHIVE_UPLOAD_TEMPDIR
from django.core.files.uploadhandler import FileUploadHandler, StopUpload, StopFutureHandlers
from hadoop.conf import UPLOAD_CHUNK_SIZE

from builtins import object
import errno
import logging
Expand Down Expand Up @@ -80,6 +86,11 @@
  RenameFormSet, RmTreeFormSet, ChmodFormSet, ChownFormSet, CopyFormSet, RestoreFormSet,\
  TrashPurgeForm, SetReplicationFactorForm

from hadoop.fs.upload import HDFSFineUploaderChunkedUpload, LocalFineUploaderChunkedUpload
from aws.s3.upload import S3FineUploaderChunkedUpload
from azure.abfs.upload import ABFSFineUploaderChunkedUpload
from desktop.lib.fs.ozone.upload import OFSFineUploaderChunkedUpload

if sys.version_info[0] > 2:
  import io
  from io import StringIO as string_io
@@ -121,10 +132,25 @@

INLINE_DISPLAY_MIMETYPE_EXCEPTIONS = re.compile('image/svg\+xml')

SCHEME_PREFIXES = {
  's3a': 's3a://',
  'ofs': 'ofs://',
  'abfs': 'abfs://',
  'hdfs': 'hdfs://',
  'gs': 'gs://',
  'local': 'local://',
}

UPLOAD_CLASSES = {
  's3a': S3FineUploaderChunkedUpload,
  'ofs': OFSFineUploaderChunkedUpload,
  'abfs': ABFSFineUploaderChunkedUpload,
  'hdfs': HDFSFineUploaderChunkedUpload,
  'local': LocalFineUploaderChunkedUpload,
}

logger = logging.getLogger()


class ParquetOptions(object):
  def __init__(self, col=None, format='json', no_headers=True, limit=-1):
    self.col = col
@@ -147,10 +173,10 @@ def index(request):

def _decode_slashes(path):
  # This is a fix for some installations where the path is still having the slash (/) encoded
  # as %2F while the rest of the path is actually decoded.
  encoded_slash = '%2F'
-  if path.startswith(encoded_slash) or path.startswith('abfs:' + encoded_slash) or \
-      path.startswith('s3a:' + encoded_slash) or path.startswith('gs:' + encoded_slash) or path.startswith('ofs:' + encoded_slash):
+  if path and path.startswith(encoded_slash) or path.startswith('abfs:' + encoded_slash) or \
+      path.startswith('s3a:' + encoded_slash) or path.startswith('ofs:' + encoded_slash):
    path = path.replace(encoded_slash, '/')

  return path
@@ -159,17 +185,19 @@ def _normalize_path(path):
  path = _decode_slashes(path)

  # Check if protocol missing / and add it back (e.g. Kubernetes ingress can strip double slash)
-  if path.startswith('abfs:/') and not path.startswith('abfs://'):
-    path = path.replace('abfs:/', 'abfs://')
-  if path.startswith('s3a:/') and not path.startswith('s3a://'):
-    path = path.replace('s3a:/', 's3a://')
-  if path.startswith('gs:/') and not path.startswith('gs://'):
-    path = path.replace('gs:/', 'gs://')
-  if path.startswith('ofs:/') and not path.startswith('ofs://'):
-    path = path.replace('ofs:/', 'ofs://')
+  for scheme, prefix in SCHEME_PREFIXES.items():
+    single_slash_prefix = f"{scheme}:/"
+    if path and path.startswith(single_slash_prefix) and not path.startswith(prefix):
+      path = path.replace(single_slash_prefix, prefix)

  return path

def get_scheme(path):
  path = _normalize_path(path)
  for scheme, prefix in SCHEME_PREFIXES.items():
    if path.startswith(prefix):
      return scheme
  return 'hdfs'
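A quick illustration of the dispatch behavior (the example paths are hypothetical):

# Hypothetical inputs showing how get_scheme() resolves a destination:
assert get_scheme('s3a://bucket/key.csv') == 's3a'
assert get_scheme('abfs:/container/file') == 'abfs'  # single slash repaired by _normalize_path first
assert get_scheme('/user/hue/data.txt') == 'hdfs'    # no known prefix falls back to HDFS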

def download(request, path):
"""
@@ -1223,7 +1251,7 @@ def generic_op(form_class, request, op, parameter_names, piggyback=None, templat
  if next:
    logging.debug("Next: %s" % next)
    file_path_prefix = '/filebrowser/view='
    if next.startswith(file_path_prefix):
      decoded_file_path = next[len(file_path_prefix):]
      filepath_encoded_next = file_path_prefix + urllib_quote(decoded_file_path.encode('utf-8'), safe=SAFE_CHARACTERS_URI_COMPONENTS)
      return format_preserving_redirect(request, filepath_encoded_next)
@@ -1347,7 +1375,7 @@ def bulk_copy(*args, **kwargs):
        ofs_skip_files += request.fs.copy(arg['src_path'], arg['dest_path'], recursive=True, owner=request.user)
      else:
        request.fs.copy(arg['src_path'], arg['dest_path'], recursive=True, owner=request.user)

    # Send skipped filenames via raising exception to let users know.
    if ofs_skip_files:
      raise PopupException("Following files were skipped due to file size limitations:" + ofs_skip_files)
@@ -1418,6 +1446,108 @@ def bulk_restore(*args, **kwargs):
def trash_purge(request):
  return generic_op(TrashPurgeForm, request, request.fs.purge_trash, [], None)

def create_response(request, _fs, result="success", data="Success"):
  return {
    'path': _fs.filepath,
    'result': result,
    'next': request.GET.get("next"),
    'success': True,
    'uuid': _fs.qquuid,
    'status': 0,
    'data': data
  }

def perform_upload(request, *args, **kwargs):
  """
  Uploads a file to the specified destination.
  Args:
    request: The HTTP request object.
    **kwargs: Arbitrary keyword arguments.
  Returns:
    A dictionary containing the following keys:
    - path: The path of the uploaded file.
    - result: The result of the upload operation.
    - next: The URL to redirect to after the upload operation.
    - success: A boolean indicating whether the upload operation was successful.
    - uuid: The UUID of the uploaded file.
    - status: The status of the upload operation.
    - data: Additional data about the upload operation.
  """
  scheme = get_scheme(kwargs['dest'])
  upload_class = UPLOAD_CLASSES.get(scheme, LocalFineUploaderChunkedUpload)
  _fs = upload_class(request, **kwargs)
  _fs.upload()
  if scheme == 'hdfs':
    result = _massage_stats(request, stat_absolute_path(_fs.filepath, request.fs.stats(_fs.filepath)))
  else:
    result = "success"
  return create_response(request, _fs, result=result, data="Success")
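For illustration, a successful HDFS upload would yield a payload shaped roughly like this (field values are hypothetical; the stats dict is whatever _massage_stats returns):

# Hypothetical return value of perform_upload() for an HDFS destination:
example_response = {
  'path': '/tmp/test.txt',
  'result': {'path': '/tmp/test.txt', 'size': 12, 'type': 'file'},  # abridged massaged stats
  'next': None,
  'success': True,
  'uuid': '123',
  'status': 0,
  'data': 'Success',
}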

def extract_upload_data(request, method):
  data = request.POST if method == "POST" else request.GET
  chunks = {
    "qquuid": data.get('qquuid'),
    "qqpartindex": int(data.get('qqpartindex', 0)),
    "qqpartbyteoffset": int(data.get('qqpartbyteoffset', 0)),
    "qqtotalfilesize": int(data.get('qqtotalfilesize', 0)),
    "qqtotalparts": int(data.get('qqtotalparts', 1)),
    "qqfilename": data.get('qqfilename'),
    "qqchunksize": int(data.get('qqchunksize', 0)),
    "dest": data.get('dest', None),
    "fileFieldLabel": data.get('fileFieldLabel')
  }
  return chunks

@require_http_methods(["POST"])
def upload_chunks(request):
  """
  View function to handle chunked file uploads using Fine Uploader.
  Args:
  - request: The HTTP request object.
  Returns:
  - JsonResponse: A JSON response object with the following keys:
    - success: A boolean indicating whether the upload was successful.
    - uuid: A string representing the unique identifier for the uploaded file.
    - error: A string representing the error message if the upload failed.
  """
  if request.method == "POST":
    try:
      for _ in request.FILES.values():  # This processes the upload.
        pass
    except StopUpload:
      return JsonResponse({'success': False, 'error': 'Error in upload'})

    # case where file is larger than the single chunk size
    if int(request.GET.get("qqtotalparts", 0)) > 0:
      return JsonResponse({'success': True, 'uuid': request.GET.get('qquuid')})

    # case where file is smaller than the chunk size
    if int(request.GET.get("qqtotalparts", 0)) == 0 and int(request.GET.get("qqtotalfilesize", 0)) <= 2000000:
      chunks = extract_upload_data(request, "GET")
      try:
        response = perform_upload(request, **chunks)
        return JsonResponse(response)
      except Exception:
        return JsonResponse({'success': False, 'error': 'Error in upload'})
  return JsonResponse({'success': False, 'error': 'Unsupported request method'})

@require_http_methods(["POST"])
def upload_complete(request):
  """
  View function that handles the completion of a file upload.
  Args:
    request (HttpRequest): The HTTP request object.
  Returns:
    JsonResponse: A JSON response containing the result of the upload.
  """
  if request.method == "POST":
    chunks = extract_upload_data(request, "POST")
    try:
      response = perform_upload(request, **chunks)
      return JsonResponse(response)
    except Exception:
      return JsonResponse({'success': False, 'error': 'Error in upload'})
  return JsonResponse({'success': False, 'error': 'Unsupported request method'})

@require_http_methods(["POST"])
def upload_file(request):
47 changes: 47 additions & 0 deletions apps/filebrowser/src/filebrowser/views_test.py
@@ -44,6 +44,7 @@
from django.urls import reverse
from django.utils.encoding import smart_str
from django.http import HttpResponse
from django.test import TestCase

from nose.plugins.attrib import attr
from nose.plugins.skip import SkipTest
@@ -1530,6 +1531,52 @@ def test_has_default_permissions(self):
    finally:
      remove_from_group(self.user.username, 'has_adls')


class UploadChunksTestCase(TestCase):
  def setUp(self):
    self.client = make_logged_in_client(username="test", groupname="default", recreate=True, is_superuser=False)
    grant_access('test', 'test', 'filebrowser')
    add_to_group('test')
    self.user = User.objects.get(username="test")
    self.url = '/filebrowser/upload/chunks/?dest=/tmp&fileFieldLabel=local&qquuid=123&qqfilename=test.txt&qqtotalfilesize=12'
    self.filename = "test.txt"

  def test_upload_chunks_success(self):
    url = '/filebrowser/upload/chunks/?dest=/tmp&fileFieldLabel=local&qquuid=123&qqfilename=test.txt&qqtotalfilesize=12'
    response = self.client.post(url, {'filename': self.filename})
    self.assertEqual(response.status_code, 200)
    self.assertEqual(response.json()['success'], True)
    self.assertIsNotNone(response.json()['uuid'])

  def test_upload_chunks_large_file(self):
    # Simulate a large file upload: qqtotalparts > 0, so the view only spools this part.
    url = ('/filebrowser/upload/chunks/?dest=/tmp&fileFieldLabel=hdfs_file&qqpartindex=2'
           '&qqpartbyteoffset=4000000&qqchunksize=2000000&qqtotalparts=36'
           '&qqtotalfilesize=71138958&qqfilename=ym_2020.csv&qquuid=123')
    response = self.client.post(url, {'filename': self.filename})
    self.assertEqual(response.status_code, 200)
    self.assertEqual(response.json()['success'], True)
    self.assertEqual(response.json()['uuid'], '123')

  def test_upload_chunks_small_file(self):
    # Simulate a small file upload: no qqtotalparts, size under the single-chunk limit.
    url = '/filebrowser/upload/chunks/?dest=/tmp&fileFieldLabel=hdfs_file&qqtotalfilesize=48&qqfilename=ym_2020.csv&qquuid=123'
    response = self.client.post(url, {'qqtotalparts': 0, 'qqtotalfilesize': 1000, 'qquuid': '123'})
    self.assertEqual(response.status_code, 200)
    self.assertEqual(response.json()['success'], True)
    self.assertIsNotNone(response.json()['uuid'])

  def test_upload_chunks_error(self):
    # Simulate an error in the upload: no dest or qq* parameters at all, so
    # perform_upload raises and the view returns the error payload.
    response = self.client.post('/filebrowser/upload/chunks/')
    self.assertEqual(response.status_code, 200)
    self.assertEqual(response.json()['success'], False)
    self.assertEqual(response.json()['error'], 'Error in upload')

class TestOFSAccessPermissions(object):
  def setUp(self):
    self.client = make_logged_in_client(username="test", groupname="default", recreate=True, is_superuser=False)