... primi Passi con Apache Spark DataFrame

Apache Spark è un framework per il calcolo distribuito su un cluster di computer. Di fatto è il sistema più diffuso per l’esecuzione di calcoli su una grosse mole di dati, leggasi Big Data.

In questo articolo vediamo di muovere i primi passi con i DataFrame di Apache Spark. A partire da Spark 2.0 i DataFrame sono considerati la struttura di default per la memorizzazione dei dati.

L’utilizzo ricorda molto da vicino quello dei DataFrame di Numpy per Python. Ma con alcune differenze.

Iniziamo con (vedere) la configurazione del cluster, dopodiché importiamo dei dati da un file csv. E poi eseguiamo un po’ di operazioni sul DataFrame, vedremo come aggregare i dati, come aggiungere o togliere righe/colonne dal DataFrame.

Configurazione del cluster

Importiamo la SparkSesssion, il punto di accesso al cluster per tutte le attività di Spark.

import org.apache.spark.sql.SparkSession

import org.apache.spark.sql.SparkSession

Definisco e configuro una SparkSession, ho bisogno del metodo builder, assegno un nome alla applicazione Spark usando appName, definisco che il cluster funziona in modo locale. Infine uso getOrCreate, cioè crea una nuova SparkSession o usa una già esistente.

val spark = SparkSession.builder.appName("MyApp1").master("local").getOrCreate

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@22161fb9

Se voglio vedere tutte le proprietà di configurazione posso usare spark.conf.getAll che restituisce un map di coppie key,value, ciscuna coppia contenente una proprietà della configurazione del cluster.

spark.conf.getAll

