From f391f00c7ba9d146c5bb1717f80a0bb291fd5bac Mon Sep 17 00:00:00 2001 From: Michael Jumper Date: Sat, 4 Jun 2016 01:58:01 -0700 Subject: [PATCH 1/8] GUACAMOLE-44: Extract logic of StreamInterceptingTunnel. --- .../guacamole/tunnel/InterceptedStream.java | 86 ++++++ .../tunnel/InterceptedStreamMap.java | 211 +++++++++++++ .../OutputStreamInterceptingFilter.java | 194 ++++++++++++ .../tunnel/StreamInterceptingFilter.java | 209 +++++++++++++ .../tunnel/StreamInterceptingTunnel.java | 277 +----------------- 5 files changed, 712 insertions(+), 265 deletions(-) create mode 100644 guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStream.java create mode 100644 guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStreamMap.java create mode 100644 guacamole/src/main/java/org/apache/guacamole/tunnel/OutputStreamInterceptingFilter.java create mode 100644 guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingFilter.java diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStream.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStream.java new file mode 100644 index 000000000..b4f4d0679 --- /dev/null +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStream.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.guacamole.tunnel; + +import java.io.Closeable; + +/** + * A simple pairing of the index of an intercepted Guacamole stream with the + * stream-type object which will produce or consume the data sent over the + * intercepted Guacamole stream. + * + * @author Michael Jumper + * @param + * The type of object which will produce or consume the data sent over the + * intercepted Guacamole stream. Usually, this will be either InputStream + * or OutputStream. + */ +public class InterceptedStream { + + /** + * The index of the Guacamole stream being intercepted. + */ + private final String index; + + /** + * The stream which will produce or consume the data sent over the + * intercepted Guacamole stream. + */ + private final T stream; + + /** + * Creates a new InterceptedStream which associated the given Guacamole + * stream index with the given stream object. + * + * @param index + * The index of the Guacamole stream being intercepted. + * + * @param stream + * The stream which will produce or consume the data sent over the + * intercepted Guacamole stream. + */ + public InterceptedStream(String index, T stream) { + this.index = index; + this.stream = stream; + } + + /** + * Returns the index of the Guacamole stream being intercepted. + * + * @return + * The index of the Guacamole stream being intercepted. + */ + public String getIndex() { + return index; + } + + /** + * Returns the stream which will produce or consume the data sent over the + * intercepted Guacamole stream. + * + * @return + * The stream which will produce or consume the data sent over the + * intercepted Guacamole stream. + */ + public T getStream() { + return stream; + } + +} diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStreamMap.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStreamMap.java new file mode 100644 index 000000000..344bc36a1 --- /dev/null +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStreamMap.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.guacamole.tunnel; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Map-like storage for intercepted Guacamole streams. + * + * @author Michael Jumper + * @param + * The type of object which will produce or consume the data sent over the + * intercepted Guacamole stream. Usually, this will be either InputStream + * or OutputStream. + */ +public class InterceptedStreamMap { + + /** + * Logger for this class. + */ + private static final Logger logger = LoggerFactory.getLogger(InterceptedStreamMap.class); + + /** + * The maximum number of milliseconds to wait for notification that a + * stream has closed before explicitly checking for closure ourselves. + */ + private static final long STREAM_WAIT_TIMEOUT = 1000; + + /** + * Mapping of the indexes of all streams whose associated "blob" and "end" + * instructions should be intercepted. + */ + private final ConcurrentMap> streams = + new ConcurrentHashMap>(); + + /** + * Closes the given stream, logging any errors that occur during closure. + * The monitor of the stream is notified via a single call to notify() once + * the attempt to close has been made. + * + * @param stream + * The stream to close and notify. + */ + private void close(T stream) { + + // Attempt to close stream + try { + stream.close(); + } + catch (IOException e) { + logger.warn("Unable to close intercepted stream: {}", e.getMessage()); + logger.debug("I/O error prevented closure of intercepted stream.", e); + } + + // Notify waiting threads that the stream has ended + synchronized (stream) { + stream.notify(); + } + + } + + /** + * Closes the stream object associated with the stream having the given + * index, if any, removing it from the map, logging any errors that occur + * during closure, and unblocking any in-progress calls to waitFor() for + * that stream. If no such stream exists within this map, then this + * function has no effect. + * + * @param index + * The index of the stream whose associated stream object should be + * closed. + * + * @return + * The stream associated with the given index, if the stream was stored + * within this map, or null if no such stream exists. + */ + public InterceptedStream close(String index) { + + // Remove associated stream + InterceptedStream stream = streams.remove(index); + if (stream == null) + return null; + + // Close stream if it exists + close(stream.getStream()); + return stream; + + } + + /** + * Closes the given stream, logging any errors that occur during closure, + * and unblocking any in-progress calls to waitFor() for the given stream. + * If the given stream is stored within this map, it will also be removed. + * + * @param stream + * The stream to close. + * + * @return + * true if the given stream was stored within this map, false + * otherwise. + */ + public boolean close(InterceptedStream stream) { + + // Remove stream if present + boolean wasRemoved = streams.remove(stream.getIndex(), stream); + + // Close provided stream + close(stream.getStream()); + + return wasRemoved; + + } + + /** + * Removes and closes all streams stored within this map, logging any errors + * that occur during closure, and unblocking any in-progress calls to + * waitFor(). + */ + public void closeAll() { + + // Close any active streams + for (InterceptedStream stream : streams.values()) + close(stream.getStream()); + + // Remove now-useless references + streams.clear(); + + } + + /** + * Blocks until the given stream is closed, or until another stream with + * the same index replaces it. + * + * @param stream + * The stream to wait for. + */ + public void waitFor(InterceptedStream stream) { + + T underlyingStream = stream.getStream(); + + // Wait for stream to close + synchronized (underlyingStream) { + while (streams.get(stream.getIndex()) == stream) { + try { + underlyingStream.wait(STREAM_WAIT_TIMEOUT); + } + catch (InterruptedException e) { + // Ignore + } + } + } + + } + + /** + * Returns the stream stored in this map under the given index. + * + * @param index + * The index of the stream to return. + * + * @return + * The stream having the given index, or null if no such stream is + * stored within this map. + */ + public InterceptedStream get(String index) { + return streams.get(index); + } + + /** + * Adds the given stream to this map, storing it under its associated + * index. If another stream already exists within this map having the same + * index, that stream will be closed and replaced. + * + * @param stream + * The stream to store within this map. + */ + public void put(InterceptedStream stream) { + + // Add given stream to map + InterceptedStream oldStream = + streams.put(stream.getIndex(), stream); + + // If a previous stream DID exist, close it + if (oldStream != null) + close(oldStream.getStream()); + + } + +} diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/OutputStreamInterceptingFilter.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/OutputStreamInterceptingFilter.java new file mode 100644 index 000000000..7cdadba9b --- /dev/null +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/OutputStreamInterceptingFilter.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.guacamole.tunnel; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; +import javax.xml.bind.DatatypeConverter; +import org.apache.guacamole.GuacamoleException; +import org.apache.guacamole.net.GuacamoleTunnel; +import org.apache.guacamole.protocol.GuacamoleInstruction; +import org.apache.guacamole.protocol.GuacamoleStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Filter which selectively intercepts "blob" and "end" instructions, + * automatically writing to or closing the stream given with + * interceptStream(). The required "ack" responses to received blobs are + * sent automatically. + * + * @author Michael Jumper + */ +public class OutputStreamInterceptingFilter + extends StreamInterceptingFilter { + + /** + * Logger for this class. + */ + private static final Logger logger = + LoggerFactory.getLogger(OutputStreamInterceptingFilter.class); + + /** + * Creates a new OutputStreamInterceptingFilter which selectively intercepts + * "blob" and "end" instructions. The required "ack" responses will + * automatically be sent over the given tunnel. + * + * @param tunnel + * The GuacamoleTunnel over which any required "ack" instructions + * should be sent. + */ + public OutputStreamInterceptingFilter(GuacamoleTunnel tunnel) { + super(tunnel); + } + + /** + * Injects an "ack" instruction into the outbound Guacamole protocol + * stream, as if sent by the connected client. "ack" instructions are used + * to acknowledge the receipt of a stream and its subsequent blobs, and are + * the only means of communicating success/failure status. + * + * @param index + * The index of the stream that this "ack" instruction relates to. + * + * @param message + * An arbitrary human-readable message to include within the "ack" + * instruction. + * + * @param status + * The status of the stream operation being acknowledged via the "ack" + * instruction. Error statuses will implicitly close the stream via + * closeStream(). + */ + private void sendAck(String index, String message, GuacamoleStatus status) { + + // Error "ack" instructions implicitly close the stream + if (status != GuacamoleStatus.SUCCESS) + closeInterceptedStream(index); + + sendInstruction(new GuacamoleInstruction("ack", index, message, + Integer.toString(status.getGuacamoleStatusCode()))); + + } + + /** + * Handles a single "blob" instruction, decoding its base64 data, + * sending that data to the associated OutputStream, and ultimately + * dropping the "blob" instruction such that the client never receives + * it. If no OutputStream is associated with the stream index within + * the "blob" instruction, the instruction is passed through untouched. + * + * @param instruction + * The "blob" instruction being handled. + * + * @return + * The originally-provided "blob" instruction, if that instruction + * should be passed through to the client, or null if the "blob" + * instruction should be dropped. + */ + private GuacamoleInstruction handleBlob(GuacamoleInstruction instruction) { + + // Verify all required arguments are present + List args = instruction.getArgs(); + if (args.size() < 2) + return instruction; + + // Pull associated stream + String index = args.get(0); + InterceptedStream stream = getInterceptedStream(index); + if (stream == null) + return instruction; + + // Decode blob + byte[] blob; + try { + String data = args.get(1); + blob = DatatypeConverter.parseBase64Binary(data); + } + catch (IllegalArgumentException e) { + logger.warn("Received base64 data for intercepted stream was invalid."); + logger.debug("Decoding base64 data for intercepted stream failed.", e); + return null; + } + + // Attempt to write data to stream + try { + stream.getStream().write(blob); + sendAck(index, "OK", GuacamoleStatus.SUCCESS); + } + catch (IOException e) { + sendAck(index, "FAIL", GuacamoleStatus.SERVER_ERROR); + logger.debug("Write failed for intercepted stream.", e); + } + + // Instruction was handled purely internally + return null; + + } + + /** + * Handles a single "end" instruction, closing the associated + * OutputStream. If no OutputStream is associated with the stream index + * within the "end" instruction, this function has no effect. + * + * @param instruction + * The "end" instruction being handled. + */ + private void handleEnd(GuacamoleInstruction instruction) { + + // Verify all required arguments are present + List args = instruction.getArgs(); + if (args.size() < 1) + return; + + // Terminate stream + closeInterceptedStream(args.get(0)); + + } + + @Override + public GuacamoleInstruction filter(GuacamoleInstruction instruction) + throws GuacamoleException { + + // Intercept "blob" instructions for in-progress streams + if (instruction.getOpcode().equals("blob")) + return handleBlob(instruction); + + // Intercept "end" instructions for in-progress streams + if (instruction.getOpcode().equals("end")) { + handleEnd(instruction); + return instruction; + } + + // Pass instruction through untouched + return instruction; + + } + + @Override + protected void handleInterceptedStream(InterceptedStream stream) { + + // Acknowledge that the stream is ready to receive data + sendAck(stream.getIndex(), "OK", GuacamoleStatus.SUCCESS); + + } + +} diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingFilter.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingFilter.java new file mode 100644 index 000000000..e04de7d7e --- /dev/null +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingFilter.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.guacamole.tunnel; + +import java.io.Closeable; +import org.apache.guacamole.GuacamoleException; +import org.apache.guacamole.io.GuacamoleWriter; +import org.apache.guacamole.net.GuacamoleTunnel; +import org.apache.guacamole.protocol.GuacamoleFilter; +import org.apache.guacamole.protocol.GuacamoleInstruction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Filter which selectively intercepts stream-related instructions, + * automatically writing to, reading from, or closing the stream given with + * interceptStream(). Any instructions required by the Guacamole protocol to be + * sent in response to intercepted instructions will be sent automatically. + * + * @param + * The type of object which will produce or consume the data sent over the + * intercepted Guacamole stream. Usually, this will be either InputStream + * or OutputStream. + */ +public abstract class StreamInterceptingFilter + implements GuacamoleFilter { + + /** + * Logger for this class. + */ + private static final Logger logger = + LoggerFactory.getLogger(StreamInterceptingFilter.class); + + /** + * Mapping of the all streams whose related instructions should be + * intercepted. + */ + private final InterceptedStreamMap streams = new InterceptedStreamMap(); + + /** + * The tunnel over which any required instructions should be sent. + */ + private final GuacamoleTunnel tunnel; + + /** + * Creates a new StreamInterceptingFilter which selectively intercepts + * stream-related instructions. Any instructions required by the Guacamole + * protocol to be sent in response to intercepted instructions will be sent + * automatically over the given tunnel. + * + * @param tunnel + * The GuacamoleTunnel over which any required instructions should be + * sent. + */ + public StreamInterceptingFilter(GuacamoleTunnel tunnel) { + this.tunnel = tunnel; + } + + /** + * Injects an arbitrary Guacamole instruction into the outbound Guacamole + * protocol stream (GuacamoleWriter) of the tunnel associated with this + * StreamInterceptingFilter, as if the instruction was sent by the connected + * client. + * + * @param instruction + * The Guacamole instruction to inject. + */ + protected void sendInstruction(GuacamoleInstruction instruction) { + + // Temporarily acquire writer to send "ack" instruction + GuacamoleWriter writer = tunnel.acquireWriter(); + + // Send successful "ack" + try { + writer.writeInstruction(instruction); + } + catch (GuacamoleException e) { + logger.debug("Unable to send \"{}\" for intercepted stream.", + instruction.getOpcode(), e); + } + + // Done writing + tunnel.releaseWriter(); + + } + + /** + * Returns the stream having the given index and currently being intercepted + * by this filter. + * + * @param index + * The index of the stream to return. + * + * @return + * The stream having the given index, or null if no such stream is + * being intercepted. + */ + protected InterceptedStream getInterceptedStream(String index) { + return streams.get(index); + } + + /** + * Closes the stream having the given index and currently being intercepted + * by this filter, if any. If no such stream is being intercepted, then this + * function has no effect. + * + * @param index + * The index of the stream to close. + * + * @return + * The stream associated with the given index, if the stream is being + * intercepted, or null if no such stream exists. + */ + protected InterceptedStream closeInterceptedStream(String index) { + return streams.close(index); + } + + /** + * Closes the given stream. + * + * @param stream + * The stream to close. + * + * @return + * true if the given stream was being intercepted, false otherwise. + */ + protected boolean closeInterceptedStream(InterceptedStream stream) { + return streams.close(stream); + } + + /** + * Closes all streams being intercepted by this filter. + */ + public void closeAllInterceptedStreams() { + streams.closeAll(); + } + + /** + * Begins handling the data of the given intercepted stream. This function + * will automatically be invoked by interceptStream() for any valid stream. + * It is not required that this function block until all data is handled; + * interceptStream() will do this automatically. Implementations are free + * to use asynchronous approaches to data handling. + * + * @param stream + * The stream being intercepted. + */ + protected abstract void handleInterceptedStream(InterceptedStream stream); + + /** + * Intercept the stream having the given index, producing or consuming its + * data as appropriate. The given stream object will automatically be closed + * when the stream ends. If there is no stream having the given index, then + * the stream object will be closed immediately. This function will block + * until all data has been handled and the stream is ended. + * + * @param index + * The index of the stream to intercept. + * + * @param stream + * The stream object which will produce or consume all data for the + * stream having the given index. + */ + public void interceptStream(int index, T stream) { + + InterceptedStream interceptedStream; + String indexString = Integer.toString(index); + + // Atomically verify tunnel is open and add the given stream + synchronized (tunnel) { + + // Do nothing if tunnel is not open + if (!tunnel.isOpen()) + return; + + // Wrap stream + interceptedStream = new InterceptedStream(indexString, stream); + + // Replace any existing stream + streams.put(interceptedStream); + + } + + // Produce/consume all stream data + handleInterceptedStream(interceptedStream); + + // Wait for stream to close + streams.waitFor(interceptedStream); + + } + +} diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java index a9a135512..333cdb1c5 100644 --- a/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java @@ -20,21 +20,12 @@ package org.apache.guacamole.tunnel; import java.io.BufferedOutputStream; -import java.io.IOException; import java.io.OutputStream; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import javax.xml.bind.DatatypeConverter; import org.apache.guacamole.GuacamoleException; import org.apache.guacamole.io.GuacamoleReader; -import org.apache.guacamole.io.GuacamoleWriter; import org.apache.guacamole.net.DelegatingGuacamoleTunnel; import org.apache.guacamole.net.GuacamoleTunnel; import org.apache.guacamole.protocol.FilteredGuacamoleReader; -import org.apache.guacamole.protocol.GuacamoleFilter; -import org.apache.guacamole.protocol.GuacamoleInstruction; -import org.apache.guacamole.protocol.GuacamoleStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,13 +42,8 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel { /** * Logger for this class. */ - private static final Logger logger = LoggerFactory.getLogger(StreamInterceptingTunnel.class); - - /** - * The maximum number of milliseconds to wait for notification that a - * stream has closed before explicitly checking for closure ourselves. - */ - private static final long STREAM_WAIT_TIMEOUT = 1000; + private static final Logger logger = + LoggerFactory.getLogger(StreamInterceptingTunnel.class); /** * Creates a new StreamInterceptingTunnel which wraps the given tunnel, @@ -73,206 +59,10 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel { } /** - * Mapping of the indexes of all streams whose associated "blob" and "end" - * instructions should be intercepted. + * The filter to use for rerouting received stream data to OutputStreams. */ - private final Map streams = - new ConcurrentHashMap(); - - /** - * Filter which selectively intercepts "blob" and "end" instructions, - * automatically writing to or closing the stream given with - * interceptStream(). The required "ack" responses to received blobs are - * sent automatically. - */ - private final GuacamoleFilter STREAM_FILTER = new GuacamoleFilter() { - - /** - * Handles a single "blob" instruction, decoding its base64 data, - * sending that data to the associated OutputStream, and ultimately - * dropping the "blob" instruction such that the client never receives - * it. If no OutputStream is associated with the stream index within - * the "blob" instruction, the instruction is passed through untouched. - * - * @param instruction - * The "blob" instruction being handled. - * - * @return - * The originally-provided "blob" instruction, if that instruction - * should be passed through to the client, or null if the "blob" - * instruction should be dropped. - */ - private GuacamoleInstruction handleBlob(GuacamoleInstruction instruction) { - - // Verify all required arguments are present - List args = instruction.getArgs(); - if (args.size() < 2) - return instruction; - - // Pull associated stream - String index = args.get(0); - OutputStream stream = streams.get(index); - if (stream == null) - return instruction; - - // Decode blob - byte[] blob; - try { - String data = args.get(1); - blob = DatatypeConverter.parseBase64Binary(data); - } - catch (IllegalArgumentException e) { - logger.warn("Received base64 data for intercepted stream was invalid."); - logger.debug("Decoding base64 data for intercepted stream failed.", e); - return null; - } - - // Attempt to write data to stream - try { - stream.write(blob); - sendAck(index, "OK", GuacamoleStatus.SUCCESS); - } - catch (IOException e) { - sendAck(index, "FAIL", GuacamoleStatus.SERVER_ERROR); - logger.debug("Write failed for intercepted stream.", e); - } - - // Instruction was handled purely internally - return null; - - } - - /** - * Handles a single "end" instruction, closing the associated - * OutputStream. If no OutputStream is associated with the stream index - * within the "end" instruction, this function has no effect. - * - * @param instruction - * The "end" instruction being handled. - */ - private void handleEnd(GuacamoleInstruction instruction) { - - // Verify all required arguments are present - List args = instruction.getArgs(); - if (args.size() < 1) - return; - - // Terminate stream - closeStream(args.get(0)); - - } - - @Override - public GuacamoleInstruction filter(GuacamoleInstruction instruction) - throws GuacamoleException { - - // Intercept "blob" instructions for in-progress streams - if (instruction.getOpcode().equals("blob")) - return handleBlob(instruction); - - // Intercept "end" instructions for in-progress streams - if (instruction.getOpcode().equals("end")) { - handleEnd(instruction); - return instruction; - } - - // Pass instruction through untouched - return instruction; - - } - - }; - - /** - * Closes the given OutputStream, logging any errors that occur during - * closure. The monitor of the OutputStream is notified via a single call - * to notify() once the attempt to close has been made. - * - * @param stream - * The OutputStream to close and notify. - */ - private void closeStream(OutputStream stream) { - - // Attempt to close stream - try { - stream.close(); - } - catch (IOException e) { - logger.warn("Unable to close intercepted stream: {}", e.getMessage()); - logger.debug("I/O error prevented closure of intercepted stream.", e); - } - - // Notify waiting threads that the stream has ended - synchronized (stream) { - stream.notify(); - } - - } - - /** - * Closes the OutputStream associated with the stream having the given - * index, if any, logging any errors that occur during closure. If no such - * stream exists, this function has no effect. The monitor of the - * OutputStream is notified via a single call to notify() once the attempt - * to close has been made. - * - * @param index - * The index of the stream whose associated OutputStream should be - * closed and notified. - */ - private OutputStream closeStream(String index) { - - // Remove associated stream - OutputStream stream = streams.remove(index); - if (stream == null) - return null; - - // Close stream if it exists - closeStream(stream); - return stream; - - } - - /** - * Injects an "ack" instruction into the outbound Guacamole protocol - * stream, as if sent by the connected client. "ack" instructions are used - * to acknowledge the receipt of a stream and its subsequent blobs, and are - * the only means of communicating success/failure status. - * - * @param index - * The index of the stream that this "ack" instruction relates to. - * - * @param message - * An arbitrary human-readable message to include within the "ack" - * instruction. - * - * @param status - * The status of the stream operation being acknowledged via the "ack" - * instruction. Error statuses will implicitly close the stream via - * closeStream(). - */ - private void sendAck(String index, String message, GuacamoleStatus status) { - - // Temporarily acquire writer to send "ack" instruction - GuacamoleWriter writer = acquireWriter(); - - // Send successful "ack" - try { - writer.writeInstruction(new GuacamoleInstruction("ack", index, message, - Integer.toString(status.getGuacamoleStatusCode()))); - } - catch (GuacamoleException e) { - logger.debug("Unable to send \"ack\" for intercepted stream.", e); - } - - // Error "ack" instructions implicitly close the stream - if (status != GuacamoleStatus.SUCCESS) - closeStream(index); - - // Done writing - releaseWriter(); - - } + private final OutputStreamInterceptingFilter outputStreamFilter = + new OutputStreamInterceptingFilter(this); /** * Intercept all data received along the stream having the given index, @@ -290,57 +80,21 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel { */ public void interceptStream(int index, OutputStream stream) { - String indexString = Integer.toString(index); - - // Atomically verify tunnel is open and add the given stream - OutputStream oldStream; - synchronized (this) { - - // Do nothing if tunnel is not open - if (!isOpen()) { - closeStream(stream); - return; - } - - // Wrap stream - stream = new BufferedOutputStream(stream); - - // Replace any existing stream - oldStream = streams.put(indexString, stream); - - } - - // If a previous stream DID exist, close it - if (oldStream != null) - closeStream(oldStream); - // Log beginning of intercepted stream - logger.debug("Intercepting stream #{} of tunnel \"{}\".", + logger.debug("Intercepting output stream #{} of tunnel \"{}\".", index, getUUID()); - // Acknowledge stream receipt - sendAck(indexString, "OK", GuacamoleStatus.SUCCESS); - - // Wait for stream to close - synchronized (stream) { - while (streams.get(indexString) == stream) { - try { - stream.wait(STREAM_WAIT_TIMEOUT); - } - catch (InterruptedException e) { - // Ignore - } - } - } + outputStreamFilter.interceptStream(index, new BufferedOutputStream(stream)); // Log end of intercepted stream - logger.debug("Intercepted stream #{} of tunnel \"{}\" ended.", index, getUUID()); + logger.debug("Intercepted output stream #{} of tunnel \"{}\" ended.", + index, getUUID()); } @Override public GuacamoleReader acquireReader() { - return new FilteredGuacamoleReader(super.acquireReader(), STREAM_FILTER); + return new FilteredGuacamoleReader(super.acquireReader(), outputStreamFilter); } @Override @@ -352,16 +106,9 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel { super.close(); } - // Ensure all waiting threads are notified that all streams have ended + // Close all intercepted streams finally { - - // Close any active streams - for (OutputStream stream : streams.values()) - closeStream(stream); - - // Remove now-useless references - streams.clear(); - + outputStreamFilter.closeAllInterceptedStreams(); } } From 131785a442084a2631421daa313541e0a9760b2d Mon Sep 17 00:00:00 2001 From: Michael Jumper Date: Sat, 4 Jun 2016 02:18:34 -0700 Subject: [PATCH 2/8] GUACAMOLE-44: Implement intercepting of input streams. --- .../rest/tunnel/TunnelRESTService.java | 50 +++++ .../tunnel/InputStreamInterceptingFilter.java | 198 ++++++++++++++++++ .../tunnel/StreamInterceptingTunnel.java | 55 ++++- 3 files changed, 298 insertions(+), 5 deletions(-) create mode 100644 guacamole/src/main/java/org/apache/guacamole/tunnel/InputStreamInterceptingFilter.java diff --git a/guacamole/src/main/java/org/apache/guacamole/rest/tunnel/TunnelRESTService.java b/guacamole/src/main/java/org/apache/guacamole/rest/tunnel/TunnelRESTService.java index c89fdcd25..fab4431a2 100644 --- a/guacamole/src/main/java/org/apache/guacamole/rest/tunnel/TunnelRESTService.java +++ b/guacamole/src/main/java/org/apache/guacamole/rest/tunnel/TunnelRESTService.java @@ -21,12 +21,14 @@ package org.apache.guacamole.rest.tunnel; import com.google.inject.Inject; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.util.Map; import java.util.Set; import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; +import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.Produces; @@ -143,4 +145,52 @@ public class TunnelRESTService { } + /** + * Intercepts a specific stream, sending the contents of the given + * InputStream over that stream as "blob" instructions. + * + * @param authToken + * The authentication token that is used to authenticate the user + * performing the operation. + * + * @param tunnelUUID + * The UUID of the tunnel containing the stream being intercepted. + * + * @param streamIndex + * The index of the stream to intercept. + * + * @param filename + * The filename to use for the sake of identifying the data being sent. + * + * @param data + * An InputStream containing the data to be sent over the intercepted + * stream. + * + * @throws GuacamoleException + * If the session associated with the given auth token cannot be + * retrieved, or if no such tunnel exists. + */ + @POST + @Consumes(MediaType.WILDCARD) + @Path("/{tunnel}/streams/{index}/{filename}") + public void setStreamContents(@QueryParam("token") String authToken, + @PathParam("tunnel") String tunnelUUID, + @PathParam("index") final int streamIndex, + @PathParam("filename") String filename, + InputStream data) + throws GuacamoleException { + + GuacamoleSession session = authenticationService.getGuacamoleSession(authToken); + Map tunnels = session.getTunnels(); + + // Pull tunnel with given UUID + final StreamInterceptingTunnel tunnel = tunnels.get(tunnelUUID); + if (tunnel == null) + throw new GuacamoleResourceNotFoundException("No such tunnel."); + + // Send input over stream + tunnel.interceptStream(streamIndex, data); + + } + } diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/InputStreamInterceptingFilter.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/InputStreamInterceptingFilter.java new file mode 100644 index 000000000..6d494bd29 --- /dev/null +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/InputStreamInterceptingFilter.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.guacamole.tunnel; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import javax.xml.bind.DatatypeConverter; +import org.apache.guacamole.GuacamoleException; +import org.apache.guacamole.net.GuacamoleTunnel; +import org.apache.guacamole.protocol.GuacamoleInstruction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Filter which selectively intercepts "ack" instructions, automatically reading + * from or closing the stream given with interceptStream(). The required "blob" + * and "end" instructions denoting the content and boundary of the stream are + * sent automatically. + */ +public class InputStreamInterceptingFilter + extends StreamInterceptingFilter { + + /** + * Logger for this class. + */ + private static final Logger logger = + LoggerFactory.getLogger(InputStreamInterceptingFilter.class); + + /** + * Creates a new InputStreamInterceptingFilter which selectively intercepts + * "ack" instructions. The required "blob" and "end" instructions will + * automatically be sent over the given tunnel based on the content of + * provided InputStreams. + * + * @param tunnel + * The GuacamoleTunnel over which any required "blob" and "end" + * instructions should be sent. + */ + public InputStreamInterceptingFilter(GuacamoleTunnel tunnel) { + super(tunnel); + } + + /** + * Injects a "blob" instruction into the outbound Guacamole protocol + * stream, as if sent by the connected client. "blob" instructions are used + * to send chunks of data along a stream. + * + * @param index + * The index of the stream that this "blob" instruction relates to. + * + * @param blob + * The chunk of data to send within the "blob" instruction. + */ + private void sendBlob(String index, byte[] blob) { + + // Send "blob" containing provided data + sendInstruction(new GuacamoleInstruction("blob", index, + DatatypeConverter.printBase64Binary(blob))); + + } + + /** + * Injects an "end" instruction into the outbound Guacamole protocol + * stream, as if sent by the connected client. "end" instructions are used + * to signal the end of a stream. + * + * @param index + * The index of the stream that this "end" instruction relates to. + */ + private void sendEnd(String index) { + sendInstruction(new GuacamoleInstruction("end", index)); + } + + /** + * Reads the next chunk of data from the InputStream associated with an + * intercepted stream, sending that data as a "blob" instruction over the + * GuacamoleTunnel associated with this filter. If the end of the + * InputStream is reached, an "end" instruction will automatically be sent. + * + * @param stream + * The stream from which the next chunk of data should be read. + */ + private void readNextBlob(InterceptedStream stream) { + + // Read blob from stream if it exists + try { + + // Read raw data from input stream + byte[] blob = new byte[6048]; + int length = stream.getStream().read(blob); + + // End stream if no more data + if (length == -1) { + + // Close stream, send end if the stream is still valid + if (closeInterceptedStream(stream)) + sendEnd(stream.getIndex()); + + return; + + } + + // Inject corresponding "blob" instruction + sendBlob(stream.getIndex(), Arrays.copyOf(blob, length)); + + } + + // Terminate stream if it cannot be read + catch (IOException e) { + + logger.debug("Unable to read data of intercepted input stream.", e); + + // Close stream, send end if the stream is still valid + if (closeInterceptedStream(stream)) + sendEnd(stream.getIndex()); + + } + + } + + /** + * Handles a single "ack" instruction, sending yet more blobs or closing the + * stream depending on whether the "ack" indicates success or failure. If no + * InputStream is associated with the stream index within the "ack" + * instruction, the instruction is ignored. + * + * @param instruction + * The "ack" instruction being handled. + */ + private void handleAck(GuacamoleInstruction instruction) { + + // Verify all required arguments are present + List args = instruction.getArgs(); + if (args.size() < 3) + return; + + // Pull associated stream + String index = args.get(0); + InterceptedStream stream = getInterceptedStream(index); + if (stream == null) + return; + + // Pull status code + String status = args.get(2); + + // Terminate stream if an error is encountered + if (!status.equals("0")) { + closeInterceptedStream(stream); + return; + } + + // Send next blob + readNextBlob(stream); + + } + + @Override + public GuacamoleInstruction filter(GuacamoleInstruction instruction) + throws GuacamoleException { + + // Intercept "ack" instructions for in-progress streams + if (instruction.getOpcode().equals("ack")) + handleAck(instruction); + + // Pass instruction through untouched + return instruction; + + } + + @Override + protected void handleInterceptedStream(InterceptedStream stream) { + + // Read the first blob. Note that future blobs will be read in response + // to received "ack" instructions. + readNextBlob(stream); + + } + +} diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java index 333cdb1c5..91984cca8 100644 --- a/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java @@ -19,7 +19,9 @@ package org.apache.guacamole.tunnel; +import java.io.BufferedInputStream; import java.io.BufferedOutputStream; +import java.io.InputStream; import java.io.OutputStream; import org.apache.guacamole.GuacamoleException; import org.apache.guacamole.io.GuacamoleReader; @@ -30,10 +32,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * GuacamoleTunnel implementation which provides for intercepting the contents - * of in-progress streams, rerouting received blobs to a provided OutputStream. - * Interception of streams is requested on a per stream basis and lasts only - * for the duration of that stream. + * GuacamoleTunnel implementation which provides for producing or consuming the + * contents of in-progress streams, rerouting blobs to a provided OutputStream + * or from a provided InputStream. Interception of streams is requested on a per + * stream basis and lasts only for the duration of that stream. * * @author Michael Jumper */ @@ -58,6 +60,12 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel { super(tunnel); } + /** + * The filter to use for providing stream data from InputStreams. + */ + private final InputStreamInterceptingFilter inputStreamFilter = + new InputStreamInterceptingFilter(this); + /** * The filter to use for rerouting received stream data to OutputStreams. */ @@ -92,9 +100,45 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel { } + /** + * Intercept the given stream, continuously writing the contents of the + * given InputStream as blobs. The stream will automatically end when + * when the end of the InputStream is reached. If there is no such + * stream, then the InputStream will be closed immediately. This function + * will block until all data from the InputStream has been written to the + * given stream. + * + * @param index + * The index of the stream to intercept. + * + * @param stream + * The InputStream to read all blobs data from. + */ + public void interceptStream(int index, InputStream stream) { + + // Log beginning of intercepted stream + logger.debug("Intercepting input stream #{} of tunnel \"{}\".", + index, getUUID()); + + inputStreamFilter.interceptStream(index, new BufferedInputStream(stream)); + + // Log end of intercepted stream + logger.debug("Intercepted input stream #{} of tunnel \"{}\" ended.", + index, getUUID()); + + } + @Override public GuacamoleReader acquireReader() { - return new FilteredGuacamoleReader(super.acquireReader(), outputStreamFilter); + + GuacamoleReader reader = super.acquireReader(); + + // Filter both input and output streams + reader = new FilteredGuacamoleReader(reader, inputStreamFilter); + reader = new FilteredGuacamoleReader(reader, outputStreamFilter); + + return reader; + } @Override @@ -108,6 +152,7 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel { // Close all intercepted streams finally { + inputStreamFilter.closeAllInterceptedStreams(); outputStreamFilter.closeAllInterceptedStreams(); } From 75baa69ceadd42b767fe9b70d6fd21f64a647d88 Mon Sep 17 00:00:00 2001 From: Michael Jumper Date: Sun, 5 Jun 2016 15:41:52 -0700 Subject: [PATCH 3/8] GUACAMOLE-44: Add GuacamoleStreamException for reporting errors from intercepted streams. --- .../org/apache/guacamole/rest/APIError.java | 62 ++++++++++++++++--- .../apache/guacamole/rest/APIException.java | 38 ++++++++++++ .../guacamole/rest/RESTExceptionWrapper.java | 16 +++++ .../tunnel/GuacamoleStreamException.java | 61 ++++++++++++++++++ .../src/main/webapp/app/rest/types/Error.js | 19 +++++- 5 files changed, 188 insertions(+), 8 deletions(-) create mode 100644 guacamole/src/main/java/org/apache/guacamole/tunnel/GuacamoleStreamException.java diff --git a/guacamole/src/main/java/org/apache/guacamole/rest/APIError.java b/guacamole/src/main/java/org/apache/guacamole/rest/APIError.java index 9a390a872..c6a2c635c 100644 --- a/guacamole/src/main/java/org/apache/guacamole/rest/APIError.java +++ b/guacamole/src/main/java/org/apache/guacamole/rest/APIError.java @@ -36,6 +36,11 @@ public class APIError { */ private final String message; + /** + * The associated Guacamole protocol status code. + */ + private final Integer statusCode; + /** * All expected request parameters, if any, as a collection of fields. */ @@ -81,7 +86,14 @@ public class APIError { /** * Permission was denied to perform the requested operation. */ - PERMISSION_DENIED(Response.Status.FORBIDDEN); + PERMISSION_DENIED(Response.Status.FORBIDDEN), + + /** + * An error occurred within an intercepted stream, terminating that + * stream. The Guacamole protocol status code of that error can be + * retrieved with getStatusCode(). + */ + STREAM_ERROR(Response.Status.BAD_REQUEST); /** * The HTTP status associated with this error type. @@ -110,6 +122,27 @@ public class APIError { } + /** + * Creates a new APIError of type STREAM_ERROR and having the given + * Guacamole protocol status code and human-readable message. The status + * code and message should be taken directly from the "ack" instruction + * causing the error. + * + * @param statusCode + * The Guacamole protocol status code describing the error that + * occurred within the intercepted stream. + * + * @param message + * An arbitrary human-readable message describing the error that + * occurred. + */ + public APIError(int statusCode, String message) { + this.type = Type.STREAM_ERROR; + this.message = message; + this.statusCode = statusCode; + this.expected = null; + } + /** * Create a new APIError with the specified error message. * @@ -120,9 +153,10 @@ public class APIError { * The error message. */ public APIError(Type type, String message) { - this.type = type; - this.message = message; - this.expected = null; + this.type = type; + this.message = message; + this.statusCode = null; + this.expected = null; } /** @@ -140,9 +174,10 @@ public class APIError { * a result of the original request, as a collection of fields. */ public APIError(Type type, String message, Collection expected) { - this.type = type; - this.message = message; - this.expected = expected; + this.type = type; + this.message = message; + this.statusCode = null; + this.expected = expected; } /** @@ -155,6 +190,19 @@ public class APIError { return type; } + /** + * Returns the Guacamole protocol status code associated with the error + * that occurred. This is only valid for errors of type STREAM_ERROR. + * + * @return + * The Guacamole protocol status code associated with the error that + * occurred. If the error is not of type STREAM_ERROR, this will be + * null. + */ + public Integer getStatusCode() { + return statusCode; + } + /** * Returns a collection of all required parameters, where each parameter is * represented by a field. diff --git a/guacamole/src/main/java/org/apache/guacamole/rest/APIException.java b/guacamole/src/main/java/org/apache/guacamole/rest/APIException.java index 752dd0f84..0a004bef6 100644 --- a/guacamole/src/main/java/org/apache/guacamole/rest/APIException.java +++ b/guacamole/src/main/java/org/apache/guacamole/rest/APIException.java @@ -23,6 +23,7 @@ import java.util.Collection; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response; import org.apache.guacamole.form.Field; +import org.apache.guacamole.protocol.GuacamoleStatus; /** * An exception that will result in the given error error information being @@ -60,6 +61,43 @@ public class APIException extends WebApplicationException { this(new APIError(type, message)); } + /** + * Creates a new APIException which represents an error that occurred within + * an intercepted Guacamole stream. The nature of that error will be + * described by a given status code, which should be the status code + * provided by the "ack" instruction that reported the error. + * + * @param status + * The Guacamole protocol status code describing the error that + * occurred within the intercepted stream. + * + * @param message + * An arbitrary human-readable message describing the error that + * occurred. + */ + public APIException(int status, String message) { + this(new APIError(status, message)); + } + + /** + * Creates a new APIException which represents an error that occurred within + * an intercepted Guacamole stream. The nature of that error will be + * described by a given Guacamole protocol status, which should be the + * status associated with the code provided by the "ack" instruction that + * reported the error. + * + * @param status + * The Guacamole protocol status describing the error that occurred + * within the intercepted stream. + * + * @param message + * An arbitrary human-readable message describing the error that + * occurred. + */ + public APIException(GuacamoleStatus status, String message) { + this(status.getGuacamoleStatusCode(), message); + } + /** * Creates a new APIException with the given type, message, and parameter * information. The corresponding APIError will be created from the diff --git a/guacamole/src/main/java/org/apache/guacamole/rest/RESTExceptionWrapper.java b/guacamole/src/main/java/org/apache/guacamole/rest/RESTExceptionWrapper.java index 5adb13184..736907d4c 100644 --- a/guacamole/src/main/java/org/apache/guacamole/rest/RESTExceptionWrapper.java +++ b/guacamole/src/main/java/org/apache/guacamole/rest/RESTExceptionWrapper.java @@ -34,6 +34,7 @@ import org.apache.guacamole.GuacamoleUnauthorizedException; import org.apache.guacamole.net.auth.credentials.GuacamoleInsufficientCredentialsException; import org.apache.guacamole.net.auth.credentials.GuacamoleInvalidCredentialsException; import org.apache.guacamole.rest.auth.AuthenticationService; +import org.apache.guacamole.tunnel.GuacamoleStreamException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -248,6 +249,21 @@ public class RESTExceptionWrapper implements MethodInterceptor { } + // Errors from intercepted streams + catch (GuacamoleStreamException e) { + + // Generate default message + String message = e.getMessage(); + if (message == null) + message = "Error reported by stream."; + + throw new APIException( + e.getStatus(), + message + ); + + } + // All other errors catch (GuacamoleException e) { diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/GuacamoleStreamException.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/GuacamoleStreamException.java new file mode 100644 index 000000000..7bb4ef23e --- /dev/null +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/GuacamoleStreamException.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.guacamole.tunnel; + +import org.apache.guacamole.GuacamoleServerException; +import org.apache.guacamole.protocol.GuacamoleStatus; + +/** + * A generic exception thrown when an intercepted Guacamole stream has closed + * with an error condition. Guacamole streams report errors using the "ack" + * instruction, which provides a status code and human-readable message. + * + * @author Michael Jumper + */ +public class GuacamoleStreamException extends GuacamoleServerException { + + /** + * The error condition reported by the intercepted Guacamole stream. + */ + private final GuacamoleStatus status; + + /** + * Creates a new GuacamoleStreamException representing an error returned by + * an intercepted stream. + * + * @param status + * The status code of the error condition reported by the intercepted + * Guacamole stream. + * + * @param message + * The human readable description of the error that occurred, as + * as provided by the stream. + */ + public GuacamoleStreamException(GuacamoleStatus status, String message) { + super(message); + this.status = status; + } + + @Override + public GuacamoleStatus getStatus() { + return status; + } + +} diff --git a/guacamole/src/main/webapp/app/rest/types/Error.js b/guacamole/src/main/webapp/app/rest/types/Error.js index 85f2cf59e..43fdc9602 100644 --- a/guacamole/src/main/webapp/app/rest/types/Error.js +++ b/guacamole/src/main/webapp/app/rest/types/Error.js @@ -42,6 +42,14 @@ angular.module('rest').factory('Error', [function defineError() { */ this.message = template.message; + /** + * The Guacamole protocol status code associated with the error that + * occurred. This is only valid for errors of type STREAM_ERROR. + * + * @type Number + */ + this.statusCode = template.statusCode; + /** * The type string defining which values this parameter may contain, * as well as what properties are applicable. Valid types are listed @@ -110,7 +118,16 @@ angular.module('rest').factory('Error', [function defineError() { * * @type String */ - PERMISSION_DENIED : 'PERMISSION_DENIED' + PERMISSION_DENIED : 'PERMISSION_DENIED', + + /** + * An error occurred within an intercepted stream, terminating that + * stream. The Guacamole protocol status code of that error will be + * stored within statusCode. + * + * @type String + */ + STREAM_ERROR : 'STREAM_ERROR' }; From 2bb5260144697316e5c93cfca9812511ee0b3354 Mon Sep 17 00:00:00 2001 From: Michael Jumper Date: Sun, 5 Jun 2016 16:01:08 -0700 Subject: [PATCH 4/8] GUACAMOLE-44: Provide for direct translation of status codes into GuacamoleStatus values. --- .../guacamole/protocol/GuacamoleStatus.java | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/guacamole-common/src/main/java/org/apache/guacamole/protocol/GuacamoleStatus.java b/guacamole-common/src/main/java/org/apache/guacamole/protocol/GuacamoleStatus.java index 4dbae64bc..a5e84c689 100644 --- a/guacamole-common/src/main/java/org/apache/guacamole/protocol/GuacamoleStatus.java +++ b/guacamole-common/src/main/java/org/apache/guacamole/protocol/GuacamoleStatus.java @@ -166,5 +166,31 @@ public enum GuacamoleStatus { public int getGuacamoleStatusCode() { return guac_code; } - + + /** + * Returns the GuacamoleStatus corresponding to the given Guacamole + * protocol status code. If no such GuacamoleStatus is defined, null is + * returned. + * + * @param code + * The Guacamole protocol status code to translate into a + * GuacamoleStatus. + * + * @return + * The GuacamoleStatus corresponding to the given Guacamole protocol + * status code, or null if no such GuacamoleStatus is defined. + */ + public static GuacamoleStatus fromGuacamoleStatusCode(int code) { + + // Search for a GuacamoleStatus having the given status code + for (GuacamoleStatus status : values()) { + if (status.getGuacamoleStatusCode() == code) + return status; + } + + // No such status found + return null; + + } + } From e79d019fe6253f9bacc16285dc728c6d2c44df40 Mon Sep 17 00:00:00 2001 From: Michael Jumper Date: Sun, 5 Jun 2016 16:00:03 -0700 Subject: [PATCH 5/8] GUACAMOLE-44: Allow intercepted streams to report errors. --- .../rest/tunnel/TunnelRESTService.java | 12 ++- .../tunnel/InputStreamInterceptingFilter.java | 17 +++++ .../guacamole/tunnel/InterceptedStream.java | 75 +++++++++++++++++++ .../tunnel/StreamInterceptingFilter.java | 10 ++- .../tunnel/StreamInterceptingTunnel.java | 34 +++++++-- 5 files changed, 136 insertions(+), 12 deletions(-) diff --git a/guacamole/src/main/java/org/apache/guacamole/rest/tunnel/TunnelRESTService.java b/guacamole/src/main/java/org/apache/guacamole/rest/tunnel/TunnelRESTService.java index fab4431a2..5fe209ee3 100644 --- a/guacamole/src/main/java/org/apache/guacamole/rest/tunnel/TunnelRESTService.java +++ b/guacamole/src/main/java/org/apache/guacamole/rest/tunnel/TunnelRESTService.java @@ -136,7 +136,12 @@ public class TunnelRESTService { @Override public void write(OutputStream output) throws IOException { - tunnel.interceptStream(streamIndex, output); + try { + tunnel.interceptStream(streamIndex, output); + } + catch (GuacamoleException e) { + throw new IOException(e); + } } }; @@ -167,8 +172,9 @@ public class TunnelRESTService { * stream. * * @throws GuacamoleException - * If the session associated with the given auth token cannot be - * retrieved, or if no such tunnel exists. + * If the session associated with the given auth + * token cannot be retrieved, if no such tunnel exists, or if the + * intercepted stream itself closes with an error. */ @POST @Consumes(MediaType.WILDCARD) diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/InputStreamInterceptingFilter.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/InputStreamInterceptingFilter.java index 6d494bd29..98c15c98a 100644 --- a/guacamole/src/main/java/org/apache/guacamole/tunnel/InputStreamInterceptingFilter.java +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/InputStreamInterceptingFilter.java @@ -27,6 +27,7 @@ import javax.xml.bind.DatatypeConverter; import org.apache.guacamole.GuacamoleException; import org.apache.guacamole.net.GuacamoleTunnel; import org.apache.guacamole.protocol.GuacamoleInstruction; +import org.apache.guacamole.protocol.GuacamoleStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -164,8 +165,24 @@ public class InputStreamInterceptingFilter // Terminate stream if an error is encountered if (!status.equals("0")) { + + // Parse status code as integer + int code; + try { + code = Integer.parseInt(status); + } + + // Assume internal error if parsing fails + catch (NumberFormatException e) { + logger.debug("Translating invalid status code \"{}\" to SERVER_ERROR.", status); + code = GuacamoleStatus.SERVER_ERROR.getGuacamoleStatusCode(); + } + + // Flag error and close stream + stream.setStreamError(code, args.get(1)); closeInterceptedStream(stream); return; + } // Send next blob diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStream.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStream.java index b4f4d0679..021411f0e 100644 --- a/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStream.java +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStream.java @@ -20,6 +20,8 @@ package org.apache.guacamole.tunnel; import java.io.Closeable; +import org.apache.guacamole.GuacamoleException; +import org.apache.guacamole.protocol.GuacamoleStatus; /** * A simple pairing of the index of an intercepted Guacamole stream with the @@ -45,6 +47,13 @@ public class InterceptedStream { */ private final T stream; + /** + * The exception which prevented the stream from completing successfully, + * if any. If the stream completed successfully, or has not encountered any + * exception yet, this will be null. + */ + private GuacamoleException streamError = null; + /** * Creates a new InterceptedStream which associated the given Guacamole * stream index with the given stream object. @@ -83,4 +92,70 @@ public class InterceptedStream { return stream; } + /** + * Reports that this InterceptedStream did not complete successfully due to + * the given GuacamoleException, which could not be thrown at the time due + * to asynchronous handling of the stream contents. + * + * @param streamError + * The exception which prevented the stream from completing + * successfully. + */ + public void setStreamError(GuacamoleException streamError) { + this.streamError = streamError; + } + + /** + * Reports that this InterceptedStream did not complete successfully due to + * an error described by the given status code and human-readable message. + * The error reported by this call can later be retrieved as a + * GuacamoleStreamException by calling getStreamError(). + * + * @param code + * The Guacamole protocol status code which described the error that + * occurred. This should be taken directly from the "ack" instruction + * that reported the error witin the intercepted stream. + * + * @param message + * A human-readable message describing the error that occurred. This + * should be taken directly from the "ack" instruction that reported + * the error witin the intercepted stream. + */ + public void setStreamError(int code, String message) { + + // Map status code to GuacamoleStatus, assuming SERVER_ERROR by default + GuacamoleStatus status = GuacamoleStatus.fromGuacamoleStatusCode(code); + if (status == null) + status = GuacamoleStatus.SERVER_ERROR; + + // Associate stream with corresponding GuacamoleStreamException + setStreamError(new GuacamoleStreamException(status, message)); + + } + + /** + * Returns whether an error has prevented this InterceptedStream from + * completing successfully. This will return false if the stream has + * completed successfully OR if the stream simply has not yet completed. + * + * @return + * true if an error has prevented this InterceptedStream from + * completing successfully, false otherwise. + */ + public boolean hasStreamError() { + return streamError != null; + } + + /** + * Returns a GuacamoleException which describes why this InterceptedStream + * did not complete successfully. + * + * @return + * An exception describing the error that prevented the stream from + * completing successfully, or null if no such error has occurred. + */ + public GuacamoleException getStreamError() { + return streamError; + } + } diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingFilter.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingFilter.java index e04de7d7e..30c24c15a 100644 --- a/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingFilter.java +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingFilter.java @@ -177,8 +177,12 @@ public abstract class StreamInterceptingFilter * @param stream * The stream object which will produce or consume all data for the * stream having the given index. + * + * @throws GuacamoleException + * If an error occurs while intercepting the stream, or if the stream + * itself reports an error. */ - public void interceptStream(int index, T stream) { + public void interceptStream(int index, T stream) throws GuacamoleException { InterceptedStream interceptedStream; String indexString = Integer.toString(index); @@ -204,6 +208,10 @@ public abstract class StreamInterceptingFilter // Wait for stream to close streams.waitFor(interceptedStream); + // Throw any asynchronously-provided exception + if (interceptedStream.hasStreamError()) + throw interceptedStream.getStreamError(); + } } diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java index 91984cca8..dc14d4833 100644 --- a/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java @@ -85,18 +85,27 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel { * * @param stream * The OutputStream to write all intercepted data to. + * + * @throws GuacamoleException + * If an error occurs while intercepting the stream, or if the stream + * itself reports an error. */ - public void interceptStream(int index, OutputStream stream) { + public void interceptStream(int index, OutputStream stream) + throws GuacamoleException { // Log beginning of intercepted stream logger.debug("Intercepting output stream #{} of tunnel \"{}\".", index, getUUID()); - outputStreamFilter.interceptStream(index, new BufferedOutputStream(stream)); + try { + outputStreamFilter.interceptStream(index, new BufferedOutputStream(stream)); + } // Log end of intercepted stream - logger.debug("Intercepted output stream #{} of tunnel \"{}\" ended.", - index, getUUID()); + finally { + logger.debug("Intercepted output stream #{} of tunnel \"{}\" ended.", + index, getUUID()); + } } @@ -113,18 +122,27 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel { * * @param stream * The InputStream to read all blobs data from. + * + * @throws GuacamoleException + * If an error occurs while intercepting the stream, or if the stream + * itself reports an error. */ - public void interceptStream(int index, InputStream stream) { + public void interceptStream(int index, InputStream stream) + throws GuacamoleException { // Log beginning of intercepted stream logger.debug("Intercepting input stream #{} of tunnel \"{}\".", index, getUUID()); - inputStreamFilter.interceptStream(index, new BufferedInputStream(stream)); + try { + inputStreamFilter.interceptStream(index, new BufferedInputStream(stream)); + } // Log end of intercepted stream - logger.debug("Intercepted input stream #{} of tunnel \"{}\" ended.", - index, getUUID()); + finally { + logger.debug("Intercepted input stream #{} of tunnel \"{}\" ended.", + index, getUUID()); + } } From ef5329dbe1ebdeecfbb81b820d4fef79b201401c Mon Sep 17 00:00:00 2001 From: Michael Jumper Date: Sun, 5 Jun 2016 16:12:37 -0700 Subject: [PATCH 6/8] GUACAMOLE-44: Implement JavaScript service for uploading files to a stream via the REST tunnel endpoint. --- .../webapp/app/rest/services/tunnelService.js | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/guacamole/src/main/webapp/app/rest/services/tunnelService.js b/guacamole/src/main/webapp/app/rest/services/tunnelService.js index c97d36154..35c42a896 100644 --- a/guacamole/src/main/webapp/app/rest/services/tunnelService.js +++ b/guacamole/src/main/webapp/app/rest/services/tunnelService.js @@ -24,8 +24,12 @@ angular.module('rest').factory('tunnelService', ['$injector', function tunnelService($injector) { + // Required types + var Error = $injector.get('Error'); + // Required services var $http = $injector.get('$http'); + var $q = $injector.get('$q'); var $window = $injector.get('$window'); var authenticationService = $injector.get('authenticationService'); @@ -122,6 +126,98 @@ angular.module('rest').factory('tunnelService', ['$injector', }; + /** + * Makes a request to the REST API to send the contents of the given file + * along a stream which has been created within the active Guacamole + * connection associated with the given tunnel. The contents of the file + * will automatically be split into individual "blob" instructions, as if + * sent by the connected Guacamole client. + * + * @param {String} tunnel + * The UUID of the tunnel associated with the Guacamole connection + * whose stream should receive the given file. + * + * @param {Guacamole.OutputStream} stream + * The stream that should receive the given file. + * + * @param {File} file + * The file that should be sent along the given stream. + * + * @param {Function} [progressCallback] + * An optional callback which, if provided, will be invoked as the + * file upload progresses. The current position within the file, in + * bytes, will be provided to the callback as the sole argument. + * + * @return {Promise} + * A promise which resolves when the upload has completed, and is + * rejected with an Error if the upload fails. The Guacamole protocol + * status code describing the failure will be included in the Error if + * available. If the status code is available, the type of the Error + * will be STREAM_ERROR. + */ + service.uploadToStream = function uploadToStream(tunnel, stream, file, + progressCallback) { + + var deferred = $q.defer(); + + // Build upload URL + var url = $window.location.origin + + $window.location.pathname + + 'api/tunnels/' + encodeURIComponent(tunnel) + + '/streams/' + encodeURIComponent(stream.index) + + '/' + encodeURIComponent(file.name) + + '?token=' + encodeURIComponent(authenticationService.getCurrentToken()); + + var xhr = new XMLHttpRequest(); + + // Invoke provided callback if upload tracking is supported + if (progressCallback && xhr.upload) { + xhr.upload.addEventListener('progress', function updateProgress(e) { + progressCallback(e.loaded); + }); + } + + // Resolve/reject promise once upload has stopped + xhr.onreadystatechange = function uploadStatusChanged() { + + // Ignore state changes prior to completion + if (xhr.readyState !== 4) + return; + + // Resolve if HTTP status code indicates success + if (xhr.status >= 200 && xhr.status < 300) + deferred.resolve(); + + // Parse and reject with resulting JSON error + else if (xhr.getResponseHeader('Content-Type') === 'application/json') + deferred.reject(angular.fromJson(xhr.responseText)); + + // Warn of lack of permission of a proxy rejects the upload + else if (xhr.status >= 400 && xhr.status < 500) + deferred.reject(new Error({ + 'type' : Error.Type.STREAM_ERROR, + 'statusCode' : Guacamole.Status.Code.CLIENT_FORBIDDEN, + 'message' : 'HTTP ' + xhr.status + })); + + // Assume internal error for all other cases + else + deferred.reject(new Error({ + 'type' : Error.Type.STREAM_ERROR, + 'statusCode' : Guacamole.Status.Code.INTERNAL_ERROR, + 'message' : 'HTTP ' + xhr.status + })); + + }; + + // Perform upload + xhr.open('POST', url, true); + xhr.send(file); + + return deferred.promise; + + }; + return service; }]); From b6983586ed798bb09d8d21156b0c5dccc23156cb Mon Sep 17 00:00:00 2001 From: Michael Jumper Date: Mon, 6 Jun 2016 10:14:04 -0700 Subject: [PATCH 7/8] GUACAMOLE-44: Fix wrapping in setStreamContents() documentation. --- .../org/apache/guacamole/rest/tunnel/TunnelRESTService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/guacamole/src/main/java/org/apache/guacamole/rest/tunnel/TunnelRESTService.java b/guacamole/src/main/java/org/apache/guacamole/rest/tunnel/TunnelRESTService.java index 5fe209ee3..49740d1b2 100644 --- a/guacamole/src/main/java/org/apache/guacamole/rest/tunnel/TunnelRESTService.java +++ b/guacamole/src/main/java/org/apache/guacamole/rest/tunnel/TunnelRESTService.java @@ -172,9 +172,9 @@ public class TunnelRESTService { * stream. * * @throws GuacamoleException - * If the session associated with the given auth - * token cannot be retrieved, if no such tunnel exists, or if the - * intercepted stream itself closes with an error. + * If the session associated with the given auth token cannot be + * retrieved, if no such tunnel exists, or if the intercepted stream + * itself closes with an error. */ @POST @Consumes(MediaType.WILDCARD) From 2e8263a589746860a8b4967fdc70c5e5e9694e33 Mon Sep 17 00:00:00 2001 From: Michael Jumper Date: Mon, 6 Jun 2016 10:17:19 -0700 Subject: [PATCH 8/8] GUACAMOLE-44: Remove remove repeated repeated word word from from comment comment. --- .../org/apache/guacamole/tunnel/GuacamoleStreamException.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/GuacamoleStreamException.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/GuacamoleStreamException.java index 7bb4ef23e..c2b19c8b8 100644 --- a/guacamole/src/main/java/org/apache/guacamole/tunnel/GuacamoleStreamException.java +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/GuacamoleStreamException.java @@ -46,7 +46,7 @@ public class GuacamoleStreamException extends GuacamoleServerException { * * @param message * The human readable description of the error that occurred, as - * as provided by the stream. + * provided by the stream. */ public GuacamoleStreamException(GuacamoleStatus status, String message) { super(message);