Source code for ensae_projects.challenge.velib_trajectories

# -*- coding: utf-8 -*-
"""
Une solution au problème proposée :
:ref:`Reconstruction de trajectoire velib <l-codingparty1>`


:githublink:`%|py|7`
"""
import os
import random
import pandas
from pyensae.datasource import download_data
from manydataapi.velib import DataCollectJCDecaux as DataVelibCollect
from pyquickhelper.loghelper import str2datetime


[docs]def get_data(whereto): """ Récupère les données. :param whereto: destination :githublink:`%|py|20` """ download_data('velib_synthetique.zip', website='xdtd', whereTo=whereto) download_data('besancon.df.txt.zip', website='xdtd', whereTo=whereto)
[docs]def enumerate_events(df): """ Construit la liste des événements (vélo réposé ou retiré). :param df: DataFrame :return: énumère événéments (ierator) ("file", "collect_date", "name", "lat", "lng", +1 ou -1) :githublink:`%|py|31` """ df = df[["file", "collect_date", "name", "lat", "lng", "available_bike_stands", "available_bikes"]] df = df.sort_values(["name", "file", "collect_date"]) lastrow = None for row in df.values: if lastrow is not None and lastrow[2] == row[2]: # pylint: disable=E1136 # nombre de place en plus d1 = row[-2] - lastrow[-2] # pylint: disable=E1136 # nombre de vélos en plus d2 = row[-1] - lastrow[-1] # pylint: disable=E1136 if d1 != 0: step = d1 // abs(d1) for _ in range(1, abs(d1) + 1): yield tuple(row[:-2]) + (step,) elif d2 != 0: step = d2 // abs(d2) for _ in range(1, abs(d2) + 1): yield tuple(row[:-2]) + (step,) lastrow = row
[docs]class ParemetreCoutTrajet: """ Regroupe l'ensembles des paramètres pour le calcul de la distance associé à un appariement. :githublink:`%|py|59` """
[docs] def __init__(self, max_speed=50, high_speed=25, low_speed=5, high_time=0.75, low_time=0.1): """ :param max_speed: au-delà, c'est une vitesse impossible :param high_speed: au-delà, c'est la vitesse d'un cycliste :param low_speed: en-deça, il vaut mieux marcher :param high_time: au-delà, c'est une randonnée :param low_time: en-deça, pourquoi un vélo :githublink:`%|py|69` """ self.max_speed = max_speed self.high_speed = high_speed self.low_speed = low_speed self.high_time = high_time self.low_time = low_time
[docs] def __str__(self): """ usuel :githublink:`%|py|79` """ return str(self.values)
@property def values(self): """ Retourne les valeurs dans un dictionnaire. :githublink:`%|py|86` """ return {k: getattr(self, k) for k in self.__dict__ if "time" in k or "speed" in k}
[docs] def cost(self, dh, dt, v): """ Retourne un coût, plus il est bas, plus de déplacement est probable. :param dh: distance :param dt: durée :param v: vitesse :return: coût :githublink:`%|py|98` """ c = 0 if v > self.max_speed: c += 1e3 if v > self.high_speed: c += (v - self.high_speed) ** 2 * 10 if v < self.low_speed: c += (v - self.low_speed) ** 2 * 100 dt = dt.total_seconds() / 3600 if dt > self.high_time: c += (dt - self.high_time) ** 2 * 10 if dt < self.low_time: c += (dt - self.low_time) ** 2 * 10 return c
[docs]def vitesse(c, d, params): """ Calcule la vitesse d'un déplacement. :param c: tuple ("file", "collect_date", "name", "lat", "lng", +1 ou -1) :param d: tuple ("file", "collect_date", "name", "lat", "lng", +1 ou -1) :param params: ParemetreCoutTrajet :return: vitesse, cout La fonction retourne une valeur aberrante si le temps entre les deux événements est négatifs. C'est une configuration impossible : on ne peut reposer un vélo avant de l'avoir retiré. La valeur aberrante est ``1e8``. Il reste un cas pour lequel, je ne sais pas encore quelle valeur donner : il s'agit des demi-appariements : un vélo rétiré mais jamais reposé et réciproquement. :githublink:`%|py|129` """ if c[0] is None or d[0] is None: # cas des vélos perdus if c[0] is None: if d[0] is None: return None else: return 0.0, 0.0 # je ne sais pas trop quoi mettre else: return 0.0, 0.0 # je ne sais pas trop quoi mettre else: lat1, lng1 = c[3], c[4] lat2, lng2 = d[3], d[4] dh = DataVelibCollect.distance_haversine(lat1, lng1, lat2, lng2) dt = d[1] - c[1] if dt.total_seconds() <= 0: return 1e8, 1e8 # infini v = dh / (dt.total_seconds() / 3600) cost = params.cost(dh, dt, v) return v, cost
[docs]def distance(positif, negatif, app, params): """ Calcule une distance pour un appariement conçu ici comme la variance de la vitesse de chaque déplacement + la somme des coûts de chaque appariement retourné par la fonction :func:`vitesse <ensae_projects.challenge.velib_trajectories.vitesse>`. :param positif: vélos pris (ou l'inverse) :param negatif: vélos remis (ou l'inverse) :param app: appariement (list de tuple (i,j)) :param params: :class:`ParemetreCoutTrajet <ensae_projects.challenge.velib_trajectories.ParemetreCoutTrajet>` :return: tuple: vitesse moyenne (sans appariements négatifs), distance, vitesse moyenne avec, nombre d'appariements négatifs :githublink:`%|py|163` """ val = [] nb_max = [] cost = 0 for i, j in app: p = positif[i] n = negatif[j] v, d = vitesse(p, n, params) cost += d if v > params.max_speed: nb_max.append(v) if v is not None: val.append(v) mean = sum(val) / len(val) # on enlève les appariements négatifs cor = [v_ for v_ in val if v_ < 1e8] if len(cor) == 0: raise RuntimeError("Pas d'appariements positifs: {0}".format(val)) mean_cor = sum(cor) / len(cor) dev = sum((x - mean) ** 2 for x in val) / len(val) return mean_cor, \ cost + dev ** 0.5 * (1 + len(nb_max)), \ mean, \ len(val) - len(cor)
[docs]def appariement(events, iter=1000, params=ParemetreCoutTrajet(), fLOG=print): # pylint: disable=W0622 """ On veut apparier les événemens -1 aux événemens +1. On s'attend aux colonnes suivantes: *"file"*, *"collect_date"*, *"name"*, *"lat"*, *"lng"*, *+1* ou *-1*. La fonction part des deux séries d'évéments rangés par ordre croissant, puis il permute deux appariements tant que cela diminue l'erreur d'appariement. :param events: list d'événements produits par la fonction :func:`enumerate_events <ensae_projects.challenge.velib_trajectories.enumerate_events>` :param iter: nombre d'itérations :param params: ParemetreCoutTrajet :param fLOG: logging function :return: tuple (mindist, moyenne, appariement, positif, negatif) :githublink:`%|py|204` """ events = sorted(events) # on élimine tous les -1 du début (forcément des vélos pris avant la # période d'étude) lasti = 0 for i, ev in enumerate(events): lasti = i if ev[-1] == 1: break i = lasti if i > 0: del events[:i] # pareil de l'autre côté for i, ev in enumerate(reversed(events)): if ev[-1] == -1: break if i > 0: del events[len(events) - i:] default = (None, None, None, 0, 0, 0) positif = [e for e in events if e[-1] > 0] negatif = [e for e in events if e[-1] < 0] # on veut autant d'événements de chaque côté while len(positif) > len(negatif): negatif.append(default) while len(positif) < len(negatif): positif.append(default) appariement_ = [(i, i) for i in range(0, len(positif))] vit, mindist, vitav, nbneg = distance( # pylint: disable=W0612 positif, negatif, appariement_, params) nbchange = 0 for it in range(0, iter): if it % 10 == 0: fLOG("iteration ", it, ": app-", nbneg, "/", len(appariement_), "min", mindist, "vitesse ", vit, " nbchange", nbchange) nbchange = 0 for _ in range(0, len(appariement_)): i = random.randint(0, len(appariement_) - 1) j = random.randint(0, len(appariement_) - 1) if i == j: continue ki, kj = appariement_[i], appariement_[j] appariement_[i] = (ki[0], kj[1]) appariement_[j] = (kj[0], ki[1]) v, dist, vt, nbneg = distance( # pylint: disable=W0612 positif, negatif, appariement_, params) if dist < mindist: mindist = dist vit = v nbchange += 1 else: appariement_[i], appariement_[j] = ki, kj moyenne = distance(positif, negatif, appariement_, params)[0] # def dd(a, b): # try: # return b - a # except Exception: # return None # for a in appariement_: # fLOG(a,dd(positif [a[0]][1],negatif[a[1]][1]), vitesse(positif [a[0]], negatif[a[1]], params), positif [a[0]],"-->",negatif[a[1]]) return mindist, moyenne, appariement_, positif, negatif
[docs]def distance_path(dfp): """ Calcule la vitesse moyenne lorsque le chemin est connu. :param dfp: liste des chemins :return: moyenne, stddev :githublink:`%|py|282` """ dfp = dfp.copy() dfp["speed"] = dfp["dist"] / dfp["hours"] dfp["dist"] = dfp.apply( lambda r: DataVelibCollect.distance_haversine(r["lat0"], r["lng0"], r["lat1"], r["lng1"]), axis=1) mean_ = sum(dfp["speed"]) / len(dfp) std_ = sum((x - mean_) ** 2 for x in dfp["speed"]) / len(dfp) return mean_, std_ ** 0.5
if __name__ == "__main__": # pragma: no cover def main_velib(): dest = "codpart1" if not os.path.exists(dest): os.makedirs(dest) get_data(dest) # récupère les données jeu = os.path.join(dest, "besancon.df.txt") jeu = os.path.join(dest, "out_simul_bike_nb1_sp10_data.txt") df = pandas.read_csv(jeu, sep="\t", encoding="utf8") # conversion des dates df["collect_date"] = df.apply( lambda r: str2datetime(r["collect_date"]), axis=1) # on regarde s'il existe le fichier des trajectoires path = jeu.replace("_data.", "_path.") if path != jeu and os.path.exists(path): dfp = pandas.read_csv(path, sep="\t") dfp = dfp[dfp["beginend"] == "end"] mean, std = distance_path(dfp) print("expected: vitesse moyenne ", mean, " stddev ", std) # on calcule les événements (1 vélo apparu, 1 vélo disparu) events = list(sorted(enumerate_events(df))) params = ParemetreCoutTrajet() print(params) mindist, moyenne, appariement_, positif, negatif = appariement( # pylint: disable=W0612 events, iter=200, params=params) print("vitesse moyenne", moyenne) main_velib()