res1: Map[String,String] = Map(zeppelin.pyspark.python -> python, spark.driver.host -> 10.0.75.1, zeppelin.dep.localrepo -> local-repo, zeppelin.spark.sql.stacktrace -> false, spark.driver.port -> 53386, master -> local[*], spark.repl.class.uri -> spark://10.0.75.1:53386/classes, zeppelin.spark.useHiveContext -> true, spark.repl.class.outputDir -> C:\Users\home\AppData\Local\Temp\spark8113731899773902274, zeppelin.spark.sql.interpolation -> false, zeppelin.spark.importImplicit -> true, zeppelin.interpreter.output.limit -> 102400, spark.app.name -> MyApp1, zeppelin.R.cmd -> R, zeppelin.spark.maxResult -> 1000, zeppelin.pyspark.useIPython -> true, zeppelin.spark.concurrentSQL -> false, zeppelin.spark.enableSupportedVersionCheck -> true, zeppelin.spark.printREPLOutput -> true, zeppelin.dep…

Per visualizzare meglio i field del map, posso usare la funzione foreach con argomento la funzione println.
Da ricordare che in Scala esistono le funzioni di ordine superiore, cioè funzioni che accettano come argomento un altra funzione.

spark.conf.getAll.foreach(println)

(zeppelin.pyspark.python,python)
(spark.driver.host,10.0.75.1)
(zeppelin.dep.localrepo,local-repo)
(zeppelin.spark.sql.stacktrace,false)
(spark.driver.port,53386)
(master,local[*])
(spark.repl.class.uri,spark://10.0.75.1:53386/classes)
(zeppelin.spark.useHiveContext,true)
(spark.repl.class.outputDir,C:\Users\home\AppData\Local\Temp\spark8113731899773902274)
(zeppelin.spark.sql.interpolation,false)
(zeppelin.spark.importImplicit,true)
(zeppelin.interpreter.output.limit,102400)
(spark.app.name,MyApp1)
(zeppelin.R.cmd,R)
(zeppelin.spark.maxResult,1000)
(zeppelin.pyspark.useIPython,true)
(zeppelin.spark.concurrentSQL,false)
(zeppelin.spark.enableSupportedVersionCheck,true)
(zeppelin.spark.printREPLOutput,true)
(zeppelin.dep.additionalRemoteRepository,spark-packages,http://dl.bintray.com/spark-packages/maven,false;)
(spark.executor.id,driver)
(zeppelin.spark.useNew,true)
(spark.useHiveContext,true)
(spark.master,local)
(zeppelin.R.image.width,100%)
(zeppelin.spark.ui.hidden,false)
(zeppelin.interpreter.localRepo,C:\zeppelin-0.8.2-bin-all/local-repo/spark)
(zeppelin.R.render.options,out.format = ‘html’, comment = NA, echo = FALSE, results = ‘asis’, message = F, warning = F, fig.retina = 2)
(zeppelin.interpreter.max.poolsize,10)
(spark.app.id,local-1574860392257)
(zeppelin.R.knitr,true)

Importazione dei dati

I dati sono in un file csv. Questi possono essere importati comodamente in un DataFrame o in un RDD, dipendentemente a quale struttura voglio usare.

Poichè sono dati numerici in formato tabellare è sicuramente più comodo usare i DataFrame
Prima importo il package Dataframe, e poi importiamo il file csv in un Dataframe nuovo di zecca.

Il DataFrame di Spark è molto simile al DataFrame di Numpy per Python. Si tratta di un insieme di dati in formato tabella con le colonne aventi dei nomi per identificarle .
Con l’opzione (“Header”,“true”) comunico che la prima riga ha la funzione di Header per cui le colonne del DataFrame verranno nominate usando l’header.
In alternativa spark avrebbe assegnato i nomi generici _c0, _c1, _c2… e la prima riga sarebbe finita tra i dati.

import org.apache.spark.sql.DataFrame

val df = spark.read.option("Header","true").csv("exampl1.csv")

import org.apache.spark.sql.DataFrame df: org.apache.spark.sql.DataFrame = [eta: string, amici: string]

Come possiamo vedere sopra Spark mi dice che l’oggetto DataFrame ha 2 colonne di nome rispettivamente eta e amici, e che tutti e due i campi contengono stringhe.
Poiché in realtà i campi del file sono valori interi, posso fare in modo che nell’importazione i valori siano interpretati correttamente come valori numerici interi.
Per fare ciò uso l’opzione (“inferSchema”,“true”) in cui comunico al sistema di inferire il tipo dei valori nelle colonne.

val df = spark.read.option("Header","true").option("inferSchema","true").csv("exampl1.csv")

df: org.apache.spark.sql.DataFrame = [eta: int, amici: int]

I nomi delle colonne li conosco, ma se voglio verificare posso usare il metodo columns

df.columns

res3: Array[String] = Array(eta, amici)

Adesso che i dati sono correttamente interpretati come numerici vediamo la prime 5 righe del DataFrame, con il metodo head

df.head(5)

res4: Array[org.apache.spark.sql.Row] = Array([44,11], [5,190], [44,123], [9,111], [18,238])

Visualizziamo lo schema del DataFrame con printSchema, le informazioni che ottengo sono il nome e il tipo di ciascun campo:

df.printSchema

root
|– eta: integer (nullable = true)
|– amici: integer (nullable = true)

Il Dataframe ha un numero di elementi pari a

df.count

res6: Long = 499

Statistica descrittiva

Con questo termine si indica una analisi statistica da fare sui dati a disposizione.
Per il momento usiamo semplicemente la funzione describe()

df.describe().show

+–––––––+––––––––––––––––––+––––––––––––––––––+ |summary| eta| amici| +–––––––+––––––––––––––––––+––––––––––––––––––+ | count | 499| 499| | mean | 34.78156312625251| 258.3627254509018| | stddev|20.532332734289348|136.55329326307273| | min | 0| 1| | max | 69| 498| +–––––––+––––––––––––––––––+––––––––––––––––––+

df.groupBy("eta")

res8: org.apache.spark.sql.RelationalGroupedDataset = org.apache.spark.sql.RelationalGroupedDataset@727e4c7d

Il metodo groupBy fornisce come risultato un RelationalGroupedDataset, dopodiché per utilizzarlo devo specificare come aggregare i valori corrispondenti ad ogni key. Per scoprire quali sono i metodi che possiamo applicare a questa entità vado sulla pagina della documentazione di Spark relativa al RelationalGroupedDataset.
Scopro che posso usare diverse funzioni, tra cui aggmaxminpivot. Proviamone alcune.

La funzione min è così definita:
def min(colNames: String*): DataFrame

df.groupBy("eta").min("amici").show(5)

+–––+––––––––––+ |eta|min(amici)| +–––+––––––––––+ | 31| 73| | 65| 141| | 53| 42| | 34| 53| | 28| 36| +–––+––––––––––+ only showing top 5 rows

Adesso voglio contare quanti elementi ci sono nel DataFrame con la stessa key, così scopro per esempio che ci sono esattamente 5 31enni.

df.groupBy("eta").count().show(5)

+–––+–––––+ |eta|count| +–––+–––––+ | 31| 5| | 65| 9| | 53| 8| | 34| 10| | 28| 8| +–––+–––––+ only showing top 5 rows

Per la funzione agg devo specificare come argomento/i la/e funzione/i e su quale colonna ciascuna funzione deve essere applicata

df.groupBy("eta").agg(count("amici"),min("amici"), max("amici"), mean("amici")).show(5)

+–––+––––––––––––+––––––––––+––––––––––+––––––––––––––––––+ |eta|count(amici)|min(amici)|max(amici)| avg(amici)| +–––+––––––––––––+––––––––––+––––––––––+––––––––––––––––––+ | 31| 5| 73| 419| 279.6| | 65| 9| 141| 469|328.22222222222223| | 53| 8| 42| 489| 322.25| | 34| 10| 53| 478| 278.7| | 28| 8| 36| 437| 249.0| +–––+––––––––––––+––––––––––+––––––––––+––––––––––––––––––+ only showing top 5 rows

Se volessimo sapere i dati relativi ai 30-enni potrei filtrare preventivamente usando la funzione filter

 

df.filter("eta == 30").show

+–––+–––––+ |eta|amici| +–––+–––––+ | 30| 148| | 30| 374| | 30| 153| | 30| 247| | 30| 56| | 30| 46| | 30| 352| +–––+–––––+

La funzione filter() è del tutto equivalente alla funzione where()

df.where("eta == 30").show

+–––+–––––+ |eta|amici| +–––+–––––+ | 30| 148| | 30| 374| | 30| 153| | 30| 247| | 30| 56| | 30| 46| | 30| 352| +–––+–––––+

Se voglio aggregare i dati relativi ai 30enni posso usare ancora la funzione agg

df.filter("eta == 30").agg(count("amici"),min("amici"), max("amici"), mean("amici")).shows

+––––––––––––+––––––––––+––––––––––+––––––––––––––––––+ |count(amici)|min(amici)|max(amici)| avg(amici)| +––––––––––––+––––––––––+––––––––––+––––––––––––––––––+ | 7| 46| 374|196.57142857142858| +––––––––––––+––––––––––+––––––––––+––––––––––––––––––+

Come argomento di filter posso usare gli operatori logici di SQL

df.filter("eta>=30 AND eta<=32").agg(count("amici"),min("amici"), max("amici"), mean("amici")).show

+––––––––––––+––––––––––+––––––––––+––––––––––––––––––+ |count(amici)|min(amici)|max(amici)| avg(amici)| +––––––––––––+––––––––––+––––––––––+––––––––––––––––––+ | 13| 46| 419|232.84615384615384| +––––––––––––+––––––––––+––––––––––+––––––––––––––––––+

Vediamo di calcolare per ogni età il delta tra il numero massimo di amici e il numero minimo.
Nel codice sotto, ho fatto un groupBy(“eta”), con funzioni aggreganti min e max.

Dopodiché con il metodo withColumnRenamed ho rinominato le due colonne poiché non riesco ad usare i nomi di default (min(amici) e max(amici)).

Infine con la funzione select ho selezionato le colonne che mi interessano e ho definito una nuova colonna come (’maxAmici – ’minAmici).

Si noti che ho usato due diverse notazioni possibili per indicare il nome della colonna: ’nome-colonna oppure “nome-colonna”.

val df2 = df.groupBy("eta").agg(min('amici), max('amici))

val df3 = df2.withColumnRenamed("min(amici)", "minAmici").withColumnRenamed("max(amici)", "maxAmici")

df3.select('eta, 'minAmici, 'maxAmici, ('maxAmici - 'minAmici) as ("delta")).show(5)

+–––+––––––––+––––––––+–––––+ |eta|minAmici|maxAmici|delta| +–––+––––––––+––––––––+–––––+ | 31| 73| 419| 346| | 65| 141| 469| 328| | 53| 42| 489| 447| | 34| 53| 478| 425| | 28| 36| 437| 401| +–––+––––––––+––––––––+–––––+ only showing top 5 rows

df2: org.apache.spark.sql.DataFrame = [eta: int, min(amici): int … 1 more field]
df3: org.apache.spark.sql.DataFrame = [eta: int, minAmici: int … 1 more field]

 

Un altro modo per esprimere l’ultima riga del codice sopra è usando il metodo selectExpr.
Con l’argomento “*” indico che voglio tutte le righe del DataFrame originario.
E poi definisco tra virgolette la nuova colonna come differenze tra le atre due colonne maxAmici e minAmici.

La grossa differenza con select è che selectExpr consente di usare espressioni SQL.

df3.selectExpr( "*", "(maxAmici - minAmici) as delta" ).show(5)

+–––+––––––––+––––––––+–––––+ |eta|minAmici|maxAmici|delta| +–––+––––––––+––––––––+–––––+ | 31| 73| 419| 346| | 65| 141| 469| 328| | 53| 42| 489| 447| | 34| 53| 478| 425| | 28| 36| 437| 401| +–––+––––––––+––––––––+–––––+ only showing top 5 rows

A quale età si hanno in media più amici? Utilizziamo la funzione sort() con la specifica .desc

df.groupBy("eta").mean("amici").withColumnRenamed("avg(amici)", "avgAmici").sort('avgAmici.desc).show(5)

+–––+–––––––––––––––––+ |eta| avgAmici| +–––+–––––––––––––––––+ | 6|394.3333333333333| | 56|366.3333333333333| | 40| 344.25| | 12|339.6666666666667| | 21| 336.9| +–––+–––––––––––––––––+ only showing top 5 rows

Se voglio creare un nuovo DataFrame con le prime N righe dopo avere ordinato la colonna in ordine decrescente uso la funzione limit(N)

df.groupBy("eta").mean("amici").withColumnRenamed("avg(amici)", "avgAmici").sort('avgAmici.desc).limit(10).show()

+–––+––––––––––––––––––+ |eta| avgAmici| +–––+––––––––––––––––––+ | 6| 394.3333333333333| | 56| 366.3333333333333| | 40| 344.25| | 12| 339.6666666666667| | 21| 336.9| | 50| 332.4| | 65|328.22222222222223| | 53| 322.25| | 1| 321.6| | 37| 320.6| +–––+––––––––––––––––––+

 

Aggiungere una riga al Dataframe

Aggiungere una riga non è immediato, in quanto occorre creare un nuovo DataFrame con una riga e successivamente fare l’unione dei due DataFrame.

Per creare il nuovo DataFrame uso il metodo della SparkSession createDataFrame, che prende come argomenti un RDD composto da Row, e lo schema da seguire, per questo uso lo schema di df.

//filtriamo gli elementi con età 31 anni
val df2 = df.filter("eta == 31")
df2.show
//importo il tipo Row
import org.apache.spark.sql.Row
//creo un RDD di Row
val dataRDD = spark.sparkContext.parallelize(Seq(Row(31,100)))
//creo il nuovo DataFrame
val newdf = spark.createDataFrame(dataRDD, df.schema)
val df_31 = df2.union(newdf)
df_31.show

+–––+–––––+ |eta|amici| +–––+–––––+ | 31| 172| | 31| 408| | 31| 73| | 31| 326| | 31| 419| +–––+–––––+ +–––+–––––+ |eta|amici| +–––+–––––+ | 31| 172| | 31| 408| | 31| 73| | 31| 326| | 31| 419| | 31| 100| +–––+–––––+ df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [eta: int, amici: int]
import org.apache.spark.sql.Row
dataRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[96] at parallelize at <console>:37
newdf: org.apache.spark.sql.DataFrame = [eta: int, amici: int]
df_31: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [eta: int, amici: int]

Conclusione

Abbiamo visto come importare dei dati in un DataFrame e alcune delle operazioni che è possibile eseguire su questi. Abbiamo visto come aggregare i dati per ottenere delle informazioni aggiuntive.

Riferimenti

  1. How to use SparkSession in Apache Spark 2.0, da Databricks
  2. Dal sito di Apache Spark la guida alla programmazione

Leave a Reply

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *

You may use these HTML tags and attributes:

<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>