《電子技術(shù)應(yīng)用》
您所在的位置:首頁 > 通信與網(wǎng)絡(luò) > 設(shè)計(jì)應(yīng)用 > 基于Spark的數(shù)據(jù)庫增量準(zhǔn)實(shí)時(shí)同步
基于Spark的數(shù)據(jù)庫增量準(zhǔn)實(shí)時(shí)同步
2016年微型機(jī)與應(yīng)用第19期
王浩,葛昂,趙晴
華北計(jì)算機(jī)系統(tǒng)工程研究所,北京 100083
摘要: 為了實(shí)現(xiàn)將傳統(tǒng)關(guān)系型數(shù)據(jù)庫中的增量數(shù)據(jù)快速導(dǎo)入同構(gòu)或者異構(gòu)目的庫,在使用已有的增量提取方法的基礎(chǔ)上,提出了通過增加并行度和流式計(jì)算的方法加快同步速度。此方法不僅支持插入、更新和刪除的增量數(shù)據(jù)同步,而且可以抽取出數(shù)據(jù)庫表結(jié)構(gòu)信息動(dòng)態(tài)支持表結(jié)構(gòu)變更。與傳統(tǒng)單點(diǎn)抽取方式相比,大大提高了目的庫數(shù)據(jù)的新鮮度。
關(guān)鍵詞: 增量同步 Spark 流式計(jì)算
Abstract:
Key words :

  王浩,葛昂,趙晴

  (華北計(jì)算機(jī)系統(tǒng)工程研究所,北京 100083)

       摘要:為了實(shí)現(xiàn)將傳統(tǒng)關(guān)系型數(shù)據(jù)庫中的增量數(shù)據(jù)快速導(dǎo)入同構(gòu)或者異構(gòu)目的庫,在使用已有的增量提取方法的基礎(chǔ)上,提出了通過增加并行度和流式計(jì)算的方法加快同步速度。此方法不僅支持插入、更新和刪除的增量數(shù)據(jù)同步,而且可以抽取出數(shù)據(jù)庫表結(jié)構(gòu)信息動(dòng)態(tài)支持表結(jié)構(gòu)變更。與傳統(tǒng)單點(diǎn)抽取方式相比,大大提高了目的庫數(shù)據(jù)的新鮮度。

  關(guān)鍵詞:增量同步; Spark; 流式計(jì)算

0引言

  隨著大數(shù)據(jù)技術(shù)的發(fā)展,越來越多的企業(yè)開始構(gòu)建大數(shù)據(jù)平臺(tái)進(jìn)行數(shù)據(jù)處理。然而如何將保存在關(guān)系型數(shù)據(jù)庫中的數(shù)據(jù)快速同步到大數(shù)據(jù)平臺(tái)組件(例如HBase、HDFS)中,正成為很多企業(yè)面臨的問題。Sqoop是常用的數(shù)據(jù)同步工具,其實(shí)質(zhì)是MapReduce任務(wù),延時(shí)較高,而且需要通過定時(shí)任務(wù)來達(dá)到自動(dòng)化流程效果。本文在觸發(fā)器記錄數(shù)據(jù)變化的基礎(chǔ)上,提出了一種使用Spark Streaming將增量數(shù)據(jù)抽取出來,然后根據(jù)需要寫入到不同的目的庫的方法。由于只提取增量數(shù)據(jù),所以較Sqoop減少了數(shù)據(jù)量。另外由于是流式處理方式,降低了延時(shí)。

