Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send input to the remote command asynchronously #66

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,10 @@
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.xml.ws.Action;
import javax.xml.ws.BindingProvider;
import javax.xml.ws.WebServiceException;
import javax.xml.ws.soap.SOAPFaultException;

import org.apache.cxf.ws.addressing.AddressingProperties;
import org.apache.cxf.ws.addressing.AttributedURIType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -32,9 +27,6 @@ public RetryingProxyHandler(WinRm winrm, int retriesForConnectionFailures) {

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//TODO use different instances of service http://cxf.apache.org/docs/developing-a-consumer.html#DevelopingaConsumer-SettingConnectionPropertieswithContexts
setActionToContext(method);

// Don't retry the "command" - could lead to unexpected side effects of having the script run multiple times.
if (method.getName().equals("command")) {
return method.invoke(winrm, args);
Expand Down Expand Up @@ -75,31 +67,6 @@ public Object invokeWithRetry(Method method, Object[] args)
throw new RuntimeException("failed task " + method.getName(), exceptions.get(0));
}

// TODO fix CXF to not set a wrong action https://issues.apache.org/jira/browse/CXF-4647
private void setActionToContext(Method method) {
Action action = method.getAnnotation(Action.class);
if (action != null && action.input() != null) {
AttributedURIType attrUri = new AttributedURIType();
attrUri.setValue(action.input());
AddressingProperties addrProps = getAddressingProperties((BindingProvider)winrm);
addrProps.setAction(attrUri);
}
}

private AddressingProperties getAddressingProperties(BindingProvider bp) {
String ADDR_CONTEXT = "javax.xml.ws.addressing.context";
Map<String, Object> reqContext = bp.getRequestContext();
if (reqContext==null) {
throw new NullPointerException("Unable to load request context; delegate load failed");
}

AddressingProperties addrProps = ((AddressingProperties)reqContext.get(ADDR_CONTEXT));
if (addrProps==null) {
throw new NullPointerException("Unable to load request context "+ADDR_CONTEXT+"; are the addressing classes installed (you may need <feature>cxf-ws-addr</feature> if running in osgi)");
}
return addrProps;
}

@Deprecated
public void setRetriesForConnectionFailures(int retries) {
this.retriesForConnectionFailures = retries;
Expand Down
111 changes: 97 additions & 14 deletions client/src/main/java/io/cloudsoft/winrm4j/client/ShellCommand.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package io.cloudsoft.winrm4j.client;

import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.nio.charset.Charset;
import java.util.List;

import javax.xml.ws.soap.SOAPFaultException;

import org.apache.commons.codec.Charsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.NodeList;
Expand All @@ -15,6 +18,7 @@
import io.cloudsoft.winrm4j.client.shell.DesiredStreamType;
import io.cloudsoft.winrm4j.client.shell.Receive;
import io.cloudsoft.winrm4j.client.shell.ReceiveResponse;
import io.cloudsoft.winrm4j.client.shell.Send;
import io.cloudsoft.winrm4j.client.shell.StreamType;
import io.cloudsoft.winrm4j.client.wsman.CommandResponse;
import io.cloudsoft.winrm4j.client.wsman.Locale;
Expand All @@ -25,6 +29,47 @@
import io.cloudsoft.winrm4j.client.wsman.Signal;

public class ShellCommand implements AutoCloseable {
private static class StreamSender implements Runnable {
// max amount to send
private static final int BUFFER_LENGTH = 4096;

ShellCommand shell;
String commandId;
Reader in;

char[] buffer = new char[BUFFER_LENGTH];



public StreamSender(ShellCommand shellCommand, String commandId, Reader in) {
this.shell = shellCommand;
this.commandId = commandId;
this.in = in;
}

@Override
public void run() {
while (true) {
int read;
try {
read = in.read(buffer);
} catch (IOException e) {
LOG.warn("Aborting stdin send for command ID " + commandId + " due to a read exception.", e);
// Leaving the process running
throw new IllegalStateException(e);
}
if (read > 0) {
shell.send(new String(buffer, 0, read), commandId);
} else if (read == -1) {
shell.send(null, commandId);
return;
}
}
}

}


private static final Logger LOG = LoggerFactory.getLogger(ShellCommand.class.getName());

private static final String COMMAND_STATE_DONE = "http://schemas.microsoft.com/wbem/wsman/1/windows/shell/CommandState/Done";
Expand All @@ -41,6 +86,8 @@ public class ShellCommand implements AutoCloseable {
* Possible causes are: the specified ShellId is incorrect or the shell no longer exi
*/
private static final String WSMAN_FAULT_CODE_SHELL_WAS_NOT_FOUND = "2150858843";

private static final Charset DEFAULT_SHELL_ENCODING = Charsets.UTF_8;

private WinRm winrm;
private SelectorSetType shellSelector;
Expand All @@ -67,8 +114,41 @@ private SelectorSetType createShellSelector(String shellId) {
}

public int execute(String cmd, Writer out, Writer err) {
return execute(cmd, null, out, err);
}

/**
* WARN doesn't work against win12
*
* @since 0.6.0
*/
public int execute(String cmd, Reader in, Writer out, Writer err) {
WinRmClient.checkNotNull(cmd, "command");

numberOfReceiveCalls = 0;
String commandId = command(cmd);

Thread stdinSender = null;
if (in != null) {
stdinSender = new Thread(new StreamSender(this, commandId, in));
stdinSender.start();
}

try {
return receive(commandId, out, err);
} finally {
try {
if (stdinSender != null) {
stdinSender.interrupt();
}
releaseCommand(commandId);
} catch (SOAPFaultException soapFault) {
assertFaultCode(soapFault, WSMAN_FAULT_CODE_SHELL_WAS_NOT_FOUND);
}
}
}

private String command(String cmd) {
final CommandLine cmdLine = new CommandLine();
cmdLine.setCommand(cmd);
final OptionSetType optSetCmd = new OptionSetType();
Expand All @@ -81,24 +161,27 @@ public int execute(String cmd, Writer out, Writer err) {
optSkipCmdShell.setValue("FALSE");
optSetCmd.getOption().add(optSkipCmdShell);

numberOfReceiveCalls = 0;

CommandResponse cmdResponse = winrm.command(cmdLine, WinRmClient.RESOURCE_URI, WinRmClient.MAX_ENVELOPER_SIZE, operationTimeout, locale, shellSelector, optSetCmd);
return cmdResponse.getCommandId();
}

String commandId = cmdResponse.getCommandId();

try {
return receiveCommand(commandId, out, err);
} finally {
try {
releaseCommand(commandId);
} catch (SOAPFaultException soapFault) {
assertFaultCode(soapFault, WSMAN_FAULT_CODE_SHELL_WAS_NOT_FOUND);
}
private void send(String cmd, String commandId) {
final Send send = new Send();
StreamType stdin = new StreamType();
stdin.setCommandId(commandId);
stdin.setName("stdin");
if (cmd != null) {
stdin.setValue(cmd.getBytes(DEFAULT_SHELL_ENCODING));
} else {
stdin.setEnd(true);
}
send.getStream().add(stdin);

winrm.send(send, WinRmClient.RESOURCE_URI, WinRmClient.MAX_ENVELOPER_SIZE, operationTimeout, locale, shellSelector);
}

private int receiveCommand(String commandId, Writer out, Writer err) {
private int receive(String commandId, Writer out, Writer err) {
while(true) {
final Receive receive = new Receive();
DesiredStreamType stream = new DesiredStreamType();
Expand Down Expand Up @@ -163,7 +246,7 @@ private void getStreams(ReceiveResponse receiveResponse, Writer out, Writer err)
try {
//TODO use passed locale?
if (value.length > 0) {
out.write(new String(value));
out.write(new String(value, DEFAULT_SHELL_ENCODING));
}
if (Boolean.TRUE.equals(s.isEnd())) {
out.close();
Expand All @@ -176,7 +259,7 @@ private void getStreams(ReceiveResponse receiveResponse, Writer out, Writer err)
try {
//TODO use passed locale?
if (value.length > 0) {
err.write(new String(value));
err.write(new String(value, DEFAULT_SHELL_ENCODING));
}
if (Boolean.TRUE.equals(s.isEnd())) {
err.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@
import org.apache.cxf.service.model.ServiceInfo;
import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduit;
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
import org.apache.cxf.ws.addressing.AddressingProperties;
import org.apache.cxf.ws.addressing.JAXWSAConstants;
import org.apache.cxf.ws.addressing.VersionTransformer;
import org.apache.cxf.ws.addressing.policy.MetadataConstants;
import org.apache.cxf.ws.policy.PolicyConstants;
import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.auth.Credentials;
import org.apache.http.auth.NTCredentials;
Expand All @@ -41,6 +40,8 @@
import org.apache.http.impl.auth.BasicSchemeFactory;
import org.apache.http.impl.auth.KerberosSchemeFactory;
import org.apache.http.impl.auth.SPNegoSchemeFactory;
import org.apache.neethi.Policy;
import org.apache.neethi.builders.PrimitiveAssertion;
import org.w3c.dom.Element;

import io.cloudsoft.winrm4j.client.ntlm.SpNegoNTLMSchemeFactory;
Expand Down Expand Up @@ -235,9 +236,9 @@ private static void initializeClientAndService(WinRm winrm, WinRmClientBuilder b
List<Handler> handlerChain = Arrays.<Handler>asList(new StripShellResponseHandler());
bp.getBinding().setHandlerChain(handlerChain);

Map<String, Object> requestContext = bp.getRequestContext();
AddressingProperties maps = new AddressingProperties(VersionTransformer.Names200408.WSA_NAMESPACE_NAME);
requestContext.put(JAXWSAConstants.CLIENT_ADDRESSING_PROPERTIES, maps);
Policy policy = new Policy();
policy.addAssertion(new PrimitiveAssertion(MetadataConstants.USING_ADDRESSING_2004_QNAME));
bp.getRequestContext().put(PolicyConstants.POLICY_OVERRIDE, policy);

bp.getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, endpoint);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package io.cloudsoft.winrm4j.client;

import java.lang.reflect.Field;
import java.util.Arrays;

import javax.xml.ws.WebServiceFeature;
import javax.xml.ws.spi.Provider;
import javax.xml.ws.spi.ServiceDelegate;

import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
import org.apache.cxf.feature.Feature;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.apache.cxf.ws.addressing.WSAddressingFeature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -97,7 +93,6 @@ private static WinRm doCreateServiceWithBean(Bus bus) {
factory.getClientFactoryBean().getServiceFactory().setWsdlURL(WinRmService.WSDL_LOCATION);
factory.setServiceName(WinRmService.SERVICE);
factory.setEndpointName(WinRmService.WinRmPort);
factory.setFeatures(Arrays.asList((Feature)newMemberSubmissionAddressingFeature()));
factory.setBus(bus);
return factory.create(WinRm.class);
}
Expand All @@ -107,33 +102,7 @@ private static WinRmService doCreateService_1_CreateMinimalServiceInstance() {
}

private static WinRm doCreateService_2_GetClient(WinRmService service) {
return service.getWinRmPort(
// * Adds WS-Addressing headers and uses the submission spec namespace
// http://schemas.xmlsoap.org/ws/2004/08/addressing
newMemberSubmissionAddressingFeature());
}

private static WebServiceFeature newMemberSubmissionAddressingFeature() {
/*
* Requires the following dependency so the feature is visible to maven.
* But is it included in the IBM dist?
<dependency>
<groupId>com.sun.xml.ws</groupId>
<artifactId>jaxws-rt</artifactId>
<version>2.2.10</version>
</dependency>
*/
try {
// com.ibm.websphere.wsaddressing.jaxws21.SubmissionAddressingFeature for IBM java (available only in WebSphere?)

WSAddressingFeature webServiceFeature = new WSAddressingFeature();
// webServiceFeature.setResponses(WSAddressingFeature.AddressingResponses.ANONYMOUS);
webServiceFeature.setAddressingRequired(true);

return webServiceFeature;
} catch (Exception e) {
throw new RuntimeException(e);
}
return service.getWinRmPort();
}

}
Loading