Select Git revision
editsalamander.py
-
Anders Langlie authoredAnders Langlie authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
editsalamander.py 16.95 KiB
from flask import request, jsonify
from flask_jwt_extended import jwt_required, get_jwt_identity
from flask_restful import Resource
from api import db, limiter
import os
from api.models.dbmodels import Location, SalamanderGrowth, Salamander, User
from algorithm.brute_force_matching import match_file_structure
from api.endpoints.matchsalamander import sanitize_number_str
import glob
from shutil import move
from path_constants import _ACCESS_DATABASE
from image_encoder.image_encoder import *
from cv2 import cv2
import numpy as np
"""
Endpoint for editing salamanders.
GET: Gets a specific salamanders original and processed image, and all its data.
PUT: Edits a specific salamander with new data like sex, species, location etc.
DELETE: Deletes a specific salamander from the system.
"""
class EditSalamander(Resource):
decorators = [limiter.limit("60/minute")]
@staticmethod
@jwt_required
def get(salamander_id, image_id):
user_id = get_jwt_identity()
user = db.session.query(User).filter_by(id=user_id).first()
if user.admin:
with _ACCESS_DATABASE:
salamander = db.session.query(Salamander).filter_by(id=salamander_id).first()
salamander_growth = db.session.query(SalamanderGrowth).filter_by(salamander_id=salamander.id,
image_id=image_id).first()
if salamander and salamander_growth:
location = db.session.query(Location).filter_by(id=salamander.location_id).first()
salamander_images = []
path_to_salamander_images = os.path.join("./images", location.name, salamander.species,
salamander.sex, str(salamander.id))
list_of_paths = glob.glob(os.path.join(path_to_salamander_images, '*.*'))
for path in list_of_paths:
basename = os.path.basename(path)
if basename.__contains__(str(image_id)):
image = cv2.imdecode(np.fromfile(path, dtype=np.uint8), cv2.IMREAD_UNCHANGED)
if image is None:
raise FileNotFoundError("Cannot find image file " + path)
if not basename.__contains__("str"): # Scale only original to set size
height, width, _ = image.shape
desired_long_side = 320
if width > height:
scaling_factor = desired_long_side / width
else:
scaling_factor = desired_long_side / height
new_dim = (int(width * scaling_factor), int(height * scaling_factor))
# resize image
image = cv2.resize(image, new_dim, interpolation=cv2.INTER_AREA)
_, buffer = cv2.imencode('.jpg', image)
encoded = base64.b64encode(buffer)
image_data = {"url": "data:image/png;base64," + encoded.decode()}
salamander_images.append(image_data)
return jsonify({"images": salamander_images, "length": salamander_growth.length,
"weight": salamander_growth.weight, "imageId": image_id, "location": location.name,
"salamanderId": salamander.id, "sex": salamander.sex, "species": salamander.species,
'status': 200})
else:
return jsonify({"message": "no salamander with this id or no growth data", 'status': 400})
else:
return jsonify({"message": "no access to this endpoint", 'status': 400})
@staticmethod
@jwt_required
def delete():
data = request.form
if "id" in data and "image_id" in data and "location" in data:
user_id = get_jwt_identity()
user = db.session.query(User).filter_by(id=user_id).first()
with _ACCESS_DATABASE:
salamander = db.session.query(Salamander).filter_by(id=data['id']).first()
location = db.session.query(Location).filter_by(name=data['location']).first()
if user.admin and salamander and location:
salamander_path = os.path.join("./images", location.name, salamander.species, salamander.sex,
str(salamander.id))
img_id = int(data['image_id'])
last_id = int(len(os.listdir(salamander_path)) / 2) - 1
if img_id >= int(len(os.listdir(salamander_path)) / 2):
return jsonify({"message": "this image id does not exist", "status": 400})
original_path = glob.glob(os.path.join(salamander_path, str(img_id) + '.*'))[0]
processed_path = glob.glob(os.path.join(salamander_path, str(img_id) + '_str.*'))[0]
os.remove(original_path)
os.remove(processed_path)
delete_growth_row(salamander.id, img_id)
# if last image was moved:
if len(os.listdir(salamander_path)) == 0:
os.rmdir(salamander_path)
db.session.delete(salamander)
db.session.commit()
return jsonify(
{"id": salamander.id, "imageId": img_id, "message": "salamander deleted", 'status': 200})
else:
if img_id != last_id:
original_path = glob.glob(os.path.join(salamander_path, str(last_id) + '.*'))[0]
processed_path = glob.glob(os.path.join(salamander_path, str(last_id) + '_str.*'))[0]
original_extension = os.path.splitext(original_path)[1]
processed_extension = os.path.splitext(processed_path)[1]
salamander_growth_row = db.session.query(SalamanderGrowth).filter_by(
salamander_id=salamander.id, image_id=last_id).first()
salamander_growth_row.image_id = img_id
db.session.commit()
os.rename(processed_path,
os.path.join(salamander_path, str(img_id) + "_str" + processed_extension))
os.rename(original_path, os.path.join(salamander_path, str(img_id) + original_extension))
return jsonify(
{"id": salamander.id, "imageId": img_id, "message": "salamander deleted", 'status': 200})
else:
return jsonify({"message": "this user cannot delete this salamander", 'status': 400})
else:
return jsonify({"message": "wrong data in form", 'status': 400})
@staticmethod
@jwt_required
def put():
user_id = get_jwt_identity()
user = db.session.query(User).filter_by(id=user_id).first()
data = request.form
if user.admin and "id" in data and "image_id" in data:
with _ACCESS_DATABASE:
salamander = db.session.query(Salamander).filter_by(id=data['id']).first()
if salamander:
growth_row = db.session.query(SalamanderGrowth).filter_by(salamander_id=salamander.id,
image_id=int(data['image_id'])).first()
if "weight" in data and "length" in data:
growth_row.weight = sanitize_number_str(str(data['weight']))
growth_row.length = sanitize_number_str(str(data['length']))
db.session.commit()
if "location" in data and "new_location" in data and "new_sex" in data and "new_species" in data:
if data['location'] != data['new_location'] or salamander.sex != data[
'new_sex'] or salamander.species != data['new_species']:
salamander_path = os.path.join("images", data['location'], salamander.species,
salamander.sex, str(salamander.id))
new_path_to_images = os.path.join("images", data['new_location'], data['new_species'],
data['new_sex'])
original_path = glob.glob(os.path.join(salamander_path, data['image_id'] + '.*'))
processed_path = glob.glob(os.path.join(salamander_path, data['image_id'] + '_str.*'))
if len(original_path) > 0 and len(processed_path) > 0:
original_path = original_path[0]
processed_path = processed_path[0]
image_id = int(data['image_id'])
last_id = int(len(os.listdir(salamander_path)) / 2) - 1
if data['new_sex'] != "juvenile":
result = match_file_structure(input_image=processed_path,
match_directory=new_path_to_images)
if result:
# if salamander already exist:
if result > -1:
# move all images and delete previous ID.:
move_images(salamander.id, result, image_id,
path_to_salamander=os.path.join(new_path_to_images,
str(result)),
path_to_original_image=original_path,
path_to_processed_image=processed_path)
new_id = result
else:
new_id = add_salamander(salamander.id, image_id, data['new_location'],
data['new_species'],
data['new_sex'], new_path_to_images, user_id,
original_path,
processed_path)
return handle_remaining_images(salamander, salamander_path, image_id, last_id,
True, result, new_id)
else:
return jsonify(
{"message": "Salamander image, " + data['image_id'] + " does not exist",
'status': 400})
# if last image was moved:
else:
new_id = add_salamander(salamander.id, image_id, data['new_location'],
data['new_species'],
data['new_sex'], new_path_to_images, user_id, original_path,
processed_path)
return handle_remaining_images(salamander, salamander_path, image_id, last_id,
False, 0, new_id)
else:
return jsonify({"message": "Image id, " + data[
'image_id'] + " does not exist in salamander_id, " + data['id'], 'status': 400})
else:
return jsonify({"message": "salamander data updated", 'status': 200})
else:
return jsonify({"message": "wrong data", 'status': 400})
else:
return jsonify({"message": "Salamander ID" + data['id'] + " does not exist", 'status': 400})
else:
return jsonify({"message": "Non admin users cannot move salamander", 'status': 400})
def delete_growth_row(salamander_id, image_id):
salamander_growth_row = db.session.query(SalamanderGrowth).filter_by(salamander_id=salamander_id,
image_id=image_id).first()
if salamander_growth_row:
db.session.delete(salamander_growth_row)
db.session.commit()
def handle_remaining_images(salamander, salamander_path: str, image_id: int, last_id: int, match_bool, result, new_id):
# if last image was moved:
if len(os.listdir(salamander_path)) == 0:
os.rmdir(salamander_path)
db.session.delete(salamander)
db.session.commit()
if match_bool and result > -1:
return jsonify({"id": result, "matching": "Match!", "message": "Matched with salamander", 'status': 200})
else:
return jsonify(
{"id": new_id, "matching": "No match.", "message": "New salamander in database", 'status': 200})
else:
if image_id != last_id:
original_path = glob.glob(os.path.join(salamander_path, str(last_id) + '.*'))[0]
processed_path = glob.glob(os.path.join(salamander_path, str(last_id) + '_str.*'))[0]
original_extension = os.path.splitext(original_path)[1]
processed_extension = os.path.splitext(processed_path)[1]
salamander_growth_row = db.session.query(SalamanderGrowth).filter_by(salamander_id=salamander.id,
image_id=last_id).first()
salamander_growth_row.image_id = image_id
db.session.commit()
os.rename(processed_path, os.path.join(salamander_path, str(image_id) + "_str" + processed_extension))
os.rename(original_path, os.path.join(salamander_path, str(image_id) + original_extension))
if match_bool and result > -1:
return jsonify({"id": result, "matching": "Match!", "message": "Matched with salamander", 'status': 200})
else:
return jsonify(
{"id": new_id, "matching": "No match.", "message": "New salamander in database", 'status': 200})
def lower_image_id_growth_row(salamander_id, old_id, new_id):
salamander_growth_row = db.session.query(SalamanderGrowth).filter_by(salamander_id=salamander_id,
image_id=old_id).first()
if salamander_growth_row:
salamander_growth_row.image_id = new_id
db.session.commit()
def move_images(salamander_id, new_salamander_id, image_id, path_to_salamander, path_to_original_image,
path_to_processed_image):
path_to_salamander = os.path.abspath(path_to_salamander)
split_name_org = str(path_to_original_image).split('.')
split_name_proc = str(path_to_processed_image).split('.')
number_of_files = int(len(os.listdir(path_to_salamander)) / 2)
a = os.path.join(path_to_salamander, str(number_of_files)) + "." + split_name_org[len(split_name_org) - 1]
b = os.path.join(path_to_salamander, str(number_of_files)) + "_str." + split_name_proc[len(split_name_org) - 1]
move(path_to_original_image, a)
move(path_to_processed_image, b)
salamander_growth_row = db.session.query(SalamanderGrowth).filter_by(salamander_id=salamander_id,
image_id=image_id).first()
if salamander_growth_row:
salamander_growth_row.image_id = number_of_files
salamander_growth_row.salamander_id = new_salamander_id
db.session.commit()
def add_salamander(old_salamander_id, image_id, location, species, sex, path_to_images, user_id, path_to_original_image,
path_to_processed_image):
location_id = db.session.query(Location.id).filter_by(name=location.lower()).first()
new_salamander = Salamander(sex=sex, species=species, location_id=location_id[0], uid=user_id)
db.session.add(new_salamander)
db.session.commit()
salamander_id = db.session.query(Salamander.id).filter_by(uid=user_id).order_by(Salamander.id.desc()).first()
path_for_new_salamander = os.path.join(path_to_images, str(salamander_id[0]))
os.makedirs(path_for_new_salamander)
move_images(old_salamander_id, salamander_id[0], image_id, path_for_new_salamander, path_to_original_image,
path_to_processed_image)
return salamander_id[0]