• python协议解析


    在这里插入图片描述在这里插入图片描述

    from scapy.layers.inet import *
    from scapy.compat import binascii
    from scapy.layers.inet import *
    from scapy.layers.l2 import *
    from scapy.packet import *
    from scapy.sendrecv import *
    
    # Live-capture demo: sniff UDP traffic and print a one-line summary per packet.
    #   filter  - BPF expression used to select packets at capture time
    #   lfilter - Python predicate; only packets for which it returns True are kept
    #   prn     - callback applied to every kept packet; what it returns is displayed
    # No count/timeout is given, so the capture runs until it is interrupted.
    packets = sniff(
        filter="udp",
        lfilter=lambda pkt: UDP in pkt,
        prn=lambda pkt: pkt.summary(),
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    sniff 常用的数据包筛选参数

    filter 支持 BPF 语法
    prn 支持每个数据包单独回调
    lfilter 是对每个数据包调用的 Python 函数，只有返回 True 的数据包才会被保留

    from ctypes import (Structure, addressof, c_char_p, c_uint8, c_uint16,
                        c_uint32, c_void_p, memmove, sizeof)
    
    import crcmod
    from scapy.compat import binascii
    from scapy.layers.inet import *
    from scapy.packet import *
    from scapy.sendrecv import *
    
    
    class IspCommonField(Structure):
        """Fixed header found at the start of an ISP frame.

        ``_pack_ = 1`` removes alignment padding, so the ten fields are laid out
        back to back and the structure is 14 bytes long.

        NOTE(review): the meaning of each field is inferred from its name only;
        confirm against the protocol specification.
        """

        _pack_ = 1
        _fields_ = [
            ("start", c_uint8),  # offset 0: receives the leading frame byte
            ("ap_layer", c_uint8),  # offset 1
            ("vp_layer", c_uint8),  # offset 2
            ("station_numbers", c_uint32),  # offsets 3-6
            ("device_numbers", c_uint8),  # offset 7
            ("packets_numbers", c_uint16),  # offsets 8-9
            ("vp_layer_interact", c_uint8),  # offset 10
            ("mcp_layer", c_uint8),  # offset 11
            ("cmd", c_uint8),  # offset 12
            ("response", c_uint8),  # offset 13
        ]
    
    
    class IspMCPA(Structure):
        """Packed record: 1-byte ``len``, 16-bit ``tag`` and a C string pointer.

        ``value`` is declared as ``c_char_p``, so the structure stores a pointer
        rather than the value bytes themselves.
        """

        _pack_ = 1
        _fields_ = [
            ("len", c_uint8),
            ("tag", c_uint16),
            ("value", c_char_p),
        ]
    
    
    class IspMCPC(Structure):
        """Packed record: 1-byte ``len``, 32-bit ``tag`` and a C string pointer.

        ``value`` is declared as ``c_char_p``, so the structure stores a pointer
        rather than the value bytes themselves.
        """

        _pack_ = 1
        _fields_ = [
            ("len", c_uint8),
            ("tag", c_uint32),
            ("value", c_char_p),
        ]
    
    
    class IspEndField(Structure):
        """Three-byte frame trailer: 16-bit ``crc`` followed by the ``end`` byte."""

        _pack_ = 1
        _fields_ = [
            ("crc", c_uint16),
            ("end", c_uint8),
        ]
    
    
    # ISP protocol frame parser (hex-text input) with CRC-16 verification.
    class IspProtocol(Structure):
        """Decode the header and trailer of one ISP frame given as ASCII hex text.

        ``deserdes`` strips the ``7e`` delimiters, undoes the byte escaping,
        decodes the common header and the trailer, and compares the CRC-16
        carried by the frame with the one computed over the frame body.
        The payload is not split into PDUs by this version.
        """

        __isMCPA__ = False
        __isMCPC__ = False
        __IspCommonField__ = IspCommonField()
        __IspCommonFieldLen__ = sizeof(__IspCommonField__)
        __IspEndField__ = IspEndField()
        __IspEndFieldLen__ = sizeof(__IspEndField__)
        __MCPA_LIST__ = []
        __MCPC_LIST__ = []
        __crc16__ = 0x0
        __crcFlag__ = False

        def deserdes(self, msg: bytes):
            """Parse *msg* and return True when its CRC matches, else False.

            Args:
                msg: the whole frame as ASCII hex digits (either case),
                    including the leading and trailing ``7e`` delimiter.

            Raises:
                binascii.Error: if *msg* is not valid, even-length hex text.
            """
            esc_msg = ("7e" + self.__escape_recv_packets(msg) + "7e").encode()
            frame = binascii.unhexlify(esc_msg)

            # A frame shorter than header + trailer cannot be decoded; copying
            # the fixed-size structures out of it would read past the buffer.
            if len(frame) < self.__IspCommonFieldLen__ + self.__IspEndFieldLen__:
                print("ISP frame too short.drop this packets ... ...")
                return False

            # Bind fresh structures to this instance instead of overwriting the
            # class-level objects, which every IspProtocol instance shares.
            self.__IspCommonField__ = IspCommonField.from_buffer_copy(frame)
            self.__IspEndField__ = IspEndField.from_buffer_copy(
                frame, len(frame) - self.__IspEndFieldLen__
            )

            # The CRC covers everything between the start delimiter and the
            # trailer (crc + end delimiter).
            self.__crc16__ = self.__crc16(frame[1 : -self.__IspEndFieldLen__])
            if self.__IspEndField__.crc != self.__crc16__:
                print("ISP crc code parse error.drop this packets ... ...")
                return False
            return True

        def __crc16(self, code: bytes):
            """Return the CRC-16 of *code* (poly 0x11021, init 0, not reflected)."""
            return crcmod.mkCrcFun(0x11021, rev=False, initCrc=0, xorOut=0x0)(code)

        def __escape_recv_packets(self, msg: bytes):
            """Return the unescaped frame body as lower-case hex text.

            The ``7e`` delimiters are dropped; a missing delimiter is reported
            but parsing continues. Unescaping is done on decoded bytes so that
            a pattern can never match across a byte boundary of the hex text.
            """
            if msg[:2].lower() != b"7e" or msg[-2:].lower() != b"7e":
                print("error:escap packets")
            raw = binascii.unhexlify(msg[2:-2])
            # Undo 5e7d -> 7e first: doing 5e5d -> 5e first would turn an
            # escaped 5e that is followed by a literal 7d into a bogus 7e.
            raw = raw.replace(b"\x5e\x7d", b"\x7e").replace(b"\x5e\x5d", b"\x5e")
            return binascii.hexlify(raw).decode()

        def payload(self):
            """Return the MCPA or MCPC list, or None when neither flag is set."""
            if self.__isMCPA__:
                return self.__MCPA_LIST__
            if self.__isMCPC__:
                return self.__MCPC_LIST__

        def crc16(self):
            """Return the CRC computed by the last ``deserdes`` call (0 before)."""
            return self.__crc16__

        def show(self):
            """Placeholder; does nothing."""
            pass
    
    
    # Two sample frames as ASCII hex text. The second assignment replaces the
    # first, so only the lower-case frame is parsed below.
    buf = b"7E03010100000000160000030200FF090000000503BF050000C0050000C1050000C2050000C3050000C4050000C5050000C6050000C7050000C8050000C9050000CA050000CB050000CD050000CE050000CF050000D2050000D3050000D70500000120000002200000062000000920000001210000022100000621000009210000022200000322000006220000072200000A2200000B2200000C2200000D2200000E2200000F22000001230000022300000323000011230000122300001323000021230000222300002323000031230000322300003323000051230000522300005323000061230000622300006323000002250000042500000A2500000326000004260000052600001326000092A47E"
    buf = b"7e03010000000000020000010200d90900010102000300040005000600070008000a000b001000110012001300140015001600170018001900200021002200230024003000310032003300490075005c005d005e5d005f00010102011001110112011301140115012001300131013301340136013701380139014101420143014401500151015201530172010102020204020502080209020f02100211022002210222022302240225022802010302030403050308030903100311030f032003210322032303240325032803a204530454047308a005a105a205a3057c08b005b105b205b305c0056d1b7e"

    # IspProtocol declares no _fields_ of its own, so this size is 0.
    hdr_len = sizeof(IspProtocol)
    hdr = IspProtocol()
    hdr.deserdes(buf)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105

    debug okay

    import enum
    from ctypes import (Structure, addressof, c_char, c_char_p, c_uint8, c_uint16,
                        c_uint32, c_void_p, memmove, sizeof)
    
    import crcmod
    from scapy.compat import binascii
    from scapy.layers.inet import *
    from scapy.packet import *
    from scapy.sendrecv import *
    
    
    class pduType(enum.Enum):
        """PDU flavour of a frame; values equal the header's ``mcp_layer`` codes."""

        # 1 -> MCPA, 3 -> MCPC (the values deserdes() tests mcp_layer against).
        MCPA = 1
        MCPC = 3
    
    
    class IspCommonField(Structure):
        """Fixed header found at the start of an ISP frame.

        ``_pack_ = 1`` removes alignment padding, so the ten fields are laid out
        back to back and the structure is 14 bytes long.

        NOTE(review): apart from ``mcp_layer``, field meanings are inferred from
        their names only; confirm against the protocol specification.
        """

        _pack_ = 1
        _fields_ = [
            ("start", c_uint8),  # offset 0: receives the leading frame byte
            ("ap_layer", c_uint8),  # offset 1
            ("vp_layer", c_uint8),  # offset 2
            ("station_numbers", c_uint32),  # offsets 3-6
            ("device_numbers", c_uint8),  # offset 7
            ("packets_numbers", c_uint16),  # offsets 8-9
            ("vp_layer_interact", c_uint8),  # offset 10
            ("mcp_layer", c_uint8),  # offset 11: 1 selects MCPA, 3 selects MCPC
            ("cmd", c_uint8),  # offset 12
            ("response", c_uint8),  # offset 13
        ]
    
    
    class mcpa_pdu(Structure):
        """Three-byte MCPA PDU header: 1-byte ``len`` and 16-bit ``tag``.

        ``value`` is not part of the C layout; it is an ordinary Python
        attribute that the parser assigns after filling in the header.
        """

        _pack_ = 1
        _fields_ = [
            ("len", c_uint8),
            ("tag", c_uint16),
        ]
        value: bytes
    
    
    class mcpc_pdu(Structure):
        """Five-byte MCPC PDU header: 1-byte ``len`` and 32-bit ``tag``.

        ``value`` is not part of the C layout; it is an ordinary Python
        attribute that the parser assigns after filling in the header.
        """

        _pack_ = 1
        _fields_ = [
            ("len", c_uint8),
            ("tag", c_uint32),
        ]
        value: bytes
    
    
    class IspEndField(Structure):
        """Three-byte frame trailer: 16-bit ``crc`` followed by the ``end`` byte."""

        _pack_ = 1
        _fields_ = [
            ("crc", c_uint16),
            ("end", c_uint8),
        ]
    
    
    # ISP protocol frame parser for hex-text input: splits the payload into PDUs
    # and verifies the frame CRC.
    class IspProtocol(Structure):
        """Parse one ISP frame supplied as ASCII hex text.

        ``deserdes`` removes the ``7e`` delimiters, undoes the byte escaping,
        decodes the common header, splits the payload into length-prefixed
        PDUs and compares the CRC-16 carried by the frame with the computed one.
        """

        __IspCommonField__ = IspCommonField()
        __IspCommonFieldLen__ = sizeof(__IspCommonField__)
        __IspEndField__ = IspEndField()
        __IspEndFieldLen__ = sizeof(__IspEndField__)
        __crc16__ = 0x0
        __crcFlag__ = False
        # None until a frame with a recognised mcp_layer has been parsed.
        __pdu_type = None
        __pdu__ = {}
        # Immutable default; deserdes() binds a fresh list on the instance.
        __pduList__ = ()

        def deserdes(self, msg: bytes):
            """Parse *msg* and return True when its CRC matches, else False.

            Args:
                msg: the whole frame as ASCII hex digits (either case),
                    including the leading and trailing ``7e`` delimiter.

            Raises:
                binascii.Error: if *msg* is not valid, even-length hex text.
            """
            msg = msg.lower()
            esc_msg = ("7e" + self.__escape_recv_packets(msg) + "7e").encode()
            frame = binascii.unhexlify(esc_msg)

            # Reset per-call state so nothing leaks in from an earlier frame.
            self.__pduList__ = []
            self.__pdu_type = None

            # A frame shorter than header + trailer cannot be decoded; copying
            # the fixed-size structures out of it would read past the buffer.
            if len(frame) < self.__IspCommonFieldLen__ + self.__IspEndFieldLen__:
                print("ISP frame too short.drop this packets ... ...")
                return False

            # Fresh structures per call instead of writing into the class-level
            # objects shared by every instance.
            self.__IspCommonField__ = IspCommonField.from_buffer_copy(frame)

            # ta is the tag width in hex characters (2 per byte).
            pdu_cls = None
            ta = 8
            if self.__IspCommonField__.mcp_layer == 3:
                self.__pdu_type = pduType.MCPC
                pdu_cls, ta = mcpc_pdu, 8
            elif self.__IspCommonField__.mcp_layer == 1:
                self.__pdu_type = pduType.MCPA
                pdu_cls, ta = mcpa_pdu, 4

            # Everything between the common header and the trailer, as hex text.
            pdu_part = esc_msg[
                self.__IspCommonFieldLen__ * 2 : -self.__IspEndFieldLen__ * 2
            ]
            head_chars = 2 + ta  # length byte + tag, in hex characters
            offset = 0
            # An unrecognised mcp_layer leaves pdu_cls as None: no PDUs parsed.
            while pdu_cls is not None and offset + head_chars <= len(pdu_part):
                # The length byte counts the whole PDU: length + tag + value.
                pdu_chars = int(pdu_part[offset : offset + 2], 16) * 2
                if pdu_chars < head_chars:
                    # A length smaller than the PDU header (0 included) would
                    # never advance the cursor; stop instead of looping forever.
                    print("ISP pdu length error.stop parse payload ... ...")
                    break
                # Decode length byte AND tag together so the structure is filled
                # from exactly sizeof(pdu_cls) source bytes.
                pdu = pdu_cls.from_buffer_copy(
                    binascii.unhexlify(pdu_part[offset : offset + head_chars])
                )
                # The value stays hex text, as callers measure it in hex chars.
                pdu.value = pdu_part[offset + head_chars : offset + pdu_chars]
                self.__pduList__.append(pdu)
                offset += pdu_chars

            self.__IspEndField__ = IspEndField.from_buffer_copy(
                frame, len(frame) - self.__IspEndFieldLen__
            )
            # The CRC covers everything between the start delimiter and the
            # trailer (crc + end delimiter).
            self.__crc16__ = self.__crc16(frame[1 : -self.__IspEndFieldLen__])
            if self.__IspEndField__.crc != self.__crc16__:
                print("ISP crc code parse error.drop this packets ... ...")
                return False
            return True

        def __crc16(self, code: bytes):
            """Return the CRC-16 of *code* (poly 0x11021, init 0, not reflected)."""
            return crcmod.mkCrcFun(0x11021, rev=False, initCrc=0, xorOut=0x0)(code)

        def __escape_recv_packets(self, msg: bytes):
            """Return the unescaped frame body as lower-case hex text.

            The ``7e`` delimiters are dropped; a missing delimiter is reported
            but parsing continues. Unescaping is done on decoded bytes so that
            a pattern can never match across a byte boundary of the hex text.
            """
            if msg[0:2] != b"7e" or msg[-2:] != b"7e":
                print("escape packets error")
            raw = binascii.unhexlify(msg[2:-2])
            # Undo 5e7d -> 7e first: doing 5e5d -> 5e first would turn an
            # escaped 5e that is followed by a literal 7d into a bogus 7e.
            raw = raw.replace(b"\x5e\x7d", b"\x7e").replace(b"\x5e\x5d", b"\x5e")
            return binascii.hexlify(raw).decode()

        def payload(self):
            """Return the PDUs of the last parsed frame (empty before parsing)."""
            return self.__pduList__

        def crc16(self):
            """Return the CRC computed by the last ``deserdes`` call (0 before)."""
            return self.__crc16__

        def show(self):
            """Placeholder; does nothing."""
            pass
    
    
    # Sample frame as ASCII hex text (mcp_layer byte is 03); the commented-out
    # line below holds an alternate sample whose mcp_layer byte is 01.
    buf = b"7E03010100000000160000030200FF090000000503BF050000C0050000C1050000C2050000C3050000C4050000C5050000C6050000C7050000C8050000C9050000CA050000CB050000CD050000CE050000CF050000D2050000D3050000D70500000120000002200000062000000920000001210000022100000621000009210000022200000322000006220000072200000A2200000B2200000C2200000D2200000E2200000F22000001230000022300000323000011230000122300001323000021230000222300002323000031230000322300003323000051230000522300005323000061230000622300006323000002250000042500000A2500000326000004260000052600001326000092A47E"
    # buf = b"7e03010000000000020000010200d90900010102000300040005000600070008000a000b001000110012001300140015001600170018001900200021002200230024003000310032003300490075005c005d005e5d005f00010102011001110112011301140115012001300131013301340136013701380139014101420143014401500151015201530172010102020204020502080209020f02100211022002210222022302240225022802010302030403050308030903100311030f032003210322032303240325032803a204530454047308a005a105a205a3057c08b005b105b205b305c0056d1b7e"

    # IspProtocol declares no _fields_ of its own, so this size is 0.
    hdr_len = sizeof(IspProtocol)
    hdr = IspProtocol()
    hdr.deserdes(buf)
    print("this packets:%04x" % hdr.crc16())

    # One line per parsed PDU; valueLen is measured in hex characters here.
    for it in hdr.payload():
        print("len:%d tag:%04x valueLen:%d" % (it.len, it.tag, len(it.value)))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162

    payload parse

    # Parse a frame, then list the tag ids carried in the value of every PDU
    # whose own tag is 0x9.
    hdr = IspProtocol()
    hdr.deserdes(buf)
    print("this packets:%04x" % hdr.crc16())
    # NOTE(review): pduTagLen() is taken to be the tag width in hex characters,
    # because the value is sliced with it before being un-hexlified -- confirm
    # against the IspProtocol variant this snippet is used with.
    tag_chars = hdr.pduTagLen()
    for it in hdr.payload():
        print("len:%d tag:%04x valueLen:%d" % (it.len, it.tag, len(it.value)))
        if it.tag != 0x9:
            continue
        # Restart at the first character of each PDU value, and stop before a
        # trailing chunk that is shorter than one tag.
        for start in range(0, len(it.value) - tag_chars + 1, tag_chars):
            raw = binascii.unhexlify(it.value[start : start + tag_chars])
            # Decode the tag bytes directly: copying them to the start of a
            # len+tag structure would land the first byte in its ``len`` field.
            # Tags are little-endian on the wire (the sample frames carry tag
            # 0x9 as the bytes 09 00).
            print("tag:%08x" % int.from_bytes(raw, "little"))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    Upgraded parser: works on raw bytes instead of hex text

    from scapy.compat import binascii
    import logging
    import crcmod
    import enum
    
    from ctypes import (
        Structure,
        addressof,
        c_uint8,
        c_uint16,
        c_uint32,
        memmove,
        sizeof,
    )
    
    
    logger = logging.getLogger("scapy")
    logger.setLevel(logging.INFO)
    
    # cap packets and parse protocol
    # common cap sniff data
    # packets check and summary
    # return bool filter functions
    
    
    class pduType(enum.Enum):
        """PDU flavour of a frame; values equal the header's ``mcp_layer`` codes."""

        # 1 -> MCPA, 3 -> MCPC (the values deserdes() tests mcp_layer against).
        MCPA = 1
        MCPC = 3
    
    
    class IspCommonField(Structure):
        """Fixed header found at the start of an ISP frame.

        ``_pack_ = 1`` removes alignment padding, so the ten fields are laid out
        back to back and the structure is 14 bytes long.

        NOTE(review): apart from ``mcp_layer``, field meanings are inferred from
        their names only; confirm against the protocol specification.
        """

        _pack_ = 1
        _fields_ = [
            ("start", c_uint8),  # offset 0: receives the leading frame byte
            ("ap_layer", c_uint8),  # offset 1
            ("vp_layer", c_uint8),  # offset 2
            ("station_numbers", c_uint32),  # offsets 3-6
            ("device_numbers", c_uint8),  # offset 7
            ("packets_numbers", c_uint16),  # offsets 8-9
            ("vp_layer_interact", c_uint8),  # offset 10
            ("mcp_layer", c_uint8),  # offset 11: 1 selects MCPA, 3 selects MCPC
            ("cmd", c_uint8),  # offset 12
            ("response", c_uint8),  # offset 13
        ]
    
    
    class mcpa_tag(Structure):
        """Three-byte MCPA PDU header: 1-byte ``len`` and 16-bit ``tag``.

        ``value`` is not part of the C layout; it is an ordinary Python
        attribute that the parser assigns after filling in the header.
        """

        _pack_ = 1
        _fields_ = [
            ("len", c_uint8),
            ("tag", c_uint16),
        ]
        value: bytes
    
    
    class mcpc_tag(Structure):
        """Five-byte MCPC PDU header: 1-byte ``len`` and 32-bit ``tag``.

        ``value`` is not part of the C layout; it is an ordinary Python
        attribute that the parser assigns after filling in the header.
        """

        _pack_ = 1
        _fields_ = [
            ("len", c_uint8),
            ("tag", c_uint32),
        ]
        value: bytes
    
    
    class mcpa_idx(Structure):
        """A bare 16-bit ``tag`` with no length byte in front of it."""

        _pack_ = 1
        _fields_ = [
            ("tag", c_uint16),
        ]
    
    
    class mcpc_idx(Structure):
        """A bare 32-bit ``tag`` with no length byte in front of it."""

        _pack_ = 1
        _fields_ = [
            ("tag", c_uint32),
        ]
    
    
    class IspEndField(Structure):
        """Three-byte frame trailer: 16-bit ``crc`` followed by the ``end`` byte."""

        _pack_ = 1
        _fields_ = [
            ("crc", c_uint16),
            ("end", c_uint8),
        ]
    
    
    # ISP protocol frame parser for frames written as hex text (e.g. copied from
    # a log): splits the payload into PDUs and verifies the frame CRC.


    class IspProtocolForLog(Structure):
        """Parse one ISP frame supplied as ASCII hex text.

        ``deserdes`` removes the ``7e`` delimiters, undoes the byte escaping,
        decodes the common header, splits the payload into length-prefixed
        PDUs and compares the CRC-16 carried by the frame with the computed one.
        """

        __IspCommonField__ = IspCommonField()
        __IspCommonFieldLen__ = sizeof(__IspCommonField__)
        __IspEndField__ = IspEndField()
        __IspEndFieldLen__ = sizeof(__IspEndField__)
        __crc16__ = 0x0
        __crcFlag__ = False
        # None until a frame with a recognised mcp_layer has been parsed.
        __pdu_type = None
        __pdu__ = {}
        # Immutable default; deserdes() binds a fresh list on the instance.
        __pduList__ = ()

        def deserdes(self, msg: bytes):
            """Parse *msg* and return True when its CRC matches, else False.

            Args:
                msg: the whole frame as ASCII hex digits (either case),
                    including the leading and trailing ``7e`` delimiter.

            Raises:
                binascii.Error: if *msg* is not valid, even-length hex text.
            """
            msg = msg.lower()
            esc_msg = ("7e" + self.__escape_recv_packets(msg) + "7e").encode()
            frame = binascii.unhexlify(esc_msg)

            # Reset per-call state so nothing leaks in from an earlier frame.
            self.__pduList__ = []
            self.__pdu_type = None

            # A frame shorter than header + trailer cannot be decoded; copying
            # the fixed-size structures out of it would read past the buffer.
            if len(frame) < self.__IspCommonFieldLen__ + self.__IspEndFieldLen__:
                print("ISP frame too short.drop this packets ... ...")
                return False

            # Fresh structures per call instead of writing into the class-level
            # objects shared by every instance.
            self.__IspCommonField__ = IspCommonField.from_buffer_copy(frame)

            # ta is the tag width in hex characters (2 per byte). The PDU header
            # structures of this listing are mcpa_tag / mcpc_tag.
            pdu_cls = None
            ta = 8
            if self.__IspCommonField__.mcp_layer == 3:
                self.__pdu_type = pduType.MCPC
                pdu_cls, ta = mcpc_tag, 8
            elif self.__IspCommonField__.mcp_layer == 1:
                self.__pdu_type = pduType.MCPA
                pdu_cls, ta = mcpa_tag, 4

            # Everything between the common header and the trailer, as hex text.
            pdu_part = esc_msg[
                self.__IspCommonFieldLen__ * 2 : -self.__IspEndFieldLen__ * 2
            ]
            head_chars = 2 + ta  # length byte + tag, in hex characters
            offset = 0
            # An unrecognised mcp_layer leaves pdu_cls as None: no PDUs parsed.
            while pdu_cls is not None and offset + head_chars <= len(pdu_part):
                # The length byte counts the whole PDU: length + tag + value.
                pdu_chars = int(pdu_part[offset : offset + 2], 16) * 2
                if pdu_chars < head_chars:
                    # A length smaller than the PDU header (0 included) would
                    # never advance the cursor; stop instead of looping forever.
                    print("ISP pdu length error.stop parse payload ... ...")
                    break
                # Decode length byte AND tag together so the structure is filled
                # from exactly sizeof(pdu_cls) source bytes.
                pdu = pdu_cls.from_buffer_copy(
                    binascii.unhexlify(pdu_part[offset : offset + head_chars])
                )
                # The value stays hex text, as callers measure it in hex chars.
                pdu.value = pdu_part[offset + head_chars : offset + pdu_chars]
                self.__pduList__.append(pdu)
                offset += pdu_chars

            self.__IspEndField__ = IspEndField.from_buffer_copy(
                frame, len(frame) - self.__IspEndFieldLen__
            )
            # The CRC covers everything between the start delimiter and the
            # trailer (crc + end delimiter).
            self.__crc16__ = self.__crc16(frame[1 : -self.__IspEndFieldLen__])
            if self.__IspEndField__.crc != self.__crc16__:
                print("ISP crc code parse error.drop this packets ... ...")
                return False
            return True

        def __crc16(self, code: bytes):
            """Return the CRC-16 of *code* (poly 0x11021, init 0, not reflected)."""
            return crcmod.mkCrcFun(0x11021, rev=False, initCrc=0, xorOut=0x0)(code)

        def __escape_recv_packets(self, msg: bytes):
            """Return the unescaped frame body as lower-case hex text.

            The ``7e`` delimiters are dropped; a missing delimiter is reported
            but parsing continues. Unescaping is done on decoded bytes so that
            a pattern can never match across a byte boundary of the hex text.
            """
            if msg[0:2] != b"7e" or msg[-2:] != b"7e":
                print("escape packets error")
            raw = binascii.unhexlify(msg[2:-2])
            # Undo 5e7d -> 7e first: doing 5e5d -> 5e first would turn an
            # escaped 5e that is followed by a literal 7d into a bogus 7e.
            raw = raw.replace(b"\x5e\x7d", b"\x7e").replace(b"\x5e\x5d", b"\x5e")
            return binascii.hexlify(raw).decode()

        def payload(self):
            """Return the PDUs of the last parsed frame (empty before parsing)."""
            return self.__pduList__

        def crc16(self):
            """Return the CRC computed by the last ``deserdes`` call (0 before)."""
            return self.__crc16__

        def show(self):
            """Placeholder; does nothing."""
            pass
    
    
    def example_IspLogprotolparse():
        """Parse a sample hex-text frame and print its CRC and PDU summary."""
        # Sample frame as ASCII hex text (mcp_layer byte is 03); the
        # commented-out line holds an alternate sample (mcp_layer byte 01).
        buf = b"7E03010100000000160000030200FF090000000503BF050000C0050000C1050000C2050000C3050000C4050000C5050000C6050000C7050000C8050000C9050000CA050000CB050000CD050000CE050000CF050000D2050000D3050000D70500000120000002200000062000000920000001210000022100000621000009210000022200000322000006220000072200000A2200000B2200000C2200000D2200000E2200000F22000001230000022300000323000011230000122300001323000021230000222300002323000031230000322300003323000051230000522300005323000061230000622300006323000002250000042500000A2500000326000004260000052600001326000092A47E"
        # buf = b"7e03010000000000020000010200d90900010102000300040005000600070008000a000b001000110012001300140015001600170018001900200021002200230024003000310032003300490075005c005d005e5d005f00010102011001110112011301140115012001300131013301340136013701380139014101420143014401500151015201530172010102020204020502080209020f02100211022002210222022302240225022802010302030403050308030903100311030f032003210322032303240325032803a204530454047308a005a105a205a3057c08b005b105b205b305c0056d1b7e"
        hdr = IspProtocolForLog()
        hdr.deserdes(buf)
        print("this packets:%04x" % hdr.crc16())
        # valueLen is measured in hex characters for this text-based parser.
        for it in hdr.payload():
            print("len:%d tag:%04x valueLen:%d" % (it.len, it.tag, len(it.value)))
    
    
    class IspProtocol(Structure):
        """Parse one ISP frame supplied as raw bytes.

        ``deserdes`` removes the ``0x7e`` delimiters, undoes the byte escaping,
        decodes the common header, splits the payload into length-prefixed
        PDUs and compares the CRC-16 carried by the frame with the computed one.
        """

        __IspCommonField__ = IspCommonField()
        __IspCommonFieldLen__ = sizeof(__IspCommonField__)
        __IspEndField__ = IspEndField()
        __IspEndFieldLen__ = sizeof(__IspEndField__)
        __crc16__ = 0x0
        __crcFlag__ = False
        # None until a frame with a recognised mcp_layer has been parsed.
        __pdu_type = None
        __pdu__ = {}
        # Tag width in bytes; 4 is also what deserdes() falls back to.
        __pduTagLen__ = 4
        # Immutable default; deserdes() binds a fresh list on the instance.
        __pduList__ = ()

        def deserdes(self, msg: bytes):
            """Parse *msg* and return True when its CRC matches, else False.

            Args:
                msg: the whole frame as raw bytes, including the leading and
                    trailing ``0x7e`` delimiter.
            """
            esc_msg = b"\x7e" + self.__escape_recv_packets(msg) + b"\x7e"

            # Reset per-call state so nothing leaks in from an earlier frame.
            self.__pduList__ = []
            self.__pdu_type = None
            self.__pduTagLen__ = 4

            # A frame shorter than header + trailer cannot be decoded; copying
            # the fixed-size structures out of it would read past the buffer.
            if len(esc_msg) < self.__IspCommonFieldLen__ + self.__IspEndFieldLen__:
                print("ISP frame too short.drop this packets ... ...")
                return False

            # Fresh structures per call instead of writing into the class-level
            # objects shared by every instance.
            self.__IspCommonField__ = IspCommonField.from_buffer_copy(esc_msg)

            # ta is the tag width in bytes.
            pdu_cls = None
            ta = 4
            if self.__IspCommonField__.mcp_layer == 3:
                self.__pdu_type = pduType.MCPC
                pdu_cls, ta = mcpc_tag, 4
            elif self.__IspCommonField__.mcp_layer == 1:
                self.__pdu_type = pduType.MCPA
                pdu_cls, ta = mcpa_tag, 2
            self.__pduTagLen__ = ta

            # Everything between the common header and the trailer.
            pdu_part = esc_msg[self.__IspCommonFieldLen__ : -self.__IspEndFieldLen__]
            head_len = ta + 1  # length byte + tag
            offset = 0
            # An unrecognised mcp_layer leaves pdu_cls as None: no PDUs parsed.
            while pdu_cls is not None and offset + head_len <= len(pdu_part):
                # The length byte counts the whole PDU: length + tag + value.
                pdu_len = pdu_part[offset]
                if pdu_len < head_len:
                    # A length smaller than the PDU header (0 included) would
                    # never advance the cursor; stop instead of looping forever.
                    print("ISP pdu length error.stop parse payload ... ...")
                    break
                # Copies exactly sizeof(pdu_cls) == head_len bytes, all of which
                # are known to lie inside pdu_part.
                pdu = pdu_cls.from_buffer_copy(pdu_part, offset)
                pdu.value = pdu_part[offset + head_len : offset + pdu_len]
                self.__pduList__.append(pdu)
                offset += pdu_len

            self.__IspEndField__ = IspEndField.from_buffer_copy(
                esc_msg, len(esc_msg) - self.__IspEndFieldLen__
            )
            # The CRC covers everything between the start delimiter and the
            # trailer (crc + end delimiter).
            self.__crc16__ = self.__crc16(esc_msg[1 : -self.__IspEndFieldLen__])
            if self.__IspEndField__.crc != self.__crc16__:
                print("ISP crc code parse error.drop this packets ... ...")
                return False
            return True

        def __crc16(self, code: bytes):
            """Return the CRC-16 of *code* (poly 0x11021, init 0, not reflected)."""
            return crcmod.mkCrcFun(0x11021, rev=False, initCrc=0, xorOut=0x0)(code)

        def __escape_recv_packets(self, msg: bytes):
            """Return the frame body with delimiters removed and escapes undone.

            A missing delimiter is reported but parsing continues.
            """
            # Compare one-byte slices: indexing bytes yields an int, which never
            # equals a bytes object.
            if msg[:1] != b"\x7e" or msg[-1:] != b"\x7e":
                print("escape packets error")
            # Undo 5e7d -> 7e first: doing 5e5d -> 5e first would turn an
            # escaped 5e that is followed by a literal 7d into a bogus 7e.
            return msg[1:-1].replace(b"\x5e\x7d", b"\x7e").replace(b"\x5e\x5d", b"\x5e")

        def PduType(self):
            """Return the pduType of the last frame, or None if unrecognised."""
            return self.__pdu_type

        def pduTagLen(self):
            """Return the tag width in bytes used for the last parsed frame."""
            return self.__pduTagLen__

        def payload(self):
            """Return the PDUs of the last parsed frame (empty before parsing)."""
            return self.__pduList__

        def crc16(self):
            """Return the CRC computed by the last ``deserdes`` call (0 before)."""
            return self.__crc16__
    
    
    def example_parse_Isp_protocol():
        """Parse a sample raw-bytes frame and print its CRC, PDUs and tag list.

        For every PDU whose tag is 0x9 the value is walked in tag-sized steps
        and each step is printed as a tag id.
        """
        # Alternate sample frame (mcp_layer byte is 03); uncomment it and comment
        # out the assignment below to parse this one instead.
        # buf = b"\x7E\x03\x01\x01\x00\x00\x00\x00\x16\x00\x00\x03\x02\x00\xFF\x09\x00\x00\x00\x05\x03\xBF\x05\x00\x00\xC0\x05\x00\x00\xC1\x05\x00\x00\xC2\x05\x00\x00\xC3\x05\x00\x00\xC4\x05\x00\x00\xC5\x05\x00\x00\xC6\x05\x00\x00\xC7\x05\x00\x00\xC8\x05\x00\x00\xC9\x05\x00\x00\xCA\x05\x00\x00\xCB\x05\x00\x00\xCD\x05\x00\x00\xCE\x05\x00\x00\xCF\x05\x00\x00\xD2\x05\x00\x00\xD3\x05\x00\x00\xD7\x05\x00\x00\x01\x20\x00\x00\x02\x20\x00\x00\x06\x20\x00\x00\x09\x20\x00\x00\x01\x21\x00\x00\x02\x21\x00\x00\x06\x21\x00\x00\x09\x21\x00\x00\x02\x22\x00\x00\x03\x22\x00\x00\x06\x22\x00\x00\x07\x22\x00\x00\x0A\x22\x00\x00\x0B\x22\x00\x00\x0C\x22\x00\x00\x0D\x22\x00\x00\x0E\x22\x00\x00\x0F\x22\x00\x00\x01\x23\x00\x00\x02\x23\x00\x00\x03\x23\x00\x00\x11\x23\x00\x00\x12\x23\x00\x00\x13\x23\x00\x00\x21\x23\x00\x00\x22\x23\x00\x00\x23\x23\x00\x00\x31\x23\x00\x00\x32\x23\x00\x00\x33\x23\x00\x00\x51\x23\x00\x00\x52\x23\x00\x00\x53\x23\x00\x00\x61\x23\x00\x00\x62\x23\x00\x00\x63\x23\x00\x00\x02\x25\x00\x00\x04\x25\x00\x00\x0A\x25\x00\x00\x03\x26\x00\x00\x04\x26\x00\x00\x05\x26\x00\x00\x13\x26\x00\x00\x92\xA4\x7E"
        buf = b"\x7e\x03\x01\x00\x00\x00\x00\x00\x02\x00\x00\x01\x02\x00\xd9\x09\x00\x01\x01\x02\x00\x03\x00\x04\x00\x05\x00\x06\x00\x07\x00\x08\x00\x0a\x00\x0b\x00\x10\x00\x11\x00\x12\x00\x13\x00\x14\x00\x15\x00\x16\x00\x17\x00\x18\x00\x19\x00\x20\x00\x21\x00\x22\x00\x23\x00\x24\x00\x30\x00\x31\x00\x32\x00\x33\x00\x49\x00\x75\x00\x5c\x00\x5d\x00\x5e\x5d\x00\x5f\x00\x01\x01\x02\x01\x10\x01\x11\x01\x12\x01\x13\x01\x14\x01\x15\x01\x20\x01\x30\x01\x31\x01\x33\x01\x34\x01\x36\x01\x37\x01\x38\x01\x39\x01\x41\x01\x42\x01\x43\x01\x44\x01\x50\x01\x51\x01\x52\x01\x53\x01\x72\x01\x01\x02\x02\x02\x04\x02\x05\x02\x08\x02\x09\x02\x0f\x02\x10\x02\x11\x02\x20\x02\x21\x02\x22\x02\x23\x02\x24\x02\x25\x02\x28\x02\x01\x03\x02\x03\x04\x03\x05\x03\x08\x03\x09\x03\x10\x03\x11\x03\x0f\x03\x20\x03\x21\x03\x22\x03\x23\x03\x24\x03\x25\x03\x28\x03\xa2\x04\x53\x04\x54\x04\x73\x08\xa0\x05\xa1\x05\xa2\x05\xa3\x05\x7c\x08\xb0\x05\xb1\x05\xb2\x05\xb3\x05\xc0\x05\x6d\x1b\x7e"
        hdr = IspProtocol()
        hdr.deserdes(buf)
        print("this packets:%04x" % hdr.crc16())
        print(hdr.PduType())
        print("protocol layer:%x" % hdr.pduTagLen())

        tag_len = hdr.pduTagLen()
        # mcpa_idx is 2 bytes and mcpc_idx is 4 bytes, matching tag_len for the
        # MCPA and MCPC cases respectively.
        idx_cls = mcpa_idx if hdr.PduType() == pduType.MCPA else mcpc_idx
        for it in hdr.payload():
            print("len:%d tag:%04x valueLen:%d" % (it.len, it.tag, len(it.value)))
            if it.tag != 0x9:
                continue
            # Restart at the first byte of each PDU value, and stop before a
            # trailing chunk that is shorter than one tag.
            for start in range(0, len(it.value) - tag_len + 1, tag_len):
                print("tag:%08x" % idx_cls.from_buffer_copy(it.value, start).tag)
    
    
    # Run the raw-bytes parsing demo when this listing is executed.
    example_parse_Isp_protocol()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318
    • 319
    • 320
    • 321
    • 322
  • 相关阅读:
    图表开发工具LightningChart .NET v12.0正式发布——拥有新图表、新系列类型
    硬件新问答
    Abnova丨血液总核酸纯化试剂盒预装相关说明书
    Blender点操作
    java之ArrayList和Vector源码分析
    Unity 下载Zip压缩文件并且解压缩
    Spring Cloud 架构
    用实例带你深入理解Java内存模型
    使用magic-api构建迅速开发平台的成功案例分享
    【高并发】通过源码深度分析线程池中Worker线程的执行流程
  • 原文地址:https://blog.csdn.net/weixin_45647912/article/details/134010391