admin 管理员组文章数量: 1087652
2024年2月7日发(作者:7种治疗类风湿最好的药)
private AbstractScheduler initStandaloneScheduler(Configuration configuration) {AbstractContainerCommunicator containerCommunicator = new StandAloneJobContainerCommunicator(configuration);tainerCommunicator(containerCommunicator);return new StandAloneScheduler(containerCommunicator);}}public abstract class AbstractScheduler{private static final Logger LOG = ger();public void schedule(List configurations) {/*** 给 taskGroupContainer 的 Communication 注册*/erCommunication(configurations);int totalTasks = calculateTaskCount(configurations);startAllTaskGroup(configurations);try {while (true) {Communication nowJobContainerCommunication = t();//汇报周期long now = tTimeMillis();if (now - lastReportTimeStamp > jobReportIntervalInMillSec) {Communication reportCommunication = ortCommunication(nowJobContainerCommunication, lastJobContainerCommunication, totalTasks);(reportCommunication);if (te() == DED) {("Scheduler accomplished all tasks.");break;}if (te() == ) {dealFailedStat(nerCommunicator, owable());}(jobSleepIntervalInMillSec);}
} catch (InterruptedException e) {// 以 failed 状态退出("捕获到InterruptedException异常!", e);throw XException(E_ERROR, e);}}@Overridepublic void startAllTaskGroup(List configurations) {oupContainerExecutorService = edThreadPool(());for (Configuration taskGroupConfiguration : configurations) {TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);e(taskGroupContainerRunner);}wn();}@Overridepublic void dealFailedStat(AbstractContainerCommunicator frameworkCollector, Throwable throwable) {wnNow();}}public class TaskGroupContainer extends AbstractContainer{private static final Logger LOG = ger();@Overridepublic void start() {try {while (true) {//1.判断task状态boolean failedOrKilled = false;Map communicationMap = municationMap();for( entry : et()){Integer taskId = ();
try {AbstractJobPlugin jobPlugin = (AbstractJobPlugin) tance();ginConf(getPluginConf(pluginType, pluginName));return jobPlugin;} catch (Exception e) {throw XException(E_ERROR,("DataX找到plugin[%s]的Job配置.",pluginName), e);}}//反射出具体plugin实例private static synchronized Class extends AbstractPlugin> loadPluginClass(PluginType pluginType, String pluginName,ContainerType pluginRunType) {Configuration pluginConf = getPluginConf(pluginType, pluginName);JarLoader jarLoader = Loader(pluginType, pluginName);try {return (Class extends AbstractPlugin>) ass(ing("class") + "$"+ ());} catch (Exception e) {throw XException(E_ERROR, e);}}public static synchronized JarLoader getJarLoader(PluginType pluginType,String pluginName) {Configuration pluginConf = getPluginConf(pluginType, pluginName);JarLoader jarLoader = (generatePluginKey(pluginType,pluginName));if (null == jarLoader) {String pluginPath = ing("path");if (k(pluginPath)) {
}private static void collectDirs(String path, List collector) {if (null == path || k(path)) {return;}File current = new File(path);if (!() || !ctory()) {return;}for (File child : les()) {if (!ctory()) {continue;}(olutePath());collectDirs(olutePath(), collector);}}}Task 初始化过程class TaskExecutor{private AbstractRunner generateRunner(PluginType pluginType) {return generateRunner(pluginType, null);}private AbstractRunner generateRunner(PluginType pluginType, List transformerInfoExecs) {AbstractRunner newRunner = null;TaskPluginCollector pluginCollector;switch (pluginType) {case READER:newRunner = uginRunner(pluginType,ing(_READER_NAME));Conf(figuration(_READER_PARAMETER));pluginCollector = tiate(taskCollectorClass, ,
版权声明:本文标题:datax底层原理_datax原理解析和性能优化 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.roclinux.cn/p/1707249898a512981.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论