diff --git a/notebooks/match_datasets.ipynb b/notebooks/match_datasets.ipynb index 2a5463b..2da0cbb 100644 --- a/notebooks/match_datasets.ipynb +++ b/notebooks/match_datasets.ipynb @@ -1,1495 +1,1572 @@ { "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ - "# To Begin With...\n", + "## Match datasets \n", "\n", "### Name your spark application as `GASPAR_final` or `GROUP_NAME_final`.\n", "\n", "
Any application without a proper name would be promptly killed.
" ] }, { "cell_type": "code", - "execution_count": 77, + "execution_count": 1, "metadata": {}, "outputs": [ { - "name": "stderr", - "output_type": "stream", - "text": [ - "A session has already been started. If you intend to recreate the session with new configurations, please include the -f argument.\n" - ] + "data": { + "text/html": [ + "Current session configs: {'conf': {'spark.app.name': 'lgptguys_final'}, 'kind': 'pyspark'}
" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "\n", + "
IDYARN Application IDKindStateSpark UIDriver logCurrent session?
6287application_1589299642358_0776pysparkidleLinkLink
6309application_1589299642358_0798pysparkidleLinkLink
6316application_1589299642358_0805pysparkidleLinkLink
6317application_1589299642358_0806pysparkidleLinkLink
6318application_1589299642358_0807pysparkidleLinkLink
6319application_1589299642358_0808pysparkidleLinkLink
" + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" } ], "source": [ "%%configure\n", "{\"conf\": {\n", " \"spark.app.name\": \"lgptguys_final\"\n", - "}}" + "}}\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Start Spark" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Starting Spark application\n" ] }, { "data": { "text/html": [ "\n", - "
IDYARN Application IDKindStateSpark UIDriver logCurrent session?
6219application_1589299642358_0707pysparkidleLinkLink
" + "IDYARN Application IDKindStateSpark UIDriver logCurrent session?6320application_1589299642358_0809pysparkidleLinkLink✔" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "SparkSession available as 'spark'.\n" ] }, { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stderr", "output_type": "stream", "text": [ "An error was encountered:\n", "unknown magic command '%spark'\n", "UnknownMagic: unknown magic command '%spark'\n", "\n" ] } ], "source": [ "# Initialization\n", "%%spark" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "An error was encountered:\n", "Variable named username not found.\n" ] } ], "source": [ "%%send_to_spark -i username -t str -n username" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Import useful libraries " ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "from geopy.distance import great_circle\n", "from pyspark.sql.functions import *\n", - "from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType" + "from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, TimestampType" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Read the [SBB actual data](https://opentransportdata.swiss/en/dataset/istdaten) in ORC format" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "sbb = spark.read.orc('/data/sbb/orc/istdaten')" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- betriebstag: string (nullable = true)\n", " |-- fahrt_bezeichner: string (nullable = true)\n", " |-- betreiber_id: string (nullable = true)\n", " |-- betreiber_abk: string (nullable = true)\n", " |-- betreiber_name: string (nullable = true)\n", " |-- produkt_id: string (nullable = true)\n", " |-- linien_id: string (nullable = true)\n", " |-- linien_text: string (nullable = true)\n", " |-- umlauf_id: string (nullable = true)\n", " |-- verkehrsmittel_text: string (nullable = true)\n", " |-- zusatzfahrt_tf: string (nullable = true)\n", " |-- faellt_aus_tf: string (nullable = true)\n", " |-- bpuic: string (nullable = true)\n", " |-- haltestellen_name: string (nullable = true)\n", " |-- ankunftszeit: string (nullable = true)\n", " |-- an_prognose: string (nullable = true)\n", " |-- an_prognose_status: string (nullable = true)\n", " |-- abfahrtszeit: string (nullable = true)\n", " |-- ab_prognose: string (nullable = true)\n", " |-- ab_prognose_status: string (nullable = true)\n", " |-- durchfahrt_tf: string (nullable = true)" ] } ], "source": [ "sbb.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Read TimeTable curated data\n", "\n", "contains only stops / trips in a 15km range from Zurich HB" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 7, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], "source": [ "stops_15km = spark.read.csv('data/lgpt_guys/stops_15km.csv', header = True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Map timetable data and sbb data \n", "\n", "There is no direct mapping between the data in sbb/orc and the route/trip_id given in the new tables. We will need to find heuristics to get the data needed.\n", "\n", "If we cannot find may imagine getting the station using their names, and exploring the average delay distribution for a given station at a given hour, with a given time window wround it. \n", "\n", "First idea : we use stops names from _actual_data_ which corrresponds to _stop_name_ in the _timetable_ data. We can then estimate the delays according associated with the station and the hour." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Get corresponding stop_id between two datasets \n", "\n", "We first look at the station names in timetable dataset. Stop_id can be given in multiple formats :\n", "- `8502186` : the format defining the stop itself, which matches sbb `bpuic` field\n", "\n", "We will call the 3 next ones __Special cases__ throughout the notebook :\n", "- `8502186:0:1` or `8502186:0:2` : The individual platforms are separated by “:”. A “platform” can also be a platform+sectors (e.g. “8500010:0:7CD”).\n", "- `8502186P` : All the stops have a common “parent” “8500010P”.\n", "- `8502186:0:Bfpl` : if the RBS uses it for rail replacement buses.\n", "\n", "source : [timetable cookbook](https://opentransportdata.swiss/en/cookbook/gtfs/), section stops.txt \n", "\n", "In the sbb actual_data we find equivalent to stop_id in its first format defining the station without platform information, in its `bpuic` field" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "+-------+\n", "| bpuic|\n", "+-------+\n", "|8502128|\n", "|8595521|\n", "|8591973|\n", "|8501214|\n", "|8501469|\n", "+-------+\n", "only showing top 5 rows" ] } ], "source": [ "sbb.select(\"bpuic\").distinct().show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We first load previously curated _timetable_ dataset, containing only station at less than 15 km from Zurich HauptBanhof. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "timetable_ids = stops_15km.select('stop_name', 'stop_id').distinct().orderBy(\"stop_id\")\n", "print timetable_ids.count()\n", "timetable_ids.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We see that we have multiple `stop_id` for the same station, as defined above. We may need to reformat input `stop_id`, e.g. by removing trailiig '0:1', '0:2' or 'P' in the final model (= __special cases__) \n", "\n", "We then subset sbb table to take only rows that either has a common stop_id or a common stop_name. _Takes a bit of time ~ 10 minutes_" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "4741\n", "+--------------------+-------+\n", "| haltestellen_name|stop_id|\n", "+--------------------+-------+\n", "|Oetwil a.d.L., Sc...|8500926|\n", "|Dietikon Stoffelbach|8502186|\n", "|Rudolfstetten Hof...|8502187|\n", "| Zufikon Hammergut|8502188|\n", "| Horgen Oberdorf|8502208|\n", "| Oberrieden Dorf|8502209|\n", "| Urdorf|8502220|\n", "| Birmensdorf ZH|8502221|\n", "| Bonstetten-Wettswil|8502222|\n", "| Hedingen|8502223|\n", "| Affoltern am Albis|8502224|\n", "| Urdorf Weihermatt|8502229|\n", "| Zufikon Belv�d�re|8502268|\n", "| Zufikon Belvédère|8502268|\n", "| Bergfrieden|8502270|\n", "| Bremgarten|8502273|\n", "| Zufikon|8502274|\n", "| Heinrüti|8502275|\n", "| Widen Heinr�ti|8502275|\n", "| Widen Heinrüti|8502275|\n", "+--------------------+-------+\n", "only showing top 20 rows" ] } ], "source": [ "# Used to subset sbb table based on stop_id \n", "l1_id = stops_15km.select('stop_id').collect()\n", "l2_id = [item.stop_id for item in l1_id]\n", "\n", "# Used to subset sbb table based on stop_names \n", "l1_name = stops_15km.select('stop_name').collect()\n", "l2_name = [item.stop_name for item in l1_name]\n", "\n", "# Make the subset dataframe\n", "sbb_filt = sbb.where(sbb['bpuic'].isin(l2_id) | sbb['haltestellen_name'].isin(l2_name))\\\n", " .select('haltestellen_name', col('bpuic').alias('stop_id'))\\\n", " .distinct().orderBy(\"stop_id\")\n", "\n", "# Print number of lines and show\n", "print sbb_filt.count()\n", "sbb_filt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We get duplicated stop names, which seems to come from weird characters. _Maybe to do : Do we want to find a solution for that ? Most probably we just need stop_id and do not care about names_\n", "\n", "Then we can make a joined table using stop_id from timetable and see how many correspondance we find between timetable and sbb dataset." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "2628\n", "+-----------+--------------------+--------------------+\n", "| stop_id| timetable_name| sbb_name|\n", "+-----------+--------------------+--------------------+\n", "| 8502508|Spreitenbach, Rai...|Spreitenbach, Rai...|\n", "| 8503078| Waldburg| Waldburg|\n", "| 8503101P| Küsnacht ZH| null|\n", "|8503306:0:2| Dietlikon| null|\n", "| 8503376|Ottikon b. Kemptthal|Ottikon b. Kemptthal|\n", "| 8506895| Lufingen, Dorf| Lufingen, Dorf|\n", "| 8573729|Bonstetten, Isenbach|Bonstetten, Isenbach|\n", "| 8587967|Erlenbach ZH, Im ...|Erlenbach ZH, Im ...|\n", "| 8587967|Erlenbach ZH, Im ...|Erlenbach ZH, Im ...|\n", "| 8589111|Horgen, Gumelenst...|Horgen, Gumelenst...|\n", "| 8590819| Thalwil, Mettli| Thalwil, Mettli|\n", "| 8591190| Zürich, Heuried| Z�rich, Heuried|\n", "| 8591190| Zürich, Heuried| Zürich, Heuried|\n", "| 8591284| Zürich, Neeserweg| Zürich, Neeserweg|\n", "| 8591284| Zürich, Neeserweg| Z�rich, Neeserweg|\n", "|8502223:0:2| Hedingen| null|\n", "|8502274:0:1| Zufikon| null|\n", "|8502758:0:B|Hausen am Albis, ...| null|\n", "| 8503001P| Zürich Altstetten| null|\n", "| 8588312|Effretikon, Kapel...|Effretikon, Kapel...|\n", "+-----------+--------------------+--------------------+\n", "only showing top 20 rows" ] } ], "source": [ "joined_stop_table = timetable_ids.join(sbb_filt, on=['stop_id'], how='left_outer')\\\n", " .select(\"stop_id\",\n", " col('stop_name').alias('timetable_name'),\n", " col('haltestellen_name').alias('sbb_name'))\n", "print joined_stop_table.count()\n", "joined_stop_table.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The stop_id which have no correspondance in the sbb dataset have _null_ in their `sbb_name`.\n", "\n", "By counting null values, we can find how many stations could not be found in sbb dataset. We expect all stations containing trailing '0:1' (platform information) or 'P' (parent station) to be absent from sbb dataset. " ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "+-------+--------------+--------+\n", "|stop_id|timetable_name|sbb_name|\n", "+-------+--------------+--------+\n", "| 0| 0| 408|\n", "+-------+--------------+--------+" ] } ], "source": [ "from pyspark.sql.functions import col\n", "\n", "joined_stop_table.select([count(when(col(c).isNull(), c)).alias(c) \\\n", " for c in joined_stop_table.columns]).show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We have 408 station with no match in sbb data. Below, A few lines of these :" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "+-----------+--------------------+--------+\n", "| stop_id| timetable_name|sbb_name|\n", "+-----------+--------------------+--------+\n", "| 8503101P| Küsnacht ZH| null|\n", "|8503306:0:2| Dietlikon| null|\n", "|8502223:0:2| Hedingen| null|\n", "|8502274:0:1| Zufikon| null|\n", "|8502758:0:B|Hausen am Albis, ...| null|\n", "| 8503001P| Zürich Altstetten| null|\n", "|8503065:0:3| Forch| null|\n", "| 8503201P| Rüschlikon| null|\n", "|8503305:0:6| Effretikon| null|\n", "|8573726:0:D|Bonstetten-Wettsw...| null|\n", "+-----------+--------------------+--------+\n", "only showing top 10 rows" ] } ], "source": [ "joined_stop_table.filter(col('sbb_name').isNull()).show(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we now how many `stop_id` are missing from sbb dataset.\n", "\n", "Last check, we may ask ourselves if all stations without platform information and not being a parent station can be found in sbb. If yes " ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "+-----------+--------------------+--------+\n", "| stop_id| timetable_name|sbb_name|\n", "+-----------+--------------------+--------+\n", "|8502758:0:B|Hausen am Albis, ...| null|\n", "| 8502208P| Horgen Oberdorf| null|\n", "|8502268:0:1| Zufikon Belvédère| null|\n", "| 8503855P| Horgen, Bahnhof| null|\n", "| 8502758P|Hausen am Albis, ...| null|\n", "+-----------+--------------------+--------+\n", "only showing top 5 rows" ] } ], "source": [ "# Regular expression - for some reason, * in first position is not accepted. \n", "# This is not a problem since all stop_id we use begin with 85 (Switzerland index)\n", "expr1 = \"85*\\\\:.\\\\:*\" # capture stop_id with platform info\n", "expr2 = \"85*P$\" # capture stop_id with parent info\n", "expr3 = \"85*Bfpl$\" # capture stop_id with rail replacement info \n", "\n", "joined_stop_table.filter( (joined_stop_table[\"stop_id\"].rlike(expr1)) | \\\n", " (joined_stop_table[\"stop_id\"].rlike(expr2)) | \\\n", " (joined_stop_table[\"stop_id\"].rlike(expr3)) ).show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we know how to get all `stop_id` with weird characters corresponding to special cases in timetable. This special cases can be resolved by removing their trailing characters. \n", "\n", "Let's see if there is any null __outside__ theses cases. " ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "70\n", "+-------+--------------------+--------+\n", "|stop_id| timetable_name|sbb_name|\n", "+-------+--------------------+--------+\n", "|8591421|Zürich, Waldhaus ...| null|\n", "|8503084| Zürich, Dolder| null|\n", "|8530648| Greifensee (See)| null|\n", "|8503081| Felsenegg| null|\n", "|8503879| Widen, Imbismatt| null|\n", "|8580847|Bremgarten AG, Bi...| null|\n", "|8503677| Kilchberg Bendlikon| null|\n", "|8502979|Oberwil-Lieli, Li...| null|\n", "|8591394|Zürich, Titlisstr...| null|\n", "|8502955| Oberlunkhofen, Post| null|\n", "|8582529|Bremgarten AG, Zu...| null|\n", "|8572595|Birmensdorf ZH, A...| null|\n", "|8582462|Bremgarten AG, Ze...| null|\n", "|8502894|Oberwil-Lieli, Ob...| null|\n", "|8502560| Berikon, Kirche| null|\n", "|8576164| Zufikon, Bachhalde| null|\n", "|8530647| Fällanden (See)| null|\n", "|8530644| Meilen Autoquai| null|\n", "|8503082|Adliswil (Luftsei...| null|\n", "|8502553|Unterlunkhofen, B...| null|\n", "+-------+--------------------+--------+\n", "only showing top 20 rows" ] } ], "source": [ "# we only wants pattern with 7 characters, finishing with a number and beginning with 85\n", "expr = '85[0-9][0-9][0-9][0-9][0-9]$'\n", "stop_id_notfound = joined_stop_table.filter( (joined_stop_table[\"stop_id\"].rlike(expr)) & \\\n", " col('sbb_name').isNull() ) \n", "print stop_id_notfound.count()\n", "stop_id_notfound.show(10, False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "There is 70 stop_id with no matches in sbb dataset. These stop_id could potentially be problematic if one wants to get delays probabilities at these places. Stations from timetable being special cases can be resolved by only using their first 7 characters. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Get corresponding trip_id between two datasets \n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We now have a table with corresponding stop_id between the datasets. We know we can trail special cases in timetable to get biuc code in sbb dataset. Let's now try to match the trip_id between the two tables. \n", "\n", "In sbb dataset, the trip ids are defined by `FAHRT_BEZEICHNER` field and in timetable `trip_id`. We will use corresponding station_id and arrival_times in order to get corresponding trip_id.\n", "\n", "Our goal is to find a match in sbb dataset for all _timetable_ trips (and not the other way around). So we will focus on getting this assymetrical correspondance table. \n", "\n", "When we find a clear one-one match, we will mark them as _resolved_, when there is a one-to-many relation, we will call it _partly_resolved_ and if we cannot find a sbb trip that correspond to a timetable trip_id, we will call it _fail_to_resolve_. \n", "\n", "These labels will be used to differentiate 3 different ways to compute probabilities :\n", "- __One-to-one__ we find a clear match : we use distribution of delays on weekdays for a given trip/station_id based on all past sbb data. \n", "- __One-to-many__ we find multiple match :\n", " - First we double check the matches, if we have the same type of transportation for example.\n", " - If they seem to be correct, we can merge the trips from sbb and get the merged distribution of their delays.\n", "- __One-to-none__ we find no match : then we get the distribution of delays for similar transportation types, at similar hour (in a window), during weekdays of sbb dataset.\n", " - Alternative : Try to find the best match and use only the closest location/time to infer a given distribution.\n", " - Alternative 2 : use k-nearest neighbors in terms of location/time." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We first load _timetable_ with curated trip_id, in a 15km radius from Zurich HB. " ] }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "247920\n", - "+------------------------+-----------+------------+--------------+-------------+-----------+-------------+-------------+--------+-------------+\n", - "|trip_id |stop_id |arrival_time|departure_time|stop_sequence|pickup_type|drop_off_type|route_int |stop_int|route_id |\n", - "+------------------------+-----------+------------+--------------+-------------+-----------+-------------+-------------+--------+-------------+\n", - "|813.TA.26-33-B-j19-1.6.R|8503610 |08:02:00 |08:02:00 |1 |0 |0 |1185410973697|402 |26-33-B-j19-1|\n", - "|36.TA.26-737-j19-1.1.H |8590696 |08:07:00 |08:07:00 |1 |0 |0 |592705486850 |1234 |26-737-j19-1 |\n", - "|96.TA.26-725-j19-1.4.R |8576127:0:A|08:17:00 |08:17:00 |1 |0 |0 |335007449088 |660 |26-725-j19-1 |\n", - "+------------------------+-----------+------------+--------------+-------------+-----------+-------------+-------------+--------+-------------+\n", + "+--------------------+-------+------------+--------------+-------------+-----------+-------------+--------+-----------+----------+--------------------+------------+\n", + "|trip_id |stop_id|arrival_time|departure_time|stop_sequence|pickup_type|drop_off_type|stop_int|route_id |sequence_1|trip_1 |route_int |\n", + "+--------------------+-------+------------+--------------+-------------+-----------+-------------+--------+-----------+----------+--------------------+------------+\n", + "|1.TA.1-231-j19-1.1.H|8572747|09:37:00 |09:37:00 |1 |0 |0 |500 |1-231-j19-1|10.0 |1.TA.1-231-j19-1.1.H|592705486850|\n", + "|1.TA.1-231-j19-1.1.H|8573721|09:50:00 |09:50:00 |10 |0 |0 |599 |1-231-j19-1|11.0 |1.TA.1-231-j19-1.1.H|592705486850|\n", + "|1.TA.1-231-j19-1.1.H|8503598|09:53:00 |09:53:00 |11 |0 |0 |401 |1-231-j19-1|12.0 |1.TA.1-231-j19-1.1.H|592705486850|\n", + "+--------------------+-------+------------+--------------+-------------+-----------+-------------+--------+-----------+----------+--------------------+------------+\n", "only showing top 3 rows" ] } ], "source": [ "stop_times_curated = spark.read.csv('data/lgpt_guys/stop_times_curated.csv', header = True)\n", "print stop_times_curated.count()\n", "stop_times_curated.show(3, False)" ] }, { "cell_type": "code", - "execution_count": 48, + "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "+------------------------+------------+------------+--------------+\n", - "|trip_id |stop_id |arrival_time|departure_time|\n", - "+------------------------+------------+------------+--------------+\n", - "|813.TA.26-33-B-j19-1.6.R|8503610 |08:02 |08:02 |\n", - "|36.TA.26-737-j19-1.1.H |8590696 |08:07 |08:07 |\n", - "|96.TA.26-725-j19-1.4.R |8576127:0:A |08:17 |08:17 |\n", - "|249.TA.26-83-j19-1.2.H |8591276 |08:28 |08:28 |\n", - "|458.TA.26-752-j19-1.7.H |8590573 |08:29 |08:29 |\n", - "|103.TA.1-17-A-j19-1.9.R |8503508:0:11|08:33 |08:33 |\n", - "|1230.TA.26-69-j19-1.5.R |8591101 |08:35 |08:35 |\n", - "|137.TA.26-69-j19-1.3.R |8591276 |08:44 |08:44 |\n", - "|155.TA.26-66-j19-1.4.R |8591286 |08:55 |08:55 |\n", - "|1016.TA.26-33E-j19-1.9.R|8591230 |09:04 |09:04 |\n", - "+------------------------+------------+------------+--------------+\n", - "only showing top 10 rows" - ] } ], "source": [ "# Make the subset dataframe\n", "stop_times_format = stop_times_curated\\\n", " .select('trip_id', 'stop_id', \n", " unix_timestamp(stop_times_curated.arrival_time, 'HH:mm:ss')\\\n", " .alias('arrival_time_ut'),\\\n", " unix_timestamp(stop_times_curated.departure_time, 'HH:mm:ss')\\\n", " .alias('departure_time_ut') )\\\n", " .select('trip_id', 'stop_id', \n", " from_unixtime('arrival_time_ut')\\\n", " .alias('arrival_time_dty'),\n", " from_unixtime('departure_time_ut')\\\n", " .alias('departure_time_dty'))\\\n", " .select('trip_id', 'stop_id', \n", " date_format('arrival_time_dty', 'hh:mm')\\\n", " .alias('arrival_time'),\n", " date_format('departure_time_dty', 'hh:mm')\\\n", - " .alias('departure_time'))\n", - "stop_times_format.show(10, False)" + " .alias('departure_time'))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We have reformated `arrival_time` and `departure_time` to get it in `hh:mm` format. To be sure to get every possible match and allow possible mistakes on the time schedules between both datasets, we decided to round to quarter hour the times. Same trip_ids should still be able to be seen as they should share stop_id and times, and this approximation to the quarter hour should be enough to avoid getting wrong trips." ] }, { "cell_type": "code", - "execution_count": 49, + "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "+------------------------+-------+------------+--------------+\n", - "|trip_id |stop_id|arrival_time|departure_time|\n", - "+------------------------+-------+------------+--------------+\n", - "|813.TA.26-33-B-j19-1.6.R|8503610|08:00 |08:00 |\n", - "|36.TA.26-737-j19-1.1.H |8590696|08:00 |08:00 |\n", - "|96.TA.26-725-j19-1.4.R |8576127|08:15 |08:15 |\n", - "|249.TA.26-83-j19-1.2.H |8591276|08:30 |08:30 |\n", - "|458.TA.26-752-j19-1.7.H |8590573|08:30 |08:30 |\n", - "|103.TA.1-17-A-j19-1.9.R |8503508|08:30 |08:30 |\n", - "|1230.TA.26-69-j19-1.5.R |8591101|08:30 |08:30 |\n", - "|137.TA.26-69-j19-1.3.R |8591276|08:45 |08:45 |\n", - "|155.TA.26-66-j19-1.4.R |8591286|09:00 |09:00 |\n", - "|1016.TA.26-33E-j19-1.9.R|8591230|09:00 |09:00 |\n", - "+------------------------+-------+------------+--------------+\n", + "+--------------------+-------+------------+--------------+\n", + "|trip_id |stop_id|arrival_time|departure_time|\n", + "+--------------------+-------+------------+--------------+\n", + "|1.TA.1-231-j19-1.1.H|8572747|09:30 |09:30 |\n", + "|1.TA.1-231-j19-1.1.H|8573721|09:45 |09:45 |\n", + "|1.TA.1-231-j19-1.1.H|8503598|10:00 |10:00 |\n", + "|1.TA.1-231-j19-1.1.H|8573720|10:00 |10:00 |\n", + "|1.TA.1-231-j19-1.1.H|8503598|10:00 |10:00 |\n", + "|1.TA.1-231-j19-1.1.H|8573721|10:00 |10:00 |\n", + "|1.TA.1-231-j19-1.1.H|8573722|10:00 |10:00 |\n", + "|1.TA.1-231-j19-1.1.H|8573723|10:00 |10:00 |\n", + "|1.TA.1-231-j19-1.1.H|8583071|10:00 |10:00 |\n", + "|1.TA.1-231-j19-1.1.H|8572603|10:00 |10:00 |\n", + "+--------------------+-------+------------+--------------+\n", "only showing top 10 rows" ] } ], "source": [ "# next two lines used to round time to quarter hour ( = 900 seconds )\n", "quarter_r_arr = ((round(unix_timestamp(col(\"arrival_time\"), 'HH:mm') / 900) * 900)\n", " .cast(\"timestamp\"))\n", "quarter_r_dep = ((round(unix_timestamp(col(\"departure_time\"), 'HH:mm') / 900) * 900)\n", " .cast(\"timestamp\"))\n", "\n", "# Times are rounded\n", "stop_times_format_r = stop_times_format.withColumn(\"arrival_time_rounded\", quarter_r_arr) \\\n", " .withColumn(\"departure_time_rounded\", quarter_r_dep)\n", "\n", "# Reformating date/time in time with hh:mm format\n", - "stop_times_format = stop_times_format_r.select('trip_id', 'stop_id', \n", - " unix_timestamp(stop_times_format_r.arrival_time_rounded, 'yyyy-mm-dd HH:mm:ss')\\\n", - " .alias('arrival_time_ut'),\\\n", - " unix_timestamp(stop_times_format_r.departure_time_rounded, 'yyyy-mm-dd HH:mm:ss')\\\n", - " .alias('departure_time_ut') )\\\n", - " .select('trip_id', 'stop_id', \n", + "stop_times_format = stop_times_format_r.select('trip_id', 'stop_id',\\\n", + " unix_timestamp(stop_times_format_r.arrival_time_rounded, 'yyyy-mm-dd HH:mm:ss')\\\n", + " .alias('arrival_time_ut'),\\\n", + " unix_timestamp(stop_times_format_r.departure_time_rounded, 'yyyy-mm-dd HH:mm:ss')\\\n", + " .alias('departure_time_ut') )\\\n", + " .select('trip_id', 'stop_id',\n", " from_unixtime('arrival_time_ut')\\\n", " .alias('arrival_time_dty'),\n", " from_unixtime('departure_time_ut')\\\n", " .alias('departure_time_dty'))\\\n", - " .select('trip_id', 'stop_id', \n", + " .select('trip_id', 'stop_id',\n", " date_format('arrival_time_dty', 'hh:mm')\\\n", " .alias('arrival_time'),\n", " date_format('departure_time_dty', 'hh:mm')\\\n", " .alias('departure_time'))\n", "\n", "# We use only first 7 characters of stop_id to remove special cases\n", "stop_times_format = stop_times_format.withColumn('stop_id',\n", " stop_times_format.stop_id.substr(1, 7))\n", "\n", "stop_times_format.show(10, False)" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Here is an example of a single trip_id : it goes from stop to stop and has various arrival / departure along its journey. As the times are rounded, they seem to be the same. Most probably the time between stops is very short." + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+------------------------+-------+------------+--------------+\n", + "|trip_id |stop_id|arrival_time|departure_time|\n", + "+------------------------+-------+------------+--------------+\n", + "|813.TA.26-33-B-j19-1.6.R|8503610|08:00 |08:00 |\n", + "|813.TA.26-33-B-j19-1.6.R|8594239|08:15 |08:15 |\n", + "|813.TA.26-33-B-j19-1.6.R|8580522|08:15 |08:15 |\n", + "|813.TA.26-33-B-j19-1.6.R|8591323|08:15 |08:15 |\n", + "|813.TA.26-33-B-j19-1.6.R|8591066|08:15 |08:15 |\n", + "|813.TA.26-33-B-j19-1.6.R|8591292|08:15 |08:15 |\n", + "|813.TA.26-33-B-j19-1.6.R|8591326|08:15 |08:15 |\n", + "|813.TA.26-33-B-j19-1.6.R|8591335|08:15 |08:15 |\n", + "|813.TA.26-33-B-j19-1.6.R|8591339|08:30 |08:30 |\n", + "|813.TA.26-33-B-j19-1.6.R|8591362|08:30 |08:30 |\n", + "+------------------------+-------+------------+--------------+\n", + "only showing top 10 rows" + ] + } + ], + "source": [ + "stop_times_format.filter(stop_times_format['trip_id'] == '813.TA.26-33-B-j19-1.6.R').show(10,False)" + ] + }, { "cell_type": "markdown", "metadata": {}, "source": [ "We will only use first 7 characters of `stop_id` field in order to get exact match with sbb dataset. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We use time rounded to quarter hour, so that unexact match between two datasets should not be a problem." ] }, { "cell_type": "code", - "execution_count": 17, + "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "20201" + "247920\n", + "22807" ] } ], "source": [ - "print stop_times_format.count()\n", - "stop_times_format.select(\"trip_id\").distinct().orderBy(\"trip_id\").count()" + "#print stop_times_format.count()\n", + "#stop_times_format.select(\"trip_id\").distinct().orderBy(\"trip_id\").count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "We have _timetable_ trip_id, the stop_id as defined above, and arrival/departure time. The idea is to match these information with the ones we have in sbb dataset. The stop_id should match" + "We have _timetable_ trip_id, the stop_id as defined above, and arrival/departure time. The idea is to match these information with the ones we have in sbb dataset. The stop_id should match.\n", + "\n", + "We will subset sbb dataset to get only the 13th of May in sbb dataset :" ] }, { "cell_type": "code", - "execution_count": 62, + "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ - "# Used to subset sbb table based on stop_id \n", - "#l1_id = stops_15km.select('stop_id').collect()\n", - "#l2_id = [item.stop_id for item in l1_id]\n", - "\n", - "# Used to subset sbb table based on stop_names \n", - "#l1_name = stops_15km.select('stop_name').collect()\n", - "#l2_name = [item.stop_name for item in l1_name]\n", - "\n", - "# Make the subset dataframe\n", - "quarter_round_arr = ((round(unix_timestamp(col(\"ankunftszeit\"), 'dd.MM.yyyy HH:mm') / 900) * 900)\n", - " .cast(\"timestamp\"))\n", - "quarter_round_dep = ((round(unix_timestamp(col(\"abfahrtszeit\"), 'dd.MM.yyyy HH:mm') / 900) * 900)\n", - " .cast(\"timestamp\"))\n", - "\n", - "sbb_filt = sbb.filter((sbb.an_prognose != \"\") & (sbb.abfahrtszeit != \"\") & \\\n", - " (sbb.an_prognose_status == 'GESCHAETZT'))\\\n", - " .withColumn(\"ankunftszeit_rounded\", quarter_round_arr) \\\n", - " .withColumn(\"abfahrtszeit_rounded\", quarter_round_dep) \\\n", - " .select('fahrt_bezeichner','haltestellen_name', \\\n", - " 'ankunftszeit_rounded', 'abfahrtszeit_rounded', \\\n", - " col('bpuic').alias('stop_id'))\n", - "\n", - "# .where(sbb['bpuic'].isin(l2_id) | sbb['haltestellen_name'].isin(l2_name))\\\n" + "sbb_sub = sbb.filter( (sbb.ankunftszeit != \"\") | (sbb.abfahrtszeit != \"\") )\\\n", + " .select('fahrt_bezeichner','haltestellen_name', \\\n", + " 'ankunftszeit', 'abfahrtszeit', 'bpuic')\\\n", + " .withColumn('ankunftszeit_ts',to_timestamp(col('ankunftszeit'),\\\n", + " format='dd.MM.yyyy HH:mm:ss'))\\\n", + " .withColumn('abfahrtszeit_ts',to_timestamp(col('abfahrtszeit'),\\\n", + " format='dd.MM.yyyy HH:mm'))\\" ] }, { "cell_type": "code", - "execution_count": 63, + "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ - "sbb_filt_format = sbb_filt.select('fahrt_bezeichner','haltestellen_name', 'stop_id',\n", - " unix_timestamp(sbb_filt.ankunftszeit_rounded, 'dd.MM.yyyy HH:mm')\\\n", - " .alias('ankunftszeit_ut'),\n", - " unix_timestamp(sbb_filt.abfahrtszeit_rounded, 'dd.MM.yyyy HH:mm')\\\n", - " .alias('abfahrtszeit_ut'))\\\n", - " .select('fahrt_bezeichner','haltestellen_name', 'stop_id',\n", - " from_unixtime('ankunftszeit_ut')\\\n", - " .alias('ankunftszeit_dty'),\n", - " from_unixtime('abfahrtszeit_ut')\\\n", - " .alias('abfahrtszeit_dty'))\\\n", - " .select('fahrt_bezeichner','haltestellen_name', 'stop_id',\n", - " date_format('ankunftszeit_dty', 'hh:mm')\\\n", - " .alias('arrival_time'),\n", - " date_format('abfahrtszeit_dty', 'hh:mm')\\\n", - " .alias('departure_time'))" + "# Used to subset sbb table based on stop_id \n", + "l1_id = stops_15km.select('stop_id').collect()\n", + "l2_id = [item.stop_id for item in l1_id]\n", + "\n", + "# Make the subset dataframe\n", + "quarter_round_arr = ((round(unix_timestamp(col(\"ankunftszeit\"), 'dd.MM.yyyy HH:mm') / 900) * 900)\n", + " .cast(\"timestamp\"))\n", + "quarter_round_dep = ((round(unix_timestamp(col(\"abfahrtszeit\"), 'dd.MM.yyyy HH:mm') / 900) * 900)\n", + " .cast(\"timestamp\"))\n", + "\n", + "sbb_filt = sbb.filter( ( (col('ankunftszeit') != \"\") | (col('abfahrtszeit') != \"\") ) & \\\n", + " sbb['bpuic'].isin(l2_id) )\\\n", + " .select('fahrt_bezeichner','haltestellen_name', \\\n", + " 'ankunftszeit', 'abfahrtszeit', \\\n", + " col('bpuic').alias('stop_id')) \\\n", + " .withColumn(\"ankunftszeit_rounded\", quarter_round_arr) \\\n", + " .withColumn(\"abfahrtszeit_rounded\", quarter_round_dep)" ] }, { "cell_type": "code", - "execution_count": 64, + "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "+----------------+------------------------------+-------+------------+--------------+\n", - "|fahrt_bezeichner|haltestellen_name |stop_id|arrival_time|departure_time|\n", - "+----------------+------------------------------+-------+------------+--------------+\n", - "|85:870:51001 |Wangenried, Linde |8576902|05:30 |05:30 |\n", - "|85:870:51001 |Wangenried, Unterdorf |8576901|05:30 |05:30 |\n", - "|85:870:51001 |Wangen a.d. Aare, Unterführung|8589946|05:30 |05:30 |\n", - "|85:870:51001 |Wangen a.A., Bahnhof |8576900|05:30 |05:30 |\n", - "|85:870:51002 |Herzogenbuchsee, Sonnenplatz |8576918|05:15 |05:15 |\n", - "|85:870:51002 |Herzogenbuchsee, Friedhof |8576919|05:15 |05:15 |\n", - "|85:870:51002 |Bützberg, Wyssenried |8508961|05:15 |05:15 |\n", - "|85:870:51002 |Bützberg, Neuquartier |8576920|05:15 |05:15 |\n", - "|85:870:51002 |Bützberg, Tell |8508960|05:15 |05:15 |\n", - "|85:870:51002 |Bützberg, Käserei |8576921|05:15 |05:15 |\n", - "+----------------+------------------------------+-------+------------+--------------+\n", - "only showing top 10 rows" + "+----------------+-----------------+-------+------------+--------------+\n", + "|fahrt_bezeichner|haltestellen_name|stop_id|arrival_time|departure_time|\n", + "+----------------+-----------------+-------+------------+--------------+\n", + "|85:11:10:002 |Zürich HB |8503000|09:45 |null |\n", + "|85:11:11:001 |Zürich HB |8503000|null |06:15 |\n", + "|85:11:12:001 |Zürich HB |8503000|10:45 |null |\n", + "|85:11:1251:003 |Zürich HB |8503000|07:00 |null |\n", + "|85:11:1252:001 |Zürich HB |8503000|09:30 |09:30 |\n", + "+----------------+-----------------+-------+------------+--------------+\n", + "only showing top 5 rows" ] } ], "source": [ - "sbb_filt_format.show(10, False)" + "sbb_filt_format = sbb_filt.select('fahrt_bezeichner','haltestellen_name', 'stop_id',\n", + " date_format(from_unixtime(unix_timestamp(col('ankunftszeit_rounded'), \\\n", + " 'dd.MM.yyyy HH:mm')), \\\n", + " 'hh:mm')\\\n", + " .alias('arrival_time'),\n", + " date_format(from_unixtime(unix_timestamp(col('abfahrtszeit_rounded'), \\\n", + " 'dd.MM.yyyy HH:mm')), \\\n", + " 'hh:mm')\\\n", + " .alias('departure_time'))\n", + "sbb_filt_format.show(5, False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "#sbb_filt_format.select(\"fahrt_bezeichner\").distinct().orderBy(\"fahrt_bezeichner\").count()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Let's have a look at a single trip_id according to sbb dataset (called `fahrt_bezeichner`). " ] }, { "cell_type": "code", - "execution_count": 65, + "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ - "406667" + "+----------------+-----------------+-------+------------+--------------+\n", + "|fahrt_bezeichner|haltestellen_name|stop_id|arrival_time|departure_time|\n", + "+----------------+-----------------+-------+------------+--------------+\n", + "|85:78:24815:001 |Zürich Selnau |8503090|01:30 |01:30 |\n", + "|85:78:24815:001 |Zürich Binz |8503051|01:45 |01:45 |\n", + "|85:78:24815:001 |Friesenberg |8503052|01:45 |01:45 |\n", + "|85:78:24815:001 |Schweighof |8503053|01:45 |01:45 |\n", + "|85:78:24815:001 |Triemli |8503054|01:45 |01:45 |\n", + "|85:78:24815:001 |Uitikon Waldegg |8503055|01:45 |01:45 |\n", + "|85:78:24815:001 |Ringlikon |8503056|01:45 |01:45 |\n", + "|85:78:24815:001 |Uetliberg |8503057|02:00 |null |\n", + "|85:78:24815:001 |Zürich Selnau |8503090|01:30 |01:30 |\n", + "|85:78:24815:001 |Zürich Binz |8503051|01:45 |01:45 |\n", + "+----------------+-----------------+-------+------------+--------------+\n", + "only showing top 10 rows" ] } ], "source": [ - "sbb_filt_format.select(\"fahrt_bezeichner\").distinct().orderBy(\"fahrt_bezeichner\").count()" + "fahrt_bezeichner_example = '85:78:24815:001'\n", + "sbb_filt_format.filter(sbb_filt_format['fahrt_bezeichner'] == fahrt_bezeichner_example)\\\n", + " .show(10,False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "We can now create a joined table using `stop_time_format` and `sbb_filt_format`. We use only stop_id and times to merge the tables. The idea is then to compare the trip_id from both dataset than end up on the same line. We use join with _left_outer_ option to get results " + "We can now create a joined table using `stop_time_format` and `sbb_filt_format`. We use only stop_id and times to merge the tables. The idea is then to compare the trip_id from both dataset than end up on the same line. We use join with _left_outer_ so that we can only have _null_ values on the sbb side (assymetrical join)." ] }, { "cell_type": "code", - "execution_count": 76, + "execution_count": 16, "metadata": {}, "outputs": [ { - "name": "stderr", + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", "output_type": "stream", "text": [ - "An error was encountered:\n", - "Session 6219 did not reach idle status in time. Current status is busy.\n" + "+-------+------------------------+---------------------+\n", + "|stop_id|trip_id |fahrt_bezeichner |\n", + "+-------+------------------------+---------------------+\n", + "|8502508|269.TA.26-303-j19-1.3.H |85:849:609297-18301-1|\n", + "|8502750|35.TA.1-331-j19-1.3.R |null |\n", + "|8503000|128.TA.17-4-j19-1.23.R |85:11:1957:001 |\n", + "|8503000|153.TA.26-6-A-j19-1.34.R|85:11:71531:006 |\n", + "|8503000|173.TA.21-75-j19-1.47.R |85:11:18368:001 |\n", + "|8503000|173.TA.21-75-j19-1.47.R |85:11:89171:000 |\n", + "|8503000|182.TA.26-1-j19-1.90.H |85:11:88622:002 |\n", + "|8503000|237.TA.1-36-j19-1.79.R |85:11:10580:002 |\n", + "|8503000|269.TA.26-11-j19-1.78.H |85:11:554:006 |\n", + "|8503000|3.TA.6-1-j19-1.3.R |85:11:30580:003 |\n", + "|8503000|3.TA.6-1-j19-1.3.R |85:11:88921:003 |\n", + "|8503000|331.TA.26-6-A-j19-1.45.H|85:11:727:002 |\n", + "|8503000|331.TA.26-6-A-j19-1.45.H|85:11:30730:003 |\n", + "|8503000|345.TA.16-5-j19-1.71.R |85:11:730:002 |\n", + "|8503000|345.TA.16-5-j19-1.71.R |85:11:19122:001 |\n", + "|8503000|345.TA.16-5-j19-1.71.R |85:11:70727:009 |\n", + "|8503000|352.TA.20-2-j19-1.146.H |85:11:1531:002 |\n", + "|8503000|38.TA.26-9-A-j19-1.3.H |85:11:70580:001 |\n", + "|8503000|551.TA.26-11-j19-1.126.R|85:11:88320:003 |\n", + "|8503000|63.TA.1-36-j19-1.21.R |85:11:31615:039 |\n", + "+-------+------------------------+---------------------+\n", + "only showing top 20 rows" ] } ], "source": [ "joined_trip_table = stop_times_format.join(sbb_filt_format,\\\n", " on=['stop_id', 'arrival_time'], \n", " how='left_outer')\\\n", - " .select('stop_id', 'arrival_time',\n", - " 'trip_id', 'fahrt_bezeichner')\n", - "joined_trip_table.show(10, False)" + " .select('stop_id', \n", + " 'trip_id', 'fahrt_bezeichner') \\\n", + " .distinct()\n", + "joined_trip_table.show(20, False)" ] }, { "cell_type": "code", - "execution_count": 72, + "execution_count": 17, "metadata": {}, "outputs": [ { - "name": "stderr", - "output_type": "stream", - "text": [ - "An error was encountered:\n", - "Session 6219 did not reach idle status in time. Current status is busy.\n" - ] + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" } ], "source": [ - "joined_trip_table.select([count(when(col(c).isNull(), c)).alias(c) \\\n", - " for c in joined_trip_table.columns]).show()" + "#joined_trip_table.select([count(when(col(c).isNull(), c)).alias(c) \\\n", + "# for c in joined_trip_table.columns]).show()" ] }, { "cell_type": "code", - "execution_count": 73, + "execution_count": 18, "metadata": {}, "outputs": [ { - "name": "stderr", - "output_type": "stream", - "text": [ - "An error was encountered:\n", - "Session 6219 did not reach idle status in time. Current status is busy.\n" - ] + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" } ], "source": [ "joined_trip_filt = joined_trip_table.filter( (col('trip_id').isNotNull()) & \n", " (col('fahrt_bezeichner').isNotNull()) )\\\n", " .select(\"trip_id\", \"fahrt_bezeichner\")\\\n", " .groupBy(\"trip_id\", \"fahrt_bezeichner\")\\\n", - " .count()\\\n", - " .orderBy(\"trip_id\")" + " .count()" ] }, { "cell_type": "code", - "execution_count": 74, + "execution_count": null, "metadata": {}, "outputs": [ { - "name": "stderr", - "output_type": "stream", - "text": [ - "An error was encountered:\n", - "Session 6219 did not reach idle status in time. Current status is busy.\n" - ] + "data": { + "application/vnd.jupyter.widget-view+json": { + "model_id": "bbc32e4a021a45eb82f4870a3893fbec", + "version_major": 2, + "version_minor": 0 + }, + "text/plain": [ + "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" + ] + }, + "metadata": {}, + "output_type": "display_data" } ], "source": [ "print joined_trip_filt.count()\n", - "joined_trip_filt.orderBy(\"fahrt_bezeichner\").show(20, False)" + "joined_trip_filt.show(10, False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "joined_trip_filt.write.csv('data/lgpt_guys/joined_trip_id.csv', header = True, mode=\"overwrite\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Explore trips with no matches " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This shows how many trip_id / fahrt_bezeichner there is in the joined dataframe :" ] }, { "cell_type": "code", - "execution_count": 61, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "An error was encountered:\n", - "Session 6219 did not reach idle status in time. Current status is busy.\n" - ] - } - ], + "outputs": [], + "source": [ + "joined_trip_filt = spark.read.csv('data/lgpt_guys/stops_15km.csv', header = True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], "source": [ "print joined_trip_table.select(\"trip_id\").distinct().count()\n", "print joined_trip_table.select(\"fahrt_bezeichner\").distinct().count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And below how many of them we could get a match (of at least 1 stop/time, therefore most probably higher than reality)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print joined_trip_filt.select(\"trip_id\").distinct().count()\n", "print joined_trip_filt.select(\"fahrt_bezeichner\").distinct().count()" ] }, { "cell_type": "code", - "execution_count": 60, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "An error was encountered:\n", - "Session 6219 did not reach idle status in time. Current status is busy.\n" - ] - } - ], + "outputs": [], "source": [ "joined_trip_table.select([count(when(col(c).isNull(), c)).alias(c) \\\n", " for c in joined_trip_table.columns]).show()" ] }, { "cell_type": "code", - "execution_count": 53, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "+-------+------------+--------------------+----------------+\n", - "|stop_id|arrival_time| trip_id|fahrt_bezeichner|\n", - "+-------+------------+--------------------+----------------+\n", - "|8500926| 09:00|44.TA.26-301-j19-...| null|\n", - "|8502979| 06:00|105.TA.1-350-j19-...| null|\n", - "|8502979| 06:00|138.TA.1-350-j19-...| null|\n", - "|8503056| 03:00|142.TA.26-10-B-j1...| null|\n", - "|8503063| 05:15|568.TA.26-18-j19-...| null|\n", - "|8503063| 05:15|578.TA.26-18-j19-...| null|\n", - "|8503067| 06:30|443.TA.26-18-j19-...| null|\n", - "|8503067| 06:30|134.TA.26-18-j19-...| null|\n", - "|8503072| 03:45|563.TA.26-18-j19-...| null|\n", - "|8503072| 03:45|319.TA.26-18-j19-...| null|\n", - "+-------+------------+--------------------+----------------+\n", - "only showing top 10 rows" - ] - } - ], + "outputs": [], "source": [ "joined_trip_table.filter( (col('trip_id').isNull()) |\n", " (col('fahrt_bezeichner').isNull()) ).show(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can create a dictionnaries with stop_id -> stop_name structure from timetable data, to be able to compare both dataset more carefully " ] }, { "cell_type": "code", - "execution_count": 24, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - } - ], + "outputs": [], "source": [ "newrdd = timetable_ids.rdd\n", "timetable_rdd = newrdd.map(lambda x : (x[1],x[0]))\n", "timetable_stopid_dict = timetable_rdd.collectAsMap()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here is an exemple of how to get a station name from a stop_id " ] }, { "cell_type": "code", - "execution_count": 25, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" - ] - }, - "metadata": {}, - "output_type": "display_data" - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "u'Oberrieden Dorf'" - ] - } - ], + "outputs": [], "source": [ "timetable_stopid_dict.get(\"8502209\")" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }