Apache Spark è un framework per il calcolo distribuito su un cluster di computer. Di fatto è il sistema più diffuso per l’esecuzione di calcoli su una grosse mole di dati, leggasi Big Data.
In questo articolo vediamo di muovere i primi passi con i DataFrame di Apache Spark. A partire da Spark 2.0 i DataFrame sono considerati la struttura di default per la memorizzazione dei dati.
L’utilizzo ricorda molto da vicino quello dei DataFrame di Numpy per Python. Ma con alcune differenze.
Iniziamo con (vedere) la configurazione del cluster, dopodiché importiamo dei dati da un file csv. E poi eseguiamo un po’ di operazioni sul DataFrame, vedremo come aggregare i dati, come aggiungere o togliere righe/colonne dal DataFrame.
Configurazione del cluster
Importiamo la SparkSesssion, il punto di accesso al cluster per tutte le attività di Spark.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
Definisco e configuro una SparkSession, ho bisogno del metodo builder, assegno un nome alla applicazione Spark usando appName, definisco che il cluster funziona in modo locale. Infine uso getOrCreate, cioè crea una nuova SparkSession o usa una già esistente.
val spark = SparkSession.builder.appName("MyApp1").master("local").getOrCreate
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@22161fb9
Se voglio vedere tutte le proprietà di configurazione posso usare spark.conf.getAll che restituisce un map di coppie key,value, ciscuna coppia contenente una proprietà della configurazione del cluster.
spark.conf.getAll
res1: Map[String,String] = Map(zeppelin.pyspark.python -> python, spark.driver.host -> 10.0.75.1, zeppelin.dep.localrepo -> local-repo, zeppelin.spark.sql.stacktrace -> false, spark.driver.port -> 53386, master -> local[*], spark.repl.class.uri -> spark://10.0.75.1:53386/classes, zeppelin.spark.useHiveContext -> true, spark.repl.class.outputDir -> C:\Users\home\AppData\Local\Temp\spark8113731899773902274, zeppelin.spark.sql.interpolation -> false, zeppelin.spark.importImplicit -> true, zeppelin.interpreter.output.limit -> 102400, spark.app.name -> MyApp1, zeppelin.R.cmd -> R, zeppelin.spark.maxResult -> 1000, zeppelin.pyspark.useIPython -> true, zeppelin.spark.concurrentSQL -> false, zeppelin.spark.enableSupportedVersionCheck -> true, zeppelin.spark.printREPLOutput -> true, zeppelin.dep…
Per visualizzare meglio i field del map, posso usare la funzione foreach con argomento la funzione println.
Da ricordare che in Scala esistono le funzioni di ordine superiore, cioè funzioni che accettano come argomento un altra funzione.
spark.conf.getAll.foreach(println)
(zeppelin.pyspark.python,python)
(spark.driver.host,10.0.75.1)
(zeppelin.dep.localrepo,local-repo)
(zeppelin.spark.sql.stacktrace,false)
(spark.driver.port,53386)
(master,local[*])
(spark.repl.class.uri,spark://10.0.75.1:53386/classes)
(zeppelin.spark.useHiveContext,true)
(spark.repl.class.outputDir,C:\Users\home\AppData\Local\Temp\spark8113731899773902274)
(zeppelin.spark.sql.interpolation,false)
(zeppelin.spark.importImplicit,true)
(zeppelin.interpreter.output.limit,102400)
(spark.app.name,MyApp1)
(zeppelin.R.cmd,R)
(zeppelin.spark.maxResult,1000)
(zeppelin.pyspark.useIPython,true)
(zeppelin.spark.concurrentSQL,false)
(zeppelin.spark.enableSupportedVersionCheck,true)
(zeppelin.spark.printREPLOutput,true)
(zeppelin.dep.additionalRemoteRepository,spark-packages,http://dl.bintray.com/spark-packages/maven,false;)
(spark.executor.id,driver)
(zeppelin.spark.useNew,true)
(spark.useHiveContext,true)
(spark.master,local)
(zeppelin.R.image.width,100%)
(zeppelin.spark.ui.hidden,false)
(zeppelin.interpreter.localRepo,C:\zeppelin-0.8.2-bin-all/local-repo/spark)
(zeppelin.R.render.options,out.format = ‘html’, comment = NA, echo = FALSE, results = ‘asis’, message = F, warning = F, fig.retina = 2)
(zeppelin.interpreter.max.poolsize,10)
(spark.app.id,local-1574860392257)
(zeppelin.R.knitr,true)
Importazione dei dati
I dati sono in un file csv. Questi possono essere importati comodamente in un DataFrame o in un RDD, dipendentemente a quale struttura voglio usare.
Poichè sono dati numerici in formato tabellare è sicuramente più comodo usare i DataFrame
Prima importo il package Dataframe, e poi importiamo il file csv in un Dataframe nuovo di zecca.
Il DataFrame di Spark è molto simile al DataFrame di Numpy per Python. Si tratta di un insieme di dati in formato tabella con le colonne aventi dei nomi per identificarle .
Con l’opzione (“Header”,“true”) comunico che la prima riga ha la funzione di Header per cui le colonne del DataFrame verranno nominate usando l’header.
In alternativa spark avrebbe assegnato i nomi generici _c0, _c1, _c2… e la prima riga sarebbe finita tra i dati.
import org.apache.spark.sql.DataFrame
val df = spark.read.option("Header","true").csv("exampl1.csv")
import org.apache.spark.sql.DataFrame df: org.apache.spark.sql.DataFrame = [eta: string, amici: string]
Come possiamo vedere sopra Spark mi dice che l’oggetto DataFrame ha 2 colonne di nome rispettivamente eta e amici, e che tutti e due i campi contengono stringhe.
Poiché in realtà i campi del file sono valori interi, posso fare in modo che nell’importazione i valori siano interpretati correttamente come valori numerici interi.
Per fare ciò uso l’opzione (“inferSchema”,“true”) in cui comunico al sistema di inferire il tipo dei valori nelle colonne.
val df = spark.read.option("Header","true").option("inferSchema","true").csv("exampl1.csv")
df: org.apache.spark.sql.DataFrame = [eta: int, amici: int]
I nomi delle colonne li conosco, ma se voglio verificare posso usare il metodo columns
df.columns
res3: Array[String] = Array(eta, amici)
Adesso che i dati sono correttamente interpretati come numerici vediamo la prime 5 righe del DataFrame, con il metodo head
df.head(5)
res4: Array[org.apache.spark.sql.Row] = Array([44,11], [5,190], [44,123], [9,111], [18,238])
Visualizziamo lo schema del DataFrame con printSchema, le informazioni che ottengo sono il nome e il tipo di ciascun campo:
df.printSchema
root
|– eta: integer (nullable = true)
|– amici: integer (nullable = true)
Il Dataframe ha un numero di elementi pari a
df.count
res6: Long = 499
Statistica descrittiva
Con questo termine si indica una analisi statistica da fare sui dati a disposizione.
Per il momento usiamo semplicemente la funzione describe()
df.describe().show
+–––––––+––––––––––––––––––+––––––––––––––––––+ |summary| eta| amici| +–––––––+––––––––––––––––––+––––––––––––––––––+ | count | 499| 499| | mean | 34.78156312625251| 258.3627254509018| | stddev|20.532332734289348|136.55329326307273| | min | 0| 1| | max | 69| 498| +–––––––+––––––––––––––––––+––––––––––––––––––+
df.groupBy("eta")
res8: org.apache.spark.sql.RelationalGroupedDataset = org.apache.spark.sql.RelationalGroupedDataset@727e4c7d
Il metodo groupBy fornisce come risultato un RelationalGroupedDataset, dopodiché per utilizzarlo devo specificare come aggregare i valori corrispondenti ad ogni key. Per scoprire quali sono i metodi che possiamo applicare a questa entità vado sulla pagina della documentazione di Spark relativa al RelationalGroupedDataset.
Scopro che posso usare diverse funzioni, tra cui agg, max, min, pivot. Proviamone alcune.
La funzione min è così definita:def min(colNames: String*): DataFrame
df.groupBy("eta").min("amici").show(5)
+–––+––––––––––+ |eta|min(amici)| +–––+––––––––––+ | 31| 73| | 65| 141| | 53| 42| | 34| 53| | 28| 36| +–––+––––––––––+ only showing top 5 rows
Adesso voglio contare quanti elementi ci sono nel DataFrame con la stessa key, così scopro per esempio che ci sono esattamente 5 31enni.
df.groupBy("eta").count().show(5)
+–––+–––––+ |eta|count| +–––+–––––+ | 31| 5| | 65| 9| | 53| 8| | 34| 10| | 28| 8| +–––+–––––+ only showing top 5 rows
Per la funzione agg devo specificare come argomento/i la/e funzione/i e su quale colonna ciascuna funzione deve essere applicata
df.groupBy("eta").agg(count("amici"),min("amici"), max("amici"), mean("amici")).show(5)
+–––+––––––––––––+––––––––––+––––––––––+––––––––––––––––––+ |eta|count(amici)|min(amici)|max(amici)| avg(amici)| +–––+––––––––––––+––––––––––+––––––––––+––––––––––––––––––+ | 31| 5| 73| 419| 279.6| | 65| 9| 141| 469|328.22222222222223| | 53| 8| 42| 489| 322.25| | 34| 10| 53| 478| 278.7| | 28| 8| 36| 437| 249.0| +–––+––––––––––––+––––––––––+––––––––––+––––––––––––––––––+ only showing top 5 rows
Se volessimo sapere i dati relativi ai 30-enni potrei filtrare preventivamente usando la funzione filter
df.filter("eta == 30").show
+–––+–––––+ |eta|amici| +–––+–––––+ | 30| 148| | 30| 374| | 30| 153| | 30| 247| | 30| 56| | 30| 46| | 30| 352| +–––+–––––+
La funzione filter() è del tutto equivalente alla funzione where()
df.where("eta == 30").show
+–––+–––––+ |eta|amici| +–––+–––––+ | 30| 148| | 30| 374| | 30| 153| | 30| 247| | 30| 56| | 30| 46| | 30| 352| +–––+–––––+
Se voglio aggregare i dati relativi ai 30enni posso usare ancora la funzione agg
df.filter("eta == 30").agg(count("amici"),min("amici"), max("amici"), mean("amici")).shows
+––––––––––––+––––––––––+––––––––––+––––––––––––––––––+ |count(amici)|min(amici)|max(amici)| avg(amici)| +––––––––––––+––––––––––+––––––––––+––––––––––––––––––+ | 7| 46| 374|196.57142857142858| +––––––––––––+––––––––––+––––––––––+––––––––––––––––––+
Come argomento di filter posso usare gli operatori logici di SQL
df.filter("eta>=30 AND eta<=32").agg(count("amici"),min("amici"), max("amici"), mean("amici")).show
+––––––––––––+––––––––––+––––––––––+––––––––––––––––––+ |count(amici)|min(amici)|max(amici)| avg(amici)| +––––––––––––+––––––––––+––––––––––+––––––––––––––––––+ | 13| 46| 419|232.84615384615384| +––––––––––––+––––––––––+––––––––––+––––––––––––––––––+
Vediamo di calcolare per ogni età il delta tra il numero massimo di amici e il numero minimo.
Nel codice sotto, ho fatto un groupBy(“eta”), con funzioni aggreganti min e max.
Dopodiché con il metodo withColumnRenamed ho rinominato le due colonne poiché non riesco ad usare i nomi di default (min(amici) e max(amici)).
Infine con la funzione select ho selezionato le colonne che mi interessano e ho definito una nuova colonna come (’maxAmici – ’minAmici).
Si noti che ho usato due diverse notazioni possibili per indicare il nome della colonna: ’nome-colonna oppure “nome-colonna”.
val df2 = df.groupBy("eta").agg(min('amici), max('amici))
val df3 = df2.withColumnRenamed("min(amici)", "minAmici").withColumnRenamed("max(amici)", "maxAmici")
df3.select('eta, 'minAmici, 'maxAmici, ('maxAmici - 'minAmici) as ("delta")).show(5)
+–––+––––––––+––––––––+–––––+
|eta|minAmici|maxAmici|delta|
+–––+––––––––+––––––––+–––––+
| 31| 73| 419| 346|
| 65| 141| 469| 328|
| 53| 42| 489| 447|
| 34| 53| 478| 425|
| 28| 36| 437| 401|
+–––+––––––––+––––––––+–––––+
only showing top 5 rows
df2: org.apache.spark.sql.DataFrame = [eta: int, min(amici): int … 1 more field]
df3: org.apache.spark.sql.DataFrame = [eta: int, minAmici: int … 1 more field]
Un altro modo per esprimere l’ultima riga del codice sopra è usando il metodo selectExpr.
Con l’argomento “*” indico che voglio tutte le righe del DataFrame originario.
E poi definisco tra virgolette la nuova colonna come differenze tra le atre due colonne maxAmici e minAmici.
La grossa differenza con select è che selectExpr consente di usare espressioni SQL.
df3.selectExpr( "*", "(maxAmici - minAmici) as delta" ).show(5)
+–––+––––––––+––––––––+–––––+ |eta|minAmici|maxAmici|delta| +–––+––––––––+––––––––+–––––+ | 31| 73| 419| 346| | 65| 141| 469| 328| | 53| 42| 489| 447| | 34| 53| 478| 425| | 28| 36| 437| 401| +–––+––––––––+––––––––+–––––+ only showing top 5 rows
A quale età si hanno in media più amici? Utilizziamo la funzione sort() con la specifica .desc
df.groupBy("eta").mean("amici").withColumnRenamed("avg(amici)", "avgAmici").sort('avgAmici.desc).show(5)
+–––+–––––––––––––––––+ |eta| avgAmici| +–––+–––––––––––––––––+ | 6|394.3333333333333| | 56|366.3333333333333| | 40| 344.25| | 12|339.6666666666667| | 21| 336.9| +–––+–––––––––––––––––+ only showing top 5 rows
Se voglio creare un nuovo DataFrame con le prime N righe dopo avere ordinato la colonna in ordine decrescente uso la funzione limit(N)
df.groupBy("eta").mean("amici").withColumnRenamed("avg(amici)", "avgAmici").sort('avgAmici.desc).limit(10).show()
+–––+––––––––––––––––––+ |eta| avgAmici| +–––+––––––––––––––––––+ | 6| 394.3333333333333| | 56| 366.3333333333333| | 40| 344.25| | 12| 339.6666666666667| | 21| 336.9| | 50| 332.4| | 65|328.22222222222223| | 53| 322.25| | 1| 321.6| | 37| 320.6| +–––+––––––––––––––––––+
Aggiungere una riga al Dataframe
Aggiungere una riga non è immediato, in quanto occorre creare un nuovo DataFrame con una riga e successivamente fare l’unione dei due DataFrame.
Per creare il nuovo DataFrame uso il metodo della SparkSession createDataFrame, che prende come argomenti un RDD composto da Row, e lo schema da seguire, per questo uso lo schema di df.
//filtriamo gli elementi con età 31 anni
val df2 = df.filter("eta == 31")
df2.show
//importo il tipo Row
import org.apache.spark.sql.Row
//creo un RDD di Row
val dataRDD = spark.sparkContext.parallelize(Seq(Row(31,100)))
//creo il nuovo DataFrame
val newdf = spark.createDataFrame(dataRDD, df.schema)
val df_31 = df2.union(newdf)
df_31.show
+–––+–––––+
|eta|amici|
+–––+–––––+
| 31| 172|
| 31| 408|
| 31| 73|
| 31| 326|
| 31| 419|
+–––+–––––+
+–––+–––––+
|eta|amici|
+–––+–––––+
| 31| 172|
| 31| 408|
| 31| 73|
| 31| 326|
| 31| 419|
| 31| 100|
+–––+–––––+
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [eta: int, amici: int]
import org.apache.spark.sql.Row
dataRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[96] at parallelize at <console>:37
newdf: org.apache.spark.sql.DataFrame = [eta: int, amici: int]
df_31: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [eta: int, amici: int]
Conclusione
Abbiamo visto come importare dei dati in un DataFrame e alcune delle operazioni che è possibile eseguire su questi. Abbiamo visto come aggregare i dati per ottenere delle informazioni aggiuntive.
Riferimenti
- How to use SparkSession in Apache Spark 2.0, da Databricks
- Dal sito di Apache Spark la guida alla programmazione