Spaces:
Sleeping
Sleeping
import os | |
import folium | |
import confuse | |
import numpy as np | |
from math import isnan | |
import geopandas as gpd | |
from shapely.geometry import Point | |
from PIL import Image | |
from tqdm import tqdm | |
import geopy | |
from geopy.geocoders import Nominatim | |
from geopy.exc import GeocoderTimedOut, GeocoderUnavailable | |
import random | |
import string | |
import os | |
from datetime import datetime, timedelta | |
# Default path of the file holding the pool of issued API tokens (one token per line).
TOKEN_FILE = "tokens.txt"
# Default path of the file recording, as "token,ISO-8601 datetime" lines,
# the moment at which each already-used token stops being valid.
EXPIRED_FILE = "tokens_expired.txt"
# HTML snippet explaining how to obtain an SNET API token.
# NOTE(review): presumably rendered in the dashboard UI -- confirm against callers.
NEW_TOKEN_INSTRUCTIONS="""<div style="background-color: #e6f2ff; padding: 20px; border-radius: 5px;">
<h3 style="color: #4CAF50;">Get Your SNET API Token</h3>
<p style="color: #666; font-size: 16px;">
To get an API token for the SNET platform:
<ol>
<li>Purchase tokens on the SNET platform using AGIX</li>
<li> <del> Input the email address you used to sign up for the dashboard </del>. Any text will work for testing purposes</li>
<li>An API token will be generated for you</li>
</ol>
The generated token will be valid for unlimited calls to the forecasting API for 15 minutes from your first call, for any of your fields.
</p>
<p style="color: #666; font-size: 16px;">
You can find us on the SNET platform at:
<a href="https://beta.singularitynet.io/servicedetails/org/EnigmaAi/service/farmingID" style="color: #007bff; text-decoration: none;">SNET Platform</a>
</p>
</div>
"""
import pandas as pd | |
import os | |
def manage_user_tokens(current_user, api_token, valid_until):
    """
    Record an API token for a user and return all tokens stored for that user.

    Tokens are kept in ``<current_user>_tokens.csv`` (relative to the current
    working directory) with the columns ``token`` and ``valid_until``. The
    file is created on first use; a token that is already stored is not
    added a second time.

    Args:
        current_user (str): The username of the currently logged-in user.
        api_token (str): The API token to be stored.
        valid_until (str): The expiration date of the API token (in 'YYYY-MM-DD HH:MM:SS' format).
    Returns:
        pandas.DataFrame: The user's stored tokens, including ``api_token``.
    """
    filename = f'{current_user}_tokens.csv'
    new_row_df = pd.DataFrame({'token': [api_token], 'valid_until': [valid_until]})

    # First token for this user: the new row is the whole table.
    if not os.path.exists(filename):
        new_row_df.to_csv(filename, index=False)
        return new_row_df

    # Read every column as text. Without this, an all-digit token is parsed
    # back as an integer, never compares equal to the incoming string, and
    # gets appended again on every call.
    df = pd.read_csv(filename, dtype=str)
    if str(api_token) not in df['token'].values:
        df = pd.concat([df, new_row_df], ignore_index=True)
        df.to_csv(filename, index=False)
    return df
def generate_random_unique_tokens(num_tokens=10, token_file=TOKEN_FILE):
    '''
    Ensure the token file holds at least `num_tokens` unique tokens.

    Existing tokens are loaded from `token_file` (if it exists) and new
    32-character tokens made of lowercase letters and digits are appended,
    one per line, until the pool reaches `num_tokens`. The file is created
    when missing.

    Args:
        num_tokens (int): Minimum number of tokens the pool should contain.
        token_file (str): Path of the token pool file.
    Returns:
        set: Every token now present in the pool.
    '''
    # Local import keeps this block self-contained. These tokens grant API
    # access, so they come from the CSPRNG-backed `secrets` module rather
    # than the predictable `random` generator.
    import secrets

    alphabet = string.ascii_lowercase + string.digits
    tokens = set()
    needs_newline = False
    if os.path.exists(token_file):
        with open(token_file, 'r') as f:
            content = f.read()
        # Blank lines are not tokens and must not count toward the pool size.
        tokens = {line for line in content.splitlines() if line}
        # A file without a trailing newline would glue the next token onto
        # its last line.
        needs_newline = bool(content) and not content.endswith('\n')

    with open(token_file, 'a') as f:
        if needs_newline and len(tokens) < num_tokens:
            f.write('\n')
        while len(tokens) < num_tokens:
            token = ''.join(secrets.choice(alphabet) for _ in range(32))
            if token not in tokens:
                tokens.add(token)
                f.write(token + '\n')
    return tokens
