diff --git a/guacamole/src/main/java/org/apache/guacamole/rest/tunnel/TunnelRESTService.java b/guacamole/src/main/java/org/apache/guacamole/rest/tunnel/TunnelRESTService.java index c89fdcd25..fab4431a2 100644 --- a/guacamole/src/main/java/org/apache/guacamole/rest/tunnel/TunnelRESTService.java +++ b/guacamole/src/main/java/org/apache/guacamole/rest/tunnel/TunnelRESTService.java @@ -21,12 +21,14 @@ package org.apache.guacamole.rest.tunnel; import com.google.inject.Inject; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.util.Map; import java.util.Set; import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; +import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; @@ -143,4 +145,52 @@ public class TunnelRESTService { } + /** + * Intercepts a specific stream, sending the contents of the given + * InputStream over that stream as "blob" instructions. + * + * @param authToken + * The authentication token that is used to authenticate the user + * performing the operation. + * + * @param tunnelUUID + * The UUID of the tunnel containing the stream being intercepted. + * + * @param streamIndex + * The index of the stream to intercept. + * + * @param filename + * The filename to use for the sake of identifying the data being sent. + * + * @param data + * An InputStream containing the data to be sent over the intercepted + * stream. + * + * @throws GuacamoleException + * If the session associated with the given auth token cannot be + * retrieved, or if no such tunnel exists. + */ + @POST + @Consumes(MediaType.WILDCARD) + @Path("/{tunnel}/streams/{index}/{filename}") + public void setStreamContents(@QueryParam("token") String authToken, + @PathParam("tunnel") String tunnelUUID, + @PathParam("index") final int streamIndex, + @PathParam("filename") String filename, + InputStream data) + throws GuacamoleException { + + GuacamoleSession session = authenticationService.getGuacamoleSession(authToken); + Map tunnels = session.getTunnels(); + + // Pull tunnel with given UUID + final StreamInterceptingTunnel tunnel = tunnels.get(tunnelUUID); + if (tunnel == null) + throw new GuacamoleResourceNotFoundException("No such tunnel."); + + // Send input over stream + tunnel.interceptStream(streamIndex, data); + + } + } diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/InputStreamInterceptingFilter.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/InputStreamInterceptingFilter.java new file mode 100644 index 000000000..6d494bd29 --- /dev/null +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/InputStreamInterceptingFilter.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.guacamole.tunnel; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import javax.xml.bind.DatatypeConverter; +import org.apache.guacamole.GuacamoleException; +import org.apache.guacamole.net.GuacamoleTunnel; +import org.apache.guacamole.protocol.GuacamoleInstruction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Filter which selectively intercepts "ack" instructions, automatically reading + * from or closing the stream given with interceptStream(). The required "blob" + * and "end" instructions denoting the content and boundary of the stream are + * sent automatically. + */ +public class InputStreamInterceptingFilter + extends StreamInterceptingFilter { + + /** + * Logger for this class. + */ + private static final Logger logger = + LoggerFactory.getLogger(InputStreamInterceptingFilter.class); + + /** + * Creates a new InputStreamInterceptingFilter which selectively intercepts + * "ack" instructions. The required "blob" and "end" instructions will + * automatically be sent over the given tunnel based on the content of + * provided InputStreams. + * + * @param tunnel + * The GuacamoleTunnel over which any required "blob" and "end" + * instructions should be sent. + */ + public InputStreamInterceptingFilter(GuacamoleTunnel tunnel) { + super(tunnel); + } + + /** + * Injects a "blob" instruction into the outbound Guacamole protocol + * stream, as if sent by the connected client. "blob" instructions are used + * to send chunks of data along a stream. + * + * @param index + * The index of the stream that this "blob" instruction relates to. + * + * @param blob + * The chunk of data to send within the "blob" instruction. + */ + private void sendBlob(String index, byte[] blob) { + + // Send "blob" containing provided data + sendInstruction(new GuacamoleInstruction("blob", index, + DatatypeConverter.printBase64Binary(blob))); + + } + + /** + * Injects an "end" instruction into the outbound Guacamole protocol + * stream, as if sent by the connected client. "end" instructions are used + * to signal the end of a stream. + * + * @param index + * The index of the stream that this "end" instruction relates to. + */ + private void sendEnd(String index) { + sendInstruction(new GuacamoleInstruction("end", index)); + } + + /** + * Reads the next chunk of data from the InputStream associated with an + * intercepted stream, sending that data as a "blob" instruction over the + * GuacamoleTunnel associated with this filter. If the end of the + * InputStream is reached, an "end" instruction will automatically be sent. + * + * @param stream + * The stream from which the next chunk of data should be read. + */ + private void readNextBlob(InterceptedStream stream) { + + // Read blob from stream if it exists + try { + + // Read raw data from input stream + byte[] blob = new byte[6048]; + int length = stream.getStream().read(blob); + + // End stream if no more data + if (length == -1) { + + // Close stream, send end if the stream is still valid + if (closeInterceptedStream(stream)) + sendEnd(stream.getIndex()); + + return; + + } + + // Inject corresponding "blob" instruction + sendBlob(stream.getIndex(), Arrays.copyOf(blob, length)); + + } + + // Terminate stream if it cannot be read + catch (IOException e) { + + logger.debug("Unable to read data of intercepted input stream.", e); + + // Close stream, send end if the stream is still valid + if (closeInterceptedStream(stream)) + sendEnd(stream.getIndex()); + + } + + } + + /** + * Handles a single "ack" instruction, sending yet more blobs or closing the + * stream depending on whether the "ack" indicates success or failure. If no + * InputStream is associated with the stream index within the "ack" + * instruction, the instruction is ignored. + * + * @param instruction + * The "ack" instruction being handled. + */ + private void handleAck(GuacamoleInstruction instruction) { + + // Verify all required arguments are present + List args = instruction.getArgs(); + if (args.size() < 3) + return; + + // Pull associated stream + String index = args.get(0); + InterceptedStream stream = getInterceptedStream(index); + if (stream == null) + return; + + // Pull status code + String status = args.get(2); + + // Terminate stream if an error is encountered + if (!status.equals("0")) { + closeInterceptedStream(stream); + return; + } + + // Send next blob + readNextBlob(stream); + + } + + @Override + public GuacamoleInstruction filter(GuacamoleInstruction instruction) + throws GuacamoleException { + + // Intercept "ack" instructions for in-progress streams + if (instruction.getOpcode().equals("ack")) + handleAck(instruction); + + // Pass instruction through untouched + return instruction; + + } + + @Override + protected void handleInterceptedStream(InterceptedStream stream) { + + // Read the first blob. Note that future blobs will be read in response + // to received "ack" instructions. + readNextBlob(stream); + + } + +} diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java index 333cdb1c5..91984cca8 100644 --- a/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java @@ -19,7 +19,9 @@ package org.apache.guacamole.tunnel; +import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.InputStream; import java.io.OutputStream; import org.apache.guacamole.GuacamoleException; import org.apache.guacamole.io.GuacamoleReader; @@ -30,10 +32,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * GuacamoleTunnel implementation which provides for intercepting the contents - * of in-progress streams, rerouting received blobs to a provided OutputStream. - * Interception of streams is requested on a per stream basis and lasts only - * for the duration of that stream. + * GuacamoleTunnel implementation which provides for producing or consuming the + * contents of in-progress streams, rerouting blobs to a provided OutputStream + * or from a provided InputStream. Interception of streams is requested on a per + * stream basis and lasts only for the duration of that stream. * * @author Michael Jumper */ @@ -58,6 +60,12 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel { super(tunnel); } + /** + * The filter to use for providing stream data from InputStreams. + */ + private final InputStreamInterceptingFilter inputStreamFilter = + new InputStreamInterceptingFilter(this); + /** * The filter to use for rerouting received stream data to OutputStreams. */ @@ -92,9 +100,45 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel { } + /** + * Intercept the given stream, continuously writing the contents of the + * given InputStream as blobs. The stream will automatically end when + * when the end of the InputStream is reached. If there is no such + * stream, then the InputStream will be closed immediately. This function + * will block until all data from the InputStream has been written to the + * given stream. + * + * @param index + * The index of the stream to intercept. + * + * @param stream + * The InputStream to read all blobs data from. + */ + public void interceptStream(int index, InputStream stream) { + + // Log beginning of intercepted stream + logger.debug("Intercepting input stream #{} of tunnel \"{}\".", + index, getUUID()); + + inputStreamFilter.interceptStream(index, new BufferedInputStream(stream)); + + // Log end of intercepted stream + logger.debug("Intercepted input stream #{} of tunnel \"{}\" ended.", + index, getUUID()); + + } + @Override public GuacamoleReader acquireReader() { - return new FilteredGuacamoleReader(super.acquireReader(), outputStreamFilter); + + GuacamoleReader reader = super.acquireReader(); + + // Filter both input and output streams + reader = new FilteredGuacamoleReader(reader, inputStreamFilter); + reader = new FilteredGuacamoleReader(reader, outputStreamFilter); + + return reader; + } @Override @@ -108,6 +152,7 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel { // Close all intercepted streams finally { + inputStreamFilter.closeAllInterceptedStreams(); outputStreamFilter.closeAllInterceptedStreams(); }