Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make move() use an rmtree onerror too. #5

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions bluezip.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
import getpass
import zipfile
import fnmatch
import shutil
import socket
import json
import time
import shutil
import sys
import stat
import re
Expand Down Expand Up @@ -195,10 +195,10 @@ def process_game(self, game):
c.execute('SELECT revision, sha256, title FROM game WHERE id = ? ORDER BY revision DESC LIMIT 1', (game.uid,))
revision, prev_sha256, prev_title = c.fetchone() or (1, None, None)
os.mkdir(build_dir)
shutil.move(game.content_path, os.path.join(build_dir, 'content'))
util.shutil_move(game.content_path, os.path.join(build_dir, 'content'), rmtree_onerror=remove_readonly)
sha256 = create_torrentzip(game.uid, game.platform, build_dir, dist)
outfile = os.path.join(DIST_DIR, f'{game.uid}.zip')
shutil.move(dist, outfile)
util.shutil_move(dist, outfile, rmtree_onerror=remove_readonly)
if prev_sha256:
if prev_sha256 == sha256:
pcolor('green', 'no change')
Expand Down
122 changes: 122 additions & 0 deletions util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
import zlib
import re
import os
import sys
import shutil
import stat

def open_db():
try:
Expand Down Expand Up @@ -87,3 +90,122 @@ def pcolor_off(color, *args, **kwargs):
def no_color():
global pcolor
pcolor = pcolor_off

def shutil_move(src, dst, copy_function=shutil.copy2, rmtree_onerror=None):
"""Recursively move a file or directory to another location. This is
similar to the Unix "mv" command. Return the file or directory's
destination.

If the destination is a directory or a symlink to a directory, the source
is moved inside the directory. The destination path must not already
exist.

If the destination already exists but is not a directory, it may be
overwritten depending on os.rename() semantics.

If the destination is on our current filesystem, then rename() is used.
Otherwise, src is copied to the destination and then removed. Symlinks are
recreated under the new name if os.rename() fails because of cross
filesystem renames.

The optional `copy_function` argument is a callable that will be used
to copy the source or it will be delegated to `copytree`.
By default, copy2() is used, but any function that supports the same
signature (like copy()) can be used.

The optional `rmtree_onerror` argument is a callable that will be used
as the "onerror" argument to rmtree(), if it gets called.

A lot more could be done here... A look at a mv.c shows a lot of
the issues this implementation glosses over.

"""
sys.audit("shutil.move", src, dst)
real_dst = dst
if os.path.isdir(dst):
if _samefile(src, dst):
# We might be on a case insensitive filesystem,
# perform the rename anyway.
os.rename(src, dst)
return

# Using _basename instead of os.path.basename is important, as we must
# ignore any trailing slash to avoid the basename returning ''
real_dst = os.path.join(dst, _basename(src))

if os.path.exists(real_dst):
raise Error("Destination path '%s' already exists" % real_dst)
try:
os.rename(src, real_dst)
except OSError:
if os.path.islink(src):
linkto = os.readlink(src)
os.symlink(linkto, real_dst)
os.unlink(src)
elif os.path.isdir(src):
if _destinsrc(src, dst):
raise Error("Cannot move a directory '%s' into itself"
" '%s'." % (src, dst))
if (_is_immutable(src)
or (not os.access(src, os.W_OK) and os.listdir(src)
and sys.platform == 'darwin')):
raise PermissionError("Cannot move the non-empty directory "
"'%s': Lacking write permission to '%s'."
% (src, src))
shutil.copytree(src, real_dst, copy_function=copy_function,
symlinks=True)
shutil.rmtree(src, onerror=rmtree_onerror)
else:
copy_function(src, real_dst)
os.unlink(src)
return real_dst

def _samefile(src, dst):
# Macintosh, Unix.
if isinstance(src, os.DirEntry) and hasattr(os.path, 'samestat'):
try:
return os.path.samestat(src.stat(), os.stat(dst))
except OSError:
return False

if hasattr(os.path, 'samefile'):
try:
return os.path.samefile(src, dst)
except OSError:
return False

# All other platforms: check for same pathname.
return (os.path.normcase(os.path.abspath(src)) ==
os.path.normcase(os.path.abspath(dst)))

def _is_immutable(src):
st = _stat(src)
immutable_states = [stat.UF_IMMUTABLE, stat.SF_IMMUTABLE]
return hasattr(st, 'st_flags') and st.st_flags in immutable_states

def _basename(path):
"""A basename() variant which first strips the trailing slash, if present.
Thus we always get the last component of the path, even for directories.

path: Union[PathLike, str]

e.g.
>>> os.path.basename('/bar/foo')
'foo'
>>> os.path.basename('/bar/foo/')
''
>>> _basename('/bar/foo/')
'foo'
"""
path = os.fspath(path)
sep = os.path.sep + (os.path.altsep or '')
return os.path.basename(path.rstrip(sep))

def _destinsrc(src, dst):
src = os.path.abspath(src)
dst = os.path.abspath(dst)
if not src.endswith(os.path.sep):
src += os.path.sep
if not dst.endswith(os.path.sep):
dst += os.path.sep
return dst.startswith(src)