RDD, izmantojot Spark: Apache Spark celtniecības bloks



Šis emuārs par RDD, izmantojot Spark, sniegs jums detalizētas un visaptverošas zināšanas par RDD, kas ir Spark & ​​How noderīga pamatvienība.

, Pats vārds ir pietiekams, lai radītu dzirksti katra Hadoop inženiera prātā. TO n atmiņā apstrādes rīks kas ir zibens ātrs kopu skaitļošanā. Salīdzinot ar MapReduce, atmiņā esošo datu koplietošana padara RDD 10-100x ātrāk nekā tīkla un diska koplietošana, un tas viss ir iespējams RDD (elastīgo izplatīto datu kopu) dēļ. Galvenie punkti, uz kuriem mēs šodien koncentrējamies šajā RDD, izmantojot Spark rakstu, ir:

Nepieciešamība pēc RDD?

Kāpēc mums ir nepieciešama RDD? -RDD, izmantojot Spark





Pasaule attīstās līdz ar un Datu zinātne gada progresa dēļ . Algoritmi balstoties uz Regresija , , un kas darbojas tālāk Izplatīts Iteratīvs aprēķins cijas mode, kas ietver datu atkārtotu izmantošanu un koplietošanu starp vairākām skaitļošanas vienībām.

Tradicionālais nepieciešamas stabilas starpposma un izplatītas krātuves, piemēram, HDFS kas sastāv no atkārtotiem aprēķiniem ar datu replikācijām un datu serializāciju, kas procesu padarīja daudz lēnāku. Atrast risinājumu nekad nebija viegli.



Tas ir kur RDD (Elastīgās izplatītās datu kopas) nonāk kopainā.

RDD S ir viegli lietojami un tos ir viegli izveidot, jo dati tiek importēti no datu avotiem un tiek ievietoti RDD. Tālāk operācijas tiek izmantotas to apstrādei. Tie ir a izplatīta atmiņas kolekcija ar atļaujām kā Tikai lasīt un pats galvenais, ka viņi ir Izturīgs pret kļūdām .



Ja kāds datu nodalījums gada RDD ir zaudēja , to var atjaunot, piemērojot to pašu transformācija operācija ar šo zaudēto nodalījumu cilts , nevis apstrādāt visus datus no nulles. Šāda pieeja reālā laika scenārijos var radīt brīnumus datu zaudēšanas situācijās vai tad, kad sistēma nedarbojas.

Kas ir RDD?

RDD vai ( Elastīga izplatīto datu kopa ) ir fundamentāls datu struktūra dzirkstelē. Termiņš Elastīgs definē spēju, kas automātiski ģenerē datus vai datus ritošā atpakaļ uz sākotnējais stāvoklis kad notiek neparedzēta nelaime ar datu zaudēšanas varbūtību.

RDD ierakstītie dati ir sadalīts un glabājas vairāki izpildāmi mezgli . Ja izpildes mezgls neizdodas izpildes laikā, tad tas uzreiz iegūst dublējumu no nākamais izpildāmais mezgls . Tāpēc RDD tiek uzskatīti par uzlabotu datu struktūru tipu, salīdzinot ar citām tradicionālajām datu struktūrām. RDD var uzglabāt strukturētus, nestrukturētus un daļēji strukturētus datus.

Pārejam uz priekšu ar mūsu RDD, izmantojot Spark emuāru, un uzzināsim par RDD unikālajām īpašībām, kas tam dod priekšrocības salīdzinājumā ar cita veida datu struktūrām.

RDD iezīmes

  • Atmiņā (RAM) Aprēķini : Atmiņā esošās skaitļošanas jēdziens datu apstrādi noved pie ātrāka un efektīvāka posma, kur kopumā sniegumu sistēmas ir modernizēts.
  • L viņa novērtējums : Termins Slink vērtējums saka pārvērtības tiek lietoti datiem RDD, bet izeja netiek ģenerēta. Tā vietā piemērotās transformācijas ir pieteicies.
  • Neatlaidība : Rezultāti RDD vienmēr atkārtoti lietojams.
  • Rupjgraudainas operācijas : Lietotājs var veikt pārveidojumus visiem datu kopu elementiem, izmantojot karte, filtru vai grupēt pēc operācijas.
  • Vainas tolerants : Ja tiek zaudēti dati, sistēma var ritiniet atpakaļ pie tā sākotnējais stāvoklis izmantojot reģistrēto pārvērtības .
  • Nemainīgums : Nevar definēt, iegūt vai izveidot datus mainīts kad tas ir pieteicies sistēmā. Gadījumā, ja jums ir nepieciešams piekļūt esošajam RDD un modificēt to, jums ir jāizveido jauns RDD, izmantojot kopu Pārvērtības darbojas uz pašreizējo vai iepriekšējo RDD.
  • Sadalīšana : Tas ir izšķiroša vienība paralēlisms Sparkā RDD. Pēc noklusējuma izveidoto nodalījumu skaits ir balstīts uz jūsu datu avotu. Jūs pat varat izlemt, cik nodalījumu vēlaties izmantot pasūtījuma nodalījums funkcijas.