1增量提取

  1.1增量提取的概念

  增量提取是針對(duì)上一次提取而言,將上一次提取時(shí)間點(diǎn)到現(xiàn)在數(shù)據(jù)庫中插入、更新、刪除的數(shù)據(jù)提取出來[1]。

  1.2常用的增量提取方法

  1.2.1基于業(yè)務(wù)系統(tǒng)日志

  在業(yè)務(wù)中將數(shù)據(jù)庫DML(Data Manipulation Language)語句輸出以日志的方式存儲(chǔ),然后通過解析日志將DML語句在目的庫中重放以達(dá)到目的。此方法需要侵入業(yè)務(wù)系統(tǒng),對(duì)于已經(jīng)成型的業(yè)務(wù)系統(tǒng)不適用。

  1.2.2基于數(shù)據(jù)庫日志

  解析數(shù)據(jù)庫日志也能達(dá)到增量提取的目的,但是各大數(shù)據(jù)庫廠商不對(duì)外開放數(shù)據(jù)庫系統(tǒng)的日志格式,這就使得解析日志變成了問題。而且各數(shù)據(jù)庫的日志格式還不盡相同,難以達(dá)到通用性。

  1.2.3基于觸發(fā)器

  基于觸發(fā)器的方式,目前被廣泛運(yùn)用于數(shù)據(jù)庫增量提取。它通過在源表上建立插入、更新、刪除觸發(fā)器來記錄對(duì)數(shù)據(jù)的操作。每當(dāng)有數(shù)據(jù)變化時(shí),就會(huì)觸發(fā)相應(yīng)的觸發(fā)器,然后運(yùn)行觸發(fā)器定義的邏輯,將變化記錄到增量表。

  1.3基于觸發(fā)器方法的具體實(shí)現(xiàn)

  由于觸發(fā)器方法具有實(shí)現(xiàn)邏輯簡(jiǎn)單,對(duì)業(yè)務(wù)無入侵,數(shù)據(jù)庫通用等優(yōu)點(diǎn),所以本文采用了基于觸發(fā)器方式的增量提取方法。具體實(shí)現(xiàn)方法如下:

 ?。?)創(chuàng)建名為dml_log的數(shù)據(jù)庫表,字段為id、table_name、record_id、execute_date、dml_type。其中id為自增id,table_name存儲(chǔ)要同步的源表表名稱,record_id是源表中發(fā)生變化的記錄的唯一標(biāo)識(shí),execute_date為觸發(fā)器執(zhí)行時(shí)的時(shí)間戳,dml_type為I、U、D分別代表insert、update、delete操作。

 ?。?)在源表上創(chuàng)建插入、更新、刪除類型的觸發(fā)器。創(chuàng)建語句在此省略。

