DBInputFormat datu pārsūtīšanai no SQL uz NoSQL datu bāzi



Šī emuāra mērķis ir uzzināt, kā pārsūtīt datus no SQL datu bāzēm uz HDFS, kā pārsūtīt datus no SQL datu bāzēm uz NoSQL datu bāzēm.

Šajā emuārā mēs izpētīsim viena no vissvarīgākajām Hadoop tehnoloģijas sastāvdaļām, t.i., MapReduce, iespējas un iespējas.

Šodien uzņēmumi pieņem Hadoop sistēmu kā savu pirmo izvēli datu glabāšanai, jo tā spēj efektīvi apstrādāt lielus datus. Bet mēs arī zinām, ka dati ir daudzpusīgi un pastāv dažādās struktūrās un formātos. Lai kontrolētu tik milzīgu datu daudzveidību un to dažādos formātus, vajadzētu būt mehānismam, kas piemērotu visas šķirnes un tomēr nodrošinātu efektīvu un konsekventu rezultātu.





Spēcīgākais Hadoop ietvara komponents ir MapReduce, kas var nodrošināt datu un to struktūras kontroli labāk nekā citi kolēģi. Lai gan tam ir vajadzīgas mācīšanās līknes un programmēšanas sarežģītība, ja jūs varat tikt galā ar šīm sarežģītībām, jūs noteikti varat apstrādāt jebkāda veida datus, izmantojot Hadoop.

MapReduce framework sadala visus apstrādes uzdevumus būtībā divos posmos: Map un Reduce.



Lai sagatavotu neapstrādātus datus šīm fāzēm, ir nepieciešama dažu pamatklasi un saskarņu izpratne. Šīs pārstrādes superklase ir InputFormat.

The InputFormat klase ir viena no Hadoop MapReduce API pamatklasēm. Šī klase ir atbildīga par divu galveno lietu noteikšanu:

  • Dati sadalās
  • Ierakstu lasītājs

Datu sadalīšana ir Hadoop MapReduce ietvarstruktūra, kas nosaka gan atsevišķu karšu uzdevumu lielumu, gan tā iespējamo izpildes serveri. The Ierakstu lasītājs ir atbildīgs par faktisko ierakstu nolasīšanu no ievades faila un to (kā atslēgu / vērtību pāru) iesniegšanu kartētājā.



Kartētāju skaits tiek noteikts, pamatojoties uz sadalījumu skaitu. InputFormat uzdevums ir izveidot sadalījumus. Lielāko daļu laika sadalījuma lielums ir vienāds ar bloka lielumu, taču ne vienmēr sadalījumi tiks izveidoti, pamatojoties uz HDFS bloku lielumu. Tas pilnībā ir atkarīgs no tā, kā jūsu InputFormat metode getSplits () ir ignorēta.

Ir būtiska atšķirība starp MR sadalījumu un HDFS bloku. Bloks ir fiziska datu daļa, bet sadalīšana ir tikai loģiska daļa, kuru kartētājs lasa. Sadalījumā nav ievaddatu, tajā ir tikai datu atsauce vai adrese. Sadalījumam pamatā ir divas lietas: garums baitos un glabāšanas vietu kopums, kas ir tikai virknes.

Lai to labāk saprastu, ņemsim vienu piemēru: MySQL saglabāto datu apstrāde, izmantojot MR. Tā kā šajā gadījumā nav bloku jēdziena, teorija: “sadalījumi vienmēr tiek veidoti, pamatojoties uz HDFS bloku”,neizdodas. Viena no iespējām ir izveidot sadalījumus, pamatojoties uz MySQL tabulas rindu diapazoniem (un to dara DBInputFormat, ievades formāts datu nolasīšanai no relāciju datu bāzēm). Mums var būt k sadalījumu skaits, kas sastāv no n rindām.

