# ANPR-V3 / v3.py
# Uploaded by songhieng via huggingface_hub (commit c1b3a0c, verified)
from ultralytics import YOLO
import cv2
import numpy as np
from scipy.spatial import KDTree
import os
import datetime
from sort.sort import *
import util
import io
from util import get_car, read_license_plate
import firebase_admin
from firebase_admin import credentials, db, storage
import base64
# Firebase Information Server: authenticate with the service-account key and
# bind this process to the project's Realtime Database and Storage bucket.
cred = credentials.Certificate("anpr-v3-b5bb8-firebase-adminsdk-8pkgt-d88b8f69b1.json")
firebase_admin.initialize_app(cred, {
    'databaseURL': 'https://anpr-v3-b5bb8-default-rtdb.asia-southeast1.firebasedatabase.app',
    'storageBucket': 'anpr-v3-b5bb8.appspot.com'
})
ref = db.reference('/')
users_ref = ref.child('right')  # per-type vehicle counts are pushed here after the loop
# Fetch the watch-list of already-known plates from the /Detected node.
root_ref = db.reference('/Detected')
Detected_data = root_ref.get()
# BUG FIX: entries without a 'plate' key (or non-dict entries) previously
# raised KeyError/TypeError; skip malformed records instead of crashing.
plates = (
    [data['plate'] for data in Detected_data.values()
     if isinstance(data, dict) and 'plate' in data]
    if Detected_data else []
)
print(plates)
# Directories for saving detected cars and license plates.
car_output_dir = "detected_cars"
plate_output_dir = "detected_plates"
Detected_dir = "DATA"
# BUG FIX: Detected_dir was never created, so cv2.imwrite into DATA/ silently
# failed (cv2.imwrite returns False without raising when the directory is
# missing). Create all three output directories idempotently.
for _out_dir in (car_output_dir, plate_output_dir, Detected_dir):
    os.makedirs(_out_dir, exist_ok=True)
results = {}
mot_tracker = Sort()  # SORT multi-object tracker (from sort.sort)
# Load models: COCO vehicle detector, plate detector, body-type and colour classifiers.
coco_model = YOLO('yolov8n.pt')
license_plate_detector = YOLO('./models/run46.pt')
model = YOLO('car.pt')     # body-type classifier (see class_map)
model1 = YOLO('color.pt')  # colour classifier (see class_map_color)
# Load video
cap = cv2.VideoCapture('./sample1.mp4')
# COCO class ids treated as vehicles (see coco_class_to_vehicle_type below).
vehicles = [2, 3, 4, 5, 6, 7]
frame_skip = 40  # run the detectors only every 40th frame
# Read frames
frame_nmr = -1
ret = True
# Color Recognition functions
# Predefined colors dictionary
# COLORS = {
# "black": (0, 0, 0),
# "white": (255, 255, 255),
# "red": (255, 0, 0),
# "green": (0, 255, 0),
# "blue": (0, 0, 255),
# "yellow": (255, 255, 0),
# "cyan": (0, 255, 255),
# "magenta": (255, 0, 255),
# "gray": (128, 128, 128),
# }
# Colour-classifier class index -> colour name (model 'color.pt').
# These names are embedded in the uploaded Storage object filenames.
class_map_color = {
    0: 'beige',
    1: 'black',
    2: 'blue',
    3: 'brown',
    4: 'gold',
    5: 'green',
    6: 'grey',
    7: 'orange',
    8: 'pink',
    9: 'purple',
    10: 'red',
    11: 'silver',  # BUG FIX: was misspelled 'sivler'
    12: 'tan',
    13: 'white',
    14: 'yellow'
}
# color_names = list(COLORS.keys())
# color_values = list(COLORS.values())
# color_tree = KDTree(color_values)
# def find_closest_color_name(rgb_color):
# distance, index = color_tree.query(rgb_color)
# return color_names[index]
# def find_dominant_color(image, n_colors=3):
# h, w = image.shape[:2]
# central_region = image[int(h * 0.25):int(h * 0.75), int(w * 0.25):int(w * 0.75)]
# pixels = np.float32(central_region.reshape(-1, 3))
# criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 100, 0.2)
# _, labels, palette = cv2.kmeans(pixels, n_colors, None, criteria, 10, cv2.KMEANS_RANDOM_CENTERS)
# _, counts = np.unique(labels, return_counts=True)
# dominant = palette[np.argmax(counts)]
# return tuple(dominant.astype(int))
def find_plate(search_plate, plate_array):
    """Return True if *search_plate* is an exact member of *plate_array*."""
    for known_plate in plate_array:
        if known_plate == search_plate:
            return True
    return False
def reset_counts():
    """Reset the per-interval tracking statistics.

    Rebinds the module-level ``vehicle_counts`` to a fresh all-zero dict,
    empties ``unique_vehicle_ids`` in place, and zeroes the frame counter.
    """
    global vehicle_counts, current_frame_count, unique_vehicle_ids
    fresh_counts = {}
    for vehicle_type in coco_class_to_vehicle_type.values():
        fresh_counts[vehicle_type] = 0
    vehicle_counts = fresh_counts
    unique_vehicle_ids.clear()
    current_frame_count = 0
# def upload_text(path, text):
# ref = db.reference(path)
# ref.set(text)
# def upload_two_texts_append(path, text1, text2):
# # Create a reference to the specified path in the Firebase Realtime Database
# ref = db.reference(path)
# # Structure the data as a dictionary with two keys, each holding one of the text strings
# data = {
# 'text1': text1,
# 'text2': text2
# }
# # Use the push() method to add the data under a new, unique child node at the specified path
# new_ref = ref.push(data)
# print(f"Appended text1 and text2 under {path} at {new_ref.key}")
def upload_to_firebase(filename, destination_blob_name):
    """Upload a local file to the default Firebase Storage bucket.

    Args:
        filename: Path of the local file to upload.
        destination_blob_name: Object name (path) inside the bucket.

    Returns:
        The public URL of the uploaded object.
    """
    bucket = storage.bucket()
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(filename)
    # BUG FIX: the original printed the literal text "(unknown)" instead of
    # interpolating the uploaded file's name.
    print(f"File {filename} uploaded to {destination_blob_name}.")
    # Return the public URL of the uploaded image
    return blob.public_url