def confirm_api_token(token, token_file=TOKEN_FILE, expired_file=EXPIRED_FILE):
    '''
    Check whether `token` is a known token that is still usable.

    A token listed in `token_file` that has never been used is accepted and
    its 15-minute validity window starts now (the expiry is recorded in
    `expired_file`). A token that was used before is accepted only while the
    recorded expiry lies in the future.

    Args:
        token (str): The token to check.
        token_file (str): Path of the file listing all issued tokens.
        expired_file (str): Path of the file recording token expiry dates.
    Returns:
        dict: ``{'valid': bool, 'message': str}``.
    '''
    # NOTE(review): this writes the full token to stdout -- confirm that is acceptable.
    print(f'Checking token: {token} >>>>>>>>>>>>>>>>>>>')
    with open(token_file, 'r') as f:
        tokens = set(f.read().splitlines())  # Load all possible tokens

    if token not in tokens:
        return {'valid': False, 'message': 'Token is invalid'}

    now = datetime.now()
    # Read the expiry file once; a second read could observe a different
    # file state than the membership check did.
    expiry_date = load_expired_tokens(expired_file).get(token)
    if expiry_date is None:
        # First use of this token: start its 15-minute validity window.
        save_expired_token(token, now + timedelta(minutes=15), expired_file)
        return {'valid': True, 'message': 'Token is valid'}
    if now < expiry_date:
        return {'valid': True, 'message': 'Token is valid'}
    return {'valid': False, 'message': 'Token has expired'}
def load_expired_tokens(expired_file=EXPIRED_FILE):
    '''
    Load the recorded expiry date of every token that has been used.

    Each non-blank line of `expired_file` is expected to look like
    ``<token>,<ISO-8601 datetime>``. When a token occurs more than once the
    last line wins.

    Args:
        expired_file (str): Path of the expiry file.
    Returns:
        dict: Maps token (str) to its expiry (datetime); empty when the file
        does not exist.
    Raises:
        ValueError: If a non-blank line is not of the expected form.
    '''
    expired_tokens = {}
    if not os.path.exists(expired_file):
        return expired_tokens
    with open(expired_file, 'r') as f:
        for line in f:
            record = line.strip()
            # A blank line (e.g. a trailing newline added by an editor) is
            # not a record and must not crash the unpacking below.
            if not record:
                continue
            token, expiry_date = record.rsplit(',', 1)
            expired_tokens[token] = datetime.fromisoformat(expiry_date)
    return expired_tokens
def load_token_expiration(token, expired_file=EXPIRED_FILE):
    '''
    Look up the recorded expiry date of a single token.

    Returns the value stored for `token` by `load_expired_tokens`, or None
    when the token has no recorded expiry.
    '''
    expiry_by_token = load_expired_tokens(expired_file)
    if token in expiry_by_token:
        return expiry_by_token[token]
    return None
def save_expired_token(token, expiry_date, expired_file=EXPIRED_FILE):
    '''
    Record the expiry date of a token.

    Writes one ``<token>,<ISO-8601 datetime>`` line to `expired_file`,
    creating the file when it does not exist yet and appending otherwise.
    '''
    record = f"{token},{expiry_date.isoformat()}\n"
    mode = 'a' if os.path.exists(expired_file) else 'w'
    with open(expired_file, mode) as out:
        out.write(record)
def get_region_from_coordinates(latitude, longitude, max_retries=3):
    '''
    Reverse-geocode a coordinate pair to a region name using Nominatim.

    The first of 'state', 'county', 'region', 'province', 'district' found
    in the returned address is used, falling back to 'country'. The lookup
    is retried up to `max_retries` times when the geocoder times out or is
    unavailable.

    Returns:
        str: The region name, or one of the messages "Region not found",
        "Geocoding service unavailable", "Failed to retrieve region information".
    '''
    admin_levels = ('state', 'county', 'region', 'province', 'district')
    geolocator = Nominatim(user_agent="my_agent")
    last_attempt = max_retries - 1

    for attempt in range(max_retries):
        try:
            location = geolocator.reverse(f"{latitude}, {longitude}")
        except (GeocoderTimedOut, GeocoderUnavailable):
            if attempt == last_attempt:
                return "Geocoding service unavailable"
            continue

        address = location.raw.get('address') if location else None
        if address:
            # Prefer the most specific administrative level available.
            for level in admin_levels:
                if level in address:
                    return address[level]
            # No administrative level present: fall back to the country.
            if 'country' in address:
                return address['country']
        return "Region not found"

    return "Failed to retrieve region information"
