|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import argparse
import base64
from concurrent.futures import ProcessPoolExecutor
import hmac
import io
import logging
import os
from pathlib import Path
import subprocess as sp
import sys
from tempfile import NamedTemporaryFile
import time
import typing as tp
import warnings

from einops import rearrange
import torch
import gradio as gr
from pydub import AudioSegment

from audiocraft.data.audio_utils import convert_audio
from audiocraft.data.audio import audio_write
from audiocraft.models.encodec import InterleaveStereoCompressionModel
from audiocraft.models import MusicGen, MultiBandDiffusion
|
|
|
# Shared secret that callers must present to predict_full(). Read from the
# environment. NOTE(review): the fallback 'default_secret' is a publicly
# guessable value, so deployments should always set SECRET_TOKEN explicitly.
SECRET_TOKEN = os.getenv('SECRET_TOKEN', 'default_secret')

# Currently loaded MusicGen model; populated lazily by load_model().
MODEL = None
SPACE_ID = os.environ.get('SPACE_ID', '')
IS_BATCHED = False
MAX_BATCH_SIZE = 12
# Duration passed to _do_predictions() by predict_batched().
BATCHED_DURATION = 15
# Raised by interrupt(); polled by the progress callback in predict_full().
INTERRUPTING = False
# MultiBandDiffusion decoder; populated lazily by load_diffusion().
MBD = None
# Whether generated tokens are additionally decoded with MultiBandDiffusion.
# predict_full() overwrites this on every request; the module-level default is
# required because _do_predictions() reads it even when predict_full() has
# never run (e.g. when it is reached through predict_batched()).
USE_DIFFUSION = False
|
|
|
# Keep a handle on the real implementation before it is replaced below.
_old_call = sp.call


def _call_nostderr(*args, **kwargs):
    """Run the original ``subprocess.call`` with the child's output discarded.

    Any ``stdout``/``stderr`` arguments supplied by the caller are overridden
    with ``subprocess.DEVNULL``; everything else is forwarded untouched.

    Returns:
        Whatever the original ``subprocess.call`` returns (the child's exit
        status), so callers that inspect the return code keep working.
    """
    kwargs['stderr'] = sp.DEVNULL
    kwargs['stdout'] = sp.DEVNULL
    return _old_call(*args, **kwargs)


# Monkey-patch: every later ``subprocess.call`` in this process is silenced.
sp.call = _call_nostderr
|
|
|
# Module-wide pool of four worker processes. It is entered by hand rather than
# through a ``with`` statement, so nothing in this module exits/shuts it down.
pool = ProcessPoolExecutor(max_workers=4)
pool.__enter__()
|
|
|
|
|
def interrupt():
    """Ask the generation currently in progress to stop.

    Only raises the module-level ``INTERRUPTING`` flag; the progress callback
    installed by ``predict_full`` is what reacts to it.
    """
    globals()['INTERRUPTING'] = True
|
|
|
|
|
class FileCleaner:
    """Track generated files and delete them once they outlive a lifetime.

    Files are registered with :meth:`add`. Each registration first sweeps the
    already-registered files and removes, from disk and from the list, those
    that were added more than ``file_lifetime`` seconds ago.
    """

    def __init__(self, file_lifetime: float = 3600):
        """Create a cleaner.

        Args:
            file_lifetime: Age in seconds after which a registered file is
                deleted on the next sweep.
        """
        self.file_lifetime = file_lifetime
        # (registration timestamp, path) pairs, oldest first.
        self.files: tp.List[tp.Tuple[float, Path]] = []

    def add(self, path: tp.Union[str, Path]):
        """Sweep expired files, then register ``path`` with the current time."""
        self._cleanup()
        self.files.append((time.time(), Path(path)))

    def _cleanup(self):
        """Delete and forget every registered file older than the lifetime.

        Entries are stored oldest first, so the sweep stops at the first one
        that is still fresh.
        """
        now = time.time()
        expired = 0
        try:
            for time_added, path in self.files:
                if now - time_added <= self.file_lifetime:
                    break
                try:
                    path.unlink()
                except FileNotFoundError:
                    # Already removed by someone else: nothing left to delete,
                    # but the entry must still be dropped from the list.
                    pass
                expired += 1
        finally:
            # Forget everything that was handled, even if an unexpected error
            # interrupted the sweep part-way through.
            del self.files[:expired]