2構(gòu)建Spark Streaming程序

  2.1Spark Streaming

  Spark是目前大數(shù)據(jù)處理領(lǐng)域比較常用的計(jì)算框架。它將中間計(jì)算結(jié)果維護(hù)在內(nèi)存中,這樣不僅可以做到中間結(jié)果的重用,而且減少了磁盤IO,大大加快了計(jì)算速度。Spark Streaming是構(gòu)建于Spark core之上的流式處理模塊。其原理是將流式數(shù)據(jù)切分成一個(gè)個(gè)小的片段,以mini batch的形式來處理這一小部分?jǐn)?shù)據(jù),從而模擬流式計(jì)算達(dá)到準(zhǔn)實(shí)時(shí)的效果。

  2.2JdbcRDD

  彈性分布式數(shù)據(jù)集(Resilient Distributed Datasets,RDD),它是Spark數(shù)據(jù)抽象的基石。RDD是一個(gè)只讀的分區(qū)記錄集合,分區(qū)分散在各個(gè)計(jì)算節(jié)點(diǎn)[2]。RDD提供了transformation和action兩類操作,其中transformation是lazy級(jí)別的,主要對(duì)數(shù)據(jù)處理流程進(jìn)行標(biāo)記,而不立即進(jìn)行運(yùn)算。action操作會(huì)觸發(fā)作業(yè)的提交,然后進(jìn)行回溯導(dǎo)致transformation操作進(jìn)行運(yùn)算。

  JdbcRDD擴(kuò)展自RDD,是RDD的子類。內(nèi)部通過JDBC(Java Data Base Connectivity)操作以數(shù)據(jù)庫為源頭構(gòu)建RDD。其構(gòu)造函數(shù)簽名為:

  class JdbcRDD[T: ClassTag](

  sc: SparkContext,

  getConnection:()=> Connection,

  sql: String,

  lowerBound: Long,

  upperBound: Long,

  numPartitions: Int,

  mapRow:(ResultSet) => T =

  JdbcRDD.resultSetToObjectArray _)

  extends RDD[T](sc, Nil) with Logging {…}

  2.3具體實(shí)現(xiàn)

  Spark官方提供用于構(gòu)建Spark Streaming的數(shù)據(jù)源沒有對(duì)數(shù)據(jù)庫進(jìn)行支持,所以本文自己實(shí)現(xiàn)對(duì)數(shù)據(jù)庫的支持。編寫繼承自InputDStream類的DirectJdbcInputDStream類,其簽名為:

  class DirectJdbcInputDStream[T: ClassTag](

  @transient ssc_ : StreamingContext,

  param: JdbcParam) extends

  InputDStream[Row] (ssc_) with Logging {…}

  對(duì)start()、compute()和stop()方法進(jìn)行重寫。

  (1)在start函數(shù)中注冊(cè)JDBC驅(qū)動(dòng),用于JDBC獲取初始化信息(構(gòu)造JdbcRDD時(shí)的參數(shù));

  (2)compute函數(shù)會(huì)被框架間隔指定的時(shí)間反復(fù)調(diào)用,其實(shí)質(zhì)是如何返回一個(gè)JdbcRDD。首先通過JDBC獲取本次需要拉取的trigger記錄的id的上下界以及表的Schema信息;然后以這些信息為參數(shù)生成提取真實(shí)數(shù)據(jù)的SQL,其邏輯為用選中的trigger表中的記錄和原表在record_id上進(jìn)行左連接;最后使用該SQL當(dāng)做參數(shù)構(gòu)建JdbcRDD。值得說明的是,構(gòu)建JdbcRDD時(shí)是可以指定并行度的,每個(gè)worker節(jié)點(diǎn)都會(huì)建立到數(shù)據(jù)庫的JDBC連接,由多個(gè)節(jié)點(diǎn)并行去數(shù)據(jù)庫拉取屬于自己的那一部分?jǐn)?shù)據(jù),這就大大增加了提取和處理速度。

  (3)在stop函數(shù)中關(guān)閉JDBC連接。總體來看,就是在driver程序中執(zhí)行的JDBC程序獲取初始化參數(shù),在executor中執(zhí)行的JDBC程序拉取真實(shí)的數(shù)據(jù)。

  (4)編寫driver程序:

  val sc = new SparkContext(new SparkConf)

  val ssc = new StreamingContext(sc, Seconds(30))

  val directStream = new DirectJdbcInputDStream[Row](ssc, jdbcParam)

  directStream.foreachRDD(rdd => {

  …//對(duì)數(shù)據(jù)進(jìn)行處理

  })

  2.4限流

  假設(shè)當(dāng)前時(shí)間點(diǎn)到上次提取的時(shí)間點(diǎn)之間新增數(shù)據(jù)量太大,就會(huì)導(dǎo)致在新一次作業(yè)提交時(shí),上一次的作業(yè)仍然沒有完成,可能會(huì)因此造成作業(yè)積壓使得系統(tǒng)不穩(wěn)定。本文使用了基于規(guī)則的限流方法,綜合考慮集群處理能力以及間隔時(shí)間,可以配置化設(shè)置每次最大提取條數(shù)。如果當(dāng)前需要提取的數(shù)據(jù)條數(shù)大于最大提取條數(shù),則本次就只提取最大條數(shù),剩下的延時(shí)到下次再進(jìn)行提取。這樣做的好處是削減了峰流對(duì)系統(tǒng)造成的影響。

3測(cè)試分析

  測(cè)試環(huán)境:VMware虛擬機(jī),處理器設(shè)置為4核心,2 GB內(nèi)存, 64位CentOS 6.5操作系統(tǒng),Spark 1.5.1,Oracle 11g。使用4臺(tái)虛擬機(jī)搭建成Spark集群,1臺(tái)為Master,3臺(tái)為Worker。數(shù)據(jù)庫表分別設(shè)置為20、40個(gè)字段,每次最大抽取記錄數(shù)分別設(shè)置為10 000、50 000、500 000。將抽取出來的數(shù)據(jù)寫成parquet格式的文件存儲(chǔ)到hdfs上。測(cè)試結(jié)果如表1所示。

圖像 004.png

