Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Several improvements #3

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,9 @@ private Map<String, LocalResource> getContainerResources() throws IOException {
Utils.localizeHDFSResource(fs, startScript, Constants.DYARN_START_SCRIPT, LocalResourceType.FILE, containerResources);
Utils.localizeHDFSResource(fs, containerExecutorCfg, Constants.CONTAINER_EXECUTOR_CFG, LocalResourceType.FILE, containerResources);

// Try to localize the dynoyarn-generator (simulated fat jar) from HDFS
Utils.localizeHDFSResource(fs, System.getenv(Constants.SIMULATED_FATJAR_NAME), Constants.SIMULATED_FATJAR, LocalResourceType.FILE, containerResources);

String hdfsClasspath = System.getenv("HDFS_CLASSPATH");
for (FileStatus status : fs.listStatus(new Path(hdfsClasspath))) {
Utils.localizeHDFSResource(fs, status.getPath().toString(), status.getPath().getName(), LocalResourceType.FILE, containerResources);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.linkedin.dynoyarn.common.DynoYARNConfigurationKeys;
import com.linkedin.dynoyarn.common.Utils;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -212,6 +214,23 @@ private ContainerLaunchContext createAMContainerSpec(ApplicationId appId, ByteBu
Path containerExecutorCfg = Utils.localizeLocalResource(dyarnConf, fs, CONTAINER_EXECUTOR_CFG, LocalResourceType.FILE, appResourcesPath, localResources);
Utils.localizeLocalResource(dyarnConf, fs, DYNOYARN_SITE_XML, LocalResourceType.FILE, appResourcesPath, localResources);
containerEnv.put(Constants.DYARN_CONF_NAME, conf.toString());

// Try to add the dynoyarn-generator jar as a local resource when launching NMs
File cwd = new File(".");
File[] files = cwd.listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.startsWith("dynoyarn-generator");
}
});
if (files.length == 0) {
throw new FileNotFoundException("Not found dynooyarn-generator-jar");
}
Path simulateFatJarLocation = Utils.localizeLocalResource(dyarnConf, fs,
files[0].getPath(), LocalResourceType.FILE, appResourcesPath, localResources);

containerEnv.put(Constants.SIMULATED_FATJAR_NAME, simulateFatJarLocation.toString());

