如何从容器程序中使 YARN 应用程序失败
我写了一个 YARN 客户端,它向 YARN 请求若干容器。
问题是:即使我的程序在第二个容器上失败了,整个应用程序仍然被标记为成功。
如果我的程序在某个容器上失败,有谁能帮我找到一种把整个应用程序标记为失败的方法吗?
Client.java
// NOTE(review): this is a Dart / Cloud Firestore query, not Java; it appears
// unrelated to the surrounding YARN question — confirm whether it belongs here.
//
// Builds a query on the 'messages' collection, filtered on 'encSenderUId' and
// 'encReceiverUId', ordered by 'sentOn' ascending and starting at
// _startAtTimestamp, then calls snapshots() on the result.
//
// NOTE(review): 'encReceiverUId' is constrained with isEqualTo twice, against
// _loggedInUserId and widget.encUId. If the filters are combined with AND, the
// query can only match when both values are equal — verify the intent.
FirebaseFirestore.instance
.collection('messages')
.where('encSenderUId',whereIn: [widget.encUId,_loggedInUserId])
.where('encReceiverUId',isEqualTo: _loggedInUserId)
.where('encReceiverUId',isEqualTo: widget.encUId)
.orderBy('sentOn',descending: false)
.startAt([_startAtTimestamp]).snapshots();
Client.java
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
/**
 * Command-line YARN client that submits a single application whose
 * ApplicationMaster is shipped in the jar given as the first argument, then
 * polls the ResourceManager until the application reaches a terminal state.
 *
 * <p>The YARN application <em>state</em> is {@code FINISHED} whenever the
 * ApplicationMaster unregisters cleanly, even if it reported a failure. The
 * outcome the ApplicationMaster reported is carried separately in the
 * <em>final application status</em>, so this client prints both.
 */
public class Client {

    /** Configuration shared by the YARN client and the file-system lookups. */
    Configuration conf = new YarnConfiguration();

    /**
     * Submits the application and blocks until it finishes, is killed, or fails.
     *
     * @param args {@code args[0]} is the path of the jar containing the ApplicationMaster
     * @throws IllegalArgumentException if no jar path is supplied
     * @throws Exception if talking to YARN or to the file system fails, or if the
     *     polling thread is interrupted
     */
    public void run(String[] args) throws Exception {
        if (args == null || args.length < 1) {
            throw new IllegalArgumentException("Usage: Client <path-to-application-jar>");
        }
        final Path jarPath = new Path(args[0]);

        // Create yarnClient (uses the instance configuration rather than a shadowing local).
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();
        try {
            // Create application via yarnClient
            YarnClientApplication app = yarnClient.createApplication();
            ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
            ApplicationId appId = appContext.getApplicationId();

            // Set up the container launch context for the application master
            ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
            amContainer.setCommands(
                Collections.singletonList(
                    "$JAVA_HOME/bin/java" +
                    " -Xmx256M" +
                    " com.paytm.lending.datamart.yarn.v2.ApplicationMaster" +
                    " " + appId.toString() +
                    " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
                    " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
                )
            );

            // Setup jar for ApplicationMaster
            LocalResource appMasterJar = Records.newRecord(LocalResource.class);
            setupAppMasterJar(jarPath, appMasterJar);
            amContainer.setLocalResources(
                Collections.singletonMap("simpleapp.jar", appMasterJar));

            // Setup CLASSPATH for ApplicationMaster
            Map<String, String> appMasterEnv = new HashMap<String, String>();
            setupAppMasterEnv(appMasterEnv, jarPath.toUri().getRawPath());
            amContainer.setEnvironment(appMasterEnv);

            // Set up resource type requirements for ApplicationMaster
            Resource capability = Records.newRecord(Resource.class);
            capability.setMemorySize(256);
            capability.setVirtualCores(1);

            // Finally, set up the ApplicationSubmissionContext for the application
            appContext.setApplicationName("simple-yarn-app"); // application name
            appContext.setAMContainerSpec(amContainer);
            appContext.setResource(capability);
            appContext.setQueue("default"); // queue

            // Submit application
            System.out.println("Submitting application " + appId);
            yarnClient.submitApplication(appContext);

            // Poll until the application reaches one of the three terminal states.
            ApplicationReport appReport = yarnClient.getApplicationReport(appId);
            YarnApplicationState appState = appReport.getYarnApplicationState();
            while (appState != YarnApplicationState.FINISHED &&
                   appState != YarnApplicationState.KILLED &&
                   appState != YarnApplicationState.FAILED) {
                Thread.sleep(100);
                appReport = yarnClient.getApplicationReport(appId);
                appState = appReport.getYarnApplicationState();
            }

            System.out.println(
                "Application " + appId + " finished with" +
                " state " + appState +
                " at " + appReport.getFinishTime());
            // The state alone does not reveal a failure reported by the
            // ApplicationMaster; the final status and diagnostics do.
            System.out.println(
                "Application " + appId + " final status " +
                appReport.getFinalApplicationStatus() +
                ", diagnostics: " + appReport.getDiagnostics());
        } finally {
            // Release the connection to the ResourceManager on every exit path.
            yarnClient.stop();
        }
    }

    /**
     * Fills {@code appMasterJar} with the location, size and timestamp of the jar
     * so YARN can localize it into the ApplicationMaster container.
     *
     * @param jarPath path of the jar on the configured file system
     * @param appMasterJar record to populate
     * @throws IOException if the jar's file status cannot be read
     */
    private void setupAppMasterJar(Path jarPath, LocalResource appMasterJar) throws IOException {
        FileStatus jarStat = FileSystem.get(conf).getFileStatus(jarPath);
        appMasterJar.setResource(URL.fromPath(jarPath));
        appMasterJar.setSize(jarStat.getLen());
        appMasterJar.setTimestamp(jarStat.getModificationTime());
        appMasterJar.setType(LocalResourceType.FILE);
        appMasterJar.setVisibility(LocalResourceVisibility.PUBLIC);
    }

    /**
     * Populates the ApplicationMaster environment: the configured YARN
     * application classpath, every file in the container working directory, and
     * a {@code JAR_PATH} variable carrying the jar location.
     *
     * @param appMasterEnv environment map to populate
     * @param jarPath raw path of the application jar, exported as {@code JAR_PATH}
     */
    private void setupAppMasterEnv(Map<String, String> appMasterEnv, String jarPath) {
        for (String c : conf.getStrings(
                YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
            Apps.addToEnvironment(
                appMasterEnv, Environment.CLASSPATH.name(), c.trim(), File.pathSeparator);
        }
        // The variable name must be passed explicitly; without it the call binds
        // to the 3-argument overload and defines a variable named "$PWD/*".
        Apps.addToEnvironment(
            appMasterEnv,
            Environment.CLASSPATH.name(),
            Environment.PWD.$() + File.separator + "*",
            File.pathSeparator);
        Apps.addToEnvironment(appMasterEnv, "JAR_PATH", jarPath, File.pathSeparator);
    }

    /**
     * Entry point; delegates to {@link #run(String[])}.
     *
     * @param args command-line arguments, see {@link #run(String[])}
     * @throws Exception if submission or monitoring fails
     */
    public static void main(String[] args) throws Exception {
        Client c = new Client();
        c.run(args);
    }
}
ApplicationMaster.java(DummyApplication.java 的代码未在此处给出)
import java.io.File;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.Records;
/**
 * ApplicationMaster that requests a fixed number of worker containers, launches
 * the worker command in each one, waits for all of them to complete and then
 * unregisters from the ResourceManager.
 *
 * <p>The final status reported on unregistration is derived from the exit
 * statuses of the worker containers: if any container exits with a non-zero
 * status the whole application is reported as {@code FAILED}; otherwise it is
 * reported as {@code SUCCEEDED}.
 */
public class ApplicationMaster {

    /** Number of worker containers requested and awaited. */
    private static final int CONTAINER_COUNT = 2;

