# To Begin With...

### Name your Spark application `GASPAR_final` or `GROUP_NAME_final`.

<div class='alert alert-info'><b>Any application without a proper name will be promptly killed.</b></div>

In [48]:
%%configure
{"conf": {
    "spark.app.name": "lgptguys_final"
}}

A session has already been started. If you intend to recreate the session with new configurations, please include the -f argument.


### Start Spark

In [49]:
%%spark
# Initialization (the %%spark cell magic must be the first line of the cell)



In [50]:
%%send_to_spark -i username -t str -n username

An error was encountered:
Variable named username not found.
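
The error above suggests that `username` was never defined on the notebook side before `%%send_to_spark` tried to ship it to the Spark session. Below is a minimal sketch of one way to define it first. It assumes the code runs locally (in sparkmagic, in a cell that starts with `%%local`), and the environment variable names and the `"unknown"` fallback are assumptions about the hosting environment, not something the notebook specifies.

```python
import os

# Assumption: the notebook server exposes the user name in JUPYTERHUB_USER
# (set by JupyterHub) or, failing that, in the usual USER variable.
# "unknown" is a hypothetical fallback so the variable always exists.
username = os.environ.get("JUPYTERHUB_USER") or os.environ.get("USER") or "unknown"
print(username)
```

Once `username` exists locally, rerunning the `%%send_to_spark -i username -t str -n username` cell should find it.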


### Read the [SBB actual data](https://opentransportdata.swiss/en/dataset/istdaten) in ORC format

In [51]:
sbb = spark.read.orc('/data/sbb/orc/istdaten')

In [52]:
sbb.printSchema()

root
 |-- betriebstag: string (nullable = true)
 |-- fahrt_bezeichner: string (nullable = true)
 |-- betreiber_id: string (nullable = true)
 |-- betreiber_abk: string (nullable = true)
 |-- betreiber_name: string (nullable = true)
 |-- produkt_id: string (nullable = true)
 |-- linien_id: string (nullable = true)
 |-- linien_text: string (nullable = true)
 |-- umlauf_id: string (nullable = true)
 |-- verkehrsmittel_text: string (nullable = true)
 |-- zusatzfahrt_tf: string (nullable = true)
 |-- faellt_aus_tf: string (nullable = true)
 |-- bpuic: string (nullable = true)
 |-- haltestellen_name: string (nullable = true)
 |-- ankunftszeit: string (nullable = true)
 |-- an_prognose: string (nullable = true)
 |-- an_prognose_status: string (nullable = true)
 |-- abfahrtszeit: string (nullable = true)
 |-- ab_prognose: string (nullable = true)
 |-- ab_prognose_status: string (nullable = true)
 |-- durchfahrt_tf: string (nullable = true)

Let's pick a station name at random and build the distribution of its arrival delays.

In [53]:
sbb.select("haltestellen_name").distinct().show(5)

+-----------------+
|haltestellen_name|
+-----------------+
|        Grandvaux|
|        Männedorf|
|    Forst, Breite|
|    Schönbühl SBB|
|        Studen BE|
+-----------------+
only showing top 5 rows

In [24]:
stop="Männedorf"
sbb.filter(sbb.haltestellen_name == stop).select("fahrt_bezeichner").show(5)

+----------------+
|fahrt_bezeichner|
+----------------+
| 85:11:18719:002|
| 85:11:18720:001|
| 85:11:18721:001|
| 85:11:18722:001|
| 85:11:18723:001|
+----------------+
only showing top 5 rows

In [63]:
from pyspark.sql import functions as F
from pyspark.sql.types import LongType

trip_id = "85:11:18719:002"
stop = "Männedorf"

# Keep this trip's records at the chosen stop that have a non-empty arrival
# time whose status is 'GESCHAETZT'
sbb_filt = (
    sbb.filter(
        (sbb.fahrt_bezeichner == trip_id)
        & (sbb.haltestellen_name == stop)
        & (sbb.an_prognose != "")
        & (sbb.abfahrtszeit != "")
        & (sbb.an_prognose_status == "GESCHAETZT")
    )
    .select("an_prognose", "ankunftszeit")
    # an_prognose has second precision, ankunftszeit (scheduled) only minutes
    .withColumn("an_prognose", F.to_timestamp(F.col("an_prognose"), "dd.MM.yyyy HH:mm:ss"))
    .withColumn("ankunftszeit", F.to_timestamp(F.col("ankunftszeit"), "dd.MM.yyyy HH:mm"))
    # Delay = an_prognose minus scheduled arrival, in seconds and in rounded minutes
    .withColumn("DiffInSeconds", F.col("an_prognose").cast(LongType()) - F.col("ankunftszeit").cast(LongType()))
    .withColumn("DiffInMinutes", F.round(F.col("DiffInSeconds") / 60))
)
sbb_filt.show()

+-------------------+-------------------+-------------+-------------+
|        an_prognose|       ankunftszeit|DiffInSeconds|DiffInMinutes|
+-------------------+-------------------+-------------+-------------+
|2018-04-16 06:05:56|2018-04-16 06:04:00|          116|          2.0|
|2018-03-08 06:05:15|2018-03-08 06:04:00|           75|          1.0|
|2018-03-01 06:05:10|2018-03-01 06:04:00|           70|          1.0|
|2018-01-16 06:05:20|2018-01-16 06:04:00|           80|          1.0|
|2018-04-21 06:04:54|2018-04-21 06:04:00|           54|          1.0|
|2018-03-17 06:05:10|2018-03-17 06:04:00|           70|          1.0|
|2018-03-31 06:04:52|2018-03-31 06:04:00|           52|          1.0|
|2018-01-04 06:05:22|2018-01-04 06:04:00|           82|          1.0|
|2018-03-03 06:05:29|2018-03-03 06:04:00|           89|          1.0|
|2018-01-27 06:05:13|2018-01-27 06:04:00|           73|          1.0|
|2018-01-06 06:05:06|2018-01-06 06:04:00|           66|          1.0|
|2018-03-04 06:05:12
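
To make the two derived columns easier to check by hand, here is a minimal plain-Python sketch of the same arithmetic, without Spark. The helper name `delay_minutes` is hypothetical, and the input strings are the first and fifth rows of the output written back in the raw `dd.MM.yyyy` format the cell parses. It rounds half up on the assumption that this matches Spark's `round`; Python's built-in `round` rounds halves to even and would differ on exact half-minute delays.

```python
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP


def delay_minutes(actual: str, scheduled: str) -> tuple:
    """Return (delay in seconds, delay rounded half up to whole minutes)."""
    # Hypothetical helper: mirrors the DiffInSeconds / DiffInMinutes columns.
    actual_ts = datetime.strptime(actual, "%d.%m.%Y %H:%M:%S")
    scheduled_ts = datetime.strptime(scheduled, "%d.%m.%Y %H:%M")
    seconds = int((actual_ts - scheduled_ts).total_seconds())
    minutes = int((Decimal(seconds) / 60).quantize(Decimal("1"), rounding=ROUND_HALF_UP))
    return seconds, minutes


print(delay_minutes("16.04.2018 06:05:56", "16.04.2018 06:04"))  # (116, 2)
print(delay_minutes("21.04.2018 06:04:54", "21.04.2018 06:04"))  # (54, 1)
```

Both results agree with the `DiffInSeconds` and `DiffInMinutes` values shown for those rows.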

In [67]:
sbb_filt.groupBy('DiffInMinutes').count().show()

+-------------+-----+
|DiffInMinutes|count|
+-------------+-----+
|          1.0|   62|
|          4.0|    1|
|          3.0|   10|
|          2.0|   39|
|          6.0|    1|
|         16.0|    1|
+-------------+-----+
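
The counts above are raw frequencies. As a rough sketch of how they could be turned into an empirical delay distribution, the snippet below normalises them and accumulates a cumulative probability. The counts are copied from the output above; the function name `empirical_distribution` is hypothetical, and doing this in plain Python rather than in Spark is only a convenience for such a small table.

```python
# Delay in minutes -> number of observations, copied from the groupBy output.
counts = {1: 62, 4: 1, 3: 10, 2: 39, 6: 1, 16: 1}


def empirical_distribution(counts):
    """Return [(delay, probability, cumulative probability)] sorted by delay."""
    total = sum(counts.values())
    rows, cumulative = [], 0
    for delay in sorted(counts):
        cumulative += counts[delay]
        rows.append((delay, counts[delay] / total, cumulative / total))
    return rows


for delay, p, cdf in empirical_distribution(counts):
    print(f"{delay:>2} min  p={p:.3f}  P(delay <= {delay})={cdf:.3f}")
```

With these 114 observations, 101 arrivals are at most 2 minutes late, which is roughly 89% of the sample.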