RDD izveide, izmantojot Spark

RDD var izveidot trīs veidi:

metodes pārslodze un ignorēšana Java

  1. Datu lasīšana no paralēlās kolekcijas
val PCRDD = spark.sparkContext.parallelize (Array ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ). foreach (println)
  1. Pieteikšanās transformācija par iepriekšējiem RDD
val vārdi = spark.sparkContext.parallelize (Seq ('Spark', 'is', 'a', 'ļoti', 'spēcīgs', 'valoda')) val wordpair = vārdi.map (w = (w.charAt 0), w)) wordpair.collect (). Foreach (println)
  1. Datu lasīšana no ārējā krātuve vai failu ceļi, piemēram, HDFS vai HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Darbības, kas veiktas ar RDD:

RDD galvenokārt tiek veiktas divu veidu darbības, proti:

  • Pārvērtības
  • Darbības

Pārvērtības : The operācijas mēs izmantojam RDD uz filtrs, piekļuve un modificēt vecāku RDD datus, lai ģenerētu a pēc kārtas RDD tiek saukts transformācija . Jaunais RDD atgriež rādītāju iepriekšējam RDD, nodrošinot starp tiem atkarību.

Pārvērtības ir Slinki vērtējumi, citiem vārdiem sakot, RDD izmantotās darbības, kurās strādājat, tiks reģistrētas, taču netiks reģistrētas izpildīts. Pēc sistēmas aktivizēšanas sistēma izsaka rezultātu vai izņēmumu Darbība .

Mēs varam sadalīt transformācijas divos veidos, kā norādīts zemāk:

  • Šauras pārvērtības
  • Plašas pārvērtības

Šauras pārvērtības Mēs pielietojam šauras transformācijas uz a viens nodalījums vecāka RDD, lai ģenerētu jaunu RDD, jo dati, kas nepieciešami RDD apstrādei, ir pieejami vienā vecāku ASD . Šauru pārveidojumu piemēri ir:

  • karte ()
  • filtrs ()
  • flatMap ()
  • nodalījums ()
  • mapPartitions ()

Plašas pārvērtības: Mēs izmantojam plašo pārveidojumu vairākas starpsienas lai ģenerētu jaunu RDD. RDD apstrādei nepieciešamie dati ir pieejami vairākos vecāku ASD . Plašu pārveidojumu piemēri ir:

  • samazinātBy ()
  • savienība ()

Darbības : Darbības liek Apache Spark piemērot aprēķins un nodot rezultātu vai izņēmumu atpakaļ vadītāja RDD. Dažas no darbībām ietver:

  • savākt ()
  • skaitīt ()
  • ņemt ()
  • pirmais ()

Lietosim praktiski darbības ar RDD:

IPL (Indijas premjerlīga) ir kriketa turnīrs ar visaugstāko līmeni. Tātad, ļaujiet mums šodien pievērsties IPL datu kopai un izpildīt mūsu RDD, izmantojot Spark.

  • Pirmkārt, lejupielādēsim IPL CSV atbilstības datus. Pēc lejupielādes tas sāk izskatīties kā EXCEL fails ar rindām un kolonnām.

Nākamajā solī mēs aktivizējam dzirksteli un ielādējam match.csv failu no tā atrašanās vietas, manā gadījumā manacsvfaila atrašanās vieta ir “/User/edureka_566977/test/matches.csv”

Tagad sāksim ar Pārvērtības pirmā daļa:

  • karte ():

Mēs izmantojam Kartes pārveidošana katram RDD elementam piemērot īpašu pārveidošanas darbību. Šeit mēs izveidojam RDD ar nosaukumu CKfile kur glabājam mūsucsvfailu. Mēs izveidosim vēl vienu RDD ar aicinājumu uz valstīm glabājiet pilsētas datus .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println) val stāvokļi = CKfile.map (_. split (',') (2)) states.collect (). foreach (println)

  • filtrs ():

Filtru pārveidošana, nosaukums pats par sevi raksturo tā izmantošanu. Mēs izmantojam šo pārveidošanas operāciju, lai filtrētu selektīvos datus no doto datu kopas. Mēs piesakāmies filtra darbība šeit, lai iegūtu gada IPL spēļu ierakstus 2017. gads un glabājiet to failā RDD.

val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

Mēs izmantojam flatMap ir transformācijas darbība katram RDD elementam, lai izveidotu jaunu RDD. Tas ir līdzīgs kartes transformācijai. šeit mēs piesakāmiesPlakanā karteuz izspļaut Hyderabad pilsētas mačus un saglabājiet datusfilRDDRDD.

val filRDD = fil.flatMap (line => line.split ('Hyderabad')). savākt ()

  • nodalījums ():

Visi dati, kurus mēs ierakstām RDD, tiek sadalīti noteiktā partīciju skaitā. Mēs izmantojam šo pārveidojumu, lai atrastu starpsienu skaits dati faktiski tiek sadalīti.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

