diff --git a/notebooks/match_datasets.ipynb b/notebooks/match_datasets.ipynb
index 2da0cbb..b5d00c8 100644
--- a/notebooks/match_datasets.ipynb
+++ b/notebooks/match_datasets.ipynb
@@ -1,1572 +1,2210 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Match datasets \n",
"\n",
"### Name your spark application as `GASPAR_final` or `GROUP_NAME_final`.\n",
"\n",
"
Any application without a proper name would be promptly killed.
"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"Current session configs: {'conf': {'spark.app.name': 'lgptguys_final'}, 'kind': 'pyspark'}
"
],
"text/plain": [
""
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"\n",
- "ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
---|
6287 | application_1589299642358_0776 | pyspark | idle | Link | Link | |
6309 | application_1589299642358_0798 | pyspark | idle | Link | Link | |
6316 | application_1589299642358_0805 | pyspark | idle | Link | Link | |
6317 | application_1589299642358_0806 | pyspark | idle | Link | Link | |
6318 | application_1589299642358_0807 | pyspark | idle | Link | Link | |
6319 | application_1589299642358_0808 | pyspark | idle | Link | Link | |
"
+ "ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
---|
6332 | application_1589299642358_0821 | pyspark | busy | Link | Link | |
"
],
"text/plain": [
""
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"%%configure\n",
"{\"conf\": {\n",
" \"spark.app.name\": \"lgptguys_final\"\n",
"}}\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Start Spark"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Starting Spark application\n"
]
},
{
"data": {
"text/html": [
"\n",
- "ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
---|
6320 | application_1589299642358_0809 | pyspark | idle | Link | Link | ✔ |
"
+ "ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
---|
6333 | application_1589299642358_0822 | pyspark | idle | Link | Link | ✔ |
"
],
"text/plain": [
""
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"SparkSession available as 'spark'.\n"
]
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"An error was encountered:\n",
"unknown magic command '%spark'\n",
"UnknownMagic: unknown magic command '%spark'\n",
"\n"
]
}
],
"source": [
"# Initialization\n",
"%%spark"
]
},
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "%%send_to_spark -i username -t str -n username"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Import useful libraries "
+ ]
+ },
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
- "name": "stderr",
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "from geopy.distance import great_circle\n",
+ "from pyspark.sql.functions import *\n",
+ "from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, TimestampType"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Read TimeTable curated data\n",
+ "\n",
+ "contains only stops / trips in a 15km range from Zurich HB"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stdout",
"output_type": "stream",
"text": [
- "An error was encountered:\n",
- "Variable named username not found.\n"
+ "+-----------+--------+---------------------------+-------+\n",
+ "|stop_id_raw|stop_int|stop_name |stop_id|\n",
+ "+-----------+--------+---------------------------+-------+\n",
+ "|8500926 |0 |Oetwil a.d.L., Schweizäcker|8500926|\n",
+ "|8502186 |1 |Dietikon Stoffelbach |8502186|\n",
+ "|8502186:0:1|2 |Dietikon Stoffelbach |8502186|\n",
+ "|8502186:0:2|3 |Dietikon Stoffelbach |8502186|\n",
+ "|8502186P |4 |Dietikon Stoffelbach |8502186|\n",
+ "+-----------+--------+---------------------------+-------+\n",
+ "only showing top 5 rows"
]
}
],
"source": [
- "%%send_to_spark -i username -t str -n username"
+ "stops_15km = spark.read.csv('data/lgpt_guys/stops_15km.csv', header = True)\n",
+ "\n",
+ "# We use only first 7 characters of stop_id to remove special cases\n",
+ "stops_15km = stops_15km.select(col('stop_id').alias('stop_id_raw'), 'stop_int', 'stop_name')\\\n",
+ " .withColumn('stop_id',col('stop_id_raw').substr(1, 7))\n",
+ "stops_15km.show(5, False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "### Import useful libraries "
+ "we have slighlty modified stop_id (original is in `stop_id_raw`) to get a stop_id compatible with sbb dataset (only use 7 first characters)"
]
},
{
"cell_type": "code",
- "execution_count": 4,
+ "execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "1586"
+ ]
}
],
"source": [
- "from geopy.distance import great_circle\n",
- "from pyspark.sql.functions import *\n",
- "from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, TimestampType"
+ "stops_15km.select('stop_id').distinct().count()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "This is the number of unique `stop_id` using only 7 first characters. Taking `stop_id_raw`, we wiould get roughly 300 more."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Read the [SBB actual data](https://opentransportdata.swiss/en/dataset/istdaten) in ORC format"
]
},
{
"cell_type": "code",
- "execution_count": 5,
+ "execution_count": 50,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"sbb = spark.read.orc('/data/sbb/orc/istdaten')"
]
},
{
"cell_type": "code",
- "execution_count": 6,
+ "execution_count": 51,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- betriebstag: string (nullable = true)\n",
" |-- fahrt_bezeichner: string (nullable = true)\n",
" |-- betreiber_id: string (nullable = true)\n",
" |-- betreiber_abk: string (nullable = true)\n",
" |-- betreiber_name: string (nullable = true)\n",
" |-- produkt_id: string (nullable = true)\n",
" |-- linien_id: string (nullable = true)\n",
" |-- linien_text: string (nullable = true)\n",
" |-- umlauf_id: string (nullable = true)\n",
" |-- verkehrsmittel_text: string (nullable = true)\n",
" |-- zusatzfahrt_tf: string (nullable = true)\n",
" |-- faellt_aus_tf: string (nullable = true)\n",
" |-- bpuic: string (nullable = true)\n",
" |-- haltestellen_name: string (nullable = true)\n",
" |-- ankunftszeit: string (nullable = true)\n",
" |-- an_prognose: string (nullable = true)\n",
" |-- an_prognose_status: string (nullable = true)\n",
" |-- abfahrtszeit: string (nullable = true)\n",
" |-- ab_prognose: string (nullable = true)\n",
" |-- ab_prognose_status: string (nullable = true)\n",
" |-- durchfahrt_tf: string (nullable = true)"
]
}
],
"source": [
"sbb.printSchema()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "### Read TimeTable curated data\n",
+ "### Subset SBB data\n",
"\n",
- "contains only stops / trips in a 15km range from Zurich HB"
+ "We take only stop_id in 15 km range from Zurich HB"
]
},
{
"cell_type": "code",
- "execution_count": 7,
+ "execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "35548"
+ ]
}
],
"source": [
- "stops_15km = spark.read.csv('data/lgpt_guys/stops_15km.csv', header = True)"
+ "sbb.select('bpuic').distinct().count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "## Map timetable data and sbb data \n",
- "\n",
- "There is no direct mapping between the data in sbb/orc and the route/trip_id given in the new tables. We will need to find heuristics to get the data needed.\n",
- "\n",
- "If we cannot find may imagine getting the station using their names, and exploring the average delay distribution for a given station at a given hour, with a given time window wround it. \n",
- "\n",
- "First idea : we use stops names from _actual_data_ which corrresponds to _stop_name_ in the _timetable_ data. We can then estimate the delays according associated with the station and the hour."
+ "This is the total number of unique `stop_id` in sbb dataset "
]
},
{
- "cell_type": "markdown",
+ "cell_type": "code",
+ "execution_count": 53,
"metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
"source": [
- "### Get corresponding stop_id between two datasets \n",
- "\n",
- "We first look at the station names in timetable dataset. Stop_id can be given in multiple formats :\n",
- "- `8502186` : the format defining the stop itself, which matches sbb `bpuic` field\n",
- "\n",
- "We will call the 3 next ones __Special cases__ throughout the notebook :\n",
- "- `8502186:0:1` or `8502186:0:2` : The individual platforms are separated by “:”. A “platform” can also be a platform+sectors (e.g. “8500010:0:7CD”).\n",
- "- `8502186P` : All the stops have a common “parent” “8500010P”.\n",
- "- `8502186:0:Bfpl` : if the RBS uses it for rail replacement buses.\n",
+ "# Used to subset sbb table based on stop_id \n",
+ "l1_id = stops_15km.select('stop_id').collect()\n",
+ "l2_id = [item.stop_id for item in l1_id]\n",
"\n",
- "source : [timetable cookbook](https://opentransportdata.swiss/en/cookbook/gtfs/), section stops.txt \n",
+ "# Used to subset sbb table based on stop_names \n",
+ "l1_name = stops_15km.select('stop_name').collect()\n",
+ "l2_name = [item.stop_name for item in l1_name]\n",
"\n",
- "In the sbb actual_data we find equivalent to stop_id in its first format defining the station without platform information, in its `bpuic` field"
+ "# Make the subset dataframe\n",
+ "sbb_filt = sbb.filter( sbb['bpuic'].isin(l2_id) | sbb['bpuic'].isin(l2_name) )\\\n",
+ " .select('fahrt_bezeichner','haltestellen_name', 'linien_id',\\\n",
+ " 'ankunftszeit', 'abfahrtszeit', 'betriebstag',\\\n",
+ " col('bpuic').alias('stop_id'))\n",
+ "# Get counts before / after subset\n",
+ "#print sbb.count()\n",
+ "#print sbb_filt.count()"
]
},
{
"cell_type": "code",
- "execution_count": 7,
+ "execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
- "+-------+\n",
- "| bpuic|\n",
- "+-------+\n",
- "|8502128|\n",
- "|8595521|\n",
- "|8591973|\n",
- "|8501214|\n",
- "|8501469|\n",
- "+-------+\n",
- "only showing top 5 rows"
+ "1516"
]
}
],
"source": [
- "sbb.select(\"bpuic\").distinct().show(5)"
+ "print sbb_filt.select('stop_id').distinct().count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "We first load previously curated _timetable_ dataset, containing only station at less than 15 km from Zurich HauptBanhof. "
+ "This is the number of stop_id that are found in sbb dataset based on _timetable_ dataset."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Now we want to subset to take only the schedule of May 13-17, 2019"
]
},
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 54,
"metadata": {},
- "outputs": [],
- "source": []
+ "outputs": [
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+----------------+-----------------+---------+----------------+----------------+-----------+-------+\n",
+ "|fahrt_bezeichner|haltestellen_name|linien_id| ankunftszeit| abfahrtszeit|betriebstag|stop_id|\n",
+ "+----------------+-----------------+---------+----------------+----------------+-----------+-------+\n",
+ "| 85:11:10:002| Zürich HB| 10|03.09.2018 21:51| | 03.09.2018|8503000|\n",
+ "| 85:11:11:001| Zürich HB| 11| |03.09.2018 06:09| 03.09.2018|8503000|\n",
+ "| 85:11:12:001| Zürich HB| 12|03.09.2018 10:51| | 03.09.2018|8503000|\n",
+ "+----------------+-----------------+---------+----------------+----------------+-----------+-------+\n",
+ "only showing top 3 rows"
+ ]
+ }
+ ],
+ "source": [
+ "sbb_filt.show(3)"
+ ]
},
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 55,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "1734112\n",
+ "+----------------+-----------------+---------+----------------+----------------+-----------+-------+\n",
+ "|fahrt_bezeichner|haltestellen_name|linien_id|ankunftszeit |abfahrtszeit |betriebstag|stop_id|\n",
+ "+----------------+-----------------+---------+----------------+----------------+-----------+-------+\n",
+ "|85:11:10:001 |Zürich HB |10 |14.05.2019 21:50| |14.05.2019 |8503000|\n",
+ "|85:11:1007:001 |Zürich HB |1007 |14.05.2019 06:23| |14.05.2019 |8503000|\n",
+ "|85:11:1009:001 |Zürich HB |1009 |14.05.2019 07:23| |14.05.2019 |8503000|\n",
+ "|85:11:1011:001 |Zürich HB |1011 |14.05.2019 08:23| |14.05.2019 |8503000|\n",
+ "|85:11:10190:001 |Zürich Flughafen |10190 |14.05.2019 22:46|14.05.2019 22:48|14.05.2019 |8503016|\n",
+ "+----------------+-----------------+---------+----------------+----------------+-----------+-------+\n",
+ "only showing top 5 rows"
+ ]
}
],
"source": [
- "timetable_ids = stops_15km.select('stop_name', 'stop_id').distinct().orderBy(\"stop_id\")\n",
- "print timetable_ids.count()\n",
- "timetable_ids.show()"
+ "sbb_subTime = sbb_filt.filter( (sbb_filt.betriebstag == '13.05.2019') | (sbb_filt.betriebstag == '14.05.2019') |\\\n",
+ " (sbb_filt.betriebstag == '15.05.2019') | (sbb_filt.betriebstag == '16.05.2019') |\\\n",
+ " (sbb_filt.betriebstag == '17.05.2019') )\n",
+ "print sbb_subTime.count()\n",
+ "sbb_subTime.show(5,False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "We see that we have multiple `stop_id` for the same station, as defined above. We may need to reformat input `stop_id`, e.g. by removing trailiig '0:1', '0:2' or 'P' in the final model (= __special cases__) \n",
- "\n",
- "We then subset sbb table to take only rows that either has a common stop_id or a common stop_name. _Takes a bit of time ~ 10 minutes_"
+ "We write the resulting subset "
]
},
{
"cell_type": "code",
- "execution_count": 9,
+ "execution_count": 57,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
- },
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "4741\n",
- "+--------------------+-------+\n",
- "| haltestellen_name|stop_id|\n",
- "+--------------------+-------+\n",
- "|Oetwil a.d.L., Sc...|8500926|\n",
- "|Dietikon Stoffelbach|8502186|\n",
- "|Rudolfstetten Hof...|8502187|\n",
- "| Zufikon Hammergut|8502188|\n",
- "| Horgen Oberdorf|8502208|\n",
- "| Oberrieden Dorf|8502209|\n",
- "| Urdorf|8502220|\n",
- "| Birmensdorf ZH|8502221|\n",
- "| Bonstetten-Wettswil|8502222|\n",
- "| Hedingen|8502223|\n",
- "| Affoltern am Albis|8502224|\n",
- "| Urdorf Weihermatt|8502229|\n",
- "| Zufikon Belv�d�re|8502268|\n",
- "| Zufikon Belvédère|8502268|\n",
- "| Bergfrieden|8502270|\n",
- "| Bremgarten|8502273|\n",
- "| Zufikon|8502274|\n",
- "| Heinrüti|8502275|\n",
- "| Widen Heinr�ti|8502275|\n",
- "| Widen Heinrüti|8502275|\n",
- "+--------------------+-------+\n",
- "only showing top 20 rows"
- ]
}
],
"source": [
- "# Used to subset sbb table based on stop_id \n",
- "l1_id = stops_15km.select('stop_id').collect()\n",
- "l2_id = [item.stop_id for item in l1_id]\n",
- "\n",
- "# Used to subset sbb table based on stop_names \n",
- "l1_name = stops_15km.select('stop_name').collect()\n",
- "l2_name = [item.stop_name for item in l1_name]\n",
- "\n",
- "# Make the subset dataframe\n",
- "sbb_filt = sbb.where(sbb['bpuic'].isin(l2_id) | sbb['haltestellen_name'].isin(l2_name))\\\n",
- " .select('haltestellen_name', col('bpuic').alias('stop_id'))\\\n",
- " .distinct().orderBy(\"stop_id\")\n",
- "\n",
- "# Print number of lines and show\n",
- "print sbb_filt.count()\n",
- "sbb_filt.show()"
+ "# save\n",
+ "username = 'acoudray'\n",
+ "sbb_subTime.write.format(\"orc\").save(\"/user/{}/sbb_subTime2.orc\".format(username))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "We get duplicated stop names, which seems to come from weird characters. _Maybe to do : Do we want to find a solution for that ? Most probably we just need stop_id and do not care about names_\n",
- "\n",
- "Then we can make a joined table using stop_id from timetable and see how many correspondance we find between timetable and sbb dataset."
+ "Let's also make a small table with only one day, the 13th of May 2019"
]
},
{
"cell_type": "code",
- "execution_count": 10,
+ "execution_count": 58,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "343567\n",
+ "+----------------+-----------------+---------+----------------+----------------+-----------+-------+\n",
+ "|fahrt_bezeichner|haltestellen_name|linien_id|ankunftszeit |abfahrtszeit |betriebstag|stop_id|\n",
+ "+----------------+-----------------+---------+----------------+----------------+-----------+-------+\n",
+ "|85:11:10:001 |Zürich HB |10 |13.05.2019 21:50| |13.05.2019 |8503000|\n",
+ "|85:11:1007:001 |Zürich HB |1007 |13.05.2019 06:23| |13.05.2019 |8503000|\n",
+ "|85:11:1009:001 |Zürich HB |1009 |13.05.2019 07:23| |13.05.2019 |8503000|\n",
+ "|85:11:1011:001 |Zürich HB |1011 |13.05.2019 08:23| |13.05.2019 |8503000|\n",
+ "|85:11:10190:001 |Zürich Flughafen |10190 |13.05.2019 22:46|13.05.2019 22:48|13.05.2019 |8503016|\n",
+ "+----------------+-----------------+---------+----------------+----------------+-----------+-------+\n",
+ "only showing top 5 rows"
+ ]
+ }
+ ],
+ "source": [
+ "sbb_oneday = sbb_filt.filter( (sbb_filt.betriebstag == '13.05.2019') )\n",
+ "print sbb_oneday.count()\n",
+ "sbb_oneday.show(5,False)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 59,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "# save\n",
+ "username = 'acoudray'\n",
+ "sbb_subTime.write.format(\"orc\").save(\"/user/{}/sbb_oneday.orc\".format(username))"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "For now we have these files in /user/{}/ :\n",
+ "- `sbb_filt.orc` : every day with stations < 15km\n",
+ "- `sbb_subTime.orc` : schedule of May 13-17, 2019, stations < 15km\n",
+ "- `sbb_subTime2.orc` : schedule of May 13-17, 2019, stations < 15km, `linien_id` field added\n",
+ "- `sbb_oneday.orc` : May 13th 2019 only, stations < 15km, `linien_id` field added"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Map timetable data and sbb data \n",
+ "\n",
+ "There is no direct mapping between the data in sbb/orc and the route/trip_id given in the _timetable_ dataset. We will need to find heuristics to get the data needed.\n",
+ "\n",
+ "To begin with, we have a look at the stop_id / stop_name correspondance between the two tables."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Get corresponding stop_id between two datasets \n",
+ "\n",
+ "We first look at the station names in timetable dataset. Stop_id can be given in multiple formats :\n",
+ "- `8502186` : the format defining the stop itself, which matches sbb `bpuic` field\n",
+ "\n",
+ "We will call the 3 next ones __Special cases__ throughout the notebook :\n",
+ "- `8502186:0:1` or `8502186:0:2` : The individual platforms are separated by “:”. A “platform” can also be a platform+sectors (e.g. “8500010:0:7CD”).\n",
+ "- `8502186P` : All the stops have a common “parent” “8500010P”.\n",
+ "- `8502186:0:Bfpl` : if the RBS uses it for rail replacement buses.\n",
+ "\n",
+ "source : [timetable cookbook](https://opentransportdata.swiss/en/cookbook/gtfs/), section stops.txt \n",
+ "\n",
+ "In the sbb actual_data we find equivalent to stop_id in its first format defining the station without platform information, in its `bpuic` field"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-------+\n",
+ "| bpuic|\n",
+ "+-------+\n",
+ "|8502128|\n",
+ "|8595521|\n",
+ "|8591973|\n",
+ "|8501214|\n",
+ "|8501469|\n",
+ "+-------+\n",
+ "only showing top 5 rows"
+ ]
+ }
+ ],
+ "source": [
+ "sbb.select(\"bpuic\").distinct().show(5)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "We first load previously curated _timetable_ dataset, containing only station at less than 15 km from Zurich HauptBanhof. "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "timetable_ids = stops_15km.select('stop_name', 'stop_id').distinct().orderBy(\"stop_id\")\n",
+ "print timetable_ids.count()\n",
+ "timetable_ids.show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "We see that we have multiple `stop_id` for the same station, as defined above. We may need to reformat input `stop_id`, e.g. by removing trailiig '0:1', '0:2' or 'P' in the final model (= __special cases__) \n",
+ "\n",
+ "We then subset sbb table to take only rows that either has a common stop_id or a common stop_name. _Takes a bit of time ~ 10 minutes_"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "4741\n",
+ "+--------------------+-------+\n",
+ "| haltestellen_name|stop_id|\n",
+ "+--------------------+-------+\n",
+ "|Oetwil a.d.L., Sc...|8500926|\n",
+ "|Dietikon Stoffelbach|8502186|\n",
+ "|Rudolfstetten Hof...|8502187|\n",
+ "| Zufikon Hammergut|8502188|\n",
+ "| Horgen Oberdorf|8502208|\n",
+ "| Oberrieden Dorf|8502209|\n",
+ "| Urdorf|8502220|\n",
+ "| Birmensdorf ZH|8502221|\n",
+ "| Bonstetten-Wettswil|8502222|\n",
+ "| Hedingen|8502223|\n",
+ "| Affoltern am Albis|8502224|\n",
+ "| Urdorf Weihermatt|8502229|\n",
+ "| Zufikon Belv�d�re|8502268|\n",
+ "| Zufikon Belvédère|8502268|\n",
+ "| Bergfrieden|8502270|\n",
+ "| Bremgarten|8502273|\n",
+ "| Zufikon|8502274|\n",
+ "| Heinrüti|8502275|\n",
+ "| Widen Heinr�ti|8502275|\n",
+ "| Widen Heinrüti|8502275|\n",
+ "+--------------------+-------+\n",
+ "only showing top 20 rows"
+ ]
+ }
+ ],
+ "source": [
+ "# Used to subset sbb table based on stop_id \n",
+ "l1_id = stops_15km.select('stop_id').collect()\n",
+ "l2_id = [item.stop_id for item in l1_id]\n",
+ "\n",
+ "# Used to subset sbb table based on stop_names \n",
+ "l1_name = stops_15km.select('stop_name').collect()\n",
+ "l2_name = [item.stop_name for item in l1_name]\n",
+ "\n",
+ "# Make the subset dataframe\n",
+ "sbb_filt = sbb.where(sbb['bpuic'].isin(l2_id) | sbb['haltestellen_name'].isin(l2_name))\\\n",
+ " .select('haltestellen_name', col('bpuic').alias('stop_id'))\\\n",
+ " .distinct().orderBy(\"stop_id\")\n",
+ "\n",
+ "# Print number of lines and show\n",
+ "print sbb_filt.count()\n",
+ "sbb_filt.show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "We get duplicated stop names, which seems to come from weird characters. _Maybe to do : Do we want to find a solution for that ? Most probably we just need stop_id and do not care about names_\n",
+ "\n",
+ "Then we can make a joined table using stop_id from timetable and see how many correspondance we find between timetable and sbb dataset."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"2628\n",
"+-----------+--------------------+--------------------+\n",
"| stop_id| timetable_name| sbb_name|\n",
"+-----------+--------------------+--------------------+\n",
"| 8502508|Spreitenbach, Rai...|Spreitenbach, Rai...|\n",
"| 8503078| Waldburg| Waldburg|\n",
"| 8503101P| Küsnacht ZH| null|\n",
"|8503306:0:2| Dietlikon| null|\n",
"| 8503376|Ottikon b. Kemptthal|Ottikon b. Kemptthal|\n",
"| 8506895| Lufingen, Dorf| Lufingen, Dorf|\n",
"| 8573729|Bonstetten, Isenbach|Bonstetten, Isenbach|\n",
"| 8587967|Erlenbach ZH, Im ...|Erlenbach ZH, Im ...|\n",
"| 8587967|Erlenbach ZH, Im ...|Erlenbach ZH, Im ...|\n",
"| 8589111|Horgen, Gumelenst...|Horgen, Gumelenst...|\n",
"| 8590819| Thalwil, Mettli| Thalwil, Mettli|\n",
"| 8591190| Zürich, Heuried| Z�rich, Heuried|\n",
"| 8591190| Zürich, Heuried| Zürich, Heuried|\n",
"| 8591284| Zürich, Neeserweg| Zürich, Neeserweg|\n",
"| 8591284| Zürich, Neeserweg| Z�rich, Neeserweg|\n",
"|8502223:0:2| Hedingen| null|\n",
"|8502274:0:1| Zufikon| null|\n",
"|8502758:0:B|Hausen am Albis, ...| null|\n",
"| 8503001P| Zürich Altstetten| null|\n",
"| 8588312|Effretikon, Kapel...|Effretikon, Kapel...|\n",
"+-----------+--------------------+--------------------+\n",
"only showing top 20 rows"
]
}
],
"source": [
"joined_stop_table = timetable_ids.join(sbb_filt, on=['stop_id'], how='left_outer')\\\n",
" .select(\"stop_id\",\n",
" col('stop_name').alias('timetable_name'),\n",
" col('haltestellen_name').alias('sbb_name'))\n",
"print joined_stop_table.count()\n",
"joined_stop_table.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The stop_id which have no correspondance in the sbb dataset have _null_ in their `sbb_name`.\n",
"\n",
"By counting null values, we can find how many stations could not be found in sbb dataset. We expect all stations containing trailing '0:1' (platform information) or 'P' (parent station) to be absent from sbb dataset. "
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------+--------------+--------+\n",
"|stop_id|timetable_name|sbb_name|\n",
"+-------+--------------+--------+\n",
"| 0| 0| 408|\n",
"+-------+--------------+--------+"
]
}
],
"source": [
"from pyspark.sql.functions import col\n",
"\n",
"joined_stop_table.select([count(when(col(c).isNull(), c)).alias(c) \\\n",
" for c in joined_stop_table.columns]).show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We have 408 stations with no match in the sbb data. Below are a few lines of these:"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----------+--------------------+--------+\n",
"| stop_id| timetable_name|sbb_name|\n",
"+-----------+--------------------+--------+\n",
"| 8503101P| Küsnacht ZH| null|\n",
"|8503306:0:2| Dietlikon| null|\n",
"|8502223:0:2| Hedingen| null|\n",
"|8502274:0:1| Zufikon| null|\n",
"|8502758:0:B|Hausen am Albis, ...| null|\n",
"| 8503001P| Zürich Altstetten| null|\n",
"|8503065:0:3| Forch| null|\n",
"| 8503201P| Rüschlikon| null|\n",
"|8503305:0:6| Effretikon| null|\n",
"|8573726:0:D|Bonstetten-Wettsw...| null|\n",
"+-----------+--------------------+--------+\n",
"only showing top 10 rows"
]
}
],
"source": [
"joined_stop_table.filter(col('sbb_name').isNull()).show(10)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we know how many `stop_id` are missing from the sbb dataset.\n",
"\n",
"As a last check, we may ask ourselves whether all stations without platform information that are not parent stations can be found in sbb. If yes, the missing stations would only be these special cases. "
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----------+--------------------+--------+\n",
"| stop_id| timetable_name|sbb_name|\n",
"+-----------+--------------------+--------+\n",
"|8502758:0:B|Hausen am Albis, ...| null|\n",
"| 8502208P| Horgen Oberdorf| null|\n",
"|8502268:0:1| Zufikon Belvédère| null|\n",
"| 8503855P| Horgen, Bahnhof| null|\n",
"| 8502758P|Hausen am Albis, ...| null|\n",
"+-----------+--------------------+--------+\n",
"only showing top 5 rows"
]
}
],
"source": [
"# Regular expression - for some reason, * in first position is not accepted. \n",
"# This is not a problem since all stop_id we use begin with 85 (Switzerland index)\n",
"expr1 = \"85*\\\\:.\\\\:*\" # capture stop_id with platform info\n",
"expr2 = \"85*P$\" # capture stop_id with parent info\n",
"expr3 = \"85*Bfpl$\" # capture stop_id with rail replacement info \n",
"\n",
"joined_stop_table.filter( (joined_stop_table[\"stop_id\"].rlike(expr1)) | \\\n",
" (joined_stop_table[\"stop_id\"].rlike(expr2)) | \\\n",
" (joined_stop_table[\"stop_id\"].rlike(expr3)) ).show(5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now that we know how to get all `stop_id` with weird characters corresponding to special cases in the timetable, these special cases can be resolved by removing their trailing characters. \n",
"\n",
"Let's see if there is any null __outside__ these cases. "
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"70\n",
"+-------+--------------------+--------+\n",
"|stop_id| timetable_name|sbb_name|\n",
"+-------+--------------------+--------+\n",
"|8591421|Zürich, Waldhaus ...| null|\n",
"|8503084| Zürich, Dolder| null|\n",
"|8530648| Greifensee (See)| null|\n",
"|8503081| Felsenegg| null|\n",
"|8503879| Widen, Imbismatt| null|\n",
"|8580847|Bremgarten AG, Bi...| null|\n",
"|8503677| Kilchberg Bendlikon| null|\n",
"|8502979|Oberwil-Lieli, Li...| null|\n",
"|8591394|Zürich, Titlisstr...| null|\n",
"|8502955| Oberlunkhofen, Post| null|\n",
"|8582529|Bremgarten AG, Zu...| null|\n",
"|8572595|Birmensdorf ZH, A...| null|\n",
"|8582462|Bremgarten AG, Ze...| null|\n",
"|8502894|Oberwil-Lieli, Ob...| null|\n",
"|8502560| Berikon, Kirche| null|\n",
"|8576164| Zufikon, Bachhalde| null|\n",
"|8530647| Fällanden (See)| null|\n",
"|8530644| Meilen Autoquai| null|\n",
"|8503082|Adliswil (Luftsei...| null|\n",
"|8502553|Unterlunkhofen, B...| null|\n",
"+-------+--------------------+--------+\n",
"only showing top 20 rows"
]
}
],
"source": [
"# we only want patterns with 7 characters, ending with a number and beginning with 85\n",
"expr = '85[0-9][0-9][0-9][0-9][0-9]$'\n",
"stop_id_notfound = joined_stop_table.filter( (joined_stop_table[\"stop_id\"].rlike(expr)) & \\\n",
" col('sbb_name').isNull() ) \n",
"print stop_id_notfound.count()\n",
"stop_id_notfound.show(10, False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"There are 70 stop_ids with no match in the sbb dataset. These stop_ids could potentially be problematic if one wants to get delay probabilities at these places. Timetable stations that are special cases can be resolved by only using their first 7 characters. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Get corresponding trip_id between two datasets \n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We now have a table with corresponding stop_id between the datasets. We know we can trim special cases in the timetable to get the bpuic code in the sbb dataset. Let's now try to match the trip_id between the two tables. \n",
"\n",
"In sbb dataset, the trip ids are defined by `FAHRT_BEZEICHNER` field and in timetable `trip_id`. We will use corresponding station_id and arrival_times in order to get corresponding trip_id.\n",
"\n",
"Our goal is to find a match in the sbb dataset for all _timetable_ trips (and not the other way around). So we will focus on getting this asymmetrical correspondence table. \n",
"\n",
"When we find a clear one-to-one match, we will mark them as _resolved_; when there is a one-to-many relation, we will call it _partly_resolved_; and if we cannot find a sbb trip that corresponds to a timetable trip_id, we will call it _fail_to_resolve_. \n",
"\n",
"These labels will be used to differentiate 3 different ways to compute probabilities :\n",
"- __One-to-one__ we find a clear match : we use distribution of delays on weekdays for a given trip/station_id based on all past sbb data. \n",
"- __One-to-many__ we find multiple matches :\n",
" - First we double check the matches, if we have the same type of transportation for example.\n",
" - If they seem to be correct, we can merge the trips from sbb and get the merged distribution of their delays.\n",
"- __One-to-none__ we find no match : then we get the distribution of delays for similar transportation types, at similar hour (in a window), during weekdays of sbb dataset.\n",
" - Alternative : Try to find the best match and use only the closest location/time to infer a given distribution.\n",
" - Alternative 2 : use k-nearest neighbors in terms of location/time."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
+ "__Timetable dataset__ \n",
+ "\n",
"We first load _timetable_ with curated trip_id, in a 15km radius from Zurich HB. "
]
},
{
"cell_type": "code",
- "execution_count": 8,
+ "execution_count": 30,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"247920\n",
"+--------------------+-------+------------+--------------+-------------+-----------+-------------+--------+-----------+----------+--------------------+------------+\n",
"|trip_id |stop_id|arrival_time|departure_time|stop_sequence|pickup_type|drop_off_type|stop_int|route_id |sequence_1|trip_1 |route_int |\n",
"+--------------------+-------+------------+--------------+-------------+-----------+-------------+--------+-----------+----------+--------------------+------------+\n",
"|1.TA.1-231-j19-1.1.H|8572747|09:37:00 |09:37:00 |1 |0 |0 |500 |1-231-j19-1|10.0 |1.TA.1-231-j19-1.1.H|592705486850|\n",
"|1.TA.1-231-j19-1.1.H|8573721|09:50:00 |09:50:00 |10 |0 |0 |599 |1-231-j19-1|11.0 |1.TA.1-231-j19-1.1.H|592705486850|\n",
"|1.TA.1-231-j19-1.1.H|8503598|09:53:00 |09:53:00 |11 |0 |0 |401 |1-231-j19-1|12.0 |1.TA.1-231-j19-1.1.H|592705486850|\n",
"+--------------------+-------+------------+--------------+-------------+-----------+-------------+--------+-----------+----------+--------------------+------------+\n",
"only showing top 3 rows"
]
}
],
"source": [
- "stop_times_curated = spark.read.csv('data/lgpt_guys/stop_times_curated.csv', header = True)\n",
- "print stop_times_curated.count()\n",
- "stop_times_curated.show(3, False)"
+ "stop_times_curated = spark.read.csv('data/lgpt_guys/stop_times_curated.csv', header = True)\n",
+ "print stop_times_curated.count()\n",
+ "stop_times_curated.show(3, False)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 31,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "# Make the subset dataframe\n",
+ "stop_times_format = stop_times_curated\\\n",
+ " .select('trip_id', 'stop_id', \n",
+ " unix_timestamp(stop_times_curated.arrival_time, 'HH:mm:ss')\\\n",
+ " .alias('arrival_time_ut'),\\\n",
+ " unix_timestamp(stop_times_curated.departure_time, 'HH:mm:ss')\\\n",
+ " .alias('departure_time_ut') )\\\n",
+ " .select('trip_id', 'stop_id', \n",
+ " from_unixtime('arrival_time_ut')\\\n",
+ " .alias('arrival_time_dty'),\n",
+ " from_unixtime('departure_time_ut')\\\n",
+ " .alias('departure_time_dty'))\\\n",
+ " .select('trip_id', 'stop_id', \n",
+ " date_format('arrival_time_dty', 'hh:mm')\\\n",
+ " .alias('arrival_time'),\n",
+ " date_format('departure_time_dty', 'hh:mm')\\\n",
+ " .alias('departure_time'))"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "We have reformatted `arrival_time` and `departure_time` to get them in `hh:mm` format. To be sure to get every possible match and to allow for possible mistakes in the time schedules between both datasets, we decided to round the times to the quarter hour. The same trip_ids should still be identifiable as they should share stop_id and times, and this approximation to the quarter hour should be enough to avoid getting wrong trips."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 32,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+-------+------------+--------------+\n",
+ "|trip_id |stop_id|arrival_time|departure_time|\n",
+ "+--------------------+-------+------------+--------------+\n",
+ "|1.TA.1-231-j19-1.1.H|8572747|09:30 |09:30 |\n",
+ "|1.TA.1-231-j19-1.1.H|8573721|09:45 |09:45 |\n",
+ "|1.TA.1-231-j19-1.1.H|8503598|10:00 |10:00 |\n",
+ "|1.TA.1-231-j19-1.1.H|8573720|10:00 |10:00 |\n",
+ "|1.TA.1-231-j19-1.1.H|8503598|10:00 |10:00 |\n",
+ "|1.TA.1-231-j19-1.1.H|8573721|10:00 |10:00 |\n",
+ "|1.TA.1-231-j19-1.1.H|8573722|10:00 |10:00 |\n",
+ "|1.TA.1-231-j19-1.1.H|8573723|10:00 |10:00 |\n",
+ "|1.TA.1-231-j19-1.1.H|8583071|10:00 |10:00 |\n",
+ "|1.TA.1-231-j19-1.1.H|8572603|10:00 |10:00 |\n",
+ "+--------------------+-------+------------+--------------+\n",
+ "only showing top 10 rows"
+ ]
+ }
+ ],
+ "source": [
+ "# next two lines used to round time to quarter hour ( = 900 seconds )\n",
+ "quarter_r_arr = ((round(unix_timestamp(col(\"arrival_time\"), 'HH:mm') / 900) * 900)\n",
+ " .cast(\"timestamp\"))\n",
+ "quarter_r_dep = ((round(unix_timestamp(col(\"departure_time\"), 'HH:mm') / 900) * 900)\n",
+ " .cast(\"timestamp\"))\n",
+ "\n",
+ "# Times are rounded\n",
+ "stop_times_format_r = stop_times_format.withColumn(\"arrival_time_rounded\", quarter_r_arr) \\\n",
+ " .withColumn(\"departure_time_rounded\", quarter_r_dep)\n",
+ "\n",
+ "# Reformating date/time in time with hh:mm format\n",
+ "stop_times_format = stop_times_format_r.select('trip_id', 'stop_id',\\\n",
+ " unix_timestamp(stop_times_format_r.arrival_time_rounded, 'yyyy-mm-dd HH:mm:ss')\\\n",
+ " .alias('arrival_time_ut'),\\\n",
+ " unix_timestamp(stop_times_format_r.departure_time_rounded, 'yyyy-mm-dd HH:mm:ss')\\\n",
+ " .alias('departure_time_ut') )\\\n",
+ " .select('trip_id', 'stop_id',\n",
+ " from_unixtime('arrival_time_ut')\\\n",
+ " .alias('arrival_time_dty'),\n",
+ " from_unixtime('departure_time_ut')\\\n",
+ " .alias('departure_time_dty'))\\\n",
+ " .select('trip_id', 'stop_id',\n",
+ " date_format('arrival_time_dty', 'hh:mm')\\\n",
+ " .alias('arrival_time'),\n",
+ " date_format('departure_time_dty', 'hh:mm')\\\n",
+ " .alias('departure_time'))\n",
+ "\n",
+ "# We use only first 7 characters of stop_id to remove special cases\n",
+ "stop_times_format = stop_times_format.withColumn('stop_id',\n",
+ " stop_times_format.stop_id.substr(1, 7))\n",
+ "\n",
+ "stop_times_format.show(10, False)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Here is an example of a single trip_id : it goes from stop to stop and has various arrival / departure times along its journey. As the times are rounded, they seem to be the same. Most probably the time between stops is very short."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 33,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+------------------------+-------+------------+--------------+\n",
+ "|trip_id |stop_id|arrival_time|departure_time|\n",
+ "+------------------------+-------+------------+--------------+\n",
+ "|813.TA.26-33-B-j19-1.6.R|8503610|08:00 |08:00 |\n",
+ "|813.TA.26-33-B-j19-1.6.R|8594239|08:15 |08:15 |\n",
+ "|813.TA.26-33-B-j19-1.6.R|8580522|08:15 |08:15 |\n",
+ "|813.TA.26-33-B-j19-1.6.R|8591323|08:15 |08:15 |\n",
+ "|813.TA.26-33-B-j19-1.6.R|8591066|08:15 |08:15 |\n",
+ "|813.TA.26-33-B-j19-1.6.R|8591292|08:15 |08:15 |\n",
+ "|813.TA.26-33-B-j19-1.6.R|8591326|08:15 |08:15 |\n",
+ "|813.TA.26-33-B-j19-1.6.R|8591335|08:15 |08:15 |\n",
+ "|813.TA.26-33-B-j19-1.6.R|8591339|08:30 |08:30 |\n",
+ "|813.TA.26-33-B-j19-1.6.R|8591362|08:30 |08:30 |\n",
+ "+------------------------+-------+------------+--------------+\n",
+ "only showing top 10 rows"
+ ]
+ }
+ ],
+ "source": [
+ "stop_times_format.filter(stop_times_format['trip_id'] == '813.TA.26-33-B-j19-1.6.R').show(10,False)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "We will only use the first 7 characters of the `stop_id` field in order to get an exact match with the sbb dataset. "
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "We use times rounded to the quarter hour, so that inexact matches between the two datasets should not be a problem."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 34,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "#print stop_times_format.count()\n",
+ "#stop_times_format.select(\"trip_id\").distinct().orderBy(\"trip_id\").count()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "We have the _timetable_ trip_id, the stop_id as defined above, and arrival/departure times. The idea is to match this information with what we have in the sbb dataset. The stop_id should match.\n",
+ "\n",
+ " __SBB dataset__\n",
+ " \n",
+ "We will subset the sbb dataset to keep only the 13th of May:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 35,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "1734112"
+ ]
+ }
+ ],
+ "source": [
+ "username='acoudray'\n",
+ "sbb = spark.read.orc(\"/user/{}/sbb_subTime.orc\".format(username))\n",
+ "sbb.count()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 36,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "# Following are used to round arrival / departure times\n",
+ "quarter_round_arr = ((round(unix_timestamp(col(\"ankunftszeit\"), 'dd.MM.yyyy HH:mm') / 900) * 900)\n",
+ " .cast(\"timestamp\"))\n",
+ "quarter_round_dep = ((round(unix_timestamp(col(\"abfahrtszeit\"), 'dd.MM.yyyy HH:mm') / 900) * 900)\n",
+ " .cast(\"timestamp\"))\n",
+ "\n",
+ "# Make the subset dataframe\n",
+ "sbb_filt = sbb.filter( ( (col('ankunftszeit') != \"\") | (col('abfahrtszeit') != \"\") ) )\\\n",
+ " .withColumn(\"ankunftszeit_rounded\", quarter_round_arr) \\\n",
+ " .withColumn(\"abfahrtszeit_rounded\", quarter_round_dep)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 37,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "application/vnd.jupyter.widget-view+json": {
+ "model_id": "",
+ "version_major": 2,
+ "version_minor": 0
+ },
+ "text/plain": [
+ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+----------------+-----------------+-------+------------+--------------+\n",
+ "|fahrt_bezeichner|haltestellen_name|stop_id|arrival_time|departure_time|\n",
+ "+----------------+-----------------+-------+------------+--------------+\n",
+ "|85:11:10:001 |Zürich HB |8503000|09:45 |null |\n",
+ "|85:11:1007:001 |Zürich HB |8503000|06:30 |null |\n",
+ "|85:11:1009:001 |Zürich HB |8503000|07:30 |null |\n",
+ "|85:11:1011:001 |Zürich HB |8503000|08:30 |null |\n",
+ "|85:11:10190:001 |Zürich Flughafen |8503016|10:45 |10:45 |\n",
+ "+----------------+-----------------+-------+------------+--------------+\n",
+ "only showing top 5 rows"
+ ]
+ }
+ ],
+ "source": [
+ "sbb_filt_format = sbb_filt.select('fahrt_bezeichner','haltestellen_name', 'stop_id',\n",
+ " date_format(from_unixtime(unix_timestamp(col('ankunftszeit_rounded'), \\\n",
+ " 'dd.MM.yyyy HH:mm')), \\\n",
+ " 'hh:mm')\\\n",
+ " .alias('arrival_time'),\n",
+ " date_format(from_unixtime(unix_timestamp(col('abfahrtszeit_rounded'), \\\n",
+ " 'dd.MM.yyyy HH:mm')), \\\n",
+ " 'hh:mm')\\\n",
+ " .alias('departure_time'))\n",
+ "sbb_filt_format.show(5, False)"
]
},
{
"cell_type": "code",
- "execution_count": 9,
+ "execution_count": 38,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "40184"
+ ]
}
],
"source": [
- "# Make the subset dataframe\n",
- "stop_times_format = stop_times_curated\\\n",
- " .select('trip_id', 'stop_id', \n",
- " unix_timestamp(stop_times_curated.arrival_time, 'HH:mm:ss')\\\n",
- " .alias('arrival_time_ut'),\\\n",
- " unix_timestamp(stop_times_curated.departure_time, 'HH:mm:ss')\\\n",
- " .alias('departure_time_ut') )\\\n",
- " .select('trip_id', 'stop_id', \n",
- " from_unixtime('arrival_time_ut')\\\n",
- " .alias('arrival_time_dty'),\n",
- " from_unixtime('departure_time_ut')\\\n",
- " .alias('departure_time_dty'))\\\n",
- " .select('trip_id', 'stop_id', \n",
- " date_format('arrival_time_dty', 'hh:mm')\\\n",
- " .alias('arrival_time'),\n",
- " date_format('departure_time_dty', 'hh:mm')\\\n",
- " .alias('departure_time'))"
+ "sbb_filt_format.select(\"fahrt_bezeichner\").distinct().count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "We have reformated `arrival_time` and `departure_time` to get it in `hh:mm` format. To be sure to get every possible match and allow possible mistakes on the time schedules between both datasets, we decided to round to quarter hour the times. Same trip_ids should still be able to be seen as they should share stop_id and times, and this approximation to the quarter hour should be enough to avoid getting wrong trips."
+ "Let's have a look at a single trip_id according to sbb dataset (called `fahrt_bezeichner`). "
]
},
{
"cell_type": "code",
- "execution_count": 10,
+ "execution_count": 39,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
- "+--------------------+-------+------------+--------------+\n",
- "|trip_id |stop_id|arrival_time|departure_time|\n",
- "+--------------------+-------+------------+--------------+\n",
- "|1.TA.1-231-j19-1.1.H|8572747|09:30 |09:30 |\n",
- "|1.TA.1-231-j19-1.1.H|8573721|09:45 |09:45 |\n",
- "|1.TA.1-231-j19-1.1.H|8503598|10:00 |10:00 |\n",
- "|1.TA.1-231-j19-1.1.H|8573720|10:00 |10:00 |\n",
- "|1.TA.1-231-j19-1.1.H|8503598|10:00 |10:00 |\n",
- "|1.TA.1-231-j19-1.1.H|8573721|10:00 |10:00 |\n",
- "|1.TA.1-231-j19-1.1.H|8573722|10:00 |10:00 |\n",
- "|1.TA.1-231-j19-1.1.H|8573723|10:00 |10:00 |\n",
- "|1.TA.1-231-j19-1.1.H|8583071|10:00 |10:00 |\n",
- "|1.TA.1-231-j19-1.1.H|8572603|10:00 |10:00 |\n",
- "+--------------------+-------+------------+--------------+\n",
- "only showing top 10 rows"
+ "+----------------+-----------------+-------+------------+--------------+\n",
+ "|fahrt_bezeichner|haltestellen_name|stop_id|arrival_time|departure_time|\n",
+ "+----------------+-----------------+-------+------------+--------------+\n",
+ "+----------------+-----------------+-------+------------+--------------+"
]
}
],
"source": [
- "# next two lines used to round time to quarter hour ( = 900 seconds )\n",
- "quarter_r_arr = ((round(unix_timestamp(col(\"arrival_time\"), 'HH:mm') / 900) * 900)\n",
- " .cast(\"timestamp\"))\n",
- "quarter_r_dep = ((round(unix_timestamp(col(\"departure_time\"), 'HH:mm') / 900) * 900)\n",
- " .cast(\"timestamp\"))\n",
- "\n",
- "# Times are rounded\n",
- "stop_times_format_r = stop_times_format.withColumn(\"arrival_time_rounded\", quarter_r_arr) \\\n",
- " .withColumn(\"departure_time_rounded\", quarter_r_dep)\n",
- "\n",
- "# Reformating date/time in time with hh:mm format\n",
- "stop_times_format = stop_times_format_r.select('trip_id', 'stop_id',\\\n",
- " unix_timestamp(stop_times_format_r.arrival_time_rounded, 'yyyy-mm-dd HH:mm:ss')\\\n",
- " .alias('arrival_time_ut'),\\\n",
- " unix_timestamp(stop_times_format_r.departure_time_rounded, 'yyyy-mm-dd HH:mm:ss')\\\n",
- " .alias('departure_time_ut') )\\\n",
- " .select('trip_id', 'stop_id',\n",
- " from_unixtime('arrival_time_ut')\\\n",
- " .alias('arrival_time_dty'),\n",
- " from_unixtime('departure_time_ut')\\\n",
- " .alias('departure_time_dty'))\\\n",
- " .select('trip_id', 'stop_id',\n",
- " date_format('arrival_time_dty', 'hh:mm')\\\n",
- " .alias('arrival_time'),\n",
- " date_format('departure_time_dty', 'hh:mm')\\\n",
- " .alias('departure_time'))\n",
- "\n",
- "# We use only first 7 characters of stop_id to remove special cases\n",
- "stop_times_format = stop_times_format.withColumn('stop_id',\n",
- " stop_times_format.stop_id.substr(1, 7))\n",
- "\n",
- "stop_times_format.show(10, False)"
+ "fahrt_bezeichner_example = '85:78:24815:001'\n",
+ "sbb_filt_format.filter(sbb_filt_format['fahrt_bezeichner'] == fahrt_bezeichner_example)\\\n",
+ " .show(10,False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "Here is an example of a single trip_id : it goes from stop to stop and has various arrival / departure along its journey. As the times are rounded, they seem to be the same. Most probably the time between stops is very short."
+ "We can now create a joined table using `stop_time_format` and `sbb_filt_format`. We use only stop_id and times to merge the tables. The idea is then to compare the trip_ids from both datasets that end up on the same line. We use a join with _left_outer_ so that we can only have _null_ values on the sbb side (asymmetrical join)."
]
},
{
"cell_type": "code",
- "execution_count": 11,
+ "execution_count": 40,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
- "+------------------------+-------+------------+--------------+\n",
- "|trip_id |stop_id|arrival_time|departure_time|\n",
- "+------------------------+-------+------------+--------------+\n",
- "|813.TA.26-33-B-j19-1.6.R|8503610|08:00 |08:00 |\n",
- "|813.TA.26-33-B-j19-1.6.R|8594239|08:15 |08:15 |\n",
- "|813.TA.26-33-B-j19-1.6.R|8580522|08:15 |08:15 |\n",
- "|813.TA.26-33-B-j19-1.6.R|8591323|08:15 |08:15 |\n",
- "|813.TA.26-33-B-j19-1.6.R|8591066|08:15 |08:15 |\n",
- "|813.TA.26-33-B-j19-1.6.R|8591292|08:15 |08:15 |\n",
- "|813.TA.26-33-B-j19-1.6.R|8591326|08:15 |08:15 |\n",
- "|813.TA.26-33-B-j19-1.6.R|8591335|08:15 |08:15 |\n",
- "|813.TA.26-33-B-j19-1.6.R|8591339|08:30 |08:30 |\n",
- "|813.TA.26-33-B-j19-1.6.R|8591362|08:30 |08:30 |\n",
- "+------------------------+-------+------------+--------------+\n",
- "only showing top 10 rows"
+ "+-------+------------+------------------------+---------------------+\n",
+ "|stop_id|arrival_time|trip_id |fahrt_bezeichner |\n",
+ "+-------+------------+------------------------+---------------------+\n",
+ "|8502186|04:45 |54.TA.1-17-A-j19-1.5.H |85:31:660:000 |\n",
+ "|8502208|06:15 |699.TA.26-24-j19-1.256.R|85:11:20467:001 |\n",
+ "|8502208|10:15 |643.TA.26-24-j19-1.254.R|85:11:20486:001 |\n",
+ "|8502209|08:15 |727.TA.26-24-j19-1.273.H|85:11:20427:001 |\n",
+ "|8502221|04:00 |89.TA.26-5-A-j19-1.37.R |85:11:18561:001 |\n",
+ "|8502221|12:00 |88.TA.26-14-j19-1.18.R |85:11:19445:001 |\n",
+ "|8502274|06:00 |60.TA.1-17-A-j19-1.5.H |85:31:565:000 |\n",
+ "|8502274|11:00 |32.TA.1-17-A-j19-1.5.H |85:31:591:000 |\n",
+ "|8502276|02:15 |44.TA.1-17-A-j19-1.5.H |85:31:546:000 |\n",
+ "|8502276|06:15 |60.TA.1-17-A-j19-1.5.H |85:31:510:000 |\n",
+ "|8502277|01:15 |40.TA.1-17-A-j19-1.5.H |85:31:542:000 |\n",
+ "|8502278|01:15 |40.TA.1-17-A-j19-1.5.H |85:31:642:000 |\n",
+ "|8502495|02:15 |42.TA.26-70-A-j19-1.3.R |85:849:58306-03070-1 |\n",
+ "|8502495|02:45 |997.TA.26-70-A-j19-1.5.H|85:849:58308-03070-1 |\n",
+ "|8502495|06:30 |965.TA.26-70-A-j19-1.5.H|85:849:147522-01184-1|\n",
+ "|8502495|07:15 |508.TA.26-70-A-j19-1.3.R|85:849:76225-02070-1 |\n",
+ "|8502495|07:30 |44.TA.26-184-j19-1.1.H |85:849:58286-03070-1 |\n",
+ "|8502495|08:00 |482.TA.26-185-j19-1.2.R |85:849:147526-01184-1|\n",
+ "|8502495|08:45 |497.TA.26-70-A-j19-1.3.R|85:849:58392-03070-1 |\n",
+ "|8502495|12:45 |89.TA.26-70-A-j19-1.3.R |85:849:147556-01184-1|\n",
+ "+-------+------------+------------------------+---------------------+\n",
+ "only showing top 20 rows"
]
}
],
"source": [
- "stop_times_format.filter(stop_times_format['trip_id'] == '813.TA.26-33-B-j19-1.6.R').show(10,False)"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "We will only use first 7 characters of `stop_id` field in order to get exact match with sbb dataset. "
+ "joined_trip_table = stop_times_format.join(sbb_filt_format,\\\n",
+ " on=['stop_id', 'arrival_time'], \n",
+ " how='left_outer')\\\n",
+ " .select('stop_id', 'arrival_time',\n",
+ " 'trip_id', 'fahrt_bezeichner') \\\n",
+ " .distinct()\n",
+ "joined_trip_table.show(20, False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "We use time rounded to quarter hour, so that unexact match between two datasets should not be a problem."
+ "These are the raw results of the intersection. Note that we used a `distinct()` to avoid having multiple lines corresponding to multiple days. Each line must be a different stop no matter which day it is, so that we can count how many stops (with the same arrival_time) are shared between trip_ids from the _timetable_ and sbb data."
]
},
{
"cell_type": "code",
- "execution_count": 13,
+ "execution_count": 41,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
- },
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "247920\n",
- "22807"
- ]
}
],
"source": [
- "#print stop_times_format.count()\n",
- "#stop_times_format.select(\"trip_id\").distinct().orderBy(\"trip_id\").count()"
- ]
- },
- {
- "cell_type": "markdown",
- "metadata": {},
- "source": [
- "We have _timetable_ trip_id, the stop_id as defined above, and arrival/departure time. The idea is to match these information with the ones we have in sbb dataset. The stop_id should match.\n",
+ "joined_trip_filt = joined_trip_table.select(\"trip_id\", \"fahrt_bezeichner\")\\\n",
+ " .groupBy(\"trip_id\", \"fahrt_bezeichner\")\\\n",
+ " .count()\n",
"\n",
- "We will subset sbb dataset to get only the 13th of May in sbb dataset :"
+ "# .filter( (col('fahrt_bezeichner').isNotNull()) )\\"
]
},
{
"cell_type": "code",
- "execution_count": 12,
+ "execution_count": 42,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------------+----------------------+-----+\n",
+ "|trip_id |fahrt_bezeichner |count|\n",
+ "+--------------------------+----------------------+-----+\n",
+ "|1.TA.26-350-j19-1.1.H |85:801:200881-35200-1 |10 |\n",
+ "|1630.TA.26-3-A-j19-1.8.R |85:3849:50310-03003-1 |4 |\n",
+ "|137.TA.26-9-A-j19-1.57.R |85:11:18977:002 |13 |\n",
+ "|1291.TA.26-80-j19-1.8.R |85:849:60037-03080-1 |1 |\n",
+ "|1474.TA.26-33E-j19-1.9.R |85:849:94630-11412-1 |7 |\n",
+ "|1986.TA.26-11-A-j19-1.27.R|85:849:67214-12412-1 |1 |\n",
+ "|2728.TA.26-31-j19-1.17.H |85:3849:166531-06008-1|1 |\n",
+ "|1262.TA.26-8-C-j19-1.8.R |85:849:247075-16341-1 |1 |\n",
+ "|1142.TA.26-4-j19-1.25.H |85:3849:85685-02002-1 |7 |\n",
+ "|1026.TA.26-4-j19-1.25.H |85:3849:85630-02002-1 |4 |\n",
+ "|1361.TA.26-2-A-j19-1.24.R |85:3849:86350-02004-1 |2 |\n",
+ "|1432.TA.26-2-A-j19-1.24.R |85:3849:86339-02004-1 |2 |\n",
+ "|106.TA.26-38-j19-1.2.H |85:849:59831-03080-1 |1 |\n",
+ "|118.TA.26-311-j19-1.2.R |85:849:240925-37301-1 |1 |\n",
+ "|1567.TA.26-3-A-j19-1.8.R |85:3849:90085-02017-1 |1 |\n",
+ "|2320.TA.26-14-A-j19-1.25.H|85:3849:89988-02017-1 |1 |\n",
+ "|1318.TA.26-13-j19-1.20.R |85:3849:50558-03004-1 |2 |\n",
+ "|2044.TA.26-13-j19-1.24.H |85:3849:89180-02013-1 |4 |\n",
+ "|1861.TA.26-9-B-j19-1.19.R |85:3849:52137-03009-1 |17 |\n",
+ "|182.TA.26-760-j19-1.7.H |85:773:7251-01752-1 |2 |\n",
+ "+--------------------------+----------------------+-----+\n",
+ "only showing top 20 rows"
+ ]
}
],
"source": [
- "sbb_sub = sbb.filter( (sbb.ankunftszeit != \"\") | (sbb.abfahrtszeit != \"\") )\\\n",
- " .select('fahrt_bezeichner','haltestellen_name', \\\n",
- " 'ankunftszeit', 'abfahrtszeit', 'bpuic')\\\n",
- " .withColumn('ankunftszeit_ts',to_timestamp(col('ankunftszeit'),\\\n",
- " format='dd.MM.yyyy HH:mm:ss'))\\\n",
- " .withColumn('abfahrtszeit_ts',to_timestamp(col('abfahrtszeit'),\\\n",
- " format='dd.MM.yyyy HH:mm'))\\"
+ "joined_trip_filt.show(20, False)"
]
},
{
"cell_type": "code",
- "execution_count": 13,
+ "execution_count": 43,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "2368016"
+ ]
}
],
"source": [
- "# Used to subset sbb table based on stop_id \n",
- "l1_id = stops_15km.select('stop_id').collect()\n",
- "l2_id = [item.stop_id for item in l1_id]\n",
- "\n",
- "# Make the subset dataframe\n",
- "quarter_round_arr = ((round(unix_timestamp(col(\"ankunftszeit\"), 'dd.MM.yyyy HH:mm') / 900) * 900)\n",
- " .cast(\"timestamp\"))\n",
- "quarter_round_dep = ((round(unix_timestamp(col(\"abfahrtszeit\"), 'dd.MM.yyyy HH:mm') / 900) * 900)\n",
- " .cast(\"timestamp\"))\n",
- "\n",
- "sbb_filt = sbb.filter( ( (col('ankunftszeit') != \"\") | (col('abfahrtszeit') != \"\") ) & \\\n",
- " sbb['bpuic'].isin(l2_id) )\\\n",
- " .select('fahrt_bezeichner','haltestellen_name', \\\n",
- " 'ankunftszeit', 'abfahrtszeit', \\\n",
- " col('bpuic').alias('stop_id')) \\\n",
- " .withColumn(\"ankunftszeit_rounded\", quarter_round_arr) \\\n",
- " .withColumn(\"abfahrtszeit_rounded\", quarter_round_dep)"
+ "joined_trip_filt.count()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "How many unique trip_id from _timetable_ can we get in this intersection?"
]
},
{
"cell_type": "code",
- "execution_count": 14,
+ "execution_count": 44,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
- "+----------------+-----------------+-------+------------+--------------+\n",
- "|fahrt_bezeichner|haltestellen_name|stop_id|arrival_time|departure_time|\n",
- "+----------------+-----------------+-------+------------+--------------+\n",
- "|85:11:10:002 |Zürich HB |8503000|09:45 |null |\n",
- "|85:11:11:001 |Zürich HB |8503000|null |06:15 |\n",
- "|85:11:12:001 |Zürich HB |8503000|10:45 |null |\n",
- "|85:11:1251:003 |Zürich HB |8503000|07:00 |null |\n",
- "|85:11:1252:001 |Zürich HB |8503000|09:30 |09:30 |\n",
- "+----------------+-----------------+-------+------------+--------------+\n",
- "only showing top 5 rows"
+ "22807"
]
}
],
"source": [
- "sbb_filt_format = sbb_filt.select('fahrt_bezeichner','haltestellen_name', 'stop_id',\n",
- " date_format(from_unixtime(unix_timestamp(col('ankunftszeit_rounded'), \\\n",
- " 'dd.MM.yyyy HH:mm')), \\\n",
- " 'hh:mm')\\\n",
- " .alias('arrival_time'),\n",
- " date_format(from_unixtime(unix_timestamp(col('abfahrtszeit_rounded'), \\\n",
- " 'dd.MM.yyyy HH:mm')), \\\n",
- " 'hh:mm')\\\n",
- " .alias('departure_time'))\n",
- "sbb_filt_format.show(5, False)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "#sbb_filt_format.select(\"fahrt_bezeichner\").distinct().orderBy(\"fahrt_bezeichner\").count()"
+ "joined_trip_filt.select('trip_id').distinct().count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "Let's have a look at a single trip_id according to sbb dataset (called `fahrt_bezeichner`). "
+ "How many unique _fahrt_bezeichner_ from sbb data can we find in the intersection?"
]
},
{
"cell_type": "code",
- "execution_count": 15,
+ "execution_count": 45,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
- "+----------------+-----------------+-------+------------+--------------+\n",
- "|fahrt_bezeichner|haltestellen_name|stop_id|arrival_time|departure_time|\n",
- "+----------------+-----------------+-------+------------+--------------+\n",
- "|85:78:24815:001 |Zürich Selnau |8503090|01:30 |01:30 |\n",
- "|85:78:24815:001 |Zürich Binz |8503051|01:45 |01:45 |\n",
- "|85:78:24815:001 |Friesenberg |8503052|01:45 |01:45 |\n",
- "|85:78:24815:001 |Schweighof |8503053|01:45 |01:45 |\n",
- "|85:78:24815:001 |Triemli |8503054|01:45 |01:45 |\n",
- "|85:78:24815:001 |Uitikon Waldegg |8503055|01:45 |01:45 |\n",
- "|85:78:24815:001 |Ringlikon |8503056|01:45 |01:45 |\n",
- "|85:78:24815:001 |Uetliberg |8503057|02:00 |null |\n",
- "|85:78:24815:001 |Zürich Selnau |8503090|01:30 |01:30 |\n",
- "|85:78:24815:001 |Zürich Binz |8503051|01:45 |01:45 |\n",
- "+----------------+-----------------+-------+------------+--------------+\n",
- "only showing top 10 rows"
+ "38294"
]
}
],
"source": [
- "fahrt_bezeichner_example = '85:78:24815:001'\n",
- "sbb_filt_format.filter(sbb_filt_format['fahrt_bezeichner'] == fahrt_bezeichner_example)\\\n",
- " .show(10,False)"
+ "joined_trip_filt.select('fahrt_bezeichner').distinct().count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "We can now create a joined table using `stop_time_format` and `sbb_filt_format`. We use only stop_id and times to merge the tables. The idea is then to compare the trip_id from both dataset than end up on the same line. We use join with _left_outer_ so that we can only have _null_ values on the sbb side (assymetrical join)."
+ "Let's try to use a threshold to only keep intersections whenever there are at least X stops shared between the two trip_id versions"
]
},
{
"cell_type": "code",
- "execution_count": 16,
+ "execution_count": 46,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
- "+-------+------------------------+---------------------+\n",
- "|stop_id|trip_id |fahrt_bezeichner |\n",
- "+-------+------------------------+---------------------+\n",
- "|8502508|269.TA.26-303-j19-1.3.H |85:849:609297-18301-1|\n",
- "|8502750|35.TA.1-331-j19-1.3.R |null |\n",
- "|8503000|128.TA.17-4-j19-1.23.R |85:11:1957:001 |\n",
- "|8503000|153.TA.26-6-A-j19-1.34.R|85:11:71531:006 |\n",
- "|8503000|173.TA.21-75-j19-1.47.R |85:11:18368:001 |\n",
- "|8503000|173.TA.21-75-j19-1.47.R |85:11:89171:000 |\n",
- "|8503000|182.TA.26-1-j19-1.90.H |85:11:88622:002 |\n",
- "|8503000|237.TA.1-36-j19-1.79.R |85:11:10580:002 |\n",
- "|8503000|269.TA.26-11-j19-1.78.H |85:11:554:006 |\n",
- "|8503000|3.TA.6-1-j19-1.3.R |85:11:30580:003 |\n",
- "|8503000|3.TA.6-1-j19-1.3.R |85:11:88921:003 |\n",
- "|8503000|331.TA.26-6-A-j19-1.45.H|85:11:727:002 |\n",
- "|8503000|331.TA.26-6-A-j19-1.45.H|85:11:30730:003 |\n",
- "|8503000|345.TA.16-5-j19-1.71.R |85:11:730:002 |\n",
- "|8503000|345.TA.16-5-j19-1.71.R |85:11:19122:001 |\n",
- "|8503000|345.TA.16-5-j19-1.71.R |85:11:70727:009 |\n",
- "|8503000|352.TA.20-2-j19-1.146.H |85:11:1531:002 |\n",
- "|8503000|38.TA.26-9-A-j19-1.3.H |85:11:70580:001 |\n",
- "|8503000|551.TA.26-11-j19-1.126.R|85:11:88320:003 |\n",
- "|8503000|63.TA.1-36-j19-1.21.R |85:11:31615:039 |\n",
- "+-------+------------------------+---------------------+\n",
- "only showing top 20 rows"
+ "+------------------------+---------------------+-----+\n",
+ "|trip_id |fahrt_bezeichner |count|\n",
+ "+------------------------+---------------------+-----+\n",
+ "|438.TA.26-759-j19-1.4.H |85:773:194167-24759-1|5 |\n",
+ "|598.TA.26-33-B-j19-1.3.H|85:849:92773-02072-1 |5 |\n",
+ "|4116.TA.26-31-j19-1.26.R|85:3849:50431-03003-1|6 |\n",
+ "|622.TA.26-3-A-j19-1.2.H |85:3849:50253-03003-1|7 |\n",
+ "|870.TA.26-11-A-j19-1.3.H|85:3849:88866-02011-1|8 |\n",
+ "+------------------------+---------------------+-----+\n",
+ "only showing top 5 rows"
]
}
],
"source": [
- "joined_trip_table = stop_times_format.join(sbb_filt_format,\\\n",
- " on=['stop_id', 'arrival_time'], \n",
- " how='left_outer')\\\n",
- " .select('stop_id', \n",
- " 'trip_id', 'fahrt_bezeichner') \\\n",
- " .distinct()\n",
- "joined_trip_table.show(20, False)"
+ "cutoff_min_overlap = 5\n",
+ "joined_trip_atL5 = joined_trip_filt.filter(col('count') >= cutoff_min_overlap )\n",
+ "joined_trip_atL5.show(20, False)"
]
},
{
"cell_type": "code",
- "execution_count": 17,
+ "execution_count": 47,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "15548"
+ ]
}
],
"source": [
- "#joined_trip_table.select([count(when(col(c).isNull(), c)).alias(c) \\\n",
- "# for c in joined_trip_table.columns]).show()"
+ "joined_trip_atL5.select('trip_id').distinct().count()"
]
},
{
"cell_type": "code",
- "execution_count": 18,
+ "execution_count": 48,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "31103"
+ ]
}
],
"source": [
- "joined_trip_filt = joined_trip_table.filter( (col('trip_id').isNotNull()) & \n",
- " (col('fahrt_bezeichner').isNotNull()) )\\\n",
- " .select(\"trip_id\", \"fahrt_bezeichner\")\\\n",
- " .groupBy(\"trip_id\", \"fahrt_bezeichner\")\\\n",
- " .count()"
+ "joined_trip_atL5.select('fahrt_bezeichner').distinct().count()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "There are approximately 2 trips in sbb data for 1 trip in timetable data"
]
},
{
"cell_type": "code",
- "execution_count": null,
+ "execution_count": 49,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
- "model_id": "bbc32e4a021a45eb82f4870a3893fbec",
+ "model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
- "print joined_trip_filt.count()\n",
- "joined_trip_filt.show(10, False)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": [
- "joined_trip_filt.write.csv('data/lgpt_guys/joined_trip_id.csv', header = True, mode=\"overwrite\")"
+ "joined_trip_filt.write.csv('data/lgpt_guys/joined_trip_filt.csv', header = True, mode=\"overwrite\")\n",
+ "joined_trip_atL5.write.csv('data/lgpt_guys/joined_trip_atL5.csv', header = True, mode=\"overwrite\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Explore trips with no matches "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This shows how many trip_id / fahrt_bezeichner there is in the joined dataframe :"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"joined_trip_filt = spark.read.csv('data/lgpt_guys/stops_15km.csv', header = True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print joined_trip_table.select(\"trip_id\").distinct().count()\n",
"print joined_trip_table.select(\"fahrt_bezeichner\").distinct().count()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"And below how many of them we could get a match (of at least 1 stop/time, therefore most probably higher than reality)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print joined_trip_filt.select(\"trip_id\").distinct().count()\n",
"print joined_trip_filt.select(\"fahrt_bezeichner\").distinct().count()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"joined_trip_table.select([count(when(col(c).isNull(), c)).alias(c) \\\n",
" for c in joined_trip_table.columns]).show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"joined_trip_table.filter( (col('trip_id').isNull()) |\n",
" (col('fahrt_bezeichner').isNull()) ).show(10)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can create a dictionnaries with stop_id -> stop_name structure from timetable data, to be able to compare both dataset more carefully "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"newrdd = timetable_ids.rdd\n",
"timetable_rdd = newrdd.map(lambda x : (x[1],x[0]))\n",
"timetable_stopid_dict = timetable_rdd.collectAsMap()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here is an exemple of how to get a station name from a stop_id "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"timetable_stopid_dict.get(\"8502209\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "PySpark",
"language": "",
"name": "pysparkkernel"
},
"language_info": {
"codemirror_mode": {
"name": "python",
"version": 3
},
"mimetype": "text/x-python",
"name": "pyspark",
"pygments_lexer": "python3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}