|
from ultralytics import YOLO |
|
import cv2 |
|
import numpy as np |
|
from scipy.spatial import KDTree |
|
import os |
|
import datetime |
|
from sort.sort import * |
|
import util |
|
import io |
|
from util import get_car, read_license_plate |
|
import firebase_admin |
|
from firebase_admin import credentials, db, storage |
|
import base64 |
|
|
|
|
|
# Authenticate with the service-account key and bind the app to the project's
# Realtime Database and Storage bucket.
cred = credentials.Certificate("anpr-v3-b5bb8-firebase-adminsdk-8pkgt-d88b8f69b1.json")
firebase_admin.initialize_app(
    cred,
    {
        'databaseURL': 'https://anpr-v3-b5bb8-default-rtdb.asia-southeast1.firebasedatabase.app',
        'storageBucket': 'anpr-v3-b5bb8.appspot.com',
    },
)

# Database handles: the root node and its 'right' child (per-type vehicle
# counts are pushed to 'right' further down in this script).
ref = db.reference('/')
users_ref = ref.child('right')

# One-off snapshot of the '/Detected' node taken at start-up.
root_ref = db.reference('/Detected')
Detected_data = root_ref.get()

# Gather the 'plate' field of every record; the list stays empty when the node
# holds no data. OCR results are later compared against this list.
plates = []
if Detected_data:
    plates.extend(entry['plate'] for entry in Detected_data.values())
print(plates)
|
|
|
|
|
# Local folders that receive the cropped images before they are uploaded:
#   car_output_dir   - crop of every vehicle whose plate could be read
#   plate_output_dir - crop of the licence plate itself
#   Detected_dir     - extra copy of the vehicle crop when the plate is in `plates`
car_output_dir = "detected_cars"
plate_output_dir = "detected_plates"
Detected_dir = "DATA"

# cv2.imwrite does not create missing folders (it just reports failure), so all
# three targets must exist up front -- including Detected_dir, which was
# previously never created. exist_ok=True makes re-runs a no-op.
for _output_dir in (car_output_dir, plate_output_dir, Detected_dir):
    os.makedirs(_output_dir, exist_ok=True)
|
|
|
# --- Loop bookkeeping --------------------------------------------------------
results = {}
frame_nmr = -1   # incremented before each read, so the first frame is number 0
ret = True       # becomes False once cap.read() stops delivering frames

# --- Tunables ----------------------------------------------------------------
vehicles = [2, 3, 4, 5, 6, 7]   # class ids from coco_model that are kept for tracking
frame_skip = 40                 # only frames whose number is a multiple of this are processed

# --- Tracker and models ------------------------------------------------------
mot_tracker = Sort()

coco_model = YOLO('yolov8n.pt')                       # general object detector
license_plate_detector = YOLO('./models/run46.pt')    # licence-plate detector
model = YOLO('car.pt')                                # classifier indexed through class_map
model1 = YOLO('color.pt')                             # classifier indexed through class_map_color

