Skip to content
Snippets Groups Projects
Select Git revision
  • fd0cb5d87e93ff164842fd8b8af397eec60e454c
  • master default protected
  • 51-docker
  • 40-return-match-when-edit-salamander
  • 36-email-verification
  • 23-add-object-detection
  • 22-run-branch
7 results

editsalamander.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    editsalamander.py 16.53 KiB
    from flask import request, jsonify
    from flask_jwt_extended import jwt_required, get_jwt_identity
    from flask_restful import Resource
    from api import db, limiter
    import os
    from api.models.dbmodels import Location, SalamanderGrowth, Salamander, User
    from algorithm.brute_force_matching import match_file_structure
    from api.endpoints.matchsalamander import sanitize_number_str
    import glob
    from shutil import move
    from path_constants import _ACCESS_DATABASE
    from image_encoder.image_encoder import *
    import cv2
    
    
    class EditSalamander(Resource):
        decorators = [limiter.limit("60/minute")]
    
        @staticmethod
        @jwt_required
        def get(salamander_id, image_id):
            user_id = get_jwt_identity()
            user = db.session.query(User).filter_by(id=user_id).first()
            if user.admin:
                with _ACCESS_DATABASE:
                    salamander = db.session.query(Salamander).filter_by(id=salamander_id).first()
                    salamander_growth = db.session.query(SalamanderGrowth).filter_by(salamander_id=salamander.id,
                                                                                     image_id=image_id).first()
                    if salamander and salamander_growth:
                        location = db.session.query(Location).filter_by(id=salamander.location_id).first()
                        salamander_images = []
                        path_to_salamander_images = os.path.join("./images", location.name, salamander.species,
                                                                 salamander.sex, str(salamander.id))
                        list_of_paths = glob.glob(os.path.join(path_to_salamander_images, '*.*'))
                        for path in list_of_paths:
                            basename = os.path.basename(path)
                            if basename.__contains__(str(image_id)):
    
                                image = cv2.imread(path)
                                if not basename.__contains__("str"):  # Scale only original to set size
    
                                    height, width, _ = image.shape
                                    desired_long_side = 320
                                    scaling_factor = 1
    
                                    if width > height:
                                        scaling_factor = desired_long_side / width
                                    else:
                                        scaling_factor = desired_long_side / height
    
                                    new_dim = (int(width * scaling_factor), int(height * scaling_factor))
    
                                    # resize image
                                    image = cv2.resize(image, new_dim, interpolation=cv2.INTER_AREA)
    
                                _, buffer = cv2.imencode('.jpg', image)
                                encoded = base64.b64encode(buffer)
    
                                image_data = {"url": "data:image/png;base64," + encoded.decode()}
    
                                salamander_images.append(image_data)
    
                        return jsonify({"images": salamander_images, "length": salamander_growth.length,
                                        "weight": salamander_growth.weight, "imageId": image_id, "location": location.name,
                                        "salamanderId": salamander.id, "sex": salamander.sex, "species": salamander.species,
                                        'status': 200})
                    else:
                        return jsonify({"message": "no salamander with this id or no growth data", 'status': 400})
            else:
                return jsonify({"message": "no access to this endpoint", 'status': 400})
    
        @staticmethod
        @jwt_required
        def delete():
            data = request.form
            if "id" in data and "image_id" in data and "location" in data:
                user_id = get_jwt_identity()
                user = db.session.query(User).filter_by(id=user_id).first()
                with _ACCESS_DATABASE:
                    salamander = db.session.query(Salamander).filter_by(id=data['id']).first()
                    location = db.session.query(Location).filter_by(name=data['location']).first()
                    if user.admin and salamander and location:
                        salamander_path = os.path.join("./images", location.name, salamander.species, salamander.sex,
                                                       str(salamander.id))
    
                        img_id = int(data['image_id'])
                        last_id = int(len(os.listdir(salamander_path)) / 2) - 1
    
                        if img_id >= int(len(os.listdir(salamander_path)) / 2):
                            return jsonify({"message": "this image id does not exist", "status": 400})
    
                        original_path = glob.glob(os.path.join(salamander_path, str(img_id) + '.*'))[0]
                        processed_path = glob.glob(os.path.join(salamander_path, str(img_id) + '_str.*'))[0]
    
                        os.remove(original_path)
                        os.remove(processed_path)
                        delete_growth_row(salamander.id, img_id)
    
                        # if last image was moved:
                        if len(os.listdir(salamander_path)) == 0:
                            os.rmdir(salamander_path)
                            db.session.delete(salamander)
                            db.session.commit()
                            return jsonify(
                                {"id": salamander.id, "imageId": img_id, "message": "salamander deleted", 'status': 200})
                        else:
                            if img_id != last_id:
                                original_path = glob.glob(os.path.join(salamander_path, str(last_id) + '.*'))[0]
                                processed_path = glob.glob(os.path.join(salamander_path, str(last_id) + '_str.*'))[0]
    
                                original_extension = os.path.splitext(original_path)[1]
                                processed_extension = os.path.splitext(processed_path)[1]
    
                                salamander_growth_row = db.session.query(SalamanderGrowth).filter_by(
                                    salamander_id=salamander.id, image_id=last_id).first()
                                salamander_growth_row.image_id = img_id
                                db.session.commit()
                                os.rename(processed_path,
                                          os.path.join(salamander_path, str(img_id) + "_str" + processed_extension))
                                os.rename(original_path, os.path.join(salamander_path, str(img_id) + original_extension))
    
                            return jsonify(
                                {"id": salamander.id, "imageId": img_id, "message": "salamander deleted", 'status': 200})
                    else:
                        return jsonify({"message": "this user cannot delete this salamander", 'status': 400})
            else:
                return jsonify({"message": "wrong data in form", 'status': 400})
    
        @staticmethod
        @jwt_required
        def put():
            user_id = get_jwt_identity()
            user = db.session.query(User).filter_by(id=user_id).first()
            data = request.form
            if user.admin and "id" in data and "image_id" in data:
                with _ACCESS_DATABASE:
                    salamander = db.session.query(Salamander).filter_by(id=data['id']).first()
                    if salamander:
                        growth_row = db.session.query(SalamanderGrowth).filter_by(salamander_id=salamander.id,
                                                                                  image_id=int(data['image_id'])).first()
                        if "weight" in data and "length" in data:
                            growth_row.weight = sanitize_number_str(str(data['weight']))
                            growth_row.length = sanitize_number_str(str(data['length']))
                            db.session.commit()
    
                        if "location" in data and "new_location" in data and "new_sex" in data and "new_species" in data:
                            if data['location'] != data['new_location'] or salamander.sex != data[
                                'new_sex'] or salamander.species != data['new_species']:
                                salamander_path = os.path.join("images", data['location'], salamander.species,
                                                               salamander.sex, str(salamander.id))
                                new_path_to_images = os.path.join("images", data['new_location'], data['new_species'],
                                                                  data['new_sex'])
                                original_path = glob.glob(os.path.join(salamander_path, data['image_id'] + '.*'))
                                processed_path = glob.glob(os.path.join(salamander_path, data['image_id'] + '_str.*'))
                                if len(original_path) > 0 and len(processed_path) > 0:
                                    original_path = original_path[0]
                                    processed_path = processed_path[0]
                                    image_id = int(data['image_id'])
                                    last_id = int(len(os.listdir(salamander_path)) / 2) - 1
    
                                    if data['new_sex'] != "juvenile":
                                        result = match_file_structure(input_image=processed_path,
                                                                      match_directory=new_path_to_images)
                                        if result:
                                            # if salamander already exist:
                                            if result > -1:
                                                # move all images and delete previous ID.:
                                                move_images(salamander.id, result, image_id,
                                                            path_to_salamander=os.path.join(new_path_to_images,
                                                                                            str(result)),
                                                            path_to_original_image=original_path,
                                                            path_to_processed_image=processed_path)
                                                new_id = result
                                            else:
                                                new_id = add_salamander(salamander.id, image_id, data['new_location'],
                                                                        data['new_species'],
                                                                        data['new_sex'], new_path_to_images, user_id,
                                                                        original_path,
                                                                        processed_path)
                                            return handle_remaining_images(salamander, salamander_path, image_id, last_id,
                                                                           True, result, new_id)
                                        else:
                                            return jsonify(
                                                {"message": "Salamander image, " + data['image_id'] + " does not exist",
                                                 'status': 400})
                                        # if last image was moved:
                                    else:
                                        new_id = add_salamander(salamander.id, image_id, data['new_location'],
                                                                data['new_species'],
                                                                data['new_sex'], new_path_to_images, user_id, original_path,
                                                                processed_path)
                                        return handle_remaining_images(salamander, salamander_path, image_id, last_id,
                                                                       False, 0, new_id)
                                else:
                                    return jsonify({"message": "Image id, " + data[
                                        'image_id'] + " does not exist in salamander_id, " + data['id'], 'status': 400})
                            else:
                                return jsonify({"message": "salamander data updated", 'status': 200})
                        else:
                            return jsonify({"message": "wrong data", 'status': 400})
                    else:
                        return jsonify({"message": "Salamander ID" + data['id'] + " does not exist", 'status': 400})
            else:
                return jsonify({"message": "Non admin users cannot move salamander", 'status': 400})
    
    
    def delete_growth_row(salamander_id, image_id):
        salamander_growth_row = db.session.query(SalamanderGrowth).filter_by(salamander_id=salamander_id,
                                                                             image_id=image_id).first()
        if salamander_growth_row:
            db.session.delete(salamander_growth_row)
            db.session.commit()
    
    
    def handle_remaining_images(salamander, salamander_path: str, image_id: int, last_id: int, match_bool, result, new_id):
        # if last image was moved:
        if len(os.listdir(salamander_path)) == 0:
            os.rmdir(salamander_path)
            db.session.delete(salamander)
            db.session.commit()
            if match_bool and result > -1:
                return jsonify({"id": result, "matching": "Match!", "message": "Matched with salamander", 'status': 200})
            else:
                return jsonify(
                    {"id": new_id, "matching": "No match.", "message": "New salamander in database", 'status': 200})
    
        else:
            if image_id != last_id:
                original_path = glob.glob(os.path.join(salamander_path, str(last_id) + '.*'))[0]
                processed_path = glob.glob(os.path.join(salamander_path, str(last_id) + '_str.*'))[0]
                original_extension = os.path.splitext(original_path)[1]
                processed_extension = os.path.splitext(processed_path)[1]
                salamander_growth_row = db.session.query(SalamanderGrowth).filter_by(salamander_id=salamander.id,
                                                                                     image_id=last_id).first()
                salamander_growth_row.image_id = image_id
                db.session.commit()
                os.rename(processed_path, os.path.join(salamander_path, str(image_id) + "_str" + processed_extension))
                os.rename(original_path, os.path.join(salamander_path, str(image_id) + original_extension))
            if match_bool and result > -1:
                return jsonify({"id": result, "matching": "Match!", "message": "Matched with salamander", 'status': 200})
            else:
                return jsonify(
                    {"id": new_id, "matching": "No match.", "message": "New salamander in database", 'status': 200})
    
    
    def lower_image_id_growth_row(salamander_id, old_id, new_id):
        salamander_growth_row = db.session.query(SalamanderGrowth).filter_by(salamander_id=salamander_id,
                                                                             image_id=old_id).first()
        if salamander_growth_row:
            salamander_growth_row.image_id = new_id
            db.session.commit()
    
    
    def move_images(salamander_id, new_salamander_id, image_id, path_to_salamander, path_to_original_image,
                    path_to_processed_image):
        path_to_salamander = os.path.abspath(path_to_salamander)
        split_name_org = str(path_to_original_image).split('.')
        split_name_proc = str(path_to_processed_image).split('.')
        number_of_files = int(len(os.listdir(path_to_salamander)) / 2)
        a = os.path.join(path_to_salamander, str(number_of_files)) + "." + split_name_org[len(split_name_org) - 1]
        b = os.path.join(path_to_salamander, str(number_of_files)) + "_str." + split_name_proc[len(split_name_org) - 1]
        move(path_to_original_image, a)
        move(path_to_processed_image, b)
        salamander_growth_row = db.session.query(SalamanderGrowth).filter_by(salamander_id=salamander_id,
                                                                             image_id=image_id).first()
        if salamander_growth_row:
            salamander_growth_row.image_id = number_of_files
            salamander_growth_row.salamander_id = new_salamander_id
            db.session.commit()
    
    
    def add_salamander(old_salamander_id, image_id, location, species, sex, path_to_images, user_id, path_to_original_image,
                       path_to_processed_image):
        location_id = db.session.query(Location.id).filter_by(name=location.lower()).first()
        new_salamander = Salamander(sex=sex, species=species, location_id=location_id[0], uid=user_id)
        db.session.add(new_salamander)
        db.session.commit()
        salamander_id = db.session.query(Salamander.id).filter_by(uid=user_id).order_by(Salamander.id.desc()).first()
        path_for_new_salamander = os.path.join(path_to_images, str(salamander_id[0]))
        os.makedirs(path_for_new_salamander)
        move_images(old_salamander_id, salamander_id[0], image_id, path_for_new_salamander, path_to_original_image,
                    path_to_processed_image)
        return salamander_id[0]