-
Notifications
You must be signed in to change notification settings - Fork 53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[INTERNAL][CORE] Prepares Stream access #653
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,12 +1,26 @@ | ||
package saros.net; | ||
|
||
import java.io.IOException; | ||
import saros.net.internal.IByteStreamConnection; | ||
import saros.net.stream.StreamMode; | ||
import saros.net.xmpp.JID; | ||
|
||
// TODO Javadoc | ||
|
||
/** | ||
* The connection manager is responsible for establishing connections to remote addresses. | ||
* | ||
* <p>It offers support for two connection types. | ||
* | ||
* <ol> | ||
* <li>Establishing a connection using {@link #connect(String, Object)} to establish a connection | ||
* which is needed in conjunction with the {@link ITransmitter transmitter} and {@link | ||
* IReceiver receiver} in order to send and receive related packages. | ||
* <li>Establishing a connection using {@link #connectStream(String, Object)} to establish a | ||
* {@link IStreamConnection connection} that can be used for custom purposes. In order to get | ||
* notified about such a connection on the remote side you have to install a {@link | ||
* IStreamConnectionListener}. | ||
* </ol> | ||
* | ||
* <p><b>Note</b>: Stream connections must be closed by calling {@link IStreamConnection#close() | ||
* close} on the given connection. | ||
*/ | ||
public interface IConnectionManager { | ||
|
||
public static final int IBB_SERVICE = 1; | ||
|
@@ -21,24 +35,40 @@ public interface IConnectionManager { | |
*/ | ||
public void setServices(int serviceMask); | ||
|
||
/** @deprecated */ | ||
@Deprecated | ||
public IByteStreamConnection connect(JID peer) throws IOException; | ||
public void addStreamConnectionListener(final IStreamConnectionListener listener); | ||
|
||
public IByteStreamConnection connect(String connectionID, JID peer) throws IOException; | ||
public void removeStreamConnectionListener(final IStreamConnectionListener listener); | ||
|
||
/** | ||
* @deprecated Disconnects {@link IByteStreamConnection} with the specified peer | ||
* @param peer {@link JID} of the peer to disconnect the {@link IByteStreamConnection} | ||
* Connects to the given address using the given stream ID. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice if the information about the connection types that is given in the interface javadoc were also part of the corresponding interface method javadoc. This is not as important for this method as you can guess how to use it from the return type, but it is kind of confusing for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess this comment applies to the connect method and not connectStream ? Because you do get a handle when calling this method. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. I just put it here as it was the first of the two methods and I thought that some more information wouldn't hurt here either. |
||
* | ||
* @param id the ID of the stream | ||
* @param address the remote address to connect to | ||
* @return the stream connection | ||
* @throws IOException if an I/O error occurs or such a stream already exists | ||
*/ | ||
@Deprecated | ||
public boolean closeConnection(JID peer); | ||
public IStreamConnection connectStream(String id, Object address) throws IOException; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is the address of type There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am currently also have no clue what Object is (sounds dumb I know, I tried to genaralize things). I have started reworking the streams (another not yet published patch) comparing the object against a JID instance or just calling toString() and try to convert the outcome to a JID. So I generally do not know if we should stick to object or change it to String. |
||
|
||
public boolean closeConnection(String connectionIdentifier, JID peer); | ||
/** | ||
* Connects to the given address using the given ID. | ||
* | ||
* @param id | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add param descriptions? |
||
* @param address | ||
* @throws IOException if an I/O error occurs | ||
*/ | ||
public void connect(String id, Object address) throws IOException; | ||
|
||
/** @deprecated */ | ||
@Deprecated | ||
public StreamMode getTransferMode(JID jid); | ||
/** | ||
* Closes the given connection. | ||
* | ||
* @param id the ID of the connection | ||
* @param address the remote address | ||
* @return <code>true</code> if the connection was closed, <code>false</code> if no such | ||
* connection exists | ||
*/ | ||
// TODO rename to close | ||
public boolean closeConnection(String id, Object address); | ||
|
||
public StreamMode getTransferMode(String connectionID, JID jid); | ||
// TODO RENAME | ||
public StreamMode getTransferMode(String connectionID, Object address); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
package saros.net; | ||
|
||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.io.OutputStream; | ||
import saros.net.internal.IConnection; | ||
|
||
/** | ||
* A stream connection consists of an input and out stream. It up to the caller to gracefully | ||
* shutdown the connections. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Provide methods to close the streams for ease of use? Also, this might help reminding the caller to close the connections. Update: I have now seen that the interface There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is more related on how to do a gracefully TCP shutdown of a connection (none that we are currently doing in the network layer, just turn SO_LINGER on and the season leave packet will never arrive). This is really complicated stuff where close can result in RST packets (depending on the OS TCP stack). This information is really technically. https://docs.oracle.com/javase/8/docs/technotes/guides/net/articles/connection_release.html To shorten up things. You should not call close on a connection when there is pending data in the receiving buffer. From my testings over the year this is not an issue (on Windows), however it depends on the buffer size that you currently occupying. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok. Is there any way of providing a method to gracefully shut it down? Or is the dependent on the use case? In other words: Do we have to shift this responsibility completely to the caller or can we resolve (at least some of) it as part of the implementation and offer it as an interface method (or include it in |
||
*/ | ||
public interface IStreamConnection extends IConnection { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does this interface provide access to the streams but also direct access to some of their functionality (the timeout stuff)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because timeout stuff is session management, The connection only provides the "stream" but not the session you would put above it. To clearify things, we are talking about network sessions (see OSI layer), not about a Saros session. |
||
|
||
/** | ||
* Gets the input stream to receive data. | ||
* | ||
* @return the input stream to receive data | ||
* @throws IOException if an I/O error occurs | ||
*/ | ||
public InputStream getInputStream() throws IOException; | ||
|
||
/** | ||
* Gets the output stream to send data. | ||
* | ||
* @return the output stream to send data | ||
* @throws IOException if an I/O error occurs | ||
*/ | ||
public OutputStream getOutputStream() throws IOException; | ||
|
||
/** | ||
* Returns the read timeout in milliseconds. | ||
* | ||
* @return the read timeout | ||
* @throws IOException if an I/O error occurs | ||
*/ | ||
public int getReadTimeout() throws IOException; | ||
|
||
/** | ||
* Sets the read timeout in milliseconds. | ||
* | ||
* @throws IOException if an I/O error occurs | ||
*/ | ||
public void setReadTimeout(int timeout) throws IOException; | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,23 @@ | ||||||
package saros.net; | ||||||
|
||||||
/** | ||||||
* Listener interface for accepting remote stream connections. | ||||||
* | ||||||
* <p>The listener must either accept or reject the connection. If there are multiple listeners | ||||||
* installed via {@link IConnectionManager#addStreamConnectionListener(IStreamConnectionListener)} | ||||||
* the connection will only be rejected if all listeners reject. | ||||||
* | ||||||
* <p>Furthermore if a listener accepts the request every outstanding listener will <b>not</b> get | ||||||
* notified about this connection establishment. | ||||||
*/ | ||||||
public interface IStreamConnectionListener { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure I understand what this class is needed for. Could you explain the use case to me? Was it added so that we don't have to blindly accept any incoming stream? Furthermore, using the name "listener" is a bit confusing in combination with the implemented behavior. This "listener" is only used to accept or reject connections and doesn't really do anything with the passed connection. Furthermore, it only runs until a registered listener accepts the connection, meaning not all listeners have to be notified (as stated in the javadoc). I am struggling with finding a good name, but I somewhat like the metaphor of a bouncer for incoming connections (which doesn't completely match as all "bouncers" have to "reject" the connection for it to actually be rejected).
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is quite simple and has to do with (D)DOS attacks. So imagine you open a socket (you can only accept 65535 connections). The first check after calling accept (see ServerSocket) is to do some mighty "magic" and close the connection just afterwards if the "magic" decided you shall not pass. Now it gets passes to the listener(s). If no listener is interested, should I keep the connection alive ? I guess not. TBH the name is bad. It should be changed. |
||||||
|
||||||
/** | ||||||
* Gets called when a connection was established. | ||||||
* | ||||||
* @param connection the established connection | ||||||
* @return <code>true</code> to accept the connection or <code>false</code> to reject the | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't like the wording "reject" in this context as it suggests to me that every listener has the power to close an incoming connection, while, in reality, the connection is only really rejected iff all registered listeners don't accept the connection. Even though all of this is explained in the interface javadoc, I don't think it hurts to be as clear as possible with the wording. How about changing it to simply state that the method returns false if the connection is not accepted by this listener? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes this needs to be updated to: should reject, |
||||||
* connection. | ||||||
*/ | ||||||
public boolean connectionEstablished(IStreamConnection connection); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The problem is from a TCP (depends on the stream service) point of the the connection IS established, it occupies a port. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok. Then how about something along the lines of |
||||||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -1,37 +1,35 @@ | ||||||
/* | ||||||
* DPP - Serious Distributed Pair Programming (c) Freie Universität Berlin - | ||||||
* Fachbereich Mathematik und Informatik - 2006 (c) Riad Djemili - 2006 | ||||||
* | ||||||
* This program is free software; you can redistribute it and/or modify it under | ||||||
* the terms of the GNU General Public License as published by the Free Software | ||||||
* Foundation; either version 1, or (at your option) any later version. | ||||||
* | ||||||
* This program is distributed in the hope that it will be useful, but WITHOUT | ||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | ||||||
* FOR A PARTICULAR PURPOSE. See the GNU General Public License for more | ||||||
* details. | ||||||
* | ||||||
* You should have received a copy of the GNU General Public License along with | ||||||
* this program; if not, write to the Free Software Foundation, Inc., 675 Mass | ||||||
* Ave, Cambridge, MA 02139, USA. | ||||||
*/ | ||||||
|
||||||
package saros.net; | ||||||
|
||||||
import java.io.IOException; | ||||||
import org.jivesoftware.smack.packet.Packet; | ||||||
import org.jivesoftware.smack.packet.PacketExtension; | ||||||
import saros.annotations.Component; | ||||||
import saros.net.xmpp.JID; | ||||||
|
||||||
/** | ||||||
* A humble interface that is responsible for network functionality. The idea behind this interface | ||||||
* is to only encapsulates the least possible amount of functionality - the one that can't be easily | ||||||
* tested. | ||||||
* Interface for sending packets to remote addresses. In general this interface offers two | ||||||
* possibilities for sending packets. | ||||||
* | ||||||
* @author rdjemili | ||||||
* <ol> | ||||||
* <li>Either sending packets using the default network environment by calling {@link | ||||||
* #sendPacket(Packet)} or {@link #sendPacketExtension(JID, PacketExtension)}. | ||||||
* <li>Using {@link #send(String, JID, PacketExtension)} using a specific connection that must | ||||||
* first be established by calling {@link IConnectionManager#connect(String, Object)}. | ||||||
* </ol> | ||||||
* | ||||||
* The second option should always be used as default option when sending packets frequently and | ||||||
* over a longer time span to an already known address. | ||||||
* | ||||||
* <p><b>Implementation notes</b>: Implementation should consider to support connection ID's through | ||||||
* the {@link IConnectionManager}. If this is not possible the implementation <b>must</b> ensure | ||||||
* that connection lost is properly detected, i.e sending packets to a server which may route the | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
* packets at a later time without getting an acknowledgement if the packet has been received is a | ||||||
* <b>violation</b> of the contract. | ||||||
* | ||||||
* @see IConnectionManager | ||||||
*/ | ||||||
/* | ||||||
* TODO ensure we use IQ packets so the server must return an error. Afterwards we can change the contract of this interface. | ||||||
*/ | ||||||
@Component(module = "net") | ||||||
public interface ITransmitter { | ||||||
|
||||||
/** | ||||||
|
@@ -63,13 +61,14 @@ public interface ITransmitter { | |||||
public void send(JID recipient, PacketExtension extension) throws IOException; | ||||||
|
||||||
/** | ||||||
* Sends the given {@link PacketExtension} to the given {@link JID} using a direct stream | ||||||
* connection. The connection must be already established to the recipient with the given id. | ||||||
* Sends the given {@link PacketExtension} to the given {@link JID} using the given connection ID. | ||||||
* A connection with the given connection id must already been established. | ||||||
* | ||||||
* @param connectionID the id of the connection | ||||||
* @param connectionID the ID of the connection | ||||||
* @param recipient the recipient of the extension | ||||||
* @param extension the extension to send | ||||||
* @throws IOException if an I/O error occurs | ||||||
* @see IConnectionManager#connect(String, Object) | ||||||
*/ | ||||||
public void send(String connectionID, JID recipient, PacketExtension extension) | ||||||
throws IOException; | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,7 +30,7 @@ | |
* @author coezbek | ||
* @author srossbach | ||
*/ | ||
public class BinaryChannelConnection implements IByteStreamConnection { | ||
public class BinaryChannelConnection implements IPacketConnection { | ||
|
||
private static final Logger LOG = Logger.getLogger(BinaryChannelConnection.class); | ||
|
||
|
@@ -49,21 +49,23 @@ private static class Opcode { | |
/** Max size of data chunks */ | ||
private static final int CHUNKSIZE = 32 * 1024 - 1; | ||
|
||
private IByteStreamConnectionListener listener; | ||
private ReceiverThread receiveThread; | ||
|
||
private final JID remoteAddress; | ||
private final JID localAddress; | ||
|
||
private final String connectionID; | ||
|
||
private final IConnectionClosedCallback callback; | ||
|
||
private IDPool idPool = new IDPool(); | ||
|
||
private boolean connected; | ||
private boolean initialized; | ||
|
||
private Map<Integer, ByteArrayOutputStream> pendingFragmentedPackets = | ||
new HashMap<Integer, ByteArrayOutputStream>(); | ||
|
||
private Map<Integer, BinaryXMPPExtension> pendingXMPPExtensions = | ||
new HashMap<Integer, BinaryXMPPExtension>(); | ||
|
||
|
@@ -111,24 +113,16 @@ public void run() { | |
|
||
private IBinaryXMPPExtensionReceiver receiver; | ||
|
||
public BinaryChannelConnection( | ||
JID localAddress, | ||
JID remoteAddress, | ||
String connectionID, | ||
ByteStream stream, | ||
StreamMode mode, | ||
IByteStreamConnectionListener listener) | ||
throws IOException { | ||
this.listener = listener; | ||
this.localAddress = localAddress; | ||
this.remoteAddress = remoteAddress; | ||
this.connectionID = connectionID; | ||
this.stream = stream; | ||
this.stream.setReadTimeout(0); // keep connection alive | ||
this.mode = mode; | ||
public BinaryChannelConnection(ByteStream stream, IConnectionClosedCallback callback) { | ||
this.callback = callback; | ||
// FIXME | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Specify what has to be fixed? Not obvious to me. Do you mean the usage of JID? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You was already confused about this "Object Adresses", this is how it is internally managed. Works fine until the addresses used by the bytestream are not JIDs. |
||
this.localAddress = (JID) stream.getLocalAddress(); | ||
// FIXME | ||
this.remoteAddress = (JID) stream.getRemoteAddress(); | ||
|
||
outputStream = new DataOutputStream(new BufferedOutputStream(stream.getOutputStream())); | ||
inputStream = new DataInputStream(new BufferedInputStream(stream.getInputStream())); | ||
this.connectionID = stream.getId(); | ||
this.mode = stream.getMode(); | ||
this.stream = stream; | ||
} | ||
|
||
@Override | ||
|
@@ -138,14 +132,17 @@ public void setBinaryXMPPExtensionReceiver(IBinaryXMPPExtensionReceiver receiver | |
this.receiver = receiver; | ||
} | ||
|
||
@Override | ||
public synchronized void initialize() { | ||
public synchronized void initialize() throws IOException { | ||
if (initialized) return; | ||
|
||
/* | ||
* it is ok to start the receiver a bit later because the data will be | ||
* already buffered by SMACK or the OS | ||
*/ | ||
stream.setReadTimeout(0); // keep connection alive | ||
outputStream = new DataOutputStream(new BufferedOutputStream(stream.getOutputStream())); | ||
inputStream = new DataInputStream(new BufferedInputStream(stream.getInputStream())); | ||
|
||
receiveThread = new ReceiverThread(); | ||
receiveThread.setName("BinaryChannel-" + remoteAddress.getName()); | ||
receiveThread.start(); | ||
|
@@ -154,13 +151,23 @@ public synchronized void initialize() { | |
} | ||
|
||
@Override | ||
public String getConnectionID() { | ||
return connectionID; | ||
public Object getLocalAddress() { | ||
return localAddress; | ||
} | ||
|
||
@Override | ||
public synchronized boolean isConnected() { | ||
return connected; | ||
public JID getRemoteAddress() { | ||
return remoteAddress; | ||
} | ||
|
||
@Override | ||
public StreamMode getMode() { | ||
return mode; | ||
} | ||
|
||
@Override | ||
public String getId() { | ||
return connectionID; | ||
} | ||
|
||
@Override | ||
|
@@ -192,17 +199,7 @@ public void close() { | |
} | ||
} | ||
|
||
listener.connectionClosed(connectionID, this); | ||
} | ||
|
||
@Override | ||
public StreamMode getMode() { | ||
return mode; | ||
} | ||
|
||
@Override | ||
public JID getRemoteAddress() { | ||
return remoteAddress; | ||
if (callback != null) callback.connectionClosed(this); | ||
} | ||
|
||
@Override | ||
|
@@ -436,6 +433,10 @@ private BinaryXMPPExtension readNextXMPPExtension() throws IOException { | |
throw new InterruptedIOException("interrupted while reading stream data"); | ||
} | ||
|
||
private synchronized boolean isConnected() { | ||
return connected; | ||
} | ||
|
||
private synchronized void sendData(int fragmentId, byte[] data, int offset, int length) | ||
throws IOException { | ||
|
||
|
@@ -485,7 +486,15 @@ private void splitAndSend(byte[] data, int chunks, int fragmentId) throws IOExce | |
|
||
@Override | ||
public String toString() { | ||
return "[mode=" + getMode() + ", id=" + connectionID + "]" + " " + remoteAddress; | ||
return "PacketConnection [id=" | ||
+ getId() | ||
+ ", mode=" | ||
+ getMode() | ||
+ ", localAddress=" | ||
+ getLocalAddress() | ||
+ ", remoteAddress=" | ||
+ getRemoteAddress() | ||
+ "]"; | ||
} | ||
|
||
static class IDPool { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using the phrase "establish(ing) a connection" twice sounds a bit weird. But that is more of a nit-pick than a real complaint.