I dataframe di Apache Spark dispongono del metodo filter che consente di filtrare le righe che soddisfano una determinata condizione.
Il risultato di filter è un nuovo dataframe in cui sono presenti solo le righe che soddisfano la condizione passata a filter.
Si tratta di una trasformazione per cui è eseguita in modo lazy.
In realtà si tratta di più metodi filter in quanto ci sono diverse signature.
In questo post voglio esplorare i diversi modi di usare il filter di un dataframe.
FILTER signature #1
La funzione filter ha diverse signature, questo significa che la funzione puà essere chiamata con diversi parametri.
La prima funzione è:def filter(func: (T) ⇒ Boolean): Dataset[T]
In questo caso devo passare una funzione che ritorna un valore di tipo boolean.
La funzione può essere definita esplicitamente (def funzione(A): Boolean) oppure può essere una funzione anonima.
Esempio #1:
usiamo una funzione anonima. La funzione che passo è: x => x>20
in cui ogni elemento viene confrontato con il numero 20. Il risultato è ovviamente true o false.
spark.range(1, 25).filter( x => x>20 ).show
+−−−+ | id| +−−−+ | 21| | 22| | 23| | 24| +−−−+
Esempio #2
Posso applicare la funzione di filter ad ogni elemento della riga usando la notazione x._k per indicare il k-esimo elemento di x
val df = spark.range(1,25).map(x => (x, x*x)) df.filter(a => a._1>10).show(5)
+−−−+−−−+ | _1| _2| +−−−+−−−+ | 11|121| | 12|144| | 13|169| | 14|196| | 15|225| +−−−+−−−+ only showing top 5 rows df: org.apache.spark.sql.Dataset[(Long, Long)] = [_1: bigint, _2: bigint]
Esempio #3
Definiamo la funzione esplicitamente.
def funz1(a: (Long, Long)): Boolean = a._2>10 val df = spark.range(1,25).map(x => (x.toLong, (x*x).toLong)) df.filter(funz1 _).show(5)
+−−−+−−−+ | _1| _2| +−−−+−−−+ | 4| 16| | 5| 25| | 6| 36| | 7| 49| | 8| 64| +−−−+−−−+ only showing top 5 rows funz1: (a: (Long, Long))Boolean df: org.apache.spark.sql.Dataset[(Long, Long)] = [_1: bigint, _2: bigint]
FILTER signature #2
Vediamo la seconda signature
def filter(conditionExpr: String): Dataset[T]
Non è molto chiaro cosa sia conditionExpr, però si capisce che è una stringa, quindi la condizione deve essere espressa come stringa.
val df = spark.range(1,25).map(x => (x.toLong, (x*x).toLong)).toDF("numero", "quadrato") df.filter("quadrato > 100").show(5)
+−−−−−−+−−−−−−−−+ |numero|quadrato| +−−−−−−+−−−−−−−−+ | 11| 121| | 12| 144| | 13| 169| | 14| 196| | 15| 225| +−−−−−−+−−−−−−−−+ only showing top 5 rows df: org.apache.spark.sql.DataFrame = [numero: bigint, quadrato: bigint]
FILTER signature #3
Infine l’ultima signature:
def filter(condition: Column): Dataset[T]
Questo metodo si aspetta una colonna come condizione.
df.filter($"numero" > 20).show
+−−−−−−+−−−−−−−−+ |numero|quadrato| +−−−−−−+−−−−−−−−+ | 21| 441| | 22| 484| | 23| 529| | 24| 576| +−−−−−−+−−−−−−−−+
oppure nel formato che utilizza i metodi della classe org.apache.spark.sql.Column:
– gt: greater than
– lt: less than
– equalsTo: equal
– notequal: not equal
– leq: less than or equal
– geq: greater than or equal
e altre che si possono trovare nella API di Column
df.filter($"quadrato".equalTo(121)).show
+−−−−−−+−−−−−−−−+ |numero|quadrato| +−−−−−−+−−−−−−−−+ | 11| 121| +−−−−−−+−−−−−−−−+