本文主要介紹rabbitmqjava教程(rabbitmq的高級(jí)特性),下面一起看看rabbitmqjava教程(rabbitmq的高級(jí)特性)相關(guān)資訊。
rabbitmq 1的高級(jí)應(yīng)用。消息可靠性傳遞使用rabbitmq時(shí),生產(chǎn)者要想知道消息是否成功傳遞到對(duì)應(yīng)的交換機(jī)和隊(duì)列,有兩種方法可以控制消息傳遞的可靠性模式。
從上圖中的整個(gè)消息傳遞過程來看,生產(chǎn)者 s消息在進(jìn)入中間件時(shí)會(huì)先到達(dá)交換機(jī),然后再從交換機(jī)轉(zhuǎn)移到隊(duì)列,分兩步策略。那么消息的丟失就會(huì)發(fā)生在這兩個(gè)階段,rabbitmq為我們提供了這兩部分可靠的新交付模式:
確認(rèn)模式。返回模式。使用這兩種回調(diào)模式來確??煽康南鬟f。
1.1.確認(rèn)模式消息從生產(chǎn)者到交換機(jī)的傳輸將返回confirmcallback。您可以直接在rabbittemplate實(shí)例中設(shè)置確認(rèn)邏輯。如果您使用xml配置,您需要打開publisher-confirms = 真實(shí) 在工廠里的配置,與publisher-confirm-typ:和yaml的配置直接相關(guān)。默認(rèn)情況下,它是none,所以您需要手動(dòng)打開它。
@ run with(spring junit 4 class runner . class)@ context configuration(locations = class path : spring-rabbit mq . xml )public class producer { @ autowired private rabbit template rabbit template;@test public void producer拋出interrupted exception { rabbit template . setconfirm callback(new rabbit template。confirm callback{ @ override public void confirm(correlation data correlation data,boolean b,string s){ system . out . println;如果(!b) {//消息重傳system.out.println(s)等處理;}else { system . out . println( 交換機(jī)成功接收消息);} } });rabbit template . convertandsend( 默認(rèn)_交換 , 默認(rèn)隊(duì)列 , hello world beordie );時(shí)間單位。秒.睡眠(5);}}上面的確認(rèn)是由一個(gè)confirm函數(shù)執(zhí)行的,該函數(shù)攜帶三個(gè)參數(shù),第一個(gè)是配置相關(guān)信息,第二個(gè)表示交換機(jī)是否成功接收到報(bào)文,第三個(gè)參數(shù)是指報(bào)文沒有成功接收的原因。
1.2.返回模式如果從交換機(jī)到消息隊(duì)列的傳遞失敗,將返回returncallback。打開回退模式publisher-returns = 真實(shí) 在出廠配置中設(shè)置交換機(jī)處理報(bào)文失敗的模式(默認(rèn)為false直接丟棄報(bào)文),并增加回退處理的邏輯。
@ run with(spring junit 4 class runner . class)@ context configuration(locations = class path : spring-rabbit mq . xml )公共類生產(chǎn)者{ @ autowired private rabbit template rabbit template;@test public void producer拋出interrupted exception { rabbit template . set mandatory(true);rabbittemplate . setreturncallback(新rabbit template。return callback{ @ override public void returned message(message message,int replycode,string reply text,string exchange,string routing key){//重傳邏輯處理system . out . println(message . getbody 消息隊(duì)列傳遞失敗 );} });rabbit template . convertandsend( 默認(rèn)_交換 , 默認(rèn)_隊(duì)列e , hello world beordie );時(shí)間單位。秒.睡眠(5);}}返回的消息攜帶五個(gè)參數(shù),分別指消息對(duì)象、錯(cuò)誤代碼、錯(cuò)誤消息、交換機(jī)和路由關(guān)鍵字。
1.3.確認(rèn)機(jī)制在消費(fèi)者抓取消息隊(duì)列中的數(shù)據(jù)進(jìn)行消費(fèi)后會(huì)有一個(gè)確認(rèn)機(jī)制對(duì)消息進(jìn)行確認(rèn),防止抓取消息后由于消費(fèi)不成功導(dǎo)致消息丟失。有三種方法可以確認(rèn):
自動(dòng)確認(rèn):確認(rèn)= 無
人工確認(rèn):確認(rèn)= 手冊(cè)
根據(jù)異常情況確認(rèn):acknowledge = 汽車與娛樂
自動(dòng)確認(rèn)是指消息一旦被消費(fèi)者抓取,默認(rèn)自動(dòng)成功,消息從消息隊(duì)列中刪除。如果此時(shí)消費(fèi)者的消費(fèi)出現(xiàn)問題,會(huì)默認(rèn)消息消費(fèi)成功,但實(shí)際上并不成功,即當(dāng)前消息丟失。默認(rèn)情況是自動(dòng)確認(rèn)機(jī)制。
如果設(shè)置了手動(dòng)確認(rèn)模式,則需要回?fù)艽_認(rèn)頻道。普通消費(fèi)信息后的基本并手動(dòng)簽收。如果在業(yè)務(wù)處理過程中出現(xiàn)異常,調(diào)用channel.basicnack重新發(fā)送消息。
首先需要配置隊(duì)列綁定時(shí)的確認(rèn)機(jī)制,設(shè)置為手動(dòng)簽到。
!-綁定隊(duì)列-兔子:器-容器連接-工廠= 兔子工廠 自動(dòng)聲明= 真實(shí) 確認(rèn)= 手冊(cè) 兔子:聽眾。ref = 兔子消費(fèi)者和消費(fèi)者協(xié)會(huì)隊(duì)列名稱= 默認(rèn)隊(duì)列 //rabbit : listener-container生產(chǎn)者不需要改變,只需要改變消費(fèi)者 的實(shí)現(xiàn)來自動(dòng)為消息簽名。如果業(yè)務(wù)正常執(zhí)行,就會(huì)簽收消息。如果業(yè)務(wù)出現(xiàn)錯(cuò)誤,消息將被拒絕,消息將被重新傳輸或丟棄。
公共類使用者ack實(shí)現(xiàn)channelawaremessagelist { @ override public void on message(message message,channel)throws exception {//message unique id long tag = mes。sage.getmessageproperties。getdeliverytag;請(qǐng)嘗試{ string msg = new string(message . getbody, utf-8和);channel.basicack(標(biāo)簽,真);system . out . println( 接收消息: 味精);} catch(異常e) {system。out . println( 收到的消息異常 );channel.basicnack(tag,true,true);e . printstacktrace;}}}涉及到三個(gè)簡(jiǎn)單的簽到函數(shù),一個(gè)是正確簽到的basicack,一個(gè)是單次拒簽的basicreject,第三個(gè)是批量拒簽的basic ack。
basicack的第一個(gè)參數(shù)表示消息在通道中的唯一id,只針對(duì)當(dāng)前通道;第二個(gè)參數(shù)表示是否批量同意。如果為false,它將只同意用當(dāng)前id對(duì)消息進(jìn)行簽名,并將其從消息隊(duì)列中刪除。如果為真,此id之前的所有消息都將被同意簽名。basicreject的第一個(gè)參數(shù)仍然指示消息的唯一id,第二個(gè)參數(shù)指示是否將消息發(fā)送回隊(duì)列。false表示消息被直接丟棄,或者有死信隊(duì)列要接收。true表示消息被發(fā)送回隊(duì)列,所有操作只針對(duì)當(dāng)前消息。basicnack比第二個(gè)多了一個(gè)參數(shù),就是中間一個(gè)布爾值,表示是否批量進(jìn)行。2.消費(fèi)端限流增加了消息中間件在用戶請(qǐng)求和db服務(wù)處理之間的隔離,使得所有突發(fā)流量都能被消息隊(duì)列抵御,降低了服務(wù)器被沖走的可能性。讓所有的請(qǐng)求都存儲(chǔ)在隊(duì)列中,消費(fèi)者只需要?jiǎng)蛩偃〕鱿⑦M(jìn)行消費(fèi),這樣可以保證運(yùn)行效率,客戶端不會(huì)因?yàn)楹笈_(tái)的阻擋而得到正常的響應(yīng)(當(dāng)然是指一些不需要同步echo的任務(wù))。
你只需要在消費(fèi)者綁定消息隊(duì)列的時(shí)候指定取消息的速率,需要使用手動(dòng)登錄,每次登錄都會(huì)從隊(duì)列中取下一條數(shù)據(jù)。
!-綁定隊(duì)列-兔子:器-容器連接-工廠= 兔子工廠 自動(dòng)聲明= 真實(shí) 確認(rèn)= 手冊(cè) 預(yù)取= 1 兔子:聽眾ref = 兔子消費(fèi)者和消費(fèi)者協(xié)會(huì)隊(duì)列名稱= 默認(rèn)隊(duì)列 //rabbit : listener-container 3、消息過期時(shí)間消息隊(duì)列提供隊(duì)列中存儲(chǔ)的消息的過期時(shí)間。分兩個(gè)方向?qū)崿F(xiàn),一個(gè)是針對(duì)整個(gè)隊(duì)列中的所有消息,即隊(duì)列的到期時(shí)間,另一個(gè)是針對(duì)當(dāng)前消息的到期時(shí)間,即針對(duì)單個(gè)消息單獨(dú)設(shè)置。
設(shè)置隊(duì)列的到期時(shí)間非常簡(jiǎn)單。您只需要在創(chuàng)建隊(duì)列時(shí)指定到期時(shí)間,也可以通過控制臺(tái)直接創(chuàng)建指定的到期時(shí)間。一旦隊(duì)列過期,隊(duì)列中所有未被使用的消息都將過期,隊(duì)列也將過期。
rabbit : queue id = 默認(rèn)隊(duì)列 name = 默認(rèn)隊(duì)列 自動(dòng)聲明= 真實(shí) rabbit : queue-arguments entry key = x-消息-ttl 價(jià)值= 10000英鎊值類型= 整數(shù) //rabbit : queue-arguments/rabbit : queue單個(gè)消息的失效時(shí)間需要在發(fā)送時(shí)單獨(dú)指定,配置的附加信息在發(fā)送時(shí)指定,配置由配置類編寫。
如果一個(gè)消息過期了,但此時(shí)它在隊(duì)列中間,就不會(huì)被處理,只有在后面處理的時(shí)候才會(huì)判斷它是否過期。
messagepostprocessor messagepostprocessor = new messagepostprocessor{ @ override public message postprocessmessage(message message)拋出amqpexception {//設(shè)置消息的過期時(shí)間。getmessageproperties。set expiration( 5000英鎊);//返回消息;}};rabbit template . convertandsend( 交換與交易, 路線和路線, 味精 ,消息發(fā)布處理器);如果同時(shí)設(shè)置了消息的過期時(shí)間和隊(duì)列的過期時(shí)間,那么最終的過期時(shí)間是由最短的時(shí)間決定的,也就是說,如果當(dāng)前消息的過期時(shí)間沒有到,但是整個(gè)隊(duì)列的過期時(shí)間到了,那么隊(duì)列中的所有消息都會(huì)自然過期,執(zhí)行過期處理策略。
4.死信隊(duì)列4.1。死信概念死信隊(duì)列指的是死信交換機(jī)。當(dāng)一個(gè)報(bào)文成為死信時(shí),可以重新發(fā)送到另一個(gè)交換機(jī)進(jìn)行處理,這個(gè)進(jìn)行處理的交換機(jī)稱為死信交換機(jī)。
在幾種情況下,消息會(huì)變成死信。當(dāng)隊(duì)列的消息長(zhǎng)度達(dá)到限制時(shí),當(dāng)消費(fèi)者拒絕接收消息時(shí),消息將不會(huì)被放回隊(duì)列。隊(duì)列有消息過期設(shè)置,未使用的消息有過期時(shí)間。送到消費(fèi)者手中時(shí),發(fā)現(xiàn)已經(jīng)過期。創(chuàng)建隊(duì)列時(shí),可以指定相關(guān)信息,如死信開關(guān)和隊(duì)列長(zhǎng)度等。,然后一系列工作都不會(huì)由程序員操作,mq會(huì)自己完成配置好的事件響應(yīng)。
rabbit : queue id = 默認(rèn)隊(duì)列 name = 默認(rèn)隊(duì)列 自動(dòng)聲明= 真實(shí) rabbit : queue-arguments!-死信開關(guān)-輸入鍵= x-死信交換 值類型= dlx _ exchane /!-路由輸入鍵= x-死信路由鍵 值類型= dlx _路由 /!-隊(duì)列到期時(shí)間-輸入鍵= x-消息-ttl 價(jià)值= 10000英鎊值類型= 整數(shù) /!-隊(duì)列長(zhǎng)度-條目關(guān)鍵字= x-最大長(zhǎng)度 值類型= 整數(shù) 價(jià)值= 10 //兔子: queue-arguments/兔子: queue 4.2、延遲隊(duì)列延遲隊(duì)列是指消息進(jìn)入隊(duì)列后不會(huì)立即消耗,而是在到達(dá)指定時(shí)間后才會(huì)消耗,也就是需要有一個(gè)時(shí)間判斷條件。
事實(shí)上,消息隊(duì)列并不提供延遲隊(duì)列的實(shí)現(xiàn),但是可以通過ttl死信隊(duì)列來完成。建立了一個(gè)隊(duì)列,它不會(huì)被任何使用者使用,所有傳入的消息都將存儲(chǔ)在其中。的過期時(shí)間,一旦隊(duì)列過期,所有消息將被轉(zhuǎn)移到綁定的死信隊(duì)列。
然后特定的消費(fèi)者消費(fèi)死信隊(duì)列中的消息,從而實(shí)現(xiàn)延遲排隊(duì)的功能。
例如,可以通過下單的加班費(fèi)實(shí)現(xiàn)取消訂單的功能:
標(biāo)簽:
隊(duì)列消息
了解更多rabbitmqjava教程(rabbitmq的高級(jí)特性)相關(guān)內(nèi)容請(qǐng)關(guān)注本站點(diǎn)。