Tikai InputFormats, kuru pamatā ir FileInputFormat (InputFormat failos saglabātu datu apstrādei), tiek izveidoti sadalījumi, pamatojoties uz kopējo ievades failu lielumu baitos. Tomēr ievades failu bloka izmērs FileSystem tiek uzskatīts par ievades sadalīšanas augšējo robežu. Ja jums ir fails, kas ir mazāks par HDFS bloka lielumu, par šo failu saņemsiet tikai 1 kartētāju. Ja vēlaties rīkoties citādi, varat izmantot mapred.min.split.size. Bet tas atkal ir atkarīgs tikai no jūsu InputFormat getSplits ().

Mums ir tik daudz jau esošu ievades formātu, kas pieejami pakotnē org.apache.hadoop.mapreduce.lib.input.

CombineFileInputFormat.html

noklusējuma vērtība java

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

Noklusējums ir TextInputFormat.

Līdzīgi mums ir tik daudz izvades formātu, kas nolasa reduktoru datus un saglabā tos HDFS:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

Noklusējums ir TextOutputFormat.

Kad esat beidzis lasīt šo emuāru, jūs būtu uzzinājis:

  • Kā uzrakstīt kartes samazināšanas programmu
  • Par dažādiem Mapreduce pieejamajiem InputFormats veidiem
  • Kāda ir InputFormats vajadzība
  • Kā rakstīt pielāgotus InputFormats
  • Kā pārsūtīt datus no SQL datu bāzēm uz HDFS
  • Kā pārsūtīt datus no SQL (šeit MySQL) datu bāzēm uz NoSQL datu bāzēm (šeit Hbase)
  • Kā pārsūtīt datus no vienas SQL datu bāzes uz citu tabulu SQL datu bāzēs (Varbūt tas var nebūt tik svarīgi, ja mēs to darām vienā un tajā pašā SQL datu bāzē. Tomēr nav nekas nepareizs, ja jums ir zināšanas par to pašu. Jūs nekad nevarat zināt kā to var sākt lietot)

Priekšnosacījums:

  • Hadoop ir iepriekš instalēts
  • SQL ir iepriekš instalēts
  • Hbase ir iepriekš instalēta
  • Java pamata izpratne
  • MapReduce zināšanas
  • Hadoop ietvara pamatzināšanas

Sapratīsim problēmas izklāstu, kuru mēs šeit atrisināsim:

Mums ir darbinieku tabula MySQL DB mūsu relāciju datu bāzē Edureka. Tagad atbilstoši biznesa prasībām mums visi relatīvajā DB pieejamie dati ir jāpārvieto uz Hadoop failu sistēmu, ti, HDFS, NoSQL DB, kas pazīstams kā Hbase.

Mums ir daudz iespēju veikt šo uzdevumu:

  • Sqoop
  • Flume
  • MapReduce

Tagad jūs nevēlaties instalēt un konfigurēt citus rīkus šai darbībai. Jums paliek tikai viena opcija, kas ir Hadoop apstrādes sistēma MapReduce. MapReduce ietvars ļaus jums pilnībā kontrolēt datus pārsūtīšanas laikā. Jūs varat manipulēt ar kolonnām un ievietot tieši jebkurā no divām mērķa vietām.

Piezīme:

  • Lai ielādētu tabulas no MySQL tabulas, mums ir jāielādē un jāievieto MySQL savienotājs Hadoop klases ceļā. Lai to izdarītu, lejupielādējiet savienotāju com.mysql.jdbc_5.1.5.jar un saglabājiet to direktorijā Hadoop_home / share / Hadoop / MaPreduce / lib.
cp Lejupielādes / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • Tāpat, lai jūsu MR programma piekļūtu Hbase, ielieciet visas Hbase burkas zem Hadoop klases ceļa. Lai to izdarītu, izpildiet šādu komandu :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

Programmatūras versijas, kuras esmu izmantojis šī uzdevuma izpildē, ir:

  • Hadooop-2.3.0
  • HBase 0,98,9-Hadoop2
  • Aptumsuma Mēness