file_cleaner = FileCleaner()
|
|
|
def load_model(version='facebook/musicgen-melody'):
    """Make sure the global ``MODEL`` is the MusicGen model named ``version``.

    Nothing is reloaded when the currently loaded model already carries that
    name; otherwise the old model is released and the requested one is fetched
    with ``MusicGen.get_pretrained``.

    Args:
        version: Value handed to ``MusicGen.get_pretrained`` and compared
            against ``MODEL.name``.
    """
    global MODEL
    print("Loading model", version)
    if MODEL is not None and MODEL.name == version:
        return
    # Drop the previous model first; MODEL stays None if loading fails.
    del MODEL
    MODEL = None
    MODEL = MusicGen.get_pretrained(version)
|
|
|
|
|
def load_diffusion():
    """Load the MultiBandDiffusion decoder into the global ``MBD`` once.

    Later calls are no-ops as long as ``MBD`` is already set.
    """
    global MBD
    if MBD is not None:
        return
    print("loading MBD")
    MBD = MultiBandDiffusion.get_mbd_musicgen()
|
|
|
|
|
def _prepare_melody(melody, duration, target_sr, target_ac):
    """Turn one ``(sample_rate, numpy_array)`` melody into a conditioning tensor."""
    sr = melody[0]
    # The array is transposed after conversion — presumably from
    # (samples, channels) to (channels, samples); TODO confirm against caller.
    wav = torch.from_numpy(melody[1]).to(MODEL.device).float().t()
    if wav.dim() == 1:
        # Single-dimensional input: add a leading dimension.
        wav = wav[None]
    # Keep at most `duration` seconds of the melody.
    wav = wav[..., :int(sr * duration)]
    return convert_audio(wav, sr, target_sr, target_ac)


def _do_predictions(texts, melodies, duration, progress=False, gradio_progress=None, **gen_kwargs):
    """Generate audio for a batch of prompts and write the results to WAV files.

    Args:
        texts: Text descriptions, one per item of the batch.
        melodies: Sequence parallel to ``texts``; each entry is ``None`` or a
            ``(sample_rate, numpy_array)`` pair used as melody conditioning.
        duration: Duration in seconds, forwarded to
            ``MODEL.set_generation_params`` and used to crop the melodies.
        progress: Forwarded to the model's ``generate*`` call.
        gradio_progress: Optional callable, invoked as
            ``gradio_progress(1, desc=...)`` before the diffusion decoder runs.
        **gen_kwargs: Extra arguments for ``MODEL.set_generation_params``.

    Returns:
        Paths of the WAV files written, one per output. When diffusion
        decoding is enabled, the default-decoder outputs come first and the
        MultiBandDiffusion outputs follow them.

    Raises:
        gr.Error: If the model raises ``RuntimeError`` while generating.
    """
    # The flag is normally set by predict_full(); fall back to False so that
    # callers which never went through it do not hit a NameError.
    use_diffusion = globals().get('USE_DIFFUSION', False)

    MODEL.set_generation_params(duration=duration, **gen_kwargs)
    print("new batch", len(texts), texts, [None if m is None else (m[0], m[1].shape) for m in melodies])
    be = time.time()
    target_sr = 32000
    target_ac = 1
    processed_melodies = [
        None if melody is None else _prepare_melody(melody, duration, target_sr, target_ac)
        for melody in melodies
    ]

    try:
        if any(m is not None for m in processed_melodies):
            outputs = MODEL.generate_with_chroma(
                descriptions=texts,
                melody_wavs=processed_melodies,
                melody_sample_rate=target_sr,
                progress=progress,
                return_tokens=use_diffusion
            )
        else:
            outputs = MODEL.generate(texts, progress=progress, return_tokens=use_diffusion)
    except RuntimeError as e:
        # str(e) is safe for empty or non-string args, unlike "..." + e.args[0].
        raise gr.Error(f"Error while generating {e}") from e

    if use_diffusion:
        if gradio_progress is not None:
            gradio_progress(1, desc='Running MultiBandDiffusion...')
        # With return_tokens=True the model output is indexed as
        # outputs[0] (audio) and outputs[1] (tokens).
        tokens = outputs[1]
        is_stereo = isinstance(MODEL.compression_model, InterleaveStereoCompressionModel)
        if is_stereo:
            # Decode left and right codes as two stacked batches.
            left, right = MODEL.compression_model.get_left_right_codes(tokens)
            tokens = torch.cat([left, right])
        outputs_diffusion = MBD.tokens_to_wav(tokens)
        if is_stereo:
            assert outputs_diffusion.shape[1] == 1
            # Fold the two stacked halves back into channels.
            outputs_diffusion = rearrange(outputs_diffusion, '(s b) c t -> b (s c) t', s=2)
        outputs = torch.cat([outputs[0], outputs_diffusion], dim=0)
    outputs = outputs.detach().cpu().float()

    out_wavs = []
    for output in outputs:
        with NamedTemporaryFile("wb", suffix=".wav", delete=False) as file:
            # Register before writing so the file is still swept later if
            # audio_write() fails (delete=False keeps it on disk).
            file_cleaner.add(file.name)
            audio_write(
                file.name, output, MODEL.sample_rate, strategy="loudness",
                loudness_headroom_db=16, loudness_compressor=True, add_suffix=False)
            out_wavs.append(file.name)

    print("batch finished", len(texts), time.time() - be)
    print("Tempfiles currently stored: ", len(file_cleaner.files))
    return out_wavs
|
|
|
|
|
def predict_batched(texts, melodies):
    """Generate fixed-length clips for a batch of prompts.

    Every prompt is cut to 512 characters, the 'facebook/musicgen-stereo-melody'
    model is loaded, and generation runs for ``BATCHED_DURATION``.

    Args:
        texts: Text prompts.
        melodies: Melody entries passed through to ``_do_predictions``.

    Returns:
        The value returned by ``_do_predictions``.
    """
    char_limit = 512
    clipped_texts = [prompt[:char_limit] for prompt in texts]
    load_model('facebook/musicgen-stereo-melody')
    return _do_predictions(clipped_texts, melodies, BATCHED_DURATION)
