File size: 6,361 Bytes
9d3162f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import cv2 
import numpy as np
import requests
import base64
import time
import websockets
import asyncio
import cv2
import json
import os
import imutils
class CaesarSendWeb:
    """Client helpers that send camera frames to CaesarAI vision endpoints.

    Every public method is a classmethod, so the class is used as a
    namespace: ``CaesarSendWeb.send_video_websocket(uri=...)``.

    HTTP endpoints receive a JSON body ``{"frame": <base64 of the raw pixel
    bytes>, "shape": [rows, cols]}``.  Endpoints that answer with an image
    use the same two keys, and the pixels are interpreted as 3-channel
    ``uint8`` data.
    """

    # Location and base name used when a cropped face image is saved.
    _FACE_SAVE_DIR = "CaesarFaceDetection/FaceDataPoints/"

    @staticmethod
    def _encode_frame(image):
        """Return the JSON-serialisable request payload for *image*.

        The raw pixel buffer is base64 encoded; the first two dimensions of
        the array are sent along so the receiver can rebuild it.
        """
        # ascontiguousarray guarantees tobytes() yields row-major pixel data
        # even if the caller hands in a sliced/non-contiguous view.
        pixels = np.ascontiguousarray(image)
        return {
            "frame": base64.b64encode(pixels.tobytes()).decode(),
            "shape": [image.shape[0], image.shape[1]],
        }

    @staticmethod
    def _decode_frame(payload):
        """Rebuild a ``(rows, cols, 3)`` uint8 array from a response payload.

        *payload* must carry the base64 string under ``"frame"`` and the
        ``[rows, cols]`` pair under ``"shape"``.
        """
        # Decode the string directly: wrapping it in np.array() first only
        # worked because the base64 decoder silently skips stray bytes.
        raw = base64.b64decode(payload["frame"])
        rows, cols = payload["shape"][0], payload["shape"][1]
        return np.frombuffer(raw, dtype="uint8").reshape(rows, cols, 3)

    @staticmethod
    def _next_free_filename(existing_names, stem="croppedimage", suffix=".png"):
        """Return the first name in the sequence that is not in *existing_names*.

        The sequence is ``croppedimage.png``, ``croppedimage0.png``,
        ``croppedimage1.png`` and so on.
        """
        taken = set(existing_names)
        candidate = f"{stem}{suffix}"
        count = 0
        while candidate in taken:
            candidate = f"{stem}{count}{suffix}"
            count += 1
        return candidate

    @staticmethod
    def _capture_single_frame(camera_index=0):
        """Grab one frame from the local camera and release the device.

        Raises:
            RuntimeError: if the camera does not deliver a frame.
        """
        cap = cv2.VideoCapture(camera_index)
        try:
            success, image = cap.read()
        finally:
            # Always give the device back, even when read() raises.
            cap.release()
        if not success or image is None:
            raise RuntimeError("Could not read a frame from the camera.")
        return image

    @classmethod
    def send_video_https(cls, uri="http://192.168.0.10:7860/caesarobjectdetect"):
        """Stream camera frames to *uri* over HTTP and display each reply.

        Each frame is POSTed as JSON and the image returned by the server is
        shown in a window named ``"image"``.  The loop ends when ``q`` is
        pressed or the camera stops delivering frames.
        """
        cap = cv2.VideoCapture(0)
        try:
            while True:
                success, image = cap.read()
                if not success or image is None:
                    break
                response = requests.post(uri, json=cls._encode_frame(image))
                processed = cls._decode_frame(response.json())
                cv2.imshow("image", processed)
                if cv2.waitKey(1) == ord("q"):
                    break
        finally:
            # Release the camera and close windows on errors as well.
            cap.release()
            cv2.destroyAllWindows()

    @classmethod
    def send_video_websocket(
        cls,
        uri='wss://palondomus-caesarai.hf.space/caesarobjectdetectws',
        jsondata=None,
        phone=False,
        phone_url="http://192.168.0.14:8080/shot.jpg",
    ):
        """Stream PNG-encoded frames to a websocket endpoint and handle replies.

        Args:
            uri: websocket URL to connect to.
            jsondata: optional object sent as a JSON text message after every
                frame.
            phone: when true, frames are fetched as snapshots from
                *phone_url* instead of the local camera.
            phone_url: HTTP URL that returns a single encoded image.

        Binary replies are decoded as images and shown in a window named
        ``"frame"``; text replies are parsed as JSON and printed.  The loop
        ends when no frame can be obtained or ``q`` is pressed in the window.
        """
        camera = None if phone else cv2.VideoCapture(0)

        def grab_frame():
            """Return ``(success, frame)`` from the camera or the phone."""
            if camera is not None:
                return camera.read()
            img_resp = requests.get(phone_url)
            frame_arr = np.frombuffer(img_resp.content, dtype=np.uint8)
            frame = cv2.imdecode(frame_arr, -1)
            if frame is None:
                # The snapshot was not a decodable image.
                return False, None
            return True, imutils.resize(frame, width=500, height=600)

        async def main():
            async with websockets.connect(uri) as ws:
                while True:
                    success, frame = grab_frame()
                    if not success or frame is None:
                        break

                    _, buffer = cv2.imencode('.png', frame)
                    await ws.send(buffer.tobytes())
                    if jsondata:
                        await ws.send(json.dumps(jsondata))

                    contents = await ws.recv()
                    if isinstance(contents, bytes):
                        arr = np.frombuffer(contents, np.uint8)
                        frameobj = cv2.imdecode(arr, cv2.IMREAD_UNCHANGED)
                        # Skip replies that are not a decodable image instead
                        # of crashing inside imshow.
                        if frameobj is not None:
                            cv2.imshow('frame', frameobj)
                            if cv2.waitKey(1) == ord("q"):
                                break
                    elif isinstance(contents, str):
                        print(json.loads(contents))

        try:
            asyncio.run(main())
        finally:
            if camera is not None:
                camera.release()
            cv2.destroyAllWindows()

    @classmethod
    def send_image_recieve_text(cls, uri="http://192.168.0.10:7860/caesarocr", showimage=False):
        """POST one camera frame to *uri* and return the decoded JSON reply.

        Args:
            uri: HTTP endpoint that accepts the frame payload.
            showimage: when true, display the captured frame and wait for a
                key press before returning.

        Raises:
            RuntimeError: if the camera does not deliver a frame.
        """
        image = cls._capture_single_frame()
        response = requests.post(uri, json=cls._encode_frame(image))
        messageresp = response.json()
        if showimage:
            cv2.imshow('frame', image)
            cv2.waitKey(0)
            cv2.destroyAllWindows()
        return messageresp

    @classmethod
    def send_image_recieve_image(
        cls,
        uri="http://192.168.0.10:7860/caesarfacesnap",
        showimage=False,
        saveimage=False,
    ):
        """POST one camera frame to *uri* and return the image sent back.

        Args:
            uri: HTTP endpoint that accepts the frame payload.
            showimage: when true, display the returned image and wait for a
                key press.
            saveimage: when true, write the returned image into
                ``CaesarFaceDetection/FaceDataPoints/`` under the first unused
                ``croppedimage*.png`` name.

        Returns:
            The image decoded from the response.  When the server answers
            with ``"no face was detected."`` a message is printed and the
            originally captured camera frame is returned unchanged.

        Raises:
            RuntimeError: if the camera does not deliver a frame.
        """
        image = cls._capture_single_frame()
        response = requests.post(uri, json=cls._encode_frame(image))
        valresp = response.json()

        if valresp["frame"] == "no face was detected.":
            print("No face was detected.")
            return image

        image = cls._decode_frame(valresp)
        if showimage:
            cv2.imshow('frame', image)
            cv2.waitKey(0)
            cv2.destroyAllWindows()
        if saveimage:
            # Create the target directory on first use so listdir/imwrite
            # do not fail on a fresh checkout.
            os.makedirs(cls._FACE_SAVE_DIR, exist_ok=True)
            filename = cls._next_free_filename(os.listdir(cls._FACE_SAVE_DIR))
            cv2.imwrite(f"{cls._FACE_SAVE_DIR}{filename}", image)

        return image
