mirror of
https://github.com/gyurix1968/guacamole-client.git
synced 2025-09-06 13:17:41 +00:00
GUACAMOLE-44: Extract logic of StreamInterceptingTunnel.
This commit is contained in:
@@ -0,0 +1,86 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.guacamole.tunnel;
|
||||
|
||||
import java.io.Closeable;
|
||||
|
||||
/**
|
||||
* A simple pairing of the index of an intercepted Guacamole stream with the
|
||||
* stream-type object which will produce or consume the data sent over the
|
||||
* intercepted Guacamole stream.
|
||||
*
|
||||
* @author Michael Jumper
|
||||
* @param <T>
|
||||
* The type of object which will produce or consume the data sent over the
|
||||
* intercepted Guacamole stream. Usually, this will be either InputStream
|
||||
* or OutputStream.
|
||||
*/
|
||||
public class InterceptedStream<T extends Closeable> {
|
||||
|
||||
/**
|
||||
* The index of the Guacamole stream being intercepted.
|
||||
*/
|
||||
private final String index;
|
||||
|
||||
/**
|
||||
* The stream which will produce or consume the data sent over the
|
||||
* intercepted Guacamole stream.
|
||||
*/
|
||||
private final T stream;
|
||||
|
||||
/**
|
||||
* Creates a new InterceptedStream which associated the given Guacamole
|
||||
* stream index with the given stream object.
|
||||
*
|
||||
* @param index
|
||||
* The index of the Guacamole stream being intercepted.
|
||||
*
|
||||
* @param stream
|
||||
* The stream which will produce or consume the data sent over the
|
||||
* intercepted Guacamole stream.
|
||||
*/
|
||||
public InterceptedStream(String index, T stream) {
|
||||
this.index = index;
|
||||
this.stream = stream;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the index of the Guacamole stream being intercepted.
|
||||
*
|
||||
* @return
|
||||
* The index of the Guacamole stream being intercepted.
|
||||
*/
|
||||
public String getIndex() {
|
||||
return index;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the stream which will produce or consume the data sent over the
|
||||
* intercepted Guacamole stream.
|
||||
*
|
||||
* @return
|
||||
* The stream which will produce or consume the data sent over the
|
||||
* intercepted Guacamole stream.
|
||||
*/
|
||||
public T getStream() {
|
||||
return stream;
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,211 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.guacamole.tunnel;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Map-like storage for intercepted Guacamole streams.
|
||||
*
|
||||
* @author Michael Jumper
|
||||
* @param <T>
|
||||
* The type of object which will produce or consume the data sent over the
|
||||
* intercepted Guacamole stream. Usually, this will be either InputStream
|
||||
* or OutputStream.
|
||||
*/
|
||||
public class InterceptedStreamMap<T extends Closeable> {
|
||||
|
||||
/**
|
||||
* Logger for this class.
|
||||
*/
|
||||
private static final Logger logger = LoggerFactory.getLogger(InterceptedStreamMap.class);
|
||||
|
||||
/**
|
||||
* The maximum number of milliseconds to wait for notification that a
|
||||
* stream has closed before explicitly checking for closure ourselves.
|
||||
*/
|
||||
private static final long STREAM_WAIT_TIMEOUT = 1000;
|
||||
|
||||
/**
|
||||
* Mapping of the indexes of all streams whose associated "blob" and "end"
|
||||
* instructions should be intercepted.
|
||||
*/
|
||||
private final ConcurrentMap<String, InterceptedStream<T>> streams =
|
||||
new ConcurrentHashMap<String, InterceptedStream<T>>();
|
||||
|
||||
/**
|
||||
* Closes the given stream, logging any errors that occur during closure.
|
||||
* The monitor of the stream is notified via a single call to notify() once
|
||||
* the attempt to close has been made.
|
||||
*
|
||||
* @param stream
|
||||
* The stream to close and notify.
|
||||
*/
|
||||
private void close(T stream) {
|
||||
|
||||
// Attempt to close stream
|
||||
try {
|
||||
stream.close();
|
||||
}
|
||||
catch (IOException e) {
|
||||
logger.warn("Unable to close intercepted stream: {}", e.getMessage());
|
||||
logger.debug("I/O error prevented closure of intercepted stream.", e);
|
||||
}
|
||||
|
||||
// Notify waiting threads that the stream has ended
|
||||
synchronized (stream) {
|
||||
stream.notify();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the stream object associated with the stream having the given
|
||||
* index, if any, removing it from the map, logging any errors that occur
|
||||
* during closure, and unblocking any in-progress calls to waitFor() for
|
||||
* that stream. If no such stream exists within this map, then this
|
||||
* function has no effect.
|
||||
*
|
||||
* @param index
|
||||
* The index of the stream whose associated stream object should be
|
||||
* closed.
|
||||
*
|
||||
* @return
|
||||
* The stream associated with the given index, if the stream was stored
|
||||
* within this map, or null if no such stream exists.
|
||||
*/
|
||||
public InterceptedStream<T> close(String index) {
|
||||
|
||||
// Remove associated stream
|
||||
InterceptedStream<T> stream = streams.remove(index);
|
||||
if (stream == null)
|
||||
return null;
|
||||
|
||||
// Close stream if it exists
|
||||
close(stream.getStream());
|
||||
return stream;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the given stream, logging any errors that occur during closure,
|
||||
* and unblocking any in-progress calls to waitFor() for the given stream.
|
||||
* If the given stream is stored within this map, it will also be removed.
|
||||
*
|
||||
* @param stream
|
||||
* The stream to close.
|
||||
*
|
||||
* @return
|
||||
* true if the given stream was stored within this map, false
|
||||
* otherwise.
|
||||
*/
|
||||
public boolean close(InterceptedStream<T> stream) {
|
||||
|
||||
// Remove stream if present
|
||||
boolean wasRemoved = streams.remove(stream.getIndex(), stream);
|
||||
|
||||
// Close provided stream
|
||||
close(stream.getStream());
|
||||
|
||||
return wasRemoved;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes and closes all streams stored within this map, logging any errors
|
||||
* that occur during closure, and unblocking any in-progress calls to
|
||||
* waitFor().
|
||||
*/
|
||||
public void closeAll() {
|
||||
|
||||
// Close any active streams
|
||||
for (InterceptedStream<T> stream : streams.values())
|
||||
close(stream.getStream());
|
||||
|
||||
// Remove now-useless references
|
||||
streams.clear();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocks until the given stream is closed, or until another stream with
|
||||
* the same index replaces it.
|
||||
*
|
||||
* @param stream
|
||||
* The stream to wait for.
|
||||
*/
|
||||
public void waitFor(InterceptedStream<T> stream) {
|
||||
|
||||
T underlyingStream = stream.getStream();
|
||||
|
||||
// Wait for stream to close
|
||||
synchronized (underlyingStream) {
|
||||
while (streams.get(stream.getIndex()) == stream) {
|
||||
try {
|
||||
underlyingStream.wait(STREAM_WAIT_TIMEOUT);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the stream stored in this map under the given index.
|
||||
*
|
||||
* @param index
|
||||
* The index of the stream to return.
|
||||
*
|
||||
* @return
|
||||
* The stream having the given index, or null if no such stream is
|
||||
* stored within this map.
|
||||
*/
|
||||
public InterceptedStream<T> get(String index) {
|
||||
return streams.get(index);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds the given stream to this map, storing it under its associated
|
||||
* index. If another stream already exists within this map having the same
|
||||
* index, that stream will be closed and replaced.
|
||||
*
|
||||
* @param stream
|
||||
* The stream to store within this map.
|
||||
*/
|
||||
public void put(InterceptedStream<T> stream) {
|
||||
|
||||
// Add given stream to map
|
||||
InterceptedStream<T> oldStream =
|
||||
streams.put(stream.getIndex(), stream);
|
||||
|
||||
// If a previous stream DID exist, close it
|
||||
if (oldStream != null)
|
||||
close(oldStream.getStream());
|
||||
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,194 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.guacamole.tunnel;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.List;
|
||||
import javax.xml.bind.DatatypeConverter;
|
||||
import org.apache.guacamole.GuacamoleException;
|
||||
import org.apache.guacamole.net.GuacamoleTunnel;
|
||||
import org.apache.guacamole.protocol.GuacamoleInstruction;
|
||||
import org.apache.guacamole.protocol.GuacamoleStatus;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Filter which selectively intercepts "blob" and "end" instructions,
|
||||
* automatically writing to or closing the stream given with
|
||||
* interceptStream(). The required "ack" responses to received blobs are
|
||||
* sent automatically.
|
||||
*
|
||||
* @author Michael Jumper
|
||||
*/
|
||||
public class OutputStreamInterceptingFilter
|
||||
extends StreamInterceptingFilter<OutputStream> {
|
||||
|
||||
/**
|
||||
* Logger for this class.
|
||||
*/
|
||||
private static final Logger logger =
|
||||
LoggerFactory.getLogger(OutputStreamInterceptingFilter.class);
|
||||
|
||||
/**
|
||||
* Creates a new OutputStreamInterceptingFilter which selectively intercepts
|
||||
* "blob" and "end" instructions. The required "ack" responses will
|
||||
* automatically be sent over the given tunnel.
|
||||
*
|
||||
* @param tunnel
|
||||
* The GuacamoleTunnel over which any required "ack" instructions
|
||||
* should be sent.
|
||||
*/
|
||||
public OutputStreamInterceptingFilter(GuacamoleTunnel tunnel) {
|
||||
super(tunnel);
|
||||
}
|
||||
|
||||
/**
|
||||
* Injects an "ack" instruction into the outbound Guacamole protocol
|
||||
* stream, as if sent by the connected client. "ack" instructions are used
|
||||
* to acknowledge the receipt of a stream and its subsequent blobs, and are
|
||||
* the only means of communicating success/failure status.
|
||||
*
|
||||
* @param index
|
||||
* The index of the stream that this "ack" instruction relates to.
|
||||
*
|
||||
* @param message
|
||||
* An arbitrary human-readable message to include within the "ack"
|
||||
* instruction.
|
||||
*
|
||||
* @param status
|
||||
* The status of the stream operation being acknowledged via the "ack"
|
||||
* instruction. Error statuses will implicitly close the stream via
|
||||
* closeStream().
|
||||
*/
|
||||
private void sendAck(String index, String message, GuacamoleStatus status) {
|
||||
|
||||
// Error "ack" instructions implicitly close the stream
|
||||
if (status != GuacamoleStatus.SUCCESS)
|
||||
closeInterceptedStream(index);
|
||||
|
||||
sendInstruction(new GuacamoleInstruction("ack", index, message,
|
||||
Integer.toString(status.getGuacamoleStatusCode())));
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles a single "blob" instruction, decoding its base64 data,
|
||||
* sending that data to the associated OutputStream, and ultimately
|
||||
* dropping the "blob" instruction such that the client never receives
|
||||
* it. If no OutputStream is associated with the stream index within
|
||||
* the "blob" instruction, the instruction is passed through untouched.
|
||||
*
|
||||
* @param instruction
|
||||
* The "blob" instruction being handled.
|
||||
*
|
||||
* @return
|
||||
* The originally-provided "blob" instruction, if that instruction
|
||||
* should be passed through to the client, or null if the "blob"
|
||||
* instruction should be dropped.
|
||||
*/
|
||||
private GuacamoleInstruction handleBlob(GuacamoleInstruction instruction) {
|
||||
|
||||
// Verify all required arguments are present
|
||||
List<String> args = instruction.getArgs();
|
||||
if (args.size() < 2)
|
||||
return instruction;
|
||||
|
||||
// Pull associated stream
|
||||
String index = args.get(0);
|
||||
InterceptedStream<OutputStream> stream = getInterceptedStream(index);
|
||||
if (stream == null)
|
||||
return instruction;
|
||||
|
||||
// Decode blob
|
||||
byte[] blob;
|
||||
try {
|
||||
String data = args.get(1);
|
||||
blob = DatatypeConverter.parseBase64Binary(data);
|
||||
}
|
||||
catch (IllegalArgumentException e) {
|
||||
logger.warn("Received base64 data for intercepted stream was invalid.");
|
||||
logger.debug("Decoding base64 data for intercepted stream failed.", e);
|
||||
return null;
|
||||
}
|
||||
|
||||
// Attempt to write data to stream
|
||||
try {
|
||||
stream.getStream().write(blob);
|
||||
sendAck(index, "OK", GuacamoleStatus.SUCCESS);
|
||||
}
|
||||
catch (IOException e) {
|
||||
sendAck(index, "FAIL", GuacamoleStatus.SERVER_ERROR);
|
||||
logger.debug("Write failed for intercepted stream.", e);
|
||||
}
|
||||
|
||||
// Instruction was handled purely internally
|
||||
return null;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles a single "end" instruction, closing the associated
|
||||
* OutputStream. If no OutputStream is associated with the stream index
|
||||
* within the "end" instruction, this function has no effect.
|
||||
*
|
||||
* @param instruction
|
||||
* The "end" instruction being handled.
|
||||
*/
|
||||
private void handleEnd(GuacamoleInstruction instruction) {
|
||||
|
||||
// Verify all required arguments are present
|
||||
List<String> args = instruction.getArgs();
|
||||
if (args.size() < 1)
|
||||
return;
|
||||
|
||||
// Terminate stream
|
||||
closeInterceptedStream(args.get(0));
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public GuacamoleInstruction filter(GuacamoleInstruction instruction)
|
||||
throws GuacamoleException {
|
||||
|
||||
// Intercept "blob" instructions for in-progress streams
|
||||
if (instruction.getOpcode().equals("blob"))
|
||||
return handleBlob(instruction);
|
||||
|
||||
// Intercept "end" instructions for in-progress streams
|
||||
if (instruction.getOpcode().equals("end")) {
|
||||
handleEnd(instruction);
|
||||
return instruction;
|
||||
}
|
||||
|
||||
// Pass instruction through untouched
|
||||
return instruction;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void handleInterceptedStream(InterceptedStream<OutputStream> stream) {
|
||||
|
||||
// Acknowledge that the stream is ready to receive data
|
||||
sendAck(stream.getIndex(), "OK", GuacamoleStatus.SUCCESS);
|
||||
|
||||
}
|
||||
|
||||
}
|
@@ -0,0 +1,209 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.guacamole.tunnel;
|
||||
|
||||
import java.io.Closeable;
|
||||
import org.apache.guacamole.GuacamoleException;
|
||||
import org.apache.guacamole.io.GuacamoleWriter;
|
||||
import org.apache.guacamole.net.GuacamoleTunnel;
|
||||
import org.apache.guacamole.protocol.GuacamoleFilter;
|
||||
import org.apache.guacamole.protocol.GuacamoleInstruction;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Filter which selectively intercepts stream-related instructions,
|
||||
* automatically writing to, reading from, or closing the stream given with
|
||||
* interceptStream(). Any instructions required by the Guacamole protocol to be
|
||||
* sent in response to intercepted instructions will be sent automatically.
|
||||
*
|
||||
* @param <T>
|
||||
* The type of object which will produce or consume the data sent over the
|
||||
* intercepted Guacamole stream. Usually, this will be either InputStream
|
||||
* or OutputStream.
|
||||
*/
|
||||
public abstract class StreamInterceptingFilter<T extends Closeable>
|
||||
implements GuacamoleFilter {
|
||||
|
||||
/**
|
||||
* Logger for this class.
|
||||
*/
|
||||
private static final Logger logger =
|
||||
LoggerFactory.getLogger(StreamInterceptingFilter.class);
|
||||
|
||||
/**
|
||||
* Mapping of the all streams whose related instructions should be
|
||||
* intercepted.
|
||||
*/
|
||||
private final InterceptedStreamMap<T> streams = new InterceptedStreamMap<T>();
|
||||
|
||||
/**
|
||||
* The tunnel over which any required instructions should be sent.
|
||||
*/
|
||||
private final GuacamoleTunnel tunnel;
|
||||
|
||||
/**
|
||||
* Creates a new StreamInterceptingFilter which selectively intercepts
|
||||
* stream-related instructions. Any instructions required by the Guacamole
|
||||
* protocol to be sent in response to intercepted instructions will be sent
|
||||
* automatically over the given tunnel.
|
||||
*
|
||||
* @param tunnel
|
||||
* The GuacamoleTunnel over which any required instructions should be
|
||||
* sent.
|
||||
*/
|
||||
public StreamInterceptingFilter(GuacamoleTunnel tunnel) {
|
||||
this.tunnel = tunnel;
|
||||
}
|
||||
|
||||
/**
|
||||
* Injects an arbitrary Guacamole instruction into the outbound Guacamole
|
||||
* protocol stream (GuacamoleWriter) of the tunnel associated with this
|
||||
* StreamInterceptingFilter, as if the instruction was sent by the connected
|
||||
* client.
|
||||
*
|
||||
* @param instruction
|
||||
* The Guacamole instruction to inject.
|
||||
*/
|
||||
protected void sendInstruction(GuacamoleInstruction instruction) {
|
||||
|
||||
// Temporarily acquire writer to send "ack" instruction
|
||||
GuacamoleWriter writer = tunnel.acquireWriter();
|
||||
|
||||
// Send successful "ack"
|
||||
try {
|
||||
writer.writeInstruction(instruction);
|
||||
}
|
||||
catch (GuacamoleException e) {
|
||||
logger.debug("Unable to send \"{}\" for intercepted stream.",
|
||||
instruction.getOpcode(), e);
|
||||
}
|
||||
|
||||
// Done writing
|
||||
tunnel.releaseWriter();
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the stream having the given index and currently being intercepted
|
||||
* by this filter.
|
||||
*
|
||||
* @param index
|
||||
* The index of the stream to return.
|
||||
*
|
||||
* @return
|
||||
* The stream having the given index, or null if no such stream is
|
||||
* being intercepted.
|
||||
*/
|
||||
protected InterceptedStream<T> getInterceptedStream(String index) {
|
||||
return streams.get(index);
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the stream having the given index and currently being intercepted
|
||||
* by this filter, if any. If no such stream is being intercepted, then this
|
||||
* function has no effect.
|
||||
*
|
||||
* @param index
|
||||
* The index of the stream to close.
|
||||
*
|
||||
* @return
|
||||
* The stream associated with the given index, if the stream is being
|
||||
* intercepted, or null if no such stream exists.
|
||||
*/
|
||||
protected InterceptedStream<T> closeInterceptedStream(String index) {
|
||||
return streams.close(index);
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the given stream.
|
||||
*
|
||||
* @param stream
|
||||
* The stream to close.
|
||||
*
|
||||
* @return
|
||||
* true if the given stream was being intercepted, false otherwise.
|
||||
*/
|
||||
protected boolean closeInterceptedStream(InterceptedStream<T> stream) {
|
||||
return streams.close(stream);
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes all streams being intercepted by this filter.
|
||||
*/
|
||||
public void closeAllInterceptedStreams() {
|
||||
streams.closeAll();
|
||||
}
|
||||
|
||||
/**
|
||||
* Begins handling the data of the given intercepted stream. This function
|
||||
* will automatically be invoked by interceptStream() for any valid stream.
|
||||
* It is not required that this function block until all data is handled;
|
||||
* interceptStream() will do this automatically. Implementations are free
|
||||
* to use asynchronous approaches to data handling.
|
||||
*
|
||||
* @param stream
|
||||
* The stream being intercepted.
|
||||
*/
|
||||
protected abstract void handleInterceptedStream(InterceptedStream<T> stream);
|
||||
|
||||
/**
|
||||
* Intercept the stream having the given index, producing or consuming its
|
||||
* data as appropriate. The given stream object will automatically be closed
|
||||
* when the stream ends. If there is no stream having the given index, then
|
||||
* the stream object will be closed immediately. This function will block
|
||||
* until all data has been handled and the stream is ended.
|
||||
*
|
||||
* @param index
|
||||
* The index of the stream to intercept.
|
||||
*
|
||||
* @param stream
|
||||
* The stream object which will produce or consume all data for the
|
||||
* stream having the given index.
|
||||
*/
|
||||
public void interceptStream(int index, T stream) {
|
||||
|
||||
InterceptedStream<T> interceptedStream;
|
||||
String indexString = Integer.toString(index);
|
||||
|
||||
// Atomically verify tunnel is open and add the given stream
|
||||
synchronized (tunnel) {
|
||||
|
||||
// Do nothing if tunnel is not open
|
||||
if (!tunnel.isOpen())
|
||||
return;
|
||||
|
||||
// Wrap stream
|
||||
interceptedStream = new InterceptedStream<T>(indexString, stream);
|
||||
|
||||
// Replace any existing stream
|
||||
streams.put(interceptedStream);
|
||||
|
||||
}
|
||||
|
||||
// Produce/consume all stream data
|
||||
handleInterceptedStream(interceptedStream);
|
||||
|
||||
// Wait for stream to close
|
||||
streams.waitFor(interceptedStream);
|
||||
|
||||
}
|
||||
|
||||
}
|
@@ -20,21 +20,12 @@
|
||||
package org.apache.guacamole.tunnel;
|
||||
|
||||
import java.io.BufferedOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import javax.xml.bind.DatatypeConverter;
|
||||
import org.apache.guacamole.GuacamoleException;
|
||||
import org.apache.guacamole.io.GuacamoleReader;
|
||||
import org.apache.guacamole.io.GuacamoleWriter;
|
||||
import org.apache.guacamole.net.DelegatingGuacamoleTunnel;
|
||||
import org.apache.guacamole.net.GuacamoleTunnel;
|
||||
import org.apache.guacamole.protocol.FilteredGuacamoleReader;
|
||||
import org.apache.guacamole.protocol.GuacamoleFilter;
|
||||
import org.apache.guacamole.protocol.GuacamoleInstruction;
|
||||
import org.apache.guacamole.protocol.GuacamoleStatus;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -51,13 +42,8 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel {
|
||||
/**
|
||||
* Logger for this class.
|
||||
*/
|
||||
private static final Logger logger = LoggerFactory.getLogger(StreamInterceptingTunnel.class);
|
||||
|
||||
/**
|
||||
* The maximum number of milliseconds to wait for notification that a
|
||||
* stream has closed before explicitly checking for closure ourselves.
|
||||
*/
|
||||
private static final long STREAM_WAIT_TIMEOUT = 1000;
|
||||
private static final Logger logger =
|
||||
LoggerFactory.getLogger(StreamInterceptingTunnel.class);
|
||||
|
||||
/**
|
||||
* Creates a new StreamInterceptingTunnel which wraps the given tunnel,
|
||||
@@ -73,206 +59,10 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel {
|
||||
}
|
||||
|
||||
/**
|
||||
* Mapping of the indexes of all streams whose associated "blob" and "end"
|
||||
* instructions should be intercepted.
|
||||
* The filter to use for rerouting received stream data to OutputStreams.
|
||||
*/
|
||||
private final Map<String, OutputStream> streams =
|
||||
new ConcurrentHashMap<String, OutputStream>();
|
||||
|
||||
/**
|
||||
* Filter which selectively intercepts "blob" and "end" instructions,
|
||||
* automatically writing to or closing the stream given with
|
||||
* interceptStream(). The required "ack" responses to received blobs are
|
||||
* sent automatically.
|
||||
*/
|
||||
private final GuacamoleFilter STREAM_FILTER = new GuacamoleFilter() {
|
||||
|
||||
/**
|
||||
* Handles a single "blob" instruction, decoding its base64 data,
|
||||
* sending that data to the associated OutputStream, and ultimately
|
||||
* dropping the "blob" instruction such that the client never receives
|
||||
* it. If no OutputStream is associated with the stream index within
|
||||
* the "blob" instruction, the instruction is passed through untouched.
|
||||
*
|
||||
* @param instruction
|
||||
* The "blob" instruction being handled.
|
||||
*
|
||||
* @return
|
||||
* The originally-provided "blob" instruction, if that instruction
|
||||
* should be passed through to the client, or null if the "blob"
|
||||
* instruction should be dropped.
|
||||
*/
|
||||
private GuacamoleInstruction handleBlob(GuacamoleInstruction instruction) {
|
||||
|
||||
// Verify all required arguments are present
|
||||
List<String> args = instruction.getArgs();
|
||||
if (args.size() < 2)
|
||||
return instruction;
|
||||
|
||||
// Pull associated stream
|
||||
String index = args.get(0);
|
||||
OutputStream stream = streams.get(index);
|
||||
if (stream == null)
|
||||
return instruction;
|
||||
|
||||
// Decode blob
|
||||
byte[] blob;
|
||||
try {
|
||||
String data = args.get(1);
|
||||
blob = DatatypeConverter.parseBase64Binary(data);
|
||||
}
|
||||
catch (IllegalArgumentException e) {
|
||||
logger.warn("Received base64 data for intercepted stream was invalid.");
|
||||
logger.debug("Decoding base64 data for intercepted stream failed.", e);
|
||||
return null;
|
||||
}
|
||||
|
||||
// Attempt to write data to stream
|
||||
try {
|
||||
stream.write(blob);
|
||||
sendAck(index, "OK", GuacamoleStatus.SUCCESS);
|
||||
}
|
||||
catch (IOException e) {
|
||||
sendAck(index, "FAIL", GuacamoleStatus.SERVER_ERROR);
|
||||
logger.debug("Write failed for intercepted stream.", e);
|
||||
}
|
||||
|
||||
// Instruction was handled purely internally
|
||||
return null;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles a single "end" instruction, closing the associated
|
||||
* OutputStream. If no OutputStream is associated with the stream index
|
||||
* within the "end" instruction, this function has no effect.
|
||||
*
|
||||
* @param instruction
|
||||
* The "end" instruction being handled.
|
||||
*/
|
||||
private void handleEnd(GuacamoleInstruction instruction) {
|
||||
|
||||
// Verify all required arguments are present
|
||||
List<String> args = instruction.getArgs();
|
||||
if (args.size() < 1)
|
||||
return;
|
||||
|
||||
// Terminate stream
|
||||
closeStream(args.get(0));
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public GuacamoleInstruction filter(GuacamoleInstruction instruction)
|
||||
throws GuacamoleException {
|
||||
|
||||
// Intercept "blob" instructions for in-progress streams
|
||||
if (instruction.getOpcode().equals("blob"))
|
||||
return handleBlob(instruction);
|
||||
|
||||
// Intercept "end" instructions for in-progress streams
|
||||
if (instruction.getOpcode().equals("end")) {
|
||||
handleEnd(instruction);
|
||||
return instruction;
|
||||
}
|
||||
|
||||
// Pass instruction through untouched
|
||||
return instruction;
|
||||
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
/**
|
||||
* Closes the given OutputStream, logging any errors that occur during
|
||||
* closure. The monitor of the OutputStream is notified via a single call
|
||||
* to notify() once the attempt to close has been made.
|
||||
*
|
||||
* @param stream
|
||||
* The OutputStream to close and notify.
|
||||
*/
|
||||
private void closeStream(OutputStream stream) {
|
||||
|
||||
// Attempt to close stream
|
||||
try {
|
||||
stream.close();
|
||||
}
|
||||
catch (IOException e) {
|
||||
logger.warn("Unable to close intercepted stream: {}", e.getMessage());
|
||||
logger.debug("I/O error prevented closure of intercepted stream.", e);
|
||||
}
|
||||
|
||||
// Notify waiting threads that the stream has ended
|
||||
synchronized (stream) {
|
||||
stream.notify();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the OutputStream associated with the stream having the given
|
||||
* index, if any, logging any errors that occur during closure. If no such
|
||||
* stream exists, this function has no effect. The monitor of the
|
||||
* OutputStream is notified via a single call to notify() once the attempt
|
||||
* to close has been made.
|
||||
*
|
||||
* @param index
|
||||
* The index of the stream whose associated OutputStream should be
|
||||
* closed and notified.
|
||||
*/
|
||||
private OutputStream closeStream(String index) {
|
||||
|
||||
// Remove associated stream
|
||||
OutputStream stream = streams.remove(index);
|
||||
if (stream == null)
|
||||
return null;
|
||||
|
||||
// Close stream if it exists
|
||||
closeStream(stream);
|
||||
return stream;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Injects an "ack" instruction into the outbound Guacamole protocol
|
||||
* stream, as if sent by the connected client. "ack" instructions are used
|
||||
* to acknowledge the receipt of a stream and its subsequent blobs, and are
|
||||
* the only means of communicating success/failure status.
|
||||
*
|
||||
* @param index
|
||||
* The index of the stream that this "ack" instruction relates to.
|
||||
*
|
||||
* @param message
|
||||
* An arbitrary human-readable message to include within the "ack"
|
||||
* instruction.
|
||||
*
|
||||
* @param status
|
||||
* The status of the stream operation being acknowledged via the "ack"
|
||||
* instruction. Error statuses will implicitly close the stream via
|
||||
* closeStream().
|
||||
*/
|
||||
private void sendAck(String index, String message, GuacamoleStatus status) {
|
||||
|
||||
// Temporarily acquire writer to send "ack" instruction
|
||||
GuacamoleWriter writer = acquireWriter();
|
||||
|
||||
// Send successful "ack"
|
||||
try {
|
||||
writer.writeInstruction(new GuacamoleInstruction("ack", index, message,
|
||||
Integer.toString(status.getGuacamoleStatusCode())));
|
||||
}
|
||||
catch (GuacamoleException e) {
|
||||
logger.debug("Unable to send \"ack\" for intercepted stream.", e);
|
||||
}
|
||||
|
||||
// Error "ack" instructions implicitly close the stream
|
||||
if (status != GuacamoleStatus.SUCCESS)
|
||||
closeStream(index);
|
||||
|
||||
// Done writing
|
||||
releaseWriter();
|
||||
|
||||
}
|
||||
private final OutputStreamInterceptingFilter outputStreamFilter =
|
||||
new OutputStreamInterceptingFilter(this);
|
||||
|
||||
/**
|
||||
* Intercept all data received along the stream having the given index,
|
||||
@@ -290,57 +80,21 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel {
|
||||
*/
|
||||
public void interceptStream(int index, OutputStream stream) {
|
||||
|
||||
String indexString = Integer.toString(index);
|
||||
|
||||
// Atomically verify tunnel is open and add the given stream
|
||||
OutputStream oldStream;
|
||||
synchronized (this) {
|
||||
|
||||
// Do nothing if tunnel is not open
|
||||
if (!isOpen()) {
|
||||
closeStream(stream);
|
||||
return;
|
||||
}
|
||||
|
||||
// Wrap stream
|
||||
stream = new BufferedOutputStream(stream);
|
||||
|
||||
// Replace any existing stream
|
||||
oldStream = streams.put(indexString, stream);
|
||||
|
||||
}
|
||||
|
||||
// If a previous stream DID exist, close it
|
||||
if (oldStream != null)
|
||||
closeStream(oldStream);
|
||||
|
||||
// Log beginning of intercepted stream
|
||||
logger.debug("Intercepting stream #{} of tunnel \"{}\".",
|
||||
logger.debug("Intercepting output stream #{} of tunnel \"{}\".",
|
||||
index, getUUID());
|
||||
|
||||
// Acknowledge stream receipt
|
||||
sendAck(indexString, "OK", GuacamoleStatus.SUCCESS);
|
||||
|
||||
// Wait for stream to close
|
||||
synchronized (stream) {
|
||||
while (streams.get(indexString) == stream) {
|
||||
try {
|
||||
stream.wait(STREAM_WAIT_TIMEOUT);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
// Ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
outputStreamFilter.interceptStream(index, new BufferedOutputStream(stream));
|
||||
|
||||
// Log end of intercepted stream
|
||||
logger.debug("Intercepted stream #{} of tunnel \"{}\" ended.", index, getUUID());
|
||||
logger.debug("Intercepted output stream #{} of tunnel \"{}\" ended.",
|
||||
index, getUUID());
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public GuacamoleReader acquireReader() {
|
||||
return new FilteredGuacamoleReader(super.acquireReader(), STREAM_FILTER);
|
||||
return new FilteredGuacamoleReader(super.acquireReader(), outputStreamFilter);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -352,16 +106,9 @@ public class StreamInterceptingTunnel extends DelegatingGuacamoleTunnel {
|
||||
super.close();
|
||||
}
|
||||
|
||||
// Ensure all waiting threads are notified that all streams have ended
|
||||
// Close all intercepted streams
|
||||
finally {
|
||||
|
||||
// Close any active streams
|
||||
for (OutputStream stream : streams.values())
|
||||
closeStream(stream);
|
||||
|
||||
// Remove now-useless references
|
||||
streams.clear();
|
||||
|
||||
outputStreamFilter.closeAllInterceptedStreams();
|
||||
}
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user