首頁技術(shù)文章正文

Java培訓(xùn):基于XXL-JOB實(shí)現(xiàn)分布式任務(wù)調(diào)度的實(shí)現(xiàn)

更新時(shí)間:2022-08-26 來源:黑馬程序員 瀏覽量:

  背景

  筆者以前在電商公司,我們需要在8月18號(hào)做大促活動(dòng),我們會(huì)提前一天給所有的用戶推送活動(dòng)信息,且需要根據(jù)用戶畫像生成不同的推送內(nèi)容。

  當(dāng)時(shí)我們總共有80萬用戶左右。

  經(jīng)測(cè)試,通過Spring Task和分布式鎖,單臺(tái)機(jī)器同時(shí)開啟5個(gè)線程,執(zhí)行時(shí)間需要27個(gè)小時(shí)左右,即便開10個(gè)線程,需要14個(gè)小時(shí)左右,顯然執(zhí)行時(shí)間過長(zhǎng)。

  解決方案

  當(dāng)時(shí)個(gè)推服務(wù)部署節(jié)點(diǎn)有3臺(tái),在每年大促期間可動(dòng)態(tài)擴(kuò)容,其余的機(jī)器資源沒有充分利用起來。

  要想短時(shí)間內(nèi)完成推送,那么就得想辦法讓每臺(tái)機(jī)器各自分一部分用戶數(shù)據(jù)去執(zhí)行,這樣效率可提高原來的N倍。

  那么就需要分布式任務(wù)去執(zhí)行,核心思想如下圖:

1661479930520_1.jpg

  經(jīng)過調(diào)研現(xiàn)有的開源的分布式任務(wù)調(diào)度框架,決定在elastic-job和xxl-job中選一個(gè)

  [Elastic Job](https://github.com/elasticjob)是當(dāng)當(dāng)網(wǎng)開源一個(gè)分布式調(diào)度解決方案,由兩個(gè)相互獨(dú)立的子項(xiàng)目Elastic-Job-Lite和Elastic-Job-Cloud組成;定位為輕量級(jí)無中心化解決方案,使用 jar 包的形式提供分布式任務(wù)的協(xié)調(diào)服務(wù)。支持分布式調(diào)度協(xié)調(diào)、彈性擴(kuò)容縮容、失效轉(zhuǎn)移、錯(cuò)過執(zhí)行作業(yè)重觸發(fā)、并行調(diào)度、自診斷和修復(fù)等等功能特性。

  [XXL-Job官網(wǎng)](https://github.com/xuxueli/xxl-job)是大眾點(diǎn)評(píng)發(fā)布的分布式任務(wù)調(diào)度平臺(tái),其核心設(shè)計(jì)目標(biāo)是開發(fā)迅速、學(xué)習(xí)簡(jiǎn)單、輕量級(jí)、易擴(kuò)展?,F(xiàn)已開放源代碼并接入多家公司線上產(chǎn)品線,開箱即用。

  更傾向于選擇XXL-JOB:

  1. 輕量級(jí),支持通過Web頁面對(duì)任務(wù)進(jìn)行動(dòng)態(tài)CRUD操作,操作簡(jiǎn)單

  2. 只依賴數(shù)據(jù)庫(kù)作為集群注冊(cè)中心,接入開發(fā)簡(jiǎn)單,不需要ZK

  3. 高可用、解耦、高性能、監(jiān)控報(bào)警、分片、重試、故障轉(zhuǎn)移

  4. 團(tuán)隊(duì)持續(xù)開發(fā),社區(qū)活躍

  5. 支持后臺(tái)直接查看每個(gè)任務(wù)執(zhí)行實(shí)時(shí)日志

  具體實(shí)現(xiàn)

  在項(xiàng)目中集成xxl-job客戶端

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.2.0</version>
</dependency>

  在配置文件中配置xxl-job信息

xxl:
  job:
    accessToken:
    admin:
      addresses: http://xxl部署IP地址:8080/xxl-job-admin
    executor:
      appname: vm-service
      address:
      ip:
      port: 9989
      logretentiondays: 30

  新增XxlJobConfig.java

package com.itheima.config;

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * xxl-job config
 *
 * @author xuxueli 2017-04-28
 */
@Configuration
@Slf4j
public class XxlJobConfig {


    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

//    @Value("${xxl.job.executor.logpath}")
//    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        //xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

}

  在xxl-job中新增執(zhí)行器

  注冊(cè)方式選自動(dòng)注冊(cè),這樣方便動(dòng)態(tài)擴(kuò)容

1661480124531_2.jpg

  創(chuàng)建任務(wù)

  路由策略選擇分片廣播

1661480137880_3.jpg

  代碼部分

  在任務(wù)代碼獲取推送用戶時(shí),根據(jù)當(dāng)前的分片及分片總數(shù)對(duì)用戶ID取余,這樣我們就可以在每個(gè)分片節(jié)點(diǎn),獲取不一樣的數(shù)據(jù)。id值越連續(xù),分片則越均勻。

 ShardingUtil.ShardingVO shardingVo = ShardingUtil.getShardingVo();
 int numbers = shardingVo.getTotal();  //分片總數(shù)
 int index = shardingVo.getIndex(); //當(dāng)前分片索引

  假設(shè)分片總數(shù)為3,當(dāng)前節(jié)點(diǎn)獲取到的分片索引為0,那么查詢推送用戶SQL如下:

SELECT user_id FROM `user_info` WHERE MOD(user_id,3)=0

  注意:我們?cè)趯?shí)際代碼中,分片總數(shù)和當(dāng)前分片索引是以參數(shù)的形式傳給查詢的SQL語句的。

  如上,即可完成分布式任務(wù)。

  總結(jié)

  在某些定時(shí)任務(wù)需要處理大量數(shù)據(jù)的情況下,我們可以通過引入分布式任務(wù)框架xxl-job,充分利用機(jī)器資源,將需要處理的數(shù)據(jù)均勻的分配到不同的機(jī)器上去執(zhí)行,提高任務(wù)執(zhí)行效率。

分享到:
在線咨詢 我要報(bào)名
和我們?cè)诰€交談!