if __name__ == "__main__":
    # Manual smoke-test entry point.  Exactly one example is active at a
    # time; the others are kept below as ready-to-uncomment usage samples.

    # --- Active: face detection over websocket, frames taken from the phone.
    facedetect_uri = "wss://palondomus-caesarai.hf.space/caesarfacedetectws"
    CaesarSendWeb.send_video_websocket(uri=facedetect_uri, phone=True)

    # --- Object detection over websocket.
    # yolo_uri = 'wss://palondomus-caesarai.hf.space/caesarobjectdetectws'
    # CaesarSendWeb.send_video_websocket(uri=yolo_uri)

    # --- Plain video relay over websocket.
    # video_uri = 'wss://palondomus-caesarai.hf.space/sendvideows'
    # CaesarSendWeb.send_video_websocket(uri=video_uri)

    # --- OCR extraction over websocket with extra JSON parameters.
    # extraction_uri = "wss://palondomus-caesarai.hf.space/caesarocrextractionws"
    # CaesarSendWeb.send_video_websocket(
    #     uri=extraction_uri,
    #     jsondata={"target_words": ["your", "brain", "power"]},
    # )

    # --- OCR over websocket.
    # ocrws_uri = "wss://palondomus-caesarai.hf.space/caesarocrws"
    # CaesarSendWeb.send_video_websocket(uri=ocrws_uri)

    # --- OCR over HTTP for a single captured frame.
    # ocr_uri = "http://192.168.0.10:7860/caesarocr"
    # message = CaesarSendWeb.send_image_recieve_text(uri=ocr_uri, showimage=True)
    # print(message)

    # --- Face snapshot over HTTP, saving the returned crop.
    # facesnap_uri = "http://192.168.0.10:7860/caesarfacesnap"
    # cropped_image = CaesarSendWeb.send_image_recieve_image(uri=facesnap_uri, saveimage=True)
    # print(cropped_image)

    # --- Object detection over HTTP.
    # https_uri = 'http://192.168.0.10:7860/caesarobjectdetect'
    # CaesarSendWeb.send_video_https(uri=https_uri)