# Initialize custom basemaps for folium.
# Each entry maps a display key to a ready-made folium.TileLayer.
basemaps = {
    'Google Maps': folium.TileLayer(
        tiles = 'https://mt1.google.com/vt/lyrs=m&x={x}&y={y}&z={z}',
        attr = 'Google',
        name = 'Google Maps',
        overlay = True,
        control = True
    ),
    'Google Satellite': folium.TileLayer(
        tiles = 'https://mt1.google.com/vt/lyrs=s&x={x}&y={y}&z={z}',
        attr = 'Google',
        name = 'Google Satellite',
        overlay = True,
        control = True
    ),
    'Google Terrain': folium.TileLayer(
        tiles = 'https://mt1.google.com/vt/lyrs=p&x={x}&y={y}&z={z}',
        attr = 'Google',
        name = 'Google Terrain',
        overlay = True,
        control = True
    ),
    'Google Satellite Hybrid': folium.TileLayer(
        tiles = 'https://mt1.google.com/vt/lyrs=y&x={x}&y={y}&z={z}',
        attr = 'Google',
        # Was 'Google Satellite' (copy-paste), which made two layers share
        # the same label.
        name = 'Google Satellite Hybrid',
        overlay = True,
        control = True
    ),
    'Esri Satellite': folium.TileLayer(
        tiles = 'https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}',
        attr = 'Esri',
        name = 'Esri Satellite',
        overlay = True,
        control = True
    ),
    'openstreetmap': folium.TileLayer('openstreetmap'),
    'cartodbdark_matter': folium.TileLayer('cartodbdark_matter')
}
# Dictionary of JavaScript files (more readable): maps the upper-cased part
# of each file name before its first '.' to the file's text content.
# Runs at import time and requires the ./scripts/ directory to exist.
scripts_dir = './scripts/'
scripts_files = [f for f in os.listdir(scripts_dir) if f.endswith('.js')]
Scripts = {}
for script_name in scripts_files:
    key = script_name.split('.')[0].upper()
    # Distinct names for the file name and the file handle (the original
    # rebound the loop variable to the handle); explicit encoding so the
    # result does not depend on the machine's locale.
    with open(os.path.join(scripts_dir, script_name), encoding='utf-8') as script_file:
        Scripts[key] = script_file.read()
def calculate_bbox(df, field):
    '''
    Return the bounding box of a specific field in a given data frame.

    Selects the rows whose 'name' column equals `field` and returns the
    bounds of the first match as [minx, miny, maxx, maxy].
    '''
    is_field = df['name'] == field
    first_bounds = df.loc[is_field].bounds.iloc[0]
    return [getattr(first_bounds, corner) for corner in ('minx', 'miny', 'maxx', 'maxy')]
def tiff_to_geodataframe(im, metric, date, crs):
    '''
    Convert a tiff image to a GeoDataFrame of points.

    One Point(x, y) is produced for every pixel whose first band value is
    not NaN; the pixel value is stored in a column named '<metric>_<date>'.
    `im` must expose `coords['x']`, `coords['y']` and `values`.
    # NOTE(review): assumes `im.values` is (band, y, x) with a single band -- confirm.
    '''
    xs = im.coords['x'].values
    ys = im.coords['y'].values
    pixels = im.values
    shape = pixels.shape
    n_rows, n_cols = shape[1], shape[2]

    geometries = []
    values = []
    for row in range(n_rows):
        y = ys[row]
        for col in range(n_cols):
            x = xs[col]
            pixel = pixels[:, row, col]
            # Skip pixels without data.
            if not isnan(pixel[0]):
                geometries.append(Point(x, y))
                values.append(pixel.item())

    data = {f'{metric}_{date}': values, 'geometry': geometries}
    return gpd.GeoDataFrame(data, crs = crs)
def get_bearer_token_headers(bearer_token):
    '''
    Build the HTTP headers for a JSON request authenticated with a bearer
    token (used for requests to the SentinelHub API).
    '''
    authorization = 'Bearer ' + bearer_token
    return {
        'Content-Type': 'application/json',
        'Authorization': authorization,
    }
def get_downloaded_location_img_path(clientName, metric, date, field, extension='tiff'):
    '''
    Get the path of the downloaded image for a client, metric, date and field.

    Looks inside ./data/<clientName>/raw/<metric>/<date>/field_<field>/
    (created if missing) for a sub-directory containing
    'response.<extension>'. Entries are checked in sorted order so the
    result is deterministic and a stray file does not hide a valid download.

    Returns:
        str or None: Path of the first matching response file, or None when
        no sub-directory contains one.
    '''
    date_dir = f'./data/{clientName}/raw/{metric}/{date}/field_{field}/'
    print(f'True Color Date Dir: {date_dir}')
    os.makedirs(date_dir, exist_ok=True)
    # os.listdir order is arbitrary; sort it and list the directory only once.
    intermediate_dirs = sorted(os.listdir(date_dir))
    print(f'Intermediate Dirs: {intermediate_dirs}')
    for entry in intermediate_dirs:
        imagePath = f'{date_dir}{entry}/response.{extension}'
        if os.path.exists(imagePath):
            print(f'Image Path: {imagePath}')
            return imagePath
    return None
