1.Spring扫描初始化配置所在包
<context:annotation-config />
<context:component-scan base-package="com.ithzk.lts" />
2.配置文件 lts-ittracker.properties
lts.tasktracker.cluster-name=it_lts_cluster
lts.tasktracker.registry-address=zookeeper://127.0.0.1:2181
lts.tasktracker.node-group=it_trade_tasktracker
lts.tasktracker.configs.job.fail.store=mapdb
3.编写自定义JobRunner
package com.ithzk.lts.runner;
import com.github.ltsopensource.core.domain.Action;
import com.github.ltsopensource.core.logger.Logger;
import com.github.ltsopensource.core.logger.LoggerFactory;
import com.github.ltsopensource.tasktracker.Result;
import com.github.ltsopensource.tasktracker.logger.BizLogger;
import com.github.ltsopensource.tasktracker.runner.JobContext;
import com.github.ltsopensource.tasktracker.runner.JobRunner;
import com.github.ltsopensource.tasktracker.runner.LtsLoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
public class JobRunnerDispatcherimplements JobRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(JobRunnerDispatcher.class);
@Override
public Result run(JobContext jobContext) throws Throwable {
try {
//业务逻辑
LOGGER.info("执行:" + jobContext);
BizLogger bizLogger = LtsLoggerFactory.getBizLogger();
// 会发送到 LTS JobTracker上
bizLogger.info("测试");
} catch (Exception e) {
LOGGER.info("Run job failed!", e);
return new Result(Action.EXECUTE_LATER, e.getMessage());
}
return new Result(Action.EXECUTE_SUCCESS, "Run job success!");
}
}
4.初始化taskTracker
package com.ithzk.lts.config;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.github.ltsopensource.spring.TaskTrackerAnnotationFactoryBean;
import com.github.ltsopensource.tasktracker.TaskTracker;
import com.ithzk.lts.runner.JobRunnerDispatcher;
/**
 * Spring configuration that builds, starts and exposes the LTS {@link TaskTracker}
 * as the bean named {@code taskTracker}.
 */
@Configuration
public class TaskTrackerConfig implements ApplicationContextAware {

    /** Application context injected by Spring and handed on to the LTS factory bean. */
    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Creates the TaskTracker through {@link TaskTrackerAnnotationFactoryBean}, starts it and
     * returns the started instance.
     *
     * @return the TaskTracker produced by the factory bean
     * @throws Exception if the factory bean fails to initialise or start
     */
    @Bean(name = "taskTracker")
    public TaskTracker getTaskTracker() throws Exception {
        TaskTrackerAnnotationFactoryBean factoryBean = new TaskTrackerAnnotationFactoryBean();
        factoryBean.setApplicationContext(applicationContext);
        // Every job received by this TaskTracker is handed to JobRunnerDispatcher.
        factoryBean.setJobRunnerClass(JobRunnerDispatcher.class);
        // Cluster name, registry address and node group are read from this properties file.
        factoryBean.setLocations("lts-ittracker.properties");
        factoryBean.setWorkThreads(64);
        factoryBean.setShardField("taskId");
        // "INFO" is a log level, not an IP address: it belongs to the biz logger level
        // setting. Passing it to setBindIp would make the node bind to an invalid address.
        factoryBean.setBizLoggerLevel("INFO");
        // The factory bean is created by hand here rather than managed by Spring, so its
        // lifecycle callbacks are invoked explicitly before the object is fetched.
        factoryBean.afterPropertiesSet();
        factoryBean.start();
        return factoryBean.getObject();
    }
}
启动即可开始调度任务
附:很多小伙伴使用LTS的时候,会遇到需要调度多种类型任务的情况,此时是否需要多个TaskTracker去执行不同的任务?官方给出的建议是:当某种任务量非常大或者非常耗时的时候,可以将该任务独立到一个或者多个TaskTracker去执行;任务量不大的时候,可以只使用一个TaskTracker来处理多种任务。
主要通过Job中的自定义参数实现:提交Job时指定一个任务类型参数,由统一的JobRunner根据该参数把任务分发给对应的业务逻辑,下面给出一个参考例子
首先需要不同业务逻辑的JobRunner
package com.ithzk.lts.runner;
import java.util.HashMap;
import java.util.Map;
import com.github.ltsopensource.core.domain.Action;
import com.github.ltsopensource.core.domain.Job;
import com.github.ltsopensource.core.logger.Logger;
import com.github.ltsopensource.core.logger.LoggerFactory;
import com.github.ltsopensource.tasktracker.Result;
import com.github.ltsopensource.tasktracker.logger.BizLogger;
import com.github.ltsopensource.tasktracker.runner.JobContext;
import com.github.ltsopensource.tasktracker.runner.JobRunner;
import com.github.ltsopensource.tasktracker.runner.LtsLoggerFactory;
/**
 * Example {@link JobRunner} for one kind of business task. It copies the job's extension
 * parameters and hands them to a freshly started worker thread.
 *
 * <p>NOTE(review): {@link #run(JobContext)} reports success as soon as the worker thread has
 * been started, not when the business logic has finished — confirm this fire-and-forget
 * behaviour is intended.
 */
public class AJobRunner implements JobRunner {

    private static final Logger logger = LoggerFactory.getLogger(AJobRunner.class);

    /**
     * Runs the concrete business logic on a separate thread.
     * Static because it does not need access to the enclosing runner instance.
     */
    static class newJobThread implements Runnable {

        /** Parameters the business logic works with. */
        private final Map<String, String> params;

        newJobThread(Map<String, String> params) {
            this.params = params;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(60000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop instead of silently carrying on.
                Thread.currentThread().interrupt();
                logger.warn("AJobRunner worker interrupted before running business logic", e);
                return;
            }
            // Invoke the concrete business logic here, using params.
        }
    }

    /**
     * Starts a worker thread for the job and writes a business log entry.
     *
     * @param jobContext context of the job handed over by the TaskTracker
     * @return {@code EXECUTE_SUCCESS} once the worker thread has been started, or
     *         {@code EXECUTE_FAILED} carrying the exception message when an
     *         {@link Exception} is thrown
     * @throws Throwable declared by the {@link JobRunner} contract
     */
    @Override
    public Result run(JobContext jobContext) throws Throwable {
        try {
            Job job = jobContext.getJob();
            // Pass a copy of the job's extension parameters to the worker; previously the
            // value was read but an empty map was handed over. A job without extension
            // parameters yields an empty map.
            Map<String, String> extParams = job.getExtParams();
            Map<String, String> param = new HashMap<String, String>();
            if (extParams != null) {
                param.putAll(extParams);
            }
            new Thread(new newJobThread(param)).start();
            BizLogger bizLogger = LtsLoggerFactory.getBizLogger();
            bizLogger.info("AJobRunner.success.......");
        } catch (Exception e) {
            logger.error("Run job failed!", e);
            return new Result(Action.EXECUTE_FAILED, e.getMessage());
        }
        return new Result(Action.EXECUTE_SUCCESS, "Run job success!");
    }
}
package com.ithzk.lts.runner;
import java.util.concurrent.ConcurrentHashMap;
import com.github.ltsopensource.core.domain.Action;
import com.github.ltsopensource.core.domain.Job;
import com.github.ltsopensource.core.logger.Logger;
import com.github.ltsopensource.core.logger.LoggerFactory;
import com.github.ltsopensource.tasktracker.Result;
import com.github.ltsopensource.tasktracker.runner.JobContext;
import com.github.ltsopensource.tasktracker.runner.JobRunner;
/**
 * Task dispatcher: selects the {@link JobRunner} registered for the job's
 * {@code taskType} parameter and delegates execution to it, so that a single
 * TaskTracker can serve several kinds of tasks.
 */
public class JobRunnerDispatcher implements JobRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(JobRunnerDispatcher.class);

    /** Registry of runners, keyed by the value of the job's {@code taskType} parameter. */
    private static final ConcurrentHashMap<String, JobRunner> JOB_RUNNER_MAP =
            new ConcurrentHashMap<String, JobRunner>();

    static {
        JOB_RUNNER_MAP.put("aJobRunner", new AJobRunner());
    }

    /**
     * Looks up the runner for the job's task type and runs it.
     *
     * @param jobContext context of the job handed over by the TaskTracker
     * @return the delegate runner's result; {@code EXECUTE_FAILED} when the job has no
     *         {@code taskType} or no runner is registered for it; {@code EXECUTE_LATER}
     *         carrying the exception message when an {@link Exception} is thrown
     * @throws Throwable declared by the {@link JobRunner} contract
     */
    @Override
    public Result run(JobContext jobContext) throws Throwable {
        try {
            // Choose the JobRunner that matches the job's task type.
            Job job = jobContext.getJob();
            String taskType = job.getParam("taskType");
            LOGGER.warn("IT_Trade_Tasktracker 正在匹配对应任务.........");
            // ConcurrentHashMap rejects null keys, so a missing task type is checked first.
            JobRunner jobRunner = taskType == null ? null : JOB_RUNNER_MAP.get(taskType);
            if (jobRunner == null) {
                String message = "No JobRunner registered for taskType: " + taskType;
                LOGGER.error(message);
                return new Result(Action.EXECUTE_FAILED, message);
            }
            LOGGER.warn("IT_Trade_Tasktracker 进入任务中......... TaskType:" + taskType);
            return jobRunner.run(jobContext);
        } catch (Exception e) {
            LOGGER.error("Run job failed!", e);
            return new Result(Action.EXECUTE_LATER, e.getMessage());
        }
    }
}
这样就可以解决一个taskTracker处理不同任务的问题了,希望能帮到大家