    /**
     * Runs the ApplicationMaster.
     *
     * @param args {@code args[0]} is the application id, used to build the worker log paths
     * @throws Exception if communication with the ResourceManager, a NodeManager
     *     or the file system fails
     */
    public static void main(String[] args) throws Exception {
        final String appId = args[0];

        // Initialize clients to ResourceManager and NodeManagers
        Configuration conf = new YarnConfiguration();
        AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
        rmClient.init(conf);
        rmClient.start();
        NMClient nmClient = NMClient.createNMClient();
        nmClient.init(conf);
        nmClient.start();

        try {
            // Register with ResourceManager (host name, RPC port, tracking URL).
            System.out.println("registerapplicationMaster 0");
            rmClient.registerApplicationMaster("", 0, "");
            System.out.println("registerapplicationMaster 1");

            // Priority for worker containers - priorities are intra-application
            Priority priority = Records.newRecord(Priority.class);
            priority.setPriority(0);

            // Resource requirements for worker containers
            Resource capability = Records.newRecord(Resource.class);
            capability.setMemorySize(128);
            capability.setVirtualCores(1);

            // Make container requests to ResourceManager
            for (int i = 0; i < CONTAINER_COUNT; ++i) {
                ContainerRequest containerAsk =
                    new ContainerRequest(capability, null, null, priority);
                System.out.println("Making resource request " + i);
                rmClient.addContainerRequest(containerAsk);
            }

            // The jar resource and the environment are identical for every
            // worker, so build them once instead of once per container.
            Map<String, LocalResource> localResources = new HashMap<>();
            localResources.put("simpleapp.jar", buildJarResource(conf));
            Map<String, String> env = buildWorkerEnvironment(conf);

            // Obtain allocated containers, launch and check for responses
            int responseId = 0;
            int completedContainers = 0;
            int failedContainers = 0;
            String lastFailureDiagnostics = "";
            while (completedContainers < CONTAINER_COUNT) {
                AllocateResponse response = rmClient.allocate(responseId++);
                System.out.println("response.getAllocatedContainers().size() "+response.getAllocatedContainers().size());
                for (Container container : response.getAllocatedContainers()) {
                    List<String> commands = new ArrayList<String>(Arrays.asList("$JAVA_HOME/bin/java " +
                            " -Xmx256m " +
                            " com.paytm.lending.datamart.yarn.DummyApplication " +1 +" "+
                            " 1> " + "/var/log/hadoop-yarn/containers/"+appId+"/"+container.getId()+"/"+ApplicationConstants.STDOUT +
                            " 2> " + "/var/log/hadoop-yarn/containers/"+appId+"/"+container.getId()+"/"+ApplicationConstants.STDERR,"exit 100"));

                    // Launch container by creating a ContainerLaunchContext
                    ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
                    ctx.setLocalResources(localResources);
                    ctx.setEnvironment(env);
                    ctx.setCommands(commands);
                    System.out.println("New Launching container " + container + " with command "+commands);
                    nmClient.startContainer(container, ctx);
                }
                for (ContainerStatus status : response.getCompletedContainersStatuses()) {
                    ++completedContainers;
                    System.out.println("Completed container " + status.getContainerId());
                    System.out.println("Container state "+ status.getState());
                    // A non-zero exit status means the worker process (or the
                    // container itself) did not finish successfully.
                    int exitStatus = status.getExitStatus();
                    if (exitStatus != 0) {
                        ++failedContainers;
                        lastFailureDiagnostics = "container " + status.getContainerId()
                            + " exited with status " + exitStatus
                            + ": " + status.getDiagnostics();
                        System.out.println("Container failed: " + lastFailureDiagnostics);
                    }
                }
                Thread.sleep(100);
            }

            // Un-register with ResourceManager, reporting failure if any worker failed.
            FinalApplicationStatus finalStatus = failedContainers == 0
                ? FinalApplicationStatus.SUCCEEDED
                : FinalApplicationStatus.FAILED;
            String finalMessage = failedContainers == 0
                ? ""
                : failedContainers + " of " + CONTAINER_COUNT + " containers failed; last failure: "
                    + lastFailureDiagnostics;
            rmClient.unregisterApplicationMaster(finalStatus, finalMessage, "");
        } finally {
            // Stop both clients on every exit path.
            nmClient.stop();
            rmClient.stop();
        }
    }

    /**
     * Describes the application jar named by the {@code JAR_PATH} environment
     * variable as a local resource for the worker containers.
     *
     * @param conf configuration used to obtain the file system
     * @return the populated local resource
     * @throws java.io.IOException if the jar's file status cannot be read
     */
    private static LocalResource buildJarResource(Configuration conf) throws java.io.IOException {
        FileStatus jarStatus =
            FileSystem.get(conf).getFileStatus(new Path(System.getenv("JAR_PATH")));
        LocalResource jarResource = Records.newRecord(LocalResource.class);
        jarResource.setResource(URL.fromPath(jarStatus.getPath()));
        jarResource.setSize(jarStatus.getLen());
        jarResource.setTimestamp(jarStatus.getModificationTime());
        jarResource.setType(LocalResourceType.FILE);
        jarResource.setVisibility(LocalResourceVisibility.PUBLIC);
        return jarResource;
    }

    /**
     * Builds the worker environment: the configured YARN application classpath
     * plus every file in the container working directory.
     *
     * @param conf configuration supplying the YARN application classpath
     * @return a new environment map
     */
    private static Map<String, String> buildWorkerEnvironment(Configuration conf) {
        Map<String, String> env = new HashMap<String, String>();
        for (String c : conf.getStrings(
                YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
            // Append the actual classpath entry, not just the separator.
            Apps.addToEnvironment(
                env, ApplicationConstants.Environment.CLASSPATH.name(), c.trim(), File.pathSeparator);
        }
        Apps.addToEnvironment(
            env,
            ApplicationConstants.Environment.CLASSPATH.name(),
            ApplicationConstants.Environment.PWD.$() + File.separator + "*",
            File.pathSeparator);
        return env;
    }
}
如果 DummyApplication 类在容器中运行时发生异常,我需要一种能让整个 YARN 应用程序被标记为失败的方法。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。