|
|
|
|
|
def predict_full(secret_token, model, model_path, decoder, text, melody, duration, topk, topp, temperature, cfg_coef, progress=gr.Progress()):
    """Generate one piece of music and return it as a base64 MP3 data URI.

    Args:
        secret_token: Token supplied by the caller; must equal ``SECRET_TOKEN``.
        model: Name of the model to load.
        model_path: Optional folder holding a custom model; when non-empty it
            replaces ``model``.
        decoder: ``"MultiBand_Diffusion"`` enables the diffusion decoder; any
            other value uses the default decoder.
        text: Text prompt.
        melody: Optional melody conditioning, passed to ``_do_predictions``.
        duration: Requested duration, passed to ``_do_predictions``.
        topk: Top-k sampling value (converted to ``int``).
        topp: Top-p sampling value.
        temperature: Sampling temperature.
        cfg_coef: Classifier-free guidance coefficient.
        progress: Gradio progress tracker.

    Returns:
        A ``data:audio/mp3;base64,...`` string with the generated audio.

    Raises:
        gr.Error: On a wrong token, an invalid model path, negative sampling
            parameters, or when the run is interrupted.
    """
    global INTERRUPTING
    global USE_DIFFUSION

    # Constant-time comparison so response timing does not leak how much of
    # the token matched. Compared as bytes because compare_digest() rejects
    # non-ASCII str; a non-string token can never match.
    token_ok = isinstance(secret_token, str) and hmac.compare_digest(
        secret_token.encode('utf-8'), SECRET_TOKEN.encode('utf-8'))
    if not token_ok:
        raise gr.Error(
            'Invalid secret token. Please fork the original space if you want to use it for yourself.')

    INTERRUPTING = False
    progress(0, desc="Loading model...")
    # A missing value (None) is treated like an empty path.
    model_path = (model_path or "").strip()
    if model_path:
        if not Path(model_path).exists():
            raise gr.Error(f"Model path {model_path} doesn't exist.")
        if not Path(model_path).is_dir():
            raise gr.Error(f"Model path {model_path} must be a folder containing "
                           "state_dict.bin and compression_state_dict_.bin.")
        model = model_path
    if temperature < 0:
        raise gr.Error("Temperature must be >= 0.")
    if topk < 0:
        raise gr.Error("Topk must be non-negative.")
    if topp < 0:
        raise gr.Error("Topp must be non-negative.")

    topk = int(topk)
    if decoder == "MultiBand_Diffusion":
        USE_DIFFUSION = True
        progress(0, desc="Loading diffusion model...")
        load_diffusion()
    else:
        USE_DIFFUSION = False
    load_model(model)

    max_generated = 0

    def _progress(generated, to_generate):
        """Report progress to Gradio and abort when an interrupt was requested."""
        nonlocal max_generated
        # Never let the reported progress move backwards.
        max_generated = max(generated, max_generated)
        progress((min(max_generated, to_generate), to_generate))
        if INTERRUPTING:
            raise gr.Error("Interrupted.")
    MODEL.set_custom_progress_callback(_progress)

    wavs = _do_predictions(
        [text], [melody], duration, progress=True,
        top_k=topk, top_p=topp, temperature=temperature, cfg_coef=cfg_coef,
        gradio_progress=progress)

    # With diffusion enabled the diffusion-decoded file follows the default one.
    wav_path = wavs[1] if USE_DIFFUSION else wavs[0]

    # Encode to MP3 in memory: no extra file is left on disk (the previous
    # on-disk .mp3 was never registered for cleanup and accumulated forever).
    mp3_buffer = io.BytesIO()
    AudioSegment.from_wav(wav_path).export(mp3_buffer, format="mp3")
    mp3_base64 = base64.b64encode(mp3_buffer.getvalue()).decode('utf-8')

    return 'data:audio/mp3;base64,' + mp3_base64
|
|
|
def toggle_audio_src(choice):
    """Build the update that switches the audio input between mic and upload.

    Args:
        choice: ``"mic"`` selects the microphone; anything else selects upload.

    Returns:
        A ``gr.update`` that sets the source and label and clears the value.
    """
    if choice == "mic":
        source, label = "microphone", "Microphone"
    else:
        source, label = "upload", "File"
    return gr.update(source=source, value=None, label=label)
|
|
|
|
|
def toggle_diffusion(choice):
    """Build the visibility update for the diffusion-related component.

    Args:
        choice: Selected decoder name.

    Returns:
        A one-element list holding a ``gr.update`` that is visible only when
        ``choice`` is ``"MultiBand_Diffusion"``.
    """
    show = choice == "MultiBand_Diffusion"
    return [gr.update(visible=show)]