# --- Video source ------------------------------------------------------------
cap = cv2.VideoCapture('./sample1.mp4')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Maps the class index predicted by the colour classifier (`model1`) to a
# human-readable colour name. The name is printed and embedded in the uploaded
# blob names, so spelling matters: index 11 previously read 'sivler'.
class_map_color = {
    0: 'beige',
    1: 'black',
    2: 'blue',
    3: 'brown',
    4: 'gold',
    5: 'green',
    6: 'grey',
    7: 'orange',
    8: 'pink',
    9: 'purple',
    10: 'red',
    11: 'silver',
    12: 'tan',
    13: 'white',
    14: 'yellow',
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def find_plate(search_plate, plate_array):
    """Tell whether `search_plate` occurs in `plate_array`.

    Args:
        search_plate: The plate value to look for.
        plate_array: Any container supporting the ``in`` operator.

    Returns:
        True when the container reports the plate as a member, else False.
        The comparison is an exact membership test; no normalisation is done.
    """
    is_known = search_plate in plate_array
    return is_known
|
|
|
def reset_counts():
    """Reset the module-level counting state to its initial values.

    Rebinds `vehicle_counts` to a fresh dict with one zeroed entry per vehicle
    type named in `coco_class_to_vehicle_type`, empties `unique_vehicle_ids`
    in place, and sets `current_frame_count` back to 0.
    """
    global vehicle_counts, current_frame_count, unique_vehicle_ids
    # Every vehicle-type name starts again from zero.
    vehicle_counts = dict.fromkeys(coco_class_to_vehicle_type.values(), 0)
    # Cleared in place, so other references to the same set see it emptied.
    unique_vehicle_ids.clear()
    current_frame_count = 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def upload_to_firebase(filename, destination_blob_name):
    """Upload a local file to the app's default Firebase Storage bucket.

    Args:
        filename: Path of the local file to upload.
        destination_blob_name: Object name to store the file under in the bucket.

    Returns:
        The `public_url` attribute of the uploaded blob.
        NOTE(review): nothing here changes the object's access settings, so
        whether that URL is actually reachable depends on the bucket's
        configuration -- confirm.
    """
    target_blob = storage.bucket().blob(destination_blob_name)
    target_blob.upload_from_filename(filename)

    print(f"File {filename} uploaded to {destination_blob_name}.")

    return target_blob.public_url
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Detector class id -> vehicle-type name used when tallying tracked vehicles.
coco_class_to_vehicle_type = {
    2: 'car',
    3: 'motorcycle',
    4: 'airplane',
    5: 'bus',
    6: 'train',
    7: 'truck',
}

# One zeroed counter per vehicle-type name.
vehicle_counts = dict.fromkeys(coco_class_to_vehicle_type.values(), 0)

# Tracking state accumulated over the run.
unique_vehicle_ids = set()     # every track id seen so far
track_id_to_class_id = {}      # track id -> detector class id

cnt = 0      # running number of raw detections
score = 1    # starting value of the traffic score

# Length of one counting window, expressed in frames of the input video.
count_duration_seconds = 10
frame_rate = cap.get(cv2.CAP_PROP_FPS)
frame_count_for_reset = int(frame_rate * count_duration_seconds)
current_frame_count = 0

# Class index predicted by the body-type classifier (`model`) -> label.
class_map = {
    0: 'Convertible',
    1: 'Coupe',
    2: 'Hatchback',
    3: 'Pickup',
    4: 'SUV',
    5: 'Sedan',
}
|
|
|
# Main processing loop.
#
# For every `frame_skip`-th frame of the video:
#   1. detect objects, keep the vehicle classes and feed them to the tracker;
#   2. remember the detector class of each track;
#   3. detect licence plates, attach each one to a tracked vehicle and OCR it;
#   4. classify colour and body type of the vehicle, then save and upload the crops.
#
# NOTE(review): the nesting below was reconstructed from the data flow because
# the original indentation was lost -- confirm against the intended structure.


def _box_iou(box_a, box_b):
    """Return the intersection-over-union of two boxes given as [x1, y1, x2, y2, ...]."""
    inter_w = min(box_a[2], box_b[2]) - max(box_a[0], box_b[0])
    inter_h = min(box_a[3], box_b[3]) - max(box_a[1], box_b[1])
    if inter_w <= 0 or inter_h <= 0:
        return 0.0
    inter_area = inter_w * inter_h
    area_a = (box_a[2] - box_a[0]) * (box_a[3] - box_a[1])
    area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1])
    union_area = area_a + area_b - inter_area
    return inter_area / union_area if union_area > 0 else 0.0


def _clamped_crop(image, x1, y1, x2, y2):
    """Return image[y1:y2, x1:x2] with corners truncated to int and clamped at 0.

    Clamping matters because a negative index would be interpreted by NumPy as
    "count from the end" and silently yield the wrong region.
    """
    left, top = max(int(x1), 0), max(int(y1), 0)
    right, bottom = max(int(x2), 0), max(int(y2), 0)
    return image[top:bottom, left:right, :]


# A tracked box inherits the class of the detection it overlaps most, provided
# the overlap reaches this IoU. Tracker output is a filtered estimate, so exact
# coordinate equality with the raw detection cannot be relied upon.
TRACK_MATCH_MIN_IOU = 0.3

# The matched-plate branch writes into Detected_dir; make sure it exists.
os.makedirs(Detected_dir, exist_ok=True)

