From f391f00c7ba9d146c5bb1717f80a0bb291fd5bac Mon Sep 17 00:00:00 2001 From: Michael Jumper Date: Sat, 4 Jun 2016 01:58:01 -0700 Subject: [PATCH] GUACAMOLE-44: Extract logic of StreamInterceptingTunnel. --- .../guacamole/tunnel/InterceptedStream.java | 86 ++++++ .../tunnel/InterceptedStreamMap.java | 211 +++++++++++++ .../OutputStreamInterceptingFilter.java | 194 ++++++++++++ .../tunnel/StreamInterceptingFilter.java | 209 +++++++++++++ .../tunnel/StreamInterceptingTunnel.java | 277 +----------------- 5 files changed, 712 insertions(+), 265 deletions(-) create mode 100644 guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStream.java create mode 100644 guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStreamMap.java create mode 100644 guacamole/src/main/java/org/apache/guacamole/tunnel/OutputStreamInterceptingFilter.java create mode 100644 guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingFilter.java diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStream.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStream.java new file mode 100644 index 000000000..b4f4d0679 --- /dev/null +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStream.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.guacamole.tunnel; + +import java.io.Closeable; + +/** + * A simple pairing of the index of an intercepted Guacamole stream with the + * stream-type object which will produce or consume the data sent over the + * intercepted Guacamole stream. + * + * @author Michael Jumper + * @param + * The type of object which will produce or consume the data sent over the + * intercepted Guacamole stream. Usually, this will be either InputStream + * or OutputStream. + */ +public class InterceptedStream { + + /** + * The index of the Guacamole stream being intercepted. + */ + private final String index; + + /** + * The stream which will produce or consume the data sent over the + * intercepted Guacamole stream. + */ + private final T stream; + + /** + * Creates a new InterceptedStream which associated the given Guacamole + * stream index with the given stream object. + * + * @param index + * The index of the Guacamole stream being intercepted. + * + * @param stream + * The stream which will produce or consume the data sent over the + * intercepted Guacamole stream. + */ + public InterceptedStream(String index, T stream) { + this.index = index; + this.stream = stream; + } + + /** + * Returns the index of the Guacamole stream being intercepted. + * + * @return + * The index of the Guacamole stream being intercepted. + */ + public String getIndex() { + return index; + } + + /** + * Returns the stream which will produce or consume the data sent over the + * intercepted Guacamole stream. + * + * @return + * The stream which will produce or consume the data sent over the + * intercepted Guacamole stream. + */ + public T getStream() { + return stream; + } + +} diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStreamMap.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStreamMap.java new file mode 100644 index 000000000..344bc36a1 --- /dev/null +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/InterceptedStreamMap.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.guacamole.tunnel; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Map-like storage for intercepted Guacamole streams. + * + * @author Michael Jumper + * @param + * The type of object which will produce or consume the data sent over the + * intercepted Guacamole stream. Usually, this will be either InputStream + * or OutputStream. + */ +public class InterceptedStreamMap { + + /** + * Logger for this class. + */ + private static final Logger logger = LoggerFactory.getLogger(InterceptedStreamMap.class); + + /** + * The maximum number of milliseconds to wait for notification that a + * stream has closed before explicitly checking for closure ourselves. + */ + private static final long STREAM_WAIT_TIMEOUT = 1000; + + /** + * Mapping of the indexes of all streams whose associated "blob" and "end" + * instructions should be intercepted. + */ + private final ConcurrentMap> streams = + new ConcurrentHashMap>(); + + /** + * Closes the given stream, logging any errors that occur during closure. + * The monitor of the stream is notified via a single call to notify() once + * the attempt to close has been made. + * + * @param stream + * The stream to close and notify. + */ + private void close(T stream) { + + // Attempt to close stream + try { + stream.close(); + } + catch (IOException e) { + logger.warn("Unable to close intercepted stream: {}", e.getMessage()); + logger.debug("I/O error prevented closure of intercepted stream.", e); + } + + // Notify waiting threads that the stream has ended + synchronized (stream) { + stream.notify(); + } + + } + + /** + * Closes the stream object associated with the stream having the given + * index, if any, removing it from the map, logging any errors that occur + * during closure, and unblocking any in-progress calls to waitFor() for + * that stream. If no such stream exists within this map, then this + * function has no effect. + * + * @param index + * The index of the stream whose associated stream object should be + * closed. + * + * @return + * The stream associated with the given index, if the stream was stored + * within this map, or null if no such stream exists. + */ + public InterceptedStream close(String index) { + + // Remove associated stream + InterceptedStream stream = streams.remove(index); + if (stream == null) + return null; + + // Close stream if it exists + close(stream.getStream()); + return stream; + + } + + /** + * Closes the given stream, logging any errors that occur during closure, + * and unblocking any in-progress calls to waitFor() for the given stream. + * If the given stream is stored within this map, it will also be removed. + * + * @param stream + * The stream to close. + * + * @return + * true if the given stream was stored within this map, false + * otherwise. + */ + public boolean close(InterceptedStream stream) { + + // Remove stream if present + boolean wasRemoved = streams.remove(stream.getIndex(), stream); + + // Close provided stream + close(stream.getStream()); + + return wasRemoved; + + } + + /** + * Removes and closes all streams stored within this map, logging any errors + * that occur during closure, and unblocking any in-progress calls to + * waitFor(). + */ + public void closeAll() { + + // Close any active streams + for (InterceptedStream stream : streams.values()) + close(stream.getStream()); + + // Remove now-useless references + streams.clear(); + + } + + /** + * Blocks until the given stream is closed, or until another stream with + * the same index replaces it. + * + * @param stream + * The stream to wait for. + */ + public void waitFor(InterceptedStream stream) { + + T underlyingStream = stream.getStream(); + + // Wait for stream to close + synchronized (underlyingStream) { + while (streams.get(stream.getIndex()) == stream) { + try { + underlyingStream.wait(STREAM_WAIT_TIMEOUT); + } + catch (InterruptedException e) { + // Ignore + } + } + } + + } + + /** + * Returns the stream stored in this map under the given index. + * + * @param index + * The index of the stream to return. + * + * @return + * The stream having the given index, or null if no such stream is + * stored within this map. + */ + public InterceptedStream get(String index) { + return streams.get(index); + } + + /** + * Adds the given stream to this map, storing it under its associated + * index. If another stream already exists within this map having the same + * index, that stream will be closed and replaced. + * + * @param stream + * The stream to store within this map. + */ + public void put(InterceptedStream stream) { + + // Add given stream to map + InterceptedStream oldStream = + streams.put(stream.getIndex(), stream); + + // If a previous stream DID exist, close it + if (oldStream != null) + close(oldStream.getStream()); + + } + +} diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/OutputStreamInterceptingFilter.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/OutputStreamInterceptingFilter.java new file mode 100644 index 000000000..7cdadba9b --- /dev/null +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/OutputStreamInterceptingFilter.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.guacamole.tunnel; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; +import javax.xml.bind.DatatypeConverter; +import org.apache.guacamole.GuacamoleException; +import org.apache.guacamole.net.GuacamoleTunnel; +import org.apache.guacamole.protocol.GuacamoleInstruction; +import org.apache.guacamole.protocol.GuacamoleStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Filter which selectively intercepts "blob" and "end" instructions, + * automatically writing to or closing the stream given with + * interceptStream(). The required "ack" responses to received blobs are + * sent automatically. + * + * @author Michael Jumper + */ +public class OutputStreamInterceptingFilter + extends StreamInterceptingFilter { + + /** + * Logger for this class. + */ + private static final Logger logger = + LoggerFactory.getLogger(OutputStreamInterceptingFilter.class); + + /** + * Creates a new OutputStreamInterceptingFilter which selectively intercepts + * "blob" and "end" instructions. The required "ack" responses will + * automatically be sent over the given tunnel. + * + * @param tunnel + * The GuacamoleTunnel over which any required "ack" instructions + * should be sent. + */ + public OutputStreamInterceptingFilter(GuacamoleTunnel tunnel) { + super(tunnel); + } + + /** + * Injects an "ack" instruction into the outbound Guacamole protocol + * stream, as if sent by the connected client. "ack" instructions are used + * to acknowledge the receipt of a stream and its subsequent blobs, and are + * the only means of communicating success/failure status. + * + * @param index + * The index of the stream that this "ack" instruction relates to. + * + * @param message + * An arbitrary human-readable message to include within the "ack" + * instruction. + * + * @param status + * The status of the stream operation being acknowledged via the "ack" + * instruction. Error statuses will implicitly close the stream via + * closeStream(). + */ + private void sendAck(String index, String message, GuacamoleStatus status) { + + // Error "ack" instructions implicitly close the stream + if (status != GuacamoleStatus.SUCCESS) + closeInterceptedStream(index); + + sendInstruction(new GuacamoleInstruction("ack", index, message, + Integer.toString(status.getGuacamoleStatusCode()))); + + } + + /** + * Handles a single "blob" instruction, decoding its base64 data, + * sending that data to the associated OutputStream, and ultimately + * dropping the "blob" instruction such that the client never receives + * it. If no OutputStream is associated with the stream index within + * the "blob" instruction, the instruction is passed through untouched. + * + * @param instruction + * The "blob" instruction being handled. + * + * @return + * The originally-provided "blob" instruction, if that instruction + * should be passed through to the client, or null if the "blob" + * instruction should be dropped. + */ + private GuacamoleInstruction handleBlob(GuacamoleInstruction instruction) { + + // Verify all required arguments are present + List args = instruction.getArgs(); + if (args.size() < 2) + return instruction; + + // Pull associated stream + String index = args.get(0); + InterceptedStream stream = getInterceptedStream(index); + if (stream == null) + return instruction; + + // Decode blob + byte[] blob; + try { + String data = args.get(1); + blob = DatatypeConverter.parseBase64Binary(data); + } + catch (IllegalArgumentException e) { + logger.warn("Received base64 data for intercepted stream was invalid."); + logger.debug("Decoding base64 data for intercepted stream failed.", e); + return null; + } + + // Attempt to write data to stream + try { + stream.getStream().write(blob); + sendAck(index, "OK", GuacamoleStatus.SUCCESS); + } + catch (IOException e) { + sendAck(index, "FAIL", GuacamoleStatus.SERVER_ERROR); + logger.debug("Write failed for intercepted stream.", e); + } + + // Instruction was handled purely internally + return null; + + } + + /** + * Handles a single "end" instruction, closing the associated + * OutputStream. If no OutputStream is associated with the stream index + * within the "end" instruction, this function has no effect. + * + * @param instruction + * The "end" instruction being handled. + */ + private void handleEnd(GuacamoleInstruction instruction) { + + // Verify all required arguments are present + List args = instruction.getArgs(); + if (args.size() < 1) + return; + + // Terminate stream + closeInterceptedStream(args.get(0)); + + } + + @Override + public GuacamoleInstruction filter(GuacamoleInstruction instruction) + throws GuacamoleException { + + // Intercept "blob" instructions for in-progress streams + if (instruction.getOpcode().equals("blob")) + return handleBlob(instruction); + + // Intercept "end" instructions for in-progress streams + if (instruction.getOpcode().equals("end")) { + handleEnd(instruction); + return instruction; + } + + // Pass instruction through untouched + return instruction; + + } + + @Override + protected void handleInterceptedStream(InterceptedStream stream) { + + // Acknowledge that the stream is ready to receive data + sendAck(stream.getIndex(), "OK", GuacamoleStatus.SUCCESS); + + } + +} diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingFilter.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingFilter.java new file mode 100644 index 000000000..e04de7d7e --- /dev/null +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingFilter.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.guacamole.tunnel; + +import java.io.Closeable; +import org.apache.guacamole.GuacamoleException; +import org.apache.guacamole.io.GuacamoleWriter; +import org.apache.guacamole.net.GuacamoleTunnel; +import org.apache.guacamole.protocol.GuacamoleFilter; +import org.apache.guacamole.protocol.GuacamoleInstruction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Filter which selectively intercepts stream-related instructions, + * automatically writing to, reading from, or closing the stream given with + * interceptStream(). Any instructions required by the Guacamole protocol to be + * sent in response to intercepted instructions will be sent automatically. + * + * @param + * The type of object which will produce or consume the data sent over the + * intercepted Guacamole stream. Usually, this will be either InputStream + * or OutputStream. + */ +public abstract class StreamInterceptingFilter + implements GuacamoleFilter { + + /** + * Logger for this class. + */ + private static final Logger logger = + LoggerFactory.getLogger(StreamInterceptingFilter.class); + + /** + * Mapping of the all streams whose related instructions should be + * intercepted. + */ + private final InterceptedStreamMap streams = new InterceptedStreamMap(); + + /** + * The tunnel over which any required instructions should be sent. + */ + private final GuacamoleTunnel tunnel; + + /** + * Creates a new StreamInterceptingFilter which selectively intercepts + * stream-related instructions. Any instructions required by the Guacamole + * protocol to be sent in response to intercepted instructions will be sent + * automatically over the given tunnel. + * + * @param tunnel + * The GuacamoleTunnel over which any required instructions should be + * sent. + */ + public StreamInterceptingFilter(GuacamoleTunnel tunnel) { + this.tunnel = tunnel; + } + + /** + * Injects an arbitrary Guacamole instruction into the outbound Guacamole + * protocol stream (GuacamoleWriter) of the tunnel associated with this + * StreamInterceptingFilter, as if the instruction was sent by the connected + * client. + * + * @param instruction + * The Guacamole instruction to inject. + */ + protected void sendInstruction(GuacamoleInstruction instruction) { + + // Temporarily acquire writer to send "ack" instruction + GuacamoleWriter writer = tunnel.acquireWriter(); + + // Send successful "ack" + try { + writer.writeInstruction(instruction); + } + catch (GuacamoleException e) { + logger.debug("Unable to send \"{}\" for intercepted stream.", + instruction.getOpcode(), e); + } + + // Done writing + tunnel.releaseWriter(); + + } + + /** + * Returns the stream having the given index and currently being intercepted + * by this filter. + * + * @param index + * The index of the stream to return. + * + * @return + * The stream having the given index, or null if no such stream is + * being intercepted. + */ + protected InterceptedStream getInterceptedStream(String index) { + return streams.get(index); + } + + /** + * Closes the stream having the given index and currently being intercepted + * by this filter, if any. If no such stream is being intercepted, then this + * function has no effect. + * + * @param index + * The index of the stream to close. + * + * @return + * The stream associated with the given index, if the stream is being + * intercepted, or null if no such stream exists. + */ + protected InterceptedStream closeInterceptedStream(String index) { + return streams.close(index); + } + + /** + * Closes the given stream. + * + * @param stream + * The stream to close. + * + * @return + * true if the given stream was being intercepted, false otherwise. + */ + protected boolean closeInterceptedStream(InterceptedStream stream) { + return streams.close(stream); + } + + /** + * Closes all streams being intercepted by this filter. + */ + public void closeAllInterceptedStreams() { + streams.closeAll(); + } + + /** + * Begins handling the data of the given intercepted stream. This function + * will automatically be invoked by interceptStream() for any valid stream. + * It is not required that this function block until all data is handled; + * interceptStream() will do this automatically. Implementations are free + * to use asynchronous approaches to data handling. + * + * @param stream + * The stream being intercepted. + */ + protected abstract void handleInterceptedStream(InterceptedStream stream); + + /** + * Intercept the stream having the given index, producing or consuming its + * data as appropriate. The given stream object will automatically be closed + * when the stream ends. If there is no stream having the given index, then + * the stream object will be closed immediately. This function will block + * until all data has been handled and the stream is ended. + * + * @param index + * The index of the stream to intercept. + * + * @param stream + * The stream object which will produce or consume all data for the + * stream having the given index. + */ + public void interceptStream(int index, T stream) { + + InterceptedStream interceptedStream; + String indexString = Integer.toString(index); + + // Atomically verify tunnel is open and add the given stream + synchronized (tunnel) { + + // Do nothing if tunnel is not open + if (!tunnel.isOpen()) + return; + + // Wrap stream + interceptedStream = new InterceptedStream(indexString, stream); + + // Replace any existing stream + streams.put(interceptedStream); + + } + + // Produce/consume all stream data + handleInterceptedStream(interceptedStream); + + // Wait for stream to close + streams.waitFor(interceptedStream); + + } + +} diff --git a/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java b/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java index a9a135512..333cdb1c5 100644 --- a/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java +++ b/guacamole/src/main/java/org/apache/guacamole/tunnel/StreamInterceptingTunnel.java @@ -20,21 +20,12 @@ package org.apache.guacamole.tunnel; import java.io.BufferedOutputStream; -import java.io.IOException; import java.io.OutputStream; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import javax.xml.bind.DatatypeConverter; import org.apache.guacamole.GuacamoleException; import org.apache.guacamole.io.GuacamoleReader; -import org.apache.guacamole.io.GuacamoleWriter; import org.apache.guacamole.net.DelegatingGuacamoleTunnel; import org.apache.guacamole.net.GuacamoleTunnel; import org.apache.guacamole.protocol.FilteredGuacamoleReader; -import org.apache.guacamole.protocol.GuacamoleFilter; -import org.apache.guacamole.protocol.GuacamoleInstruction; -import org.apache.guacamole.protocol.GuacamoleStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,13 +42,8 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel { /** * Logger for this class. */ - private static final Logger logger = LoggerFactory.getLogger(StreamInterceptingTunnel.class); - - /** - * The maximum number of milliseconds to wait for notification that a - * stream has closed before explicitly checking for closure ourselves. - */ - private static final long STREAM_WAIT_TIMEOUT = 1000; + private static final Logger logger = + LoggerFactory.getLogger(StreamInterceptingTunnel.class); /** * Creates a new StreamInterceptingTunnel which wraps the given tunnel, @@ -73,206 +59,10 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel { } /** - * Mapping of the indexes of all streams whose associated "blob" and "end" - * instructions should be intercepted. + * The filter to use for rerouting received stream data to OutputStreams. */ - private final Map streams = - new ConcurrentHashMap(); - - /** - * Filter which selectively intercepts "blob" and "end" instructions, - * automatically writing to or closing the stream given with - * interceptStream(). The required "ack" responses to received blobs are - * sent automatically. - */ - private final GuacamoleFilter STREAM_FILTER = new GuacamoleFilter() { - - /** - * Handles a single "blob" instruction, decoding its base64 data, - * sending that data to the associated OutputStream, and ultimately - * dropping the "blob" instruction such that the client never receives - * it. If no OutputStream is associated with the stream index within - * the "blob" instruction, the instruction is passed through untouched. - * - * @param instruction - * The "blob" instruction being handled. - * - * @return - * The originally-provided "blob" instruction, if that instruction - * should be passed through to the client, or null if the "blob" - * instruction should be dropped. - */ - private GuacamoleInstruction handleBlob(GuacamoleInstruction instruction) { - - // Verify all required arguments are present - List args = instruction.getArgs(); - if (args.size() < 2) - return instruction; - - // Pull associated stream - String index = args.get(0); - OutputStream stream = streams.get(index); - if (stream == null) - return instruction; - - // Decode blob - byte[] blob; - try { - String data = args.get(1); - blob = DatatypeConverter.parseBase64Binary(data); - } - catch (IllegalArgumentException e) { - logger.warn("Received base64 data for intercepted stream was invalid."); - logger.debug("Decoding base64 data for intercepted stream failed.", e); - return null; - } - - // Attempt to write data to stream - try { - stream.write(blob); - sendAck(index, "OK", GuacamoleStatus.SUCCESS); - } - catch (IOException e) { - sendAck(index, "FAIL", GuacamoleStatus.SERVER_ERROR); - logger.debug("Write failed for intercepted stream.", e); - } - - // Instruction was handled purely internally - return null; - - } - - /** - * Handles a single "end" instruction, closing the associated - * OutputStream. If no OutputStream is associated with the stream index - * within the "end" instruction, this function has no effect. - * - * @param instruction - * The "end" instruction being handled. - */ - private void handleEnd(GuacamoleInstruction instruction) { - - // Verify all required arguments are present - List args = instruction.getArgs(); - if (args.size() < 1) - return; - - // Terminate stream - closeStream(args.get(0)); - - } - - @Override - public GuacamoleInstruction filter(GuacamoleInstruction instruction) - throws GuacamoleException { - - // Intercept "blob" instructions for in-progress streams - if (instruction.getOpcode().equals("blob")) - return handleBlob(instruction); - - // Intercept "end" instructions for in-progress streams - if (instruction.getOpcode().equals("end")) { - handleEnd(instruction); - return instruction; - } - - // Pass instruction through untouched - return instruction; - - } - - }; - - /** - * Closes the given OutputStream, logging any errors that occur during - * closure. The monitor of the OutputStream is notified via a single call - * to notify() once the attempt to close has been made. - * - * @param stream - * The OutputStream to close and notify. - */ - private void closeStream(OutputStream stream) { - - // Attempt to close stream - try { - stream.close(); - } - catch (IOException e) { - logger.warn("Unable to close intercepted stream: {}", e.getMessage()); - logger.debug("I/O error prevented closure of intercepted stream.", e); - } - - // Notify waiting threads that the stream has ended - synchronized (stream) { - stream.notify(); - } - - } - - /** - * Closes the OutputStream associated with the stream having the given - * index, if any, logging any errors that occur during closure. If no such - * stream exists, this function has no effect. The monitor of the - * OutputStream is notified via a single call to notify() once the attempt - * to close has been made. - * - * @param index - * The index of the stream whose associated OutputStream should be - * closed and notified. - */ - private OutputStream closeStream(String index) { - - // Remove associated stream - OutputStream stream = streams.remove(index); - if (stream == null) - return null; - - // Close stream if it exists - closeStream(stream); - return stream; - - } - - /** - * Injects an "ack" instruction into the outbound Guacamole protocol - * stream, as if sent by the connected client. "ack" instructions are used - * to acknowledge the receipt of a stream and its subsequent blobs, and are - * the only means of communicating success/failure status. - * - * @param index - * The index of the stream that this "ack" instruction relates to. - * - * @param message - * An arbitrary human-readable message to include within the "ack" - * instruction. - * - * @param status - * The status of the stream operation being acknowledged via the "ack" - * instruction. Error statuses will implicitly close the stream via - * closeStream(). - */ - private void sendAck(String index, String message, GuacamoleStatus status) { - - // Temporarily acquire writer to send "ack" instruction - GuacamoleWriter writer = acquireWriter(); - - // Send successful "ack" - try { - writer.writeInstruction(new GuacamoleInstruction("ack", index, message, - Integer.toString(status.getGuacamoleStatusCode()))); - } - catch (GuacamoleException e) { - logger.debug("Unable to send \"ack\" for intercepted stream.", e); - } - - // Error "ack" instructions implicitly close the stream - if (status != GuacamoleStatus.SUCCESS) - closeStream(index); - - // Done writing - releaseWriter(); - - } + private final OutputStreamInterceptingFilter outputStreamFilter = + new OutputStreamInterceptingFilter(this); /** * Intercept all data received along the stream having the given index, @@ -290,57 +80,21 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel { */ public void interceptStream(int index, OutputStream stream) { - String indexString = Integer.toString(index); - - // Atomically verify tunnel is open and add the given stream - OutputStream oldStream; - synchronized (this) { - - // Do nothing if tunnel is not open - if (!isOpen()) { - closeStream(stream); - return; - } - - // Wrap stream - stream = new BufferedOutputStream(stream); - - // Replace any existing stream - oldStream = streams.put(indexString, stream); - - } - - // If a previous stream DID exist, close it - if (oldStream != null) - closeStream(oldStream); - // Log beginning of intercepted stream - logger.debug("Intercepting stream #{} of tunnel \"{}\".", + logger.debug("Intercepting output stream #{} of tunnel \"{}\".", index, getUUID()); - // Acknowledge stream receipt - sendAck(indexString, "OK", GuacamoleStatus.SUCCESS); - - // Wait for stream to close - synchronized (stream) { - while (streams.get(indexString) == stream) { - try { - stream.wait(STREAM_WAIT_TIMEOUT); - } - catch (InterruptedException e) { - // Ignore - } - } - } + outputStreamFilter.interceptStream(index, new BufferedOutputStream(stream)); // Log end of intercepted stream - logger.debug("Intercepted stream #{} of tunnel \"{}\" ended.", index, getUUID()); + logger.debug("Intercepted output stream #{} of tunnel \"{}\" ended.", + index, getUUID()); } @Override public GuacamoleReader acquireReader() { - return new FilteredGuacamoleReader(super.acquireReader(), STREAM_FILTER); + return new FilteredGuacamoleReader(super.acquireReader(), outputStreamFilter); } @Override @@ -352,16 +106,9 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel { super.close(); } - // Ensure all waiting threads are notified that all streams have ended + // Close all intercepted streams finally { - - // Close any active streams - for (OutputStream stream : streams.values()) - closeStream(stream); - - // Remove now-useless references - streams.clear(); - + outputStreamFilter.closeAllInterceptedStreams(); } }