Mēs uzskatām MapPatitions par Map () unkatram() kopā. Mēs šeit izmantojam mapPartitions, lai atrastu rindu skaits mums ir mūsu failā RDD.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Masīvs (idx.size) .iterator) .collect

  • samazinātBy ():

Mēs izmantojamReduceBy() ieslēgts Atslēgu un vērtību pāri . Mēs izmantojām šo transformāciju savācsvfailu, lai atrastu atskaņotāju ar mača augstākais cilvēks .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (nepatiesa) ManOTH.take (10) .foreach (println)

  • savienība ():

Nosaukums visu izskaidro, Mēs izmantojam arodbiedrību pārveidošanu klubā divi RDD kopā . Šeit mēs izveidojam divus RDD, proti, fil un fil2. fil RDD satur 2017. gada IPL spēļu ierakstus, bet fil2 RDD - 2016. gada IPL spēļu ierakstus.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Sāksim ar Darbība daļa, kurā parādām faktisko produkciju:

  • savākt ():

Kolekcija ir darbība, kuru mēs izmantojam parādīt saturu RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.collect.foreach (println)

  • skaits ():

Skaitītir darbība, kuru mēs izmantojam, lai skaitītu ierakstu skaits klāt RDD.Šeitmēs izmantojam šo darbību, lai saskaitītu kopējo ierakstu skaitu mūsu match.csv failā.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') CKfile.count ()

  • ņemt ():

Veikt ir darbība, kas ir līdzīga savākšanai, bet vienīgā atšķirība ir tā, ka tā var izdrukāt jebkuru selektīvais rindu skaits atbilstoši lietotāja pieprasījumam. Šeit mēs izmantojam šādu kodu, lai izdrukātu desmit vadošo ziņojumu top.

val statecountm = Scount.reduceByKey ((x, y) => x + y). karte (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. ņemt (10). foreach (println)

  • pirmais ():

Pirmais () ir darbības darbība, kas līdzīga savākšanai () un ņemšanai ()toizmanto, lai izdrukātu augšējo atskaiti s izvadi Šeit mēs izmantojam pirmo () darbību, lai atrastu maksimālais spēlēto spēļu skaits noteiktā pilsētā un mēs iegūstam Mumbaju kā rezultātu.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / match.csv') val stāvokļi = CKfile.map (_. split (',') (2)) val Scount = stāvokļi.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach [println] val statecountm = Scount.reduceByKey ((x, y) => x + y). karte (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Lai padarītu mūsu procesu par RDD mācīšanos, izmantojot Spark, vēl interesantāku, es esmu izdomājis interesantu lietošanas gadījumu.

RDD, izmantojot Spark: Pokemon lietošanas gadījumu

  • Pirmkārt, Lejupielādēsim Pokemon.csv failu un ielādēsim to dzirksteles apvalkā tāpat kā failā Matches.csv.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Pokemoni faktiski ir pieejami daudzos veidos. Atrodiet dažas šķirnes.

kāda ir atšķirība starp html un xml
  • Shēmas noņemšana no faila Pokemon.csv

Mums tas varētu nebūt vajadzīgs Shēma no Pokemon.csv faila. Tādējādi mēs to noņemam.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Atrast numuru starpsienas mūsu pokemon.csv ir sadalīts.
println ('Partition No. =' + NoHeader.partitions.size)

  • Ūdens Pokemons

Atrodot Ūdens pokemona skaits

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Uguns Pokemons

Atrodot Uguns pokemonu skaits

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Mēs varam arī atklāt populācija cita veida pokemon, izmantojot skaitīšanas funkciju
WaterRDD.count () FireRDD.count ()

  • Tā kā man patīk spēle aizsardzības stratēģija ļaujiet mums atrast pokemonu ar maksimāla aizsardzība.
val defenceList = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

  • Mēs zinām maksimumu aizsardzības spēka vērtība bet mēs nezinām, kurš pokemons tas ir. Tātad, ļaujiet mums atrast, kas ir tas pokemons.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Pasūtīšana [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Tagad sakārtosim pokemonu ar vismazāk Aizsardzība
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5). foreach (println)

  • Tagad ļaujiet mums redzēt Pokemon ar mazāk aizsardzības stratēģija.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val defWithPokemonName .karte {x => x.split (',')}. karte {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Pasūtīšana [Double ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Tātad, līdz ar to mēs esam nonākuši šajā RDD, izmantojot Spark rakstu. Es ceru, ka mēs nedaudz izgaismojām jūsu zināšanas par RDD, to funkcijām un dažāda veida darbībām, kuras ar tām var veikt.

Šis raksts ir balstīts uz ir paredzēts, lai sagatavotu jūs Cloudera Hadoop un Spark Developer sertifikācijas eksāmenam (CCA175). Jūs iegūsiet padziļinātas zināšanas par Apache Spark un Spark ekosistēmu, kas ietver Spark RDD, Spark SQL, Spark MLlib un Spark Streaming. Jūs iegūsiet visaptverošas zināšanas par Scala programmēšanas valodu, HDFS, Sqoop, Flume, Spark GraphX ​​un ziņojumapmaiņas sistēmu, piemēram, Kafka.