# print(f"Data saved to database with key: {new_user.key}")
#----------------------------------------------------------------
# Car counter
# Initialize a set to keep track of unique vehicle IDs detected by SORT
# Map COCO class ids -> human-readable vehicle type names.
# NOTE(review): ids 4 ('airplane') and 6 ('train') are unusual for road
# traffic footage — confirm these classes are intentional.
coco_class_to_vehicle_type = {
    2: 'car', 3: 'motorcycle', 4: 'airplane', 5: 'bus', 6: 'train', 7: 'truck'
}
# Per-type running counts; recomputed after the main loop and reset by reset_counts().
vehicle_counts = {v: 0 for v in coco_class_to_vehicle_type.values()}
# Initialize a set to keep track of unique vehicle IDs detected by SORT
unique_vehicle_ids = set()
# Initialize dictionary to map vehicle track IDs to their COCO class IDs
track_id_to_class_id = {}
cnt = 0    # total raw detections seen across all processed frames
score = 1  # traffic-score accumulator, uploaded to TrafficScore2 after the loop
frame_rate = cap.get(cv2.CAP_PROP_FPS)  # Get the frame rate of the video
count_duration_seconds = 10  # Duration in seconds before resetting counts
frame_count_for_reset = int(frame_rate * count_duration_seconds)  # Calculate number of frames for the duration
current_frame_count = 0  # Initialize a counter for frames
#================================================================
# Body-type classifier class index -> label (model 'car.pt').
class_map = {0: 'Convertible', 1: 'Coupe', 2: 'Hatchback', 3: 'Pickup', 4: 'SUV', 5: 'Sedan'}
# Main loop: read frames, detect and track vehicles, read license plates,
# classify car colour/type, and upload crops to Firebase Storage.
while ret:
    frame_nmr += 1
    ret, frame = cap.read()
    current_frame_count += 1  # frame counter for the periodic reset logic
    # Run the (expensive) detectors only on every frame_skip-th frame.
    if ret and frame_nmr % frame_skip == 0:
        # ---- Vehicle detection -----------------------------------------
        detections = coco_model(frame)[0]
        detections_ = []
        for detection in detections.boxes.data.tolist():
            # BUG FIX: the original unpacked into `score`, clobbering the
            # module-level traffic-score accumulator initialised to 1.
            x1, y1, x2, y2, det_score, class_id = detection
            cnt += 1
            if int(class_id) in vehicles:
                # Keep class_id so tracker output can be mapped back to a type.
                detections_.append([x1, y1, x2, y2, det_score, class_id])
        # ---- Tracking ---------------------------------------------------
        # SORT expects an (N, 5) array of [x1, y1, x2, y2, score].
        # BUG FIX: np.array([]) has shape (0,) and crashes inside SORT on
        # frames with no vehicles — pass an explicit empty (0, 5) array.
        if detections_:
            tracking_data = np.array([d[:5] for d in detections_])
        else:
            tracking_data = np.empty((0, 5))
        # BUG FIX: the original called mot_tracker.update() twice per frame
        # (track_bbs_ids and track_ids), stepping the Kalman filters twice
        # and aging out tracks prematurely. A single update now feeds both
        # the vehicle counting and the plate-to-car association.
        track_ids = mot_tracker.update(tracking_data)
        for track in track_ids:
            track_id = int(track[4])  # track ID is the last element
            unique_vehicle_ids.add(track_id)
            # Map the track back to the matching detection's COCO class id
            # (first close bounding-box match wins).
            for d in detections_:
                if all(np.isclose(track[i], d[i], atol=1e-3) for i in range(4)):
                    # BUG FIX: store int keys/values so the post-loop lookup
                    # against coco_class_to_vehicle_type (int keys) succeeds.
                    track_id_to_class_id[track_id] = int(d[5])
                    break
        # ---- License plates --------------------------------------------
        license_plates = license_plate_detector(frame)[0]
        for license_plate in license_plates.boxes.data.tolist():
            x1, y1, x2, y2, plate_score, class_id = license_plate
            # Associate the plate box with a tracked car.
            xcar1, ycar1, xcar2, ycar2, car_id = get_car(license_plate, track_ids)
            if car_id != -1:
                print(f"Car ID: {car_id}, Confidence: {plate_score}")
                # Crop the plate and binarise it for OCR.
                license_plate_crop = frame[int(y1):int(y2), int(x1): int(x2), :]
                license_plate_crop_gray = cv2.cvtColor(license_plate_crop, cv2.COLOR_BGR2GRAY)
                _, license_plate_crop_thresh = cv2.threshold(
                    license_plate_crop_gray, 64, 255, cv2.THRESH_BINARY_INV)
                # Read license plate number
                license_plate_text, license_plate_text_score = read_license_plate(license_plate_crop_thresh)
                if license_plate_text is not None:
                    print(f"License Plate Text: {license_plate_text}")
                    is_plate_found = find_plate(license_plate_text, plates)
                    print(f"Is the plate '{license_plate_text}' found? {'Yes' if is_plate_found else 'No'}")
                    # Crop the whole car for colour/type classification.
                    car_image = frame[int(ycar1):int(ycar2), int(xcar1): int(xcar2), :]
                    # Colour classification: take the classifier's top-1 class.
                    color_results = model1(car_image)
                    car_color = class_map_color[color_results[0].probs.top5[0]]
                    print(f"{car_color}")
                    # Body-type classification: top-1 class of 'car.pt'.
                    type_results = model(car_image)
                    car_type = class_map[type_results[0].probs.top5[0]]
                    print(f"{car_type}")
                    # Timestamps used to build unique Storage object names.
                    now_str = datetime.datetime.utcnow().replace(microsecond=0).isoformat()
                    now_int = int(datetime.datetime.utcnow().timestamp() * 1000)
                    # Save + upload the detected-car crop.
                    car_image_filename = os.path.join(car_output_dir, f"car_{license_plate_text}.jpg")
                    cv2.imwrite(car_image_filename, car_image)
                    car_image_url = upload_to_firebase(
                        car_image_filename,
                        f"detected_cars/car_{license_plate_text}_{now_str}_{now_int}_{car_color}_{car_type}.jpg")
                    if is_plate_found:
                        # Plate is on the watch list: archive a copy under DATA/.
                        Detected_dir1 = os.path.join(Detected_dir, f"car_{license_plate_text}.jpg")
                        cv2.imwrite(Detected_dir1, car_image)
                        car_image_url = upload_to_firebase(
                            Detected_dir1,
                            f"DATA/car_{license_plate_text}_{now_str}_{now_int}_{car_color}_{car_type}.jpg")
                    # Save + upload the license-plate crop.
                    license_plate_image_filename = os.path.join(plate_output_dir, f"plate_{license_plate_text}.jpg")
                    cv2.imwrite(license_plate_image_filename, license_plate_crop)
                    license_plate_url = upload_to_firebase(
                        license_plate_image_filename,
                        f"detected_plates/plate_{license_plate_text}_{now_str}_{now_int}.jpg")
# Count vehicles by type after processing all frames.
vehicle_counts = {v: 0 for v in coco_class_to_vehicle_type.values()}
for class_id in track_id_to_class_id.values():
    # BUG FIX: class ids were stored as floats but the map has int keys, so
    # .get() always returned 'unknown' and nothing was ever counted.
    vehicle_type = coco_class_to_vehicle_type.get(int(class_id), 'unknown')
    if vehicle_type in vehicle_counts:
        vehicle_counts[vehicle_type] += 1
# Accumulate the traffic score and push per-type counts to /right.
for vehicle_type, count in vehicle_counts.items():
    # BUG FIX: the original tested `vehicle_counts.items() == 0`, which is
    # always False; the intent was to weight empty categories differently.
    if count == 0:
        score += count * 1  # no-op, kept to preserve the original weighting scheme
    else:
        score += count * 4
    print(f"Total {vehicle_type}s detected: {count}")
    data = users_ref.push({'text1': vehicle_type, 'text2': count})
print(cnt)
print('/traffic-score1', score)
score2 = ref.child('TrafficScore2')
score2.set(score)
# BUG FIX: the original rebound `count` to the DB reference and then called
# count.set(count) — uploading the Reference object itself. The commented-out
# upload_text('/Counter', cnt) shows the detection total was intended.
counter_ref = ref.child('count')
counter_ref.set(cnt)