With Apache Spark we can load large amounts of data of different formats (CSV, TXT, JSON, etc.) into memory.
The data is stored in a Dataset or a DataFrame and then processed.
There are several strategies for ingesting one or more data files into Apache Spark:
- read the file as a text file: each line is stored as a single string, from which we can extract the pieces of information we need and place them in the columns of a DataFrame;
- read the file as a CSV file, either providing a schema or letting Spark infer one on its own;
- define a class that matches the data to be stored, read the file, and cast the data to that class.
Let's see how to proceed in each of the three cases.
Reading the file as text
val df = spark.read.textFile("D:\\AnacondaProjects\\Datasets\\apache.access.log.PROJECT\\apache.access.log.PROJECT").cache
df.show(truncate=false)
+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+ |value | +−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+ |in24.inetnebr.com − − [01/Aug/1995:00:00:01 −0400] "GET /shuttle/missions/sts−68/news/sts−68−mcc−05.txt HTTP/1.0" 200 1839 | |uplherc.upl.com − − [01/Aug/1995:00:00:07 −0400] "GET / HTTP/1.0" 304 0 | |uplherc.upl.com − − [01/Aug/1995:00:00:08 −0400] "GET /images/ksclogo−medium.gif HTTP/1.0" 304 0 | |uplherc.upl.com − − [01/Aug/1995:00:00:08 −0400] "GET /images/MOSAIC−logosmall.gif HTTP/1.0" 304 0 | |uplherc.upl.com − − [01/Aug/1995:00:00:08 −0400] "GET /images/USA−logosmall.gif HTTP/1.0" 304 0 | |ix−esc−ca2−07.ix.netcom.com − − [01/Aug/1995:00:00:09 −0400] "GET /images/launch−logo.gif HTTP/1.0" 200 1713 | |uplherc.upl.com − − [01/Aug/1995:00:00:10 −0400] "GET /images/WORLD−logosmall.gif HTTP/1.0" 304 0 | |slppp6.intermind.net − − [01/Aug/1995:00:00:10 −0400] "GET /history/skylab/skylab.html HTTP/1.0" 200 1687 | |piweba4y.prodigy.com − − [01/Aug/1995:00:00:10 −0400] "GET /images/launchmedium.gif HTTP/1.0" 200 11853 | |slppp6.intermind.net − − [01/Aug/1995:00:00:11 −0400] "GET /history/skylab/skylab−small.gif HTTP/1.0" 200 9202 | |slppp6.intermind.net − − [01/Aug/1995:00:00:12 −0400] "GET /images/ksclogosmall.gif HTTP/1.0" 200 3635 | |ix−esc−ca2−07.ix.netcom.com − − [01/Aug/1995:00:00:12 −0400] "GET /history/apollo/images/apollo−logo1.gif HTTP/1.0" 200 1173 | |slppp6.intermind.net − − [01/Aug/1995:00:00:13 −0400] "GET /history/apollo/images/apollo−logo.gif HTTP/1.0" 200 3047 | |uplherc.upl.com − − [01/Aug/1995:00:00:14 −0400] "GET /images/NASA−logosmall.gif HTTP/1.0" 304 0 | |133.43.96.45 − − [01/Aug/1995:00:00:16 −0400] "GET /shuttle/missions/sts−69/mission−sts−69.html HTTP/1.0" 200 10566 | |kgtyk4.kj.yamagata−u.ac.jp − − [01/Aug/1995:00:00:17 −0400] "GET / HTTP/1.0" 200 7280 | |kgtyk4.kj.yamagata−u.ac.jp − − [01/Aug/1995:00:00:18 −0400] "GET /images/ksclogo−medium.gif HTTP/1.0" 200 5866 | |d0ucr6.fnal.gov − − [01/Aug/1995:00:00:19 −0400] "GET /history/apollo/apollo−16/apollo−16.html HTTP/1.0" 200 2743 | |ix−esc−ca2−07.ix.netcom.com − − [01/Aug/1995:00:00:19 −0400] "GET /shuttle/resources/orbiters/discovery.html HTTP/1.0" 200 6849| |d0ucr6.fnal.gov − − [01/Aug/1995:00:00:20 −0400] "GET /history/apollo/apollo−16/apollo−16−patch−small.gif HTTP/1.0" 200 14897 | +−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+ only showing top 20 rows df: org.apache.spark.sql.Dataset[String] = [value: string]
Among the Spark SQL functions there is regexp_extract, which runs a regular expression against each row of the DataFrame.
def regexp_extract(e: Column, exp: String, groupIdx: Int): Column
The function's parameters are:
- e: the DataFrame column to which the regular expression is applied
- exp: the regular expression, written as a Scala triple-quoted string """<regular expression>"""
- groupIdx: the index of the capturing group to return; 0 corresponds to the entire match, 1 to the first capturing group, and so on
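As a quick illustration of groupIdx, here is a minimal sketch on a made-up one-row DataFrame (the demo value and column name are invented, not taken from the log):

import org.apache.spark.sql.functions.regexp_extract
import spark.implicits._   // for toDF and the $-syntax (already in scope in the spark-shell)

val demo = Seq("code 404").toDF("value")
demo.select(
  regexp_extract($"value", """code\s([0-9]+)""", 0).alias("whole_match"), // "code 404"
  regexp_extract($"value", """code\s([0-9]+)""", 1).alias("first_group")  // "404"
).show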
import org.apache.spark.sql.functions._

val df_col = df.select(
  regexp_extract($"value", """[a-zA-z0-9\-\.]+""", 0).alias("host"),
  regexp_extract($"value", """\[([0-9a-z-A-Z\:\/]+)\s""", 1).alias("datetime"),
  regexp_extract($"value", """GET\s([\/a-zA-Z0-9\.\-\_]+)\sHTTP""", 1).alias("request"),
  regexp_extract($"value", """\"\s([0-9]+)\s""", 1).alias("code"),
  regexp_extract($"value", """\s([0-9]+)$""", 1).alias("bytes")).cache

df_col.show
+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+ | host| datetime| request|code|bytes| +−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+ | in24.inetnebr.com|01/Aug/1995:00:00:01|/shuttle/missions...| 200| 1839| | uplherc.upl.com|01/Aug/1995:00:00:07| /| 304| 0| | uplherc.upl.com|01/Aug/1995:00:00:08|/images/ksclogo−m...| 304| 0| | uplherc.upl.com|01/Aug/1995:00:00:08|/images/MOSAIC−lo...| 304| 0| | uplherc.upl.com|01/Aug/1995:00:00:08|/images/USA−logos...| 304| 0| |ix−esc−ca2−07.ix....|01/Aug/1995:00:00:09|/images/launch−lo...| 200| 1713| | uplherc.upl.com|01/Aug/1995:00:00:10|/images/WORLD−log...| 304| 0| |slppp6.intermind.net|01/Aug/1995:00:00:10|/history/skylab/s...| 200| 1687| |piweba4y.prodigy.com|01/Aug/1995:00:00:10|/images/launchmed...| 200|11853| |slppp6.intermind.net|01/Aug/1995:00:00:11|/history/skylab/s...| 200| 9202| |slppp6.intermind.net|01/Aug/1995:00:00:12|/images/ksclogosm...| 200| 3635| |ix−esc−ca2−07.ix....|01/Aug/1995:00:00:12|/history/apollo/i...| 200| 1173| |slppp6.intermind.net|01/Aug/1995:00:00:13|/history/apollo/i...| 200| 3047| | uplherc.upl.com|01/Aug/1995:00:00:14|/images/NASA−logo...| 304| 0| | 133.43.96.45|01/Aug/1995:00:00:16|/shuttle/missions...| 200|10566| |kgtyk4.kj.yamagat...|01/Aug/1995:00:00:17| /| 200| 7280| |kgtyk4.kj.yamagat...|01/Aug/1995:00:00:18|/images/ksclogo−m...| 200| 5866| | d0ucr6.fnal.gov|01/Aug/1995:00:00:19|/history/apollo/a...| 200| 2743| |ix−esc−ca2−07.ix....|01/Aug/1995:00:00:19|/shuttle/resource...| 200| 6849| | d0ucr6.fnal.gov|01/Aug/1995:00:00:20|/history/apollo/a...| 200|14897| +−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+ only showing top 20 rows import org.apache.spark.sql.functions._ df_col: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [host: string, datetime: string ... 3 more fields]
Let's check each column for null or NaN values.
df_col.agg(
  sum(when($"host".isNull, 1).otherwise(0)).alias("host-null"),
  sum(when($"datetime".isNull, 1).otherwise(0)).alias("datetime-null"),
  sum(when($"request".isNull, 1).otherwise(0)).alias("request-null"),
  sum(when($"code".isNull, 1).otherwise(0)).alias("code-null"),
  sum(when($"bytes".isNull, 1).otherwise(0)).alias("bytes-null")).show
+---------+-------------+------------+---------+----------+
|host-null|datetime-null|request-null|code-null|bytes-null|
+---------+-------------+------------+---------+----------+
|        0|            0|           0|        0|         0|
+---------+-------------+------------+---------+----------+
df_col.agg(
  sum(when($"host".isNaN, 1).otherwise(0)).alias("host-nan"),
  sum(when($"datetime".isNaN, 1).otherwise(0)).alias("datetime-nan"),
  sum(when($"request".isNaN, 1).otherwise(0)).alias("request-nan"),
  sum(when($"code".isNaN, 1).otherwise(0)).alias("code-nan"),
  sum(when($"bytes".isNaN, 1).otherwise(0)).alias("bytes-nan")).show
+--------+------------+-----------+--------+---------+
|host-nan|datetime-nan|request-nan|code-nan|bytes-nan|
+--------+------------+-----------+--------+---------+
|       0|           0|          0|       0|        0|
+--------+------------+-----------+--------+---------+
df_col.select($"datetime").count
res38: Long = 1043177
This approach seems to have worked well: the values were extracted correctly on every row.
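Since regexp_extract always returns string columns, a reasonable follow-up (a sketch of mine, not part of the original walkthrough) is to cast the numeric columns explicitly; any value that cannot be cast would simply become null:

// Cast the extracted strings to numeric types; printSchema confirms the new column types.
val df_typed = df_col.withColumn("code", $"code".cast("int"))
                     .withColumn("bytes", $"bytes".cast("long"))
df_typed.printSchema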
Reading the file as CSV
First of all, let's look at how a line is structured:
in24.inetnebr.com - - [01/Aug/1995:00:00:01 -0400] "GET /shuttle/missions/sts-68/news/sts-68-mcc-05.txt HTTP/1.0" 200 1839
We can use the space character as the separator and then drop the columns we don't need.
val df = spark.read.format("csv")
  .option("header", "false")
  .option("sep", " ")
  .load("D:\\AnacondaProjects\\Datasets\\apache.access.log.PROJECT\\apache.access.log.PROJECT")
  .drop("_c1", "_c2", "_c4")
  .toDF("host", "datetime", "request", "code", "bytes")
  .cache

df.show(5)
+−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+ | host| datetime| request|code|bytes| +−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+ |in24.inetnebr.com|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200| 1839| | uplherc.upl.com|[01/Aug/1995:00:0...| GET / HTTP/1.0| 304| 0| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/ksclo...| 304| 0| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/MOSAI...| 304| 0| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/USA−l...| 304| 0| +−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+ only showing top 5 rows df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [host: string, datetime: string ... 3 more fields]
df.select(
  sum(when($"host".isNull, 1).otherwise(0)).alias("host-null"),
  sum(when($"datetime".isNull, 1).otherwise(0)).alias("datetime-null"),
  sum(when($"request".isNull, 1).otherwise(0)).alias("request-null"),
  sum(when($"code".isNull, 1).otherwise(0)).alias("code-null"),
  sum(when($"bytes".isNull, 1).otherwise(0)).alias("bytes-null")).show
+---------+-------------+------------+---------+----------+
|host-null|datetime-null|request-null|code-null|bytes-null|
+---------+-------------+------------+---------+----------+
|        0|            0|           0|        0|         0|
+---------+-------------+------------+---------+----------+
Let's look at the schema of the DataFrame.
df.printSchema
root
 |-- host: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- request: string (nullable = true)
 |-- code: string (nullable = true)
 |-- bytes: string (nullable = true)
Let's apply describe to every column of the DataFrame.
df.describe("host","datetime","request","code","bytes").show()
+−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+ |summary| host| datetime| request| code| bytes| +−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+ | count| 1043177| 1043177| 1043177| 1043177| 1043177| | mean| null| null| null|210.77027485611083|17679.736875431394| | stddev| null| null| null| 33.52356783510582| 68832.10308836344| | min| ***.novo.dk|[01/Aug/1995:00:0...| GET /| 200| −| | max|zztduffy.slip.cc....|[22/Aug/1995:23:5...|POST /shuttle/mis...| HTTP/1.0"| 99981| +−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+
The table above gives us three important pieces of information:
- the request column contains both GET and POST requests;
- in the code column some values are wrong: they contain the string "HTTP/1.0" instead of a numeric code;
- in the bytes column some values are non-numeric and are represented as "-", which is why they did not show up as null or NaN.
Let's filter the rows whose code value is malformed, i.e., contains non-numeric characters.
df.filter($"code" contains "HTTP/1.0").show(truncate=false)
+−−−−−−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−+−−−−−+ |host |datetime |request |code |bytes| +−−−−−−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−+−−−−−+ |jhanley.doe.state.la.us |[08/Aug/1995:11:24:31|GET /ksc.html |HTTP/1.0"|404 | |redx3.cac.washington.edu |[10/Aug/1995:18:45:48|GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif|HTTP/1.0"|404 | |redx3.cac.washington.edu |[10/Aug/1995:18:45:49|GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif|HTTP/1.0"|404 | |128.95.234.46 |[10/Aug/1995:19:25:40|GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif|HTTP/1.0"|404 | |redx3.cac.washington.edu |[10/Aug/1995:20:08:13|GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif|HTTP/1.0"|404 | |redx3.cac.washington.edu |[10/Aug/1995:20:08:13|GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif|HTTP/1.0"|404 | |redx3.cac.washington.edu |[10/Aug/1995:20:08:41|GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif|HTTP/1.0"|404 | |redx3.cac.washington.edu |[10/Aug/1995:20:08:41|GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif|HTTP/1.0"|404 | |redx3.cac.washington.edu |[10/Aug/1995:20:09:31|GET /shuttle/missions/sts−70/images/KSC−95EC−o667.gif|HTTP/1.0"|404 | |redx3.cac.washington.edu |[10/Aug/1995:20:09:32|GET /shuttle/missions/sts−70/images/KSC−95EC−o667.gif|HTTP/1.0"|404 | |macba305.aerov.jussieu.fr|[22/Aug/1995:11:03:15|GET /shuttle/missions/sts−9/sts−9−patch−small.gif |HTTP/1.0"|404 | |macba305.aerov.jussieu.fr|[22/Aug/1995:11:03:18|GET /shuttle/missions/sts−45/sts−45−patch−small.gif |HTTP/1.0"|404 | |macba305.aerov.jussieu.fr|[22/Aug/1995:11:03:19|GET /shuttle/missions/sts−57/sts−57−patch−small.gif" |HTTP/1.0"|404 | +−−−−−−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−+−−−−−+
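To double-check that "HTTP/1.0"" is the only non-numeric value that ends up in code, a more general test could be run with rlike (this check is a sketch of mine, not part of the original analysis):

// Group the rows whose code is not purely numeric and count them;
// based on the output above, only the HTTP/1.0" value should appear.
df.filter(not($"code".rlike("^[0-9]+$"))).groupBy("code").count.show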
We should ask ourselves why the ingestion of these rows went wrong, and try to isolate them without splitting them into columns.
It is convenient to read the file again as plain text and filter the lines that match the requests returned by the filter above.
val df_text = spark.read.textFile("D:\\AnacondaProjects\\Datasets\\apache.access.log.PROJECT\\apache.access.log.PROJECT")
  .filter(
    ($"value" contains "GET /ksc.html") && ($"value" contains "08/Aug/1995:11:24:31") ||
    ($"value" contains "/shuttle/missions/sts-70/images/ksc-95ec-o667.gif") &&
      (($"value" contains "10/Aug/1995:18:45:48") ||
       ($"value" contains "10/Aug/1995:18:45:49") ||
       ($"value" contains "10/Aug/1995:19:25:40") ||
       ($"value" contains "10/Aug/1995:20:08:13") ||
       ($"value" contains "10/Aug/1995:20:08:41") ||
       ($"value" contains "10/Aug/1995:20:09:31") ||
       ($"value" contains "10/Aug/1995:20:09:32")) ||
    (($"value" contains "GET /shuttle/missions/sts-9/sts-9-patch-small.gif") && ($"value" contains "22/Aug/1995:11:03:15")) ||
    (($"value" contains "GET /shuttle/missions/sts-45/sts-45-patch-small.gif") && ($"value" contains "22/Aug/1995:11:03:18")) ||
    (($"value" contains "GET /shuttle/missions/sts-57/sts-57-patch-small.gif") && ($"value" contains "22/Aug/1995:11:03:19")))

df_text.show(5, truncate=false)
+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+ |value | +−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+ |jhanley.doe.state.la.us − − [08/Aug/1995:11:24:31 −0400] "GET /ksc.html" HTTP/1.0" 404 − | |redx3.cac.washington.edu − − [10/Aug/1995:18:45:48 −0400] "GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif" HTTP/1.0" 404 −| |redx3.cac.washington.edu − − [10/Aug/1995:18:45:49 −0400] "GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif" HTTP/1.0" 404 −| |128.95.234.46 − − [10/Aug/1995:19:25:40 −0400] "GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif" HTTP/1.0" 404 − | |redx3.cac.washington.edu − − [10/Aug/1995:20:08:13 −0400] "GET /shuttle/missions/sts−70/images/ksc−95ec−o667.gif" HTTP/1.0" 404 −| +−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+ only showing top 5 rows df_text: org.apache.spark.sql.Dataset[String] = [value: string]
As we can see, the lines in question are all malformed: instead of following the format "GET <url> HTTP/1.0", they contain extra " characters.
Note that " is the default quote character of the CSV DataFrame reader: whatever is enclosed in quotes is treated as a single string, even if it contains the separator character (the space, in our case). This is why "GET <url> HTTP/1.0" is normally read as a single field. When extra quotes appear in the middle of the field, however, the reading and the splitting of the line into fields no longer work as expected.
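To make the quote behaviour concrete, here is a minimal sketch that feeds two made-up lines (the hostnames and paths are invented; the second line mimics the malformed format above) to the CSV reader through an in-memory Dataset[String], which the reader accepts since Spark 2.2:

import spark.implicits._

val sample = Seq(
  """hostA - - [01/Aug/1995:00:00:01 -0400] "GET /index.html HTTP/1.0" 200 1839""",
  """hostB - - [01/Aug/1995:00:00:02 -0400] "GET /logo.gif" HTTP/1.0" 404 -"""
).toDS

// First line: the quoted request stays in a single field despite the spaces inside it.
// Second line: the request field ends at the stray quote, so the remaining values shift by one column.
spark.read.option("sep", " ").csv(sample).show(truncate=false)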
At this point I have two options:
- fix these rows manually
- drop these rows
Since there are only a few of them and they do not look particularly important (even though they are all 404, page not found, responses), I decide to drop them.
val df_clean = df.filter(not(col("code").contains("HTTP"))).cache
df_clean.show
+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+ | host| datetime| request|code|bytes| +−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+ | in24.inetnebr.com|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200| 1839| | uplherc.upl.com|[01/Aug/1995:00:0...| GET / HTTP/1.0| 304| 0| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/ksclo...| 304| 0| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/MOSAI...| 304| 0| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/USA−l...| 304| 0| |ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /images/launc...| 200| 1713| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/WORLD...| 304| 0| |slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/skyl...| 200| 1687| |piweba4y.prodigy.com|[01/Aug/1995:00:0...|GET /images/launc...| 200|11853| |slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/skyl...| 200| 9202| |slppp6.intermind.net|[01/Aug/1995:00:0...|GET /images/ksclo...| 200| 3635| |ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /history/apol...| 200| 1173| |slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/apol...| 200| 3047| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/NASA−...| 304| 0| | 133.43.96.45|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200|10566| |kgtyk4.kj.yamagat...|[01/Aug/1995:00:0...| GET / HTTP/1.0| 200| 7280| |kgtyk4.kj.yamagat...|[01/Aug/1995:00:0...|GET /images/ksclo...| 200| 5866| | d0ucr6.fnal.gov|[01/Aug/1995:00:0...|GET /history/apol...| 200| 2743| |ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /shuttle/reso...| 200| 6849| | d0ucr6.fnal.gov|[01/Aug/1995:00:0...|GET /history/apol...| 200|14897| +−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+ only showing top 20 rows df_clean: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [host: string, datetime: string ... 3 more fields]
It might be a good idea to convert the last two columns to integers.
Let's define a Scala function and then turn it into a UDF (User Defined Function) so that it can be used in Spark.
The st_to_int function defined below converts a string to an integer. A try-catch block ensures that, when the value cannot be converted because the row is malformed, the function returns 0 if bytes equals "-" and -9999 otherwise.
//val st_to_int : String => Int = x => x.toInt
def st_to_int(x: String): Int = {
  try {
    x.toInt
  } catch {
    case ex: NumberFormatException => if (x == "-") 0 else -9999
  }
}

val udf_st_to_int = udf(st_to_int _)

val df_conv = df_clean.withColumn("code", udf_st_to_int($"code"))
                      .withColumn("bytes", udf_st_to_int($"bytes"))

df_conv.show(5)
df_conv.printSchema
+−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+ | host| datetime| request|code|bytes| +−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+ |in24.inetnebr.com|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200| 1839| | uplherc.upl.com|[01/Aug/1995:00:0...| GET / HTTP/1.0| 304| 0| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/ksclo...| 304| 0| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/MOSAI...| 304| 0| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/USA−l...| 304| 0| +−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+ only showing top 5 rows root |−− host: string (nullable = true) |−− datetime: string (nullable = true) |−− request: string (nullable = true) |−− code: integer (nullable = false) |−− bytes: integer (nullable = false) st_to_int: (x: String)Int udf_st_to_int: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(StringType))) df_conv: org.apache.spark.sql.DataFrame = [host: string, datetime: string ... 3 more fields]
Let's verify that all the problems have been fixed by applying the describe function to the DataFrames before and after the conversion.
df.describe("code","bytes").show
+-------+------------------+------------------+
|summary|              code|             bytes|
+-------+------------------+------------------+
|  count|           1043177|           1043177|
|   mean|210.77027485611083|17679.736875431394|
| stddev| 33.52356783510582| 68832.10308836344|
|    min|               200|                 -|
|    max|         HTTP/1.0"|             99981|
+-------+------------------+------------------+
df_conv.describe("code","bytes").show
+-------+------------------+-----------------+
|summary|              code|            bytes|
+-------+------------------+-----------------+
|  count|           1043164|          1043164|
|   mean|210.77027485611083|17531.77418219954|
| stddev| 33.52356783510582|68562.39834125541|
|    min|               200|                0|
|    max|               501|          3421948|
+-------+------------------+-----------------+
👌
Importing the data into a Dataset
If I use a Dataset I have to define a type for each row of data, so that the data is strongly typed.
case class server_connection(host: String, datetime: String, request: String, code: Int, bytes: Int)
defined class server_connection
To map the individual fields of the server_connection class, the columns must have the same names as the fields, in this case: host, datetime, request, code, bytes.
To convert to a Dataset[server_connection] I use the .as[server_connection] method.
val df = spark.read.format("csv")
  .option("header", "false")
  .option("sep", " ")
  .schema("host string, _c1 string, _c2 string, datetime string, _c4 string, request string, code int, bytes int")
  .load("D:\\AnacondaProjects\\Datasets\\apache.access.log.PROJECT\\apache.access.log.PROJECT")
  .drop("_c1", "_c2", "_c4")
  .as[server_connection]
  .cache

df.show
+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+ | host| datetime| request|code|bytes| +−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+ | in24.inetnebr.com|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200| 1839| | uplherc.upl.com|[01/Aug/1995:00:0...| GET / HTTP/1.0| 304| 0| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/ksclo...| 304| 0| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/MOSAI...| 304| 0| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/USA−l...| 304| 0| |ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /images/launc...| 200| 1713| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/WORLD...| 304| 0| |slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/skyl...| 200| 1687| |piweba4y.prodigy.com|[01/Aug/1995:00:0...|GET /images/launc...| 200|11853| |slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/skyl...| 200| 9202| |slppp6.intermind.net|[01/Aug/1995:00:0...|GET /images/ksclo...| 200| 3635| |ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /history/apol...| 200| 1173| |slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/apol...| 200| 3047| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/NASA−...| 304| 0| | 133.43.96.45|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200|10566| |kgtyk4.kj.yamagat...|[01/Aug/1995:00:0...| GET / HTTP/1.0| 200| 7280| |kgtyk4.kj.yamagat...|[01/Aug/1995:00:0...|GET /images/ksclo...| 200| 5866| | d0ucr6.fnal.gov|[01/Aug/1995:00:0...|GET /history/apol...| 200| 2743| |ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /shuttle/reso...| 200| 6849| | d0ucr6.fnal.gov|[01/Aug/1995:00:0...|GET /history/apol...| 200|14897| +−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+ only showing top 20 rows df: org.apache.spark.sql.Dataset[server_connection] = [host: string, datetime: string ... 3 more fields]
df.printSchema
root
 |-- host: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- request: string (nullable = true)
 |-- code: integer (nullable = true)
 |-- bytes: integer (nullable = true)
We can also run .describe on the code and bytes columns and see that there are no malformed fields or rows.
Moreover, since describe is executed over all the columns, it can reveal whether any record has problems.
df.describe("host","datetime","request","code","bytes").show
+−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−+ |summary| host| datetime| request| code| bytes| +−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−+ | count| 1034421| 1034421| 1034421| 1034421| 1034421| | mean| null| null| null| 209.3868917974403|17679.95398681968| | stddev| null| null| null|29.757618057686102|68832.50836237728| | min| ***.novo.dk|[01/Aug/1995:00:0...| GET /| 200| 0| | max|zztduffy.slip.cc....|[22/Aug/1995:23:5...|POST /cgi−bin/new...| 500| 3421948| +−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−+
Let's check whether there are any null values.
df.select(
  sum(when($"host".isNull, 1).otherwise(0)).alias("host-null"),
  sum(when($"datetime".isNull, 1).otherwise(0)).alias("datetime-null"),
  sum(when($"request".isNull, 1).otherwise(0)).alias("request-null"),
  sum(when($"code".isNull, 1).otherwise(0)).alias("code-null"),
  sum(when($"bytes".isNull, 1).otherwise(0)).alias("bytes-null")).show
+---------+-------------+------------+---------+----------+
|host-null|datetime-null|request-null|code-null|bytes-null|
+---------+-------------+------------+---------+----------+
|     8756|         8756|        8756|     8756|      8756|
+---------+-------------+------------+---------+----------+
As we can see, for 8756 records Spark could not apply the schema, so it returned the whole record as null.
df.filter($"host".isNull).show
+−−−−+−−−−−−−−+−−−−−−−+−−−−+−−−−−+ |host|datetime|request|code|bytes| +−−−−+−−−−−−−−+−−−−−−−+−−−−+−−−−−+ |null| null| null|null| null| |null| null| null|null| null| |null| null| null|null| null| |null| null| null|null| null| |null| null| null|null| null| |null| null| null|null| null| |null| null| null|null| null| |null| null| null|null| null| |null| null| null|null| null| |null| null| null|null| null| |null| null| null|null| null| |null| null| null|null| null| |null| null| null|null| null| |null| null| null|null| null| |null| null| null|null| null| |null| null| null|null| null| |null| null| null|null| null| |null| null| null|null| null| |null| null| null|null| null| |null| null| null|null| null| +−−−−+−−−−−−−−+−−−−−−−+−−−−+−−−−−+ only showing top 20 rows
After some investigation we realize that the null records correspond to the rows where bytes == "-".
Let's run a regular expression on the raw text records to check that we indeed get the same number: 8756.
The regular expression is simply """-$""", where - is the character to look for and $ anchors the match to the end of the line.
val df_raw = spark.read.textFile("D:\\AnacondaProjects\\Datasets\\apache.access.log.PROJECT\\apache.access.log.PROJECT")

def find_pattern(x: String) = {
  val pattern = """-$""".r
  pattern.findFirstIn(x) != None
}

val find_pattern_udf = udf(find_pattern _)

df_raw.filter(find_pattern_udf($"value")).show(5, truncate=false)
println(s"Ho questo numero di matching: ${df_raw.filter( find_pattern_udf($"value") ).count}")
+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+ |value | +−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+ |gw1.att.com − − [01/Aug/1995:00:03:53 −0400] "GET /shuttle/missions/sts−73/news HTTP/1.0" 302 − | |js002.cc.utsunomiya−u.ac.jp − − [01/Aug/1995:00:07:33 −0400] "GET /shuttle/resources/orbiters/discovery.gif HTTP/1.0" 404 −| |tia1.eskimo.com − − [01/Aug/1995:00:28:41 −0400] "GET /pub/winvn/release.txt HTTP/1.0" 404 − | |itws.info.eng.niigata−u.ac.jp − − [01/Aug/1995:00:38:01 −0400] "GET /ksc.html/facts/about_ksc.html HTTP/1.0" 403 − | |grimnet23.idirect.com − − [01/Aug/1995:00:50:12 −0400] "GET /www/software/winvn/winvn.html HTTP/1.0" 404 − | +−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+ only showing top 5 rows Ho questo numero di matching: 8756 df_raw: org.apache.spark.sql.Dataset[String] = [value: string] find_pattern: (x: String)Boolean find_pattern_udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,BooleanType,Some(List(StringType)))
Verified.
Now let's remove the null records.
val df_clean = df.filter($"bytes".isNotNull)
df_clean.show(5)
df_clean.count
+−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+ | host| datetime| request|code|bytes| +−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+ |in24.inetnebr.com|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200| 1839| | uplherc.upl.com|[01/Aug/1995:00:0...| GET / HTTP/1.0| 304| 0| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/ksclo...| 304| 0| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/MOSAI...| 304| 0| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/USA−l...| 304| 0| +−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+ only showing top 5 rows df_clean: org.apache.spark.sql.Dataset[server_connection] = [host: string, datetime: string ... 3 more fields] res55: Long = 1034421
To clean up the datetime and request fields we can proceed as before.
For the datetime field we need to keep all the characters from the second one to the end, dropping the leading '['.
val clean: String => String = x => x.slice(1, x.size)
val clean_udf = udf(clean)

val df_clean_datetime = df_clean.withColumn("clean_datetime", clean_udf($"datetime"))
df_clean_datetime.show
df_clean_datetime.describe("clean_datetime")
+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+−−−−−−−−−−−−−−−−−−−−+ | host| datetime| request|code|bytes| clean_datetime| +−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+−−−−−−−−−−−−−−−−−−−−+ | in24.inetnebr.com|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200| 1839|01/Aug/1995:00:00:01| | uplherc.upl.com|[01/Aug/1995:00:0...| GET / HTTP/1.0| 304| 0|01/Aug/1995:00:00:07| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/ksclo...| 304| 0|01/Aug/1995:00:00:08| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/MOSAI...| 304| 0|01/Aug/1995:00:00:08| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/USA−l...| 304| 0|01/Aug/1995:00:00:08| |ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /images/launc...| 200| 1713|01/Aug/1995:00:00:09| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/WORLD...| 304| 0|01/Aug/1995:00:00:10| |slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/skyl...| 200| 1687|01/Aug/1995:00:00:10| |piweba4y.prodigy.com|[01/Aug/1995:00:0...|GET /images/launc...| 200|11853|01/Aug/1995:00:00:10| |slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/skyl...| 200| 9202|01/Aug/1995:00:00:11| |slppp6.intermind.net|[01/Aug/1995:00:0...|GET /images/ksclo...| 200| 3635|01/Aug/1995:00:00:12| |ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /history/apol...| 200| 1173|01/Aug/1995:00:00:12| |slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/apol...| 200| 3047|01/Aug/1995:00:00:13| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/NASA−...| 304| 0|01/Aug/1995:00:00:14| | 133.43.96.45|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200|10566|01/Aug/1995:00:00:16| |kgtyk4.kj.yamagat...|[01/Aug/1995:00:0...| GET / HTTP/1.0| 200| 7280|01/Aug/1995:00:00:17| |kgtyk4.kj.yamagat...|[01/Aug/1995:00:0...|GET /images/ksclo...| 200| 5866|01/Aug/1995:00:00:18| | d0ucr6.fnal.gov|[01/Aug/1995:00:0...|GET /history/apol...| 200| 2743|01/Aug/1995:00:00:19| |ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /shuttle/reso...| 200| 6849|01/Aug/1995:00:00:19| | d0ucr6.fnal.gov|[01/Aug/1995:00:0...|GET /history/apol...| 200|14897|01/Aug/1995:00:00:20| +−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+−−−−−−−−−−−−−−−−−−−−+ only showing top 20 rows clean: String => String = <function1> clean_udf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType))) df_clean_datetime: org.apache.spark.sql.DataFrame = [host: string, datetime: string ... 4 more fields] res56: org.apache.spark.sql.DataFrame = [summary: string, clean_datetime: string]
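The same cleaning could also be done without a UDF, for example with the built-in regexp_replace function (just a sketch of an alternative; the result should be equivalent):

import org.apache.spark.sql.functions.regexp_replace

// Strip the leading '[' from datetime with a built-in column function instead of a UDF.
val df_alt = df_clean.withColumn("clean_datetime", regexp_replace($"datetime", """^\[""", ""))
df_alt.select("clean_datetime").show(5, truncate=false)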
For the request field I can use a regular expression.
val regexp = """\s*\/+([a-zA-Z0-9\/\-\_]*)\s*""".r
val func1: String => String = x => regexp.findFirstIn(x).getOrElse("")
val udf_func1 = udf(func1)

val df_clean_request = df_clean_datetime.withColumn("clean_request", udf_func1($"request"))
df_clean_request.show
+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+ | host| datetime| request|code|bytes| clean_datetime| clean_request| +−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+ | in24.inetnebr.com|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200| 1839|01/Aug/1995:00:00:01| /shuttle/mission...| | uplherc.upl.com|[01/Aug/1995:00:0...| GET / HTTP/1.0| 304| 0|01/Aug/1995:00:00:07| / | | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/ksclo...| 304| 0|01/Aug/1995:00:00:08| /images/ksclogo−...| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/MOSAI...| 304| 0|01/Aug/1995:00:00:08| /images/MOSAIC−l...| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/USA−l...| 304| 0|01/Aug/1995:00:00:08| /images/USA−logo...| |ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /images/launc...| 200| 1713|01/Aug/1995:00:00:09| /images/launch−logo| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/WORLD...| 304| 0|01/Aug/1995:00:00:10| /images/WORLD−lo...| |slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/skyl...| 200| 1687|01/Aug/1995:00:00:10| /history/skylab/...| |piweba4y.prodigy.com|[01/Aug/1995:00:0...|GET /images/launc...| 200|11853|01/Aug/1995:00:00:10| /images/launchme...| |slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/skyl...| 200| 9202|01/Aug/1995:00:00:11| /history/skylab/...| |slppp6.intermind.net|[01/Aug/1995:00:0...|GET /images/ksclo...| 200| 3635|01/Aug/1995:00:00:12| /images/ksclogos...| |ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /history/apol...| 200| 1173|01/Aug/1995:00:00:12| /history/apollo/...| |slppp6.intermind.net|[01/Aug/1995:00:0...|GET /history/apol...| 200| 3047|01/Aug/1995:00:00:13| /history/apollo/...| | uplherc.upl.com|[01/Aug/1995:00:0...|GET /images/NASA−...| 304| 0|01/Aug/1995:00:00:14| /images/NASA−log...| | 133.43.96.45|[01/Aug/1995:00:0...|GET /shuttle/miss...| 200|10566|01/Aug/1995:00:00:16| /shuttle/mission...| |kgtyk4.kj.yamagat...|[01/Aug/1995:00:0...| GET / HTTP/1.0| 200| 7280|01/Aug/1995:00:00:17| / | |kgtyk4.kj.yamagat...|[01/Aug/1995:00:0...|GET /images/ksclo...| 200| 5866|01/Aug/1995:00:00:18| /images/ksclogo−...| | d0ucr6.fnal.gov|[01/Aug/1995:00:0...|GET /history/apol...| 200| 2743|01/Aug/1995:00:00:19| /history/apollo/...| |ix−esc−ca2−07.ix....|[01/Aug/1995:00:0...|GET /shuttle/reso...| 200| 6849|01/Aug/1995:00:00:19| /shuttle/resourc...| | d0ucr6.fnal.gov|[01/Aug/1995:00:0...|GET /history/apol...| 200|14897|01/Aug/1995:00:00:20| /history/apollo/...| +−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−+ only showing top 20 rows regexp: scala.util.matching.Regex = \s*\/+([a−zA−Z0−9\/\−\_]*)\s* func1: String => String = <function1> udf_func1: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType))) df_clean_request: org.apache.spark.sql.DataFrame = [host: string, datetime: string ... 5 more fields]
Finally, let's drop the columns that are no longer needed.
val new_df = df_clean_request.drop("datetime", "request").cache
new_df.show(truncate=false)
+−−−−−−−−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+ |host |code|bytes|clean_datetime |clean_request | +−−−−−−−−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+ |in24.inetnebr.com |200 |1839 |01/Aug/1995:00:00:01| /shuttle/missions/sts−68/news/sts−68−mcc−05 | |uplherc.upl.com |304 |0 |01/Aug/1995:00:00:07| / | |uplherc.upl.com |304 |0 |01/Aug/1995:00:00:08| /images/ksclogo−medium | |uplherc.upl.com |304 |0 |01/Aug/1995:00:00:08| /images/MOSAIC−logosmall | |uplherc.upl.com |304 |0 |01/Aug/1995:00:00:08| /images/USA−logosmall | |ix−esc−ca2−07.ix.netcom.com|200 |1713 |01/Aug/1995:00:00:09| /images/launch−logo | |uplherc.upl.com |304 |0 |01/Aug/1995:00:00:10| /images/WORLD−logosmall | |slppp6.intermind.net |200 |1687 |01/Aug/1995:00:00:10| /history/skylab/skylab | |piweba4y.prodigy.com |200 |11853|01/Aug/1995:00:00:10| /images/launchmedium | |slppp6.intermind.net |200 |9202 |01/Aug/1995:00:00:11| /history/skylab/skylab−small | |slppp6.intermind.net |200 |3635 |01/Aug/1995:00:00:12| /images/ksclogosmall | |ix−esc−ca2−07.ix.netcom.com|200 |1173 |01/Aug/1995:00:00:12| /history/apollo/images/apollo−logo1 | |slppp6.intermind.net |200 |3047 |01/Aug/1995:00:00:13| /history/apollo/images/apollo−logo | |uplherc.upl.com |304 |0 |01/Aug/1995:00:00:14| /images/NASA−logosmall | |133.43.96.45 |200 |10566|01/Aug/1995:00:00:16| /shuttle/missions/sts−69/mission−sts−69 | |kgtyk4.kj.yamagata−u.ac.jp |200 |7280 |01/Aug/1995:00:00:17| / | |kgtyk4.kj.yamagata−u.ac.jp |200 |5866 |01/Aug/1995:00:00:18| /images/ksclogo−medium | |d0ucr6.fnal.gov |200 |2743 |01/Aug/1995:00:00:19| /history/apollo/apollo−16/apollo−16 | |ix−esc−ca2−07.ix.netcom.com|200 |6849 |01/Aug/1995:00:00:19| /shuttle/resources/orbiters/discovery | |d0ucr6.fnal.gov |200 |14897|01/Aug/1995:00:00:20| /history/apollo/apollo−16/apollo−16−patch−small| +−−−−−−−−−−−−−−−−−−−−−−−−−−−+−−−−+−−−−−+−−−−−−−−−−−−−−−−−−−−+−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−−+ only showing top 20 rows new_df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [host: string, code: int ... 3 more fields]
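A natural next step (a sketch of mine, assuming Spark 2.2+ and that all dates follow the dd/MMM/yyyy:HH:mm:ss pattern seen above) would be to parse clean_datetime into a proper timestamp column:

import org.apache.spark.sql.functions.to_timestamp

// Parse the cleaned date string into a TimestampType column;
// rows that do not match the pattern would simply become null.
val df_ts = new_df.withColumn("timestamp", to_timestamp($"clean_datetime", "dd/MMM/yyyy:HH:mm:ss"))
df_ts.select("clean_datetime", "timestamp").show(5, truncate=false)
df_ts.printSchema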