|
|
|
|
|
def ui_full():
    """Build the Gradio Blocks interface and launch it.

    The "Submit" button is bound to ``predict_full`` and exposed under the API
    name ``run``. A full-page HTML overlay is added on top of the form, and the
    interface is launched with a request queue of at most 12 entries.
    """
    # NOTE(review): the nesting of the layout blocks below was reconstructed
    # from statement order — confirm against the deployed layout.
    model_choices = [
        "facebook/musicgen-melody", "facebook/musicgen-medium", "facebook/musicgen-small",
        "facebook/musicgen-large", "facebook/musicgen-melody-large",
        "facebook/musicgen-stereo-small", "facebook/musicgen-stereo-medium",
        "facebook/musicgen-stereo-melody", "facebook/musicgen-stereo-large",
        "facebook/musicgen-stereo-melody-large",
    ]

    with gr.Blocks() as interface:
        gr.Markdown(
            """
            # MusicGen
            This is your private demo for [MusicGen](https://github.com/facebookresearch/audiocraft),
            a simple and controllable model for music generation
            presented at: ["Simple and Controllable Music Generation"](https://huggingface.co/papers/2306.05284)
            """
        )
        with gr.Row():
            # Left column: every input control.
            with gr.Column():
                with gr.Row():
                    secret_token = gr.Text(
                        label='Secret Token',
                        max_lines=1,
                        placeholder='Enter your secret token',
                    )
                    text = gr.Text(label="Input Text", interactive=True)
                    with gr.Column():
                        radio = gr.Radio(
                            ["file", "mic"],
                            value="file",
                            label="Condition on a melody (optional) File or Mic",
                        )
                        melody = gr.Audio(
                            source="upload",
                            type="numpy",
                            label="File",
                            interactive=True,
                            elem_id="melody-input",
                        )
                with gr.Row():
                    submit = gr.Button("Submit")
                    # Handled outside the queue (queue=False).
                    interrupt_button = gr.Button("Interrupt")
                    interrupt_button.click(fn=interrupt, queue=False)
                with gr.Row():
                    model = gr.Radio(
                        model_choices,
                        label="Model",
                        value="facebook/musicgen-stereo-large",
                        interactive=True,
                    )
                    model_path = gr.Text(label="Model Path (custom models)")
                with gr.Row():
                    decoder = gr.Radio(
                        ["Default", "MultiBand_Diffusion"],
                        label="Decoder",
                        value="Default",
                        interactive=True,
                    )
                with gr.Row():
                    duration = gr.Slider(minimum=1, maximum=600, value=120, label="Duration", interactive=True)
                with gr.Row():
                    topk = gr.Number(label="Top-k", value=250, interactive=True)
                    topp = gr.Number(label="Top-p", value=0, interactive=True)
                    temperature = gr.Number(label="Temperature", value=1.0, interactive=True)
                    cfg_coef = gr.Number(label="Classifier Free Guidance", value=3.0, interactive=True)
            # Right column: the text box that receives predict_full's result.
            with gr.Column():
                audio_output = gr.Textbox(label="Generated Music (wav)")

        # Input order must match predict_full's positional parameters.
        submit.click(
            fn=predict_full,
            inputs=[
                secret_token, model, model_path, decoder, text, melody,
                duration, topk, topp, temperature, cfg_coef,
            ],
            outputs=audio_output,
            api_name="run",
        )

        # Fixed, full-viewport overlay displayed above the form.
        gr.HTML("""
                <div style="z-index: 100; position: fixed; top: 0px; right: 0px; left: 0px; bottom: 0px; width: 100%; height: 100%; background: white; display: flex; align-items: center; justify-content: center; color: black;">
                <div style="text-align: center; color: black;">
                <p style="color: black;">This space is a REST API to programmatically generate music.</p>
                <p style="color: black;">Interested in using it? All credit is due to the <a href="https://huggingface.co/spaces/facebook/MusicGen" target="_blank">original space</a>, so go on and fork it 🤗</p>
                </div>
                </div>""")

        interface.queue(max_size=12).launch()
|
|
|
# Emit INFO-and-above log records on stderr.
logging.basicConfig(stream=sys.stderr, level=logging.INFO)

# Load the model that the UI preselects, then build and launch the interface
# (ui_full() calls launch() itself).
load_model('facebook/musicgen-stereo-large')
ui_full()
|
|