Lai izvairītos no programmas jebkādās saderības problēmās, es lasītājiem lieku palaist komandu ar līdzīgu vidi.

Pielāgota DBInputWritable:

pakete com.inputFormat.copy importēt java.io.DataInput importēt java.io.DataOutput importēt java.io.IOException importēt java.sql.ResultSet importēt java.sql.PreparedStatement importēt java.sql.SQLException importēt org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBInputWritable implement Writable, DBWritable {private int id private String name, dept public void readFields (DataInput in) throws IOException {} public void readFields (ResultSet rs) throws SQLException // Objekts Rezultātu kopa atspoguļo datus, kas atgriezti no SQL priekšraksta {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) throws IOException { } public void write (PreparedStatement ps) izmet SQLException {ps.setInt (1, id) ps.setString (2, name) ps.setString (3, dept)} public int getId () {return id} public String getName () {return name} publiskā virkne getDept () {return dept}}

Pielāgots DBOutputWritable:

pakete com.inputFormat.copy importēt java.io.DataInput importēt java.io.DataOutput importēt java.io.IOException importēt java.sql.ResultSet importēt java.sql.PreparedStatement importēt java.sql.SQLException importēt org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritable implement Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = nosaukums this.id = id this.dept = dept} public void readFields (DataInput in) throws IOException {} public void readFields (ResultSet rs) throws SQLException {} public void write (DataOutput out) throws IOException {} public void write (PreparedStatement ps) izmet SQLException {ps.setString (1, nosaukums) ps.setInt (2, id) ps.setString (3, dept)}}

Ievades tabula:

izveidot datubāzi edureka
izveidot tabulas emp (empid int not null, nosaukums varchar (30), dept varchar (20), primārā atslēga (empid))
ievietot emp vērtībās (1, 'abhay', 'attīstība'), (2, 'brundesh', 'test')
atlasiet * no emp

1. gadījums: pārsūtīšana no MySQL uz HDFS

