[SNMP4J] Problems in DefaultTcpTransportMapping when receiving fragmented header

Bruno Filipe Basilio Bruno.Basilio at brisa.pt
Tue Jan 19 01:18:13 CET 2016


Hi,

Thank you for all your work on SNMP4J and related projects.
We are using SNMP4J to handle the NTCIP-PMPP protocol, which is based on SNMP over TCP.

For more details about the NTCIP-PMPP specification, see:
http://www.ntcip.org/library/documents/pdf/pmpp01.pdf

The problem is that, with the upstream DefaultTcpTransportMapping, some data is discarded when a received message has a fragmented header (that is, when a read returns fewer bytes than the minimum header length), which results in an incoherent message.
We made some changes to work around the problem but, as you can see below, they are not the prettiest, since we do not have much experience coding with SNMP4J.
Below you can also find an example of the data exchange and the test code we used to replicate the problem using only SNMP4J example code.

All feedback is appreciated. Perhaps you can suggest a better solution and add it to upstream SNMP4J.
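
To make the failure mode concrete, here is a minimal sketch of the general idea of keeping a partially received header between reads instead of discarding it. It is not SNMP4J code: the class HeaderAccumulator and its methods are hypothetical names chosen for illustration, and the minimum header length passed to the constructor is an assumption that would normally come from messageLengthDecoder.getMinHeaderLength(). End-of-stream handling is left out for brevity.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.Arrays;

/** Hypothetical helper (not part of SNMP4J): collects a header that may arrive in pieces. */
final class HeaderAccumulator {
    // Sized to the minimum header length; its position survives between calls.
    private final ByteBuffer header;

    HeaderAccumulator(int minHeaderLength) {
        this.header = ByteBuffer.allocate(minHeaderLength);
    }

    /**
     * Reads whatever the channel currently offers, up to the missing header bytes.
     * Returns true once the whole header has been buffered, false if more data is needed.
     */
    boolean readFrom(ReadableByteChannel channel) {
        if (header.hasRemaining()) {
            try {
                channel.read(header); // may deliver fewer bytes than requested
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return !header.hasRemaining();
    }

    /** Returns a copy of the header bytes collected so far. */
    byte[] bytes() {
        return Arrays.copyOf(header.array(), header.position());
    }
}
```

A caller would keep one such object per connection and only ask the length decoder for the message length after readFrom has returned true.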

Here follows an example of the data exchanged and the fragmented response.
*Sent data*
7E 05 13 C1 30 31 02 01 00 04 06 50 75 62 6C 69 63 A0 24 02 04 28 BA 92 8B 02 01 00 02 01 00 30 16 30 14 06 10 2B 06 01 04 01 89 36 04 02 03 05 08 01 03 05 01 05 00 AE 11 7E

*Received data fragments*
1)
7E 05 13

2)
C1 30 70 02 01 00 04 06 50 75 62 6C 69 63 A2 63 02 04 28 BA 92 8B 02 01 00 02 01 00 30 55 30 53 06 10 2B 06 01 04 01 89 36 04 02 03 05 08 01 03 05 01 04 3F 44 52 49 56 45 5B 6E 6C 5D 57 49 54 48 5B 6E 6C 5D 43 41 52 45 5B 6E 70 5D 54 48 41 4E 4B 20 59 4F 55 5B

3)
6E 6C 5D 46 4F 52 20 44 52 49 56 49 4E 47 5B 6E 6C 5D 4E 57 20 50 41 52 4B 57 41 59 47 34 7E
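
The fragments above can be stitched back together independently of how TCP splits them. The following is only a sketch under stated assumptions: it assumes that every PMPP frame starts and ends with a 0x7E flag byte (as the captured data suggests) and that a raw 0x7E never appears inside a frame. FlagFrameAssembler and its methods are hypothetical names, not part of SNMP4J, and idle flag sequences such as 7E 7E are not handled.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical reassembler: buffers bytes until a frame is closed by a 0x7E flag. */
final class FlagFrameAssembler {
    private static final int FLAG = 0x7E; // assumed frame delimiter
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    /** Appends one received fragment and returns every frame completed by it. */
    List<byte[]> offer(byte[] fragment) {
        List<byte[]> frames = new ArrayList<>();
        for (byte b : fragment) {
            boolean isFlag = (b & 0xFF) == FLAG;
            if (pending.size() == 0 && !isFlag) {
                continue; // ignore bytes that arrive outside a frame
            }
            pending.write(b);
            if (isFlag && pending.size() > 1) {
                // closing flag seen: hand out the frame and start over
                frames.add(pending.toByteArray());
                pending.reset();
            }
        }
        return frames;
    }

    /** Parses a whitespace-separated hex dump such as "7E 05 13". */
    static byte[] hex(String text) {
        String[] tokens = text.trim().split("\\s+");
        byte[] out = new byte[tokens.length];
        for (int i = 0; i < tokens.length; i++) {
            out[i] = (byte) Integer.parseInt(tokens[i], 16);
        }
        return out;
    }
}
```

Feeding the three received fragments to offer, in order, yields no frame for the first two calls and one complete frame for the third.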

The test code is as follows:
        String host="127.0.0.1";
        int port=7000;
        boolean isPmpp= true;
        String oid="1.3.6.1.4.1.1206.4.2.3.6.5.0";
        String communityName="Public";

        TcpTransportMapping transportMapping= new DefaultTcpTransportMapping();
        transportMapping.setMessageLengthDecoder(new PmppMessageLengthDecoder());
        MessageDispatcher messageDispatcher = new PmppMessageDispatcher();
        messageDispatcher.addMessageProcessingModel(new MPv1());
        Snmp snmp= new Snmp(messageDispatcher, transportMapping);

        Address targetAddress = GenericAddress.parse("tcp:" + host + "/" + port);

        // setting up target
        CommunityTarget target = new CommunityTarget();
        target.setCommunity(new OctetString(communityName));
        target.setAddress(targetAddress);
        target.setRetries(2);
        target.setTimeout(1500);
        target.setVersion(SnmpConstants.version1);
        // creating PDU
        PDU pdu = new PDU();
        pdu.add(new VariableBinding(new OID(oid)));
        pdu.setType(PDU.GET);

        snmp.listen();

        // send the PDU
        ResponseEvent response = snmp.send(pdu, target);
        // extract the response PDU (could be null if timed out)
        PDU responsePDU = response.getResponse();

        if(responsePDU==null) {
            System.out.println("response null");
            System.exit(-1);
        }

        Vector vbs = responsePDU.getVariableBindings();
        Variable ret=null;
        if (vbs.size() > 0) {
            VariableBinding vb = (VariableBinding) vbs.get(0);
            ret = vb.getVariable();
        }
        snmp.close();

        System.out.println("s=" + ret);
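
For anyone who wants to reproduce this without the real device, a stub along the following lines could stand in for the agent on port 7000. This is a sketch, not the setup we actually used: FragmentedResponder, writeFragments and serveOnce are hypothetical names, and the pause between writes only makes it likely, not certain, that TCP delivers the pieces as separate reads.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.List;

/** Hypothetical test stub (not part of SNMP4J): answers one request in separate writes. */
final class FragmentedResponder {

    /** Writes each fragment with its own flush, pausing between fragments. */
    static void writeFragments(OutputStream out, List<byte[]> fragments, long pauseMillis) {
        try {
            for (byte[] fragment : fragments) {
                out.write(fragment);
                out.flush();
                Thread.sleep(pauseMillis);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop early, keep the interrupt flag
        }
    }

    /** Accepts one connection, waits for some request bytes, then replies in fragments. */
    static void serveOnce(int port, List<byte[]> fragments, long pauseMillis) throws IOException {
        try (ServerSocket server = new ServerSocket(port);
             Socket client = server.accept()) {
            client.setTcpNoDelay(true); // discourage coalescing of the small writes
            InputStream in = client.getInputStream();
            in.read(new byte[1024]);    // wait for (part of) the request; content is ignored
            writeFragments(client.getOutputStream(), fragments, pauseMillis);
        }
    }
}
```

Running serveOnce(7000, fragments, 200) in a separate thread before starting the test code, with the three received fragments as input, should exercise the fragmented-header path.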



*Changes suggested to work around the problem*
pt.brisa.pmv.ntcip_adapter.manager.PmppTcpTransportMapping.ServerThread.readMessage(SelectionKey, SocketChannel, TcpAddress)

--- a/org/snmp4j/transport/DefaultTcpTransportMapping.java      2015-12-30 02:54:34.000000000 +0000
+++ b/org/snmp4j/transport/DefaultTcpTransportMapping.java      2016-01-18 23:30:53.604959600 +0000
@@ -953,12 +992,25 @@ public class DefaultTcpTransportMapping
         // slow but in some cases needed:
         entry = sockets.get(incomingAddress);
       }
+
+      ByteBuffer readBuffer =null;
       if (entry != null) {
         // note that socket has been used
         entry.used();
-        ByteBuffer readBuffer = entry.getReadBuffer();
+        readBuffer = entry.getReadBuffer();
         if (readBuffer != null) {
           readChannel.read(readBuffer);
+
+          if(logger.isDebugEnabled()) {
+              logger.debug("readBuffer != null, readBuffer.position() != messageLengthDecoder.getMinHeaderLength():"
+                      + (readBuffer.position()!=messageLengthDecoder.getMinHeaderLength())
+                      + ", readBuffer.position(): "+ readBuffer.position()
+                  + ", readBuffer.limit(): "+ readBuffer.limit()
+                  +", readBuffer.array():" + new OctetString(readBuffer.array(), 0, readBuffer.limit()).toHexString());
+          }
+
+          // don't handle fragment less than header length
+          if (readBuffer.position() != messageLengthDecoder.getMinHeaderLength()) {
           if (readBuffer.hasRemaining()) {
             entry.addRegistration(selector, SelectionKey.OP_READ);
           }
@@ -968,8 +1020,17 @@ public class DefaultTcpTransportMapping
           }
           return;
         }
+          if(logger.isDebugEnabled()) {
+              logger.debug("readBuffer.position() != messageLengthDecoder.getMinHeaderLength(), readBuffer.position(): "+ readBuffer.position());
+          }
+        }
       }
-      ByteBuffer byteBuffer = ByteBuffer.wrap(buf);
+
+      ByteBuffer byteBuffer =null;
+      long bytesRead = 0;
+      if (readBuffer == null) {
+          // prepare to read the first fragment
+          byteBuffer = ByteBuffer.wrap(buf);
       byteBuffer.limit(messageLengthDecoder.getMinHeaderLength());
       if (!readChannel.isOpen()) {
         sk.cancel();
@@ -979,12 +1040,14 @@ public class DefaultTcpTransportMapping
         }
         return;
       }
-      long bytesRead = 0;
       try {
         bytesRead = readChannel.read(byteBuffer);
         if (logger.isDebugEnabled()) {
           logger.debug("Reading header " + bytesRead + " bytes from " +
                        incomingAddress);
+              logger.debug("entry != null, byteBuffer read, byteBuffer.position(): "+ byteBuffer.position()
+                  + ", readBuffer.limit(): "+ byteBuffer.limit()
+                  +", readBuffer.array():" + new OctetString(byteBuffer.array(), 0, byteBuffer.limit()).toHexString());
         }
       }
       catch (ClosedChannelException ccex) {
@@ -995,6 +1058,29 @@ public class DefaultTcpTransportMapping
         }
         return;
       }
+      }
+      else {
+          // get buffer info from previous read saved in entry
+          byteBuffer=readBuffer;
+          bytesRead=readBuffer.position();
+          ByteBuffer b = ByteBuffer.wrap(buf);
+          b.put(readBuffer.array(), 0, (int) bytesRead);
+      }
+
+      if (bytesRead> 0 && bytesRead<messageLengthDecoder.getMinHeaderLength()) {
+          // handle read fragment less than header length and save it in entry
+          byte[] message = new byte[byteBuffer.capacity()];
+          int bufpos = byteBuffer.position();
+          int buflim = byteBuffer.limit();
+          byteBuffer.flip();
+          byteBuffer.get(message, 0, bufpos);
+          ByteBuffer newBuffer = ByteBuffer.wrap(message);
+          newBuffer.position(bufpos);
+          newBuffer.limit(buflim);
+          if (entry != null) {
+            entry.setReadBuffer(newBuffer);
+          }
+      }
       if (bytesRead == messageLengthDecoder.getMinHeaderLength()) {
         MessageLength messageLength =
             messageLengthDecoder.getMessageLength(ByteBuffer.wrap(buf));
@@ -1019,6 +1105,12 @@ public class DefaultTcpTransportMapping
           byteBuffer.limit(messageLength.getMessageLength());
           bytesRead += readChannel.read(byteBuffer);
           if (bytesRead == messageLength.getMessageLength()) {
+              if(entry!=null){
+                  if(logger.isDebugEnabled()) {
+                      logger.debug("bytesRead == messageLength.getMessageLength(), bytesRead: "+ bytesRead);
+                  }
+                  entry.setReadBuffer(null); // <== set read buffer of entry to null
+              }
             dispatchMessage(incomingAddress, byteBuffer, bytesRead, entry);
           }
           else {


Best regards,
Bruno


Bruno Filipe Basílio
Departamento de Investigação, Desenvolvimento e Inovação

Tel: (+351) 214233436

http://www.brisainovacao.pt






