From 53eab9c0ef1d15f22be677b2aa25076950920882 Mon Sep 17 00:00:00 2001 From: ddnthemc Date: Sun, 4 May 2025 18:39:46 +0200 Subject: [PATCH] Initial commit --- .gitignore | 4 + .idea/.gitignore | 3 + .idea/atlanta_maps.iml | 8 + .../inspectionProfiles/profiles_settings.xml | 6 + .idea/misc.xml | 7 + .idea/modules.xml | 8 + .idea/vcs.xml | 6 + README.md | 2 + do_process_all_maps.py | 212 ++++++++++++++++++ visit_points.py | 158 +++++++++++++ yaml_stuff.py | 42 ++++ 11 files changed, 456 insertions(+) create mode 100644 .gitignore create mode 100644 .idea/.gitignore create mode 100644 .idea/atlanta_maps.iml create mode 100644 .idea/inspectionProfiles/profiles_settings.xml create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 README.md create mode 100644 do_process_all_maps.py create mode 100644 visit_points.py create mode 100644 yaml_stuff.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..91706d1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/Olds/ +*.7z +.idea +__pycache__ \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/.idea/atlanta_maps.iml b/.idea/atlanta_maps.iml new file mode 100644 index 0000000..d0876a7 --- /dev/null +++ b/.idea/atlanta_maps.iml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..a6218fe --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..cc662d0 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..0fa3d8c --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ +Progetto di studio di sistemi di guida autonoma. +Utility per processo di filtraggio delle mappe raccolte dal campo diff --git a/do_process_all_maps.py b/do_process_all_maps.py new file mode 100644 index 0000000..c26ed30 --- /dev/null +++ b/do_process_all_maps.py @@ -0,0 +1,212 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +import cv2 +from pathlib import Path +import numpy as np +from anytree import Node, RenderTree, PreOrderIter +from dataclasses import dataclass +from typing import Any +from skimage.morphology import skeletonize +from visit_points import give_me_the_main_loop_only +from yaml_stuff import create_corrected_yaml + +DO_SHOW = False + +@dataclass +class MyContour: + idx: int + c: cv2.Mat | np.ndarray[Any, np.dtype] | np.ndarray + h: list # 0_next, 1_previous, 2_child, 3_parent + area: float + + def __str__(self): + return f'IDX: {self.idx}, Area:{self.area:.0f} h:{self.h}' + + +border_size = 4 + + +def collect_my_children(me: Node, cl: list[MyContour], threshold=0): + """ + adds all the linked children to a father Node. + :param me: me, the starting Node + :param cl: the complete element list (the CV2 hierarchy) + :return: + """ + # print(f'Collecting: {me}') + my_first_child = me.name.h[2] + if my_first_child < 0: + return # You are having no child => we have done + if cl[my_first_child].area >= threshold: + # print(f'Adding a Node (first child): {my_first_child}, area={cl[my_first_child].area}') + child = Node(cl[my_first_child], parent=me) + if child.name.h[2] >= 0: + # This child has other children, collect them first + collect_my_children(child, cl, threshold=threshold) + # Now do iterate over all the siblings + sibling_idx = cl[my_first_child].h[0] + while sibling_idx >= 0: + # there is a sibling + # print(f'Checking Sibling: {sibling_idx}') + if cl[sibling_idx].area >= threshold: + # print(f'Adding a Node (sibling child): {sibling_idx}') + sibling = Node(cl[sibling_idx], parent=me) + if sibling.name.h[2] >= 0: + collect_my_children(sibling, cl, threshold=threshold) + sibling_idx = cl[sibling_idx].h[0] + + +FILTERED = '_filtered' +RACE = '_race' +COMPOSITE = '_composite' + +BLACK_LIST = ( + FILTERED, + RACE, + COMPOSITE, + '_edited', + '_raceline', +) + + +def process_a_map(in_image): + """ + Partendo da una file di mappa grezzo, tenta di creare: + una versione filtrata per localizzazione (_filtered.pgm) + una versione filtrata per calcolo della race-line (_race.pgm) + una versione filtrata per visualizzazione e monitoraggio algoritmo (_composite.png) + :param in_image: + :return: + """ + if in_image.suffix.lower() != '.pgm': + # Processa solo le immagini di tipo pgm + return + stem = str(in_image.stem) + for bl in BLACK_LIST: + if stem.endswith(bl): + # scarta tutte le immagini che sono degli output di elaborazioni precedenti + return + # Load the image + image_ = cv2.imread(str(in_image), cv2.IMREAD_GRAYSCALE) + # print(image_.shape) + image = cv2.copyMakeBorder(image_, top=border_size, bottom=border_size, left=border_size, + right=border_size, borderType=cv2.BORDER_CONSTANT, value=205) + if DO_SHOW: + cv2.imshow(in_image.stem, image) + color_image = cv2.cvtColor(image_, cv2.COLOR_GRAY2BGR) + + # Apply binary threshold (you can tweak the threshold value if needed) + _, thresh = cv2.threshold(image, 210, 255, cv2.THRESH_BINARY) + + # Find contours + contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) + # print('++ LEN ++++++++++++++++++', len(contours)) + if len(contours) < 1: + raise ValueError(f'No contours found for :{in_image}') + for c in contours: + area = cv2.contourArea(c) + # print(f'{area=}') + if area < 100: # TODO Hardcodein is not a good practice + continue + mask_inside = np.zeros_like(image) + + # Draw the contour in white + cv2.drawContours(mask_inside, [c], -1, color=255, thickness=cv2.FILLED) + # mask_outside = cv2.bitwise_not(mask_inside) + + img_retain = cv2.bitwise_and(image, mask_inside) + + thres = 230 + _, th2 = cv2.threshold(img_retain, thres, 255, cv2.THRESH_BINARY) + + ccc, hhh = cv2.findContours(th2, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) + + cl: list[MyContour] = [] + for n, (c, h) in enumerate(zip(ccc, *hhh)): + cl.append(MyContour(idx=n, c=c, h=h, area=cv2.contourArea(c))) + # print(n, h, cv2.contourArea(c)) + # do find the root contour: + roots = tuple(filter(lambda x: x.h[3] < 0, cl)) + assert len(roots) == 1, 'We found more than one root contour!!' + root = Node(roots[0]) + main_th = root.name.area * 0.01 + collect_my_children(root, cl, threshold=main_th) + tentative = np.zeros_like(image) + cv2.fillPoly(tentative, [root.name.c], color=255) # Fill with white color (255) + for node in PreOrderIter(root): + # print(node.name.idx) + if node.name.idx != root.name.idx: + cv2.fillPoly(tentative, [node.name.c], color=0) # Fill with white color (255) + + rev = cv2.bitwise_not(tentative) + kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5)) # shape and size + enlarged_black = cv2.dilate(rev, kernel, iterations=1) + smoothed_black = cv2.erode(enlarged_black, kernel, iterations=1) + filtered = cv2.bitwise_not(smoothed_black) + fc = filtered[4:-4, 4:-4] # [rows, cols] => [y1:y2, x1:x2] + if DO_SHOW: + cv2.imshow(f'Filtered', fc) + + destination = in_image.parent / f'{in_image.stem}{FILTERED}.pgm' + create_corrected_yaml(in_image, destination) + cv2.imwrite(str(destination), fc) + + binary_image = fc // 255 + # Skeletonize the binary image + skeleton = skeletonize(binary_image) + + # Convert the skeleton back to a format suitable for display + skeleton_image = (skeleton * 255).astype(np.uint8) + if DO_SHOW: + cv2.imshow(f'SK', skeleton_image) + + main_loop = give_me_the_main_loop_only(skeleton_image) + + race_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (30, 30)) # shape and size + race_map_1 = cv2.dilate(main_loop, race_kernel, iterations=1) + race_map = cv2.bitwise_and(race_map_1, fc) + if DO_SHOW: + cv2.imshow(f'Race', race_map) + + destination = in_image.parent / f'{in_image.stem}{RACE}.pgm' + create_corrected_yaml(in_image, destination) + cv2.imwrite(str(destination), race_map) + + # Define colors for the masks (BGR format) + color1 = (0, 255, 0) # Green + color2 = (255, 0, 0) # Blue + + # Create colored masks + colored_mask1 = np.zeros_like(color_image) + colored_mask1[fc == 255] = color1 + + colored_mask2 = np.zeros_like(color_image) + colored_mask2[race_map == 255] = color2 + + # Blend the images + sum_image = cv2.bitwise_xor(colored_mask1, colored_mask2) + composite_image = cv2.addWeighted(color_image, 0.7, sum_image, 0.2, 0) + + # Show the result + # cv2.imshow('Composite Image', composite_image) + + destination = in_image.parent / f'{in_image.stem}{COMPOSITE}.png' + create_corrected_yaml(in_image, destination) + cv2.imwrite(str(destination), composite_image) + + # cv2.imwrite('comp.png', composite_image) + + if DO_SHOW: + cv2.waitKey(0) + cv2.destroyAllWindows() + + +if __name__ == '__main__': + FOLDER = Path(r'C:\Mc\Python\PyProjs\atlanta_maps\atlant_maps\maps') + for element in FOLDER.iterdir(): + if element.is_dir(): + img_filename = element / f'{element.stem}.pgm' + if img_filename.is_file(): + print(f'We have found: {img_filename}') + process_a_map(img_filename) + diff --git a/visit_points.py b/visit_points.py new file mode 100644 index 0000000..23fe504 --- /dev/null +++ b/visit_points.py @@ -0,0 +1,158 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +import numpy as np +import cv2 + +sub_paths = [] +visited = set() +current_path = [] +sk = None + +DO_PRINT_DEBUG = False + + +def clear(): + sub_paths.clear() + visited.clear() + current_path.clear() + + +def get_neighbors(point): + """Get the neighboring points of a given point in the skeleton.""" + x, y = point + neighbors = [] + for dx in [-1, 0, 1]: + for dy in [-1, 0, 1]: + if dx == 0 and dy == 0: + continue + nx, ny = x + dx, y + dy + if 0 <= nx < sk.shape[0] and 0 <= ny < sk.shape[1]: + if sk[nx, ny] == 255: # Check if it's part of the skeleton + neighbors.append((nx, ny)) + return neighbors + + +def add_a_point(p): + if p in visited: + return + if DO_PRINT_DEBUG: + print(f'Adding: {p}') + visited.add(p) + current_path.append(p) + neighbors = [pix for pix in get_neighbors(p) if pix not in visited] + branch_count = len(neighbors) + # print(f'LN: {len(neighbors)} {neighbors}') + while branch_count == 1: + new_point = neighbors[0] + visited.add(new_point) + current_path.append(new_point) + neighbors = [pix for pix in get_neighbors(new_point) if pix not in visited] + branch_count = len(neighbors) + # print(f'LN: {len(neighbors)} {neighbors}') + + sub_paths.append(current_path.copy()) + current_path.clear() + for nei in neighbors: + add_a_point(nei) + + +def compute_all_the_sub_paths(skeleton): + global sk + sk = skeleton + clear() + skeleton_points = np.argwhere(skeleton == 255) # Get all skeleton points + for skp in skeleton_points: + pto = tuple(skp) + add_a_point(pto) + return sub_paths + + +def draw_paths(image_shape, sub_paths): + """Draw the remaining paths on a blank image.""" + drawn_image = np.zeros(image_shape, dtype=np.uint8) # Create a blank image + for path in sub_paths: + for point in path: + drawn_image[point] = 255 # Draw the path in white + return drawn_image + + +def give_me_the_main_loop_only(input_skeleton): + # Extract sub-paths + sub_paths = compute_all_the_sub_paths(input_skeleton) + end_points = {} + + def is_close_enough(p1, p2): + x, y = p1 + px, py = p2 + dx = abs(x - px) + dy = abs(y - py) + if dx <= 3 and dy <= 3: + return True + return False + + def find_close_point(pto): + for p in end_points.keys(): + if is_close_enough(pto, p): + return p + return None + + def add_endpoint(pto, idx): + where = find_close_point(pto) + if where is None: + # no close point found, add it + end_points[pto] = [] + where = pto + end_points[where].append(idx) + + # Print the results + to_be_removed = set() + for i, path in enumerate(sub_paths): + # if len(path) < 2: + # continue + begin = path[0] + end = path[-1] + if is_close_enough(begin, end): + to_be_removed.add(i) + continue + add_endpoint(begin, i) + add_endpoint(end, i) + if DO_PRINT_DEBUG: + print(f"Sub-path {i}: {len(path):4} {path[0]},{path[-1]}") + + # Draw the remaining paths on a blank image + # to_be_removed = {0, 7, 12, 13} + # to_be_removed = set() + for n, (cc, ep) in enumerate(end_points.items()): + pto = int(cc[0]), int(cc[1]) + if DO_PRINT_DEBUG: + print(f'Endpoint #{n} {pto}: {len(ep)}, {ep}') + if len(ep) == 1: + # this is a dead branch + to_be_removed.add(ep[0]) + if DO_PRINT_DEBUG: + print(f'{to_be_removed=}') + sub_paths_remaining = [sp for n, sp in enumerate(sub_paths) if n not in to_be_removed] + + return draw_paths(input_skeleton.shape, sub_paths_remaining) + + +if __name__ == '__main__': + skeleton_image_path = 'sk_bonk.png' # Update with your image path + skeleton = cv2.imread(skeleton_image_path, cv2.IMREAD_GRAYSCALE) + + # Threshold the image to ensure it's binary + _, skeleton = cv2.threshold(skeleton, 127, 255, cv2.THRESH_BINARY) + + cv2.imshow('Input', skeleton) + + # sp = compute_all_the_sub_paths(skeleton=skeleton) + # print(len(sp)) + # for n, p in enumerate(sp): + # print(f'{n:3} {len(p)}') + + loop_only = give_me_the_main_loop_only(skeleton) + cv2.imshow('Output', loop_only) + + + cv2.waitKey(0) + cv2.destroyAllWindows() diff --git a/yaml_stuff.py b/yaml_stuff.py new file mode 100644 index 0000000..474328c --- /dev/null +++ b/yaml_stuff.py @@ -0,0 +1,42 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +from pathlib import Path + + +def load_yaml(infile: Path): + yaml_file = infile.parent / f'{infile.stem}.yaml' + if yaml_file.is_file(): + with open(yaml_file, 'r') as fin: + lines = fin.readlines() + return lines + return None + + +def save_yaml(out_file: Path, lines: list[str]): + yaml_file = out_file.parent / f'{out_file.stem}.yaml' + with open(yaml_file, 'w', encoding='utf-8', newline='\n') as f_out: + f_out.writelines(lines) + + +IMAGE_TAG = 'image: ' + + +def adjust_yaml_lines(lines: list[str], new_name: Path): + found = None + for idx, line in enumerate(lines): + if line.startswith(IMAGE_TAG): + found = idx + break + if found is None: + raise ValueError(f'yaml line starting with "{IMAGE_TAG}" not found!') + lines[found] = f'{IMAGE_TAG}{new_name.name}\n' + + +def create_corrected_yaml(src: Path, dst: Path): + my_lines = load_yaml(src) + adjust_yaml_lines(my_lines, dst) + save_yaml(dst, my_lines) + + +# if __name__ == '__main__': +# main()