pakete com.inputFormat.copy importēt java.net.URI importēt org.apache.hadoop.conf.Configuration importēt org.apache.hadoop.fs.FileSystem importēt org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Darba importa org.apache.hadoop.mapreduce.lib.db.DBConfiguration importēšanas org.apache.hadoop.mapreduce.lib.db.DBInputFormat importēšanas org.apache.hadoop.mapreduce.lib.output.FileOutputFormat importēšanas org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable public class MainDbtohdfs {public static void main (String [] args) throws Exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // draivera klase' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // lietotāja vārds' root ') // parole Job job = new Job (conf) job .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormset.utormFutPathPathPathPathPath.utFormat.clorm) FileOut jauns ceļš (args [0])) DBInputFormat.setInput (darbs, DBInputWritable.class, 'emp', // ievades tabulas nosaukums null, null, jauna virkne [] {'empid', 'name', 'dept'} / / tabulas kolonnas) Ceļš p = jauns ceļš (args [0]) FileSystem fs = FileSystem.get (jauns URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Šis koda fragments ļauj mums sagatavot vai konfigurēt ievades formātu, lai piekļūtu mūsu avota SQL DB. Parametrs ietver draivera klasi, URL ir SQL datu bāzes adrese, tā lietotājvārds un parole.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // draivera klase 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // lietotāja vārds 'root') // parole

Šis koda gabals ļauj mums nodot sīkāku informāciju par datu bāzē esošajām tabulām un iestatīt to darba objektā. Parametri, protams, ietver darba instanci, pielāgoto rakstāmo klasi, kurai jāievieš DBWritable saskarne, avota tabulas nosaukumu, nosacījumu, ja vēl kāds ir null, jebkurus šķirošanas parametrus else null, attiecīgi tabulas kolonnu sarakstu.

DBInputFormat.setInput (darbs, DBInputWritable.class, 'emp', // ievades tabulas nosaukums null, null, jauna virkne [] {'empid', 'name', 'dept'} // tabulas kolonnas)

Kartētājs

pakete com.inputFormat.copy importēt java.io.IOException import org.apache.hadoop.mapreduce.Mapper importēt org.apache.hadoop.io.LongWritable importēt org.apache.hadoop.io.Text importēt org.apache.hadoop.io .IntWritable sabiedrības klases karte paplašina Mapper {
aizsargāta tukšuma karte (LongWritable atslēga, DBInputWritable vērtība, konteksta ctx) {mēģiniet {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (jauns teksts (nosaukums + '' + id + '' + dept), id)
} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Reduktors: izmantots identitātes reduktors

Komanda palaist:

hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Izeja: MySQL tabula pārsūtīta uz HDFS

hadoop dfs -ls / dbtohdfs / *

2. gadījums: pārsūtīšana no vienas MySQL tabulas uz citu MySQL

izejas tabulas izveidošana MySQL

izveidot tabulas darbinieku1 (nosaukums varchar (20), id int, dept varchar (20))

pakete com.inputFormat.copy import org.apache.hadoop.conf.Configuration importēt org.apache.hadoop.mapreduce.Job importēt org.apache.hadoop.mapreduce.lib.db.DBConfiguration importēt org.apache.hadoop.mapreduce.lib .db.DBInputFormat importēt org.apache.hadoop.mapreduce.lib.db.DBOutputFormat importēt org.apache.hadoop.io.Text importēt org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable public class Mainonetable_to_other_table {public static void main (String [] args) izmet izņēmumu {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // draivera klase 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // lietotāja vārds' root ') // parole Job job = new Job (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) darbs .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // ievades tabulas nosaukums null, null, jauna virkne [] {'empid ',' nosaukums ',' dept '} // tabulas kolonnas) DBOutputFormat.setOutput (darbs,' darbinieks1 ', // izejas tabulas nosaukums jauna virkne [] {' nosaukums ',' id ',' dept '} // tabula kolonnas) System.exit (job.waitForCompletion (true)? 0: 1)}}

Šis koda gabals ļauj mums konfigurēt izvades tabulas nosaukumu SQL DB. Parametri ir attiecīgi darba instances, izvades tabulas nosaukums un izvades kolonnu nosaukumi.

DBOutputFormat.setOutput (darbs, 'darbinieks1', // izvades tabulas nosaukums jauna virkne [] {'nosaukums', 'id', 'dept'} // tabulas kolonnas)

Mapper: tāds pats kā 1. gadījums

Reduktors:

pakete com.inputFormat.copy importēt java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable publiskā klase Reduce paplašina Reducer {aizsargāta void reducēšana (teksta atslēga, atkārtojamās vērtības, konteksts ctx) {int summa = 0 virknes rinda [] = key.toString (). Split ('') mēģiniet {ctx.write (jauns DBOutputWritable (line [0] .toString (), Integer.parseInt (line [1] .toString ()), line [2] .toString ()), NullWritable.get ())} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Komanda palaist:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Rezultāts: datu pārsūtīšana no MySQL EMP tabulas uz citu MySQL tabulas darbinieku1

3. gadījums: pārsūtīšana no MySQL tabulas uz NoSQL (Hbase) tabulu

Hbase tabulas izveide izejas no SQL tabulas izveidei:

izveidot 'darbinieks', 'oficiālais_info'

Vadītāja klase:

pakotne Dbtohbase importēt org.apache.hadoop.conf.Configuration importēt org.apache.hadoop.mapreduce.Job importēt org.apache.hadoop.mapreduce.lib.db.DBConfiguration importēt org.apache.hadoop.mapreduce.lib.db.DBInputFormat importēt org.apache.hadoop.hbase.mapreduce.TableOutputFormat importēt org.apache.hadoop.hbase.HBaseConfiguration importēt org.apache.hadoop.hbase.client.HTable importēt org.apache.hadoop.hbase.client.HTableInterface importēt org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text public class MainDbToHbase {public static void main (String [] args) throws Exception {Configuration conf = HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // draivera klase 'jdbc: mysql: // localhost: 3306 / edureka' , // db url 'root', // lietotāja vārds 'root') // parole Job job = new Job (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('darbinieks', Reduce.class, darbs) job.setInputFormFormatFormFormFormFormFormFormATormFormATormFormTormTormTormTormatFormTormat (orm.FormatForm) CLASS (darba.setMets). klase) DBInputFormat.setInput (darbs, DBInputWritable.class, 'emp', // ievades tabulas nosaukums null, null, jauna virkne [] {'empid', 'name', 'dept'} // tabulas kolonnas) System.exit (job.waitForCompletion (patiess)? 0: 1)}}

Šis koda gabals ļauj konfigurēt izvades atslēgu klasi, kas hbase gadījumā ir ImmutableBytesWritable

system.exit (0) java
job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Šeit mēs nododam hbase tabulas nosaukumu un reduktoru, lai darbotos uz galda.

TableMapReduceUtil.initTableReducerJob ('darbinieks', Reduce.class, darbs)

Kartētājs:

pakotne Dbtohbase importēt java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable importa org.apache.hadoop.io.Text importēšana org.apache.hadoop.io.IntWritable publiskās klases karte paplašina Mapper {private IntWritable one = jauna IntWritable (1) aizsargāta tukšumkarte (LongWritable id, DBInputWritable value, Context context) {mēģiniet {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), jauns teksts (līnija + ' '+ dept))} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Šajā koda daļā mēs ņemam vērtības no DBinputwritable klases getteriem un pēc tam tos nododam
ImmutableBytesWritable tā, lai tie sasniegtu reduktoru bytewriatble formā, kuru Hbase saprot.

String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), jauns teksts (līnija + '' + dept ))

Reduktors:

pakotne Dbtohbase importēt java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text public class Reduce paplašina TableReducer {public void reduc (ImmutableBytesWritable key, Iterable values, Context context) throws IOException, InterruptedException {String [] cēlonis = null // Loop vērtības par (Teksta val: vērtības) {cēlonis = val.toString (). split ('')} // Put to HBase Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info') ), Bytes.toBytes ('nosaukums'), Bytes.toBytes (izraisīt [0])) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('departaments'), Bytes.toBytes (cēlonis [1 ])) context.write (atslēga, ielikt)}}

Šis koda gabals ļauj mums izlemt precīzu rindu un kolonnu, kurā mēs glabātu vērtības no reduktora. Šeit mēs katru tukšumu glabājam atsevišķā rindā, jo mēs izveidojām empid kā rindas atslēgu, kas būtu unikāla. Katrā rindā mēs glabājam darbinieku oficiālo informāciju kolonnu saime “official_info” attiecīgi slejās “nosaukums” un “departaments”.

Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('nosaukums'), Bytes.toBytes (izraisīt [0])) put.add (Bytes. toBytes ('official_info'), Bytes.toBytes ('departaments'), Bytes.toBytes (izraisīt [1])) context.write (atslēga, ielikt)

Pārsūtītie dati Hbase:

skenēšanas darbinieks

Kā redzam, mums izdevās veiksmīgi veikt uzņēmējdarbības datu migrāciju no relāciju SQL DB uz NoSQL DB.

Nākamajā emuārā mēs uzzināsim, kā rakstīt un izpildīt kodus citiem ievades un izvades formātiem.

Turpiniet publicēt savus komentārus, jautājumus vai jebkādas atsauksmes. Es labprāt dzirdētu no jums.

Vai mums ir jautājums? Lūdzu, pieminējiet to komentāru sadaļā, un mēs ar jums sazināsimies.

Saistītās ziņas: