... utilizzare il metodo filter sui dataframe di Apache Spark

I dataframe di Apache Spark dispongono del metodo filter che consente di filtrare le righe che soddisfano una determinata condizione.
Il risultato di filter è un nuovo dataframe in cui sono presenti solo le righe che soddisfano la condizione passata a filter.
Si tratta di una trasformazione per cui è eseguita in modo lazy.
In realtà si tratta di più metodi filter in quanto ci sono diverse signature.
In questo post voglio esplorare i diversi modi di usare il filter di un dataframe.

FILTER signature #1

La funzione filter ha diverse signature, questo significa che la funzione puà essere chiamata con diversi parametri.

La prima funzione è:
def filter(func: (T) ⇒ Boolean): Dataset[T]

In questo caso devo passare una funzione che ritorna un valore di tipo boolean.

La funzione può essere definita esplicitamente (def funzione(A): Boolean) oppure può essere una funzione anonima.

Esempio #1:

usiamo una funzione anonima. La funzione che passo è: x => x>20 in cui ogni elemento viene confrontato con il numero 20. Il risultato è ovviamente true o false.

spark.range(1, 25).filter( x => x>20 ).show

+−−−+
| id|
+−−−+
| 21|
| 22|
| 23|
| 24|
+−−−+


Esempio #2

Posso applicare la funzione di filter ad ogni elemento della riga usando la notazione x._k per indicare il k-esimo elemento di x

val df = spark.range(1,25).map(x => (x, x*x))
df.filter(a => a._1>10).show(5)

+−−−+−−−+
| _1| _2|
+−−−+−−−+
| 11|121|
| 12|144|
| 13|169|
| 14|196|
| 15|225|
+−−−+−−−+
only showing top 5 rows

df: org.apache.spark.sql.Dataset[(Long, Long)] = [_1: bigint, _2: bigint]

Esempio #3

Definiamo la funzione esplicitamente.

def funz1(a: (Long, Long)): Boolean = a._2>10

val df = spark.range(1,25).map(x => (x.toLong, (x*x).toLong)) 

df.filter(funz1 _).show(5)

+−−−+−−−+
| _1| _2|
+−−−+−−−+
|  4| 16|
|  5| 25|
|  6| 36|
|  7| 49|
|  8| 64|
+−−−+−−−+
only showing top 5 rows

funz1: (a: (Long, Long))Boolean
df: org.apache.spark.sql.Dataset[(Long, Long)] = [_1: bigint, _2: bigint]

FILTER signature #2

Vediamo la seconda signature

def filter(conditionExpr: String): Dataset[T]

Non è molto chiaro cosa sia conditionExpr, però si capisce che è una stringa, quindi la condizione deve essere espressa come stringa.

val df = spark.range(1,25).map(x => (x.toLong, (x*x).toLong)).toDF("numero", "quadrato")

df.filter("quadrato > 100").show(5)

+−−−−−−+−−−−−−−−+
|numero|quadrato|
+−−−−−−+−−−−−−−−+
|    11|     121|
|    12|     144|
|    13|     169|
|    14|     196|
|    15|     225|
+−−−−−−+−−−−−−−−+
only showing top 5 rows

df: org.apache.spark.sql.DataFrame = [numero: bigint, quadrato: bigint]

FILTER signature #3

Infine l’ultima signature:

def filter(condition: Column): Dataset[T]

Questo metodo si aspetta una colonna come condizione.

df.filter($"numero" > 20).show

+−−−−−−+−−−−−−−−+
|numero|quadrato|
+−−−−−−+−−−−−−−−+
|    21|     441|
|    22|     484|
|    23|     529|
|    24|     576|
+−−−−−−+−−−−−−−−+


oppure nel formato che utilizza i metodi della classe org.apache.spark.sql.Column:
gt: greater than
lt: less than
equalsTo: equal
notequal: not equal
leq: less than or equal
geq: greater than or equal
e altre che si possono trovare nella API di Column

df.filter($"quadrato".equalTo(121)).show

+−−−−−−+−−−−−−−−+
|numero|quadrato|
+−−−−−−+−−−−−−−−+
|    11|     121|
+−−−−−−+−−−−−−−−+


Leave a Reply

Il tuo indirizzo email non sarà pubblicato. I campi obbligatori sono contrassegnati *

You may use these HTML tags and attributes:

<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>