diff --git a/src/stack_pr/cli.py b/src/stack_pr/cli.py index bba5f42..44d452f 100755 --- a/src/stack_pr/cli.py +++ b/src/stack_pr/cli.py @@ -204,7 +204,9 @@ def commit_id(self) -> str: return self._search_group(RE_RAW_COMMIT_ID, "commit") def parents(self) -> List[str]: - return [m.group("commit") for m in RE_RAW_PARENT.finditer(self.raw_header)] + return [ + m.group("commit") for m in RE_RAW_PARENT.finditer(self.raw_header) + ] def author(self) -> str: return self._search_group(RE_RAW_AUTHOR, "author") @@ -217,7 +219,8 @@ def author_email(self) -> str: def commit_msg(self) -> str: return "\n".join( - m.group("line") for m in RE_RAW_COMMIT_MSG_LINE.finditer(self.raw_header) + m.group("line") + for m in RE_RAW_COMMIT_MSG_LINE.finditer(self.raw_header) ) @@ -398,7 +401,9 @@ def is_ancestor(commit1: str, commit2: str, verbose: bool) -> bool: # TODO: We need to check returncode of this command more carefully, as the # command simply might fail (rc != 0 and rc != 1). p = run_shell_command( - ["git", "merge-base", "--is-ancestor", commit1, commit2], check=False, quiet=not verbose + ["git", "merge-base", "--is-ancestor", commit1, commit2], + check=False, + quiet=not verbose, ) return p.returncode == 0 @@ -424,7 +429,9 @@ def get_stack(base: str, head: str, verbose: bool) -> List[StackEntry]: st: List[StackEntry] = [] stack = ( split_header( - get_command_output(["git", "rev-list", "--header", "^" + base, head]) + get_command_output( + ["git", "rev-list", "--header", "^" + base, head] + ) ) )[::-1] @@ -514,7 +521,9 @@ def draft_bitmask_type(value: str) -> List[bool]: # ===----------------------------------------------------------------------=== # # SUBMIT # ===----------------------------------------------------------------------=== # -def add_or_update_metadata(e: StackEntry, needs_rebase: bool, verbose: bool) -> bool: +def add_or_update_metadata( + e: StackEntry, needs_rebase: bool, verbose: bool +) -> bool: if needs_rebase: run_shell_command( [ @@ -538,7 +547,9 @@ def add_or_update_metadata(e: StackEntry, needs_rebase: bool, verbose: bool) -> # Add the stack info metadata to the commit message commit_msg += f"\n\nstack-info: PR: {e.pr}, branch: {e.head}" run_shell_command( - ["git", "commit", "--amend", "-F", "-"], input=commit_msg.encode(), quiet=not verbose + ["git", "commit", "--amend", "-F", "-"], + input=commit_msg.encode(), + quiet=not verbose, ) return True @@ -581,7 +592,10 @@ def init_local_branches(st: List[StackEntry], remote: str, verbose: bool): log(h("Initializing local branches"), level=1) set_head_branches(st, remote, verbose) for e in st: - run_shell_command(["git", "checkout", e.commit.commit_id(), "-B", e.head], quiet=not verbose) + run_shell_command( + ["git", "checkout", e.commit.commit_id(), "-B", e.head], + quiet=not verbose, + ) def push_branches(st: List[StackEntry], remote, verbose: bool): @@ -593,14 +607,14 @@ def push_branches(st: List[StackEntry], remote, verbose: bool): def print_cmd_failure_details(exc: SubprocessError): cmd_stdout = ( - exc.stdout.decode("utf-8").replace("\\n", "\n").replace("\\t", "\t") - if exc.stdout - else None + exc.stdout.decode("utf-8") + .replace("\\n", "\n") + .replace("\\t", "\t") if exc.stdout else None ) cmd_stderr = ( - exc.stderr.decode("utf-8").replace("\\n", "\n").replace("\\t", "\t") - if exc.stderr - else None + exc.stderr.decode("utf-8") + .replace("\\n", "\n") + .replace("\\t", "\t") if exc.stderr else None ) print(f"Exitcode: {exc.returncode}") print(f"Stdout: {cmd_stdout}") @@ -678,7 +692,9 @@ def add_cross_links(st: List[StackEntry], keep_body: bool, verbose: bool): if keep_body: # Keep current body of the PR after the cross links component current_pr_body = get_current_pr_body(e) - pr_body.append(current_pr_body.split(CROSS_LINKS_DELIMETER, 1)[-1].lstrip()) + pr_body.append( + current_pr_body.split(CROSS_LINKS_DELIMETER, 1)[-1].lstrip() + ) else: pr_body.extend( [ @@ -718,11 +734,15 @@ def add_cross_links(st: List[StackEntry], keep_body: bool, verbose: bool): # # To avoid this, we temporarily set all base branches to point to 'main' - once # all the branches are pushed we can set the actual base branches. -def reset_remote_base_branches(st: List[StackEntry], target: str, verbose: bool): +def reset_remote_base_branches( + st: List[StackEntry], target: str, verbose: bool +): log(h("Resetting remote base branches"), level=1) for e in filter(lambda e: e.has_pr(), st): - run_shell_command(["gh", "pr", "edit", e.pr, "-B", target], quiet=not verbose) + run_shell_command( + ["gh", "pr", "edit", e.pr, "-B", target], quiet=not verbose + ) # If local 'main' lags behind 'origin/main', and 'head' contains all commits @@ -736,7 +756,9 @@ def reset_remote_base_branches(st: List[StackEntry], target: str, verbose: bool) # already in remote into their stack, they can use a different notation for the # base (e.g. explicit hash of the commit) - but most probably nobody ever would # need that. -def should_update_local_base(head: str, base: str, remote: str, target: str, verbose: bool): +def should_update_local_base( + head: str, base: str, remote: str, target: str, verbose: bool +): base_hash = get_command_output(["git", "rev-parse", base]) target_hash = get_command_output(["git", "rev-parse", f"{remote}/{target}"]) return ( @@ -748,7 +770,9 @@ def should_update_local_base(head: str, base: str, remote: str, target: str, ver def update_local_base(base: str, remote: str, target: str, verbose: bool): log(h(f"Updating local branch {base} to {remote}/{target}"), level=1) - run_shell_command(["git", "rebase", f"{remote}/{target}", base], quiet=not verbose) + run_shell_command( + ["git", "rebase", f"{remote}/{target}", base], quiet=not verbose + ) class CommonArgs(NamedTuple): @@ -763,7 +787,14 @@ class CommonArgs(NamedTuple): @classmethod def from_args(cls, args: argparse.Namespace) -> "CommonArgs": - return cls(args.base, args.head, args.remote, args.target, args.hyperlinks, args.verbose) + return cls( + args.base, + args.head, + args.remote, + args.target, + args.hyperlinks, + args.verbose, + ) # If the base isn't explicitly specified, find the merge base between @@ -784,7 +815,12 @@ def deduce_base(args: CommonArgs) -> CommonArgs: ["git", "merge-base", args.head, f"{args.remote}/{args.target}"] ) return CommonArgs( - deduced_base, args.head, args.remote, args.target, args.hyperlinks, args.verbose + deduced_base, + args.head, + args.remote, + args.target, + args.hyperlinks, + args.verbose, ) @@ -815,9 +851,13 @@ def command_submit( current_branch = get_current_branch_name() - if should_update_local_base(args.head, args.base, args.remote, args.target, args.verbose): + if should_update_local_base( + args.head, args.base, args.remote, args.target, args.verbose + ): update_local_base(args.base, args.remote, args.target, args.verbose) - run_shell_command(["git", "checkout", current_branch], quiet=not args.verbose) + run_shell_command( + ["git", "checkout", current_branch], quiet=not args.verbose + ) # Determine what commits belong to the stack st = get_stack(args.base, args.head, args.verbose) @@ -842,7 +882,9 @@ def command_submit( # If the current branch contains commits from the stack, we will need to # rebase it in the end since the commits will be modified. top_branch = st[-1].head - need_to_rebase_current = is_ancestor(top_branch, current_branch, args.verbose) + need_to_rebase_current = is_ancestor( + top_branch, current_branch, args.verbose + ) reset_remote_base_branches(st, args.target, args.verbose) @@ -852,7 +894,9 @@ def command_submit( # Now we have all the branches, so we can create the corresponding PRs log(h("Submitting PRs"), level=1) for e_idx, e in enumerate(st): - is_pr_draft = draft or ((draft_bitmask is not None) and draft_bitmask[e_idx]) + is_pr_draft = draft or ( + (draft_bitmask is not None) and draft_bitmask[e_idx] + ) create_pr(e, is_pr_draft, reviewer) # Verify consistency in everything we have so far @@ -883,11 +927,13 @@ def command_submit( current_branch, "--committer-date-is-author-date", ], - quiet=not args.verbose + quiet=not args.verbose, ) else: log(h(f"Checking out the original branch '{current_branch}'"), level=1) - run_shell_command(["git", "checkout", current_branch], quiet=not args.verbose) + run_shell_command( + ["git", "checkout", current_branch], quiet=not args.verbose + ) delete_local_branches(st, args.verbose) print_tips_after_export(st, args) @@ -920,7 +966,9 @@ def rebase_pr(e: StackEntry, remote: str, target: str, verbose: bool): except Exception: error(ERROR_CANT_REBASE.format(**locals())) raise - run_shell_command(["git", "push", remote, "-f", f"{e.head}:{e.head}"], quiet=not verbose) + run_shell_command( + ["git", "push", remote, "-f", f"{e.head}:{e.head}"], quiet=not verbose + ) def land_pr(e: StackEntry, remote: str, target: str, verbose: bool): @@ -935,7 +983,9 @@ def land_pr(e: StackEntry, remote: str, target: str, verbose: bool): raise # Switch PR base branch to 'main' - run_shell_command(["gh", "pr", "edit", e.pr, "-B", target], quiet=not verbose) + run_shell_command( + ["gh", "pr", "edit", e.pr, "-B", target], quiet=not verbose + ) # Form the commit message: it should contain the original commit message # and nothing else. @@ -950,7 +1000,7 @@ def land_pr(e: StackEntry, remote: str, target: str, verbose: bool): run_shell_command( ["gh", "pr", "merge", e.pr, "--squash", "-t", title, "-F", "-"], input=pr_body.encode(), - quiet=not verbose + quiet=not verbose, ) @@ -992,9 +1042,13 @@ def command_land(args: CommonArgs): current_branch = get_current_branch_name() - if should_update_local_base(args.head, args.base, args.remote, args.target, args.verbose): + if should_update_local_base( + args.head, args.base, args.remote, args.target, args.verbose + ): update_local_base(args.base, args.remote, args.target, args.verbose) - run_shell_command(["git", "checkout", current_branch], quiet=not args.verbose) + run_shell_command( + ["git", "checkout", current_branch], quiet=not args.verbose + ) # Determine what commits belong to the stack st = get_stack(args.base, args.head, args.verbose) @@ -1023,10 +1077,15 @@ def command_land(args: CommonArgs): for e in prs_to_rebase: rebase_pr(e, args.remote, args.target, args.verbose) # Change the target of the new bottom-most PR in the stack to 'target' - run_shell_command(["gh", "pr", "edit", prs_to_rebase[0].pr, "-B", args.target], quiet=not args.verbose) + run_shell_command( + ["gh", "pr", "edit", prs_to_rebase[0].pr, "-B", args.target], + quiet=not args.verbose, + ) # Delete local and remote stack branches - run_shell_command(["git", "checkout", current_branch], quiet=not args.verbose) + run_shell_command( + ["git", "checkout", current_branch], quiet=not args.verbose + ) delete_local_branches(st, args.verbose) delete_remote_branches(st[:1], args.remote, args.verbose) @@ -1035,9 +1094,12 @@ def command_land(args: CommonArgs): if branch_exists(args.target): run_shell_command( ["git", "rebase", f"{args.remote}/{args.target}", args.target], - quiet=not args.verbose + quiet=not args.verbose, ) - run_shell_command(["git", "rebase", f"{args.remote}/{args.target}", current_branch], quiet=not args.verbose) + run_shell_command( + ["git", "rebase", f"{args.remote}/{args.target}", current_branch], + quiet=not args.verbose, + ) log(h(blue("SUCCESS!")), level=1) @@ -1051,9 +1113,13 @@ def strip_metadata(e: StackEntry, verbose: bool) -> str: m = RE_STACK_INFO_LINE.sub("", m) run_shell_command( ["git", "rebase", e.base, e.head, "--committer-date-is-author-date"], - quiet=not verbose + quiet=not verbose, + ) + run_shell_command( + ["git", "commit", "--amend", "-F", "-"], + input=m.encode(), + quiet=not verbose, ) - run_shell_command(["git", "commit", "--amend", "-F", "-"], input=m.encode(), quiet=not verbose) return get_command_output(["git", "rev-parse", e.head]) @@ -1081,7 +1147,9 @@ def command_abandon(args: CommonArgs): last_hash = strip_metadata(e, args.verbose) log(h("Rebasing the current branch on top of updated top branch"), level=1) - run_shell_command(["git", "rebase", last_hash, current_branch], quiet=not args.verbose) + run_shell_command( + ["git", "rebase", last_hash, current_branch], quiet=not args.verbose + ) delete_local_branches(st, args.verbose) delete_remote_branches(st, args.remote, args.verbose) @@ -1122,7 +1190,9 @@ def print_tips_after_view(st: List[StackEntry], args: CommonArgs): def command_view(args: CommonArgs): log(h("VIEW"), level=1) - if should_update_local_base(args.head, args.base, args.remote, args.target, args.verbose): + if should_update_local_base( + args.head, args.base, args.remote, args.target, args.verbose + ): log( red( f"\nWarning: Local '{args.base}' is behind" @@ -1166,9 +1236,13 @@ def create_argparser() -> argparse.ArgumentParser: subparsers = parser.add_subparsers(help="sub-command help", dest="command") common_parser = argparse.ArgumentParser(add_help=False) - common_parser.add_argument("-R", "--remote", default="origin", help="Remote name") + common_parser.add_argument( + "-R", "--remote", default="origin", help="Remote name" + ) common_parser.add_argument("-B", "--base", help="Local base branch") - common_parser.add_argument("-H", "--head", default="HEAD", help="Local head branch") + common_parser.add_argument( + "-H", "--head", default="HEAD", help="Local head branch" + ) common_parser.add_argument( "-T", "--target", default="main", help="Remote target branch" ) @@ -1179,7 +1253,8 @@ def create_argparser() -> argparse.ArgumentParser: help="Enable or disable hyperlink support.", ) common_parser.add_argument( - "-V", "--verbose", + "-V", + "--verbose", action="store_true", default=False, help="Enable verbose output from Git subcommands.", @@ -1273,7 +1348,9 @@ def main(): raise Exception(f"Unknown command {args.command}") except Exception as exc: # If something failed, checkout the original branch - run_shell_command(["git", "checkout", current_branch], quiet=not common_args.verbose) + run_shell_command( + ["git", "checkout", current_branch], quiet=not common_args.verbose + ) if isinstance(exc, SubprocessError): print_cmd_failure_details(exc) raise diff --git a/src/stack_pr/git.py b/src/stack_pr/git.py index 6fa79c8..6e3cae9 100644 --- a/src/stack_pr/git.py +++ b/src/stack_pr/git.py @@ -12,7 +12,9 @@ class GitError(Exception): pass -def fetch_checkout_commit(repo_dir: Path, ref: str, quiet: bool, remote: str = "origin"): +def fetch_checkout_commit( + repo_dir: Path, ref: str, quiet: bool, remote: str = "origin" +): """Helper function to quickly fetch and checkout a new ref. Args: @@ -21,8 +23,12 @@ def fetch_checkout_commit(repo_dir: Path, ref: str, quiet: bool, remote: str = " remote: git remote to use. Default: "origin". """ - run_shell_command(["git", "fetch", "--depth=1", remote, ref], cwd=repo_dir, quiet=quiet) - run_shell_command(["git", "checkout", "FETCH_HEAD"], cwd=repo_dir, quiet=quiet) + run_shell_command( + ["git", "fetch", "--depth=1", remote, ref], cwd=repo_dir, quiet=quiet + ) + run_shell_command( + ["git", "checkout", "FETCH_HEAD"], cwd=repo_dir, quiet=quiet + ) def is_full_git_sha(s: str) -> bool: @@ -38,7 +44,9 @@ def is_full_git_sha(s: str) -> bool: return all(c in digits for c in s) -def shallow_clone(clone_dir: Path, url: str, ref: str, quiet: bool, remove_git: bool = False): +def shallow_clone( + clone_dir: Path, url: str, ref: str, quiet: bool, remove_git: bool = False +): """Clone the given repo without any git history. This makes the cloning faster for repos with large histories. @@ -63,7 +71,9 @@ def shallow_clone(clone_dir: Path, url: str, ref: str, quiet: bool, remove_git: clone_dir.mkdir(parents=True) run_shell_command(["git", "init"], cwd=clone_dir, quiet=quiet) - run_shell_command(["git", "remote", "add", "origin", url], cwd=clone_dir, quiet=quiet) + run_shell_command( + ["git", "remote", "add", "origin", url], cwd=clone_dir, quiet=quiet + ) fetch_checkout_commit(clone_dir, ref, quiet) if remove_git: diff --git a/src/stack_pr/shell_commands.py b/src/stack_pr/shell_commands.py index f824c84..2fe6cce 100644 --- a/src/stack_pr/shell_commands.py +++ b/src/stack_pr/shell_commands.py @@ -6,7 +6,7 @@ def run_shell_command( - cmd: ShellCommand, *, quiet: bool, check: bool = True, **kwargs: Any + cmd: ShellCommand, *, quiet: bool, check: bool = True, **kwargs: Any ) -> subprocess.CompletedProcess: """Runs a shell command using the arguments provided. @@ -27,7 +27,9 @@ def run_shell_command( _ = subprocess.list2cmdline(cmd) kwargs.update({"check": check}) if quiet: - kwargs.update({"stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL}) + kwargs.update( + {"stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL} + ) return subprocess.run(list(map(str, cmd)), **kwargs) @@ -46,6 +48,8 @@ def get_command_output(cmd: ShellCommand, **kwargs: Any) -> str: ValueError: if the capture_output keyword argument is specified. """ if "capture_output" in kwargs: - raise ValueError("Cannot pass capture_output when using get_command_output") + raise ValueError( + "Cannot pass capture_output when using get_command_output" + ) proc = run_shell_command(cmd, capture_output=True, quiet=False, **kwargs) return proc.stdout.decode("utf-8").rstrip()