{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# To Begin With...\n", "\n", "### Name your spark application as `GASPAR_final` or `GROUP_NAME_final`.\n", "\n", "
Any application without a proper name would be promptly killed.
" ] }, { "cell_type": "code", "execution_count": 48, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "A session has already been started. If you intend to recreate the session with new configurations, please include the -f argument.\n" ] } ], "source": [ "%%configure\n", "{\"conf\": {\n", " \"spark.app.name\": \"lgptguys_final\"\n", "}}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Start Spark" ] }, { "cell_type": "code", "execution_count": 49, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "%%spark\n", "# Initialization (the %%spark cell magic must be the first line of the cell)" ] }, { "cell_type": "code", "execution_count": 50, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "An error was encountered:\n", "Variable named username not found.\n" ] } ], "source": [ "%%send_to_spark -i username -t str -n username" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Read the [SBB actual data](https://opentransportdata.swiss/en/dataset/istdaten) in ORC format" ] }, { "cell_type": "code", "execution_count": 51, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "sbb = spark.read.orc('/data/sbb/orc/istdaten')" ] }, { "cell_type": "code", "execution_count": 52,
"metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- betriebstag: string (nullable = true)\n", " |-- fahrt_bezeichner: string (nullable = true)\n", " |-- betreiber_id: string (nullable = true)\n", " |-- betreiber_abk: string (nullable = true)\n", " |-- betreiber_name: string (nullable = true)\n", " |-- produkt_id: string (nullable = true)\n", " |-- linien_id: string (nullable = true)\n", " |-- linien_text: string (nullable = true)\n", " |-- umlauf_id: string (nullable = true)\n", " |-- verkehrsmittel_text: string (nullable = true)\n", " |-- zusatzfahrt_tf: string (nullable = true)\n", " |-- faellt_aus_tf: string (nullable = true)\n", " |-- bpuic: string (nullable = true)\n", " |-- haltestellen_name: string (nullable = true)\n", " |-- ankunftszeit: string (nullable = true)\n", " |-- an_prognose: string (nullable = true)\n", " |-- an_prognose_status: string (nullable = true)\n", " |-- abfahrtszeit: string (nullable = true)\n", " |-- ab_prognose: string (nullable = true)\n", " |-- ab_prognose_status: string (nullable = true)\n", " |-- durchfahrt_tf: string (nullable = true)" ] } ], "source": [ "sbb.printSchema()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's pick one random station name and construct its distribution" ] }, { "cell_type": "code", "execution_count": 53, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": 
"stdout", "output_type": "stream", "text": [ "+-----------------+\n", "|haltestellen_name|\n", "+-----------------+\n", "| Grandvaux|\n", "| Männedorf|\n", "| Forst, Breite|\n", "| Schönbühl SBB|\n", "| Studen BE|\n", "+-----------------+\n", "only showing top 5 rows" ] } ], "source": [ "sbb.select(\"haltestellen_name\").distinct().show(5)" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "+----------------+\n", "|fahrt_bezeichner|\n", "+----------------+\n", "| 85:11:18719:002|\n", "| 85:11:18720:001|\n", "| 85:11:18721:001|\n", "| 85:11:18722:001|\n", "| 85:11:18723:001|\n", "+----------------+\n", "only showing top 5 rows" ] } ], "source": [ "stop=\"Männedorf\"\n", "sbb.filter(sbb.haltestellen_name == stop).select(\"fahrt_bezeichner\").show(5)" ] }, { "cell_type": "code", "execution_count": 63, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "+-------------------+-------------------+-------------+-------------+\n", "| an_prognose| ankunftszeit|DiffInSeconds|DiffInMinutes|\n", "+-------------------+-------------------+-------------+-------------+\n", "|2018-04-16 06:05:56|2018-04-16 06:04:00| 116| 2.0|\n", "|2018-03-08 06:05:15|2018-03-08 06:04:00| 75| 1.0|\n", "|2018-03-01 06:05:10|2018-03-01 06:04:00| 70| 1.0|\n", "|2018-01-16 06:05:20|2018-01-16 
06:04:00| 80| 1.0|\n", "|2018-04-21 06:04:54|2018-04-21 06:04:00| 54| 1.0|\n", "|2018-03-17 06:05:10|2018-03-17 06:04:00| 70| 1.0|\n", "|2018-03-31 06:04:52|2018-03-31 06:04:00| 52| 1.0|\n", "|2018-01-04 06:05:22|2018-01-04 06:04:00| 82| 1.0|\n", "|2018-03-03 06:05:29|2018-03-03 06:04:00| 89| 1.0|\n", "|2018-01-27 06:05:13|2018-01-27 06:04:00| 73| 1.0|\n", "|2018-01-06 06:05:06|2018-01-06 06:04:00| 66| 1.0|\n", "|2018-03-04 06:05:12|2018-03-04 06:04:00| 72| 1.0|\n", "|2018-01-02 06:05:03|2018-01-02 06:04:00| 63| 1.0|\n", "|2018-01-07 06:05:15|2018-01-07 06:04:00| 75| 1.0|\n", "|2018-01-14 06:05:16|2018-01-14 06:04:00| 76| 1.0|\n", "|2018-01-20 06:05:21|2018-01-20 06:04:00| 81| 1.0|\n", "|2018-03-25 06:06:42|2018-03-25 06:04:00| 162| 3.0|\n", "|2018-04-15 06:05:43|2018-04-15 06:04:00| 103| 2.0|\n", "|2018-04-22 06:05:12|2018-04-22 06:04:00| 72| 1.0|\n", "|2018-05-01 06:05:29|2018-05-01 06:04:00| 89| 1.0|\n", "+-------------------+-------------------+-------------+-------------+\n", "only showing top 20 rows" ] } ], "source": [
"# `col` and Spark's `round` are imported explicitly so the cell runs on a fresh kernel;\n",
"# `round` is aliased to avoid shadowing Python's builtin.\n",
"from pyspark.sql.functions import col, round as spark_round, to_timestamp, current_timestamp\n",
"from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType\n",
"\n",
"# Build the arrival-delay sample for a single trip at a single stop.\n",
"trip_id = \"85:11:18719:002\"\n",
"stop = \"Männedorf\"\n",
"\n",
"# Row filters: the chosen trip and stop, non-empty time strings (so that both\n",
"# timestamps parsed below are usable), and the 'GESCHAETZT' prognosis status.\n",
"is_target = (sbb.fahrt_bezeichner == trip_id) & (sbb.haltestellen_name == stop)\n",
"has_times = (sbb.an_prognose != \"\") & (sbb.ankunftszeit != \"\") & (sbb.abfahrtszeit != \"\")\n",
"is_estimated = sbb.an_prognose_status == 'GESCHAETZT'\n",
"\n",
"sbb_filt = (\n",
"    sbb.filter(is_target & has_times & is_estimated)\n",
"    .select(\"an_prognose\", \"ankunftszeit\")\n",
"    .withColumn('an_prognose', to_timestamp(col('an_prognose'), format='dd.MM.yyyy HH:mm:ss'))\n",
"    .withColumn('ankunftszeit', to_timestamp(col('ankunftszeit'), format='dd.MM.yyyy HH:mm'))\n",
"    # Casting a timestamp to long gives epoch seconds, so the difference is in seconds.\n",
"    .withColumn('DiffInSeconds', col('an_prognose').cast(LongType()) - col('ankunftszeit').cast(LongType()))\n",
"    .withColumn('DiffInMinutes', spark_round(col('DiffInSeconds') / 60))\n",
")\n",
"sbb_filt.show()" ] }, { "cell_type": "code",
"execution_count": 67, "metadata": {}, "outputs": [ { "data": { "application/vnd.jupyter.widget-view+json": { "model_id": "", "version_major": 2, "version_minor": 0 }, "text/plain": [ "FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stdout", "output_type": "stream", "text": [ "+-------------+-----+\n", "|DiffInMinutes|count|\n", "+-------------+-----+\n", "| 1.0| 62|\n", "| 4.0| 1|\n", "| 3.0| 10|\n", "| 2.0| 39|\n", "| 6.0| 1|\n", "| 16.0| 1|\n", "+-------------+-----+" ] } ], "source": [ "sbb_filt.groupBy('DiffInMinutes').count().show()" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "", "name": "pysparkkernel" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }