from ultralytics import YOLO import cv2 import numpy as np from scipy.spatial import KDTree import os import datetime from sort.sort import * import util import io from util import get_car, read_license_plate import firebase_admin from firebase_admin import credentials, db, storage import base64 # Firebase Information Server cred = credentials.Certificate("anpr-v3-b5bb8-firebase-adminsdk-8pkgt-d88b8f69b1.json") firebase_admin.initialize_app(cred, { 'databaseURL': 'https://anpr-v3-b5bb8-default-rtdb.asia-southeast1.firebasedatabase.app', 'storageBucket': 'anpr-v3-b5bb8.appspot.com' }) ref = db.reference('/') users_ref = ref.child('right') #Get Data from the RealTime Database of The Firebase root_ref = db.reference('/Detected') # Fetch all data at the root of the database Detected_data = root_ref.get() plates = [data['plate'] for data in Detected_data.values()] if Detected_data else [] print(plates) # Directories for saving detected cars and license plates car_output_dir = "detected_cars" plate_output_dir = "detected_plates" Detected_dir = "DATA" if not os.path.exists(car_output_dir): os.makedirs(car_output_dir) if not os.path.exists(plate_output_dir): os.makedirs(plate_output_dir) results = {} mot_tracker = Sort() # Load models coco_model = YOLO('yolov8n.pt') license_plate_detector = YOLO('./models/run46.pt') model = YOLO('car.pt') model1 = YOLO('color.pt') # Load video cap = cv2.VideoCapture('./sample1.mp4') vehicles = [2, 3, 4, 5, 6, 7] frame_skip = 40 # Read frames frame_nmr = -1 ret = True # Color Recognition functions # Predefined colors dictionary # COLORS = { # "black": (0, 0, 0), # "white": (255, 255, 255), # "red": (255, 0, 0), # "green": (0, 255, 0), # "blue": (0, 0, 255), # "yellow": (255, 255, 0), # "cyan": (0, 255, 255), # "magenta": (255, 0, 255), # "gray": (128, 128, 128), # } class_map_color = { 0: 'beige', 1: 'black', 2: 'blue', 3: 'brown', 4: 'gold', 5: 'green', 6: 'grey', 7: 'orange', 8: 'pink', 9: 'purple', 10: 'red', 11: 'sivler', 12: 'tan', 13: 'white', 14: 'yellow' } # color_names = list(COLORS.keys()) # color_values = list(COLORS.values()) # color_tree = KDTree(color_values) # def find_closest_color_name(rgb_color): # distance, index = color_tree.query(rgb_color) # return color_names[index] # def find_dominant_color(image, n_colors=3): # h, w = image.shape[:2] # central_region = image[int(h * 0.25):int(h * 0.75), int(w * 0.25):int(w * 0.75)] # pixels = np.float32(central_region.reshape(-1, 3)) # criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 100, 0.2) # _, labels, palette = cv2.kmeans(pixels, n_colors, None, criteria, 10, cv2.KMEANS_RANDOM_CENTERS) # _, counts = np.unique(labels, return_counts=True) # dominant = palette[np.argmax(counts)] # return tuple(dominant.astype(int)) def find_plate(search_plate, plate_array): return search_plate in plate_array def reset_counts(): global vehicle_counts, current_frame_count, unique_vehicle_ids vehicle_counts = {v: 0 for v in coco_class_to_vehicle_type.values()} # Reset vehicle counts unique_vehicle_ids.clear() # Clear the set of unique vehicle IDs current_frame_count = 0 # Reset frame counter # def upload_text(path, text): # ref = db.reference(path) # ref.set(text) # def upload_two_texts_append(path, text1, text2): # # Create a reference to the specified path in the Firebase Realtime Database # ref = db.reference(path) # # Structure the data as a dictionary with two keys, each holding one of the text strings # data = { # 'text1': text1, # 'text2': text2 # } # # Use the push() method to add the data under a new, unique child node at the specified path # new_ref = ref.push(data) # print(f"Appended text1 and text2 under {path} at {new_ref.key}") def upload_to_firebase(filename, destination_blob_name): bucket = storage.bucket() blob = bucket.blob(destination_blob_name) blob.upload_from_filename(filename) print(f"File {filename} uploaded to {destination_blob_name}.") # Return the public URL of the uploaded image return blob.public_url # print(f"Data saved to database with key: {new_user.key}") #---------------------------------------------------------------- #Car coutner # Initialize a set to keep track of unique vehicle IDs detected by SORT coco_class_to_vehicle_type = { 2: 'car', 3: 'motorcycle', 4: 'airplane', 5: 'bus', 6: 'train', 7: 'truck' } vehicle_counts = {v: 0 for v in coco_class_to_vehicle_type.values()} # Initialize a set to keep track of unique vehicle IDs detected by SORT unique_vehicle_ids = set() # Initialize dictionary to map vehicle track IDs to their COCO class IDs track_id_to_class_id = {} cnt = 0 score = 1 frame_rate = cap.get(cv2.CAP_PROP_FPS) # Get the frame rate of the video count_duration_seconds = 10 # Duration in seconds before resetting counts frame_count_for_reset = int(frame_rate * count_duration_seconds) # Calculate number of frames for the duration current_frame_count = 0 # Initialize a counter for frames #================================================================ class_map = {0: 'Convertible', 1: 'Coupe', 2: 'Hatchback', 3: 'Pickup', 4: 'SUV', 5: 'Sedan'} while ret: frame_nmr += 1 ret, frame = cap.read() current_frame_count += 1 # Increment frame counter if ret and frame_nmr % frame_skip == 0: # Perform detection detections = coco_model(frame)[0] detections_ = [] for detection in detections.boxes.data.tolist(): x1, y1, x2, y2, score, class_id = detection cnt+=1 if int(class_id) in vehicles: # Append detections with class ID for later reference detections_.append([x1, y1, x2, y2, score, class_id]) # Convert detections for tracking (excluding class_id) tracking_data = np.array([d[:5] for d in detections_]) track_bbs_ids = mot_tracker.update(tracking_data) # Update track ID to class ID mapping for track in track_bbs_ids: track_id = int(track[4]) # Find matching detection (assuming first match is correct for simplicity) for d in detections_: if all([np.isclose(track[i], d[i], atol=1e-3) for i in range(4)]): # Simple bounding box match class_id = d[5] track_id_to_class_id[track_id] = class_id break # Debug print # if detections_: # print(f"Vehicle detections with confidence: {[(d[4], 'Confidence') for d in detections_]}") # Track vehicles track_ids = mot_tracker.update(np.asarray(detections_)) for track in track_bbs_ids: track_id = int(track[4]) # Track ID is the last element in the track array unique_vehicle_ids.add(track_id) # Debug print # if len(track_ids) > 0: # print(f"Tracking IDs: {track_ids}") # Detect license plates license_plates = license_plate_detector(frame)[0] # Debug print # if license_plates: # print(f"License plates detected: {len(license_plates.boxes.data.tolist())}") for license_plate in license_plates.boxes.data.tolist(): x1, y1, x2, y2, score, class_id = license_plate # Assign license plate to car xcar1, ycar1, xcar2, ycar2, car_id = get_car(license_plate, track_ids) if car_id != -1: print(f"Car ID: {car_id}, Confidence: {score}") # Debug print # upload_text('/', car_id) # Crop and process license plate license_plate_crop = frame[int(y1):int(y2), int(x1): int(x2), :] license_plate_crop_gray = cv2.cvtColor(license_plate_crop, cv2.COLOR_BGR2GRAY) _, license_plate_crop_thresh = cv2.threshold(license_plate_crop_gray, 64, 255, cv2.THRESH_BINARY_INV) # Read license plate number license_plate_text, license_plate_text_score = read_license_plate(license_plate_crop_thresh) if license_plate_text is not None: print(f"License Plate Text: {license_plate_text}") # Debug print is_plate_found = find_plate(license_plate_text, plates) print(f"Is the plate '{license_plate_text}' found? {'Yes' if is_plate_found else 'No'}") # if(is_plate_found): # print(1) # else: # continue # Save the image of the detected car car_image = frame[int(ycar1):int(ycar2), int(xcar1): int(xcar2), :] # dominant_color = find_dominant_color(car_image) # closest_color_name = find_closest_color_name(dominant_color) # print(f"Vehicle Color: {closest_color_name}") # Debug print results1 = model1(car_image) # Assuming the top prediction is what you're interested in top_prediction_index = results1[0].probs.top5[0] # Index of the highest probability class top_prediction_prob = results1[0].probs.top5conf[0].item() # Highest probability # Get the car type from the class_map car_color = class_map_color[top_prediction_index] print(f"{car_color}") # Save the car image to the local filesystem # cv2.imwrite(car_image_filename, car_image) ################################ #Type of Car Detectin # Perform object detection results = model(car_image) # Assuming the top prediction is what you're interested in top_prediction_index = results[0].probs.top5[0] # Index of the highest probability class top_prediction_prob = results[0].probs.top5conf[0].item() # Highest probability # Get the car type from the class_map car_type = class_map[top_prediction_index] print(f"{car_type}") now_str = datetime.datetime.utcnow().replace(microsecond=0).isoformat() # Current date and time as Unix timestamp in milliseconds now_int = int(datetime.datetime.utcnow().timestamp() * 1000) # Save the image of the detected car car_image_filename = os.path.join(car_output_dir, f"car_{license_plate_text}.jpg") cv2.imwrite(car_image_filename, car_image) car_image_url = upload_to_firebase(car_image_filename, f"detected_cars/car_{license_plate_text}_{now_str}_{now_int}_{car_color}_{car_type}.jpg") if(is_plate_found): Detected_dir1 = os.path.join(Detected_dir, f"car_{license_plate_text}.jpg") cv2.imwrite(Detected_dir1, car_image) car_image_url = upload_to_firebase(Detected_dir1, f"DATA/car_{license_plate_text}_{now_str}_{now_int}_{car_color}_{car_type}.jpg") # Save the image of the detected license plate license_plate_image_filename = os.path.join(plate_output_dir, f"plate_{license_plate_text}.jpg") cv2.imwrite(license_plate_image_filename, license_plate_crop) license_plate_url = upload_to_firebase(license_plate_image_filename, f"detected_plates/plate_{license_plate_text}_{now_str}_{now_int}.jpg") # print(f"License plate image uploaded: {license_plate_url}") # Debug print # print(f"Current vehicle counts (before potential reset): {vehicle_counts}") # Count vehicles by type after processing all frames vehicle_counts = {v: 0 for v in coco_class_to_vehicle_type.values()} for class_id in track_id_to_class_id.values(): vehicle_type = coco_class_to_vehicle_type.get(class_id, 'unknown') if vehicle_type in vehicle_counts: vehicle_counts[vehicle_type] += 1 # Print counts for vehicle_type, count in vehicle_counts.items(): if vehicle_counts.items() == 0: score += count * 1 else: score += count* 4 print(f"Total {vehicle_type}s detected: {count}") data = users_ref.push({'text1': vehicle_type,'text2': count}) # upload_two_texts_append("/right", vehicle_type, count) print(cnt) print('/traffic-score1', score) # print('/traffic-score2', score) #####First # upload_text('/traffic-score1', score) # score1 = ref.child('TrafficScore1') # score1.set(score) #####Second # upload_text('/traffic-score2', score) score2 = ref.child('TrafficScore2') score2.set(score) count = ref.child('count') count.set(count) # upload_text('/Counter', cnt)