Initial commit
This commit is contained in:
commit
53eab9c0ef
11 changed files with 456 additions and 0 deletions
4
.gitignore
vendored
Normal file
4
.gitignore
vendored
Normal file
|
@ -0,0 +1,4 @@
|
|||
/Olds/
|
||||
*.7z
|
||||
.idea
|
||||
__pycache__
|
3
.idea/.gitignore
generated
vendored
Normal file
3
.idea/.gitignore
generated
vendored
Normal file
|
@ -0,0 +1,3 @@
|
|||
# Default ignored files
|
||||
/shelf/
|
||||
/workspace.xml
|
8
.idea/atlanta_maps.iml
generated
Normal file
8
.idea/atlanta_maps.iml
generated
Normal file
|
@ -0,0 +1,8 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<module type="PYTHON_MODULE" version="4">
|
||||
<component name="NewModuleRootManager">
|
||||
<content url="file://$MODULE_DIR$" />
|
||||
<orderEntry type="inheritedJdk" />
|
||||
<orderEntry type="sourceFolder" forTests="false" />
|
||||
</component>
|
||||
</module>
|
6
.idea/inspectionProfiles/profiles_settings.xml
generated
Normal file
6
.idea/inspectionProfiles/profiles_settings.xml
generated
Normal file
|
@ -0,0 +1,6 @@
|
|||
<component name="InspectionProjectProfileManager">
|
||||
<settings>
|
||||
<option name="USE_PROJECT_PROFILE" value="false" />
|
||||
<version value="1.0" />
|
||||
</settings>
|
||||
</component>
|
7
.idea/misc.xml
generated
Normal file
7
.idea/misc.xml
generated
Normal file
|
@ -0,0 +1,7 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="Black">
|
||||
<option name="sdkName" value="Python 3.11" />
|
||||
</component>
|
||||
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.11" project-jdk-type="Python SDK" />
|
||||
</project>
|
8
.idea/modules.xml
generated
Normal file
8
.idea/modules.xml
generated
Normal file
|
@ -0,0 +1,8 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="ProjectModuleManager">
|
||||
<modules>
|
||||
<module fileurl="file://$PROJECT_DIR$/.idea/atlanta_maps.iml" filepath="$PROJECT_DIR$/.idea/atlanta_maps.iml" />
|
||||
</modules>
|
||||
</component>
|
||||
</project>
|
6
.idea/vcs.xml
generated
Normal file
6
.idea/vcs.xml
generated
Normal file
|
@ -0,0 +1,6 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="VcsDirectoryMappings">
|
||||
<mapping directory="$PROJECT_DIR$" vcs="Git" />
|
||||
</component>
|
||||
</project>
|
2
README.md
Normal file
2
README.md
Normal file
|
@ -0,0 +1,2 @@
|
|||
Progetto di studio di sistemi di guida autonoma.
|
||||
Utility per processo di filtraggio delle mappe raccolte dal campo
|
212
do_process_all_maps.py
Normal file
212
do_process_all_maps.py
Normal file
|
@ -0,0 +1,212 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
import cv2
|
||||
from pathlib import Path
|
||||
import numpy as np
|
||||
from anytree import Node, RenderTree, PreOrderIter
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
from skimage.morphology import skeletonize
|
||||
from visit_points import give_me_the_main_loop_only
|
||||
from yaml_stuff import create_corrected_yaml
|
||||
|
||||
DO_SHOW = False
|
||||
|
||||
@dataclass
|
||||
class MyContour:
|
||||
idx: int
|
||||
c: cv2.Mat | np.ndarray[Any, np.dtype] | np.ndarray
|
||||
h: list # 0_next, 1_previous, 2_child, 3_parent
|
||||
area: float
|
||||
|
||||
def __str__(self):
|
||||
return f'IDX: {self.idx}, Area:{self.area:.0f} h:{self.h}'
|
||||
|
||||
|
||||
border_size = 4
|
||||
|
||||
|
||||
def collect_my_children(me: Node, cl: list[MyContour], threshold=0):
|
||||
"""
|
||||
adds all the linked children to a father Node.
|
||||
:param me: me, the starting Node
|
||||
:param cl: the complete element list (the CV2 hierarchy)
|
||||
:return:
|
||||
"""
|
||||
# print(f'Collecting: {me}')
|
||||
my_first_child = me.name.h[2]
|
||||
if my_first_child < 0:
|
||||
return # You are having no child => we have done
|
||||
if cl[my_first_child].area >= threshold:
|
||||
# print(f'Adding a Node (first child): {my_first_child}, area={cl[my_first_child].area}')
|
||||
child = Node(cl[my_first_child], parent=me)
|
||||
if child.name.h[2] >= 0:
|
||||
# This child has other children, collect them first
|
||||
collect_my_children(child, cl, threshold=threshold)
|
||||
# Now do iterate over all the siblings
|
||||
sibling_idx = cl[my_first_child].h[0]
|
||||
while sibling_idx >= 0:
|
||||
# there is a sibling
|
||||
# print(f'Checking Sibling: {sibling_idx}')
|
||||
if cl[sibling_idx].area >= threshold:
|
||||
# print(f'Adding a Node (sibling child): {sibling_idx}')
|
||||
sibling = Node(cl[sibling_idx], parent=me)
|
||||
if sibling.name.h[2] >= 0:
|
||||
collect_my_children(sibling, cl, threshold=threshold)
|
||||
sibling_idx = cl[sibling_idx].h[0]
|
||||
|
||||
|
||||
FILTERED = '_filtered'
|
||||
RACE = '_race'
|
||||
COMPOSITE = '_composite'
|
||||
|
||||
BLACK_LIST = (
|
||||
FILTERED,
|
||||
RACE,
|
||||
COMPOSITE,
|
||||
'_edited',
|
||||
'_raceline',
|
||||
)
|
||||
|
||||
|
||||
def process_a_map(in_image):
|
||||
"""
|
||||
Partendo da una file di mappa grezzo, tenta di creare:
|
||||
una versione filtrata per localizzazione (_filtered.pgm)
|
||||
una versione filtrata per calcolo della race-line (_race.pgm)
|
||||
una versione filtrata per visualizzazione e monitoraggio algoritmo (_composite.png)
|
||||
:param in_image:
|
||||
:return:
|
||||
"""
|
||||
if in_image.suffix.lower() != '.pgm':
|
||||
# Processa solo le immagini di tipo pgm
|
||||
return
|
||||
stem = str(in_image.stem)
|
||||
for bl in BLACK_LIST:
|
||||
if stem.endswith(bl):
|
||||
# scarta tutte le immagini che sono degli output di elaborazioni precedenti
|
||||
return
|
||||
# Load the image
|
||||
image_ = cv2.imread(str(in_image), cv2.IMREAD_GRAYSCALE)
|
||||
# print(image_.shape)
|
||||
image = cv2.copyMakeBorder(image_, top=border_size, bottom=border_size, left=border_size,
|
||||
right=border_size, borderType=cv2.BORDER_CONSTANT, value=205)
|
||||
if DO_SHOW:
|
||||
cv2.imshow(in_image.stem, image)
|
||||
color_image = cv2.cvtColor(image_, cv2.COLOR_GRAY2BGR)
|
||||
|
||||
# Apply binary threshold (you can tweak the threshold value if needed)
|
||||
_, thresh = cv2.threshold(image, 210, 255, cv2.THRESH_BINARY)
|
||||
|
||||
# Find contours
|
||||
contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
||||
# print('++ LEN ++++++++++++++++++', len(contours))
|
||||
if len(contours) < 1:
|
||||
raise ValueError(f'No contours found for :{in_image}')
|
||||
for c in contours:
|
||||
area = cv2.contourArea(c)
|
||||
# print(f'{area=}')
|
||||
if area < 100: # TODO Hardcodein is not a good practice
|
||||
continue
|
||||
mask_inside = np.zeros_like(image)
|
||||
|
||||
# Draw the contour in white
|
||||
cv2.drawContours(mask_inside, [c], -1, color=255, thickness=cv2.FILLED)
|
||||
# mask_outside = cv2.bitwise_not(mask_inside)
|
||||
|
||||
img_retain = cv2.bitwise_and(image, mask_inside)
|
||||
|
||||
thres = 230
|
||||
_, th2 = cv2.threshold(img_retain, thres, 255, cv2.THRESH_BINARY)
|
||||
|
||||
ccc, hhh = cv2.findContours(th2, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
|
||||
|
||||
cl: list[MyContour] = []
|
||||
for n, (c, h) in enumerate(zip(ccc, *hhh)):
|
||||
cl.append(MyContour(idx=n, c=c, h=h, area=cv2.contourArea(c)))
|
||||
# print(n, h, cv2.contourArea(c))
|
||||
# do find the root contour:
|
||||
roots = tuple(filter(lambda x: x.h[3] < 0, cl))
|
||||
assert len(roots) == 1, 'We found more than one root contour!!'
|
||||
root = Node(roots[0])
|
||||
main_th = root.name.area * 0.01
|
||||
collect_my_children(root, cl, threshold=main_th)
|
||||
tentative = np.zeros_like(image)
|
||||
cv2.fillPoly(tentative, [root.name.c], color=255) # Fill with white color (255)
|
||||
for node in PreOrderIter(root):
|
||||
# print(node.name.idx)
|
||||
if node.name.idx != root.name.idx:
|
||||
cv2.fillPoly(tentative, [node.name.c], color=0) # Fill with white color (255)
|
||||
|
||||
rev = cv2.bitwise_not(tentative)
|
||||
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5)) # shape and size
|
||||
enlarged_black = cv2.dilate(rev, kernel, iterations=1)
|
||||
smoothed_black = cv2.erode(enlarged_black, kernel, iterations=1)
|
||||
filtered = cv2.bitwise_not(smoothed_black)
|
||||
fc = filtered[4:-4, 4:-4] # [rows, cols] => [y1:y2, x1:x2]
|
||||
if DO_SHOW:
|
||||
cv2.imshow(f'Filtered', fc)
|
||||
|
||||
destination = in_image.parent / f'{in_image.stem}{FILTERED}.pgm'
|
||||
create_corrected_yaml(in_image, destination)
|
||||
cv2.imwrite(str(destination), fc)
|
||||
|
||||
binary_image = fc // 255
|
||||
# Skeletonize the binary image
|
||||
skeleton = skeletonize(binary_image)
|
||||
|
||||
# Convert the skeleton back to a format suitable for display
|
||||
skeleton_image = (skeleton * 255).astype(np.uint8)
|
||||
if DO_SHOW:
|
||||
cv2.imshow(f'SK', skeleton_image)
|
||||
|
||||
main_loop = give_me_the_main_loop_only(skeleton_image)
|
||||
|
||||
race_kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (30, 30)) # shape and size
|
||||
race_map_1 = cv2.dilate(main_loop, race_kernel, iterations=1)
|
||||
race_map = cv2.bitwise_and(race_map_1, fc)
|
||||
if DO_SHOW:
|
||||
cv2.imshow(f'Race', race_map)
|
||||
|
||||
destination = in_image.parent / f'{in_image.stem}{RACE}.pgm'
|
||||
create_corrected_yaml(in_image, destination)
|
||||
cv2.imwrite(str(destination), race_map)
|
||||
|
||||
# Define colors for the masks (BGR format)
|
||||
color1 = (0, 255, 0) # Green
|
||||
color2 = (255, 0, 0) # Blue
|
||||
|
||||
# Create colored masks
|
||||
colored_mask1 = np.zeros_like(color_image)
|
||||
colored_mask1[fc == 255] = color1
|
||||
|
||||
colored_mask2 = np.zeros_like(color_image)
|
||||
colored_mask2[race_map == 255] = color2
|
||||
|
||||
# Blend the images
|
||||
sum_image = cv2.bitwise_xor(colored_mask1, colored_mask2)
|
||||
composite_image = cv2.addWeighted(color_image, 0.7, sum_image, 0.2, 0)
|
||||
|
||||
# Show the result
|
||||
# cv2.imshow('Composite Image', composite_image)
|
||||
|
||||
destination = in_image.parent / f'{in_image.stem}{COMPOSITE}.png'
|
||||
create_corrected_yaml(in_image, destination)
|
||||
cv2.imwrite(str(destination), composite_image)
|
||||
|
||||
# cv2.imwrite('comp.png', composite_image)
|
||||
|
||||
if DO_SHOW:
|
||||
cv2.waitKey(0)
|
||||
cv2.destroyAllWindows()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
FOLDER = Path(r'C:\Mc\Python\PyProjs\atlanta_maps\atlant_maps\maps')
|
||||
for element in FOLDER.iterdir():
|
||||
if element.is_dir():
|
||||
img_filename = element / f'{element.stem}.pgm'
|
||||
if img_filename.is_file():
|
||||
print(f'We have found: {img_filename}')
|
||||
process_a_map(img_filename)
|
||||
|
158
visit_points.py
Normal file
158
visit_points.py
Normal file
|
@ -0,0 +1,158 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
import numpy as np
|
||||
import cv2
|
||||
|
||||
sub_paths = []
|
||||
visited = set()
|
||||
current_path = []
|
||||
sk = None
|
||||
|
||||
DO_PRINT_DEBUG = False
|
||||
|
||||
|
||||
def clear():
|
||||
sub_paths.clear()
|
||||
visited.clear()
|
||||
current_path.clear()
|
||||
|
||||
|
||||
def get_neighbors(point):
|
||||
"""Get the neighboring points of a given point in the skeleton."""
|
||||
x, y = point
|
||||
neighbors = []
|
||||
for dx in [-1, 0, 1]:
|
||||
for dy in [-1, 0, 1]:
|
||||
if dx == 0 and dy == 0:
|
||||
continue
|
||||
nx, ny = x + dx, y + dy
|
||||
if 0 <= nx < sk.shape[0] and 0 <= ny < sk.shape[1]:
|
||||
if sk[nx, ny] == 255: # Check if it's part of the skeleton
|
||||
neighbors.append((nx, ny))
|
||||
return neighbors
|
||||
|
||||
|
||||
def add_a_point(p):
|
||||
if p in visited:
|
||||
return
|
||||
if DO_PRINT_DEBUG:
|
||||
print(f'Adding: {p}')
|
||||
visited.add(p)
|
||||
current_path.append(p)
|
||||
neighbors = [pix for pix in get_neighbors(p) if pix not in visited]
|
||||
branch_count = len(neighbors)
|
||||
# print(f'LN: {len(neighbors)} {neighbors}')
|
||||
while branch_count == 1:
|
||||
new_point = neighbors[0]
|
||||
visited.add(new_point)
|
||||
current_path.append(new_point)
|
||||
neighbors = [pix for pix in get_neighbors(new_point) if pix not in visited]
|
||||
branch_count = len(neighbors)
|
||||
# print(f'LN: {len(neighbors)} {neighbors}')
|
||||
|
||||
sub_paths.append(current_path.copy())
|
||||
current_path.clear()
|
||||
for nei in neighbors:
|
||||
add_a_point(nei)
|
||||
|
||||
|
||||
def compute_all_the_sub_paths(skeleton):
|
||||
global sk
|
||||
sk = skeleton
|
||||
clear()
|
||||
skeleton_points = np.argwhere(skeleton == 255) # Get all skeleton points
|
||||
for skp in skeleton_points:
|
||||
pto = tuple(skp)
|
||||
add_a_point(pto)
|
||||
return sub_paths
|
||||
|
||||
|
||||
def draw_paths(image_shape, sub_paths):
|
||||
"""Draw the remaining paths on a blank image."""
|
||||
drawn_image = np.zeros(image_shape, dtype=np.uint8) # Create a blank image
|
||||
for path in sub_paths:
|
||||
for point in path:
|
||||
drawn_image[point] = 255 # Draw the path in white
|
||||
return drawn_image
|
||||
|
||||
|
||||
def give_me_the_main_loop_only(input_skeleton):
|
||||
# Extract sub-paths
|
||||
sub_paths = compute_all_the_sub_paths(input_skeleton)
|
||||
end_points = {}
|
||||
|
||||
def is_close_enough(p1, p2):
|
||||
x, y = p1
|
||||
px, py = p2
|
||||
dx = abs(x - px)
|
||||
dy = abs(y - py)
|
||||
if dx <= 3 and dy <= 3:
|
||||
return True
|
||||
return False
|
||||
|
||||
def find_close_point(pto):
|
||||
for p in end_points.keys():
|
||||
if is_close_enough(pto, p):
|
||||
return p
|
||||
return None
|
||||
|
||||
def add_endpoint(pto, idx):
|
||||
where = find_close_point(pto)
|
||||
if where is None:
|
||||
# no close point found, add it
|
||||
end_points[pto] = []
|
||||
where = pto
|
||||
end_points[where].append(idx)
|
||||
|
||||
# Print the results
|
||||
to_be_removed = set()
|
||||
for i, path in enumerate(sub_paths):
|
||||
# if len(path) < 2:
|
||||
# continue
|
||||
begin = path[0]
|
||||
end = path[-1]
|
||||
if is_close_enough(begin, end):
|
||||
to_be_removed.add(i)
|
||||
continue
|
||||
add_endpoint(begin, i)
|
||||
add_endpoint(end, i)
|
||||
if DO_PRINT_DEBUG:
|
||||
print(f"Sub-path {i}: {len(path):4} {path[0]},{path[-1]}")
|
||||
|
||||
# Draw the remaining paths on a blank image
|
||||
# to_be_removed = {0, 7, 12, 13}
|
||||
# to_be_removed = set()
|
||||
for n, (cc, ep) in enumerate(end_points.items()):
|
||||
pto = int(cc[0]), int(cc[1])
|
||||
if DO_PRINT_DEBUG:
|
||||
print(f'Endpoint #{n} {pto}: {len(ep)}, {ep}')
|
||||
if len(ep) == 1:
|
||||
# this is a dead branch
|
||||
to_be_removed.add(ep[0])
|
||||
if DO_PRINT_DEBUG:
|
||||
print(f'{to_be_removed=}')
|
||||
sub_paths_remaining = [sp for n, sp in enumerate(sub_paths) if n not in to_be_removed]
|
||||
|
||||
return draw_paths(input_skeleton.shape, sub_paths_remaining)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
skeleton_image_path = 'sk_bonk.png' # Update with your image path
|
||||
skeleton = cv2.imread(skeleton_image_path, cv2.IMREAD_GRAYSCALE)
|
||||
|
||||
# Threshold the image to ensure it's binary
|
||||
_, skeleton = cv2.threshold(skeleton, 127, 255, cv2.THRESH_BINARY)
|
||||
|
||||
cv2.imshow('Input', skeleton)
|
||||
|
||||
# sp = compute_all_the_sub_paths(skeleton=skeleton)
|
||||
# print(len(sp))
|
||||
# for n, p in enumerate(sp):
|
||||
# print(f'{n:3} {len(p)}')
|
||||
|
||||
loop_only = give_me_the_main_loop_only(skeleton)
|
||||
cv2.imshow('Output', loop_only)
|
||||
|
||||
|
||||
cv2.waitKey(0)
|
||||
cv2.destroyAllWindows()
|
42
yaml_stuff.py
Normal file
42
yaml_stuff.py
Normal file
|
@ -0,0 +1,42 @@
|
|||
#!/usr/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def load_yaml(infile: Path):
|
||||
yaml_file = infile.parent / f'{infile.stem}.yaml'
|
||||
if yaml_file.is_file():
|
||||
with open(yaml_file, 'r') as fin:
|
||||
lines = fin.readlines()
|
||||
return lines
|
||||
return None
|
||||
|
||||
|
||||
def save_yaml(out_file: Path, lines: list[str]):
|
||||
yaml_file = out_file.parent / f'{out_file.stem}.yaml'
|
||||
with open(yaml_file, 'w', encoding='utf-8', newline='\n') as f_out:
|
||||
f_out.writelines(lines)
|
||||
|
||||
|
||||
IMAGE_TAG = 'image: '
|
||||
|
||||
|
||||
def adjust_yaml_lines(lines: list[str], new_name: Path):
|
||||
found = None
|
||||
for idx, line in enumerate(lines):
|
||||
if line.startswith(IMAGE_TAG):
|
||||
found = idx
|
||||
break
|
||||
if found is None:
|
||||
raise ValueError(f'yaml line starting with "{IMAGE_TAG}" not found!')
|
||||
lines[found] = f'{IMAGE_TAG}{new_name.name}\n'
|
||||
|
||||
|
||||
def create_corrected_yaml(src: Path, dst: Path):
|
||||
my_lines = load_yaml(src)
|
||||
adjust_yaml_lines(my_lines, dst)
|
||||
save_yaml(dst, my_lines)
|
||||
|
||||
|
||||
# if __name__ == '__main__':
|
||||
# main()
|
Loading…
Add table
Reference in a new issue