4結(jié)束語

  本文在基于數(shù)據(jù)庫觸發(fā)器記錄數(shù)據(jù)變化的基礎(chǔ)上,通過自己構(gòu)造DirectJdbcStream類提供Spark Streaming對(duì)數(shù)據(jù)庫的支持,達(dá)到準(zhǔn)實(shí)時(shí)從數(shù)據(jù)庫中抽取出增量數(shù)據(jù)的目的。并且可以對(duì)抽取出來的數(shù)據(jù)進(jìn)行過濾、清洗等操作,根據(jù)需求靈活地寫入到不同的目的庫。

  參考文獻(xiàn)

 ?。?] 郭亮. 基于MD5與HASH的數(shù)據(jù)庫增量提取算法及其應(yīng)用[D]. 長(zhǎng)沙:湖南大學(xué),2013.

 ?。?] ZAHARIA M, CHOWDHURY M, DAS T, et al. Resilient distributed datasets: a fault tolerant abstraction for in memory cluster computing[C]. Usenix Conference on Networked Systems Design & Implementation, 2012, 70(2):141146.

 ?。?] DEAN J, GHEMAWAT S. MapReduce: simplified dataprocessing on large clusters[C]. USENIX Association OSDI′04: 6th Symposium on Operating Systems Design and Implementation, 2004:137149.

 ?。?] MARTIN O. Programming in scala[M]. California: Artima Press,2010.

 ?。?] YADAV R. Spark cookbook[M]. UK: Packt Publishing Ltd, 2015.

 ?。?] KARAU H. Learning spark[M]. America: O’Reilly Media, Inc. 2015.

 ?。?] 梁剛. 企業(yè)大數(shù)據(jù)管理解決方案[J]. 微型機(jī)與應(yīng)用,2013,32(24):7 10,13.


