diff --git a/notebooks/proba_functions.ipynb b/notebooks/proba_functions.ipynb index 92ca1ec..ee35a2b 100644 --- a/notebooks/proba_functions.ipynb +++ b/notebooks/proba_functions.ipynb @@ -1,637 +1,2641 @@ { "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ - "## Make distribution tables to calculate probabilities of transfer\n", + "## Compute probability of missing a transfer from delays distributions\n", "\n", - "
Any application without a proper name would be promptly killed.
" + "Let's first have a look at a slice of the dictionary of distributions" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "Current session configs: {'conf': {'spark.app.name': 'lgptguys_final'}, 'kind': 'pyspark'}
" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "data": { - "text/html": [ - "\n", - "
IDYARN Application IDKindStateSpark UIDriver logCurrent session?
6849application_1589299642358_1346pysparkidleLinkLink
6852application_1589299642358_1349pysparkidleLinkLink
6858application_1589299642358_1352pysparkidleLinkLink
6861application_1589299642358_1355pysparkidleLinkLink
6866application_1589299642358_1360pysparkidleLinkLink
6867application_1589299642358_1361pysparkidleLinkLink
6869application_1589299642358_1363pysparkidleLinkLink
6871application_1589299642358_1365pysparkbusyLinkLink
6872application_1589299642358_1366pysparkidleLinkLink
6875application_1589299642358_1369pysparkidleLinkLink
6876application_1589299642358_1370pysparkidleLinkLink
# Function to take a slice from a dictionary - head equivalent
def take(n, iterable):
    """Return the first n items of the iterable as a list."""
    # islice stops after n items without consuming anything further
    head = islice(iterable, n)
    return [item for item in head]
IDYARN Application IDKindStateSpark UIDriver logCurrent session?
6877application_1589299642358_1371pysparkidleLinkLink
" - ], - "text/plain": [ - "" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "SparkSession available as 'spark'.\n" - ] - }, - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "An error was encountered:\n", - "unknown magic command '%spark'\n", - "UnknownMagic: unknown magic command '%spark'\n", - "\n" + "len dict_real : 12309\n", + "[('10.TA.1-11-B-j19-1.1.R__8590314', array([0, 2, 2, 1, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,\n", + " 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), ('10.TA.1-11-B-j19-1.1.R__8590317', array([0, 3, 2, 2, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,\n", + " 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), ('10.TA.1-11-B-j19-1.1.R__8594304', array([0, 0, 4, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,\n", + " 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), ('10.TA.1-11-B-j19-1.1.R__8594307', array([0, 1, 5, 3, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0,\n", + " 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])), ('10.TA.1-11-B-j19-1.1.R__8594310', array([0, 1, 3, 4, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,\n", + " 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]))]\n", + "len dict_all : 246968\n", + "[('1286.TA.26-32-j19-1.12.H__8591182', array([ 0, 1158, 306, 162, 94, 24, 28, 21, 3, 2, 0,\n", + " 1, 0, 0, 0, 0, 1, 0, 0, 
# Load the two pickled dictionaries of delay distributions.
# Each one maps a string key to a numpy array of counts (see the printed slices).
# NOTE(review): pickle.load can execute arbitrary code contained in the file;
# only load pickles that were produced by this project.
with gzip.open("../data/distributions_geschaetzAndReal.pkl.gz", "rb") as input_file:
    d_real = pickle.load(input_file)

# NOTE(review): this file has a ".pickle" extension but is opened with gzip,
# so it is presumably gzip-compressed as well - confirm against the writer.
with gzip.open("../data/distributions.pickle", "rb") as input_file:
    d_all = pickle.load(input_file)

# display a slice of it
print('len dict_real : ', len(d_real))
print(take(5, d_real.items()))

# display a slice of it
print('len dict_all : ', len(d_all))
print(take(5, d_all.items()))
def plot_data_points_hist(dico):
    """Plot a histogram of the number of data points stored under each key.

    `dico` maps a key to an array of counts. The counts of each key are summed
    and the totals are binned with a fixed bin width of 100. The figure title
    also reports how many keys have fewer points than the bin width, and the
    percentage of all keys this represents.

    Returns whatever `plt.show()` returns.
    """
    binwidth = 100

    # one total per key, in dictionary order
    totals = np.array([np.sum(dico[key]) for key in dico])

    # keys whose total falls below the threshold (threshold == bin width)
    n_sparse = np.sum(np.array(totals < binwidth))
    pct_sparse = round(100 * (n_sparse / len(totals)), 2)
    title = "Total number of data points per trip_id / stop_id key. N keys with less than {0} points: {1} ({2}%)"\
        .format(binwidth, n_sparse, pct_sparse)

    plt.figure(figsize = (10,5))
    bin_edges = range(min(totals), max(totals) + binwidth, binwidth)
    plt.hist(totals, bins = bin_edges)
    plt.title(title)
    plt.xlabel('n data points')
    plt.ylabel('n keys')
    return plt.show()
" ] }, - "execution_count": 1, - "metadata": {}, - "output_type": "execute_result" + "metadata": { + "needs_background": "light" + }, + "output_type": "display_data" } ], "source": [ - "with gzip.open(\"../data/distributions.pickle\", \"rb\") as input_file:\n", - " d = pickle.load(input_file)\n", - "\n", - "# Functon to take a slice from a dictionnary - head equivalent\n", - "def take(n, iterable):\n", - " \"Return first n items of the iterable as a list\"\n", - " return list(islice(iterable, n))\n", - "\n", - "# display a slice of it\n", - "take(5, d.items())" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Probability using cumulative distribution based on frequency of delays \n", - "\n", - "When we have __enough data__ and no ambiguity about `trip_id` and `stop_id` for a given distribution, then we can compute the probability $P(x \\leq X)$ for every x (delay in minute). \n", - "\n", - "Let's take a __threshold of 100__ sample points (=number of time we could measure a delay) as a minimum number of points to use this approach. \n", - "\n", - "_How many keys in our distionnary of distribution have at least this number of samples ?_" + "plot_data_points_hist(d_all)" ] }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 6, "metadata": {}, "outputs": [ { "data": { - "image/png": "\n", + "image/png": "\n", "text/plain": [ "
" ] }, "metadata": { "needs_background": "light" }, "output_type": "display_data" } ], "source": [ - "# Functon to take a slice from a dictionnary - head equivalent\n", - "def take(n, iterable):\n", - " \"Return first n items of the iterable as a list\"\n", - " return list(islice(iterable, n))\n", - "\n", - "# display a slice of it\n", - "list_tot_points = []\n", - "for key in d:\n", - " distrib = d[key]\n", - " list_tot_points.append(np.sum(distrib))\n", - " \n", - "tot_per_key = np.array(list_tot_points)\n", - "binwidth = 100\n", - "n_keys_less_than_binwidth = np.sum(np.array(tot_per_key < binwidth))\n", - "perc_key_to_recover = round(100 * ( n_keys_less_than_binwidth / len(tot_per_key) ), 2)\n", - "plt.figure(figsize = (10,5))\n", - "plt.hist(tot_per_key, bins = range(min(tot_per_key), max(tot_per_key) + binwidth, binwidth))\n", - "plt.title(\"Total number of data points per trip_id / stop_id key. N keys with less than {0} points: {1} ({2}%)\"\\\n", - " .format(binwidth, n_keys_less_than_binwidth, perc_key_to_recover))\n", - "plt.show()" + "plot_data_points_hist(d_real)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First we generate a dictionnary with cumulative probability based on frequency of delays, for each keys in our reference dictionnary." 
def cumul_distri_probas_dict(dico, inplace=True):
    """Convert per-key count distributions into cumulative probabilities.

    For every key, the array of counts ``c`` is replaced by the array
    ``cumsum(c) / sum(c)``, i.e. entry ``x`` is the fraction of data points
    that fall in bins ``0..x``. The last entry is therefore exactly 1.0.

    Parameters
    ----------
    dico : dict
        Maps a key to a 1-D sequence of counts (one entry per bin).
    inplace : bool, default True
        True keeps the historical behaviour: the values of ``dico`` are
        overwritten and ``dico`` itself is returned. Calling the function a
        second time on the same dictionary would then compute the cumulative
        distribution of values that are already cumulative.
        False leaves ``dico`` untouched and returns a new dictionary.

    Returns
    -------
    dict
        ``dico`` itself (``inplace=True``) or a new dict with the same keys,
        whose values are float64 arrays of cumulative probabilities.

    Raises
    ------
    ZeroDivisionError
        If a non-empty distribution contains no data point (sum of counts
        is 0), because its cumulative probabilities are undefined.
    """
    cdf_by_key = {}
    for key, distrib in dico.items():
        counts = np.asarray(distrib)
        if counts.size == 0:
            # nothing to normalise: keep an empty float array
            cdf_by_key[key] = np.empty(0, dtype=float)
            continue

        # get total number of elements
        total = counts.sum()
        if total == 0:
            raise ZeroDivisionError(
                "distribution for key {!r} has no data point".format(key))

        # One division per bin (instead of summing two rounded quotients)
        # guarantees that the final value is exactly total / total == 1.0
        cdf_by_key[key] = np.cumsum(counts, dtype=float) / float(total)

    if not inplace:
        return cdf_by_key

    # Only touch the caller's dictionary once every key has been converted
    dico.update(cdf_by_key)
    return dico
, 0.60166667, 0.84833333, 0.92777778, 0.96333333,\n", " 0.98277778, 0.99166667, 0.99666667, 0.99833333, 0.99888889,\n", " 0.99888889, 0.99888889, 0.99888889, 0.99888889, 0.99888889,\n", " 0.99944444, 0.99944444, 0.99944444, 0.99944444, 0.99944444,\n", " 1. , 1. , 1. , 1. , 1. ,\n", " 1. , 1. , 1. , 1. , 1. ,\n", " 1. , 1. ]))]" ] }, - "execution_count": 73, + "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "list_tot_points = []\n", - "for key in d:\n", - " distrib = d[key]\n", - " \n", - " # get total number of elements \n", - " N = np.sum(distrib)\n", - " \n", - " # make cumulative distribution probabilities\n", - " cdf_distrib = np.empty((len(distrib)), dtype=float)\n", - " save_x = 0\n", - " for x in range(len(distrib)):\n", - " cdf_distrib[x] = float(distrib[x])/float(N) + float(save_x)/float(N)\n", - " save_x += distrib[x]\n", - " \n", - " d[key] = cdf_distrib\n", - "\n", + "d_all_cdp = cumul_distri_probas_dict(d_all)\n", + "take(3, d_all_cdp.items())" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[('10.TA.1-11-B-j19-1.1.R__8590314',\n", + " array([0. , 0.25 , 0.5 , 0.625, 1. , 1. , 1. , 1. , 1. ,\n", + " 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. ,\n", + " 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. ,\n", + " 1. , 1. , 1. , 1. , 1. ])),\n", + " ('10.TA.1-11-B-j19-1.1.R__8590317',\n", + " array([0. , 0.3, 0.5, 0.7, 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. ,\n", + " 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. ,\n", + " 1. , 1. , 1. , 1. , 1. , 1. ])),\n", + " ('10.TA.1-11-B-j19-1.1.R__8594304',\n", + " array([0. , 0. , 0.5, 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. ,\n", + " 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. , 1. ,\n", + " 1. , 1. , 1. , 1. , 1. , 1. 
# Cumulative probabilities for the second dictionary of distributions
d_real_cdp = cumul_distri_probas_dict(d_real)
take(3, d_real_cdp.items())

# write dictionary
with gzip.open("../data/distributions_cumulative_real.pkl.gz", "wb") as output_file:
    pickle.dump(d_real_cdp, output_file)

# write dictionary
# The name `d` no longer exists in this notebook (the cell failed with a
# NameError); the cumulative distributions built from `d_all` are stored in
# `d_all_cdp`, so that is the object to persist under this file name.
with gzip.open("../data/distributions_cumulative.pickle", "wb") as output_file:
    pickle.dump(d_all_cdp, output_file)
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
route_inttrip_intstop_intstop_sequencearrival_timedeparture_timeroute_idtrip_idstop_idroute_descstop_id_rawsequence_shift_1
00001NaT2020-05-21 07:18:0030-57-Y-j19-14.TA.30-57-Y-j19-1.1.H8502208Bus85022082
100122020-05-21 07:23:002020-05-21 07:23:0030-57-Y-j19-14.TA.30-57-Y-j19-1.1.H8502209Bus85022093
200232020-05-21 07:29:00NaT30-57-Y-j19-14.TA.30-57-Y-j19-1.1.H8503202Bus85032021
30101NaT2020-05-21 07:48:0030-57-Y-j19-15.TA.30-57-Y-j19-1.1.H8502208Bus85022082
401122020-05-21 07:53:002020-05-21 07:53:0030-57-Y-j19-15.TA.30-57-Y-j19-1.1.H8502209Bus85022093
\n", + "
# NOTE(review): pickle.load can execute arbitrary code contained in the file;
# only load pickles that were produced by this project.
with open("../data/stop_times_df.pkl", "rb") as input_file:
    stoptimes = pickle.load(input_file)

stoptimes.head()

# Set same stoptimes index as distribution dict: '<trip_id>__<stop_id>',
# with stop_id cut to its first 7 characters
stoptimes['stop_id'] = stoptimes['stop_id'].astype(str).str[0:7]
stoptimes['key'] = stoptimes['trip_id'] + '__' + stoptimes['stop_id']
stoptimes = stoptimes.set_index('key')

# .copy() so that adding the 'hour' column below writes to an independent frame
stoptimes = stoptimes[['trip_id','stop_id', 'route_desc', 'arrival_time', 'departure_time']].copy()

# Hour of arrival for every row; if arrival is NaT, use departure time.
# Computed on whole columns (positionally, via numpy) instead of looping over
# rows with iloc, which is very slow on a table of this size.
arrival = pd.to_datetime(stoptimes['arrival_time'])
departure = pd.to_datetime(stoptimes['departure_time'])
hours = np.where(arrival.notna().to_numpy(),
                 arrival.dt.hour.to_numpy(),
                 departure.dt.hour.to_numpy()).astype(float)
if np.isnan(hours).any():
    # the row-by-row version failed on int(nan) in this situation
    raise ValueError("some stop times have neither arrival_time nor departure_time")

stoptimes['hour'] = hours.astype(int)
stoptimes = stoptimes.drop(columns=['trip_id', 'arrival_time', 'departure_time'])

# Write this pickle to avoid re-running this above code all the time
with gzip.open("../data/stop_times_wHour.pkl", "wb") as output_file:
    pickle.dump(stoptimes, output_file)
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
0123456789...22232425262728293031
stop_idhourroute_desc
85009268.0Bus0233444444...4444444444
9.0Bus0122222222...2222222223
10.0Bus0011111222...2222222222
11.0Bus0011122222...2222222222
\n", + "

4 rows × 32 columns

\n", + "
# Build the "recovery" table: distributions aggregated per (stop_id, hour, route_desc).
# Reload the stop_times table (with its 'hour' column) written by the previous step.
with gzip.open("../data/stop_times_wHour.pkl", "rb") as input_file:
    stoptimes = pickle.load(input_file)
    
# One row per key, one column per bin.
# NOTE(review): cumul_distri_probas_dict(d_all) overwrote the values of d_all in
# place, so at this point d_all holds cumulative probabilities, not raw counts.
distrib_df = pd.DataFrame(d_all).transpose()
# Drop rows whose first 11 bins sum to exactly 11.
# NOTE(review): with cumulative probabilities this presumably means "all mass in
# bin 0" (every one of the 11 values equal to 1.0) - confirm that this is what
# "missing trips" refers to; the test relies on exact float equality.
distrib_to_rm = np.array(distrib_df.iloc[:,range(11)].sum(axis=1) == 11) # missing trips
distrib_df = distrib_df.iloc[~distrib_to_rm,:]

stoptimes_df = pd.DataFrame(stoptimes)

# Join on the index; keys absent from stoptimes_df get NaN in the joined columns
recovery_df = distrib_df.join(stoptimes_df)
list_bins = [x for x in range(32)]

# Sum the 32 bin columns inside each (stop_id, hour, route_desc) group.
# NOTE(review): because of the in-place overwrite mentioned above, these are
# sums of cumulative probabilities that are then truncated to integers -
# confirm this is intended rather than a sum of raw counts.
recovery_df = recovery_df.groupby(['stop_id','hour', 'route_desc'])[list_bins].apply(lambda x : x.astype(float).sum())
recovery_df = recovery_df.astype('int')
print(recovery_df.shape)
recovery_df.head(4)

def plot_df_missing(df, max_bin = 10000):
    """Plot a histogram of the row totals of `df`.

    Every row of `df` is summed over its columns (truncated to int). The bin
    edges run from the smallest total up to `max_bin` in steps of 100. The
    title also reports how many rows have a total below 100 and which
    percentage of all rows this represents.

    Returns whatever `plt.show()` returns.
    """
    tot_per_key = np.array(df.sum(axis=1)).astype('int')
    binwidth = 100
    # rows whose total is below the threshold (threshold == bin width)
    n_keys_less_than_binwidth = np.sum(np.array(tot_per_key < binwidth))
    perc_key_to_recover = round(100 * ( n_keys_less_than_binwidth / len(tot_per_key) ), 2)
    plt.figure(figsize = (10,5))
    # upper edge is fixed by max_bin, not by the largest total
    plt.hist(tot_per_key, bins = range(min(tot_per_key), max_bin + binwidth, binwidth))
    plt.title("Total number of data points per stop_id / hour key. N keys with less than {0} points: {1} ({2}%)"\
        .format(binwidth, n_keys_less_than_binwidth, perc_key_to_recover))
    plt.xlabel('n data points')
    plt.ylabel('n keys')
    return plt.show()
" + ] + }, + "metadata": { + "needs_background": "light" + }, + "output_type": "display_data" + } + ], + "source": [ + "plot_df_missing(recovery_df)" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " 
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " 
\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
0123456789...22232425262728293031
stop_idhourroute_desc
85009268.0Bus0233444444...4444444444
9.0Bus0122222222...2222222223
10.0Bus0011111222...2222222222
11.0Bus0011122222...2222222222
12.0Bus0011122222...2222222222
13.0Bus0011122222...2222222222
14.0Bus0011222222...2222222222
15.0Bus0011222222...2222222222
16.0Bus0133333444...4444444444
17.0Bus0133333333...4444444444
18.0Bus0233333333...4444444444
19.0Bus0222233333...3333333333
85021868.0S-Bahn0367777777...8888888888
9.0S-Bahn0477777777...8888888888
10.0S-Bahn0477777777...7777788888
11.0S-Bahn0477777777...7777777778
12.0S-Bahn0377777777...7777777778
13.0S-Bahn0377777777...8888888888
14.0S-Bahn0377777777...8888888888
15.0S-Bahn0377777777...7777777788
\n", + "

20 rows × 32 columns

\n", + "
" + ], + "text/plain": [ + " 0 1 2 3 4 5 6 7 8 9 ... 22 23 \\\n", + "stop_id hour route_desc ... \n", + "8500926 8.0 Bus 0 2 3 3 4 4 4 4 4 4 ... 4 4 \n", + " 9.0 Bus 0 1 2 2 2 2 2 2 2 2 ... 2 2 \n", + " 10.0 Bus 0 0 1 1 1 1 1 2 2 2 ... 2 2 \n", + " 11.0 Bus 0 0 1 1 1 2 2 2 2 2 ... 2 2 \n", + " 12.0 Bus 0 0 1 1 1 2 2 2 2 2 ... 2 2 \n", + " 13.0 Bus 0 0 1 1 1 2 2 2 2 2 ... 2 2 \n", + " 14.0 Bus 0 0 1 1 2 2 2 2 2 2 ... 2 2 \n", + " 15.0 Bus 0 0 1 1 2 2 2 2 2 2 ... 2 2 \n", + " 16.0 Bus 0 1 3 3 3 3 3 4 4 4 ... 4 4 \n", + " 17.0 Bus 0 1 3 3 3 3 3 3 3 3 ... 4 4 \n", + " 18.0 Bus 0 2 3 3 3 3 3 3 3 3 ... 4 4 \n", + " 19.0 Bus 0 2 2 2 2 3 3 3 3 3 ... 3 3 \n", + "8502186 8.0 S-Bahn 0 3 6 7 7 7 7 7 7 7 ... 8 8 \n", + " 9.0 S-Bahn 0 4 7 7 7 7 7 7 7 7 ... 8 8 \n", + " 10.0 S-Bahn 0 4 7 7 7 7 7 7 7 7 ... 7 7 \n", + " 11.0 S-Bahn 0 4 7 7 7 7 7 7 7 7 ... 7 7 \n", + " 12.0 S-Bahn 0 3 7 7 7 7 7 7 7 7 ... 7 7 \n", + " 13.0 S-Bahn 0 3 7 7 7 7 7 7 7 7 ... 8 8 \n", + " 14.0 S-Bahn 0 3 7 7 7 7 7 7 7 7 ... 8 8 \n", + " 15.0 S-Bahn 0 3 7 7 7 7 7 7 7 7 ... 
7 7 \n", + "\n", + " 24 25 26 27 28 29 30 31 \n", + "stop_id hour route_desc \n", + "8500926 8.0 Bus 4 4 4 4 4 4 4 4 \n", + " 9.0 Bus 2 2 2 2 2 2 2 3 \n", + " 10.0 Bus 2 2 2 2 2 2 2 2 \n", + " 11.0 Bus 2 2 2 2 2 2 2 2 \n", + " 12.0 Bus 2 2 2 2 2 2 2 2 \n", + " 13.0 Bus 2 2 2 2 2 2 2 2 \n", + " 14.0 Bus 2 2 2 2 2 2 2 2 \n", + " 15.0 Bus 2 2 2 2 2 2 2 2 \n", + " 16.0 Bus 4 4 4 4 4 4 4 4 \n", + " 17.0 Bus 4 4 4 4 4 4 4 4 \n", + " 18.0 Bus 4 4 4 4 4 4 4 4 \n", + " 19.0 Bus 3 3 3 3 3 3 3 3 \n", + "8502186 8.0 S-Bahn 8 8 8 8 8 8 8 8 \n", + " 9.0 S-Bahn 8 8 8 8 8 8 8 8 \n", + " 10.0 S-Bahn 7 7 7 8 8 8 8 8 \n", + " 11.0 S-Bahn 7 7 7 7 7 7 7 8 \n", + " 12.0 S-Bahn 7 7 7 7 7 7 7 8 \n", + " 13.0 S-Bahn 8 8 8 8 8 8 8 8 \n", + " 14.0 S-Bahn 8 8 8 8 8 8 8 8 \n", + " 15.0 S-Bahn 7 7 7 7 7 7 8 8 \n", + "\n", + "[20 rows x 32 columns]" + ] + }, + "execution_count": 18, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "recovery_df.head(20)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Make second recovery table\n", + "\n", + "Here only taking combination of `transport_type x hour`" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "(127, 32)\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
0123456789...22232425262728293031
hourroute_desc
7.0Bus057991010101111...11121212121212121212
InterRegio0000011111...2222222222
Intercity0000011111...2222222222
S-Bahn0356788899...99999910101010
\n", + "

4 rows × 32 columns

\n", + "
" + ], + "text/plain": [ + " 0 1 2 3 4 5 6 7 8 9 ... 22 23 24 25 \\\n", + "hour route_desc ... \n", + "7.0 Bus 0 5 7 9 9 10 10 10 11 11 ... 11 12 12 12 \n", + " InterRegio 0 0 0 0 0 1 1 1 1 1 ... 2 2 2 2 \n", + " Intercity 0 0 0 0 0 1 1 1 1 1 ... 2 2 2 2 \n", + " S-Bahn 0 3 5 6 7 8 8 8 9 9 ... 9 9 9 9 \n", + "\n", + " 26 27 28 29 30 31 \n", + "hour route_desc \n", + "7.0 Bus 12 12 12 12 12 12 \n", + " InterRegio 2 2 2 2 2 2 \n", + " Intercity 2 2 2 2 2 2 \n", + " S-Bahn 9 9 10 10 10 10 \n", + "\n", + "[4 rows x 32 columns]" + ] + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "with gzip.open(\"../data/stop_times_wHour.pkl\", \"rb\") as input_file:\n", + " stoptimes = pickle.load(input_file)\n", + " \n", + "distrib_df = pd.DataFrame(d_all).transpose()\n", + "distrib_to_rm = np.array(distrib_df.iloc[:,range(11)].sum(axis=1) == 11) # missing trips\n", + "distrib_df = distrib_df.iloc[~distrib_to_rm,:]\n", + "\n", + "stoptimes_df = pd.DataFrame(stoptimes)\n", + "\n", + "recovery_df2 = distrib_df.join(stoptimes_df)\n", + "list_bins = [x for x in range(32)]\n", + "\n", + "recovery_df2 = recovery_df2.groupby(['hour', 'route_desc'])[list_bins].apply(lambda x : x.astype(float).sum())\n", + "recovery_df2 = recovery_df2.astype('int')\n", + "print(recovery_df2.shape)\n", + "recovery_df2.head(4)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Last recovery table \n", + "\n", + "Takes only transport type distribution" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "(11, 32)\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
0123456789...22232425262728293031
route_desc
Bus15725921197405115766124269128687131397133346134908136278...137998138003138007138012138014138016138018138021138023138087
Eurocity0001111222...3333333333
InterRegio3374107141174207240273306339...371371371372372372372372372372
Intercity9192939495969798999...109109109109109109109109109109
\n", + "

4 rows × 32 columns

\n", + "
" + ], + "text/plain": [ + " 0 1 2 3 4 5 6 7 \\\n", + "route_desc \n", + "Bus 1572 59211 97405 115766 124269 128687 131397 133346 \n", + "Eurocity 0 0 0 1 1 1 1 2 \n", + "InterRegio 33 74 107 141 174 207 240 273 \n", + "Intercity 9 19 29 39 49 59 69 79 \n", + "\n", + " 8 9 ... 22 23 24 25 26 \\\n", + "route_desc ... \n", + "Bus 134908 136278 ... 137998 138003 138007 138012 138014 \n", + "Eurocity 2 2 ... 3 3 3 3 3 \n", + "InterRegio 306 339 ... 371 371 371 372 372 \n", + "Intercity 89 99 ... 109 109 109 109 109 \n", + "\n", + " 27 28 29 30 31 \n", + "route_desc \n", + "Bus 138016 138018 138021 138023 138087 \n", + "Eurocity 3 3 3 3 3 \n", + "InterRegio 372 372 372 372 372 \n", + "Intercity 109 109 109 109 109 \n", + "\n", + "[4 rows x 32 columns]" + ] + }, + "execution_count": 20, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "with gzip.open(\"../data/stop_times_wHour.pkl\", \"rb\") as input_file:\n", + " stoptimes = pickle.load(input_file)\n", + " \n", + "distrib_df = pd.DataFrame(d_all).transpose()\n", + "distrib_to_rm = np.array(distrib_df.iloc[:,range(11)].sum(axis=1) == 11) # missing trips\n", + "distrib_df = distrib_df.iloc[~distrib_to_rm,:]\n", + "\n", + "stoptimes_df = pd.DataFrame(stoptimes)\n", + "\n", + "recovery_df3 = distrib_df.join(stoptimes_df)\n", + "list_bins = [x for x in range(32)]\n", + "\n", + "recovery_df3 = recovery_df3.groupby(['route_desc'])[list_bins].apply(lambda x : x.astype(float).sum())\n", + "recovery_df3 = recovery_df3.astype('int')\n", + "print(recovery_df3.shape)\n", + "recovery_df3.head(4)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Reconstruct cumulative distribution probabilities from multiple distributions to recover data with few/missing points \n", + "\n", + "To recover missing or faulty data, the strategy is the following :\n", + "1. 
If we have more than 100 data points in the `real` group, we rely exclusively on it to compute probabilities for a given transfer on a `trip_id x stop_id` \n", + " - `real` group : the delay was calculated from the actual arrival time with status `geschaetz` or `real`, meaning it comes from actual measurements.\n", + "2. If we do not find enough data within the `real` group, we look at distributions in the `all` group (contains all delays including `prognose` status) to compute probabilities, if there are more than 100 data points for a given `trip_id x stop_id`.\n", + "3. If the `all` group still does not have more than 100 data points, we rely on `recovery tables` to estimate delay distributions. The strategy is the following :\n", + " - As we will always know the `stop_id`, the `time` and the `transport_type`, we rely on arrival delays from aggregated values of similar transfers. \n", + " - First, we compute a table of distributions with all possible combinations of `stop_id`, `time` (rounded to the hour) and `transport_type`, and aggregate all the counts we have to compute cumulative distribution probabilities. \n", + " - If there are not more than 100 data points in one of these intersections, we fall back on the next possibility : a table with `transport_type` x `time` aggregate counts.\n", + " - The last values with no match are given the overall average of cumulative distribution probabilities for each `transport_type`, with no limit on the minimum number of data points.\n", + "\n", + "Following this approach, we can find cumulative distribution probabilities for every combination of `trip_id x stop_id` as defined in `stop_times_df`. We will make a table with the same row order so that McRaptor can easily find their indexes. 
\n", + "\n", + "In order to do that, we have two dictionaries of distributions and three recovery dataframes :\n", + " - `d_real` : contains the delay distribution for each key of the form `trip_id + __ + stop_id`, calculated from delays with status `geschaetz` or `real` in sbb datasets.\n", + " - `d_all` : contains the delay distribution for each key of the form `trip_id + __ + stop_id`. No filter was applied on status (contains `geschaetz`, `real` __and__ `prognose` = evaluated delay).\n", + " - `recovery_df` : contains aggregated delay distributions for each combination of `stop_id`, `route_desc` (transport type) and `hour` (time rounded to hour). \n", + " - `recovery_df2` : contains aggregated delay distributions for each combination of `route_desc` (transport type) and `hour` (time rounded to hour). \n", + " - `recovery_df3` : contains aggregated delay distributions for `route_desc` (transport type) " + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
route_inttrip_intstop_intstop_sequencearrival_timedeparture_timeroute_idtrip_idstop_idroute_descstop_id_rawsequence_shift_1
00001NaT2020-05-21 07:18:0030-57-Y-j19-14.TA.30-57-Y-j19-1.1.H8502208Bus85022082
100122020-05-21 07:23:002020-05-21 07:23:0030-57-Y-j19-14.TA.30-57-Y-j19-1.1.H8502209Bus85022093
200232020-05-21 07:29:00NaT30-57-Y-j19-14.TA.30-57-Y-j19-1.1.H8503202Bus85032021
\n", + "
" + ], + "text/plain": [ + " route_int trip_int stop_int stop_sequence arrival_time \\\n", + "0 0 0 0 1 NaT \n", + "1 0 0 1 2 2020-05-21 07:23:00 \n", + "2 0 0 2 3 2020-05-21 07:29:00 \n", + "\n", + " departure_time route_id trip_id stop_id \\\n", + "0 2020-05-21 07:18:00 30-57-Y-j19-1 4.TA.30-57-Y-j19-1.1.H 8502208 \n", + "1 2020-05-21 07:23:00 30-57-Y-j19-1 4.TA.30-57-Y-j19-1.1.H 8502209 \n", + "2 NaT 30-57-Y-j19-1 4.TA.30-57-Y-j19-1.1.H 8503202 \n", + "\n", + " route_desc stop_id_raw sequence_shift_1 \n", + "0 Bus 8502208 2 \n", + "1 Bus 8502209 3 \n", + "2 Bus 8503202 1 " + ] + }, + "execution_count": 21, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "###################### MAKE CUMULATIVE PROBABILITY TABLE #######################\n", + "\n", + "# Load stop_time table, to use its order as a template for our final table \n", + "with open(\"../data/stop_times_df.pkl\", \"rb\") as input_file:\n", + " stoptimes = pickle.load(input_file)\n", + " \n", + "stoptimes.head(3)" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "0.0%, 4.07%, 8.14%, 12.21%, 16.28%, 20.35%, 24.42%, 28.49%, 32.55%, 36.62%, 40.69%, 44.76%, 48.83%, 52.9%, 56.97%, 61.04%, 65.11%, 69.18%, 73.25%, 77.32%, 81.39%, 85.46%, 89.53%, 93.6%, 97.66%, " + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
keykey_inttrip_idstop_idtransport_typehourdistribution
04.TA.30-57-Y-j19-1.1.H__850220804.TA.30-57-Y-j19-1.1.H8502208Bus7[0, 5, 7, 9, 9, 10, 10, 10, 11, 11, 11, 11, 11...
14.TA.30-57-Y-j19-1.1.H__850220914.TA.30-57-Y-j19-1.1.H8502209Bus7[0, 5, 7, 9, 9, 10, 10, 10, 11, 11, 11, 11, 11...
24.TA.30-57-Y-j19-1.1.H__850320224.TA.30-57-Y-j19-1.1.H8503202Bus7[0, 5, 7, 9, 9, 10, 10, 10, 11, 11, 11, 11, 11...
35.TA.30-57-Y-j19-1.1.H__850220835.TA.30-57-Y-j19-1.1.H8502208Bus7[0, 5, 7, 9, 9, 10, 10, 10, 11, 11, 11, 11, 11...
45.TA.30-57-Y-j19-1.1.H__850220945.TA.30-57-Y-j19-1.1.H8502209Bus7[0, 5, 7, 9, 9, 10, 10, 10, 11, 11, 11, 11, 11...
\n", + "
" + ], + "text/plain": [ + " key key_int trip_id stop_id \\\n", + "0 4.TA.30-57-Y-j19-1.1.H__8502208 0 4.TA.30-57-Y-j19-1.1.H 8502208 \n", + "1 4.TA.30-57-Y-j19-1.1.H__8502209 1 4.TA.30-57-Y-j19-1.1.H 8502209 \n", + "2 4.TA.30-57-Y-j19-1.1.H__8503202 2 4.TA.30-57-Y-j19-1.1.H 8503202 \n", + "3 5.TA.30-57-Y-j19-1.1.H__8502208 3 5.TA.30-57-Y-j19-1.1.H 8502208 \n", + "4 5.TA.30-57-Y-j19-1.1.H__8502209 4 5.TA.30-57-Y-j19-1.1.H 8502209 \n", + "\n", + " transport_type hour distribution \n", + "0 Bus 7 [0, 5, 7, 9, 9, 10, 10, 10, 11, 11, 11, 11, 11... \n", + "1 Bus 7 [0, 5, 7, 9, 9, 10, 10, 10, 11, 11, 11, 11, 11... \n", + "2 Bus 7 [0, 5, 7, 9, 9, 10, 10, 10, 11, 11, 11, 11, 11... \n", + "3 Bus 7 [0, 5, 7, 9, 9, 10, 10, 10, 11, 11, 11, 11, 11... \n", + "4 Bus 7 [0, 5, 7, 9, 9, 10, 10, 10, 11, 11, 11, 11, 11... " + ] + }, + "execution_count": 23, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "summary_df = pd.DataFrame(columns = ['key', 'key_int', 'trip_id', 'stop_id', 'transport_type', 'hour', 'distribution'])\n", + "n_fail = 0\n", + "size_stop_times = stoptimes.shape[0]\n", + "n_real = 0\n", + "n_all = 0\n", + "n_recov1 = 0\n", + "n_recov2 = 0\n", + "n_recov3 = 0\n", + "\n", + "for index, row in stoptimes.iterrows():\n", + " \n", + " trip_id = row[7]\n", + " stop_id = str(row[8])[:7]\n", + " transport_type = row[9]\n", + " key = trip_id + '__' + stop_id\n", + "\n", + " # Compute rounded hour using arrival if possible - recover with departure\n", + " hour = pd.to_datetime(stoptimes.loc[index]['arrival_time']).hour\n", + " if math.isnan(hour): # if arrival is NaT, use departure time\n", + " hour = pd.to_datetime(stoptimes.loc[index]['departure_time']).hour\n", + " \n", + " distrib = np.zeros(31)\n", + " keep_trying = True\n", + " \n", + " # 1) try d_real to get distribution from measured delays\n", + " if key in d_real:\n", + " distrib = d_real[key]\n", + " sum_distrib = np.sum(distrib)\n", + " if sum_distrib > 100 :\n", + " 
summary_df.loc[index, 'distribution'] = distrib\n", + " keep_trying = False \n", + " n_real += 1\n", + " \n", + " # 2) try d_all to get distribution from measured + estimated delays\n", + " if keep_trying and key in d_all:\n", + " distrib = d_all[key]\n", + " sum_distrib = np.sum(distrib)\n", + " if sum_distrib > 100 :\n", + " summary_df.loc[index, 'distribution'] = distrib\n", + " keep_trying = False\n", + " n_all += 1\n", + "\n", + " # 3) try first recovery table using stop_id, transport_type and hour\n", + " if keep_trying and (stop_id, hour, transport_type) in recovery_df.index:\n", + " distrib = np.array(recovery_df.loc[(stop_id, hour, transport_type)])\n", + " sum_distrib = np.sum(distrib)\n", + " if sum_distrib > 100 :\n", + " summary_df.loc[index, 'distribution'] = distrib\n", + " keep_trying = False \n", + " n_recov1 += 1\n", + " \n", + " # 4) use second recovery table using transport_type and hour \n", + " if keep_trying and (hour, transport_type) in recovery_df2.index:\n", + " distrib = np.array(recovery_df2.loc[(hour, transport_type)])\n", + " sum_distrib = np.sum(distrib)\n", + " if sum_distrib > 100 :\n", + " summary_df.loc[index, 'distribution'] = distrib\n", + " keep_trying = False \n", + " n_recov2 += 1\n", + " \n", + " # 5) use third recovery table using transport_type only \n", + " if keep_trying and (transport_type) in recovery_df3.index:\n", + " distrib = np.array(recovery_df3.loc[(transport_type)])\n", + " sum_distrib = np.sum(distrib)\n", + " summary_df.loc[index, 'distribution'] = distrib\n", + " keep_trying = False \n", + " n_recov3 += 1\n", + " \n", + " # Record results in summary\n", + " summary_df.loc[index, 'key'] = key\n", + " summary_df.loc[index, 'key_int'] = index\n", + " summary_df.loc[index, 'trip_id'] = trip_id\n", + " summary_df.loc[index, 'stop_id'] = stop_id\n", + " summary_df.loc[index, 'transport_type'] = transport_type\n", + " summary_df.loc[index, 'hour'] = hour\n", + "\n", + " # save number of failure for recovery\n", + " 
if keep_trying:\n", + " print('fail{}'.format(index), end = ', ')\n", + " n_fail += 1 \n", + " \n", + " # print progression \n", + " if (index % 10000) == 0 :\n", + " print('{}%'.format(round(100*index/size_stop_times,2)), end = ', ')\n", + " \n", + "summary_df.head()\n" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "metadata": {}, + "outputs": [], + "source": [ + "# Load stop_time table, to use its order as a template for our final table \n", + "with gzip.open(\"../data/join_distribution_all.pkl.gz\", \"wb\") as out_file:\n", + " pickle.dump(summary_df, out_file)" + ] + }, + { + "cell_type": "code", + "execution_count": 28, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "array([ 0, 5, 7, 9, 9, 10, 10, 10, 11, 11, 11, 11, 11, 11, 11, 11, 11,\n", + " 11, 11, 11, 11, 11, 11, 12, 12, 12, 12, 12, 12, 12, 12, 12])" + ] + }, + "execution_count": 28, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "summary_df['distribution'][0]" + ] + }, + { + "cell_type": "code", + "execution_count": 29, + "metadata": {}, + "outputs": [], + "source": [ + "list_all_rows = []\n", + "for index, row in summary_df.iterrows():\n", + " distrib = np.array(row['distribution'])\n", + " \n", + " # get total number of elements \n", + " N = np.sum(distrib)\n", + " \n", + " # make cumulative distribution probabilities\n", + " cdf_distrib = np.empty((len(distrib)), dtype=float)\n", + " save_x = 0\n", + " for x in range(len(distrib)):\n", + " cdf_distrib[x] = float(distrib[x])/float(N) + float(save_x)/float(N)\n", + " save_x += distrib[x]\n", + " \n", + " list_all_rows.append(cdf_distrib)" + ] + }, + { + "cell_type": "code", + "execution_count": 36, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "array([[0. 
, 0.01501502, 0.03603604, 0.06306306, 0.09009009,\n", + " 0.12012012, 0.15015015, 0.18018018, 0.21321321, 0.24624625,\n", + " 0.27927928, 0.31231231, 0.34534535, 0.37837838, 0.41141141,\n", + " 0.44444444, 0.47747748, 0.51051051, 0.54354354, 0.57657658,\n", + " 0.60960961, 0.64264264, 0.67567568, 0.71171171, 0.74774775,\n", + " 0.78378378, 0.81981982, 0.85585586, 0.89189189, 0.92792793,\n", + " 0.96396396, 1. ],\n", + " [0. , 0.01501502, 0.03603604, 0.06306306, 0.09009009,\n", + " 0.12012012, 0.15015015, 0.18018018, 0.21321321, 0.24624625,\n", + " 0.27927928, 0.31231231, 0.34534535, 0.37837838, 0.41141141,\n", + " 0.44444444, 0.47747748, 0.51051051, 0.54354354, 0.57657658,\n", + " 0.60960961, 0.64264264, 0.67567568, 0.71171171, 0.74774775,\n", + " 0.78378378, 0.81981982, 0.85585586, 0.89189189, 0.92792793,\n", + " 0.96396396, 1. ],\n", + " [0. , 0.01501502, 0.03603604, 0.06306306, 0.09009009,\n", + " 0.12012012, 0.15015015, 0.18018018, 0.21321321, 0.24624625,\n", + " 0.27927928, 0.31231231, 0.34534535, 0.37837838, 0.41141141,\n", + " 0.44444444, 0.47747748, 0.51051051, 0.54354354, 0.57657658,\n", + " 0.60960961, 0.64264264, 0.67567568, 0.71171171, 0.74774775,\n", + " 0.78378378, 0.81981982, 0.85585586, 0.89189189, 0.92792793,\n", + " 0.96396396, 1. ],\n", + " [0. , 0.01501502, 0.03603604, 0.06306306, 0.09009009,\n", + " 0.12012012, 0.15015015, 0.18018018, 0.21321321, 0.24624625,\n", + " 0.27927928, 0.31231231, 0.34534535, 0.37837838, 0.41141141,\n", + " 0.44444444, 0.47747748, 0.51051051, 0.54354354, 0.57657658,\n", + " 0.60960961, 0.64264264, 0.67567568, 0.71171171, 0.74774775,\n", + " 0.78378378, 0.81981982, 0.85585586, 0.89189189, 0.92792793,\n", + " 0.96396396, 1. ],\n", + " [0. 
, 0.01501502, 0.03603604, 0.06306306, 0.09009009,\n", + " 0.12012012, 0.15015015, 0.18018018, 0.21321321, 0.24624625,\n", + " 0.27927928, 0.31231231, 0.34534535, 0.37837838, 0.41141141,\n", + " 0.44444444, 0.47747748, 0.51051051, 0.54354354, 0.57657658,\n", + " 0.60960961, 0.64264264, 0.67567568, 0.71171171, 0.74774775,\n", + " 0.78378378, 0.81981982, 0.85585586, 0.89189189, 0.92792793,\n", + " 0.96396396, 1. ]])" + ] + }, + "execution_count": 36, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "final_df = pd.DataFrame(list_all_rows)\n", + "final_df.index = summary_df.index\n", + "final_np = final_df.to_numpy()\n", + "final_np[0:5,:]" + ] + }, + { + "cell_type": "code", + "execution_count": 40, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 40, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "sum(np.array(final_df.index == stoptimes.index)) == stoptimes.shape[0]" + ] + }, + { + "cell_type": "code", + "execution_count": 104, + "metadata": {}, + "outputs": [], + "source": [ + "# write recovery table \n", + "with gzip.open(\"../data/distrib_recov_tab_stopID_hour.pkl.gz\", \"wb\") as output_file:\n", + " pickle.dump(recovery_tab, output_file)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Poisson cumulative distribution\n", "\n", "The Poisson distribution is popular for modeling the number of times an event occurs in an interval of time or space. 
We modeled a Poisson distribution for delays, assuming the variable $k$ is the delay time in minutes (as it was done [here](https://journals.plos.org/plosone/article?id=10.1371/journal.pone.0126137), formulas $(4),(5),(6)$).\n", "\n", "A discrete random variable X is said to have a Poisson distribution with parameter λ > 0, if, for k = 0, 1, 2, ..., the probability mass function of X is given by:\n", "\n", "$${\\displaystyle \\!f(k;\\lambda )=\\Pr(X=k)={\\frac {\\lambda ^{k}e^{-\\lambda }}{k!}},}$$\n", "where\n", "\n", "- e is Euler's number (e = 2.71828...)\n", "- k! is the factorial of k.\n", "\n", "The positive real number λ is equal to the expected value of X __and__ to its variance.\n", "\n", "$${\\displaystyle \\lambda =\\operatorname {E} (X)=\\operatorname {Var} (X)}$$\n", "\n", "We can approximate $E[X] \\approx \\mu_i$ for our data $X_i$, if we assume that the sample $X_i$ of size $N$ follows the distribution of $X$, meaning $X_i \\sim X$.\n", "\n", "Poisson-related __assumptions__ :\n", "- $k$ is the __delay time in minutes__ and can take values 0, 1, 2, ... (non-negative and discrete)\n", "- We assume our sampling $X_i$ of $X$ is good enough to approximate $E[X] \\approx \\mu_i$\n", "- The occurrence of one event does not affect the probability of others. That is, events occur independently.\n", " - __We assume being late one day does not affect the delay of the day after__ \n", "- The average rate at which events occur is independent of any occurrences. For simplicity, this is usually assumed to be constant, but may in practice vary with time.\n", " - __We assume delays occur at a constant rate over time__\n", "- Two events cannot occur at exactly the same instant\n", "\n", "We made a function `poisson_proba` that takes a `trip_id`, a `stop_id`, an `arrival time`, a `departure time` and a dictionary {key : distribution} to compute a __probability to be at least 2 minutes before departure of next trip__. 
\n", "\n", "We make a few __assumptions__ on our side :\n", "- We assume that if we have less than 2 minutes for the transfer, we miss it.\n", "- We assume the next train is on time.\n", "- As $k$ is non-negative for the Poisson distribution, we assume trains ahead of schedule were on time ($k=0$)\n", "\n", "\n", "_Question we should address :_\n", "- _Is the Poisson a reasonable approximation of the binomial distribution in our case ?_" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's first test the Poisson distribution and compare it with our distribution to see how well it fits the data. We will compute $Pr(X = k)$ for each value of $k$ and look at the shape of the Poisson distribution compared to the shape of our scaled data. Then, we will compare $\\sum_{k=0}^T Pr(X = k)$ with the cumulative distribution function, which directly gives $Pr(X \\leq T)$" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "An error was encountered:\n", "Invalid status code '404' from http://iccluster044.iccluster.epfl.ch:8998/sessions/6821 with error payload: \"Session '6821' not found.\"\n" ] } ], "source": [ "################################# POISSON FIT TEST #########################################\n", "\n", "# to do .. \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here are all the functions needed to calculate probability of success for a given transfer. 
We need the `trip_id`, `stop_id`, `departure_time`, `arrival_time` and dictionnary `d` (pickled load at the beginning of the cell) to be able to compute a probability of success with following function : \n", "\n", "`poisson_proba(trip_id, stop_id, arrival_time, departure_time, d)`" ] }, { "cell_type": "code", "execution_count": 40, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "lambda (expectation given distribution): 1.0194769059543685 \n", "\n", "Probability of success for transfer time = 13.0 minutes : 0.999999999994185\n" ] } ], "source": [ "%local\n", "################################# POISSON FUNCTIONS ########################################\n", "\n", "import pickle \n", "import gzip\n", "import time\n", "import math \n", "import datetime\n", "import time\n", "from scipy.stats import poisson\n", "\n", "# Load dictionnary\n", "with gzip.open(\"../data/distributions.pickle\", \"rb\") as input_file:\n", " d = pickle.load(input_file)\n", "\n", "# Load dictionnary\n", "with open(\"../data/stop_times_array.pkl\", \"rb\") as input_file:\n", " times = pickle.load(input_file)\n", "\n", "# we take two exemple time in format numpy.datetime64\n", "arr_time = times[4][1]\n", "dep_time = times[0][1]\n", "\n", "# Load distribution in dictinonary given a key\n", "def get_distrib(key, dico):\n", " if key in dico:\n", " return dico[key]\n", " else:\n", " raise ValueError(\"KEY ERROR: {} not found un distribution dictionnary\".format(key))\n", " \n", "# Evaluate lambda parameter assuming it is equal to average \n", "def evaluate_lamda(distrib):\n", " # First calculate total number of measures N\n", " N = 0 # by starting at -1 we ignore trains ahead of schedule\n", " for x in distrib:\n", " N += x\n", "\n", " lambda_p = 0 # expectation - we want to calculate it\n", " t = -1 # time = index - 1\n", "\n", " for x in distrib:\n", " if t>0:\n", " lambda_p += t*x\n", " t += 1\n", "\n", " # calculate lambda - the expectation of x\n", " if N > 
0:\n", " lambda_p /= N \n", " print('lambda (expectation given distribution): ',lambda_p, '\\n')\n", " return lambda_p\n", " else : \n", " raise ValueError(\"ERROR : {} distribution has 0 counts\".format(key))\n", " #print('Returning 1 to avoid later problem... \\n')\n", " return 1\n", "\n", "# process time given as string in format 'hh:mm' - not needed\n", "def process_time_str(str_time):\n", " x = time.strptime(str_time,'%H:%M')\n", " return datetime.timedelta(hours=x.tm_hour,minutes=x.tm_min,seconds=x.tm_sec).total_seconds()\n", "\n", "# Calculate transfer time given two times in string format 'hh:mm'\n", "def get_transfer_time(arr_time, dep_time, delta=2.0):\n", " diff_time_min = (arr_time - dep_time).astype('timedelta64[m]') / np.timedelta64(1, 'm')\n", " return diff_time_min - delta\n", "\n", "# Calculate poisson probability of success for a given transfert \n", "# for a given trip_id, stop_id, arrival/departure times and dict\n", "def poisson_proba(trip_id, stop_id, arr_time, dep_time, dico):\n", " # Generate key from trip_id / stop_id \n", " key = str(trip_id) + '__' + str(stop_id[0:7]) # 7 first char to be sbb-compatible\n", "\n", " # Get distribution from dictionnary\n", " distrib = get_distrib(key, dico)\n", " \n", " # Calculate transfer time at disposal \n", " T = get_transfer_time(arr_time, dep_time)\n", " \n", " # Get lambda value to calculate proba\n", " lambda_p = evaluate_lamda(distrib)\n", "\n", " # Get proba\n", " if T > 2:\n", " poisson_p = poisson.cdf(T, lambda_p)\n", " else : \n", " poisson_p = 0.0 # if we have less than 2 minutes, we miss it\n", " \n", " print('Probability of success for transfer time = {} minutes : '.format(T),poisson_p)\n", " return poisson_p\n", "\n", "# Mock exemple of probability calculations with given inputs\n", "trip_id = '1286.TA.26-32-j19-1.12.H'\n", "stop_id = '8591184'\n", "\n", "# we take two exemple time from stop_times_array in format numpy.datetime64\n", "arr_time = times[3][1]\n", "dep_time = times[0][1]\n", 
"\n", "Pr = poisson_proba(trip_id, stop_id, arr_time, dep_time, d)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 4 } diff --git a/notebooks/probabilities.ipynb b/notebooks/probabilities.ipynb index 915b88a..faf0c2e 100644 --- a/notebooks/probabilities.ipynb +++ b/notebooks/probabilities.ipynb @@ -1,2251 +1,2476 @@ { "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Make distribution tables to calculate probabilities of transfer\n", "\n", "
Any application without a proper name would be promptly killed.
" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/html": [ + "Current session configs: {'conf': {'spark.app.name': 'lgptguys_final'}, 'kind': 'pyspark'}
" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "\n", + "
IDYARN Application IDKindStateSpark UIDriver logCurrent session?
7272application_1589299642358_1768pysparkidleLinkLink
7292application_1589299642358_1788pysparkbusyLinkLink
7326application_1589299642358_1822pysparkidleLinkLink
7369application_1589299642358_1865pysparkidleLinkLink
7388application_1589299642358_1884pysparkidleLinkLink
7393application_1589299642358_1889pysparkidleLinkLink
7398application_1589299642358_1894pysparkidleLinkLink
7407application_1589299642358_1903pysparkidleLinkLink
7412application_1589299642358_1908pysparkbusyLinkLink
7415application_1589299642358_1911pysparkidleLinkLink
7418application_1589299642358_1914pysparkidleLinkLink
7420application_1589299642358_1916pysparkbusyLinkLink
7421application_1589299642358_1917pysparkidleLinkLink
7422application_1589299642358_1918pysparkbusyLinkLink
7423application_1589299642358_1919pysparkidleLinkLink
7424application_1589299642358_1920pysparkidleLinkLink
7426application_1589299642358_1922pysparkidleLinkLink
7427application_1589299642358_1923pysparkidleLinkLink
7428application_1589299642358_1924pysparkbusyLinkLink
7429application_1589299642358_1925pysparkidleLinkLink
7431application_1589299642358_1927pysparkidleLinkLink
7433application_1589299642358_1929pysparkidleLinkLink
7434application_1589299642358_1930pysparkidleLinkLink
7435application_1589299642358_1931pysparkbusyLinkLink
7437application_1589299642358_1933pysparkidleLinkLink
7438application_1589299642358_1934pysparkidleLinkLink
7440application_1589299642358_1936pysparkidleLinkLink
7441application_1589299642358_1937pysparkidleLinkLink
7443application_1589299642358_1939pysparkidleLinkLink
7444application_1589299642358_1940pysparkidleLinkLink
7445application_1589299642358_1941pysparkidleLinkLink
" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], "source": [ "%%configure\n", "{\"conf\": {\n", " \"spark.app.name\": \"lgptguys_final\"\n", "}}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Start Spark" ] }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Initialization\n", - "%%spark" - ] - }, - { - "cell_type": "code", - "execution_count": 57, + "execution_count": 2, "metadata": {}, "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Starting Spark application\n" + ] + }, + { + "data": { + "text/html": [ + "\n", + "
IDYARN Application IDKindStateSpark UIDriver logCurrent session?
7446application_1589299642358_1942pysparkidleLinkLink
" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "Successfully passed 'username' as 'username' to Spark kernel" + "SparkSession available as 'spark'.\n" + ] + }, + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "An error was encountered:\n", + "unknown magic command '%spark'\n", + "UnknownMagic: unknown magic command '%spark'\n", + "\n" + ] + } + ], + "source": [ + "# Initialization\n", + "%%spark" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "An error was encountered:\n", + "Variable named username not found.\n" ] } ], "source": [ "%%send_to_spark -i username -t str -n username" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Import useful libraries " ] }, { "cell_type": "code", - "execution_count": 58, + "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "from geopy.distance import 
great_circle\n", "from pyspark.sql.functions import *\n", "from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Read TimeTable data for routes / trips " ] }, { "cell_type": "code", - "execution_count": 59, + "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "+-----------+--------+---------------------------+-------+\n", - "|stop_id_raw|stop_int|stop_name |stop_id|\n", - "+-----------+--------+---------------------------+-------+\n", - "|8500926 |0 |Oetwil a.d.L., Schweizäcker|8500926|\n", - "|8502186 |1 |Dietikon Stoffelbach |8502186|\n", - "|8502186:0:1|2 |Dietikon Stoffelbach |8502186|\n", - "|8502186:0:2|3 |Dietikon Stoffelbach |8502186|\n", - "|8502186P |4 |Dietikon Stoffelbach |8502186|\n", - "+-----------+--------+---------------------------+-------+\n", + "+-----------+---------------------------+-------+\n", + "|stop_id_raw|stop_name |stop_id|\n", + "+-----------+---------------------------+-------+\n", + "|8500926 |Oetwil a.d.L., Schweizäcker|8500926|\n", + "|8502186 |Dietikon Stoffelbach |8502186|\n", + "|8502186:0:1|Dietikon Stoffelbach |8502186|\n", + "|8502186:0:2|Dietikon Stoffelbach |8502186|\n", + "|8502186P |Dietikon Stoffelbach |8502186|\n", + "+-----------+---------------------------+-------+\n", "only showing top 5 rows" ] } ], "source": [ "stops_15km = spark.read.csv('data/lgpt_guys/stops_15km.csv', header = True)\n", "\n", "# We use only first 7 characters of stop_id to remove special cases\n", - "stops_15km = stops_15km.select(col('stop_id').alias('stop_id_raw'), 'stop_int', 
'stop_name')\\\n", + "stops_15km = stops_15km.select(col('stop_id').alias('stop_id_raw'), 'stop_name')\\\n", " .withColumn('stop_id',col('stop_id_raw').substr(1, 7))\n", "stops_15km.show(5, False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Read the [SBB actual data](https://opentransportdata.swiss/en/dataset/istdaten) in ORC format" ] }, { "cell_type": "code", - "execution_count": 27, + "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "sbb = spark.read.orc('/data/sbb/orc/istdaten')" ] }, { "cell_type": "code", - "execution_count": 28, + "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- betriebstag: string (nullable = true)\n", " |-- fahrt_bezeichner: string (nullable = true)\n", " |-- betreiber_id: string (nullable = true)\n", " |-- betreiber_abk: string (nullable = true)\n", " |-- betreiber_name: string (nullable = true)\n", " |-- produkt_id: string (nullable = true)\n", " |-- linien_id: string (nullable = true)\n", " |-- linien_text: string (nullable = true)\n", " |-- umlauf_id: string (nullable = true)\n", " |-- verkehrsmittel_text: string (nullable = true)\n", " |-- zusatzfahrt_tf: string (nullable = true)\n", " |-- faellt_aus_tf: string (nullable = true)\n", " |-- bpuic: string (nullable = true)\n", " |-- haltestellen_name: string (nullable = true)\n", " |-- 
ankunftszeit: string (nullable = true)\n", " |-- an_prognose: string (nullable = true)\n", " |-- an_prognose_status: string (nullable = true)\n", " |-- abfahrtszeit: string (nullable = true)\n", " |-- ab_prognose: string (nullable = true)\n", " |-- ab_prognose_status: string (nullable = true)\n", " |-- durchfahrt_tf: string (nullable = true)" ] } ], "source": [ "sbb.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Subset SBB data\n", "\n", "We take only stop_id in 15 km range from Zurich HB using `stop_id` field from _stops_15km_. We did not use only `geschaetz` prognose time as there was too few overlap between _timetable_ and _sbb_ datasets with only `geschaetz` arrival times. _To do next : Use only geschaetz when available_" ] }, { "cell_type": "code", - "execution_count": 29, + "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "218697932\n", + "10848628\n", "+----------------+-----------------+----------------+----------------+-------------------+-------------------+-------+\n", "|fahrt_bezeichner|haltestellen_name|ankunftszeit |abfahrtszeit |an_prognose |ab_prognose |stop_id|\n", "+----------------+-----------------+----------------+----------------+-------------------+-------------------+-------+\n", "|85:11:10:002 |Zürich HB |03.09.2018 21:51| |03.09.2018 21:53:40| |8503000|\n", "|85:11:11:001 |Zürich HB | |03.09.2018 06:09| |03.09.2018 06:10:22|8503000|\n", "|85:11:12:001 |Zürich HB |03.09.2018 10:51| |03.09.2018 10:51:28| |8503000|\n", "|85:11:1251:003 |Zürich HB |03.09.2018 07:00| |03.09.2018 07:00:01| |8503000|\n", "|85:11:1252:001 |Zürich HB |03.09.2018 21:23|03.09.2018 
21:36|03.09.2018 21:24:55|03.09.2018 21:36:57|8503000|\n", "+----------------+-----------------+----------------+----------------+-------------------+-------------------+-------+\n", "only showing top 5 rows" ] } ], "source": [ "# Used to subset sbb table based on stop_id \n", "l1_id = stops_15km.select('stop_id').collect()\n", "l2_id = [item.stop_id for item in l1_id]\n", "\n", "# Used to subset sbb table based on stop_names \n", "l1_name = stops_15km.select('stop_name').collect()\n", "l2_name = [item.stop_name for item in l1_name]\n", "\n", "# Make the subset dataframe\n", - "sbb_filt = sbb.filter( sbb['bpuic'].isin(l2_id) | sbb['bpuic'].isin(l2_name) ) \\\n", + "sbb_filt = sbb.filter( ( sbb['bpuic'].isin(l2_id) | sbb['bpuic'].isin(l2_name) ) &\\\n", + " ((sbb.an_prognose_status == 'REAL') | \\\n", + " (sbb.an_prognose_status == 'GESCHAETZ') | \\\n", + " (sbb.ab_prognose_status == 'REAL') | \\\n", + " (sbb.ab_prognose_status == 'GESCHAETZ') ) ) \\\n", " .select('fahrt_bezeichner','haltestellen_name', \\\n", " 'ankunftszeit', 'abfahrtszeit', \\\n", " 'an_prognose', 'ab_prognose', \\\n", " col('bpuic').alias('stop_id'))\n", "\n", - "# ((sbb.an_prognose_status == 'GESCHAETZT') | \\\n", - "# ( sbb.ab_prognose_status == 'GESCHAETZT') ) \n", - "# sbb(\"betriebstag\").gt(lit(\"2019-01-01\")) &\\\n", - "# sbb(\"betriebstag\").lt(lit(\"2019-06-30\")) &\\\n", - " \n", "print sbb_filt.count()\n", "sbb_filt.show(5,False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Write subset table in HDFS for better performance during later usage" ] }, { "cell_type": "code", - "execution_count": 10, + "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "# save\n", 
"username = 'acoudray'\n", - "sbb_filt.write.format(\"orc\").save(\"/user/{}/sbb_filt_forDelays_noGaeschetz.orc\".format(username))" + "sbb_filt.write.format(\"orc\").save(\"/user/{}/sbb_filt_forDelays_GeschaetzAndReal.orc\".format(username))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Summary of tables writen in /user/{}/ :\n", "- sbb_filt_forDelays_noGaeschetz.orc : table with all dates, < 15km, no GESCHAETZ, used 7-char trimmed stop_id in timetable data\n", "- sbb_filt_forDelays2.orc : table with all dates, < 15km, only GESCHAETZ, used 7-char trimmed stop_id in timetable data\n", "- sbb_filt_forDelays.orc : table with all dates, < 15km, only GESCHAETZ\n", "- sbb_sub_forDelays.ord : Old to remove" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Measure Distributions of Delay Times per trip and station\n", "\n", "The goal of this chapter is to pre-compute probabilities for McRaptor implementation, which will be ultimately used to choose the best trip according to its time __and probability of success__. The goal is to create a distribution of arrival delays for each station / trip_id pair. 
\n", "\n", "We begin with a simple query of trip_id / station_id and build up to the full table generation made from correspondence tables between sbb and timetable trip_ids (they need to be translated first, which is done in `match_datasets.ipynb`).\n", "\n", "#### Simple task : returning the distribution for a given station / trip id\n", "\n", "Let's begin by exploring _sbb_ data and computing a distribution step by step for a given station / trip_id " ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "2133164" ] } ], "source": [ "# Load sbb data \n", "username='acoudray'\n", "sbb = spark.read.orc(\"/user/{}/sbb_filt_forDelays.orc\".format(username))\n", "sbb.count()" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "+-------------------+\n", "| haltestellen_name|\n", "+-------------------+\n", "|Winkel am Zürichsee|\n", "| Zürich Flughafen|\n", "| Kemptthal|\n", "| Urdorf|\n", "| Zürich Wiedikon|\n", "+-------------------+\n", "only showing top 5 rows" ] } ], "source": [ "sbb.select(\"haltestellen_name\").distinct().show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here we show the first few lines of all unique stations. We pick one of them and show its first associated trip id." 
] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "+----------------+\n", "|fahrt_bezeichner|\n", "+----------------+\n", "| 85:11:1507:002|\n", "| 85:11:1509:003|\n", "| 85:11:1510:003|\n", "| 85:11:1511:003|\n", "| 85:11:1512:003|\n", "+----------------+\n", "only showing top 5 rows" ] } ], "source": [ "stop=\"Zürich Flughafen\"\n", "sbb.filter(sbb.haltestellen_name == stop).select(\"fahrt_bezeichner\").show(5)" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "+----------------+--------------+-------------------+-------------+-------------+-------+\n", "| station| trip_id| arrival_true|DiffInSeconds|DiffInMinutes|weekday|\n", "+----------------+--------------+-------------------+-------------+-------------+-------+\n", "|Zürich Flughafen|85:11:1507:002|2018-05-06 06:49:24| 24| 0.0| Sun|\n", "|Zürich Flughafen|85:11:1507:002|2018-05-05 06:49:15| 15| 0.0| Sat|\n", "|Zürich Flughafen|85:11:1507:002|2018-05-04 06:50:38| 98| 2.0| Fri|\n", "|Zürich Flughafen|85:11:1507:002|2018-05-03 06:50:11| 71| 1.0| Thu|\n", "|Zürich Flughafen|85:11:1507:002|2018-05-02 06:49:30| 30| 1.0| Wed|\n", "|Zürich Flughafen|85:11:1507:002|2018-05-01 06:49:38| 38| 1.0| Tue|\n", "|Zürich Flughafen|85:11:1507:002|2018-04-30 06:49:59| 59| 
1.0| Mon|\n", "|Zürich Flughafen|85:11:1507:002|2018-04-29 06:49:16| 16| 0.0| Sun|\n", "|Zürich Flughafen|85:11:1507:002|2018-04-28 06:49:37| 37| 1.0| Sat|\n", "|Zürich Flughafen|85:11:1507:002|2018-04-27 06:50:00| 60| 1.0| Fri|\n", "|Zürich Flughafen|85:11:1507:002|2018-04-26 06:49:58| 58| 1.0| Thu|\n", "|Zürich Flughafen|85:11:1507:002|2018-04-25 06:49:44| 44| 1.0| Wed|\n", "|Zürich Flughafen|85:11:1507:002|2018-04-24 06:50:10| 70| 1.0| Tue|\n", "|Zürich Flughafen|85:11:1507:002|2018-04-23 06:49:53| 53| 1.0| Mon|\n", "|Zürich Flughafen|85:11:1507:002|2018-04-22 06:49:33| 33| 1.0| Sun|\n", "|Zürich Flughafen|85:11:1507:002|2018-04-21 06:49:00| 0| 0.0| Sat|\n", "|Zürich Flughafen|85:11:1507:002|2018-04-20 06:49:43| 43| 1.0| Fri|\n", "|Zürich Flughafen|85:11:1507:002|2018-04-19 06:49:00| 0| 0.0| Thu|\n", "|Zürich Flughafen|85:11:1507:002|2018-04-18 06:49:39| 39| 1.0| Wed|\n", "|Zürich Flughafen|85:11:1507:002|2018-04-17 06:49:36| 36| 1.0| Tue|\n", "+----------------+--------------+-------------------+-------------+-------------+-------+\n", "only showing top 20 rows" ] } ], "source": [ "trip_id=\"85:11:1507:002\"\n", "\n", "# First filter - filter selected station/trip_id, with define arrival time and GAESCHETZ status\n", "# Select 4 fields of interest, rename \n", "# Convert date-like string to timestamp\n", "# Compute difference between scheduled and actual arrivals times\n", "# reselect to generate weekday\n", "sbb_filt = sbb.filter( (sbb.fahrt_bezeichner == trip_id) & (sbb.haltestellen_name == stop) )\\\n", " .select(col(\"haltestellen_name\").alias(\"station\"), \\\n", " col(\"fahrt_bezeichner\").alias(\"trip_id\"), \\\n", " col(\"an_prognose\").alias(\"arrival_true\"),\\\n", " col(\"ankunftszeit\").alias(\"arrival_expected\"))\\\n", " .withColumn('arrival_true',to_timestamp(col('arrival_true'),\\\n", " format='dd.MM.yyyy HH:mm:ss'))\\\n", " .withColumn('arrival_expected',to_timestamp(col('arrival_expected'),\\\n", " format='dd.MM.yyyy HH:mm'))\\\n", " 
.withColumn('DiffInSeconds',col('arrival_true').cast(LongType()) - col('arrival_expected').cast(LongType()))\\\n", " .withColumn('DiffInMinutes',round(col('DiffInSeconds')/60))\\\n", " .select(\"station\", \"trip_id\", \"arrival_true\", \"DiffInSeconds\", \"DiffInMinutes\",\\\n", " date_format('arrival_expected', 'E').alias('weekday'))\\\n", " .orderBy(\"arrival_true\", ascending=False)\n", "sbb_filt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Given a station name and a trip id, we can get all arrival times (prognosed and real), and compute all delays in seconds and minutes. As we can see, the expected arrival time `ankunftszeit` is always the same, whereas the actual arrival `an_prognose` (with `an_prognose_status` equal to `GESCHAETZT`) varies.\n", "\n", "We remove Saturdays and Sundays to compute the arrival distribution only based on week days " ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "+-------------+-----+\n", "|DiffInMinutes|count|\n", "+-------------+-----+\n", "| 0.0| 19|\n", "| 1.0| 50|\n", "| 2.0| 14|\n", "| 3.0| 4|\n", "| 4.0| 1|\n", "| 11.0| 1|\n", "| 21.0| 1|\n", "+-------------+-----+" ] } ], "source": [ "sbb_filt.filter( (sbb_filt.weekday != \"Sun\") & (sbb_filt.weekday != \"Sat\") )\\\n", " .groupBy('DiffInMinutes').count()\\\n", " .orderBy(\"DiffInMinutes\").show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For next steps, we will be able to pivot this kind of table for multiple trip ids at once. 
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Load Table and make distribution from list of stations/trip_id\n", "\n", "Here we compute distribution of delays for a group of stations with all associated trips. The goal is to develop a script able to make a distribution for all stations/trips of interests.\n", "\n", "To train a bit the concept, let's first use all station with _Zurich_ pattern in their name and compute their delay distribution." ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+\n", "| haltestellen_name|\n", "+--------------------+\n", "| Winkel am Zürichsee|\n", "| Zürich Flughafen|\n", "| Zürich Wiedikon|\n", "| Zürich Stadelhofen|\n", "|Zürich Tiefenbrunnen|\n", "+--------------------+\n", "only showing top 5 rows" ] } ], "source": [ "expr = \"Z.rich*\" # regular expression to be used to get all Zurich* stations\n", "sbb.filter(sbb[\"haltestellen_name\"].rlike(expr))\\\n", " .select(\"haltestellen_name\")\\\n", " .distinct().show(5)" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+---------------+-------------------+-------------+-------------+-------+\n", "| station| trip_id| 
arrival_true|DiffInSeconds|DiffInMinutes|weekday|\n", "+--------------------+---------------+-------------------+-------------+-------------+-------+\n", "| Zürich HB| 85:11:543:001|2018-05-07 01:12:14| 14| 0.0| Mon|\n", "| Zürich HB|85:11:30797:011|2018-05-07 00:58:45| 225| 4.0| Mon|\n", "| Zürich HB|85:11:19694:001|2018-05-07 00:56:12| -48| -1.0| Mon|\n", "| Zürich HB| 85:11:2294:003|2018-05-07 00:54:06| -54| -1.0| Mon|\n", "| Zürich Hardbrücke|85:11:30797:011|2018-05-07 00:54:03| 123| 2.0| Mon|\n", "| Zürich Stadelhofen|85:11:19694:001|2018-05-07 00:53:40| 40| 1.0| Mon|\n", "| Zürich Hardbrücke|85:11:30794:002|2018-05-07 00:50:26| 86| 1.0| Mon|\n", "| Zürich Oerlikon|85:11:30797:011|2018-05-07 00:49:46| 106| 2.0| Mon|\n", "|Zürich Tiefenbrunnen|85:11:19694:001|2018-05-07 00:49:38| -22| 0.0| Mon|\n", "| Zürich HB|85:11:30794:002|2018-05-07 00:47:16| 136| 2.0| Mon|\n", "| Zürich HB|85:11:30692:007|2018-05-07 00:45:02| 242| 4.0| Mon|\n", "| Zürich HB|85:11:20495:001|2018-05-07 00:45:01| -59| -1.0| Mon|\n", "| Zürich Altstetten|85:11:18594:001|2018-05-07 00:44:53| 53| 1.0| Mon|\n", "| Zürich Stadelhofen|85:11:30794:002|2018-05-07 00:44:27| 147| 2.0| Mon|\n", "| Zürich Stadelhofen|85:11:30692:007|2018-05-07 00:42:37| 277| 5.0| Mon|\n", "| Zürich HB| 85:11:4793:001|2018-05-07 00:42:10| 10| 0.0| Mon|\n", "| Zürich Flughafen| 85:11:2294:003|2018-05-07 00:42:07| -113| -2.0| Mon|\n", "| Zürich Hardbrücke|85:11:18594:001|2018-05-07 00:41:27| 27| 0.0| Mon|\n", "| Zürich HB|85:11:18795:001|2018-05-07 00:41:05| 65| 1.0| Mon|\n", "| Zürich Wipkingen|85:11:20495:001|2018-05-07 00:40:52| 52| 1.0| Mon|\n", "+--------------------+---------------+-------------------+-------------+-------------+-------+\n", "only showing top 20 rows" ] } ], "source": [ "expr = \"Z.rich*\"\n", "\n", "# First filter - Take Zurich-like stations , with define arrival time and GAESCHETZ status\n", "# Select 4 fields of interest, rename \n", "# Convert date-like string to timestamp\n", "# Compute 
difference between scheduled and actual arrivals times\n", "# reselect to generate weekday\n", "sbb_filt = sbb.filter((sbb[\"haltestellen_name\"].rlike(expr)) )\\\n", " .select(col(\"haltestellen_name\").alias(\"station\"), \\\n", " col(\"fahrt_bezeichner\").alias(\"trip_id\"), \\\n", " col(\"an_prognose\").alias(\"arrival_true\"),\\\n", " col(\"ankunftszeit\").alias(\"arrival_expected\"))\\\n", " .withColumn('arrival_true',to_timestamp(col('arrival_true'),\\\n", " format='dd.MM.yyyy HH:mm:ss'))\\\n", " .withColumn('arrival_expected',to_timestamp(col('arrival_expected'),\\\n", " format='dd.MM.yyyy HH:mm'))\\\n", " .withColumn('DiffInSeconds',col('arrival_true').cast(LongType()) - col('arrival_expected').cast(LongType()))\\\n", " .withColumn('DiffInMinutes',round(col('DiffInSeconds')/60))\\\n", " .select(\"station\", \"trip_id\", \"arrival_true\", \"DiffInSeconds\", \"DiffInMinutes\",\\\n", " date_format('arrival_expected', 'E').alias('weekday'))\\\n", " .orderBy(\"arrival_true\", ascending=False)\n", "\n", "# Remove Saturday and Sunday weekdays from table - show\n", "sbb_filt = sbb_filt.filter( (sbb_filt.weekday != \"Sun\") & (sbb_filt.weekday != \"Sat\") )\n", "sbb_filt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To build the distribution, we use groupBy followed by a pivot using delay time in minutes. We fill null values with 0. No lower/upper bounds for now. Negative column keys mean an arrival ahead of schedule." 
] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "textn", "| station| trip_id|null|-28.0|-15.0|-13.0|-12.0|-11.0|-10.0|-9.0|-8.0|-7.0|-6.0|-5.0|-4.0|-3.0|-2.0|-1.0|0.0|1.0|2.0|3.0|4.0|5.0|6.0|7.0|8.0|9.0|10.0|11.0|12.0|13.0|14.0|15.0|16.0|17.0|18.0|19.0|20.0|21.0|22.0|23.0|24.0|25.0|26.0|27.0|28.0|29.0|30.0|31.0|32.0|33.0|34.0|35.0|36.0|37.0|38.0|39.0|40.0|41.0|42.0|43.0|44.0|45.0|46.0|47.0|48.0|49.0|50.0|51.0|52.0|53.0|54.0|55.0|56.0|57.0|59.0|60.0|61.0|62.0|63.0|64.0|65.0|66.0|67.0|68.0|69.0|70.0|71.0|72.0|73.0|76.0|77.0|78.0|79.0|80.0|82.0|85.0|86.0|90.0|96.0|99.0|102.0|111.0|120.0|122.0|127.0|132.0|149.0|150.0|152.0|180.0|210.0|\nn", "|Zürich Tiefenbrunnen|85:11:19639:001| 2| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 65| 10| 5| 2| 2| 2| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", "| Zürich Enge|85:11:18267:001| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 5| 1| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", "| Zürich HB|85:11:30992:009| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 9| 3| 1| 0| 0| 0| 0| 0| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 
0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", "| Zürich Altstetten|85:11:19978:001| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 2| 70| 8| 2| 0| 1| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", "| Zürich HB|85:11:18873:001| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 20| 22| 24| 7| 7| 1| 0| 0| 1| 0| 0| 0| 1| 0| 0| 0| 1| 0| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\nn", "only showing top 5 rows" ] } ], "source": [ "sbb_filt.groupBy('station', 'trip_id').pivot(\"DiffInMinutes\").count()\\\n", " .na.fill(0)\\\n", " .show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As an addition to this distribution, we can set up lower / upper bound to constrain the distribution to a specific window of interest. We do not really care about train being ahead, so we put them all in -1 column index, And we look at delays until 30 minutes only." 
] }, { "cell_type": "code", "execution_count": 39, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "+------------------+---------------+----+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+\n", "| station| trip_id|-1.0|0.0|1.0|2.0|3.0|4.0|5.0|6.0|7.0|8.0|9.0|10.0|11.0|12.0|13.0|14.0|15.0|16.0|17.0|18.0|19.0|20.0|21.0|22.0|23.0|24.0|25.0|26.0|27.0|28.0|29.0|30.0|\n", "+------------------+---------------+----+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+\n", "| Zürich HB|85:11:30992:009| 9| 3| 1| 0| 0| 0| 0| 0| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", "| Zürich HB|85:11:18873:001| 0| 20| 22| 24| 7| 7| 1| 0| 0| 1| 0| 0| 0| 1| 0| 0| 0| 1| 0| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", "| Zürich Oerlikon|85:11:20438:002| 0| 0| 27| 50| 8| 4| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", "|Zürich Wollishofen|85:11:18822:001| 0| 2| 2| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", "| Zürich Flughafen| 85:11:2270:001| 9| 46| 30| 2| 1| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", "+------------------+---------------+----+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+\n", "only showing top 5 rows" ] } ], "source": [ "lower_bound = -1.0\n", "upped_bound = +30.0\n", "\n", "sbb_bounded = 
sbb_filt.withColumn('DiffInMinutes_bounded1',\\\n", " greatest(col('DiffInMinutes'), lit(lower_bound) ))\\\n", " .withColumn('DiffInMinutes_bounded2',\\\n", " least(col('DiffInMinutes_bounded1'), lit(upped_bound) ))\n", "\n", "sbb_bounded.groupBy('station', 'trip_id').pivot(\"DiffInMinutes_bounded2\").count()\\\n", " .na.fill(0)\\\n", " .show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Work from translation tables \n", "\n", "We will use data generated in `match_datasets.ipynb`. We begin by looking at all trip_id that are found in both datasets with at least 5 stations in common.\n", "\n", "Our goal is to find a match in the sbb dataset for all _timetable_ trips (and not the other way around). So we will focus on getting this asymmetrical correspondence table. \n", "\n", "When we find a clear one-to-one match, we will mark it as _resolved_; when there is a one-to-many relation, we will call it _partly_resolved_; and if we cannot find an sbb trip that corresponds to a timetable trip_id, we will call it _fail_to_resolve_. \n", "\n", "These labels will be used to differentiate 3 different ways to compute probabilities :\n", "- __One-to-one__ we find a clear match : we use the distribution of delays on weekdays for a given trip/station_id based on all past sbb data. \n", "- __One-to-many__ we find multiple matches :\n", " - First we double check the matches, for example whether they have the same type of transportation.\n", " - If they seem to be correct, we can merge the trips from sbb and get the merged distribution of their delays.\n", "- __One-to-none__ we find no match : then we get the distribution of delays for similar transportation types, at a similar hour (in a window), during weekdays of the sbb dataset.\n", " - Alternative : Try to find the best match and use only the closest location/time to infer a given distribution.\n", " - Alternative 2 : use k-nearest neighbors in terms of location/time." 
] }, { "cell_type": "code", - "execution_count": 20, + "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "243152\n", "+------------------------+----------------------+-----+\n", "|trip_id |fahrt_bezeichner |count|\n", "+------------------------+----------------------+-----+\n", "|241.TA.26-14-j19-1.43.H |85:11:19435:001 |13 |\n", "|1419.TA.26-8-C-j19-1.8.R|85:3849:169172-07008-1|23 |\n", "|1015.TA.26-4-j19-1.25.H |85:3849:49891-03002-1 |7 |\n", "|1955.TA.26-13-j19-1.24.H|85:3849:89261-02013-1 |5 |\n", "|1217.TA.26-72-j19-1.6.R |85:849:55624-25033-1 |7 |\n", "+------------------------+----------------------+-----+\n", "only showing top 5 rows" ] } ], "source": [ "joined_trip_atL5 = spark.read.csv('data/lgpt_guys/joined_trip_atL5.csv', header = True)\n", "print joined_trip_atL5.count()\n", "joined_trip_atL5.show(5, False)" ] }, { "cell_type": "code", - "execution_count": 21, + "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "31103" ] } ], "source": [ "joined_trip_atL5.select('fahrt_bezeichner').distinct().count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We also use the subset of sbb data (we use the filtered data `sbb_filt` made at the top of the notebook)." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can begin by assembling sbb data set with translation table `joined_trip_atL5` " ] }, { "cell_type": "code", - "execution_count": 30, + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "10848628\n", + "+----------------+-----------------+----------------+----------------+-------------------+-------------------+-------+\n", + "|fahrt_bezeichner|haltestellen_name| ankunftszeit| abfahrtszeit| an_prognose| ab_prognose|stop_id|\n", + "+----------------+-----------------+----------------+----------------+-------------------+-------------------+-------+\n", + "| 85:11:10:002| Zürich HB|12.10.2018 21:51| |12.10.2018 21:51:50| |8503000|\n", + "| 85:11:10293:004| Zürich HB| |13.10.2018 00:25| |13.10.2018 00:26:08|8503000|\n", + "| 85:11:10293:004| Zürich Flughafen|13.10.2018 00:34|13.10.2018 00:35|13.10.2018 00:35:27|13.10.2018 00:36:44|8503016|\n", + "+----------------+-----------------+----------------+----------------+-------------------+-------------------+-------+\n", + "only showing top 3 rows" + ] + } + ], + "source": [ + "username = 'acoudray'\n", + "sbb_filt = spark.read.orc(\"/user/{}/sbb_filt_forDelays_GeschaetzAndReal.orc\".format(username))\n", + "print(sbb_filt.count())\n", + "sbb_filt.show(3)" + ] + }, + { + "cell_type": "code", + "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', 
layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "652832165\n", + "16474877\n", "+----------------+-----------------+----------------+----------------+-------------------+-------------------+-------+-------+-----+\n", "|fahrt_bezeichner|haltestellen_name|ankunftszeit |abfahrtszeit |an_prognose |ab_prognose |stop_id|trip_id|count|\n", "+----------------+-----------------+----------------+----------------+-------------------+-------------------+-------+-------+-----+\n", - "|85:11:10:002 |Zürich HB |03.09.2018 21:51| |03.09.2018 21:53:40| |8503000|null |null |\n", - "|85:11:11:001 |Zürich HB | |03.09.2018 06:09| |03.09.2018 06:10:22|8503000|null |null |\n", - "|85:11:12:001 |Zürich HB |03.09.2018 10:51| |03.09.2018 10:51:28| |8503000|null |null |\n", - "|85:11:1251:003 |Zürich HB |03.09.2018 07:00| |03.09.2018 07:00:01| |8503000|null |null |\n", - "|85:11:1252:001 |Zürich HB |03.09.2018 21:23|03.09.2018 21:36|03.09.2018 21:24:55|03.09.2018 21:36:57|8503000|null |null |\n", - "|85:11:1255:001 |Zürich HB |03.09.2018 08:26|03.09.2018 08:37|03.09.2018 08:28:06|03.09.2018 08:39:07|8503000|null |null |\n", - "|85:11:1256:003 |Zürich HB |03.09.2018 17:53| |03.09.2018 17:55:21| |8503000|null |null |\n", - "|85:11:1260:004 |Zürich HB | |03.09.2018 21:00| |03.09.2018 21:01:13|8503000|null |null |\n", - "|85:11:1271:001 |Zürich HB |03.09.2018 10:00|03.09.2018 10:07|03.09.2018 09:59:07|03.09.2018 10:08:45|8503000|null |null |\n", - "|85:11:13:001 |Zürich HB | |03.09.2018 07:09| |03.09.2018 07:11:14|8503000|null |null |\n", + "|85:11:10:002 |Zürich HB |12.10.2018 21:51| |12.10.2018 21:51:50| |8503000|null |null |\n", + "|85:11:10293:004 |Zürich HB | |13.10.2018 00:25| |13.10.2018 00:26:08|8503000|null |null |\n", + "|85:11:10293:004 |Zürich Flughafen |13.10.2018 00:34|13.10.2018 00:35|13.10.2018 00:35:27|13.10.2018 00:36:44|8503016|null |null |\n", + "|85:11:10536:004 
|Zürich HB | |12.10.2018 20:03| |12.10.2018 20:04:20|8503000|null |null |\n", + "|85:11:10537:006 |Zürich HB |12.10.2018 21:59| |12.10.2018 22:01:43| |8503000|null |null |\n", + "|85:11:10538:004 |Zürich HB | |12.10.2018 21:03| |12.10.2018 21:04:42|8503000|null |null |\n", + "|85:11:10539:005 |Zürich HB |12.10.2018 22:59| |12.10.2018 23:00:10| |8503000|null |null |\n", + "|85:11:10540:004 |Zürich HB | |12.10.2018 22:03| |12.10.2018 22:06:29|8503000|null |null |\n", + "|85:11:10734:007 |Zürich Flughafen |12.10.2018 20:16|12.10.2018 20:18|12.10.2018 20:15:27|12.10.2018 20:18:39|8503016|null |null |\n", + "|85:11:10734:007 |Zürich HB |12.10.2018 20:27|12.10.2018 20:32|12.10.2018 20:26:44|12.10.2018 20:33:02|8503000|null |null |\n", "+----------------+-----------------+----------------+----------------+-------------------+-------------------+-------+-------+-----+\n", "only showing top 10 rows" ] } ], "source": [ "joined_sbb = sbb_filt.join(joined_trip_atL5, on = ['fahrt_bezeichner'], how = 'left_outer')\n", "\n", "print joined_sbb.count()\n", "joined_sbb.show(10,False)" ] }, { "cell_type": "code", - "execution_count": 31, + "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "1107043" + "46399" ] } ], "source": [ "joined_sbb.select(\"fahrt_bezeichner\", \"trip_id\").distinct().count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The reference table we will use is the `stop_times` tables containing trip_id and stop_id. As a next step, we will put them in the same order raptor will read them." 
] }, { "cell_type": "code", - "execution_count": 32, + "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "247920\n", - "+-----------+--------------------+-------+\n", - "|stop_id_raw|trip_id |stop_id|\n", - "+-----------+--------------------+-------+\n", - "|8572747 |1.TA.1-231-j19-1.1.H|8572747|\n", - "|8573721 |1.TA.1-231-j19-1.1.H|8573721|\n", - "|8503598 |1.TA.1-231-j19-1.1.H|8503598|\n", - "+-----------+--------------------+-------+\n", - "only showing top 3 rows" + "+--------------------+-------+------------+--------------+-------------+-----------+-------------+--------------+------------+------------+\n", + "| trip_id|stop_id|arrival_time|departure_time|stop_sequence|pickup_type|drop_off_type|hour_departure| route_id|direction_id|\n", + "+--------------------+-------+------------+--------------+-------------+-----------+-------------+--------------+------------+------------+\n", + "|666.TA.26-4-j19-1...|8576182| 07:02:00| 07:02:00| 1| 0| 0| 7.0| 26-4-j19-1| 1|\n", + "|243.TA.26-311-j19...|8590834| 07:16:00| 07:16:00| 1| 0| 0| 7.0|26-311-j19-1| 1|\n", + "+--------------------+-------+------------+--------------+-------------+-----------+-------------+--------------+------------+------------+\n", + "only showing top 2 rows" ] } ], "source": [ "stop_times_curated = spark.read.csv('data/lgpt_guys/stop_times_curated.csv', header = True)\n", + "stop_times_curated.show(2)" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + 
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "250777\n", + "+-----------+-----------------------+-------+\n", + "|stop_id_raw|trip_id |stop_id|\n", + "+-----------+-----------------------+-------+\n", + "|8576182 |666.TA.26-4-j19-1.20.R |8576182|\n", + "|8590834 |243.TA.26-311-j19-1.3.R|8590834|\n", + "|8591349 |406.TA.26-62-j19-1.3.R |8591349|\n", + "+-----------+-----------------------+-------+\n", + "only showing top 3 rows" + ] + } + ], + "source": [ "# We use only first 7 characters of stop_id to remove special cases\n", "stop_times_curated = stop_times_curated.select(col('stop_id').alias('stop_id_raw'), \n", " 'trip_id')\\\n", " .withColumn('stop_id',col('stop_id_raw').substr(1, 7))\n", "\n", "print stop_times_curated.count()\n", "stop_times_curated.show(3, False)" ] }, { "cell_type": "code", - "execution_count": 33, + "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "22807" + "19800" ] } ], "source": [ "stop_times_curated.select('trip_id').distinct().count()" ] }, { "cell_type": "code", - "execution_count": 34, + "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": 
[ - "341372874\n", - "+---------------------+-------+-----+--------------------+----------------+----------------+-------------------+-------------------+\n", - "|trip_id |stop_id|count|fahrt_bezeichner |ankunftszeit |abfahrtszeit |an_prognose |ab_prognose |\n", - "+---------------------+-------+-----+--------------------+----------------+----------------+-------------------+-------------------+\n", - "|1.TA.26-163-j19-1.1.R|8590688|5 |85:849:62887-01162-1|11.12.2019 08:02|11.12.2019 08:02|11.12.2019 08:03:47|11.12.2019 08:03:59|\n", - "|1.TA.26-163-j19-1.1.R|8590688|5 |85:849:62887-01162-1|21.05.2019 08:02|21.05.2019 08:02|21.05.2019 08:08:11|21.05.2019 08:08:23|\n", - "|1.TA.26-163-j19-1.1.R|8590688|11 |85:849:62934-01162-1|11.12.2019 19:54|11.12.2019 19:54|11.12.2019 19:54:33|11.12.2019 19:54:39|\n", - "|1.TA.26-163-j19-1.1.R|8590688|11 |85:849:62934-01162-1|21.05.2019 19:54|21.05.2019 19:54|21.05.2019 19:54:52|21.05.2019 19:54:58|\n", - "|1.TA.26-163-j19-1.1.R|8590688|5 |85:849:62887-01162-1|14.11.2019 08:02|14.11.2019 08:02|14.11.2019 08:06:27|14.11.2019 08:06:39|\n", - "|1.TA.26-163-j19-1.1.R|8590688|5 |85:849:62887-01162-1|16.04.2019 08:02|16.04.2019 08:02|16.04.2019 08:06:37|16.04.2019 08:06:49|\n", - "|1.TA.26-163-j19-1.1.R|8590688|11 |85:849:62934-01162-1|14.11.2019 19:54|14.11.2019 19:54|14.11.2019 19:54:35|14.11.2019 19:54:41|\n", - "|1.TA.26-163-j19-1.1.R|8590688|11 |85:849:62934-01162-1|16.04.2019 19:54|16.04.2019 19:54|16.04.2019 19:55:52|16.04.2019 19:55:58|\n", - "|1.TA.26-163-j19-1.1.R|8590688|5 |85:849:62887-01162-1|13.12.2019 08:02|13.12.2019 08:02|13.12.2019 08:04:30|13.12.2019 08:04:42|\n", - "|1.TA.26-163-j19-1.1.R|8590688|5 |85:849:62887-01162-1|15.04.2019 08:02|15.04.2019 08:02|15.04.2019 08:06:39|15.04.2019 08:06:51|\n", - "+---------------------+-------+-----+--------------------+----------------+----------------+-------------------+-------------------+\n", + "9478785\n", + 
"+-------------------------+-------+-----+----------------+------------+------------+-----------+-----------+\n", + "|trip_id |stop_id|count|fahrt_bezeichner|ankunftszeit|abfahrtszeit|an_prognose|ab_prognose|\n", + "+-------------------------+-------+-----+----------------+------------+------------+-----------+-----------+\n", + "|1.TA.26-89-j19-1.1.R |8591209|null |null |null |null |null |null |\n", + "|10.TA.1-305-j19-1.1.R |8587018|null |null |null |null |null |null |\n", + "|10.TA.26-69-j19-1.2.H |8591122|null |null |null |null |null |null |\n", + "|10.TA.26-845-j19-1.2.H |8580879|null |null |null |null |null |null |\n", + "|10.TA.26-918-j19-1.1.R |8590701|null |null |null |null |null |null |\n", + "|10.TA.79-485-j19-1.1.R |8590461|null |null |null |null |null |null |\n", + "|100.TA.26-748-j19-1.1.R |8590543|null |null |null |null |null |null |\n", + "|1001.TA.26-70-A-j19-1.5.H|8591106|null |null |null |null |null |null |\n", + "|1005.TA.26-70-A-j19-1.5.H|8591410|null |null |null |null |null |null |\n", + "|1008.TA.26-142-j19-1.2.R |8590830|null |null |null |null |null |null |\n", + "+-------------------------+-------+-----+----------------+------------+------------+-----------+-----------+\n", "only showing top 10 rows" ] } ], "source": [ "stop_times_join = stop_times_curated.join(joined_sbb, on=['trip_id', 'stop_id'], \n", " how='left_outer')\\\n", " .select('trip_id', 'stop_id', 'count',\n", " 'fahrt_bezeichner', 'ankunftszeit', 'abfahrtszeit',\n", " 'an_prognose', 'ab_prognose')\n", "\n", "print stop_times_join.count()\n", "stop_times_join.show(10, False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We then compute arrival delays using the following approach : \n", "- arrival_true ( = `an_prognose`) - arrival_expected ( = `ankunftszeit`). Trains that are late have a positive delay, and trains that are ahead of schedule have a negative one." 
] }, { "cell_type": "code", - "execution_count": 36, + "execution_count": 16, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "+-------+------------------------+-------------------+-------------+-------------+-------+\n", "|stop_id|trip_id |arrival_true |DiffInSeconds|DiffInMinutes|weekday|\n", "+-------+------------------------+-------------------+-------------+-------------+-------+\n", - "|8503006|419.TA.26-2-j19-1.164.H |2018-09-03 06:10:53|53 |0 |Mon |\n", - "|8503000|419.TA.26-2-j19-1.164.H |2018-09-03 06:16:22|82 |1 |Mon |\n", - "|8503011|419.TA.26-2-j19-1.164.H |2018-09-03 06:19:51|51 |0 |Mon |\n", - "|8503010|419.TA.26-2-j19-1.164.H |2018-09-03 06:21:58|-2 |0 |Mon |\n", - "|8503202|419.TA.26-2-j19-1.164.H |2018-09-03 06:30:11|11 |0 |Mon |\n", - "|8503204|419.TA.26-2-j19-1.164.H |2018-09-03 06:34:44|-16 |0 |Mon |\n", - "|8503204|214.TA.26-24-j19-1.121.R|2018-09-03 06:31:17|497 |8 |Mon |\n", - "|8503204|74.TA.26-2-j19-1.9.R |2018-09-03 06:31:17|497 |8 |Mon |\n", - "|8503202|214.TA.26-24-j19-1.121.R|2018-09-03 06:35:52|472 |7 |Mon |\n", - "|8503202|74.TA.26-2-j19-1.9.R |2018-09-03 06:35:52|472 |7 |Mon |\n", + "|8503006|419.TA.26-2-j19-1.164.H |2018-10-12 06:10:43|43 |0 |Fri |\n", + "|8503000|419.TA.26-2-j19-1.164.H |2018-10-12 06:15:56|56 |0 |Fri |\n", + "|8503011|419.TA.26-2-j19-1.164.H |2018-10-12 06:19:45|45 |0 |Fri |\n", + "|8503010|419.TA.26-2-j19-1.164.H |2018-10-12 06:21:35|-25 |0 |Fri |\n", + "|8503202|419.TA.26-2-j19-1.164.H |2018-10-12 06:29:28|-32 |0 |Fri |\n", + "|8503204|419.TA.26-2-j19-1.164.H |2018-10-12 06:34:42|-18 |0 |Fri |\n", + "|8503204|214.TA.26-24-j19-1.121.R|2018-10-12 06:23:17|17 |0 |Fri |\n", + 
"|8503204|74.TA.26-2-j19-1.9.R |2018-10-12 06:23:17|17 |0 |Fri |\n", + "|8503202|214.TA.26-24-j19-1.121.R|2018-10-12 06:27:58|-2 |0 |Fri |\n", + "|8503202|74.TA.26-2-j19-1.9.R |2018-10-12 06:27:58|-2 |0 |Fri |\n", "+-------+------------------------+-------------------+-------------+-------------+-------+\n", "only showing top 10 rows" ] } ], "source": [ "stop_times_diff = stop_times_join.select( col(\"an_prognose\").alias(\"arrival_true\"),\\\n", " col(\"ankunftszeit\").alias(\"arrival_expected\"),\\\n", " 'trip_id', 'stop_id')\\\n", " .withColumn('arrival_true',to_timestamp(col('arrival_true'),\\\n", " format='dd.MM.yyyy HH:mm:ss'))\\\n", " .withColumn('arrival_expected',to_timestamp(col('arrival_expected'),\\\n", " format='dd.MM.yyyy HH:mm'))\\\n", " .withColumn('DiffInSeconds',col('arrival_true').cast(LongType()) - col('arrival_expected').cast(LongType()))\\\n", " .withColumn('DiffInMinutes',(col('DiffInSeconds')/60).cast('integer'))\\\n", " .select(\"stop_id\", \"trip_id\", \"arrival_true\", \"DiffInSeconds\", \"DiffInMinutes\",\\\n", " date_format('arrival_expected', 'E').alias('weekday'))\n", "\n", "# Remove Saturday and Sunday weekdays from table - show\n", "stop_times_diff = stop_times_diff.filter( (stop_times_diff.weekday != \"Sun\") & (stop_times_diff.weekday != \"Sat\") )\n", "stop_times_diff.show(10, False)" ] }, { "cell_type": "code", - "execution_count": 37, + "execution_count": 17, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "+-------+-------------------------+---+----+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+\n", - "|stop_id|trip_id 
|-1 |0 |1 |2 |3 |4 |5 |6 |7 |8 |9 |10 |11 |12 |13 |14 |15 |16 |17 |18 |19 |20 |21 |22 |23 |24 |25 |26 |27 |28 |29 |30 |\n", - "+-------+-------------------------+---+----+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+\n", - "|8503016|460.TA.26-24-j19-1.220.R |15 |73 |114 |64 |18 |6 |4 |0 |1 |1 |0 |1 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", - "|8591079|313.TA.26-2-A-j19-1.1.H |15 |2240|683 |204|37 |6 |8 |5 |2 |1 |0 |0 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |2 |0 |0 |0 |0 |3 |\n", - "|8576195|1458.TA.26-2-A-j19-1.24.R|15 |2269|825 |316|110|58 |33 |13 |10 |7 |7 |0 |1 |1 |0 |0 |2 |0 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |4 |\n", - "|8591299|1361.TA.26-2-A-j19-1.24.R|15 |1551|550 |109|30 |14 |13 |3 |4 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |1 |0 |0 |0 |0 |0 |1 |\n", - "|8591305|1384.TA.26-7-B-j19-1.6.R |6 |1557|405 |153|110|59 |22 |13 |8 |2 |0 |1 |1 |0 |1 |0 |1 |0 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", - "|8591298|2509.TA.26-7-B-j19-1.17.H|21 |2504|946 |267|82 |20 |10 |6 |3 |7 |3 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |1 |\n", - "|8591174|356.TA.26-15-A-j19-1.5.R |8 |2070|851 |242|84 |21 |5 |6 |5 |0 |1 |1 |1 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |1 |0 |0 |0 |0 |0 |5 |\n", - "|8591276|926.TA.26-14-A-j19-1.10.R|5 |3024|1772|538|147|48 |15 |9 |3 |1 |0 |0 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |1 |0 |1 |0 |0 |0 |0 |0 |0 |\n", - "|8530813|540.TA.26-18-j19-1.12.H |805|76 |96 |32 |16 |5 |4 |0 |1 |1 |2 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", - "|8591299|1362.TA.26-13-j19-1.20.R |40 |1986|1158|482|185|100|41 |19 |10 |5 |3 |3 |1 |0 |0 |1 |1 |0 |0 |0 |0 |0 |0 |0 |1 |0 |0 |0 |0 |0 |0 |2 |\n", - "|8591306|1972.TA.26-13-j19-1.24.H |31 |3243|1130|410|136|48 |9 |9 |6 |5 |2 |0 |0 |2 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |2 |\n", - "|8576152|36.TA.26-732-j19-1.2.H |0 |185 |160 |83 |30 |17 |7 |2 |0 |1 |0 |0 |1 |0 
|0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", - "|8590550|243.TA.26-752-j19-1.4.R |0 |607 |492 |191|30 |11 |1 |0 |0 |0 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", - "|8591354|1098.TA.26-768-j19-1.2.R |0 |128 |127 |60 |10 |9 |3 |3 |2 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", - "|8580874|69.TA.26-817-j19-1.1.H |0 |16 |52 |40 |35 |22 |11 |4 |1 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", - "|8591371|1313.TA.26-46-j19-1.9.H |0 |702 |319 |210|118|39 |34 |16 |8 |3 |1 |1 |2 |0 |2 |0 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", - "|8591293|290.TA.26-62-j19-1.2.H |1 |1120|1084|478|179|59 |23 |17 |4 |3 |1 |0 |0 |2 |1 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |8 |\n", - "|8580522|55.TA.26-83-j19-1.1.R |42 |2031|1180|488|157|62 |47 |28 |24 |20 |6 |5 |5 |4 |4 |2 |0 |5 |4 |3 |2 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |1 |\n", - "|8591163|1272.TA.26-80-j19-1.8.R |1 |450 |198 |99 |46 |15 |2 |4 |0 |0 |0 |0 |0 |0 |1 |0 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", - "|8591182|500.TA.26-89-j19-1.6.H |1 |415 |297 |233|128|75 |31 |16 |11 |5 |5 |2 |2 |0 |0 |0 |1 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", - "+-------+-------------------------+---+----+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+\n", + "+-------+-------------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+\n", + "|stop_id|trip_id |-1 |0 |1 |2 |3 |4 |5 |6 |7 |8 |9 |10 |11 |12 |13 |14 |15 |16 |17 |18 |19 |20 |21 |22 |23 |24 |25 |26 |27 |28 |29 |30 |\n", + "+-------+-------------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+\n", + "|8503020|45.TA.26-7-A-j19-1.12.H |0 |537|107|38 |13 |2 |3 |4 |1 |1 |1 |0 |1 |1 |0 |1 |1 |0 |0 |1 |0 |0 |0 |0 |0 
|0 |0 |0 |0 |0 |0 |0 |\n", + "|8594307|44.TA.1-11-B-j19-1.2.H |0 |1 |4 |1 |2 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8503125|59.TA.26-5-A-j19-1.28.R |0 |578|179|30 |12 |5 |2 |0 |1 |1 |0 |1 |1 |0 |0 |2 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8590275|501.TA.1-2-A-j19-1.15.R |0 |23 |28 |9 |4 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8503000|147.TA.26-15-j19-1.41.H |0 |271|114|20 |7 |2 |1 |0 |0 |0 |0 |0 |0 |1 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8503203|590.TA.26-8-A-j19-1.353.H|0 |463|648|340|106|34 |20 |8 |5 |3 |6 |3 |3 |7 |3 |1 |1 |1 |1 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8502208|432.TA.26-24-j19-1.220.R |0 |86 |40 |10 |3 |2 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8503305|201.TA.26-24-j19-1.121.R |0 |184|60 |22 |7 |5 |2 |0 |1 |0 |0 |0 |0 |1 |0 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8590279|136.TA.1-4-B-j19-1.10.H |0 |4 |4 |3 |2 |2 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8503003|258.TA.26-16-A-j19-1.93.H|85 |913|359|167|53 |34 |20 |10 |4 |5 |3 |2 |0 |0 |1 |0 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8503011|571.TA.26-8-A-j19-1.347.H|0 |590|457|116|40 |16 |4 |7 |1 |1 |2 |1 |1 |1 |0 |0 |0 |0 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8503020|389.TA.26-7-A-j19-1.108.R|0 |243|150|19 |1 |1 |0 |1 |0 |1 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8517376|197.TA.1-17-A-j19-1.16.R |0 |274|499|212|101|34 |4 |2 |1 |0 |0 |1 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |1 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8503006|377.TA.26-7-A-j19-1.108.R|0 |541|223|58 |9 |2 |2 |1 |0 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8503104|135.TA.26-6-A-j19-1.32.R |0 |394|165|63 |30 |10 |5 |5 |2 |2 |1 |1 |0 |1 |1 |0 |0 |0 |0 |0 |0 |0 |1 |0 |0 |0 |0 |0 
|0 |0 |0 |0 |\n", + "|8502187|187.TA.1-17-A-j19-1.16.R |1 |479|71 |6 |4 |1 |0 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8503020|105.TA.26-5-A-j19-1.37.R |0 |443|226|98 |35 |14 |5 |2 |0 |2 |1 |0 |2 |1 |2 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8502221|54.TA.26-5-A-j19-1.28.R |152|492|116|41 |14 |6 |8 |2 |1 |0 |0 |1 |0 |1 |2 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8502273|101.TA.1-17-A-j19-1.9.R |2 |507|330|205|95 |25 |4 |2 |1 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8503052|238.TA.26-10-B-j19-1.10.R|0 |260|82 |47 |21 |12 |6 |3 |3 |1 |1 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "+-------+-------------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+\n", "only showing top 20 rows" ] } ], "source": [ "# we bound distribution to this \n", "lower_bound = -1\n", "upped_bound = +30\n", "\n", "stop_times_bounded = stop_times_diff.withColumn('DiffInMinutes_bounded1',\\\n", " greatest(col('DiffInMinutes'), lit(lower_bound) ))\\\n", " .withColumn('DiffInMinutes_bounded2',\\\n", " least(col('DiffInMinutes_bounded1'), lit(upped_bound) ))\n", "\n", "stop_times_distribution = stop_times_bounded.groupBy('stop_id', 'trip_id')\\\n", " .pivot(\"DiffInMinutes_bounded2\").count()\\\n", " .na.fill(0)\n", "\n", "stop_times_distribution.show(20, False)" ] }, { "cell_type": "code", - "execution_count": 38, + "execution_count": 18, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "223579" + "12309" ] } ], "source": [ 
"stop_times_distribution.count()" ] }, { "cell_type": "code", - "execution_count": 39, + "execution_count": 27, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ - "stop_times_distribution.write.csv('data/lgpt_guys/distribution_1to1match.csv', \\\n", + "stop_times_distribution.write.csv('data/lgpt_guys/distribution_geschaetzAndReal.csv', \\\n", " header = True, mode=\"overwrite\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Analysing matches found \n" ] }, { "cell_type": "code", - "execution_count": 27, + "execution_count": 28, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "+--------------------+-------+------------+--------------+-------------+-----------+-------------+--------+-----------+----------+--------------------+------------+\n", - "| trip_id|stop_id|arrival_time|departure_time|stop_sequence|pickup_type|drop_off_type|stop_int| route_id|sequence_1| trip_1| route_int|\n", - "+--------------------+-------+------------+--------------+-------------+-----------+-------------+--------+-----------+----------+--------------------+------------+\n", - "|1.TA.1-231-j19-1.1.H|8572747| 09:37:00| 09:37:00| 1| 0| 0| 500|1-231-j19-1| 10.0|1.TA.1-231-j19-1.1.H|592705486850|\n", - "|1.TA.1-231-j19-1.1.H|8573721| 09:50:00| 09:50:00| 10| 0| 0| 599|1-231-j19-1| 11.0|1.TA.1-231-j19-1.1.H|592705486850|\n", - 
"|1.TA.1-231-j19-1.1.H|8503598| 09:53:00| 09:53:00| 11| 0| 0| 401|1-231-j19-1| 12.0|1.TA.1-231-j19-1.1.H|592705486850|\n", - "|1.TA.1-231-j19-1.1.H|8573720| 09:55:00| 09:59:00| 12| 0| 0| 598|1-231-j19-1| 13.0|1.TA.1-231-j19-1.1.H|592705486850|\n", - "|1.TA.1-231-j19-1.1.H|8503598| 10:00:00| 10:00:00| 13| 0| 0| 401|1-231-j19-1| 14.0|1.TA.1-231-j19-1.1.H|592705486850|\n", - "+--------------------+-------+------------+--------------+-------------+-----------+-------------+--------+-----------+----------+--------------------+------------+\n", + "+--------------------+------------+------------+--------------+-------------+-----------+-------------+--------------+------------+------------+\n", + "| trip_id| stop_id|arrival_time|departure_time|stop_sequence|pickup_type|drop_off_type|hour_departure| route_id|direction_id|\n", + "+--------------------+------------+------------+--------------+-------------+-----------+-------------+--------------+------------+------------+\n", + "|666.TA.26-4-j19-1...| 8576182| 07:02:00| 07:02:00| 1| 0| 0| 7.0| 26-4-j19-1| 1|\n", + "|243.TA.26-311-j19...| 8590834| 07:16:00| 07:16:00| 1| 0| 0| 7.0|26-311-j19-1| 1|\n", + "|406.TA.26-62-j19-...| 8591349| 07:24:00| 07:24:00| 1| 0| 0| 7.0| 26-62-j19-1| 1|\n", + "|62.TA.57-2-Y-j19-...|8503000:0:13| 07:34:00| 07:34:00| 1| 0| 0| 7.0|57-2-Y-j19-1| 0|\n", + "|1179.TA.26-5-B-j1...| 8591245| 07:36:00| 07:36:00| 1| 0| 0| 7.0|26-5-B-j19-1| 1|\n", + "+--------------------+------------+------------+--------------+-------------+-----------+-------------+--------------+------------+------------+\n", "only showing top 5 rows" ] } ], "source": [ "stop_times_curated = spark.read.csv('data/lgpt_guys/stop_times_curated.csv', header = True)\n", "stop_times_curated.show(5)" ] }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 29, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ 
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "247920\n", - "+-----------+--------------------+-------+\n", - "|stop_id_raw|trip_id |stop_id|\n", - "+-----------+--------------------+-------+\n", - "|8572747 |1.TA.1-231-j19-1.1.H|8572747|\n", - "|8573721 |1.TA.1-231-j19-1.1.H|8573721|\n", - "|8503598 |1.TA.1-231-j19-1.1.H|8503598|\n", - "+-----------+--------------------+-------+\n", + "250777\n", + "+-----------+-----------------------+-------+\n", + "|stop_id_raw|trip_id |stop_id|\n", + "+-----------+-----------------------+-------+\n", + "|8576182 |666.TA.26-4-j19-1.20.R |8576182|\n", + "|8590834 |243.TA.26-311-j19-1.3.R|8590834|\n", + "|8591349 |406.TA.26-62-j19-1.3.R |8591349|\n", + "+-----------+-----------------------+-------+\n", "only showing top 3 rows" ] } ], "source": [ "# We use only first 7 characters of stop_id to remove special cases\n", "stop_times_curated = stop_times_curated.select(col('stop_id').alias('stop_id_raw'), \n", " 'trip_id')\\\n", " .withColumn('stop_id',col('stop_id_raw').substr(1, 7))\n", "\n", "print stop_times_curated.count()\n", "stop_times_curated.show(3, False)" ] }, { "cell_type": "code", - "execution_count": 76, + "execution_count": 30, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "223579\n", - "+-------+-------------------------+---+----+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+\n", - "|stop_id|trip_id |-1 |0 |1 |2 |3 |4 |5 |6 |7 |8 |9 
|10 |11 |12 |13 |14 |15 |16 |17 |18 |19 |20 |21 |22 |23 |24 |25 |26 |27 |28 |29 |30 |\n", - "+-------+-------------------------+---+----+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+\n", - "|8591415|285.TA.26-5-B-j19-1.3.H |1 |1115|429 |261|101|29 |10 |5 |3 |3 |4 |3 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", - "|8591298|777.TA.26-15-A-j19-1.6.H |6 |977 |599 |240|89 |30 |19 |7 |2 |3 |2 |2 |0 |0 |1 |1 |1 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |1 |0 |0 |9 |\n", - "|8591093|1407.TA.26-2-A-j19-1.24.R|9 |1484|568 |174|40 |16 |8 |7 |2 |0 |0 |3 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |1 |0 |0 |0 |\n", - "|8591220|305.TA.26-9-B-j19-1.1.R |26 |1865|742 |214|57 |14 |6 |3 |2 |3 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |2 |\n", - "|8591368|1260.TA.26-13-j19-1.20.R |26 |1963|813 |279|72 |22 |6 |3 |1 |1 |1 |0 |1 |2 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |1 |0 |0 |0 |4 |\n", - "|8591071|928.TA.26-14-A-j19-1.10.R|9 |2598|1731|676|148|28 |10 |3 |4 |2 |1 |0 |1 |0 |0 |0 |0 |0 |0 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", - "|8591306|407.TA.26-17-j19-1.1.H |14 |1501|688 |219|90 |43 |31 |14 |11 |4 |2 |2 |2 |3 |1 |0 |0 |0 |1 |0 |0 |0 |0 |1 |0 |0 |0 |0 |0 |0 |1 |1 |\n", - "|8590461|781.TA.26-485-j19-1.8.H |14 |418 |275 |102|16 |2 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", - "|8591065|370.TA.26-752-j19-1.5.R |6 |307 |121 |24 |3 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", - "|8591445|4117.TA.26-31-j19-1.26.R |1 |410 |161 |138|134|133|79 |52 |37 |33 |10 |7 |5 |2 |1 |1 |1 |1 |0 |2 |0 |0 |0 |0 |1 |0 |0 |0 |0 |0 |1 |1 |\n", - "+-------+-------------------------+---+----+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+\n", + "12309\n", + 
"+-------+-------------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+\n", + "|stop_id|trip_id |-1 |0 |1 |2 |3 |4 |5 |6 |7 |8 |9 |10 |11 |12 |13 |14 |15 |16 |17 |18 |19 |20 |21 |22 |23 |24 |25 |26 |27 |28 |29 |30 |\n", + "+-------+-------------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+\n", + "|8503003|286.TA.26-11-j19-1.80.H |0 |395|75 |23 |5 |1 |1 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8503089|40.TA.26-4-B-j19-1.1.R |0 |378|238|194|70 |46 |30 |11 |9 |8 |6 |4 |2 |2 |1 |0 |2 |2 |1 |0 |1 |1 |0 |0 |0 |0 |0 |1 |0 |0 |0 |0 |\n", + "|8503094|166.TA.26-4-B-j19-1.7.H |0 |28 |158|102|50 |23 |21 |15 |7 |7 |1 |0 |1 |1 |2 |0 |0 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8503203|580.TA.26-8-A-j19-1.347.H|0 |659|504|246|110|58 |32 |13 |3 |1 |0 |5 |6 |3 |2 |2 |1 |1 |0 |2 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8502223|116.TA.26-14-j19-1.18.R |0 |31 |144|56 |12 |3 |0 |0 |0 |0 |3 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8503053|55.TA.79-10-B-j19-1.3.H |0 |165|43 |7 |2 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8503000|37.TA.26-15-j19-1.17.R |8 |220|93 |47 |26 |9 |5 |5 |1 |1 |1 |0 |0 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8503006|297.TA.26-14-j19-1.41.H |0 |123|197|59 |14 |11 |6 |1 |1 |0 |0 |1 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8503091|24.TA.26-4-B-j19-1.1.R |0 |184|258|64 |7 |2 |0 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + "|8503147|150.TA.26-3-j19-1.12.H |5 |470|119|43 |14 |7 |1 |3 |1 |0 |0 |1 |1 |0 |1 |0 |0 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", + 
"+-------+-------------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+\n", "only showing top 10 rows" ] } ], "source": [ - "stop_times_distrib = spark.read.csv('data/lgpt_guys/distribution_1to1match.csv', \\\n", + "stop_times_distrib = spark.read.csv('data/lgpt_guys/distribution_geschaetzAndReal.csv', \\\n", " header = True)\n", "print stop_times_distrib.count()\n", "stop_times_distrib.show(10, False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "How many unique combination of stop_id / trip_id do we have ?" ] }, { "cell_type": "code", - "execution_count": 42, + "execution_count": 31, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "223579" + "12309" ] } ], "source": [ "stop_times_distrib.select(\"stop_id\",\"trip_id\").distinct().count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "How many seems to have an empty line ? 
" ] }, { "cell_type": "code", - "execution_count": 54, + "execution_count": 32, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "8" + "4" ] } ], "source": [ "stop_times_distrib.filter( (stop_times_distrib['-1'] == 0) &\\\n", " (stop_times_distrib['0'] == 0) &\\\n", " (stop_times_distrib['1'] == 0) &\\\n", " (stop_times_distrib['2'] == 0) ).count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Actually there is no line with all values equal to zero : it would not have been assembled at the pivot stage. Now we want to see how many of the `stop_times_curated` lines we can get from this table :" ] }, { "cell_type": "code", - "execution_count": 77, + "execution_count": 33, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "+--------------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+\n", - "| key| -1| 0| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20| 21| 22| 23| 24| 25| 26| 27| 28| 29| 30|\n", - "+--------------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+\n", - 
"|1.TA.1-231-j19-1....|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|\n", - "|1.TA.1-231-j19-1....| 0| 78| 21| 6| 3| 1| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", - "|1.TA.1-231-j19-1....|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|\n", - "|1.TA.1-231-j19-1....| 0| 170| 42| 4| 2| 0| 2| 2| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", - "|1.TA.1-231-j19-1....| 0| 170| 42| 4| 2| 0| 2| 2| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", - "|1.TA.1-231-j19-1....|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|\n", - "|1.TA.1-231-j19-1....|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|\n", - "|1.TA.1-231-j19-1....|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|\n", - "|1.TA.1-231-j19-1....| 2| 94| 9| 2| 2| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", - "|1.TA.1-231-j19-1....|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|\n", - "|1.TA.1-231-j19-1....| 0| 78| 25| 6| 0| 0| 1| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", - "|1.TA.1-231-j19-1....| 4| 194| 16| 4| 0| 4| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", - "|1.TA.1-231-j19-1....| 4| 194| 16| 4| 0| 4| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 
0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", - "|1.TA.1-231-j19-1....| 6| 188| 18| 4| 2| 2| 2| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", - "|1.TA.1-231-j19-1....| 6| 188| 18| 4| 2| 2| 2| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", - "|1.TA.1-231-j19-1....| 5| 93| 8| 2| 1| 2| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", - "|1.TA.1-231-j19-1....|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|\n", - "|1.TA.1-231-j19-1....| 0| 93| 12| 3| 1| 2| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", - "|1.TA.1-44-j19-1.1...|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|\n", - "|1.TA.1-44-j19-1.1...|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|null|\n", - "+--------------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+\n", + "+--------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+\n", + "| key| -1| 0| 1| 2| 3| 4| 5| 6| 7| 8| 9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20| 21| 22| 23| 24| 25| 26| 27| 28| 29| 30|\n", + "+--------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+\n", + "|10.TA.1-11-B-j19-...| 0| 2| 2| 1| 3| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", + "|10.TA.1-11-B-j19-...| 0| 3| 2| 2| 3| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 
0| 0|\n", + "|10.TA.1-11-B-j19-...| 0| 0| 4| 4| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", + "|10.TA.1-11-B-j19-...| 0| 1| 5| 3| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", + "|10.TA.1-11-B-j19-...| 0| 1| 3| 4| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", + "|10.TA.26-912-j19-...| 0| 0| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", + "|10.TA.79-10-B-j19...| 1|129|143| 71| 33| 17| 11| 4| 3| 3| 1| 1| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", + "|10.TA.79-10-B-j19...| 0|333| 40| 22| 9| 5| 3| 4| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", + "|10.TA.79-10-B-j19...| 1|340| 37| 21| 6| 2| 6| 2| 2| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", + "|10.TA.79-10-B-j19...| 0| 0|177| 23| 7| 0| 1| 0| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", + "|10.TA.79-10-B-j19...| 0|266| 81| 33| 15| 10| 3| 4| 3| 1| 0| 2| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", + "|10.TA.79-10-B-j19...| 0|142|139| 69| 31| 16| 8| 5| 4| 1| 1| 1| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", + "|100.TA.26-6-A-j19...| 0|325| 62| 27| 1| 2| 2| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", + "|100.TA.26-6-A-j19...| 0|310| 84| 15| 5| 4| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", + "|100.TA.26-6-A-j19...| 0|330| 73| 9| 4| 2| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", + "|100.TA.26-6-A-j19...| 0|257|103| 42| 14| 1| 1| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", + "|100.TA.26-6-A-j19...| 0| 69|228| 88| 23| 8| 1| 2| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", + "|100.TA.26-6-A-j19...| 0|300| 86| 22| 8| 1| 0| 2| 0| 0| 
0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", + "|100.TA.26-6-A-j19...| 0|200|158| 38| 19| 2| 0| 1| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", + "|100.TA.26-6-A-j19...| 0|349| 47| 14| 7| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", + "+--------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+\n", "only showing top 20 rows" ] } ], "source": [ "stop_times_final = stop_times_curated.join(stop_times_distrib,\\\n", " on = ['stop_id', 'trip_id'],\\\n", - " how = 'left_outer').drop('stop_id_raw')\\\n", + " how = 'inner').drop('stop_id_raw')\\\n", ".orderBy('trip_id', 'stop_id')\\\n", ".withColumn('key2', concat(col('trip_id'), lit('__'), col('stop_id')))\\\n", ".drop('trip_id').drop('stop_id')\\\n", ".select(col('key2').alias('key'), \"*\")\\\n", ".drop('key2')\n", "\n", "stop_times_final.show(20)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We still have null values. 
Let's count how many null we have on the full table" ] }, { "cell_type": "code", - "execution_count": 81, + "execution_count": 34, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "reference table stop_times number of lines : 247920\n", - "distribution table number of lines : 247920\n", - "Number of missing keys in distribution : 23503" + "reference table stop_times number of lines : 250777\n", + "distribution table number of lines : 12309\n", + "Number of missing keys in distribution : 0" ] } ], "source": [ "print \"reference table stop_times number of lines : {}\".format(stop_times_curated.count())\n", "print \"distribution table number of lines : {}\".format(stop_times_final.count())\n", "print \"Number of missing keys in distribution : {}\".format(stop_times_final.filter(stop_times_final['0'].isNull()).count())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "This represent around 10% of missing data. 
This is not so bad.\n", - "\n", "We write two version of the table : one with missing values, and one with missing values filled with '1',allowing development of next steps in the meantime (filling these values with a better approach is discussed in next section _Recovering missing distributions_)" ] }, { "cell_type": "code", "execution_count": 108, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "stop_times_final_fill1 = stop_times_final.na.fill(1) # not working, not IntegerType ..." ] }, { "cell_type": "code", "execution_count": 110, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "+-----------------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+\n", "|key |-1 |0 |1 |2 |3 |4 |5 |6 |7 |8 |9 |10 |11 |12 |13 |14 |15 |16 |17 |18 |19 |20 |21 |22 |23 |24 |25 |26 |27 |28 |29 |30 |\n", "+-----------------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+\n", "|1.TA.1-231-j19-1.1.H__8502553|1 |1 |1 |1 |1 |1 |1 |1 |1 |1 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", "|1.TA.1-231-j19-1.1.H__8502879|0 |78 |21 |6 |3 |1 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", "|1.TA.1-231-j19-1.1.H__8502955|1 |1 |1 |1 |1 |1 
|1 |1 |1 |1 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", "|1.TA.1-231-j19-1.1.H__8503598|0 |170|42 |4 |2 |0 |2 |2 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", "|1.TA.1-231-j19-1.1.H__8503598|0 |170|42 |4 |2 |0 |2 |2 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", "|1.TA.1-231-j19-1.1.H__8572600|1 |1 |1 |1 |1 |1 |1 |1 |1 |1 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", "|1.TA.1-231-j19-1.1.H__8572601|1 |1 |1 |1 |1 |1 |1 |1 |1 |1 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", "|1.TA.1-231-j19-1.1.H__8572602|1 |1 |1 |1 |1 |1 |1 |1 |1 |1 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", "|1.TA.1-231-j19-1.1.H__8572603|2 |94 |9 |2 |2 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", "|1.TA.1-231-j19-1.1.H__8572747|1 |1 |1 |1 |1 |1 |1 |1 |1 |1 |1 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |0 |\n", "+-----------------------------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+\n", "only showing top 10 rows" ] } ], "source": [ "# This contains the list of columns where we apply replace() function\n", "all_column_names = stop_times_final.columns\n", "columns_to_remove = ['key']\n", "columns_for_replacement = [i for i in all_column_names if i not in columns_to_remove]\n", "\n", "# Doing the replacement on all the requisite columns\n", "for i in columns_for_replacement:\n", " stop_times_final_fill1 = stop_times_final_fill1.withColumn(i,when((col(i).isNull()),int(int(i)<=10))\\\n", " .otherwise(col(i).cast(IntegerType())))\n", "stop_times_final_fill1.show(10, False)" ] }, { "cell_type": "code", "execution_count": 111, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ 
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "stop_times_final.write.csv('data/lgpt_guys/distribution_1to1match_wNull.csv', \\\n", " header = True, mode=\"overwrite\")\n", "stop_times_final_fill1.write.csv('data/lgpt_guys/distribution_1to1match_fill1.csv', \\\n", - " header = True, mode=\"overwrite\")\n" + " header = True, mode=\"overwrite\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Use local python to make definitive table with right ordering \n", "\n", "We first use the tables where null values were filled with 1 and 0 " ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, + "outputs": [], + "source": [ + "username = 'acoudray'\n", + "stop_times_final_fill1.write.csv(\"/user/{}/distribution_1to1match_fill1.csv\".format(username), \\\n", + " header = True, mode = 'overwrite')" + ] + }, + { + "cell_type": "code", + "execution_count": 38, + "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" - }, + } + ], + "source": [ + "# for geschaetz only\n", + "stop_times_final.write.csv(\"/user/{}/distribution_1to1match_geschaetz.csv\".format(username), \\\n", + " header = True, mode = 'overwrite')" + ] + }, + { + "cell_type": "code", + "execution_count": 35, + "metadata": {}, + "outputs": [ { - "name": "stderr", - "output_type": "stream", - "text": [ - "An error was encountered:\n", - "name 'stop_times_final_fill1' is not defined\n", - "Traceback (most recent call last):\n", - "NameError: name 'stop_times_final_fill1' is not defined\n", - "\n" - ] + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + 
"version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" } ], "source": [ - "username = 'acoudray'\n", - "stop_times_final_fill1.write.csv(\"/user/{}/distribution_1to1match_fill1.csv\".format(username), \\\n", + "# for geschaetz and real only\n", + "stop_times_final.write.csv(\"/user/{}/distribution_1to1match_geschaetzAndReal.csv\".format(username), \\\n", " header = True, mode = 'overwrite')" ] }, { "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "+--------------------+-------+------------+--------------+-------------+-----------+-------------+--------+-----------+----------+--------------------+------------+\n", "| trip_id|stop_id|arrival_time|departure_time|stop_sequence|pickup_type|drop_off_type|stop_int| route_id|sequence_1| trip_1| route_int|\n", "+--------------------+-------+------------+--------------+-------------+-----------+-------------+--------+-----------+----------+--------------------+------------+\n", "|1.TA.1-231-j19-1.1.H|8572747| 09:37:00| 09:37:00| 1| 0| 0| 500|1-231-j19-1| 10.0|1.TA.1-231-j19-1.1.H|592705486850|\n", "|1.TA.1-231-j19-1.1.H|8573721| 09:50:00| 09:50:00| 10| 0| 0| 599|1-231-j19-1| 11.0|1.TA.1-231-j19-1.1.H|592705486850|\n", "|1.TA.1-231-j19-1.1.H|8503598| 09:53:00| 09:53:00| 11| 0| 0| 401|1-231-j19-1| 12.0|1.TA.1-231-j19-1.1.H|592705486850|\n", 
"+--------------------+-------+------------+--------------+-------------+-----------+-------------+--------+-----------+----------+--------------------+------------+\n", "only showing top 3 rows" ] } ], "source": [ "username = 'acoudray'\n", "stop_times_curated.write.csv(\"/user/{}/stop_times_curated_sbbCompatible\".format(username), \\\n", " header = True, mode = 'overwrite')\n", "stop_times_curated.show(3)" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 36, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/plain": [ + "[('10.TA.1-11-B-j19-1.1.R__8590314',\n", + " array([0, 2, 2, 1, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,\n", + " 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])),\n", + " ('10.TA.1-11-B-j19-1.1.R__8590317',\n", + " array([0, 3, 2, 2, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,\n", + " 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])),\n", + " ('10.TA.1-11-B-j19-1.1.R__8594304',\n", + " array([0, 0, 4, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,\n", + " 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])),\n", + " ('10.TA.1-11-B-j19-1.1.R__8594307',\n", + " array([0, 1, 5, 3, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0,\n", + " 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])),\n", + " ('10.TA.1-11-B-j19-1.1.R__8594310',\n", + " array([0, 1, 3, 4, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,\n", + " 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])),\n", + " ('10.TA.26-912-j19-1.2.R__8576195',\n", + " array([0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,\n", + " 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])),\n", + " ('10.TA.79-10-B-j19-1.2.R__8503051',\n", + " array([ 1, 129, 143, 71, 33, 17, 11, 4, 3, 3, 1, 1, 1,\n", + " 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,\n", + " 0, 0, 0, 0, 0, 0])),\n", + " ('10.TA.79-10-B-j19-1.2.R__8503052',\n", + " array([ 0, 333, 40, 22, 9, 5, 3, 4, 1, 0, 0, 0, 0,\n", + " 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,\n", + " 0, 0, 0, 0, 0, 0])),\n", + " ('10.TA.79-10-B-j19-1.2.R__8503053',\n", + " array([ 1, 340, 
37, 21, 6, 2, 6, 2, 2, 0, 0, 0, 0,\n", + " 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,\n", + " 0, 0, 0, 0, 0, 0])),\n", + " ('10.TA.79-10-B-j19-1.2.R__8503054',\n", + " array([ 0, 0, 177, 23, 7, 0, 1, 0, 1, 0, 0, 0, 0,\n", + " 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,\n", + " 0, 0, 0, 0, 0, 0]))]" + ] + }, + "execution_count": 36, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ "%local\n", "\n", "from hdfs3 import HDFileSystem\n", "import pandas as pd\n", "import numpy as np \n", "import pickle \n", "import gzip\n", "from itertools import islice\n", "\n", "hdfs = HDFileSystem(host='hdfs://iccluster044.iccluster.epfl.ch', port=8020, user='ebouille')\n", "\n", "username = 'acoudray'\n", "\n", "# Load distribution file from HDFS and concatenate individual csv\n", - "distrib_files = hdfs.glob('/user/{}/distribution_1to1match_fill1.csv/*.csv'.format(username))\n", + "distrib_files = hdfs.glob('/user/{}/distribution_1to1match_geschaetzAndReal.csv/*.csv'.format(username))\n", "distrib = pd.DataFrame()\n", "for file in distrib_files:\n", " with hdfs.open(file) as f:\n", " distrib = distrib.append(pd.read_csv(f))\n", "distrib = distrib.set_index('key')\n", "\n", "# zip index and values to get {key : np.array()} shape \n", "d = dict(zip(distrib.index, np.array(distrib.values)))\n", "\n", "# Write it to local \n", - "with gzip.open(\"../data/distributions.pickle\", \"wb\") as output_file:\n", + "with gzip.open(\"../data/distributions_geschaetzAndReal.pkl.gz\", \"wb\") as output_file:\n", " pickle.dump(d, output_file)\n", "\n", "# Functon to take a slice from a dictionnary - head equivalent\n", "def take(n, iterable):\n", " \"Return first n items of the iterable as a list\"\n", " return list(islice(iterable, n))\n", "\n", "# display a slice of it\n", - "take(50, d.items())" + "take(10, d.items())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "How many RAM does the dictionnary occupy when it is open ? 
Open the pickle and compute the memory it occupies by recursively summing `sys.getsizeof` over its keys and values" ] }, { "cell_type": "code", "execution_count": 41, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "length of dict : 246968\n", "Data size is: 106968218\n" ] } ], "source": [ "%local \n", "\n", "import pickle \n", "import gzip\n", "import sys\n", "import os\n", "import resource\n", "\n", "with gzip.open(\"../data/distributions.pickle\", \"rb\") as input_file:\n", " d = pickle.load(input_file)\n", " \n", "\n", "d['1290.TA.26-32-j19-1.12.H__8591151']\n", "print('length of dict : ',len(d))\n", "\n", "def getsizeof_r(obj):\n", " total = 0\n", " if isinstance(obj, list):\n", " for i in obj:\n", " total += getsizeof_r(i)\n", " elif isinstance(obj, dict):\n", " for k, v in obj.items():\n", " total += getsizeof_r(k) + getsizeof_r(v)\n", " else:\n", " total += sys.getsizeof(obj)\n", " return total\n", "\n", "print('Data size is: {}'.format(getsizeof_r(d)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "How much time does it take to access elements in the dictionary?" 
] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[ 0 1008 405 207 95 39 25 11 5 3 0 0 0 0\n", " 0 0 0 0 0 0 0 0 0 0 0 0 0 0\n", " 0 0 0 0]\n", "running time to get value from key when exists : 0.0004305839538574219\n", "\n", "KEY ERROR: .26-32-j19-1.12.H__8591151 not found un distribution dictionnary\n", "running time to get error when key does NOT exists : 0.00010466575622558594\n", "\n" ] } ], "source": [ "%local\n", "\n", "import pickle \n", "import gzip\n", "import time\n", "\n", "def get_distribution(key, dico):\n", " if key in dico:\n", " print(dico[key])\n", " else:\n", " print(\"KEY ERROR: {} not found un distribution dictionnary\".format(key))\n", " \n", "with gzip.open(\"../data/distributions.pickle\", \"rb\") as input_file:\n", " d = pickle.load(input_file)\n", " \n", "this_key = '1290.TA.26-32-j19-1.12.H__8591151'\n", "\n", "start = time.time()\n", "get_distribution(this_key, d)\n", "end = time.time()\n", "print(\"running time to get value from key when exists : {}\\n\".format(end - start))\n", "\n", "start = time.time()\n", "get_distribution(this_key.replace('1290.TA',''), d)\n", "end = time.time()\n", "print(\"running time to get error when key does NOT exists : {}\\n\".format(end - start))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When the key exists, we access it in about $5\\cdot10^{-4}$ seconds; when it does not exist, the error message is displayed in about $1\\cdot10^{-4}$ seconds. This should be fast enough for the lookup to be called multiple times when using RAPTOR." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Make function to compute probabilities from distributions\n", "\n", "We make a script that takes a key, an arrival time and a departure time to compute the probability of being at least 2 minutes ahead for the transfer. 
We assume that with less than 2 minutes, we miss the transfer.\n", "\n", "We will use a Poisson distribution to compute this probability using its cumulative distribution.\n", "\n", "A Poisson Process meets the following criteria (in reality many phenomena modeled as Poisson processes don’t meet these exactly):\n", "- Events are independent of each other. The occurrence of one event does not affect the probability that another event will occur.\n", "- The average rate (events per time period) is constant.\n", "- Two events cannot occur at the same time.\n", "\n", "Bounds for the tail probabilities of a Poisson random variable ${\\displaystyle X\\sim \\operatorname {Pois} (\\lambda )}$ can be derived using a Chernoff bound argument: \n", "\n", "$${\\displaystyle P(X\\leq x)\\leq {\\frac {(e\\lambda )^{x}e^{-\\lambda }}{x^{x}}},{\\text{ for }}x<\\lambda .}$$\n", "\n", "So in our case all we need to find is $\\lambda$. The positive real number $\\lambda$ is equal to the expected value of $X$ and also to its variance:\n", "\n", "$${\\lambda =\\operatorname {E} (X)=\\operatorname {Var} (X)}$$\n", "\n", "We can easily find $\\lambda$ by finding the _average number of successes per unit time_. We have a distribution going from -1 to +30, therefore we will iterate over it and sum up all successes $x_t$ at each time point $t$, for all our time points. 
import datetime
import time

from scipy.stats import poisson


def get_distrib(key, dico):
    """Return the distribution stored under ``key`` in ``dico``.

    Raises
    ------
    ValueError
        If ``key`` is not present in ``dico``.
    """
    if key in dico:
        return dico[key]
    raise ValueError("KEY ERROR: {} not found un distribution dictionnary".format(key))


def evaluate_lamda(distrib, key=None):
    """Compute the mean of a distribution of counts, used as Poisson lambda.

    ``distrib[i]`` is the number of observations at time ``t = i - 1``
    (the first bin is ``t = -1``). The returned value is

        lambda = sum_t(t * x_t) / N,   with N = sum_t(x_t)

    i.e. N is the total number of observations, not the number of bins.

    Parameters
    ----------
    distrib : iterable of numbers
        Counts per time bin, starting at ``t = -1``.
    key : optional
        Identifier of the distribution, only used in the error message.

    Returns
    -------
    float
        The expectation of the distribution.

    Raises
    ------
    ValueError
        If the distribution contains no observation (N is not positive).
    """
    total = 0      # N: total number of measures
    weighted = 0   # sum of t * x_t
    for t, count in enumerate(distrib, start=-1):
        total += count
        weighted += t * count

    if not total > 0:
        # The original message formatted a ``key`` name that did not exist in
        # this scope (NameError); the key is now an optional argument.
        label = key if key is not None else 'given'
        raise ValueError("ERROR : {} distribution has 0 counts".format(label))

    lambda_p = weighted / total
    # NOTE(review): a distribution dominated by the t = -1 bin yields a
    # negative mean, which is not a valid Poisson rate -- confirm how callers
    # should handle that case.
    print('lambda (expectation given distribution): ', lambda_p, '\n')
    return lambda_p


def process_time(str_time):
    """Convert an ``'HH:MM'`` string into seconds since midnight (float).

    ``time.strptime`` raises ``ValueError`` when ``str_time`` does not match
    the ``'%H:%M'`` format.
    """
    parsed = time.strptime(str_time, '%H:%M')
    return datetime.timedelta(
        hours=parsed.tm_hour, minutes=parsed.tm_min, seconds=parsed.tm_sec
    ).total_seconds()


def get_transfer_time(arr_time, dep_time, delta=2.0):
    """Return the minutes between arrival and departure, minus ``delta``.

    Parameters
    ----------
    arr_time, dep_time : str
        Times formatted as ``'HH:MM'``.
    delta : float, optional
        Minutes subtracted from the gap (2.0 by default, the minimum time
        assumed necessary to make a transfer).
    """
    diff_time_min = (process_time(dep_time) - process_time(arr_time)) / 60
    return diff_time_min - delta


def poisson_proba(trip_id, stop_id, arr_time, dep_time, dico):
    """Evaluate the Poisson CDF at the available transfer time.

    The distribution is looked up in ``dico`` under
    ``'<trip_id>__<first 7 characters of stop_id>'``, its mean is used as
    the Poisson rate, and the cumulative distribution is evaluated at the
    transfer time returned by ``get_transfer_time``.

    Parameters
    ----------
    trip_id : str
        Trip identifier (first half of the dictionary key).
    stop_id : str or int
        Stop identifier; only its first 7 characters are used.
    arr_time, dep_time : str
        Arrival and departure times formatted as ``'HH:MM'``.
    dico : dict
        Mapping from key to distribution of counts.

    Returns
    -------
    float
        ``poisson.cdf(T, lambda)`` where ``T`` is the transfer time in
        minutes and ``lambda`` the mean of the distribution.

    Raises
    ------
    ValueError
        If the key is unknown or its distribution has no counts.
    """
    # Convert to text *before* slicing so that integer stop ids work too;
    # 7 first char to be sbb-compatible.
    key = str(trip_id) + '__' + str(stop_id)[0:7]

    # Get distribution from dictionary
    distrib = get_distrib(key, dico)

    # Transfer time at disposal, in minutes
    T = get_transfer_time(arr_time, dep_time)

    # Poisson rate estimated from the distribution
    lambda_p = evaluate_lamda(distrib, key)

    poisson_p = poisson.cdf(T, lambda_p)
    print('Probability of success for transfer time = {} minutes : '.format(T), poisson_p)

    return poisson_p