admin 管理员组

文章数量: 1087652


2024年2月7日发(作者:7种治疗类风湿最好的药)

private AbstractScheduler initStandaloneScheduler(Configuration configuration) {AbstractContainerCommunicator containerCommunicator = new StandAloneJobContainerCommunicator(configuration);tainerCommunicator(containerCommunicator);return new StandAloneScheduler(containerCommunicator);}}public abstract class AbstractScheduler{private static final Logger LOG = ger();public void schedule(List configurations) {/*** 给 taskGroupContainer 的 Communication 注册*/erCommunication(configurations);int totalTasks = calculateTaskCount(configurations);startAllTaskGroup(configurations);try {while (true) {Communication nowJobContainerCommunication = t();//汇报周期long now = tTimeMillis();if (now - lastReportTimeStamp > jobReportIntervalInMillSec) {Communication reportCommunication = ortCommunication(nowJobContainerCommunication, lastJobContainerCommunication, totalTasks);(reportCommunication);if (te() == DED) {("Scheduler accomplished all tasks.");break;}if (te() == ) {dealFailedStat(nerCommunicator, owable());}(jobSleepIntervalInMillSec);}

} catch (InterruptedException e) {// 以 failed 状态退出("捕获到InterruptedException异常!", e);throw XException(E_ERROR, e);}}@Overridepublic void startAllTaskGroup(List configurations) {oupContainerExecutorService = edThreadPool(());for (Configuration taskGroupConfiguration : configurations) {TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);e(taskGroupContainerRunner);}wn();}@Overridepublic void dealFailedStat(AbstractContainerCommunicator frameworkCollector, Throwable throwable) {wnNow();}}public class TaskGroupContainer extends AbstractContainer{private static final Logger LOG = ger();@Overridepublic void start() {try {while (true) {//1.判断task状态boolean failedOrKilled = false;Map communicationMap = municationMap();for( entry : et()){Integer taskId = ();

try {AbstractJobPlugin jobPlugin = (AbstractJobPlugin) tance();ginConf(getPluginConf(pluginType, pluginName));return jobPlugin;} catch (Exception e) {throw XException(E_ERROR,("DataX找到plugin[%s]的Job配置.",pluginName), e);}}//反射出具体plugin实例private static synchronized Class extends AbstractPlugin> loadPluginClass(PluginType pluginType, String pluginName,ContainerType pluginRunType) {Configuration pluginConf = getPluginConf(pluginType, pluginName);JarLoader jarLoader = Loader(pluginType, pluginName);try {return (Class extends AbstractPlugin>) ass(ing("class") + "$"+ ());} catch (Exception e) {throw XException(E_ERROR, e);}}public static synchronized JarLoader getJarLoader(PluginType pluginType,String pluginName) {Configuration pluginConf = getPluginConf(pluginType, pluginName);JarLoader jarLoader = (generatePluginKey(pluginType,pluginName));if (null == jarLoader) {String pluginPath = ing("path");if (k(pluginPath)) {

}private static void collectDirs(String path, List collector) {if (null == path || k(path)) {return;}File current = new File(path);if (!() || !ctory()) {return;}for (File child : les()) {if (!ctory()) {continue;}(olutePath());collectDirs(olutePath(), collector);}}}Task 初始化过程class TaskExecutor{private AbstractRunner generateRunner(PluginType pluginType) {return generateRunner(pluginType, null);}private AbstractRunner generateRunner(PluginType pluginType, List transformerInfoExecs) {AbstractRunner newRunner = null;TaskPluginCollector pluginCollector;switch (pluginType) {case READER:newRunner = uginRunner(pluginType,ing(_READER_NAME));Conf(figuration(_READER_PARAMETER));pluginCollector = tiate(taskCollectorClass, ,


本文标签: 状态 治疗 实例 反射 退出