|
import gradio as gr |
|
from PIL import Image, ImageDraw, PngImagePlugin |
|
import io |
|
import os |
|
import cv2 |
|
import numpy as np |
|
import uuid |
|
|
|
|
|
|
|
def to_bin(data): |
|
if isinstance(data, str): |
|
return ''.join([format(ord(i), "08b") for i in data]) |
|
elif isinstance(data, bytes): |
|
return ''.join([format(i, "08b") for i in data]) |
|
elif isinstance(data, np.ndarray): |
|
return [format(i, "08b") for i in data] |
|
elif isinstance(data, int) or isinstance(data, np.uint8): |
|
return format(data, "08b") |
|
else: |
|
raise TypeError("Type not supported.") |
|
|
|
|
|
def encode(image_path, secret_data): |
|
try: |
|
|
|
image = cv2.imread(image_path) |
|
if image is None: |
|
return None, "Failed to read image" |
|
|
|
|
|
n_bytes = (image.shape[0] * image.shape[1] * 3) // 8 |
|
|
|
|
|
secret_data_with_delimiter = f'{secret_data}#####' |
|
if len(secret_data_with_delimiter) >= n_bytes: |
|
return None, "Watermark is too large for Image Size" |
|
|
|
secret_data_with_delimiter += "=====" |
|
binary_secret_data = to_bin(secret_data_with_delimiter) |
|
data_len = len(binary_secret_data) |
|
|
|
|
|
watermarked_image = image.copy() |
|
data_index = 0 |
|
|
|
|
|
for i in range(watermarked_image.shape[0]): |
|
for j in range(watermarked_image.shape[1]): |
|
for k in range(3): |
|
if data_index < data_len: |
|
|
|
pixel = watermarked_image[i, j, k] |
|
|
|
binary_pixel = format(pixel, '08b') |
|
new_pixel = binary_pixel[:-1] + binary_secret_data[data_index] |
|
|
|
watermarked_image[i, j, k] = int(new_pixel, 2) |
|
data_index += 1 |
|
else: |
|
break |
|
if data_index >= data_len: |
|
break |
|
if data_index >= data_len: |
|
break |
|
|
|
return watermarked_image, None |
|
except Exception as e: |
|
return None, f"Error during encoding: {str(e)}" |
|
|
|
|
|
def decode(image_path): |
|
try: |
|
|
|
image = cv2.imread(image_path) |
|
if image is None: |
|
return "Failed to read image" |
|
|
|
binary_data = "" |
|
|
|
for i in range(image.shape[0]): |
|
for j in range(image.shape[1]): |
|
for k in range(3): |
|
pixel = format(image[i, j, k], '08b') |
|
binary_data += pixel[-1] |
|
|
|
|
|
decoded_data = "" |
|
for i in range(0, len(binary_data), 8): |
|
byte = binary_data[i:i+8] |
|
if len(byte) == 8: |
|
decoded_data += chr(int(byte, 2)) |
|
|
|
if decoded_data[-5:] == "=====": |
|
break |
|
|
|
|
|
if "#####" in decoded_data: |
|
return decoded_data.split("#####")[0] |
|
return "No watermark found" |
|
|
|
except Exception as e: |
|
return f"Error during decoding: {str(e)}" |
|
|
|
|
|
def png_encode(im_name, extra): |
|
try: |
|
im = Image.open(im_name) |
|
info = PngImagePlugin.PngInfo() |
|
info.add_text("TXT", extra) |
|
|
|
unique_id = str(uuid.uuid4())[:8] |
|
filename, ext = os.path.splitext(im_name) |
|
new_filename = f"{filename}_{unique_id}{ext}" |
|
im.save(new_filename, "PNG", pnginfo=info) |
|
|
|
width, height = im.size |
|
rect_width, rect_height = 200, 50 |
|
x = width - rect_width - 10 |
|
y = height - rect_height - 10 |
|
global png_encode_coords |
|
png_encode_coords = (x, y, rect_width, rect_height) |
|
|
|
return new_filename, None |
|
|
|
except Exception as e: |
|
return None, str(e) |
|
|
|
|
|
def highlight_watermark(image, coords=(0, 0, 100, 50), color="red", width=5): |
|
try: |
|
if isinstance(image, np.ndarray): |
|
image = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)) |
|
draw = ImageDraw.Draw(image) |
|
x, y, w, h = coords |
|
draw.rectangle((max(0, x - 5), max(0, y - 5), x + w + 5, y + h + 5), outline=color, width=width) |
|
return image |
|
except Exception as e: |
|
print(f"Error highlighting: {e}") |
|
return image |
|
|
|
|
|
|
|
def choose_encode(inp_im, inp_mark, cho): |
|
try: |
|
if not inp_im: |
|
return None, "Please upload an image.", None |
|
if not inp_mark: |
|
return None, "Please enter watermark text.", None |
|
|
|
if cho == "stegan": |
|
watermarked_image, error_msg = encode(inp_im, inp_mark) |
|
if error_msg: |
|
return None, error_msg, None |
|
|
|
output_path = os.path.splitext(inp_im)[0] + "_watermarked.png" |
|
cv2.imwrite(output_path, watermarked_image) |
|
return output_path, "Steganography watermark added successfully.", output_path |
|
|
|
elif cho == "pnginfo": |
|
output_path, error_msg = png_encode(inp_im, inp_mark) |
|
if error_msg: |
|
return None, error_msg, None |
|
return output_path, "PNG metadata watermark added successfully.", output_path |
|
except Exception as e: |
|
return None, f"An unexpected error occurred: {e}", None |
|
|
|
|
|
def detect_watermark(det_im): |
|
if not det_im: |
|
return "Please upload an image." |
|
|
|
detected_text = decode(det_im) |
|
if not detected_text or detected_text.startswith("Error"): |
|
return "No watermark detected or not encoded with this tool." |
|
|
|
return detected_text |
|
|
|
|
|
|
|
with gr.Blocks() as app: |
|
with gr.Tab("Add Watermark"): |
|
cho = gr.Radio(choices=["stegan", "pnginfo"], value="stegan", label="Watermark Method") |
|
with gr.Row(): |
|
with gr.Column(): |
|
inp_im = gr.Image(label="Input Image", type="filepath") |
|
inp_mark = gr.Textbox(label="Watermark Text") |
|
mark_btn = gr.Button("Add Watermark") |
|
msg_box = gr.Textbox(label="System Message") |
|
file_output = gr.File(label="Download Watermarked Image") |
|
with gr.Column(): |
|
out_im = gr.Image(label="Watermarked Image") |
|
|
|
mark_btn.click(choose_encode, inputs=[inp_im, inp_mark, cho], outputs=[out_im, msg_box, file_output]) |
|
|
|
with gr.Tab("Detect Watermark"): |
|
with gr.Row(): |
|
with gr.Column(): |
|
det_im = gr.Image(label="Watermarked Image", type="filepath") |
|
det_btn = gr.Button("Detect Watermark") |
|
with gr.Column(): |
|
det_msg = gr.Textbox(label="Detected Watermark", lines=6) |
|
|
|
det_btn.click(detect_watermark, inputs=[det_im], outputs=[det_msg]) |
|
|
|
if __name__ == "__main__": |
|
app.launch( |
|
show_api=False |
|
) |
|
|
|
mark_btn.click(choose_encode, inputs=[inp_im, inp_mark, cho], outputs=[out_im, msg_box, file_output]) |
|
|
|
with gr.Tab("Detect Watermark"): |
|
with gr.Row(): |
|
with gr.Column(): |
|
det_im = gr.Image(label="Watermarked Image", type="filepath") |
|
det_btn = gr.Button("Detect Watermark") |
|
with gr.Column(): |
|
det_msg = gr.Textbox(label="Detected Watermark", lines=6) |
|
|
|
det_btn.click(detect_watermark, inputs=[det_im], outputs=[det_msg]) |
|
|
|
if __name__ == "__main__": |
|
app.launch( |
|
show_api=False |
|
) |
|
|