# img1 / main.py
# Jamiiwej2903's picture
# Update main.py
# 03fe7d6 verified
import base64
import io
import json
import os
import tempfile

import ffmpeg
import uvicorn
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import StreamingResponse
from huggingface_hub import InferenceClient
from PIL import Image
# ASGI application object; the /generate_video/ route is registered on it.
app = FastAPI()
# Module-level inference client, created once at import time and bound to the
# given Stable Video Diffusion image-to-video model id.
client = InferenceClient("stabilityai/stable-video-diffusion-img2vid-xt-1-1-tensorrt")
def _encode_image_as_png_base64(image_content):
    """Decode raw upload bytes with Pillow and return them as base64 PNG text."""
    image = Image.open(io.BytesIO(image_content))
    buffered = io.BytesIO()
    image.save(buffered, format="PNG")
    return base64.b64encode(buffered.getvalue()).decode()


def _request_frames(img_str, num_frames, motion_bucket_id, cond_aug, seed):
    """Send the image to the model and return its reply as a non-empty list."""
    response = client.post(
        json={
            "inputs": img_str,
            "parameters": {
                "num_inference_steps": 25,
                "num_frames": num_frames,
                "motion_bucket_id": motion_bucket_id,
                "cond_aug": cond_aug,
                "seed": seed,
            },
        }
    )
    # InferenceClient.post hands back the raw response body, so the JSON
    # payload has to be decoded before its type can be checked.
    if isinstance(response, (bytes, bytearray, str)):
        try:
            response = json.loads(response)
        except ValueError as err:  # JSONDecodeError / UnicodeDecodeError
            raise ValueError(
                f"Unexpected response from the model: {response[:200]!r}"
            ) from err
    if not isinstance(response, list):
        raise ValueError(f"Unexpected response from the model: {response}")
    if not response:
        raise ValueError("The model returned no frames")
    return response


def _render_video(frames, fps):
    """Write base64-encoded frames as PNGs and return them encoded as MP4 bytes."""
    # Frames and the output video share one temporary directory so that
    # everything is removed even when decoding or ffmpeg fails.
    with tempfile.TemporaryDirectory() as tmpdir:
        for i, frame_data in enumerate(frames):
            frame = Image.open(io.BytesIO(base64.b64decode(frame_data)))
            frame.save(os.path.join(tmpdir, f"frame_{i:03d}.png"))

        video_path = os.path.join(tmpdir, "output.mp4")
        input_stream = ffmpeg.input(
            os.path.join(tmpdir, "frame_%03d.png"), framerate=fps
        )
        output_stream = ffmpeg.output(
            input_stream, video_path, vcodec="libx264", pix_fmt="yuv420p"
        )
        ffmpeg.run(output_stream, overwrite_output=True)

        with open(video_path, "rb") as video_file:
            return video_file.read()


@app.post("/generate_video/")
async def generate_video_api(
    file: UploadFile = File(...),
    num_frames: int = Form(14),
    fps: int = Form(7),
    motion_bucket_id: int = Form(127),
    cond_aug: float = Form(0.02),
    seed: int = Form(0),
):
    """Generate an MP4 video from an uploaded image.

    The upload is re-encoded as PNG, sent base64-encoded to the inference
    client together with the generation parameters, and the frames that come
    back are assembled into a video with ffmpeg.

    Args:
        file: Uploaded image file.
        num_frames: Forwarded to the model as ``num_frames``.
        fps: Frame rate passed to ffmpeg for the output video; must be > 0.
        motion_bucket_id: Forwarded to the model as ``motion_bucket_id``.
        cond_aug: Forwarded to the model as ``cond_aug``.
        seed: Forwarded to the model as ``seed``.

    Returns:
        A ``StreamingResponse`` with media type ``video/mp4``.

    Raises:
        HTTPException: 400 when ``fps`` is not positive or the upload cannot
            be read as an image; 500 for any other failure.
    """
    if fps <= 0:
        raise HTTPException(status_code=400, detail="fps must be a positive integer")

    try:
        image_content = await file.read()
        try:
            img_str = _encode_image_as_png_base64(image_content)
        except (OSError, Image.DecompressionBombError) as err:
            # A bad upload is the client's fault, not a server error.
            raise HTTPException(
                status_code=400,
                detail=f"Could not read the uploaded image: {err}",
            ) from err

        # The model call and the ffmpeg run are blocking; run them in the
        # thread pool so the event loop keeps serving other requests.
        frames = await run_in_threadpool(
            _request_frames, img_str, num_frames, motion_bucket_id, cond_aug, seed
        )
        video_content = await run_in_threadpool(_render_video, frames, fps)

        return StreamingResponse(io.BytesIO(video_content), media_type="video/mp4")
    except HTTPException:
        # Keep deliberately chosen status codes instead of rewrapping as 500.
        raise
    except Exception as err:
        raise HTTPException(
            status_code=500, detail=f"An error occurred: {err}"
        ) from err
# Script entry point: serve the app with uvicorn on all interfaces, port 7860.
if __name__ == "__main__":
    uvicorn.run(app, port=7860, host="0.0.0.0")