File size: 6,527 Bytes
2cdce84 46c0e18 2cdce84 46c0e18 2cdce84 46c0e18 2cdce84 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
import os
import folium
import confuse
import numpy as np
from math import isnan
import geopandas as gpd
from shapely.geometry import Point
from PIL import Image
from tqdm import tqdm
# Initialzie custom basemaps for folium
basemaps = {
'Google Maps': folium.TileLayer(
tiles = 'https://mt1.google.com/vt/lyrs=m&x={x}&y={y}&z={z}',
attr = 'Google',
name = 'Google Maps',
overlay = True,
control = True
),
'Google Satellite': folium.TileLayer(
tiles = 'https://mt1.google.com/vt/lyrs=s&x={x}&y={y}&z={z}',
attr = 'Google',
name = 'Google Satellite',
overlay = True,
control = True
),
'Google Terrain': folium.TileLayer(
tiles = 'https://mt1.google.com/vt/lyrs=p&x={x}&y={y}&z={z}',
attr = 'Google',
name = 'Google Terrain',
overlay = True,
control = True
),
'Google Satellite Hybrid': folium.TileLayer(
tiles = 'https://mt1.google.com/vt/lyrs=y&x={x}&y={y}&z={z}',
attr = 'Google',
name = 'Google Satellite',
overlay = True,
control = True
),
'Esri Satellite': folium.TileLayer(
tiles = 'https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}',
attr = 'Esri',
name = 'Esri Satellite',
overlay = True,
control = True
),
'openstreetmap': folium.TileLayer('openstreetmap'),
'cartodbdark_matter': folium.TileLayer('cartodbdark_matter')
}
# Dictionary of JavaScript files (More Readable)
scripts_dir = './scripts/'
scripts_files = [f for f in os.listdir(scripts_dir) if f.endswith('.js')]
Scripts = {}
for f in scripts_files:
key = f.split('.')[0].upper()
with open(scripts_dir + f) as f:
Scripts[key] = f.read()
def calculate_bbox(df, field):
'''
Calculate the bounding box of a specfic field ID in a given data frame
'''
bbox = df.loc[df['name'] == field].bounds
r = bbox.iloc[0]
return [r.minx, r.miny, r.maxx, r.maxy]
def tiff_to_geodataframe(im, metric, date, crs):
'''
Convert a tiff image to a geodataframe
'''
x_cords = im.coords['x'].values
y_cords = im.coords['y'].values
vals = im.values
dims = vals.shape
points = []
v_s = []
for lat in range(dims[1]):
y = y_cords[lat]
for lon in range(dims[2]):
x = x_cords[lon]
v = vals[:,lat,lon]
if isnan(v[0]):
continue
points.append(Point(x,y))
v_s.append(v.item())
d = {f'{metric}_{date}': v_s, 'geometry': points}
df = gpd.GeoDataFrame(d, crs = crs)
return df
def get_bearer_token_headers(bearer_token):
'''
Get the bearer token headers to be used in the request to the SentinelHub API
'''
headers = {
'Content-Type': 'application/json',
'Authorization': 'Bearer '+ bearer_token,
}
return headers
def get_downloaded_location_img_path(clientName, metric, date, field, extension='tiff'):
'''
Get the path of the downloaded image in TIFF based on the:
'''
date_dir = f'./data/{clientName}/raw/{metric}/{date}/field_{field}/'
print(f'True Color Date Dir: {date_dir}')
os.makedirs(date_dir, exist_ok=True)
intermediate_dirs = os.listdir(date_dir)
print(f'Intermediate Dirs: {intermediate_dirs}')
if len(intermediate_dirs) == 0:
return None
imagePath = f'{date_dir}{os.listdir(date_dir)[0]}/response.{extension}'
print(f'Image Path: {imagePath}')
if not os.path.exists(imagePath):
return None
print(f'Image Path: {imagePath}')
return imagePath
def get_masked_location_img_path(clientName, metric, date, field):
'''
Get the path of the downloaded image after applying the mask in TIFF based on the:
'''
date_dir = f'./data/{clientName}/processed/{metric}/{date}/field_{field}/'
imagePath = date_dir + 'masked.tiff'
return imagePath
def get_curated_location_img_path(clientName, metric, date, field):
'''
Get the path of the downloaded image after applying the mask and converting it to geojson formay based on the:
'''
date_dir = f'./data/{clientName}/curated/{metric}/{date}/field_{field}/'
imagePath = date_dir + 'masked.geojson'
if os.path.exists(imagePath):
return imagePath
else:
return None
def parse_app_config(path=r'config-fgm-dev.yaml'):
config = confuse.Configuration('CropHealth', __name__)
config.set_file(path)
return config
def fix_image(img):
def normalize(band):
band_min, band_max = (band.min(), band.max())
return ((band-band_min)/((band_max - band_min)))
def brighten(band):
alpha=3
beta=0
return np.clip(alpha*band+beta, 0,255)
def gammacorr(band):
gamma=0.9
return np.power(band, 1/gamma)
red = img[:, :, 0]
green = img[:, :, 1]
blue = img[:, :, 2]
red_b=brighten(red)
blue_b=brighten(blue)
green_b=brighten(green)
red_bg=gammacorr(red_b)
blue_bg=gammacorr(blue_b)
green_bg=gammacorr(green_b)
red_bgn = normalize(red_bg)
green_bgn = normalize(green_bg)
blue_bgn = normalize(blue_bg)
rgb_composite_bgn= np.dstack((red_b, green_b, blue_b))
return rgb_composite_bgn
def creat_gif(dataset, gif_name, duration=50):
'''
Create a gif from a list of images
'''
imgs = [Image.fromarray((255*img).astype(np.uint8)) for img in dataset]
# duration is the number of milliseconds between frames; this is 40 frames per second
imgs[0].save(gif_name, save_all=True, append_images=imgs[1:], duration=duration, loop=1)
def add_lat_lon_to_gdf_from_geometry(gdf):
gdf['Lat'] = gdf['geometry'].apply(lambda p: p.x)
gdf['Lon'] = gdf['geometry'].apply(lambda p: p.y)
return gdf
def gdf_column_to_one_band_array(gdf, column_name):
gdf = gdf.sort_values(by=['Lat', 'Lon'])
gdf = gdf.reset_index(drop=True)
unique_lats_count = gdf['Lat'].nunique()
unique_lons_count = gdf['Lon'].nunique()
rows_arr = [[] for i in range(unique_lats_count)]
column_values = gdf[column_name].values
for i in tqdm(range(len(column_values))):
row_index = i // unique_lons_count
rows_arr[row_index].append(column_values[i])
max_row_length = max([len(row) for row in rows_arr])
for row in rows_arr:
while len(row) < max_row_length:
row.append(0)
rows_arr = np.array(rows_arr)
return rows_arr |