source: trunk/LogicMail/src/org/logicprobe/LogicMail/util/Connection.java @ 919

Revision 919, 17.7 KB checked in by octorian, 7 months ago (diff)

Fix for escaped dot issue with POP and SMTP (refs #348) and initial unit tests for POP and SMTP.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
Line 
1/*-
2 * Copyright (c) 2010, Derek Konigsberg
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 *
9 * 1. Redistributions of source code must retain the above copyright
10 *    notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 * 3. Neither the name of the project nor the names of its
15 *    contributors may be used to endorse or promote products derived
16 *    from this software without specific prior written permission.
17 *
18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
21 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
22 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
23 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
24 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
26 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
27 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
29 * OF THE POSSIBILITY OF SUCH DAMAGE.
30 */
31
32package org.logicprobe.LogicMail.util;
33
34import net.rim.device.api.compress.ZLibInputStream;
35import net.rim.device.api.compress.ZLibOutputStream;
36import net.rim.device.api.io.NoCopyByteArrayOutputStream;
37import net.rim.device.api.system.EventLogger;
38import net.rim.device.api.util.Arrays;
39
40import org.logicprobe.LogicMail.AppInfo;
41import org.logicprobe.LogicMail.conf.GlobalConfig;
42import org.logicprobe.LogicMail.conf.MailSettings;
43
44import java.io.ByteArrayOutputStream;
45import java.io.IOException;
46import java.io.InputStream;
47import java.io.OutputStream;
48
49import javax.microedition.io.SocketConnection;
50
51
52/**
53 * This class serves as a facade for socket connections used by the various
54 * protocols supported by this application.  It handles the details of sending
55 * and receiving data in whole lines, with customizable logic to meet protocol
56 * specific needs.
57 */
58public class Connection {
59    private static final byte CR = (byte)0x0D;
60    private static final byte LF = (byte)0x0A;
61    private static final byte[] CRLF = new byte[] { CR, LF };
62    private static String strCRLF = "\r\n";
63
64    private SocketConnection socket;
65    private final int connectionType;
66    private final String localAddress;
67    private final GlobalConfig globalConfig;
68    private InputStream input;
69    private OutputStream output;
70    private int fakeAvailable = -1;
71    private int bytesSent = 0;
72    private int bytesReceived = 0;
73    private final Object socketLock = new Object();
74    private final Object socketReadLock = new Object();
75    private volatile boolean connectionClosed = true;
76   
77    /**
78     * Byte stream used to hold received data before it is passed back to
79     * the rest of the application.
80     */
81    private final ByteArrayOutputStream byteStream = new NoCopyByteArrayOutputStream(1024);
82   
83    /**
84     * Temporary read buffer used as an intermediary between the socket and
85     * the byteStream.
86     */
87    private final byte[] readBuffer = new byte[1024];
88   
89    /**
90     * Initializes a new connection object.
91     *
92     * @param socket Socket representing the connection.
93     * @param connectionType The type of connection that was opened, based on
94     *     the <code>ConnectionConfig.TRANSPORT_XXXX</code> constants.
95     * @throws IOException Thrown if an I/O error occurs.
96     */
97    public Connection(SocketConnection socket, int connectionType) throws IOException {
98        this.connectionType = connectionType;
99        this.globalConfig = MailSettings.getInstance().getGlobalConfig();
100       
101        this.socket = socket;
102        this.input = socket.openDataInputStream();
103        this.output = socket.openDataOutputStream();
104        this.localAddress = socket.getLocalAddress();
105        this.bytesSent = 0;
106        this.bytesReceived = 0;
107        this.connectionClosed = false;
108    }
109   
110    /**
111     * Enables compression on the I/O sockets using ZLib.
112     */
113    public void enableCompression() {
114        if(this.input instanceof ZLibInputStream) { return; }
115        this.input = new ZLibInputStream(this.input, true);
116        this.output = new ZLibOutputStream(this.output, true);
117    }
118   
119    /**
120     * Closes a connection.
121     */
122    public void close() {
123        synchronized(socketLock) {
124            forceClose();
125        }
126       
127        EventLogger.logEvent(AppInfo.GUID, "Connection closed".getBytes(),
128                EventLogger.INFORMATION);
129    }
130
131    /**
132     * Closes a connection abruptly, regardless of its current state.
133     */
134    public void forceClose() {
135        connectionClosed = true;
136       
137        Exception[] exp = new Exception[3];
138
139        if(input != null) {
140            try {
141                input.close();
142            } catch (Exception e) {
143                exp[0] = e;
144            } finally {
145                input = null;
146            }
147        }
148
149        if(output != null) {
150            try {
151                output.close();
152            } catch (Exception e) {
153                exp[1] = e;
154            } finally {
155                output = null;
156            }
157        }
158
159        if(socket != null) {
160            try {
161                socket.close();
162            } catch (IOException e) {
163                exp[2] = e;
164            } finally {
165                socket = null;
166            }
167        }
168       
169        for(int i=0; i<exp.length; i++) {
170            if(exp[i] == null) { continue; }
171            EventLogger.logEvent(AppInfo.GUID,
172                    ("Error closing connection: " + exp[i].getMessage()).getBytes(),
173                    EventLogger.WARNING);
174        }
175    }
176
177    /**
178     * Determine whether we are currently connected
179     * @return True if connected
180     */
181    public boolean isConnected() {
182        if (socket != null) {
183            return true;
184        } else {
185            return false;
186        }
187    }
188
189    /**
190     * Gets the type of connection that was opened.
191     *
192     * @return the connection type, based on the
193     *     <code>ConnectionConfig.TRANSPORT_XXXX</code> constants.
194     */
195    public int getConnectionType() {
196        return connectionType;
197    }
198   
199    /**
200     * Get the local address to which this connection is bound
201     * @return Local address
202     */
203    public String getLocalAddress() {
204        return localAddress;
205    }
206
207    /**
208     * Gets the number of bytes that have been sent since the
209     * connection was opened.
210     * <p>
211     * The counter is not synchronized, so it should only be
212     * called from the same thread as the send and receive
213     * methods.
214     * </p>
215     * @return bytes sent
216     */
217    public int getBytesSent() {
218        return bytesSent;
219    }
220
221    /**
222     * Gets the number of bytes that have been received since the
223     * connection was opened.
224     * <p>
225     * The counter is not synchronized, so it should only be
226     * called from the same thread as the send and receive
227     * methods.
228     * </p>
229     * @return bytes received
230     */
231    public int getBytesReceived() {
232        return bytesReceived;
233    }
234
235    /**
236     * Gets the socket used by this connection instance.
237     * This method should only be called by <code>NetworkConnector</code>
238     * for the purpose of creating new wrapped sockets.
239     *
240     * @return the connection socket
241     */
242    SocketConnection getSocket() {
243        return socket;
244    }
245   
246    /**
247     * Gets the input stream used by this connection instance.
248     * This method should only be called by <code>NetworkConnector</code>
249     * for the purpose of creating new wrapped sockets.
250     *
251     * @return the connection socket's input stream
252     */
253    InputStream getInput() {
254        return input;
255    }
256   
257    /**
258     * Gets the output stream used by this connection instance.
259     * This method should only be called by <code>NetworkConnector</code>
260     * for the purpose of creating new wrapped sockets.
261     *
262     * @return the connection socket's output stream
263     */
264    OutputStream getOutput() {
265        return output;
266    }
267   
268    /**
269     * Sends a string to the server, terminating it with a CRLF.
270     * No cleanup is performed, as it is expected that the string
271     * is a prepared protocol command.
272     */
273    public void sendCommand(String s) throws IOException {
274        if (globalConfig.getConnDebug()) {
275            EventLogger.logEvent(AppInfo.GUID, ("[SEND CMD] " + s).getBytes(),
276                    EventLogger.DEBUG_INFO);
277        }
278
279        synchronized(socketLock) {
280            if (s == null) {
281                output.write(CRLF, 0, 2);
282                bytesSent += 2;
283            } else {
284                byte[] buf = (s + strCRLF).getBytes();
285                output.write(buf);
286                bytesSent += buf.length;
287            }
288   
289            output.flush();
290        }
291        if(connectionClosed) { throw new IOException(); }
292    }
293
294    /**
295     * Sends a string to the server. This method is used to bypass all
296     * the processing done by the normal send method, and is most useful
297     * for bulk transmissions.  It writes the provided string to the socket
298     * in a single command, followed by a flush.
299     *
300     * @param data   the data.
301     * @param offset the start offset in the data.
302     * @param length the number of bytes to write.
303     *
304     * @see #send
305     */
306    public void sendRaw(byte[] data, int offset, int length) throws IOException {
307        if (globalConfig.getConnDebug()) {
308            ByteArrayOutputStream stream = new ByteArrayOutputStream();
309            stream.write("[SEND RAW]\r\n".getBytes());
310            stream.write(data, offset, length);
311            EventLogger.logEvent(AppInfo.GUID,
312                    stream.toByteArray(), EventLogger.DEBUG_INFO);
313        }
314
315        synchronized(socketLock) {
316            output.write(data, offset, length);
317            bytesSent += length;
318   
319            output.flush();
320        }
321        if(connectionClosed) { throw new IOException(); }
322    }
323
324    /**
325     * Returns the number of bytes available for reading.
326     * Used to poll the connection without blocking.
327     *
328     * @see InputStream#available()
329     */
330    public int available() throws IOException {
331        //FIXME: This may cause issues if checked during a blocking read
332        synchronized(socketReadLock) {
333            if (fakeAvailable == -1) {
334                return input.available();
335            } else {
336                return fakeAvailable;
337            }
338        }
339    }
340   
341    /**
342     * Receives a string from the server. This method is used internally for
343     * incoming communication from the server. The main thing it does is
344     * ensure that only complete lines are returned to the application, that is,
345     * lines that were terminated at least by a LF.  Neither CRs nor LFs are
346     * returned as part of the result.
347     *
348     * @return the complete line, minus the CRLF, as a byte array
349     */
350    public byte[] receive() throws IOException {
351        byte[] result = receive(lineResponseTester);
352        if(connectionClosed) { throw new IOException(); }
353        return result;
354    }
355   
356    /**
357     * Receives a string from the server. This method is used internally for
358     * incoming communication from the server.
359     *
360     * @param responseTester class to determine when a complete response has
361     *   been read from the network, and whether to trim it prior to returning
362     * @return the complete response, as a byte array
363     */
364    public byte[] receive(ConnectionResponseTester responseTester) throws IOException {
365        byte[] result = receiveImpl(responseTester);
366       
367        if(result != null && globalConfig.getConnDebug()) {
368                EventLogger.logEvent(AppInfo.GUID,
369                        ("[RECV] " + responseTester.logString(result)).getBytes(),
370                        EventLogger.DEBUG_INFO);
371        }
372       
373        if(connectionClosed) { throw new IOException(); }
374        return result;
375    }
376   
377    private byte[] receiveImpl(ConnectionResponseTester responseTester) throws IOException {
378        synchronized(socketReadLock) {
379            // Check existing data for a usable line
380            byte[] line = checkForLine(responseTester);
381            if(line != null) {
382                return line;
383            }
384   
385            // Read from the socket
386            int firstByte = input.read();
387            if(firstByte != -1) {
388                byteStream.write((byte)firstByte);
389                bytesReceived++;
390                int bytesAvailable = input.available();
391                while(bytesAvailable > 0) {
392                    int len = input.read(readBuffer, 0, Math.min(bytesAvailable, readBuffer.length));
393                    byteStream.write(readBuffer, 0, len);
394                    bytesReceived += len;
395                   
396                    // Check read data for a usable line
397                    line = checkForLine(responseTester);
398                    if(line != null) {
399                        return line;
400                    }
401                   
402                    bytesAvailable = input.available();
403   
404                    // If no bytes are reported as being available, but we have
405                    // not yet received a full line, then we need to attempt
406                    // another single-byte blocking read.
407                    if(bytesAvailable == 0) {
408                        firstByte = input.read();
409                        if(firstByte != -1) {
410                            byteStream.write((byte)firstByte);
411                            bytesReceived++;
412                            bytesAvailable = input.available();
413                        }
414                        else {
415                            handleSocketReadError();
416                        }
417                    }
418                }
419               
420                // Check for any final data
421                if(byteStream.size() > 0) {
422                    line = checkForLine(responseTester);
423                    if(line != null) {
424                        return line;
425                    }
426                }
427            }
428            else {
429                handleSocketReadError();
430            }
431        }
432        // We should never get here, unless there is a connection error or the
433        // line response tester has a bug.
434        handleSocketReadError();
435        return null;
436    }
437   
438    private void handleSocketReadError() throws IOException {
439        // If we got here, that means that the InputStream is either closed
440        // or we are in some otherwise unrecoverable state.  This means we
441        // will try to close the connection, ignore any errors from the
442        // close operation, and throw an IOException.
443       
444        EventLogger.logEvent(AppInfo.GUID,
445                "Unable to read from socket, closing connection".getBytes(),
446                EventLogger.INFORMATION);
447
448        close();
449
450        throw new IOException("Connection closed");
451    }
452
453    /**
454     * Checks the byte stream buffer for a usable line of returnable data.
455     * If a line is returned, the buffer will be updated to only contain data
456     * following that line.
457     *
458     * @return the trimmed string which ended in a CRLF in the source data
459     */
460    private byte[] checkForLine(ConnectionResponseTester responseTester) throws IOException {
461        byte[] result;
462       
463        byte[] buf = byteStream.toByteArray();
464        int size = byteStream.size();
465       
466        int p = responseTester.checkForCompleteResponse(buf, size);
467       
468        if(p != -1) {
469            int trimCount = responseTester.trimCount();
470           
471            result = Arrays.copy(buf, 0, p - trimCount);
472           
473            if(p < size) {
474                buf = Arrays.copy(buf, p, size - p);
475                byteStream.reset();
476                byteStream.write(buf);
477                fakeAvailable = buf.length;
478            }
479            else {
480                byteStream.reset();
481                fakeAvailable = -1;
482            }
483        }
484        else {
485            fakeAvailable = size;
486            result = null;
487        }
488        return result;
489    }
490
491    private static ConnectionResponseTester lineResponseTester = new ConnectionResponseTester() {
492        private int trimCount;
493        private int lastLength = 0;
494       
495        public int checkForCompleteResponse(byte[] buf, int len) {
496            trimCount = 0;
497            int p = StringArrays.indexOf(buf, LF, lastLength);
498           
499            if(p != -1 && p < len) {
500                // Specific test for responses that use a double LF in
501                // the middle, to separate things that look like separate
502                // responses but really are not.
503                while(p != -1 && p + 1 < len && buf[p + 1] == LF) {
504                    if(p + 2 == len) {
505                        lastLength = len;
506                        return -1;
507                    }
508                    else {
509                        p = StringArrays.indexOf(buf, LF, p + 2);
510                    }
511                }
512               
513                if(p > 0 && buf[p - 1] == CR) {
514                    trimCount = 2;
515                }
516                else {
517                    trimCount = 1;
518                }
519                lastLength = 0;
520                return ++p;
521            }
522            else {
523                lastLength = len;
524                return -1;
525            }
526        }
527
528        public int trimCount() {
529            return trimCount;
530        }
531       
532        public String logString(byte[] result) {
533            return new String(result);
534        }
535    };
536}
Note: See TracBrowser for help on using the repository browser.