"""
Function to solve the problem of the
`Route Inspection Problem <https://en.wikipedia.org/wiki/Route_inspection_problem>`_.
:githublink:`%|py|6`
"""
import math
import random
import fractions
[docs]class SolutionException(Exception):
"""
wrong solution
:githublink:`%|py|14`
"""
pass
[docs]def haversine_distance(lat1, lng1, lat2, lng2):
"""
Computes `Haversine formula <http://en.wikipedia.org/wiki/Haversine_formula>`_.
.. index:: Haversine
:param lat1: lattitude
:param lng1: longitude
:param lat2: lattitude
:param lng2: longitude
:return: distance
:githublink:`%|py|29`
"""
radius = 6371
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lng2 - lng1)
a = math.sin(dlat / 2) * math.sin(dlat / 2) + math.cos(math.radians(lat1)) \
* math.cos(math.radians(lat2)) * math.sin(dlon / 2) * math.sin(dlon / 2)
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
d = radius * c
return d
[docs]def distance_solution(edges_index, edges, distances, solution, exc=True):
"""
Checks if a solution is really a solution and returns the distance of it,
None if it is not a solution. The function does not case about the order.
:param edges_index: list of indices of edges (if None --> range(len(edges))
:param edges: list of tuple (vertex A, vertex B)
:param distances: list of distances of each edge
:param solution: proposed solutions (list of edge indices)
:param exc: raises an exception in case of error
:githublink:`%|py|50`
"""
if edges_index is None:
edges_index = list(range(len(edges)))
indices = set(edges_index)
solset = set(solution)
if len(indices) != len(solset):
if exc:
raise SolutionException("Different number of distinct edges:\nexpected={0} got={1}\n"
"Did you cover all the edges?".format(len(indices), len(solset)))
return None
for s in solution:
if s not in indices:
raise SolutionException(
"Index {0} is not in edges_index".format(s))
doubles = {}
for a, b in edges:
if a > b:
a, b = b, a
if (a, b) in doubles:
if exc:
raise SolutionException(
"Edge {0} is duplicated in edges.".format((a, b)))
return None
doubles[a, b] = 1
corres = {e: i for i, e in enumerate(edges_index)}
degrees = {}
for s in solution:
a, b = edges[corres[s]]
degrees[a] = degrees.get(a, 0) + 1
degrees[b] = degrees.get(b, 0) + 1
odd, even = 0, 0
for d in degrees.values():
if d % 2 == 0:
even += 1
else:
odd += 1
if odd > 2:
if exc:
red = list(sorted([(k, v)
for k, v in degrees.items() if v % 2 != 0]))
if len(red) > 10:
red = red[:10] + ["..."]
raise SolutionException(
"Are you sure? The path is inconsistent. Some help:\n" + str(red))
return None
else:
return sum(distances[corres[s]] for s in solution)
[docs]def compute_degrees(edges):
"""
Compute the degree of vertices.
:param edges: list of tuple
:return: dictionary {key: degree}
:githublink:`%|py|110`
"""
res = {}
for a, b in edges:
res[a] = res.get(a, 0) + 1
res[b] = res.get(b, 0) + 1
return res
[docs]def euler_path(edges_index, edges, solution):
"""
Computes an Eulerian path.
.. index:: Eulerian
:param edges_index: list of indices of edges (if None --> range(len(edges))
:param edges: list of tuple (vertex A, vertex B)
:param solution: proposed solutions (list of edge indices)
:return: path, list of edges indices
The function assumes every vertex in the graph defined by *edges*
has an even degree.
:githublink:`%|py|131`
"""
if edges_index is None:
edges_index = range(len(edges))
pos = {k: i for i, k in enumerate(edges_index)}
indices = [pos[s] for s in solution]
edges = [edges[i] for i in indices]
degrees = compute_degrees(edges)
odd = {k: v for k, v in degrees.items() if v % 2 == 1}
if len(odd) not in (0, 2):
odd = list(sorted((k, v) for k, v in odd.items()))
if len(odd) > 10:
odd = odd[:10] + ["..."]
raise SolutionException(
"Some vertices have an odd degree. This is not allowed.\n" + str(odd))
if len(odd) == 2:
# we add an extra edge which we remove later
odd = list(odd.keys())
remove = (odd[0], odd[1])
edges.append(remove)
else:
remove = None
res = _euler_path(edges)
pathi = [_[1][0] for _ in res]
if remove is not None:
index = pathi.index(len(edges) - 1)
if index == 0:
pathi = pathi[1:]
elif index == len(edges) - 1:
pathi = pathi[:-1]
else:
pathi = pathi[index + 1:] + pathi[:index]
pathi = [solution[i] for i in pathi]
return pathi
[docs]def _euler_path(edges):
"""
Computes an Eulerian path.
:param edges: edges
:return: path, list of (vertex,edge)
The function assumes every vertex in the graph defined by *edges*
has an even degree.
:githublink:`%|py|178`
"""
alledges = {}
edges_from = {}
for i, k in enumerate(edges):
if isinstance(k, list):
k = tuple(k)
v = (i,) + k
alledges[k] = v
a, b = k
alledges[b, a] = alledges[a, b]
if a not in edges_from:
edges_from[a] = []
if b not in edges_from:
edges_from[b] = []
edges_from[a].append(alledges[a, b])
edges_from[b].append(alledges[a, b])
degre = {}
for a, v in edges_from.items():
t = len(v)
degre[t] = degre.get(t, 0) + 1
two = [a for a, v in edges_from.items() if len(v) == 2]
odd = [a for a, v in edges_from.items() if len(v) % 2 == 1]
if len(odd) not in (0, 2):
add = "\n" + str(odd) if len(odd) < 10 else ""
raise SolutionException(
"Some vertices have an odd degree. This is not allowed." + add)
begin = two[0]
# checking
for v, le in edges_from.items():
for e in le:
to = e[1] if v != e[1] else e[2]
if to not in edges_from:
raise SolutionException(
"Unable to find vertex {0} for edge ({0},{1}).".format(to, v))
if to == v:
raise SolutionException("Circular edge {0}.".format(to))
# loop
path = _explore_path(edges_from, begin)
for p in path:
if len(p) == 0:
raise NotImplementedError("This exception should not happen.")
while len(edges_from) > 0:
start = None
for i, p in enumerate(path):
if p[0] in edges_from:
start = i, p
break
sub = _explore_path(edges_from, start[1][0])
i = start[0]
path[i:i + 1] = path[i:i + 1] + sub
return path
[docs]def _delete_edge(edges_from, n, to):
"""
Removes an edge from the graph.
:param edges_from: structure which contains the edges (will be modified)
:param n: first vertex
:param to: second vertex
:return: the edge
:githublink:`%|py|244`
"""
le = edges_from[to]
f = None
for i, e in enumerate(le):
if (e[1] == to and e[2] == n) or (e[2] == to and e[1] == n):
f = i
break
assert f is not None
del le[f]
if len(le) == 0:
del edges_from[to]
le = edges_from[n]
f = None
for i, e in enumerate(le):
if (e[1] == to and e[2] == n) or (e[2] == to and e[1] == n):
f = i
break
assert f is not None
keep = le[f]
del le[f]
if len(le) == 0:
del edges_from[n]
return keep
[docs]def _explore_path(edges_from, begin):
"""
Explores an Eulerian path, remove used edges from *edges_from*.
:param edges_from: structure which contains the edges (will be modified)
:param begin: first vertex to use
:return: path
:githublink:`%|py|280`
"""
path = [(begin, None)]
stay = True
while stay and len(edges_from) > 0:
n = path[-1][0]
if n not in edges_from:
# fin
break
le = edges_from[n]
if len(le) == 1:
h = 0
e = le[h]
to = e[1] if n != e[1] else e[2]
else:
to = None
nb = 100
while to is None or to == begin:
h = random.randint(0, len(le) - 1) if len(le) > 1 else 0
e = le[h]
to = e[1] if n != e[1] else e[2]
nb -= 1
if nb < 0:
raise NotImplementedError(
"Algorithm issue {0}".format(len(path)))
if len(edges_from[to]) == 1:
if begin != to:
raise NotImplementedError("Wrong algorithm.")
stay = False
keep = _delete_edge(edges_from, n, to)
path.append((to, keep))
return path[1:]
[docs]def distance_vertices(edges, vertices, distances):
"""
Computes the length of edges if distances is None.
:param edges: list of tuple (vertex A, vertex B)
:param vertices: locations of the vertices
:param distances: distances (None or list of floats)
:return: distances (list of float)
:githublink:`%|py|326`
"""
if distances is None:
distances = []
while len(distances) < len(edges):
distances.append(None)
for i, edge in enumerate(edges):
if distances[i] is not None:
continue
a, b = edge
va = vertices[a]
vb = vertices[b]
d = haversine_distance(va[0], va[1], vb[0], vb[1])
distances[i] = d
return distances
[docs]def bellman_distances(edges, distances, fLOG=None):
"""
Computes shortest distances between all vertices.
It assumes edges are symmetric.
:param edges: list of tuple (vertex A, vertex B)
:param distances: distances (list of floats)
:param fLOG: logging function
:return: dictionary of distances
This function could be implemented based on
`shortest_path <https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.csgraph.shortest_path.html>`_.
:githublink:`%|py|354`
"""
dist = {(a, b): d for d, (a, b) in zip(distances, edges)}
dist.update({(b, a): d for d, (a, b) in zip(distances, edges)})
dist0 = dist.copy()
iter = 0 # pylint: disable=W0622
up = dist
while len(up) > 0:
iter += 1
up = {}
for (a, b), d1 in dist.items():
if (a, b) not in dist0 and (a, b) not in up:
# no need to continue as no update during the last iteration
continue
for (aa, bb), d2 in dist.items():
# not the most efficient
if b == aa and a != bb:
d = d1 + d2
if (a, bb) not in dist or dist[a, bb] > d:
up[a, bb] = d
up[bb, a] = d
dist.update(up)
if fLOG:
sum_values = sum(dist.values())
mes = "iteration={0} modif={1} #dist={2} sum_values={3} avg={4}".format(
iter, len(up), len(dist), sum_values, sum_values / len(dist))
fLOG("[bellman_distances] " + mes)
return dist
[docs]def dijkstra_path(edges, distances, va, vb):
"""
Returns the best path between two vertices.
Uses `Dikjstra <https://en.wikipedia.org/wiki/Dijkstra%27s_algorithm>`_ algorithm.
:param edges: list of edges.
:param distances: list of distances
:param va: first vertex
:param vb: last vertex
:return: list of edges
This function could be implemented based on
`shortest_path <https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.csgraph.shortest_path.html>`_.
:githublink:`%|py|398`
"""
dist = {va: 0}
prev = {va: None}
modif = 1
while modif > 0:
modif = 0
for (a, b), d in zip(edges, distances):
if a in dist:
d2 = dist[a] + d
if b not in dist or dist[b] > d2:
dist[b] = d2
prev[b] = a
modif += 1
if b in dist:
d2 = dist[b] + d
if a not in dist or dist[a] > d2:
dist[a] = d2
prev[a] = b
modif += 1
rev = {(a, b): i for i, (a, b) in enumerate(edges)}
rev.update({(b, a): i for i, (a, b) in enumerate(edges)})
path = []
v = vb
while v is not None:
path.append(v)
v = prev[v]
path.reverse()
return [rev[a, b] for a, b in zip(path[:-1], path[1:])]
[docs]def matching_vertices(distances, algo="blossom"):
"""
Finds the best match between vertices.
:param distances: result of function @bellman_distances but
only for odd vertices (odd = odd degree),
dictionary { (vi,vj) : distance}
:param algo: algorithm (see below)
:return: sequences of best matches.
If ``algo=='hungarian'``,
the function relies on `linear_sum_assignment
<https://docs.scipy.org/doc/scipy-0.18.1/reference/generated/scipy.optimize.linear_sum_assignment.html>`_
from `scipy <https://docs.scipy.org/doc/scipy-0.18.1/>`_ which uses the
`Hungarian Algorithm <https://en.wikipedia.org/wiki/Hungarian_algorithm>`_.
Vertex index do not have to start from zero and be continuous.
The function will handle that particular case. However, the method
works for a bi-partite matching and is not suitable here unless
the algorithm is modified. Not done (yet?).
If ``algo=='blossom'``, it uses the
`Blossom algorithm <https://en.wikipedia.org/wiki/Blossom_algorithm>`_
which is known to be in :math:`O(n^3)` and finds the optimal matching.
If ``algo=='basic'``, the function sorts the distances
by increasing order and builds new edges as they come in this list.
It does not return an optimal solution but is much faster
when the graph is big.
:githublink:`%|py|456`
"""
if not isinstance(distances, dict):
raise TypeError(
"Unexpected type for distances, this should be a dictionary.")
from numpy import empty
from scipy.optimize import linear_sum_assignment
unique = set(_[0] for _ in distances) | set(_[1] for _ in distances)
mapping = {}
rev = {}
for s in unique:
n = len(mapping)
mapping[s] = n
rev[n] = s
mx = len(mapping)
# Hungarian algorithm
if algo == "hungarian":
cost = empty((mx, mx))
mx = abs(max(distances.values())) * len(mapping) * 10
cost[:] = mx
for (i, j), v in distances.items():
if i == j:
raise ValueError(
"Unreasonable case: {0} == {1}, v={2}".format(i, j, v))
cost[mapping[i], mapping[j]] = v
row_ind, col_ind = linear_sum_assignment( # pylint: disable=W0632
cost) # pylint: disable=W0632
pairs = [(rev[i], rev[j]) for i, j in zip(row_ind, col_ind)]
for a, b in pairs:
if a == b:
raise ValueError("Issue with one pair a == b.\n{0}".format(
"\n".join(str(_) for _ in pairs)))
# we remove duplicates
done = set()
final = []
for a, b in pairs:
if (a, b) not in done:
final.append((a, b))
done.add((b, a))
if len(final) * 2 != len(pairs):
mes = "final={0}\n{3}={1}\ncost\n{2}".format(
final, pairs, cost, algo)
raise ValueError(
"Did you use the tweak? The matching should be symmetric.\n" + mes)
return final
elif algo == "basic":
# we sort pair by increasing order
dists = [(v, i, j) for (i, j), v in distances.items()]
dists.sort()
# the graph is fully connected so we don't need to check
# if selecting an edge leads to an impossible solution
vdone = set()
pairs = []
for v, i, j in dists:
if i not in vdone and j not in vdone:
pairs.append((i, j))
vdone.add(i)
vdone.add(j)
return pairs
elif algo == "blossom":
from .blossom import Vertex, StructureUpToDate, TreeStructureChanged, MaximumDualReached, INF
vertices = {}
for v in mapping:
vertices[v] = Vertex(v)
for (i, j), v in distances.items():
f = fractions.Fraction(v)
vertices[i].add_edge_to(vertices[j], f)
def update_tree_structures(roots):
try:
while True:
try:
for root in roots:
root.alter_tree(roots)
raise StructureUpToDate()
except TreeStructureChanged:
pass
except StructureUpToDate:
pass
def get_max_delta(roots):
if len(roots) == 0:
raise MaximumDualReached()
delta = INF
for root in roots:
delta = min(delta, root.get_max_delta())
assert delta >= 0
return delta
roots = set(vertices.values())
try:
while True:
delta = get_max_delta(roots)
for root in roots:
root.adjust_charge(delta)
update_tree_structures(roots)
except MaximumDualReached:
# done
pass
M = set()
for v in vertices.values():
M.update(e for e in v.edges if e.selected)
pairs = [e.extremities for e in M]
return pairs
else:
raise NotImplementedError("Not recognized: {0}".format(algo))
[docs]def best_euler_path(edges, vertices, distances, edges_index=None, algo="blossom", fLOG=None):
"""
Computes the final solution for the Eulerian path.
:param edges: edges (list of tuple)
:param vertices: location of the vertices (not needed if distances are filled)
:param distances: distances for each edge (list of distance)
:param edges_index: list of indices of edges (if None --> range(len(edges))
:param algo: algorithm to use to computes the matching
(see :func:`matching_vertices <ensae_projects.challenge.city_tour.matching_vertices>`)
:param fLOG: logging function
:return: list of edges as tuple, list of edges as indices, distance
:githublink:`%|py|584`
"""
if fLOG:
fLOG("[best_euler_path] distance_vertices #edges={0}".format(
len(edges)))
distances = distance_vertices(edges, vertices, distances)
if fLOG:
fLOG("[best_euler_path] bellman_distances")
dist = bellman_distances(edges, distances, fLOG=fLOG)
if fLOG:
fLOG("[best_euler_path] degrees and distances between odd vertices")
degrees = compute_degrees(edges)
odd = {k: v for k, v in degrees.items() if v % 2 != 0}
odd_dist = {k: v for k, v in dist.items() if k[0] in odd and k[1] in odd}
if fLOG:
fLOG("[best_euler_path] matching #odd={0}, #odd_dist={1}".format(
len(odd), len(odd_dist)))
pairs = matching_vertices(odd_dist, algo=algo)
if fLOG:
fLOG("[best_euler_path] build solution")
solution = list(edges)
for va, vb in pairs:
dij = dijkstra_path(edges, distances, va, vb)
solution.extend([edges[e] for e in dij])
if fLOG:
fLOG("[best_euler_path] order edges to get the path #edges={0}".format(
len(solution)))
if edges_index is None:
edges_index = list(range(len(edges)))
mapping = {}
rev = {}
for edge, index in zip(edges, edges_index):
mapping[edge] = index
rev[index] = edge
sol_index = [mapping[e] for e in solution]
euler_index = euler_path(edges_index, edges, sol_index)
euler = [rev[i] for i in euler_index]
if fLOG:
fLOG("[best_euler_path] done.")
d = distance_solution(edges_index, edges, distances, sol_index)
return euler, euler_index, d