此內(nèi)容為AET網(wǎng)站原創(chuàng),未經(jīng)授權(quán)禁止轉(zhuǎn)載。
亚洲一区二区欧美_亚洲丝袜一区_99re亚洲国产精品_日韩亚洲一区二区
欧美精品一区二区三区蜜臀| 国内精品伊人久久久久av一坑| 欧美一区二区三区久久精品茉莉花| 亚洲日本无吗高清不卡| 欧美在线free| 亚洲一品av免费观看| 午夜国产欧美理论在线播放| 免费久久精品视频| 日韩午夜三级在线| 亚洲高清影视| 好看的日韩视频| 国产一区二区三区久久| 国产欧美一区二区三区视频| 国产精品国产a级| 久久国产直播| 久久久99爱| 久久久久久国产精品mv| 久久久久国产精品厨房| 久久久另类综合| 久久久人成影片一区二区三区| 久久国产精品久久w女人spa| 欧美在线免费播放| 久久久精品999| 男人插女人欧美| 欧美激情中文字幕一区二区| 欧美久久影院| 欧美天堂在线观看| 国产精品一区二区视频| 国产日韩欧美成人| 国精产品99永久一区一区| 在线观看国产日韩| 亚洲人成艺术| 亚洲一区在线播放| 久久激情视频久久| 亚洲精品国产无天堂网2021| 亚洲第一精品夜夜躁人人爽 | 亚洲天堂偷拍| 亚洲一区在线免费| 欧美在线视频一区二区| 久久午夜电影网| 欧美欧美天天天天操| 国产精品美女久久久久aⅴ国产馆 国产精品美女久久久 | 国产精品一二三| 国产一区二区三区网站| 在线观看亚洲精品视频| 亚洲人体一区| 亚洲伊人久久综合| 亚洲第一免费播放区| 99re在线精品| 欧美一区=区| 噜噜噜久久亚洲精品国产品小说| 可以免费看不卡的av网站| 欧美精品xxxxbbbb| 国产精品久久久免费| 国产在线高清精品| 91久久久久久国产精品| 一区二区三区精品视频在线观看| 亚洲欧美第一页| 亚洲激情欧美| 亚洲综合精品自拍| 蜜臀久久99精品久久久画质超高清 | 欧美大片第1页| 国产精品久久国产愉拍| 狠狠爱成人网| 亚洲午夜久久久久久尤物 | 欧美日韩国产小视频在线观看| 国产精品久久久久毛片大屁完整版 | 欧美一区二区三区久久精品茉莉花 | 国产欧美欧美| 亚洲国产日韩美| 亚洲欧美日韩在线一区| 亚洲精品一区二区三区四区高清| 亚洲一区二区三区久久 | 欧美精品久久一区二区| 国产亚洲aⅴaaaaaa毛片| 亚洲精品久久久久久下一站 | 久久精品国产第一区二区三区最新章节 | 日韩视频在线观看免费| 久久精品电影| 亚洲欧美一区二区三区久久| 欧美sm极限捆绑bd| 国产视频久久久久久久| aa级大片欧美三级| 最新高清无码专区| 久久精品国产久精国产爱| 亚洲精品久久久久久久久久久久| 午夜日韩福利| 久久综合九色九九| 国产精品国产福利国产秒拍| 1024欧美极品| 欧美在线视频免费| 午夜在线电影亚洲一区| 欧美日韩精品免费观看视频完整| 黄色成人av网| 午夜日韩福利| 亚洲影院免费| 欧美人妖另类| 亚洲国产成人久久综合| 久久精品国产久精国产爱| 性欧美超级视频| 欧美性片在线观看| 亚洲激情小视频| 亚洲国产日韩欧美| 久久久久久久一区| 国产亚洲成av人在线观看导航| 亚洲一区二区三区精品视频| 中文国产成人精品| 欧美精品一区二区蜜臀亚洲| 亚洲东热激情| 亚洲国产精品999| 久久久久一区二区三区| 国产日韩精品入口| 亚洲欧美日韩在线一区| 性欧美在线看片a免费观看| 国产精品高精视频免费| 宅男精品视频| 亚洲影院在线观看| 国产精品久久久久999| 一区二区三区四区五区在线| 一区二区三区视频免费在线观看| 欧美精品一区二区三区蜜桃| 亚洲精品国产精品国自产观看浪潮 | 欧美一区二区国产| 欧美一级电影久久| 国产精品自在欧美一区| 亚洲一区二区欧美| 性欧美长视频| 国产日韩欧美a| 欧美制服丝袜第一页| 一本一本久久a久久精品综合妖精| 欧美深夜影院| 欧美三区美女| 99re6热在线精品视频播放速度 | 国产精品亚发布| 亚洲自拍另类| 久久精品二区三区| 红桃视频国产一区| 亚洲黄网站在线观看| 欧美激情二区三区| 日韩午夜高潮| 亚洲一区精品在线| 国产欧美日韩三区| 久久精品国内一区二区三区| 美女黄色成人网| 亚洲黄色在线| 中文有码久久| 国产精品入口夜色视频大尺度| 亚洲欧美另类在线| 久久久在线视频| 亚洲国产福利在线| 在线亚洲观看| 国产九色精品成人porny| 亚洲大胆视频| 欧美精品一区在线发布| 中国亚洲黄色| 久久久免费精品| 亚洲精品美女在线| 午夜免费久久久久| 国产一区二区三区丝袜| 亚洲黄色影片| 国产精品国产三级国产| 欧美一区二区视频网站| 欧美粗暴jizz性欧美20| 亚洲午夜精品一区二区| 久久中文欧美| 亚洲美女av网站| 欧美专区18| 91久久国产综合久久蜜月精品| 亚洲综合色婷婷| 国内外成人在线| 一区二区三区欧美成人| 国产亚洲欧洲一区高清在线观看| 亚洲欧洲日本国产| 国产精品久久久久久久9999 | 欧美成黄导航| 亚洲字幕在线观看| 欧美va天堂| 亚洲欧美另类在线| 欧美精品一区二区三区久久久竹菊| 亚洲综合不卡| 欧美精品一区在线| 性欧美暴力猛交另类hd| 欧美日韩网址| 久久国产精品黑丝| 国产精品videosex极品| 亚洲国产成人精品女人久久久 | 午夜精品福利视频| 欧美激情精品| 欧美一级大片在线观看| 欧美日韩亚洲高清一区二区| 久久精品国产亚洲精品| 国产精品成人播放| 亚洲日本电影| 国产婷婷97碰碰久久人人蜜臀| 99视频精品免费观看| 韩国福利一区| 欧美一区二区成人6969| 亚洲免费高清视频| 美女黄毛**国产精品啪啪| 午夜精品久久久久久久99樱桃|