-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathaddonmanager_git.py
480 lines (425 loc) · 18.4 KB
/
addonmanager_git.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
# SPDX-License-Identifier: LGPL-2.1-or-later
# ***************************************************************************
# * *
# * Copyright (c) 2022 FreeCAD Project Association *
# * *
# * This file is part of FreeCAD. *
# * *
# * FreeCAD is free software: you can redistribute it and/or modify it *
# * under the terms of the GNU Lesser General Public License as *
# * published by the Free Software Foundation, either version 2.1 of the *
# * License, or (at your option) any later version. *
# * *
# * FreeCAD is distributed in the hope that it will be useful, but *
# * WITHOUT ANY WARRANTY; without even the implied warranty of *
# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU *
# * Lesser General Public License for more details. *
# * *
# * You should have received a copy of the GNU Lesser General Public *
# * License along with FreeCAD. If not, see *
# * <https://www.gnu.org/licenses/>. *
# * *
# ***************************************************************************
"""Wrapper around git executable to simplify calling git commands from Python."""
# pylint: disable=too-few-public-methods
import os
import platform
import shutil
import subprocess
from typing import List, Dict, Optional
import time
import addonmanager_utilities as utils
import addonmanager_freecad_interface as fci
# Module-level shorthand so the code below can call translate(context, text) directly.
translate = fci.translate
class NoGitFound(RuntimeError):
    """Raised when no usable git executable can be located on this system."""
class GitFailed(RuntimeError):
    """Raised when an invocation of git reports an error of some kind."""
def _ref_format_string() -> str:
return (
"--format=%(refname:lstrip=2)\t%(upstream:lstrip=2)\t%(authordate:rfc)\t%("
"authorname)\t%(subject)"
)
def _parse_ref_table(text: str):
rows = text.splitlines()
result = []
for row in rows:
columns = row.split("\t")
result.append(
{
"ref_name": columns[0],
"upstream": columns[1],
"date": columns[2],
"author": columns[3],
"subject": columns[4],
}
)
return result
class _WorkingDirectory:
    """Context manager that runs its body with ``path`` as the working directory.

    The directory that was current on entry is restored on exit, whether the
    body finished normally or raised.
    """

    def __init__(self, path):
        self._path = path
        self._previous = None

    def __enter__(self):
        self._previous = os.getcwd()
        os.chdir(self._path)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        os.chdir(self._previous)
        return False  # Never swallow an exception raised by the body


