Select Git revision
editsalamander.py
-
Herman Andersen Dyrkorn authoredHerman Andersen Dyrkorn authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
editsalamander.py 16.38 KiB
from flask import request, jsonify
from flask_jwt_extended import jwt_required, get_jwt_identity
from flask_restful import Resource
from api import db, limiter
import os
from api.models.dbmodels import Location, SalamanderGrowth, Salamander, User
from algorithm.brute_force_matching import match_file_structure
from api.endpoints.matchsalamander import sanitize_number_str
import glob
from shutil import move
from path_constants import _ACCESS_DATABASE
from image_encoder.image_encoder import *
import cv2
class EditSalamander(Resource):
decorators = [limiter.limit("60/minute")]
@staticmethod
@jwt_required
def get(salamander_id, image_id):
user_id = get_jwt_identity()
user = db.session.query(User).filter_by(id=user_id).first()
if user.admin:
with _ACCESS_DATABASE:
salamander = db.session.query(Salamander).filter_by(id=salamander_id).first()
salamander_growth = db.session.query(SalamanderGrowth).filter_by(salamander_id=salamander.id,
image_id=image_id).first()
if salamander and salamander_growth:
location = db.session.query(Location).filter_by(id=salamander.location_id).first()
salamander_images = []
path_to_salamander_images = os.path.join("./images", location.name, salamander.species,
salamander.sex, str(salamander.id))
list_of_paths = glob.glob(os.path.join(path_to_salamander_images, '*.*'))
for path in list_of_paths:
basename = os.path.basename(path)
if basename.__contains__(str(image_id)):
image = cv2.imread(path)
if not basename.__contains__("str"): # Scale only original to set size
height, width, _ = image.shape
desired_long_side = 320
scaling_factor = 1
if width > height:
scaling_factor = desired_long_side / width
else:
scaling_factor = desired_long_side / height
new_dim = (int(width * scaling_factor), int(height * scaling_factor))
# resize image
image = cv2.resize(image, new_dim, interpolation=cv2.INTER_AREA)
_, buffer = cv2.imencode('.jpg', image)
encoded = base64.b64encode(buffer)
image_data = {"url": "data:image/png;base64," + encoded.decode()}
salamander_images.append(image_data)
return jsonify({"images": salamander_images, "length": salamander_growth.length,
"weight": salamander_growth.weight, "imageId": image_id, "location": location.name,
"salamanderId": salamander.id, "sex": salamander.sex, "species": salamander.species,
'status': 200})
else:
return jsonify({"message": "no salamander with this id or no growth data", 'status': 400})
else:
return jsonify({"message": "no access to this endpoint", 'status': 400})
@staticmethod
@jwt_required
def delete():
data = request.form
if "id" in data and "image_id" in data and "location" in data:
user_id = get_jwt_identity()
user = db.session.query(User).filter_by(id=user_id).first()
with _ACCESS_DATABASE:
salamander = db.session.query(Salamander).filter_by(id=data['id']).first()
location = db.session.query(Location).filter_by(name=data['location']).first()
if user.admin and salamander and location:
salamander_path = os.path.join("./images", location.name, salamander.species, salamander.sex,
str(salamander.id))
img_id = int(data['image_id'])
last_id = int(len(os.listdir(salamander_path)) / 2) - 1
if img_id >= int(len(os.listdir(salamander_path)) / 2):
return jsonify({"message": "this image id does not exist", "status": 400})
original_path = glob.glob(os.path.join(salamander_path, str(img_id) + '.*'))[0]
processed_path = glob.glob(os.path.join(salamander_path, str(img_id) + '_str.*'))[0]
os.remove(original_path)
os.remove(processed_path)
delete_growth_row(salamander.id, img_id)
# if last image was moved:
if len(os.listdir(salamander_path)) == 0:
os.rmdir(salamander_path)
db.session.delete(salamander)
db.session.commit()
return jsonify(
{"id": salamander.id, "imageId": img_id, "message": "salamander deleted", 'status': 200})
else:
if img_id != last_id:
original_path = glob.glob(os.path.join(salamander_path, str(last_id) + '.*'))[0]
processed_path = glob.glob(os.path.join(salamander_path, str(last_id) + '_str.*'))[0]
original_extension = os.path.splitext(original_path)[1]
processed_extension = os.path.splitext(processed_path)[1]
salamander_growth_row = db.session.query(SalamanderGrowth).filter_by(
salamander_id=salamander.id, image_id=last_id).first()
salamander_growth_row.image_id = img_id
db.session.commit()
os.rename(processed_path,
os.path.join(salamander_path, str(img_id) + "_str" + processed_extension))
os.rename(original_path, os.path.join(salamander_path, str(img_id) + original_extension))
return jsonify(
{"id": salamander.id, "imageId": img_id, "message": "salamander deleted", 'status': 200})
else:
return jsonify({"message": "this user cannot delete this salamander", 'status': 400})
else:
return jsonify({"message": "wrong data in form", 'status': 400})
@staticmethod
@jwt_required
def put():
user_id = get_jwt_identity()
user = db.session.query(User).filter_by(id=user_id).first()
data = request.form
if user.admin and "id" in data and "image_id" in data:
with _ACCESS_DATABASE:
salamander = db.session.query(Salamander).filter_by(id=data['id']).first()
if salamander:
growth_row = db.session.query(SalamanderGrowth).filter_by(salamander_id=salamander.id,
image_id=int(data['image_id'])).first()
if "weight" in data:
growth_row.weight = sanitize_number_str(str(data['weight']))
db.session.commit()
if "length" in data:
growth_row.length = sanitize_number_str(str(data['length']))
db.session.commit()
if "location" in data and "new_location" in data and "new_sex" in data and "new_species" in data:
if data['location'] != data['new_location'] or salamander.sex != data['new_sex'] or \
salamander.species != data['new_species']:
salamander_path = os.path.join("images", data['location'], salamander.species,
salamander.sex, str(salamander.id))
new_path_to_images = os.path.join("images", data['new_location'], data['new_species'],
data['new_sex'])
original_path = glob.glob(os.path.join(salamander_path, data['image_id'] + '.*'))
processed_path = glob.glob(os.path.join(salamander_path, data['image_id'] + '_str.*'))
if len(original_path) > 0 and len(processed_path) > 0:
original_path = original_path[0]
processed_path = processed_path[0]
image_id = int(data['image_id'])
last_id = int(len(os.listdir(salamander_path)) / 2) - 1
if data['new_sex'] != "juvenile":
result = match_file_structure(input_image=processed_path,
match_directory=new_path_to_images)
if result:
# if salamander already exist:
if result > -1:
# move all images and delete previous ID.:
move_images(salamander.id, result, image_id,
path_to_salamander=os.path.join(new_path_to_images,
str(result)),
path_to_original_image=original_path,
path_to_processed_image=processed_path)
else:
add_salamander(salamander.id, image_id, data['new_location'],
data['new_species'],
data['new_sex'], new_path_to_images, user_id, original_path,
processed_path)
return handle_remaining_images(salamander, salamander_path, image_id, last_id,
True, result)
else:
return jsonify(
{"message": "Salamander image, " + data['image_id'] + " does not exist",
'status': 400})
# if last image was moved:
else:
add_salamander(salamander.id, image_id, data['new_location'], data['new_species'],
data['new_sex'], new_path_to_images, user_id, original_path,
processed_path)
return handle_remaining_images(salamander, salamander_path, image_id, last_id,
False, 0)
else:
return jsonify({"message": "Image id, " + data[
'image_id'] + " does not exist in salamander_id, " + data['id'], 'status': 400})
else:
return jsonify(
{"message": "salamander cannot be moved to the same location", 'status': 400})
else:
return jsonify({"message": "salamander updated", 'status': 200})
else:
return jsonify({"message": "Salamander ID" + data['id'] + " does not exist", 'status': 400})
else:
return jsonify({"message": "Non admin users cannot move salamander", 'status': 400})
def delete_growth_row(salamander_id, image_id):
salamander_growth_row = db.session.query(SalamanderGrowth).filter_by(salamander_id=salamander_id,
image_id=image_id).first()
if salamander_growth_row:
db.session.delete(salamander_growth_row)
db.session.commit()
def handle_remaining_images(salamander, salamander_path: str, image_id: int, last_id: int, match_bool, result):
# if last image was moved:
if len(os.listdir(salamander_path)) == 0:
os.rmdir(salamander_path)
db.session.delete(salamander)
db.session.commit()
if match_bool:
return jsonify({"id": result, "matching": "Match!", "message": "Matched with salamander", 'status': 200})
else:
return jsonify(
{"id": salamander.id, "matching": "No match.", "message": "New salamander in database", 'status': 200})
else:
if image_id != last_id:
original_path = glob.glob(os.path.join(salamander_path, str(last_id) + '.*'))[0]
processed_path = glob.glob(os.path.join(salamander_path, str(last_id) + '_str.*'))[0]
original_extension = os.path.splitext(original_path)[1]
processed_extension = os.path.splitext(processed_path)[1]
salamander_growth_row = db.session.query(SalamanderGrowth).filter_by(salamander_id=salamander.id,
image_id=last_id).first()
salamander_growth_row.image_id = image_id
db.session.commit()
os.rename(processed_path, os.path.join(salamander_path, str(image_id) + "_str" + processed_extension))
os.rename(original_path, os.path.join(salamander_path, str(image_id) + original_extension))
if match_bool:
return jsonify({"id": result, "matching": "Match!", "message": "Matched with salamander", 'status': 200})
else:
return jsonify(
{"id": salamander.id, "matching": "No match.", "message": "New salamander in database", 'status': 200})
def lower_image_id_growth_row(salamander_id, old_id, new_id):
salamander_growth_row = db.session.query(SalamanderGrowth).filter_by(salamander_id=salamander_id,
image_id=old_id).first()
if salamander_growth_row:
salamander_growth_row.image_id = new_id
db.session.commit()
def move_images(salamander_id, new_salamander_id, image_id, path_to_salamander, path_to_original_image,
path_to_processed_image):
path_to_salamander = os.path.abspath(path_to_salamander)
split_name_org = str(path_to_original_image).split('.')
split_name_proc = str(path_to_processed_image).split('.')
number_of_files = int(len(os.listdir(path_to_salamander)) / 2)
a = os.path.join(path_to_salamander, str(number_of_files)) + "." + split_name_org[len(split_name_org) - 1]
b = os.path.join(path_to_salamander, str(number_of_files)) + "_str." + split_name_proc[len(split_name_org) - 1]
move(path_to_original_image, a)
move(path_to_processed_image, b)
salamander_growth_row = db.session.query(SalamanderGrowth).filter_by(salamander_id=salamander_id,
image_id=image_id).first()
if salamander_growth_row:
salamander_growth_row.image_id = number_of_files
salamander_growth_row.salamander_id = new_salamander_id
db.session.commit()
def add_salamander(old_salamander_id, image_id, location, species, sex, path_to_images, user_id, path_to_original_image,
path_to_processed_image):
location_id = db.session.query(Location.id).filter_by(name=location.lower()).first()
new_salamander = Salamander(sex=sex, species=species, location_id=location_id[0], uid=user_id)
db.session.add(new_salamander)
db.session.commit()
salamander_id = db.session.query(Salamander.id).filter_by(uid=user_id).order_by(Salamander.id.desc()).first()
path_for_new_salamander = os.path.join(path_to_images, str(salamander_id[0]))
os.makedirs(path_for_new_salamander)
move_images(old_salamander_id, salamander_id[0], image_id, path_for_new_salamander, path_to_original_image,
path_to_processed_image)
return salamander_id[0]