containerEnv.put(Constants.DYARN_JAR_NAME, dyarnJar.toString());
containerEnv.put(Constants.DYARN_START_SCRIPT_NAME, startScript.toString());
containerEnv.put(Constants.CAPACITY_SCHEDULER_NAME, capacitySchedulerConfPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ public void run() {
containerShellEnv.put(Constants.SIMULATED_FATJAR_NAME, System.getenv(Constants.SIMULATED_FATJAR_NAME));
containerShellEnv.put("HDFS_CLASSPATH", System.getenv("HDFS_CLASSPATH"));
containerShellEnv.put(Constants.DYARN_CONF_NAME, System.getenv(Constants.DYARN_CONF_NAME));
containerShellEnv.put("driverAppId", System.getenv("driverAppId"));
try (PrintWriter out = new PrintWriter(Constants.SPEC_FILE, "UTF-8")) {
String spec = new ObjectMapper().writeValueAsString(containerAppSpecs)
.replaceAll("\"", "'");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public class WorkloadClient implements AutoCloseable {
private String confPath;
private String workloadJarPath;

private String driverAppId;

private Path appResourcesPath;

public static final String DRIVER_APP_ID_ARG = "driver_app_id";
Expand Down Expand Up @@ -159,7 +161,8 @@ public boolean init(String[] args) throws IOException, ParseException {
if (args.length == 0) {
throw new IllegalArgumentException("No args specified for client to initialize");
}
String driverAppId = cliParser.getOptionValue(DRIVER_APP_ID_ARG);
//String driverAppId = cliParser.getOptionValue(DRIVER_APP_ID_ARG);
driverAppId = cliParser.getOptionValue(DRIVER_APP_ID_ARG);
workloadSpecLocation = cliParser.getOptionValue(WORKLOAD_SPEC_LOCATION_ARG);
simulatedFatJarLocation = cliParser.getOptionValue(SIMULATED_FATJAR_ARG);
confPath = cliParser.getOptionValue(CONF_ARG);
Expand Down Expand Up @@ -243,6 +246,7 @@ public boolean accept(File dir, String name) {
classPathEnv.append(c.trim());
}
containerEnv.put("CLASSPATH", classPathEnv.toString());
containerEnv.put("driverAppId", driverAppId);

// Set logs to be readable by everyone. Set app to be modifiable only by app owner.
Map<ApplicationAccessType, String> acls = new HashMap<>(2);
Expand Down
12 changes: 11 additions & 1 deletion dynoyarn-driver/src/main/resources/start-component.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ cp *.jar $extraClasspathDir

# This is where libleveldbjni is written (per-NM). Write it to a larger partition
# instead of /tmp
tmpdir="/grid/a/tmp/hadoop-`whoami`"
#tmpdir="/grid/a/tmp/hadoop-`whoami`"
tmpdir="$baseDir/tmp/hadoop-`whoami`"
mkdir $tmpdir

# Change environment variables for the Hadoop process
Expand Down Expand Up @@ -122,6 +123,7 @@ if [ "$component" = "RESOURCE_MANAGER" ]; then
cp `readlink -f dynoyarn-capacity-scheduler.xml` ${baseDir}/dcs/capacity-scheduler.xml
rm ${confDir}/capacity-scheduler.xml
ln -s ${baseDir}/dcs/capacity-scheduler.xml ${confDir}/capacity-scheduler.xml
unset APPLICATION_WEB_PROXY_BASE
else
rm ${hadoopHome}/etc/hadoop/container-executor.cfg
ln -s `readlink -f dynoyarn-container-executor.cfg` ${hadoopHome}/etc/hadoop/container-executor.cfg
Expand All @@ -143,6 +145,14 @@ read -r -d '' driverConfigs <<EOF
$@
EOF

# When the mesher starts a node manager, it downloads the simulated fat jar and puts it in the /tmp folder,
# so that when the dynoyarn generator runs its container, it does not need to download the simulated fat jar itself.
# The reason for this is that the dynoyarn generator runs its container without a security check; therefore,
# the dynoyarn generator container cannot download a file from a secure cluster.
if [ "$component" = "NODE_MANAGER" ]; then
rm -f /tmp/simulatedfatjar.jar
cp `pwd`/simulatedfatjar.jar /tmp/simulatedfatjar.jar

if [ "$component" = "NODE_MANAGER" ]; then
HADOOP_LOG_DIR=$logDir HADOOP_CLIENT_OPTS=$HADOOP_CLIENT_OPTS $hadoopHome/bin/hadoop com.linkedin.dynoyarn.SimulatedNodeManagers $driverConfigs $nmCount
echo nodemanager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class SimulatedAppSubmitter {
private Configuration conf = new Configuration();
private FileSystem fs;
private String rmEndpoint;
private String schedulerEndPoint;
private String clusterSpecLocation;
private float multiplier;
private AppSpec[] appSpecs;
Expand Down Expand Up @@ -111,6 +112,7 @@ private boolean init(String[] args) throws IOException {
String out = IOUtils.toString(inputStream);
ClusterInfo cluster = new ObjectMapper().readValue(out, ClusterInfo.class);
rmEndpoint = cluster.getRmHost() + ":" + cluster.getRmPort();
schedulerEndPoint = cluster.getRmHost() + ":" + cluster.getRmSchedulerPort();
conf.set(YarnConfiguration.RM_ADDRESS, rmEndpoint);
conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
Expand Down Expand Up @@ -267,9 +269,11 @@ private ContainerLaunchContext createAMContainerSpec(AppSpec appSpec, ByteBuffer
Map<String, String> containerEnv = new HashMap<>();

Map<String, LocalResource> localResources = new HashMap<>();
Utils.localizeHDFSResource(fs, System.getenv(Constants.DYARN_JAR_NAME), Constants.DYARN_JAR, LocalResourceType.FILE, localResources);
Utils.localizeHDFSResource(fs, System.getenv(Constants.SIMULATED_FATJAR_NAME), Constants.SIMULATED_FATJAR, LocalResourceType.FILE, localResources);
Utils.localizeHDFSResource(fs, System.getenv(Constants.DYARN_CONF_NAME), Constants.DYARN_CONF_NAME, LocalResourceType.FILE, localResources);
//Utils.localizeHDFSResource(fs, System.getenv(Constants.DYARN_JAR_NAME), Constants.DYARN_JAR, LocalResourceType.FILE, localResources);
//Utils.localizeHDFSResource(fs, System.getenv(Constants.SIMULATED_FATJAR_NAME), Constants.SIMULATED_FATJAR, LocalResourceType.FILE, localResources);
//Utils.localizeHDFSResource(fs, System.getenv(Constants.DYARN_CONF_NAME), Constants.DYARN_CONF_NAME, LocalResourceType.FILE, localResources);

// HDFS_CLASSPATH is never used in this setup, so no special handling is added for this localizeHDFSResource path.
String hdfsClasspath = System.getenv("HDFS_CLASSPATH");
if (hdfsClasspath != null) {
for (FileStatus status : fs.listStatus(new Path(hdfsClasspath))) {
Expand All @@ -278,8 +282,13 @@ private ContainerLaunchContext createAMContainerSpec(AppSpec appSpec, ByteBuffer
}
}

// Pass these endpoints explicitly through the environment, since the DYARN_CONF is not uploaded.
containerEnv.put("schedulerEndPoint", schedulerEndPoint);
containerEnv.put("rmEndPoint", rmEndpoint);
containerEnv.put(Constants.DYARN_CONF_NAME, new Path(System.getenv(Constants.DYARN_CONF_NAME)).getName());
containerEnv.put("CLASSPATH", "./*:$HADOOP_CONF_DIR:$HADOOP_YARN_HOME/share/hadoop/yarn/*:$HADOOP_YARN_HOME/lib/*");

// Add simulatedfatjar.jar to the classpath.
containerEnv.put("CLASSPATH", "./*:$HADOOP_CONF_DIR:$HADOOP_YARN_HOME/share/hadoop/yarn/*:$HADOOP_YARN_HOME/lib/*:/tmp/simulatedfatjar.jar");
containerEnv.put(Constants.IS_AM, "true");
containerEnv.put("HDFS_CLASSPATH", System.getenv("HDFS_CLASSPATH"));
containerEnv.put(Constants.APP_SPEC_NAME, new ObjectMapper().writeValueAsString(appSpec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,10 @@ public boolean init(String[] args) throws IOException {
InputStream inputStream = fs.open(new Path(clusterSpecLocation));
String out = IOUtils.toString(inputStream);
ClusterInfo cluster = new ObjectMapper().readValue(out, ClusterInfo.class);
rmEndpoint = cluster.getRmHost() + ":" + cluster.getRmPort();
amEndpoint = cluster.getRmHost() + ":" + cluster.getRmSchedulerPort();
//rmEndpoint = cluster.getRmHost() + ":" + cluster.getRmPort();
//amEndpoint = cluster.getRmHost() + ":" + cluster.getRmSchedulerPort();
rmEndpoint = System.getenv("rmEndPoint");
amEndpoint = System.getenv("schedulerEndPoint"); // reads the "schedulerEndPoint" env var; NOTE(review): consider renaming amEndpoint to schedulerEndPoint for clarity.
conf.set(YarnConfiguration.RM_ADDRESS, rmEndpoint);
conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, amEndpoint);
// Set NM/RM max-wait to respective intervals, so that the underlying ipc.Client only retries once.
Expand Down