source: trunk/LogicMail/src/org/logicprobe/LogicMail/util/Connection.java @ 979

Revision 979, 18.6 KB checked in by octorian, 2 months ago (diff)

Attempt to improve watchdog-triggered socket force-close behavior (refs #373)

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*-
2 * Copyright (c) 2010, Derek Konigsberg
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * 1. Redistributions of source code must retain the above copyright
10 *    notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 * 3. Neither the name of the project nor the names of its
15 *    contributors may be used to endorse or promote products derived
16 *    from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
21 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
22 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
23 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
24 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
26 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
27 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
29 * OF THE POSSIBILITY OF SUCH DAMAGE.
30 */
31
32package org.logicprobe.LogicMail.util;
33
34import net.rim.device.api.compress.ZLibInputStream;
35import net.rim.device.api.compress.ZLibOutputStream;
36import net.rim.device.api.io.NoCopyByteArrayOutputStream;
37import net.rim.device.api.system.EventLogger;
38import net.rim.device.api.util.Arrays;
39
40import org.logicprobe.LogicMail.AppInfo;
41import org.logicprobe.LogicMail.conf.GlobalConfig;
42import org.logicprobe.LogicMail.conf.MailSettings;
43
44import java.io.ByteArrayOutputStream;
45import java.io.IOException;
46import java.io.InputStream;
47import java.io.OutputStream;
48
49import javax.microedition.io.SocketConnection;
50
51
52/**
53 * This class serves as a facade for socket connections used by the various
54 * protocols supported by this application.  It handles the details of sending
55 * and receiving data in whole lines, with customizable logic to meet protocol
56 * specific needs.
57 */
58public class Connection {
59    private static final byte CR = (byte)0x0D;
60    private static final byte LF = (byte)0x0A;
61    private static final byte[] CRLF = new byte[] { CR, LF };
62    private static String strCRLF = "\r\n";
63
64    private SocketConnection socket;
65    private final int connectionType;
66    private final String localAddress;
67    private final GlobalConfig globalConfig;
68    private InputStream input;
69    private OutputStream output;
70    private int fakeAvailable = -1;
71    private int bytesSent = 0;
72    private int bytesReceived = 0;
73    private final Object socketLock = new Object();
74    private final Object socketReadLock = new Object();
75    private final Object socketWriteLock = new Object();
76    private Thread socketReadThread;
77    private Thread socketWriteThread;
78    private volatile boolean connectionClosed = true;
79   
80    /**
81     * Byte stream used to hold received data before it is passed back to
82     * the rest of the application.
83     */
84    private final ByteArrayOutputStream byteStream = new NoCopyByteArrayOutputStream(1024);
85   
86    /**
87     * Temporary read buffer used as an intermediary between the socket and
88     * the byteStream.
89     */
90    private final byte[] readBuffer = new byte[1024];
91   
/**
 * Creates a connection facade around an already-opened socket.
 * <p>
 * The socket's data input and output streams are opened here, and its
 * local address is captured for later retrieval.
 * </p>
 *
 * @param socket Socket representing the connection.
 * @param connectionType The type of connection that was opened, based on
 *     the <code>ConnectionConfig.TRANSPORT_XXXX</code> constants.
 * @throws IOException Thrown if an I/O error occurs.
 */
public Connection(SocketConnection socket, int connectionType) throws IOException {
    this.connectionType = connectionType;
    globalConfig = MailSettings.getInstance().getGlobalConfig();

    this.socket = socket;
    input = socket.openDataInputStream();
    output = socket.openDataOutputStream();
    localAddress = socket.getLocalAddress();

    // Start both traffic counters from zero
    bytesSent = 0;
    bytesReceived = 0;

    // Streams are open, so the connection is now usable
    connectionClosed = false;
}
112   
113    /**
114     * Enables compression on the I/O sockets using ZLib.
115     */
116    public void enableCompression() {
117        if(this.input instanceof ZLibInputStream) { return; }
118        this.input = new ZLibInputStream(this.input, true);
119        this.output = new ZLibOutputStream(this.output, true);
120    }
121   
122    /**
123     * Closes a connection.
124     */
125    public void close() {
126        forceClose();
127
128        EventLogger.logEvent(AppInfo.GUID, "Connection closed".getBytes(),
129                EventLogger.INFORMATION);
130    }
131
132    /**
133     * Closes a connection abruptly, regardless of its current state.
134     */
135    public void forceClose() {
136        connectionClosed = true;
137       
138        synchronized(socketLock) {
139            if(socketWriteThread != null) {
140                socketWriteThread.interrupt();
141                socketWriteThread = null;
142            }
143            if(socketReadThread != null) {
144                socketReadThread.interrupt();
145                socketReadThread = null;
146            }
147        }
148       
149        Exception[] exp = new Exception[3];
150
151        if(input != null) {
152            try {
153                input.close();
154            } catch (Exception e) {
155                exp[0] = e;
156            } finally {
157                input = null;
158            }
159        }
160
161        if(output != null) {
162            try {
163                output.close();
164            } catch (Exception e) {
165                exp[1] = e;
166            } finally {
167                output = null;
168            }
169        }
170
171        if(socket != null) {
172            try {
173                socket.close();
174            } catch (IOException e) {
175                exp[2] = e;
176            } finally {
177                socket = null;
178            }
179        }
180       
181        for(int i=0; i<exp.length; i++) {
182            if(exp[i] == null) { continue; }
183            EventLogger.logEvent(AppInfo.GUID,
184                    ("Error closing connection: " + exp[i].getMessage()).getBytes(),
185                    EventLogger.WARNING);
186        }
187    }
188
189    /**
190     * Determine whether we are currently connected
191     * @return True if connected
192     */
193    public boolean isConnected() {
194        if (socket != null) {
195            return true;
196        } else {
197            return false;
198        }
199    }
200
201    /**
202     * Gets the type of connection that was opened.
203     *
204     * @return the connection type, based on the
205     *     <code>ConnectionConfig.TRANSPORT_XXXX</code> constants.
206     */
207    public int getConnectionType() {
208        return connectionType;
209    }
210   
211    /**
212     * Get the local address to which this connection is bound
213     * @return Local address
214     */
215    public String getLocalAddress() {
216        return localAddress;
217    }
218
219    /**
220     * Gets the number of bytes that have been sent since the
221     * connection was opened.
222     * <p>
223     * The counter is not synchronized, so it should only be
224     * called from the same thread as the send and receive
225     * methods.
226     * </p>
227     * @return bytes sent
228     */
229    public int getBytesSent() {
230        return bytesSent;
231    }
232
233    /**
234     * Gets the number of bytes that have been received since the
235     * connection was opened.
236     * <p>
237     * The counter is not synchronized, so it should only be
238     * called from the same thread as the send and receive
239     * methods.
240     * </p>
241     * @return bytes received
242     */
243    public int getBytesReceived() {
244        return bytesReceived;
245    }
246
247    /**
248     * Gets the socket used by this connection instance.
249     * This method should only be called by <code>NetworkConnector</code>
250     * for the purpose of creating new wrapped sockets.
251     *
252     * @return the connection socket
253     */
254    SocketConnection getSocket() {
255        return socket;
256    }
257   
258    /**
259     * Gets the input stream used by this connection instance.
260     * This method should only be called by <code>NetworkConnector</code>
261     * for the purpose of creating new wrapped sockets.
262     *
263     * @return the connection socket's input stream
264     */
265    InputStream getInput() {
266        return input;
267    }
268   
269    /**
270     * Gets the output stream used by this connection instance.
271     * This method should only be called by <code>NetworkConnector</code>
272     * for the purpose of creating new wrapped sockets.
273     *
274     * @return the connection socket's output stream
275     */
276    OutputStream getOutput() {
277        return output;
278    }
279   
280    /**
281     * Sends a string to the server, terminating it with a CRLF.
282     * No cleanup is performed, as it is expected that the string
283     * is a prepared protocol command.
284     */
285    public void sendCommand(String s) throws IOException {
286        if (globalConfig.getConnDebug()) {
287            EventLogger.logEvent(AppInfo.GUID, ("[SEND CMD] " + s).getBytes(),
288                    EventLogger.DEBUG_INFO);
289        }
290
291        synchronized (socketLock) { socketWriteThread = Thread.currentThread(); }
292        synchronized(socketWriteLock) {
293            if (s == null) {
294                output.write(CRLF, 0, 2);
295                bytesSent += 2;
296            } else {
297                byte[] buf = (s + strCRLF).getBytes();
298                output.write(buf);
299                bytesSent += buf.length;
300            }
301   
302            output.flush();
303        }
304        synchronized (socketLock) { socketWriteThread = null; }
305        if(connectionClosed) { throw new IOException(); }
306    }
307
308    /**
309     * Sends a string to the server. This method is used to bypass all
310     * the processing done by the normal send method, and is most useful
311     * for bulk transmissions.  It writes the provided string to the socket
312     * in a single command, followed by a flush.
313     *
314     * @param data   the data.
315     * @param offset the start offset in the data.
316     * @param length the number of bytes to write.
317     *
318     * @see #send
319     */
320    public void sendRaw(byte[] data, int offset, int length) throws IOException {
321        if (globalConfig.getConnDebug()) {
322            ByteArrayOutputStream stream = new ByteArrayOutputStream();
323            stream.write("[SEND RAW]\r\n".getBytes());
324            stream.write(data, offset, length);
325            EventLogger.logEvent(AppInfo.GUID,
326                    stream.toByteArray(), EventLogger.DEBUG_INFO);
327        }
328
329        synchronized (socketLock) { socketWriteThread = Thread.currentThread(); }
330        synchronized(socketWriteLock) {
331            output.write(data, offset, length);
332            bytesSent += length;
333   
334            output.flush();
335        }
336        synchronized (socketLock) { socketWriteThread = null; }
337       
338        if(connectionClosed) { throw new IOException(); }
339    }
340
341    /**
342     * Returns the number of bytes available for reading.
343     * Used to poll the connection without blocking.
344     *
345     * @see InputStream#available()
346     */
347    public int available() throws IOException {
348        //FIXME: This may cause issues if checked during a blocking read
349        synchronized(socketReadLock) {
350            if (fakeAvailable == -1) {
351                return input.available();
352            } else {
353                return fakeAvailable;
354            }
355        }
356    }
357   
358    /**
359     * Receives a string from the server. This method is used internally for
360     * incoming communication from the server. The main thing it does is
361     * ensure that only complete lines are returned to the application, that is,
362     * lines that were terminated at least by a LF.  Neither CRs nor LFs are
363     * returned as part of the result.
364     *
365     * @return the complete line, minus the CRLF, as a byte array
366     */
367    public byte[] receive() throws IOException {
368        byte[] result = receive(lineResponseTester);
369        if(connectionClosed) { throw new IOException(); }
370        return result;
371    }
372   
373    /**
374     * Receives a string from the server. This method is used internally for
375     * incoming communication from the server.
376     *
377     * @param responseTester class to determine when a complete response has
378     *   been read from the network, and whether to trim it prior to returning
379     * @return the complete response, as a byte array
380     */
381    public byte[] receive(ConnectionResponseTester responseTester) throws IOException {
382        synchronized (socketLock) { socketReadThread = Thread.currentThread(); }
383        byte[] result = receiveImpl(responseTester);
384        synchronized (socketLock) { socketReadThread = null; }
385       
386        if(result != null && globalConfig.getConnDebug()) {
387                EventLogger.logEvent(AppInfo.GUID,
388                        ("[RECV] " + responseTester.logString(result)).getBytes(),
389                        EventLogger.DEBUG_INFO);
390        }
391       
392        if(connectionClosed) { throw new IOException(); }
393        return result;
394    }
395   
396    private byte[] receiveImpl(ConnectionResponseTester responseTester) throws IOException {
397        synchronized(socketReadLock) {
398            // Check existing data for a usable line
399            byte[] line = checkForLine(responseTester);
400            if(line != null) {
401                return line;
402            }
403   
404            // Read from the socket
405            int firstByte = input.read();
406            if(firstByte != -1) {
407                byteStream.write((byte)firstByte);
408                bytesReceived++;
409                int bytesAvailable = input.available();
410                while(bytesAvailable > 0) {
411                    int len = input.read(readBuffer, 0, Math.min(bytesAvailable, readBuffer.length));
412                    byteStream.write(readBuffer, 0, len);
413                    bytesReceived += len;
414                   
415                    // Check read data for a usable line
416                    line = checkForLine(responseTester);
417                    if(line != null) {
418                        return line;
419                    }
420                   
421                    bytesAvailable = input.available();
422   
423                    // If no bytes are reported as being available, but we have
424                    // not yet received a full line, then we need to attempt
425                    // another single-byte blocking read.
426                    if(bytesAvailable == 0) {
427                        firstByte = input.read();
428                        if(firstByte != -1) {
429                            byteStream.write((byte)firstByte);
430                            bytesReceived++;
431                            bytesAvailable = input.available();
432                        }
433                        else {
434                            handleSocketReadError();
435                        }
436                    }
437                }
438               
439                // Check for any final data
440                if(byteStream.size() > 0) {
441                    line = checkForLine(responseTester);
442                    if(line != null) {
443                        return line;
444                    }
445                }
446            }
447            else {
448                handleSocketReadError();
449            }
450        }
451        // We should never get here, unless there is a connection error or the
452        // line response tester has a bug.
453        handleSocketReadError();
454        return null;
455    }
456   
457    private void handleSocketReadError() throws IOException {
458        // If we got here, that means that the InputStream is either closed
459        // or we are in some otherwise unrecoverable state.  This means we
460        // will try to close the connection, ignore any errors from the
461        // close operation, and throw an IOException.
462       
463        EventLogger.logEvent(AppInfo.GUID,
464                "Unable to read from socket, closing connection".getBytes(),
465                EventLogger.INFORMATION);
466
467        close();
468
469        throw new IOException("Connection closed");
470    }
471
472    /**
473     * Checks the byte stream buffer for a usable line of returnable data.
474     * If a line is returned, the buffer will be updated to only contain data
475     * following that line.
476     *
477     * @return the trimmed string which ended in a CRLF in the source data
478     */
479    private byte[] checkForLine(ConnectionResponseTester responseTester) throws IOException {
480        byte[] result;
481       
482        byte[] buf = byteStream.toByteArray();
483        int size = byteStream.size();
484       
485        int p = responseTester.checkForCompleteResponse(buf, size);
486       
487        if(p != -1) {
488            int trimCount = responseTester.trimCount();
489           
490            result = Arrays.copy(buf, 0, p - trimCount);
491           
492            if(p < size) {
493                buf = Arrays.copy(buf, p, size - p);
494                byteStream.reset();
495                byteStream.write(buf);
496                fakeAvailable = buf.length;
497            }
498            else {
499                byteStream.reset();
500                fakeAvailable = -1;
501            }
502        }
503        else {
504            fakeAvailable = size;
505            result = null;
506        }
507        return result;
508    }
509
510    private static ConnectionResponseTester lineResponseTester = new ConnectionResponseTester() {
511        private int trimCount;
512        private int lastLength = 0;
513       
514        public int checkForCompleteResponse(byte[] buf, int len) {
515            trimCount = 0;
516            int p = StringArrays.indexOf(buf, LF, lastLength);
517           
518            if(p != -1 && p < len) {
519                // Specific test for responses that use a double LF in
520                // the middle, to separate things that look like separate
521                // responses but really are not.
522                while(p != -1 && p + 1 < len && buf[p + 1] == LF) {
523                    if(p + 2 == len) {
524                        lastLength = len;
525                        return -1;
526                    }
527                    else {
528                        p = StringArrays.indexOf(buf, LF, p + 2);
529                    }
530                }
531               
532                if(p > 0 && buf[p - 1] == CR) {
533                    trimCount = 2;
534                }
535                else {
536                    trimCount = 1;
537                }
538                lastLength = 0;
539                return ++p;
540            }
541            else {
542                lastLength = len;
543                return -1;
544            }
545        }
546
547        public int trimCount() {
548            return trimCount;
549        }
550       
551        public String logString(byte[] result) {
552            return new String(result);
553        }
554    };
555}
Note: See TracBrowser for help on using the repository browser.