pixtral-12b-ZipNN-Compressed / zipnn_decompress_file.py
royleibov's picture
Add ZipNN
6bc66ab
raw
history blame
3.79 kB
import os
import subprocess
import sys
import argparse
import time
# Append the parent directory of this file to the module search path.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir)))

# ANSI escape sequences used to colour terminal output.
RED = "\033[91m"
YELLOW = "\033[93m"
GREEN = "\033[92m"
RESET = "\033[0m"  # switches colouring back off

# Number of bytes in one GB as used for the size report (1024^3).
GB = 1024**3
def check_and_install_zipnn():
    """Make sure the ``zipnn`` package can be imported, installing it if missing.

    If the import fails, ``pip install zipnn`` is run with the current
    interpreter and the import is attempted again.

    Raises:
        subprocess.CalledProcessError: If the pip command exits with a
            non-zero status.
        ImportError: If ``zipnn`` still cannot be imported after installation.
    """
    try:
        import zipnn  # noqa: F401  (imported only to probe availability)
    except ImportError:
        import importlib

        print("zipnn not found. Installing...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", "zipnn"])
        # The package was created while this interpreter was already running;
        # drop cached directory listings so the import system can see it.
        importlib.invalidate_caches()
        import zipnn  # noqa: F401
def decompress_file(input_file, delete=False, force=False, hf_cache=False):
    """Decompress a ``.znn`` file into a file with the same path minus the suffix.

    Args:
        input_file: Path to the compressed file; must end with ``.znn``.
        delete: If true (and ``hf_cache`` is false), remove ``input_file``
            once it has been decompressed.
        force: If true, overwrite an existing output file without asking.
            Otherwise the user is prompted on stdin before overwriting.
        hf_cache: If true, ``input_file`` is expected to be a symlink (its
            target is read with ``os.readlink``). The decompressed file is
            moved onto that target, a symlink to it is created at the output
            path, and ``input_file`` is removed.

    Returns:
        None. A missing input file or a declined overwrite prompt only prints
        a message and returns.

    Raises:
        ValueError: If ``input_file`` does not end with ``.znn``.
        Exception: If reorganizing the Hugging Face cache fails; the original
            error is attached as ``__cause__``.
    """
    if not input_file.endswith(".znn"):
        raise ValueError("Input file does not have the '.znn' suffix")

    if not os.path.exists(input_file):
        print(f"Error: The file {input_file} does not exist.")
        return

    # Imported only once the arguments are known to be usable, so argument
    # errors are reported as such even when zipnn is not installed.
    import zipnn

    output_file = input_file[: -len(".znn")]
    if not force and os.path.exists(output_file):
        user_input = (
            input(f"{output_file} already exists; overwrite (y/n)? ").strip().lower()
        )
        if user_input not in ("yes", "y"):
            print(f"Skipping {input_file}...")
            return

    print(f"Decompressing {input_file}...")
    zpn = zipnn.ZipNN(is_streaming=True)

    start_time = time.time()
    with open(input_file, "rb") as infile:
        compressed = infile.read()
    file_size_before = len(compressed)

    # Decompress before opening the output for writing: a failure here must
    # not truncate an output file that already exists.
    d_data = zpn.decompress(compressed)
    del compressed  # release the compressed copy before writing
    file_size_after = len(d_data)

    with open(output_file, "wb") as outfile:
        outfile.write(d_data)
    print(f"Decompressed {input_file} to {output_file}")

    end_time = time.time() - start_time
    print(
        f"{GREEN}Back to original size: {file_size_after/GB:.02f}GB size before decompression: {file_size_before/GB:.02f}GB, time: {end_time:.02f}{RESET}"
    )

    if delete and not hf_cache:
        print(f"Deleting {input_file}...")
        os.remove(input_file)

    if hf_cache:
        # If the file is in the Hugging Face cache, fix the symlinks
        print(f"{YELLOW}Reorganizing Hugging Face cache...{RESET}")
        try:
            snapshot_path = os.path.dirname(input_file)
            # The link target is resolved against the directory holding the link.
            blob_name = os.path.join(snapshot_path, os.readlink(input_file))
            # os.replace overwrites an existing destination on every platform,
            # whereas os.rename fails on Windows when the destination exists.
            os.replace(output_file, blob_name)
            os.symlink(blob_name, output_file)
            if os.path.exists(input_file):
                os.remove(input_file)
        except Exception as e:
            raise Exception(f"Error reorganizing Hugging Face cache: {e}") from e
def build_arg_parser():
    """Build the command-line parser for this script.

    Returns:
        argparse.ArgumentParser: Parser with one positional ``input_file``
        argument and the boolean flags ``--delete``, ``--force`` and
        ``--hf_cache`` (each defaults to ``False``).
    """
    parser = argparse.ArgumentParser(description="Enter a file path to decompress.")
    parser.add_argument(
        "input_file", type=str, help="Specify the path to the file to decompress."
    )
    parser.add_argument(
        "--delete",
        action="store_true",
        # The compressed file is removed only after decompression, not in
        # place of it, so the help text says so.
        help="A flag that triggers deletion of the compressed file after it has been decompressed.",
    )
    parser.add_argument(
        "--force", action="store_true", help="A flag that forces overwriting when decompressing."
    )
    parser.add_argument(
        "--hf_cache",
        action="store_true",
        help="A flag that indicates if the file is in the Hugging Face cache.",
    )
    return parser


if __name__ == "__main__":
    check_and_install_zipnn()
    args = build_arg_parser().parse_args()
    # Each flag is False unless given, which matches decompress_file's defaults.
    decompress_file(
        args.input_file,
        delete=args.delete,
        force=args.force,
        hf_cache=args.hf_cache,
    )