while ret:
    frame_nmr += 1
    ret, frame = cap.read()
    current_frame_count += 1

    # Skip unreadable frames (which also ends the loop) and frames in between
    # the sampling interval.
    if not ret or frame_nmr % frame_skip != 0:
        continue

    # --- 1. vehicle detection ------------------------------------------------
    detections = coco_model(frame)[0]
    detections_ = []
    # The confidence is unpacked into `det_score`, not `score`, so that the
    # module-level traffic score is not overwritten.
    for x1, y1, x2, y2, det_score, class_id in detections.boxes.data.tolist():
        cnt += 1  # counts every raw detection, vehicle or not
        if int(class_id) in vehicles:
            detections_.append([x1, y1, x2, y2, det_score, class_id])

    # --- 2. tracking ---------------------------------------------------------
    # The tracker must be stepped exactly once per processed frame, and needs
    # a (0, 5) array when there is nothing to track.
    if detections_:
        tracking_data = np.array([d[:5] for d in detections_])
    else:
        tracking_data = np.empty((0, 5))
    track_bbs_ids = mot_tracker.update(tracking_data)
    track_ids = track_bbs_ids  # rows of [x1, y1, x2, y2, track_id]

    for track in track_bbs_ids:
        track_id = int(track[4])
        unique_vehicle_ids.add(track_id)

        best_detection = max(
            detections_,
            key=lambda d: _box_iou(track, d),
            default=None,
        )
        if (
            best_detection is not None
            and _box_iou(track, best_detection) >= TRACK_MATCH_MIN_IOU
        ):
            track_id_to_class_id[track_id] = int(best_detection[5])

    # --- 3. licence plates ---------------------------------------------------
    license_plates = license_plate_detector(frame)[0]

    for license_plate in license_plates.boxes.data.tolist():
        x1, y1, x2, y2, plate_score, class_id = license_plate

        # Attach the plate to a tracked vehicle; car_id == -1 means no match.
        xcar1, ycar1, xcar2, ycar2, car_id = get_car(license_plate, track_ids)
        if car_id == -1:
            continue
        print(f"Car ID: {car_id}, Confidence: {plate_score}")

        license_plate_crop = _clamped_crop(frame, x1, y1, x2, y2)
        car_image = _clamped_crop(frame, xcar1, ycar1, xcar2, ycar2)
        # Degenerate boxes give empty crops, which OpenCV and the classifiers
        # cannot process.
        if license_plate_crop.size == 0 or car_image.size == 0:
            continue

        # Binarise (inverted) before handing the crop to the OCR helper.
        license_plate_crop_gray = cv2.cvtColor(license_plate_crop, cv2.COLOR_BGR2GRAY)
        _, license_plate_crop_thresh = cv2.threshold(
            license_plate_crop_gray, 64, 255, cv2.THRESH_BINARY_INV
        )

        license_plate_text, license_plate_text_score = read_license_plate(
            license_plate_crop_thresh
        )
        if license_plate_text is None:
            continue
        print(f"License Plate Text: {license_plate_text}")

        is_plate_found = find_plate(license_plate_text, plates)
        print(f"Is the plate '{license_plate_text}' found? {'Yes' if is_plate_found else 'No'}")

        # --- 4. colour / body-type classification ----------------------------
        color_probs = model1(car_image)[0].probs
        car_color = class_map_color[color_probs.top5[0]]
        print(f"{car_color}")

        # Kept in its own name so the script-level `results` dict is not rebound.
        type_probs = model(car_image)[0].probs
        car_type = class_map[type_probs.top5[0]]
        print(f"{car_type}")

        # One aware UTC timestamp feeds both forms. Calling .timestamp() on a
        # naive utcnow() value would be interpreted as local time and skew the
        # epoch value by the machine's UTC offset.
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        now_str = now_utc.replace(microsecond=0, tzinfo=None).isoformat()
        now_int = int(now_utc.timestamp() * 1000)

        # --- save locally, then upload ---------------------------------------
        car_image_filename = os.path.join(car_output_dir, f"car_{license_plate_text}.jpg")
        cv2.imwrite(car_image_filename, car_image)
        car_image_url = upload_to_firebase(
            car_image_filename,
            f"detected_cars/car_{license_plate_text}_{now_str}_{now_int}_{car_color}_{car_type}.jpg",
        )

        if is_plate_found:
            # Plates present in `plates` get an additional copy under DATA/.
            Detected_dir1 = os.path.join(Detected_dir, f"car_{license_plate_text}.jpg")
            cv2.imwrite(Detected_dir1, car_image)
            car_image_url = upload_to_firebase(
                Detected_dir1,
                f"DATA/car_{license_plate_text}_{now_str}_{now_int}_{car_color}_{car_type}.jpg",
            )

        license_plate_image_filename = os.path.join(
            plate_output_dir, f"plate_{license_plate_text}.jpg"
        )
        cv2.imwrite(license_plate_image_filename, license_plate_crop)
        license_plate_url = upload_to_firebase(
            license_plate_image_filename,
            f"detected_plates/plate_{license_plate_text}_{now_str}_{now_int}.jpg",
        )

# The video has been fully consumed; free the capture handle.
cap.release()
|
|
|
|
|
|
|
|
|
|
|
|
|
# Final summary: tally tracked vehicles per type, derive the traffic score and
# publish everything to the Realtime Database.
#
# NOTE(review): treated as running once, after the frame loop has finished;
# the original indentation was lost -- confirm this is not meant per frame.

# Recount from scratch: one entry per track id whose class is known.
vehicle_counts = {v: 0 for v in coco_class_to_vehicle_type.values()}
for tracked_class_id in track_id_to_class_id.values():
    vehicle_type = coco_class_to_vehicle_type.get(tracked_class_id, 'unknown')
    if vehicle_type in vehicle_counts:
        vehicle_counts[vehicle_type] += 1

for vehicle_type, type_count in vehicle_counts.items():
    # Each tracked vehicle adds 4 to the score. The former
    # `vehicle_counts.items() == 0` test could never be true, and its branch
    # would only have applied to a zero count (adding 0), so dropping it
    # leaves the result unchanged.
    score += type_count * 4

    print(f"Total {vehicle_type}s detected: {type_count}")
    users_ref.push({'text1': vehicle_type, 'text2': type_count})

print(cnt)
print('/traffic-score1', score)

ref.child('TrafficScore2').set(score)

# Previously the database reference was bound to `count` and then passed to
# its own set(), i.e. the reference object was sent as the value. Store the
# running detection counter instead.
# NOTE(review): `cnt` is presumed to be the intended value -- confirm.
ref.child('count').set(cnt)
|
|