Kumulatīvā stāvokļa pārveidošana Apache Spark straumēšanā



Šajā emuāra ziņā tiek apspriestas nozīmīgas izmaiņas Spark Streaming. Uzziniet visu par kumulatīvo izsekošanu un prasmju uzlabošanu Hadoop Spark karjerā.

Piedalās Prithviraj Bose

Savā iepriekšējā emuārā esmu apspriedis nozīmīgas transformācijas, izmantojot apache Spark Streaming dedzīgo koncepciju. Jūs to varat izlasīt šeit .





Šajā amatā es apspriedīšu Apache Spark Streaming kumulatīvās stāvokļa darbības. Ja esat jauns Spark Streaming lietotājs, es iesaku izlasīt manu iepriekšējo emuāru, lai saprastu, kā darbojas windowing.

Statiskas pārveidošanas veidi dzirksteļu straumēšanā (turpinājums ...)

> Kumulatīvā izsekošana

Mēs bijām izmantojuši reducByKeyAndWindow (…) API atslēgu stāvokļu izsekošanai, tomēr atsevišķos lietošanas gadījumos ierobežošana rada ierobežojumus. Ko darīt, ja mēs vēlamies visā laikā uzkrāt atslēgu stāvokļus, nevis ierobežot to tikai ar laika logu? Tādā gadījumā mums tas būtu jāizmanto updateStateByKey (…) UGUNS.



Šī API tika ieviesta Spark 1.3.0 versijā un ir bijusi ļoti populāra. Tomēr šai API ir zināma veiktspēja, tā veiktspēja pasliktinās, jo laika gaitā palielinās stāvokļu lielums. Esmu uzrakstījis paraugu, lai parādītu šīs API lietojumu. Jūs varat atrast kodu šeit .

Spark 1.6.0 ieviesa jaunu API mapWithState (…) kas atrisina izpildījuma pieskaitāmās izmaksas, ko rada updateStateByKey (…) . Šajā emuārā es apspriedīšu šo konkrēto API, izmantojot savas uzrakstītās programmas paraugu. Jūs varat atrast kodu šeit .

Pirms ienirstu koda ievadā, saudzēsim dažus vārdus par kontrolpunktu noteikšanu. Jebkurai valstiskai transformācijai kontrolpunkti ir obligāti. Kontrolpunktu noteikšana ir atslēgu stāvokļa atjaunošanas mehānisms, ja draivera programma neizdodas. Kad draiveris restartējas, atslēgu stāvoklis tiek atjaunots no kontrolpunktu failiem. Kontrolpunkta atrašanās vietas parasti ir HDFS vai Amazon S3 vai jebkura uzticama krātuve. Pārbaudot kodu, to var arī saglabāt vietējā failu sistēmā.



Paraugprogrammā mēs klausāmies ligzdas teksta straumi vietnē host = localhost un port = 9999. Tas simbolizē ienākošo straumi (vārdi, gadījumu skaits) un izseko vārdu skaitu, izmantojot 1.6.0 API mapWithState (…) . Turklāt atslēgas bez atjauninājumiem tiek noņemtas, izmantojot StateSpec. Timeout API. Mēs kontrolējam HDFS, un kontrolpunktu biežums notiek ik pēc 20 sekundēm.

kas ir charat java

Vispirms izveidosim Spark Streaming sesiju,

Spark-streaming-session

Mēs izveidojam a kontrolpunktsDir HDFS un pēc tam izsauciet objekta metodi getOrCreate (…) . The getOrCreate API pārbauda kontrolpunktsDir lai redzētu, vai ir kādi iepriekšējie stāvokļi, kurus atjaunot, ja tādi pastāv, tas atjauno Spark Streaming sesiju un atjaunina atslēgu stāvokļus no failos saglabātajiem datiem, pirms pāriet ar jauniem datiem. Pretējā gadījumā tas izveido jaunu Spark Streaming sesiju.

The getOrCreate ņem kontrolpunkta direktorijas nosaukumu un funkciju (kuru esam nosaukuši createFunc ) kura parakstam jābūt () => StreamingContext .

Pārbaudīsim kodu iekšpusē createFunc .

2. rindiņa: mēs izveidojam straumēšanas kontekstu ar darba nosaukumu uz “TestMapWithStateJob” un partijas intervālu = 5 sekundes.

5. rindiņa: iestatiet kontrolpunkta direktoriju.

8. rinda: iestatiet stāvokļa specifikāciju, izmantojot klasi org.apache.streaming.StateSpec objekts. Vispirms mēs iestatām funkciju, kas izsekos stāvokli, pēc tam mēs iestatām iegūto DStream nodalījumu skaitu, kas tiks ģenerēti turpmāko pārveidojumu laikā. Visbeidzot, mēs iestatījām taimautu (līdz 30 sekundēm). Ja kāds atjauninājums atslēgai netiek saņemts 30 sekunžu laikā, atslēgas stāvoklis tiks noņemts.

12. rindiņa #: iestatiet kontaktligzdas straumi, izlīdziniet ienākošos pakešdatus, izveidojiet atslēgu un vērtību pāri, zvaniet mapWithState , iestatiet kontrolpunktu intervālu uz 20s un beidzot izdrukājiet rezultātus.

Spark ietvars aicina th e createFunc katram taustiņam ar iepriekšējo vērtību un pašreizējo stāvokli. Mēs aprēķinām summu un atjauninām stāvokli ar kumulatīvo summu, un visbeidzot mēs atdodam atslēgu.

ko java nozīmē pielikums

Gituba avoti -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Vai mums ir jautājums? Lūdzu, pieminējiet to komentāru sadaļā, un mēs ar jums sazināsimies.

Saistītās ziņas:

Sāciet darbu ar Apache Spark & ​​Scala

Statiskas pārvērtības ar dzīšanu dzirksteļu straumēšanā