# Generating a cleaned dataset compatible with the RAPTOR file structure

*Author: Cyril Pulver, with help and boilerplate code from Tomas Turner*

Below, we pre-process the data to generate the files required to run RAPTOR.

The main reasoning behind this way of cleaning the data is the following:
**Given a GTFS file `stop_times.txt` formatted to a cleaned GTFS-like `stop_times` table, it is possible to reconstruct everything required to run RAPTOR**. In particular, trips and routes are reconstructed directly from the cleaned `stop_times` table and without insight from the GTFS file `routes.txt`.

The key idea is to filter out stops outside the 15 km radius around Zürich HB, services not running on every standard business day, and stop times outside 7am to 8pm. Trimming the data is not required to run RAPTOR itself, but it lightens the computational cost of estimating the probability distribution of delays. The key step to run RAPTOR properly, however, is the following: trips from the GTFS files are grouped into RAPTOR-compatible trips (a sequence of stop times where stops are always served in the same order) based on lists of served stops reconstructed **directly from the cleaned `stop_times` table**. Routes are then constructed by finding trips that serve the same sequence of stops.
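The trip-to-route grouping can be sketched in plain Python. This is a minimal illustration, not the notebook's Spark implementation: it assumes the cleaned `stop_times` rows are available as `(trip_id, stop_sequence, stop_id)` tuples, and the function name `build_routes` is hypothetical. Trips whose ordered stop lists (the `all_stops` signature) are identical land on the same route.

```python
from collections import defaultdict


def build_routes(stop_times):
    """Group trips into RAPTOR routes by their ordered sequence of stops.

    stop_times: iterable of (trip_id, stop_sequence, stop_id) tuples.
    Returns a dict mapping route_int -> sorted list of trip_ids.
    """
    # Collect (stop_sequence, stop_id) pairs per trip; input order does not matter.
    per_trip = defaultdict(list)
    for trip_id, seq, stop_id in stop_times:
        per_trip[trip_id].append((int(seq), stop_id))

    # The ordered tuple of stops is the route signature (the all_stops list).
    trips_by_signature = defaultdict(list)
    for trip_id, visits in per_trip.items():
        all_stops = tuple(stop for _, stop in sorted(visits))
        trips_by_signature[all_stops].append(trip_id)

    # Assign consecutive integer ids to routes in a deterministic order.
    return {
        route_int: sorted(trips)
        for route_int, (_, trips) in enumerate(sorted(trips_by_signature.items()))
    }


rows = [
    ("t1", 1, "A"), ("t1", 2, "B"), ("t1", 3, "C"),
    ("t2", 2, "B"), ("t2", 1, "A"), ("t2", 3, "C"),  # same stops, rows out of order
    ("t3", 1, "A"), ("t3", 2, "C"),                  # skips B -> different route
]
print(build_routes(rows))  # {0: ['t1', 't2'], 1: ['t3']}
```

Here `t1` and `t2` share a route, while `t3`, which skips stop `B`, gets its own. The actual pipeline additionally deduplicates trips first and orders trips within a route by their first departure time; the sketch only shows the grouping key.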

Finally, footpaths and associated walking times are estimated from the great-circle ("as the crow flies") distance between all pairs of stops, computed from their GPS coordinates. We only keep footpaths shorter than 500 m, regardless of the terrain between the two stops. Walking times are derived from distances assuming a uniform walking speed of 50 meters per minute. We only compute walking times between stops that do not share the same parent stop (i.e. stops with differing `BPUIC`). Indeed, footpaths within large stations depend mostly on the station layout itself and much less on the GPS coordinates of the individual platforms. A 2-minute minimum transfer time is enforced directly in the main loop of RAPTOR to account for any connection within a single parent station.
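The footpath rule can be sketched in plain Python. This assumes stops are given as `(stop_int, stop_id_general, lat, lon)` tuples and uses a haversine great-circle distance on a sphere of radius 6371 km; geopy's `great_circle` also uses a spherical model, so values should be close but may differ slightly. The 500 m cutoff and 50 m/min speed come from the text above; `compute_footpaths` and the example coordinates are hypothetical.

```python
import math
from itertools import permutations

EARTH_RADIUS_M = 6_371_000  # mean Earth radius (spherical approximation)
MAX_DISTANCE_M = 500        # keep footpaths shorter than this
WALK_SPEED_M_PER_MIN = 50   # uniform walking speed used in the text


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters between two (lat, lon) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def compute_footpaths(stops):
    """Return sorted (stop_int1, stop_int2, walk_seconds) tuples for nearby stops.

    Pairs sharing the same stop_id_general (same BPUIC) are skipped, since
    intra-station transfers rely on the minimum transfer time instead.
    """
    footpaths = []
    # permutations yields every ordered pair, like the cross join minus self-pairs.
    for (s1, g1, la1, lo1), (s2, g2, la2, lo2) in permutations(stops, 2):
        if g1 == g2:
            continue
        d = haversine_m(la1, lo1, la2, lo2)
        if d < MAX_DISTANCE_M:
            walk_seconds = round(d / WALK_SPEED_M_PER_MIN * 60)
            footpaths.append((s1, s2, walk_seconds))
    # Mimic the Transfers ordering: by stop_int1, then stop_int2.
    return sorted(footpaths)


stops = [
    (0, "1000001", 47.0000, 8.0),  # hypothetical stop X
    (1, "1000002", 47.0018, 8.0),  # about 200 m north of X
    (2, "1000003", 47.0100, 8.0),  # about 1.1 km north of X: too far
    (3, "1000001", 47.0001, 8.0),  # another platform of X (same general id)
]
print(compute_footpaths(stops))  # [(0, 1, 240), (1, 0, 240), (1, 3, 227), (3, 1, 227)]
```

Both directions of each pair are kept, and the pair `(0, 3)` is skipped because both stops share the same general id even though they are only about 11 m apart.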

Below, we present a step-by-step outline of the data wrangling strategy:

- 1) Filter out stops outside the 15 km radius around Zürich HB
    - Done on the GTFS file `stops.txt`
    

- 2) Give each stop a general stop id by trimming `stop_id` to its first 7 characters. This gathers stops sharing a parent stop, and is more robust to the inconsistencies detected in the data than relying on `parent_station`. The general stop id also corresponds to the `BPUIC` field in the SBB dataset, which will come in handy when mining for information about delays.
    - Done on stops (add a `stop_id_general` column)
        - Used as an input to filter `stop_times`. At this point, `stop_times` contains only stops within 15 km of Zürich HB
    
    
- 3) Keep only trips belonging to services that run on every business day (Monday to Friday):
    - Obtain the list of services from the GTFS file `calendar.txt`
        - Serves as an input to filter trips
            - Serves as an input to filter `stop_times`
      
      
- 4) Keep only stop times departing between 7am and 8pm (departure hours 7 to 19 inclusive)
    - Done directly on `stop_times`
    
    
- 5) Find unique trips, based on the sequence of stops and the sequence of departure times. Trips serving the same stops at the same times on different days must be merged into a single trip, as the date is not used as an input to RAPTOR.
    - sort `stop_times` by trip and stop sequence
    - For each trip
        - build a list containing the sequence of stops visited (`all_stops`)
        - build a list containing all departure times for those stops
    - keep only a single trip among those that share the same lists of stops visited and departure times.
    - Filter `stop_times` based on said unique trips.
    
    
- 6) Building routes based on unique trips sharing the same `all_stops` column
    - order (at this point unique) trips by `all_stops` and earliest departure time (i.e. the departure time from the first stop of the trip)
    - group trips by `all_stops` and assign a unique integer `route_int` to each group
    - Add this information to `stop_times`
    
    
- 7) Assigning unique integer indices to stops
    - get unique general (parent) stop names from `stop_times`
    - assign an integer index `stop_int` from 0 to n_stops-1 with a version of `zipWithIndex` adapted to Spark DataFrames
    - Add the `stop_int` to `stop_times`
   
    
- 8) Getting transport type information (bus, tram, train, etc.) from the GTFS file `routes.txt`
    - inner join `stop_times` with `routes.txt` using `route_id`


- 9) Assigning a monotonically increasing index so that `stop_times` can be re-sorted by `route_int` and trip, as required by the RAPTOR data structure StopTimes.


- 10) Computing footpaths that serve as an input to RAPTOR's Transfers data structure. We only consider stops present in the final and filtered `stop_times` table.
    - get all unique `stop_int` with a single pair of coordinates
    - cross-join two copies of this dataframe. The result contains all ordered pairs of `stop_int`
    - drop pairs where `stop_id_general` is the same
    - compute the great-circle ("as the crow flies") distance with geopy for all other pairs of `stop_int` using GPS coordinates
    - filter out pairs of stops with a distance greater than 500 meters
    - transform distance to walking time in seconds
    - order by `stop_int1`, then `stop_int2` to mimic the data structure Transfers in RAPTOR
    - create a monotonically increasing ID to easily sort on this table after reading from a distributed file system
    
    
Executing all the cells in this notebook generates cleaned .csv tables saved on HDFS that serve as an input to the `notebooks/transfer_to_local.ipynb` notebook.
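Step 7 requires contiguous indices from 0 to n_stops-1. Spark's `monotonically_increasing_id` guarantees unique, increasing ids but not consecutive ones (the partition id is encoded in the upper bits), which is enough for the re-sorting index of step 9 but not for `stop_int`; hence the `zipWithIndex`-style approach. The intended mapping, restated in plain Python on a list of `stop_id_general` values (the helper `index_stops` is hypothetical, and sorting the ids first is an assumption made here for determinism):

```python
def index_stops(stop_ids_general):
    """Map each distinct parent stop id to a contiguous integer 0..n_stops-1.

    Duplicates are removed first; sorting makes the mapping deterministic.
    """
    unique_ids = sorted(set(stop_ids_general))
    return {stop_id: stop_int for stop_int, stop_id in enumerate(unique_ids)}


stop_int_of = index_stops(["8503202", "8502186", "8503202", "8576080"])
print(stop_int_of)  # {'8502186': 0, '8503202': 1, '8576080': 2}
```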

In [1]:
%%configure
{"conf": {
    "spark.app.name": "lgptguys_final"
}}


In [2]:
# Initialization: running a first statement starts the Spark session
spark

Starting Spark application

SparkSession available as 'spark'.



In [3]:
from geopy.distance import great_circle
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType
from pyspark.sql.types import DateType
import numpy as np
import pandas as pd

# 1) Filtering out stops not within 15km of ZH HB

In [4]:
stops = spark.read.csv("/data/sbb/timetables/csv/stops/2019/05/14/stops.txt", header=True, sep = ",")
stops.show()

+-------+--------------------+----------------+-----------------+-------------+--------------+
|stop_id|           stop_name|        stop_lat|         stop_lon|location_type|parent_station|
+-------+--------------------+----------------+-----------------+-------------+--------------+
|1322000|            Altoggio|46.1672513851495|   8.345807131427|         null|          null|
|1322001|        Antronapiana| 46.060121674738| 8.11361957990831|         null|          null|
|1322002|              Anzola|45.9898698225697| 8.34571729989858|         null|          null|
|1322003|              Baceno|46.2614983591677| 8.31925293162473|         null|          null|
|1322004|Beura Cardezza, c...|46.0790618438814| 8.29927439970313|         null|          null|
|1322005|Bognanco, T. Vill...|46.1222963432243| 8.21077237789936|         null|          null|
|1322006|           Boschetto|46.0656504576122| 8.26113193273411|         null|          null|
|1322007|            Cadarese|46.2978807772998|  8

In [5]:
stops.count()

30631

In [6]:
# UDF returning the great-circle distance (in km) between two (lat, lon) structs
@udf("float")
def great_circle_udf(x, y):
    return great_circle(x, y).kilometers

In [7]:
# Zürich HB coordinates (lat, lon)
zurich_geo = (47.378177, 8.540192)

# wrap the Zürich HB coordinates into a struct column so they can be passed to the UDF
zurich_geo_col = struct(lit(zurich_geo[0]), lit(zurich_geo[1]))

# keep only stops closer than 15 km to Zürich HB
stops_15km = stops.filter(great_circle_udf(zurich_geo_col, struct(stops.stop_lat, stops.stop_lon)) < 15)
stops_15km.show()

+-----------+--------------------+----------------+----------------+-------------+--------------+
|    stop_id|           stop_name|        stop_lat|        stop_lon|location_type|parent_station|
+-----------+--------------------+----------------+----------------+-------------+--------------+
|    8500926|Oetwil a.d.L., Sc...|47.4236270123012| 8.4031825286317|         null|          null|
|    8502186|Dietikon Stoffelbach|47.3934058321612|8.39894248049007|         null|      8502186P|
|8502186:0:1|Dietikon Stoffelbach|47.3934666445388|8.39894248049007|         null|      8502186P|
|8502186:0:2|Dietikon Stoffelbach|47.3935274568464|8.39894248049007|         null|      8502186P|
|   8502186P|Dietikon Stoffelbach|47.3934058321612|8.39894248049007|            1|          null|
|    8502187|Rudolfstetten Hof...|47.3646945560768|8.37709545277724|         null|      8502187P|
|8502187:0:1|Rudolfstetten Hof...|47.3647554015789|8.37709545277724|         null|      8502187P|
|8502187:0:2|Rudolfs

In [8]:
stops_15km.count()

1883

# 2) Merging stops that share a parent stop

In [9]:
stops_15km.filter(stops_15km.parent_station.isNotNull()).show(50)

+------------+--------------------+----------------+----------------+-------------+--------------+
|     stop_id|           stop_name|        stop_lat|        stop_lon|location_type|parent_station|
+------------+--------------------+----------------+----------------+-------------+--------------+
|     8502186|Dietikon Stoffelbach|47.3934058321612|8.39894248049007|         null|      8502186P|
| 8502186:0:1|Dietikon Stoffelbach|47.3934666445388|8.39894248049007|         null|      8502186P|
| 8502186:0:2|Dietikon Stoffelbach|47.3935274568464|8.39894248049007|         null|      8502186P|
|     8502187|Rudolfstetten Hof...|47.3646945560768|8.37709545277724|         null|      8502187P|
| 8502187:0:1|Rudolfstetten Hof...|47.3647554015789|8.37709545277724|         null|      8502187P|
| 8502187:0:2|Rudolfstetten Hof...|47.3648162470108|8.37709545277724|         null|      8502187P|
|     8502188|   Zufikon Hammergut|47.3558347019549|8.35472740219955|         null|      8502188P|
| 8502188:

In [10]:
stops_15km.filter(stops_15km.parent_station.isNull()).show(50)

+-----------+--------------------+----------------+----------------+-------------+--------------+
|    stop_id|           stop_name|        stop_lat|        stop_lon|location_type|parent_station|
+-----------+--------------------+----------------+----------------+-------------+--------------+
|    8500926|Oetwil a.d.L., Sc...|47.4236270123012| 8.4031825286317|         null|          null|
|   8502186P|Dietikon Stoffelbach|47.3934058321612|8.39894248049007|            1|          null|
|   8502187P|Rudolfstetten Hof...|47.3646945560768|8.37709545277724|            1|          null|
|   8502188P|   Zufikon Hammergut|47.3558347019549|8.35472740219955|            1|          null|
|   8502208P|     Horgen Oberdorf|47.2587475534877|8.58979854578067|            1|          null|
|   8502209P|     Oberrieden Dorf|47.2767238569466|  8.577635356832|            1|          null|
|   8502220P|              Urdorf|47.3908820565997|8.43471339510869|            1|          null|
|   8502221P|      B

It is clear that parent stops were not properly assigned to all stops (e.g. Zufikon Belvédère, which has a platform stop but no parent stop). Thus, we create a new column `stop_id_general` that contains only the first 7 characters of `stop_id`.
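The truncation rule, restated in plain Python under the assumption (suggested by the samples above) that every `stop_id` starts with the 7-digit station code. The helper name `stop_id_general` mirrors the new column but is hypothetical as a function:

```python
from collections import defaultdict


def stop_id_general(stop_id):
    """Return the first 7 characters of a GTFS stop_id.

    Mirrors col('stop_id').substr(1, 7) in Spark (substr is 1-based there):
    platform ids like '8502186:0:1' and parent ids like '8502186P' both
    collapse to the station code '8502186'.
    """
    return stop_id[:7]


stop_ids = ["8502186", "8502186:0:1", "8502186:0:2", "8502186P", "8500926"]

# Group the raw stop_ids under their general (parent) id.
groups = defaultdict(list)
for sid in stop_ids:
    groups[stop_id_general(sid)].append(sid)

print(dict(groups))
# {'8502186': ['8502186', '8502186:0:1', '8502186:0:2', '8502186P'], '8500926': ['8500926']}
```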

In [11]:
stops_15km = stops_15km.withColumn('stop_id_general',col('stop_id').substr(1, 7))
stops_15km.show()

+-----------+--------------------+----------------+----------------+-------------+--------------+---------------+
|    stop_id|           stop_name|        stop_lat|        stop_lon|location_type|parent_station|stop_id_general|
+-----------+--------------------+----------------+----------------+-------------+--------------+---------------+
|    8500926|Oetwil a.d.L., Sc...|47.4236270123012| 8.4031825286317|         null|          null|        8500926|
|    8502186|Dietikon Stoffelbach|47.3934058321612|8.39894248049007|         null|      8502186P|        8502186|
|8502186:0:1|Dietikon Stoffelbach|47.3934666445388|8.39894248049007|         null|      8502186P|        8502186|
|8502186:0:2|Dietikon Stoffelbach|47.3935274568464|8.39894248049007|         null|      8502186P|        8502186|
|   8502186P|Dietikon Stoffelbach|47.3934058321612|8.39894248049007|            1|          null|        8502186|
|    8502187|Rudolfstetten Hof...|47.3646945560768|8.37709545277724|         null|      

Next, we restrict `stop_times` to stops within the 15 km radius and add the `stop_id_general` column.

In [12]:
stops_15km_for_join = stops_15km.select(stops_15km.stop_id, 
                                        stops_15km.stop_id_general, 
                                        stops_15km.stop_name, 
                                        stops_15km.stop_lat, 
                                        stops_15km.stop_lon)
stops_15km_for_join.show()

+-----------+---------------+--------------------+----------------+----------------+
|    stop_id|stop_id_general|           stop_name|        stop_lat|        stop_lon|
+-----------+---------------+--------------------+----------------+----------------+
|    8500926|        8500926|Oetwil a.d.L., Sc...|47.4236270123012| 8.4031825286317|
|    8502186|        8502186|Dietikon Stoffelbach|47.3934058321612|8.39894248049007|
|8502186:0:1|        8502186|Dietikon Stoffelbach|47.3934666445388|8.39894248049007|
|8502186:0:2|        8502186|Dietikon Stoffelbach|47.3935274568464|8.39894248049007|
|   8502186P|        8502186|Dietikon Stoffelbach|47.3934058321612|8.39894248049007|
|    8502187|        8502187|Rudolfstetten Hof...|47.3646945560768|8.37709545277724|
|8502187:0:1|        8502187|Rudolfstetten Hof...|47.3647554015789|8.37709545277724|
|8502187:0:2|        8502187|Rudolfstetten Hof...|47.3648162470108|8.37709545277724|
|   8502187P|        8502187|Rudolfstetten Hof...|47.364694556076

In [13]:
stop_times = spark.read.csv("/data/sbb/timetables/csv/stop_times/2019/05/14/stop_times.txt", header=True, sep = ",")
stop_times.show()

+--------------------+------------+--------------+-----------+-------------+-----------+-------------+
|             trip_id|arrival_time|departure_time|    stop_id|stop_sequence|pickup_type|drop_off_type|
+--------------------+------------+--------------+-----------+-------------+-----------+-------------+
|1.TA.1-1-B-j19-1.1.R|    04:20:00|      04:20:00|8500010:0:3|            1|          0|            0|
|1.TA.1-1-B-j19-1.1.R|    04:24:00|      04:24:00|8500020:0:3|            2|          0|            0|
|1.TA.1-1-B-j19-1.1.R|    04:28:00|      04:28:00|8500021:0:5|            3|          0|            0|
|1.TA.1-1-B-j19-1.1.R|    04:30:00|      04:30:00|8517131:0:2|            4|          0|            0|
|1.TA.1-1-B-j19-1.1.R|    04:32:00|      04:32:00|8500300:0:5|            5|          0|            0|
|1.TA.1-1-B-j19-1.1.R|    04:35:00|      04:35:00|8500313:0:2|            6|          0|            0|
|1.TA.1-1-B-j19-1.1.R|    04:37:00|      04:38:00|8500301:0:3|           

In [14]:
stop_times.count()

11128930

In [15]:
stop_times_15km = stop_times.join(stops_15km_for_join, how="inner", on = "stop_id").dropDuplicates()
stop_times_15km.show()

+-----------+--------------------+------------+--------------+-------------+-----------+-------------+---------------+-------------------+----------------+----------------+
|    stop_id|             trip_id|arrival_time|departure_time|stop_sequence|pickup_type|drop_off_type|stop_id_general|          stop_name|        stop_lat|        stop_lon|
+-----------+--------------------+------------+--------------+-------------+-----------+-------------+---------------+-------------------+----------------+----------------+
|8503202:0:5|61.TA.25-75-j19-1...|    15:12:00|      15:14:00|            5|          0|            0|        8503202|            Thalwil|47.2962171893553|8.56475351565593|
|8503202:0:5|99.TA.25-75-j19-1...|    19:14:00|      19:15:00|            5|          0|            0|        8503202|            Thalwil|47.2962171893553|8.56475351565593|
|8503202:0:5|137.TA.25-75-j19-...|    23:12:00|      23:14:00|            5|          0|            0|        8503202|            Thalw

In [16]:
stop_times_15km.write.csv('data/lgpt_guys/stop_times_15km.csv', header=True, mode='overwrite')

# 3) Keep only services that run on every business day

In [17]:
stop_times_15km = spark.read.csv('data/lgpt_guys/stop_times_15km.csv', header=True)
stop_times_15km.count()

2322109

In [18]:
calendar = spark.read.csv("/data/sbb/timetables/csv/calendar/2019/05/14/calendar.txt", header=True, sep = ",")

In [19]:
calendar.show()

+----------+------+-------+---------+--------+------+--------+------+----------+--------+
|service_id|monday|tuesday|wednesday|thursday|friday|saturday|sunday|start_date|end_date|
+----------+------+-------+---------+--------+------+--------+------+----------+--------+
|  TA+b0nx9|     1|      1|        1|       1|     1|       0|     0|  20181209|20191214|
|  TA+b03bf|     1|      1|        1|       1|     1|       0|     0|  20181209|20191214|
|  TA+b0008|     1|      1|        1|       1|     1|       0|     0|  20181209|20191214|
|  TA+b0nxg|     1|      1|        1|       1|     1|       0|     0|  20181209|20191214|
|  TA+b08k4|     1|      0|        0|       0|     0|       0|     0|  20181209|20191214|
|  TA+b06hs|     0|      0|        0|       0|     1|       0|     0|  20181209|20191214|
|  TA+b09de|     0|      0|        0|       0|     1|       0|     0|  20181209|20191214|
|  TA+b0nxn|     1|      1|        1|       1|     1|       0|     0|  20181209|20191214|
|  TA+b05q

In [20]:
calendar_business_days = calendar.filter((calendar.monday==1) & \
                                         (calendar.tuesday==1) & \
                                         (calendar.wednesday==1) & \
                                         (calendar.thursday==1) & \
                                         (calendar.friday==1))
calendar_business_days.show()

+----------+------+-------+---------+--------+------+--------+------+----------+--------+
|service_id|monday|tuesday|wednesday|thursday|friday|saturday|sunday|start_date|end_date|
+----------+------+-------+---------+--------+------+--------+------+----------+--------+
|  TA+b0nx9|     1|      1|        1|       1|     1|       0|     0|  20181209|20191214|
|  TA+b03bf|     1|      1|        1|       1|     1|       0|     0|  20181209|20191214|
|  TA+b0008|     1|      1|        1|       1|     1|       0|     0|  20181209|20191214|
|  TA+b0nxg|     1|      1|        1|       1|     1|       0|     0|  20181209|20191214|
|  TA+b0nxn|     1|      1|        1|       1|     1|       0|     0|  20181209|20191214|
|  TA+b0nxd|     1|      1|        1|       1|     1|       0|     0|  20181209|20191214|
|  TA+b0nxh|     1|      1|        1|       1|     1|       0|     0|  20181209|20191214|
|  TA+b0nxi|     1|      1|        1|       1|     1|       0|     0|  20181209|20191214|
|  TA+b0nx

In [21]:
trips = spark.read.csv("/data/sbb/timetables/csv/trips/2019/05/14/trips.txt", header=True, sep = ",")
trips.show()

+-----------+----------+--------------------+------------------+---------------+------------+
|   route_id|service_id|             trip_id|     trip_headsign|trip_short_name|direction_id|
+-----------+----------+--------------------+------------------+---------------+------------+
|1-1-C-j19-1|  TA+b0001|5.TA.1-1-C-j19-1.3.R|Zofingen, Altachen|            108|           1|
|1-1-C-j19-1|  TA+b0001|7.TA.1-1-C-j19-1.3.R|Zofingen, Altachen|            112|           1|
|1-1-C-j19-1|  TA+b0001|9.TA.1-1-C-j19-1.3.R|Zofingen, Altachen|            116|           1|
|1-1-C-j19-1|  TA+b0001|11.TA.1-1-C-j19-1...|Zofingen, Altachen|            120|           1|
|1-1-C-j19-1|  TA+b0001|13.TA.1-1-C-j19-1...|Zofingen, Altachen|            124|           1|
|1-1-C-j19-1|  TA+b0001|15.TA.1-1-C-j19-1...|Zofingen, Altachen|            128|           1|
|1-1-C-j19-1|  TA+b0001|17.TA.1-1-C-j19-1...|Zofingen, Altachen|            132|           1|
|1-1-C-j19-1|  TA+b0001|18.TA.1-1-C-j19-1...|Zofingen, Altac

In [22]:
trips.count()

1017413

Is there any useful information contained in `start_date` and `end_date`?

In [23]:
calendar_business_days.select(calendar_business_days.start_date, calendar_business_days.end_date).dropDuplicates().show()

+----------+--------+
|start_date|end_date|
+----------+--------+
|  20181209|20191214|
+----------+--------+

`start_date` and `end_date` carry no useful information, as their values are the same for all business-day services.
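The chain of filters in this step (business-day services restrict `trips`, which in turn restrict `stop_times`) amounts to two semi-joins. A plain-Python sketch over hypothetical in-memory rows, assuming `calendar` rows are dicts whose weekday flags are strings, as `spark.read.csv` returns them without schema inference; the function names are hypothetical:

```python
WEEKDAYS = ("monday", "tuesday", "wednesday", "thursday", "friday")


def business_day_services(calendar):
    """service_ids running on every weekday (weekend flags are ignored)."""
    return {row["service_id"] for row in calendar
            if all(row[day] == "1" for day in WEEKDAYS)}


def filter_by_services(trips, stop_times, services):
    """Keep trips of the given services, then the stop_times of those trips."""
    kept_trips = [t for t in trips if t["service_id"] in services]
    kept_trip_ids = {t["trip_id"] for t in kept_trips}
    kept_stop_times = [st for st in stop_times if st["trip_id"] in kept_trip_ids]
    return kept_trips, kept_stop_times


calendar = [
    {"service_id": "S1", "monday": "1", "tuesday": "1", "wednesday": "1",
     "thursday": "1", "friday": "1", "saturday": "0", "sunday": "0"},
    {"service_id": "S2", "monday": "0", "tuesday": "0", "wednesday": "0",
     "thursday": "0", "friday": "1", "saturday": "0", "sunday": "0"},  # Fridays only
]
trips = [{"trip_id": "T1", "service_id": "S1"}, {"trip_id": "T2", "service_id": "S2"}]
stop_times = [{"trip_id": "T1", "stop_id": "A"}, {"trip_id": "T2", "stop_id": "B"}]

services = business_day_services(calendar)
kept_trips, kept_stop_times = filter_by_services(trips, stop_times, services)
print(services, [t["trip_id"] for t in kept_trips], kept_stop_times)
# {'S1'} ['T1'] [{'trip_id': 'T1', 'stop_id': 'A'}]
```

For the `trips` filter, where the right-hand side only holds `service_id`, PySpark's `join(..., how="left_semi")` would express the same intent without risking duplicated left rows; the notebook instead uses an inner join followed by `dropDuplicates()`.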

In [24]:
calendar_business_days_for_join = calendar_business_days.select(calendar_business_days.service_id)                                                                
calendar_business_days_for_join.show()

+----------+
|service_id|
+----------+
|  TA+b0nx9|
|  TA+b03bf|
|  TA+b0008|
|  TA+b0nxg|
|  TA+b0nxn|
|  TA+b0nxd|
|  TA+b0nxh|
|  TA+b0nxi|
|  TA+b0nxl|
|  TA+b0f63|
|  TA+b0f6a|
|  TA+b0ap6|
|  TA+b03c1|
|  TA+b0nke|
|  TA+b09su|
|  TA+b00bo|
|  TA+b0nxc|
|  TA+b0nxq|
|  TA+b0nuo|
|  TA+b0nxv|
+----------+
only showing top 20 rows

In [25]:
trips_business_week = trips.join(calendar_business_days_for_join, how="inner", on = "service_id").dropDuplicates()
trips_business_week.show()

+----------+------------+--------------------+--------------------+---------------+------------+
|service_id|    route_id|             trip_id|       trip_headsign|trip_short_name|direction_id|
+----------+------------+--------------------+--------------------+---------------+------------+
|  TA+b0001| 1-1-C-j19-1|46.TA.1-1-C-j19-1...|Aarburg-Oftringen...|            113|           0|
|  TA+b0001| 1-1-C-j19-1|59.TA.1-1-C-j19-1...|Aarburg-Oftringen...|            139|           0|
|  TA+b0001| 1-340-j19-1|2.TA.1-340-j19-1.1.H|  Wohlen AG, Bahnhof|            105|           0|
|  TA+b0001| 1-354-j19-1|36.TA.1-354-j19-1...|Kaiserstuhl AG, B...|          35435|           0|
|  TA+b0001| 1-354-j19-1|47.TA.1-354-j19-1...|Kaiserstuhl AG, B...|          35467|           0|
|  TA+b0001| 1-393-j19-1|70.TA.1-393-j19-1...|Othmarsingen, Bah...|          14060|           0|
|  TA+b0001| 1-508-j19-1|87.TA.1-508-j19-1...|Aarburg-Oftringen...|           8178|           1|
|  TA+b0001| 2-230-j19-1|28.TA

In [26]:
trips_business_week.count()

528368

In [27]:
trips_business_week_for_join = trips_business_week.drop('service_id')
trips_business_week_for_join.show()

+------------+--------------------+--------------------+---------------+------------+
|    route_id|             trip_id|       trip_headsign|trip_short_name|direction_id|
+------------+--------------------+--------------------+---------------+------------+
| 1-1-C-j19-1|46.TA.1-1-C-j19-1...|Aarburg-Oftringen...|            113|           0|
| 1-1-C-j19-1|59.TA.1-1-C-j19-1...|Aarburg-Oftringen...|            139|           0|
| 1-340-j19-1|2.TA.1-340-j19-1.1.H|  Wohlen AG, Bahnhof|            105|           0|
| 1-354-j19-1|36.TA.1-354-j19-1...|Kaiserstuhl AG, B...|          35435|           0|
| 1-354-j19-1|47.TA.1-354-j19-1...|Kaiserstuhl AG, B...|          35467|           0|
| 1-393-j19-1|70.TA.1-393-j19-1...|Othmarsingen, Bah...|          14060|           0|
| 1-508-j19-1|87.TA.1-508-j19-1...|Aarburg-Oftringen...|           8178|           1|
| 2-230-j19-1|28.TA.2-230-j19-1...|     Trogen, Bahnhof|          23023|           0|
| 3-193-j19-1|221.TA.3-193-j19-...|  Appenzell, Bahnho

In [28]:
stop_times_15km_business_week = stop_times_15km.join(trips_business_week_for_join, how="inner", on = "trip_id").dropDuplicates()
stop_times_15km_business_week.show()

+--------------------+-----------+------------+--------------+-------------+-----------+-------------+---------------+--------------------+----------------+----------------+------------+--------------------+---------------+------------+
|             trip_id|    stop_id|arrival_time|departure_time|stop_sequence|pickup_type|drop_off_type|stop_id_general|           stop_name|        stop_lat|        stop_lon|    route_id|       trip_headsign|trip_short_name|direction_id|
+--------------------+-----------+------------+--------------+-------------+-----------+-------------+---------------+--------------------+----------------+----------------+------------+--------------------+---------------+------------+
|1005.TA.26-131-j1...|    8589111|    16:15:00|      16:15:00|            2|          0|            0|        8589111|Horgen, Gumelenst...| 47.260856991692|8.59230484542371|26-131-j19-1|      Horgen, Aamüli|            636|           0|
|1005.TA.26-131-j1...|    8588984|    16:21:00|     

In [29]:
stop_times_15km_business_week.write.csv('data/lgpt_guys/stop_times_15km_business_week.csv', header=True, mode='overwrite')

# 4) Keeping only stop times departing between 7am and 8pm
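The hour filter in the cells below keeps a stop time when the integer hour of `departure_time` lies in `[7, 19]`, i.e. departures from 07:00:00 up to 19:59:59. A plain-Python restatement with hypothetical function names. GTFS times may exceed 24:00:00 for trips running past midnight of the service day, and such values are simply excluded by this rule; splitting on `:` also copes with the single-digit `H:MM:SS` form that GTFS accepts.

```python
def departure_hour(gtfs_time):
    """Hour part of a GTFS 'HH:MM:SS' time (hours may be >= 24 in GTFS)."""
    return int(gtfs_time.split(":")[0])


def in_service_window(gtfs_time, earliest=7, latest=19):
    """True for departures from 07:00:00 up to and including 19:59:59."""
    return earliest <= departure_hour(gtfs_time) <= latest


# prints False, True, True, False, False for these five times
for t in ["06:59:00", "07:00:00", "19:59:00", "20:00:00", "25:10:00"]:
    print(t, in_service_window(t))
```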

In [30]:
stop_times_15km_business_week = spark.read.csv('data/lgpt_guys/stop_times_15km_business_week.csv', header=True)
stop_times_15km_business_week.show()

+--------------------+-------+------------+--------------+-------------+-----------+-------------+---------------+--------------------+----------------+----------------+-------------+--------------------+---------------+------------+
|             trip_id|stop_id|arrival_time|departure_time|stop_sequence|pickup_type|drop_off_type|stop_id_general|           stop_name|        stop_lat|        stop_lon|     route_id|       trip_headsign|trip_short_name|direction_id|
+--------------------+-------+------------+--------------+-------------+-----------+-------------+---------------+--------------------+----------------+----------------+-------------+--------------------+---------------+------------+
|1.TA.26-925-j19-1...|8576080|    15:27:00|      15:27:00|           23|          0|            0|        8576080|     Meilen, Bahnhof|47.2694401970586|8.64488323901054| 26-925-j19-1|     Meilen, Bahnhof|            280|           0|
|1.TA.26-925-j19-1...|8576082|    15:22:00|      15:22:00|      

In [31]:
stop_times_15km_business_week.count()

398630

In [32]:
# hour part of "HH:MM:SS" (Column.substr is 1-based)
stop_times_15km_business_week = stop_times_15km_business_week.withColumn("departure_hour", stop_times_15km_business_week.departure_time.substr(1, 2).cast('int'))
stop_times_15km_business_week.show()

+--------------------+-------+------------+--------------+-------------+-----------+-------------+---------------+--------------------+----------------+----------------+-------------+--------------------+---------------+------------+--------------+
|             trip_id|stop_id|arrival_time|departure_time|stop_sequence|pickup_type|drop_off_type|stop_id_general|           stop_name|        stop_lat|        stop_lon|     route_id|       trip_headsign|trip_short_name|direction_id|departure_hour|
+--------------------+-------+------------+--------------+-------------+-----------+-------------+---------------+--------------------+----------------+----------------+-------------+--------------------+---------------+------------+--------------+
|1.TA.26-925-j19-1...|8576080|    15:27:00|      15:27:00|           23|          0|            0|        8576080|     Meilen, Bahnhof|47.2694401970586|8.64488323901054| 26-925-j19-1|     Meilen, Bahnhof|            280|           0|            15|
|1.T

In [33]:
# keep departures from 07:00:00 up to and including 19:59:59
departure_earliest = 7
departure_latest = 19
stop_times_15km_business_week_standard_hours = stop_times_15km_business_week.filter((stop_times_15km_business_week.departure_hour>=departure_earliest) & \
                                                                                   (stop_times_15km_business_week.departure_hour<= departure_latest))
stop_times_15km_business_week_standard_hours.show()

+--------------------+-------+------------+--------------+-------------+-----------+-------------+---------------+--------------------+----------------+----------------+-------------+--------------------+---------------+------------+--------------+
|             trip_id|stop_id|arrival_time|departure_time|stop_sequence|pickup_type|drop_off_type|stop_id_general|           stop_name|        stop_lat|        stop_lon|     route_id|       trip_headsign|trip_short_name|direction_id|departure_hour|
+--------------------+-------+------------+--------------+-------------+-----------+-------------+---------------+--------------------+----------------+----------------+-------------+--------------------+---------------+------------+--------------+
|1.TA.26-925-j19-1...|8576080|    15:27:00|      15:27:00|           23|          0|            0|        8576080|     Meilen, Bahnhof|47.2694401970586|8.64488323901054| 26-925-j19-1|     Meilen, Bahnhof|            280|           0|            15|
|1.T

In [34]:
# intermediate saving point
stop_times_15km_business_week_standard_hours.write.csv('data/lgpt_guys/stop_times_15km_business_week_standard_hours.csv', header = True, mode="overwrite")

# 5) Order stop_times so as to reconstruct routes for RAPTOR

## Building a list of unique trips according to 1) the stop sequence and 2) the departure time sequence

In [35]:
# we start fresh from here, where stop_times is in fact stop_times_15km_business_week_standard_hours loaded from the server
stop_times = spark.read.csv('data/lgpt_guys/stop_times_15km_business_week_standard_hours.csv', header = True)
stop_times.count()

304085

In [36]:
stop_times.show()

+--------------------+-------+------------+--------------+-------------+-----------+-------------+---------------+--------------------+----------------+----------------+-------------+--------------------+---------------+------------+--------------+
|             trip_id|stop_id|arrival_time|departure_time|stop_sequence|pickup_type|drop_off_type|stop_id_general|           stop_name|        stop_lat|        stop_lon|     route_id|       trip_headsign|trip_short_name|direction_id|departure_hour|
+--------------------+-------+------------+--------------+-------------+-----------+-------------+---------------+--------------------+----------------+----------------+-------------+--------------------+---------------+------------+--------------+
|1.TA.26-925-j19-1...|8576080|    15:27:00|      15:27:00|           23|          0|            0|        8576080|     Meilen, Bahnhof|47.2694401970586|8.64488323901054| 26-925-j19-1|     Meilen, Bahnhof|            280|           0|            15|
|1.T

In [37]:
stop_times = stop_times.sort(stop_times.trip_id, stop_times.stop_sequence.cast('int'))
stop_times.show()

+--------------------+-------+------------+--------------+-------------+-----------+-------------+---------------+--------------------+----------------+----------------+-----------+--------------------+---------------+------------+--------------+
|             trip_id|stop_id|arrival_time|departure_time|stop_sequence|pickup_type|drop_off_type|stop_id_general|           stop_name|        stop_lat|        stop_lon|   route_id|       trip_headsign|trip_short_name|direction_id|departure_hour|
+--------------------+-------+------------+--------------+-------------+-----------+-------------+---------------+--------------------+----------------+----------------+-----------+--------------------+---------------+------------+--------------+
|1.TA.1-231-j19-1.1.H|8572747|    09:37:00|      09:37:00|            1|          0|            0|        8572747|Bremgarten AG, Ba...|47.3516902622456|8.34617544069354|1-231-j19-1|         Jonen, Post|          23127|           0|             9|
|1.TA.1-231-

In [38]:
from pyspark.sql.window import Window
w= (
    Window.partitionBy("trip_id")
    .orderBy(stop_times.stop_sequence.cast('int'))
)

This step is a bit technical. We aim to identify trips that are identical even though they bear different `trip_id`s. We kept services running on every day of a standard business week, but we do not distinguish between days of the week, only departure and arrival **times of day**. As a result, the same journey can appear several times under different `trip_id`s, and we must identify and merge trips that serve the same stops at the same arrival and departure times.

To do so, we use window functions on each trip to build its sequence of stops and its list of departure times. Trips with identical stop sequences and identical departure times are considered identical, whatever their arrival times (we check below how much this choice matters).
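The grouping logic can be sketched in plain Python on a handful of made-up rows: collect each trip's stops and departures in `stop_sequence` order, then keep one `trip_id` per distinct (stops, departures) pair. This is an illustrative single-machine sketch, not the Spark code used in this notebook; the trip ids, stop ids and times are invented, and `build_trip_keys` and `deduplicate` are hypothetical helpers.

```python
from collections import defaultdict

# Toy rows: (trip_id, stop_sequence, stop_id, departure_time, arrival_time); values are made up.
stop_times = [
    ("A", 2, "8503001", "08:05:00", "08:04:00"),
    ("A", 1, "8503000", "08:00:00", "08:00:00"),
    ("B", 1, "8503000", "08:00:00", "08:00:00"),
    ("B", 2, "8503001", "08:05:00", "08:03:00"),  # same departures as A, different arrival
    ("C", 1, "8503000", "08:30:00", "08:30:00"),
    ("C", 2, "8503001", "08:35:00", "08:34:00"),
]

def build_trip_keys(rows):
    """Return {trip_id: (stops, departures)}, both ordered by stop_sequence."""
    by_trip = defaultdict(list)
    for trip_id, seq, stop_id, dep, _arr in rows:
        by_trip[trip_id].append((seq, stop_id, dep))
    keys = {}
    for trip_id, items in by_trip.items():
        items.sort()  # stop_sequence is an int here, so 10 sorts after 2
        keys[trip_id] = (tuple(s for _, s, _ in items), tuple(d for _, _, d in items))
    return keys

def deduplicate(keys):
    """Keep the first trip_id (alphabetically) for each distinct (stops, departures) key."""
    kept = {}
    for trip_id in sorted(keys):
        kept.setdefault(keys[trip_id], trip_id)
    return sorted(kept.values())

keys = build_trip_keys(stop_times)
print(deduplicate(keys))  # ['A', 'C']
```

Trip `B` is merged into `A` although one of its arrival times differs, which mirrors the decision to define identical trips by stops and departures only.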

In [39]:
# code from https://stackoverflow.com/questions/56763946/concat-multiple-string-rows-for-each-unique-id-by-a-particular-order
from pyspark.sql import functions as F
stop_times.withColumn("all_stops",F.collect_list("stop_id_general").over(w))\
.withColumn("all_departures",F.collect_list("departure_time").over(w))\
.select(F.col('trip_id'), F.col('stop_id_general'), F.col('departure_time'), 
        F.col('stop_sequence'), F.col('departure_hour'), F.col('all_stops'))\
.show(100, 0)

+-------------------------+---------------+--------------+-------------+--------------+------------------------------------------------------------------------------------------------------------------------------+
|trip_id                  |stop_id_general|departure_time|stop_sequence|departure_hour|all_stops                                                                                                                     |
+-------------------------+---------------+--------------+-------------+--------------+------------------------------------------------------------------------------------------------------------------------------+
|1005.TA.26-131-j19-1.9.H |8503855        |16:14:00      |1            |16            |[8503855]                                                                                                                     |
|1005.TA.26-131-j19-1.9.H |8589111        |16:15:00      |2            |16            |[8503855, 8589111]                                   

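We successfully built incremental lists of departure times and stop sequences: on each row, the lists hold the stops (and departures) of the trip up to that row. We now need to keep the longest list for each `trip_id`. The `groupBy` below does exactly that: every shorter list is a prefix of the full one, and arrays are compared element by element with a prefix ranking below its extension, so `F.max` returns the full list.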
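The prefix argument can be checked in plain Python, whose list comparison is also lexicographic. The stop ids below are toy values (the third one is invented); the sketch only illustrates why taking the maximum of the growing lists recovers the complete sequence.

```python
# A window collect_list with an ORDER BY yields one growing prefix per row of the trip.
prefixes = [
    ["8503855"],
    ["8503855", "8589111"],
    ["8503855", "8589111", "8503000"],
]

# Lists are compared element by element; when one list is a proper prefix of
# another, the shorter one is smaller, so max() returns the full (longest) list.
print(max(prefixes))  # ['8503855', '8589111', '8503000']
```

This only holds because each list is a prefix of the next; for unrelated lists, `max` would return the lexicographically largest one, not the longest.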

In [40]:
trips_with_duplicates = (
    stop_times.withColumn("all_stops", F.collect_list("stop_id_general").over(w))
    .withColumn("all_departures", F.collect_list("departure_time").over(w))
    .withColumn("all_arrivals", F.collect_list("arrival_time").over(w))
    .groupBy("trip_id")
    # each list is a prefix of the complete one, so max() keeps the complete sequence
    .agg(F.max("all_stops").alias("all_stops"),
         F.max("all_departures").alias("all_departures"),
         F.max("all_arrivals").alias("all_arrivals"))
)

trips_with_duplicates.show(10, 0)

+-------------------------+------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------+
|trip_id                  |all_stops                                                                                                                     |all_departures                                                                                                                              |all_arrivals                                                                                                                                |
+-------------------------+---------------------------------------------------------------------------------------------------

In [41]:
trips_with_duplicates.count()

25127

Are there many trips with a single stop?

In [42]:
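from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
# Python UDF returning the number of stops of a trip (F.size would be a built-in alternative)
slen = udf(lambda s: len(s), IntegerType())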

trips_with_duplicates.withColumn("stop_count", slen(trips_with_duplicates.all_stops)).filter(F.col('stop_count')==1).show()

+--------------------+---------+--------------+------------+----------+
|             trip_id|all_stops|all_departures|all_arrivals|stop_count|
+--------------------+---------+--------------+------------+----------+
|366.TA.11-3-j19-1...|[8503000]|    [15:23:00]|  [15:23:00]|         1|
|457.TA.26-24-j19-...|[8502208]|    [16:45:00]|  [16:45:00]|         1|
|99.TA.1-321-j19-1...|[8502750]|    [16:07:00]|  [16:07:00]|         1|
|31.TA.80-158-Y-j1...|[8503000]|    [14:38:00]|  [14:38:00]|         1|
|423.TA.1-36-j19-1...|[8503000]|    [09:36:00]|  [09:36:00]|         1|
|808.TA.26-24-j19-...|[8502208]|    [17:15:00]|  [17:15:00]|         1|
|1.TA.20-E03-j19-1...|[8596126]|    [19:15:00]|  [19:10:00]|         1|
|103.TA.1-321-j19-...|[8502750]|    [18:07:00]|  [18:07:00]|         1|
|123.TA.1-321-j19-...|[8502750]|    [12:07:00]|  [12:07:00]|         1|
|141.TA.20-2-j19-1...|[8503000]|    [08:10:00]|  [08:10:00]|         1|
|17.TA.17-4-j19-1.5.H|[8503000]|    [12:37:00]|  [12:37:00]|    

In [43]:
trips_with_duplicates.withColumn("stop_count", slen(trips_with_duplicates.all_stops)).filter(F.col('stop_count')==1).count()

714

How many distinct trips remain if we merge trips serving the same stops at exactly the same departure **and** arrival times?

In [44]:
trips_with_duplicates.dropDuplicates(['all_stops', 'all_departures', 'all_arrivals']).show()

+--------------------+--------------------+--------------------+--------------------+
|             trip_id|           all_stops|      all_departures|        all_arrivals|
+--------------------+--------------------+--------------------+--------------------+
|382.TA.11-3-j19-1...|           [8503000]|          [15:53:00]|          [15:53:00]|
|87.TA.6-8-j19-1.64.R|  [8503016, 8503000]|[08:46:00, 09:02:00]|[08:44:00, 08:55:00]|
|32.TA.79-10-B-j19...|[8503054, 8503053...|[17:57:00, 17:59:...|[17:57:00, 17:59:...|
|448.TA.26-LAF-j19...|  [8503082, 8503081]|[19:20:00, 19:25:00]|[19:20:00, 19:25:00]|
|294.TA.26-5-A-j19...|[8503125, 8503003...|[17:21:00, 17:32:...|[17:20:00, 17:32:...|
|996.TA.26-12-j19-...|[8503147, 8503003...|[14:34:00, 14:39:...|[14:33:00, 14:38:...|
|10.TA.30-170-Y-j1...|[8503202, 8502209...|[10:30:00, 10:35:...|[10:30:00, 10:35:...|
|43.TA.26-2-j19-1....|[8503204, 8503202...|[09:24:00, 09:29:...|[09:23:00, 09:28:...|
|580.TA.26-8-A-j19...|[8503204, 8503203...|[18:00:00, 

In [45]:
trips_with_duplicates.dropDuplicates(['all_stops', 'all_departures', 'all_arrivals']).count()

20086

In [46]:
trips_with_duplicates.dropDuplicates(['all_stops', 'all_departures']).count()

20041

Merging on departures only removes 45 more trips (20,086 - 20,041), i.e. less than 0.2% of the 25,127 trips: these share the exact same departure times at all stops but differ in at least one arrival time. To be on the safe side, we define identical trips based on the sequence of stops and the sequence of departure times.

All in all, we remove about 5,000 duplicated trips (25,127 - 20,041 = 5,086).

In [47]:
trips_unique = trips_with_duplicates.dropDuplicates(['all_stops', 'all_departures'])

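Lastly, we remove trips that only serve a single stop. These are most likely trips whose other stops were pruned, either because they lie outside the 15 km radius around Zürich HB or because their departures fall outside the 7:00-19:59 window.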

In [48]:
# removing trips with a single stop only (count the stops of trips_unique itself):
trips_unique = trips_unique.withColumn("stop_count", slen(trips_unique.all_stops)).filter(F.col('stop_count') > 1)

In [49]:
trips_unique.show()

+--------------------+--------------------+--------------------+--------------------+----------+
|             trip_id|           all_stops|      all_departures|        all_arrivals|stop_count|
+--------------------+--------------------+--------------------+--------------------+----------+
|168.TA.1-17-A-j19...|[8502273, 8517377...|[17:51:00, 17:52:...|[17:51:00, 17:52:...|         7|
|15.TA.80-53-Y-j19...|  [8503000, 8503202]|[09:12:00, 09:21:00]|[09:12:00, 09:21:00]|         2|
|80.TA.16-5-j19-1....|[8503016, 8503006...|[19:09:00, 19:15:...|[19:07:00, 19:13:...|         3|
|136.TA.26-10-B-j1...|[8503057, 8503056...|[12:54:00, 12:59:...|[12:54:00, 12:59:...|         8|
|73.TA.26-4-B-j19-...|[8503088, 8503090...|[11:38:00, 11:39:...|[11:38:00, 11:39:...|        12|
|55.TA.26-7-A-j19-...|[8503104, 8503003...|[14:03:00, 14:15:...|[14:02:00, 14:14:...|        11|
|551.TA.26-11-j19-...|[8503147, 8503003...|[18:20:00, 18:25:...|[18:20:00, 18:25:...|         8|
|216.TA.26-24-j19-...|[8503204

In [50]:
trips_unique.count()

19614

# 6) Building routes

- 6) build routes from unique trips
    - order `trips_unique` by stop sequence, then by earliest departure time
    - each group of trips sharing the same stop sequence gets a unique route ID

- 7) generate a RAPTOR-compatible `stop_times`
    - filter `stop_times` to keep only the trips in `trips_unique`
    - sort by route ID, then by earliest departure time
    
We start by getting the first departure time of each unique trip, so that trips can be ordered by route and by first departure time, as required by RAPTOR's `stopTimes` data structure.
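Ordering matters because, when RAPTOR scans a route, it looks for the earliest trip of that route that can still be boarded at a given stop. If the trips of a route are stored in departure order, this lookup is a simple binary search. The sketch below illustrates it on made-up times; `to_seconds` and `earliest_trip` are hypothetical helpers, and the example assumes that trips of a route never overtake each other, so that sorting by departure at the first stop also sorts departures at every later stop.

```python
from bisect import bisect_left

def to_seconds(hhmmss):
    """Convert a GTFS-style 'HH:MM:SS' string to seconds after midnight."""
    h, m, s = (int(x) for x in hhmmss.split(":"))
    return 3600 * h + 60 * m + s

def earliest_trip(departures, t):
    """Index of the first trip departing at or after time t (in seconds), or None."""
    i = bisect_left(departures, t)  # binary search: departures must be sorted
    return i if i < len(departures) else None

# Made-up departures of four trips of one route at one of its stops,
# already in ascending order because trips are sorted by first departure.
departures_at_stop = [to_seconds(t) for t in ["07:18:00", "07:48:00", "08:18:00", "08:48:00"]]

print(earliest_trip(departures_at_stop, to_seconds("07:50:00")))  # 2
```

Zero-padded `HH:MM:SS` strings sort in the same order as their values in seconds, which is why sorting on the string column `departure_first_stop` already gives the right order.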

In [51]:
# code from https://stackoverflow.com/questions/52975567/get-first-n-elements-from-dataframe-arraytype-column-in-pyspark

trips_unique = trips_unique.withColumn('departure_first_stop', F.col("all_departures")[0])
trips_unique.show()

+--------------------+--------------------+--------------------+--------------------+----------+--------------------+
|             trip_id|           all_stops|      all_departures|        all_arrivals|stop_count|departure_first_stop|
+--------------------+--------------------+--------------------+--------------------+----------+--------------------+
|168.TA.1-17-A-j19...|[8502273, 8517377...|[17:51:00, 17:52:...|[17:51:00, 17:52:...|         7|            17:51:00|
|15.TA.80-53-Y-j19...|  [8503000, 8503202]|[09:12:00, 09:21:00]|[09:12:00, 09:21:00]|         2|            09:12:00|
|80.TA.16-5-j19-1....|[8503016, 8503006...|[19:09:00, 19:15:...|[19:07:00, 19:13:...|         3|            19:09:00|
|136.TA.26-10-B-j1...|[8503057, 8503056...|[12:54:00, 12:59:...|[12:54:00, 12:59:...|         8|            12:54:00|
|73.TA.26-4-B-j19-...|[8503088, 8503090...|[11:38:00, 11:39:...|[11:38:00, 11:39:...|        12|            11:38:00|
|55.TA.26-7-A-j19-...|[8503104, 8503003...|[14:03:00, 14

In [52]:
# ordering by stop sequence (arrays give an arbitrary but consistent order), then by departure at the first stop (ascending)
trips_unique = trips_unique.sort(trips_unique.all_stops, trips_unique.departure_first_stop)
trips_unique.show()


+--------------------+--------------------+--------------------+--------------------+----------+--------------------+
|             trip_id|           all_stops|      all_departures|        all_arrivals|stop_count|departure_first_stop|
+--------------------+--------------------+--------------------+--------------------+----------+--------------------+
|203.TA.1-17-A-j19...|[8502187, 8502277...|[07:01:00, 07:02:...|[07:01:00, 07:02:...|         7|            07:01:00|
|4.TA.30-57-Y-j19-...|[8502208, 8502209...|[07:18:00, 07:23:...|[07:18:00, 07:23:...|         3|            07:18:00|
|5.TA.30-57-Y-j19-...|[8502208, 8502209...|[07:48:00, 07:53:...|[07:48:00, 07:53:...|         3|            07:48:00|
|6.TA.30-57-Y-j19-...|[8502208, 8502209...|[08:18:00, 08:23:...|[08:18:00, 08:23:...|         3|            08:18:00|
|7.TA.30-57-Y-j19-...|[8502208, 8502209...|[08:48:00, 08:53:...|[08:48:00, 08:53:...|         3|            08:48:00|
|8.TA.30-57-Y-j19-...|[8502208, 8502209...|[09:18:00, 09

In RAPTOR, a route is a collection of trips serving the same sequence of stops at different times. Therefore, there is one route per sequence of stops, i.e., per unique entry in column `all_stops`. There is no specific rule for ordering routes by the stops they serve, so we simply select the distinct stop sequences and index them from 0 to n-1, where n is the number of routes.
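A plain-Python sketch of the same idea, on made-up trips: group trips by their stop sequence, number the resulting routes from 0 to n-1, and sort each route's trips by departure at the first stop. The `overtakes` check is an extra added for illustration only and is not performed in this notebook: ordering by first departure gives a consistent order at every stop only if no trip of a route overtakes another, which the earliest-trip lookup in RAPTOR's route scan implicitly assumes. All names here (`build_routes`, `overtakes`, the trip and stop ids) are hypothetical.

```python
from collections import defaultdict

# Made-up unique trips: trip_id -> (stop sequence, departure time at each stop)
trips = {
    "t1": (("S1", "S2", "S3"), ("07:18:00", "07:23:00", "07:30:00")),
    "t2": (("S1", "S2", "S3"), ("07:48:00", "07:53:00", "08:00:00")),
    "t3": (("S2", "S1"), ("07:10:00", "07:15:00")),
}

def build_routes(trips):
    """Group trips sharing a stop sequence; return (route_stops, route_trips).

    route_stops[r] is the stop sequence of route r (routes numbered 0..n-1 in an
    arbitrary, here sorted, order); route_trips[r] lists its trip_ids sorted by
    departure at the first stop.
    """
    groups = defaultdict(list)
    for trip_id, (stops, deps) in trips.items():
        groups[stops].append((deps[0], trip_id))
    route_stops = sorted(groups)
    route_trips = [[tid for _, tid in sorted(groups[s])] for s in route_stops]
    return route_stops, route_trips

def overtakes(trips, route_trip_ids):
    """True if a later-starting trip departs strictly earlier than its predecessor at some stop."""
    for a, b in zip(route_trip_ids, route_trip_ids[1:]):
        if any(db < da for da, db in zip(trips[a][1], trips[b][1])):
            return True
    return False

route_stops, route_trips = build_routes(trips)
print(route_trips)  # [['t1', 't2'], ['t3']]
print([overtakes(trips, ids) for ids in route_trips])  # [False, False]
```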

In [53]:
routes = trips_unique.select(trips_unique.all_stops).distinct()
routes.show()

+--------------------+
|           all_stops|
+--------------------+
|[8591049, 8591128...|
|[8591057, 8591402...|
|  [8591281, 8591046]|
|[8591825, 8590504...|
|  [8573205, 8588553]|
|[8576240, 8591353...|
|[8591061, 8591270...|
|[8575921, 8575920...|
|[8591035, 8591134...|
|[8595129, 8590543...|
|[8576127, 8576139...|
|[8503010, 8503011...|
|  [8575927, 8594339]|
|[8591031, 8588553...|
|[8503674, 8503659...|
|[8576171, 8576172...|
|[8576276, 8576277...|
|[8590805, 8590794...|
|[8591110, 8591306...|
|[8502208, 8502209...|
+--------------------+
only showing top 20 rows

In [54]:
# building an index from 0 to n_routes - 1
# code from https://stackoverflow.com/questions/39057766/spark-equivelant-of-zipwithindex-in-dataframe
from pyspark.sql.types import StructType, StructField, LongType
def dfZipWithIndex (df, offset=0, colName="rowId"):
    '''
        Enumerates dataframe rows in native order, like rdd.zipWithIndex(), but on a dataframe 
        and preserves a schema

        :param df: source dataframe
        :param offset: adjustment to zipWithIndex()'s index
        :param colName: name of the index column
    '''

    new_schema = StructType(
                    [StructField(colName,LongType(),True)]        # new added field in front
                    + df.schema.fields                            # previous schema
                )

    zipped_rdd = df.rdd.zipWithIndex()

    new_rdd = zipped_rdd.map(lambda args: ([args[1] + offset] + list(args[0])))

    return spark.createDataFrame(new_rdd, new_schema)

In [55]:
routes_indexed = dfZipWithIndex(routes, 0, 'route_int')
routes_indexed.show()

+---------+--------------------+
|route_int|           all_stops|
+---------+--------------------+
|        0|[8576240, 8591353...|
|        1|[8591049, 8591128...|
|        2|[8591057, 8591402...|
|        3|  [8591281, 8591046]|
|        4|[8591825, 8590504...|
|        5|  [8573205, 8588553]|
|        6|[8591061, 8591270...|
|        7|[8575921, 8575920...|
|        8|[8595129, 8590543...|
|        9|[8591035, 8591134...|
|       10|[8503010, 8503011...|
|       11|[8591031, 8588553...|
|       12|  [8575927, 8594339]|
|       13|[8576127, 8576139...|
|       14|[8502208, 8502209...|
|       15|[8503674, 8503659...|
|       16|[8576171, 8576172...|
|       17|[8590805, 8590794...|
|       18|[8591110, 8591306...|
|       19|[8576276, 8576277...|
+---------+--------------------+
only showing top 20 rows

In [56]:
trips_unique = trips_unique.join(routes_indexed, how='inner', on='all_stops').dropDuplicates()
trips_unique.show()

+--------------------+--------------------+--------------------+--------------------+----------+--------------------+---------+
|           all_stops|             trip_id|      all_departures|        all_arrivals|stop_count|departure_first_stop|route_int|
+--------------------+--------------------+--------------------+--------------------+----------+--------------------+---------+
|[8591825, 8590504...|117.TA.26-703-j19...|[18:04:00, 18:05:...|[18:04:00, 18:05:...|         9|            18:04:00|        4|
|[8591355, 8591354...|1232.TA.26-75-A-j...|[17:06:00, 17:07:...|[17:06:00, 17:07:...|        21|            17:06:00|       78|
|[8591355, 8591354...|1135.TA.26-75-A-j...|[12:59:00, 13:00:...|[12:59:00, 13:00:...|        21|            12:59:00|       78|
|[8591401, 8503610...|1269.TA.26-80-j19...|[19:22:00, 19:23:...|[19:22:00, 19:23:...|        29|            19:22:00|      108|
|[8580449, 8591063...|1892.TA.26-781-j1...|[15:17:00, 15:18:...|[15:17:00, 15:18:...|        12|        

In [57]:
# converting arrays to strings to be able to store the data as csv
trips_unique_string_lists = trips_unique.withColumn("all_stops", F.concat_ws(" ", "all_stops"))\
.withColumn("all_departures", F.concat_ws(" ", "all_departures"))\
.withColumn("all_arrivals", F.concat_ws(" ", "all_arrivals"))
trips_unique_string_lists.show()

+--------------------+--------------------+--------------------+--------------------+----------+--------------------+---------+
|           all_stops|             trip_id|      all_departures|        all_arrivals|stop_count|departure_first_stop|route_int|
+--------------------+--------------------+--------------------+--------------------+----------+--------------------+---------+
|8591825 8590504 8...|117.TA.26-703-j19...|18:04:00 18:05:00...|18:04:00 18:05:00...|         9|            18:04:00|        4|
|8591355 8591354 8...|1232.TA.26-75-A-j...|17:06:00 17:07:00...|17:06:00 17:07:00...|        21|            17:06:00|       78|
|8591355 8591354 8...|1135.TA.26-75-A-j...|12:59:00 13:00:00...|12:59:00 13:00:00...|        21|            12:59:00|       78|
|8591276 8591101 8...|570.TA.26-69-j19-...|15:21:00 15:23:00...|15:21:00 15:23:00...|         9|            15:21:00|      104|
|8573504 8581548 8...|247.TA.26-813-j19...|19:15:00 19:15:00...|19:15:00 19:15:00...|         8|        

In [58]:
trips_unique_string_lists.write.csv('data/lgpt_guys/trips_unique_string_lists.csv', header = True, mode="overwrite")

In [59]:
# we prepare an inner join of stop_times with the trips kept in trips_unique.

trips_unique_string_lists = spark.read.csv('data/lgpt_guys/trips_unique_string_lists.csv', header = True)
trips_unique_string_lists.show()

+--------------------+--------------------+--------------------+--------------------+----------+--------------------+---------+
|           all_stops|             trip_id|      all_departures|        all_arrivals|stop_count|departure_first_stop|route_int|
+--------------------+--------------------+--------------------+--------------------+----------+--------------------+---------+
|8590464 8590463 8...|603.TA.26-185-j19...|17:51:00 17:53:00...|17:51:00 17:53:00...|        14|            17:51:00|       21|
|8503305 8503306 8...|641.TA.26-8-A-j19...|09:19:00 09:23:00...|09:18:00 09:23:00...|        13|            09:19:00|       43|
|8503057 8503056 8...|139.TA.26-10-B-j1...|13:54:00 13:59:00...|13:54:00 13:59:00...|         8|            13:54:00|       83|
|8591401 8503610 8...|1310.TA.26-80-j19...|17:51:00 17:52:00...|17:51:00 17:52:00...|        29|            17:51:00|      108|
|8591401 8503610 8...|1341.TA.26-80-j19...|16:51:00 16:52:00...|16:51:00 16:52:00...|        29|        

In [60]:
trips_unique_for_join = trips_unique_string_lists.select(trips_unique_string_lists.trip_id, \
                                                         trips_unique_string_lists.departure_first_stop, \
                                                        trips_unique_string_lists.route_int, \
                                                        trips_unique_string_lists.stop_count)
trips_unique_for_join.show()

+--------------------+--------------------+---------+----------+
|             trip_id|departure_first_stop|route_int|stop_count|
+--------------------+--------------------+---------+----------+
|603.TA.26-185-j19...|            17:51:00|       21|        14|
|641.TA.26-8-A-j19...|            09:19:00|       43|        13|
|139.TA.26-10-B-j1...|            13:54:00|       83|         8|
|1310.TA.26-80-j19...|            17:51:00|      108|        29|
|1341.TA.26-80-j19...|            16:51:00|      108|        29|
|19.TA.26-4-B-j19-...|            11:10:00|      129|        12|
|280.TA.26-303-j19...|            07:51:00|      144|        23|
|126.TA.26-751-j19...|            15:59:00|      146|        10|
|421.TA.26-61-j19-...|            08:42:00|      148|        19|
|1514.TA.26-17-j19...|            14:11:00|      221|        17|
|1546.TA.26-17-j19...|            12:33:00|      221|        17|
|195.TA.26-733-j19...|            14:45:00|      238|        10|
|247.TA.26-733-j19...|   

In [61]:
stop_times = stop_times.join(trips_unique_for_join, how='inner', on='trip_id')
stop_times.count()

260459

Note that this CSV does not carry an index that would allow sorting it quickly after loading.

In [62]:
stop_times.write.csv('data/lgpt_guys/stop_times_with_route_int.csv', header=True, mode='overwrite')

# 7) Generating an index from 0 to n_stops-1 for stops

In RAPTOR, stops are indexed (in an arbitrary order) from 0 to the number of stops minus one. We generate this index below.
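Integer indices let RAPTOR replace dictionaries with plain arrays. As an illustration only (the notebook itself assigns indices with `dfZipWithIndex`, in whatever order Spark returns rows), the sketch below numbers stops in order of first appearance and derives, for each stop, the routes serving it, which RAPTOR needs when deciding which routes to scan from the stops improved in the previous round. Stop ids and route contents are made up; `index_stops` and `routes_serving_stops` are hypothetical helpers.

```python
# Made-up stop sequences, keyed by route_int
route_stops = {0: ["8503000", "8503001"], 1: ["8503001", "8503006", "8503000"]}

def index_stops(route_stops):
    """Map every stop id to an integer in 0..n_stops-1 (order of first appearance)."""
    stop_int = {}
    for r in sorted(route_stops):
        for stop in route_stops[r]:
            stop_int.setdefault(stop, len(stop_int))
    return stop_int

def routes_serving_stops(route_stops, stop_int):
    """For each stop_int, the sorted list of route_ints serving that stop."""
    serving = [set() for _ in stop_int]
    for r, stops in route_stops.items():
        for stop in stops:
            serving[stop_int[stop]].add(r)
    return [sorted(s) for s in serving]

stop_int = index_stops(route_stops)
print(stop_int)  # {'8503000': 0, '8503001': 1, '8503006': 2}
print(routes_serving_stops(route_stops, stop_int))  # [[0, 1], [0, 1], [1]]
```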

In [63]:
stop_times = spark.read.csv('data/lgpt_guys/stop_times_with_route_int.csv', header=True)

How many unique routes do we find?

In [64]:
stop_times.select(stop_times.route_int).dropDuplicates().count()

1461

In [65]:
stops_general_indexed = dfZipWithIndex(stop_times.select(stop_times.stop_id_general).dropDuplicates(),
                                      0,
                                      'stop_int')
stops_general_indexed.show()

+--------+---------------+
|stop_int|stop_id_general|
+--------+---------------+
|       0|        8503376|
|       1|        8502508|
|       2|        8503088|
|       3|        8589111|
|       4|        8591284|
|       5|        8591190|
|       6|        8503078|
|       7|        8587967|
|       8|        8590819|
|       9|        8591362|
|      10|        8591149|
|      11|        8591315|
|      12|        8588312|
|      13|        8590541|
|      14|        8590804|
|      15|        8591085|
|      16|        8590273|
|      17|        8591271|
|      18|        8591165|
|      19|        8591080|
+--------+---------------+
only showing top 20 rows

In [66]:
stops_general_indexed.write.csv('data/lgpt_guys/stops_general_indexed.csv', header=True, mode='overwrite')

Next, we add this index to `stop_times` and drop columns we won't be using anymore:
- `pickup_type`
- `drop_off_type`
- `departure_hour`

Note that Spark does not preserve row order across joins, so we will need to reorder `stop_times` once all the processing is done.
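When reordering, keep in mind that every column read back from these CSV files is a string, so `route_int` and `stop_sequence` must be compared as integers ("10" sorts before "2" as a string). The plain-Python sketch below shows one possible sort key, with `trip_id` added as a tie-breaker in case two trips of a route share the same first departure; the rows are made up and `raptor_order` is a hypothetical helper. In Spark, the analogous step would sort on the same columns, casting `route_int` and `stop_sequence` with `.cast('int')`.

```python
# Rows as read back from CSV: every value is a string. Trip ids and times are made up.
rows = [
    {"route_int": "14", "departure_first_stop": "07:48:00", "trip_id": "t2", "stop_sequence": "10"},
    {"route_int": "14", "departure_first_stop": "07:18:00", "trip_id": "t1", "stop_sequence": "2"},
    {"route_int": "14", "departure_first_stop": "07:18:00", "trip_id": "t1", "stop_sequence": "10"},
    {"route_int": "2", "departure_first_stop": "09:00:00", "trip_id": "t3", "stop_sequence": "1"},
]

def raptor_order(row):
    """Sort key: route, then trip (first departure, trip_id), then position along the trip."""
    return (int(row["route_int"]), row["departure_first_stop"], row["trip_id"], int(row["stop_sequence"]))

ordered = sorted(rows, key=raptor_order)
print([(r["route_int"], r["trip_id"], r["stop_sequence"]) for r in ordered])
# [('2', 't3', '1'), ('14', 't1', '2'), ('14', 't1', '10'), ('14', 't2', '10')]
```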

In [67]:
stops_general_indexed = spark.read.csv('data/lgpt_guys/stops_general_indexed.csv', header=True)

In [68]:
stop_times = stop_times.join(stops_general_indexed, how='inner', on='stop_id_general')\
.drop('pickup_type', 'drop_off_type', 'departure_hour')
stop_times.show()

+---------------+--------------------+-------+------------+--------------+-------------+--------------------+----------------+----------------+------------+--------------------+---------------+------------+--------------------+---------+----------+--------+
|stop_id_general|             trip_id|stop_id|arrival_time|departure_time|stop_sequence|           stop_name|        stop_lat|        stop_lon|    route_id|       trip_headsign|trip_short_name|direction_id|departure_first_stop|route_int|stop_count|stop_int|
+---------------+--------------------+-------+------------+--------------+-------------+--------------------+----------------+----------------+------------+--------------------+---------------+------------+--------------------+---------+----------+--------+
|        8590679|610.TA.26-185-j19...|8590679|    10:56:00|      10:56:00|            5|Kilchberg ZH, Spital|47.3217079365566|8.53537860586113|26-185-j19-1|Zürich Wollishofe...|           9270|           1|            10:51:00

# 8) Adding transport types to `stop_times` from `routes.txt`

In [69]:
routes = spark.read.csv("/data/sbb/timetables/csv/routes/2019/05/14/routes.txt", header=True, sep = ",")
routes.show()

+-----------+---------+----------------+---------------+----------+----------+
|   route_id|agency_id|route_short_name|route_long_name|route_desc|route_type|
+-----------+---------+----------------+---------------+----------+----------+
|11-40-j19-1|      801|             040|           null|       Bus|       700|
|11-61-j19-1|     7031|             061|           null|       Bus|       700|
|11-62-j19-1|     7031|             062|           null|       Bus|       700|
|24-64-j19-1|      801|             064|           null|       Bus|       700|
|11-83-j19-1|      801|             083|           null|       Bus|       700|
|1-1-B-j19-1|       11|               1|           null|    S-Bahn|       400|
|1-1-A-j19-1|       11|               1|           null|    S-Bahn|       400|
|1-1-C-j19-1|      723|               1|           null|       Bus|       700|
|1-1-D-j19-1|      840|               1|           null|       Bus|       700|
|1-1-E-j19-1|      886|               1|           n

In [70]:
routes_for_join = routes.select(routes.route_id, routes.route_desc)
routes_for_join.show()

+-----------+----------+
|   route_id|route_desc|
+-----------+----------+
|11-40-j19-1|       Bus|
|11-61-j19-1|       Bus|
|11-62-j19-1|       Bus|
|24-64-j19-1|       Bus|
|11-83-j19-1|       Bus|
|1-1-B-j19-1|    S-Bahn|
|1-1-A-j19-1|    S-Bahn|
|1-1-C-j19-1|       Bus|
|1-1-D-j19-1|       Bus|
|1-1-E-j19-1|       Bus|
|  1-1-j19-1| Intercity|
|  4-1-j19-1|    S-Bahn|
|  5-1-j19-1|      Tram|
|6-1-A-j19-1|       Bus|
|6-1-B-j19-1|    S-Bahn|
|6-1-C-j19-1|       Bus|
|6-1-D-j19-1|       Bus|
|6-1-E-j19-1|       Bus|
|  6-1-j19-1| Intercity|
|8-1-A-j19-1|       Bus|
+-----------+----------+
only showing top 20 rows

In [71]:
stop_times = stop_times.join(routes_for_join, how='inner', on='route_id')
stop_times.show()

+------------+---------------+--------------------+-------+------------+--------------+-------------+--------------------+----------------+----------------+--------------------+---------------+------------+--------------------+---------+----------+--------+----------+
|    route_id|stop_id_general|             trip_id|stop_id|arrival_time|departure_time|stop_sequence|           stop_name|        stop_lat|        stop_lon|       trip_headsign|trip_short_name|direction_id|departure_first_stop|route_int|stop_count|stop_int|route_desc|
+------------+---------------+--------------------+-------+------------+--------------+-------------+--------------------+----------------+----------------+--------------------+---------------+------------+--------------------+---------+----------+--------+----------+
|26-185-j19-1|        8590679|610.TA.26-185-j19...|8590679|    10:56:00|      10:56:00|            5|Kilchberg ZH, Spital|47.3217079365566|8.53537860586113|Zürich Wollishofe...|           9270|
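
Because the join is an inner join, stop times whose `route_id` has no match in `routes.txt` are dropped rather than kept with a null `route_desc`. The plain-Python sketch below illustrates that behavior with a dict lookup. `inner_join_route_desc` is a hypothetical helper, the three `route_desc` values are copied from the output above, and the stop-time rows are made up.

```python
# Plain-Python analogue of stop_times.join(routes_for_join, how='inner', on='route_id')
routes_for_join = {  # route_id -> route_desc, values copied from the routes.txt output
    "11-40-j19-1": "Bus",
    "1-1-B-j19-1": "S-Bahn",
    "5-1-j19-1": "Tram",
}

stop_times = [  # hypothetical stop-time rows
    {"route_id": "11-40-j19-1", "trip_id": "t1", "stop_sequence": "1"},
    {"route_id": "5-1-j19-1", "trip_id": "t2", "stop_sequence": "1"},
    {"route_id": "99-unknown", "trip_id": "t3", "stop_sequence": "1"},  # no matching route
]


def inner_join_route_desc(stop_times, routes):
    """Attach route_desc to every stop time; drop rows whose route_id is unknown."""
    joined = []
    for row in stop_times:
        desc = routes.get(row["route_id"])
        if desc is not None:  # inner join: unmatched rows are discarded
            joined.append({**row, "route_desc": desc})
    return joined


joined = inner_join_route_desc(stop_times, routes_for_join)
```

Comparing `stop_times.count()` before and after the Spark join is a cheap way to confirm that no stop times are silently lost.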

# 9 VERY IMPORTANT: final sort before writing to CSV

In [72]:
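# Sort so that the trips of each route are contiguous and ordered by their departure at the
# first stop, and the stop times of each trip follow stop_sequence. Numeric columns are cast
# to int so that they are not sorted lexicographically.
# The monotonically increasing ID records this order so it can be restored after reading the CSV back.
stop_times = stop_times.sort(stop_times.route_int.cast('int'), 
      stop_times.departure_first_stop, 
      stop_times.trip_id, 
      stop_times.stop_sequence.cast('int'))\
.withColumn('monotonically_increasing_id', F.monotonically_increasing_id())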

stop_times.show(100, 0)

+-------------+---------------+------------------------+-------+------------+--------------+-------------+------------------------------+----------------+----------------+---------------------------+---------------+------------+--------------------+---------+----------+--------+----------+---------------------------+
|route_id     |stop_id_general|trip_id                 |stop_id|arrival_time|departure_time|stop_sequence|stop_name                     |stop_lat        |stop_lon        |trip_headsign              |trip_short_name|direction_id|departure_first_stop|route_int|stop_count|stop_int|route_desc|monotonically_increasing_id|
+-------------+---------------+------------------------+-------+------------+--------------+-------------+------------------------------+----------------+----------------+---------------------------+---------------+------------+--------------------+---------+----------+--------+----------+---------------------------+
|26-13-j19-1  |8576240        |2064.TA.26-1
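
The casts in this sort matter when the columns are strings (as they are whenever a CSV is read without a schema): strings sort lexicographically, so `'10'` comes before `'2'`. The plain-Python sketch below reproduces the same four-part sort key on hypothetical rows and contrasts it with a sort on the raw strings; `raptor_sort_key` is an illustrative name, not part of the notebook.

```python
# Hypothetical rows: (route_int, departure_first_stop, trip_id, stop_sequence), all strings
rows = [
    ("10", "07:00:00", "tripB", "1"),
    ("9", "07:30:00", "tripA", "10"),
    ("9", "07:30:00", "tripA", "2"),
    ("9", "07:00:00", "tripC", "1"),
]


def raptor_sort_key(row):
    """Same ordering as the Spark sort: route, first departure, trip, stop sequence."""
    route_int, departure_first_stop, trip_id, stop_sequence = row
    # Fixed-width HH:MM:SS strings already sort correctly, so only numeric columns are cast
    return (int(route_int), departure_first_stop, trip_id, int(stop_sequence))


sorted_rows = sorted(rows, key=raptor_sort_key)
lexicographic = sorted(rows)  # sorting the raw strings, without any cast
```

With the casts, stop 10 of `tripA` correctly follows stop 2, and route 10 comes after route 9; without them, route 10 is placed first and `tripA` is served out of order.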

In [73]:
stop_times.write.csv('data/lgpt_guys/stop_times_final_cyril.csv', header=True, mode = 'overwrite')

# 10 Footpaths

We make the simplifying assumption that transfers between stops sharing the same first 7 characters of their ID (`stop_id_general`) take 2 minutes, regardless of the stop. Grouping by `stop_id_general` is almost the same as grouping by parent station, but more robust. This assumption may break down for very large stations such as Zürich HB.

Between stops with different `stop_id_general`, the walking time is the great-circle distance (at most 500 m) divided by a constant walking speed of 50 meters per minute.
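
As a worked example of this rule, take $d$ as the great-circle distance in kilometers (the unit of the `distance` column computed further down) and the walking speed of 50 m/min, i.e. 0.05 km/min. For the first Spreitenbach pair in the filtered output further down (`stop_int` 0 and 815):

$$
t_{\text{walk}} = \left\lfloor \frac{d}{0.05\ \text{km/min}} \cdot 60\ \text{s/min} \right\rfloor,
\qquad
d = 0.22296342\ \text{km} \;\Rightarrow\; \frac{0.22296342}{0.05} \cdot 60 \approx 267.56 \;\Rightarrow\; t_{\text{walk}} = 267\ \text{s}.
$$

With the 500 m cap, no footpath takes longer than $0.5 / 0.05 \cdot 60 = 600$ s, i.e. 10 minutes.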

We only consider stops present in the final and filtered `stop_times` table.

- get all unique `stop_int` values, each with a single pair of coordinates
- cross-join two copies of this dataframe; the result contains every ordered pair of `stop_int`
- drop pairs where `stop_id_general` is the same
- compute the great-circle ("as the crow flies") distance with geopy for all remaining pairs, using GPS coordinates
- filter out pairs of stops more than 500 meters apart
- convert distances to walking times in seconds
- order by `stop_int`, then `stop_int_2`, to mimic the `transfers` data structure in RAPTOR
- create a monotonically increasing ID so the table can easily be re-sorted after reading it back from a distributed file system
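
The steps above can be sketched in plain Python on a handful of stops. This is not the notebook's Spark code: it swaps geopy for a hand-written haversine formula on a sphere of radius 6371.009 km (assumed to be equivalent to geopy's `great_circle` default), and `great_circle_km` and `build_footpaths` are hypothetical helper names. The four stops use coordinates shown in the outputs below.

```python
import math
from itertools import product

EARTH_RADIUS_KM = 6371.009       # assumed to match geopy's default Earth radius
MAX_FOOTPATH_KM = 0.5            # footpaths longer than 500 m are discarded
WALKING_SPEED_KM_PER_MIN = 0.05  # 50 meters per minute


def great_circle_km(lat1, lon1, lat2, lon2):
    """Haversine great-circle distance in kilometers on a spherical Earth."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def build_footpaths(stops):
    """Build (stop_int, stop_int_2, distance_km, walking_time_s) tuples.

    stops: list of (stop_int, stop_id_general, lat, lon), one coordinate pair per stop_int.
    """
    footpaths = []
    # Cross join: every ordered pair of stops, including (a, a), which is dropped below
    for (s1, g1, lat1, lon1), (s2, g2, lat2, lon2) in product(stops, repeat=2):
        if g1 == g2:
            continue  # same parent stop: left to the 2-minute rule inside RAPTOR
        d = great_circle_km(lat1, lon1, lat2, lon2)
        if d > MAX_FOOTPATH_KM:
            continue
        walking_time = int(d / WALKING_SPEED_KM_PER_MIN * 60)  # truncates like cast('int')
        footpaths.append((s1, s2, d, walking_time))
    # Integer sort on (stop_int, stop_int_2), like the casts in the Spark sort
    footpaths.sort(key=lambda f: (f[0], f[1]))
    return footpaths


# Coordinates copied from the notebook outputs for four stops
stops = [
    (0, "8502508", 47.4154457211288, 8.37718528430566),     # Spreitenbach, Rai...
    (1, "8503078", 47.3454760357765, 8.5930234976511),      # Waldburg
    (815, "8590268", 47.4142117310803, 8.3795209040447),    # Spreitenbach, ASP
    (1350, "8590270", 47.4179500849385, 8.37208285349115),  # Spreitenbach, Brüel
]
footpaths = build_footpaths(stops)
```

On these four stops, only the Spreitenbach pairs within 500 m survive, in both directions, with walking times of 267 s and 569 s, in line with the filtered table below. Looping over all pairs is quadratic, which is manageable for the roughly 1,400 `stop_int` values here (about 2 million pairs), just as the Spark cross-join is.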

In [74]:
stop_times = spark.read.csv('data/lgpt_guys/stop_times_final_cyril.csv', header=True)

Getting one pair of coordinates per parent stop:

In [75]:
stop_times.select(stop_times.stop_id_general, stop_times.stop_lat, stop_times.stop_lon, stop_times.stop_name).dropDuplicates()\
.show()

+---------------+----------------+----------------+--------------------+
|stop_id_general|        stop_lat|        stop_lon|           stop_name|
+---------------+----------------+----------------+--------------------+
|        8576163|47.4446882894765|8.63618754705906| Bassersdorf, Rietli|
|        8588279|47.4314059613092|8.66796994181563|Tagelswangen, Ger...|
|        8588740|47.4448766240902|8.57874926778446|Kloten, Neubrunne...|
|        8595714|47.4485885009565|8.45665025434994| Regensdorf, Allmend|
|        8588054|47.3600457522121|8.71459250506792|   Uster, Meieracher|
|        8590851|47.3894589587793|8.67489595265716|Volketswil, Chappeli|
|        8591345|47.3845933768342|8.47776964668253|Zürich, Schulhaus...|
|        8591264|47.4060107375098|8.58139031472014|  Zürich, Luegisland|
|        8503202|47.2960953402584|8.56475351565593|             Thalwil|
|        8573163|47.2857066724744|8.55511459265598|Gattikon, Gattike...|
|        8590795|47.3955220616337|8.46779834702741|

In [76]:
from pyspark.sql.window import Window
w= (
    Window.partitionBy("stop_id_general")
    .orderBy('stop_name')
)

In [77]:
from pyspark.sql import functions as F

# Give every stop_int the coordinates and name of the first row (ordered by stop_name) of its
# stop_id_general, so that all platforms of a parent stop share a single location
stop_coordinates = stop_times.select(stop_times.stop_id_general, stop_times.stop_int,
                                     F.first("stop_lat").over(w).alias("stop_lat_first"),
                                     F.first("stop_lon").over(w).alias("stop_lon_first"),
                                     F.first("stop_name").over(w).alias("stop_name_first"))\
    .dropDuplicates()

stop_coordinates.show()

+---------------+--------+----------------+----------------+--------------------+
|stop_id_general|stop_int|  stop_lat_first|  stop_lon_first|     stop_name_first|
+---------------+--------+----------------+----------------+--------------------+
|        8502508|       0|47.4154457211288|8.37718528430566|Spreitenbach, Rai...|
|        8503078|       1|47.3454760357765| 8.5930234976511|            Waldburg|
|        8503088|       2|47.3774949037101|8.53916949636064|       Zürich HB SZU|
|        8503376|       8|47.4353132339136| 8.7169371079598|Ottikon b. Kemptthal|
|        8587967|       7|47.2955835709855|8.60393802835468|Erlenbach ZH, Im ...|
|        8589111|       5| 47.260856991692|8.59230484542371|Horgen, Gumelenst...|
|        8590819|       6|47.2821844204798|8.57300004996529|     Thalwil, Mettli|
|        8591190|       4|47.3694098744442|8.50635403902719|     Zürich, Heuried|
|        8591284|       3|47.3909246273101|8.47396977303017|   Zürich, Neeserweg|
|        8588312
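
The window above gives each `stop_int` the coordinates of the row that comes first by `stop_name` within its `stop_id_general`. A plain-Python sketch of the same idea follows; `first_coordinates_per_parent` is a hypothetical helper and the rows are made up.

```python
from collections import defaultdict


def first_coordinates_per_parent(rows):
    """rows: (stop_id_general, stop_int, lat, lon, name).

    Mimics F.first(...).over(Window.partitionBy('stop_id_general').orderBy('stop_name')):
    every stop_int gets the coordinates of the alphabetically first row of its parent stop.
    """
    groups = defaultdict(list)
    for row in rows:
        groups[row[0]].append(row)
    result = {}
    for general_id, members in groups.items():
        # min() keeps the first of several equal names; Spark's order among ties is not guaranteed
        first = min(members, key=lambda r: r[4])
        for _, stop_int, _, _, _ in members:
            result[stop_int] = (general_id, first[2], first[3], first[4])
    return result


# Hypothetical platforms: two parent stops, the first one with three platforms
rows = [
    ("1000001", 10, 47.001, 8.001, "Beta"),
    ("1000001", 11, 47.002, 8.002, "Alpha"),
    ("1000001", 12, 47.003, 8.003, "Alpha East"),
    ("1000002", 20, 47.100, 8.100, "Gamma"),
]
coords = first_coordinates_per_parent(rows)
```

All three platforms of the first parent stop end up with the coordinates of "Alpha", which is what makes the later `stop_id_general` comparison and distance computation consistent across platforms.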

In [78]:
stop_coordinates.count()

1407

In [79]:
stop_coordinates_for_join = stop_coordinates.select(stop_coordinates.stop_id_general.alias('stop_id_general_2'),
                                                   stop_coordinates.stop_int.alias('stop_int_2'),
                                                   stop_coordinates.stop_lat_first.alias('stop_lat_first_2'),
                                                   stop_coordinates.stop_lon_first.alias('stop_lon_first_2'),
                                                   stop_coordinates.stop_name_first.alias('stop_name_first_2'))
stop_coordinates_for_join.show()

+-----------------+----------+----------------+----------------+--------------------+
|stop_id_general_2|stop_int_2|stop_lat_first_2|stop_lon_first_2|   stop_name_first_2|
+-----------------+----------+----------------+----------------+--------------------+
|          8502508|         0|47.4154457211288|8.37718528430566|Spreitenbach, Rai...|
|          8503078|         1|47.3454760357765| 8.5930234976511|            Waldburg|
|          8503088|         2|47.3774949037101|8.53916949636064|       Zürich HB SZU|
|          8503376|         8|47.4353132339136| 8.7169371079598|Ottikon b. Kemptthal|
|          8587967|         7|47.2955835709855|8.60393802835468|Erlenbach ZH, Im ...|
|          8589111|         5| 47.260856991692|8.59230484542371|Horgen, Gumelenst...|
|          8590819|         6|47.2821844204798|8.57300004996529|     Thalwil, Mettli|
|          8591190|         4|47.3694098744442|8.50635403902719|     Zürich, Heuried|
|          8591284|         3|47.3909246273101|8.47396

We perform a cross-join (every possible combination of rows is created), then drop every row where both stops share the same `stop_id_general`:

In [80]:
stop_coordinates_cross = stop_coordinates.crossJoin(stop_coordinates_for_join)\
.filter(F.col('stop_id_general')!=F.col('stop_id_general_2'))
stop_coordinates_cross.show()

+---------------+--------+----------------+----------------+--------------------+-----------------+----------+----------------+----------------+--------------------+
|stop_id_general|stop_int|  stop_lat_first|  stop_lon_first|     stop_name_first|stop_id_general_2|stop_int_2|stop_lat_first_2|stop_lon_first_2|   stop_name_first_2|
+---------------+--------+----------------+----------------+--------------------+-----------------+----------+----------------+----------------+--------------------+
|        8502508|       0|47.4154457211288|8.37718528430566|Spreitenbach, Rai...|          8503078|         1|47.3454760357765| 8.5930234976511|            Waldburg|
|        8502508|       0|47.4154457211288|8.37718528430566|Spreitenbach, Rai...|          8503088|         2|47.3774949037101|8.53916949636064|       Zürich HB SZU|
|        8502508|       0|47.4154457211288|8.37718528430566|Spreitenbach, Rai...|          8503376|         8|47.4353132339136| 8.7169371079598|Ottikon b. Kemptthal|
|   

Next, we compute the great-circle distance in kilometers, as well as the time in seconds it takes to walk this distance:

In [81]:
# adding distance: great-circle distance in kilometers, computed by the geopy-based great_circle_udf
stop_coordinates_cross_distance = stop_coordinates_cross.withColumn("distance", \
                                                 great_circle_udf(struct(stop_coordinates_cross.stop_lat_first, stop_coordinates_cross.stop_lon_first), \
                                                                  struct(stop_coordinates_cross.stop_lat_first_2, stop_coordinates_cross.stop_lon_first_2)))
stop_coordinates_cross_distance.show(5)

+---------------+--------+----------------+----------------+--------------------+-----------------+----------+----------------+----------------+--------------------+---------+
|stop_id_general|stop_int|  stop_lat_first|  stop_lon_first|     stop_name_first|stop_id_general_2|stop_int_2|stop_lat_first_2|stop_lon_first_2|   stop_name_first_2| distance|
+---------------+--------+----------------+----------------+--------------------+-----------------+----------+----------------+----------------+--------------------+---------+
|        8502508|       0|47.4154457211288|8.37718528430566|Spreitenbach, Rai...|          8503078|         1|47.3454760357765| 8.5930234976511|            Waldburg|18.017555|
|        8502508|       0|47.4154457211288|8.37718528430566|Spreitenbach, Rai...|          8503088|         2|47.3774949037101|8.53916949636064|       Zürich HB SZU|12.902226|
|        8502508|       0|47.4154457211288|8.37718528430566|Spreitenbach, Rai...|          8503376|         8|47.4353132

In [82]:
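# walking_time in seconds: distance [km] / 0.05 [km/min] (i.e. 50 m/min) * 60 [s/min], truncated to an int
stop_coordinates_cross_distance_time = stop_coordinates_cross_distance.withColumn('walking_time', 
                                                                                  (F.col('distance')/0.05*60).cast('int'))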
stop_coordinates_cross_distance_time.show()

+---------------+--------+----------------+----------------+--------------------+-----------------+----------+----------------+----------------+--------------------+----------+------------+
|stop_id_general|stop_int|  stop_lat_first|  stop_lon_first|     stop_name_first|stop_id_general_2|stop_int_2|stop_lat_first_2|stop_lon_first_2|   stop_name_first_2|  distance|walking_time|
+---------------+--------+----------------+----------------+--------------------+-----------------+----------+----------------+----------------+--------------------+----------+------------+
|        8502508|       0|47.4154457211288|8.37718528430566|Spreitenbach, Rai...|          8503078|         1|47.3454760357765| 8.5930234976511|            Waldburg| 18.017555|       21621|
|        8502508|       0|47.4154457211288|8.37718528430566|Spreitenbach, Rai...|          8503088|         2|47.3774949037101|8.53916949636064|       Zürich HB SZU| 12.902226|       15482|
|        8502508|       0|47.4154457211288|8.37718

Removing pairs of stops more than 500 meters apart (0.5, since `distance` is in kilometers):

In [83]:
stop_coordinates_cross_distance_time_filtered = stop_coordinates_cross_distance_time.filter(F.col('distance')<=0.5)
stop_coordinates_cross_distance_time_filtered.show()

+---------------+--------+----------------+----------------+--------------------+-----------------+----------+----------------+----------------+--------------------+-----------+------------+
|stop_id_general|stop_int|  stop_lat_first|  stop_lon_first|     stop_name_first|stop_id_general_2|stop_int_2|stop_lat_first_2|stop_lon_first_2|   stop_name_first_2|   distance|walking_time|
+---------------+--------+----------------+----------------+--------------------+-----------------+----------+----------------+----------------+--------------------+-----------+------------+
|        8502508|       0|47.4154457211288|8.37718528430566|Spreitenbach, Rai...|          8590268|       815|47.4142117310803| 8.3795209040447|   Spreitenbach, ASP| 0.22296342|         267|
|        8502508|       0|47.4154457211288|8.37718528430566|Spreitenbach, Rai...|          8590270|      1350|47.4179500849385|8.37208285349115| Spreitenbach, Brüel|  0.4742755|         569|
|        8503078|       1|47.3454760357765| 8

Sorting by `stop_int`, then `stop_int_2`, to match the ordering of `transfers` in RAPTOR:

In [84]:
stop_coordinates_cross_distance_time_filtered_sorted = stop_coordinates_cross_distance_time_filtered.sort(F.col('stop_int').cast('int'), F.col('stop_int_2').cast('int'))\
.withColumn('monotonically_increasing_id', F.monotonically_increasing_id())

In [85]:
stop_coordinates_cross_distance_time_filtered_sorted.write.csv('data/lgpt_guys/transfers_cyril.csv', header=True, mode='overwrite')

In [86]:
stop_coordinates_cross_distance_time_filtered_sorted = spark.read.csv('data/lgpt_guys/transfers_cyril.csv', header=True)

In [87]:
stop_coordinates_cross_distance_time_filtered_sorted.count()

6264

In [88]:
stop_coordinates_cross_distance_time_filtered_sorted.sort(F.col('monotonically_increasing_id')).show()

+---------------+--------+----------------+----------------+--------------------+-----------------+----------+----------------+----------------+--------------------+----------+------------+---------------------------+
|stop_id_general|stop_int|  stop_lat_first|  stop_lon_first|     stop_name_first|stop_id_general_2|stop_int_2|stop_lat_first_2|stop_lon_first_2|   stop_name_first_2|  distance|walking_time|monotonically_increasing_id|
+---------------+--------+----------------+----------------+--------------------+-----------------+----------+----------------+----------------+--------------------+----------+------------+---------------------------+
|        8502508|       0|47.4154457211288|8.37718528430566|Spreitenbach, Rai...|          8590268|       815|47.4142117310803| 8.3795209040447|   Spreitenbach, ASP|0.22296342|         267|                          0|
|        8502508|       0|47.4154457211288|8.37718528430566|Spreitenbach, Rai...|          8590270|      1350|47.4179500849385|8

# 11 Verifying a few routes and trips on real data

In [111]:
stop_times = spark.read.csv('data/lgpt_guys/stop_times_final_cyril.csv', header=True)
stop_times.sort(stop_times.route_int.cast('int'), 
      stop_times.departure_first_stop, 
      stop_times.trip_id, 
      stop_times.stop_sequence.cast('int'))\
.select(stop_times.route_int, stop_times.arrival_time, stop_times.departure_time, stop_times.monotonically_increasing_id)\
.show()

+---------+------------+--------------+---------------------------+
|route_int|arrival_time|departure_time|monotonically_increasing_id|
+---------+------------+--------------+---------------------------+
|        0|    07:00:00|      07:00:00|                          0|
|        0|    07:01:00|      07:01:00|                          1|
|        0|    07:02:00|      07:02:00|                          2|
|        0|    07:03:00|      07:03:00|                          3|
|        0|    07:05:00|      07:05:00|                          4|
|        0|    07:06:00|      07:06:00|                          5|
|        0|    07:08:00|      07:08:00|                          6|
|        0|    07:09:00|      07:09:00|                          7|
|        0|    07:10:00|      07:10:00|                          8|
|        0|    07:11:00|      07:11:00|                          9|
|        0|    07:12:00|      07:12:00|                         10|
|        0|    07:14:00|      07:14:00|         

Casting Spark's `monotonically_increasing_id` to `LongType` (and not `IntegerType`) is paramount to make sure the sort happens as expected. The CSV is read back without a schema, so the ID is a string, and the IDs generated on any partition other than the first exceed the range of a 32-bit integer.
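
According to the PySpark documentation, the current implementation of `monotonically_increasing_id` puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The sketch below mimics that layout in plain Python (`spark_like_id` is a hypothetical helper, not a Spark API) to show why every row outside the first partition overflows a 32-bit integer, and why the string IDs read back from CSV must be sorted numerically.

```python
INT32_MAX = 2**31 - 1


def spark_like_id(partition_id, record_number):
    """Hypothetical helper mimicking the documented ID layout:
    partition ID in the upper 31 bits, record number in the lower 33 bits."""
    return (partition_id << 33) + record_number


first_row_of_partition_1 = spark_like_id(1, 0)  # 2**33

# Read back from CSV without a schema, the IDs are strings
ids_as_strings = [str(spark_like_id(p, r)) for p in (0, 1) for r in (0, 1, 2, 10)]
string_order = sorted(ids_as_strings)            # lexicographic order: wrong
numeric_order = sorted(ids_as_strings, key=int)  # the order cast(LongType()) restores
```

In Spark's default (non-ANSI) mode, casting an out-of-range string such as `'8589934592'` to `IntegerType` yields null rather than an error, so sorting on `cast('int')` would silently misplace every row outside the first partition.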

In [122]:
from pyspark.sql.types import LongType
stop_times = spark.read.csv('data/lgpt_guys/stop_times_final_cyril.csv', header=True)
stop_times = stop_times.sort(stop_times.monotonically_increasing_id.cast(LongType()))

stop_times.show(100, 0)

+-------------+---------------+------------------------+-------+------------+--------------+-------------+------------------------------+----------------+----------------+---------------------------+---------------+------------+--------------------+---------+----------+--------+----------+---------------------------+
|route_id     |stop_id_general|trip_id                 |stop_id|arrival_time|departure_time|stop_sequence|stop_name                     |stop_lat        |stop_lon        |trip_headsign              |trip_short_name|direction_id|departure_first_stop|route_int|stop_count|stop_int|route_desc|monotonically_increasing_id|
+-------------+---------------+------------------------+-------+------------+--------------+-------------+------------------------------+----------------+----------------+---------------------------+---------------+------------+--------------------+---------+----------+--------+----------+---------------------------+
|26-13-j19-1  |8576240        |2064.TA.26-1

In [123]:
stop_times.where(stop_times.route_int==500).show(100, 0)

+------------+---------------+-----------------------+-------+------------+--------------+-------------+------------------------------+----------------+----------------+--------------------------+---------------+------------+--------------------+---------+----------+--------+----------+---------------------------+
|route_id    |stop_id_general|trip_id                |stop_id|arrival_time|departure_time|stop_sequence|stop_name                     |stop_lat        |stop_lon        |trip_headsign             |trip_short_name|direction_id|departure_first_stop|route_int|stop_count|stop_int|route_desc|monotonically_increasing_id|
+------------+---------------+-----------------------+-------+------------+--------------+-------------+------------------------------+----------------+----------------+--------------------------+---------------+------------+--------------------+---------+----------+--------+----------+---------------------------+
|26-660-j19-1|8576167        |486.TA.26-660-j19-1.9.

Validated on sbb.ch

In [126]:
stop_times.where(stop_times.route_int==800).show(100, 0)

+-----------+---------------+---------------------+-------+------------+--------------+-------------+--------------------------+----------------+----------------+-----------------+---------------+------------+--------------------+---------+----------+--------+----------+---------------------------+
|route_id   |stop_id_general|trip_id              |stop_id|arrival_time|departure_time|stop_sequence|stop_name                 |stop_lat        |stop_lon        |trip_headsign    |trip_short_name|direction_id|departure_first_stop|route_int|stop_count|stop_int|route_desc|monotonically_increasing_id|
+-----------+---------------+---------------------+-------+------------+--------------+-------------+--------------------------+----------------+----------------+-----------------+---------------+------------+--------------------+---------+----------+--------+----------+---------------------------+
|26-46-j19-1|8591328        |55.TA.26-46-j19-1.3.H|8591328|08:35:00    |08:35:00      |1            

Verified on sbb.ch. This route is interesting: service stops in the middle of the day, which is also what sbb.ch shows.
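
The spot checks above compare individual routes with sbb.ch by hand. A complementary automated check is sketched below, assuming the rows are already in the final sorted order; `check_routes` is a hypothetical helper. It verifies two properties the cleaned table is meant to guarantee: every trip of a route serves the same sequence of stops, and times never decrease along a trip.

```python
from itertools import groupby


def to_seconds(hhmmss):
    """Convert a GTFS 'HH:MM:SS' string to seconds after midnight."""
    h, m, s = (int(x) for x in hhmmss.split(":"))
    return 3600 * h + 60 * m + s


def check_routes(rows):
    """rows: (route_int, trip_id, stop_int, arrival_time, departure_time) in final sorted order.

    Returns a list of human-readable problems (empty if everything is consistent).
    """
    problems = []
    for route, route_rows in groupby(rows, key=lambda r: r[0]):
        expected_stops = None
        for trip, trip_rows in groupby(route_rows, key=lambda r: r[1]):
            trip_rows = list(trip_rows)
            stops = [r[2] for r in trip_rows]
            if expected_stops is None:
                expected_stops = stops  # the first trip defines the route's stop sequence
            elif stops != expected_stops:
                problems.append(f"route {route}: trip {trip} serves a different stop sequence")
            # Interleave arrival and departure times; they must be non-decreasing
            times = []
            for r in trip_rows:
                times += [to_seconds(r[3]), to_seconds(r[4])]
            if times != sorted(times):
                problems.append(f"route {route}: trip {trip} goes back in time")
    return problems
```

With Spark, such rows could be obtained by selecting these five columns after the sort on `monotonically_increasing_id` and calling `collect()` or `toLocalIterator()` on the DataFrame.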