Skip to content

Commit

Permalink
[apache#1011] feat(tez): Avoid recompute succeeded task.
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengchenyu committed Jul 24, 2023
1 parent ecfed5e commit d817ee8
Show file tree
Hide file tree
Showing 5 changed files with 810 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ public class RssTezConfig {
public static final String RSS_SHUFFLE_DESTINATION_VERTEX_ID =
TEZ_RSS_CONFIG_PREFIX + "rss.shuffle.destination.vertex.id";

public static final String RSS_AVOID_RECOMPUTE_SUCCEEDED_TASK =
TEZ_RSS_CONFIG_PREFIX + "rss.avoid.recompute.succeeded.task";
public static final boolean RSS_AVOID_RECOMPUTE_SUCCEEDED_TASK_DEFAULT = false;

public static RssConf toRssConf(Configuration jobConf) {
RssConf rssConf = new RssConf();
for (Map.Entry<String, String> entry : jobConf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.log4j.LogManager;
import org.apache.log4j.helpers.Loader;
import org.apache.log4j.helpers.OptionConverter;
import org.apache.tez.common.AsyncDispatcher;
import org.apache.tez.common.RssTezConfig;
import org.apache.tez.common.RssTezUtils;
import org.apache.tez.common.TezClassLoader;
Expand All @@ -56,10 +58,15 @@
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.AMPluginDescriptorProto;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.Edge;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
Expand All @@ -76,8 +83,12 @@
import static org.apache.log4j.LogManager.DEFAULT_CONFIGURATION_KEY;
import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_ADDRESS;
import static org.apache.tez.common.RssTezConfig.RSS_AM_SHUFFLE_MANAGER_PORT;
import static org.apache.tez.common.RssTezConfig.RSS_AVOID_RECOMPUTE_SUCCEEDED_TASK;
import static org.apache.tez.common.RssTezConfig.RSS_AVOID_RECOMPUTE_SUCCEEDED_TASK_DEFAULT;
import static org.apache.tez.common.RssTezConfig.RSS_SHUFFLE_DESTINATION_VERTEX_ID;
import static org.apache.tez.common.RssTezConfig.RSS_SHUFFLE_SOURCE_VERTEX_ID;
import static org.apache.tez.dag.api.TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS;
import static org.apache.tez.dag.api.TezConfiguration.TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS_DEFAULT;

public class RssDAGAppMaster extends DAGAppMaster {
private static final Logger LOG = LoggerFactory.getLogger(RssDAGAppMaster.class);
Expand Down Expand Up @@ -125,6 +136,10 @@ public RssDAGAppMaster(
@Override
public synchronized void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
if (conf.getBoolean(
RSS_AVOID_RECOMPUTE_SUCCEEDED_TASK, RSS_AVOID_RECOMPUTE_SUCCEEDED_TASK_DEFAULT)) {
overrideTaskAttemptEventDispatcher();
}
initAndStartRSSClient(this, conf);
}

Expand Down Expand Up @@ -336,6 +351,16 @@ public static void main(String[] args) {
}
}

if (conf.getBoolean(
RSS_AVOID_RECOMPUTE_SUCCEEDED_TASK, RSS_AVOID_RECOMPUTE_SUCCEEDED_TASK_DEFAULT)
&& conf.getBoolean(
TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS,
TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS_DEFAULT)) {
LOG.info(
"When rss.avoid.recompute.succeeded.task is enable, "
+ "we can not rescheduler succeeded task on unhealthy node");
conf.setBoolean(TEZ_AM_NODE_UNHEALTHY_RESCHEDULE_TASKS, false);
}
initAndStartAppMaster(appMaster, conf);
} catch (Throwable t) {
LOG.error("Error starting RssDAGAppMaster", t);
Expand Down Expand Up @@ -476,12 +501,50 @@ private static Object getPrivateField(Object object, String name) {
}
}

private static void reconfigureLog4j() {
static void reconfigureLog4j() {
String configuratorClassName = OptionConverter.getSystemProperty(CONFIGURATOR_CLASS_KEY, null);
String configurationOptionStr =
OptionConverter.getSystemProperty(DEFAULT_CONFIGURATION_KEY, null);
URL url = Loader.getResource(configurationOptionStr);
OptionConverter.selectAndConfigure(
url, configuratorClassName, LogManager.getLoggerRepository());
}

protected void overrideTaskAttemptEventDispatcher()
throws NoSuchFieldException, IllegalAccessException {
AsyncDispatcher dispatcher = (AsyncDispatcher) this.getDispatcher();
Field field = dispatcher.getClass().getDeclaredField("eventHandlers");
field.setAccessible(true);
Map<Class<? extends Enum>, EventHandler> eventHandlers =
(Map<Class<? extends Enum>, EventHandler>) field.get(dispatcher);
eventHandlers.put(TaskAttemptEventType.class, new RssTaskAttemptEventDispatcher());
}

private class RssTaskAttemptEventDispatcher implements EventHandler<TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
public void handle(TaskAttemptEvent event) {
DAG dag = getContext().getCurrentDAG();
int eventDagIndex = event.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
if (dag == null || eventDagIndex != dag.getID().getId()) {
return; // event not relevant any more
}
Task task =
dag.getVertex(event.getTaskAttemptID().getTaskID().getVertexID())
.getTask(event.getTaskAttemptID().getTaskID());
TaskAttempt attempt = task.getAttempt(event.getTaskAttemptID());

if (attempt.getState() == TaskAttemptState.SUCCEEDED
&& event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
// Here we only handle TA_NODE_FAILED. TA_KILL_REQUEST and TA_KILLED also could trigger
// TerminatedAfterSuccessTransition, but the reason is not about bad node.
LOG.info(
"We should not recompute the succeeded task attempt, though task attempt {} recieved envent {}",
attempt,
event);
return;
}
((EventHandler<TaskAttemptEvent>) attempt).handle(event);
}
}
}
Loading

0 comments on commit d817ee8

Please sign in to comment.