Commit c821af96 authored by Anders Langlie

Merge branch '45-sort-algorithm-source-code' into 'master'

Resolve "Sort algorithm source code"

Closes #45

See merge request !48
parents a49eb0c6 760234c8
import os
import cv2
import algorithm.estimate_gender as bb
from path_constants import abs_path_temp_images


def test():
    img_path = os.path.join(abs_path_temp_images, "smallDSC_0605.png")
    img = cv2.imread(img_path)
    if img is not None:
        gender, pred_img, objects, detected_objects = bb.estimate_image(input_image=img,
                                                                        minimum_percentage_probability=0.1,
                                                                        debug_img_name=img_path[0:-4] + "_pred.png",
                                                                        debug_create_pred_img=True)
        print(gender)
        # print(pred_img)
        print(objects)
        print(detected_objects)


if __name__ == "__main__":
    test()
@@ -5,4 +5,4 @@ min_good_match = 15
 match_dist = 0.75
 images_basepath = "test_code/Feature_matching_extraction/test_images/straightened-manually/"
 input_images_path = "data/input"
-input_salamander_id = '00' #294,120,64,00
\ No newline at end of file
+input_salamander_id = '00'
\ No newline at end of file
@@ -2,10 +2,11 @@ import numpy as np
 from cv2 import cv2
 from algorithm.constants import width, height, step_size
-kp = [cv2.KeyPoint(x, y, step_size) for y in range(0, width , step_size) for x in range(0, height, step_size)]
+kp = [cv2.KeyPoint(x, y, step_size) for y in range(0, width, step_size) for x in range(0, height, step_size)]
 sift = cv2.SIFT_create()
 def compute_descriptors(image):
     dense = sift.compute(image,kp)
     des = dense[1]
-    return des
\ No newline at end of file
+    return des
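
# --- Illustration (not part of the commit) ---
# A minimal sketch of what the dense-SIFT grid above produces; the
# width/height/step_size values here are assumptions standing in for
# algorithm.constants. compute_descriptors returns one 128-dimensional SIFT
# descriptor per grid keypoint, so every image yields a descriptor array of
# the same shape, which is what makes descriptor sets directly comparable.
import cv2
import numpy as np

width, height, step_size = 60, 120, 5  # assumed values, for illustration only
kp = [cv2.KeyPoint(x, y, step_size) for y in range(0, width, step_size) for x in range(0, height, step_size)]
sift = cv2.SIFT_create()
img = np.zeros((width, height), dtype=np.uint8)  # placeholder grayscale image
_, des = sift.compute(img, kp)
print(des.shape)  # (number of kept keypoints, 128) -- (288, 128) for these values
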
import json
import cv2
import numpy as np
import os
from multiprocessing import Process
from multiprocessing import Manager
from path_constants import abs_path_imageai_config
from path_constants import abs_path_imageai_model
from algorithm.straighten_with_dlc import _ACCESS_TF_AND_GPU_SEMA
EXTRACT_DETECTED_OBJECTS = False
MINIMUM_PERCENTAGE_PROBABILITY = 0.1
NMS_TRESHOLD = 0.4
DISPLAY_PERCENTAGE_PROBABILITY = True
DISPLAY_OBJECT_NAME = True
THREAD_SAFE = False
def run_estimation(detection_model_path: str = None, configuration_json: str = None, input_image: np.ndarray = None,
                   extract_detected_objects: bool = False, minimum_percentage_probability: float = 50,
                   nms_treshold: float = 0.4,
                   display_percentage_probability: bool = True, display_object_name: bool = True,
                   thread_safe: bool = False, debug_img_name: str = "", debug_create_pred_img: bool = False,
                   ret_dictionary: list = None):
    # from tensorflow.keras.backend import clear_session
    from tensorflow.core.protobuf.config_pb2 import ConfigProto
    from tensorflow.python.client.session import Session
    from imageai.Detection.YOLO.yolov3 import yolov3_main
    from tensorflow.python.keras.backend import get_session
    from tensorflow.keras import Input

    # GPU options: allow memory growth and cap the per-process memory fraction.
    config = ConfigProto()
    config.gpu_options.allow_growth = True
    config.gpu_options.per_process_gpu_memory_fraction = 0.9
    # session = Session(config=config)

    # For now the model is trained on square (416x416) images:
    image_dim_size = 416

    if detection_model_path is None or configuration_json is None:
        for i in range(0, 4):
            # Append item by item: a Manager list proxy does not sync a plain
            # reassignment like ret_dictionary = [1, 2, 3] back to the parent.
            ret_dictionary.append(None)
        return

    detection_model_path = os.path.abspath(detection_model_path)
    configuration_json = os.path.abspath(configuration_json)

    # Loading the model:
    detection_model_json = json.load(open(configuration_json))
    model_labels = detection_model_json["labels"]
    model_anchors = detection_model_json["anchors"]
    detection_utils = CustomDetectionUtils(labels=model_labels)
    model = yolov3_main(Input(shape=(None, None, 3)), 3, len(model_labels))
    model.load_weights(detection_model_path)

    # Estimating:
    if model is None:
        raise ValueError("The model was not able to load.")
    else:
        object_threshold = minimum_percentage_probability / 100
        output_objects_array = []
        detected_objects_image_array = []
        image = input_image
        image_frame = image.copy()
        height, width, channels = image.shape
        image = cv2.resize(image, (image_dim_size, image_dim_size))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = image.astype("float32") / 255.
        # expand the image to a batch of one
        image = np.expand_dims(image, 0)
        if thread_safe:
            with get_session():
                yolo_results = model.predict(image)
        else:
            yolo_results = model.predict(image)
        boxes = list()
        for idx, result in enumerate(yolo_results):
            box_set = detection_utils.decode_netout(result[0], model_anchors[idx],
                                                    object_threshold, image_dim_size,
                                                    image_dim_size)
            boxes += box_set
        detection_utils.correct_yolo_boxes(boxes, height, width, image_dim_size, image_dim_size)
        detection_utils.do_nms(boxes, nms_treshold)
        all_boxes, all_labels, all_scores = detection_utils.get_boxes(boxes, model_labels,
                                                                      object_threshold)
        for object_box, object_label, object_score in zip(all_boxes, all_labels, all_scores):
            each_object_details = dict()
            each_object_details["name"] = object_label
            each_object_details["percentage_probability"] = object_score
            if object_box.xmin < 0:
                object_box.xmin = 0
            if object_box.ymin < 0:
                object_box.ymin = 0
            each_object_details["box_points"] = [object_box.xmin, object_box.ymin, object_box.xmax, object_box.ymax]
            output_objects_array.append(each_object_details)
        drawn_image = detection_utils.draw_boxes_and_caption(image_frame.copy(), all_boxes, all_labels,
                                                             all_scores, show_names=display_object_name,
                                                             show_percentage=display_percentage_probability)
        if extract_detected_objects:
            for cnt, each_object in enumerate(output_objects_array):
                splitted_image = image_frame[each_object["box_points"][1]:each_object["box_points"][3],
                                             each_object["box_points"][0]:each_object["box_points"][2]]
                detected_objects_image_array.append(splitted_image.copy())
        if debug_create_pred_img:
            if drawn_image is not None:
                print(os.path.abspath(debug_img_name))
                cv2.imwrite(os.path.abspath(debug_img_name), drawn_image)
        # Find the best estimation (highest probability):
        seq = [x['percentage_probability'] for x in output_objects_array]
        gender = None
        if len(seq) > 0:
            index = seq.index(max(seq))
            gender = output_objects_array[index]
        # Append item by item: a Manager list proxy does not sync a plain
        # reassignment back to the parent process.
        if extract_detected_objects:
            # ret_dictionary = [gender, drawn_image, output_objects_array, detected_objects_image_array]
            ret_dictionary.append(gender)
            ret_dictionary.append(drawn_image)
            ret_dictionary.append(output_objects_array)
            ret_dictionary.append(detected_objects_image_array)
        else:
            ret_dictionary.append(gender)
            ret_dictionary.append(drawn_image)
            ret_dictionary.append(output_objects_array)
            ret_dictionary.append(None)
            # ret_dictionary = [gender, drawn_image, output_objects_array, None]
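
# --- Illustration (not part of the commit) ---
# A minimal sketch of the pattern run_estimation relies on: inference runs in a
# child process (one likely reason: TensorFlow's GPU memory is freed when the
# child exits), and results travel back through a multiprocessing.Manager list
# proxy. Only mutating calls such as append() are synced to the parent; a plain
# rebinding like ret_dictionary = [1, 2, 3] stays local to the child, which is
# why run_estimation appends item by item. Names below are hypothetical.
from multiprocessing import Manager, Process


def _worker(ret_list):
    ret_list.append("result")  # synced back to the parent process
    # ret_list = ["result"] would only rebind the local name in the child


if __name__ == "__main__":
    manager = Manager()
    ret_list = manager.list()
    p = Process(target=_worker, args=(ret_list,))
    p.start()
    p.join()
    print(list(ret_list))  # ['result']
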
def estimate_image(detection_model_path: str = abs_path_imageai_model,
                   configuration_json: str = abs_path_imageai_config,
                   input_image: np.ndarray = None,
                   extract_detected_objects: bool = EXTRACT_DETECTED_OBJECTS,
                   minimum_percentage_probability: float = MINIMUM_PERCENTAGE_PROBABILITY,
                   nms_treshold: float = NMS_TRESHOLD,
                   display_percentage_probability: bool = DISPLAY_PERCENTAGE_PROBABILITY,
                   display_object_name: bool = DISPLAY_OBJECT_NAME,
                   thread_safe: bool = THREAD_SAFE,
                   debug_img_name: str = "",
                   debug_create_pred_img: bool = False):
    """
    'estimate_image()' estimates the gender of the salamander in the given image:

    * input_image, the image as a numpy array in BGR order
    * extract_detected_objects (optional, False by default), option to crop out each detected object and return an array of the cropped images
    * minimum_percentage_probability (optional, 0.1 by default), option to set the minimum percentage probability for nominating a detected object for output
    * nms_treshold (optional, 0.4 by default), option to set the non-maximum suppression threshold for the detection
    * display_percentage_probability (optional, True by default), option to show or hide the percentage probability of each object in the saved/returned detected image
    * display_object_name (optional, True by default), option to show or hide the name of each object in the saved/returned detected image
    * thread_safe (optional, False by default), enforce that the loaded detection model works across all threads, made possible by forcing all Keras inference to run on the default graph
    * debug_img_name (optional, "" by default), the name of the predicted image if debug_create_pred_img is True
    * debug_create_pred_img (optional, False by default), option to create an image with the bounding box

    :param detection_model_path: path to the trained detection model weights
    :param configuration_json: path to the model's JSON configuration
    :param input_image: the image to run the estimation on
    :return: a tuple (gender, drawn_image, output_objects_array, detected_objects_image_array)
    """
    with _ACCESS_TF_AND_GPU_SEMA:
        gender = "AI_sex"
        if os.path.isfile(detection_model_path) and os.path.isfile(configuration_json):
            manager = Manager()
            return_dict = manager.list()
            # return_dict = multiprocessing.Value("d", [], lock=False)
            args = (detection_model_path, configuration_json, input_image,
                    extract_detected_objects, minimum_percentage_probability,
                    nms_treshold,
                    display_percentage_probability, display_object_name,
                    thread_safe, debug_img_name, debug_create_pred_img, return_dict)
            p = Process(target=run_estimation, args=args)
            p.start()
            p.join()
            if return_dict[0]:
                gender = return_dict[0]['name']
            return gender, return_dict[1], return_dict[2], return_dict[3]
        return gender, None, None, None
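
# --- Illustration (not part of the commit) ---
# A minimal usage sketch of estimate_image(), mirroring the test() script in
# this merge request; the image path is a hypothetical example.
if __name__ == "__main__":
    _img = cv2.imread("data/input/example_salamander.png")  # BGR numpy array
    if _img is not None:
        _gender, _pred_img, _objects, _crops = estimate_image(input_image=_img,
                                                              minimum_percentage_probability=0.1)
        print(_gender)   # name of the highest-scoring detection
        print(_objects)  # list of dicts with name/percentage_probability/box_points
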
class BoundBox:
    def __init__(self, xmin, ymin, xmax, ymax, objness=None, classes=None):
        self.xmin = xmin
        self.ymin = ymin
        self.xmax = xmax
        self.ymax = ymax
        self.objness = objness
        self.classes = classes
        self.label = -1
        self.score = -1
class CustomDetectionUtils:
    def __init__(self, labels):
        self.__labels = labels
        self.__colors = []
        for i in range(len(labels)):
            color_space_values = np.random.randint(50, 255, size=(3,))
            red, green, blue = color_space_values
            red, green, blue = int(red), int(green), int(blue)
            self.__colors.append([red, green, blue])

    @staticmethod
    def _sigmoid(x):
        return 1. / (1. + np.exp(-x))
    def decode_netout(self, netout, anchors, obj_thresh, net_h, net_w):
        grid_h, grid_w = netout.shape[:2]
        nb_box = 3
        netout = netout.reshape((grid_h, grid_w, nb_box, -1))
        nb_class = netout.shape[-1] - 5
        boxes = []
        netout[..., :2] = self._sigmoid(netout[..., :2])
        netout[..., 4:] = self._sigmoid(netout[..., 4:])
        netout[..., 5:] = netout[..., 4][..., np.newaxis] * netout[..., 5:]
        netout[..., 5:] *= netout[..., 5:] > obj_thresh
        for row in range(grid_h):
            for col in range(grid_w):
                for b in range(nb_box):
                    # 4th element is objectness score
                    objectness = netout[row, col, b, 4]
                    if objectness <= obj_thresh:
                        continue
                    # first 4 elements are x, y, w, and h
                    x, y, w, h = netout[row, col, b, :4]
                    x = (col + x) / grid_w  # center position, unit: image width
                    y = (row + y) / grid_h  # center position, unit: image height
                    w = anchors[2 * b + 0] * np.exp(w) / net_w  # unit: image width
                    h = anchors[2 * b + 1] * np.exp(h) / net_h  # unit: image height
                    # last elements are class probabilities
                    classes = netout[row, col, b, 5:]
                    box = BoundBox(x - w / 2, y - h / 2, x + w / 2, y + h / 2, objectness, classes)
                    boxes.append(box)
        return boxes
    @staticmethod
    def correct_yolo_boxes(boxes, image_h, image_w, net_h, net_w):
        new_w, new_h = net_w, net_h
        for i in range(len(boxes)):
            x_offset, x_scale = (net_w - new_w) / 2. / net_w, float(new_w) / net_w
            y_offset, y_scale = (net_h - new_h) / 2. / net_h, float(new_h) / net_h
            boxes[i].xmin = int((boxes[i].xmin - x_offset) / x_scale * image_w)
            boxes[i].xmax = int((boxes[i].xmax - x_offset) / x_scale * image_w)
            boxes[i].ymin = int((boxes[i].ymin - y_offset) / y_scale * image_h)
            boxes[i].ymax = int((boxes[i].ymax - y_offset) / y_scale * image_h)
    def _interval_overlap(self, interval_a, interval_b):
        x1, x2 = interval_a
        x3, x4 = interval_b
        if x3 < x1:
            if x4 < x1:
                return 0
            else:
                return min(x2, x4) - x1
        else:
            if x2 < x3:
                return 0
            else:
                return min(x2, x4) - x3
    def bbox_iou(self, box1, box2):
        intersect_w = self._interval_overlap([box1.xmin, box1.xmax], [box2.xmin, box2.xmax])
        intersect_h = self._interval_overlap([box1.ymin, box1.ymax], [box2.ymin, box2.ymax])
        intersect = intersect_w * intersect_h
        w1, h1 = box1.xmax - box1.xmin, box1.ymax - box1.ymin
        w2, h2 = box2.xmax - box2.xmin, box2.ymax - box2.ymin
        union = w1 * h1 + w2 * h2 - intersect
        try:
            result = float(intersect) / float(union)
            return result
        except ZeroDivisionError:
            return 0.0
    def do_nms(self, boxes, nms_thresh):
        if len(boxes) > 0:
            nb_class = len(boxes[0].classes)
        else:
            return
        for c in range(nb_class):
            sorted_indices = np.argsort([-box.classes[c] for box in boxes])
            for i in range(len(sorted_indices)):
                index_i = sorted_indices[i]
                if boxes[index_i].classes[c] == 0:
                    continue
                for j in range(i + 1, len(sorted_indices)):
                    index_j = sorted_indices[j]
                    # suppress the lower-scoring box when the overlap is too high
                    if self.bbox_iou(boxes[index_i], boxes[index_j]) >= nms_thresh:
                        boxes[index_j].classes[c] = 0
    def get_boxes(self, boxes, labels, thresh):
        v_boxes, v_labels, v_scores = list(), list(), list()
        # enumerate all boxes
        for box in boxes:
            # enumerate all possible labels
            for i in range(len(labels)):
                # check if the score for this label exceeds the threshold
                if box.classes[i] > thresh:
                    v_boxes.append(box)
                    v_labels.append(labels[i])
                    v_scores.append(box.classes[i] * 100)
                    # don't break: many labels may trigger for one box
        return v_boxes, v_labels, v_scores
    def label_color(self, label):
        """ Return the color generated for the given label index.

        Args
            label: The label index to get the color for.

        Returns
            A list of three values representing an RGB color.
            If no color is defined for the given label, the color green is returned.
        """
        if label < len(self.__colors):
            return self.__colors[label]
        else:
            return 0, 255, 0
    def draw_boxes_and_caption(self, image_frame, v_boxes, v_labels, v_scores, show_names=False, show_percentage=False):
        for i in range(len(v_boxes)):
            box = v_boxes[i]
            y1, x1, y2, x2 = box.ymin, box.xmin, box.ymax, box.xmax
            width, height = x2 - x1, y2 - y1
            class_color = self.label_color(self.__labels.index(v_labels[i]))
            image_frame = cv2.rectangle(image_frame, (x1, y1), (x2, y2), class_color, 2)
            label = ""
            if show_names and show_percentage:
                label = "%s : %.3f" % (v_labels[i], v_scores[i])
            elif show_names:
                label = "%s" % (v_labels[i])
            elif show_percentage:
                label = "%.3f" % (v_scores[i])
            if show_names or show_percentage:
                b = np.array([x1, y1, x2, y2]).astype(int)
                # draw a dark outline first, then lighter text on top for readability
                cv2.putText(image_frame, label, (b[0], b[1] - 10), cv2.FONT_HERSHEY_PLAIN, 1, (200, 0, 0), 3)
                cv2.putText(image_frame, label, (b[0], b[1] - 10), cv2.FONT_HERSHEY_PLAIN, 1, (255, 255, 255), 2)
        return image_frame
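
# --- Illustration (not part of the commit) ---
# A quick numeric check of bbox_iou using the classes above; the box values are
# chosen for illustration and the labels are hypothetical. Two 2x2 boxes offset
# by (1, 1) overlap in a 1x1 square, so IoU = 1 / (4 + 4 - 1) ~ 0.1429.
if __name__ == "__main__":
    _utils = CustomDetectionUtils(labels=["female", "male"])  # hypothetical labels
    _a = BoundBox(0, 0, 2, 2)
    _b = BoundBox(1, 1, 3, 3)
    print(_utils.bbox_iou(_a, _b))  # 0.14285714285714285
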
@@ -3,53 +3,48 @@ import numpy as np
 from algorithm.SalamanderImage import SalamanderImage
 #import skeletonization
 #import straightening
 import algorithm.dsift as dsift
 import algorithm.segmentation as segmentation


 def create_salamander_image(filename: str):
-    #print("Processing image " + filename)
-    salamander_image = SalamanderImage(filename)
-    salamander_image.filename = filename
-    salamander_image.descriptors = get_descriptors(filename)
+    salamander_image = SalamanderImage(filename)
+    salamander_image.filename = filename
+    salamander_image.descriptors = get_descriptors(filename)

-    return salamander_image
+    return salamander_image


 def get_descriptors(filename: str):
-    image = get_straightened_image(filename)
-    return calculate_descriptors(image)
+    image = get_straightened_image(filename)
+    return calculate_descriptors(image)


-"""
-Reads, straightens, resizes the image and returns it as a
-"""
+# Reads, straightens, resizes the image and returns it
 def get_straightened_image(filename: str):
-    straightened_filename = filename#[0:-4] + '_str.jpg'
-    image = get_image(straightened_filename)
-    return image
+    straightened_filename = filename  # [0:-4] + '_str.jpg'
+    image = get_image(straightened_filename)
+    return image


-"""
-Should return a binary image (numpy ndarray) with 1 for "Part of salamander"
-and 0 for "Not part of the salamander".
-"""
+# Should return a binary image (numpy ndarray) with 1 for "Part of salamander"
+# and 0 for "Not part of the salamander".
 def get_segmented_image(filename: str):
-    image = get_image(filename)
-    return segmentation.get_salamander_mask(image)
+    image = get_image(filename)
+    return segmentation.get_salamander_mask(image)


 def get_image(filename):
-    #image = cv2.imread(filename, cv2.IMREAD_GRAYSCALE)
-    img = cv2.imdecode(np.fromfile(filename, dtype=np.uint8), cv2.IMREAD_GRAYSCALE)
-    if img is None:
-        raise FileNotFoundError("Cannot find image file " + filename)
-    return img
+    img = cv2.imdecode(np.fromfile(filename, dtype=np.uint8), cv2.IMREAD_GRAYSCALE)
+    if img is None:
+        raise FileNotFoundError("Cannot find image file " + filename)
+    return img


-"""
-Calculates descriptors from preprocessed image
-"""
+# Calculates descriptors from preprocessed image
 def calculate_descriptors(image):
-    return dsift.compute_descriptors(image)
+    return dsift.compute_descriptors(image)
 # This file contains modified code from DeepLabCut's source code
 import os.path
 import time
 import numpy as np
 from pathlib import Path
 from skimage.util import img_as_ubyte
-def run_prediction_dlc(config: str,image: np.ndarray,shuffle: int =1,
-                       trainingsetindex: int =0,gputouse: int =None):
+def run_prediction_dlc(config: str, image: np.ndarray, shuffle: int = 1,
+                       trainingsetindex: int = 0, gputouse: int = None):
     from deeplabcutcore.pose_estimation_tensorflow.nnet import predict
     from deeplabcutcore.pose_estimation_tensorflow.config import load_config
     from tensorflow.python.framework.ops import reset_default_graph
@@ -15,8 +18,8 @@ def run_prediction_dlc(config: str,image: np.ndarray,shuffle: int =1,
     # was potentially set during training
     del os.environ['TF_CUDNN_USE_AUTOTUNE']
-    if gputouse is not None: #gpu selection
-        os.environ['CUDA_VISIBLE_DEVICES'] = str(gputouse)
+    if gputouse is not None:  # gpu selection
+        os.environ['CUDA_VISIBLE_DEVICES'] = str(gputouse)
     # tf.compat.v1.reset_default_graph()
     reset_default_graph()
@@ -24,21 +27,24 @@ def run_prediction_dlc(config: str,image: np.ndarray,shuffle: int =1,
     cfg = auxiliaryfunctions.read_config(config)
     train_fraction = cfg['TrainingFraction'][trainingsetindex]
-    model_folder=os.path.join(cfg["project_path"],str(auxiliaryfunctions.GetModelFolder(train_fraction,shuffle,cfg)))
+    model_folder = os.path.join(cfg["project_path"],
+                                str(auxiliaryfunctions.GetModelFolder(train_fraction, shuffle, cfg)))
     path_test_config = Path(model_folder) / 'test' / 'pose_cfg.yaml'
     # print(path_test_config)
     try:
         dlc_cfg = load_config(str(path_test_config))
     except FileNotFoundError:
-        raise FileNotFoundError("It seems the model for shuffle %s and trainFraction %s does not exist."%(shuffle,train_fraction))
+        raise FileNotFoundError(
+            "It seems the model for shuffle %s and trainFraction %s does not exist." % (shuffle, train_fraction))
     # Check which snapshots are available and sort them by # iterations
     try:
-        snapshots = np.array([fn.split('.')[0]for fn in os.listdir(os.path.join(model_folder , 'train'))if "index" in fn])
+        snapshots = np.array(
+            [fn.split('.')[0] for fn in os.listdir(os.path.join(model_folder, 'train')) if "index" in fn])
     except FileNotFoundError:
-        raise FileNotFoundError("Snapshots not found!\
+        raise FileNotFoundError("Snapshots not found!\
         It seems the dataset for shuffle %s has not been trained/does not exist.\n \
         Please train it before using it to analyze videos.\n Use the function \
-        'train_network' to train the network for shuffle %s."%(shuffle,shuffle))
+        'train_network' to train the network for shuffle %s." % (shuffle, shuffle))
     if cfg['snapshotindex'] == 'all':
         # print("Snapshotindex is set to 'all' in the config.yaml file.\
@@ -47,7 +53,7 @@ def run_prediction_dlc(config: str,image: np.ndarray,shuffle: int =1,
         # For now, changing snapshot index to -1!")
         snapshot_index = -1
     else:
-        snapshot_index=cfg['snapshotindex']
+        snapshot_index = cfg['snapshotindex']
     increasing_indices = np.argsort([int(m.split('-')[1]) for m in snapshots])
     snapshots = snapshots[increasing_indices]
@@ -59,7 +65,7 @@ def run_prediction_dlc(config: str,image: np.ndarray,shuffle: int =1,
     ##################################################
     # Check if data already was generated:
-    dlc_cfg['init_weights'] = os.path.join(model_folder , 'train', snapshots[snapshot_index])
+    dlc_cfg['init_weights'] = os.path.join(model_folder, 'train', snapshots[snapshot_index])
     trainingsiterations = (dlc_cfg['init_weights'].split(os.sep)[-1]).split('-')[-1]
     # Update batchsize (based on parameters in config.yaml)
@@ -74,8 +80,8 @@ def run_prediction_dlc(config: str,image: np.ndarray,shuffle: int =1,
     # update number of outputs and adjust pandas indices
     dlc_cfg['num_outputs'] = cfg.get('num_outputs', 1)
-    if gputouse is not None: #gpu selection
-        os.environ['CUDA_VISIBLE_DEVICES'] = str(gputouse)
+    if gputouse is not None:  # gpu selection
+        os.environ['CUDA_VISIBLE_DEVICES'] = str(gputouse)
     ##################################################
     # Loading the images
@@ -84,8 +90,7 @@ def run_prediction_dlc(config: str,image: np.ndarray,shuffle: int =1,
     # print("session: ", sess)
     # PredictedData,nframes,nx,ny=get_poses_deeplabcut_model(dlc_cfg, sess, inputs, outputs,image)