diff --git a/notebooks/match_datasets.ipynb b/notebooks/match_datasets.ipynb index 2da0cbb..b5d00c8 100644 --- a/notebooks/match_datasets.ipynb +++ b/notebooks/match_datasets.ipynb @@ -1,1572 +1,2210 @@ { "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Match datasets \n", "\n", "### Name your spark application as `GASPAR_final` or `GROUP_NAME_final`.\n", "\n", "
Any application without a proper name would be promptly killed.
" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/html": [ "Current session configs: {'conf': {'spark.app.name': 'lgptguys_final'}, 'kind': 'pyspark'}
" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "\n", - "
IDYARN Application IDKindStateSpark UIDriver logCurrent session?
6287application_1589299642358_0776pysparkidleLinkLink
6309application_1589299642358_0798pysparkidleLinkLink
6316application_1589299642358_0805pysparkidleLinkLink
6317application_1589299642358_0806pysparkidleLinkLink
6318application_1589299642358_0807pysparkidleLinkLink
6319application_1589299642358_0808pysparkidleLinkLink
" + "IDYARN Application IDKindStateSpark UIDriver logCurrent session?6332application_1589299642358_0821pysparkbusyLinkLink" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "%%configure\n", "{\"conf\": {\n", " \"spark.app.name\": \"lgptguys_final\"\n", "}}\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Start Spark" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", - "
IDYARN Application IDKindStateSpark UIDriver logCurrent session?
6320application_1589299642358_0809pysparkidleLinkLink
" + "IDYARN Application IDKindStateSpark UIDriver logCurrent session?6333application_1589299642358_0822pysparkidleLinkLink✔" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stderr", "output_type": "stream", "text": [ "An error was encountered:\n", "unknown magic command '%spark'\n", "UnknownMagic: unknown magic command '%spark'\n", "\n" ] } ], "source": [ "# Initialization\n", "%%spark" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%%send_to_spark -i username -t str -n username" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Import useful libraries " + ] + }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { - "name": "stderr", + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "from geopy.distance import great_circle\n", + "from pyspark.sql.functions import *\n", + "from pyspark.sql.types import StructType, StructField, StringType, 
IntegerType, LongType, TimestampType" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Read TimeTable curated data\n", + "\n", + "contains only stops / trips in a 15km range from Zurich HB" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", "output_type": "stream", "text": [ - "An error was encountered:\n", - "Variable named username not found.\n" + "+-----------+--------+---------------------------+-------+\n", + "|stop_id_raw|stop_int|stop_name |stop_id|\n", + "+-----------+--------+---------------------------+-------+\n", + "|8500926 |0 |Oetwil a.d.L., Schweizäcker|8500926|\n", + "|8502186 |1 |Dietikon Stoffelbach |8502186|\n", + "|8502186:0:1|2 |Dietikon Stoffelbach |8502186|\n", + "|8502186:0:2|3 |Dietikon Stoffelbach |8502186|\n", + "|8502186P |4 |Dietikon Stoffelbach |8502186|\n", + "+-----------+--------+---------------------------+-------+\n", + "only showing top 5 rows" ] } ], "source": [ - "%%send_to_spark -i username -t str -n username" + "stops_15km = spark.read.csv('data/lgpt_guys/stops_15km.csv', header = True)\n", + "\n", + "# We use only first 7 characters of stop_id to remove special cases\n", + "stops_15km = stops_15km.select(col('stop_id').alias('stop_id_raw'), 'stop_int', 'stop_name')\\\n", + " .withColumn('stop_id',col('stop_id_raw').substr(1, 7))\n", + "stops_15km.show(5, False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "### Import useful libraries " + "we have slighlty modified stop_id (original is in `stop_id_raw`) to get a stop_id compatible with sbb dataset (only use 7 first characters)" ] }, { 
"cell_type": "code", - "execution_count": 4, + "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "1586" + ] } ], "source": [ - "from geopy.distance import great_circle\n", - "from pyspark.sql.functions import *\n", - "from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, TimestampType" + "stops_15km.select('stop_id').distinct().count()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This is the number of unique `stop_id` using only 7 first characters. Taking `stop_id_raw`, we wiould get roughly 300 more." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Read the [SBB actual data](https://opentransportdata.swiss/en/dataset/istdaten) in ORC format" ] }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 50, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "sbb = spark.read.orc('/data/sbb/orc/istdaten')" ] }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 51, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", 
"output_type": "stream", "text": [ "root\n", " |-- betriebstag: string (nullable = true)\n", " |-- fahrt_bezeichner: string (nullable = true)\n", " |-- betreiber_id: string (nullable = true)\n", " |-- betreiber_abk: string (nullable = true)\n", " |-- betreiber_name: string (nullable = true)\n", " |-- produkt_id: string (nullable = true)\n", " |-- linien_id: string (nullable = true)\n", " |-- linien_text: string (nullable = true)\n", " |-- umlauf_id: string (nullable = true)\n", " |-- verkehrsmittel_text: string (nullable = true)\n", " |-- zusatzfahrt_tf: string (nullable = true)\n", " |-- faellt_aus_tf: string (nullable = true)\n", " |-- bpuic: string (nullable = true)\n", " |-- haltestellen_name: string (nullable = true)\n", " |-- ankunftszeit: string (nullable = true)\n", " |-- an_prognose: string (nullable = true)\n", " |-- an_prognose_status: string (nullable = true)\n", " |-- abfahrtszeit: string (nullable = true)\n", " |-- ab_prognose: string (nullable = true)\n", " |-- ab_prognose_status: string (nullable = true)\n", " |-- durchfahrt_tf: string (nullable = true)" ] } ], "source": [ "sbb.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "### Read TimeTable curated data\n", + "### Subset SBB data\n", "\n", - "contains only stops / trips in a 15km range from Zurich HB" + "We take only stop_id in 15 km range from Zurich HB" ] }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "35548" + ] } ], "source": [ - "stops_15km = spark.read.csv('data/lgpt_guys/stops_15km.csv', header = True)" + "sbb.select('bpuic').distinct().count()" 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Map timetable data and sbb data \n", - "\n", - "There is no direct mapping between the data in sbb/orc and the route/trip_id given in the new tables. We will need to find heuristics to get the data needed.\n", - "\n", - "If we cannot find may imagine getting the station using their names, and exploring the average delay distribution for a given station at a given hour, with a given time window wround it. \n", - "\n", - "First idea : we use stops names from _actual_data_ which corrresponds to _stop_name_ in the _timetable_ data. We can then estimate the delays according associated with the station and the hour." + "This is the total number of unique `stop_id` in sbb dataset " ] }, { - "cell_type": "markdown", + "cell_type": "code", + "execution_count": 53, "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], "source": [ - "### Get corresponding stop_id between two datasets \n", - "\n", - "We first look at the station names in timetable dataset. Stop_id can be given in multiple formats :\n", - "- `8502186` : the format defining the stop itself, which matches sbb `bpuic` field\n", - "\n", - "We will call the 3 next ones __Special cases__ throughout the notebook :\n", - "- `8502186:0:1` or `8502186:0:2` : The individual platforms are separated by “:”. A “platform” can also be a platform+sectors (e.g. 
“8500010:0:7CD”).\n", - "- `8502186P` : All the stops have a common “parent” “8500010P”.\n", - "- `8502186:0:Bfpl` : if the RBS uses it for rail replacement buses.\n", + "# Used to subset sbb table based on stop_id \n", + "l1_id = stops_15km.select('stop_id').collect()\n", + "l2_id = [item.stop_id for item in l1_id]\n", "\n", - "source : [timetable cookbook](https://opentransportdata.swiss/en/cookbook/gtfs/), section stops.txt \n", + "# Used to subset sbb table based on stop_names \n", + "l1_name = stops_15km.select('stop_name').collect()\n", + "l2_name = [item.stop_name for item in l1_name]\n", "\n", - "In the sbb actual_data we find equivalent to stop_id in its first format defining the station without platform information, in its `bpuic` field" + "# Make the subset dataframe\n", + "sbb_filt = sbb.filter( sbb['bpuic'].isin(l2_id) | sbb['bpuic'].isin(l2_name) )\\\n", + " .select('fahrt_bezeichner','haltestellen_name', 'linien_id',\\\n", + " 'ankunftszeit', 'abfahrtszeit', 'betriebstag',\\\n", + " col('bpuic').alias('stop_id'))\n", + "# Get counts before / after subset\n", + "#print sbb.count()\n", + "#print sbb_filt.count()" ] }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "+-------+\n", - "| bpuic|\n", - "+-------+\n", - "|8502128|\n", - "|8595521|\n", - "|8591973|\n", - "|8501214|\n", - "|8501469|\n", - "+-------+\n", - "only showing top 5 rows" + "1516" ] } ], "source": [ - "sbb.select(\"bpuic\").distinct().show(5)" + "print sbb_filt.select('stop_id').distinct().count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "We first load 
previously curated _timetable_ dataset, containing only station at less than 15 km from Zurich HauptBanhof. " + "This is the number of stop_id that are found in sbb dataset based on _timetable_ dataset." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now we want to subset to take only the schedule of May 13-17, 2019" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 54, "metadata": {}, - "outputs": [], - "source": [] + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+----------------+-----------------+---------+----------------+----------------+-----------+-------+\n", + "|fahrt_bezeichner|haltestellen_name|linien_id| ankunftszeit| abfahrtszeit|betriebstag|stop_id|\n", + "+----------------+-----------------+---------+----------------+----------------+-----------+-------+\n", + "| 85:11:10:002| Zürich HB| 10|03.09.2018 21:51| | 03.09.2018|8503000|\n", + "| 85:11:11:001| Zürich HB| 11| |03.09.2018 06:09| 03.09.2018|8503000|\n", + "| 85:11:12:001| Zürich HB| 12|03.09.2018 10:51| | 03.09.2018|8503000|\n", + "+----------------+-----------------+---------+----------------+----------------+-----------+-------+\n", + "only showing top 3 rows" + ] + } + ], + "source": [ + "sbb_filt.show(3)" + ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 55, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, 
"output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "1734112\n", + "+----------------+-----------------+---------+----------------+----------------+-----------+-------+\n", + "|fahrt_bezeichner|haltestellen_name|linien_id|ankunftszeit |abfahrtszeit |betriebstag|stop_id|\n", + "+----------------+-----------------+---------+----------------+----------------+-----------+-------+\n", + "|85:11:10:001 |Zürich HB |10 |14.05.2019 21:50| |14.05.2019 |8503000|\n", + "|85:11:1007:001 |Zürich HB |1007 |14.05.2019 06:23| |14.05.2019 |8503000|\n", + "|85:11:1009:001 |Zürich HB |1009 |14.05.2019 07:23| |14.05.2019 |8503000|\n", + "|85:11:1011:001 |Zürich HB |1011 |14.05.2019 08:23| |14.05.2019 |8503000|\n", + "|85:11:10190:001 |Zürich Flughafen |10190 |14.05.2019 22:46|14.05.2019 22:48|14.05.2019 |8503016|\n", + "+----------------+-----------------+---------+----------------+----------------+-----------+-------+\n", + "only showing top 5 rows" + ] } ], "source": [ - "timetable_ids = stops_15km.select('stop_name', 'stop_id').distinct().orderBy(\"stop_id\")\n", - "print timetable_ids.count()\n", - "timetable_ids.show()" + "sbb_subTime = sbb_filt.filter( (sbb_filt.betriebstag == '13.05.2019') | (sbb_filt.betriebstag == '14.05.2019') |\\\n", + " (sbb_filt.betriebstag == '15.05.2019') | (sbb_filt.betriebstag == '16.05.2019') |\\\n", + " (sbb_filt.betriebstag == '17.05.2019') )\n", + "print sbb_subTime.count()\n", + "sbb_subTime.show(5,False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "We see that we have multiple `stop_id` for the same station, as defined above. We may need to reformat input `stop_id`, e.g. by removing trailiig '0:1', '0:2' or 'P' in the final model (= __special cases__) \n", - "\n", - "We then subset sbb table to take only rows that either has a common stop_id or a common stop_name. 
_Takes a bit of time ~ 10 minutes_" + "We write the resulting subset " ] }, { "cell_type": "code", - "execution_count": 9, + "execution_count": 57, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "4741\n", - "+--------------------+-------+\n", - "| haltestellen_name|stop_id|\n", - "+--------------------+-------+\n", - "|Oetwil a.d.L., Sc...|8500926|\n", - "|Dietikon Stoffelbach|8502186|\n", - "|Rudolfstetten Hof...|8502187|\n", - "| Zufikon Hammergut|8502188|\n", - "| Horgen Oberdorf|8502208|\n", - "| Oberrieden Dorf|8502209|\n", - "| Urdorf|8502220|\n", - "| Birmensdorf ZH|8502221|\n", - "| Bonstetten-Wettswil|8502222|\n", - "| Hedingen|8502223|\n", - "| Affoltern am Albis|8502224|\n", - "| Urdorf Weihermatt|8502229|\n", - "| Zufikon Belv�d�re|8502268|\n", - "| Zufikon Belvédère|8502268|\n", - "| Bergfrieden|8502270|\n", - "| Bremgarten|8502273|\n", - "| Zufikon|8502274|\n", - "| Heinrüti|8502275|\n", - "| Widen Heinr�ti|8502275|\n", - "| Widen Heinrüti|8502275|\n", - "+--------------------+-------+\n", - "only showing top 20 rows" - ] } ], "source": [ - "# Used to subset sbb table based on stop_id \n", - "l1_id = stops_15km.select('stop_id').collect()\n", - "l2_id = [item.stop_id for item in l1_id]\n", - "\n", - "# Used to subset sbb table based on stop_names \n", - "l1_name = stops_15km.select('stop_name').collect()\n", - "l2_name = [item.stop_name for item in l1_name]\n", - "\n", - "# Make the subset dataframe\n", - "sbb_filt = sbb.where(sbb['bpuic'].isin(l2_id) | sbb['haltestellen_name'].isin(l2_name))\\\n", - " .select('haltestellen_name', col('bpuic').alias('stop_id'))\\\n", - " .distinct().orderBy(\"stop_id\")\n", - 
"\n", - "# Print number of lines and show\n", - "print sbb_filt.count()\n", - "sbb_filt.show()" + "# save\n", + "username = 'acoudray'\n", + "sbb_subTime.write.format(\"orc\").save(\"/user/{}/sbb_subTime2.orc\".format(username))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "We get duplicated stop names, which seems to come from weird characters. _Maybe to do : Do we want to find a solution for that ? Most probably we just need stop_id and do not care about names_\n", - "\n", - "Then we can make a joined table using stop_id from timetable and see how many correspondance we find between timetable and sbb dataset." + "Let's also make a small table with only one day, the 13th of May 2019" ] }, { "cell_type": "code", - "execution_count": 10, + "execution_count": 58, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "343567\n", + "+----------------+-----------------+---------+----------------+----------------+-----------+-------+\n", + "|fahrt_bezeichner|haltestellen_name|linien_id|ankunftszeit |abfahrtszeit |betriebstag|stop_id|\n", + "+----------------+-----------------+---------+----------------+----------------+-----------+-------+\n", + "|85:11:10:001 |Zürich HB |10 |13.05.2019 21:50| |13.05.2019 |8503000|\n", + "|85:11:1007:001 |Zürich HB |1007 |13.05.2019 06:23| |13.05.2019 |8503000|\n", + "|85:11:1009:001 |Zürich HB |1009 |13.05.2019 07:23| |13.05.2019 |8503000|\n", + "|85:11:1011:001 |Zürich HB |1011 |13.05.2019 08:23| |13.05.2019 |8503000|\n", + "|85:11:10190:001 |Zürich Flughafen |10190 |13.05.2019 22:46|13.05.2019 22:48|13.05.2019 |8503016|\n", + 
"+----------------+-----------------+---------+----------------+----------------+-----------+-------+\n", + "only showing top 5 rows" + ] + } + ], + "source": [ + "sbb_oneday = sbb_filt.filter( (sbb_filt.betriebstag == '13.05.2019') )\n", + "print sbb_oneday.count()\n", + "sbb_oneday.show(5,False)" + ] + }, + { + "cell_type": "code", + "execution_count": 59, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# save\n", + "username = 'acoudray'\n", + "sbb_subTime.write.format(\"orc\").save(\"/user/{}/sbb_oneday.orc\".format(username))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "For now we have these files in /user/{}/ :\n", + "- `sbb_filt.orc` : every day with stations < 15km\n", + "- `sbb_subTime.orc` : schedule of May 13-17, 2019, stations < 15km\n", + "- `sbb_subTime2.orc` : schedule of May 13-17, 2019, stations < 15km, `linien_id` field added\n", + "- `sbb_oneday.orc` : May 13th 2019 only, stations < 15km, `linien_id` field added" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Map timetable data and sbb data \n", + "\n", + "There is no direct mapping between the data in sbb/orc and the route/trip_id given in the _timetable_ dataset. We will need to find heuristics to get the data needed.\n", + "\n", + "To begin with, we have a look at the stop_id / stop_name correspondance between the two tables." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Get corresponding stop_id between two datasets \n", + "\n", + "We first look at the station names in timetable dataset. 
Stop_id can be given in multiple formats :\n", + "- `8502186` : the format defining the stop itself, which matches sbb `bpuic` field\n", + "\n", + "We will call the 3 next ones __Special cases__ throughout the notebook :\n", + "- `8502186:0:1` or `8502186:0:2` : The individual platforms are separated by “:”. A “platform” can also be a platform+sectors (e.g. “8500010:0:7CD”).\n", + "- `8502186P` : All the stops have a common “parent” “8500010P”.\n", + "- `8502186:0:Bfpl` : if the RBS uses it for rail replacement buses.\n", + "\n", + "source : [timetable cookbook](https://opentransportdata.swiss/en/cookbook/gtfs/), section stops.txt \n", + "\n", + "In the sbb actual_data we find equivalent to stop_id in its first format defining the station without platform information, in its `bpuic` field" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-------+\n", + "| bpuic|\n", + "+-------+\n", + "|8502128|\n", + "|8595521|\n", + "|8591973|\n", + "|8501214|\n", + "|8501469|\n", + "+-------+\n", + "only showing top 5 rows" + ] + } + ], + "source": [ + "sbb.select(\"bpuic\").distinct().show(5)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We first load previously curated _timetable_ dataset, containing only station at less than 15 km from Zurich HauptBanhof. 
" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "timetable_ids = stops_15km.select('stop_name', 'stop_id').distinct().orderBy(\"stop_id\")\n", + "print timetable_ids.count()\n", + "timetable_ids.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We see that we have multiple `stop_id` for the same station, as defined above. We may need to reformat input `stop_id`, e.g. by removing trailiig '0:1', '0:2' or 'P' in the final model (= __special cases__) \n", + "\n", + "We then subset sbb table to take only rows that either has a common stop_id or a common stop_name. _Takes a bit of time ~ 10 minutes_" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "4741\n", + "+--------------------+-------+\n", + "| haltestellen_name|stop_id|\n", + "+--------------------+-------+\n", + "|Oetwil a.d.L., Sc...|8500926|\n", + "|Dietikon Stoffelbach|8502186|\n", + "|Rudolfstetten Hof...|8502187|\n", + "| Zufikon Hammergut|8502188|\n", + "| Horgen Oberdorf|8502208|\n", + "| Oberrieden Dorf|8502209|\n", + "| Urdorf|8502220|\n", + "| Birmensdorf ZH|8502221|\n", + "| Bonstetten-Wettswil|8502222|\n", + "| Hedingen|8502223|\n", + "| Affoltern 
am Albis|8502224|\n", + "| Urdorf Weihermatt|8502229|\n", + "| Zufikon Belv�d�re|8502268|\n", + "| Zufikon Belvédère|8502268|\n", + "| Bergfrieden|8502270|\n", + "| Bremgarten|8502273|\n", + "| Zufikon|8502274|\n", + "| Heinrüti|8502275|\n", + "| Widen Heinr�ti|8502275|\n", + "| Widen Heinrüti|8502275|\n", + "+--------------------+-------+\n", + "only showing top 20 rows" + ] + } + ], + "source": [ + "# Used to subset sbb table based on stop_id \n", + "l1_id = stops_15km.select('stop_id').collect()\n", + "l2_id = [item.stop_id for item in l1_id]\n", + "\n", + "# Used to subset sbb table based on stop_names \n", + "l1_name = stops_15km.select('stop_name').collect()\n", + "l2_name = [item.stop_name for item in l1_name]\n", + "\n", + "# Make the subset dataframe\n", + "sbb_filt = sbb.where(sbb['bpuic'].isin(l2_id) | sbb['haltestellen_name'].isin(l2_name))\\\n", + " .select('haltestellen_name', col('bpuic').alias('stop_id'))\\\n", + " .distinct().orderBy(\"stop_id\")\n", + "\n", + "# Print number of lines and show\n", + "print sbb_filt.count()\n", + "sbb_filt.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We get duplicated stop names, which seems to come from weird characters. _Maybe to do : Do we want to find a solution for that ? Most probably we just need stop_id and do not care about names_\n", + "\n", + "Then we can make a joined table using stop_id from timetable and see how many correspondance we find between timetable and sbb dataset." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "2628\n", "+-----------+--------------------+--------------------+\n", "| stop_id| timetable_name| sbb_name|\n", "+-----------+--------------------+--------------------+\n", "| 8502508|Spreitenbach, Rai...|Spreitenbach, Rai...|\n", "| 8503078| Waldburg| Waldburg|\n", "| 8503101P| Küsnacht ZH| null|\n", "|8503306:0:2| Dietlikon| null|\n", "| 8503376|Ottikon b. Kemptthal|Ottikon b. Kemptthal|\n", "| 8506895| Lufingen, Dorf| Lufingen, Dorf|\n", "| 8573729|Bonstetten, Isenbach|Bonstetten, Isenbach|\n", "| 8587967|Erlenbach ZH, Im ...|Erlenbach ZH, Im ...|\n", "| 8587967|Erlenbach ZH, Im ...|Erlenbach ZH, Im ...|\n", "| 8589111|Horgen, Gumelenst...|Horgen, Gumelenst...|\n", "| 8590819| Thalwil, Mettli| Thalwil, Mettli|\n", "| 8591190| Zürich, Heuried| Z�rich, Heuried|\n", "| 8591190| Zürich, Heuried| Zürich, Heuried|\n", "| 8591284| Zürich, Neeserweg| Zürich, Neeserweg|\n", "| 8591284| Zürich, Neeserweg| Z�rich, Neeserweg|\n", "|8502223:0:2| Hedingen| null|\n", "|8502274:0:1| Zufikon| null|\n", "|8502758:0:B|Hausen am Albis, ...| null|\n", "| 8503001P| Zürich Altstetten| null|\n", "| 8588312|Effretikon, Kapel...|Effretikon, Kapel...|\n", "+-----------+--------------------+--------------------+\n", "only showing top 20 rows" ] } ], "source": [ "joined_stop_table = timetable_ids.join(sbb_filt, on=['stop_id'], how='left_outer')\\\n", " .select(\"stop_id\",\n", " col('stop_name').alias('timetable_name'),\n", " col('haltestellen_name').alias('sbb_name'))\n", "print joined_stop_table.count()\n", "joined_stop_table.show()" ] }, { 
"cell_type": "markdown", "metadata": {}, "source": [ "The stop_id which have no correspondance in the sbb dataset have _null_ in their `sbb_name`.\n", "\n", "By counting null values, we can find how many stations could not be found in sbb dataset. We expect all stations containing trailing '0:1' (platform information) or 'P' (parent station) to be absent from sbb dataset. " ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "+-------+--------------+--------+\n", "|stop_id|timetable_name|sbb_name|\n", "+-------+--------------+--------+\n", "| 0| 0| 408|\n", "+-------+--------------+--------+" ] } ], "source": [ "from pyspark.sql.functions import col\n", "\n", "joined_stop_table.select([count(when(col(c).isNull(), c)).alias(c) \\\n", " for c in joined_stop_table.columns]).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We have 408 station with no match in sbb data. 
Below, A few lines of these :" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "+-----------+--------------------+--------+\n", "| stop_id| timetable_name|sbb_name|\n", "+-----------+--------------------+--------+\n", "| 8503101P| Küsnacht ZH| null|\n", "|8503306:0:2| Dietlikon| null|\n", "|8502223:0:2| Hedingen| null|\n", "|8502274:0:1| Zufikon| null|\n", "|8502758:0:B|Hausen am Albis, ...| null|\n", "| 8503001P| Zürich Altstetten| null|\n", "|8503065:0:3| Forch| null|\n", "| 8503201P| Rüschlikon| null|\n", "|8503305:0:6| Effretikon| null|\n", "|8573726:0:D|Bonstetten-Wettsw...| null|\n", "+-----------+--------------------+--------+\n", "only showing top 10 rows" ] } ], "source": [ "joined_stop_table.filter(col('sbb_name').isNull()).show(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we now how many `stop_id` are missing from sbb dataset.\n", "\n", "Last check, we may ask ourselves if all stations without platform information and not being a parent station can be found in sbb. 
If yes " ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "+-----------+--------------------+--------+\n", "| stop_id| timetable_name|sbb_name|\n", "+-----------+--------------------+--------+\n", "|8502758:0:B|Hausen am Albis, ...| null|\n", "| 8502208P| Horgen Oberdorf| null|\n", "|8502268:0:1| Zufikon Belvédère| null|\n", "| 8503855P| Horgen, Bahnhof| null|\n", "| 8502758P|Hausen am Albis, ...| null|\n", "+-----------+--------------------+--------+\n", "only showing top 5 rows" ] } ], "source": [ "# Regular expression - for some reason, * in first position is not accepted. \n", "# This is not a problem since all stop_id we use begin with 85 (Switzerland index)\n", "expr1 = \"85*\\\\:.\\\\:*\" # capture stop_id with platform info\n", "expr2 = \"85*P$\" # capture stop_id with parent info\n", "expr3 = \"85*Bfpl$\" # capture stop_id with rail replacement info \n", "\n", "joined_stop_table.filter( (joined_stop_table[\"stop_id\"].rlike(expr1)) | \\\n", " (joined_stop_table[\"stop_id\"].rlike(expr2)) | \\\n", " (joined_stop_table[\"stop_id\"].rlike(expr3)) ).show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we know how to get all `stop_id` with weird characters corresponding to special cases in timetable. This special cases can be resolved by removing their trailing characters. \n", "\n", "Let's see if there is any null __outside__ theses cases. 
" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "70\n", "+-------+--------------------+--------+\n", "|stop_id| timetable_name|sbb_name|\n", "+-------+--------------------+--------+\n", "|8591421|Zürich, Waldhaus ...| null|\n", "|8503084| Zürich, Dolder| null|\n", "|8530648| Greifensee (See)| null|\n", "|8503081| Felsenegg| null|\n", "|8503879| Widen, Imbismatt| null|\n", "|8580847|Bremgarten AG, Bi...| null|\n", "|8503677| Kilchberg Bendlikon| null|\n", "|8502979|Oberwil-Lieli, Li...| null|\n", "|8591394|Zürich, Titlisstr...| null|\n", "|8502955| Oberlunkhofen, Post| null|\n", "|8582529|Bremgarten AG, Zu...| null|\n", "|8572595|Birmensdorf ZH, A...| null|\n", "|8582462|Bremgarten AG, Ze...| null|\n", "|8502894|Oberwil-Lieli, Ob...| null|\n", "|8502560| Berikon, Kirche| null|\n", "|8576164| Zufikon, Bachhalde| null|\n", "|8530647| Fällanden (See)| null|\n", "|8530644| Meilen Autoquai| null|\n", "|8503082|Adliswil (Luftsei...| null|\n", "|8502553|Unterlunkhofen, B...| null|\n", "+-------+--------------------+--------+\n", "only showing top 20 rows" ] } ], "source": [ "# we only wants pattern with 7 characters, finishing with a number and beginning with 85\n", "expr = '85[0-9][0-9][0-9][0-9][0-9]$'\n", "stop_id_notfound = joined_stop_table.filter( (joined_stop_table[\"stop_id\"].rlike(expr)) & \\\n", " col('sbb_name').isNull() ) \n", "print stop_id_notfound.count()\n", "stop_id_notfound.show(10, False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "There is 70 stop_id with no matches in sbb dataset. 
These stop_id could potentially be problematic if one wants to get delays probabilities at these places. Stations from timetable being special cases can be resolved by only using their first 7 characters. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Get corresponding trip_id between two datasets \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We now have a table with corresponding stop_id between the datasets. We know we can trail special cases in timetable to get biuc code in sbb dataset. Let's now try to match the trip_id between the two tables. \n", "\n", "In sbb dataset, the trip ids are defined by `FAHRT_BEZEICHNER` field and in timetable `trip_id`. We will use corresponding station_id and arrival_times in order to get corresponding trip_id.\n", "\n", "Our goal is to find a match in sbb dataset for all _timetable_ trips (and not the other way around). So we will focus on getting this assymetrical correspondance table. \n", "\n", "When we find a clear one-one match, we will mark them as _resolved_, when there is a one-to-many relation, we will call it _partly_resolved_ and if we cannot find a sbb trip that correspond to a timetable trip_id, we will call it _fail_to_resolve_. \n", "\n", "These labels will be used to differentiate 3 different ways to compute probabilities :\n", "- __One-to-one__ we find a clear match : we use distribution of delays on weekdays for a given trip/station_id based on all past sbb data. 
\n", "- __One-to-many__ we find multiple match :\n", " - First we double check the matches, if we have the same type of transportation for example.\n", " - If they seem to be correct, we can merge the trips from sbb and get the merged distribution of their delays.\n", "- __One-to-none__ we find no match : then we get the distribution of delays for similar transportation types, at similar hour (in a window), during weekdays of sbb dataset.\n", " - Alternative : Try to find the best match and use only the closest location/time to infer a given distribution.\n", " - Alternative 2 : use k-nearest neighbors in terms of location/time." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ + "__Timetable dataset__ \n", + "\n", "We first load _timetable_ with curated trip_id, in a 15km radius from Zurich HB. " ] }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 30, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "247920\n", "+--------------------+-------+------------+--------------+-------------+-----------+-------------+--------+-----------+----------+--------------------+------------+\n", "|trip_id |stop_id|arrival_time|departure_time|stop_sequence|pickup_type|drop_off_type|stop_int|route_id |sequence_1|trip_1 |route_int |\n", "+--------------------+-------+------------+--------------+-------------+-----------+-------------+--------+-----------+----------+--------------------+------------+\n", "|1.TA.1-231-j19-1.1.H|8572747|09:37:00 |09:37:00 |1 |0 |0 |500 |1-231-j19-1|10.0 |1.TA.1-231-j19-1.1.H|592705486850|\n", "|1.TA.1-231-j19-1.1.H|8573721|09:50:00 |09:50:00 |10 |0 |0 |599 |1-231-j19-1|11.0 
|1.TA.1-231-j19-1.1.H|592705486850|\n", "|1.TA.1-231-j19-1.1.H|8503598|09:53:00 |09:53:00 |11 |0 |0 |401 |1-231-j19-1|12.0 |1.TA.1-231-j19-1.1.H|592705486850|\n", "+--------------------+-------+------------+--------------+-------------+-----------+-------------+--------+-----------+----------+--------------------+------------+\n", "only showing top 3 rows" ] } ], "source": [ - "stop_times_curated = spark.read.csv('data/lgpt_guys/stop_times_curated.csv', header = True)\n", - "print stop_times_curated.count()\n", - "stop_times_curated.show(3, False)" + "stop_times_curated = spark.read.csv('data/lgpt_guys/stop_times_curated.csv', header = True)\n", + "print stop_times_curated.count()\n", + "stop_times_curated.show(3, False)" + ] + }, + { + "cell_type": "code", + "execution_count": 31, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# Make the subset dataframe\n", + "stop_times_format = stop_times_curated\\\n", + " .select('trip_id', 'stop_id', \n", + " unix_timestamp(stop_times_curated.arrival_time, 'HH:mm:ss')\\\n", + " .alias('arrival_time_ut'),\\\n", + " unix_timestamp(stop_times_curated.departure_time, 'HH:mm:ss')\\\n", + " .alias('departure_time_ut') )\\\n", + " .select('trip_id', 'stop_id', \n", + " from_unixtime('arrival_time_ut')\\\n", + " .alias('arrival_time_dty'),\n", + " from_unixtime('departure_time_ut')\\\n", + " .alias('departure_time_dty'))\\\n", + " .select('trip_id', 'stop_id', \n", + " date_format('arrival_time_dty', 'hh:mm')\\\n", + " .alias('arrival_time'),\n", + " date_format('departure_time_dty', 'hh:mm')\\\n", + " .alias('departure_time'))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We 
have reformated `arrival_time` and `departure_time` to get it in `hh:mm` format. To be sure to get every possible match and allow possible mistakes on the time schedules between both datasets, we decided to round to quarter hour the times. Same trip_ids should still be able to be seen as they should share stop_id and times, and this approximation to the quarter hour should be enough to avoid getting wrong trips." + ] + }, + { + "cell_type": "code", + "execution_count": 32, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------------------+-------+------------+--------------+\n", + "|trip_id |stop_id|arrival_time|departure_time|\n", + "+--------------------+-------+------------+--------------+\n", + "|1.TA.1-231-j19-1.1.H|8572747|09:30 |09:30 |\n", + "|1.TA.1-231-j19-1.1.H|8573721|09:45 |09:45 |\n", + "|1.TA.1-231-j19-1.1.H|8503598|10:00 |10:00 |\n", + "|1.TA.1-231-j19-1.1.H|8573720|10:00 |10:00 |\n", + "|1.TA.1-231-j19-1.1.H|8503598|10:00 |10:00 |\n", + "|1.TA.1-231-j19-1.1.H|8573721|10:00 |10:00 |\n", + "|1.TA.1-231-j19-1.1.H|8573722|10:00 |10:00 |\n", + "|1.TA.1-231-j19-1.1.H|8573723|10:00 |10:00 |\n", + "|1.TA.1-231-j19-1.1.H|8583071|10:00 |10:00 |\n", + "|1.TA.1-231-j19-1.1.H|8572603|10:00 |10:00 |\n", + "+--------------------+-------+------------+--------------+\n", + "only showing top 10 rows" + ] + } + ], + "source": [ + "# next two lines used to round time to quarter hour ( = 900 seconds )\n", + "quarter_r_arr = ((round(unix_timestamp(col(\"arrival_time\"), 'HH:mm') / 900) * 900)\n", + " .cast(\"timestamp\"))\n", + "quarter_r_dep = ((round(unix_timestamp(col(\"departure_time\"), 
'HH:mm') / 900) * 900)\n", + " .cast(\"timestamp\"))\n", + "\n", + "# Times are rounded\n", + "stop_times_format_r = stop_times_format.withColumn(\"arrival_time_rounded\", quarter_r_arr) \\\n", + " .withColumn(\"departure_time_rounded\", quarter_r_dep)\n", + "\n", + "# Reformating date/time in time with hh:mm format\n", + "stop_times_format = stop_times_format_r.select('trip_id', 'stop_id',\\\n", + " unix_timestamp(stop_times_format_r.arrival_time_rounded, 'yyyy-mm-dd HH:mm:ss')\\\n", + " .alias('arrival_time_ut'),\\\n", + " unix_timestamp(stop_times_format_r.departure_time_rounded, 'yyyy-mm-dd HH:mm:ss')\\\n", + " .alias('departure_time_ut') )\\\n", + " .select('trip_id', 'stop_id',\n", + " from_unixtime('arrival_time_ut')\\\n", + " .alias('arrival_time_dty'),\n", + " from_unixtime('departure_time_ut')\\\n", + " .alias('departure_time_dty'))\\\n", + " .select('trip_id', 'stop_id',\n", + " date_format('arrival_time_dty', 'hh:mm')\\\n", + " .alias('arrival_time'),\n", + " date_format('departure_time_dty', 'hh:mm')\\\n", + " .alias('departure_time'))\n", + "\n", + "# We use only first 7 characters of stop_id to remove special cases\n", + "stop_times_format = stop_times_format.withColumn('stop_id',\n", + " stop_times_format.stop_id.substr(1, 7))\n", + "\n", + "stop_times_format.show(10, False)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Here is an example of a single trip_id : it goes from stop to stop and has various arrival / departure along its journey. As the times are rounded, they seem to be the same. Most probably the time between stops is very short." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 33, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+------------------------+-------+------------+--------------+\n", + "|trip_id |stop_id|arrival_time|departure_time|\n", + "+------------------------+-------+------------+--------------+\n", + "|813.TA.26-33-B-j19-1.6.R|8503610|08:00 |08:00 |\n", + "|813.TA.26-33-B-j19-1.6.R|8594239|08:15 |08:15 |\n", + "|813.TA.26-33-B-j19-1.6.R|8580522|08:15 |08:15 |\n", + "|813.TA.26-33-B-j19-1.6.R|8591323|08:15 |08:15 |\n", + "|813.TA.26-33-B-j19-1.6.R|8591066|08:15 |08:15 |\n", + "|813.TA.26-33-B-j19-1.6.R|8591292|08:15 |08:15 |\n", + "|813.TA.26-33-B-j19-1.6.R|8591326|08:15 |08:15 |\n", + "|813.TA.26-33-B-j19-1.6.R|8591335|08:15 |08:15 |\n", + "|813.TA.26-33-B-j19-1.6.R|8591339|08:30 |08:30 |\n", + "|813.TA.26-33-B-j19-1.6.R|8591362|08:30 |08:30 |\n", + "+------------------------+-------+------------+--------------+\n", + "only showing top 10 rows" + ] + } + ], + "source": [ + "stop_times_format.filter(stop_times_format['trip_id'] == '813.TA.26-33-B-j19-1.6.R').show(10,False)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We will only use first 7 characters of `stop_id` field in order to get exact match with sbb dataset. " + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We use time rounded to quarter hour, so that unexact match between two datasets should not be a problem." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 34, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "#print stop_times_format.count()\n", + "#stop_times_format.select(\"trip_id\").distinct().orderBy(\"trip_id\").count()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We have _timetable_ trip_id, the stop_id as defined above, and arrival/departure time. The idea is to match these information with the ones we have in sbb dataset. The stop_id should match.\n", + "\n", + " __SBB dataset__\n", + " \n", + "We will subset sbb dataset to get only the 13th of May in sbb dataset :" + ] + }, + { + "cell_type": "code", + "execution_count": 35, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "1734112" + ] + } + ], + "source": [ + "username='acoudray'\n", + "sbb = spark.read.orc(\"/user/{}/sbb_subTime.orc\".format(username))\n", + "sbb.count()" + ] + }, + { + "cell_type": "code", + "execution_count": 36, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": 
"display_data" + } + ], + "source": [ + "# Following are used to round arrival / departure times\n", + "quarter_round_arr = ((round(unix_timestamp(col(\"ankunftszeit\"), 'dd.MM.yyyy HH:mm') / 900) * 900)\n", + " .cast(\"timestamp\"))\n", + "quarter_round_dep = ((round(unix_timestamp(col(\"abfahrtszeit\"), 'dd.MM.yyyy HH:mm') / 900) * 900)\n", + " .cast(\"timestamp\"))\n", + "\n", + "# Make the subset dataframe\n", + "sbb_filt = sbb.filter( ( (col('ankunftszeit') != \"\") | (col('abfahrtszeit') != \"\") ) )\\\n", + " .withColumn(\"ankunftszeit_rounded\", quarter_round_arr) \\\n", + " .withColumn(\"abfahrtszeit_rounded\", quarter_round_dep)" + ] + }, + { + "cell_type": "code", + "execution_count": 37, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+----------------+-----------------+-------+------------+--------------+\n", + "|fahrt_bezeichner|haltestellen_name|stop_id|arrival_time|departure_time|\n", + "+----------------+-----------------+-------+------------+--------------+\n", + "|85:11:10:001 |Zürich HB |8503000|09:45 |null |\n", + "|85:11:1007:001 |Zürich HB |8503000|06:30 |null |\n", + "|85:11:1009:001 |Zürich HB |8503000|07:30 |null |\n", + "|85:11:1011:001 |Zürich HB |8503000|08:30 |null |\n", + "|85:11:10190:001 |Zürich Flughafen |8503016|10:45 |10:45 |\n", + "+----------------+-----------------+-------+------------+--------------+\n", + "only showing top 5 rows" + ] + } + ], + "source": [ + "sbb_filt_format = sbb_filt.select('fahrt_bezeichner','haltestellen_name', 'stop_id',\n", + " date_format(from_unixtime(unix_timestamp(col('ankunftszeit_rounded'), \\\n", + " 'dd.MM.yyyy HH:mm')), 
\\\n", + " 'hh:mm')\\\n", + " .alias('arrival_time'),\n", + " date_format(from_unixtime(unix_timestamp(col('abfahrtszeit_rounded'), \\\n", + " 'dd.MM.yyyy HH:mm')), \\\n", + " 'hh:mm')\\\n", + " .alias('departure_time'))\n", + "sbb_filt_format.show(5, False)" ] }, { "cell_type": "code", - "execution_count": 9, + "execution_count": 38, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "40184" + ] } ], "source": [ - "# Make the subset dataframe\n", - "stop_times_format = stop_times_curated\\\n", - " .select('trip_id', 'stop_id', \n", - " unix_timestamp(stop_times_curated.arrival_time, 'HH:mm:ss')\\\n", - " .alias('arrival_time_ut'),\\\n", - " unix_timestamp(stop_times_curated.departure_time, 'HH:mm:ss')\\\n", - " .alias('departure_time_ut') )\\\n", - " .select('trip_id', 'stop_id', \n", - " from_unixtime('arrival_time_ut')\\\n", - " .alias('arrival_time_dty'),\n", - " from_unixtime('departure_time_ut')\\\n", - " .alias('departure_time_dty'))\\\n", - " .select('trip_id', 'stop_id', \n", - " date_format('arrival_time_dty', 'hh:mm')\\\n", - " .alias('arrival_time'),\n", - " date_format('departure_time_dty', 'hh:mm')\\\n", - " .alias('departure_time'))" + "sbb_filt_format.select(\"fahrt_bezeichner\").distinct().count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "We have reformated `arrival_time` and `departure_time` to get it in `hh:mm` format. To be sure to get every possible match and allow possible mistakes on the time schedules between both datasets, we decided to round to quarter hour the times. 
Same trip_ids should still be able to be seen as they should share stop_id and times, and this approximation to the quarter hour should be enough to avoid getting wrong trips." + "Let's have a look at a single trip_id according to sbb dataset (called `fahrt_bezeichner`). " ] }, { "cell_type": "code", - "execution_count": 10, + "execution_count": 39, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "+--------------------+-------+------------+--------------+\n", - "|trip_id |stop_id|arrival_time|departure_time|\n", - "+--------------------+-------+------------+--------------+\n", - "|1.TA.1-231-j19-1.1.H|8572747|09:30 |09:30 |\n", - "|1.TA.1-231-j19-1.1.H|8573721|09:45 |09:45 |\n", - "|1.TA.1-231-j19-1.1.H|8503598|10:00 |10:00 |\n", - "|1.TA.1-231-j19-1.1.H|8573720|10:00 |10:00 |\n", - "|1.TA.1-231-j19-1.1.H|8503598|10:00 |10:00 |\n", - "|1.TA.1-231-j19-1.1.H|8573721|10:00 |10:00 |\n", - "|1.TA.1-231-j19-1.1.H|8573722|10:00 |10:00 |\n", - "|1.TA.1-231-j19-1.1.H|8573723|10:00 |10:00 |\n", - "|1.TA.1-231-j19-1.1.H|8583071|10:00 |10:00 |\n", - "|1.TA.1-231-j19-1.1.H|8572603|10:00 |10:00 |\n", - "+--------------------+-------+------------+--------------+\n", - "only showing top 10 rows" + "+----------------+-----------------+-------+------------+--------------+\n", + "|fahrt_bezeichner|haltestellen_name|stop_id|arrival_time|departure_time|\n", + "+----------------+-----------------+-------+------------+--------------+\n", + "+----------------+-----------------+-------+------------+--------------+" ] } ], "source": [ - "# next two lines used to round time to quarter hour ( = 900 seconds )\n", - "quarter_r_arr = 
((round(unix_timestamp(col(\"arrival_time\"), 'HH:mm') / 900) * 900)\n", - " .cast(\"timestamp\"))\n", - "quarter_r_dep = ((round(unix_timestamp(col(\"departure_time\"), 'HH:mm') / 900) * 900)\n", - " .cast(\"timestamp\"))\n", - "\n", - "# Times are rounded\n", - "stop_times_format_r = stop_times_format.withColumn(\"arrival_time_rounded\", quarter_r_arr) \\\n", - " .withColumn(\"departure_time_rounded\", quarter_r_dep)\n", - "\n", - "# Reformating date/time in time with hh:mm format\n", - "stop_times_format = stop_times_format_r.select('trip_id', 'stop_id',\\\n", - " unix_timestamp(stop_times_format_r.arrival_time_rounded, 'yyyy-mm-dd HH:mm:ss')\\\n", - " .alias('arrival_time_ut'),\\\n", - " unix_timestamp(stop_times_format_r.departure_time_rounded, 'yyyy-mm-dd HH:mm:ss')\\\n", - " .alias('departure_time_ut') )\\\n", - " .select('trip_id', 'stop_id',\n", - " from_unixtime('arrival_time_ut')\\\n", - " .alias('arrival_time_dty'),\n", - " from_unixtime('departure_time_ut')\\\n", - " .alias('departure_time_dty'))\\\n", - " .select('trip_id', 'stop_id',\n", - " date_format('arrival_time_dty', 'hh:mm')\\\n", - " .alias('arrival_time'),\n", - " date_format('departure_time_dty', 'hh:mm')\\\n", - " .alias('departure_time'))\n", - "\n", - "# We use only first 7 characters of stop_id to remove special cases\n", - "stop_times_format = stop_times_format.withColumn('stop_id',\n", - " stop_times_format.stop_id.substr(1, 7))\n", - "\n", - "stop_times_format.show(10, False)" + "fahrt_bezeichner_example = '85:78:24815:001'\n", + "sbb_filt_format.filter(sbb_filt_format['fahrt_bezeichner'] == fahrt_bezeichner_example)\\\n", + " .show(10,False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Here is an example of a single trip_id : it goes from stop to stop and has various arrival / departure along its journey. As the times are rounded, they seem to be the same. Most probably the time between stops is very short." 
+ "We can now create a joined table using `stop_time_format` and `sbb_filt_format`. We use only stop_id and times to merge the tables. The idea is then to compare the trip_id from both dataset than end up on the same line. We use join with _left_outer_ so that we can only have _null_ values on the sbb side (assymetrical join)." ] }, { "cell_type": "code", - "execution_count": 11, + "execution_count": 40, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "+------------------------+-------+------------+--------------+\n", - "|trip_id |stop_id|arrival_time|departure_time|\n", - "+------------------------+-------+------------+--------------+\n", - "|813.TA.26-33-B-j19-1.6.R|8503610|08:00 |08:00 |\n", - "|813.TA.26-33-B-j19-1.6.R|8594239|08:15 |08:15 |\n", - "|813.TA.26-33-B-j19-1.6.R|8580522|08:15 |08:15 |\n", - "|813.TA.26-33-B-j19-1.6.R|8591323|08:15 |08:15 |\n", - "|813.TA.26-33-B-j19-1.6.R|8591066|08:15 |08:15 |\n", - "|813.TA.26-33-B-j19-1.6.R|8591292|08:15 |08:15 |\n", - "|813.TA.26-33-B-j19-1.6.R|8591326|08:15 |08:15 |\n", - "|813.TA.26-33-B-j19-1.6.R|8591335|08:15 |08:15 |\n", - "|813.TA.26-33-B-j19-1.6.R|8591339|08:30 |08:30 |\n", - "|813.TA.26-33-B-j19-1.6.R|8591362|08:30 |08:30 |\n", - "+------------------------+-------+------------+--------------+\n", - "only showing top 10 rows" + "+-------+------------+------------------------+---------------------+\n", + "|stop_id|arrival_time|trip_id |fahrt_bezeichner |\n", + "+-------+------------+------------------------+---------------------+\n", + "|8502186|04:45 |54.TA.1-17-A-j19-1.5.H |85:31:660:000 |\n", + "|8502208|06:15 |699.TA.26-24-j19-1.256.R|85:11:20467:001 |\n", + 
"|8502208|10:15 |643.TA.26-24-j19-1.254.R|85:11:20486:001 |\n", + "|8502209|08:15 |727.TA.26-24-j19-1.273.H|85:11:20427:001 |\n", + "|8502221|04:00 |89.TA.26-5-A-j19-1.37.R |85:11:18561:001 |\n", + "|8502221|12:00 |88.TA.26-14-j19-1.18.R |85:11:19445:001 |\n", + "|8502274|06:00 |60.TA.1-17-A-j19-1.5.H |85:31:565:000 |\n", + "|8502274|11:00 |32.TA.1-17-A-j19-1.5.H |85:31:591:000 |\n", + "|8502276|02:15 |44.TA.1-17-A-j19-1.5.H |85:31:546:000 |\n", + "|8502276|06:15 |60.TA.1-17-A-j19-1.5.H |85:31:510:000 |\n", + "|8502277|01:15 |40.TA.1-17-A-j19-1.5.H |85:31:542:000 |\n", + "|8502278|01:15 |40.TA.1-17-A-j19-1.5.H |85:31:642:000 |\n", + "|8502495|02:15 |42.TA.26-70-A-j19-1.3.R |85:849:58306-03070-1 |\n", + "|8502495|02:45 |997.TA.26-70-A-j19-1.5.H|85:849:58308-03070-1 |\n", + "|8502495|06:30 |965.TA.26-70-A-j19-1.5.H|85:849:147522-01184-1|\n", + "|8502495|07:15 |508.TA.26-70-A-j19-1.3.R|85:849:76225-02070-1 |\n", + "|8502495|07:30 |44.TA.26-184-j19-1.1.H |85:849:58286-03070-1 |\n", + "|8502495|08:00 |482.TA.26-185-j19-1.2.R |85:849:147526-01184-1|\n", + "|8502495|08:45 |497.TA.26-70-A-j19-1.3.R|85:849:58392-03070-1 |\n", + "|8502495|12:45 |89.TA.26-70-A-j19-1.3.R |85:849:147556-01184-1|\n", + "+-------+------------+------------------------+---------------------+\n", + "only showing top 20 rows" ] } ], "source": [ - "stop_times_format.filter(stop_times_format['trip_id'] == '813.TA.26-33-B-j19-1.6.R').show(10,False)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We will only use first 7 characters of `stop_id` field in order to get exact match with sbb dataset. 
" + "joined_trip_table = stop_times_format.join(sbb_filt_format,\\\n", + " on=['stop_id', 'arrival_time'], \n", + " how='left_outer')\\\n", + " .select('stop_id', 'arrival_time',\n", + " 'trip_id', 'fahrt_bezeichner') \\\n", + " .distinct()\n", + "joined_trip_table.show(20, False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "We use time rounded to quarter hour, so that unexact match between two datasets should not be a problem." + "This is the raw results of the intersection. Note that we used a `distinct()` to avoid having multiple lines corresponding to multiple days. Each line must be a different stops no matter which day it is, so that we can count how many stops (with same arrival_time) are share between trip_id from _timetable_ and sbb data." ] }, { "cell_type": "code", - "execution_count": 13, + "execution_count": 41, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "247920\n", - "22807" - ] } ], "source": [ - "#print stop_times_format.count()\n", - "#stop_times_format.select(\"trip_id\").distinct().orderBy(\"trip_id\").count()" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "We have _timetable_ trip_id, the stop_id as defined above, and arrival/departure time. The idea is to match these information with the ones we have in sbb dataset. 
The stop_id should match.\n", + "joined_trip_filt = joined_trip_table.select(\"trip_id\", \"fahrt_bezeichner\")\\\n", + " .groupBy(\"trip_id\", \"fahrt_bezeichner\")\\\n", + " .count()\n", "\n", - "We will subset sbb dataset to get only the 13th of May in sbb dataset :" + "# .filter( (col('fahrt_bezeichner').isNotNull()) )\\" ] }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 42, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------------------------+----------------------+-----+\n", + "|trip_id |fahrt_bezeichner |count|\n", + "+--------------------------+----------------------+-----+\n", + "|1.TA.26-350-j19-1.1.H |85:801:200881-35200-1 |10 |\n", + "|1630.TA.26-3-A-j19-1.8.R |85:3849:50310-03003-1 |4 |\n", + "|137.TA.26-9-A-j19-1.57.R |85:11:18977:002 |13 |\n", + "|1291.TA.26-80-j19-1.8.R |85:849:60037-03080-1 |1 |\n", + "|1474.TA.26-33E-j19-1.9.R |85:849:94630-11412-1 |7 |\n", + "|1986.TA.26-11-A-j19-1.27.R|85:849:67214-12412-1 |1 |\n", + "|2728.TA.26-31-j19-1.17.H |85:3849:166531-06008-1|1 |\n", + "|1262.TA.26-8-C-j19-1.8.R |85:849:247075-16341-1 |1 |\n", + "|1142.TA.26-4-j19-1.25.H |85:3849:85685-02002-1 |7 |\n", + "|1026.TA.26-4-j19-1.25.H |85:3849:85630-02002-1 |4 |\n", + "|1361.TA.26-2-A-j19-1.24.R |85:3849:86350-02004-1 |2 |\n", + "|1432.TA.26-2-A-j19-1.24.R |85:3849:86339-02004-1 |2 |\n", + "|106.TA.26-38-j19-1.2.H |85:849:59831-03080-1 |1 |\n", + "|118.TA.26-311-j19-1.2.R |85:849:240925-37301-1 |1 |\n", + "|1567.TA.26-3-A-j19-1.8.R |85:3849:90085-02017-1 |1 |\n", + "|2320.TA.26-14-A-j19-1.25.H|85:3849:89988-02017-1 |1 |\n", + "|1318.TA.26-13-j19-1.20.R |85:3849:50558-03004-1 |2 |\n", + 
"|2044.TA.26-13-j19-1.24.H |85:3849:89180-02013-1 |4 |\n", + "|1861.TA.26-9-B-j19-1.19.R |85:3849:52137-03009-1 |17 |\n", + "|182.TA.26-760-j19-1.7.H |85:773:7251-01752-1 |2 |\n", + "+--------------------------+----------------------+-----+\n", + "only showing top 20 rows" + ] } ], "source": [ - "sbb_sub = sbb.filter( (sbb.ankunftszeit != \"\") | (sbb.abfahrtszeit != \"\") )\\\n", - " .select('fahrt_bezeichner','haltestellen_name', \\\n", - " 'ankunftszeit', 'abfahrtszeit', 'bpuic')\\\n", - " .withColumn('ankunftszeit_ts',to_timestamp(col('ankunftszeit'),\\\n", - " format='dd.MM.yyyy HH:mm:ss'))\\\n", - " .withColumn('abfahrtszeit_ts',to_timestamp(col('abfahrtszeit'),\\\n", - " format='dd.MM.yyyy HH:mm'))\\" + "joined_trip_filt.show(20, False)" ] }, { "cell_type": "code", - "execution_count": 13, + "execution_count": 43, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "2368016" + ] } ], "source": [ - "# Used to subset sbb table based on stop_id \n", - "l1_id = stops_15km.select('stop_id').collect()\n", - "l2_id = [item.stop_id for item in l1_id]\n", - "\n", - "# Make the subset dataframe\n", - "quarter_round_arr = ((round(unix_timestamp(col(\"ankunftszeit\"), 'dd.MM.yyyy HH:mm') / 900) * 900)\n", - " .cast(\"timestamp\"))\n", - "quarter_round_dep = ((round(unix_timestamp(col(\"abfahrtszeit\"), 'dd.MM.yyyy HH:mm') / 900) * 900)\n", - " .cast(\"timestamp\"))\n", - "\n", - "sbb_filt = sbb.filter( ( (col('ankunftszeit') != \"\") | (col('abfahrtszeit') != \"\") ) & \\\n", - " sbb['bpuic'].isin(l2_id) )\\\n", - " .select('fahrt_bezeichner','haltestellen_name', \\\n", - " 'ankunftszeit', 'abfahrtszeit', \\\n", - " 
col('bpuic').alias('stop_id')) \\\n", - " .withColumn(\"ankunftszeit_rounded\", quarter_round_arr) \\\n", - " .withColumn(\"abfahrtszeit_rounded\", quarter_round_dep)" + "joined_trip_filt.count()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "How many unique trip_id from _timetable_ can we get in this intersection ?" ] }, { "cell_type": "code", - "execution_count": 14, + "execution_count": 44, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "+----------------+-----------------+-------+------------+--------------+\n", - "|fahrt_bezeichner|haltestellen_name|stop_id|arrival_time|departure_time|\n", - "+----------------+-----------------+-------+------------+--------------+\n", - "|85:11:10:002 |Zürich HB |8503000|09:45 |null |\n", - "|85:11:11:001 |Zürich HB |8503000|null |06:15 |\n", - "|85:11:12:001 |Zürich HB |8503000|10:45 |null |\n", - "|85:11:1251:003 |Zürich HB |8503000|07:00 |null |\n", - "|85:11:1252:001 |Zürich HB |8503000|09:30 |09:30 |\n", - "+----------------+-----------------+-------+------------+--------------+\n", - "only showing top 5 rows" + "22807" ] } ], "source": [ - "sbb_filt_format = sbb_filt.select('fahrt_bezeichner','haltestellen_name', 'stop_id',\n", - " date_format(from_unixtime(unix_timestamp(col('ankunftszeit_rounded'), \\\n", - " 'dd.MM.yyyy HH:mm')), \\\n", - " 'hh:mm')\\\n", - " .alias('arrival_time'),\n", - " date_format(from_unixtime(unix_timestamp(col('abfahrtszeit_rounded'), \\\n", - " 'dd.MM.yyyy HH:mm')), \\\n", - " 'hh:mm')\\\n", - " .alias('departure_time'))\n", - "sbb_filt_format.show(5, False)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - 
"metadata": {}, - "outputs": [], - "source": [ - "#sbb_filt_format.select(\"fahrt_bezeichner\").distinct().orderBy(\"fahrt_bezeichner\").count()" + "joined_trip_filt.select('trip_id').distinct().count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Let's have a look at a single trip_id according to sbb dataset (called `fahrt_bezeichner`). " + "How many unique _fahrt_bezeichner_ from sbb data can we find in the intersection ?" ] }, { "cell_type": "code", - "execution_count": 15, + "execution_count": 45, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "+----------------+-----------------+-------+------------+--------------+\n", - "|fahrt_bezeichner|haltestellen_name|stop_id|arrival_time|departure_time|\n", - "+----------------+-----------------+-------+------------+--------------+\n", - "|85:78:24815:001 |Zürich Selnau |8503090|01:30 |01:30 |\n", - "|85:78:24815:001 |Zürich Binz |8503051|01:45 |01:45 |\n", - "|85:78:24815:001 |Friesenberg |8503052|01:45 |01:45 |\n", - "|85:78:24815:001 |Schweighof |8503053|01:45 |01:45 |\n", - "|85:78:24815:001 |Triemli |8503054|01:45 |01:45 |\n", - "|85:78:24815:001 |Uitikon Waldegg |8503055|01:45 |01:45 |\n", - "|85:78:24815:001 |Ringlikon |8503056|01:45 |01:45 |\n", - "|85:78:24815:001 |Uetliberg |8503057|02:00 |null |\n", - "|85:78:24815:001 |Zürich Selnau |8503090|01:30 |01:30 |\n", - "|85:78:24815:001 |Zürich Binz |8503051|01:45 |01:45 |\n", - "+----------------+-----------------+-------+------------+--------------+\n", - "only showing top 10 rows" + "38294" ] } ], "source": [ - "fahrt_bezeichner_example = '85:78:24815:001'\n", - 
"sbb_filt_format.filter(sbb_filt_format['fahrt_bezeichner'] == fahrt_bezeichner_example)\\\n", - " .show(10,False)" + "joined_trip_filt.select('fahrt_bezeichner').distinct().count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "We can now create a joined table using `stop_time_format` and `sbb_filt_format`. We use only stop_id and times to merge the tables. The idea is then to compare the trip_id from both dataset than end up on the same line. We use join with _left_outer_ so that we can only have _null_ values on the sbb side (assymetrical join)." + "Let's try to use a threshold to only take intersection whenever there is at least X stops shared between the two trip_id version" ] }, { "cell_type": "code", - "execution_count": 16, + "execution_count": 46, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "+-------+------------------------+---------------------+\n", - "|stop_id|trip_id |fahrt_bezeichner |\n", - "+-------+------------------------+---------------------+\n", - "|8502508|269.TA.26-303-j19-1.3.H |85:849:609297-18301-1|\n", - "|8502750|35.TA.1-331-j19-1.3.R |null |\n", - "|8503000|128.TA.17-4-j19-1.23.R |85:11:1957:001 |\n", - "|8503000|153.TA.26-6-A-j19-1.34.R|85:11:71531:006 |\n", - "|8503000|173.TA.21-75-j19-1.47.R |85:11:18368:001 |\n", - "|8503000|173.TA.21-75-j19-1.47.R |85:11:89171:000 |\n", - "|8503000|182.TA.26-1-j19-1.90.H |85:11:88622:002 |\n", - "|8503000|237.TA.1-36-j19-1.79.R |85:11:10580:002 |\n", - "|8503000|269.TA.26-11-j19-1.78.H |85:11:554:006 |\n", - "|8503000|3.TA.6-1-j19-1.3.R |85:11:30580:003 |\n", - "|8503000|3.TA.6-1-j19-1.3.R |85:11:88921:003 |\n", - 
"|8503000|331.TA.26-6-A-j19-1.45.H|85:11:727:002 |\n", - "|8503000|331.TA.26-6-A-j19-1.45.H|85:11:30730:003 |\n", - "|8503000|345.TA.16-5-j19-1.71.R |85:11:730:002 |\n", - "|8503000|345.TA.16-5-j19-1.71.R |85:11:19122:001 |\n", - "|8503000|345.TA.16-5-j19-1.71.R |85:11:70727:009 |\n", - "|8503000|352.TA.20-2-j19-1.146.H |85:11:1531:002 |\n", - "|8503000|38.TA.26-9-A-j19-1.3.H |85:11:70580:001 |\n", - "|8503000|551.TA.26-11-j19-1.126.R|85:11:88320:003 |\n", - "|8503000|63.TA.1-36-j19-1.21.R |85:11:31615:039 |\n", - "+-------+------------------------+---------------------+\n", - "only showing top 20 rows" + "+------------------------+---------------------+-----+\n", + "|trip_id |fahrt_bezeichner |count|\n", + "+------------------------+---------------------+-----+\n", + "|438.TA.26-759-j19-1.4.H |85:773:194167-24759-1|5 |\n", + "|598.TA.26-33-B-j19-1.3.H|85:849:92773-02072-1 |5 |\n", + "|4116.TA.26-31-j19-1.26.R|85:3849:50431-03003-1|6 |\n", + "|622.TA.26-3-A-j19-1.2.H |85:3849:50253-03003-1|7 |\n", + "|870.TA.26-11-A-j19-1.3.H|85:3849:88866-02011-1|8 |\n", + "+------------------------+---------------------+-----+\n", + "only showing top 5 rows" ] } ], "source": [ - "joined_trip_table = stop_times_format.join(sbb_filt_format,\\\n", - " on=['stop_id', 'arrival_time'], \n", - " how='left_outer')\\\n", - " .select('stop_id', \n", - " 'trip_id', 'fahrt_bezeichner') \\\n", - " .distinct()\n", - "joined_trip_table.show(20, False)" + "cutoff_min_overlap = 5\n", + "joined_trip_atL5 = joined_trip_filt.filter(col('count') >= cutoff_min_overlap )\n", + "joined_trip_atL5.show(20, False)" ] }, { "cell_type": "code", - "execution_count": 17, + "execution_count": 47, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": 
"display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "15548" + ] } ], "source": [ - "#joined_trip_table.select([count(when(col(c).isNull(), c)).alias(c) \\\n", - "# for c in joined_trip_table.columns]).show()" + "joined_trip_atL5.select('trip_id').distinct().count()" ] }, { "cell_type": "code", - "execution_count": 18, + "execution_count": 48, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "31103" + ] } ], "source": [ - "joined_trip_filt = joined_trip_table.filter( (col('trip_id').isNotNull()) & \n", - " (col('fahrt_bezeichner').isNotNull()) )\\\n", - " .select(\"trip_id\", \"fahrt_bezeichner\")\\\n", - " .groupBy(\"trip_id\", \"fahrt_bezeichner\")\\\n", - " .count()" + "joined_trip_atL5.select('fahrt_bezeichner').distinct().count()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "There is approximately 2 trips in sbb data for 1 trip in timetable data" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 49, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { - "model_id": "bbc32e4a021a45eb82f4870a3893fbec", + "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ - "print joined_trip_filt.count()\n", - "joined_trip_filt.show(10, False)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - 
"joined_trip_filt.write.csv('data/lgpt_guys/joined_trip_id.csv', header = True, mode=\"overwrite\")" + "joined_trip_filt.write.csv('data/lgpt_guys/joined_trip_filt.csv', header = True, mode=\"overwrite\")\n", + "joined_trip_atL5.write.csv('data/lgpt_guys/joined_trip_atL5.csv', header = True, mode=\"overwrite\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Explore trips with no matches " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This shows how many trip_id / fahrt_bezeichner there is in the joined dataframe :" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "joined_trip_filt = spark.read.csv('data/lgpt_guys/stops_15km.csv', header = True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print joined_trip_table.select(\"trip_id\").distinct().count()\n", "print joined_trip_table.select(\"fahrt_bezeichner\").distinct().count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And below how many of them we could get a match (of at least 1 stop/time, therefore most probably higher than reality)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print joined_trip_filt.select(\"trip_id\").distinct().count()\n", "print joined_trip_filt.select(\"fahrt_bezeichner\").distinct().count()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "joined_trip_table.select([count(when(col(c).isNull(), c)).alias(c) \\\n", " for c in joined_trip_table.columns]).show()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "joined_trip_table.filter( (col('trip_id').isNull()) |\n", " (col('fahrt_bezeichner').isNull()) ).show(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can create a dictionnaries 
with stop_id -> stop_name structure from timetable data, to be able to compare both datasets more carefully " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "newrdd = timetable_ids.rdd\n", "timetable_rdd = newrdd.map(lambda x : (x[1],x[0]))\n", "timetable_stopid_dict = timetable_rdd.collectAsMap()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here is an example of how to get a station name from a stop_id " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "timetable_stopid_dict.get(\"8502209\")" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }