import os import shutil import sys import warnings import platform import psutil import torch import signal import site import traceback from subprocess import Popen from config import python_exec now_dir = os.getcwd() sys.path.insert(0, now_dir) warnings.filterwarnings("ignore") torch.manual_seed(233333) tmp = os.path.join(now_dir, "TEMP") os.makedirs(tmp, exist_ok=True) os.environ["TEMP"] = tmp if(os.path.exists(tmp)): for name in os.listdir(tmp): if(name=="jieba.cache"):continue path="%s/%s"%(tmp,name) delete=os.remove if os.path.isfile(path) else shutil.rmtree try: delete(path) except Exception as e: print(str(e)) pass import site site_packages_roots = [] for path in site.getsitepackages(): if "packages" in path: site_packages_roots.append(path) if(site_packages_roots==[]):site_packages_roots=["%s/runtime/Lib/site-packages" % now_dir] #os.environ["OPENBLAS_NUM_THREADS"] = "4" os.environ["no_proxy"] = "localhost, 127.0.0.1, ::1" os.environ["all_proxy"] = "" for site_packages_root in site_packages_roots: if os.path.exists(site_packages_root): try: with open("%s/users.pth" % (site_packages_root), "w") as f: f.write( "%s\n%s/tools\n%s/tools/damo_asr\n%s/GPT_SoVITS\n%s/tools/uvr5" % (now_dir, now_dir, now_dir, now_dir, now_dir) ) break except PermissionError: pass def kill_proc_tree(pid, including_parent=True): try: parent = psutil.Process(pid) except psutil.NoSuchProcess: # Process already terminated return children = parent.children(recursive=True) for child in children: try: os.kill(child.pid, signal.SIGTERM) # or signal.SIGKILL except OSError: pass if including_parent: try: os.kill(parent.pid, signal.SIGTERM) # or signal.SIGKILL except OSError: pass system=platform.system() def kill_process(pid): if(system=="Windows"): cmd = "taskkill /t /f /pid %s" % pid os.system(cmd) else: kill_proc_tree(pid) ps_slice=[] def open_slice(inp,opt_root,threshold,min_length,min_interval,hop_size,max_sil_kept,_max,alpha,n_parts): global ps_slice inp = my_utils.clean_path(inp) opt_root = my_utils.clean_path(opt_root) if(os.path.exists(inp)==False): yield "输入路径不存在",{"__type__":"update","visible":True},{"__type__":"update","visible":False} return if os.path.isfile(inp):n_parts=1 elif os.path.isdir(inp):pass else: yield "输入路径存在但既不是文件也不是文件夹",{"__type__":"update","visible":True},{"__type__":"update","visible":False} return if (ps_slice == []): for i_part in range(n_parts): cmd = '"%s" tools/slice_audio.py "%s" "%s" %s %s %s %s %s %s %s %s %s''' % (python_exec,inp, opt_root, threshold, min_length, min_interval, hop_size, max_sil_kept, _max, alpha, i_part, n_parts) print(cmd) p = Popen(cmd, shell=True) ps_slice.append(p) yield "切割执行中", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True} for p in ps_slice: p.wait() ps_slice=[] yield "切割结束",{"__type__":"update","visible":True},{"__type__":"update","visible":False} else: yield "已有正在进行的切割任务,需先终止才能开启下一次任务", {"__type__": "update", "visible": False}, {"__type__": "update", "visible": True} def close_slice(): global ps_slice if (ps_slice != []): for p_slice in ps_slice: try: kill_process(p_slice.pid) except: traceback.print_exc() ps_slice=[] return "已终止所有切割进程", {"__type__": "update", "visible": True}, {"__type__": "update", "visible": False}