class GitManager:
    """A class to manage access to git: mostly just provides a simple wrapper around
    the basic command-line calls. Provides optional asynchronous access to clone and
    update.

    Most methods temporarily change the process working directory to the
    repository being operated on, and restore the previous directory before
    returning or raising."""

    def __init__(self):
        """Locate the git executable.

        Raises:
            NoGitFound: if no git executable could be located.
        """
        self.git_exe = None
        self._find_git()
        if not self.git_exe:
            raise NoGitFound()

    def clone(self, remote, local_path, args: Optional[List[str]] = None):
        """Clones the remote to the local path (including submodules). Any extra
        ``args`` are inserted before the remote and the path.

        Raises:
            GitFailed: if git exits with a non-zero status.
        """
        final_args = ["clone", "--recurse-submodules"]
        if args:
            final_args.extend(args)
        final_args.extend([remote, local_path])
        self._synchronous_call_git(final_args)

    def async_clone(self, remote, local_path, progress_monitor, args: Optional[List[str]] = None):
        """Clones the remote to the local path, sending periodic progress updates
        to the passed progress_monitor. Returns a handle that can be used to
        cancel the job.

        NOTE: not implemented -- the body is empty, so this currently does nothing
        and returns None."""

    def checkout(self, local_path, spec, args: Optional[List[str]] = None):
        """Checks out a specific git revision, tag, or branch. Any valid argument to
        git checkout can be submitted.

        Raises:
            GitFailed: if git exits with a non-zero status.
        """
        final_args = ["checkout"]
        if args:
            final_args.extend(args)
        final_args.append(spec)
        with _WorkingDirectory(local_path):
            self._synchronous_call_git(final_args)

    def dirty(self, local_path: str) -> bool:
        """Check for local changes. Returns True if ``git diff-index HEAD`` printed
        anything, and False if it printed nothing or if git failed."""
        with _WorkingDirectory(local_path):
            try:
                return bool(self._synchronous_call_git(["diff-index", "HEAD"]))
            except GitFailed:
                return False

    def detached_head(self, local_path: str) -> bool:
        """Check for detached head state. Returns False if git failed."""
        final_args = ["rev-parse", "--abbrev-ref", "--symbolic-full-name", "HEAD"]
        with _WorkingDirectory(local_path):
            try:
                stdout = self._synchronous_call_git(final_args)
            except GitFailed:
                return False
        # Strip before comparing (as current_branch does) so that a trailing
        # newline in git's output does not defeat the comparison.
        return stdout.strip() == "HEAD"

    def update(self, local_path):
        """Fetches and pulls the local_path from its remote, then updates submodules.

        If any of those git calls fails, the existing directory is marked with an
        ADDON_DISABLED file, renamed to a timestamped ".backup" directory, and the
        remote is cloned afresh into local_path."""
        # Resolve once, before changing directory, so that a relative path still
        # refers to the same location during the recovery steps below.
        local_path = os.path.abspath(local_path)
        with _WorkingDirectory(local_path):
            try:
                self._synchronous_call_git(["fetch"])
                self._synchronous_call_git(["pull"])
                self._synchronous_call_git(["submodule", "update", "--init", "--recursive"])
            except GitFailed as e:
                fci.Console.PrintWarning(
                    translate(
                        "AddonsInstaller",
                        "Basic Git update failed with the following message:",
                    )
                    + str(e)
                    + "\n"
                )
                fci.Console.PrintWarning(
                    translate(
                        "AddonsInstaller",
                        "Backing up the original directory and re-cloning",
                    )
                    + "...\n"
                )
                remote = self.get_remote(local_path)
                with open(os.path.join(local_path, "ADDON_DISABLED"), "w", encoding="utf-8") as f:
                    f.write(
                        "This is a backup of an addon that failed to update cleanly so "
                        "was re-cloned. It was disabled by the Addon Manager's git update "
                        "facility and can be safely deleted if the addon is working "
                        "properly."
                    )
                # Step out of the directory before renaming it
                os.chdir("..")
                os.rename(local_path, local_path + ".backup" + str(time.time()))
                self.clone(remote, local_path)

    def status(self, local_path) -> str:
        """Gets the v1 porcelain status

        Raises:
            GitFailed: if git exits with a non-zero status.
        """
        with _WorkingDirectory(local_path):
            return self._synchronous_call_git(["status", "-sb", "--porcelain"])

    def reset(self, local_path, args: Optional[List[str]] = None):
        """Executes the git reset command, passing along any extra ``args``.

        Raises:
            GitFailed: if git exits with a non-zero status.
        """
        final_args = ["reset"]
        if args:
            final_args.extend(args)
        with _WorkingDirectory(local_path):
            self._synchronous_call_git(final_args)

    def async_fetch_and_update(self, local_path, progress_monitor, args=None):
        """Same as fetch_and_update, but asynchronous

        NOTE: not implemented -- the body is empty, so this currently does nothing
        and returns None."""

    def update_available(self, local_path) -> bool:
        """Returns True if an update is available from the remote, or false if not

        Raises:
            GitFailed: if git exits with a non-zero status.
        """
        with _WorkingDirectory(local_path):
            self._synchronous_call_git(["fetch"])
            status = self._synchronous_call_git(["status", "-sb", "--porcelain"])
        for line in status.splitlines():
            if line.startswith("##"):
                # Only the branch header carries tracking information, as a
                # trailing "[ahead N, behind M]" group. Looking only there keeps
                # a file or branch that merely has "behind" in its name from
                # being reported as an available update.
                _, bracket, tracking = line.rpartition("[")
                return bool(bracket) and "behind" in tracking
        return False

    def current_tag(self, local_path) -> str:
        """Get the name of the currently checked-out tag if HEAD is detached

        Raises:
            GitFailed: if git exits with a non-zero status.
        """
        with _WorkingDirectory(local_path):
            return self._synchronous_call_git(["describe", "--tags"]).strip()

    def current_branch(self, local_path) -> str:
        """Get the name of the current branch

        Raises:
            GitFailed: if git exits with a non-zero status.
        """
        with _WorkingDirectory(local_path):
            # This only works with git 2.22 and later (June 2019)
            # branch = self._synchronous_call_git(["branch", "--show-current"]).strip()
            # This is more universal (albeit more opaque to the reader):
            return self._synchronous_call_git(["rev-parse", "--abbrev-ref", "HEAD"]).strip()

    def repair(self, remote, local_path):
        """Assumes that local_path is supposed to be a local clone of the given
        remote, and ensures that it is. Note that any local changes in local_path
        will be destroyed. This is achieved by archiving the old path, cloning an
        entirely new copy, and then deleting the old directory.

        Raises:
            GitFailed: if the fresh clone fails. The archived copy is not deleted
                in that case.
        """
        # Resolve once, before changing directory, so that a relative path still
        # refers to the same location after the directory changes below.
        local_path = os.path.abspath(local_path)
        backup_path = local_path + ".backup" + str(time.time())
        # Make sure we are not currently in that directory, otherwise on Windows the
        # "rename" will fail. To guarantee we aren't in it, change to it, then shift
        # up one.
        with _WorkingDirectory(local_path):
            os.chdir("..")
            os.rename(local_path, backup_path)
            try:
                self.clone(remote, local_path)
            except GitFailed:
                fci.Console.PrintError(
                    translate("AddonsInstaller", "Failed to clone {} into {} using Git").format(
                        remote, local_path
                    )
                )
                raise
        shutil.rmtree(backup_path, ignore_errors=True)

    def get_remote(self, local_path) -> str:
        """Get the repository that this local path is set to fetch from. Returns
        "(unknown remote)" if no fetch remote could be parsed from git's output.

        Raises:
            GitFailed: if git exits with a non-zero status.
        """
        with _WorkingDirectory(local_path):
            response = self._synchronous_call_git(["remote", "-v", "show"])
        result = "(unknown remote)"
        for line in response.split("\n"):
            if not line.endswith("(fetch)"):
                continue
            # The line looks like:
            # origin https://some/sort/of/path (fetch)
            segments = line.split()
            if len(segments) == 3:
                result = segments[1]
                break
            fci.Console.PrintWarning("Error parsing the results from git remote -v show:\n")
            fci.Console.PrintWarning(line + "\n")
        return result

    def get_branches(self, local_path) -> List[str]:
        """Get a list of all available branches (local and remote)

        Raises:
            GitFailed: if git exits with a non-zero status.
        """
        with _WorkingDirectory(local_path):
            stdout = self._synchronous_call_git(["branch", "-a", "--format=%(refname:lstrip=2)"])
        # Drop blank entries, such as the one produced by git's trailing newline
        return [branch for branch in stdout.split("\n") if branch.strip()]

    def get_branches_with_info(self, local_path) -> List[Dict[str, str]]:
        """Get a list of branches, where each entry is a dictionary with status information about
        the branch.

        Raises:
            GitFailed: if git exits with a non-zero status.
        """
        with _WorkingDirectory(local_path):
            stdout = self._synchronous_call_git(["branch", "-a", _ref_format_string()])
        return _parse_ref_table(stdout)

    def get_tags_with_info(self, local_path) -> List[Dict[str, str]]:
        """Get a list of tags, where each entry is a dictionary with status information about
        the tag.

        Raises:
            GitFailed: if git exits with a non-zero status.
        """
        with _WorkingDirectory(local_path):
            stdout = self._synchronous_call_git(["tag", "-l", _ref_format_string()])
        return _parse_ref_table(stdout)

    def get_last_committers(self, local_path, n=10):
        """Examine the last n entries of the commit history, and return a dictionary of
        all the committers, their email addresses, and how many commits each one is
        responsible for. The result maps each name to
        ``{"email": [addresses...], "count": number_of_commits}``.
        """
        return self._count_contributors(local_path, n, "%cN", "%cE")

    def get_last_authors(self, local_path, n=10):
        """Examine the last n entries of the commit history, and return a dictionary of
        all the authors, their email addresses, and how many commits each one is
        responsible for. The result maps each name to
        ``{"email": [addresses...], "count": number_of_commits}``.
        """
        return self._count_contributors(local_path, n, "%aN", "%aE")

    def _count_contributors(self, local_path, n, name_format: str, email_format: str):
        """Tally names and emails over the last n log entries, using the given git
        log format placeholders for the name and for the email."""
        with _WorkingDirectory(local_path):
            names = self._synchronous_call_git(["log", f"-{n}", f"--format={name_format}"])
            emails = self._synchronous_call_git(["log", f"-{n}", f"--format={email_format}"])
        result_dict = {}
        for name, email in zip(names.split("\n"), emails.split("\n")):
            if not name or not email:
                continue
            entry = result_dict.setdefault(name, {"email": [], "count": 0})
            if email not in entry["email"]:
                # Same name, new email address -- treat it as the same
                # person with a second email, instead of as a whole new person
                entry["email"].append(email)
            entry["count"] += 1
        return result_dict

    def migrate_branch(self, local_path: str, old_branch: str, new_branch: str) -> None:
        """Rename a branch (used when the remote branch name changed). Assumes that "origin"
        exists.

        Raises:
            GitFailed: if any of the git calls exits with a non-zero status (a
                warning is printed first).
        """
        with _WorkingDirectory(local_path):
            try:
                self._synchronous_call_git(["branch", "-m", old_branch, new_branch])
                self._synchronous_call_git(["fetch", "origin"])
                self._synchronous_call_git(["branch", "--unset-upstream"])
                self._synchronous_call_git(["branch", f"--set-upstream-to=origin/{new_branch}"])
                self._synchronous_call_git(["pull"])
            except GitFailed as e:
                fci.Console.PrintWarning(
                    translate(
                        "AddonsInstaller",
                        "Git branch rename failed with the following message:",
                    )
                    + str(e)
                    + "\n"
                )
                raise

    def _find_git(self):
        """Locate git and store its path in ``self.git_exe`` (left as None if not found).
        A located path is also saved to the GitExecutable user preference."""
        # Find git. In preference order
        # A) The value of the GitExecutable user preference
        # B) The executable located in the same directory as FreeCAD and called "git"
        # C) The result of a shutil search for your system's "git" executable
        prefs = fci.ParamGet("User parameter:BaseApp/Preferences/Addons")
        git_exe = prefs.GetString("GitExecutable", "Not set")
        if not git_exe or git_exe == "Not set" or not os.path.exists(git_exe):
            fc_dir = fci.DataPaths().home_dir
            git_exe = os.path.join(fc_dir, "bin", "git")
            if "Windows" in platform.system():
                git_exe += ".exe"

        # On a Mac, a "git" executable existing is not enough: see
        # _xcode_command_line_tools_are_installed()
        if platform.system() == "Darwin" and not self._xcode_command_line_tools_are_installed():
            return

        if not git_exe or not os.path.exists(git_exe):
            git_exe = shutil.which("git")

        if not git_exe or not os.path.exists(git_exe):
            return

        prefs.SetString("GitExecutable", git_exe)
        self.git_exe = git_exe

    def _xcode_command_line_tools_are_installed(self) -> bool:
        """On Macs, there is *always* an executable called "git", but sometimes it's just a
        script that tells the user to install XCode's Command Line tools. So the existence of git
        on the Mac actually requires us to check for that installation."""
        try:
            subprocess.check_output(["xcode-select", "-p"])
            return True
        except (subprocess.CalledProcessError, OSError):
            # OSError covers xcode-select itself being missing or not runnable
            return False

    def _synchronous_call_git(self, args: List[str]) -> str:
        """Calls git and returns its output.

        Raises:
            GitFailed: if git exits with a non-zero status. The message includes
                the exit status, the full command line, and git's stderr.
        """
        final_args = [self.git_exe]
        final_args.extend(args)
        try:
            proc = utils.run_interruptable_subprocess(final_args)
        except subprocess.CalledProcessError as e:
            raise GitFailed(
                f"Git returned a non-zero exit status: {e.returncode}\n"
                + f"Called with: {' '.join(final_args)}\n\n"
                + f"Returned stderr:\n{e.stderr}"
            ) from e
        return proc.stdout
def initialize_git() -> Optional[GitManager]:
    """Create a GitManager unless git use has been disabled or git cannot be found.

    Git is considered disabled when the ``disableGit`` boolean in the Addons
    preference group is set. Otherwise a new GitManager is constructed, which
    locates the git executable.

    Returns:
        The new GitManager, or None if git is disabled or constructing the
        manager raised NoGitFound.
    """
    addon_prefs = fci.ParamGet("User parameter:BaseApp/Preferences/Addons")
    if addon_prefs.GetBool("disableGit", False):
        return None
    try:
        return GitManager()
    except NoGitFound:
        return None