def get_masked_location_img_path(clientName, metric, date, field):
    '''
    Get the path of the masked TIFF for a client, metric, date and field.

    The path is only constructed; it is not checked for existence.
    '''
    return f'./data/{clientName}/processed/{metric}/{date}/field_{field}/' + 'masked.tiff'
def get_curated_location_img_path(clientName, metric, date, field):
    '''
    Get the path of the masked image converted to GeoJSON for a client,
    metric, date and field.

    Returns the path when the file exists, otherwise None.
    '''
    geojson_path = f'./data/{clientName}/curated/{metric}/{date}/field_{field}/' + 'masked.geojson'
    return geojson_path if os.path.exists(geojson_path) else None
def parse_app_config(path=r'config-fgm-dev.yaml'):
    '''
    Load the application configuration.

    Creates a confuse Configuration named 'CropHealth' and adds the YAML
    file at `path` to it.
    '''
    app_config = confuse.Configuration('CropHealth', __name__)
    app_config.set_file(path)
    return app_config
def fix_image(img, normalized=False):
    '''
    Brighten the first three channels of an image and stack them.

    Each of the channels img[:, :, 0..2] is multiplied by 3 and clipped to
    [0, 255]. The original implementation also gamma-corrected and
    normalized every channel but then discarded those results; that work is
    now done only when requested through `normalized`, so the default
    output is unchanged.

    Args:
        img: Array indexable as img[:, :, channel] with at least 3 channels.
        normalized (bool): When True, additionally apply gamma correction
            (gamma=0.9) and rescale every channel to [0, 1].
    Returns:
        numpy.ndarray: The processed channels stacked along the third axis.
    '''
    def brighten(band):
        alpha = 3
        beta = 0
        return np.clip(alpha*band+beta, 0, 255)

    def gammacorr(band):
        gamma = 0.9
        return np.power(band, 1/gamma)

    def normalize(band):
        band_min, band_max = (band.min(), band.max())
        span = band_max - band_min
        # A constant band has no range to stretch; avoid dividing by zero.
        if span == 0:
            return np.zeros_like(band, dtype=float)
        return (band - band_min) / span

    bands = [brighten(img[:, :, channel]) for channel in range(3)]
    if normalized:
        bands = [normalize(gammacorr(band)) for band in bands]
    return np.dstack(bands)
def creat_gif(dataset, gif_name, duration=50):
    '''
    Create a gif from a list of images.

    Every image in `dataset` is multiplied by 255 and cast to uint8 before
    being turned into a frame. `duration` is the number of milliseconds
    each frame is shown (the default of 50 ms is 20 frames per second).
    # NOTE(review): presumably images hold values in [0, 1] -- confirm with callers.
    '''
    frames = []
    for frame in dataset:
        frames.append(Image.fromarray((255*frame).astype(np.uint8)))
    first_frame = frames[0]
    later_frames = frames[1:]
    first_frame.save(
        gif_name,
        save_all=True,
        append_images=later_frames,
        duration=duration,
        loop=1,
    )
def add_lat_lon_to_gdf_from_geometry(gdf):
    '''
    Add 'Lat' and 'Lon' columns derived from the 'geometry' column.

    'Lat' receives each geometry's `x` attribute and 'Lon' its `y`
    attribute. The frame is modified in place and also returned.
    # NOTE(review): for points built as Point(x, y) with x = longitude this
    # looks swapped; left unchanged because other code may rely on the
    # current column contents -- confirm before changing.
    '''
    def x_of(geom):
        return geom.x

    def y_of(geom):
        return geom.y

    gdf['Lat'] = gdf['geometry'].apply(x_of)
    gdf['Lon'] = gdf['geometry'].apply(y_of)
    return gdf
def gdf_column_to_one_band_array(gdf, column_name):
    '''
    Arrange one column of a point table into a 2-D array.

    Rows follow the sorted unique values of 'Lat' and columns the sorted
    unique values of 'Lon'. Every value is written to the cell matching its
    own ('Lat', 'Lon') pair and cells without a point are 0. The original
    implementation filled rows by running index, which misplaced values as
    soon as any grid cell was missing.

    Args:
        gdf: Table with 'Lat', 'Lon' and `column_name` columns.
        column_name (str): Column whose values fill the array.
    Returns:
        numpy.ndarray: Array of shape (unique 'Lat' count, unique 'Lon'
        count); empty input yields an empty (0, 0) array.
    '''
    lats, lat_idx = np.unique(np.asarray(gdf['Lat']), return_inverse=True)
    lons, lon_idx = np.unique(np.asarray(gdf['Lon']), return_inverse=True)
    values = np.asarray(gdf[column_name])

    band = np.zeros((len(lats), len(lons)), dtype=values.dtype)
    # The inverse indices give every point's row/column position directly.
    band[lat_idx, lon_idx] = values
    return band